Skip to content

Commit 3750b0f

Browse files
authored
feat: implement batchNewMessages method. (#687)
* feat: implement batchNewMessages method. * update batchnewMessages logic. * update logic. * update
1 parent b55cb90 commit 3750b0f

File tree

2 files changed

+66
-9
lines changed

2 files changed

+66
-9
lines changed

internal/conversation_msg/conversation_msg.go

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -222,30 +222,42 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
222222
//var unreadMessages []*model_struct.LocalConversationUnreadMessage
223223
var newMessages sdk_struct.NewMsgList
224224
// var reactionMsgModifierList, reactionMsgDeleterList sdk_struct.NewMsgList
225+
225226
var isUnreadCount, isConversationUpdate, isHistory, isNotPrivate, isSenderConversationUpdate bool
227+
226228
conversationChangedSet := make(map[string]*model_struct.LocalConversation)
227229
newConversationSet := make(map[string]*model_struct.LocalConversation)
228230
conversationSet := make(map[string]*model_struct.LocalConversation)
229231
phConversationChangedSet := make(map[string]*model_struct.LocalConversation)
230232
phNewConversationSet := make(map[string]*model_struct.LocalConversation)
233+
231234
log.ZDebug(ctx, "message come here conversation ch", "conversation length", len(allMsg))
232235
b := time.Now()
236+
233237
onlineMap := make(map[onlineMsgKey]struct{})
238+
234239
for conversationID, msgs := range allMsg {
235240
log.ZDebug(ctx, "parse message in one conversation", "conversationID",
236241
conversationID, "message length", len(msgs.Msgs))
237242
var insertMessage, selfInsertMessage, othersInsertMessage []*model_struct.LocalChatLog
238243
var updateMessage []*model_struct.LocalChatLog
244+
239245
for _, v := range msgs.Msgs {
240246
log.ZDebug(ctx, "parse message ", "conversationID", conversationID, "msg", v)
241247
isHistory = utils.GetSwitchFromOptions(v.Options, constant.IsHistory)
248+
242249
isUnreadCount = utils.GetSwitchFromOptions(v.Options, constant.IsUnreadCount)
250+
243251
isConversationUpdate = utils.GetSwitchFromOptions(v.Options, constant.IsConversationUpdate)
252+
244253
isNotPrivate = utils.GetSwitchFromOptions(v.Options, constant.IsNotPrivate)
254+
245255
isSenderConversationUpdate = utils.GetSwitchFromOptions(v.Options, constant.IsSenderConversationUpdate)
256+
246257
msg := &sdk_struct.MsgStruct{}
247258
copier.Copy(msg, v)
248259
msg.Content = string(v.Content)
260+
249261
var attachedInfo sdk_struct.AttachedInfoElem
250262
_ = utils.JsonStringToStruct(v.AttachedInfo, &attachedInfo)
251263
msg.AttachedInfoElem = &attachedInfo
@@ -255,7 +267,9 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
255267
insertMessage = append(insertMessage, c.msgStructToLocalChatLog(msg))
256268
continue
257269
}
270+
258271
msg.Status = constant.MsgStatusSendSuccess
272+
259273
//De-analyze data
260274
err := c.msgHandleByContentType(msg)
261275
if err != nil {
@@ -367,17 +381,21 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
367381

368382
}
369383
}
384+
385+
//todo The lock granularity needs to be optimized to the conversation level.
370386
c.conversationSyncMutex.Lock()
371387
defer c.conversationSyncMutex.Unlock()
372-
//todo The lock granularity needs to be optimized to the conversation level.
388+
373389
list, err := c.db.GetAllConversationListDB(ctx)
374390
if err != nil {
375391
log.ZError(ctx, "GetAllConversationListDB", err)
376392
}
393+
377394
m := make(map[string]*model_struct.LocalConversation)
378395
listToMap(list, m)
379396
log.ZDebug(ctx, "listToMap: ", "local conversation", list, "generated c map",
380397
string(stringutil.StructToJsonBytes(conversationSet)))
398+
381399
c.diff(ctx, m, conversationSet, conversationChangedSet, newConversationSet)
382400
log.ZInfo(ctx, "trigger map is :", "newConversations", string(stringutil.StructToJsonBytes(newConversationSet)),
383401
"changedConversations", string(stringutil.StructToJsonBytes(conversationChangedSet)))
@@ -429,7 +447,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
429447
log.ZDebug(ctx, "before trigger msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
430448

431449
if c.batchMsgListener() != nil {
432-
c.batchNewMessages(ctx, newMessages)
450+
c.batchNewMessages(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
433451
} else {
434452
c.newMessage(ctx, newMessages, conversationChangedSet, newConversationSet, onlineMap)
435453
}
@@ -451,6 +469,7 @@ func (c *Conversation) doMsgNew(c2v common.Cmd2Value) {
451469
}
452470
}
453471
}
472+
454473
log.ZDebug(ctx, "insert msg", "cost time", time.Since(b).Seconds(), "len", len(allMsg))
455474
}
456475

