Skip to content
8 changes: 4 additions & 4 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.client_async import KafkaClient, selectors
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError,
Expand Down Expand Up @@ -1242,7 +1242,7 @@ def _describe_consumer_groups_process_response(self, response):
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
if group_information_name == 'protocol_type':
protocol_type = described_group_information
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type)
if isinstance(group_information_field, Array):
member_information_list = []
member_schema = group_information_field.array_of
Expand All @@ -1251,9 +1251,9 @@ def _describe_consumer_groups_process_response(self, response):
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
if protocol_type_is_consumer:
if member_name == 'member_metadata' and member:
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member))
elif member_name == 'member_assignment' and member:
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member))
else:
member_information.append(member)
member_info_tuple = MemberInformation._make(member_information)
Expand Down
24 changes: 24 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import copy
import logging
import re
import socket
import time

Expand Down Expand Up @@ -57,6 +58,14 @@ class KafkaConsumer(six.Iterator):
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str): A unique identifier of the consumer instance
provided by end user. Only non-empty strings are permitted. If set,
the consumer is treated as a static member, which means that only
one instance with this ID is allowed in the consumer group at any
time. This can be used in combination with a larger session timeout
to avoid group rebalances caused by transient unavailability (e.g.
process restarts). If not set, the consumer will join the group as
a dynamic member, which is the traditional behavior. Default: None
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable): Any callable that takes a
Expand Down Expand Up @@ -276,6 +285,7 @@ class KafkaConsumer(six.Iterator):
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'group_id': None,
'group_instance_id': None,
'key_deserializer': None,
'value_deserializer': None,
'enable_incremental_fetch_sessions': True,
Expand Down Expand Up @@ -408,6 +418,10 @@ def __init__(self, *topics, **configs):
"Request timeout (%s) must be larger than session timeout (%s)" %
(self.config['request_timeout_ms'], self.config['session_timeout_ms']))

if self.config['group_instance_id'] is not None:
    # Static membership is a consumer-group feature, so it is meaningless
    # without a group_id.
    if self.config['group_id'] is None:
        raise KafkaConfigurationError("group_instance_id requires group_id")
    # Enforce the documented constraints (non-empty string, length and
    # character set) up front, rather than letting a malformed id reach
    # the group coordinator.
    self._validate_group_instance_id(self.config['group_instance_id'])

self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, metrics=self._metrics, **self.config)
Expand All @@ -423,6 +437,16 @@ def __init__(self, *topics, **configs):
self._subscription.subscribe(topics=topics)
self._client.set_topics(topics)

def _validate_group_instance_id(self, group_instance_id):
if not group_instance_id or not isinstance(group_instance_id, str):
raise KafkaConfigurationError("group_instance_id must be non-empty string")
if group_instance_id in (".", ".."):
raise KafkaConfigurationError("group_instance_id cannot be \".\" or \"..\"")
if len(group_instance_id) > 249:
raise KafkaConfigurationError("group_instance_id can't be longer than 249 characters")
if not re.match(r'^[A-Za-z0-9\.\_\-]+$', group_instance_id):
raise KafkaConfigurationError("group_instance_id is illegal: it contains a character other than ASCII alphanumerics, '.', '_' and '-'")

def bootstrap_connected(self):
"""Return True if the bootstrap is connected."""
return self._client.bootstrap_connected()
Expand Down
5 changes: 3 additions & 2 deletions kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ def assign(self, cluster, members):

Arguments:
cluster (ClusterMetadata): metadata for use in assignment
members (dict of {member_id: MemberMetadata}): decoded metadata for
each member in the group.
members (dict of {member_id: Subscription}): decoded metadata
for each member in the group, including group_instance_id
when available.

Returns:
dict: {member_id: MemberAssignment}
Expand Down
27 changes: 16 additions & 11 deletions kafka/coordinator/assignors/range.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from __future__ import absolute_import

import collections
import itertools
import logging

from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0

log = logging.getLogger(__name__)

Expand All @@ -32,45 +33,49 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
version = 0

@classmethod
def assign(cls, cluster, group_subscriptions):
    """Assign a contiguous range of each topic's partitions to its subscribers.

    For every subscribed topic the subscribers are ordered with static
    members (those whose subscription has a group_instance_id) first,
    sorted by (group_instance_id, member_id), followed by dynamic members
    sorted by member_id. The sorted partitions are then split into
    consecutive ranges in that order; the first
    ``len(partitions) % len(consumers)`` consumers get one extra partition.

    Arguments:
        cluster: object providing ``partitions_for_topic(topic)``; topics
            for which it returns None are skipped with a warning.
        group_subscriptions (dict of {member_id: subscription}): each
            subscription exposes ``topics`` and ``group_instance_id``.

    Returns:
        dict: {member_id: ConsumerProtocolMemberAssignment_v0}, with one
        entry for every member in group_subscriptions.
    """
    consumers_per_topic = collections.defaultdict(list)
    for member_id, subscription in six.iteritems(group_subscriptions):
        for topic in subscription.topics:
            consumers_per_topic[topic].append((subscription.group_instance_id, member_id))

    # construct {member_id: {topic: [partition, ...]}}
    assignment = collections.defaultdict(dict)

    for topic, consumers in six.iteritems(consumers_per_topic):
        partitions = cluster.partitions_for_topic(topic)
        if partitions is None:
            log.warning('No partition metadata for topic %s', topic)
            continue
        partitions = sorted(partitions)

        # Sorted static members first, then sorted dynamic members.
        # Partition explicitly instead of using itertools.groupby: groupby
        # only merges *adjacent* items with equal keys, so collecting its
        # groups into a dict on unsorted input drops members.
        static_members = sorted(ids for ids in consumers if ids[0] is not None)
        dynamic_members = sorted(ids for ids in consumers if ids[0] is None)
        consumers_for_topic = static_members + dynamic_members

        partitions_per_consumer = len(partitions) // len(consumers_for_topic)
        consumers_with_extra = len(partitions) % len(consumers_for_topic)

        for i, (_group_instance_id, member_id) in enumerate(consumers_for_topic):
            start = partitions_per_consumer * i
            start += min(i, consumers_with_extra)
            length = partitions_per_consumer
            if i < consumers_with_extra:
                length += 1
            assignment[member_id][topic] = partitions[start:start+length]

    protocol_assignment = {}
    for member_id in group_subscriptions:
        # assignment is a defaultdict, so members that received nothing
        # still get an (empty) assignment entry.
        protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
            cls.version,
            sorted(assignment[member_id].items()),
            b'')
    return protocol_assignment

