From b551fc23b6f375827c358045bf313434eb3a54f5 Mon Sep 17 00:00:00 2001 From: deferz Date: Thu, 3 Jul 2025 23:27:35 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20=E4=BD=BF=E7=94=A8=20WebSocketChann?= =?UTF-8?q?el=20=E6=9B=BF=E4=BB=A3=20Socket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/manager/connect_manager.dart | 240 ++++++++++++++++++++++++------- pubspec.yaml | 1 + 2 files changed, 186 insertions(+), 55 deletions(-) diff --git a/lib/manager/connect_manager.dart b/lib/manager/connect_manager.dart index 745eac3..ace52f8 100644 --- a/lib/manager/connect_manager.dart +++ b/lib/manager/connect_manager.dart @@ -1,11 +1,11 @@ import 'dart:async'; import 'dart:collection'; import 'dart:convert'; -import 'dart:io'; import 'dart:typed_data'; import 'package:connectivity_plus/connectivity_plus.dart'; import 'package:shared_preferences/shared_preferences.dart'; import 'package:uuid/uuid.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:wukongimfluttersdk/db/const.dart'; import 'package:wukongimfluttersdk/db/wk_db_helper.dart'; @@ -22,46 +22,67 @@ import '../proto/proto.dart'; import '../type/const.dart'; class _WKSocket { - Socket? _socket; // 将 _socket 声明为可空类型 + WebSocketChannel? _channel; bool _isListening = false; + bool _isClosed = false; static _WKSocket? _instance; - _WKSocket._internal(this._socket); + _WKSocket._internal(this._channel); - factory _WKSocket.newSocket(Socket socket) { - _instance ??= _WKSocket._internal(socket); + factory _WKSocket.newSocket(WebSocketChannel channel) { + _instance ??= _WKSocket._internal(channel); return _instance!; } void close() { _isListening = false; + _isClosed = true; _instance = null; try { - _socket?.close(); - // _socket?.destroy(); + _channel?.sink.close(); + } catch (e) { + Logs.debug('关闭WebSocket时发生错误: $e'); } finally { - _socket = null; // 现在可以将 _socket 设置为 null + _channel = null; } } - send(Uint8List data) { + bool get isConnected => _channel != null && !_isClosed; + + Future send(Uint8List data) async { try { - if (_socket?.remotePort != null) { - _socket?.add(data); // 使用安全调用操作符 - return _socket?.flush(); + if (isConnected) { + _channel!.sink.add(data); + } else { + throw Exception('WebSocket未连接'); } } catch (e) { - Logs.debug('发送消息错误$e'); + Logs.debug('发送消息错误: $e'); + rethrow; } } - void listen(void Function(Uint8List data) onData, void Function() error) { - if (!_isListening && _socket != null) { - _socket!.listen(onData, onError: (err) { - Logs.debug('socket断开了${err.toString()}'); - }, onDone: () { - // close(); // 关闭和重置 Socket 连接 - // error(); - }); + void listen(void Function(Uint8List data) onData, void Function() onError) { + if (!_isListening && isConnected) { + _channel!.stream.listen( + (data) { + if (data is Uint8List) { + onData(data); + } else if (data is List) { + onData(Uint8List.fromList(data)); + } else { + Logs.debug('收到非二进制数据: $data'); + } + }, + onError: (err) { + Logs.debug('WebSocket错误: ${err.toString()}'); + onError(); + }, + onDone: () { + Logs.debug('WebSocket连接关闭'); + onError(); + }, + cancelOnError: false, // 不因为错误而取消监听 + ); _isListening = true; } } @@ -72,22 +93,47 @@ class WKConnectionManager { static final WKConnectionManager _instance = WKConnectionManager._privateConstructor(); static WKConnectionManager get shared => _instance; - // bool _isLogout = false; + + // 连接状态 bool isDisconnection = false; bool isReconnection = false; bool isNetworkUnavailable = false; + bool _isConnecting = false; + + // 重连配置 final int reconnMilliseconds = 1500; + final int maxReconnectAttempts = 10; + int _reconnectAttempts = 0; + + // 定时器 Timer? heartTimer; Timer? checkNetworkTimer; + Timer? reconnectTimer; + final heartIntervalSecond = const Duration(seconds: 60); final checkNetworkSecond = const Duration(seconds: 1); + + // 心跳相关 int unReceivePongCount = 0; + final int maxUnReceivePongCount = 3; + + // 消息管理 final LinkedHashMap _sendingMsgMap = LinkedHashMap(); + + // 连接监听器 HashMap? _connectionListenerMap; + + // WebSocket实例 _WKSocket? _socket; + + // 网络状态 ConnectivityResult? lastConnectivityResult; final Connectivity _connectivity = Connectivity(); + // 连接状态获取器 + bool get isConnected => _socket?.isConnected ?? false; + bool get isConnecting => _isConnecting; + addOnConnectionStatus(String key, Function(int, int?, ConnectionInfo?) back) { _connectionListenerMap ??= HashMap(); _connectionListenerMap![key] = back; @@ -108,6 +154,18 @@ class WKConnectionManager { } connect() { + // 检查是否已经在连接中 + if (_isConnecting) { + Logs.debug('正在连接中,跳过重复连接请求'); + return; + } + + // 检查是否已经连接 + if (isConnected) { + Logs.debug('已经连接,无需重复连接'); + return; + } + var addr = WKIM.shared.options.addr; if ((addr == null || addr == "") && WKIM.shared.options.getAddr == null) { Logs.info("没有配置addr!"); @@ -121,10 +179,19 @@ class WKConnectionManager { return; } if (isNetworkUnavailable) { + Logs.debug('网络不可用,跳过连接'); return; } + + // 重置重连计数 + if (!isReconnection) { + _reconnectAttempts = 0; + } + disconnect(false); isDisconnection = false; + _isConnecting = true; + if (WKIM.shared.options.getAddr != null) { WKIM.shared.options.getAddr!((String addr) { _socketConnect(addr); @@ -156,50 +223,90 @@ class WKConnectionManager { _connectFail('连接地址为空'); return; } - var addrs = addr.split(":"); - var host = addrs[0]; - var port = addrs[1]; + + // 将TCP地址转换为WebSocket地址 + String wsAddr = addr; + if (!addr.startsWith('ws://') && !addr.startsWith('wss://')) { + var addrs = addr.split(":"); + var host = addrs[0]; + var port = addrs[1]; + wsAddr = 'ws://$host:$port'; + } + try { setConnectionStatus(WKConnectStatus.connecting); - Socket.connect(host, int.parse(port), timeout: const Duration(seconds: 5)) - .then((socket) { - _socket = _WKSocket.newSocket(socket); - _connectSuccess(); - }).catchError((err) { - _connectFail(err); - }).onError((err, stackTrace) { - _connectFail(err); - }); + + final channel = WebSocketChannel.connect( + Uri.parse(wsAddr), + protocols: ['wukongim'], // 可以添加自定义协议 + ); + + _socket = _WKSocket.newSocket(channel); + _connectSuccess(); } catch (e) { - Logs.error(e.toString()); + Logs.error('WebSocket连接失败: ${e.toString()}'); + _connectFail(e); } } // socket 连接成功 _connectSuccess() { + _isConnecting = false; + _reconnectAttempts = 0; // 重置重连计数 + isReconnection = false; + unReceivePongCount = 0; // 重置心跳计数 + + Logs.info('WebSocket连接成功'); + // 监听消息 _socket?.listen((Uint8List data) { _cutDatas(data); - // _decodePacket(data); }, () { if (isDisconnection) { - Logs.debug("登出了"); + Logs.debug("主动断开连接"); return; } - // isReconnection = true; + + Logs.debug("连接断开,准备重连"); + // 延迟重连,避免立即重连 Future.delayed(Duration(milliseconds: reconnMilliseconds), () { - connect(); + if (!isDisconnection) { + connect(); + } }); }); + // 发送连接包 _sendConnectPacket(); } _connectFail(error) { - // _socket?.close(); - Future.delayed(Duration(milliseconds: reconnMilliseconds), () { - connect(); - }); + _isConnecting = false; + Logs.error('连接失败: $error'); + + // 如果不是主动断开,则尝试重连 + if (!isDisconnection && !isNetworkUnavailable) { + _reconnectAttempts++; + + if (_reconnectAttempts <= maxReconnectAttempts) { + Logs.info('第 $_reconnectAttempts 次重连尝试,${reconnMilliseconds}ms后重试'); + isReconnection = true; + + // 使用指数退避策略 + int delay = reconnMilliseconds * _reconnectAttempts; + if (delay > 30000) delay = 30000; // 最大30秒 + + reconnectTimer?.cancel(); + reconnectTimer = Timer(Duration(milliseconds: delay), () { + if (!isDisconnection) { + connect(); + } + }); + } else { + Logs.error('重连次数超过最大限制 ($maxReconnectAttempts),停止重连'); + setConnectionStatus(WKConnectStatus.fail, reasoncode: -1); + } + } } testCutData(Uint8List data) { @@ -331,17 +438,31 @@ class WKConnectionManager { } _closeAll() { - // _isLogout = true; - // WKIM.shared.options.uid = ''; - // WKIM.shared.options.token = ''; - // WKIM.shared.messageManager.updateSendingMsgFail(); + Logs.debug('清理所有连接资源'); + + // 停止所有定时器 _stopCheckNetworkTimer(); _stopHeartTimer(); + _stopReconnectTimer(); + + // 关闭WebSocket连接 if (_socket != null) { _socket!.close(); + _socket = null; } - // WKDBHelper.shared.close(); + // 重置状态 + _isConnecting = false; + isReconnection = false; + unReceivePongCount = 0; + _reconnectAttempts = 0; + } + + _stopReconnectTimer() { + if (reconnectTimer != null) { + reconnectTimer!.cancel(); + reconnectTimer = null; + } } _sendReceAckPacket(BigInt messageID, int messageSeq, PacketHeader header) { @@ -370,8 +491,8 @@ class WKConnectionManager { _sendPacket(Packet packet) async { var data = WKIM.shared.options.proto.encode(packet); - if (!isReconnection) { - await _socket?.send(data); + if (!isReconnection && _socket != null) { + await _socket!.send(data); } } @@ -384,7 +505,7 @@ class WKConnectionManager { isReconnection = true; isNetworkUnavailable = true; Logs.debug('网络断开了'); - _checkSedingMsg(); + _checkSedingMsg(); // 移除 await,因为 Timer 回调中不能使用 await setConnectionStatus(WKConnectStatus.noNetwork); lastConnectivityResult = ConnectivityResult.none; } else { @@ -418,13 +539,19 @@ class WKConnectionManager { _startHeartTimer() { _stopHeartTimer(); heartTimer = Timer.periodic(heartIntervalSecond, (timer) { - if (unReceivePongCount > 0) { - Logs.debug('心跳包未收到pong,重连中...'); + if (!isConnected) { + Logs.debug('连接已断开,停止心跳'); + return; + } + + if (unReceivePongCount >= maxUnReceivePongCount) { + Logs.debug('心跳包未收到pong次数过多($unReceivePongCount),重连中...'); isReconnection = false; connect(); return; } - Logs.info('ping...'); + + Logs.info('发送ping...'); unReceivePongCount++; _sendPacket(PingPacket()); }); @@ -610,7 +737,10 @@ class WKConnectionManager { (DateTime.now().millisecondsSinceEpoch / 1000).truncate(); wkSendingMsg.sendCount++; _sendingMsgMap[key] = wkSendingMsg; - _sendPacket(wkSendingMsg.sendPacket); + // 使用 Future.microtask 来处理异步发送 + Future.microtask(() async { + await _sendPacket(wkSendingMsg.sendPacket); + }); Logs.debug("消息发送失败,尝试重发中..."); } } diff --git a/pubspec.yaml b/pubspec.yaml index c9c7c70..91d1084 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -45,6 +45,7 @@ dependencies: shared_preferences: ^2.2.0 sqflite: ^2.4.1 connectivity_plus: ^6.1.0 + web_socket_channel: ^3.0.3 dev_dependencies: flutter_test: From 610c453578883c6def8412e68b8a6c625c2c9733 Mon Sep 17 00:00:00 2001 From: deferz Date: Thu, 3 Jul 2025 23:33:44 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20demo=20=E4=BD=BF=E7=94=A8wss=5Faddr?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/lib/http.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/lib/http.dart b/example/lib/http.dart index be3d4c5..4b39117 100644 --- a/example/lib/http.dart +++ b/example/lib/http.dart @@ -67,7 +67,7 @@ class HttpUtils { try { final response = await dio.get('/users/$uid/route'); if (response.statusCode == HttpStatus.ok) { - return response.data['tcp_addr'] ?? ''; + return response.data['wss_addr'] ?? ''; } } catch (e) { print('Get IP error: $e');