-
Notifications
You must be signed in to change notification settings - Fork 211
Add mechanism for pro-active routing connection management #802
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Add mechanism for pro-active routing connection management #802
Conversation
For many types of jobs, connection changes will be far less frequent then `route` calls. Therefore, it can be quite expensive to re-calculate the underlying connection routing on every call to `route`. This update introduces a new router type `ProactiveRouter`. These routers effectively subscribe to changes in the connections and are responsible for maintaining the state of the connections when routing. For example on the `ConsistentHashingRouter` this means that they can just add or remove a connection from the map instead of re-calculating everything from scratch. I disabled this feature by default and required it to be explicitly turned on. Additionally, I add some hooks to the RouterFactory (should be the only breaking change) so that we can hook in our own custom router factories for these connections.
| import java.util.concurrent.locks.ReadWriteLock; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
|
||
| public class ProactiveConsistentHashingRouter<K, V> implements ProactiveRouter<KeyValuePair<K, V>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically the same as ConsistentHashingRouter except that does not recalculate the whole thing. It just makes the updates to add/remove a connection.
| <T> Router<T> scalarStageToStageRouter(String name, final Func1<T, byte[]> toBytes); | ||
|
|
||
| default <T> ProactiveRouter<T> scalarStageToStageProactiveRouter(String name, final Func1<T, byte[]> toBytes) { | ||
| return new ProactiveRoundRobinRouter<>(name, toBytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defaults for backwards compatibility.
| public static <K, V> Router<KeyValuePair<K, V>> consistentHashingLegacyTcpProtocol(String name, | ||
| final Func1<K, byte[]> keyEncoder, | ||
| final Func1<V, byte[]> valueEncoder) { | ||
| return new ConsistentHashingRouter<K, V>(name, new Func1<KeyValuePair<K, V>, byte[]>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just moved to a static method on the factory to avoid circular dependencies.
…nection-management
| private final Optional<ProactiveRouter<T>> router; | ||
|
|
||
| public ConnectionGroup(String groupId) { | ||
| public ConnectionGroup(String groupId, Optional<ProactiveRouter<T>> router) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: router -> routerO
| public ProactiveRoundRobinRouter(String name, Func1<T, byte[]> encoder) { | ||
| this.encoder = encoder; | ||
| metrics = new Metrics.Builder() | ||
| .name("Router_" + name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefix metrix name with "ProactiveRoundRobin"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or add a tag
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a tag
| this.connectionRepetitionOnRing = ringRepetitionPerConnection; | ||
| this.encoder = dataEncoder; | ||
| metrics = new Metrics.Builder() | ||
| .name("Router_" + name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefix metrix name with "ProactiveRoundRobin"?
|
@Andyz26 Thanks for the review! Made the requested updates. |
| return this; | ||
| } | ||
|
|
||
| public Config<K, T, R> shouldUseProactiveRouter(boolean useProactiveRouter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall these names be consistent across stage config? e.g withProactiveRouter in keytokey.
Context
For many types of jobs, connection changes will be far less frequent
then
routecalls. Therefore, it can be quite expensive tore-calculate the underlying connection routing on every call to
route.In some of our jobs at Stripe, the ConsistentHashingRouter was a limiting
factor in how many events/second/host we could process.
This update introduces a new router type
ProactiveRouter. Theserouters effectively subscribe to changes in the connections and are
responsible for maintaining the state of the connections when routing.
For example on the
ConsistentHashingRouterthis means that they canjust add or remove a connection from the map instead of re-calculating
everything from scratch.
I disabled this feature by default and required it to be explicitly
turned on.
Checklist
./gradlew buildcompiles code correctly./gradlew testpasses all tests