2023-07-04 11:15:20 +08:00
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2023-06-29 19:51:11 +08:00
package msgtransfer
import (
"context"
2024-09-10 10:26:02 +08:00
2023-09-07 17:38:09 +08:00
"github.com/IBM/sarama"
2023-09-07 19:04:36 +08:00
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
2023-11-10 19:37:25 +08:00
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
2024-05-27 11:58:36 +08:00
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
2024-04-19 22:23:08 +08:00
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mq/kafka"
2024-03-05 10:51:55 +08:00
"google.golang.org/protobuf/proto"
2023-06-29 19:51:11 +08:00
)
type OnlineHistoryMongoConsumerHandler struct {
2024-04-19 22:23:08 +08:00
historyConsumerGroup * kafka . MConsumerGroup
2024-09-10 10:26:02 +08:00
msgTransferDatabase controller . MsgTransferDatabase
2023-06-29 19:51:11 +08:00
}
2024-09-10 10:26:02 +08:00
func NewOnlineHistoryMongoConsumerHandler ( kafkaConf * config . Kafka , database controller . MsgTransferDatabase ) ( * OnlineHistoryMongoConsumerHandler , error ) {
2024-05-14 18:21:36 +08:00
historyConsumerGroup , err := kafka . NewMConsumerGroup ( kafkaConf . Build ( ) , kafkaConf . ToMongoGroupID , [ ] string { kafkaConf . ToMongoTopic } , true )
2024-02-02 10:11:13 +08:00
if err != nil {
return nil , err
}
2023-06-29 19:51:11 +08:00
mc := & OnlineHistoryMongoConsumerHandler {
2024-02-02 10:11:13 +08:00
historyConsumerGroup : historyConsumerGroup ,
2024-09-10 10:26:02 +08:00
msgTransferDatabase : database ,
2023-06-29 19:51:11 +08:00
}
2024-02-02 10:11:13 +08:00
return mc , nil
2023-06-29 19:51:11 +08:00
}
2024-04-19 22:23:08 +08:00
func ( mc * OnlineHistoryMongoConsumerHandler ) handleChatWs2Mongo ( ctx context . Context , cMsg * sarama . ConsumerMessage , key string , session sarama . ConsumerGroupSession ) {
2023-06-29 19:51:11 +08:00
msg := cMsg . Value
2023-08-23 16:45:52 +08:00
msgFromMQ := pbmsg . MsgDataToMongoByMQ { }
2023-06-29 19:51:11 +08:00
err := proto . Unmarshal ( msg , & msgFromMQ )
if err != nil {
log . ZError ( ctx , "unmarshall failed" , err , "key" , key , "len" , len ( msg ) )
return
}
if len ( msgFromMQ . MsgData ) == 0 {
log . ZError ( ctx , "msgFromMQ.MsgData is empty" , nil , "cMsg" , cMsg )
return
}
2024-09-12 10:38:17 +08:00
log . ZDebug ( ctx , "mongo consumer recv msg" , "msgs" , msgFromMQ . String ( ) )
2024-09-10 10:26:02 +08:00
err = mc . msgTransferDatabase . BatchInsertChat2DB ( ctx , msgFromMQ . ConversationID , msgFromMQ . MsgData , msgFromMQ . LastSeq )
2023-06-29 19:51:11 +08:00
if err != nil {
2023-07-03 16:29:22 +08:00
log . ZError (
ctx ,
"single data insert to mongo err" ,
err ,
"msg" ,
msgFromMQ . MsgData ,
"conversationID" ,
msgFromMQ . ConversationID ,
)
2023-11-10 19:37:25 +08:00
prommetrics . MsgInsertMongoFailedCounter . Inc ( )
2023-11-07 14:36:56 +08:00
} else {
2023-11-10 19:37:25 +08:00
prommetrics . MsgInsertMongoSuccessCounter . Inc ( )
2023-06-29 19:51:11 +08:00
}
2025-02-12 18:24:15 +08:00
//var seqs []int64
//for _, msg := range msgFromMQ.MsgData {
// seqs = append(seqs, msg.Seq)
//}
//if err := mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs); err != nil {
// log.ZError(ctx, "remove cache msg from redis err", err, "msg",
// msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
//}
2023-06-29 19:51:11 +08:00
}
2024-12-26 17:49:05 +08:00
// Setup is the consumer-group session start hook. This handler needs no
// per-session initialisation, so it does nothing and always returns nil.
func (*OnlineHistoryMongoConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
2024-08-21 11:44:00 +08:00
// Cleanup is the consumer-group session end hook. This handler holds no
// per-session state, so it does nothing and always returns nil.
func (*OnlineHistoryMongoConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
2023-07-03 16:29:22 +08:00
2024-12-26 17:49:05 +08:00
func ( mc * OnlineHistoryMongoConsumerHandler ) ConsumeClaim ( sess sarama . ConsumerGroupSession , claim sarama . ConsumerGroupClaim ) error { // an instance in the consumer group
2023-06-29 19:51:11 +08:00
log . ZDebug ( context . Background ( ) , "online new session msg come" , "highWaterMarkOffset" ,
claim . HighWaterMarkOffset ( ) , "topic" , claim . Topic ( ) , "partition" , claim . Partition ( ) )
for msg := range claim . Messages ( ) {
ctx := mc . historyConsumerGroup . GetContextFromMsg ( msg )
if len ( msg . Value ) != 0 {
mc . handleChatWs2Mongo ( ctx , msg , string ( msg . Key ) , sess )
} else {
log . ZError ( ctx , "mongo msg get from kafka but is nil" , nil , "conversationID" , msg . Key )
}
sess . MarkMessage ( msg , "" )
}
return nil
}