diff --git a/kafka/errors.py b/kafka/errors.py index ac4eadfec..351e07375 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -15,6 +15,9 @@ def __str__(self): return '{0}: {1}'.format(self.__class__.__name__, super(KafkaError, self).__str__()) + def __eq__(self, other): + return self.__class__ == other.__class__ and self.args == other.args + class Cancelled(KafkaError): retriable = True diff --git a/kafka/producer/future.py b/kafka/producer/future.py index f67db0979..13392a96e 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -29,32 +29,35 @@ def wait(self, timeout=None): class FutureRecordMetadata(Future): - def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size): + def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size): super(FutureRecordMetadata, self).__init__() self._produce_future = produce_future # packing args as a tuple is a minor speed optimization - self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size) + self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size) produce_future.add_callback(self._produce_success) produce_future.add_errback(self.failure) - def _produce_success(self, offset_and_timestamp): - offset, produce_timestamp_ms = offset_and_timestamp + def _produce_success(self, result): + offset, produce_timestamp_ms, record_exceptions_fn = result # Unpacking from args tuple is minor speed optimization - (relative_offset, timestamp_ms, checksum, + (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size) = self.args - # None is when Broker does not support the API (<0.10) and - # -1 is when the broker is configured for CREATE_TIME timestamps - if produce_timestamp_ms is not None and produce_timestamp_ms != -1: - timestamp_ms = produce_timestamp_ms - if offset != -1 and relative_offset is not None: - offset += relative_offset - tp = self._produce_future.topic_partition - metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, - checksum, serialized_key_size, - serialized_value_size, serialized_header_size) - self.success(metadata) + if record_exceptions_fn is not None: + self.failure(record_exceptions_fn(batch_index)) + else: + # None is when Broker does not support the API (<0.10) and + # -1 is when the broker is configured for CREATE_TIME timestamps + if produce_timestamp_ms is not None and produce_timestamp_ms != -1: + timestamp_ms = produce_timestamp_ms + if offset != -1 and batch_index is not None: + offset += batch_index + tp = self._produce_future.topic_partition + metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, + checksum, serialized_key_size, + serialized_value_size, serialized_header_size) + self.success(metadata) def get(self, timeout=None): if not self.is_done and not self._produce_future.wait(timeout): diff --git a/kafka/producer/producer_batch.py b/kafka/producer/producer_batch.py new file mode 100644 index 000000000..8be08f575 --- /dev/null +++ b/kafka/producer/producer_batch.py @@ -0,0 +1,184 @@ +from __future__ import absolute_import, division + +import logging +import time + +try: + # enum in stdlib as of py3.4 + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + +import kafka.errors as Errors +from kafka.producer.future import FutureRecordMetadata, FutureProduceResult + + +log = logging.getLogger(__name__) + + +class FinalState(IntEnum): + ABORTED = 0 + FAILED = 1 + SUCCEEDED = 2 + + +class ProducerBatch(object): + def __init__(self, tp, records, now=None): + now = time.time() if now is None else now + self.max_record_size = 0 + self.created = now + self.drained = None + self.attempts = 0 + self.last_attempt = now + self.last_append = now + self.records = records + self.topic_partition = tp + self.produce_future = FutureProduceResult(tp) + self._retry = False + self._final_state = None + + @property + def final_state(self): + return self._final_state + + @property + def record_count(self): + return self.records.next_offset() + + @property + def producer_id(self): + return self.records.producer_id if self.records else None + + @property + def producer_epoch(self): + return self.records.producer_epoch if self.records else None + + @property + def has_sequence(self): + return self.records.has_sequence if self.records else False + + def try_append(self, timestamp_ms, key, value, headers, now=None): + metadata = self.records.append(timestamp_ms, key, value, headers) + if metadata is None: + return None + + now = time.time() if now is None else now + self.max_record_size = max(self.max_record_size, metadata.size) + self.last_append = now + future = FutureRecordMetadata( + self.produce_future, + metadata.offset, + metadata.timestamp, + metadata.crc, + len(key) if key is not None else -1, + len(value) if value is not None else -1, + sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1) + return future + + def abort(self, exception): + """Abort the batch and complete the future and callbacks.""" + if self._final_state is not None: + raise Errors.IllegalStateError("Batch has already been completed in final state: %s" % self._final_state) + self._final_state = FinalState.ABORTED + + log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception) + self._complete_future(-1, -1, lambda _: exception) + + def complete(self, base_offset, log_append_time): + """Complete the batch successfully. + + Arguments: + base_offset (int): The base offset of the messages assigned by the server + log_append_time (int): The log append time or -1 if CreateTime is being used + + Returns: True if the batch was completed as a result of this call, and False + if it had been completed previously. + """ + return self.done(base_offset=base_offset, timestamp_ms=log_append_time) + + def complete_exceptionally(self, top_level_exception, record_exceptions_fn): + """ + Complete the batch exceptionally. The provided top-level exception will be used + for each record future contained in the batch. + + Arguments: + top_level_exception (Exception): top-level partition error. + record_exceptions_fn (callable int -> Exception): Record exception function mapping + batch_index to the respective record exception. + Returns: True if the batch was completed as a result of this call, and False + if it had been completed previously. + """ + assert isinstance(top_level_exception, Exception) + assert callable(record_exceptions_fn) + return self.done(top_level_exception=top_level_exception, record_exceptions_fn=record_exceptions_fn) + + def done(self, base_offset=None, timestamp_ms=None, top_level_exception=None, record_exceptions_fn=None): + """ + Finalize the state of a batch. Final state, once set, is immutable. This function may be called + once or twice on a batch. It may be called twice if + 1. An inflight batch expires before a response from the broker is received. The batch's final + state is set to FAILED. But it could succeed on the broker and second time around batch.done() may + try to set SUCCEEDED final state. + + 2. If a transaction abortion happens or if the producer is closed forcefully, the final state is + ABORTED but again it could succeed if broker responds with a success. + + Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged. + Attempted transitions from one failure state to the same or a different failed state are ignored. + Attempted transitions from SUCCEEDED to the same or a failed state throw an exception. + """ + final_state = FinalState.SUCCEEDED if top_level_exception is None else FinalState.FAILED + if self._final_state is None: + self._final_state = final_state + if final_state is FinalState.SUCCEEDED: + log.debug("Successfully produced messages to %s with base offset %s", self.topic_partition, base_offset) + else: + log.warning("Failed to produce messages to topic-partition %s with base offset %s: %s", + self.topic_partition, base_offset, top_level_exception) + self._complete_future(base_offset, timestamp_ms, record_exceptions_fn) + return True + + elif self._final_state is not FinalState.SUCCEEDED: + if final_state is FinalState.SUCCEEDED: + # Log if a previously unsuccessful batch succeeded later on. + log.debug("ProduceResponse returned %s for %s after batch with base offset %s had already been %s.", + final_state, self.topic_partition, base_offset, self._final_state) + else: + # FAILED --> FAILED and ABORTED --> FAILED transitions are ignored. + log.debug("Ignored state transition %s -> %s for %s batch with base offset %s", + self._final_state, final_state, self.topic_partition, base_offset) + else: + # A SUCCESSFUL batch must not attempt another state change. + raise Errors.IllegalStateError("A %s batch must not attempt another state change to %s" % (self._final_state, final_state)) + return False + + def _complete_future(self, base_offset, timestamp_ms, record_exceptions_fn): + if self.produce_future.is_done: + raise Errors.IllegalStateError('Batch is already closed!') + self.produce_future.success((base_offset, timestamp_ms, record_exceptions_fn)) + + def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None): + now = time.time() if now is None else now + return delivery_timeout_ms / 1000 <= now - self.created + + def in_retry(self): + return self._retry + + def retry(self, now=None): + now = time.time() if now is None else now + self._retry = True + self.attempts += 1 + self.last_attempt = now + self.last_append = now + + @property + def is_done(self): + return self.produce_future.is_done + + def __str__(self): + return 'ProducerBatch(topic_partition=%s, record_count=%d)' % ( + self.topic_partition, self.records.next_offset()) + + + diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 3a4e60146..1add95a3b 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -6,15 +6,8 @@ import threading import time -try: - # enum in stdlib as of py3.4 - from enum import IntEnum # pylint: disable=import-error -except ImportError: - # vendored backport module - from kafka.vendor.enum34 import IntEnum - import kafka.errors as Errors -from kafka.producer.future import FutureRecordMetadata, FutureProduceResult +from kafka.producer.producer_batch import ProducerBatch from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition @@ -41,145 +34,6 @@ def get(self): return self._val -class FinalState(IntEnum): - ABORTED = 0 - FAILED = 1 - SUCCEEDED = 2 - - -class ProducerBatch(object): - def __init__(self, tp, records, now=None): - now = time.time() if now is None else now - self.max_record_size = 0 - self.created = now - self.drained = None - self.attempts = 0 - self.last_attempt = now - self.last_append = now - self.records = records - self.topic_partition = tp - self.produce_future = FutureProduceResult(tp) - self._retry = False - self._final_state = None - - @property - def final_state(self): - return self._final_state - - @property - def record_count(self): - return self.records.next_offset() - - @property - def producer_id(self): - return self.records.producer_id if self.records else None - - @property - def producer_epoch(self): - return self.records.producer_epoch if self.records else None - - @property - def has_sequence(self): - return self.records.has_sequence if self.records else False - - def try_append(self, timestamp_ms, key, value, headers, now=None): - metadata = self.records.append(timestamp_ms, key, value, headers) - if metadata is None: - return None - - now = time.time() if now is None else now - self.max_record_size = max(self.max_record_size, metadata.size) - self.last_append = now - future = FutureRecordMetadata( - self.produce_future, - metadata.offset, - metadata.timestamp, - metadata.crc, - len(key) if key is not None else -1, - len(value) if value is not None else -1, - sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1) - return future - - def abort(self, exception): - """Abort the batch and complete the future and callbacks.""" - if self._final_state is not None: - raise Errors.IllegalStateError("Batch has already been completed in final state: %s" % self._final_state) - self._final_state = FinalState.ABORTED - - log.debug("Aborting batch for partition %s: %s", self.topic_partition, exception) - self._complete_future(-1, -1, exception) - - def done(self, base_offset=None, timestamp_ms=None, exception=None): - """ - Finalize the state of a batch. Final state, once set, is immutable. This function may be called - once or twice on a batch. It may be called twice if - 1. An inflight batch expires before a response from the broker is received. The batch's final - state is set to FAILED. But it could succeed on the broker and second time around batch.done() may - try to set SUCCEEDED final state. - - 2. If a transaction abortion happens or if the producer is closed forcefully, the final state is - ABORTED but again it could succeed if broker responds with a success. - - Attempted transitions from [FAILED | ABORTED] --> SUCCEEDED are logged. - Attempted transitions from one failure state to the same or a different failed state are ignored. - Attempted transitions from SUCCEEDED to the same or a failed state throw an exception. - """ - final_state = FinalState.SUCCEEDED if exception is None else FinalState.FAILED - if self._final_state is None: - self._final_state = final_state - if final_state is FinalState.SUCCEEDED: - log.debug("Successfully produced messages to %s with base offset %s", self.topic_partition, base_offset) - else: - log.warning("Failed to produce messages to topic-partition %s with base offset %s: %s", - self.topic_partition, base_offset, exception) - self._complete_future(base_offset, timestamp_ms, exception) - return True - - elif self._final_state is not FinalState.SUCCEEDED: - if final_state is FinalState.SUCCEEDED: - # Log if a previously unsuccessful batch succeeded later on. - log.debug("ProduceResponse returned %s for %s after batch with base offset %s had already been %s.", - final_state, self.topic_partition, base_offset, self._final_state) - else: - # FAILED --> FAILED and ABORTED --> FAILED transitions are ignored. - log.debug("Ignored state transition %s -> %s for %s batch with base offset %s", - self._final_state, final_state, self.topic_partition, base_offset) - else: - # A SUCCESSFUL batch must not attempt another state change. - raise Errors.IllegalStateError("A %s batch must not attempt another state change to %s" % (self._final_state, final_state)) - return False - - def _complete_future(self, base_offset, timestamp_ms, exception): - if self.produce_future.is_done: - raise Errors.IllegalStateError('Batch is already closed!') - elif exception is None: - self.produce_future.success((base_offset, timestamp_ms)) - else: - self.produce_future.failure(exception) - - def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None): - now = time.time() if now is None else now - return delivery_timeout_ms / 1000 <= now - self.created - - def in_retry(self): - return self._retry - - def retry(self, now=None): - now = time.time() if now is None else now - self._retry = True - self.attempts += 1 - self.last_attempt = now - self.last_append = now - - @property - def is_done(self): - return self.produce_future.is_done - - def __str__(self): - return 'ProducerBatch(topic_partition=%s, record_count=%d)' % ( - self.topic_partition, self.records.next_offset()) - - class RecordAccumulator(object): """ This class maintains a dequeue per TopicPartition that accumulates messages diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 7a4c557c8..09b9a0f10 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -21,6 +21,11 @@ log = logging.getLogger(__name__) +PartitionResponse = collections.namedtuple("PartitionResponse", + ["error", "base_offset", "last_offset", "log_append_time", "log_start_offset", "record_errors", "error_message", "current_leader"]) +PartitionResponse.__new__.__defaults__ = (Errors.NoError, -1, -1, -1, -1, (), None, (-1, -1)) + + class Sender(threading.Thread): """ The background thread that handles the sending of produce requests to the @@ -225,11 +230,10 @@ def _send_producer_data(self, now=None): needs_transaction_state_reset = False for expired_batch in expired_batches: - error = Errors.KafkaTimeoutError( - "Expiring %d record(s) for %s: %s ms has passed since batch creation" % ( - expired_batch.record_count, expired_batch.topic_partition, - int((time.time() - expired_batch.created) * 1000))) - self._fail_batch(expired_batch, error, base_offset=-1) + error_message = "Expiring %d record(s) for %s: %s ms has passed since batch creation" % ( + expired_batch.record_count, expired_batch.topic_partition, + int((time.time() - expired_batch.created) * 1000)) + self._fail_batch(expired_batch, PartitionResponse(error=Errors.KafkaTimeoutError, error_message=error_message)) if self._sensors: self._sensors.update_produce_request_metrics(batches_by_node) @@ -391,7 +395,7 @@ def _maybe_wait_for_producer_id(self): def _failed_produce(self, batches, node_id, error): log.error("%s: Error sending produce request to node %d: %s", str(self), node_id, error) # trace for batch in batches: - self._complete_batch(batch, error, -1) + self._complete_batch(batch, PartitionResponse(error=error)) def _handle_produce_response(self, node_id, send_time, batches, response): """Handle a produce response.""" @@ -403,35 +407,67 @@ def _handle_produce_response(self, node_id, send_time, batches, response): for topic, partitions in response.topics: for partition_info in partitions: + log_append_time = -1 + log_start_offset = -1 + record_errors = () + error_message = None if response.API_VERSION < 2: - partition, error_code, offset = partition_info - ts = None + partition, error_code, base_offset = partition_info elif 2 <= response.API_VERSION <= 4: - partition, error_code, offset, ts = partition_info + partition, error_code, base_offset, log_append_time = partition_info elif 5 <= response.API_VERSION <= 7: - partition, error_code, offset, ts, _log_start_offset = partition_info + partition, error_code, base_offset, log_append_time, log_start_offset = partition_info else: - # Currently unused / TODO: KIP-467 - partition, error_code, offset, ts, _log_start_offset, _record_errors, _global_error = partition_info + partition, error_code, base_offset, log_append_time, log_start_offset, record_errors, error_message = partition_info tp = TopicPartition(topic, partition) - error = Errors.for_code(error_code) batch = batches_by_partition[tp] - self._complete_batch(batch, error, offset, timestamp_ms=ts) - + partition_response = PartitionResponse( + error=Errors.for_code(error_code), + base_offset=base_offset, + last_offset=-1, + log_append_time=log_append_time, + log_start_offset=log_start_offset, + record_errors=record_errors, + error_message=error_message, + ) + self._complete_batch(batch, partition_response) else: # this is the acks = 0 case, just complete all requests for batch in batches: - self._complete_batch(batch, None, -1) + self._complete_batch(batch, PartitionResponse()) + + def _record_exceptions_fn(self, top_level_exception, record_errors, error_message): + """Returns a fn mapping batch_index to exception""" + # When no record_errors, all batches resolve to top-level exception + if not record_errors: + return lambda _: top_level_exception + + record_errors_dict = dict(record_errors) + def record_exceptions_fn(batch_index): + if batch_index not in record_errors_dict: + return Errors.KafkaError( + "Failed to append record because it was part of a batch which had one more more invalid records") + record_error = record_errors_dict[batch_index] + err_msg = record_error or error_message or top_level_exception.description + exc = top_level_exception.__class__ if len(record_errors) == 1 else Errors.InvalidRecordError + return exc(err_msg) + return record_exceptions_fn + + def _fail_batch(self, batch, partition_response): + if partition_response.error is Errors.TopicAuthorizationFailedError: + exception = Errors.TopicAuthorizationFailedError(batch.topic_partition.topic) + elif partition_response.error is Errors.ClusterAuthorizationFailedError: + exception = Errors.ClusterAuthorizationFailedError("The producer is not authorized to do idempotent sends") + else: + exception = partition_response.error(partition_response.error_message) - def _fail_batch(self, batch, exception, base_offset=None, timestamp_ms=None): - exception = exception if type(exception) is not type else exception() if self._transaction_manager: if isinstance(exception, Errors.OutOfOrderSequenceNumberError) and \ not self._transaction_manager.is_transactional() and \ self._transaction_manager.has_producer_id(batch.producer_id): log.error("%s: The broker received an out of order sequence number for topic-partition %s" " at offset %s. This indicates data loss on the broker, and should be investigated.", - str(self), batch.topic_partition, base_offset) + str(self), batch.topic_partition, partition_response.base_offset) # Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees # about the previously committed message. Note that this will discard the producer id and sequence @@ -448,31 +484,31 @@ def _fail_batch(self, batch, exception, base_offset=None, timestamp_ms=None): if self._sensors: self._sensors.record_errors(batch.topic_partition.topic, batch.record_count) - if batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms, exception=exception): + record_exceptions_fn = self._record_exceptions_fn(exception, partition_response.record_errors, partition_response.error_message) + if batch.complete_exceptionally(exception, record_exceptions_fn): self._maybe_remove_from_inflight_batches(batch) self._accumulator.deallocate(batch) - def _complete_batch(self, batch, error, base_offset, timestamp_ms=None): + def _complete_batch(self, batch, partition_response): """Complete or retry the given batch of records. Arguments: batch (ProducerBatch): The record batch - error (Exception): The error (or None if none) - base_offset (int): The base offset assigned to the records if successful - timestamp_ms (int, optional): The timestamp returned by the broker for this batch + partition_response (PartitionResponse): Response details for partition """ # Standardize no-error to None + error = partition_response.error if error is Errors.NoError: error = None if error is not None: if self._can_retry(batch, error): # retry - log.warning("%s: Got error produce response on topic-partition %s," - " retrying (%s attempts left). Error: %s", + log.warning("%s: Got error produce response on topic-partition %s, retrying (%s attempts left): %s%s", str(self), batch.topic_partition, self.config['retries'] - batch.attempts - 1, - error) + error.__class__.__name__, + (". Error Message: %s" % partition_response.error_message) if partition_response.error_message else "") # If idempotence is enabled only retry the request if the batch matches our current producer id and epoch if not self._transaction_manager or self._transaction_manager.producer_id_and_epoch.match(batch): @@ -488,13 +524,10 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None): str(self), batch.producer_id, batch.producer_epoch, self._transaction_manager.producer_id_and_epoch.producer_id, self._transaction_manager.producer_id_and_epoch.epoch) - self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms) + self._fail_batch(batch, partition_response) else: - if error is Errors.TopicAuthorizationFailedError: - error = error(batch.topic_partition.topic) - # tell the user the result of their request - self._fail_batch(batch, error, base_offset=base_offset, timestamp_ms=timestamp_ms) + self._fail_batch(batch, partition_response) if error is Errors.UnknownTopicOrPartitionError: log.warning("%s: Received unknown topic or partition error in produce request on partition %s." @@ -505,7 +538,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None): self._metadata.request_update() else: - if batch.done(base_offset=base_offset, timestamp_ms=timestamp_ms): + if batch.complete(partition_response.base_offset, partition_response.log_append_time): self._maybe_remove_from_inflight_batches(batch) self._accumulator.deallocate(batch) @@ -561,7 +594,7 @@ def _produce_request(self, node_id, acks, timeout, batches): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - version = self._client.api_version(ProduceRequest, max_version=7) + version = self._client.api_version(ProduceRequest, max_version=8) topic_partition_data = [ (topic, list(partition_info.items())) for topic, partition_info in six.iteritems(produce_records_by_partition)] diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 3076a2810..94edd0f80 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -112,12 +112,12 @@ class ProduceResponse_v8(Response): ('error_code', Int16), ('offset', Int64), ('timestamp', Int64), - ('log_start_offset', Int64)), + ('log_start_offset', Int64), ('record_errors', (Array( ('batch_index', Int32), ('batch_index_error_message', String('utf-8')) ))), - ('error_message', String('utf-8')) + ('error_message', String('utf-8'))) ))), ('throttle_time_ms', Int32) ) diff --git a/test/test_producer_batch.py b/test/test_producer_batch.py new file mode 100644 index 000000000..bffa79fcb --- /dev/null +++ b/test/test_producer_batch.py @@ -0,0 +1,136 @@ +# pylint: skip-file +from __future__ import absolute_import, division + +import pytest + +from kafka.errors import IllegalStateError, KafkaError +from kafka.producer.future import FutureRecordMetadata, RecordMetadata +from kafka.producer.producer_batch import ProducerBatch +from kafka.record.memory_records import MemoryRecordsBuilder +from kafka.structs import TopicPartition + + +@pytest.fixture +def tp(): + return TopicPartition('foo', 0) + + +@pytest.fixture +def memory_records_builder(): + return MemoryRecordsBuilder(magic=2, compression_type=0, batch_size=100000) + + +@pytest.fixture +def batch(tp, memory_records_builder): + return ProducerBatch(tp, memory_records_builder) + + +def test_producer_batch_producer_id(tp, memory_records_builder): + batch = ProducerBatch(tp, memory_records_builder) + assert batch.producer_id == -1 + batch.records.set_producer_state(123, 456, 789, False) + assert batch.producer_id == 123 + memory_records_builder.close() + assert batch.producer_id == 123 + + +@pytest.mark.parametrize("magic", [0, 1, 2]) +def test_producer_batch_try_append(magic): + tp = TopicPartition('foo', 0) + records = MemoryRecordsBuilder( + magic=magic, compression_type=0, batch_size=100000) + batch = ProducerBatch(tp, records) + assert batch.record_count == 0 + future = batch.try_append(0, b'key', b'value', []) + assert isinstance(future, FutureRecordMetadata) + assert not future.is_done + batch.complete(123, 456) + assert future.is_done + # record-level checksum only provided in v0/v1 formats; payload includes magic-byte + if magic == 0: + checksum = 592888119 + elif magic == 1: + checksum = 213653215 + else: + checksum = None + + expected_metadata = RecordMetadata( + topic=tp[0], partition=tp[1], topic_partition=tp, + offset=123, timestamp=456, checksum=checksum, + serialized_key_size=3, serialized_value_size=5, serialized_header_size=-1) + assert future.value == expected_metadata + + +def test_producer_batch_retry(batch): + assert not batch.in_retry() + batch.retry() + assert batch.in_retry() + + +def test_batch_abort(batch): + future = batch.try_append(123, None, b'msg', []) + batch.abort(KafkaError()) + assert future.is_done + + # subsequent completion should be ignored + assert not batch.complete(500, 2342342341) + assert not batch.complete_exceptionally(KafkaError('top_level'), lambda _: KafkaError('record')) + + assert future.is_done + with pytest.raises(KafkaError): + future.get() + + +def test_batch_cannot_abort_twice(batch): + future = batch.try_append(123, None, b'msg', []) + batch.abort(KafkaError()) + with pytest.raises(IllegalStateError): + batch.abort(KafkaError()) + assert future.is_done + with pytest.raises(KafkaError): + future.get() + + +def test_batch_cannot_complete_twice(batch): + future = batch.try_append(123, None, b'msg', []) + batch.complete(500, 10) + with pytest.raises(IllegalStateError): + batch.complete(1000, 20) + record_metadata = future.get() + assert record_metadata.offset == 500 + assert record_metadata.timestamp == 10 + + +def _test_complete_exceptionally(batch, record_count, top_level_exception, record_exceptions_fn): + futures = [] + for i in range(record_count): + futures.append(batch.try_append(0, b'key', b'value', [])) + + assert record_count == batch.record_count + + batch.complete_exceptionally(top_level_exception, record_exceptions_fn) + assert batch.is_done + + for i, future in enumerate(futures): + assert future.is_done + assert future.failed() + assert isinstance(future.exception, RuntimeError) + assert record_exceptions_fn(i) == future.exception + + +def test_complete_exceptionally_with_record_errors(batch): + record_count = 5 + top_level_exception = RuntimeError() + + record_exceptions_map = {0: RuntimeError(), 3: RuntimeError()} + record_exceptions_fn = lambda i: record_exceptions_map.get(i, top_level_exception) + + _test_complete_exceptionally(batch, record_count, top_level_exception, record_exceptions_fn) + + +def test_complete_exceptionally_with_null_record_errors(batch): + record_count = 5 + top_level_exception = RuntimeError() + + with pytest.raises(AssertionError): + _test_complete_exceptionally(batch, record_count, top_level_exception, None) diff --git a/test/test_record_accumulator.py b/test/test_record_accumulator.py index 5c7134e5c..0f61c21cf 100644 --- a/test/test_record_accumulator.py +++ b/test/test_record_accumulator.py @@ -4,11 +4,8 @@ import pytest from kafka.cluster import ClusterMetadata -from kafka.errors import IllegalStateError, KafkaError -from kafka.producer.future import FutureRecordMetadata, RecordMetadata -from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch +from kafka.producer.record_accumulator import RecordAccumulator from kafka.record.default_records import DefaultRecordBatchBuilder -from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition @@ -23,103 +20,6 @@ def cluster(tp, mocker): mocker.patch.object(metadata, 'partitions_for_broker', return_value=[tp]) return metadata -def test_producer_batch_producer_id(): - tp = TopicPartition('foo', 0) - records = MemoryRecordsBuilder( - magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records) - assert batch.producer_id == -1 - batch.records.set_producer_state(123, 456, 789, False) - assert batch.producer_id == 123 - records.close() - assert batch.producer_id == 123 - -@pytest.mark.parametrize("magic", [0, 1, 2]) -def test_producer_batch_try_append(magic): - tp = TopicPartition('foo', 0) - records = MemoryRecordsBuilder( - magic=magic, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records) - assert batch.record_count == 0 - future = batch.try_append(0, b'key', b'value', []) - assert isinstance(future, FutureRecordMetadata) - assert not future.is_done - batch.done(base_offset=123, timestamp_ms=456) - assert future.is_done - # record-level checksum only provided in v0/v1 formats; payload includes magic-byte - if magic == 0: - checksum = 592888119 - elif magic == 1: - checksum = 213653215 - else: - checksum = None - - expected_metadata = RecordMetadata( - topic=tp[0], partition=tp[1], topic_partition=tp, - offset=123, timestamp=456, checksum=checksum, - serialized_key_size=3, serialized_value_size=5, serialized_header_size=-1) - assert future.value == expected_metadata - -def test_producer_batch_retry(): - tp = TopicPartition('foo', 0) - records = MemoryRecordsBuilder( - magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records) - assert not batch.in_retry() - batch.retry() - assert batch.in_retry() - -def test_batch_abort(): - tp = TopicPartition('foo', 0) - records = MemoryRecordsBuilder( - magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records) - future = batch.try_append(123, None, b'msg', []) - - batch.abort(KafkaError()) - assert future.is_done - - # subsequent completion should be ignored - batch.done(500, 2342342341) - batch.done(exception=KafkaError()) - - assert future.is_done - with pytest.raises(KafkaError): - future.get() - -def test_batch_cannot_abort_twice(): - tp = TopicPartition('foo', 0) - records = MemoryRecordsBuilder( - magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records) - future = batch.try_append(123, None, b'msg', []) - - batch.abort(KafkaError()) - - with pytest.raises(IllegalStateError): - batch.abort(KafkaError()) - - assert future.is_done - with pytest.raises(KafkaError): - future.get() - -def test_batch_cannot_complete_twice(): - tp = TopicPartition('foo', 0) - records = MemoryRecordsBuilder( - magic=2, compression_type=0, batch_size=100000) - batch = ProducerBatch(tp, records) - future = batch.try_append(123, None, b'msg', []) - - batch.done(500, 10, None) - - with pytest.raises(IllegalStateError): - batch.done(1000, 20, None) - - record_metadata = future.get() - - assert record_metadata.offset == 500 - assert record_metadata.timestamp == 10 - def test_linger(tp, cluster): now = 0 accum = RecordAccumulator(linger_ms=10) diff --git a/test/test_sender.py b/test/test_sender.py index 6d29c1e44..567f1b2ad 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -19,8 +19,10 @@ from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.producer.kafka import KafkaProducer from kafka.protocol.produce import ProduceRequest -from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch -from kafka.producer.sender import Sender +from kafka.producer.future import FutureRecordMetadata +from kafka.producer.producer_batch import ProducerBatch +from kafka.producer.record_accumulator import RecordAccumulator +from kafka.producer.sender import PartitionResponse, Sender from kafka.producer.transaction_manager import TransactionManager from kafka.record.memory_records import MemoryRecordsBuilder from kafka.structs import TopicPartition @@ -92,11 +94,11 @@ def test_complete_batch_success(sender): assert not batch.produce_future.is_done # No error, base_offset 0 - sender._complete_batch(batch, None, 0, timestamp_ms=123) + sender._complete_batch(batch, PartitionResponse(base_offset=0, log_append_time=123)) assert batch.is_done assert batch.produce_future.is_done assert batch.produce_future.succeeded() - assert batch.produce_future.value == (0, 123) + assert batch.produce_future.value == (0, 123, None) def test_complete_batch_transaction(sender, transaction_manager): @@ -106,7 +108,7 @@ def test_complete_batch_transaction(sender, transaction_manager): assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id # No error, base_offset 0 - sender._complete_batch(batch, None, 0) + sender._complete_batch(batch, PartitionResponse(base_offset=0)) assert batch.is_done assert sender._transaction_manager.sequence_number(batch.topic_partition) == batch.record_count @@ -134,14 +136,15 @@ def test_complete_batch_error(sender, error, refresh_metadata): sender.config['retries'] = 0 assert sender._client.cluster.ttl() > 0 batch = producer_batch() - sender._complete_batch(batch, error, -1) + future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) + sender._complete_batch(batch, PartitionResponse(error=error)) if refresh_metadata: assert sender._client.cluster.ttl() == 0 else: assert sender._client.cluster.ttl() > 0 assert batch.is_done - assert batch.produce_future.failed() - assert isinstance(batch.produce_future.exception, error) + assert future.failed() + assert isinstance(future.exception, error) @pytest.mark.parametrize(("error", "retry"), [ @@ -163,37 +166,40 @@ def test_complete_batch_error(sender, error, refresh_metadata): ]) def test_complete_batch_retry(sender, accumulator, mocker, error, retry): sender.config['retries'] = 1 - mocker.spy(sender, '_fail_batch') mocker.patch.object(accumulator, 'reenqueue') batch = producer_batch() - sender._complete_batch(batch, error, -1) + future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) + sender._complete_batch(batch, PartitionResponse(error=error)) if retry: assert not batch.is_done accumulator.reenqueue.assert_called_with(batch) batch.attempts += 1 # normally handled by accumulator.reenqueue, but it's mocked - sender._complete_batch(batch, error, -1) + sender._complete_batch(batch, PartitionResponse(error=error)) assert batch.is_done - assert isinstance(batch.produce_future.exception, error) + assert future.failed() + assert isinstance(future.exception, error) else: assert batch.is_done - assert isinstance(batch.produce_future.exception, error) + assert future.failed() + assert isinstance(future.exception, error) def test_complete_batch_producer_id_changed_no_retry(sender, accumulator, transaction_manager, mocker): sender._transaction_manager = transaction_manager sender.config['retries'] = 1 - mocker.spy(sender, '_fail_batch') mocker.patch.object(accumulator, 'reenqueue') error = Errors.NotLeaderForPartitionError batch = producer_batch() - sender._complete_batch(batch, error, -1) + future = FutureRecordMetadata(batch.produce_future, -1, -1, -1, -1, -1, -1) + sender._complete_batch(batch, PartitionResponse(error=error)) assert not batch.is_done accumulator.reenqueue.assert_called_with(batch) batch.records._producer_id = 123 # simulate different producer_id assert batch.producer_id != sender._transaction_manager.producer_id_and_epoch.producer_id - sender._complete_batch(batch, error, -1) + sender._complete_batch(batch, PartitionResponse(error=error)) assert batch.is_done - assert isinstance(batch.produce_future.exception, error) + assert future.failed() + assert isinstance(future.exception, error) def test_fail_batch(sender, accumulator, transaction_manager, mocker): @@ -201,9 +207,9 @@ def test_fail_batch(sender, accumulator, transaction_manager, mocker): batch = producer_batch() mocker.patch.object(batch, 'done') assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id - error = Exception('error') - sender._fail_batch(batch, base_offset=0, timestamp_ms=None, exception=error) - batch.done.assert_called_with(base_offset=0, timestamp_ms=None, exception=error) + error = Errors.KafkaError + sender._fail_batch(batch, PartitionResponse(error=error)) + batch.done.assert_called_with(top_level_exception=error(None), record_exceptions_fn=mocker.ANY) def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, transaction_manager, mocker): @@ -213,10 +219,10 @@ def test_out_of_order_sequence_number_reset_producer_id(sender, accumulator, tra batch = producer_batch() mocker.patch.object(batch, 'done') assert sender._transaction_manager.producer_id_and_epoch.producer_id == batch.producer_id - error = Errors.OutOfOrderSequenceNumberError() - sender._fail_batch(batch, base_offset=0, timestamp_ms=None, exception=error) + error = Errors.OutOfOrderSequenceNumberError + sender._fail_batch(batch, PartitionResponse(base_offset=0, log_append_time=None, error=error)) sender._transaction_manager.reset_producer_id.assert_called_once() - batch.done.assert_called_with(base_offset=0, timestamp_ms=None, exception=error) + batch.done.assert_called_with(top_level_exception=error(None), record_exceptions_fn=mocker.ANY) def test_handle_produce_response(): @@ -228,9 +234,9 @@ def test_failed_produce(sender, mocker): mock_batches = ['foo', 'bar', 'fizzbuzz'] sender._failed_produce(mock_batches, 0, 'error') sender._complete_batch.assert_has_calls([ - call('foo', 'error', -1), - call('bar', 'error', -1), - call('fizzbuzz', 'error', -1), + call('foo', PartitionResponse(error='error')), + call('bar', PartitionResponse(error='error')), + call('fizzbuzz', PartitionResponse(error='error')), ]) @@ -253,3 +259,14 @@ def test__send_producer_data_expiry_time_reset(sender, accumulator, mocker): now += accumulator.config['delivery_timeout_ms'] poll_timeout_ms = sender._send_producer_data(now=now) assert poll_timeout_ms > 0 + + +def test__record_exceptions_fn(sender): + record_exceptions_fn = sender._record_exceptions_fn(Errors.KafkaError('top-level'), [(0, 'err-0'), (3, 'err-3')], 'message') + assert record_exceptions_fn(0) == Errors.InvalidRecordError('err-0') + assert record_exceptions_fn(1) == Errors.KafkaError('Failed to append record because it was part of a batch which had one more more invalid records') + assert record_exceptions_fn(2) == Errors.KafkaError('Failed to append record because it was part of a batch which had one more more invalid records') + assert record_exceptions_fn(3) == Errors.InvalidRecordError('err-3') + + record_exceptions_fn = sender._record_exceptions_fn(Errors.KafkaError('top-level'), [(0, 'err-0')], 'message') + assert record_exceptions_fn(0) == Errors.KafkaError('err-0')