14
14
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
15
* KIND, either express or implied. See the License for the
16
16
* specific language governing permissions and limitations
17
- * under the License.
17
+ * under the License.
18
18
*/
19
19
20
20
package org .apache .bifromq .basecluster .memberlist ;
25
25
import static org .apache .bifromq .basecrdt .core .api .CausalCRDTType .mvreg ;
26
26
import static org .apache .bifromq .basecrdt .store .ReplicaIdGenerator .generate ;
27
27
28
- import org .apache .bifromq .basecluster .agent .proto .AgentEndpoint ;
29
- import org .apache .bifromq .basecluster .memberlist .agent .Agent ;
30
- import org .apache .bifromq .basecluster .memberlist .agent .AgentAddressProvider ;
31
- import org .apache .bifromq .basecluster .memberlist .agent .AgentMessenger ;
32
- import org .apache .bifromq .basecluster .memberlist .agent .IAgent ;
33
- import org .apache .bifromq .basecluster .membership .proto .Doubt ;
34
- import org .apache .bifromq .basecluster .membership .proto .Fail ;
35
- import org .apache .bifromq .basecluster .membership .proto .HostEndpoint ;
36
- import org .apache .bifromq .basecluster .membership .proto .HostMember ;
37
- import org .apache .bifromq .basecluster .membership .proto .Join ;
38
- import org .apache .bifromq .basecluster .membership .proto .Quit ;
39
- import org .apache .bifromq .basecluster .messenger .IMessenger ;
40
- import org .apache .bifromq .basecluster .proto .ClusterMessage ;
41
- import org .apache .bifromq .basecrdt .core .api .IORMap ;
42
- import org .apache .bifromq .basecrdt .core .api .MVRegOperation ;
43
- import org .apache .bifromq .basecrdt .core .api .ORMapOperation ;
44
- import org .apache .bifromq .basecrdt .proto .Replica ;
45
- import org .apache .bifromq .basecrdt .store .ICRDTStore ;
46
- import org .apache .bifromq .basehlc .HLC ;
47
28
import com .google .common .base .Preconditions ;
48
29
import com .google .common .collect .Maps ;
49
- import com .google .common .collect .Sets ;
50
30
import com .google .protobuf .AbstractMessageLite ;
51
31
import com .google .protobuf .ByteString ;
52
32
import io .micrometer .core .instrument .Gauge ;
57
37
import io .reactivex .rxjava3 .core .Scheduler ;
58
38
import io .reactivex .rxjava3 .disposables .CompositeDisposable ;
59
39
import io .reactivex .rxjava3 .subjects .BehaviorSubject ;
40
+ import io .reactivex .rxjava3 .subjects .PublishSubject ;
60
41
import java .net .InetSocketAddress ;
61
42
import java .util .HashSet ;
62
43
import java .util .Iterator ;
69
50
import java .util .concurrent .atomic .AtomicReference ;
70
51
import java .util .stream .Collectors ;
71
52
import lombok .extern .slf4j .Slf4j ;
53
+ import org .apache .bifromq .base .util .RendezvousHash ;
54
+ import org .apache .bifromq .basecluster .agent .proto .AgentEndpoint ;
55
+ import org .apache .bifromq .basecluster .memberlist .agent .Agent ;
56
+ import org .apache .bifromq .basecluster .memberlist .agent .AgentAddressProvider ;
57
+ import org .apache .bifromq .basecluster .memberlist .agent .AgentMessenger ;
58
+ import org .apache .bifromq .basecluster .memberlist .agent .IAgent ;
59
+ import org .apache .bifromq .basecluster .membership .proto .Doubt ;
60
+ import org .apache .bifromq .basecluster .membership .proto .Fail ;
61
+ import org .apache .bifromq .basecluster .membership .proto .HostEndpoint ;
62
+ import org .apache .bifromq .basecluster .membership .proto .HostMember ;
63
+ import org .apache .bifromq .basecluster .membership .proto .Join ;
64
+ import org .apache .bifromq .basecluster .membership .proto .Quit ;
65
+ import org .apache .bifromq .basecluster .messenger .IMessenger ;
66
+ import org .apache .bifromq .basecluster .proto .ClusterMessage ;
67
+ import org .apache .bifromq .basecrdt .core .api .IORMap ;
68
+ import org .apache .bifromq .basecrdt .core .api .MVRegOperation ;
69
+ import org .apache .bifromq .basecrdt .core .api .ORMapOperation ;
70
+ import org .apache .bifromq .basecrdt .proto .Replica ;
71
+ import org .apache .bifromq .basecrdt .store .ICRDTStore ;
72
+ import org .apache .bifromq .basehlc .HLC ;
72
73
74
+ /**
75
+ * HostMemberList implementation using CRDT for achieving a consistent view of the host members in the cluster.
76
+ */
73
77
@ Slf4j
74
78
public class HostMemberList implements IHostMemberList {
75
79
private final AtomicReference <State > state = new AtomicReference <>(State .JOINED );
@@ -79,12 +83,25 @@ public class HostMemberList implements IHostMemberList {
79
83
private final IHostAddressResolver addressResolver ;
80
84
private final BehaviorSubject <Map <HostEndpoint , HostMember >> membershipSubject = BehaviorSubject .createDefault (
81
85
new ConcurrentHashMap <>());
86
+ private final PublishSubject <Long > refuteSubject = PublishSubject .create ();
82
87
private final Map <String , Agent > agentMap = new ConcurrentHashMap <>();
83
88
private final IORMap hostListCRDT ;
84
89
private final CompositeDisposable disposables = new CompositeDisposable ();
85
90
private final MetricManager metricManager ;
86
91
private final String [] tags ;
87
92
private volatile HostMember local ;
93
+
94
+ /**
95
+ * Constructor of HostMemberList.
96
+ *
97
+ * @param bindAddr the address to bind the host member
98
+ * @param port the port to bind the host member
99
+ * @param messenger the messenger to use for communication
100
+ * @param scheduler the scheduler to use for scheduling tasks
101
+ * @param store the CRDT store to use for storing internal OR-Map
102
+ * @param addressResolver the address resolver to resolve host endpoints to addresses
103
+ * @param tags the tags to be used for metrics
104
+ */
88
105
public HostMemberList (String bindAddr ,
89
106
int port ,
90
107
IMessenger messenger ,
@@ -134,10 +151,13 @@ private boolean join(HostMember member) {
134
151
if (joined ) {
135
152
// add it into crdt
136
153
log .debug ("Member[{}] joins the cluster: local={}" , member , local );
137
- Optional <HostMember > memberInCRDT = getHostMember (hostListCRDT , member .getEndpoint ());
138
- if (memberInCRDT .isEmpty () || memberInCRDT .get ().getIncarnation () < member .getIncarnation ()) {
139
- hostListCRDT .execute (ORMapOperation .update (member .getEndpoint ().toByteString ())
140
- .with (MVRegOperation .write (member .toByteString ())));
154
+ if (member == local ) {
155
+ // only update crdt if it's local member
156
+ Optional <HostMember > memberInCRDT = getHostMember (hostListCRDT , member .getEndpoint ());
157
+ if (memberInCRDT .isEmpty () || memberInCRDT .get ().getIncarnation () < member .getIncarnation ()) {
158
+ hostListCRDT .execute (ORMapOperation .update (member .getEndpoint ().toByteString ())
159
+ .with (MVRegOperation .write (member .toByteString ())));
160
+ }
141
161
}
142
162
// update crdt landscape
143
163
store .join (hostListCRDT .id (), currentMembers ().keySet ().stream ()
@@ -148,12 +168,11 @@ private boolean join(HostMember member) {
148
168
}
149
169
}
150
170
151
- private void drop (HostEndpoint memberEndpoint , int incarnation ) {
171
+ private void drop (HostEndpoint memberEndpoint , int incarnation , boolean fromQuit ) {
152
172
synchronized (this ) {
153
173
boolean removed = removeMember (memberEndpoint , incarnation );
154
174
Optional <HostMember > memberInCRDT = getHostMember (hostListCRDT , memberEndpoint );
155
- if (memberInCRDT .isPresent ()) {
156
- // remove it from crdt if any
175
+ if (!fromQuit && memberInCRDT .isPresent () && shouldReportFailure (memberInCRDT .get ().getEndpoint ())) {
157
176
hostListCRDT .execute (ORMapOperation .remove (memberEndpoint .toByteString ()).of (mvreg ));
158
177
}
159
178
if (removed ) {
@@ -165,6 +184,17 @@ private void drop(HostEndpoint memberEndpoint, int incarnation) {
165
184
}
166
185
}
167
186
187
+ private boolean shouldReportFailure (HostEndpoint failedMemberEndpoint ) {
188
+ // if local member is responsible for removing the failed member from CRDT
189
+ RendezvousHash <HostEndpoint , HostEndpoint > hash = RendezvousHash .<HostEndpoint , HostEndpoint >builder ()
190
+ .keyFunnel ((from , into ) -> into .putBytes (from .getId ().asReadOnlyByteBuffer ()))
191
+ .nodeFunnel ((from , into ) -> into .putBytes (from .getId ().asReadOnlyByteBuffer ()))
192
+ .nodes (currentMembers ().keySet ())
193
+ .build ();
194
+ HostEndpoint reporter = hash .get (failedMemberEndpoint );
195
+ return reporter .getId ().equals (local .getEndpoint ().getId ());
196
+ }
197
+
168
198
@ Override
169
199
public boolean isZombie (HostEndpoint endpoint ) {
170
200
return !endpoint .getId ().equals (local .getEndpoint ().getId ())
@@ -207,6 +237,7 @@ public CompletableFuture<Void> stop() {
207
237
.thenCompose (v -> store .stopHosting (hostListCRDT .id ()))
208
238
.whenComplete ((v , e ) -> {
209
239
membershipSubject .onComplete ();
240
+ refuteSubject .onComplete ();
210
241
metricManager .close ();
211
242
state .set (State .QUITED );
212
243
});
@@ -226,6 +257,8 @@ private void renew(int atLeastIncarnation) {
226
257
synchronized (this ) {
227
258
local = local .toBuilder ().setIncarnation (Math .max (local .getIncarnation (), atLeastIncarnation ) + 1 ).build ();
228
259
join (local );
260
+ agentMap .values ().forEach (Agent ::refreshRegistration );
261
+ refuteSubject .onNext (HLC .INST .get ());
229
262
}
230
263
}
231
264
@@ -247,7 +280,6 @@ public IAgent host(String agentId) {
247
280
tags ));
248
281
local = local .toBuilder ()
249
282
.setIncarnation (local .getIncarnation () + 1 )
250
- .addAgentId (agentId ) // deprecate since 3.3.3
251
283
.putAgent (agentId , agentEndpoint .getIncarnation ())
252
284
.build ();
253
285
join (local );
@@ -265,8 +297,6 @@ public CompletableFuture<Void> stopHosting(String agentId) {
265
297
synchronized (this ) {
266
298
local = local .toBuilder ()
267
299
.setIncarnation (local .getIncarnation () + 1 )
268
- .clearAgentId ()
269
- .addAllAgentId (agentMap .keySet ()) // deprecate since 3.3.3
270
300
.clearAgent ()
271
301
.putAllAgent (Maps .transformValues (agentMap , a -> a .local ().getIncarnation ()))
272
302
.build ();
@@ -279,7 +309,12 @@ public CompletableFuture<Void> stopHosting(String agentId) {
279
309
280
310
@ Override
281
311
public Observable <Map <HostEndpoint , Set <String >>> landscape () {
282
- return membershipSubject .map (m -> Maps .transformValues (m , v -> Sets .newHashSet (v .getAgentIdList ())));
312
+ return membershipSubject .map (m -> Maps .transformValues (m , v -> v .getAgentMap ().keySet ()));
313
+ }
314
+
315
+ @ Override
316
+ public Observable <Long > refuteSignal () {
317
+ return refuteSubject ;
283
318
}
284
319
285
320
private Map <HostEndpoint , HostMember > currentMembers () {
@@ -327,6 +362,9 @@ private void handleMessage(ClusterMessage message) {
327
362
case QUIT -> handleQuit (message .getQuit ());
328
363
case FAIL -> handleFail (message .getFail ());
329
364
case DOUBT -> handleDoubt (message .getDoubt ());
365
+ default -> {
366
+ // never happen
367
+ }
330
368
}
331
369
}
332
370
@@ -363,15 +401,15 @@ private void handleFail(Fail fail) {
363
401
} else if (isZombie (failedEndpoint )) {
364
402
clearZombie (failedEndpoint );
365
403
} else {
366
- drop (failedEndpoint , fail .getIncarnation ());
404
+ drop (failedEndpoint , fail .getIncarnation (), false );
367
405
}
368
406
}
369
407
370
408
private void handleQuit (Quit quit ) {
371
409
HostEndpoint quitEndpoint = quit .getEndpoint ();
372
410
log .debug ("Member[{}] quits the cluster" , quitEndpoint );
373
411
if (!quitEndpoint .equals (local .getEndpoint ()) && !isZombie (quitEndpoint )) {
374
- drop (quitEndpoint , quit .getIncarnation ());
412
+ drop (quitEndpoint , quit .getIncarnation (), true );
375
413
}
376
414
}
377
415
@@ -388,7 +426,7 @@ private void handleDoubt(Doubt doubt) {
388
426
389
427
private void clearZombie (HostEndpoint zombieEndpoint ) {
390
428
// drop zombie if any, and broadcast a quit on behalf of it
391
- drop (zombieEndpoint , Integer .MAX_VALUE );
429
+ drop (zombieEndpoint , Integer .MAX_VALUE , false );
392
430
messenger .spread (ClusterMessage .newBuilder ()
393
431
.setQuit (Quit .newBuilder ().setEndpoint (zombieEndpoint ).setIncarnation (Integer .MAX_VALUE ).build ())
394
432
.build ());
0 commit comments