From c5f0408d84a87420d5070172295db43ddfa20444 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Tue, 4 Nov 2025 22:23:45 -0800 Subject: [PATCH 1/2] fix(streams): commit tasks before rebalance in case of blocking on state updater --- .../processor/internals/StreamsPartitionAssignor.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index f458629cf1f2c..f1e7af0599c41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet @Override public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) { + Set tasksWithOpenTransactions = taskManager.allOwnedTasks() + .values() + .stream() + .filter(t -> t.commitNeeded()) + .collect(Collectors.toSet()); + log.info("Committing {} tasks with open transactions before onAssignment()", tasksWithOpenTransactions.size()); + taskManager.commit(tasksWithOpenTransactions); + final List partitions = new ArrayList<>(assignment.partitions()); partitions.sort(PARTITION_COMPARATOR); From 163f2c9de19c70b3a79ff3fa6505aaf43c394209 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Tue, 4 Nov 2025 22:41:23 -0800 Subject: [PATCH 2/2] adds test and fixes checkstyle --- .../processor/internals/StreamsPartitionAssignor.java | 2 +- .../internals/StreamsPartitionAssignorTest.java | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index f1e7af0599c41..2f3360a3f53ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1458,7 +1458,7 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet @Override public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) { - Set tasksWithOpenTransactions = taskManager.allOwnedTasks() + final Set tasksWithOpenTransactions = taskManager.allOwnedTasks() .values() .stream() .filter(t -> t.commitNeeded()) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 08674857b7182..8e072f08012d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -1442,6 +1442,14 @@ public void testOnAssignment(final Map parameterizedConfig) { setUp(parameterizedConfig, false); taskManager = mock(TaskManager.class); + final Task notToCommit = mock(Task.class); + final Task toCommit = mock(Task.class); + final TaskId notToCommitId = new TaskId(0, 0); + final TaskId toCommitId = new TaskId(0, 1); + when(notToCommit.commitNeeded()).thenReturn(false); + when(toCommit.commitNeeded()).thenReturn(true); + when(taskManager.allOwnedTasks()).thenReturn(Map.of(notToCommitId, notToCommit, toCommitId, toCommit)); + final Map> hostState = Collections.singletonMap( new HostInfo("localhost", 9090), Set.of(t3p0, t3p3)); @@ -1465,6 +1473,7 @@ public void testOnAssignment(final Map parameterizedConfig) { verify(streamsMetadataState).onChange(eq(hostState), any(), topicPartitionInfoCaptor.capture()); verify(taskManager).handleAssignment(activeTasks, standbyTasks); + verify(taskManager).commit(Set.of(toCommit)); assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p0)); assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p3));