Skip to content

Commit 80b3acc

Browse files
authored
fix(reconsume): subMsgs should be used instead of msgs in consume goroutine (#504)
* fix(consume): subMsgs should be used instead of msgs in consuming goroutine
1 parent 09c1624 commit 80b3acc

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

consumer/push_consumer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -913,7 +913,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
913913
Properties: make(map[string]string),
914914
ConsumerGroup: pc.consumerGroup,
915915
MQ: mq,
916-
Msgs: msgs,
916+
Msgs: subMsgs,
917917
}
918918
ctx := context.Background()
919919
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -944,14 +944,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
944944
} else {
945945
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
946946
if pc.model == BroadCasting {
947-
for i := 0; i < len(msgs); i++ {
947+
for i := 0; i < len(subMsgs); i++ {
948948
rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
949949
"message": subMsgs[i],
950950
})
951951
}
952952
} else {
953-
for i := 0; i < len(msgs); i++ {
954-
msg := msgs[i]
953+
for i := 0; i < len(subMsgs); i++ {
954+
msg := subMsgs[i]
955955
if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
956956
msg.ReconsumeTimes += 1
957957
msgBackFailed = append(msgBackFailed, msg)
@@ -973,7 +973,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
973973
} else {
974974
rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
975975
rlog.LogKeyMessageQueue: mq,
976-
"message": msgs,
976+
"message": subMsgs,
977977
})
978978
}
979979
})

0 commit comments

Comments
 (0)