-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19853: commit open tasks before handling assignment #20833
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1458,6 +1458,14 @@ protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMet | |
|
|
||
| @Override | ||
| public void onAssignment(final Assignment assignment, final ConsumerGroupMetadata metadata) { | ||
| final Set<Task> tasksWithOpenTransactions = taskManager.allOwnedTasks() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this go into |
||
| .values() | ||
| .stream() | ||
| .filter(t -> t.commitNeeded()) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this check the right one? Honest question. Could be correct. Just not sure. -- Btw: We actually do verify We when call On some other code path, we Note: the tricky thing about EOS is, that if a single task need to be committed, we need to make sure to commit all tasks, to not violate EOS... 🤔 -- Maybe @lucasbru can help on this question?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm a bit confused. Will the commit actually do something here in the classic protocol? I think rebalanceInProgress will be true when
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh good point. I checked the code, and yes So we would try to execute the commit, but the commit should return with an error...? So we cannot really commit here from my understanding.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering why not tests are failing on this PR? 🤔
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mjsax and @lianetm thanks for the comments here. I did test this in our soak test, and the My reading of The reasoning for this is that I see what Matthias is saying: failing to do further exception handling will cause the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it does make sense that the onAssignment() would only be called after the rest of the rebalance completes, and perhaps the Group Coordinator doesn't care to consider the rebalance as "ongoing" while the group members are handling onAssignment(). Anecdotally, the rebalance-latency-avg consumer metric was quite low (few hundred milliseconds) during the case when we had StreamThreads blocking for >20 seconds waiting for TaskManager to finish its assignment. (As a Kafka non-expert) I think that lends credence to the idea that the GC considers the rebalance to be "done" at this point.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can be a method reference |
||
| .collect(Collectors.toSet()); | ||
| log.info("Committing {} tasks with open transactions before onAssignment()", tasksWithOpenTransactions.size()); | ||
| taskManager.commit(tasksWithOpenTransactions); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. I thought the exceptions propagate up to the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe it messes up the rebalancing and might not just cleanly bubble up to I know for sure, that we have error handling logic inside
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we throw from Note also the comment below: That sounds like it should do the right thing already? But yes, we need to dig deep to make sure we do not break the error handling here.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the code here is right (as long as we do want to bubble up the commit failures, which I think we do).
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank Lianet. Atm, the KS code is not setup to handle an exception bubbling out of Also, if we fail here, we would not executed any of the assignment logic, so we need to make sure we don't get into corrupted internal state when we exit The question is: if we cannot commit, do we really need to do an "abort everything" -- might depend on the commit error, too. In general, for EOS when a commit fails, many tasks are affected as we commit many tasks at ones, so a larger cleanup is required. |
||
|
|
||
| final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions()); | ||
| partitions.sort(PARTITION_COMPARATOR); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seem we would need to check if EOS is enabled or not? Or would we want to commit also for the ALOS case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. Good point, no need to commit in ALOS.