Skip to content

Conversation

@timmartin-stripe
Copy link
Contributor

@timmartin-stripe timmartin-stripe commented Oct 24, 2025

Context

For many types of jobs, connection changes will be far less frequent
then route calls. Therefore, it can be quite expensive to
re-calculate the underlying connection routing on every call to route.
In some of our jobs at Stripe, the ConsistentHashingRouter was a limiting
factor in how many events/second/host we could process.

This update introduces a new router type ProactiveRouter. These
routers effectively subscribe to changes in the connections and are
responsible for maintaining the state of the connections when routing.
For example on the ConsistentHashingRouter this means that they can
just add or remove a connection from the map instead of re-calculating
everything from scratch.

I disabled this feature by default and required it to be explicitly
turned on.

Checklist

  • ./gradlew build compiles code correctly
  • Added new tests where applicable
  • ./gradlew test passes all tests
  • Extended README or added javadocs where applicable

For many types of jobs, connection changes will be far less frequent
then `route` calls.  Therefore, it can be quite expensive to
re-calculate the underlying connection routing on every call to `route`.

This update introduces a new router type `ProactiveRouter`.  These
routers effectively subscribe to changes in the connections and are
responsible for maintaining the state of the connections when routing.
For example on the `ConsistentHashingRouter` this means that they can
just add or remove a connection from the map instead of re-calculating
everything from scratch.

I disabled this feature by default and required it to be explicitly
turned on.  Additionally, I add some hooks to the RouterFactory (should
be the only breaking change) so that we can hook in our own custom
router factories for these connections.
@timmartin-stripe timmartin-stripe changed the title Timmartin/add mechanism for pro active connection management Add mechanism for pro-active routing connection management Oct 24, 2025
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ProactiveConsistentHashingRouter<K, V> implements ProactiveRouter<KeyValuePair<K, V>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically the same as ConsistentHashingRouter except that does not recalculate the whole thing. It just makes the updates to add/remove a connection.

<T> Router<T> scalarStageToStageRouter(String name, final Func1<T, byte[]> toBytes);

default <T> ProactiveRouter<T> scalarStageToStageProactiveRouter(String name, final Func1<T, byte[]> toBytes) {
return new ProactiveRoundRobinRouter<>(name, toBytes);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaults for backwards compatibility.

public static <K, V> Router<KeyValuePair<K, V>> consistentHashingLegacyTcpProtocol(String name,
final Func1<K, byte[]> keyEncoder,
final Func1<V, byte[]> valueEncoder) {
return new ConsistentHashingRouter<K, V>(name, new Func1<KeyValuePair<K, V>, byte[]>() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just moved to a static method on the factory to avoid circular dependencies.

private final Optional<ProactiveRouter<T>> router;

public ConnectionGroup(String groupId) {
public ConnectionGroup(String groupId, Optional<ProactiveRouter<T>> router) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: router -> routerO

public ProactiveRoundRobinRouter(String name, Func1<T, byte[]> encoder) {
this.encoder = encoder;
metrics = new Metrics.Builder()
.name("Router_" + name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefix metrix name with "ProactiveRoundRobin"?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or add a tag

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a tag

this.connectionRepetitionOnRing = ringRepetitionPerConnection;
this.encoder = dataEncoder;
metrics = new Metrics.Builder()
.name("Router_" + name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefix metrix name with "ProactiveRoundRobin"?

@timmartin-stripe
Copy link
Contributor Author

@Andyz26 Thanks for the review! Made the requested updates.

@github-actions
Copy link

Test Results

154 files  + 2  154 suites  +2   8m 56s ⏱️ -35s
676 tests +15  665 ✅ +17  11 💤 ±0  0 ❌  - 2 
676 runs  +14  665 ✅ +16  11 💤 ±0  0 ❌  - 2 

Results for commit 4e8aeab. ± Comparison against base commit 48e024a.

return this;
}

public Config<K, T, R> shouldUseProactiveRouter(boolean useProactiveRouter) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall these names be consistent across stage config? e.g withProactiveRouter in keytokey.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants