Skip to content

Commit 78c4220

Browse files
committed
Code de-dup: NettyUtils
1 parent 0d90e5f commit 78c4220

File tree

13 files changed

+222
-239
lines changed

13 files changed

+222
-239
lines changed

base-cluster/src/main/java/com/baidu/bifromq/basecluster/transport/NettyUtil.java

Lines changed: 0 additions & 79 deletions
This file was deleted.

base-cluster/src/main/java/com/baidu/bifromq/basecluster/transport/TCPTransport.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.baidu.bifromq.basecluster.transport;
1515

1616
import com.baidu.bifromq.basecluster.transport.proto.Packet;
17+
import com.baidu.bifromq.baseenv.NettyEnv;
1718
import com.baidu.bifromq.basehlc.HLC;
1819
import com.github.benmanes.caffeine.cache.Caffeine;
1920
import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -77,6 +78,7 @@ public final class TCPTransport extends AbstractTransport {
7778
private final AtomicInteger nextChannelKey = new AtomicInteger(0);
7879
private final Bootstrap clientBootstrap;
7980
private final ChannelFuture tcpListeningChannel;
81+
8082
@Builder
8183
TCPTransport(@NonNull String env, InetSocketAddress bindAddr, SslContext serverSslContext,
8284
SslContext clientSslContext, TCPTransportOptions opts) {
@@ -87,7 +89,7 @@ public final class TCPTransport extends AbstractTransport {
8789
"maxBufferSizeInBytes must be a positive number");
8890
Preconditions.checkArgument(opts.maxChannelsPerHost > 0, "maxChannelsPerHost must be a positive number");
8991
this.opts = opts.toBuilder().build();
90-
elg = NettyUtil.getEventLoopGroup(4, "cluster-tcp-transport");
92+
elg = NettyEnv.createEventLoopGroup(4, "cluster-tcp-transport");
9193
clientBootstrap = setupTcpClient(clientSslContext);
9294
tcpListeningChannel = setupTcpServer(bindAddr, serverSslContext);
9395
InetSocketAddress localAddress = (InetSocketAddress) tcpListeningChannel.channel().localAddress();
@@ -117,7 +119,7 @@ private static void trace(String format, Object... args) {
117119
private Bootstrap setupTcpClient(SslContext sslContext) {
118120
Bootstrap clientBootstrap = new Bootstrap();
119121
clientBootstrap.group(elg)
120-
.channel(NettyUtil.getSocketChannelClass())
122+
.channel(NettyEnv.getSocketChannelClass())
121123
.option(ChannelOption.TCP_NODELAY, true)
122124
.option(ChannelOption.SO_KEEPALIVE, true)
123125
.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
@@ -145,7 +147,7 @@ protected void initChannel(SocketChannel channel) {
145147
private ChannelFuture setupTcpServer(InetSocketAddress serverAddr, SslContext sslContext) {
146148
ServerBootstrap serverBootstrap = new ServerBootstrap();
147149
return serverBootstrap.group(elg)
148-
.channel(NettyUtil.getServerSocketChannelClass())
150+
.channel(NettyEnv.getServerSocketChannelClass())
149151
.childOption(ChannelOption.TCP_NODELAY, true)
150152
.childOption(ChannelOption.SO_KEEPALIVE, true)
151153
.localAddress(serverAddr)

base-cluster/src/main/java/com/baidu/bifromq/basecluster/transport/UDPTransport.java

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@
1313

1414
package com.baidu.bifromq.basecluster.transport;
1515

16-
import static com.baidu.bifromq.basecluster.transport.NettyUtil.getDatagramChannelClass;
17-
import static com.baidu.bifromq.basecluster.transport.NettyUtil.getEventLoopGroup;
18-
1916
import com.baidu.bifromq.basecluster.transport.proto.Packet;
17+
import com.baidu.bifromq.baseenv.NettyEnv;
2018
import com.baidu.bifromq.basehlc.HLC;
2119
import com.google.protobuf.InvalidProtocolBufferException;
2220
import io.micrometer.core.instrument.Counter;
@@ -48,46 +46,23 @@
4846

4947
@Slf4j
5048
public final class UDPTransport extends AbstractTransport {
51-
@ChannelHandler.Sharable
52-
private class Bridger extends SimpleChannelInboundHandler<DatagramPacket> {
53-
54-
@Override
55-
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) {
56-
// ctx.channel().remoteAddress() is null when DatagramChannel is not in 'connected' mode
57-
recvBytes.increment(dp.content().readableBytes());
58-
try {
59-
byte[] data = new byte[dp.content().readableBytes()];
60-
dp.content().readBytes(data);
61-
Packet packet = Packet.parseFrom(data);
62-
transportLatency.record(HLC.INST.getPhysical(packet.getHlc() - HLC.INST.get()));
63-
doReceive(packet, dp.sender(), dp.recipient());
64-
} catch (InvalidProtocolBufferException e) {
65-
log.error("Unable to decode packet, just ignore");
66-
}
67-
}
68-
}
69-
7049
private final Counter sendBytes;
7150
private final Counter recvBytes;
7251
private final DistributionSummary transportLatency;
73-
7452
private final EventLoopGroup elg;
75-
7653
private final Channel channel;
77-
7854
private final Bridger bridger;
7955
private final InetSocketAddress localAddress;
8056

81-
8257
@Builder
8358
UDPTransport(@NonNull String env, InetSocketAddress bindAddr) {
8459
super(env);
8560
try {
8661
bridger = new Bridger();
87-
elg = getEventLoopGroup(4, "cluster-udp-transport");
62+
elg = NettyEnv.createEventLoopGroup(4, "cluster-udp-transport");
8863
Bootstrap bootstrap = new Bootstrap();
8964
channel = bootstrap.group(elg)
90-
.channel(getDatagramChannelClass())
65+
.channel(NettyEnv.getDatagramChannelClass())
9166
.localAddress(bindAddr)
9267
.handler(new ChannelInitializer<DatagramChannel>() {
9368
@Override
@@ -159,4 +134,23 @@ protected Completable doShutdown() {
159134
.subscribe(doneSignal::onComplete);
160135
return doneSignal;
161136
}
137+
138+
@ChannelHandler.Sharable
139+
private class Bridger extends SimpleChannelInboundHandler<DatagramPacket> {
140+
141+
@Override
142+
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) {
143+
// ctx.channel().remoteAddress() is null when DatagramChannel is not in 'connected' mode
144+
recvBytes.increment(dp.content().readableBytes());
145+
try {
146+
byte[] data = new byte[dp.content().readableBytes()];
147+
dp.content().readBytes(data);
148+
Packet packet = Packet.parseFrom(data);
149+
transportLatency.record(HLC.INST.getPhysical(packet.getHlc() - HLC.INST.get()));
150+
doReceive(packet, dp.sender(), dp.recipient());
151+
} catch (InvalidProtocolBufferException e) {
152+
log.error("Unable to decode packet, just ignore");
153+
}
154+
}
155+
}
162156
}

base-env/base-env-provider/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,14 @@
3636
<groupId>io.netty</groupId>
3737
<artifactId>netty-buffer</artifactId>
3838
</dependency>
39+
<dependency>
40+
<groupId>io.netty</groupId>
41+
<artifactId>netty-transport-native-epoll</artifactId>
42+
</dependency>
43+
<dependency>
44+
<groupId>io.netty</groupId>
45+
<artifactId>netty-transport-native-kqueue</artifactId>
46+
</dependency>
3947
<dependency>
4048
<groupId>com.google.guava</groupId>
4149
<artifactId>guava</artifactId>
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright (c) 2023. The BifroMQ Authors. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* Unless required by applicable law or agreed to in writing,
9+
* software distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package com.baidu.bifromq.baseenv;
15+
16+
import io.netty.channel.EventLoopGroup;
17+
import io.netty.channel.epoll.Epoll;
18+
import io.netty.channel.epoll.EpollDatagramChannel;
19+
import io.netty.channel.epoll.EpollEventLoopGroup;
20+
import io.netty.channel.epoll.EpollServerSocketChannel;
21+
import io.netty.channel.epoll.EpollSocketChannel;
22+
import io.netty.channel.kqueue.KQueue;
23+
import io.netty.channel.kqueue.KQueueDatagramChannel;
24+
import io.netty.channel.kqueue.KQueueEventLoopGroup;
25+
import io.netty.channel.kqueue.KQueueServerSocketChannel;
26+
import io.netty.channel.kqueue.KQueueSocketChannel;
27+
import io.netty.channel.nio.NioEventLoopGroup;
28+
import io.netty.channel.socket.DatagramChannel;
29+
import io.netty.channel.socket.ServerSocketChannel;
30+
import io.netty.channel.socket.SocketChannel;
31+
import io.netty.channel.socket.nio.NioDatagramChannel;
32+
import io.netty.channel.socket.nio.NioServerSocketChannel;
33+
import io.netty.channel.socket.nio.NioSocketChannel;
34+
35+
/**
36+
* NettyEnv is a utility class that provides methods to create EventLoopGroup and determine the appropriate channel
37+
* classes.
38+
*/
39+
public class NettyEnv {
40+
/**
41+
* Create an EventLoopGroup based on the availability of Epoll or KQueue.
42+
*
43+
* @return An EventLoopGroup instance.
44+
*/
45+
public static EventLoopGroup createEventLoopGroup(String name) {
46+
IEnvProvider envProvider = EnvProvider.INSTANCE;
47+
if (Epoll.isAvailable()) {
48+
return new EpollEventLoopGroup(envProvider.newThreadFactory(name));
49+
}
50+
if (KQueue.isAvailable()) {
51+
return new KQueueEventLoopGroup(envProvider.newThreadFactory(name));
52+
}
53+
return new NioEventLoopGroup();
54+
}
55+
56+
/**
57+
* Create an EventLoopGroup with a specified number of threads and a custom name.
58+
*
59+
* @param nThreads The number of threads in the EventLoopGroup.
60+
* @param name The name to use for the threads.
61+
*
62+
* @return An EventLoopGroup instance.
63+
*/
64+
public static EventLoopGroup createEventLoopGroup(int nThreads, String name) {
65+
IEnvProvider envProvider = EnvProvider.INSTANCE;
66+
if (Epoll.isAvailable()) {
67+
return new EpollEventLoopGroup(nThreads, envProvider.newThreadFactory(name));
68+
}
69+
if (KQueue.isAvailable()) {
70+
return new KQueueEventLoopGroup(nThreads, envProvider.newThreadFactory(name));
71+
}
72+
return new NioEventLoopGroup(nThreads, envProvider.newThreadFactory(name));
73+
}
74+
75+
/**
76+
* Get the appropriate SocketChannel class based on the availability of Epoll or KQueue.
77+
*
78+
* @return The SocketChannel class.
79+
*/
80+
public static Class<? extends SocketChannel> getSocketChannelClass() {
81+
if (Epoll.isAvailable()) {
82+
return EpollSocketChannel.class;
83+
}
84+
if (KQueue.isAvailable()) {
85+
return KQueueSocketChannel.class;
86+
}
87+
return NioSocketChannel.class;
88+
}
89+
90+
/**
91+
* Get the appropriate ServerSocketChannel class based on the availability of Epoll or KQueue.
92+
*
93+
* @return The ServerSocketChannel class.
94+
*/
95+
public static Class<? extends ServerSocketChannel> getServerSocketChannelClass() {
96+
if (Epoll.isAvailable()) {
97+
return EpollServerSocketChannel.class;
98+
}
99+
if (KQueue.isAvailable()) {
100+
return KQueueServerSocketChannel.class;
101+
}
102+
return NioServerSocketChannel.class;
103+
}
104+
105+
/**
106+
* Determine the appropriate SocketChannel class based on the provided EventLoopGroup.
107+
*
108+
* @param eventLoopGroup The EventLoopGroup to check.
109+
*
110+
* @return The SocketChannel class.
111+
*/
112+
public static Class<? extends SocketChannel> determineSocketChannelClass(EventLoopGroup eventLoopGroup) {
113+
if (eventLoopGroup instanceof EpollEventLoopGroup) {
114+
return EpollSocketChannel.class;
115+
}
116+
if (eventLoopGroup instanceof KQueueEventLoopGroup) {
117+
return KQueueSocketChannel.class;
118+
}
119+
return NioSocketChannel.class;
120+
}
121+
122+
/**
123+
* Determine the appropriate ServerSocketChannel class based on the provided EventLoopGroup.
124+
*
125+
* @param eventLoopGroup The EventLoopGroup to check.
126+
* @return The ServerSocketChannel class.
127+
*/
128+
public static Class<? extends ServerSocketChannel> determineServerSocketChannelClass(
129+
EventLoopGroup eventLoopGroup) {
130+
if (eventLoopGroup instanceof EpollEventLoopGroup) {
131+
return EpollServerSocketChannel.class;
132+
}
133+
if (eventLoopGroup instanceof KQueueEventLoopGroup) {
134+
return KQueueServerSocketChannel.class;
135+
}
136+
return NioServerSocketChannel.class;
137+
}
138+
139+
/**
140+
* Get the appropriate DatagramChannel class based on the availability of Epoll or KQueue.
141+
*
142+
* @return The DatagramChannel class.
143+
*/
144+
public static Class<? extends DatagramChannel> getDatagramChannelClass() {
145+
if (Epoll.isAvailable()) {
146+
return EpollDatagramChannel.class;
147+
}
148+
if (KQueue.isAvailable()) {
149+
return KQueueDatagramChannel.class;
150+
}
151+
return NioDatagramChannel.class;
152+
}
153+
}

base-rpc/base-rpc-client/src/main/java/com/baidu/bifromq/baserpc/client/ClientChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package com.baidu.bifromq.baserpc.client;
1515

16-
import static com.baidu.bifromq.baserpc.utils.NettyUtil.determineSocketChannelClass;
16+
import static com.baidu.bifromq.baseenv.NettyEnv.determineSocketChannelClass;
1717

1818
import com.baidu.bifromq.baseenv.EnvProvider;
1919
import com.baidu.bifromq.baserpc.BluePrint;

0 commit comments

Comments
 (0)