Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b1e5315
feat:
chenqwwq Jun 7, 2025
2bb45b7
Merge branch 'refs/heads/master' into feature/first_auth_v1
chenqwwq Jun 14, 2025
ecb124d
feat(client): 支持客户端直连模式
chenqwwq Jun 15, 2025
8357da3
build(dependencies): 更新 spring-boot-starter-logging 依赖版本
chenqwwq Jun 15, 2025
782e39f
refactor(client-sdk): 重构客户端 SDK
chenqwwq Jun 15, 2025
b1c4e8b
refactor(cim-common): 修正 RegistryType 构造函数参数顺序
chenqwwq Jun 15, 2025
dde0fea
refactor(cim): 重构注册中心相关代码
chenqwwq Jun 15, 2025
ec0b583
[链接鉴权] - fix(cim-common): 修复 ChannelInboundDebugHandler 中 channelInac…
chenqwwq Jun 16, 2025
ddceaf8
[链接鉴权] - fix(cim-common):
chenqwwq Jun 16, 2025
67b08bd
[链接鉴权] - fix(cim-common):
chenqwwq Jun 16, 2025
070e9cf
[链接鉴权] - fix:
chenqwwq Jun 16, 2025
31514f9
[链接鉴权] - fix:
chenqwwq Jun 16, 2025
4fb8f67
[连接鉴权] - fix(client): 优化客户端认证逻辑
chenqwwq Jun 18, 2025
8ba873d
Merge branch 'master' into feature/first_auth_v1
crossoverJie Jun 24, 2025
25a4512
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
94e7bea
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
cb2bd63
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
6a1d6e3
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
feff020
Merge remote-tracking branch 'origin/feature/first_auth_v1' into feat…
chenqwwq Aug 6, 2025
07a1955
[链接鉴权] - feat:
chenqwwq Aug 6, 2025
2167493
[链接鉴权] - test:
chenqwwq Aug 6, 2025
664deba
[链接鉴权] - test:
chenqwwq Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,41 @@
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;

