Skip to content

Commit c01611e

Browse files
authored
refactor: performance optimization of pull messages and reduce redundant data synchronization. (#694)
* fix: Bug fix for clearing unread messages. Signed-off-by: Gordon <[email protected]> * fix: Bug fix for pull messages. Signed-off-by: Gordon <[email protected]> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <[email protected]> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <[email protected]> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <[email protected]> * refactor: performance optimization of pull messages. Signed-off-by: Gordon <[email protected]> * refactor: performance optimization of pull messages and reduce redundant data synchronization. Signed-off-by: Gordon <[email protected]> --------- Signed-off-by: Gordon <[email protected]>
1 parent 2522772 commit c01611e

File tree

8 files changed

+137
-155
lines changed

8 files changed

+137
-155
lines changed

internal/conversation_msg/conversation_msg.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"encoding/json"
2020
"errors"
21+
"fmt"
2122
"math"
2223
"sync"
2324

@@ -470,7 +471,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
470471
}
471472
}
472473

473-
log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
474+
log.ZDebug(ctx, "insert msg", "duration", fmt.Sprintf("%dms", time.Since(b)), "len", len(allMsg))
474475
}
475476

476477
func (c *Conversation) doMsgSyncByReinstalled(c2v common.Cmd2Value) {

internal/interaction/long_conn_mgr.go

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,15 @@ import (
2727
"sync"
2828
"time"
2929

30+
"github.com/golang/protobuf/proto"
31+
"github.com/gorilla/websocket"
32+
3033
"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
3134
"github.com/openimsdk/openim-sdk-core/v3/pkg/ccontext"
3235
"github.com/openimsdk/openim-sdk-core/v3/pkg/common"
3336
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
3437
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
3538
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
36-
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
37-
38-
"github.com/golang/protobuf/proto"
39-
"github.com/gorilla/websocket"
4039

4140
"github.com/openimsdk/protocol/sdkws"
4241
"github.com/openimsdk/tools/errs"
@@ -88,7 +87,6 @@ type LongConnMgr struct {
8887
pushMsgAndMaxSeqCh chan common.Cmd2Value
8988
conversationCh chan common.Cmd2Value
9089
loginMgrCh chan common.Cmd2Value
91-
heartbeatCh chan common.Cmd2Value
9290
closedErr error
9391
ctx context.Context
9492
IsCompression bool
@@ -110,7 +108,7 @@ type Message struct {
110108
Resp chan *GeneralWsResp
111109
}
112110

113-
func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnListener, userOnline func(map[string][]int32), heartbeatCmdCh, pushMsgAndMaxSeqCh, loginMgrCh chan common.Cmd2Value) *LongConnMgr {
111+
func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnListener, userOnline func(map[string][]int32), pushMsgAndMaxSeqCh, loginMgrCh chan common.Cmd2Value) *LongConnMgr {
114112
l := &LongConnMgr{
115113
listener: listener,
116114
userOnline: userOnline,
@@ -127,7 +125,6 @@ func NewLongConnMgr(ctx context.Context, listener open_im_sdk_callback.OnConnLis
127125
l.conn = NewWebSocket(WebSocket)
128126
l.connWrite = new(sync.Mutex)
129127
l.ctx = ctx
130-
l.heartbeatCh = heartbeatCmdCh
131128
return l
132129
}
133130
func (c *LongConnMgr) Run(ctx context.Context) {
@@ -318,8 +315,6 @@ func (c *LongConnMgr) heartbeat(ctx context.Context) {
318315
case <-ctx.Done():
319316
log.ZInfo(ctx, "heartbeat done sdk logout.....")
320317
return
321-
case <-c.heartbeatCh:
322-
c.retrieveMaxSeq(ctx)
323318
case <-ticker.C:
324319
log.ZInfo(ctx, "sendPingMessage", "goroutine ID:", getGoroutineID())
325320
c.sendPingMessage(ctx)
@@ -356,51 +351,6 @@ func getGoroutineID() int64 {
356351
return id
357352
}
358353

359-
func (c *LongConnMgr) retrieveMaxSeq(ctx context.Context) {
360-
if c.conn == nil {
361-
return
362-
}
363-
var m sdkws.GetMaxSeqReq
364-
m.UserID = ccontext.Info(ctx).UserID()
365-
opID := utils.OperationIDGenerator()
366-
sCtx := ccontext.WithOperationID(c.ctx, opID)
367-
log.ZInfo(sCtx, "retrieveMaxSeq start", "goroutine ID:", getGoroutineID())
368-
data, err := proto.Marshal(&m)
369-
if err != nil {
370-
log.ZError(sCtx, "proto.Marshal", err)
371-
return
372-
}
373-
req := &GeneralWsReq{
374-
ReqIdentifier: constant.GetNewestSeq,
375-
SendID: m.UserID,
376-
OperationID: opID,
377-
Data: data,
378-
}
379-
resp, err := c.sendAndWaitResp(req)
380-
if err != nil {
381-
log.ZError(sCtx, "sendAndWaitResp", err)
382-
_ = c.close()
383-
time.Sleep(time.Second * 1)
384-
return
385-
} else {
386-
if resp.ErrCode != 0 {
387-
log.ZError(sCtx, "retrieveMaxSeq failed", nil, "errCode:", resp.ErrCode, "errMsg:", resp.ErrMsg)
388-
}
389-
var wsSeqResp sdkws.GetMaxSeqResp
390-
err = proto.Unmarshal(resp.Data, &wsSeqResp)
391-
if err != nil {
392-
log.ZError(sCtx, "proto.Unmarshal", err)
393-
}
394-
var cmd sdk_struct.CmdMaxSeqToMsgSync
395-
cmd.ConversationMaxSeqOnSvr = wsSeqResp.MaxSeqs
396-
397-
err := common.TriggerCmdMaxSeq(sCtx, &cmd, c.pushMsgAndMaxSeqCh)
398-
if err != nil {
399-
log.ZError(sCtx, "TriggerCmdMaxSeq failed", err)
400-
}
401-
}
402-
}
403-
404354
func (c *LongConnMgr) sendAndWaitResp(msg *GeneralWsReq) (*GeneralWsResp, error) {
405355
tempChan, err := c.writeBinaryMsgAndRetry(msg)
406356
defer c.Syncer.DelCh(msg.MsgIncr)

internal/interaction/msg_sync.go

Lines changed: 107 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"runtime/debug"
2121
"strings"
2222
"sync"
23+
"time"
2324

2425
"golang.org/x/sync/errgroup"
2526

@@ -42,7 +43,8 @@ const (
4243
pullMsgGoroutineLimit = 10
4344
)
4445

45-
// The callback synchronization starts. The reconnection ends
46+
// MsgSyncer is a central hub for message relay, responsible for sequential message gap pulling,
47+
// handling network events, and managing app foreground and background events.
4648
type MsgSyncer struct {
4749
loginUserID string // login user ID
4850
longConnMgr *LongConnMgr // long connection manager
@@ -54,6 +56,8 @@ type MsgSyncer struct {
5456
syncTimes int // times of sync
5557
ctx context.Context // context
5658
reinstalled bool //true if the app was uninstalled and reinstalled
59+
isSyncing bool // indicates whether data is being synced
60+
isSyncingLock sync.Mutex // lock for syncing state
5761

5862
}
5963

@@ -183,9 +187,19 @@ func (m *MsgSyncer) handlePushMsgAndEvent(cmd common.Cmd2Value) {
183187
switch cmd.Cmd {
184188
case constant.CmdConnSuccesss:
185189
log.ZInfo(cmd.Ctx, "recv long conn mgr connected", "cmd", cmd.Cmd, "value", cmd.Value)
186-
m.doConnected(cmd.Ctx)
187-
case constant.CmdMaxSeq:
188-
log.ZInfo(cmd.Ctx, "recv max seqs from long conn mgr, start sync msgs", "cmd", cmd.Cmd, "value", cmd.Value)
190+
if m.startSync() {
191+
m.doConnected(cmd.Ctx)
192+
} else {
193+
log.ZWarn(cmd.Ctx, "syncing, ignore connected event", nil, "cmd", cmd.Cmd, "value", cmd.Value)
194+
}
195+
case constant.CmdWakeUpDataSync:
196+
log.ZInfo(cmd.Ctx, "app wake up, start sync msgs", "cmd", cmd.Cmd, "value", cmd.Value)
197+
if m.startSync() {
198+
m.doWakeupDataSync(cmd.Ctx)
199+
} else {
200+
log.ZWarn(cmd.Ctx, "syncing, ignore wake up event", nil, "cmd", cmd.Cmd, "value", cmd.Value)
201+
202+
}
189203
m.compareSeqsAndBatchSync(cmd.Ctx, cmd.Value.(*sdk_struct.CmdMaxSeqToMsgSync).ConversationMaxSeqOnSvr, defaultPullNums)
190204
case constant.CmdPushMsg:
191205
m.doPushMsg(cmd.Ctx, cmd.Value.(*sdkws.PushMessages))
@@ -200,7 +214,9 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma
200214
messagesSeqMap := make(map[string]int64)
201215
for conversationID, seq := range maxSeqToSync {
202216
if IsNotification(conversationID) {
203-
notificationsSeqMap[conversationID] = seq
217+
if seq != 0 { // seq is 0, no need to sync
218+
notificationsSeqMap[conversationID] = seq
219+
}
204220
} else {
205221
messagesSeqMap[conversationID] = seq
206222
}
@@ -243,13 +259,40 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma
243259
needSyncSeqMap[conversationID] = [2]int64{syncedMaxSeq + 1, maxSeq}
244260
}
245261
} else {
246-
needSyncSeqMap[conversationID] = [2]int64{0, maxSeq}
262+
if maxSeq != 0 { // seq is 0, no need to sync
263+
needSyncSeqMap[conversationID] = [2]int64{0, maxSeq}
264+
}
247265
}
248266
}
249267
_ = m.syncAndTriggerMsgs(m.ctx, needSyncSeqMap, pullNums)
250268
}
251269
}
252270

271+
// startSync checks if the sync is already in progress.
272+
// If syncing is in progress, it returns false. Otherwise, it starts syncing and returns true.
273+
func (ms *MsgSyncer) startSync() bool {
274+
ms.isSyncingLock.Lock()
275+
defer ms.isSyncingLock.Unlock()
276+
277+
if ms.isSyncing {
278+
// If already syncing, return false
279+
return false
280+
}
281+
282+
// Set syncing to true and start the sync
283+
ms.isSyncing = true
284+
285+
// Create a goroutine that waits for 5 seconds and then sets isSyncing to false
286+
go func() {
287+
time.Sleep(5 * time.Second)
288+
ms.isSyncingLock.Lock()
289+
ms.isSyncing = false
290+
ms.isSyncingLock.Unlock()
291+
}()
292+
293+
return true
294+
}
295+
253296
func (m *MsgSyncer) doPushMsg(ctx context.Context, push *sdkws.PushMessages) {
254297
log.ZDebug(ctx, "push msgs", "push", push, "syncedMaxSeqs", m.syncedMaxSeqs)
255298
m.pushTriggerAndSync(ctx, push.Msgs, m.triggerConversation)
@@ -308,83 +351,83 @@ func (m *MsgSyncer) doConnected(ctx context.Context) {
308351
}
309352
}
310353

354+
func (m *MsgSyncer) doWakeupDataSync(ctx context.Context) {
355+
common.TriggerCmdSyncData(ctx, m.conversationCh)
356+
var resp sdkws.GetMaxSeqResp
357+
if err := m.longConnMgr.SendReqWaitResp(m.ctx, &sdkws.GetMaxSeqReq{UserID: m.loginUserID}, constant.GetNewestSeq, &resp); err != nil {
358+
log.ZError(m.ctx, "get max seq error", err)
359+
return
360+
} else {
361+
log.ZDebug(m.ctx, "get max seq success", "resp", resp.MaxSeqs)
362+
}
363+
m.compareSeqsAndBatchSync(ctx, resp.MaxSeqs, defaultPullNums)
364+
}
365+
311366
func IsNotification(conversationID string) bool {
312367
return strings.HasPrefix(conversationID, "n_")
313368
}
314369

315-
// Fragment synchronization message, seq refresh after successful trigger
316370
func (m *MsgSyncer) syncAndTriggerMsgs(ctx context.Context, seqMap map[string][2]int64, syncMsgNum int64) error {
317-
if len(seqMap) > 0 {
318-
log.ZDebug(ctx, "current sync seqMap", "seqMap", seqMap)
319-
var (
320-
tempSeqMap = make(map[string][2]int64, 50)
321-
msgNum = 0
322-
)
323-
for k, v := range seqMap {
324-
oneConversationSyncNum := v[1] - v[0] + 1
325-
if (oneConversationSyncNum/SplitPullMsgNum) > 1 && IsNotification(k) {
326-
nSeqMap := make(map[string][2]int64, 1)
327-
count := int(oneConversationSyncNum / SplitPullMsgNum)
328-
startSeq := v[0]
329-
var end int64
330-
for i := 0; i <= count; i++ {
331-
if i == count {
332-
nSeqMap[k] = [2]int64{startSeq, v[1]}
333-
} else {
334-
end = startSeq + int64(SplitPullMsgNum)
335-
if end > v[1] {
336-
end = v[1]
337-
i = count
338-
}
339-
nSeqMap[k] = [2]int64{startSeq, end}
340-
}
341-
resp, err := m.pullMsgBySeqRange(ctx, nSeqMap, syncMsgNum)
342-
if err != nil {
343-
log.ZError(ctx, "syncMsgFromSvr err", err, "nSeqMap", nSeqMap)
344-
return err
345-
}
346-
_ = m.triggerConversation(ctx, resp.Msgs)
347-
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
348-
for conversationID, seqs := range nSeqMap {
349-
m.syncedMaxSeqs[conversationID] = seqs[1]
350-
}
351-
startSeq = end + 1
352-
}
353-
continue
371+
if len(seqMap) == 0 {
372+
log.ZDebug(ctx, "nothing to sync", "syncMsgNum", syncMsgNum)
373+
return nil
374+
}
375+
376+
log.ZDebug(ctx, "current sync seqMap", "seqMap", seqMap)
377+
var (
378+
tempSeqMap = make(map[string][2]int64, 50)
379+
msgNum = 0
380+
)
381+
382+
for k, v := range seqMap {
383+
oneConversationSyncNum := v[1] - v[0] + 1
384+
tempSeqMap[k] = v
385+
// For notification conversations, use oneConversationSyncNum directly
386+
if IsNotification(k) {
387+
msgNum += int(oneConversationSyncNum)
388+
} else {
389+
// For regular conversations, ensure msgNum is the minimum of oneConversationSyncNum and syncMsgNum
390+
currentSyncMsgNum := int64(0)
391+
if oneConversationSyncNum > syncMsgNum {
392+
currentSyncMsgNum = syncMsgNum
393+
} else {
394+
currentSyncMsgNum = oneConversationSyncNum
354395
}
355-
tempSeqMap[k] = v
356-
if oneConversationSyncNum > 0 {
357-
msgNum += int(oneConversationSyncNum)
396+
msgNum += int(currentSyncMsgNum)
397+
}
398+
399+
// If accumulated msgNum reaches SplitPullMsgNum, trigger a batch pull
400+
if msgNum >= SplitPullMsgNum {
401+
resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum)
402+
if err != nil {
403+
log.ZError(ctx, "syncMsgFromSvr error", err, "tempSeqMap", tempSeqMap)
404+
return err
358405
}
359-
if msgNum >= SplitPullMsgNum {
360-
resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum)
361-
if err != nil {
362-
log.ZError(ctx, "syncMsgFromSvr err", err, "tempSeqMap", tempSeqMap)
363-
return err
364-
}
365-
_ = m.triggerConversation(ctx, resp.Msgs)
366-
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
367-
for conversationID, seqs := range tempSeqMap {
368-
m.syncedMaxSeqs[conversationID] = seqs[1]
369-
}
370-
tempSeqMap = make(map[string][2]int64, 50)
371-
msgNum = 0
406+
_ = m.triggerConversation(ctx, resp.Msgs)
407+
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
408+
for conversationID, seqs := range tempSeqMap {
409+
m.syncedMaxSeqs[conversationID] = seqs[1]
372410
}
411+
// Reset tempSeqMap and msgNum to handle the next batch
412+
tempSeqMap = make(map[string][2]int64, 50)
413+
msgNum = 0
373414
}
415+
}
374416

417+
// Handle remaining messages to ensure all are synced
418+
if len(tempSeqMap) > 0 {
375419
resp, err := m.pullMsgBySeqRange(ctx, tempSeqMap, syncMsgNum)
376420
if err != nil {
377-
log.ZError(ctx, "syncMsgFromSvr err", err, "seqMap", seqMap)
421+
log.ZError(ctx, "syncMsgFromSvr error", err, "tempSeqMap", tempSeqMap)
378422
return err
379423
}
380424
_ = m.triggerConversation(ctx, resp.Msgs)
381425
_ = m.triggerNotification(ctx, resp.NotificationMsgs)
382-
for conversationID, seqs := range seqMap {
426+
for conversationID, seqs := range tempSeqMap {
383427
m.syncedMaxSeqs[conversationID] = seqs[1]
384428
}
385-
} else {
386-
log.ZDebug(ctx, "noting conversation to sync", "syncMsgNum", syncMsgNum)
387429
}
430+
388431
return nil
389432
}
390433

internal/util/post.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ func ApiPost(ctx context.Context, api string, req, resp any) (err error) {
6666
defer func(start time.Time) {
6767
elapsed := time.Since(start).Milliseconds()
6868
if err == nil {
69-
log.ZDebug(ctx, "CallApi", "api", api, "state", "success", "cost time", fmt.Sprintf("%dms", elapsed))
69+
log.ZDebug(ctx, "CallApi", "duration", fmt.Sprintf("%dms", elapsed), "api", api, "state", "success")
7070
} else {
71-
log.ZError(ctx, "CallApi", err, "api", api, "state", "failed", "cost time", fmt.Sprintf("%dms", elapsed))
71+
log.ZError(ctx, "CallApi", err, "duration", fmt.Sprintf("%dms", elapsed), "api", api, "state", "failed")
7272
}
7373
}(time.Now())
7474

msgtest/module/msg_sender.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func newUserCtx(userID, token string, imConfig sdk_struct.IMConfig) context.Cont
129129
func NewUser(userID, token string, timeOffset int64, p *PressureTester, imConfig sdk_struct.IMConfig, opts ...func(core *SendMsgUser)) *SendMsgUser {
130130
pushMsgAndMaxSeqCh := make(chan common.Cmd2Value, 1000)
131131
ctx := newUserCtx(userID, token, imConfig)
132-
longConnMgr := interaction.NewLongConnMgr(ctx, &ConnListener{}, func(m map[string][]int32) {}, nil, pushMsgAndMaxSeqCh, nil)
132+
longConnMgr := interaction.NewLongConnMgr(ctx, &ConnListener{}, func(m map[string][]int32) {}, pushMsgAndMaxSeqCh, nil)
133133
core := &SendMsgUser{
134134
pushMsgAndMaxSeqCh: pushMsgAndMaxSeqCh,
135135
longConnMgr: longConnMgr,

0 commit comments

Comments
 (0)