Skip to content

Commit 53d3452

Browse files
committed
perf: ack链表处理函数添加节流措施
1 parent 8ba20e6 commit 53d3452

File tree

2 files changed

+32
-25
lines changed

2 files changed

+32
-25
lines changed

mqttclient/RyanMqttClient.h

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -92,26 +92,27 @@ extern "C"
9292

9393
typedef struct
9494
{
95-
uint8_t lwtFlag : 1; // 遗嘱标志位
96-
uint8_t destoryFlag : 1; // 销毁标志位
97-
uint16_t ackHandlerCount; // 等待ack的记录个数
98-
uint16_t packetId; // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
99-
uint32_t eventFlag; // 事件标志位
100-
RyanMqttState_e clientState; // mqtt客户端的状态
101-
RyanList_t msgHandlerList; // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
102-
RyanList_t ackHandlerList; // 维护ack链表
103-
RyanList_t userAckHandlerList; // 用户接口的ack链表,会由mqtt线程移动到ack链表
104-
platformTimer_t keepaliveTimer; // 保活定时器
105-
platformTimer_t keepaliveDebounTimer; // 保活定时器消抖
106-
platformNetwork_t network; // 网络组件
107-
RyanMqttClientConfig_t config; // mqtt config
108-
platformThread_t mqttThread; // mqtt线程
109-
platformMutex_t msgHandleLock; // msg链表锁
110-
platformMutex_t ackHandleLock; // ack链表锁
111-
platformMutex_t userAckHandleLock; // 用户接口的ack链表锁
112-
platformMutex_t sendBufLock; // 写缓冲区锁
113-
platformCritical_t criticalLock; // 临界区锁
114-
lwtOptions_t lwtOptions; // 遗嘱相关配置
95+
uint8_t lwtFlag : 1; // 遗嘱标志位
96+
uint8_t destoryFlag : 1; // 销毁标志位
97+
uint16_t ackHandlerCount; // 等待ack的记录个数
98+
uint16_t packetId; // mqtt报文标识符,控制报文必须包含一个非零的 16 位报文标识符
99+
uint32_t eventFlag; // 事件标志位
100+
RyanMqttState_e clientState; // mqtt客户端的状态
101+
RyanList_t msgHandlerList; // 维护消息处理列表,这是mqtt协议必须实现的内容,所有来自服务器的publish报文都会被处理(前提是订阅了对应的消息,或者设置了拦截器)
102+
RyanList_t ackHandlerList; // 维护ack链表
103+
RyanList_t userAckHandlerList; // 用户接口的ack链表,会由mqtt线程移动到ack链表
104+
platformTimer_t ackScanThrottleTimer; // ack链表检查节流定时器
105+
platformTimer_t keepaliveTimer; // 保活定时器
106+
platformTimer_t keepaliveThrottleTimer; // 保活检查节流定时器
107+
platformNetwork_t network; // 网络组件
108+
RyanMqttClientConfig_t config; // mqtt config
109+
platformThread_t mqttThread; // mqtt线程
110+
platformMutex_t msgHandleLock; // msg链表锁
111+
platformMutex_t ackHandleLock; // ack链表锁
112+
platformMutex_t userAckHandleLock; // 用户接口的ack链表锁
113+
platformMutex_t sendBufLock; // 写缓冲区锁
114+
platformCritical_t criticalLock; // 临界区锁
115+
lwtOptions_t lwtOptions; // 遗嘱相关配置
115116
} RyanMqttClient_t;
116117

117118
/* extern variables-----------------------------------------------------------*/

mqttclient/RyanMqttThread.c

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
2525
RyanMqttConnectStatus_e connectState = RyanMqttKeepaliveTimeout;
2626
RyanMqttError_e result = RyanMqttFailedError;
2727
int32_t packetLen = 0;
28+
uint32_t timeRemain = 0;
2829
RyanMqttAssert(NULL != client);
2930

3031
// mqtt没有连接就退出
3132
if (RyanMqttConnectState != RyanMqttGetClientState(client))
3233
return RyanMqttNotConnectError;
3334

34-
uint32_t timeRemain = platformTimerRemain(&client->keepaliveTimer);
35+
timeRemain = platformTimerRemain(&client->keepaliveTimer);
3536

3637
// 超过设置的 1.4 倍心跳周期
3738
if (0 == timeRemain)
@@ -46,8 +47,8 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
4647
else if (timeRemain < client->config.recvTimeout ||
4748
timeRemain < 1000 * 0.5 * client->config.keepaliveTimeoutS)
4849
{
49-
// 消抖时间内不发送心跳包
50-
if (platformTimerRemain(&client->keepaliveDebounTimer))
50+
// 节流时间内不发送心跳报文
51+
if (platformTimerRemain(&client->keepaliveThrottleTimer))
5152
return RyanMqttSuccessError;
5253

5354
platformMutexLock(client->config.userData, &client->sendBufLock); // 获取互斥锁
@@ -62,7 +63,7 @@ static RyanMqttError_e RyanMqttKeepalive(RyanMqttClient_t *client)
6263
});
6364
platformMutexUnLock(client->config.userData, &client->sendBufLock); // 释放互斥锁
6465

65-
platformTimerCutdown(&client->keepaliveDebounTimer, 1500); // 启动心跳消抖定时器
66+
platformTimerCutdown(&client->keepaliveThrottleTimer, 1500); // 启动心跳检查节流定时器
6667
}
6768

6869
return RyanMqttSuccessError;
@@ -557,6 +558,10 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
557558
if (RyanMqttConnectState != RyanMqttGetClientState(client))
558559
return;
559560

561+
// 节流时间内不检查ack链表
562+
if (platformTimerRemain(&client->ackScanThrottleTimer))
563+
return;
564+
560565
platformMutexLock(client->config.userData, &client->ackHandleLock);
561566
RyanListForEachSafe(curr, next, &client->ackHandlerList)
562567
{
@@ -623,8 +628,9 @@ static void RyanMqttAckListScan(RyanMqttClient_t *client, RyanMqttBool_e WaitFla
623628
}
624629
}
625630
}
626-
627631
platformMutexUnLock(client->config.userData, &client->ackHandleLock);
632+
633+
platformTimerCutdown(&client->ackScanThrottleTimer, 1000); // 启动ack scan节流定时器
628634
}
629635

630636
/**

0 commit comments

Comments
 (0)