import java.io.Closeable;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public interface Client extends Closeable {

static ClientBuilder builder(ClientConfigurationData conf) {
Objects.requireNonNull(conf, "ClientConfigurationData must not be null");
return new ClientBuilderImpl(conf);
}

static ClientBuilder builder() {
return new ClientBuilderImpl();
}

default void sendP2P(P2PReqVO p2PReqVO) throws Exception{
String checkHost();

Integer checkPort();

default void sendP2P(P2PReqVO p2PReqVO) throws Exception {
sendP2PAsync(p2PReqVO).get();
};
}

;

CompletableFuture<Void> sendP2PAsync(P2PReqVO p2PReqVO);

default void sendGroup(String msg) throws Exception{
default void sendGroup(String msg) throws Exception {
sendGroupAsync(msg).get();
};
}

;

CompletableFuture<Void> sendGroupAsync(String msg);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.common.util.StringUtil;
import java.util.concurrent.ThreadPoolExecutor;
import okhttp3.OkHttpClient;

import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;

public class ClientBuilderImpl implements ClientBuilder {


Expand All @@ -18,18 +20,20 @@ public class ClientBuilderImpl implements ClientBuilder {
public ClientBuilderImpl() {
this(new ClientConfigurationData());
}

public ClientBuilderImpl(ClientConfigurationData conf) {
Objects.requireNonNull(conf, "ClientConfigurationData must not be null");
this.conf = conf;
}

@Override
public Client build() {
return new ClientImpl(conf);
}

@Override
public ClientBuilder auth(ClientConfigurationData.Auth auth) {
if (auth.getUserId() <= 0 || StringUtil.isEmpty(auth.getUserName())){
if (auth.getUserId() <= 0 || StringUtil.isEmpty(auth.getUserName())) {
throw new IllegalArgumentException("userId and userName must be set");
}
this.conf.setAuth(auth);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
package com.crossoverjie.cim.client.sdk.impl;

import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.sdk.io.ReconnectCheck;
import com.crossoverjie.cim.client.sdk.io.backoff.BackoffStrategy;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import okhttp3.OkHttpClient;

import java.util.concurrent.ThreadPoolExecutor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ClientConfigurationData {

private boolean debug = false;

private Auth auth;

private String host;

private Integer serverPort;

private Integer httpPort;

@Data
@AllArgsConstructor
@Builder
public static class Auth{
public static class Auth {
private long userId;
private String userName;

@JsonIgnore
private String authToken;
}

private String routeUrl;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package com.crossoverjie.cim.client.sdk.impl;

import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL;

import com.crossoverjie.cim.client.sdk.Client;
import com.crossoverjie.cim.client.sdk.ClientState;
import com.crossoverjie.cim.client.sdk.FetchOfflineMsgJob;
import com.crossoverjie.cim.client.sdk.ReConnectManager;
import com.crossoverjie.cim.client.sdk.RouteManager;
import com.crossoverjie.cim.client.sdk.*;
import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer;
import com.crossoverjie.cim.common.data.construct.RingBufferWheel;
import com.crossoverjie.cim.common.exception.CIMException;
Expand All @@ -27,14 +21,18 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.*;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import static com.crossoverjie.cim.common.enums.StatusEnum.RECONNECT_FAIL;

@Slf4j
public class ClientImpl extends ClientState implements Client {
Expand Down Expand Up @@ -133,16 +131,23 @@ private void connectServer(Consumer<Void> success) {
*/
private CompletableFuture<Boolean> doConnectServer() {
CompletableFuture<Boolean> future = new CompletableFuture<>();
this.userLogin(future).ifPresentOrElse((cimServer) -> {
this.doConnectServer(cimServer, future);
this.loginServer();
this.serverInfo = cimServer;
future.complete(true);
}, () -> {
this.conf.getEvent().error("Login fail!, cannot get server info!");
this.conf.getEvent().fatal(this);
future.complete(false);
});
this.userLogin(future) // save serverInfo after login success
.ifPresentOrElse((cimServer) -> {
if (StringUtils.isBlank(cimServer.getAuthToken())) {
future.complete(false);
this.conf.getEvent().error("Login fail!, auth token is blank!");
this.conf.getEvent().fatal(this);
return;
}
getAuth().setAuthToken(cimServer.getAuthToken());
this.doConnectServer(future);
this.loginServer();
future.complete(true);
}, () -> {
this.conf.getEvent().error("Login fail!, cannot get server info!");
this.conf.getEvent().fatal(this);
future.complete(false);
});
return future;
}

Expand All @@ -157,8 +162,7 @@ private Optional<CIMServerResVO> userLogin(CompletableFuture<Boolean> future) {

CIMServerResVO cimServer = null;
try {
cimServer = routeManager.getServer(loginReqVO);
log.info("cimServer=[{}]", cimServer);
serverInfo = cimServer = routeManager.getServer(loginReqVO);
} catch (Exception e) {
log.error("login fail", e);
future.completeExceptionally(e);
Expand All @@ -168,14 +172,21 @@ private Optional<CIMServerResVO> userLogin(CompletableFuture<Boolean> future) {

private final EventLoopGroup group = new NioEventLoopGroup(0, new DefaultThreadFactory("cim-work"));

private void doConnectServer(CIMServerResVO cimServer, CompletableFuture<Boolean> future) {
private void doConnectServer(CompletableFuture<Boolean> future) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new CIMClientHandleInitializer());
.handler(new CIMClientHandleInitializer(conf.isDebug(), getAuth()));
ChannelFuture sync;
try {
sync = bootstrap.connect(cimServer.getIp(), cimServer.getCimServerPort()).sync();
final String host = checkHost();
final Integer port = checkPort();
if (StringUtils.isBlank(host) || Objects.isNull(port)) {
this.conf.getEvent().error("cim server host or port is null");
future.complete(false);
return;
}
sync = bootstrap.connect(host, port).sync();
if (sync.isSuccess()) {
this.conf.getEvent().info("Start cim client success!");
channel = (SocketChannel) sync.channel();
Expand Down Expand Up @@ -205,6 +216,7 @@ private void loginServer() {
* 2. reconnect.
* 3. shutdown reconnect job.
* 4. reset reconnect state.
*
* @throws Exception
*/
public void reconnect() throws Exception {
Expand Down Expand Up @@ -238,6 +250,25 @@ public void close() {
ringBufferWheel.stop(true);
}

@Override
public String checkHost() {
// 优先使用直连的方式
final String host = StringUtils.isNoneBlank(conf.getHost()) ? conf.getHost() : serverInfo.getIp();
if (StringUtils.isBlank(host)) {
throw new IllegalArgumentException("cim server host is null");
}
return host;
}

@Override
public Integer checkPort() {
final Integer port = Objects.nonNull(conf.getServerPort()) ? conf.getServerPort() : serverInfo.getCimServerPort();
if (Objects.isNull(port)) {
throw new IllegalArgumentException("cim server port is null");
}
return port;
}

@Override
public CompletableFuture<Void> sendP2PAsync(P2PReqVO p2PReqVO) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
package com.crossoverjie.cim.client.sdk.io;

import com.crossoverjie.cim.client.sdk.ClientState;
import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
import com.crossoverjie.cim.client.sdk.impl.ClientImpl;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.enums.ChannelAttributeKeys;
import com.crossoverjie.cim.common.protocol.BaseCommand;
import com.crossoverjie.cim.common.protocol.Request;
import com.crossoverjie.cim.common.protocol.Response;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.common.StringUtils;

@ChannelHandler.Sharable
@Slf4j
public class CIMClientHandle extends SimpleChannelInboundHandler<Response> {


private final ClientConfigurationData.Auth auth;

public CIMClientHandle(ClientConfigurationData.Auth auth) {
this.auth = auth;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

Expand All @@ -41,8 +49,41 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc

@Override
public void channelActive(ChannelHandlerContext ctx) {
ClientImpl.getClient().getConf().getEvent().debug("ChannelActive");
ClientImpl.getClient().setState(ClientState.State.Ready);

// 获取认证信息
// channelActive 执行时间过早,所以这些属性没办法放在 channel 上
final String token = auth.getAuthToken();
if (StringUtils.isBlank(token)) {
log.error("auth token is blank!");
ctx.close();
return;
}
final long userId = auth.getUserId();

// 连接建立之后就发送认证请求
Request authReq = Request.newBuilder()
.setRequestId(userId)
.setCmd(BaseCommand.LOGIN_REQUEST)
.setReqMsg(token)
.build();

ctx.writeAndFlush(authReq).syncUninterruptibly().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("auth msg send success,userId:{},userName:{}", auth.getUserId(), auth.getUserName());
ClientImpl.getClient().getConf().getEvent().debug("ChannelActive");
ctx.channel().attr(ChannelAttributeKeys.USER_ID).set(userId);
log.info("channel is active,userId:{}", userId);
ClientImpl.getClient().setState(ClientState.State.Ready);
} else {
log.error("auth msg send failure,userId:{},userName:{}", auth.getUserId(), auth.getUserName());
ctx.channel().close(); // 认证失败关闭连接
}
}
});


}

@Override
Expand Down Expand Up @@ -75,7 +116,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Response msg) {
client.getConf().getCallbackThreadPool().execute(() -> {
log.info("client address: {} :{}", ctx.channel().remoteAddress(), client);
MessageListener messageListener = client.getConf().getMessageListener();
if (msg.getBatchResMsgCount() >0 ){
if (msg.getBatchResMsgCount() > 0) {
for (int i = 0; i < msg.getBatchResMsgCount(); i++) {
messageListener.received(client, msg.getPropertiesMap(), msg.getBatchResMsg(i));
}
Expand Down
Loading
Loading