Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
use-cython: ['true', 'false']
experimental: [false]
include:
- python-version: 'pypy3.9'
- python-version: 'pypy3.11'
use-cython: false
experimental: true
env:
Expand Down
1 change: 1 addition & 0 deletions faust/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Python Stream processing."""

# :copyright: (c) 2017-2020, Robinhood Markets, Inc.
# All rights reserved.
# :license: BSD (3 Clause), see LICENSE for more details.
Expand Down
10 changes: 5 additions & 5 deletions faust/stores/aerospike.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _get(self, key: bytes) -> Optional[bytes]:
key = (self.namespace, self.table_name, key)
fun = self.client.get
try:
(key, meta, bins) = self.aerospike_fun_call_with_retry(fun=fun, key=key)
key, meta, bins = self.aerospike_fun_call_with_retry(fun=fun, key=key)
if bins:
return bins[self.BIN_KEY]
return None
Expand Down Expand Up @@ -173,7 +173,7 @@ def _itervalues(self) -> Iterator[bytes]:
fun=fun, namespace=self.namespace, set=self.table_name
)
for result in scan.results():
(key, meta, bins) = result
key, meta, bins = result
if bins:
yield bins[self.BIN_KEY]
else:
Expand All @@ -193,8 +193,8 @@ def _iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
fun=fun, namespace=self.namespace, set=self.table_name
)
for result in scan.results():
(key_data, meta, bins) = result
(ns, set, policy, key) = key_data
key_data, meta, bins = result
ns, set, policy, key = key_data

if bins:
bins = bins[self.BIN_KEY]
Expand All @@ -214,7 +214,7 @@ def _contains(self, key: bytes) -> bool:
try:
if self.app.conf.store_check_exists:
key = (self.namespace, self.table_name, key)
(key, meta) = self.aerospike_fun_call_with_retry(
key, meta = self.aerospike_fun_call_with_retry(
fun=self.client.exists, key=key
)
if meta:
Expand Down
22 changes: 15 additions & 7 deletions faust/transport/drivers/aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
)
from aiokafka.partitioner import DefaultPartitioner, murmur2
from aiokafka.protocol.admin import CreateTopicsRequest
from aiokafka.protocol.metadata import MetadataRequest_v1
from aiokafka.protocol.metadata import MetadataRequest
from aiokafka.structs import OffsetAndMetadata, TopicPartition as _TopicPartition
from aiokafka.util import parse_kafka_version
from mode import Service, get_logger
Expand All @@ -52,6 +52,7 @@
from mode.utils.objects import cached_property
from mode.utils.times import Seconds, humanize_seconds_ago, want_seconds
from opentracing.ext import tags
from packaging.version import Version
from yarl import URL

from faust.auth import (
Expand Down Expand Up @@ -93,6 +94,8 @@

logger = get_logger(__name__)

_AIOKAFKA_HAS_API_VERSION = Version(aiokafka.__version__) < Version("0.13.0")

DEFAULT_GENERATION_ID = OffsetCommitRequest.DEFAULT_GENERATION_ID

TOPIC_LENGTH_MAX = 249
Expand Down Expand Up @@ -511,6 +514,7 @@ def _create_worker_consumer(
self._assignor = (
self.app.assignor
if self.app.conf.table_standby_replicas > 0
or bool(self.app.tables.changelog_topics)
else RoundRobinPartitionAssignor
)
auth_settings = credentials_to_aiokafka_auth(
Expand All @@ -529,8 +533,11 @@ def _create_worker_consumer(
f"broker_request_timeout={request_timeout}"
)

consumer_kwargs: dict[str, Any] = {}
if _AIOKAFKA_HAS_API_VERSION:
consumer_kwargs["api_version"] = conf.consumer_api_version
return aiokafka.AIOKafkaConsumer(
api_version=conf.consumer_api_version,
**consumer_kwargs,
client_id=conf.broker_client_id,
group_id=conf.id,
group_instance_id=conf.consumer_group_instance_id,
Expand Down Expand Up @@ -1111,7 +1118,7 @@ def __post_init__(self) -> None:

def _settings_default(self) -> Mapping[str, Any]:
transport = cast(Transport, self.transport)
return {
settings: dict[str, Any] = {
"bootstrap_servers": server_list(transport.url, transport.default_port),
"client_id": self.client_id,
"acks": self.acks,
Expand All @@ -1122,10 +1129,12 @@ def _settings_default(self) -> Mapping[str, Any]:
"security_protocol": "SSL" if self.ssl_context else "PLAINTEXT",
"partitioner": self.partitioner,
"request_timeout_ms": int(self.request_timeout * 1000),
"api_version": self._api_version,
"metadata_max_age_ms": self.app.conf.producer_metadata_max_age_ms,
"connections_max_idle_ms": self.app.conf.producer_connections_max_idle_ms,
}
if _AIOKAFKA_HAS_API_VERSION:
settings["api_version"] = self._api_version
return settings

def _settings_auth(self) -> Mapping[str, Any]:
return credentials_to_aiokafka_auth(self.credentials, self.ssl_context)
Expand Down Expand Up @@ -1505,7 +1514,7 @@ async def _get_controller_node(
for node_id in nodes:
if node_id is None:
raise NotReady("Not connected to Kafka Broker")
request = MetadataRequest_v1([])
request = MetadataRequest([])
wait_result = await owner.wait(
client.send(node_id, request),
timeout=timeout,
Expand Down Expand Up @@ -1538,7 +1547,6 @@ async def _really_create_topic(
owner.log.debug("Topic %r exists, skipping creation.", topic)
return

protocol_version = 1
extra_configs = config or {}
config = self._topic_config(retention, compacting, deleting)
config.update(extra_configs)
Expand All @@ -1555,7 +1563,7 @@ async def _really_create_topic(
else:
raise Exception("Controller node is None")

request = CreateTopicsRequest[protocol_version](
request = CreateTopicsRequest(
[(topic, partitions, replication, [], list(config.items()))],
timeout,
False,
Expand Down
1 change: 0 additions & 1 deletion requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pytest<8
python-dateutil>=2.8
pytz>=2018.7
bandit
twine
wheel
intervaltree
-r requirements.txt
Expand Down
41 changes: 36 additions & 5 deletions tests/unit/transport/drivers/test_aiokafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from faust.sensors.monitor import Monitor
from faust.transport.drivers import aiokafka as mod
from faust.transport.drivers.aiokafka import (
_AIOKAFKA_HAS_API_VERSION,
SLOW_PROCESSING_CAUSE_AGENT,
SLOW_PROCESSING_CAUSE_STREAM,
SLOW_PROCESSING_EXPLAINED,
Expand Down Expand Up @@ -795,6 +796,26 @@ def test__create_worker_consumer__transaction(self, *, cthread, app):
isolation_level="read_committed",
)

def test__create_worker_consumer__uses_roundrobin_without_tables(
self, *, cthread, app
):
app.conf.table_standby_replicas = 0
app.tables._changelogs.clear()
transport = cthread.transport
with patch("aiokafka.AIOKafkaConsumer"):
cthread._create_worker_consumer(transport)
assert cthread._assignor is mod.RoundRobinPartitionAssignor

def test__create_worker_consumer__uses_faust_assignor_with_changelog_topics(
self, *, cthread, app
):
app.conf.table_standby_replicas = 0
app.tables._changelogs["app-foo-changelog"] = Mock(name="table")
transport = cthread.transport
with patch("aiokafka.AIOKafkaConsumer"):
cthread._create_worker_consumer(transport)
assert cthread._assignor is app.assignor

def assert_create_worker_consumer(
self,
cthread,
Expand All @@ -813,8 +834,7 @@ def assert_create_worker_consumer(
c = cthread._create_worker_consumer(transport)
assert c is AIOKafkaConsumer.return_value
max_poll_interval = conf.broker_max_poll_interval
AIOKafkaConsumer.assert_called_once_with(
api_version=app.conf.consumer_api_version,
expected_kwargs = dict(
client_id=conf.broker_client_id,
group_id=conf.id,
group_instance_id=conf.consumer_group_instance_id,
Expand All @@ -841,6 +861,9 @@ def assert_create_worker_consumer(
# flush_spans=cthread.flush_spans,
**auth_settings,
)
if _AIOKAFKA_HAS_API_VERSION:
expected_kwargs["api_version"] = app.conf.consumer_api_version
AIOKafkaConsumer.assert_called_once_with(**expected_kwargs)

def test__create_client_consumer(self, *, cthread, app):
transport = cthread.transport
Expand Down Expand Up @@ -1382,7 +1405,7 @@ def assert_new_producer(
with patch("aiokafka.AIOKafkaProducer") as AIOKafkaProducer:
p = producer._new_producer()
assert p is AIOKafkaProducer.return_value
AIOKafkaProducer.assert_called_once_with(
expected_kwargs = dict(
bootstrap_servers=bootstrap_servers,
client_id=client_id,
acks=acks,
Expand All @@ -1393,12 +1416,14 @@ def assert_new_producer(
security_protocol=security_protocol,
partitioner=producer.partitioner,
transactional_id=None,
api_version=api_version,
metadata_max_age_ms=metadata_max_age_ms,
connections_max_idle_ms=connections_max_idle_ms,
request_timeout_ms=request_timeout_ms,
**kwargs,
)
if _AIOKAFKA_HAS_API_VERSION:
expected_kwargs["api_version"] = api_version
AIOKafkaProducer.assert_called_once_with(**expected_kwargs)


class TestProducer(ProducerBaseTest):
Expand Down Expand Up @@ -1475,7 +1500,13 @@ def test__new_producer(self, *, app):
[
pytest.param(
{"api_version": "auto"},
marks=pytest.mark.conf(producer_api_version="auto"),
marks=[
pytest.mark.conf(producer_api_version="auto"),
pytest.mark.skipif(
not _AIOKAFKA_HAS_API_VERSION,
reason="api_version removed in aiokafka>=0.13.0",
),
],
),
pytest.param({"acks": -1}, marks=pytest.mark.conf(producer_acks="all")),
pytest.param(
Expand Down
Loading