Skip to content

Commit 8628d74

Browse files
authored
KAFKA-19661 [6/N]: Use heaps also on the process-level (#20523)
In the current solution, we only use a heap to select the right process, but resort to linear search for selecting a member within a process. This means use cases where a lot of threads run within the same process can yield slow assignment. The number of threads in a process shouldn’t scale arbitrarily (our assumed case for benchmarking of 50 threads in a single process seems quite extreme already), however, we can optimize for this case to reduce the runtime further. Other assignment algorithms assign directly on the member-level, but we cannot do this in Kafka Streams, since we cannot assign tasks to processes that already own the task. Defining a heap directly on members would mean that we may have to skip through 10s of member before finding one that does not belong to a process that does not yet own the member. Instead, we can define a separate heap for each process, which keeps the members of the process by load. We can only keep the heap as long as we are only changing the load of the top-most member (which we usually do). This means we keep track of a lot of heaps, but since heaps are backed by arrays in Java, this should not result in extreme memory inefficiencies. In our worst-performing benchmark, this improves the runtime by ~2x on top of the optimization above. Also piggybacked are some minor optimizations / clean-ups: - initialize HashMaps and ArrayLists with the right capacity - fix some comments - improve logging output Note that this is a pure performance change, so there are no changes to the unit tests. Reviewers: Bill Bejeck<[email protected]>
1 parent 749c2d9 commit 8628d74

File tree

4 files changed

+100
-49
lines changed

4 files changed

+100
-49
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/AssignmentMemberSpec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
*
2828
* @param instanceId The instance ID if provided.
2929
* @param rackId The rack ID if provided.
30-
* @param activeTasks Reconciled active tasks
31-
* @param standbyTasks Reconciled standby tasks
32-
* @param warmupTasks Reconciled warm-up tasks
30+
* @param activeTasks Current target active tasks
31+
* @param standbyTasks Current target standby tasks
32+
* @param warmupTasks Current target warm-up tasks
3333
* @param processId The process ID.
3434
* @param clientTags The client tags for a rack-aware assignment.
3535
* @param taskOffsets The last received cumulative task offsets of assigned tasks or dormant tasks.

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/ProcessState.java

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
*/
1717
package org.apache.kafka.coordinator.group.streams.assignor;
1818

19+
import java.util.AbstractMap;
1920
import java.util.HashMap;
2021
import java.util.HashSet;
2122
import java.util.Map;
23+
import java.util.PriorityQueue;
2224
import java.util.Set;
2325
import java.util.stream.Collectors;
2426

@@ -36,6 +38,7 @@ public class ProcessState {
3638
private final Map<String, Set<TaskId>> assignedActiveTasks;
3739
private final Map<String, Set<TaskId>> assignedStandbyTasks;
3840
private final Set<TaskId> assignedTasks;
41+
private PriorityQueue<Map.Entry<String, Integer>> membersByLoad;
3942

4043
ProcessState(final String processId) {
4144
this.processId = processId;
@@ -45,9 +48,9 @@ public class ProcessState {
4548
this.assignedActiveTasks = new HashMap<>();
4649
this.assignedStandbyTasks = new HashMap<>();
4750
this.memberToTaskCounts = new HashMap<>();
51+
this.membersByLoad = null;
4852
}
4953

50-
5154
public String processId() {
5255
return processId;
5356
}
@@ -84,7 +87,26 @@ public Map<String, Set<TaskId>> assignedStandbyTasksByMember() {
8487
return assignedStandbyTasks;
8588
}
8689

87-
public void addTask(final String memberId, final TaskId taskId, final boolean isActive) {
90+
/**
91+
* Assigns a task to a member of this process.
92+
*
93+
* @param memberId The member to assign to.
94+
* @param taskId The task to assign.
95+
* @param isActive Whether the task is an active task (true) or a standby task (false).
96+
* @return the number of tasks that `memberId` has assigned after adding the new task.
97+
*/
98+
public int addTask(final String memberId, final TaskId taskId, final boolean isActive) {
99+
int newTaskCount = addTaskInternal(memberId, taskId, isActive);
100+
// We cannot efficiently add a task to a specific member and keep the memberByLoad ordered correctly.
101+
// So we just drop the heap here.
102+
//
103+
// The order in which addTask and addTaskToLeastLoadedMember is called ensures that the heaps are built at most
104+
// twice (once for active, once for standby)
105+
membersByLoad = null;
106+
return newTaskCount;
107+
}
108+
109+
private int addTaskInternal(final String memberId, final TaskId taskId, final boolean isActive) {
88110
taskCount += 1;
89111
assignedTasks.add(taskId);
90112
if (isActive) {
@@ -94,8 +116,46 @@ public void addTask(final String memberId, final TaskId taskId, final boolean is
94116
assignedStandbyTasks.putIfAbsent(memberId, new HashSet<>());
95117
assignedStandbyTasks.get(memberId).add(taskId);
96118
}
97-
memberToTaskCounts.put(memberId, memberToTaskCounts.get(memberId) + 1);
119+
int newTaskCount = memberToTaskCounts.get(memberId) + 1;
120+
memberToTaskCounts.put(memberId, newTaskCount);
98121
computeLoad();
122+
return newTaskCount;
123+
}
124+
125+
/**
126+
* Assigns a task to the least loaded member of this process
127+
*
128+
* @param taskId The task to assign.
129+
* @param isActive Whether the task is an active task (true) or a standby task (false).
130+
* @return the number of tasks that `memberId` has assigned after adding the new task, or -1 if the
131+
* task was not assigned to any member.
132+
*/
133+
public int addTaskToLeastLoadedMember(final TaskId taskId, final boolean isActive) {
134+
if (memberToTaskCounts.isEmpty()) {
135+
return -1;
136+
}
137+
if (memberToTaskCounts.size() == 1) {
138+
return addTaskInternal(memberToTaskCounts.keySet().iterator().next(), taskId, isActive);
139+
}
140+
if (membersByLoad == null) {
141+
membersByLoad = new PriorityQueue<>(
142+
memberToTaskCounts.size(),
143+
Map.Entry.comparingByValue()
144+
);
145+
for (Map.Entry<String, Integer> entry : memberToTaskCounts.entrySet()) {
146+
// Copy here, since map entry objects are allowed to be reused by the underlying map implementation.
147+
membersByLoad.add(new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));
148+
}
149+
}
150+
Map.Entry<String, Integer> member = membersByLoad.poll();
151+
if (member != null) {
152+
int newTaskCount = addTaskInternal(member.getKey(), taskId, isActive);
153+
member.setValue(newTaskCount);
154+
membersByLoad.add(member); // Reinsert the updated member back into the priority queue
155+
return newTaskCount;
156+
} else {
157+
throw new TaskAssignorException("No members available to assign task " + taskId);
158+
}
99159
}
100160

101161
private void incrementCapacity() {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java

Lines changed: 29 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.Iterator;
2828
import java.util.LinkedList;
2929
import java.util.Map;
30-
import java.util.Optional;
3130
import java.util.PriorityQueue;
3231
import java.util.Set;
3332
import java.util.stream.Collectors;
@@ -97,11 +96,11 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol
9796
localState.totalMembersWithActiveTaskCapacity = groupSpec.members().size();
9897
localState.totalMembersWithTaskCapacity = groupSpec.members().size();
9998
localState.activeTasksPerMember = computeTasksPerMember(localState.totalActiveTasks, localState.totalMembersWithActiveTaskCapacity);
100-
localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
99+
localState.totalTasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
101100

102-
localState.processIdToState = new HashMap<>();
103-
localState.activeTaskToPrevMember = new HashMap<>();
104-
localState.standbyTaskToPrevMember = new HashMap<>();
101+
localState.processIdToState = new HashMap<>(localState.totalMembersWithActiveTaskCapacity);
102+
localState.activeTaskToPrevMember = new HashMap<>(localState.totalActiveTasks);
103+
localState.standbyTaskToPrevMember = new HashMap<>(localState.numStandbyReplicas > 0 ? (localState.totalTasks - localState.totalActiveTasks) / localState.numStandbyReplicas : 0);
105104
for (final Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) {
106105
final String memberId = memberEntry.getKey();
107106
final String processId = memberEntry.getValue().processId();
@@ -124,7 +123,7 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol
124123
final Set<Integer> partitionNoSet = entry.getValue();
125124
for (final int partitionNo : partitionNoSet) {
126125
final TaskId taskId = new TaskId(entry.getKey(), partitionNo);
127-
localState.standbyTaskToPrevMember.putIfAbsent(taskId, new ArrayList<>());
126+
localState.standbyTaskToPrevMember.putIfAbsent(taskId, new ArrayList<>(localState.numStandbyReplicas));
128127
localState.standbyTaskToPrevMember.get(taskId).add(member);
129128
}
130129
}
@@ -185,8 +184,9 @@ private void assignActive(final LinkedList<TaskId> activeTasks) {
185184
if (prevMember != null) {
186185
final ProcessState processState = localState.processIdToState.get(prevMember.processId);
187186
if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
188-
processState.addTask(prevMember.memberId, task, true);
189-
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
187+
int newActiveTasks = processState.addTask(prevMember.memberId, task, true);
188+
maybeUpdateActiveTasksPerMember(newActiveTasks);
189+
maybeUpdateTotalTasksPerMember(newActiveTasks);
190190
it.remove();
191191
}
192192
}
@@ -200,8 +200,9 @@ private void assignActive(final LinkedList<TaskId> activeTasks) {
200200
if (prevMember != null) {
201201
final ProcessState processState = localState.processIdToState.get(prevMember.processId);
202202
if (hasUnfulfilledActiveTaskQuota(processState, prevMember)) {
203-
processState.addTask(prevMember.memberId, task, true);
204-
maybeUpdateActiveTasksPerMember(processState.memberToTaskCounts().get(prevMember.memberId));
203+
int newActiveTasks = processState.addTask(prevMember.memberId, task, true);
204+
maybeUpdateActiveTasksPerMember(newActiveTasks);
205+
maybeUpdateTotalTasksPerMember(newActiveTasks);
205206
it.remove();
206207
}
207208
}
@@ -213,19 +214,18 @@ private void assignActive(final LinkedList<TaskId> activeTasks) {
213214
// 3. assign any remaining unassigned tasks
214215
final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
215216
processByLoad.addAll(localState.processIdToState.values());
216-
for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
217-
final TaskId task = it.next();
217+
for (final TaskId task: activeTasks) {
218218
final ProcessState processWithLeastLoad = processByLoad.poll();
219219
if (processWithLeastLoad == null) {
220220
throw new TaskAssignorException(String.format("No process available to assign active task %s.", task));
221221
}
222-
final String member = memberWithLeastLoad(processWithLeastLoad);
223-
if (member == null) {
222+
final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(task, true);
223+
if (newTaskCount != -1) {
224+
maybeUpdateActiveTasksPerMember(newTaskCount);
225+
maybeUpdateTotalTasksPerMember(newTaskCount);
226+
} else {
224227
throw new TaskAssignorException(String.format("No member available to assign active task %s.", task));
225228
}
226-
processWithLeastLoad.addTask(member, task, true);
227-
it.remove();
228-
maybeUpdateActiveTasksPerMember(processWithLeastLoad.memberToTaskCounts().get(member));
229229
processByLoad.add(processWithLeastLoad); // Add it back to the queue after updating its state
230230
}
231231
}
@@ -238,11 +238,11 @@ private void maybeUpdateActiveTasksPerMember(final int activeTasksNo) {
238238
}
239239
}
240240

241-
private void maybeUpdateTasksPerMember(final int taskNo) {
242-
if (taskNo == localState.tasksPerMember) {
241+
private void maybeUpdateTotalTasksPerMember(final int taskNo) {
242+
if (taskNo == localState.totalTasksPerMember) {
243243
localState.totalMembersWithTaskCapacity--;
244244
localState.totalTasks -= taskNo;
245-
localState.tasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
245+
localState.totalTasksPerMember = computeTasksPerMember(localState.totalTasks, localState.totalMembersWithTaskCapacity);
246246
}
247247
}
248248

@@ -253,10 +253,10 @@ private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> q
253253
}
254254
boolean found = false;
255255
if (!processWithLeastLoad.hasTask(taskId)) {
256-
final String memberId = memberWithLeastLoad(processWithLeastLoad);
257-
if (memberId != null) {
258-
processWithLeastLoad.addTask(memberId, taskId, false);
256+
final int newTaskCount = processWithLeastLoad.addTaskToLeastLoadedMember(taskId, false);
257+
if (newTaskCount != -1) {
259258
found = true;
259+
maybeUpdateTotalTasksPerMember(newTaskCount);
260260
}
261261
} else if (!queue.isEmpty()) {
262262
found = assignStandbyToMemberWithLeastLoad(queue, taskId);
@@ -303,26 +303,12 @@ private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, fina
303303
return null;
304304
}
305305

306-
private String memberWithLeastLoad(final ProcessState processWithLeastLoad) {
307-
final Map<String, Integer> members = processWithLeastLoad.memberToTaskCounts();
308-
if (members.isEmpty()) {
309-
return null;
310-
}
311-
if (members.size() == 1) {
312-
return members.keySet().iterator().next();
313-
}
314-
final Optional<String> memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream()
315-
.min(Map.Entry.comparingByValue())
316-
.map(Map.Entry::getKey);
317-
return memberWithLeastLoad.orElse(null);
318-
}
319-
320306
private boolean hasUnfulfilledActiveTaskQuota(final ProcessState process, final Member member) {
321307
return process.memberToTaskCounts().get(member.memberId) < localState.activeTasksPerMember;
322308
}
323309

324310
private boolean hasUnfulfilledTaskQuota(final ProcessState process, final Member member) {
325-
return process.memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
311+
return process.memberToTaskCounts().get(member.memberId) < localState.totalTasksPerMember;
326312
}
327313

328314
private void assignStandby(final LinkedList<TaskId> standbyTasks) {
@@ -339,8 +325,8 @@ private void assignStandby(final LinkedList<TaskId> standbyTasks) {
339325
if (prevActiveMember != null) {
340326
final ProcessState prevActiveMemberProcessState = localState.processIdToState.get(prevActiveMember.processId);
341327
if (!prevActiveMemberProcessState.hasTask(task) && hasUnfulfilledTaskQuota(prevActiveMemberProcessState, prevActiveMember)) {
342-
prevActiveMemberProcessState.addTask(prevActiveMember.memberId, task, false);
343-
maybeUpdateTasksPerMember(prevActiveMemberProcessState.memberToTaskCounts().get(prevActiveMember.memberId));
328+
int newTaskCount = prevActiveMemberProcessState.addTask(prevActiveMember.memberId, task, false);
329+
maybeUpdateTotalTasksPerMember(newTaskCount);
344330
continue;
345331
}
346332
}
@@ -352,8 +338,8 @@ private void assignStandby(final LinkedList<TaskId> standbyTasks) {
352338
if (prevStandbyMember != null) {
353339
final ProcessState prevStandbyMemberProcessState = localState.processIdToState.get(prevStandbyMember.processId);
354340
if (hasUnfulfilledTaskQuota(prevStandbyMemberProcessState, prevStandbyMember)) {
355-
prevStandbyMemberProcessState.addTask(prevStandbyMember.memberId, task, false);
356-
maybeUpdateTasksPerMember(prevStandbyMemberProcessState.memberToTaskCounts().get(prevStandbyMember.memberId));
341+
int newTaskCount = prevStandbyMemberProcessState.addTask(prevStandbyMember.memberId, task, false);
342+
maybeUpdateTotalTasksPerMember(newTaskCount);
357343
continue;
358344
}
359345
}
@@ -430,6 +416,6 @@ private static class LocalState {
430416
int totalMembersWithActiveTaskCapacity;
431417
int totalMembersWithTaskCapacity;
432418
int activeTasksPerMember;
433-
int tasksPerMember;
419+
int totalTasksPerMember;
434420
}
435421
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/TaskId.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,9 @@ public int compareTo(final TaskId other) {
3838
.compare(this, other);
3939
}
4040

41+
@Override
42+
public String toString() {
43+
return subtopologyId + '_' + partition;
44+
}
45+
4146
}

0 commit comments

Comments
 (0)