@@ -886,15 +905,52 @@ func (c *Conversation) newMessage(ctx context.Context, newMessagesList sdk_struc
886905
}
887906
}
888907

889-
func (c *Conversation) batchNewMessages(ctx context.Context, newMessagesList sdk_struct.NewMsgList) {
890-
sort.Sort(newMessagesList)
891-
if len(newMessagesList) > 0 {
892-
c.batchMsgListener().OnRecvNewMessages(utils.StructToJsonString(newMessagesList))
893-
//if c.IsBackground {
894-
// c.batchMsgListener.OnRecvOfflineNewMessages(utils.StructToJsonString(newMessagesList))
895-
//}
908+
func (c *Conversation) batchNewMessages(ctx context.Context, newMessagesList sdk_struct.NewMsgList, conversationChanged, newConversation map[string]*model_struct.LocalConversation, onlineMsg map[onlineMsgKey]struct{}) {
909+
if len(newMessagesList) == 0 {
910+
log.ZWarn(ctx, "newMessagesList is empty", errs.New("newMessagesList is empty"))
911+
return
896912
}
897913

914+
sort.Sort(newMessagesList)
915+
var needNotificationMsgList sdk_struct.NewMsgList
916+
917+
// offline
918+
if c.GetBackground() {
919+
u, err := c.user.GetSelfUserInfo(ctx)
920+
if err != nil {
921+
log.ZWarn(ctx, "GetSelfUserInfo err", err)
922+
}
923+
924+
if u.GlobalRecvMsgOpt != constant.ReceiveMessage {
925+
return
926+
}
927+
928+
for _, w := range newMessagesList {
929+
conversationID := utils.GetConversationIDByMsg(w)
930+
if v, ok := conversationChanged[conversationID]; ok && v.RecvMsgOpt == constant.ReceiveMessage {
931+
needNotificationMsgList = append(needNotificationMsgList, w)
932+
}
933+
if v, ok := newConversation[conversationID]; ok && v.RecvMsgOpt == constant.ReceiveMessage {
934+
needNotificationMsgList = append(needNotificationMsgList, w)
935+
}
936+
}
937+
938+
if len(needNotificationMsgList) != 0 {
939+
c.batchMsgListener().OnRecvOfflineNewMessages(utils.StructToJsonString(needNotificationMsgList))
940+
}
941+
} else { // online
942+
for _, w := range newMessagesList {
943+
if w.ContentType == constant.Typing {
944+
continue
945+
}
946+
947+
needNotificationMsgList = append(needNotificationMsgList, w)
948+
}
949+
950+
if len(needNotificationMsgList) != 0 {
951+
c.batchMsgListener().OnRecvNewMessages(utils.StructToJsonString(needNotificationMsgList))
952+
}
953+
}
898954
}
899955

900956
func (c *Conversation) msgConvert(msg *sdk_struct.MsgStruct) (err error) {

open_im_sdk/em.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package open_im_sdk
1717
import (
1818
"context"
1919
"errors"
20+
2021
"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
2122
"github.com/openimsdk/tools/log"
2223
)

0 commit comments

Comments
 (0)