diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/Client.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/Client.java index c83ad281..f39ca4cc 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/Client.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/Client.java @@ -5,26 +5,41 @@ import com.crossoverjie.cim.common.pojo.CIMUserInfo; import com.crossoverjie.cim.route.api.vo.req.P2PReqVO; import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO; + import java.io.Closeable; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; public interface Client extends Closeable { + static ClientBuilder builder(ClientConfigurationData conf) { + Objects.requireNonNull(conf, "ClientConfigurationData must not be null"); + return new ClientBuilderImpl(conf); + } + static ClientBuilder builder() { return new ClientBuilderImpl(); } - default void sendP2P(P2PReqVO p2PReqVO) throws Exception{ + String checkHost(); + + Integer checkPort(); + + default void sendP2P(P2PReqVO p2PReqVO) throws Exception { sendP2PAsync(p2PReqVO).get(); - }; + } + + ; CompletableFuture sendP2PAsync(P2PReqVO p2PReqVO); - default void sendGroup(String msg) throws Exception{ + default void sendGroup(String msg) throws Exception { sendGroupAsync(msg).get(); - }; + } + + ; CompletableFuture sendGroupAsync(String msg); diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientBuilderImpl.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientBuilderImpl.java index c86c0419..3ffe45e5 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientBuilderImpl.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientBuilderImpl.java @@ -7,9 +7,11 @@ import com.crossoverjie.cim.client.sdk.io.ReconnectCheck; import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy; import com.crossoverjie.cim.common.util.StringUtil; -import java.util.concurrent.ThreadPoolExecutor; import okhttp3.OkHttpClient; +import java.util.Objects; +import java.util.concurrent.ThreadPoolExecutor; + public class ClientBuilderImpl implements ClientBuilder { @@ -18,10 +20,12 @@ public class ClientBuilderImpl implements ClientBuilder { public ClientBuilderImpl() { this(new ClientConfigurationData()); } + public ClientBuilderImpl(ClientConfigurationData conf) { + Objects.requireNonNull(conf, "ClientConfigurationData must not be null"); this.conf = conf; } - + @Override public Client build() { return new ClientImpl(conf); @@ -29,7 +33,7 @@ public Client build() { @Override public ClientBuilder auth(ClientConfigurationData.Auth auth) { - if (auth.getUserId() <= 0 || StringUtil.isEmpty(auth.getUserName())){ + if (auth.getUserId() <= 0 || StringUtil.isEmpty(auth.getUserName())) { throw new IllegalArgumentException("userId and userName must be set"); } this.conf.setAuth(auth); diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java index 927b7bf4..dbb1d929 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientConfigurationData.java @@ -1,33 +1,45 @@ package com.crossoverjie.cim.client.sdk.impl; import com.crossoverjie.cim.client.sdk.Event; -import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy; import com.crossoverjie.cim.client.sdk.io.MessageListener; -import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff; import com.crossoverjie.cim.client.sdk.io.ReconnectCheck; +import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy; +import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import java.util.concurrent.ThreadPoolExecutor; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import okhttp3.OkHttpClient; +import java.util.concurrent.ThreadPoolExecutor; + @Data @NoArgsConstructor @AllArgsConstructor @JsonIgnoreProperties(ignoreUnknown = true) public class ClientConfigurationData { + private boolean debug = false; + private Auth auth; + private String host; + + private Integer serverPort; + + private Integer httpPort; + @Data @AllArgsConstructor @Builder - public static class Auth{ + public static class Auth { private long userId; private String userName; + + @JsonIgnore + private String authToken; } private String routeUrl; diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java index 42d8ebd1..4d645d27 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/impl/ClientImpl.java @@ -1,12 +1,6 @@ package com.crossoverjie.cim.client.sdk.impl; -import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL; - -import com.crossoverjie.cim.client.sdk.Client; -import com.crossoverjie.cim.client.sdk.ClientState; -import com.crossoverjie.cim.client.sdk.FetchOfflineMsgJob; -import com.crossoverjie.cim.client.sdk.ReConnectManager; -import com.crossoverjie.cim.client.sdk.RouteManager; +import com.crossoverjie.cim.client.sdk.*; import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer; import com.crossoverjie.cim.common.data.construct.RingBufferWheel; import com.crossoverjie.cim.common.exception.CIMException; @@ -27,14 +21,18 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.*; import java.util.function.Consumer; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; + +import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL; @Slf4j public class ClientImpl extends ClientState implements Client { @@ -133,16 +131,23 @@ private void connectServer(Consumer success) { */ private CompletableFuture doConnectServer() { CompletableFuture future = new CompletableFuture<>(); - this.userLogin(future).ifPresentOrElse((cimServer) -> { - this.doConnectServer(cimServer, future); - this.loginServer(); - this.serverInfo = cimServer; - future.complete(true); - }, () -> { - this.conf.getEvent().error("Login fail!, cannot get server info!"); - this.conf.getEvent().fatal(this); - future.complete(false); - }); + this.userLogin(future) // save serverInfo after login success + .ifPresentOrElse((cimServer) -> { + if (StringUtils.isBlank(cimServer.getAuthToken())) { + future.complete(false); + this.conf.getEvent().error("Login fail!, auth token is blank!"); + this.conf.getEvent().fatal(this); + return; + } + getAuth().setAuthToken(cimServer.getAuthToken()); + this.doConnectServer(future); + this.loginServer(); + future.complete(true); + }, () -> { + this.conf.getEvent().error("Login fail!, cannot get server info!"); + this.conf.getEvent().fatal(this); + future.complete(false); + }); return future; } @@ -157,8 +162,7 @@ private Optional userLogin(CompletableFuture future) { CIMServerResVO cimServer = null; try { - cimServer = routeManager.getServer(loginReqVO); - log.info("cimServer=[{}]", cimServer); + serverInfo = cimServer = routeManager.getServer(loginReqVO); } catch (Exception e) { log.error("login fail", e); future.completeExceptionally(e); @@ -168,14 +172,21 @@ private Optional userLogin(CompletableFuture future) { private final EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work")); - private void doConnectServer(CIMServerResVO cimServer, CompletableFuture future) { + private void doConnectServer(CompletableFuture future) { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) - .handler(new CIMClientHandleInitializer()); + .handler(new CIMClientHandleInitializer(conf.isDebug(), getAuth())); ChannelFuture sync; try { - sync = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync(); + final String host = checkHost(); + final Integer port = checkPort(); + if (StringUtils.isBlank(host) || Objects.isNull(port)) { + this.conf.getEvent().error("cim server host or port is null"); + future.complete(false); + return; + } + sync = bootstrap.connect(host, port).sync(); if (sync.isSuccess()) { this.conf.getEvent().info("Start cim client success!"); channel = (SocketChannel) sync.channel(); @@ -205,6 +216,7 @@ private void loginServer() { * 2. reconnect. * 3. shutdown reconnect job. * 4. reset reconnect state. + * * @throws Exception */ public void reconnect() throws Exception { @@ -238,6 +250,25 @@ public void close() { ringBufferWheel.stop(true); } + @Override + public String checkHost() { + // 优先使用直连的方式 + final String host = StringUtils.isNoneBlank(conf.getHost()) ? conf.getHost() : serverInfo.getIp(); + if (StringUtils.isBlank(host)) { + throw new IllegalArgumentException("cim server host is null"); + } + return host; + } + + @Override + public Integer checkPort() { + final Integer port = Objects.nonNull(conf.getServerPort()) ? conf.getServerPort() : serverInfo.getCimServerPort(); + if (Objects.isNull(port)) { + throw new IllegalArgumentException("cim server port is null"); + } + return port; + } + @Override public CompletableFuture sendP2PAsync(P2PReqVO p2PReqVO) { CompletableFuture future = new CompletableFuture<>(); diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java index 7a904959..3fe7b77a 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandle.java @@ -1,23 +1,33 @@ package com.crossoverjie.cim.client.sdk.io; import com.crossoverjie.cim.client.sdk.ClientState; +import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData; import com.crossoverjie.cim.client.sdk.impl.ClientImpl; import com.crossoverjie.cim.common.constant.Constants; +import com.crossoverjie.cim.common.enums.ChannelAttributeKeys; import com.crossoverjie.cim.common.protocol.BaseCommand; +import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.common.protocol.Response; import com.crossoverjie.cim.common.util.NettyAttrUtil; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; +import org.apache.zookeeper.common.StringUtils; + +import java.util.Objects; @ChannelHandler.Sharable @Slf4j public class CIMClientHandle extends SimpleChannelInboundHandler { + + private final ClientConfigurationData.Auth auth; + + public CIMClientHandle(ClientConfigurationData.Auth auth) { + this.auth = auth; + } + @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { @@ -41,8 +51,41 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc @Override public void channelActive(ChannelHandlerContext ctx) { - ClientImpl.getClient().getConf().getEvent().debug("ChannelActive"); - ClientImpl.getClient().setState(ClientState.State.Ready); + + // 获取认证信息 + // channelActive 执行时间过早,所以这些属性没办法放在 channel 上 + final String token = auth.getAuthToken(); + if (StringUtils.isBlank(token)) { + log.error("auth token is blank!"); + ctx.close(); + return; + } + final long userId = auth.getUserId(); + + // 连接建立之后就发送认证请求 + Request authReq = Request.newBuilder() + .setRequestId(userId) + .setCmd(BaseCommand.LOGIN_REQUEST) + .setReqMsg(token) + .build(); + + ctx.writeAndFlush(authReq).syncUninterruptibly().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + log.info("auth msg send success,userId:{},userName:{}", auth.getUserId(), auth.getUserName()); + ClientImpl.getClient().getConf().getEvent().debug("ChannelActive"); + ctx.channel().attr(ChannelAttributeKeys.USER_ID).set(userId); + log.info("channel is active,userId:{}", userId); + ClientImpl.getClient().setState(ClientState.State.Ready); + } else { + log.error("auth msg send failure,userId:{},userName:{}", auth.getUserId(), auth.getUserName()); + ctx.channel().close(); // 认证失败关闭连接 + } + } + }); + + } @Override @@ -66,8 +109,8 @@ protected void channelRead0(ChannelHandlerContext ctx, Response msg) { if (msg.getCmd() != BaseCommand.PING) { String receiveUserId = msg.getPropertiesMap().get(Constants.MetaKey.RECEIVE_USER_ID); - ClientImpl client = ClientImpl.getClientMap().get(Long.valueOf(receiveUserId)); - if (client == null) { + ClientImpl client; + if ((Objects.isNull(receiveUserId) || ((client = ClientImpl.getClientMap().get(Long.valueOf(receiveUserId))) == null))) { log.error("client not found for userId: {}", receiveUserId); return; } @@ -75,7 +118,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Response msg) { client.getConf().getCallbackThreadPool().execute(() -> { log.info("client address: {} :{}", ctx.channel().remoteAddress(), client); MessageListener messageListener = client.getConf().getMessageListener(); - if (msg.getBatchResMsgCount() >0 ){ + if (msg.getBatchResMsgCount() > 0) { for (int i = 0; i < msg.getBatchResMsgCount(); i++) { messageListener.received(client, msg.getPropertiesMap(), msg.getBatchResMsg(i)); } diff --git a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandleInitializer.java b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandleInitializer.java index fb50e4dc..11a7d8c2 100644 --- a/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandleInitializer.java +++ b/cim-client-sdk/src/main/java/com/crossoverjie/cim/client/sdk/io/CIMClientHandleInitializer.java @@ -1,29 +1,50 @@ package com.crossoverjie.cim.client.sdk.io; +import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData; +import com.crossoverjie.cim.common.handler.ChannelInboundDebugHandler; +import com.crossoverjie.cim.common.handler.ChannelOutboundDebugHandler; import com.crossoverjie.cim.common.protocol.Response; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class CIMClientHandleInitializer extends ChannelInitializer { - private final CIMClientHandle cimClientHandle = new CIMClientHandle(); + private final boolean debug; + + private final ClientConfigurationData.Auth auth; + + public CIMClientHandleInitializer(boolean debug, ClientConfigurationData.Auth auth) { + super(); + this.auth = auth; + this.debug = debug; + } @Override protected void initChannel(Channel ch) { - ch.pipeline() - .addLast(new IdleStateHandler(0, 10, 0)) + final ChannelPipeline pip = ch.pipeline(); + pip.addLast(new IdleStateHandler(0, 10, 0)); + + // decoder + if (debug) { + pip.addLast(ChannelInboundDebugHandler.INSTANCE); + } + pip.addLast(new ProtobufVarint32FrameDecoder()) + .addLast(new ProtobufDecoder(Response.getDefaultInstance())); - // google Protobuf - .addLast(new ProtobufVarint32FrameDecoder()) - .addLast(new ProtobufDecoder(Response.getDefaultInstance())) - .addLast(new ProtobufVarint32LengthFieldPrepender()) + // encoder + if (debug) { + pip.addLast(ChannelOutboundDebugHandler.INSTANCE); + } + pip.addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) - .addLast(cimClientHandle) - ; + .addLast(new CIMClientHandle(auth)); } } diff --git a/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java b/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java index 5629e192..2971cf3a 100644 --- a/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java +++ b/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/ClientTest.java @@ -2,21 +2,18 @@ import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData; import com.crossoverjie.cim.client.sdk.impl.ClientImpl; +import com.crossoverjie.cim.client.sdk.io.MessageListener; import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff; import com.crossoverjie.cim.client.sdk.route.AbstractRouteBaseTest; import com.crossoverjie.cim.common.constant.Constants; +import com.crossoverjie.cim.common.enums.ChannelAttributeKeys; import com.crossoverjie.cim.common.pojo.CIMUserInfo; import com.crossoverjie.cim.route.api.vo.req.P2PReqVO; import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - import com.crossoverjie.cim.route.constant.Constant; +import com.crossoverjie.cim.server.handle.CIMServerHandle; +import com.crossoverjie.cim.server.util.SessionSocketHolder; +import io.netty.channel.socket.nio.NioSocketChannel; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; @@ -24,6 +21,11 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + + @Slf4j public class ClientTest extends AbstractRouteBaseTest { @@ -33,6 +35,58 @@ public void tearDown() { super.close(); } + @Test + public void testClientAuthCanRead() throws Exception { + // 启动 ZK 和连接服务器 + super.starSingleServer(); + + // 启动转发服务 + super.startRoute(Constant.OfflineStoreMode.REDIS); + + // 转发服务地址 + String routeUrl = "http://localhost:8083"; + + // 登录第一个用户 + String cj = "crossoverJie"; + Long id = super.registerAccount(cj); + var auth1 = ClientConfigurationData.Auth.builder() + .userId(id) + .userName(cj) + .build(); + + AtomicReference recMsg = new AtomicReference<>(); + + @Cleanup + Client client1 = Client.builder() + .auth(auth1) + .routeUrl(routeUrl) // routeUrl 也可以用于登录获取连接服务器地址 + .messageListener(new MessageListener() { + @Override + public void received(Client client, Map properties, String msg) { + recMsg.set(msg); + log.info("test case listener msg:{}", msg); + } + }) + .build(); + TimeUnit.SECONDS.sleep(3); + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, client1.getState())); + Optional serverInfo = client1.getServerInfo(); + Assertions.assertTrue(serverInfo.isPresent()); + System.out.println("client1 serverInfo = " + serverInfo.get()); + + NioSocketChannel socketChannel = SessionSocketHolder.get(id); + socketChannel.attr(ChannelAttributeKeys.AUTH_RES).set(null); + + + client1.sendGroup("ssdasaaa"); // 此时没有认证状态,客户端应该收到 need auth client 的 + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + () -> Assertions.assertEquals(CIMServerHandle.ERROR_MESSAGE, recMsg.get())); + super.stopSingle(); + client1.close(); + ; + } + @Test public void groupChat() throws Exception { super.starSingleServer(); @@ -210,7 +264,7 @@ public void testP2PChat() throws Exception { }); // client2 send batch msg to client1 - var batchMsg = List.of("a","b","c"); + var batchMsg = List.of("a", "b", "c"); client2.sendP2P(P2PReqVO.builder() .receiveUserId(cjId) .batchMsg(batchMsg) @@ -241,9 +295,13 @@ public void testP2PChat() throws Exception { */ @Test public void testReconnect() throws Exception { + // 启动两个连接服务器 super.startTwoServer(); + + // 启动路由服务器 super.startRoute(Constant.OfflineStoreMode.REDIS); + // 注册两个账号 String routeUrl = "http://localhost:8083"; String cj = "cj"; String zs = "zs"; @@ -259,6 +317,7 @@ public void testReconnect() throws Exception { .build(); var backoffStrategy = new RandomBackoff(); + // 建立两个客户端 @Cleanup Client client1 = Client.builder() .auth(auth1) @@ -276,10 +335,18 @@ public void testReconnect() throws Exception { Client client2 = Client.builder() .auth(auth2) .routeUrl(routeUrl) - .messageListener((client, properties, message) -> client2Receive.set(message)) + .messageListener(new MessageListener() { + @Override + public void received(Client client, Map properties, String msg) { + System.out.println("|| ================ 收到消息:" + msg); + client2Receive.set(msg); + } + }) .backoffStrategy(backoffStrategy) .build(); TimeUnit.SECONDS.sleep(3); + + // 两个客户端连接成功 ClientState.State state2 = client2.getState(); Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, state2)); @@ -288,25 +355,25 @@ public void testReconnect() throws Exception { Assertions.assertTrue(serverInfo2.isPresent()); System.out.println("client2 serverInfo = " + serverInfo2.get()); - // send msg + // 发送消息并验证 String msg = "hello"; client1.sendGroup(msg); Awaitility.await() .untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get())); client2Receive.set(""); - - System.out.println("ready to restart server"); TimeUnit.SECONDS.sleep(3); Optional serverInfo = client1.getServerInfo(); Assertions.assertTrue(serverInfo.isPresent()); System.out.println("server info = " + serverInfo.get()); + // 关闭连接的服务 super.stopServer(serverInfo.get().getCimServerPort()); System.out.println("stop server success! " + serverInfo.get()); // Waiting server stopped, and client reconnect. + // 应该会重连到另外一个服务 TimeUnit.SECONDS.sleep(30); System.out.println("reconnect state: " + client1.getState()); Awaitility.await().atMost(15, TimeUnit.SECONDS) diff --git a/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/OfflineMsgTest.java b/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/OfflineMsgTest.java index a19fa3d0..a95d259f 100644 --- a/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/OfflineMsgTest.java +++ b/cim-client-sdk/src/test/java/com/crossoverjie/cim/client/sdk/OfflineMsgTest.java @@ -9,7 +9,8 @@ import com.crossoverjie.cim.route.constant.Constant; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Optional; @@ -122,9 +123,9 @@ public void testP2POfflineChatRedis() throws Exception { }) .build(); - ClientState.State client3AgainState = client3.getState(); + Client finalClient = client3; Awaitility.await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, client3AgainState)); + .untilAsserted(() -> Assertions.assertEquals(ClientState.State.Ready, finalClient.getState())); // check offline message diff --git a/cim-client/pom.xml b/cim-client/pom.xml index 9e628ddb..5f37d73c 100644 --- a/cim-client/pom.xml +++ b/cim-client/pom.xml @@ -20,8 +20,10 @@ - - + + org.springframework.boot + spring-boot-starter-logging + com.google.protobuf protobuf-java diff --git a/cim-client/src/main/java/com/crossoverjie/cim/client/CIMClientApplication.java b/cim-client/src/main/java/com/crossoverjie/cim/client/CIMClientApplication.java index 092b95ca..31477cd3 100644 --- a/cim-client/src/main/java/com/crossoverjie/cim/client/CIMClientApplication.java +++ b/cim-client/src/main/java/com/crossoverjie/cim/client/CIMClientApplication.java @@ -11,19 +11,18 @@ */ @Slf4j @SpringBootApplication -public class CIMClientApplication implements CommandLineRunner{ +public class CIMClientApplication implements CommandLineRunner { - public static void main(String[] args) { + public static void main(String[] args) { SpringApplication.run(CIMClientApplication.class, args); - log.info("Client start success"); - } + } - @Override - public void run(String... args) { - Scan scan = new Scan() ; - Thread thread = new Thread(scan); - thread.setName("scan-thread"); - thread.start(); - } + @Override + public void run(String... args) { + Scan scan = new Scan(); + Thread thread = new Thread(scan); + thread.setName("scan-thread"); + thread.start(); + } } \ No newline at end of file diff --git a/cim-client/src/main/java/com/crossoverjie/cim/client/config/AppConfiguration.java b/cim-client/src/main/java/com/crossoverjie/cim/client/config/AppConfiguration.java index a0ffad31..74494cdb 100644 --- a/cim-client/src/main/java/com/crossoverjie/cim/client/config/AppConfiguration.java +++ b/cim-client/src/main/java/com/crossoverjie/cim/client/config/AppConfiguration.java @@ -8,32 +8,34 @@ * Function: * * @author crossoverJie - * Date: 2018/8/24 01:43 + * Date: 2018/8/24 01:43 * @since JDK 1.8 */ @Component @Data public class AppConfiguration { - @Value("${cim.user.id}") + @Value("${cim.user.id:1}") private Long userId; - @Value("${cim.user.userName}") + @Value("${cim.user.userName:test_cim}") private String userName; @Value("${cim.msg.logger.path}") - private String msgLoggerPath ; + private String msgLoggerPath; @Value("${cim.heartbeat.time}") - private long heartBeatTime ; + private long heartBeatTime; @Value("${cim.reconnect.count}") private int reconnectCount; @Value("${cim.route.url}") private String routeUrl; + @Value("${cim.callback.thread.queue.size}") private int queueSize; + @Value("${cim.callback.thread.pool.size}") private int poolSize; } diff --git a/cim-client/src/main/java/com/crossoverjie/cim/client/config/BeanConfig.java b/cim-client/src/main/java/com/crossoverjie/cim/client/config/BeanConfig.java index 30ea454f..2b7e2a77 100644 --- a/cim-client/src/main/java/com/crossoverjie/cim/client/config/BeanConfig.java +++ b/cim-client/src/main/java/com/crossoverjie/cim/client/config/BeanConfig.java @@ -10,18 +10,14 @@ import com.crossoverjie.cim.common.data.construct.RingBufferWheel; import com.google.common.util.concurrent.ThreadFactoryBuilder; import jakarta.annotation.Resource; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.concurrent.*; + /** * Function:bean 配置 * @@ -41,6 +37,14 @@ public class BeanConfig { @Resource private MsgLogger msgLogger; + @Value("${cim.direct.host:}") + private String host; + + @Value("${cim.direct.tcp.port:}") + private Integer port; + + @Value("${cim.direct.http.port:}") + private Integer httpPort; @Bean public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor callbackThreadPool, @@ -50,7 +54,11 @@ public Client buildClient(@Qualifier("callBackThreadPool") ThreadPoolExecutor ca .writeTimeout(3, TimeUnit.SECONDS) .retryOnConnectionFailure(true).build(); - return Client.builder() + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setHost(host); + conf.setServerPort(port); + conf.setHttpPort(httpPort); + return Client.builder(conf) .auth(ClientConfigurationData.Auth.builder() .userName(appConfiguration.getUserName()) .userId(appConfiguration.getUserId()) diff --git a/cim-client/src/test/java/com/crossoverjie/cim/server/test/CommonTest.java b/cim-client/src/test/java/com/crossoverjie/cim/server/test/CommonTest.java index 7233389b..e3d13aed 100644 --- a/cim-client/src/test/java/com/crossoverjie/cim/server/test/CommonTest.java +++ b/cim-client/src/test/java/com/crossoverjie/cim/server/test/CommonTest.java @@ -11,29 +11,26 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.vdurmont.emoji.EmojiParser; +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import org.junit.Test; + import java.io.IOException; import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.charset.Charset; -import java.nio.file.Files; -import java.nio.file.LinkOption; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; +import java.nio.file.*; import java.time.LocalDate; import java.util.Arrays; import java.util.List; import java.util.Set; -import lombok.extern.slf4j.Slf4j; -import okhttp3.OkHttpClient; -import org.junit.Test; /** * Function: * * @author crossoverJie - * Date: 22/05/2018 18:44 + * Date: 22/05/2018 18:44 * @since JDK 1.8 */ @Slf4j @@ -41,47 +38,46 @@ public class CommonTest { - @Test - public void searchMsg2(){ - StringBuilder sb = new StringBuilder() ; + public void searchMsg2() { + StringBuilder sb = new StringBuilder(); String allMsg = "于是在之前的基础上我完善了一些内容,先来看看这个项目的介绍吧:\n" + "\n" + "CIM(CROSS-IM) 一款面向开发者的 IM(即时通讯)系统;同时提供了一些组件帮助开发者构建一款属于自己可水平扩展的 IM 。\n" + "\n" + - "借助 CIM 你可以实现以下需求:" ; + "借助 CIM 你可以实现以下需求:"; - String key = "CIM" ; + String key = "CIM"; String[] split = allMsg.split("\n"); for (String msg : split) { - if (msg.trim().contains(key)){ - sb.append(msg).append("\n") ; + if (msg.trim().contains(key)) { + sb.append(msg).append("\n"); } } int pos = 0; String result = sb.toString(); - int count = 1 ; - int multiple = 2 ; - while((pos = result.indexOf(key, pos)) >= 0) { + int count = 1; + int multiple = 2; + while ((pos = result.indexOf(key, pos)) >= 0) { - log.info("{},{}",pos, pos + key.length()); + log.info("{},{}", pos, pos + key.length()); pos += key.length(); - count ++ ; + count++; } System.out.println(sb.toString()); - System.out.println(sb.toString().replace(key,"\033[31;4m" + key+"\033[0m")); + System.out.println(sb.toString().replace(key, "\033[31;4m" + key + "\033[0m")); } @Test - public void log(){ - String msg = "hahahdsadsd" ; + public void log() { + String msg = "hahahdsadsd"; LocalDate today = LocalDate.now(); int year = today.getYear(); int month = today.getMonthValue(); @@ -108,7 +104,7 @@ public void log(){ } @Test - public void emoji() throws Exception{ + public void emoji() throws Exception { String str = "An :grinning:awesome :smiley:string 😄with a few :wink:emojis!"; String result = EmojiParser.parseToUnicode(str); System.out.println(result); @@ -125,16 +121,16 @@ public void emoji() throws Exception{ } @Test - public void emoji2(){ - String emostring ="😂"; + public void emoji2() { + String emostring = "😂"; String face_with_tears_of_joy = emostring.replaceAll("\uD83D\uDE02", "face with tears of joy"); System.out.println(face_with_tears_of_joy); - System.out.println("======" + face_with_tears_of_joy.replaceAll("face with tears of joy","\uD83D\uDE02")); + System.out.println("======" + face_with_tears_of_joy.replaceAll("face with tears of joy", "\uD83D\uDE02")); } -// @Test + // @Test public void deSerialize() throws Exception { RouteApi routeApi = RpcProxyManager.create(RouteApi.class, "http://localhost:8083", new OkHttpClient()); @@ -143,7 +139,7 @@ public void deSerialize() throws Exception { System.out.println(login.getDataBody()); BaseResponse> setBaseResponse = routeApi.onlineUser(); - log.info("setBaseResponse={}",setBaseResponse.getDataBody()); + log.info("setBaseResponse={}", setBaseResponse.getDataBody()); } @Test @@ -153,7 +149,7 @@ public void json() throws JsonProcessingException, ClassNotFoundException { ObjectMapper objectMapper = new ObjectMapper(); Class generic = null; for (Method declaredMethod : RouteApi.class.getDeclaredMethods()) { - if (declaredMethod.getName().equals("login")){ + if (declaredMethod.getName().equals("login")) { Type returnType = declaredMethod.getGenericReturnType(); // check if the return type is a parameterized type @@ -178,12 +174,12 @@ public void json() throws JsonProcessingException, ClassNotFoundException { } - private static class Gen{ + private static class Gen { private T t; private R r; } - interface TestInterface{ + interface TestInterface { Gen login(); } @@ -211,7 +207,8 @@ public void test1() throws JsonProcessingException { // 通过反射获取 BaseResponse> 中的泛型类型 public static Type getGenericTypeOfBaseResponse() { // 这里模拟你需要处理的 BaseResponse> - ParameterizedType baseResponseType = (ParameterizedType) new TypeReference>>() {}.getType(); + ParameterizedType baseResponseType = (ParameterizedType) new TypeReference>>() { + }.getType(); // 获取 BaseResponse 的泛型参数,即 Set Type[] actualTypeArguments = baseResponseType.getActualTypeArguments(); diff --git a/cim-common/pom.xml b/cim-common/pom.xml index c83720ea..e6de1777 100644 --- a/cim-common/pom.xml +++ b/cim-common/pom.xml @@ -14,6 +14,10 @@ + + com.auth0 + java-jwt + org.projectlombok lombok diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/auth/JwtUtils.java b/cim-common/src/main/java/com/crossoverjie/cim/common/auth/JwtUtils.java new file mode 100644 index 00000000..1e354ea0 --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/auth/JwtUtils.java @@ -0,0 +1,71 @@ +package com.crossoverjie.cim.common.auth; + +import com.alibaba.fastjson.JSONObject; +import com.auth0.jwt.JWT; +import com.auth0.jwt.algorithms.Algorithm; +import com.auth0.jwt.exceptions.JWTDecodeException; +import com.auth0.jwt.interfaces.DecodedJWT; +import com.crossoverjie.cim.common.auth.jwt.dto.PayloadVO; +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +/** + * 签发和验证 JWT + * + * @author chenqwwq + * @date 2025/6/7 + **/ +@Slf4j +public class JwtUtils { + + private static final String SECRET = "cim#crossoverjie"; + + public static final Algorithm ALGORITHM = Algorithm.HMAC256(SECRET); + public static final String issuer = "cim"; + + /** + * Token 签发 + */ + public static

String generateToken(Long userId, P p) { + Preconditions.checkNotNull(userId, "userId can't be null"); + Preconditions.checkNotNull(p, "payload can't be null"); + final JSONObject payload = JSONObject.parseObject(JSONObject.toJSONString(p)); + return JWT.create() + .withIssuer(issuer) + .withSubject(String.valueOf(userId)) + .withIssuedAt(new Date()) // token 签发时间 + .withHeader(createHeader()) + .withPayload(payload) + .sign(ALGORITHM); + } + + /** + * Token 解析 + */ + public static PayloadVO verifyToken(String token) { + Preconditions.checkArgument(StringUtils.isNotBlank(token), "invalid token"); + final DecodedJWT decodedObj = JWT.require(ALGORITHM) + .build() + .verify(token); + try { + final PayloadVO vo = new PayloadVO(); + vo.setUserName(decodedObj.getClaim("userName").as(String.class)); + vo.setUserId(decodedObj.getClaim("userId").as(Long.class)); + return vo; + } catch (Exception e) { + log.warn("jwt auth token decode failure,token:{},e", token, e); + throw new JWTDecodeException("user client auth Token decoder failure"); + } + } + + private static Map createHeader() { + Map header = new HashMap<>(); + header.put("alg", "HS256"); + return header; + } +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/auth/jwt/dto/PayloadVO.java b/cim-common/src/main/java/com/crossoverjie/cim/common/auth/jwt/dto/PayloadVO.java new file mode 100644 index 00000000..d9a28d4a --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/auth/jwt/dto/PayloadVO.java @@ -0,0 +1,70 @@ +package com.crossoverjie.cim.common.auth.jwt.dto; + +import java.io.Serial; +import java.io.Serializable; + +/** + * @author chenqwwq + * @date 2025/6/7 + **/ +public class PayloadVO implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 用户id + */ + private Long userId; + + /** + * 用户名称 + */ + private String userName; + + /** + * 授权目标主机地址 + */ + private String host; + + /** + * 授权目标主机端口 + */ + private Integer port; + + public Long getUserId() { + return userId; + } + + public PayloadVO setUserId(Long userId) { + this.userId = userId; + return this; + } + + public String getUserName() { + return userName; + } + + public PayloadVO setUserName(String userName) { + this.userName = userName; + return this; + } + + public String getHost() { + return host; + } + + public PayloadVO setHost(String host) { + this.host = host; + return this; + } + + public Integer getPort() { + return port; + } + + public PayloadVO setPort(Integer port) { + this.port = port; + return this; + } +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/enums/AuthTypeEnum.java b/cim-common/src/main/java/com/crossoverjie/cim/common/enums/AuthTypeEnum.java new file mode 100644 index 00000000..f8460791 --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/enums/AuthTypeEnum.java @@ -0,0 +1,9 @@ +package com.crossoverjie.cim.common.enums; + +/** + * @author chenqwwq + * @date 2025/6/5 + **/ +public enum AuthTypeEnum { + +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/enums/ChannelAttributeKeys.java b/cim-common/src/main/java/com/crossoverjie/cim/common/enums/ChannelAttributeKeys.java new file mode 100644 index 00000000..e99cd1de --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/enums/ChannelAttributeKeys.java @@ -0,0 +1,34 @@ +package com.crossoverjie.cim.common.enums; + +import io.netty.util.AttributeKey; + +/** + * @author chenqwwq + * @date 2025/6/7 + **/ +public interface ChannelAttributeKeys { + + /** + * 认证的 Token + *

+ * 在客户端保存 + */ + AttributeKey AUTH_TOKEN = AttributeKey.newInstance("auth_token"); + + /** + * 认证结果 + */ + AttributeKey AUTH_RES = AttributeKey.newInstance("auth_res"); + + /** + * 用户id + */ + AttributeKey USER_ID = AttributeKey.newInstance("user_id"); + + /** + * 用户名 + */ + AttributeKey USER_NAME = AttributeKey.newInstance("user_name"); + + +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/enums/RegistryType.java b/cim-common/src/main/java/com/crossoverjie/cim/common/enums/RegistryType.java new file mode 100644 index 00000000..c47eb5e3 --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/enums/RegistryType.java @@ -0,0 +1,31 @@ +package com.crossoverjie.cim.common.enums; + +/** + * @author chenqwwq + * @date 2025/6/8 + **/ +public enum RegistryType { + + NO("no", "不使用注册中心"), + + ZK("zk", "使用zookeeper注册中心"); + + + private final String code; + + private final String memo; + + RegistryType(String code, String memo) { + this.memo = memo; + this.code = code; + } + + + public String getCode() { + return code; + } + + public String getMemo() { + return memo; + } +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/handler/ChannelInboundDebugHandler.java b/cim-common/src/main/java/com/crossoverjie/cim/common/handler/ChannelInboundDebugHandler.java new file mode 100644 index 00000000..3c3ca78e --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/handler/ChannelInboundDebugHandler.java @@ -0,0 +1,36 @@ +package com.crossoverjie.cim.common.handler; + +import com.crossoverjie.cim.common.enums.ChannelAttributeKeys; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author chenqwwq + * @date 2025/6/14 + **/ +@Slf4j +@ChannelHandler.Sharable +public class ChannelInboundDebugHandler extends ChannelInboundHandlerAdapter { + + public static final ChannelInboundDebugHandler INSTANCE = new ChannelInboundDebugHandler(); + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + final Long userId = ctx.channel().attr(ChannelAttributeKeys.USER_ID).get(); + log.info("user id:{}, channel is inactive", userId); + ctx.fireChannelInactive(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +// if (msg instanceof ByteBuf) { +// ByteBuf buf = (ByteBuf) msg; +// String hexDump = ByteBufUtil.hexDump(buf); +// log.info("16进制报文内容:{}", hexDump); +// 用protoc解析 +// } + super.channelRead(ctx, msg); + } +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/handler/ChannelOutboundDebugHandler.java b/cim-common/src/main/java/com/crossoverjie/cim/common/handler/ChannelOutboundDebugHandler.java new file mode 100644 index 00000000..ebac32f6 --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/handler/ChannelOutboundDebugHandler.java @@ -0,0 +1,29 @@ +package com.crossoverjie.cim.common.handler; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import lombok.extern.slf4j.Slf4j; + +/** + * @author chenqwwq + * @date 2025/6/14 + **/ +@Slf4j +@ChannelHandler.Sharable +public class ChannelOutboundDebugHandler extends ChannelOutboundHandlerAdapter { + + public static final ChannelOutboundDebugHandler INSTANCE = new ChannelOutboundDebugHandler(); + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { +// if (msg instanceof ByteBuf) { +// ByteBuf buf = (ByteBuf) msg; +// String hexDump = ByteBufUtil.hexDump(buf); +// log.info("16 进制报文内容: {}", hexDump); + // 然后尝试用protoc解析 +// } + super.write(ctx, msg, promise); + } +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/metastore/NoMetaStoreImpl.java b/cim-common/src/main/java/com/crossoverjie/cim/common/metastore/NoMetaStoreImpl.java new file mode 100644 index 00000000..b32a4be6 --- /dev/null +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/metastore/NoMetaStoreImpl.java @@ -0,0 +1,36 @@ +package com.crossoverjie.cim.common.metastore; + +import com.google.common.collect.Sets; + +import java.util.Set; + +/** + * @author chenqwwq + * @date 2025/6/8 + **/ +public class NoMetaStoreImpl implements MetaStore { + @Override + public void initialize(AbstractConfiguration configuration) throws Exception { + + } + + @Override + public Set getAvailableServerList() throws Exception { + return Sets.newHashSet(); + } + + @Override + public void addServer(String ip, int cimServerPort, int httpPort) throws Exception { + + } + + @Override + public void listenServerList(ChildListener childListener) throws Exception { + + } + + @Override + public void rebuildCache() throws Exception { + + } +} diff --git a/cim-common/src/main/java/com/crossoverjie/cim/common/pojo/RouteInfo.java b/cim-common/src/main/java/com/crossoverjie/cim/common/pojo/RouteInfo.java index 0e72592f..91e81596 100644 --- a/cim-common/src/main/java/com/crossoverjie/cim/common/pojo/RouteInfo.java +++ b/cim-common/src/main/java/com/crossoverjie/cim/common/pojo/RouteInfo.java @@ -19,4 +19,7 @@ public final class RouteInfo { private String ip ; private Integer cimServerPort; private Integer httpPort; + + public RouteInfo() { + } } diff --git a/cim-common/src/test/java/com/crossoverjie/cim/common/auth/JwtAuthTest.java b/cim-common/src/test/java/com/crossoverjie/cim/common/auth/JwtAuthTest.java new file mode 100644 index 00000000..e5015a43 --- /dev/null +++ b/cim-common/src/test/java/com/crossoverjie/cim/common/auth/JwtAuthTest.java @@ -0,0 +1,32 @@ +package com.crossoverjie.cim.common.auth; + +import com.crossoverjie.cim.common.auth.jwt.dto.PayloadVO; +import org.junit.Assert; +import org.junit.Test; + +/** + * @author chenqwwq + * @date 2025/8/7 + **/ +public class JwtAuthTest { + + @Test + public void testJwtToken() { + Long userId = 111L; + String userName = "chenqwwq"; + String host = "127.0.0.1"; + Integer port = 8888; + PayloadVO vo = new PayloadVO(); + vo.setUserId(userId); + vo.setUserName(userName); + vo.setHost(host); + vo.setPort(port); + + // generate a jwt token + final String token = JwtUtils.generateToken(userId, vo); + // verify + final PayloadVO payloadVO = JwtUtils.verifyToken(token); + Assert.assertEquals(userId, payloadVO.getUserId()); + Assert.assertEquals(userName, payloadVO.getUserName()); + } +} diff --git a/cim-common/src/test/java/com/crossoverjie/cim/common/core/proxy/RpcProxyManagerTest.java b/cim-common/src/test/java/com/crossoverjie/cim/common/core/proxy/RpcProxyManagerTest.java index c1c43c02..d18f3490 100644 --- a/cim-common/src/test/java/com/crossoverjie/cim/common/core/proxy/RpcProxyManagerTest.java +++ b/cim-common/src/test/java/com/crossoverjie/cim/common/core/proxy/RpcProxyManagerTest.java @@ -3,14 +3,14 @@ import com.crossoverjie.cim.common.exception.CIMException; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; -import java.io.Serializable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import okhttp3.OkHttpClient; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; + +import java.io.Serializable; class RpcProxyManagerTest { @@ -53,38 +53,38 @@ public void testUrl() { Assertions.assertEquals(response.getParsedBody().getAge(), 18); Assertions.assertEquals(response.getParsedBody().getCity(), "shenzhen"); response = echo.echoTarget(request, url); - Assertions.assertEquals(response.getParsedBody().getName(), "crossoverJie"); - - String req = "/request"; - response = echo.request("http://echo.free.beeceptor.com", request); - Assertions.assertEquals(response.getPath(), req); - Assertions.assertEquals(response.getParsedBody().getAge(), 18); - - Assertions.assertThrows(CIMException.class, () -> echo.echoTarget(request)); - } - - @Test - public void testFail() { - OkHttpClient client = new OkHttpClient(); - String url = "http://echo.free.beeceptor.com"; - Echo echo = RpcProxyManager.create(Echo.class, url, client); - EchoRequest request = new EchoRequest(); - request.setName("crossoverJie"); - request.setAge(18); - request.setCity("shenzhen"); - Assertions.assertThrows(IllegalArgumentException.class, () -> echo.fail(request, "test","")); - } - - - @Test - public void testGeneric() { - OkHttpClient client = new OkHttpClient(); - String url = "http://echo.free.beeceptor.com"; - Echo echo = RpcProxyManager.create(Echo.class, url, client); - EchoRequest request = new EchoRequest(); - request.setName("crossoverJie"); - request.setAge(18); - request.setCity("shenzhen"); +Assertions.assertEquals(response.getParsedBody().getName(), "crossoverJie"); + +String req = "/request"; +response = echo.request("http://echo.free.beeceptor.com", request); +Assertions.assertEquals(response.getPath(), req); +Assertions.assertEquals(response.getParsedBody().getAge(), 18); + +Assertions.assertThrows(CIMException.class, () -> echo.echoTarget(request)); +} + +@Test +public void testFail() { +OkHttpClient client = new OkHttpClient(); +String url = "http://echo.free.beeceptor.com"; +Echo echo = RpcProxyManager.create(Echo.class, url, client); +EchoRequest request = new EchoRequest(); +request.setName("crossoverJie"); +request.setAge(18); +request.setCity("shenzhen"); +Assertions.assertThrows(IllegalArgumentException.class, () -> echo.fail(request, "test","")); +} + + +@Test +public void testGeneric() { +OkHttpClient client = new OkHttpClient(); +String url = "http://echo.free.beeceptor.com"; +Echo echo = RpcProxyManager.create(Echo.class, url, client); +EchoRequest request = new EchoRequest(); +request.setName("crossoverJie"); +request.setAge(18); +request.setCity("shenzhen"); EchoGeneric response = echo.echoGeneric(request); Assertions.assertEquals(response.getHeaders().getHost(), "echo.free.beeceptor.com"); } @@ -172,6 +172,10 @@ public static class HeadersDTO { private String contentType; @JsonProperty("Accept-Encoding") private String acceptEncoding; + @JsonProperty("Upgrade-Insecure-Requests") + private String upgradeInsecureRequests; + @JsonProperty("Via") + private String via; } @NoArgsConstructor diff --git a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/config/BeanConfig.java b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/config/BeanConfig.java index 28261ee5..48fe269d 100644 --- a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/config/BeanConfig.java +++ b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/config/BeanConfig.java @@ -1,7 +1,9 @@ package com.crossoverjie.cim.route.config; import com.crossoverjie.cim.common.core.proxy.RpcProxyManager; +import com.crossoverjie.cim.common.enums.RegistryType; import com.crossoverjie.cim.common.metastore.MetaStore; +import com.crossoverjie.cim.common.metastore.NoMetaStoreImpl; import com.crossoverjie.cim.common.metastore.ZkConfiguration; import com.crossoverjie.cim.common.metastore.ZkMetaStoreImpl; import com.crossoverjie.cim.common.pojo.CIMUserInfo; @@ -16,6 +18,7 @@ import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -44,19 +47,30 @@ public class BeanConfig { @Resource private AppConfiguration appConfiguration; + @Value("${cim.register.type:ZK}") + private RegistryType registerType; + + @Bean public MetaStore metaStore() throws Exception { - MetaStore metaStore = new ZkMetaStoreImpl(); - ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); - metaStore.initialize(ZkConfiguration.builder() - .metaServiceUri(appConfiguration.getZkAddr()) - .timeoutMs(appConfiguration.getZkConnectTimeout()) - .retryPolicy(retryPolicy) - .build()); - metaStore.listenServerList((root, currentChildren) -> { - log.info("Server list change, root=[{}], current server list=[{}]", root, currentChildren); - }); - return metaStore; + switch (registerType) { + case NO: + return new NoMetaStoreImpl(); + case ZK: + MetaStore metaStore = new ZkMetaStoreImpl(); + ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); + metaStore.initialize(ZkConfiguration.builder() + .metaServiceUri(appConfiguration.getZkAddr()) + .timeoutMs(appConfiguration.getZkConnectTimeout()) + .retryPolicy(retryPolicy) + .build()); + metaStore.listenServerList((root, currentChildren) -> { + log.info("Server list change, root=[{}], current server list=[{}]", root, currentChildren); + }); + return metaStore; + default: + throw new IllegalArgumentException("invalid register type"); + } } @@ -130,7 +144,7 @@ public Optional load(Long userId) throws Exception { } }); } - + @Bean public ServerApi serverApi(OkHttpClient okHttpClient) { return RpcProxyManager.create(ServerApi.class, okHttpClient); diff --git a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/controller/RouteController.java b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/controller/RouteController.java index b0f4f15a..230957cc 100644 --- a/cim-forward-route/src/main/java/com/crossoverjie/cim/route/controller/RouteController.java +++ b/cim-forward-route/src/main/java/com/crossoverjie/cim/route/controller/RouteController.java @@ -1,5 +1,7 @@ package com.crossoverjie.cim.route.controller; +import com.crossoverjie.cim.common.auth.JwtUtils; +import com.crossoverjie.cim.common.auth.jwt.dto.PayloadVO; import com.crossoverjie.cim.common.enums.StatusEnum; import com.crossoverjie.cim.common.exception.CIMException; import com.crossoverjie.cim.common.metastore.MetaStore; @@ -20,17 +22,19 @@ import com.crossoverjie.cim.server.api.ServerApi; import io.swagger.v3.oas.annotations.Operation; import jakarta.annotation.Resource; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Controller; +import org.springframework.util.CollectionUtils; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + import static com.crossoverjie.cim.common.enums.StatusEnum.OFF_LINE; /** @@ -170,25 +174,35 @@ public BaseResponse login(@RequestBody LoginReqVO loginReqVO) th StatusEnum status = accountService.login(loginReqVO); res.setCode(status.getCode()); res.setMessage(status.getMessage()); - if (status != StatusEnum.SUCCESS) { + if (status != StatusEnum.SUCCESS && status != StatusEnum.REPEAT_LOGIN) { return res; } // check server available + // 可以不返回,直接使用直连方式 + RouteInfo routeInfo = new RouteInfo(); Set availableServerList = metaStore.getAvailableServerList(); - String key = String.valueOf(loginReqVO.getUserId()); - String server = - routeHandle.routeServer(List.copyOf(availableServerList), key); - log.info("userInfo=[{}] route server info=[{}]", loginReqVO, server); + if (!CollectionUtils.isEmpty(availableServerList)) { + String key = String.valueOf(loginReqVO.getUserId()); + String server = + routeHandle.routeServer(List.copyOf(availableServerList), key); + log.info("userInfo=[{}] route server info=[{}]", loginReqVO, server); - RouteInfo routeInfo = RouteInfoParseUtil.parse(server); - routeInfo = commonBizService.checkServerAvailable(routeInfo, key); + routeInfo = RouteInfoParseUtil.parse(server); + routeInfo = commonBizService.checkServerAvailable(routeInfo, key); + + //保存路由信息 + accountService.saveRouteInfo(loginReqVO, server); + } - //保存路由信息 - accountService.saveRouteInfo(loginReqVO, server); + // 颁发认证 Token + PayloadVO pv = new PayloadVO(); + pv.setUserId(loginReqVO.getUserId()); + pv.setUserName(loginReqVO.getUserName()); + final String token = JwtUtils.generateToken(loginReqVO.getUserId(), pv); CIMServerResVO vo = - new CIMServerResVO(routeInfo.getIp(), routeInfo.getCimServerPort(), routeInfo.getHttpPort()); + new CIMServerResVO(routeInfo.getIp(), routeInfo.getCimServerPort(), routeInfo.getHttpPort(), token); res.setDataBody(vo); return res; } diff --git a/cim-integration-test/src/main/java/com/crossoverjie/cim/client/sdk/route/AbstractRouteBaseTest.java b/cim-integration-test/src/main/java/com/crossoverjie/cim/client/sdk/route/AbstractRouteBaseTest.java index 2d2f62aa..305b98c5 100644 --- a/cim-integration-test/src/main/java/com/crossoverjie/cim/client/sdk/route/AbstractRouteBaseTest.java +++ b/cim-integration-test/src/main/java/com/crossoverjie/cim/client/sdk/route/AbstractRouteBaseTest.java @@ -1,7 +1,7 @@ package com.crossoverjie.cim.client.sdk.route; -import com.crossoverjie.cim.common.res.BaseResponse; import com.crossoverjie.cim.client.sdk.server.AbstractServerBaseTest; +import com.crossoverjie.cim.common.res.BaseResponse; import com.crossoverjie.cim.route.RouteApplication; import com.crossoverjie.cim.route.api.RouteApi; import com.crossoverjie.cim.route.api.vo.req.RegisterInfoReqVO; @@ -18,6 +18,7 @@ public abstract class AbstractRouteBaseTest extends AbstractServerBaseTest { RedisContainer redis = new RedisContainer(DockerImageName.parse("redis:7.4.0")); protected ConfigurableApplicationContext run; + public void startRoute(String offlineModel) { redis.start(); SpringApplication route = new SpringApplication(RouteApplication.class); @@ -31,7 +32,7 @@ public void startRoute(String offlineModel) { run = route.run(args); } - public void close(){ + public void close() { super.close(); redis.close(); run.close(); diff --git a/cim-rout-api/src/main/java/com/crossoverjie/cim/route/api/vo/res/CIMServerResVO.java b/cim-rout-api/src/main/java/com/crossoverjie/cim/route/api/vo/res/CIMServerResVO.java index 07911f84..1dbeba36 100644 --- a/cim-rout-api/src/main/java/com/crossoverjie/cim/route/api/vo/res/CIMServerResVO.java +++ b/cim-rout-api/src/main/java/com/crossoverjie/cim/route/api/vo/res/CIMServerResVO.java @@ -1,17 +1,16 @@ package com.crossoverjie.cim.route.api.vo.res; -import com.crossoverjie.cim.common.pojo.RouteInfo; - -import java.io.Serializable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import java.io.Serializable; + /** * Function: * * @author crossoverJie - * Date: 2018/12/23 00:43 + * Date: 2018/12/23 00:43 * @since JDK 1.8 */ @Data @@ -19,8 +18,14 @@ @NoArgsConstructor public class CIMServerResVO implements Serializable { - private String ip ; + private String ip; private Integer cimServerPort; private Integer httpPort; + private String authToken; + public CIMServerResVO(String ip, Integer cimServerPort, Integer httpPort) { + this.ip = ip; + this.cimServerPort = cimServerPort; + this.httpPort = httpPort; + } } diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/CIMServerApplication.java b/cim-server/src/main/java/com/crossoverjie/cim/server/CIMServerApplication.java index a5f99505..dd41d004 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/CIMServerApplication.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/CIMServerApplication.java @@ -1,10 +1,13 @@ package com.crossoverjie.cim.server; +import com.crossoverjie.cim.common.enums.RegistryType; import com.crossoverjie.cim.common.metastore.MetaStore; import com.crossoverjie.cim.server.config.AppConfiguration; import com.crossoverjie.cim.server.kit.RegistryMetaStore; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -17,28 +20,32 @@ */ @SpringBootApplication @Slf4j -public class CIMServerApplication implements CommandLineRunner{ +public class CIMServerApplication implements CommandLineRunner { - @Resource - private AppConfiguration appConfiguration ; + @Resource + private AppConfiguration appConfiguration; - @Resource - private MetaStore metaStore; + @Autowired(required = false) + private MetaStore metaStore; - @Value("${server.port}") - private int httpPort ; + @Value("${server.port}") + private int httpPort; - public static void main(String[] args) { + public static void main(String[] args) { SpringApplication.run(CIMServerApplication.class, args); - log.info("Start cim server success!!!"); - } - - @Override - public void run(String... args) throws Exception { - String addr = InetAddress.getLocalHost().getHostAddress(); - Thread thread = new Thread(new RegistryMetaStore(metaStore, addr, appConfiguration.getCimServerPort(),httpPort)); - thread.setName("registry-zk"); - thread.start() ; - } + log.info("Start cim server success!!!"); + } + + @Override + public void run(String... args) throws Exception { + if (StringUtils.equals(appConfiguration.getRegisterType().getCode(), RegistryType.NO.getCode())) { + log.info("no register type,client need direct connection!"); + return; + } + String addr = InetAddress.getLocalHost().getHostAddress(); + Thread thread = new Thread(new RegistryMetaStore(metaStore, addr, appConfiguration.getCimServerPort(), httpPort)); + thread.setName("registry-zk"); + thread.start(); + } } \ No newline at end of file diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/config/AppConfiguration.java b/cim-server/src/main/java/com/crossoverjie/cim/server/config/AppConfiguration.java index 0ba33499..ec24c3d4 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/config/AppConfiguration.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/config/AppConfiguration.java @@ -1,5 +1,6 @@ package com.crossoverjie.cim.server.config; +import com.crossoverjie.cim.common.enums.RegistryType; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -7,7 +8,7 @@ * Function: * * @author crossoverJie - * Date: 2018/8/24 01:43 + * Date: 2018/8/24 01:43 * @since JDK 1.8 */ @Component @@ -19,32 +20,39 @@ public class AppConfiguration { @Value("${app.zk.addr}") private String zkAddr; - @Value("${app.zk.switch}") - private boolean zkSwitch; - @Value("${cim.server.port}") private int cimServerPort; @Value("${cim.route.url}") - private String routeUrl ; + private String routeUrl; + + /** + * 链接服务端注册类型 + *

+ * no: 不注册(客户端直连模式 + * zk: zookeeper 存储 + */ + @Value("${cim.register.type:ZK}") + private RegistryType registerType; public String getRouteUrl() { return routeUrl; } + @Value("${cim.heartbeat.time}") + private long heartBeatTime; + + @Value("${app.zk.connect.timeout}") + private int zkConnectTimeout; + + public void setRouteUrl(String routeUrl) { this.routeUrl = routeUrl; } - @Value("${cim.heartbeat.time}") - private long heartBeatTime ; - - @Value("${app.zk.connect.timeout}") - private int zkConnectTimeout; - public int getZkConnectTimeout() { - return zkConnectTimeout; - } + return zkConnectTimeout; + } public String getZkRoot() { return zkRoot; @@ -62,13 +70,6 @@ public void setZkAddr(String zkAddr) { this.zkAddr = zkAddr; } - public boolean isZkSwitch() { - return zkSwitch; - } - - public void setZkSwitch(boolean zkSwitch) { - this.zkSwitch = zkSwitch; - } public int getCimServerPort() { return cimServerPort; @@ -85,4 +86,18 @@ public long getHeartBeatTime() { public void setHeartBeatTime(long heartBeatTime) { this.heartBeatTime = heartBeatTime; } + + public RegistryType getRegisterType() { + return registerType; + } + + public AppConfiguration setRegisterType(RegistryType registerType) { + this.registerType = registerType; + return this; + } + + public AppConfiguration setZkConnectTimeout(int zkConnectTimeout) { + this.zkConnectTimeout = zkConnectTimeout; + return this; + } } diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java b/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java index 09cca96f..c83146a3 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/config/BeanConfig.java @@ -6,18 +6,19 @@ import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.route.api.RouteApi; -import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.Resource; -import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.concurrent.TimeUnit; + /** * Function: * * @author crossoverJie - * Date: 2018/12/23 00:25 + * Date: 2018/12/23 00:25 * @since JDK 1.8 */ @Configuration @@ -28,6 +29,7 @@ public class BeanConfig { /** * http client + * * @return okHttp */ @Bean @@ -35,18 +37,23 @@ public OkHttpClient okHttpClient() { OkHttpClient.Builder builder = new OkHttpClient.Builder(); builder.connectTimeout(30, TimeUnit.SECONDS) .readTimeout(10, TimeUnit.SECONDS) - .writeTimeout(10,TimeUnit.SECONDS) + .writeTimeout(10, TimeUnit.SECONDS) .retryOnConnectionFailure(true); return builder.build(); } + /** + * 在以 ZK 作为元数据存储的时候才需要 + */ @Bean + @ConditionalOnProperty(value = "cim.register.type", havingValue = "ZK", matchIfMissing = false) public MetaStore metaStore() { return new ZkMetaStoreImpl(); } /** * 创建心跳单例 + * * @return */ @Bean(value = "heartBeat") diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/config/ServerConfiguration.java b/cim-server/src/main/java/com/crossoverjie/cim/server/config/ServerConfiguration.java new file mode 100644 index 00000000..e9438d5e --- /dev/null +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/config/ServerConfiguration.java @@ -0,0 +1,28 @@ +package com.crossoverjie.cim.server.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +/** + * @author chenqwwq + * @date 2025/6/8 + **/ +@Data +@Configuration +@ConfigurationProperties(prefix = "cim.server") +public class ServerConfiguration { + + + /** + * 链接服务端注册类型 + *

+ * zk: zookeeper 存储 + * + * @see com.crossoverjie.cim.common.enums.RegistryType 注册类型 + */ + @Value("${register.type:no}") + private Boolean registerType; + +} diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java b/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java index 13636af0..4899f393 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/handle/CIMServerHandle.java @@ -1,10 +1,13 @@ package com.crossoverjie.cim.server.handle; +import com.crossoverjie.cim.common.constant.Constants; +import com.crossoverjie.cim.common.enums.ChannelAttributeKeys; import com.crossoverjie.cim.common.exception.CIMException; import com.crossoverjie.cim.common.kit.HeartBeatHandler; import com.crossoverjie.cim.common.pojo.CIMUserInfo; import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; +import com.crossoverjie.cim.common.protocol.Response; import com.crossoverjie.cim.common.util.NettyAttrUtil; import com.crossoverjie.cim.server.kit.RouteHandler; import com.crossoverjie.cim.server.kit.ServerHeartBeatHandlerImpl; @@ -18,19 +21,19 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.BooleanUtils; /** * Function: * * @author crossoverJie - * Date: 17/05/2018 18:52 + * Date: 17/05/2018 18:52 * @since JDK 1.8 */ @ChannelHandler.Sharable @Slf4j public class CIMServerHandle extends SimpleChannelInboundHandler { - - + public static final String ERROR_MESSAGE = "client need send first data frame for auth !"; /** * 取消绑定 @@ -42,12 +45,12 @@ public class CIMServerHandle extends SimpleChannelInboundHandler { public void channelInactive(ChannelHandlerContext ctx) throws Exception { //可能出现业务判断离线后再次触发 channelInactive CIMUserInfo userInfo = SessionSocketHolder.getUserId((NioSocketChannel) ctx.channel()); - if (userInfo != null){ - log.warn("[{}] trigger channelInactive offline!",userInfo.getUserName()); + if (userInfo != null) { + log.warn("[{}] trigger channelInactive offline!", userInfo.getUserName()); //Clear route info and offline. RouteHandler routeHandler = SpringBeanFactory.getBean(RouteHandler.class); - routeHandler.userOffLine(userInfo,(NioSocketChannel) ctx.channel()); + routeHandler.userOffLine(userInfo, (NioSocketChannel) ctx.channel()); ctx.channel().close(); } @@ -61,29 +64,42 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc log.info("!!READER_IDLE!!"); - HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class) ; - heartBeatHandler.process(ctx) ; + HeartBeatHandler heartBeatHandler = SpringBeanFactory.getBean(ServerHeartBeatHandlerImpl.class); + heartBeatHandler.process(ctx); } } super.userEventTriggered(ctx, evt); } - @Override protected void channelRead0(ChannelHandlerContext ctx, Request msg) throws Exception { log.info("received msg=[{}]", msg.toString()); + if (BooleanUtils.isNotTrue(ctx.channel().attr(ChannelAttributeKeys.AUTH_RES).get())) { + log.error("channel do not through auth,good bye"); + final Response error = Response.newBuilder() + .setCmd(BaseCommand.MESSAGE) + .setResMsg(ERROR_MESSAGE) + .putProperties(Constants.MetaKey.RECEIVE_USER_ID, String.valueOf(msg.getRequestId())) + .build(); + ctx.writeAndFlush(error).addListener(ChannelFutureListener.CLOSE); + return; + } + // 登陆请求是为了上报当前用户信息,可以和连接鉴权分开,链接鉴定权是鉴定客户端是否合法 + // 目前的合法性判断是走登陆借口返回的 authToken 验证 if (msg.getCmd() == BaseCommand.LOGIN_REQUEST) { //保存客户端与 Channel 之间的关系 SessionSocketHolder.put(msg.getRequestId(), (NioSocketChannel) ctx.channel()); SessionSocketHolder.saveSession(msg.getRequestId(), msg.getReqMsg()); + ctx.channel().attr(ChannelAttributeKeys.USER_ID).set(msg.getRequestId()); + ctx.channel().attr(ChannelAttributeKeys.USER_NAME).set(msg.getReqMsg()); log.info("client [{}] online success!!", msg.getReqMsg()); } //心跳更新时间 - if (msg.getCmd() == BaseCommand.PING){ - NettyAttrUtil.updateReaderTime(ctx.channel(),System.currentTimeMillis()); + if (msg.getCmd() == BaseCommand.PING) { + NettyAttrUtil.updateReaderTime(ctx.channel(), System.currentTimeMillis()); //向客户端响应 pong 消息 Request heartBeat = SpringBeanFactory.getBean("heartBeat", Request.class); ctx.writeAndFlush(heartBeat).addListeners((ChannelFutureListener) future -> { @@ -91,7 +107,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Request msg) throws Excep log.error("IO error,close Channel"); future.channel().close(); } - }) ; + }); } } diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/handle/ClientAuthHandler.java b/cim-server/src/main/java/com/crossoverjie/cim/server/handle/ClientAuthHandler.java new file mode 100644 index 00000000..b0d2c41c --- /dev/null +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/handle/ClientAuthHandler.java @@ -0,0 +1,98 @@ +package com.crossoverjie.cim.server.handle; + +import com.crossoverjie.cim.common.auth.JwtUtils; +import com.crossoverjie.cim.common.auth.jwt.dto.PayloadVO; +import com.crossoverjie.cim.common.enums.ChannelAttributeKeys; +import com.crossoverjie.cim.common.protocol.Request; +import com.crossoverjie.cim.common.util.StringUtil; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.util.Attribute; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.BooleanUtils; + +import java.util.Objects; + +/** + * client auto Handler + *

+ * This layer of handler is above the business handler, and the data flowing here must need to be authenticated, and after authentication, it will remove itself from the pipeline + *

+ * decoder.channelRead encoder.write + * \ / + * auto.channelRead auto.write + * \ / + * cimServerHandle.channelRead -->> cimServerHandle.write + *

+ *

+ * TODO: 认证是否需要一个超时时间,因为有心跳包,如果超时时间内,心跳包就需要跳过验证 + * + * @author chenqwwq + * @date 2025/6/5 + **/ +@Slf4j +public class ClientAuthHandler extends ChannelDuplexHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + final Attribute attr = ctx.channel().attr(ChannelAttributeKeys.AUTH_RES); + if (BooleanUtils.isTrue(attr.get())) { + // 已经认证过了就往下传递 + super.channelRead(ctx, msg); + return; + } + + if (Objects.isNull(msg)) { + log.warn("Received null message during authentication"); + return; + } + + if (!(msg instanceof Request)) { + log.warn("Received non-Request message during authentication: {}", msg.getClass()); + return; + } + + // 处理 Token + final String autoToken = ((Request) msg).getReqMsg(); + if (StringUtil.isEmpty(autoToken)) { + log.error("Token is null"); + writeAndClose("Require authentication first", ctx); + return; + } + + try { + final PayloadVO payload = JwtUtils.verifyToken(autoToken); + ctx.channel().attr(ChannelAttributeKeys.USER_ID).set(payload.getUserId()); + ctx.channel().attr(ChannelAttributeKeys.USER_NAME).set(payload.getUserName()); + attr.set(Boolean.TRUE); + // 认证成功之后移除自身 + ctx.pipeline().remove(this); + ctx.writeAndFlush("auto success ,welcome !"); + return; + } catch (Exception e) { + log.error("client auto failure,e:", e); + } + writeAndClose("auto failure!", ctx); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + + // TODO: 是否需要拦截写的操作 + super.write(ctx, msg, promise); + } + + private void writeAndClose(String msg, ChannelHandlerContext ctx) { + // 当前的 Handler 还在就说明链接还未认证 + ctx.writeAndFlush("auto failure!") + .addListener((ChannelFutureListener) future -> { + if (future.isDone()) { + log.info("client auth failure,close the channel."); + ctx.close(); + } + }); + } + +} diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/init/CIMServerInitializer.java b/cim-server/src/main/java/com/crossoverjie/cim/server/init/CIMServerInitializer.java index 1d2aacfa..52179493 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/init/CIMServerInitializer.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/init/CIMServerInitializer.java @@ -1,37 +1,76 @@ package com.crossoverjie.cim.server.init; +import com.crossoverjie.cim.common.enums.ChannelAttributeKeys; +import com.crossoverjie.cim.common.handler.ChannelInboundDebugHandler; +import com.crossoverjie.cim.common.handler.ChannelOutboundDebugHandler; import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.server.handle.CIMServerHandle; +import com.crossoverjie.cim.server.handle.ClientAuthHandler; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.BooleanUtils; /** * Function: * * @author crossoverJie - * Date: 17/05/2018 18:51 + * Date: 17/05/2018 18:51 * @since JDK 1.8 */ +@Slf4j public class CIMServerInitializer extends ChannelInitializer { - private final CIMServerHandle cimServerHandle = new CIMServerHandle() ; + private final CIMServerHandle cimServerHandle = new CIMServerHandle(); + + + /** + * use client auth + */ + private final Boolean authInChannelActive; + + /** + * debug model + */ + private final Boolean debug; + + public CIMServerInitializer(Boolean authInChannelActive, Boolean debug) { + this.authInChannelActive = authInChannelActive; + this.debug = debug; + } @Override protected void initChannel(Channel ch) throws Exception { + final ChannelPipeline pipeline = ch.pipeline(); + //11 秒没有向客户端发送消息就发生心跳 + pipeline.addLast(new IdleStateHandler(11, 0, 0)); + + if (debug) { + pipeline.addLast(ChannelInboundDebugHandler.INSTANCE); + } + // google Protobuf decoder + pipeline.addLast(new ProtobufVarint32FrameDecoder()) + .addLast(new ProtobufDecoder(Request.getDefaultInstance())); + if (debug) { + pipeline.addLast(ChannelOutboundDebugHandler.INSTANCE); + } + // google Protobuf encoder + pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()) + .addLast(new ProtobufEncoder()); - ch.pipeline() - //11 秒没有向客户端发送消息就发生心跳 - .addLast(new IdleStateHandler(11, 0, 0)) - // google Protobuf 编解码 - .addLast(new ProtobufVarint32FrameDecoder()) - .addLast(new ProtobufDecoder(Request.getDefaultInstance())) - .addLast(new ProtobufVarint32LengthFieldPrepender()) - .addLast(new ProtobufEncoder()) - .addLast(cimServerHandle); + // auth client + // this priority of the client authenticated handler is higher + if (BooleanUtils.isTrue(authInChannelActive)) { + pipeline.addLast(new ClientAuthHandler()); + } else { + ch.attr(ChannelAttributeKeys.AUTH_RES).set(Boolean.TRUE); + } + pipeline.addLast(cimServerHandle); } } diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/kit/RegistryMetaStore.java b/cim-server/src/main/java/com/crossoverjie/cim/server/kit/RegistryMetaStore.java index 60b8825b..3d535184 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/kit/RegistryMetaStore.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/kit/RegistryMetaStore.java @@ -1,11 +1,13 @@ package com.crossoverjie.cim.server.kit; +import com.crossoverjie.cim.common.enums.RegistryType; import com.crossoverjie.cim.common.metastore.MetaStore; import com.crossoverjie.cim.common.metastore.ZkConfiguration; import com.crossoverjie.cim.server.config.AppConfiguration; import com.crossoverjie.cim.server.util.SpringBeanFactory; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -13,7 +15,7 @@ * Function: * * @author crossoverJie - * Date: 2018/8/24 01:37 + * Date: 2018/8/24 01:37 * @since JDK 1.8 */ @Slf4j @@ -22,24 +24,25 @@ public class RegistryMetaStore implements Runnable { private MetaStore metaStore; - private AppConfiguration appConfiguration ; + private AppConfiguration appConfiguration; private String ip; private int cimServerPort; private int httpPort; + public RegistryMetaStore(MetaStore metaStore, String ip, int cimServerPort, int httpPort) { this.ip = ip; this.cimServerPort = cimServerPort; - this.httpPort = httpPort ; + this.httpPort = httpPort; this.metaStore = metaStore; - appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ; + appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class); } @SneakyThrows @Override public void run() { - if (!appConfiguration.isZkSwitch()){ + if (StringUtils.equals(appConfiguration.getRegisterType().getCode(), RegistryType.NO.getCode())) { log.info("Skip registry to metaStore"); return; } diff --git a/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java b/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java index 2ec721b8..84f9c439 100644 --- a/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java +++ b/cim-server/src/main/java/com/crossoverjie/cim/server/server/CIMServer.java @@ -1,6 +1,5 @@ package com.crossoverjie.cim.server.server; -import com.crossoverjie.cim.common.protocol.BaseCommand; import com.crossoverjie.cim.common.protocol.Request; import com.crossoverjie.cim.server.api.vo.req.SendMsgReqVO; import com.crossoverjie.cim.server.init.CIMServerInitializer; @@ -15,16 +14,17 @@ import io.netty.channel.socket.nio.NioSocketChannel; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import java.net.InetSocketAddress; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.net.InetSocketAddress; + /** * Function: * * @author crossoverJie - * Date: 21/05/2018 00:30 + * Date: 21/05/2018 00:30 * @since JDK 1.8 */ @Component @@ -32,8 +32,8 @@ public class CIMServer { - private EventLoopGroup boss = new NioEventLoopGroup(); - private EventLoopGroup work = new NioEventLoopGroup(); + private final EventLoopGroup boss = new NioEventLoopGroup(1); + private final EventLoopGroup work = new NioEventLoopGroup(); @Value("${cim.server.port}") @@ -55,7 +55,7 @@ public void start() throws InterruptedException { .localAddress(new InetSocketAddress(nettyPort)) //保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, true) - .childHandler(new CIMServerInitializer()); + .childHandler(new CIMServerInitializer(Boolean.TRUE, Boolean.FALSE)); ChannelFuture future = bootstrap.bind().sync(); if (future.isSuccess()) { @@ -77,6 +77,7 @@ public void destroy() { /** * Push msg to client. + * * @param sendMsgReqVO message body */ public void sendMsg(SendMsgReqVO sendMsgReqVO) { diff --git a/cim-server/src/main/resources/application.yaml b/cim-server/src/main/resources/application.yaml index aa73ce85..47dd6bfb 100644 --- a/cim-server/src/main/resources/application.yaml +++ b/cim-server/src/main/resources/application.yaml @@ -2,11 +2,11 @@ spring: application: name: cim-server - + # web port server: port: 8081 - + # enable swagger springdoc: swagger-ui: @@ -26,6 +26,8 @@ app: root: /route # zk root path # cim server config cim: + register: + type: ZK server: port: 11211 route: diff --git a/pom.xml b/pom.xml index e4da752d..3cae3388 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,16 @@ - + + org.springframework.boot + spring-boot-starter-logging + 3.3.0 + + + com.auth0 + java-jwt + 3.19.0 + com.github.xiaoymin knife4j-openapi3-jakarta-spring-boot-starter