Skip to content

Commit c07c2c0

Browse files
authored
Switch to use WCH-balance mode with sticky enabled for dist pub rpc (#170)
1. support hashing routing with sticky behavior for pipelined unary method which is more suitable for dist pub method; 2. Replace EnhancedMarshaller with a memory-efficient impl; 3. Make WRR deterministic.
1 parent 7534ca2 commit c07c2c0

File tree

18 files changed

+844
-179
lines changed

18 files changed

+844
-179
lines changed

base-rpc/base-rpc-client/src/main/java/org/apache/bifromq/baserpc/client/DummyServerSelector.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
package org.apache.bifromq.baserpc.client;
2121

22+
import java.util.Optional;
2223
import org.apache.bifromq.baserpc.client.loadbalancer.IServerGroupRouter;
2324
import org.apache.bifromq.baserpc.client.loadbalancer.IServerSelector;
24-
import java.util.Optional;
2525

2626
class DummyServerSelector implements IServerSelector {
2727
public static final IServerSelector INSTANCE = new DummyServerSelector();
@@ -58,6 +58,11 @@ public Optional<String> tryRoundRobin() {
5858
public Optional<String> hashing(String key) {
5959
return Optional.empty();
6060
}
61+
62+
@Override
63+
public Optional<String> stickyHashing(String key) {
64+
return Optional.empty();
65+
}
6166
};
6267
}
6368

base-rpc/base-rpc-client/src/main/java/org/apache/bifromq/baserpc/client/ManagedBiDiStream.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.baserpc.client;
@@ -52,6 +52,7 @@ abstract class ManagedBiDiStream<InT, OutT> {
5252
private final CompositeDisposable disposables = new CompositeDisposable();
5353
private final String tenantId;
5454
private final String wchKey;
55+
private final boolean sticky;
5556
private final String targetServerId;
5657
private final Supplier<Map<String, String>> metadataSupplier;
5758
private final Channel channel;
@@ -66,18 +67,20 @@ abstract class ManagedBiDiStream<InT, OutT> {
6667
ManagedBiDiStream(String tenantId,
6768
String wchKey,
6869
String targetServerId,
69-
BluePrint.BalanceMode balanceMode,
70+
BluePrint.MethodSemantic methodSemantic,
7071
Supplier<Map<String, String>> metadataSupplier,
7172
Channel channel,
7273
CallOptions callOptions,
7374
MethodDescriptor<InT, OutT> methodDescriptor) {
74-
checkArgument(balanceMode != BluePrint.BalanceMode.DDBalanced || targetServerId != null,
75+
checkArgument(methodSemantic.mode() != BluePrint.BalanceMode.DDBalanced || targetServerId != null,
7576
"targetServerId is required");
76-
checkArgument(balanceMode != BluePrint.BalanceMode.WCHBalanced | wchKey != null, "wchKey is required");
77+
checkArgument(methodSemantic.mode() != BluePrint.BalanceMode.WCHBalanced || wchKey != null,
78+
"wchKey is required");
7779
this.tenantId = tenantId;
7880
this.wchKey = wchKey;
7981
this.targetServerId = targetServerId;
80-
this.balanceMode = balanceMode;
82+
this.balanceMode = methodSemantic.mode();
83+
this.sticky = methodSemantic instanceof BluePrint.HRWPipelineUnaryMethod;
8184
this.metadataSupplier = metadataSupplier;
8285
this.channel = channel;
8386
this.callOptions = callOptions;
@@ -149,11 +152,11 @@ private void onServerSelectorChanged(IServerSelector newServerSelector) {
149152
return;
150153
}
151154
Optional<String> currentServer = prevRouter.hashing(wchKey);
152-
Optional<String> newServer = router.hashing(wchKey);
155+
Optional<String> newServer = sticky ? router.stickyHashing(wchKey) : router.hashing(wchKey);
153156
if (newServer.isEmpty()) {
154157
// cancel current bidi-stream
155158
synchronized (this) {
156-
bidiStream.get().bidiStream().cancel("no server available");
159+
bidiStream.get().bidiStream().cancel("No server available");
157160
}
158161
} else if (!newServer.equals(currentServer)) {
159162
switch (state.get()) {
@@ -179,7 +182,7 @@ private void onServerSelectorChanged(IServerSelector newServerSelector) {
179182
if (newServer.isEmpty()) {
180183
// cancel current bidi-stream
181184
synchronized (this) {
182-
bidiStream.get().bidiStream().cancel("no server available");
185+
bidiStream.get().bidiStream().cancel("No server available");
183186
}
184187
} else {
185188
switch (state.get()) {
@@ -212,7 +215,7 @@ private void onServerSelectorChanged(IServerSelector newServerSelector) {
212215
if (newServer.isEmpty()) {
213216
// cancel current bidi-stream
214217
synchronized (this) {
215-
bidiStream.get().bidiStream().cancel("no server available");
218+
bidiStream.get().bidiStream().cancel("No server available");
216219
}
217220
} else {
218221
switch (state.get()) {
@@ -294,7 +297,7 @@ private void retarget(IServerSelector serverSelector) {
294297
}
295298
case WCHBalanced -> {
296299
IServerGroupRouter router = serverSelector.get(tenantId);
297-
Optional<String> selectedServer = router.hashing(wchKey);
300+
Optional<String> selectedServer = sticky ? router.stickyHashing(wchKey) : router.hashing(wchKey);
298301
if (selectedServer.isEmpty()) {
299302
state.set(State.NoServerAvailable);
300303
reportServiceUnavailable();
@@ -423,13 +426,13 @@ public boolean isReady() {
423426
@Override
424427
public void cancel(String message) {
425428
// do nothing
426-
managedBiDiStream.onStreamError(new IllegalStateException("bidi-stream is not ready"));
429+
managedBiDiStream.onStreamError(new IllegalStateException("Stream is not ready"));
427430
}
428431

429432
@Override
430433
public void send(InT in) {
431434
// do nothing
432-
managedBiDiStream.onStreamError(new IllegalStateException("bidi-stream is not ready"));
435+
managedBiDiStream.onStreamError(new IllegalStateException("Stream is not ready"));
433436
}
434437

435438
@Override

base-rpc/base-rpc-client/src/main/java/org/apache/bifromq/baserpc/client/ManagedMessageStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.baserpc.client;
@@ -55,7 +55,7 @@ class ManagedMessageStream<MsgT, AckT> extends ManagedBiDiStream<AckT, MsgT>
5555
super(tenantId,
5656
wchKey,
5757
targetServerId,
58-
bluePrint.semantic(methodDescriptor.getFullMethodName()).mode(),
58+
bluePrint.semantic(methodDescriptor.getFullMethodName()),
5959
metadataSupplier,
6060
channelHolder.channel(),
6161
callOptions,

base-rpc/base-rpc-client/src/main/java/org/apache/bifromq/baserpc/client/ManagedRequestPipeline.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.baserpc.client;
@@ -63,7 +63,7 @@ public class ManagedRequestPipeline<ReqT, RespT> extends ManagedBiDiStream<ReqT,
6363
super(tenantId,
6464
wchKey,
6565
targetServerId,
66-
bluePrint.semantic(methodDescriptor.getFullMethodName()).mode(),
66+
bluePrint.semantic(methodDescriptor.getFullMethodName()),
6767
metadataSupplier,
6868
channelHolder.channel(),
6969
callOptions,
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.bifromq.baserpc.client.loadbalancer;
21+
22+
import com.google.common.base.Charsets;
23+
import com.google.common.hash.HashCode;
24+
import com.google.common.hash.Hashing;
25+
import java.util.Collection;
26+
import java.util.Objects;
27+
28+
/**
29+
* Rendezvous Hashing Weighted Router (HRW).
30+
*/
31+
class HRWRouter<T> {
32+
private static final long IEEE754_DOUBLE_1 = 0x3FF0000000000000L; // 1.0 in IEEE 754 double format
33+
private final Collection<T> nodes;
34+
private final KeyFunction<T> keyFunction;
35+
private final WeightFunction<T> weightFunction;
36+
private final HashFunction hashFunction;
37+
38+
HRWRouter(Collection<T> nodes, KeyFunction<T> keyFunction, WeightFunction<T> weightFunction) {
39+
this(nodes, keyFunction, weightFunction, (key) -> {
40+
HashCode code = Hashing.murmur3_128().hashString(key, Charsets.UTF_8);
41+
return code.asLong();
42+
});
43+
}
44+
45+
HRWRouter(Collection<T> nodes,
46+
KeyFunction<T> keyFunction,
47+
WeightFunction<T> weightFunction,
48+
HashFunction hashFunction) {
49+
this.nodes = Objects.requireNonNull(nodes);
50+
this.keyFunction = Objects.requireNonNull(keyFunction);
51+
this.weightFunction = Objects.requireNonNull(weightFunction);
52+
this.hashFunction = Objects.requireNonNull(hashFunction);
53+
}
54+
55+
// map unsigned long to double in (0,1) uniformly
56+
private static double hashToUnitInterval(long x) {
57+
double u = Double.longBitsToDouble((x >>> 12) | IEEE754_DOUBLE_1) - 1.0;
58+
final double eps = 1e-12;
59+
if (u <= 0) {
60+
u = eps;
61+
}
62+
if (u >= 1) {
63+
u = 1 - eps;
64+
}
65+
return u;
66+
}
67+
68+
/**
69+
* Route to the best node based on the given object key.
70+
*
71+
* @param objectKey the key to route
72+
* @return the best node, or null if no nodes are available
73+
*/
74+
T routeNode(String objectKey) {
75+
if (nodes.isEmpty()) {
76+
return null;
77+
}
78+
T bestNode = null;
79+
double bestScore = Double.POSITIVE_INFINITY;
80+
81+
for (T n : nodes) {
82+
String key = keyFunction.getKey(n);
83+
int w = weightFunction.getWeight(n);
84+
if (w <= 0) {
85+
continue;
86+
}
87+
long h = hashFunction.hash64(objectKey + key);
88+
// Rendezvous/WRH:min(-ln(U)/w)
89+
double u = hashToUnitInterval(h);
90+
double score = -Math.log(u) / (double) w;
91+
92+
if (score < bestScore) {
93+
bestScore = score;
94+
bestNode = n;
95+
}
96+
}
97+
return bestNode;
98+
}
99+
100+
interface KeyFunction<T> {
101+
String getKey(T node);
102+
}
103+
104+
interface WeightFunction<T> {
105+
int getWeight(T node);
106+
}
107+
108+
interface HashFunction {
109+
long hash64(String key);
110+
}
111+
}

base-rpc/base-rpc-client/src/main/java/org/apache/bifromq/baserpc/client/loadbalancer/IServerGroupRouter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
1515
* KIND, either express or implied. See the License for the
1616
* specific language governing permissions and limitations
17-
* under the License.
17+
* under the License.
1818
*/
1919

2020
package org.apache.bifromq.baserpc.client.loadbalancer;
@@ -31,4 +31,6 @@ public interface IServerGroupRouter {
3131
Optional<String> tryRoundRobin();
3232

3333
Optional<String> hashing(String key);
34+
35+
Optional<String> stickyHashing(String key);
3436
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.bifromq.baserpc.client.loadbalancer;
21+
22+
import com.google.common.collect.Maps;
23+
import java.util.Map;
24+
import java.util.Set;
25+
import lombok.EqualsAndHashCode;
26+
27+
@EqualsAndHashCode
28+
class TenantAwareServerSelector implements IServerSelector {
29+
private final Map<String, Boolean> allServers;
30+
private final Map<String, Set<String>> serverGroupTags;
31+
private final Map<String, Map<String, Integer>> trafficDirective;
32+
@EqualsAndHashCode.Exclude
33+
private final ITenantRouter tenantRouter;
34+
35+
public TenantAwareServerSelector(Map<String, Boolean> allServers,
36+
Map<String, Set<String>> serverGroupTags,
37+
Map<String, Map<String, Integer>> trafficDirective) {
38+
this.allServers = Maps.newHashMap(allServers);
39+
this.serverGroupTags = Maps.newHashMap(serverGroupTags);
40+
this.trafficDirective = Maps.newHashMap(trafficDirective);
41+
this.tenantRouter = new TenantRouter(this.allServers, this.trafficDirective, this.serverGroupTags);
42+
}
43+
44+
@Override
45+
public boolean exists(String serverId) {
46+
return allServers.containsKey(serverId);
47+
}
48+
49+
@Override
50+
public IServerGroupRouter get(String tenantId) {
51+
return tenantRouter.get(tenantId);
52+
}
53+
54+
@Override
55+
public String toString() {
56+
return allServers.toString();
57+
}
58+
}

0 commit comments

Comments
 (0)