
Conversation

@coltmcnealy-lh (Contributor) commented Nov 5, 2025

Under stress, while restorations are active, the
StateUpdater#runOnce() method can block on RocksDB write stalls. This
causes the StreamThread to block on TaskManager#handleAssignment() in
the consumer rebalance callback, because TaskManager#handleAssignment()
waits on a future from the State Updater. When RocksDB is stalling,
which is common during restoration or while processing a warmup task,
that future can take a long time to complete.

This blocking can cause transaction timeouts under EOS, which is
disruptive. This commit mitigates the issue by committing any open
transactions before blocking on the State Updater.
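
(For illustration, the change assembled in one place; a sketch based on the diff fragments discussed in the review below, not the verbatim patch:)

    @Override
    public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
        // Commit tasks with open transactions *before* the assignment
        // handling that may block on the state updater, so an EOS
        // transaction cannot time out while we wait.
        final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
            .values()
            .stream()
            .filter(t -> t.commitNeeded())
            .collect(Collectors.toSet());
        log.info("Committing {} tasks with open transactions before onAssignment()", tasksWithOpenTransactions.size());
        taskManager.commit(tasksWithOpenTransactions);

        // ... existing assignment handling follows, which may block
        // waiting on a future from the State Updater
    }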

@github-actions github-actions bot added labels triage (PRs from the community), streams, small (Small PRs) Nov 5, 2025
@coltmcnealy-lh (Contributor, Author)

In an internal PR here, it was advised that I should handle TaskMigratedException. However, I'm not sure of the best way to do that. Could anyone who knows this code better please advise? I will poke around but would appreciate help.

Thanks @eduwercamacaro for pointing that out!

@coltmcnealy-lh (Contributor, Author)

@eduwercamacaro I looked further into the code of StreamThread.java. It turns out that there isn't any special exception handling in runOnceWithProcessingThreads() or runOnceWithoutProcessingThreads() when they call maybeCommit(), which calls TaskManager#commit(); the TaskMigratedException is handled further up the stack.

That is great news, because any TaskMigratedException thrown from where I call TaskManager#commit() would be handled in the same place as those existing exceptions.

@mjsax would you mind confirming this?
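
(For reference, a rough paraphrase of that structure in StreamThread.java; a simplified sketch from memory, not the exact source:)

    // Sketch of StreamThread#runLoop(): exceptions thrown from the
    // runOnce*() methods -- including those bubbling out of
    // consumer.poll() -- are caught here, so a TaskMigratedException
    // from TaskManager#commit() would land in the same handler.
    while (isRunning()) {
        try {
            runOnceWithoutProcessingThreads(); // calls pollPhase() and maybeCommit()
        } catch (final TaskMigratedException e) {
            handleTaskMigrated(e); // re-join the group instead of crashing
        }
    }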

@mjsax mjsax changed the title fix(streams): commit open tasks before handling assignment KAFKA-19853: commit open tasks before handling assignment Nov 8, 2025
@mjsax mjsax added ci-approved and removed triage PRs from the community labels Nov 8, 2025

@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
Member

Seems we would need to check whether EOS is enabled or not? Or would we want to commit in the ALOS case as well?

Contributor Author

Ah. Good point, no need to commit in ALOS.

.filter(t -> t.commitNeeded())
.collect(Collectors.toSet());
log.info("Committing {} tasks with open transactions before onAssignment()", tasksWithOpenTransactions.size());
taskManager.commit(tasksWithOpenTransactions);
@mjsax mjsax (Member) Nov 8, 2025

taskManager.commit might throw exceptions. We need to handle them here. onAssignment() is called from within the KafkaConsumer, which is not set up to handle any of these exceptions.

Contributor Author

Hmm. I thought the exceptions propagate up to the poll() method, which is handled inside the StreamThread? Or does this blow up and mess up the rebalance handling?

@mjsax mjsax (Member) Nov 9, 2025

I believe it messes up the rebalancing and might not just cleanly bubble up to poll(). \cc @lianetm to confirm.

I know for sure that we have error-handling logic inside StreamsRebalanceListener#onPartitionsRevoked(), to avoid throwing any exception from the handler back into the consumer. I assume it's the same for the partition assignor at hand.

Member

I think we throw from onPartitionsRevoked as well. But we indeed try to finish as much as possible of the revocation before we rethrow the first exception. For KIP-1071, handleAssignment is called from onPartitionsAssigned so it will work fairly similarly.

Note also the comment below:

        // we do not capture any exceptions but just let the exception thrown from consumer.poll directly
        // since when stream thread captures it, either we close all tasks as dirty or we close thread

That sounds like it should do the right thing already?

But yes, we need to dig deep to make sure we do not break the error handling here.

Member

I think the code here is right (as long as we do want to bubble up the commit failures, which I think we do).
No matter what we throw inside the onAssignment, it will bubble up as a KafkaException on the call to consumer.poll (same for all rebalance callbacks). The ConsumerCoordinator ensures that, so no worries here.

Member

Thanks, Lianet.

Atm, the KS code is not set up to handle an exception bubbling out of poll(). It would crash the whole StreamThread.

Also, if we fail here, we would not execute any of the assignment logic, so we need to make sure we don't get into a corrupted internal state when we exit onAssignment with an error, w/o completing the assignment logic. Of course, if we let the thread crash, nothing to worry about 😂 -- but if we don't want the StreamThread to just crash, we need to be very careful how to handle it.

The question is: if we cannot commit, do we really need to do an "abort everything"? That might depend on the commit error, too. In general, for EOS, when a commit fails, many tasks are affected, as we commit many tasks at once, so a larger cleanup is required.
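
(One illustrative way to make that trade-off explicit; a sketch only, since whether to swallow, rethrow, or abort is exactly the open question here:)

    try {
        taskManager.commit(tasksWithOpenTransactions);
    } catch (final TaskMigratedException e) {
        // the tasks were already reassigned; the surrounding rebalance
        // will hand them off, so arguably we can log and continue with
        // the assignment logic
        log.info("Tasks migrated during pre-assignment commit", e);
    } catch (final RuntimeException e) {
        // anything else escapes the callback and surfaces as a
        // KafkaException from consumer.poll(), likely crashing the
        // StreamThread unless handled there
        throw e;
    }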

final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
.values()
.stream()
.filter(t -> t.commitNeeded())
Member

Is this check the right one? Honest question. Could be correct; just not sure. -- Btw: we actually do verify commitNeeded() inside taskManager.commit anyway.

When we call taskManager.commit() based on the regular commit interval, we use .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) to find all tasks we want to commit.

On some other code path, we .filter(Task::isActive).

Note: the tricky thing about EOS is that if a single task needs to be committed, we need to make sure to commit all tasks, to not violate EOS...

🤔 -- Maybe @lucasbru can help on this question?
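
(The three candidate selections side by side, as a sketch; the stream boilerplate is abbreviated:)

    // This PR: anything with an open transaction.
    .filter(t -> t.commitNeeded())

    // Regular commit-interval path: anything running or restoring.
    .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)

    // Another code path: active tasks only.
    .filter(Task::isActive)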

Member

I'm a bit confused. Will the commit actually do something here in the classic protocol? I think rebalanceInProgress will be true when onAssignment is called, so the commit will be skipped, no?

Member

Oh, good point. I checked the code, and yes, rebalanceInProgress would be true; however, the call below uses taskManager.commit(...) directly, which does not use this flag...

So we would try to execute the commit, but the commit should return with an error...? So we cannot really commit here, from my understanding.

Member

Wondering why no tests are failing on this PR? 🤔

Contributor Author

@mjsax and @lianetm thanks for the comments here. I did test this in our soak test, and the commit() did not fail. In fact it did help: deploying the fix allowed the soak to go from an unrecoverable state to finishing restoration and becoming healthy again.

My reading of StreamThread.java led me to the same conclusion that Lianet came to, which is that the exception would bubble up to the call to poll().

The reasoning for this is that pollPhase() is inside runOnceWithProcessingThreads(), which is inside all of the exception handling in runLoop().

I see what Matthias is saying: failing to do further exception handling will cause the StreamThread to crash. But if a commit fails, we have to close all tasks dirty anyway, which is extremely disruptive in EOS already; does losing and recreating a thread (a couple hundred milliseconds, perhaps) matter compared to wiping hundreds of GBs of RocksDB state?

Member

But onAssignment is called after the SyncGroup response, so from a GC POV the rebalance did already complete, and thus committing is allowed again?

Contributor Author

Yes, it does make sense that onAssignment() would only be called after the rest of the rebalance completes, and perhaps the Group Coordinator doesn't consider the rebalance to be "ongoing" while the group members are handling onAssignment().

Anecdotally, the rebalance-latency-avg consumer metric was quite low (a few hundred milliseconds) during the case when we had StreamThreads blocking for >20 seconds waiting for the TaskManager to finish its assignment. (As a Kafka non-expert) I think that lends credence to the idea that the GC considers the rebalance to be "done" at this point.

@mjsax mjsax (Member) left a comment

As discussed on the ticket, I am overall not a fan of committing, as it seems to be a workaround that does not address the actual root cause.

Why would increasing transaction.timeout.ms not help to address the issue?
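
(For reference, a sketch of how a Streams app could raise it; values are illustrative, and the broker-side transaction.max.timeout.ms caps what the broker accepts:)

    final Properties props = new Properties();
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
    // raise the transaction timeout of the internally created producer so
    // long restoration write stalls do not abort open transactions
    props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 120_000);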

@lucasbru lucasbru (Member) left a comment

I left some comments. I think we may need to understand better how the error handling works.

I agree that it would be nice to explore higher values of transaction timeout to address this problem.


@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
Member

Shouldn't this go into TaskManager.handleAssignment?



final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
.values()
.stream()
.filter(t -> t.commitNeeded())
Contributor

nit: can be a method reference
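
That is, the lambda could be written as:

    .filter(Task::commitNeeded)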
