@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet

@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
Member

Seems we would need to check whether EOS is enabled? Or would we want to commit in the ALOS case as well?

Contributor Author

Ah. Good point, no need to commit in ALOS.
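
To make the EOS-versus-ALOS point concrete, here is a minimal sketch of gating the pre-assignment commit on the processing mode. Everything in it is a hypothetical stand-in (`ProcessingMode`, `SketchTask`, `EosGate`, `tasksToCommit`); it is not the actual Kafka Streams code, only an illustration of the check being asked for.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for illustration; not the real Kafka Streams types.
enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_V2 }

final class SketchTask {
    final String id;
    final boolean commitNeeded;

    SketchTask(String id, boolean commitNeeded) {
        this.id = id;
        this.commitNeeded = commitNeeded;
    }
}

final class EosGate {
    // Selects the tasks to commit before the new assignment is handled.
    // Under ALOS nothing is selected, matching the conclusion in this thread
    // that no pre-assignment commit is needed there.
    static Set<SketchTask> tasksToCommit(ProcessingMode mode, List<SketchTask> owned) {
        Set<SketchTask> selected = new LinkedHashSet<>();
        if (mode != ProcessingMode.EXACTLY_ONCE_V2) {
            return selected;
        }
        for (SketchTask task : owned) {
            if (task.commitNeeded) {
                selected.add(task);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<SketchTask> owned = Arrays.asList(
            new SketchTask("0_0", false),
            new SketchTask("0_1", true));
        System.out.println(tasksToCommit(ProcessingMode.AT_LEAST_ONCE, owned).size());
        System.out.println(tasksToCommit(ProcessingMode.EXACTLY_ONCE_V2, owned).size());
    }
}
```

In the real code the mode would presumably come from the existing Streams configuration rather than a new enum; that wiring is left out here.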

Member

Shouldn't this go into TaskManager.handleAssignment?

.values()
.stream()
.filter(t -> t.commitNeeded())
Member

Is this check the right one? Honest question. Could be correct. Just not sure. -- Btw: We actually do verify commitNeeded() inside taskManager.commit anyway.

When we call taskManager.commit() based on the regular commit interval, we use .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) to find all tasks we want to commit.

On some other code path, we .filter(Task::isActive).

Note: the tricky thing about EOS is that if a single task needs to be committed, we need to make sure to commit all tasks, so that we do not violate EOS...

🤔 -- Maybe @lucasbru can help on this question?
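
For comparison, the three candidate filters mentioned above can be put side by side on a toy model. This is only a sketch: `ModelTask` and `CommitSelection` are hypothetical and much simpler than the real `Task` interface, and `allIfAnyNeedsCommit` is one possible reading of the "commit all tasks" rule for EOS, not a confirmed implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified task model; not the real Kafka Streams Task interface.
final class ModelTask {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private final String id;
    private final State state;
    private final boolean active;
    private final boolean commitNeeded;

    ModelTask(String id, State state, boolean active, boolean commitNeeded) {
        this.id = id;
        this.state = state;
        this.active = active;
        this.commitNeeded = commitNeeded;
    }

    String id() { return id; }
    State state() { return state; }
    boolean isActive() { return active; }
    boolean commitNeeded() { return commitNeeded; }
}

final class CommitSelection {
    // Filter used in this PR: only tasks that report uncommitted work.
    static List<ModelTask> byCommitNeeded(List<ModelTask> tasks) {
        return tasks.stream().filter(ModelTask::commitNeeded).collect(Collectors.toList());
    }

    // Filter quoted for the regular commit interval: RUNNING or RESTORING tasks.
    static List<ModelTask> byState(List<ModelTask> tasks) {
        return tasks.stream()
            .filter(t -> t.state() == ModelTask.State.RUNNING || t.state() == ModelTask.State.RESTORING)
            .collect(Collectors.toList());
    }

    // Filter quoted for another code path: active tasks only.
    static List<ModelTask> byActive(List<ModelTask> tasks) {
        return tasks.stream().filter(ModelTask::isActive).collect(Collectors.toList());
    }

    // Assumed all-or-nothing rule: if any task needs a commit, select every task passed in.
    static List<ModelTask> allIfAnyNeedsCommit(List<ModelTask> tasks) {
        List<ModelTask> selected = new ArrayList<>();
        if (tasks.stream().anyMatch(ModelTask::commitNeeded)) {
            selected.addAll(tasks);
        }
        return selected;
    }
}
```

The filters can disagree on the same input, for example on a standby task in RUNNING state, which is why the choice of filter matters for the question raised here.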

Member

I'm a bit confused. Will the commit actually do something here in the classic protocol? I think rebalanceInProgress will be true when onAssignment is called, so the commit will be skipped, no?

Member

Oh, good point. I checked the code, and yes, rebalanceInProgress would be true; however, the call below uses taskManager.commit(...) directly, which does not use this flag...

So we would try to execute the commit, but the commit should return with an error...? So we cannot really commit here from my understanding.

Member

Wondering why no tests are failing on this PR? 🤔

Contributor Author

@mjsax and @lianetm thanks for the comments here. I did test this in our soak test, and the commit() did not fail. In fact it did help—deploying the fix allowed the soak to go from an unrecoverable state to finish restoration and be healthy again.

My reading of StreamThread.java led me to the same conclusion that Lianet came to, which is that the exception would bubble up to the call to poll().

The reasoning for this is that pollPhase() is inside runOnceWithProcessingThreads() which is inside all of the exception handling in runLoop().

I see what Matthias is saying: failing to do further exception handling will cause the StreamThread to crash. But if a commit fails, we have to close all tasks dirty anyway, which is already extremely disruptive in EOS; does losing and recreating a thread (a couple hundred milliseconds, perhaps) matter compared to wiping hundreds of GB of RocksDB state?

Member

But onAssignment is called after the "syncGroup Response", so from the group coordinator's (GC) POV the rebalance has already completed, and thus committing is allowed again?

Contributor Author

Yes, it does make sense that the onAssignment() would only be called after the rest of the rebalance completes, and perhaps the Group Coordinator doesn't care to consider the rebalance as "ongoing" while the group members are handling onAssignment().

Anecdotally, the rebalance-latency-avg consumer metric was quite low (a few hundred milliseconds) in the case where we had StreamThreads blocking for >20 seconds waiting for TaskManager to finish its assignment. (As a Kafka non-expert) I think that lends credence to the idea that the GC considers the rebalance to be "done" at this point.

Contributor

nit: can be a method reference
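
A small sketch of what the nit amounts to, using a hypothetical `NitTask` class in place of the real `Task`: the lambda and the method reference select exactly the same elements, so the change is purely stylistic.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for Task; only the accessor used by the filter is modeled.
final class NitTask {
    private final boolean commitNeeded;

    NitTask(boolean commitNeeded) {
        this.commitNeeded = commitNeeded;
    }

    boolean commitNeeded() {
        return commitNeeded;
    }
}

final class MethodRefNit {
    // Form currently in the diff: explicit lambda.
    static Set<NitTask> withLambda(List<NitTask> tasks) {
        return tasks.stream().filter(t -> t.commitNeeded()).collect(Collectors.toSet());
    }

    // Suggested form: method reference with identical behavior.
    static Set<NitTask> withMethodReference(List<NitTask> tasks) {
        return tasks.stream().filter(NitTask::commitNeeded).collect(Collectors.toSet());
    }
}
```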

.collect(Collectors.toSet());
log.info("Committing {} tasks with open transactions before onAssignment()", tasksWithOpenTransactions.size());
taskManager.commit(tasksWithOpenTransactions);
Member

@mjsax, Nov 8, 2025

taskManager.commit might throw exceptions. We need to handle them here. onAssignment() is called from within the KafkaConsumer, which is not set up to handle any of these exceptions.

Contributor Author

Hmm. I thought the exceptions propagate up to the poll() method, which is handled inside the StreamThread? Or does this blow up and mess up the rebalance handling?

Member

@mjsax, Nov 9, 2025

I believe it messes up the rebalancing and might not just cleanly bubble up to poll(). cc @lianetm to confirm.

I know for sure that we have error-handling logic inside StreamsRebalanceListener#onPartitionsRevoked() to avoid throwing any exception from the handler back into the consumer. I assume it's the same for the partition assignor at hand.

Member

I think we throw from onPartitionsRevoked as well. But we indeed try to finish as much as possible of the revocation before we rethrow the first exception. For KIP-1071, handleAssignment is called from onPartitionsAssigned so it will work fairly similarly.

Note also the comment below:

        // we do not capture any exceptions but just let the exception thrown from consumer.poll directly
        // since when stream thread captures it, either we close all tasks as dirty or we close thread

That sounds like it should do the right thing already?

But yes, we need to dig deep to make sure we do not break the error handling here.
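
The "finish as much as possible, then rethrow the first exception" pattern described above can be sketched roughly as follows. `FirstExceptionRunner` and `runAll` are hypothetical names for illustration; this is not the actual onPartitionsRevoked code, and it only handles unchecked exceptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the pattern; not taken from Kafka Streams.
final class FirstExceptionRunner {
    // Runs every step even if an earlier one fails, then rethrows the first failure.
    static void runAll(List<Runnable> steps) {
        RuntimeException first = null;
        for (Runnable step : steps) {
            try {
                step.run();
            } catch (RuntimeException e) {
                // Remember only the first failure; later steps still get their chance to run.
                if (first == null) {
                    first = e;
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        try {
            runAll(Arrays.<Runnable>asList(
                () -> log.add("commit attempted"),
                () -> { throw new IllegalStateException("commit failed"); },
                () -> log.add("assignment handled")));
        } catch (IllegalStateException e) {
            log.add("rethrown: " + e.getMessage());
        }
        System.out.println(log);
    }
}
```

Applied to onAssignment, the open question is whether the assignment logic should still run after a failed commit, which is what the rest of this thread discusses.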

Member

I think the code here is right (as long as we do want to bubble up the commit failures, which I think we do).
No matter what we throw inside the onAssignment, it will bubble up as a KafkaException on the call to consumer.poll (same for all rebalance callbacks). The ConsumerCoordinator ensures that, so no worries here.
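
As a toy simulation of the behavior described here (a callback failure surfacing from poll() as a wrapping exception), consider the sketch below. `CallbackFailure`, `SketchConsumer`, and `SketchThread` are all hypothetical; the real wrapping is done by the ConsumerCoordinator, and its exact exception type and message are not reproduced here.

```java
// Hypothetical wrapper exception standing in for the exception type poll() would throw.
final class CallbackFailure extends RuntimeException {
    CallbackFailure(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical consumer that runs a rebalance callback inside poll().
final class SketchConsumer {
    private final Runnable onAssignment;

    SketchConsumer(Runnable onAssignment) {
        this.onAssignment = onAssignment;
    }

    // Simulated poll(): invokes the callback inline and surfaces any failure to the caller.
    void poll() {
        try {
            onAssignment.run();
        } catch (RuntimeException e) {
            throw new CallbackFailure("rebalance callback failed", e);
        }
    }
}

// Hypothetical caller standing in for the thread loop around poll().
final class SketchThread {
    // Returns a description of what the caller of poll() observed.
    static String runOnce(SketchConsumer consumer) {
        try {
            consumer.poll();
            return "ok";
        } catch (CallbackFailure e) {
            return "caught: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        SketchConsumer failing = new SketchConsumer(() -> {
            throw new IllegalStateException("commit failed");
        });
        System.out.println(runOnce(failing));
    }
}
```

The sketch only shows that the caller of poll() is the place where the failure becomes visible; what that caller should then do (close tasks dirty, let the thread die, or something finer-grained) is the part still under discussion.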

Member

Thanks, Lianet.

Atm, the KS code is not set up to handle an exception bubbling out of poll(). It would crash the whole StreamThread.

Also, if we fail here, we would not execute any of the assignment logic, so we need to make sure we don't get into a corrupted internal state when we exit onAssignment with an error, w/o completing the assignment logic. Of course, if we let the thread crash, nothing to worry about 😂 -- but if we don't want to let the StreamThread just crash, we need to be very careful how to handle it.

The question is: if we cannot commit, do we really need to do an "abort everything"? That might depend on the commit error, too. In general, for EOS, when a commit fails, many tasks are affected because we commit many tasks at once, so a larger cleanup is required.


final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
partitions.sort(PARTITION_COMPARATOR);

@@ -1442,6 +1442,14 @@ public void testOnAssignment(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
taskManager = mock(TaskManager.class);

final Task notToCommit = mock(Task.class);
final Task toCommit = mock(Task.class);
final TaskId notToCommitId = new TaskId(0, 0);
final TaskId toCommitId = new TaskId(0, 1);
when(notToCommit.commitNeeded()).thenReturn(false);
when(toCommit.commitNeeded()).thenReturn(true);
when(taskManager.allOwnedTasks()).thenReturn(Map.of(notToCommitId, notToCommit, toCommitId, toCommit));

final Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(
new HostInfo("localhost", 9090),
Set.of(t3p0, t3p3));
@@ -1465,6 +1473,7 @@ public void testOnAssignment(final Map<String, Object> parameterizedConfig) {

verify(streamsMetadataState).onChange(eq(hostState), any(), topicPartitionInfoCaptor.capture());
verify(taskManager).handleAssignment(activeTasks, standbyTasks);
verify(taskManager).commit(Set.of(toCommit));

assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p0));
assertTrue(topicPartitionInfoCaptor.getValue().containsKey(t3p3));