remove extendMsg code

This commit is contained in:
wangchuxiao
2023-07-10 17:07:08 +08:00
parent 4b8af3be2e
commit bcf2f0f443
20 changed files with 710 additions and 4227 deletions
+5 -9
View File
@@ -10,7 +10,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
@@ -25,7 +24,7 @@ type MsgTransfer struct {
persistentCH *PersistentConsumerHandler // 聊天记录持久化到mysql的消费者 订阅的topic: ws2ms_chat
historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
// modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
}
func StartTransfer(prometheusPort int) error {
@@ -59,23 +58,20 @@ func StartTransfer(prometheusPort int) error {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
msgModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, &conversationRpcClient, &groupRpcClient)
msgTransfer := NewMsgTransfer(chatLogDatabase, msgDatabase, &conversationRpcClient, &groupRpcClient)
msgTransfer.initPrometheus()
return msgTransfer.Start(prometheusPort)
}
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase,
msgDatabase controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer {
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase)}
}
func (m *MsgTransfer) initPrometheus() {
@@ -100,7 +96,7 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
}
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
err := prome.StartPrometheusSrv(prometheusPort)
if err != nil {
return err
-113
View File
@@ -1,113 +0,0 @@
package msgtransfer
import (
"context"
"encoding/json"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
)
type ModifyMsgConsumerHandler struct {
modifyMsgConsumerGroup *kfk.MConsumerGroup
extendMsgDatabase controller.ExtendMsgDatabase
extendSetMsgModel unRelationTb.ExtendMsgSetModel
}
func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyMsgConsumerHandler {
return &ModifyMsgConsumerHandler{
modifyMsgConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify),
extendMsgDatabase: database,
}
}
func (ModifyMsgConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (ModifyMsgConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg)
log.ZDebug(ctx, "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 {
mmc.ModifyMsg(ctx, msg, string(msg.Key), sess)
} else {
log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key)
}
sess.MarkMessage(msg, "")
}
return nil
}
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msgFromMQ := pbMsg.MsgDataToModifyByMQ{}
operationID := mcontext.GetOperationID(ctx)
err := proto.Unmarshal(cMsg.Value, &msgFromMQ)
if err != nil {
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, "msg", string(cMsg.Value))
return
}
log.ZDebug(ctx, "proto.Unmarshal MsgDataToMQ", "msgs", msgFromMQ.String())
for _, msg := range msgFromMQ.Messages {
isReactionFromCache := utils.GetSwitchFromOptions(msg.Options, constant.IsReactionFromCache)
if !isReactionFromCache {
continue
}
ctx = mcontext.SetOperationID(ctx, operationID)
if msg.ContentType == constant.ReactionMessageModifier {
notification := &sdkws.ReactionMessageModifierNotification{}
if err := json.Unmarshal(msg.Content, notification); err != nil {
continue
}
if notification.IsExternalExtensions {
continue
}
if !notification.IsReact {
// first time to modify
var reactionExtensionList = make(map[string]unRelationTb.KeyValueModel)
extendMsg := unRelationTb.ExtendMsgModel{
ReactionExtensionList: reactionExtensionList,
ClientMsgID: notification.ClientMsgID,
MsgFirstModifyTime: notification.MsgFirstModifyTime,
}
for _, v := range notification.SuccessReactionExtensions {
reactionExtensionList[v.TypeKey] = unRelationTb.KeyValueModel{
TypeKey: v.TypeKey,
Value: v.Value,
LatestUpdateTime: v.LatestUpdateTime,
}
}
if err := mmc.extendMsgDatabase.InsertExtendMsg(ctx, notification.ConversationID, notification.SessionType, &extendMsg); err != nil {
// log.ZError(ctx, "MsgFirstModify InsertExtendMsg failed", notification.ConversationID, notification.SessionType, extendMsg, err.Error())
continue
}
} else {
if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
// log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
}
}
} else if msg.ContentType == constant.ReactionMessageDeleter {
notification := &sdkws.ReactionMessageDeleteNotification{}
if err := json.Unmarshal(msg.Content, notification); err != nil {
continue
}
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
// log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
}
}
}
}