Skip to content

Commit 53eacd5

Browse files
authored
feat: support reliable triple stream (#15712)
* feat: support reliable triple stream * code format * code format * fix log error * fix log error * fix log error * fix ci * fix ci
1 parent 9355b35 commit 53eacd5

28 files changed

+6897
-77
lines changed

dubbo-common/src/main/java/org/apache/dubbo/common/utils/NetUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,12 @@ public static String getIpByHost(String hostName) {
647647
}
648648

649649
public static String toAddressString(InetSocketAddress address) {
650-
return address.getAddress().getHostAddress() + ":" + address.getPort();
650+
if (address == null) {
651+
return null;
652+
}
653+
InetAddress inetAddress = address.getAddress();
654+
String host = inetAddress != null ? inetAddress.getHostAddress() : address.getHostString();
655+
return host + ":" + address.getPort();
651656
}
652657

653658
public static InetSocketAddress toAddress(String address) {

dubbo-common/src/test/java/org/apache/dubbo/common/utils/NetUtilsTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ void testToAddressString() {
177177
assertThat(NetUtils.toAddressString(socketAddress), equalTo("dubbo:1234"));
178178
}
179179

180+
@Test
181+
void testToAddressStringWhenUnresolved() {
182+
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved("dubbo-unresolved", 4321);
183+
assertThat(NetUtils.toAddressString(socketAddress), equalTo("dubbo-unresolved:4321"));
184+
}
185+
180186
@Test
181187
void testToAddress() {
182188
InetSocketAddress address = NetUtils.toAddress("localhost:1234");

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.dubbo.rpc.support.RpcUtils;
6262

6363
import java.util.Arrays;
64+
import java.util.Map;
6465
import java.util.Objects;
6566
import java.util.Set;
6667
import java.util.concurrent.CompletableFuture;
@@ -337,6 +338,16 @@ RequestMetadata createRequest(MethodDescriptor methodDescriptor, Invocation invo
337338
application = (String) invocation.getObjectAttachmentWithoutConvert(CommonConstants.REMOTE_APPLICATION_KEY);
338339
}
339340
meta.application = application;
341+
342+
// Add reliability negotiation for streaming calls (disabled by default for compatibility)
343+
boolean reliabilityEnabled = getUrl().getParameter("stream.reliability.enabled", false);
344+
if (reliabilityEnabled && methodDescriptor.getRpcType() != UNARY) {
345+
Map<String, Object> attachments = invocation.getObjectAttachments();
346+
attachments.put("tri-reliable-version", "1.0");
347+
attachments.put("tri-session-id", java.util.UUID.randomUUID().toString());
348+
attachments.put("tri-stream-cap", "ack,heartbeat,retry");
349+
}
350+
340351
meta.attachments = invocation.getObjectAttachments();
341352
return meta;
342353
}

dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java

Lines changed: 141 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_SERIALIZE_TRIPLE;
4242
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_STREAM_LISTENER;
4343

44-
public class TripleClientCall implements ClientCall, ClientStream.Listener {
44+
public class TripleClientCall
45+
implements ClientCall, ClientStream.Listener, org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext {
4546
private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(TripleClientCall.class);
4647
private final AbstractConnectionClient connectionClient;
4748
private final Executor executor;
@@ -253,6 +254,19 @@ public StreamObserver<Object> start(RequestMetadata metadata, ClientCall.Listene
253254
this.requestMetadata = metadata;
254255
this.listener = responseListener;
255256
this.stream = stream;
257+
258+
// Initialize reliability if the stream supports it
259+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.AbstractTripleClientStream) {
260+
org.apache.dubbo.rpc.protocol.tri.stream.AbstractTripleClientStream reliableStream =
261+
(org.apache.dubbo.rpc.protocol.tri.stream.AbstractTripleClientStream) stream;
262+
reliableStream.initializeReliability(metadata, connectionClient.getUrl());
263+
264+
// Set reconnection manager for real reconnection capability
265+
// Use the existing transport listener to maintain consistency across reconnection
266+
reliableStream.setReconnectionManager(new TripleReconnectionManager(
267+
connectionClient, this, reliableStream.getTransportListener()));
268+
}
269+
256270
return new ClientCallToObserverAdapter<>(this);
257271
}
258272
}
@@ -268,4 +282,130 @@ public boolean isAutoRequest() {
268282
public void setAutoRequest(boolean autoRequest) {
269283
this.autoRequest = autoRequest;
270284
}
285+
286+
// ========== ReliabilityContext Implementation ==========
287+
288+
@Override
289+
public String getState() {
290+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
291+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getState();
292+
}
293+
return "DISABLED";
294+
}
295+
296+
@Override
297+
public long getLastAckedSeq() {
298+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
299+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getLastAckedSeq();
300+
}
301+
return -1;
302+
}
303+
304+
@Override
305+
public int getPendingCount() {
306+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
307+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getPendingCount();
308+
}
309+
return 0;
310+
}
311+
312+
@Override
313+
public int getInFlightCount() {
314+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
315+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getInFlightCount();
316+
}
317+
return 0;
318+
}
319+
320+
@Override
321+
public String getSessionId() {
322+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
323+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getSessionId();
324+
}
325+
return null;
326+
}
327+
328+
@Override
329+
public long getTotalSentCount() {
330+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
331+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getTotalSentCount();
332+
}
333+
return 0;
334+
}
335+
336+
@Override
337+
public long getTotalRetryCount() {
338+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
339+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getTotalRetryCount();
340+
}
341+
return 0;
342+
}
343+
344+
@Override
345+
public boolean isConnectionActive() {
346+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
347+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).isConnectionActive();
348+
}
349+
return false;
350+
}
351+
352+
@Override
353+
public java.util.Map<String, Object> getStatistics() {
354+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
355+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getStatistics();
356+
}
357+
return new java.util.HashMap<>();
358+
}
359+
360+
@Override
361+
public void onStateChange(java.util.function.Consumer<String> callback) {
362+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
363+
((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).onStateChange(callback);
364+
}
365+
}
366+
367+
@Override
368+
public void onRecovery(java.util.function.Consumer<String> callback) {
369+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
370+
((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).onRecovery(callback);
371+
}
372+
}
373+
374+
@Override
375+
public void onRetry(java.util.function.Consumer<Long> callback) {
376+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
377+
((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).onRetry(callback);
378+
}
379+
}
380+
381+
@Override
382+
public boolean retryMessage(long sequence) {
383+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
384+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).retryMessage(sequence);
385+
}
386+
return false;
387+
}
388+
389+
@Override
390+
public boolean triggerRecovery() {
391+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
392+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).triggerRecovery();
393+
}
394+
return false;
395+
}
396+
397+
@Override
398+
public void setInFlightLimit(int limit) {
399+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
400+
((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).setInFlightLimit(limit);
401+
}
402+
}
403+
404+
@Override
405+
public org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityConfig getRetryConfig() {
406+
if (stream instanceof org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) {
407+
return ((org.apache.dubbo.rpc.protocol.tri.stream.ReliabilityContext) stream).getRetryConfig();
408+
}
409+
return null;
410+
}
271411
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.dubbo.rpc.protocol.tri.call;
18+
19+
import org.apache.dubbo.common.constants.LoggerCodeConstants;
20+
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
21+
import org.apache.dubbo.common.logger.LoggerFactory;
22+
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
23+
import org.apache.dubbo.rpc.protocol.tri.command.CreateStreamQueueCommand;
24+
import org.apache.dubbo.rpc.protocol.tri.stream.ReconnectionManager;
25+
import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
26+
import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
27+
import org.apache.dubbo.rpc.protocol.tri.transport.TripleCommandOutBoundHandler;
28+
import org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
29+
30+
import java.util.concurrent.CompletableFuture;
31+
32+
import io.netty.channel.Channel;
33+
import io.netty.channel.ChannelHandlerContext;
34+
import io.netty.channel.ChannelInboundHandlerAdapter;
35+
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
36+
37+
/**
38+
* Implementation of ReconnectionManager that uses AbstractConnectionClient for reconnection.
39+
* This provides real reconnection capability by leveraging existing connection management.
40+
*/
41+
public class TripleReconnectionManager implements ReconnectionManager {
42+
43+
private static final ErrorTypeAwareLogger LOGGER =
44+
LoggerFactory.getErrorTypeAwareLogger(TripleReconnectionManager.class);
45+
46+
private final AbstractConnectionClient connectionClient;
47+
private final TripleClientCall clientCall;
48+
private final H2TransportListener transportListener;
49+
50+
public TripleReconnectionManager(
51+
AbstractConnectionClient connectionClient,
52+
TripleClientCall clientCall,
53+
H2TransportListener transportListener) {
54+
this.connectionClient = connectionClient;
55+
this.clientCall = clientCall;
56+
this.transportListener = transportListener;
57+
}
58+
59+
@Override
60+
public CompletableFuture<Boolean> attemptReconnection() {
61+
return CompletableFuture.supplyAsync(() -> {
62+
try {
63+
LOGGER.info("Attempting reconnection using connection client: {}", connectionClient);
64+
65+
// Force get a new channel, which will trigger reconnection if needed
66+
Object channel = connectionClient.getChannel(true);
67+
68+
boolean success = (channel != null) && isConnectionActive();
69+
70+
if (success) {
71+
LOGGER.info("Reconnection successful for connection client: {}", connectionClient);
72+
} else {
73+
LOGGER.warn(
74+
LoggerCodeConstants.INTERNAL_ERROR,
75+
"",
76+
"",
77+
"Reconnection failed for connection client: " + connectionClient);
78+
}
79+
80+
return success;
81+
} catch (Exception e) {
82+
LOGGER.error(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Error during reconnection attempt", e);
83+
return false;
84+
}
85+
});
86+
}
87+
88+
@Override
89+
public boolean isConnectionActive() {
90+
try {
91+
return connectionClient.isConnected();
92+
} catch (Exception e) {
93+
LOGGER.debug("Error checking connection status", e);
94+
return false;
95+
}
96+
}
97+
98+
@Override
99+
public Object getNewStreamChannel() {
100+
try {
101+
// Get the active channel, this should be the new channel after reconnection
102+
return connectionClient.getChannel(true);
103+
} catch (Exception e) {
104+
LOGGER.error(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Error getting new stream channel", e);
105+
return null;
106+
}
107+
}
108+
109+
@Override
110+
public TripleStreamChannelFuture createStreamFuture() {
111+
try {
112+
LOGGER.info("Creating new TripleStreamChannelFuture for reconnection");
113+
114+
// Force get a new channel, which will trigger reconnection if needed
115+
Object channel = connectionClient.getChannel(true);
116+
117+
if (channel != null && isConnectionActive()) {
118+
// Create a new Http2StreamChannel using the same mechanism as initial connection
119+
Channel parentChannel = (Channel) channel;
120+
121+
// Create Http2StreamChannelBootstrap to open a new stream channel
122+
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parentChannel);
123+
124+
// Set up the handler to configure the pipeline when the stream channel is created
125+
bootstrap.handler(new ChannelInboundHandlerAdapter() {
126+
@Override
127+
public void handlerAdded(ChannelHandlerContext ctx) {
128+
ctx.channel()
129+
.pipeline()
130+
.addLast(new TripleCommandOutBoundHandler())
131+
.addLast(new TripleHttp2ClientResponseHandler(transportListener));
132+
}
133+
});
134+
135+
// Create the future that will be completed when the stream channel is ready
136+
TripleStreamChannelFuture streamChannelFuture = new TripleStreamChannelFuture(parentChannel);
137+
138+
// Use CreateStreamQueueCommand to create the actual Http2StreamChannel
139+
CreateStreamQueueCommand createCommand =
140+
CreateStreamQueueCommand.create(bootstrap, streamChannelFuture);
141+
142+
// IMPROVED: Execute the creation command and add better error handling
143+
if (parentChannel.eventLoop().inEventLoop()) {
144+
// If we're already in the event loop, execute directly
145+
createCommand.run(parentChannel);
146+
} else {
147+
// Execute in the channel's event loop to ensure thread safety
148+
parentChannel.eventLoop().execute(() -> createCommand.run(parentChannel));
149+
}
150+
151+
// Return the future - caller is now responsible for waiting for completion
152+
LOGGER.info("Started async creation of new Http2StreamChannel for reconnection, "
153+
+ "future will be completed when stream is ready");
154+
return streamChannelFuture;
155+
156+
} else {
157+
LOGGER.warn(
158+
LoggerCodeConstants.INTERNAL_ERROR,
159+
"",
160+
"",
161+
"Failed to create TripleStreamChannelFuture - no active channel available");
162+
return null;
163+
}
164+
} catch (Exception e) {
165+
LOGGER.error(
166+
LoggerCodeConstants.INTERNAL_ERROR,
167+
"",
168+
"",
169+
"Error creating new TripleStreamChannelFuture for reconnection",
170+
e);
171+
return null;
172+
}
173+
}
174+
}

0 commit comments

Comments
 (0)