feat(client): add parallel execution control for receiver callbacks #1140
Draft
flc1125 wants to merge 7 commits into cloudevents:main from flc1125:parallel
Commits (7)
a359f8d upgrade-lint (duglin)
77253b1 feat(client): add parallel execution control for receiver callbacks (flc1125)
f4a396f feat(client): add parallel execution control for receiver callbacks (flc1125)
a9877b8 feat(client): add parallel execution control for receiver callbacks (flc1125)
ce30137 Merge branch 'main' into parallel (flc1125)
89496de fix(options): validate number of parallel goroutines in WithParallelG… (flc1125)
0c61762 revert: restore blocking configuration and functionality (flc1125)
Conversations
Am I reading this right... we don't support blocking any more?
I changed the value of WithParallelGoroutines to 1, which achieves the same effect. Moreover, it is more versatile.
see https://github.com/flc1125/cloudevents-sdk-go/blob/ce30137fe5dbc884c4860f2114622957b739c996/v2/client/options.go#L122
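For context, a minimal usage sketch of what that looks like from the caller's side. This assumes the WithParallelGoroutines option proposed in this PR plus the existing v2 NewHTTP/NewClient/StartReceiver APIs; the final option name and validation may differ:

```go
// Sketch only: WithParallelGoroutines is the option proposed in this PR and
// may change before merging; NewHTTP/NewClient/StartReceiver are existing v2 APIs.
package main

import (
	"context"
	"log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/client"
)

func main() {
	p, err := cloudevents.NewHTTP()
	if err != nil {
		log.Fatalf("failed to create protocol: %v", err)
	}

	// With a value of 1, callbacks run one event at a time, which matches
	// the old blocking behavior discussed above.
	c, err := cloudevents.NewClient(p, client.WithParallelGoroutines(1))
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	if err := c.StartReceiver(context.Background(), func(ctx context.Context, e cloudevents.Event) {
		log.Printf("received: %s", e.ID())
	}); err != nil {
		log.Fatalf("receiver stopped: %v", err)
	}
}
```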
It may not matter, but you're blocking at a different place now... if I'm reading this right.
Before it was: read -> block while processing
Now it's: read -> func(process)() -> read -> wait for a free spot to open in the channel
Right? This means that we're getting the next event BEFORE we're ready, whereas before we could only get one AFTER we were ready. Not sure it matters, but something to think about.
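To make the ordering concrete, here is a rough sketch of the two shapes being described. This is not the SDK's actual invoker code; receive, process, and Message are hypothetical stand-ins, and a buffered channel plays the role of the worker limiter:

```go
// Sketch only, not the SDK's invoker: it contrasts the two orderings using
// hypothetical receive/process callbacks and a channel as a worker limiter.
package main

import "context"

type Message struct{ ID string }

type (
	receiveFn func(context.Context) (Message, error)
	processFn func(context.Context, Message)
)

// oldBlockingLoop: read -> block while processing. The next message is only
// pulled from the transport after the previous callback returns.
func oldBlockingLoop(ctx context.Context, receive receiveFn, process processFn) {
	for {
		msg, err := receive(ctx)
		if err != nil {
			return
		}
		process(ctx, msg) // blocks the loop; nothing else is read meanwhile
	}
}

// parallelLoop, as the PR reads to me: the next message is pulled before a
// worker slot is free, so it sits "on hold" in this process while it waits
// for room in the channel.
func parallelLoop(ctx context.Context, n int, receive receiveFn, process processFn) {
	slots := make(chan struct{}, n)
	for {
		msg, err := receive(ctx) // read happens whether or not a worker is free
		if err != nil {
			return
		}
		slots <- struct{}{} // may block here, with msg already off the queue
		go func(m Message) {
			defer func() { <-slots }()
			process(ctx, m)
		}(msg)
	}
}

func main() {}
```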
Also, we need testcases.
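One possible shape for such a test, written against the hypothetical parallelLoop sketch above rather than the real client (since the option is still in flux): it asserts that no more than n callbacks run concurrently.

```go
// Sketch of a concurrency-limit test for the hypothetical parallelLoop above.
package main

import (
	"context"
	"errors"
	"sync"
	"testing"
	"time"
)

func TestParallelLoopLimitsConcurrency(t *testing.T) {
	const n, total = 3, 20

	// receive is only called from the single dispatch loop, so a plain
	// counter is fine; it hands out `total` messages and then errors out.
	produced := 0
	receive := func(ctx context.Context) (Message, error) {
		if produced == total {
			return Message{}, errors.New("done")
		}
		produced++
		return Message{ID: "msg"}, nil
	}

	var (
		mu             sync.Mutex
		inFlight, peak int
		wg             sync.WaitGroup
	)
	wg.Add(total)
	process := func(ctx context.Context, m Message) {
		defer wg.Done()
		mu.Lock()
		inFlight++
		if inFlight > peak {
			peak = inFlight
		}
		mu.Unlock()

		time.Sleep(10 * time.Millisecond) // simulate work

		mu.Lock()
		inFlight--
		mu.Unlock()
	}

	parallelLoop(context.Background(), n, receive, process)
	wg.Wait() // all dispatched callbacks have finished before we read peak

	if peak > n {
		t.Fatalf("observed %d concurrent callbacks, want at most %d", peak, n)
	}
}
```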
You're right, indeed. Given this situation, should we maintain the original approach or treat it as a breaking change?
I'd like to confirm our final approach so I can make some adjustments accordingly.
I think it depends on whether calling receiver.Receive() before we're ready is risky or not. For example, suppose we get the next "msg" and then pause, and this entire for-loop stops. If another process tries to continue with the same responder/receiver, that one "msg" will be lost.
Abstractly it sounds like a bug. But in practice I don't know if the idea of continuing to get messages outside of the instance of this for-loop makes any sense. E.g. if we're pulling messages from a queue during the "receiver.Receive()" call, then I think this situation might occur.
We can make assumptions based on the logic of WithPollGoroutines. When there are multiple asynchronous fetches occurring simultaneously, it essentially means the scenario you just described is happening.
Can I understand it this way: event messages, when distributed across different processes (goroutines or others), inherently have the potential to encounter such situations? At the very least, the message currently being read would appear as "lost" or "non-existent" to other processes (goroutines or others).
Therefore, I believe this scenario might not be a cause for concern.
Of course, there is one extreme case where the user specifically wants to read messages "sequentially", meaning no other process (goroutine or otherwise) is allowed to pick up the "next" message.
However, I think this situation is similar to using blockingCallback, where the user intentionally makes that choice. When that is the case, there is no scenario where another process (goroutine or otherwise) misses a message that is still waiting for the channel to become available.
Couple of things come to mind:
1 - I make a distinction between "lost a message because the processor of that message crashed", and "lost a message because it was pulled from a queue and purposely never even tried to be processed". The former is kind of expected. The latter is a bug IMO. While the latter isn't 100% "purposely dropped it"... the logic comes close, since we don't take into account the situation where this entire Go process just dies while that message is "on hold". The queue it's being pulled from should be where it's "on hold".
2 - if we're pulling messages from a queue then this "on hold" message could be processed before the next message if there's another processor pulling messages from that queue. Now, I'm not suggesting that it's our job to try to synchronous things in such a scenario, however, I think someone would find it really odd that this "on hold" message might be delayed significantly before it's even started to be processed while the next message might be processed immediately.
3 - "this situation is similar to using blockingCallback"... exactly. We don't support "blocking" any more. At least not with the same "when are things pulled off of the original queue" semantics.
I'm curious to know @embano1's thoughts, but I'm leaning towards an "only pull the next message when we're ready" model; otherwise it kind of feels like a potential bug, or we're introducing another queue into the flow... granted a queue of size one, but still a queue, and one that has zero persistence/resiliency to it.
I think you're right. We need to maintain predictability for end users regarding events. Hiding certain event-related operations might make users feel things are beyond their control.
Perhaps we could slightly adjust the channel position; that might preserve the original logic. Of course, this is just my initial thought, and I'll look into it more carefully when I have time to confirm.
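For example, something along these lines, again only a sketch that reuses the hypothetical Message, receiveFn, and processFn from the earlier sketch: acquiring the slot before the read restores the "only pull the next message when we're ready" behavior.

```go
// Sketch of the adjusted ordering: wait for a free worker, then read, so no
// message is ever held in this process without a worker ready to run it.
func readyFirstLoop(ctx context.Context, n int, receive receiveFn, process processFn) {
	slots := make(chan struct{}, n)
	for {
		slots <- struct{}{}      // wait for a free worker slot first
		msg, err := receive(ctx) // only now pull the next message
		if err != nil {
			<-slots
			return
		}
		go func(m Message) {
			defer func() { <-slots }()
			process(ctx, m)
		}(msg)
	}
}
```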