KAFKA-19853: commit open tasks before handling assignment #20833
Conversation
In an internal PR here, it was advised that I should handle … Thanks @eduwercamacaro for pointing that out!

@eduwercamacaro I looked further into the code of … That is great news, because any … @mjsax would you mind confirming this?
```java
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
    final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
```
Seems like we would need to check if EOS is enabled or not? Or would we want to commit also for the ALOS case?
Ah. Good point, no need to commit in ALOS.
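For illustration, a minimal sketch of such a guard, assuming the `StreamsConfigUtils.ProcessingMode` enum is available in this class (the wiring here is hypothetical):

```java
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;

// Hypothetical guard: only commit eagerly under EOS; in ALOS there is no
// open transaction that could time out, so the regular commit interval suffices.
final boolean eosEnabled = processingMode == ProcessingMode.EXACTLY_ONCE_V2
    || processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA;
if (eosEnabled && !tasksWithOpenTransactions.isEmpty()) {
    taskManager.commit(tasksWithOpenTransactions);
}
```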
```java
        .filter(t -> t.commitNeeded())
        .collect(Collectors.toSet());
    log.info("Committing {} tasks with open transactions before onAssignment()", tasksWithOpenTransactions.size());
    taskManager.commit(tasksWithOpenTransactions);
```
taskManager.commit might throw exceptions. We need to handle them here. onAssignment() is called from within the KafkaConsumer, which is not set up to handle any of these exceptions.
Hmm. I thought the exceptions propagate up to the poll() method, which is handled inside the StreamThread? Or does this blow up and mess up the rebalance handling?
I believe it messes up the rebalancing and might not just cleanly bubble up to poll(). cc @lianetm to confirm.
I know for sure, that we have error handling logic inside StreamsRebalanceListener#onPartitionsRevoked(), to avoid throwing any exception from the handler back into the consumer. I assume it's the same for the partition assignor at hand.
I think we throw from onPartitionsRevoked as well. But we indeed try to finish as much as possible of the revocation before we rethrow the first exception. For KIP-1071, handleAssignment is called from onPartitionsAssigned so it will work fairly similarly.
Note also the comment below:
```java
// we do not capture any exceptions but just let the exception thrown from consumer.poll directly
// since when stream thread captures it, either we close all tasks as dirty or we close thread
```
That sounds like it should do the right thing already?
But yes, we need to dig deep to make sure we do not break the error handling here.
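To make the trade-off concrete, a rough sketch of that defer-and-rethrow pattern applied here (the `firstException` holder is hypothetical):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical: record the commit failure instead of throwing straight out
// of the callback, finish the assignment bookkeeping, then rethrow at the end.
final AtomicReference<RuntimeException> firstException = new AtomicReference<>();
try {
    taskManager.commit(tasksWithOpenTransactions);
} catch (final RuntimeException fatal) {
    firstException.compareAndSet(null, fatal);
}
// ... remaining onAssignment() logic runs regardless ...
if (firstException.get() != null) {
    throw firstException.get();
}
```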
I think the code here is right (as long as we do want to bubble up the commit failures, which I think we do).
No matter what we throw inside the onAssignment, it will bubble up as a KafkaException on the call to consumer.poll (same for all rebalance callbacks). The ConsumerCoordinator ensures that, so no worries here.
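For illustration only (plain consumer API, not Streams code), the failure would surface in this frame:

```java
import java.time.Duration;
import org.apache.kafka.common.KafkaException;

// Exceptions thrown from rebalance callbacks are wrapped by the
// ConsumerCoordinator and rethrown from the poll() call itself.
try {
    consumer.poll(Duration.ofMillis(100));
} catch (final KafkaException fatal) {
    // a commit failure inside onAssignment() lands here
}
```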
Thanks Lianet.
Atm, the KS code is not set up to handle an exception bubbling out of poll(). It would crash the whole StreamThread.
Also, if we fail here, we would not execute any of the assignment logic, so we need to make sure we don't get into a corrupted internal state when we exit onAssignment with an error, without completing the assignment logic. Of course, if we let the thread crash, nothing to worry about 😂 -- but if we don't want to let the StreamThread just crash, we need to be very careful how to handle it.
The question is: if we cannot commit, do we really need to do an "abort everything"? -- might depend on the commit error, too. In general, for EOS when a commit fails, many tasks are affected, as we commit many tasks at once, so a larger cleanup is required.
```java
final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
    .values()
    .stream()
    .filter(t -> t.commitNeeded())
```
Is this check the right one? Honest question. Could be correct. Just not sure. -- Btw: We actually do verify commitNeeded() inside taskManager.commit anyway.
When we call taskManager.commit() based on the regular commit interval, we use .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) to find all tasks we want to commit.
On some other code paths, we .filter(Task::isActive).
Note: the tricky thing about EOS is that if a single task needs to be committed, we need to make sure to commit all tasks, to not violate EOS...
🤔 -- Maybe @lucasbru can help on this question?
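For comparison, a sketch of the state-based filter used on the commit-interval path (as quoted above), written in the shape of this PR's code:

```java
// Alternative selection: filter by task state (RUNNING or RESTORING)
// rather than by commitNeeded(); note that under EOS, one task needing
// a commit means all tasks must be committed together.
final Set<Task> tasksToCommit = taskManager.allOwnedTasks()
    .values()
    .stream()
    .filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
    .collect(Collectors.toSet());
```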
I'm a bit confused. Will the commit actually do something here in the classic protocol? I think rebalanceInProgress will be true when onAssignment is called, so the commit will be skipped, no?
Oh good point. I checked the code, and yes, rebalanceInProgress would be true; however, the call below uses taskManager.commit(...) directly, which does not use this flag...
So we would try to execute the commit, but the commit should return with an error...? So we cannot really commit here from my understanding.
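A hypothetical illustration of the distinction being discussed (method name and return convention invented for clarity):

```java
// Interval-driven path: skips committing while a rebalance is in flight.
int maybeCommit(final Set<Task> tasks) {
    if (rebalanceInProgress) {
        return -1;  // skipped: cannot commit mid-rebalance
    }
    // Direct path used by this PR: no rebalanceInProgress guard, so the
    // broker-side state decides whether the commit succeeds or errors out.
    return taskManager.commit(tasks);
}
```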
Wondering why no tests are failing on this PR? 🤔
@mjsax and @lianetm thanks for the comments here. I did test this in our soak test, and the commit() did not fail. In fact it did help—deploying the fix allowed the soak to go from an unrecoverable state to finish restoration and be healthy again.
My reading of StreamThread.java led me to the same conclusion that Lianet came to, which is that the exception would bubble up to the call to poll().
The reasoning for this is that pollPhase() is inside runOnceWithProcessingThreads() which is inside all of the exception handling in runLoop().
I see what Matthias is saying: failing to do further exception handling will cause the StreamThread to crash. But if a commit fails, we have to close all tasks dirty anyway, which is extremely disruptive in EOS already; does losing and recreating a thread (a couple hundred milliseconds, perhaps) matter compared to wiping hundreds of GBs of RocksDB state?
But onAssignment is called after "syncGroup Response", so from a GC POV the rebalance did complete already, and thus committing is allowed again?
Yes, it does make sense that the onAssignment() would only be called after the rest of the rebalance completes, and perhaps the Group Coordinator doesn't care to consider the rebalance as "ongoing" while the group members are handling onAssignment().
Anecdotally, the rebalance-latency-avg consumer metric was quite low (few hundred milliseconds) during the case when we had StreamThreads blocking for >20 seconds waiting for TaskManager to finish its assignment. (As a Kafka non-expert) I think that lends credence to the idea that the GC considers the rebalance to be "done" at this point.
mjsax left a comment
As discussed on the ticket, I am overall not a fan of committing, as it seems to be a workaround, not addressing the actual root cause.
Why would increasing transaction.timeout.ms not help to address the issue?
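For reference, raising the timeout would be a configuration-only change; a sketch (values illustrative; Streams defaults the producer's transaction.timeout.ms to 10s under EOS):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative: give in-flight transactions more headroom than the 10s
// default; the value must stay below the broker's transaction.max.timeout.ms.
final Properties props = new Properties();
props.put(
    StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
    120_000
);
```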
lucasbru left a comment
I left some comments. I think we may need to understand better how the error handling works.
I agree that it would be nice to explore higher values of transaction timeout to address this problem.
```java
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
    final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
```
Shouldn't this go into TaskManager.handleAssignment?
```java
final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
    .values()
    .stream()
    .filter(t -> t.commitNeeded())
```
nit: can be a method reference
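i.e.:

```java
// method reference instead of the lambda t -> t.commitNeeded()
.filter(Task::commitNeeded)
```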
Under stress with active restorations going on, the `StateUpdater#runOnce()` method can block on write stalls. This causes the `StreamThread` to block on `TaskManager#handleAssignment()` in the consumer rebalance callback. This is because `TaskManager#handleAssignment()` waits for a future on the StateUpdater. If RocksDB is stalling, which is very common during restoration or when processing a warmup task, that future can take some time to show up.

This blocking can cause transaction timeouts in EOS, which is disruptive. This commit mitigates that issue by committing any open transactions before blocking on the State Updater.
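A condensed sketch of that mitigation, with the surrounding assignment logic elided (names taken from the diff above):

```java
@Override
public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) {
    // Flush open EOS transactions first, bounding their age before the
    // potentially long-blocking assignment handling below.
    final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks()
        .values()
        .stream()
        .filter(Task::commitNeeded)
        .collect(Collectors.toSet());
    taskManager.commit(tasksWithOpenTransactions);
    // ... existing logic, including taskManager.handleAssignment(...),
    // which may wait on a StateUpdater future while RocksDB stalls ...
}
```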