Skip to content

Commit ee0b9df

Browse files
committed
Use group_instance_id in range and roundrobin assignors
1 parent 625d76c commit ee0b9df

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

kafka/coordinator/assignors/range.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import absolute_import
22

33
import collections
4+
import itertools
45
import logging
56

67
from kafka.vendor import six
@@ -34,31 +35,35 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
3435
@classmethod
3536
def assign(cls, cluster, group_subscriptions):
3637
consumers_per_topic = collections.defaultdict(list)
37-
for member, subscription in six.iteritems(group_subscriptions):
38+
for member_id, subscription in six.iteritems(group_subscriptions):
3839
for topic in subscription.topics:
39-
consumers_per_topic[topic].append(member)
40+
consumers_per_topic[topic].append((subscription.group_instance_id, member_id))
4041

4142
# construct {member_id: {topic: [partition, ...]}}
4243
assignment = collections.defaultdict(dict)
4344

45+
for topic in consumers_per_topic:
46+
# group by static members (True) v dynamic members (False)
47+
grouped = {k: list(g) for k, g in itertools.groupby(consumers_per_topic[topic], key=lambda ids: ids[0] is not None)}
48+
consumers_per_topic[topic] = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic
49+
4450
for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
4551
partitions = cluster.partitions_for_topic(topic)
4652
if partitions is None:
4753
log.warning('No partition metadata for topic %s', topic)
4854
continue
4955
partitions = sorted(partitions)
50-
consumers_for_topic.sort()
5156

5257
partitions_per_consumer = len(partitions) // len(consumers_for_topic)
5358
consumers_with_extra = len(partitions) % len(consumers_for_topic)
5459

55-
for i, member in enumerate(consumers_for_topic):
60+
for i, (_group_instance_id, member_id) in enumerate(consumers_for_topic):
5661
start = partitions_per_consumer * i
5762
start += min(i, consumers_with_extra)
5863
length = partitions_per_consumer
5964
if not i + 1 > consumers_with_extra:
6065
length += 1
61-
assignment[member][topic] = partitions[start:start+length]
66+
assignment[member_id][topic] = partitions[start:start+length]
6267

6368
protocol_assignment = {}
6469
for member_id in group_subscriptions:

kafka/coordinator/assignors/roundrobin.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,14 @@ def assign(cls, cluster, group_subscriptions):
6767
# construct {member_id: {topic: [partition, ...]}}
6868
assignment = collections.defaultdict(lambda: collections.defaultdict(list))
6969

70-
member_iter = itertools.cycle(sorted(group_subscriptions.keys()))
70+
# Sort static and dynamic members separately to maintain stable static assignments
71+
ungrouped = [(subscription.group_instance_id, member_id) for member_id, subscription in six.iteritems(group_subscriptions)]
72+
grouped = {k: list(g) for k, g in itertools.groupby(ungrouped, key=lambda ids: ids[0] is not None)}
73+
member_list = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic
74+
member_iter = itertools.cycle(member_list)
75+
7176
for partition in all_topic_partitions:
72-
member_id = next(member_iter)
77+
_group_instance_id, member_id = next(member_iter)
7378

7479
# Because we constructed all_topic_partitions from the set of
7580
# member subscribed topics, we should be safe assuming that

0 commit comments

Comments
 (0)