@classmethod
def metadata(cls, topics):
    """Build the member metadata this assignor advertises when joining.

    Arguments:
        topics: iterable of topic names the consumer subscribes to.

    Returns:
        ConsumerProtocolMemberMetadata_v0 carrying this assignor's version,
        the topics as a list, and empty user data.
    """
    return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment):
Expand Down
25 changes: 15 additions & 10 deletions kafka/coordinator/assignors/roundrobin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from kafka.vendor import six

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
from kafka.structs import TopicPartition

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -49,10 +49,10 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
version = 0

@classmethod
def assign(cls, cluster, member_metadata):
def assign(cls, cluster, group_subscriptions):
all_topics = set()
for metadata in six.itervalues(member_metadata):
all_topics.update(metadata.subscription)
for subscription in six.itervalues(group_subscriptions):
all_topics.update(subscription.topics)

all_topic_partitions = []
for topic in all_topics:
Expand All @@ -67,29 +67,34 @@ def assign(cls, cluster, member_metadata):
# construct {member_id: {topic: [partition, ...]}}
assignment = collections.defaultdict(lambda: collections.defaultdict(list))

# Sort static and dynamic members separately to maintain stable static
# assignments: static members (those with a group_instance_id) ordered by
# (group_instance_id, member_id) come first, then dynamic members ordered
# by member_id. Partition explicitly instead of using itertools.groupby:
# groupby only merges *adjacent* items with equal keys, so collecting its
# groups into a dict on unsorted input drops members.
static_members = sorted(
    (subscription.group_instance_id, member_id)
    for member_id, subscription in six.iteritems(group_subscriptions)
    if subscription.group_instance_id is not None)
dynamic_members = sorted(
    member_id
    for member_id, subscription in six.iteritems(group_subscriptions)
    if subscription.group_instance_id is None)
# Cycle over plain member ids so that every next() call, including the one
# in the skip loop below, yields a valid key of group_subscriptions.
member_iter = itertools.cycle(
    [member_id for _group_instance_id, member_id in static_members] + dynamic_members)

for partition in all_topic_partitions:
    member_id = next(member_iter)

    # Because we constructed all_topic_partitions from the set of
    # member subscribed topics, we should be safe assuming that
    # each topic in all_topic_partitions is in at least one member
    # subscription; otherwise this could yield an infinite loop
    while partition.topic not in group_subscriptions[member_id].topics:
        member_id = next(member_iter)
    assignment[member_id][partition.topic].append(partition.partition)

protocol_assignment = {}
for member_id in member_metadata:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
for member_id in group_subscriptions:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
cls.version,
sorted(assignment[member_id].items()),
b'')
return protocol_assignment

@classmethod
def metadata(cls, topics):
    """Build the member metadata this assignor advertises when joining.

    Arguments:
        topics: iterable of topic names the consumer subscribes to.

    Returns:
        ConsumerProtocolMemberMetadata_v0 carrying this assignor's version,
        the topics as a list, and empty user data.
    """
    return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment):
Expand Down
17 changes: 9 additions & 8 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
from kafka.coordinator.protocol import Schema
from kafka.protocol.struct import Struct
from kafka.protocol.types import String, Array, Int32
Expand Down Expand Up @@ -66,6 +66,7 @@ class StickyAssignorUserDataV1(Struct):

class StickyAssignmentExecutor:
def __init__(self, cluster, members):
# a mapping of member_id => StickyAssignorMemberMetadataV1
self.members = members
# a mapping between consumers and their assigned partitions that is updated during assignment procedure
self.current_assignment = defaultdict(list)
Expand Down Expand Up @@ -603,7 +604,7 @@ def assign(cls, cluster, members):

assignment = {}
for member_id in members:
assignment[member_id] = ConsumerProtocolMemberAssignment(
assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
cls.version, sorted(executor.get_final_assignment(member_id)), b''
)
return assignment
Expand All @@ -625,24 +626,24 @@ def parse_member_metadata(cls, metadata):
user_data = metadata.user_data
if not user_data:
return StickyAssignorMemberMetadataV1(
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
)

try:
decoded_user_data = StickyAssignorUserDataV1.decode(user_data)
except Exception as e:
except Exception:
# ignore the consumer's previous assignment if it cannot be parsed
log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args
log.exception("Could not parse member data")
return StickyAssignorMemberMetadataV1(
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
)

member_partitions = []
for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member
member_partitions.extend([TopicPartition(topic, partition) for partition in partitions])
return StickyAssignorMemberMetadataV1(
# pylint: disable=no-member
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.topics
)

@classmethod
Expand All @@ -661,7 +662,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(list(partitions_by_topic.items()), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data)

@classmethod
def on_assignment(cls, assignment):
Expand Down
Loading
Loading