Skip to content

Commit 58fa2bf

Browse files
committed
Refactoring the internal keep alive mechanism for persistent session, completely eliminated the periodic touch operation
1 parent 5504f17 commit 58fa2bf

File tree

43 files changed

+396
-1121
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+396
-1121
lines changed

bifromq-inbox/bifromq-inbox-client/src/main/java/com/baidu/bifromq/inbox/client/IInboxClient.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import com.baidu.bifromq.inbox.rpc.proto.SendLWTRequest;
3535
import com.baidu.bifromq.inbox.rpc.proto.SubReply;
3636
import com.baidu.bifromq.inbox.rpc.proto.SubRequest;
37-
import com.baidu.bifromq.inbox.rpc.proto.TouchReply;
38-
import com.baidu.bifromq.inbox.rpc.proto.TouchRequest;
3937
import com.baidu.bifromq.inbox.rpc.proto.UnsubReply;
4038
import com.baidu.bifromq.inbox.rpc.proto.UnsubRequest;
4139
import com.baidu.bifromq.inbox.storage.proto.Fetched;
@@ -62,8 +60,6 @@ default int id() {
6260

6361
CompletableFuture<DetachReply> detach(DetachRequest request);
6462

65-
CompletableFuture<TouchReply> touch(TouchRequest request);
66-
6763
CompletableFuture<SubReply> sub(SubRequest request);
6864

6965
CompletableFuture<UnsubReply> unsub(UnsubRequest request);

bifromq-inbox/bifromq-inbox-client/src/main/java/com/baidu/bifromq/inbox/client/InboxClient.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import com.baidu.bifromq.inbox.rpc.proto.SendLWTRequest;
3838
import com.baidu.bifromq.inbox.rpc.proto.SubReply;
3939
import com.baidu.bifromq.inbox.rpc.proto.SubRequest;
40-
import com.baidu.bifromq.inbox.rpc.proto.TouchReply;
41-
import com.baidu.bifromq.inbox.rpc.proto.TouchRequest;
4240
import com.baidu.bifromq.inbox.rpc.proto.UnsubReply;
4341
import com.baidu.bifromq.inbox.rpc.proto.UnsubRequest;
4442
import com.baidu.bifromq.plugin.subbroker.CheckReply;
@@ -155,17 +153,6 @@ public CompletableFuture<DetachReply> detach(DetachRequest request) {
155153
});
156154
}
157155

158-
@Override
159-
public CompletableFuture<TouchReply> touch(TouchRequest request) {
160-
return rpcClient.invoke(request.getTenantId(), null, request, InboxServiceGrpc.getTouchMethod())
161-
.exceptionally(e -> {
162-
log.debug("Touch inbox failed", e);
163-
return TouchReply.newBuilder()
164-
.setReqId(request.getReqId())
165-
.setCode(TouchReply.Code.ERROR).build();
166-
});
167-
}
168-
169156
@Override
170157
public CompletableFuture<SubReply> sub(SubRequest request) {
171158
return rpcClient.invoke(request.getTenantId(), null, request, InboxServiceGrpc.getSubMethod())

bifromq-inbox/bifromq-inbox-client/src/test/java/com.baidu.bifromq.inbox.client/InboxClientTest.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@
3838
import com.baidu.bifromq.inbox.rpc.proto.GetRequest;
3939
import com.baidu.bifromq.inbox.rpc.proto.SubReply;
4040
import com.baidu.bifromq.inbox.rpc.proto.SubRequest;
41-
import com.baidu.bifromq.inbox.rpc.proto.TouchReply;
42-
import com.baidu.bifromq.inbox.rpc.proto.TouchRequest;
4341
import com.baidu.bifromq.inbox.rpc.proto.UnsubReply;
4442
import com.baidu.bifromq.inbox.rpc.proto.UnsubRequest;
4543
import com.baidu.bifromq.plugin.subbroker.CheckReply;
@@ -180,17 +178,6 @@ public void unsubRPCException() {
180178
assertEquals(reply.getCode(), UnsubReply.Code.ERROR);
181179
}
182180

183-
@Test
184-
public void touchRPCException() {
185-
TouchRequest touchRequest = TouchRequest.newBuilder().setTenantId("TenantId").build();
186-
187-
when(rpcClient.invoke(anyString(), isNull(), any(), any()))
188-
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("Mocked")));
189-
TouchReply reply = inboxClient.touch(touchRequest).join();
190-
verify(rpcClient).invoke(eq(touchRequest.getTenantId()), isNull(), eq(touchRequest), any());
191-
assertEquals(reply.getCode(), TouchReply.Code.ERROR);
192-
}
193-
194181
@Test
195182
public void expireRPCException() {
196183
ExpireRequest expireRequest = ExpireRequest.newBuilder().setTenantId("TenantId").build();

bifromq-inbox/bifromq-inbox-coproc-proto/src/main/proto/inboxservice/InboxStoreCoProc.proto

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ message BatchCreateRequest{
6868
uint64 now = 9;
6969
}
7070
repeated Params params = 1;
71-
Replica leader = 2; // the leader replica
7271
}
7372

