From 652265613abea0a2793f4a9df772c5228956032b Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Mon, 13 Oct 2025 18:08:06 -0400 Subject: [PATCH 1/9] Add more efficient mechanism for managing connections For many types of jobs, connection changes will be far less frequent then `route` calls. Therefore, it can be quite expensive to re-calculate the underlying connection routing on every call to `route`. This update introduces a new router type `ProactiveRouter`. These routers effectively subscribe to changes in the connections and are responsible for maintaining the state of the connections when routing. For example on the `ConsistentHashingRouter` this means that they can just add or remove a connection from the map instead of re-calculating everything from scratch. I disabled this feature by default and required it to be explicitly turned on. Additionally, I add some hooks to the RouterFactory (should be the only breaking change) so that we can hook in our own custom router factories for these connections. --- .../io/mantisrx/common/SystemParameters.java | 3 + .../mantis/network/push/ChunkProcessor.java | 7 +- .../mantis/network/push/ConnectionGroup.java | 40 +++++- .../network/push/ConnectionManager.java | 45 +++++-- .../network/push/GroupChunkProcessor.java | 8 +- .../ProactiveConsistentHashingRouter.java | 119 ++++++++++++++++++ .../push/ProactiveRoundRobinRouter.java | 77 ++++++++++++ .../mantis/network/push/ProactiveRouter.java | 15 +++ .../mantis/network/push/PushServer.java | 4 +- .../mantis/network/push/RouterFactory.java | 6 +- .../mantis/network/push/Routers.java | 51 +++++--- .../mantis/network/push/ServerConfig.java | 15 ++- .../WorkerPublisherRemoteObservable.java | 19 ++- .../runtime/parameter/ParameterUtils.java | 14 +++ .../runtime/sink/ServerSentEventsSink.java | 22 +++- 15 files changed, 390 insertions(+), 55 deletions(-) create mode 100644 mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java create mode 100644 mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java create mode 100644 mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRouter.java diff --git a/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java b/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java index 5ca293d51..dc99a295d 100644 --- a/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java +++ b/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java @@ -27,6 +27,9 @@ public final class SystemParameters { public static final String JOB_WORKER_HEARTBEAT_INTERVAL_SECS = "mantis.job.worker.heartbeat.interval.secs"; public static final String JOB_WORKER_TIMEOUT_SECS = "mantis.job.worker.timeout.secs"; + public static final String W2W_USE_PROACTIVE_ROUTER = "mantis.w2w.useProactiveRouter"; + public static final String SSE_USE_PROACTIVE_ROUTER = "mantis.sse.useProactiveRouter"; + public static final String JOB_AUTOSCALE_V2_ENABLED_PARAM = "mantis.job.autoscale.v2.enabled"; public static final String JOB_AUTOSCALE_V2_LOADER_CONFIG_PARAM = "mantis.job.autoscale.v2.loader.config"; diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java index bde053a5d..331106453 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java @@ -20,15 +20,14 @@ public class ChunkProcessor { - - protected Router router; + protected Router fallbackRouter; public ChunkProcessor(Router router) { - this.router = router; + this.fallbackRouter = router; } public void process(ConnectionManager connectionManager, List chunks) { - router.route(connectionManager.connections(), chunks); + connectionManager.route(chunks, this.fallbackRouter); } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java index 74d2c79d9..4b8c84c9c 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java @@ -23,14 +23,16 @@ import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.spectator.GaugeCallback; import io.mantisrx.common.metrics.spectator.MetricGroupId; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; + +import java.util.*; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; import rx.functions.Func0; +import rx.functions.Func1; +import rx.subjects.BehaviorSubject; public class ConnectionGroup { @@ -43,10 +45,12 @@ public class ConnectionGroup { private Counter successfulWrites; private Counter numSlotSwaps; private Counter failedWrites; + private final ProactiveRouter router; - public ConnectionGroup(String groupId) { + public ConnectionGroup(String groupId, ProactiveRouter router) { this.groupId = groupId; this.connections = new HashMap<>(); + this.router = router; final String grpId = Optional.ofNullable(groupId).orElse("none"); final BasicTag groupIdTag = new BasicTag(MantisMetricStringConstants.GROUP_ID_TAG, grpId); @@ -93,6 +97,9 @@ public synchronized void removeConnection(AsyncConnection connection) { + " a new connection has already been swapped in the place of the old connection"); } + if (this.router != null) { + this.router.removeConnection(connection); + } } public synchronized void addConnection(AsyncConnection connection) { @@ -107,6 +114,19 @@ public synchronized void addConnection(AsyncConnection connection) { previousConnection.close(); numSlotSwaps.increment(); } + if (this.router != null) { + this.router.addConnection(connection); + } + } + + public void close() { + if (this.router != null) { + try { + this.router.close(); + } catch (Exception e) { + logger.warn("Error closing router for group " + groupId + ": " + e.getMessage(), e); + } + } } public synchronized boolean isEmpty() { @@ -132,4 +152,12 @@ public String toString() { return "ConnectionGroup [groupId=" + groupId + ", connections=" + connections + "]"; } + + public void route(List chunks, Router fallbackRouter) { + if (router == null) { + fallbackRouter.route(this.getConnections(), chunks); + return; + } + this.router.route(chunks); + } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java index 0d14fe320..5c5272e30 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java @@ -20,18 +20,21 @@ import io.mantisrx.common.metrics.MetricsRegistry; import io.mantisrx.common.metrics.spectator.GaugeCallback; import io.mantisrx.common.metrics.spectator.MetricGroupId; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; + +import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rx.Observable; import rx.functions.Action0; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.subjects.BehaviorSubject; +import rx.subjects.PublishSubject; public class ConnectionManager { @@ -46,12 +49,21 @@ public class ConnectionManager { private Action0 doOnZeroConnections; private Lock connectionState = new ReentrantLock(); private AtomicBoolean subscribed = new AtomicBoolean(); + private final Func1> routerFactory; + private final ProactiveRouter defaultRouter; public ConnectionManager(MetricsRegistry metricsRegistry, - Action0 doOnFirstConnection, Action0 doOnZeroConnections) { + Action0 doOnFirstConnection, + Action0 doOnZeroConnections, + Func1> routerFactory) { + this.routerFactory = routerFactory; this.doOnFirstConnection = doOnFirstConnection; this.doOnZeroConnections = doOnZeroConnections; this.metricsRegistry = metricsRegistry; + this.defaultRouter = routerFactory.call("default"); + if (this.defaultRouter != null) { + metricsRegistry.registerAndGet(this.defaultRouter.getMetrics()); + } } private int activeConnections() { @@ -119,14 +131,21 @@ protected void add(AsyncConnection connection) { String groupId = connection.getGroupId(); ConnectionGroup current = managedConnections.get(groupId); if (current == null) { - ConnectionGroup newGroup = new ConnectionGroup(groupId); + ProactiveRouter router = routerFactory.call(groupId); + ConnectionGroup newGroup = new ConnectionGroup(groupId, router); current = managedConnections.putIfAbsent(groupId, newGroup); if (current == null) { current = newGroup; metricsRegistry.registerAndGet(current.getMetrics()); + if (router != null) { + metricsRegistry.registerAndGet(router.getMetrics()); + } } } current.addConnection(connection); + if (this.defaultRouter != null) { + this.defaultRouter.addConnection(connection); + } logger.debug("Connection added to group: " + groupId + ", connection: " + connection + ", group: " + current); } finally { connectionState.unlock(); @@ -153,8 +172,12 @@ protected void remove(AsyncConnection connection) { metricsRegistry.remove(current.getMetricsGroup()); // remove group managedConnections.remove(groupId); + current.close(); } } + if (this.defaultRouter != null) { + this.defaultRouter.removeConnection(connection); + } } finally { connectionState.unlock(); } @@ -188,4 +211,12 @@ public Map> groups() { connectionState.unlock(); } } + + public void route(List chunks, Router fallbackRouter) { + if (this.defaultRouter != null) { + this.defaultRouter.route(chunks); + } else { + fallbackRouter.route(this.connections(), chunks); + } + } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java index 8216424e5..d30ec6ac5 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java @@ -21,17 +21,15 @@ public class GroupChunkProcessor extends ChunkProcessor { - - public GroupChunkProcessor(Router router) { - super(router); + public GroupChunkProcessor(Router fallbackRouter) { + super(fallbackRouter); } @Override public void process(ConnectionManager connectionManager, List chunks) { Map> groups = connectionManager.groups(); for (ConnectionGroup group : groups.values()) { - router.route(group.getConnections(), chunks); + group.route(chunks, fallbackRouter); } } - } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java new file mode 100644 index 000000000..47ffa987a --- /dev/null +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java @@ -0,0 +1,119 @@ +package io.reactivex.mantis.network.push; + +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import rx.functions.Func1; + +import java.util.*; + +public class ProactiveConsistentHashingRouter implements ProactiveRouter> { + private static final Logger logger = LoggerFactory.getLogger(ProactiveConsistentHashingRouter.class); + private static final int connectionRepetitionOnRing = 1000; + + protected final Func1, byte[]> encoder; + protected final Counter numEventsRouted; + protected final Counter numEventsProcessed; + protected final Counter numConnectionUpdates; + protected final Metrics metrics; + private final HashFunction hashFunction; + private final NavigableMap>> ring = new TreeMap<>(); + + public ProactiveConsistentHashingRouter( + String name, + Func1, byte[]> dataEncoder, + HashFunction hashFunction) { + this.encoder = dataEncoder; + metrics = new Metrics.Builder() + .name("Router_" + name) + .addCounter("numEventsRouted") + .addCounter("numEventsProcessed") + .addCounter("numConnectionUpdates") + .build(); + numEventsRouted = metrics.getCounter("numEventsRouted"); + numEventsProcessed = metrics.getCounter("numEventsProcessed"); + numConnectionUpdates = metrics.getCounter("numConnectionUpdates"); + this.hashFunction = hashFunction; + } + + @Override + public synchronized void route(List> chunks) { + if (ring.isEmpty() || chunks == null || chunks.isEmpty()) { + return; + } + + int numConnections = ring.size() / connectionRepetitionOnRing; + int bufferCapacity = (chunks.size() / numConnections) + 1; // assume even distribution + Map>, List> writes = new HashMap<>(numConnections); + + // process chunks + for (KeyValuePair kvp : chunks) { + long hash = kvp.getKeyBytesHashed(); + // lookup slot + AsyncConnection> connection = lookupConnection(hash); + // add to writes + Func1, Boolean> predicate = connection.getPredicate(); + if (predicate == null || predicate.call(kvp)) { + List buffer = writes.computeIfAbsent(connection, k -> new ArrayList<>(bufferCapacity)); + buffer.add(encoder.call(kvp)); + } + } + + // process writes + if (!writes.isEmpty()) { + for (Map.Entry>, List> entry : writes.entrySet()) { + AsyncConnection> connection = entry.getKey(); + List toWrite = entry.getValue(); + connection.write(toWrite); + numEventsRouted.increment(toWrite.size()); + } + } + } + + @Override + public synchronized void addConnection(AsyncConnection> connection) { + List hashCollisions = new ArrayList<>(); + for (int i = 0; i < connectionRepetitionOnRing; i++) { + // hash node on ring + String connectionId = connection.getSlotId(); + if (connectionId == null) { + throw new IllegalStateException("Connection must specify an id for consistent hashing"); + } + byte[] connectionBytes = (connectionId + "-" + i).getBytes(); + long hash = hashFunction.computeHash(connectionBytes); + if (ring.containsKey(hash)) { + hashCollisions.add(connectionId + "-" + i); + } + ring.put(hash, connection); + } + if (!hashCollisions.isEmpty()) { + logger.error("Hash collisions detected when adding connection {}: {}", connection.getSlotId(), hashCollisions); + } + } + + @Override + public synchronized void removeConnection(AsyncConnection> connection) { + for (int i = 0; i < connectionRepetitionOnRing; i++) { + // hash node on ring + String connectionId = connection.getSlotId(); + if (connectionId == null) { + throw new IllegalStateException("Connection must specify an id for consistent hashing"); + } + + byte[] connectionBytes = (connectionId + "-" + i).getBytes(); + long hash = hashFunction.computeHash(connectionBytes); + ring.remove(hash); + } + } + + @Override + public Metrics getMetrics() { + return metrics; + } + + private AsyncConnection> lookupConnection(long hash) { + Map.Entry>> connection = ring.ceilingEntry(hash); + return (connection == null ? ring.firstEntry() : connection).getValue(); + } +} diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java new file mode 100644 index 000000000..5127e78da --- /dev/null +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java @@ -0,0 +1,77 @@ +package io.reactivex.mantis.network.push; + +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import rx.functions.Func1; + +import java.util.*; + +public class ProactiveRoundRobinRouter implements ProactiveRouter { + private final List> connections = new ArrayList<>(); + private int currentIndex = 0; + + protected Func1 encoder; + protected final Counter numEventsRouted; + protected final Counter numEventsProcessed; + protected final Counter numConnectionUpdates; + protected final Metrics metrics; + + public ProactiveRoundRobinRouter(String name, Func1 encoder) { + this.encoder = encoder; + metrics = new Metrics.Builder() + .name("Router_" + name) + .addCounter("numEventsRouted") + .addCounter("numEventsProcessed") + .addCounter("numConnectionUpdates") + .build(); + numEventsRouted = metrics.getCounter("numEventsRouted"); + numEventsProcessed = metrics.getCounter("numEventsProcessed"); + numConnectionUpdates = metrics.getCounter("numConnectionUpdates"); + } + + @Override + public synchronized void addConnection(AsyncConnection connection) { + // We do not need to shuffle because we are constantly looping through + connections.add(connection); + } + + @Override + public synchronized void removeConnection(AsyncConnection connection) { + connections.remove(connection); + } + + @Override + public synchronized void route(List chunks) { + if (connections.isEmpty() || chunks == null || chunks.isEmpty()) { + return; + } + numEventsProcessed.increment(chunks.size()); + Map, List> writes = new HashMap<>(); + int arrayListSize = chunks.size() / connections.size() + 1; // assume even distribution + Iterator> iter = connections.iterator(); + // process chunks + for (T chunk : chunks) { + AsyncConnection connection = connections.get(currentIndex++ % connections.size()); + Func1 predicate = connection.getPredicate(); + if (predicate == null || predicate.call(chunk)) { + List buffer = writes.get(connection); + if (buffer == null) { + buffer = new ArrayList<>(arrayListSize); + writes.put(connection, buffer); + } + buffer.add(encoder.call(chunk)); + } + } + for (Map.Entry, List> entry : writes.entrySet()) { + AsyncConnection connection = entry.getKey(); + List toWrite = entry.getValue(); + connection.write(toWrite); + numEventsRouted.increment(toWrite.size()); + } + } + + @Override + public Metrics getMetrics() { + return metrics; + } +} diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRouter.java new file mode 100644 index 000000000..18f72d92c --- /dev/null +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRouter.java @@ -0,0 +1,15 @@ +package io.reactivex.mantis.network.push; + +import io.mantisrx.common.metrics.Metrics; + +import java.util.List; + +public interface ProactiveRouter { + void route(List chunks); + + void addConnection(AsyncConnection connection); + + void removeConnection(AsyncConnection connection); + + Metrics getMetrics(); +} diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java index c9ba2326f..485416e99 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java @@ -108,7 +108,7 @@ public void call() { final MetricGroupId metricsGroup = new MetricGroupId("PushServer", idTag); // manager will auto add metrics for connection groups connectionManager = new ConnectionManager(metricsRegistry, doOnFirstConnection, - doOnZeroConnections); + doOnZeroConnections, config.getRouterFactory()); int numQueueProcessingThreads = config.getNumQueueConsumers(); @@ -155,7 +155,7 @@ public void call() { registerMetrics(metricsRegistry, serverMetrics, consumerThreads.getMetrics(), outboundBuffer.getMetrics(), trigger.getMetrics(), - config.getChunkProcessor().router.getMetrics()); + config.getChunkProcessor().fallbackRouter.getMetrics()); port = config.getPort(); writeRetryCount = config.getWriteRetryCount(); diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java index 6fc97c9e8..bf159bd8e 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java @@ -3,5 +3,9 @@ import rx.functions.Func1; public interface RouterFactory { - public Router scalarStageToStageRouter(String name, final Func1 toBytes); + Router scalarStageToStageRouter(String name, final Func1 toBytes); + + ProactiveRouter scalarStageToStageProactiveRouter(String name, final Func1 toBytes); + + ProactiveRouter> keyedStageToStageProactiveRouter(String name, Func1 keyEncoder, Func1 valueEncoder); } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java index 7e8592fba..c48cd4e9a 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java @@ -34,23 +34,30 @@ public Routers() {} public static Router> consistentHashingLegacyTcpProtocol(String name, final Func1 keyEncoder, final Func1 valueEncoder) { - return new ConsistentHashingRouter(name, new Func1, byte[]>() { - @Override - public byte[] call(KeyValuePair kvp) { - byte[] keyBytes = kvp.getKeyBytes(); - byte[] valueBytes = valueEncoder.call(kvp.getValue()); - return - // length + opcode + notification type + key length - ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) - .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length - .put((byte) 1) // opcode - .put((byte) 1) // notification type - .putInt(keyBytes.length) // key length - .put(keyBytes) // key bytes - .put(valueBytes) // value bytes - .array(); - } - }, HashFunctions.xxh3()); + return new ConsistentHashingRouter(name, consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); + } + + public static ProactiveRouter> proactiveConsistentHashingLegacyTcpProtocol(String name, + final Func1 keyEncoder, + final Func1 valueEncoder) { + return new ProactiveConsistentHashingRouter<>(name, consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); + } + + private static Func1, byte[]> consistentHashingEncoder(final Func1 valueEncoder) { + return kvp -> { + byte[] keyBytes = kvp.getKeyBytes(); + byte[] valueBytes = valueEncoder.call(kvp.getValue()); + return + // length + opcode + notification type + key length + ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) + .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length + .put((byte) 1) // opcode + .put((byte) 1) // notification type + .putInt(keyBytes.length) // key length + .put(keyBytes) // key bytes + .put(valueBytes) // value bytes + .array(); + }; } private static byte[] dataPayload(byte[] data) { @@ -124,4 +131,14 @@ public static Func1 string() { public Router scalarStageToStageRouter(String name, Func1 toBytes) { return roundRobinLegacyTcpProtocol(name, toBytes); } + + @Override + public ProactiveRouter scalarStageToStageProactiveRouter(String name, Func1 toBytes) { + return new ProactiveRoundRobinRouter<>(name, toBytes); + } + + @Override + public ProactiveRouter> keyedStageToStageProactiveRouter(String name, Func1 keyEncoder, Func1 valueEncoder) { + return proactiveConsistentHashingLegacyTcpProtocol(name, keyEncoder, valueEncoder); + } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java index 1c3088dda..ad39bcbbd 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java @@ -36,6 +36,7 @@ public class ServerConfig { private MetricsRegistry metricsRegistry; // registry used to store metrics private Func1>, Func1> predicate; private boolean useSpscQueue = false; + private final Func1> routerFactory; public ServerConfig(Builder builder) { this.name = builder.name; @@ -50,6 +51,7 @@ public ServerConfig(Builder builder) { this.predicate = builder.predicate; this.useSpscQueue = builder.useSpscQueue; this.maxNotWritableTimeSec = builder.maxNotWritableTimeSec; + this.routerFactory = builder.routerFactory; } public Func1>, Func1> getPredicate() { @@ -100,6 +102,10 @@ public boolean useSpscQueue() { return useSpscQueue; } + public Func1> getRouterFactory() { + return this.routerFactory; + } + public static class Builder { private String name; @@ -114,6 +120,7 @@ public static class Builder { private MetricsRegistry metricsRegistry; // registry used to store metrics private Func1>, Func1> predicate; private boolean useSpscQueue = false; + private Func1> routerFactory = (String groupId) -> null; public Builder predicate(Func1>, Func1> predicate) { this.predicate = predicate; @@ -170,13 +177,13 @@ public Builder groupRouter(Router router) { return this; } - public Builder router(Router router) { - this.chunkProcessor = new ChunkProcessor<>(router); + public Builder metricsRegistry(MetricsRegistry metricsRegistry) { + this.metricsRegistry = metricsRegistry; return this; } - public Builder metricsRegistry(MetricsRegistry metricsRegistry) { - this.metricsRegistry = metricsRegistry; + public Builder proactiveRouterFactory(Func1> routerFactory) { + this.routerFactory = routerFactory; return this; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java index 73092c926..150a61ed1 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java @@ -39,6 +39,8 @@ import rx.Observable; import rx.functions.Func1; +import static io.mantisrx.common.SystemParameters.W2W_USE_PROACTIVE_ROUTER; + /** * Execution of WorkerPublisher that publishes the stream to the next stage. * @@ -86,12 +88,17 @@ public void start(final StageConfig stage, Observable> toSer Func1 encoder = t1 -> stage.getOutputCodec().encode(t1); Router router = this.routerFactory.scalarStageToStageRouter(name, encoder); + Func1> proactiveFactory = (String k) -> null; + if (useProactiveRouters()) { + proactiveFactory = (String name) -> routerFactory.scalarStageToStageProactiveRouter(name, encoder); + } ServerConfig config = new ServerConfig.Builder() .name(name) .port(serverPort) .metricsRegistry(MetricsRegistry.getInstance()) - .router(router) + .groupRouter(router) + .proactiveRouterFactory(proactiveFactory) .build(); final LegacyTcpPushServer modernServer = PushServers.infiniteStreamLegacyTcpNested(config, toServe); @@ -144,7 +151,9 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS .maxChunkTimeMSec(maxChunkTimeMSec()) .bufferCapacity(bufferCapacity()) .useSpscQueue(useSpsc()) - .router(Routers.consistentHashingLegacyTcpProtocol(jobName, keyEncoder, valueEncoder)) + .groupRouter(Routers.consistentHashingLegacyTcpProtocol(jobName, keyEncoder, valueEncoder)) + .proactiveRouterFactory(useProactiveRouters() ? + routerName -> routerFactory.keyedStageToStageProactiveRouter(routerName, keyEncoder, valueEncoder) : (k) -> null) .build(); if (stage instanceof ScalarToGroup || stage instanceof GroupToGroup) { @@ -158,10 +167,14 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS HashFunctions.xxh3()); } + private boolean useProactiveRouters() { + String stringValue = propService.getStringValue(W2W_USE_PROACTIVE_ROUTER, "false"); + return Boolean.parseBoolean(stringValue); + } + private boolean useSpsc() { String stringValue = propService.getStringValue("mantis.w2w.spsc", "false"); return Boolean.parseBoolean(stringValue); - } private int bufferCapacity() { diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java index 23d357edf..2a31778b7 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java @@ -94,6 +94,20 @@ public class ParameterUtils { .build(); systemParams.put(w2wtoKeyThreads.getName(), w2wtoKeyThreads); + ParameterDefinition w2wUseProactiveRouter = new BooleanParameter() + .name(W2W_USE_PROACTIVE_ROUTER) + .description("Whether to use proactive routing for worker to worker connections") + .defaultValue(false) + .build(); + systemParams.put(w2wUseProactiveRouter.getName(), w2wUseProactiveRouter); + + ParameterDefinition sseUseProactiveRouter = new BooleanParameter() + .name(SSE_USE_PROACTIVE_ROUTER) + .description("Whether to use proactive routing for SSE connections") + .defaultValue(false) + .build(); + systemParams.put(sseUseProactiveRouter.getName(), sseUseProactiveRouter); + // mantis.sse.bufferCapacity 25000 ParameterDefinition sseBuffer = new IntParameter() diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java index 71c36504a..6d74c28a9 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java @@ -25,11 +25,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelOption; import io.netty.channel.WriteBufferWaterMark; -import io.reactivex.mantis.network.push.PushServerSse; -import io.reactivex.mantis.network.push.PushServers; -import io.reactivex.mantis.network.push.Routers; -import io.reactivex.mantis.network.push.ServerConfig; -import io.reactivex.mantis.network.push.Router; +import io.reactivex.mantis.network.push.*; + import java.io.IOException; import java.util.List; import java.util.Map; @@ -58,6 +55,7 @@ public class ServerSentEventsSink implements SelfDocumentingSink { private int port = -1; private final MantisPropertiesLoader propService; private final Router router; + private Func1> proactiveRouterFactory = (String routerName) -> null; private PushServerSse pushServerSse; private HttpServer httpServer; @@ -90,6 +88,7 @@ public ServerSentEventsSink(Func1 encoder) { this.subscribeProcessor = builder.subscribeProcessor; this.propService = ServiceRegistry.INSTANCE.getPropertiesService(); this.router = builder.router; + this.proactiveRouterFactory = builder.proactiveRouterFactory; } @Override @@ -147,6 +146,11 @@ private boolean useSpsc() { return Boolean.parseBoolean(useSpsc); } + private boolean useProactiveRouter() { + String useProactiveRouter = propService.getStringValue("mantis.sse.useProactiveRouter", "false"); + return Boolean.parseBoolean(useProactiveRouter); + } + @Override public void call(Context context, PortRequest portRequest, final Observable observable) { port = portRequest.getPort(); @@ -165,7 +169,8 @@ public void call(Context context, PortRequest portRequest, final Observable o .numQueueConsumers(numConsumerThreads()) .useSpscQueue(useSpsc()) .maxChunkTimeMSec(getBatchInterval()) - .maxNotWritableTimeSec(maxNotWritableTimeSec()); + .maxNotWritableTimeSec(maxNotWritableTimeSec()) + .proactiveRouterFactory(proactiveRouterFactory); if (predicate != null) { config.predicate(predicate.getPredicate()); } @@ -254,6 +259,7 @@ public static class Builder { private Predicate predicate; private Func2>, Context, Void> subscribeProcessor; private Router router; + private Func1> proactiveRouterFactory = (String routerName) -> null;; public Builder withEncoder(Func1 encoder) { this.encoder = encoder; @@ -291,6 +297,10 @@ public Builder withRouter(Router router) { return this; } + public void withProactiveRouterFactory(Func1> proactiveRouterFactory) { + this.proactiveRouterFactory = proactiveRouterFactory; + } + public ServerSentEventsSink build() { return new ServerSentEventsSink<>(this); } From 899f8104b5c5cd51fff711ac374fd43d67dd70ef Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Thu, 23 Oct 2025 11:56:02 -0400 Subject: [PATCH 2/9] add some tests --- .../mantis/network/push/ConnectionGroup.java | 14 - .../network/push/ConnectionManager.java | 1 - .../ProactiveConsistentHashingRouterTest.java | 321 ++++++++++++++ .../push/ProactiveRoundRobinRouterTest.java | 413 ++++++++++++++++++ 4 files changed, 734 insertions(+), 15 deletions(-) create mode 100644 mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java create mode 100644 mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java index 4b8c84c9c..a044064bc 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java @@ -25,14 +25,10 @@ import io.mantisrx.common.metrics.spectator.MetricGroupId; import java.util.*; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rx.Observable; import rx.functions.Func0; -import rx.functions.Func1; -import rx.subjects.BehaviorSubject; public class ConnectionGroup { @@ -119,16 +115,6 @@ public synchronized void addConnection(AsyncConnection connection) { } } - public void close() { - if (this.router != null) { - try { - this.router.close(); - } catch (Exception e) { - logger.warn("Error closing router for group " + groupId + ": " + e.getMessage(), e); - } - } - } - public synchronized boolean isEmpty() { return connections.isEmpty(); } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java index 5c5272e30..1fb2a57ad 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java @@ -172,7 +172,6 @@ protected void remove(AsyncConnection connection) { metricsRegistry.remove(current.getMetricsGroup()); // remove group managedConnections.remove(groupId); - current.close(); } } if (this.defaultRouter != null) { diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java new file mode 100644 index 000000000..b9fefad35 --- /dev/null +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java @@ -0,0 +1,321 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.mantis.network.push; + +import io.mantisrx.common.metrics.Metrics; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import rx.subjects.PublishSubject; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class ProactiveConsistentHashingRouterTest { + + private ProactiveConsistentHashingRouter router; + private HashFunction hashFunction; + private List connections; + + @BeforeEach + public void setup() { + hashFunction = HashFunctions.xxh3(); + router = new ProactiveConsistentHashingRouter<>("test-router", + kvp -> kvp.getValue().getBytes(), hashFunction); + connections = new ArrayList<>(); + } + + @Test + public void testAddConnection() { + TestAsyncConnection connection = createConnection("slot-1"); + + router.addConnection(connection); + + // Route some data to verify connection was added + List> data = createTestData("key1", "value1"); + router.route(data); + + assertTrue(connection.getWrittenData().size() > 0); + } + + @Test + public void testRemoveConnection() { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Remove first connection + router.removeConnection(connection1); + + // Route data - should only go to connection2 + List> data = createTestData("key1", "value1"); + router.route(data); + + assertEquals(0, connection1.getWrittenData().size()); + assertTrue(connection2.getWrittenData().size() > 0); + } + + @Test + public void testRouteWithNoConnections() { + List> data = createTestData("key1", "value1"); + + // Should not throw exception + assertDoesNotThrow(() -> router.route(data)); + } + + @Test + public void testRouteWithNullData() { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + // Should not throw exception + assertDoesNotThrow(() -> router.route(null)); + assertEquals(0, connection.getWrittenData().size()); + } + + @Test + public void testRouteWithEmptyData() { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + router.route(Collections.emptyList()); + + assertEquals(0, connection.getWrittenData().size()); + } + + @Test + public void testConsistentHashing() { + // Add multiple connections + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + TestAsyncConnection connection3 = createConnection("slot-3"); + + router.addConnection(connection1); + router.addConnection(connection2); + router.addConnection(connection3); + + // Route the same key multiple times - should always go to same connection + String key = "consistent-key"; + for (int i = 0; i < 5; i++) { + List> data = createTestData(key, "value" + i); + router.route(data); + } + + // Find which connection received the data + TestAsyncConnection targetConnection = null; + for (TestAsyncConnection conn : Arrays.asList(connection1, connection2, connection3)) { + if (conn.getWrittenData().size() > 0) { + if (targetConnection == null) { + targetConnection = conn; + } else { + // Should only be one connection receiving data for this key + fail("Data was routed to multiple connections for the same key"); + } + } + } + + assertNotNull(targetConnection); + assertEquals(5, targetConnection.getWrittenData().size()); + } + + @Test + public void testDistributionAcrossMultipleConnections() { + // Add multiple connections + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + TestAsyncConnection connection3 = createConnection("slot-3"); + + router.addConnection(connection1); + router.addConnection(connection2); + router.addConnection(connection3); + + // Route many different keys + List> data = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + String key = "key-" + i; + long hash = hashFunction.computeHash(key.getBytes()); + data.add(new KeyValuePair<>(hash, key.getBytes(), "value-" + i)); + } + router.route(data); + + // Verify all connections received some data (probabilistically should happen with 100 keys) + int totalRouted = connection1.getWrittenData().size() + + connection2.getWrittenData().size() + + connection3.getWrittenData().size(); + + assertEquals(100, totalRouted); + assertTrue(connection1.getWrittenData().size() > 0); + assertTrue(connection2.getWrittenData().size() > 0); + assertTrue(connection3.getWrittenData().size() > 0); + } + + @Test + public void testRouteWithPredicate() { + // Create connection with predicate that filters out certain values + TestAsyncConnection connection = createConnection("slot-1", + kvp -> kvp.getValue().startsWith("accept")); + + router.addConnection(connection); + + // Route data - some should be filtered out + List> data = new ArrayList<>(); + data.add(createKeyValuePair("key1", "accept-value1")); + data.add(createKeyValuePair("key2", "reject-value2")); + data.add(createKeyValuePair("key3", "accept-value3")); + + router.route(data); + + // Only 2 values should have been routed + assertEquals(2, connection.getWrittenData().size()); + } + + @Test + public void testAddConnectionWithNullSlotId() { + PublishSubject> subject = PublishSubject.create(); + AsyncConnection> connection = + new AsyncConnection<>("host", 1234, "id1", null, "group1", subject, null); + + assertThrows(IllegalStateException.class, () -> router.addConnection(connection)); + } + + @Test + public void testRemoveConnectionWithNullSlotId() { + PublishSubject> subject = PublishSubject.create(); + AsyncConnection> connection = + new AsyncConnection<>("host", 1234, "id1", null, "group1", subject, null); + + assertThrows(IllegalStateException.class, () -> router.removeConnection(connection)); + } + + @Test + public void testMetrics() { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + Metrics metrics = router.getMetrics(); + assertNotNull(metrics); + assertEquals("Router_test-router", metrics.getMetricGroupId().id()); + + // Route some data + List> data = createTestData("key1", "value1"); + router.route(data); + + // Verify metrics are updated + assertTrue(metrics.getCounter("numEventsRouted").value() > 0); + } + + @Test + public void testMultipleDataItemsInSingleRoute() { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Route multiple items at once + List> data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add(createKeyValuePair("key-" + i, "value-" + i)); + } + + router.route(data); + + int totalRouted = connection1.getWrittenData().size() + connection2.getWrittenData().size(); + assertEquals(10, totalRouted); + } + + @Test + public void testThreadSafety() throws InterruptedException { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + // Create multiple threads that route data concurrently + List threads = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int threadNum = i; + Thread thread = new Thread(() -> { + for (int j = 0; j < 10; j++) { + List> data = + createTestData("key-" + threadNum + "-" + j, "value-" + threadNum + "-" + j); + router.route(data); + } + }); + threads.add(thread); + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify all data was routed + assertEquals(100, connection.getWrittenData().size()); + } + + // Helper methods + + private TestAsyncConnection createConnection(String slotId) { + return createConnection(slotId, null); + } + + private TestAsyncConnection createConnection(String slotId, + rx.functions.Func1, Boolean> predicate) { + TestAsyncConnection connection = new TestAsyncConnection(slotId, predicate); + connections.add(connection); + return connection; + } + + private List> createTestData(String key, String value) { + List> data = new ArrayList<>(); + data.add(createKeyValuePair(key, value)); + return data; + } + + private KeyValuePair createKeyValuePair(String key, String value) { + long hash = hashFunction.computeHash(key.getBytes()); + return new KeyValuePair<>(hash, key.getBytes(), value); + } + + // Test helper class + private static class TestAsyncConnection extends AsyncConnection> { + private final List writtenData = new ArrayList<>(); + private int writeCalls = 0; + + public TestAsyncConnection(String slotId, + rx.functions.Func1, Boolean> predicate) { + super("test-host", 1234, "id-" + slotId, slotId, "test-group", + PublishSubject.create(), predicate); + } + + @Override + public void write(List data) { + writeCalls++; + writtenData.addAll(data); + } + + public List getWrittenData() { + return writtenData; + } + + public int getWriteCalls() { + return writeCalls; + } + } +} diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java new file mode 100644 index 000000000..dea255730 --- /dev/null +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java @@ -0,0 +1,413 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.mantis.network.push; + +import io.mantisrx.common.metrics.Metrics; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import rx.subjects.PublishSubject; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class ProactiveRoundRobinRouterTest { + + private ProactiveRoundRobinRouter router; + private List connections; + + @BeforeEach + public void setup() { + router = new ProactiveRoundRobinRouter<>("test-router", data -> data.getBytes()); + connections = new ArrayList<>(); + } + + @Test + public void testAddConnection() { + TestAsyncConnection connection = createConnection("slot-1"); + + router.addConnection(connection); + + // Route some data to verify connection was added + List data = Arrays.asList("test-value"); + router.route(data); + + assertEquals(1, connection.getWrittenData().size()); + } + + @Test + public void testRemoveConnection() { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Remove first connection + router.removeConnection(connection1); + + // Route data - should only go to connection2 + List data = Arrays.asList("test-value"); + router.route(data); + + assertEquals(0, connection1.getWrittenData().size()); + assertEquals(1, connection2.getWrittenData().size()); + } + + @Test + public void testRouteWithNoConnections() { + List data = Arrays.asList("test-value"); + + // Should not throw exception + assertDoesNotThrow(() -> router.route(data)); + } + + @Test + public void testRouteWithNullData() { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + // Should not throw exception + assertDoesNotThrow(() -> router.route(null)); + assertEquals(0, connection.getWrittenData().size()); + } + + @Test + public void testRouteWithEmptyData() { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + router.route(Collections.emptyList()); + + assertEquals(0, connection.getWrittenData().size()); + } + + @Test + public void testRoundRobinDistribution() { + // Add 3 connections + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + TestAsyncConnection connection3 = createConnection("slot-3"); + + router.addConnection(connection1); + router.addConnection(connection2); + router.addConnection(connection3); + + // Route 9 items - should be evenly distributed + List data = new ArrayList<>(); + for (int i = 0; i < 9; i++) { + data.add("value-" + i); + } + router.route(data); + + // Each connection should receive 3 items + assertEquals(3, connection1.getWrittenData().size()); + assertEquals(3, connection2.getWrittenData().size()); + assertEquals(3, connection3.getWrittenData().size()); + } + + @Test + public void testRoundRobinOrder() { + // Add 2 connections + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Route items one at a time to verify alternating pattern + for (int i = 0; i < 4; i++) { + router.route(Arrays.asList("value-" + i)); + } + + // Should alternate between connections + assertEquals(2, connection1.getWrittenData().size()); + assertEquals(2, connection2.getWrittenData().size()); + } + + @Test + public void testUnevenDistribution() { + // Add 3 connections + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + TestAsyncConnection connection3 = createConnection("slot-3"); + + router.addConnection(connection1); + router.addConnection(connection2); + router.addConnection(connection3); + + // Route 10 items - should be distributed as 4, 3, 3 or similar + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add("value-" + i); + } + router.route(data); + + int total = connection1.getWrittenData().size() + + connection2.getWrittenData().size() + + connection3.getWrittenData().size(); + + assertEquals(10, total); + + // No connection should be empty + assertTrue(connection1.getWrittenData().size() > 0); + assertTrue(connection2.getWrittenData().size() > 0); + assertTrue(connection3.getWrittenData().size() > 0); + } + + @Test + public void testSingleConnection() { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add("value-" + i); + } + router.route(data); + + // All items should go to the single connection + assertEquals(10, connection.getWrittenData().size()); + } + + @Test + public void testRouteWithPredicate() { + // Create connection with predicate that filters out certain values + TestAsyncConnection connection = createConnection("slot-1", + data -> data.startsWith("accept")); + + router.addConnection(connection); + + // Route data - some should be filtered out + List data = Arrays.asList("accept-value1", "reject-value2", "accept-value3"); + router.route(data); + + // Only 2 values should have been routed + assertEquals(2, connection.getWrittenData().size()); + } + + @Test + public void testPredicateAcrossMultipleConnections() { + // Create connections with different predicates + TestAsyncConnection connection1 = createConnection("slot-1", + data -> data.contains("even")); + TestAsyncConnection connection2 = createConnection("slot-2", + data -> data.contains("odd")); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Route data + List data = Arrays.asList("value-even-0", "value-odd-1", "value-even-2", "value-odd-3"); + router.route(data); + + // Connection 1 should get even values, connection 2 should get odd values + assertEquals(1, connection1.getWrittenData().size()); // value-even-0 + assertEquals(1, connection2.getWrittenData().size()); // value-odd-1 + } + + @Test + public void testMetrics() { + TestAsyncConnection connection = createConnection("slot-1"); + router.addConnection(connection); + + Metrics metrics = router.getMetrics(); + assertNotNull(metrics); + assertEquals("Router_test-router", metrics.getMetricGroupId().id()); + + // Route some data + List data = Arrays.asList("value1", "value2", "value3"); + router.route(data); + + // Verify metrics are updated + assertEquals(3, metrics.getCounter("numEventsProcessed").value()); + assertEquals(3, metrics.getCounter("numEventsRouted").value()); + } + + @Test + public void testMultipleRoutes() { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // First route + router.route(Arrays.asList("value1", "value2")); + + // Second route + router.route(Arrays.asList("value3", "value4")); + + // Verify distribution continues across routes + int total = connection1.getWrittenData().size() + connection2.getWrittenData().size(); + assertEquals(4, total); + } + + @Test + public void testIndexWraparound() { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Route many items to test index wraparound + List data = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + data.add("value-" + i); + } + router.route(data); + + // Should be evenly distributed + assertEquals(50, connection1.getWrittenData().size()); + assertEquals(50, connection2.getWrittenData().size()); + } + + @Test + public void testConnectionsAddedAfterRouting() { + TestAsyncConnection connection1 = createConnection("slot-1"); + router.addConnection(connection1); + + // Route some data + router.route(Arrays.asList("value1", "value2")); + assertEquals(2, connection1.getWrittenData().size()); + + // Add another connection + TestAsyncConnection connection2 = createConnection("slot-2"); + router.addConnection(connection2); + + // Route more data - should now distribute across both + router.route(Arrays.asList("value3", "value4", "value5", "value6")); + + assertTrue(connection2.getWrittenData().size() > 0); + } + + @Test + public void testThreadSafety() throws InterruptedException { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Create multiple threads that route data concurrently + List threads = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final int threadNum = i; + Thread thread = new Thread(() -> { + for (int j = 0; j < 10; j++) { + router.route(Arrays.asList("value-" + threadNum + "-" + j)); + } + }); + threads.add(thread); + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify all data was routed + int total = connection1.getWrittenData().size() + connection2.getWrittenData().size(); + assertEquals(100, total); + } + + @Test + public void testRemoveConnectionDuringRouting() { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + TestAsyncConnection connection3 = createConnection("slot-3"); + + router.addConnection(connection1); + router.addConnection(connection2); + router.addConnection(connection3); + + // Route some data + router.route(Arrays.asList("value1", "value2", "value3")); + + // Remove middle connection + router.removeConnection(connection2); + + // Route more data + router.route(Arrays.asList("value4", "value5", "value6", "value7")); + + // Connection2 should not have received any data after removal + assertEquals(1, connection2.getWrittenData().size()); // Only from first route + } + + @Test + public void testBatchedWrites() { + TestAsyncConnection connection1 = createConnection("slot-1"); + TestAsyncConnection connection2 = createConnection("slot-2"); + + router.addConnection(connection1); + router.addConnection(connection2); + + // Route multiple items in one call + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + data.add("value-" + i); + } + router.route(data); + + // Each connection should receive its items in a single write call + // (5 items per connection in one batch) + assertEquals(1, connection1.getWriteCalls()); + assertEquals(1, connection2.getWriteCalls()); + assertEquals(5, connection1.getWrittenData().size()); + assertEquals(5, connection2.getWrittenData().size()); + } + + // Helper methods + + private TestAsyncConnection createConnection(String slotId) { + return createConnection(slotId, null); + } + + private TestAsyncConnection createConnection(String slotId, + rx.functions.Func1 predicate) { + TestAsyncConnection connection = new TestAsyncConnection(slotId, predicate); + connections.add(connection); + return connection; + } + + // Test helper class + private static class TestAsyncConnection extends AsyncConnection { + private final List writtenData = new ArrayList<>(); + private int writeCalls = 0; + + public TestAsyncConnection(String slotId, rx.functions.Func1 predicate) { + super("test-host", 1234, "id-" + slotId, slotId, "test-group", + PublishSubject.create(), predicate); + } + + @Override + public void write(List data) { + writeCalls++; + writtenData.addAll(data); + } + + public List getWrittenData() { + return writtenData; + } + + public int getWriteCalls() { + return writeCalls; + } + } +} From 1f860b06dd85e31384b924fce2b52f1ce331b1af Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Thu, 23 Oct 2025 19:23:45 -0400 Subject: [PATCH 3/9] move to using Optional instead of checking null --- .../mantis/network/push/ConnectionGroup.java | 21 ++++------ .../network/push/ConnectionManager.java | 40 ++++++------------- .../push/ProactiveRoundRobinRouter.java | 4 +- .../mantis/network/push/ServerConfig.java | 10 +++-- .../runtime/sink/ServerSentEventsSink.java | 9 +++-- 5 files changed, 35 insertions(+), 49 deletions(-) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java index a044064bc..997ad1b40 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java @@ -41,9 +41,9 @@ public class ConnectionGroup { private Counter successfulWrites; private Counter numSlotSwaps; private Counter failedWrites; - private final ProactiveRouter router; + private final Optional> router; - public ConnectionGroup(String groupId, ProactiveRouter router) { + public ConnectionGroup(String groupId, Optional> router) { this.groupId = groupId; this.connections = new HashMap<>(); this.router = router; @@ -93,9 +93,7 @@ public synchronized void removeConnection(AsyncConnection connection) { + " a new connection has already been swapped in the place of the old connection"); } - if (this.router != null) { - this.router.removeConnection(connection); - } + this.router.ifPresent(router -> router.removeConnection(connection)); } public synchronized void addConnection(AsyncConnection connection) { @@ -110,9 +108,7 @@ public synchronized void addConnection(AsyncConnection connection) { previousConnection.close(); numSlotSwaps.increment(); } - if (this.router != null) { - this.router.addConnection(connection); - } + this.router.ifPresent(router -> router.addConnection(connection)); } public synchronized boolean isEmpty() { @@ -140,10 +136,9 @@ public String toString() { } public void route(List chunks, Router fallbackRouter) { - if (router == null) { - fallbackRouter.route(this.getConnections(), chunks); - return; - } - this.router.route(chunks); + this.router.ifPresentOrElse( + router -> router.route(chunks), + () -> fallbackRouter.route(this.getConnections(), chunks) + ); } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java index 1fb2a57ad..7f2ffb5d0 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java @@ -22,19 +22,14 @@ import io.mantisrx.common.metrics.spectator.MetricGroupId; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import rx.Observable; import rx.functions.Action0; import rx.functions.Func1; -import rx.functions.Func2; -import rx.subjects.BehaviorSubject; -import rx.subjects.PublishSubject; public class ConnectionManager { @@ -49,21 +44,19 @@ public class ConnectionManager { private Action0 doOnZeroConnections; private Lock connectionState = new ReentrantLock(); private AtomicBoolean subscribed = new AtomicBoolean(); - private final Func1> routerFactory; - private final ProactiveRouter defaultRouter; + private final Func1>> routerFactory; + private final Optional> defaultRouter; public ConnectionManager(MetricsRegistry metricsRegistry, Action0 doOnFirstConnection, Action0 doOnZeroConnections, - Func1> routerFactory) { + Func1>> routerFactory) { this.routerFactory = routerFactory; this.doOnFirstConnection = doOnFirstConnection; this.doOnZeroConnections = doOnZeroConnections; this.metricsRegistry = metricsRegistry; this.defaultRouter = routerFactory.call("default"); - if (this.defaultRouter != null) { - metricsRegistry.registerAndGet(this.defaultRouter.getMetrics()); - } + this.defaultRouter.ifPresent(router -> metricsRegistry.registerAndGet(router.getMetrics())); } private int activeConnections() { @@ -131,21 +124,17 @@ protected void add(AsyncConnection connection) { String groupId = connection.getGroupId(); ConnectionGroup current = managedConnections.get(groupId); if (current == null) { - ProactiveRouter router = routerFactory.call(groupId); - ConnectionGroup newGroup = new ConnectionGroup(groupId, router); + Optional> groupRouter = routerFactory.call(groupId); + ConnectionGroup newGroup = new ConnectionGroup(groupId, groupRouter); current = managedConnections.putIfAbsent(groupId, newGroup); if (current == null) { current = newGroup; metricsRegistry.registerAndGet(current.getMetrics()); - if (router != null) { - metricsRegistry.registerAndGet(router.getMetrics()); - } + groupRouter.ifPresent(router -> metricsRegistry.registerAndGet(router.getMetrics())); } } current.addConnection(connection); - if (this.defaultRouter != null) { - this.defaultRouter.addConnection(connection); - } + this.defaultRouter.ifPresent(router -> router.addConnection(connection)); logger.debug("Connection added to group: " + groupId + ", connection: " + connection + ", group: " + current); } finally { connectionState.unlock(); @@ -174,9 +163,7 @@ protected void remove(AsyncConnection connection) { managedConnections.remove(groupId); } } - if (this.defaultRouter != null) { - this.defaultRouter.removeConnection(connection); - } + this.defaultRouter.ifPresent(router -> router.removeConnection(connection)); } finally { connectionState.unlock(); } @@ -212,10 +199,9 @@ public Map> groups() { } public void route(List chunks, Router fallbackRouter) { - if (this.defaultRouter != null) { - this.defaultRouter.route(chunks); - } else { - fallbackRouter.route(this.connections(), chunks); - } + this.defaultRouter.ifPresentOrElse( + router -> router.route(chunks), + () -> fallbackRouter.route(this.connections(), chunks) + ); } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java index 5127e78da..42549c43f 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java @@ -48,10 +48,10 @@ public synchronized void route(List chunks) { numEventsProcessed.increment(chunks.size()); Map, List> writes = new HashMap<>(); int arrayListSize = chunks.size() / connections.size() + 1; // assume even distribution - Iterator> iter = connections.iterator(); // process chunks for (T chunk : chunks) { - AsyncConnection connection = connections.get(currentIndex++ % connections.size()); + currentIndex = (currentIndex + 1) % connections.size(); + AsyncConnection connection = connections.get(currentIndex); Func1 predicate = connection.getPredicate(); if (predicate == null || predicate.call(chunk)) { List buffer = writes.get(connection); diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java index ad39bcbbd..d234614d5 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ServerConfig.java @@ -19,6 +19,8 @@ import io.mantisrx.common.metrics.MetricsRegistry; import java.util.List; import java.util.Map; +import java.util.Optional; + import rx.functions.Func1; @@ -36,7 +38,7 @@ public class ServerConfig { private MetricsRegistry metricsRegistry; // registry used to store metrics private Func1>, Func1> predicate; private boolean useSpscQueue = false; - private final Func1> routerFactory; + private final Func1>> routerFactory; public ServerConfig(Builder builder) { this.name = builder.name; @@ -102,7 +104,7 @@ public boolean useSpscQueue() { return useSpscQueue; } - public Func1> getRouterFactory() { + public Func1>> getRouterFactory() { return this.routerFactory; } @@ -120,7 +122,7 @@ public static class Builder { private MetricsRegistry metricsRegistry; // registry used to store metrics private Func1>, Func1> predicate; private boolean useSpscQueue = false; - private Func1> routerFactory = (String groupId) -> null; + private Func1>> routerFactory = (String groupId) -> Optional.empty(); public Builder predicate(Func1>, Func1> predicate) { this.predicate = predicate; @@ -182,7 +184,7 @@ public Builder metricsRegistry(MetricsRegistry metricsRegistry) { return this; } - public Builder proactiveRouterFactory(Func1> routerFactory) { + public Builder proactiveRouterFactory(Func1>> routerFactory) { this.routerFactory = routerFactory; return this; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java index 6d74c28a9..c0869eedc 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java @@ -30,6 +30,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; + import mantis.io.reactivex.netty.RxNetty; import mantis.io.reactivex.netty.pipeline.PipelineConfigurators; import mantis.io.reactivex.netty.protocol.http.server.HttpServer; @@ -55,7 +57,7 @@ public class ServerSentEventsSink implements SelfDocumentingSink { private int port = -1; private final MantisPropertiesLoader propService; private final Router router; - private Func1> proactiveRouterFactory = (String routerName) -> null; + private Func1>> proactiveRouterFactory = (String routerName) -> Optional.empty(); private PushServerSse pushServerSse; private HttpServer httpServer; @@ -259,7 +261,7 @@ public static class Builder { private Predicate predicate; private Func2>, Context, Void> subscribeProcessor; private Router router; - private Func1> proactiveRouterFactory = (String routerName) -> null;; + private Func1>> proactiveRouterFactory = (String routerName) -> Optional.empty();; public Builder withEncoder(Func1 encoder) { this.encoder = encoder; @@ -297,8 +299,9 @@ public Builder withRouter(Router router) { return this; } - public void withProactiveRouterFactory(Func1> proactiveRouterFactory) { + public Builder withProactiveRouterFactory(Func1>> proactiveRouterFactory) { this.proactiveRouterFactory = proactiveRouterFactory; + return this; } public ServerSentEventsSink build() { From 2bac27fef4e0eb95b8d4c5b73eca5d2a02c53266 Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Fri, 24 Oct 2025 11:25:39 -0400 Subject: [PATCH 4/9] clean up and test fixes --- .../mantis/network/push/ChunkProcessor.java | 13 +- .../network/push/ConnectionManager.java | 25 -- .../network/push/GroupChunkProcessor.java | 6 +- .../ProactiveConsistentHashingRouter.java | 107 +++++--- .../push/ProactiveRoundRobinRouter.java | 9 +- .../mantis/network/push/PushServer.java | 7 +- .../mantis/network/push/RouterFactory.java | 31 ++- .../mantis/network/push/Routers.java | 35 +-- .../ProactiveConsistentHashingRouterTest.java | 155 ++--------- .../push/ProactiveRoundRobinRouterTest.java | 251 +----------------- .../mantis/network/push/TimedChunkerTest.java | 3 +- .../WorkerPublisherRemoteObservable.java | 17 +- 12 files changed, 151 insertions(+), 508 deletions(-) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java index 331106453..3f09d6e06 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ChunkProcessor.java @@ -19,15 +19,6 @@ import java.util.List; -public class ChunkProcessor { - protected Router fallbackRouter; - - public ChunkProcessor(Router router) { - this.fallbackRouter = router; - } - - public void process(ConnectionManager connectionManager, List chunks) { - connectionManager.route(chunks, this.fallbackRouter); - } - +public interface ChunkProcessor { + void process(ConnectionManager connectionManager, List chunks); } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java index 7f2ffb5d0..252cc7081 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java @@ -45,7 +45,6 @@ public class ConnectionManager { private Lock connectionState = new ReentrantLock(); private AtomicBoolean subscribed = new AtomicBoolean(); private final Func1>> routerFactory; - private final Optional> defaultRouter; public ConnectionManager(MetricsRegistry metricsRegistry, Action0 doOnFirstConnection, @@ -55,8 +54,6 @@ public ConnectionManager(MetricsRegistry metricsRegistry, this.doOnFirstConnection = doOnFirstConnection; this.doOnZeroConnections = doOnZeroConnections; this.metricsRegistry = metricsRegistry; - this.defaultRouter = routerFactory.call("default"); - this.defaultRouter.ifPresent(router -> metricsRegistry.registerAndGet(router.getMetrics())); } private int activeConnections() { @@ -134,7 +131,6 @@ protected void add(AsyncConnection connection) { } } current.addConnection(connection); - this.defaultRouter.ifPresent(router -> router.addConnection(connection)); logger.debug("Connection added to group: " + groupId + ", connection: " + connection + ", group: " + current); } finally { connectionState.unlock(); @@ -163,7 +159,6 @@ protected void remove(AsyncConnection connection) { managedConnections.remove(groupId); } } - this.defaultRouter.ifPresent(router -> router.removeConnection(connection)); } finally { connectionState.unlock(); } @@ -176,19 +171,6 @@ protected void remove(AsyncConnection connection) { } } - public Set> connections() { - connectionState.lock(); - try { - Set> connections = new HashSet<>(); - for (ConnectionGroup group : managedConnections.values()) { - connections.addAll(group.getConnections()); - } - return connections; - } finally { - connectionState.unlock(); - } - } - public Map> groups() { connectionState.lock(); try { @@ -197,11 +179,4 @@ public Map> groups() { connectionState.unlock(); } } - - public void route(List chunks, Router fallbackRouter) { - this.defaultRouter.ifPresentOrElse( - router -> router.route(chunks), - () -> fallbackRouter.route(this.connections(), chunks) - ); - } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java index d30ec6ac5..f9586388e 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/GroupChunkProcessor.java @@ -20,9 +20,11 @@ import java.util.Map; -public class GroupChunkProcessor extends ChunkProcessor { +public class GroupChunkProcessor implements ChunkProcessor { + protected Router fallbackRouter; + public GroupChunkProcessor(Router fallbackRouter) { - super(fallbackRouter); + this.fallbackRouter = fallbackRouter; } @Override diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java index 47ffa987a..0c3b8fb34 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java @@ -7,6 +7,8 @@ import rx.functions.Func1; import java.util.*; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class ProactiveConsistentHashingRouter implements ProactiveRouter> { private static final Logger logger = LoggerFactory.getLogger(ProactiveConsistentHashingRouter.class); @@ -19,6 +21,7 @@ public class ProactiveConsistentHashingRouter implements ProactiveRouter>> ring = new TreeMap<>(); + private final ReadWriteLock ringLock = new ReentrantReadWriteLock(); public ProactiveConsistentHashingRouter( String name, @@ -38,29 +41,40 @@ public ProactiveConsistentHashingRouter( } @Override - public synchronized void route(List> chunks) { - if (ring.isEmpty() || chunks == null || chunks.isEmpty()) { + public void route(List> chunks) { + if (chunks == null || chunks.isEmpty()) { return; } - int numConnections = ring.size() / connectionRepetitionOnRing; - int bufferCapacity = (chunks.size() / numConnections) + 1; // assume even distribution - Map>, List> writes = new HashMap<>(numConnections); - - // process chunks - for (KeyValuePair kvp : chunks) { - long hash = kvp.getKeyBytesHashed(); - // lookup slot - AsyncConnection> connection = lookupConnection(hash); - // add to writes - Func1, Boolean> predicate = connection.getPredicate(); - if (predicate == null || predicate.call(kvp)) { - List buffer = writes.computeIfAbsent(connection, k -> new ArrayList<>(bufferCapacity)); - buffer.add(encoder.call(kvp)); + // Read lock only for ring access + Map>, List> writes; + ringLock.readLock().lock(); + try { + if (ring.isEmpty()) { + return; } + + int numConnections = ring.size() / connectionRepetitionOnRing; + int bufferCapacity = (chunks.size() / numConnections) + 1; // assume even distribution + writes = new HashMap<>(numConnections); + + // process chunks (ring access inside lookupConnection) + for (KeyValuePair kvp : chunks) { + long hash = kvp.getKeyBytesHashed(); + // lookup slot + AsyncConnection> connection = lookupConnection(hash); + // add to writes + Func1, Boolean> predicate = connection.getPredicate(); + if (predicate == null || predicate.call(kvp)) { + List buffer = writes.computeIfAbsent(connection, k -> new ArrayList<>(bufferCapacity)); + buffer.add(encoder.call(kvp)); + } + } + } finally { + ringLock.readLock().unlock(); } - // process writes + // process writes (outside lock - no ring access) if (!writes.isEmpty()) { for (Map.Entry>, List> entry : writes.entrySet()) { AsyncConnection> connection = entry.getKey(); @@ -72,38 +86,51 @@ public synchronized void route(List> chunks) { } @Override - public synchronized void addConnection(AsyncConnection> connection) { + public void addConnection(AsyncConnection> connection) { + String connectionId = connection.getSlotId(); + if (connectionId == null) { + throw new IllegalStateException("Connection must specify an id for consistent hashing"); + } + List hashCollisions = new ArrayList<>(); - for (int i = 0; i < connectionRepetitionOnRing; i++) { - // hash node on ring - String connectionId = connection.getSlotId(); - if (connectionId == null) { - throw new IllegalStateException("Connection must specify an id for consistent hashing"); + ringLock.writeLock().lock(); + try { + for (int i = 0; i < connectionRepetitionOnRing; i++) { + // hash node on ring + byte[] connectionBytes = (connectionId + "-" + i).getBytes(); + long hash = hashFunction.computeHash(connectionBytes); + if (ring.containsKey(hash)) { + hashCollisions.add(connectionId + "-" + i); + } + ring.put(hash, connection); } - byte[] connectionBytes = (connectionId + "-" + i).getBytes(); - long hash = hashFunction.computeHash(connectionBytes); - if (ring.containsKey(hash)) { - hashCollisions.add(connectionId + "-" + i); - } - ring.put(hash, connection); + } finally { + ringLock.writeLock().unlock(); } + + // Log outside lock if (!hashCollisions.isEmpty()) { - logger.error("Hash collisions detected when adding connection {}: {}", connection.getSlotId(), hashCollisions); + logger.error("Hash collisions detected when adding connection {}: {}", connectionId, hashCollisions); } } @Override - public synchronized void removeConnection(AsyncConnection> connection) { - for (int i = 0; i < connectionRepetitionOnRing; i++) { - // hash node on ring - String connectionId = connection.getSlotId(); - if (connectionId == null) { - throw new IllegalStateException("Connection must specify an id for consistent hashing"); - } + public void removeConnection(AsyncConnection> connection) { + String connectionId = connection.getSlotId(); + if (connectionId == null) { + throw new IllegalStateException("Connection must specify an id for consistent hashing"); + } - byte[] connectionBytes = (connectionId + "-" + i).getBytes(); - long hash = hashFunction.computeHash(connectionBytes); - ring.remove(hash); + ringLock.writeLock().lock(); + try { + for (int i = 0; i < connectionRepetitionOnRing; i++) { + // hash node on ring + byte[] connectionBytes = (connectionId + "-" + i).getBytes(); + long hash = hashFunction.computeHash(connectionBytes); + ring.remove(hash); + } + } finally { + ringLock.writeLock().unlock(); } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java index 42549c43f..f4bc25160 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java @@ -50,16 +50,13 @@ public synchronized void route(List chunks) { int arrayListSize = chunks.size() / connections.size() + 1; // assume even distribution // process chunks for (T chunk : chunks) { - currentIndex = (currentIndex + 1) % connections.size(); + currentIndex = currentIndex % connections.size(); AsyncConnection connection = connections.get(currentIndex); Func1 predicate = connection.getPredicate(); if (predicate == null || predicate.call(chunk)) { - List buffer = writes.get(connection); - if (buffer == null) { - buffer = new ArrayList<>(arrayListSize); - writes.put(connection, buffer); - } + List buffer = writes.computeIfAbsent(connection, k -> new ArrayList<>(arrayListSize)); buffer.add(encoder.call(chunk)); + currentIndex++; } } for (Map.Entry, List> entry : writes.entrySet()) { diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java index 485416e99..a82a0d363 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/PushServer.java @@ -154,8 +154,7 @@ public void call() { processedWrites = serverMetrics.getCounter("numProcessedWrites"); registerMetrics(metricsRegistry, serverMetrics, consumerThreads.getMetrics(), - outboundBuffer.getMetrics(), trigger.getMetrics(), - config.getChunkProcessor().fallbackRouter.getMetrics()); + outboundBuffer.getMetrics(), trigger.getMetrics()); port = config.getPort(); writeRetryCount = config.getWriteRetryCount(); @@ -165,14 +164,12 @@ public void call() { private void registerMetrics(MetricsRegistry registry, Metrics serverMetrics, Metrics consumerPoolMetrics, Metrics queueMetrics, - Metrics pushTriggerMetrics, - Metrics routerMetrics) { + Metrics pushTriggerMetrics) { registry.registerAndGet(serverMetrics); registry.registerAndGet(consumerPoolMetrics); registry.registerAndGet(queueMetrics); registry.registerAndGet(pushTriggerMetrics); - registry.registerAndGet(routerMetrics); } protected Observable manageConnection(final DefaultChannelWriter writer, String host, int port, diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java index bf159bd8e..3ea7b03a8 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java @@ -2,10 +2,37 @@ import rx.functions.Func1; +import java.nio.ByteBuffer; + public interface RouterFactory { Router scalarStageToStageRouter(String name, final Func1 toBytes); - ProactiveRouter scalarStageToStageProactiveRouter(String name, final Func1 toBytes); + default ProactiveRouter scalarStageToStageProactiveRouter(String name, final Func1 toBytes) { + return new ProactiveRoundRobinRouter<>(name, toBytes); + } + + default Router> keyedRouter(String name, Func1 keyEncoder, Func1 valueEncoder) { + return new ConsistentHashingRouter<>(name, RouterFactory.consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); + } + + default ProactiveRouter> keyedProactiveRouter(String name, Func1 keyEncoder, Func1 valueEncoder) { + return new ProactiveConsistentHashingRouter<>(name, RouterFactory.consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); + } - ProactiveRouter> keyedStageToStageProactiveRouter(String name, Func1 keyEncoder, Func1 valueEncoder); + static Func1, byte[]> consistentHashingEncoder(final Func1 valueEncoder) { + return kvp -> { + byte[] keyBytes = kvp.getKeyBytes(); + byte[] valueBytes = valueEncoder.call(kvp.getValue()); + return + // length + opcode + notification type + key length + ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) + .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length + .put((byte) 1) // opcode + .put((byte) 1) // notification type + .putInt(keyBytes.length) // key length + .put(keyBytes) // key bytes + .put(valueBytes) // value bytes + .array(); + }; + } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java index c48cd4e9a..4238a627a 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java @@ -34,30 +34,7 @@ public Routers() {} public static Router> consistentHashingLegacyTcpProtocol(String name, final Func1 keyEncoder, final Func1 valueEncoder) { - return new ConsistentHashingRouter(name, consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); - } - - public static ProactiveRouter> proactiveConsistentHashingLegacyTcpProtocol(String name, - final Func1 keyEncoder, - final Func1 valueEncoder) { - return new ProactiveConsistentHashingRouter<>(name, consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); - } - - private static Func1, byte[]> consistentHashingEncoder(final Func1 valueEncoder) { - return kvp -> { - byte[] keyBytes = kvp.getKeyBytes(); - byte[] valueBytes = valueEncoder.call(kvp.getValue()); - return - // length + opcode + notification type + key length - ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) - .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length - .put((byte) 1) // opcode - .put((byte) 1) // notification type - .putInt(keyBytes.length) // key length - .put(keyBytes) // key bytes - .put(valueBytes) // value bytes - .array(); - }; + return new ConsistentHashingRouter(name, RouterFactory.consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); } private static byte[] dataPayload(byte[] data) { @@ -131,14 +108,4 @@ public static Func1 string() { public Router scalarStageToStageRouter(String name, Func1 toBytes) { return roundRobinLegacyTcpProtocol(name, toBytes); } - - @Override - public ProactiveRouter scalarStageToStageProactiveRouter(String name, Func1 toBytes) { - return new ProactiveRoundRobinRouter<>(name, toBytes); - } - - @Override - public ProactiveRouter> keyedStageToStageProactiveRouter(String name, Func1 keyEncoder, Func1 valueEncoder) { - return proactiveConsistentHashingLegacyTcpProtocol(name, keyEncoder, valueEncoder); - } } diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java index b9fefad35..d16253781 100644 --- a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java @@ -16,59 +16,26 @@ package io.reactivex.mantis.network.push; -import io.mantisrx.common.metrics.Metrics; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import rx.subjects.PublishSubject; import java.util.*; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; public class ProactiveConsistentHashingRouterTest { private ProactiveConsistentHashingRouter router; private HashFunction hashFunction; - private List connections; @BeforeEach public void setup() { hashFunction = HashFunctions.xxh3(); router = new ProactiveConsistentHashingRouter<>("test-router", kvp -> kvp.getValue().getBytes(), hashFunction); - connections = new ArrayList<>(); - } - - @Test - public void testAddConnection() { - TestAsyncConnection connection = createConnection("slot-1"); - - router.addConnection(connection); - - // Route some data to verify connection was added - List> data = createTestData("key1", "value1"); - router.route(data); - - assertTrue(connection.getWrittenData().size() > 0); - } - - @Test - public void testRemoveConnection() { - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // Remove first connection - router.removeConnection(connection1); - - // Route data - should only go to connection2 - List> data = createTestData("key1", "value1"); - router.route(data); - - assertEquals(0, connection1.getWrittenData().size()); - assertTrue(connection2.getWrittenData().size() > 0); } @Test @@ -116,22 +83,9 @@ public void testConsistentHashing() { List> data = createTestData(key, "value" + i); router.route(data); } - - // Find which connection received the data - TestAsyncConnection targetConnection = null; - for (TestAsyncConnection conn : Arrays.asList(connection1, connection2, connection3)) { - if (conn.getWrittenData().size() > 0) { - if (targetConnection == null) { - targetConnection = conn; - } else { - // Should only be one connection receiving data for this key - fail("Data was routed to multiple connections for the same key"); - } - } - } - - assertNotNull(targetConnection); - assertEquals(5, targetConnection.getWrittenData().size()); + assertEquals(5, connection1.getWrittenData().size()); + assertEquals(0, connection2.getWrittenData().size()); + assertEquals(0, connection3.getWrittenData().size()); } @Test @@ -147,22 +101,23 @@ public void testDistributionAcrossMultipleConnections() { // Route many different keys List> data = new ArrayList<>(); - for (int i = 0; i < 100; i++) { + int routed = 30000; + for (int i = 0; i < routed; i++) { String key = "key-" + i; long hash = hashFunction.computeHash(key.getBytes()); data.add(new KeyValuePair<>(hash, key.getBytes(), "value-" + i)); } router.route(data); - // Verify all connections received some data (probabilistically should happen with 100 keys) - int totalRouted = connection1.getWrittenData().size() + + int actualRouted = connection1.getWrittenData().size() + connection2.getWrittenData().size() + connection3.getWrittenData().size(); - assertEquals(100, totalRouted); - assertTrue(connection1.getWrittenData().size() > 0); - assertTrue(connection2.getWrittenData().size() > 0); - assertTrue(connection3.getWrittenData().size() > 0); + assertEquals(routed, actualRouted); + // roughly even distribution, but allow 2% variance + assertEquals(10000, connection1.getWrittenData().size(), routed / 50.0); + assertEquals(10000, connection2.getWrittenData().size(), routed / 50.0); + assertEquals(10000, connection3.getWrittenData().size(), routed / 50.0); } @Test @@ -203,72 +158,6 @@ public void testRemoveConnectionWithNullSlotId() { assertThrows(IllegalStateException.class, () -> router.removeConnection(connection)); } - @Test - public void testMetrics() { - TestAsyncConnection connection = createConnection("slot-1"); - router.addConnection(connection); - - Metrics metrics = router.getMetrics(); - assertNotNull(metrics); - assertEquals("Router_test-router", metrics.getMetricGroupId().id()); - - // Route some data - List> data = createTestData("key1", "value1"); - router.route(data); - - // Verify metrics are updated - assertTrue(metrics.getCounter("numEventsRouted").value() > 0); - } - - @Test - public void testMultipleDataItemsInSingleRoute() { - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // Route multiple items at once - List> data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - data.add(createKeyValuePair("key-" + i, "value-" + i)); - } - - router.route(data); - - int totalRouted = connection1.getWrittenData().size() + connection2.getWrittenData().size(); - assertEquals(10, totalRouted); - } - - @Test - public void testThreadSafety() throws InterruptedException { - TestAsyncConnection connection = createConnection("slot-1"); - router.addConnection(connection); - - // Create multiple threads that route data concurrently - List threads = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - final int threadNum = i; - Thread thread = new Thread(() -> { - for (int j = 0; j < 10; j++) { - List> data = - createTestData("key-" + threadNum + "-" + j, "value-" + threadNum + "-" + j); - router.route(data); - } - }); - threads.add(thread); - thread.start(); - } - - // Wait for all threads to complete - for (Thread thread : threads) { - thread.join(); - } - - // Verify all data was routed - assertEquals(100, connection.getWrittenData().size()); - } - // Helper methods private TestAsyncConnection createConnection(String slotId) { @@ -277,9 +166,7 @@ private TestAsyncConnection createConnection(String slotId) { private TestAsyncConnection createConnection(String slotId, rx.functions.Func1, Boolean> predicate) { - TestAsyncConnection connection = new TestAsyncConnection(slotId, predicate); - connections.add(connection); - return connection; + return new TestAsyncConnection(slotId, predicate); } private List> createTestData(String key, String value) { @@ -295,7 +182,7 @@ private KeyValuePair createKeyValuePair(String key, String value // Test helper class private static class TestAsyncConnection extends AsyncConnection> { - private final List writtenData = new ArrayList<>(); + private final List writtenData = Collections.synchronizedList(new ArrayList<>()); private int writeCalls = 0; public TestAsyncConnection(String slotId, @@ -305,17 +192,13 @@ public TestAsyncConnection(String slotId, } @Override - public void write(List data) { + public synchronized void write(List data) { writeCalls++; writtenData.addAll(data); } - public List getWrittenData() { - return writtenData; - } - - public int getWriteCalls() { - return writeCalls; + public synchronized List getWrittenData() { + return new ArrayList<>(writtenData); } } } diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java index dea255730..0de285351 100644 --- a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java @@ -16,9 +16,9 @@ package io.reactivex.mantis.network.push; -import io.mantisrx.common.metrics.Metrics; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import rx.functions.Func1; import rx.subjects.PublishSubject; import java.util.*; @@ -28,51 +28,16 @@ public class ProactiveRoundRobinRouterTest { private ProactiveRoundRobinRouter router; - private List connections; @BeforeEach public void setup() { - router = new ProactiveRoundRobinRouter<>("test-router", data -> data.getBytes()); - connections = new ArrayList<>(); - } - - @Test - public void testAddConnection() { - TestAsyncConnection connection = createConnection("slot-1"); - - router.addConnection(connection); - - // Route some data to verify connection was added - List data = Arrays.asList("test-value"); - router.route(data); - - assertEquals(1, connection.getWrittenData().size()); - } - - @Test - public void testRemoveConnection() { - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // Remove first connection - router.removeConnection(connection1); - - // Route data - should only go to connection2 - List data = Arrays.asList("test-value"); - router.route(data); - - assertEquals(0, connection1.getWrittenData().size()); - assertEquals(1, connection2.getWrittenData().size()); + router = new ProactiveRoundRobinRouter<>("test-router", String::getBytes); } @Test public void testRouteWithNoConnections() { List data = Arrays.asList("test-value"); - // Should not throw exception assertDoesNotThrow(() -> router.route(data)); } @@ -120,161 +85,21 @@ public void testRoundRobinDistribution() { assertEquals(3, connection3.getWrittenData().size()); } - @Test - public void testRoundRobinOrder() { - // Add 2 connections - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // Route items one at a time to verify alternating pattern - for (int i = 0; i < 4; i++) { - router.route(Arrays.asList("value-" + i)); - } - - // Should alternate between connections - assertEquals(2, connection1.getWrittenData().size()); - assertEquals(2, connection2.getWrittenData().size()); - } - - @Test - public void testUnevenDistribution() { - // Add 3 connections - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - TestAsyncConnection connection3 = createConnection("slot-3"); - - router.addConnection(connection1); - router.addConnection(connection2); - router.addConnection(connection3); - - // Route 10 items - should be distributed as 4, 3, 3 or similar - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - data.add("value-" + i); - } - router.route(data); - - int total = connection1.getWrittenData().size() + - connection2.getWrittenData().size() + - connection3.getWrittenData().size(); - - assertEquals(10, total); - - // No connection should be empty - assertTrue(connection1.getWrittenData().size() > 0); - assertTrue(connection2.getWrittenData().size() > 0); - assertTrue(connection3.getWrittenData().size() > 0); - } - - @Test - public void testSingleConnection() { - TestAsyncConnection connection = createConnection("slot-1"); - router.addConnection(connection); - - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - data.add("value-" + i); - } - router.route(data); - - // All items should go to the single connection - assertEquals(10, connection.getWrittenData().size()); - } - - @Test - public void testRouteWithPredicate() { - // Create connection with predicate that filters out certain values - TestAsyncConnection connection = createConnection("slot-1", - data -> data.startsWith("accept")); - - router.addConnection(connection); - - // Route data - some should be filtered out - List data = Arrays.asList("accept-value1", "reject-value2", "accept-value3"); - router.route(data); - - // Only 2 values should have been routed - assertEquals(2, connection.getWrittenData().size()); - } - @Test public void testPredicateAcrossMultipleConnections() { // Create connections with different predicates - TestAsyncConnection connection1 = createConnection("slot-1", - data -> data.contains("even")); - TestAsyncConnection connection2 = createConnection("slot-2", - data -> data.contains("odd")); + Func1 predicate = data -> data.contains("even"); + TestAsyncConnection connection1 = createConnection("slot-1", predicate); + TestAsyncConnection connection2 = createConnection("slot-2", predicate); router.addConnection(connection1); router.addConnection(connection2); - // Route data List data = Arrays.asList("value-even-0", "value-odd-1", "value-even-2", "value-odd-3"); router.route(data); - // Connection 1 should get even values, connection 2 should get odd values - assertEquals(1, connection1.getWrittenData().size()); // value-even-0 - assertEquals(1, connection2.getWrittenData().size()); // value-odd-1 - } - - @Test - public void testMetrics() { - TestAsyncConnection connection = createConnection("slot-1"); - router.addConnection(connection); - - Metrics metrics = router.getMetrics(); - assertNotNull(metrics); - assertEquals("Router_test-router", metrics.getMetricGroupId().id()); - - // Route some data - List data = Arrays.asList("value1", "value2", "value3"); - router.route(data); - - // Verify metrics are updated - assertEquals(3, metrics.getCounter("numEventsProcessed").value()); - assertEquals(3, metrics.getCounter("numEventsRouted").value()); - } - - @Test - public void testMultipleRoutes() { - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // First route - router.route(Arrays.asList("value1", "value2")); - - // Second route - router.route(Arrays.asList("value3", "value4")); - - // Verify distribution continues across routes - int total = connection1.getWrittenData().size() + connection2.getWrittenData().size(); - assertEquals(4, total); - } - - @Test - public void testIndexWraparound() { - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // Route many items to test index wraparound - List data = new ArrayList<>(); - for (int i = 0; i < 100; i++) { - data.add("value-" + i); - } - router.route(data); - - // Should be evenly distributed - assertEquals(50, connection1.getWrittenData().size()); - assertEquals(50, connection2.getWrittenData().size()); + assertEquals(1, connection1.getWrittenData().size()); + assertEquals(1, connection2.getWrittenData().size()); } @Test @@ -293,38 +118,7 @@ public void testConnectionsAddedAfterRouting() { // Route more data - should now distribute across both router.route(Arrays.asList("value3", "value4", "value5", "value6")); - assertTrue(connection2.getWrittenData().size() > 0); - } - - @Test - public void testThreadSafety() throws InterruptedException { - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // Create multiple threads that route data concurrently - List threads = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - final int threadNum = i; - Thread thread = new Thread(() -> { - for (int j = 0; j < 10; j++) { - router.route(Arrays.asList("value-" + threadNum + "-" + j)); - } - }); - threads.add(thread); - thread.start(); - } - - // Wait for all threads to complete - for (Thread thread : threads) { - thread.join(); - } - - // Verify all data was routed - int total = connection1.getWrittenData().size() + connection2.getWrittenData().size(); - assertEquals(100, total); + assertEquals(2, connection2.getWrittenData().size()); } @Test @@ -347,30 +141,9 @@ public void testRemoveConnectionDuringRouting() { router.route(Arrays.asList("value4", "value5", "value6", "value7")); // Connection2 should not have received any data after removal + assertEquals(3, connection1.getWrittenData().size()); assertEquals(1, connection2.getWrittenData().size()); // Only from first route - } - - @Test - public void testBatchedWrites() { - TestAsyncConnection connection1 = createConnection("slot-1"); - TestAsyncConnection connection2 = createConnection("slot-2"); - - router.addConnection(connection1); - router.addConnection(connection2); - - // Route multiple items in one call - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - data.add("value-" + i); - } - router.route(data); - - // Each connection should receive its items in a single write call - // (5 items per connection in one batch) - assertEquals(1, connection1.getWriteCalls()); - assertEquals(1, connection2.getWriteCalls()); - assertEquals(5, connection1.getWrittenData().size()); - assertEquals(5, connection2.getWrittenData().size()); + assertEquals(3, connection3.getWrittenData().size()); } // Helper methods @@ -381,9 +154,7 @@ private TestAsyncConnection createConnection(String slotId) { private TestAsyncConnection createConnection(String slotId, rx.functions.Func1 predicate) { - TestAsyncConnection connection = new TestAsyncConnection(slotId, predicate); - connections.add(connection); - return connection; + return new TestAsyncConnection(slotId, predicate); } // Test helper class diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/TimedChunkerTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/TimedChunkerTest.java index 3654f156d..3757a287f 100644 --- a/mantis-network/src/test/java/io/reactivex/mantis/network/push/TimedChunkerTest.java +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/TimedChunkerTest.java @@ -156,13 +156,12 @@ public void testLongProcessing() throws Exception { assertEquals(expected, processor.getProcessed()); } - public static class TestProcessor extends ChunkProcessor { + public static class TestProcessor implements ChunkProcessor { private ScheduledExecutorService scheduledService = Executors.newSingleThreadScheduledExecutor(); private List processed = new ArrayList(); private long processingTimeMs = 0; public TestProcessor(long processingTimeMs) { - super(null); this.processingTimeMs = processingTimeMs; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java index 150a61ed1..39b70d863 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java @@ -39,6 +39,8 @@ import rx.Observable; import rx.functions.Func1; +import java.util.Optional; + import static io.mantisrx.common.SystemParameters.W2W_USE_PROACTIVE_ROUTER; /** @@ -88,9 +90,9 @@ public void start(final StageConfig stage, Observable> toSer Func1 encoder = t1 -> stage.getOutputCodec().encode(t1); Router router = this.routerFactory.scalarStageToStageRouter(name, encoder); - Func1> proactiveFactory = (String k) -> null; + Func1>> proactiveFactory = (String k) -> Optional.empty(); if (useProactiveRouters()) { - proactiveFactory = (String name) -> routerFactory.scalarStageToStageProactiveRouter(name, encoder); + proactiveFactory = (String name) -> Optional.of(routerFactory.scalarStageToStageProactiveRouter(name, encoder)); } ServerConfig config = new ServerConfig.Builder() @@ -142,6 +144,12 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS Func1 valueEncoder = t1 -> stage.getOutputCodec().encode(t1); Func1 keyEncoder = t1 -> stage.getOutputKeyCodec().encode(t1); + Router> router = this.routerFactory.keyedRouter(name, keyEncoder, valueEncoder); + Func1>>> proactiveFactory = (String k) -> Optional.empty(); + if (useProactiveRouters()) { + proactiveFactory = (String name) -> Optional.of(routerFactory.keyedProactiveRouter(name, keyEncoder, valueEncoder)); + } + ServerConfig> config = new ServerConfig.Builder>() .name(name) .port(serverPort) @@ -151,9 +159,8 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS .maxChunkTimeMSec(maxChunkTimeMSec()) .bufferCapacity(bufferCapacity()) .useSpscQueue(useSpsc()) - .groupRouter(Routers.consistentHashingLegacyTcpProtocol(jobName, keyEncoder, valueEncoder)) - .proactiveRouterFactory(useProactiveRouters() ? - routerName -> routerFactory.keyedStageToStageProactiveRouter(routerName, keyEncoder, valueEncoder) : (k) -> null) + .groupRouter(router) + .proactiveRouterFactory(proactiveFactory) .build(); if (stage instanceof ScalarToGroup || stage instanceof GroupToGroup) { From e7d656ec8751607305c7bfe54c216a0d1717c533 Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Fri, 24 Oct 2025 12:01:01 -0400 Subject: [PATCH 5/9] set up config for proactive router --- .../io/mantisrx/common/SystemParameters.java | 3 --- .../io/mantisrx/runtime/GroupToScalar.java | 11 ++++++++- .../java/io/mantisrx/runtime/KeyToKey.java | 11 +++++++++ .../mantisrx/runtime/KeyValueStageConfig.java | 10 +++++--- .../io/mantisrx/runtime/ScalarToGroup.java | 12 +++++++++- .../io/mantisrx/runtime/ScalarToScalar.java | 17 ++++++++++---- .../java/io/mantisrx/runtime/StageConfig.java | 23 ++++++++++++++++--- .../WorkerPublisherRemoteObservable.java | 11 ++------- .../runtime/parameter/ParameterUtils.java | 14 ----------- .../runtime/sink/ServerSentEventsSink.java | 11 +++++++-- 10 files changed, 83 insertions(+), 40 deletions(-) diff --git a/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java b/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java index dc99a295d..5ca293d51 100644 --- a/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java +++ b/mantis-common/src/main/java/io/mantisrx/common/SystemParameters.java @@ -27,9 +27,6 @@ public final class SystemParameters { public static final String JOB_WORKER_HEARTBEAT_INTERVAL_SECS = "mantis.job.worker.heartbeat.interval.secs"; public static final String JOB_WORKER_TIMEOUT_SECS = "mantis.job.worker.timeout.secs"; - public static final String W2W_USE_PROACTIVE_ROUTER = "mantis.w2w.useProactiveRouter"; - public static final String SSE_USE_PROACTIVE_ROUTER = "mantis.sse.useProactiveRouter"; - public static final String JOB_AUTOSCALE_V2_ENABLED_PARAM = "mantis.job.autoscale.v2.enabled"; public static final String JOB_AUTOSCALE_V2_LOADER_CONFIG_PARAM = "mantis.job.autoscale.v2.loader.config"; diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java index cfb4cf35a..2ffc63ae8 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/GroupToScalar.java @@ -56,7 +56,7 @@ public class GroupToScalar extends StageConfig { GroupToScalar(GroupToScalarComputation computation, Config config, Codec inputKeyCodec, Codec inputCodec) { - super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, inputKeyCodec, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.useProactiveRouter); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; } @@ -80,6 +80,7 @@ public static class Config { private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; private int concurrency = DEFAULT_STAGE_CONCURRENCY; private List> parameters = Collections.emptyList(); + private boolean useProactiveRouter = false; /** * @param codec is netty reactivex Codec @@ -126,6 +127,11 @@ public Config concurrentInput(final int concurrency) { return this; } + public Config shouldUseProactiveRouter(boolean useProactiveRouter) { + this.useProactiveRouter = useProactiveRouter; + return this; + } + public Codec getCodec() { return codec; } @@ -149,6 +155,9 @@ public Config withParameters(List> params) { return this; } + public boolean getUseProactiveRouter() { + return useProactiveRouter; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java index 5760934cd..9bd614995 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyToKey.java @@ -79,6 +79,7 @@ public static class Config { // do not allow config to override private final INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; private List> parameters = Collections.emptyList(); + private boolean useProactiveRouter = false; /** * @param codec is a netty reactivex codec @@ -136,6 +137,16 @@ public Config withParameters(List> params) this.parameters = params; return this; } + + /** + * Configure this stage to use proactive routers for better connection management performance. + * + * @return this config for method chaining + */ + public Config withProactiveRouter() { + this.useProactiveRouter = true; + return this; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java index 4e0cd6907..aa0a447bc 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/KeyValueStageConfig.java @@ -33,11 +33,15 @@ public abstract class KeyValueStageConfig extends StageConfig { private final Codec keyCodec; public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); + this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, false); } - public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { - super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency); + public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, boolean useProactiveRouter) { + this(description, inputKeyCodec, inputCodec, outputKeyCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, useProactiveRouter); + } + + public KeyValueStageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputKeyCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency, boolean useProactiveRouter) { + super(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, concurrency, useProactiveRouter); this.keyCodec = outputKeyCodec; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java index 7fb2a3206..9368c73d8 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToGroup.java @@ -54,7 +54,7 @@ public class ScalarToGroup extends KeyValueStageConfig { public ScalarToGroup(ToGroupComputation computation, Config config, Codec inputCodec) { - super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, null, inputCodec, config.keyCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.useProactiveRouter); this.computation = computation; this.keyExpireTimeSeconds = config.keyExpireTimeSeconds; @@ -79,6 +79,7 @@ public static class Config { private int concurrency = DEFAULT_STAGE_CONCURRENCY; private long keyExpireTimeSeconds = Long.MAX_VALUE; // never expire by default private List> parameters = Collections.emptyList(); + private boolean useProactiveRouter = false; /** * @param codec is Codec of netty reactivex @@ -129,6 +130,11 @@ public Config description(String description) { return this; } + public Config useProactiveRouter(boolean useProactiveRouter) { + this.useProactiveRouter = useProactiveRouter; + return this; + } + public Codec getCodec() { return codec; } @@ -155,5 +161,9 @@ public Config withParameters(List> params) { this.parameters = params; return this; } + + public boolean isUseProactiveRouter() { + return useProactiveRouter; + } } } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java index 7ba4309e2..5cc374a80 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/ScalarToScalar.java @@ -34,15 +34,13 @@ public class ScalarToScalar extends StageConfig { */ ScalarToScalar(ScalarComputation computation, Config config, final io.reactivex.netty.codec.Codec inputCodec) { - super(config.description, NettyCodec.fromNetty(inputCodec), config.codec, config.inputStrategy, config.parameters, config.concurrency); - this.computation = computation; - this.inputStrategy = config.inputStrategy; + this(computation, config, NettyCodec.fromNetty(inputCodec)); } public ScalarToScalar(ScalarComputation computation, Config config, Codec inputCodec) { - super(config.description, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency); + super(config.description, inputCodec, config.codec, config.inputStrategy, config.parameters, config.concurrency, config.useProactiveRouter); this.computation = computation; this.inputStrategy = config.inputStrategy; this.parameters = config.parameters; @@ -68,6 +66,7 @@ public static class Config { // default input type is serial for 'collecting' use case private INPUT_STRATEGY inputStrategy = INPUT_STRATEGY.SERIAL; private volatile int concurrency = StageConfig.DEFAULT_STAGE_CONCURRENCY; + private boolean useProactiveRouter = false; private List> parameters = Collections.emptyList(); @@ -118,6 +117,16 @@ public Config withParameters(List> params) { return this; } + /** + * Configure this stage to use proactive routers for better connection management performance. + * + * @return this config for method chaining + */ + public Config withProactiveRouter() { + this.useProactiveRouter = true; + return this; + } + public String getDescription() { return description; } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java index a920c1987..5eeb796dc 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/StageConfig.java @@ -40,6 +40,9 @@ public abstract class StageConfig { // number of inner observables processed private int concurrency = DEFAULT_STAGE_CONCURRENCY; + // determines whether to use proactive routers for better connection management performance + private boolean useProactiveRouter = false; + public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy) { this(description, inputCodec, outputCodec, inputStrategy, Collections.emptyList(), DEFAULT_STAGE_CONCURRENCY); @@ -52,7 +55,7 @@ public StageConfig(String description, Codec inputCodec, public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params) { - this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY); + this(description, inputKeyCodec, inputCodec, outputCodec, inputStrategy, params, DEFAULT_STAGE_CONCURRENCY, false); } public StageConfig(String description, Codec inputCodec, @@ -63,12 +66,18 @@ public StageConfig(String description, Codec inputCodec, public StageConfig(String description, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, int concurrency) { - this(description, null, inputCodec, outputCodec, inputStrategy, params, concurrency); + this(description, inputCodec, outputCodec, inputStrategy, params, concurrency, false); + } + + public StageConfig(String description, Codec inputCodec, + Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, + int concurrency, boolean useProactiveRouter) { + this(description, null, inputCodec, outputCodec, inputStrategy, params, concurrency, useProactiveRouter); } public StageConfig(String description, Codec inputKeyCodec, Codec inputCodec, Codec outputCodec, INPUT_STRATEGY inputStrategy, List> params, - int concurrency) { + int concurrency, boolean useProactiveRouter) { this.description = description; this.inputKeyCodec = inputKeyCodec; this.inputCodec = inputCodec; @@ -76,6 +85,7 @@ public StageConfig(String description, Codec inputKeyCodec, Codec inpu this.inputStrategy = inputStrategy; this.parameters = params; this.concurrency = concurrency; + this.useProactiveRouter = useProactiveRouter; } public String getDescription() { @@ -109,5 +119,12 @@ public int getConcurrency() { return concurrency; } + /** + * @return true if proactive routers should be used for this stage + */ + public boolean shouldUseProactiveRouter() { + return useProactiveRouter; + } + public enum INPUT_STRATEGY {NONE_SPECIFIED, SERIAL, CONCURRENT} } diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java index 39b70d863..86739dd8c 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/executor/WorkerPublisherRemoteObservable.java @@ -41,8 +41,6 @@ import java.util.Optional; -import static io.mantisrx.common.SystemParameters.W2W_USE_PROACTIVE_ROUTER; - /** * Execution of WorkerPublisher that publishes the stream to the next stage. * @@ -91,7 +89,7 @@ public void start(final StageConfig stage, Observable> toSer Func1 encoder = t1 -> stage.getOutputCodec().encode(t1); Router router = this.routerFactory.scalarStageToStageRouter(name, encoder); Func1>> proactiveFactory = (String k) -> Optional.empty(); - if (useProactiveRouters()) { + if (stage.shouldUseProactiveRouter()) { proactiveFactory = (String name) -> Optional.of(routerFactory.scalarStageToStageProactiveRouter(name, encoder)); } @@ -146,7 +144,7 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS Router> router = this.routerFactory.keyedRouter(name, keyEncoder, valueEncoder); Func1>>> proactiveFactory = (String k) -> Optional.empty(); - if (useProactiveRouters()) { + if (stage.shouldUseProactiveRouter()) { proactiveFactory = (String name) -> Optional.of(routerFactory.keyedProactiveRouter(name, keyEncoder, valueEncoder)); } @@ -174,11 +172,6 @@ private LegacyTcpPushServer> startKeyValueStage(KeyValueS HashFunctions.xxh3()); } - private boolean useProactiveRouters() { - String stringValue = propService.getStringValue(W2W_USE_PROACTIVE_ROUTER, "false"); - return Boolean.parseBoolean(stringValue); - } - private boolean useSpsc() { String stringValue = propService.getStringValue("mantis.w2w.spsc", "false"); return Boolean.parseBoolean(stringValue); diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java index 2a31778b7..23d357edf 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/parameter/ParameterUtils.java @@ -94,20 +94,6 @@ public class ParameterUtils { .build(); systemParams.put(w2wtoKeyThreads.getName(), w2wtoKeyThreads); - ParameterDefinition w2wUseProactiveRouter = new BooleanParameter() - .name(W2W_USE_PROACTIVE_ROUTER) - .description("Whether to use proactive routing for worker to worker connections") - .defaultValue(false) - .build(); - systemParams.put(w2wUseProactiveRouter.getName(), w2wUseProactiveRouter); - - ParameterDefinition sseUseProactiveRouter = new BooleanParameter() - .name(SSE_USE_PROACTIVE_ROUTER) - .description("Whether to use proactive routing for SSE connections") - .defaultValue(false) - .build(); - systemParams.put(sseUseProactiveRouter.getName(), sseUseProactiveRouter); - // mantis.sse.bufferCapacity 25000 ParameterDefinition sseBuffer = new IntParameter() diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java index c0869eedc..b63e10765 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java @@ -90,7 +90,14 @@ public ServerSentEventsSink(Func1 encoder) { this.subscribeProcessor = builder.subscribeProcessor; this.propService = ServiceRegistry.INSTANCE.getPropertiesService(); this.router = builder.router; - this.proactiveRouterFactory = builder.proactiveRouterFactory; + + // Set up proactive router factory based on builder configuration + // If proactive router is requested, caller must provide a factory + if (builder.proactiveRouterFactory != null) { + this.proactiveRouterFactory = builder.proactiveRouterFactory; + } else { + this.proactiveRouterFactory = (String routerName) -> Optional.empty(); + } } @Override @@ -261,7 +268,7 @@ public static class Builder { private Predicate predicate; private Func2>, Context, Void> subscribeProcessor; private Router router; - private Func1>> proactiveRouterFactory = (String routerName) -> Optional.empty();; + private Func1>> proactiveRouterFactory = (String routerName) -> Optional.empty(); public Builder withEncoder(Func1 encoder) { this.encoder = encoder; From b606b570215c2f54e5de1df3606944b5e3629e0c Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Fri, 24 Oct 2025 13:28:36 -0400 Subject: [PATCH 6/9] Clean up for PR --- .../mantis/network/push/ConnectionGroup.java | 7 ++++++- .../network/push/ConnectionManager.java | 7 ++++++- .../ProactiveConsistentHashingRouter.java | 16 ++++++++++++++ .../push/ProactiveRoundRobinRouter.java | 16 ++++++++++++++ .../ProactiveConsistentHashingRouterTest.java | 2 +- .../push/ProactiveRoundRobinRouterTest.java | 2 +- .../runtime/sink/ServerSentEventsSink.java | 21 +++++++------------ 7 files changed, 53 insertions(+), 18 deletions(-) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java index 997ad1b40..0cb1c9e5c 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java @@ -24,7 +24,12 @@ import io.mantisrx.common.metrics.spectator.GaugeCallback; import io.mantisrx.common.metrics.spectator.MetricGroupId; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java index 252cc7081..f1158bb7a 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionManager.java @@ -21,7 +21,12 @@ import io.mantisrx.common.metrics.spectator.GaugeCallback; import io.mantisrx.common.metrics.spectator.MetricGroupId; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Optional; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java index 0c3b8fb34..b1397307c 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.reactivex.mantis.network.push; import io.mantisrx.common.metrics.Counter; diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java index f4bc25160..ce0a7a2ec 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java @@ -1,3 +1,19 @@ +/* + * Copyright 2025 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.reactivex.mantis.network.push; import io.mantisrx.common.metrics.Counter; diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java index d16253781..01e0216b4 100644 --- a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 Netflix, Inc. + * Copyright 2025 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java index 0de285351..ba78ba6d6 100644 --- a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 Netflix, Inc. + * Copyright 2025 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java index b63e10765..f71fb849d 100644 --- a/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java +++ b/mantis-runtime/src/main/java/io/mantisrx/runtime/sink/ServerSentEventsSink.java @@ -25,7 +25,12 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelOption; import io.netty.channel.WriteBufferWaterMark; -import io.reactivex.mantis.network.push.*; +import io.reactivex.mantis.network.push.ProactiveRouter; +import io.reactivex.mantis.network.push.PushServerSse; +import io.reactivex.mantis.network.push.PushServers; +import io.reactivex.mantis.network.push.Routers; +import io.reactivex.mantis.network.push.ServerConfig; +import io.reactivex.mantis.network.push.Router; import java.io.IOException; import java.util.List; @@ -90,14 +95,7 @@ public ServerSentEventsSink(Func1 encoder) { this.subscribeProcessor = builder.subscribeProcessor; this.propService = ServiceRegistry.INSTANCE.getPropertiesService(); this.router = builder.router; - - // Set up proactive router factory based on builder configuration - // If proactive router is requested, caller must provide a factory - if (builder.proactiveRouterFactory != null) { - this.proactiveRouterFactory = builder.proactiveRouterFactory; - } else { - this.proactiveRouterFactory = (String routerName) -> Optional.empty(); - } + this.proactiveRouterFactory = builder.proactiveRouterFactory; } @Override @@ -155,11 +153,6 @@ private boolean useSpsc() { return Boolean.parseBoolean(useSpsc); } - private boolean useProactiveRouter() { - String useProactiveRouter = propService.getStringValue("mantis.sse.useProactiveRouter", "false"); - return Boolean.parseBoolean(useProactiveRouter); - } - @Override public void call(Context context, PortRequest portRequest, final Observable observable) { port = portRequest.getPort(); From 46cf57b70bdbc3bdef1335cc7d24e00fe67ba158 Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Fri, 24 Oct 2025 16:07:16 -0400 Subject: [PATCH 7/9] actually increment connection updates --- .../ProactiveConsistentHashingRouter.java | 22 +++++++++++++------ .../push/ProactiveRoundRobinRouter.java | 2 ++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java index b1397307c..e0f54e80a 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java @@ -28,7 +28,7 @@ public class ProactiveConsistentHashingRouter implements ProactiveRouter> { private static final Logger logger = LoggerFactory.getLogger(ProactiveConsistentHashingRouter.class); - private static final int connectionRepetitionOnRing = 1000; + private final int connectionRepetitionOnRing; protected final Func1, byte[]> encoder; protected final Counter numEventsRouted; @@ -43,6 +43,15 @@ public ProactiveConsistentHashingRouter( String name, Func1, byte[]> dataEncoder, HashFunction hashFunction) { + this(name, dataEncoder, hashFunction, 1000); + } + + public ProactiveConsistentHashingRouter( + String name, + Func1, byte[]> dataEncoder, + HashFunction hashFunction, + int ringRepetitionPerConnection) { + this.connectionRepetitionOnRing = ringRepetitionPerConnection; this.encoder = dataEncoder; metrics = new Metrics.Builder() .name("Router_" + name) @@ -61,6 +70,7 @@ public void route(List> chunks) { if (chunks == null || chunks.isEmpty()) { return; } + numEventsProcessed.increment(chunks.size()); // Read lock only for ring access Map>, List> writes; @@ -78,7 +88,8 @@ public void route(List> chunks) { for (KeyValuePair kvp : chunks) { long hash = kvp.getKeyBytesHashed(); // lookup slot - AsyncConnection> connection = lookupConnection(hash); + Map.Entry>> connectionEntry = ring.ceilingEntry(hash); + AsyncConnection> connection = (connectionEntry == null ? ring.firstEntry() : connectionEntry).getValue(); // add to writes Func1, Boolean> predicate = connection.getPredicate(); if (predicate == null || predicate.call(kvp)) { @@ -123,6 +134,7 @@ public void addConnection(AsyncConnection> connection) { } finally { ringLock.writeLock().unlock(); } + numConnectionUpdates.increment(); // Log outside lock if (!hashCollisions.isEmpty()) { @@ -148,15 +160,11 @@ public void removeConnection(AsyncConnection> connection) { } finally { ringLock.writeLock().unlock(); } + numConnectionUpdates.increment(); } @Override public Metrics getMetrics() { return metrics; } - - private AsyncConnection> lookupConnection(long hash) { - Map.Entry>> connection = ring.ceilingEntry(hash); - return (connection == null ? ring.firstEntry() : connection).getValue(); - } } diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java index ce0a7a2ec..9399a2596 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java @@ -48,11 +48,13 @@ public ProactiveRoundRobinRouter(String name, Func1 encoder) { @Override public synchronized void addConnection(AsyncConnection connection) { // We do not need to shuffle because we are constantly looping through + numConnectionUpdates.increment(); connections.add(connection); } @Override public synchronized void removeConnection(AsyncConnection connection) { + numConnectionUpdates.increment(); connections.remove(connection); } From 7159abd26fe5093695b2ddb54e4af2964181173e Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Fri, 24 Oct 2025 16:29:53 -0400 Subject: [PATCH 8/9] some more clean up --- .../io/reactivex/mantis/network/push/RouterFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java index 3ea7b03a8..8166ff349 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/RouterFactory.java @@ -19,14 +19,14 @@ default ProactiveRouter> keyedProactiveRouter(String n return new ProactiveConsistentHashingRouter<>(name, RouterFactory.consistentHashingEncoder(valueEncoder), HashFunctions.xxh3()); } - static Func1, byte[]> consistentHashingEncoder(final Func1 valueEncoder) { + public static Func1, byte[]> consistentHashingEncoder(final Func1 valueEncoder) { return kvp -> { byte[] keyBytes = kvp.getKeyBytes(); byte[] valueBytes = valueEncoder.call(kvp.getValue()); return // length + opcode + notification type + key length - ByteBuffer.allocate(4 + 1 + 1 + 4 + keyBytes.length + valueBytes.length) - .putInt(1 + 1 + 4 + keyBytes.length + valueBytes.length) // length + ByteBuffer.allocate(2 * Integer.BYTES + 2 * Byte.BYTES + keyBytes.length + valueBytes.length) + .putInt(2 * Byte.BYTES + Integer.BYTES + keyBytes.length + valueBytes.length) // length .put((byte) 1) // opcode .put((byte) 1) // notification type .putInt(keyBytes.length) // key length From 4e8aeab0f71c7d63fb98b40c96d30ae3c0a60273 Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Tue, 28 Oct 2025 11:54:16 -0400 Subject: [PATCH 9/9] address PR comments --- .../mantis/network/push/ConnectionGroup.java | 12 ++++++------ .../push/ProactiveConsistentHashingRouter.java | 3 ++- .../network/push/ProactiveRoundRobinRouter.java | 3 ++- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java index 0cb1c9e5c..b65776305 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConnectionGroup.java @@ -46,12 +46,12 @@ public class ConnectionGroup { private Counter successfulWrites; private Counter numSlotSwaps; private Counter failedWrites; - private final Optional> router; + private final Optional> routerO; - public ConnectionGroup(String groupId, Optional> router) { + public ConnectionGroup(String groupId, Optional> routerO) { this.groupId = groupId; this.connections = new HashMap<>(); - this.router = router; + this.routerO = routerO; final String grpId = Optional.ofNullable(groupId).orElse("none"); final BasicTag groupIdTag = new BasicTag(MantisMetricStringConstants.GROUP_ID_TAG, grpId); @@ -98,7 +98,7 @@ public synchronized void removeConnection(AsyncConnection connection) { + " a new connection has already been swapped in the place of the old connection"); } - this.router.ifPresent(router -> router.removeConnection(connection)); + this.routerO.ifPresent(router -> router.removeConnection(connection)); } public synchronized void addConnection(AsyncConnection connection) { @@ -113,7 +113,7 @@ public synchronized void addConnection(AsyncConnection connection) { previousConnection.close(); numSlotSwaps.increment(); } - this.router.ifPresent(router -> router.addConnection(connection)); + this.routerO.ifPresent(router -> router.addConnection(connection)); } public synchronized boolean isEmpty() { @@ -141,7 +141,7 @@ public String toString() { } public void route(List chunks, Router fallbackRouter) { - this.router.ifPresentOrElse( + this.routerO.ifPresentOrElse( router -> router.route(chunks), () -> fallbackRouter.route(this.getConnections(), chunks) ); diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java index e0f54e80a..58d19daf4 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveConsistentHashingRouter.java @@ -16,6 +16,7 @@ package io.reactivex.mantis.network.push; +import com.netflix.spectator.api.Tag; import io.mantisrx.common.metrics.Counter; import io.mantisrx.common.metrics.Metrics; import org.slf4j.Logger; @@ -54,7 +55,7 @@ public ProactiveConsistentHashingRouter( this.connectionRepetitionOnRing = ringRepetitionPerConnection; this.encoder = dataEncoder; metrics = new Metrics.Builder() - .name("Router_" + name) + .id("Router_" + name, Tag.of("router_type", "proactive_consistent_hashing")) .addCounter("numEventsRouted") .addCounter("numEventsProcessed") .addCounter("numConnectionUpdates") diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java index 9399a2596..377bd6aa0 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ProactiveRoundRobinRouter.java @@ -16,6 +16,7 @@ package io.reactivex.mantis.network.push; +import com.netflix.spectator.api.Tag; import io.mantisrx.common.metrics.Counter; import io.mantisrx.common.metrics.Metrics; import rx.functions.Func1; @@ -35,7 +36,7 @@ public class ProactiveRoundRobinRouter implements ProactiveRouter { public ProactiveRoundRobinRouter(String name, Func1 encoder) { this.encoder = encoder; metrics = new Metrics.Builder() - .name("Router_" + name) + .id("Router_" + name, Tag.of("router_type", "proactive_round_robin")) .addCounter("numEventsRouted") .addCounter("numEventsProcessed") .addCounter("numConnectionUpdates")