From f0c6295cff66d4b5f2122d2962f08708f3743e7d Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 13:34:34 +0800 Subject: [PATCH 1/7] batch to mongo --- pkg/common/db/batch_insert_chat.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/common/db/batch_insert_chat.go b/pkg/common/db/batch_insert_chat.go index e327ba7b5..b3211e701 100644 --- a/pkg/common/db/batch_insert_chat.go +++ b/pkg/common/db/batch_insert_chat.go @@ -18,16 +18,17 @@ func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, if len(msgList) > GetSingleGocMsgNum() { return errors.New("too large") } + isInit := false currentMaxSeq, err := d.GetUserMaxSeq(userID) if err == nil { } else if err == redis.ErrNil { + isInit = true currentMaxSeq = 0 } else { return utils.Wrap(err, "") } - //4999 remain := uint64(GetSingleGocMsgNum()) - (currentMaxSeq % uint64(GetSingleGocMsgNum())) insertCounter := uint64(0) msgListToMongo := make([]MsgInfo, 0) @@ -43,6 +44,13 @@ func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ, if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil { return utils.Wrap(err, "") } + if isInit { + msgListToMongoNext = append(msgListToMongoNext, sMsg) + seqUidNext = getSeqUid(userID, uint32(currentMaxSeq)) + log.Debug(operationID, "msgListToMongoNext ", seqUidNext, m.MsgData.Seq, m.MsgData.ClientMsgID, insertCounter, remain) + continue + } + if insertCounter < remain { msgListToMongo = append(msgListToMongo, sMsg) insertCounter++ From 6f79e63b3fc98e59d4bf80630297448b4f1f5f95 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 14:06:31 +0800 Subject: [PATCH 2/7] log --- internal/msg_transfer/logic/online_history_msg_handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 248e55bd6..6bd344737 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -378,18 +378,21 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS cMsg := make([]*sarama.ConsumerMessage, 500) t := time.NewTicker(time.Duration(500) * time.Millisecond) for msg := range claim.Messages() { + operationID := utils.OperationIDGenerator() //och.TriggerCmd(OnlineTopicBusy) cMsg = append(cMsg, msg) select { case <-t.C: if len(cMsg) >= 0 { och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + log.Debug(operationID, "timer send to msgDistributionCh", och.msgDistributionCh, "len: ", len(cMsg)) sess.MarkMessage(msg, "") cMsg = cMsg[0:0] } default: if len(cMsg) >= 500 { och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + log.Debug(operationID, "500 send to msgDistributionCh", och.msgDistributionCh, "len: ", len(cMsg)) sess.MarkMessage(msg, "") cMsg = cMsg[0:0] } From 3bf04802e4b1ae6e259e88edbbfc756efc141b68 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 14:06:51 +0800 Subject: [PATCH 3/7] log --- internal/msg_transfer/logic/online_history_msg_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 6bd344737..5e1e5f935 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -376,7 +376,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) cMsg := make([]*sarama.ConsumerMessage, 500) - t := time.NewTicker(time.Duration(500) * time.Millisecond) + t := time.NewTicker(time.Duration(10) * time.Millisecond) for msg := range claim.Messages() { operationID := utils.OperationIDGenerator() //och.TriggerCmd(OnlineTopicBusy) From 06aae24dada45bb0dfefeab884cdd81ce7f3640e Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 14:12:29 +0800 Subject: [PATCH 4/7] log --- internal/msg_transfer/logic/online_history_msg_handler.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 5e1e5f935..b6e717af6 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -205,19 +205,23 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { for { + operationID := utils.OperationIDGenerator() select { + case cmd := <-och.msgDistributionCh: switch cmd.Cmd { case ConsumerMsgs: consumerMessages := cmd.Value.([]*sarama.ConsumerMessage) //Aggregation map[userid]message list for i := 0; i < len(consumerMessages); i++ { + msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) + log.Error(operationID, "msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) return } + log.Debug(operationID, "MessagesDistributionHandle ", msgFromMQ.String()) if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, &msgFromMQ) och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM @@ -230,6 +234,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { for userID, v := range och.UserAggregationMsgs { if len(v) >= 0 { channelID := getHashCode(userID) % ChannelNum + log.Debug(operationID, "UserAggregationMsgs ", len(v), channelID, userID) go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages}} }(channelID, userID, v) From b632a991b9b8a588cef2bd4de4a05425c05ede21 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 14:18:54 +0800 Subject: [PATCH 5/7] log --- internal/msg_transfer/logic/online_history_msg_handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index b6e717af6..b13f02631 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -238,6 +238,8 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages}} }(channelID, userID, v) + } else { + log.NewWarn(operationID, "UserAggregationMsgs ", len(v), userID) } } } From a14fd303fcdbd904b8459f22527e8532aae6c488 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 14:32:09 +0800 Subject: [PATCH 6/7] log --- internal/msg_transfer/logic/online_history_msg_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index b13f02631..7105c08c2 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -213,6 +213,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { case ConsumerMsgs: consumerMessages := cmd.Value.([]*sarama.ConsumerMessage) //Aggregation map[userid]message list + log.Debug(operationID, "consumerMessages len ", len(consumerMessages)) for i := 0; i < len(consumerMessages); i++ { msgFromMQ := pbMsg.MsgDataToMQ{} From a7be7a3e7a6938ea29064370db7baa2d2e3c93c1 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Fri, 20 May 2022 14:38:10 +0800 Subject: [PATCH 7/7] log --- internal/msg_transfer/logic/online_history_msg_handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 7105c08c2..2a84e5a63 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -226,12 +226,15 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, &msgFromMQ) och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + log.Debug(operationID, "consumerMessages key ", string(consumerMessages[i].Key), oldM) } else { m := make([]*pbMsg.MsgDataToMQ, 100) m = append(m, &msgFromMQ) och.UserAggregationMsgs[string(consumerMessages[i].Key)] = m + log.Debug(operationID, "consumerMessages key ", string(consumerMessages[i].Key), m) } } + log.Debug(operationID, "UserAggregationMsgs len ", len(och.UserAggregationMsgs)) for userID, v := range och.UserAggregationMsgs { if len(v) >= 0 { channelID := getHashCode(userID) % ChannelNum