2023-02-09 20:36:34 +08:00
package msgtransfer
2021-05-26 19:24:25 +08:00
import (
2023-03-22 18:35:21 +08:00
"context"
2023-05-10 15:21:08 +08:00
"strconv"
"strings"
2023-04-18 14:43:54 +08:00
"sync"
"time"
2023-05-10 17:18:04 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2023-03-21 12:28:21 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
2023-04-28 18:33:33 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2021-05-26 19:24:25 +08:00
"github.com/Shopify/sarama"
2023-05-04 15:54:04 +08:00
"github.com/go-redis/redis"
2023-05-10 20:27:39 +08:00
"google.golang.org/protobuf/proto"
2021-05-26 19:24:25 +08:00
)
2023-02-15 15:52:32 +08:00
// Command identifiers carried in Cmd2Value.Cmd, plus the worker fan-out size.
const (
	// ConsumerMsgs tags a raw batch of Kafka messages handed to the
	// distribution goroutine.
	ConsumerMsgs = 3
	// SourceMessages tags a batch aggregated per key and handed to a worker
	// channel.
	SourceMessages = 4
	// MongoMessages is not referenced in the visible part of this file.
	MongoMessages = 5
	// ChannelNum is the number of worker channels, and therefore of Run
	// goroutines, owned by one handler.
	ChannelNum = 100
)
2022-05-19 12:25:46 +08:00
type MsgChannelValue struct {
2023-05-19 19:43:43 +08:00
uniqueKey string
ctx context . Context
ctxMsgList [ ] * ContextMsg
2022-05-20 14:42:49 +08:00
}
2023-02-15 15:52:32 +08:00
2022-05-20 14:42:49 +08:00
type TriggerChannelValue struct {
2023-03-22 18:35:21 +08:00
ctx context . Context
cMsgList [ ] * sarama . ConsumerMessage
2022-05-19 12:25:46 +08:00
}
2023-02-15 15:52:32 +08:00
2022-05-11 18:33:48 +08:00
// Cmd2Value is the envelope sent over the handler's internal channels.
type Cmd2Value struct {
	// Cmd selects how Value is interpreted (ConsumerMsgs or SourceMessages
	// in this file).
	Cmd int
	// Value is the command payload; receivers type-assert it according to Cmd.
	Value interface{}
}
2023-03-22 18:35:21 +08:00
type ContextMsg struct {
2023-04-28 18:39:21 +08:00
message * sdkws . MsgData
2023-03-22 18:35:21 +08:00
ctx context . Context
}
2023-02-15 15:52:32 +08:00
2022-06-16 20:35:27 +08:00
type OnlineHistoryRedisConsumerHandler struct {
2023-02-15 15:52:32 +08:00
historyConsumerGroup * kafka . MConsumerGroup
2022-05-20 13:33:38 +08:00
chArrays [ ChannelNum ] chan Cmd2Value
2022-05-25 18:46:53 +08:00
msgDistributionCh chan Cmd2Value
2023-02-15 15:52:32 +08:00
singleMsgSuccessCount uint64
singleMsgFailedCount uint64
singleMsgSuccessCountMutex sync . Mutex
singleMsgFailedCountMutex sync . Mutex
2023-05-08 12:39:45 +08:00
msgDatabase controller . CommonMsgDatabase
2023-04-28 18:33:33 +08:00
conversationRpcClient * rpcclient . ConversationClient
groupRpcClient * rpcclient . GroupClient
2021-05-26 19:24:25 +08:00
}
2023-05-17 11:42:25 +08:00
func NewOnlineHistoryRedisConsumerHandler ( database controller . CommonMsgDatabase , conversationRpcClient * rpcclient . ConversationClient , groupRpcClient * rpcclient . GroupClient ) * OnlineHistoryRedisConsumerHandler {
2023-03-03 17:42:26 +08:00
var och OnlineHistoryRedisConsumerHandler
och . msgDatabase = database
2022-05-25 18:46:53 +08:00
och . msgDistributionCh = make ( chan Cmd2Value ) //no buffer channel
go och . MessagesDistributionHandle ( )
2022-05-20 15:51:37 +08:00
for i := 0 ; i < ChannelNum ; i ++ {
2022-05-25 22:42:44 +08:00
och . chArrays [ i ] = make ( chan Cmd2Value , 50 )
2022-05-20 15:51:37 +08:00
go och . Run ( i )
}
2023-04-28 18:33:33 +08:00
och . conversationRpcClient = conversationRpcClient
2023-05-17 11:42:25 +08:00
och . groupRpcClient = groupRpcClient
2023-02-15 15:52:32 +08:00
och . historyConsumerGroup = kafka . NewMConsumerGroup ( & kafka . MConsumerGroupConfig { KafkaVersion : sarama . V2_0_0_0 ,
2021-05-26 19:24:25 +08:00
OffsetsInitial : sarama . OffsetNewest , IsReturnErr : false } , [ ] string { config . Config . Kafka . Ws2mschat . Topic } ,
2022-06-16 20:35:27 +08:00
config . Config . Kafka . Ws2mschat . Addr , config . Config . Kafka . ConsumerGroupID . MsgToRedis )
2023-03-16 10:20:40 +08:00
//statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
2023-03-03 17:42:26 +08:00
return & och
2022-05-11 18:33:48 +08:00
}
2023-02-15 15:52:32 +08:00
2022-06-16 20:35:27 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) Run ( channelID int ) {
2022-05-19 12:25:46 +08:00
for {
select {
2022-05-20 13:33:38 +08:00
case cmd := <- och . chArrays [ channelID ] :
2022-05-19 12:25:46 +08:00
switch cmd . Cmd {
2023-04-28 18:38:12 +08:00
case SourceMessages :
2022-05-19 12:25:46 +08:00
msgChannelValue := cmd . Value . ( MsgChannelValue )
2023-03-22 18:35:21 +08:00
ctxMsgList := msgChannelValue . ctxMsgList
ctx := msgChannelValue . ctx
2023-05-19 20:17:50 +08:00
log . ZDebug ( ctx , "msg arrived channel" , "channel id" , channelID , "msgList length" , len ( ctxMsgList ) , "uniqueKey" , msgChannelValue . uniqueKey )
2023-05-16 18:37:14 +08:00
storageMsgList , notStorageMsgList , storageNotificationList , notStorageNotificationList , modifyMsgList := och . getPushStorageMsgList ( ctxMsgList )
2023-05-10 18:00:05 +08:00
log . ZDebug ( ctx , "msg lens" , "storageMsgList" , len ( storageMsgList ) , "notStorageMsgList" , len ( notStorageMsgList ) ,
"storageNotificationList" , len ( storageNotificationList ) , "notStorageNotificationList" , len ( notStorageNotificationList ) , "modifyMsgList" , len ( modifyMsgList ) )
2023-05-19 19:43:43 +08:00
conversationIDMsg := utils . GetChatConversationIDByMsg ( ctxMsgList [ 0 ] . message )
conversationIDNotification := utils . GetNotificationConversationID ( ctxMsgList [ 0 ] . message )
och . handleMsg ( ctx , msgChannelValue . uniqueKey , conversationIDMsg , storageMsgList , notStorageMsgList )
och . handleNotification ( ctx , msgChannelValue . uniqueKey , conversationIDNotification , storageNotificationList , notStorageNotificationList )
if err := och . msgDatabase . MsgToModifyMQ ( ctx , msgChannelValue . uniqueKey , conversationIDNotification , modifyMsgList ) ; err != nil {
log . ZError ( ctx , "msg to modify mq error" , err , "uniqueKey" , msgChannelValue . uniqueKey , "modifyMsgList" , modifyMsgList )
2022-05-19 12:25:46 +08:00
}
}
}
}
}
2022-07-26 15:52:38 +08:00
2023-04-26 18:57:41 +08:00
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
2023-05-16 18:37:14 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) getPushStorageMsgList ( totalMsgs [ ] * ContextMsg ) ( storageMsgList , notStorageMsgList , storageNotificatoinList , notStorageNotificationList , modifyMsgList [ ] * sdkws . MsgData ) {
2023-04-28 18:39:21 +08:00
isStorage := func ( msg * sdkws . MsgData ) bool {
options2 := utils . Options ( msg . Options )
2023-04-26 18:57:41 +08:00
if options2 . IsHistory ( ) {
return true
} else {
2023-05-04 15:06:23 +08:00
// if !(!options2.IsSenderSync() && conversationID == msg.MsgData.SendID) {
// return false
// }
return false
2023-04-26 18:57:41 +08:00
}
}
for _ , v := range totalMsgs {
2023-04-28 18:39:21 +08:00
options := utils . Options ( v . message . Options )
2023-05-10 17:48:26 +08:00
if ! options . IsNotNotification ( ) {
2023-05-17 18:44:55 +08:00
// clone msg from notificationMsg
2023-04-26 18:57:41 +08:00
if options . IsSendMsg ( ) {
2023-05-17 18:44:55 +08:00
msg := proto . Clone ( v . message ) . ( * sdkws . MsgData )
2023-04-26 18:57:41 +08:00
// 消息
2023-05-10 17:48:26 +08:00
if v . message . Options != nil {
2023-05-17 18:44:55 +08:00
msg . Options = utils . NewMsgOptions ( )
2023-05-10 17:48:26 +08:00
}
2023-05-17 18:44:55 +08:00
if options . IsOfflinePush ( ) {
v . message . Options = utils . WithOptions ( utils . Options ( v . message . Options ) , utils . WithOfflinePush ( false ) )
msg . Options = utils . WithOptions ( utils . Options ( msg . Options ) , utils . WithOfflinePush ( true ) )
}
if options . IsUnreadCount ( ) {
v . message . Options = utils . WithOptions ( utils . Options ( v . message . Options ) , utils . WithUnreadCount ( false ) )
msg . Options = utils . WithOptions ( utils . Options ( msg . Options ) , utils . WithUnreadCount ( true ) )
}
storageMsgList = append ( storageMsgList , msg )
2023-04-26 18:57:41 +08:00
}
2023-05-17 18:44:55 +08:00
if isStorage ( v . message ) {
storageNotificatoinList = append ( storageNotificatoinList , v . message )
2023-04-26 18:57:41 +08:00
} else {
2023-05-17 18:44:55 +08:00
notStorageNotificationList = append ( notStorageNotificationList , v . message )
2023-04-26 18:57:41 +08:00
}
} else {
if isStorage ( v . message ) {
storageMsgList = append ( storageMsgList , v . message )
} else {
notStorageMsgList = append ( notStorageMsgList , v . message )
}
}
2023-04-28 18:39:21 +08:00
if v . message . ContentType == constant . ReactionMessageModifier || v . message . ContentType == constant . ReactionMessageDeleter {
2023-04-26 18:57:41 +08:00
modifyMsgList = append ( modifyMsgList , v . message )
}
}
return
}
2023-05-19 19:43:43 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) handleNotification ( ctx context . Context , key , conversationID string , storageList , notStorageList [ ] * sdkws . MsgData ) {
och . toPushTopic ( ctx , key , conversationID , notStorageList )
2023-04-28 18:33:33 +08:00
if len ( storageList ) > 0 {
2023-05-08 12:39:45 +08:00
lastSeq , _ , err := och . msgDatabase . BatchInsertChat2Cache ( ctx , conversationID , storageList )
2023-04-28 18:33:33 +08:00
if err != nil {
2023-05-04 15:06:23 +08:00
log . ZError ( ctx , "notification batch insert to redis error" , err , "conversationID" , conversationID , "storageList" , storageList )
2023-04-28 18:33:33 +08:00
return
}
2023-05-16 18:37:14 +08:00
log . ZDebug ( ctx , "success to next topic" , "conversationID" , conversationID )
2023-05-19 19:43:43 +08:00
och . msgDatabase . MsgToMongoMQ ( ctx , key , conversationID , storageList , lastSeq )
och . toPushTopic ( ctx , key , conversationID , storageList )
2023-04-28 18:33:33 +08:00
}
2023-04-26 18:57:41 +08:00
}
2023-05-19 19:43:43 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) toPushTopic ( ctx context . Context , key , conversationID string , msgs [ ] * sdkws . MsgData ) {
2023-04-28 18:33:33 +08:00
for _ , v := range msgs {
2023-05-19 19:43:43 +08:00
och . msgDatabase . MsgToPushMQ ( ctx , key , conversationID , v )
2023-04-28 18:33:33 +08:00
}
2023-04-26 18:57:41 +08:00
}
2023-05-19 19:43:43 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) handleMsg ( ctx context . Context , key , conversationID string , storageList , notStorageList [ ] * sdkws . MsgData ) {
och . toPushTopic ( ctx , key , conversationID , notStorageList )
2023-04-26 18:57:41 +08:00
if len ( storageList ) > 0 {
2023-05-08 12:39:45 +08:00
lastSeq , isNewConversation , err := och . msgDatabase . BatchInsertChat2Cache ( ctx , conversationID , storageList )
2023-05-10 11:49:55 +08:00
if err != nil && errs . Unwrap ( err ) != redis . Nil {
2023-05-08 12:39:45 +08:00
log . ZError ( ctx , "batch data insert to redis err" , err , "storageMsgList" , storageList )
och . singleMsgFailedCountMutex . Lock ( )
och . singleMsgFailedCount += uint64 ( len ( storageList ) )
och . singleMsgFailedCountMutex . Unlock ( )
return
}
if isNewConversation {
2023-05-05 21:30:32 +08:00
if storageList [ 0 ] . SessionType == constant . SuperGroupChatType {
2023-05-04 15:06:23 +08:00
log . ZInfo ( ctx , "group chat first create conversation" , "conversationID" , conversationID )
2023-05-04 17:27:29 +08:00
userIDs , err := och . groupRpcClient . GetGroupMemberIDs ( ctx , storageList [ 0 ] . GroupID )
if err != nil {
2023-05-08 12:39:45 +08:00
log . ZWarn ( ctx , "get group member ids error" , err , "conversationID" , conversationID )
2023-05-04 17:27:29 +08:00
} else {
if err := och . conversationRpcClient . GroupChatFirstCreateConversation ( ctx , storageList [ 0 ] . GroupID , userIDs ) ; err != nil {
2023-05-08 12:39:45 +08:00
log . ZWarn ( ctx , "single chat first create conversation error" , err , "conversationID" , conversationID )
2023-05-04 17:27:29 +08:00
}
2023-04-28 18:33:33 +08:00
}
2023-05-05 21:30:32 +08:00
} else {
2023-05-04 17:27:29 +08:00
if err := och . conversationRpcClient . SingleChatFirstCreateConversation ( ctx , storageList [ 0 ] . RecvID , storageList [ 0 ] . SendID ) ; err != nil {
2023-05-08 12:39:45 +08:00
log . ZWarn ( ctx , "single chat first create conversation error" , err , "conversationID" , conversationID )
2023-04-28 18:33:33 +08:00
}
}
}
2023-05-08 12:39:45 +08:00
log . ZDebug ( ctx , "success incr to next topic" )
2023-04-28 18:33:33 +08:00
och . singleMsgSuccessCountMutex . Lock ( )
och . singleMsgSuccessCount += uint64 ( len ( storageList ) )
och . singleMsgSuccessCountMutex . Unlock ( )
2023-05-19 19:43:43 +08:00
och . msgDatabase . MsgToMongoMQ ( ctx , key , conversationID , storageList , lastSeq )
och . toPushTopic ( ctx , key , conversationID , storageList )
2023-04-26 18:57:41 +08:00
}
2023-04-28 18:33:33 +08:00
}
2022-06-16 20:35:27 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) MessagesDistributionHandle ( ) {
2022-05-25 18:46:53 +08:00
for {
2023-03-22 18:35:21 +08:00
aggregationMsgs := make ( map [ string ] [ ] * ContextMsg , ChannelNum )
2022-05-25 18:46:53 +08:00
select {
case cmd := <- och . msgDistributionCh :
switch cmd . Cmd {
case ConsumerMsgs :
triggerChannelValue := cmd . Value . ( TriggerChannelValue )
2023-03-22 18:35:21 +08:00
ctx := triggerChannelValue . ctx
2023-02-15 15:52:32 +08:00
consumerMessages := triggerChannelValue . cMsgList
2022-05-25 18:46:53 +08:00
//Aggregation map[userid]message list
2023-03-22 19:38:35 +08:00
log . ZDebug ( ctx , "batch messages come to distribution center" , "length" , len ( consumerMessages ) )
2022-05-25 18:46:53 +08:00
for i := 0 ; i < len ( consumerMessages ) ; i ++ {
2023-03-22 18:35:21 +08:00
ctxMsg := & ContextMsg { }
2023-05-19 19:50:52 +08:00
msgFromMQ := & sdkws . MsgData { }
err := proto . Unmarshal ( consumerMessages [ i ] . Value , msgFromMQ )
2022-05-25 18:46:53 +08:00
if err != nil {
2023-03-22 18:35:21 +08:00
log . ZError ( ctx , "msg_transfer Unmarshal msg err" , err , string ( consumerMessages [ i ] . Value ) )
2023-05-19 17:10:00 +08:00
continue
2022-05-25 18:46:53 +08:00
}
2023-05-10 15:21:08 +08:00
var arr [ ] string
for i , header := range consumerMessages [ i ] . Headers {
arr = append ( arr , strconv . Itoa ( i ) , string ( header . Key ) , string ( header . Value ) )
}
log . ZInfo ( ctx , "consumer.kafka.GetContextWithMQHeader" , "len" , len ( consumerMessages [ i ] . Headers ) , "header" , strings . Join ( arr , ", " ) )
2023-03-22 18:35:21 +08:00
ctxMsg . ctx = kafka . GetContextWithMQHeader ( consumerMessages [ i ] . Headers )
2023-05-19 19:50:52 +08:00
ctxMsg . message = msgFromMQ
2023-05-19 17:10:00 +08:00
log . ZDebug ( ctx , "single msg come to distribution center" , "message" , msgFromMQ , "key" , string ( consumerMessages [ i ] . Key ) )
2023-04-26 15:10:20 +08:00
//aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
2022-05-28 18:10:08 +08:00
if oldM , ok := aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] ; ok {
2023-03-22 18:35:21 +08:00
oldM = append ( oldM , ctxMsg )
2022-05-28 18:10:08 +08:00
aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = oldM
2022-05-25 18:46:53 +08:00
} else {
2023-03-22 18:35:21 +08:00
m := make ( [ ] * ContextMsg , 0 , 100 )
m = append ( m , ctxMsg )
2022-05-28 18:10:08 +08:00
aggregationMsgs [ string ( consumerMessages [ i ] . Key ) ] = m
2022-05-25 18:46:53 +08:00
}
}
2023-03-22 19:38:35 +08:00
log . ZDebug ( ctx , "generate map list users len" , "length" , len ( aggregationMsgs ) )
2023-05-19 19:43:43 +08:00
for uniqueKey , v := range aggregationMsgs {
2022-05-25 18:46:53 +08:00
if len ( v ) >= 0 {
2023-05-19 19:43:43 +08:00
hashCode := utils . GetHashCode ( uniqueKey )
2022-05-25 18:46:53 +08:00
channelID := hashCode % ChannelNum
2023-05-19 20:17:50 +08:00
newCtx := withAggregationCtx ( ctx , v )
log . ZDebug ( newCtx , "generate channelID" , "hashCode" , hashCode , "channelID" , channelID , "uniqueKey" , uniqueKey )
och . chArrays [ channelID ] <- Cmd2Value { Cmd : SourceMessages , Value : MsgChannelValue { uniqueKey : uniqueKey , ctxMsgList : v , ctx : newCtx } }
2022-05-25 18:46:53 +08:00
}
}
}
}
2022-05-19 12:25:46 +08:00
}
2021-05-26 19:24:25 +08:00
}
2023-05-19 17:10:00 +08:00
func withAggregationCtx ( ctx context . Context , values [ ] * ContextMsg ) context . Context {
var allMessageOperationID string
2023-05-19 20:17:50 +08:00
for i , v := range values {
2023-05-19 17:10:00 +08:00
if opid := mcontext . GetOperationID ( v . ctx ) ; opid != "" {
2023-05-19 20:17:50 +08:00
if i == 0 {
allMessageOperationID += opid
} else {
allMessageOperationID += "$" + opid
}
2023-05-19 17:10:00 +08:00
}
}
return mcontext . SetOperationID ( ctx , allMessageOperationID )
}
2021-05-26 19:24:25 +08:00
2023-03-22 19:39:20 +08:00
// Setup implements the consumer-group handler hook; it does nothing.
func (och *OnlineHistoryRedisConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup implements the consumer-group handler hook; it does nothing.
func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
2022-05-20 13:33:38 +08:00
2023-02-15 15:52:32 +08:00
func ( och * OnlineHistoryRedisConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error { // a instance in the consumer group
2022-05-22 21:08:57 +08:00
for {
if sess == nil {
2023-04-18 14:43:54 +08:00
log . ZWarn ( context . Background ( ) , "sess == nil, waiting" , nil )
2022-05-22 21:08:57 +08:00
time . Sleep ( 100 * time . Millisecond )
} else {
break
}
2022-05-21 10:19:02 +08:00
}
2022-05-25 20:29:32 +08:00
rwLock := new ( sync . RWMutex )
2023-03-22 18:47:29 +08:00
log . ZDebug ( context . Background ( ) , "online new session msg come" , "highWaterMarkOffset" ,
claim . HighWaterMarkOffset ( ) , "topic" , claim . Topic ( ) , "partition" , claim . Partition ( ) )
2022-05-25 18:46:53 +08:00
cMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
2023-04-26 15:10:20 +08:00
t := time . NewTicker ( time . Millisecond * 100 )
2022-05-25 20:29:32 +08:00
go func ( ) {
2022-05-25 21:15:17 +08:00
for {
select {
case <- t . C :
if len ( cMsg ) > 0 {
rwLock . Lock ( )
ccMsg := make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
for _ , v := range cMsg {
ccMsg = append ( ccMsg , v )
}
cMsg = make ( [ ] * sarama . ConsumerMessage , 0 , 1000 )
rwLock . Unlock ( )
split := 1000
2023-05-19 17:10:00 +08:00
ctx := mcontext . WithTriggerIDContext ( context . Background ( ) , utils . OperationIDGenerator ( ) )
2023-03-22 19:38:35 +08:00
log . ZDebug ( ctx , "timer trigger msg consumer start" , "length" , len ( ccMsg ) )
2022-05-25 21:15:17 +08:00
for i := 0 ; i < len ( ccMsg ) / split ; i ++ {
//log.Debug()
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
2023-03-22 18:35:21 +08:00
ctx : ctx , cMsgList : ccMsg [ i * split : ( i + 1 ) * split ] } }
2022-05-25 21:15:17 +08:00
}
if ( len ( ccMsg ) % split ) > 0 {
och . msgDistributionCh <- Cmd2Value { Cmd : ConsumerMsgs , Value : TriggerChannelValue {
2023-03-22 18:35:21 +08:00
ctx : ctx , cMsgList : ccMsg [ split * ( len ( ccMsg ) / split ) : ] } }
2022-05-25 21:15:17 +08:00
}
2023-03-22 19:38:35 +08:00
log . ZDebug ( ctx , "timer trigger msg consumer end" , "length" , len ( ccMsg ) )
2022-05-25 21:03:48 +08:00
}
2022-05-25 18:46:53 +08:00
}
}
2022-05-25 20:29:32 +08:00
} ( )
2022-05-25 21:23:10 +08:00
for msg := range claim . Messages ( ) {
rwLock . Lock ( )
2022-06-08 17:11:17 +08:00
if len ( msg . Value ) != 0 {
cMsg = append ( cMsg , msg )
}
2022-05-25 21:23:10 +08:00
rwLock . Unlock ( )
sess . MarkMessage ( msg , "" )
}
2021-05-26 19:24:25 +08:00
return nil
}