7473
message BatchCreateReply{
@@ -108,7 +107,6 @@ message BatchDetachRequest{
108107
uint32 expirySeconds = 5;
109108
bool discardLWT = 6;
110109
uint64 now = 7;
111-
optional Replica sender = 8; // the replica on which the detach task is running to trigger the detach request
112110
}
113111
repeated Params params = 1;
114112
Replica leader = 2; // the leader replica
@@ -123,26 +121,6 @@ message BatchDetachReply{
123121
repeated Code code = 1;
124122
}
125123

126-
message BatchTouchRequest{
127-
message Params{
128-
string tenantId = 1;
129-
string inboxId = 2;
130-
uint64 incarnation = 3;
131-
uint64 version = 4;
132-
uint64 now = 5;
133-
}
134-
repeated Params params = 1;
135-
}
136-
137-
message BatchTouchReply{
138-
enum Code{
139-
OK = 0;
140-
NO_INBOX = 1;
141-
CONFLICT = 2;
142-
}
143-
repeated Code code = 1;
144-
}
145-
146124
message BatchDeleteRequest{
147125
message Params{
148126
string tenantId = 1;
@@ -177,7 +155,6 @@ message BatchSubRequest{
177155
uint64 now = 7;
178156
}
179157
repeated Params params = 1;
180-
Replica leader = 2; // the leader replica
181158
}
182159

183160
message BatchSubReply{
@@ -201,7 +178,6 @@ message BatchUnsubRequest{
201178
uint64 now = 6;
202179
}
203180
repeated Params params = 1;
204-
Replica leader = 2; // the leader replica
205181
}
206182

207183
message BatchUnsubReply{
@@ -278,7 +254,6 @@ message BatchCommitRequest{
278254
uint64 now = 8;
279255
}
280256
repeated Params params = 1;
281-
Replica leader = 2; // the leader replica
282257
}
283258

284259
message BatchCommitReply{
@@ -374,10 +349,9 @@ message InboxServiceROCoProcInput{
374349
BatchGetRequest batchGet = 2;
375350
BatchFetchRequest batchFetch = 3;
376351
BatchCheckSubRequest batchCheckSub = 4;
377-
BatchTouchRequest batchTouch = 5;
378-
BatchSendLWTRequest batchSendLWT = 6;
379-
ExpireTenantRequest expireTenant = 7;
380-
GCRequest gc = 8;
352+
BatchSendLWTRequest batchSendLWT = 5;
353+
ExpireTenantRequest expireTenant = 6;
354+
GCRequest gc = 7;
381355
}
382356
}
383357

@@ -387,9 +361,8 @@ message InboxServiceROCoProcOutput{
387361
BatchGetReply batchGet = 2;
388362
BatchFetchReply batchFetch = 3;
389363
BatchCheckSubReply batchCheckSub = 4;
390-
BatchTouchReply batchTouch = 5;
391-
BatchSendLWTReply batchSendLWT = 6;
392-
ExpireTenantReply expireTenant = 7;
393-
GCReply gc = 8;
364+
BatchSendLWTReply batchSendLWT = 5;
365+
ExpireTenantReply expireTenant = 6;
366+
GCReply gc = 7;
394367
}
395368
}

bifromq-inbox/bifromq-inbox-rpc-definition/src/main/java/com/baidu/bifromq/inbox/RPCBluePrint.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.baidu.bifromq.inbox.rpc.proto.InboxServiceGrpc;
2727
import com.baidu.bifromq.inbox.rpc.proto.SendLWTRequest;
2828
import com.baidu.bifromq.inbox.rpc.proto.SubRequest;
29-
import com.baidu.bifromq.inbox.rpc.proto.TouchRequest;
3029
import com.baidu.bifromq.inbox.rpc.proto.UnsubRequest;
3130
import com.baidu.bifromq.plugin.subbroker.CheckRequest;
3231

@@ -51,8 +50,6 @@ public class RPCBluePrint {
5150
.keyHashFunc(request -> getDelivererKey(request.getTenantId(), request.getInboxId())).build())
5251
.methodSemantic(InboxServiceGrpc.getCreateMethod(), BluePrint.WCHUnaryMethod.<CreateRequest>builder()
5352
.keyHashFunc(request -> getDelivererKey(request.getClient().getTenantId(), request.getInboxId())).build())
54-
.methodSemantic(InboxServiceGrpc.getTouchMethod(), BluePrint.WCHUnaryMethod.<TouchRequest>builder()
55-
.keyHashFunc(request -> getDelivererKey(request.getTenantId(), request.getInboxId())).build())
5653
.methodSemantic(InboxServiceGrpc.getSubMethod(), BluePrint.WCHUnaryMethod.<SubRequest>builder()
5754
.keyHashFunc(request -> getDelivererKey(request.getTenantId(), request.getInboxId())).build())
5855
.methodSemantic(InboxServiceGrpc.getUnsubMethod(), BluePrint.WCHUnaryMethod.<UnsubRequest>builder()

bifromq-inbox/bifromq-inbox-rpc-definition/src/main/proto/inboxservice/InboxService.proto

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ service InboxService {
1818
rpc attach (AttachRequest) returns (AttachReply);
1919
rpc detach (DetachRequest) returns (DetachReply);
2020
rpc create (CreateRequest) returns (CreateReply);
21-
rpc touch (TouchRequest) returns (TouchReply);
2221
rpc sub(SubRequest) returns (SubReply);
2322
rpc unsub(UnsubRequest) returns (UnsubReply);
2423
rpc receive (stream SendRequest) returns (stream SendReply);
@@ -85,7 +84,6 @@ message DetachRequest {
8584
bool discardLWT = 6;
8685
commontype.ClientInfo client = 7;
8786
uint64 now = 8;
88-
optional Replica sender = 9; // if the request is triggered by detach task on the replica
8987
}
9088

9189
message DetachReply {
@@ -125,27 +123,6 @@ message CreateReply {
125123
Code code = 2;
126124
}
127125

128-
message TouchRequest {
129-
uint64 reqId = 1;
130-
string tenantId = 2;
131-
string inboxId = 3;
132-
uint64 incarnation = 4;
133-
uint64 version = 5;
134-
uint64 now = 6;
135-
}
136-
137-
message TouchReply {
138-
enum Code {
139-
OK = 0;
140-
NO_INBOX = 1;
141-
CONFLICT = 2;
142-
TRY_LATER = 3;
143-
ERROR = 4;
144-
}
145-
uint64 reqId = 1;
146-
Code code = 2;
147-
}
148-
149126
message SubRequest {
150127
uint64 reqId = 1;
151128
string tenantId = 2;

bifromq-inbox/bifromq-inbox-server/src/main/java/com/baidu/bifromq/inbox/server/InboxServer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.baidu.bifromq.inbox.server.scheduler.InboxInsertScheduler;
2727
import com.baidu.bifromq.inbox.server.scheduler.InboxSendLWTScheduler;
2828
import com.baidu.bifromq.inbox.server.scheduler.InboxSubScheduler;
29-
import com.baidu.bifromq.inbox.server.scheduler.InboxTouchScheduler;
3029
import com.baidu.bifromq.inbox.server.scheduler.InboxUnSubScheduler;
3130
import com.google.common.util.concurrent.MoreExecutors;
3231
import io.micrometer.core.instrument.Metrics;
@@ -59,7 +58,6 @@ class InboxServer implements IInboxServer {
5958
.deleteScheduler(new InboxDeleteScheduler(builder.inboxStoreClient))
6059
.subScheduler(new InboxSubScheduler(builder.inboxStoreClient))
6160
.unsubScheduler(new InboxUnSubScheduler(builder.inboxStoreClient))
62-
.touchScheduler(new InboxTouchScheduler(builder.inboxStoreClient))
6361
.tenantGCRunner(new TenantGCRunner(builder.inboxStoreClient))
6462
.build();
6563
if (builder.workerThreads == 0) {

bifromq-inbox/bifromq-inbox-server/src/main/java/com/baidu/bifromq/inbox/server/InboxService.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@
4747
import com.baidu.bifromq.inbox.rpc.proto.SendRequest;
4848
import com.baidu.bifromq.inbox.rpc.proto.SubReply;
4949
import com.baidu.bifromq.inbox.rpc.proto.SubRequest;
50-
import com.baidu.bifromq.inbox.rpc.proto.TouchReply;
51-
import com.baidu.bifromq.inbox.rpc.proto.TouchRequest;
5250
import com.baidu.bifromq.inbox.rpc.proto.UnsubReply;
5351
import com.baidu.bifromq.inbox.rpc.proto.UnsubRequest;
5452
import com.baidu.bifromq.inbox.server.scheduler.IInboxAttachScheduler;
@@ -62,7 +60,6 @@
6260
import com.baidu.bifromq.inbox.server.scheduler.IInboxInsertScheduler;
6361
import com.baidu.bifromq.inbox.server.scheduler.IInboxSendLWTScheduler;
6462
import com.baidu.bifromq.inbox.server.scheduler.IInboxSubScheduler;
65-
import com.baidu.bifromq.inbox.server.scheduler.IInboxTouchScheduler;
6663
import com.baidu.bifromq.inbox.server.scheduler.IInboxUnsubScheduler;
6764
import com.baidu.bifromq.inbox.storage.proto.TopicFilterOption;
6865
import com.baidu.bifromq.plugin.subbroker.CheckReply;
@@ -87,7 +84,6 @@ class InboxService extends InboxServiceGrpc.InboxServiceImplBase {
8784
private final IInboxCheckSubScheduler checkSubScheduler;
8885
private final IInboxInsertScheduler insertScheduler;
8986
private final IInboxCommitScheduler commitScheduler;
90-
private final IInboxTouchScheduler touchScheduler;
9187
private final IInboxCreateScheduler createScheduler;
9288
private final IInboxAttachScheduler attachScheduler;
9389
private final IInboxDetachScheduler detachScheduler;
@@ -111,7 +107,6 @@ class InboxService extends InboxServiceGrpc.InboxServiceImplBase {
111107
IInboxDeleteScheduler deleteScheduler,
112108
IInboxSubScheduler subScheduler,
113109
IInboxUnsubScheduler unsubScheduler,
114-
IInboxTouchScheduler touchScheduler,
115110
ITenantGCRunner tenantGCRunner) {
116111
this.inboxClient = inboxClient;
117112
this.distClient = distClient;
@@ -127,7 +122,6 @@ class InboxService extends InboxServiceGrpc.InboxServiceImplBase {
127122
this.deleteScheduler = deleteScheduler;
128123
this.subScheduler = subScheduler;
129124
this.unsubScheduler = unsubScheduler;
130-
this.touchScheduler = touchScheduler;
131125
this.tenantGCRunner = tenantGCRunner;
132126
}
133127

@@ -225,26 +219,6 @@ public void detach(DetachRequest request, StreamObserver<DetachReply> responseOb
225219
}), responseObserver);
226220
}
227221

228-
@Override
229-
public void touch(TouchRequest request, StreamObserver<TouchReply> responseObserver) {
230-
log.trace("Handling touch {}", request);
231-
response(tenantId -> touchScheduler.schedule(request)
232-
.exceptionally(e -> {
233-
if (e instanceof BatcherUnavailableException || e.getCause() instanceof BatcherUnavailableException) {
234-
return TouchReply.newBuilder()
235-
.setReqId(request.getReqId())
236-
.setCode(TouchReply.Code.TRY_LATER)
237-
.build();
238-
} else {
239-
log.debug("Failed to touch inbox", e);
240-
return TouchReply.newBuilder()
241-
.setReqId(request.getReqId())
242-
.setCode(TouchReply.Code.ERROR)
243-
.build();
244-
}
245-
}), responseObserver);
246-
}
247-
248222
@Override
249223
public void sub(SubRequest request, StreamObserver<SubReply> responseObserver) {
250224
log.trace("Handling sub {}", request);
@@ -612,7 +586,6 @@ public void stop() {
612586
detachScheduler.close();
613587
deleteScheduler.close();
614588
createScheduler.close();
615-
touchScheduler.close();
616589
subScheduler.close();
617590
unsubScheduler.close();
618591

bifromq-inbox/bifromq-inbox-server/src/main/java/com/baidu/bifromq/inbox/server/scheduler/BatchCommitCall.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.baidu.bifromq.inbox.rpc.proto.CommitRequest;
2929
import com.baidu.bifromq.inbox.storage.proto.BatchCommitRequest;
3030
import com.baidu.bifromq.inbox.storage.proto.InboxServiceRWCoProcInput;
31-
import com.baidu.bifromq.inbox.storage.proto.Replica;
3231
import java.util.HashSet;
3332
import java.util.Queue;
3433
import java.util.Set;
@@ -46,11 +45,7 @@ protected MutationCallTaskBatch<CommitRequest, CommitReply> newBatch(long ver) {
4645
@Override
4746
protected RWCoProcInput makeBatch(
4847
Iterable<ICallTask<CommitRequest, CommitReply, MutationCallBatcherKey>> callTasks) {
49-
BatchCommitRequest.Builder reqBuilder = BatchCommitRequest.newBuilder()
50-
.setLeader(Replica.newBuilder()
51-
.setRangeId(batcherKey.id)
52-
.setStoreId(batcherKey.leaderStoreId)
53-
.build());
48+
BatchCommitRequest.Builder reqBuilder = BatchCommitRequest.newBuilder();
5449
callTasks.forEach(call -> {
5550
CommitRequest req = call.call();
5651
BatchCommitRequest.Params.Builder paramsBuilder = BatchCommitRequest.Params.newBuilder()

bifromq-inbox/bifromq-inbox-server/src/main/java/com/baidu/bifromq/inbox/server/scheduler/BatchCreateCall.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.baidu.bifromq.inbox.rpc.proto.CreateRequest;
2929
import com.baidu.bifromq.inbox.storage.proto.BatchCreateRequest;
3030
import com.baidu.bifromq.inbox.storage.proto.InboxServiceRWCoProcInput;
31-
import com.baidu.bifromq.inbox.storage.proto.Replica;
3231
import com.baidu.bifromq.type.ClientInfo;
3332
import java.util.HashSet;
3433
import java.util.Queue;
@@ -48,11 +47,7 @@ protected MutationCallTaskBatch<CreateRequest, CreateReply> newBatch(long ver) {
4847
@Override
4948
protected RWCoProcInput makeBatch(
5049
Iterable<ICallTask<CreateRequest, CreateReply, MutationCallBatcherKey>> callTasks) {
51-
BatchCreateRequest.Builder reqBuilder = BatchCreateRequest.newBuilder()
52-
.setLeader(Replica.newBuilder()
53-
.setRangeId(batcherKey.id)
54-
.setStoreId(batcherKey.leaderStoreId)
55-
.build());
50+
BatchCreateRequest.Builder reqBuilder = BatchCreateRequest.newBuilder();
5651
callTasks.forEach(call -> {
5752
CreateRequest request = call.call();
5853
ClientInfo client = request.getClient();

0 commit comments

Comments
 (0)