mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-09 19:45:58 +08:00
Merge remote-tracking branch 'origin/v3dev' into v3dev
This commit is contained in:
@@ -148,22 +148,6 @@ func (m *MessageApi) DeleteMsgPhysical(c *gin.Context) {
|
||||
a2r.Call(msg.MsgClient.DeleteMsgPhysical, m.Client, c)
|
||||
}
|
||||
|
||||
func (m *MessageApi) SetMessageReactionExtensions(c *gin.Context) {
|
||||
a2r.Call(msg.MsgClient.SetMessageReactionExtensions, m.Client, c)
|
||||
}
|
||||
|
||||
func (m *MessageApi) GetMessageListReactionExtensions(c *gin.Context) {
|
||||
a2r.Call(msg.MsgClient.GetMessagesReactionExtensions, m.Client, c)
|
||||
}
|
||||
|
||||
func (m *MessageApi) AddMessageReactionExtensions(c *gin.Context) {
|
||||
a2r.Call(msg.MsgClient.AddMessageReactionExtensions, m.Client, c)
|
||||
}
|
||||
|
||||
func (m *MessageApi) DeleteMessageReactionExtensions(c *gin.Context) {
|
||||
a2r.Call(msg.MsgClient.DeleteMessageReactionExtensions, m.Client, c)
|
||||
}
|
||||
|
||||
func (m *MessageApi) SendMessage(c *gin.Context) {
|
||||
params := apistruct.ManagementSendMsgReq{}
|
||||
if err := c.BindJSON(¶ms); err != nil {
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
|
||||
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw"
|
||||
@@ -25,7 +24,7 @@ type MsgTransfer struct {
|
||||
persistentCH *PersistentConsumerHandler // 聊天记录持久化到mysql的消费者 订阅的topic: ws2ms_chat
|
||||
historyCH *OnlineHistoryRedisConsumerHandler // 这个消费者聚合消息, 订阅的topic:ws2ms_chat, 修改通知发往msg_to_modify topic, 消息存入redis后Incr Redis, 再发消息到ms2pschat topic推送, 发消息到msg_to_mongo topic持久化
|
||||
historyMongoCH *OnlineHistoryMongoConsumerHandler // mongoDB批量插入, 成功后删除redis中消息,以及处理删除通知消息删除的 订阅的topic: msg_to_mongo
|
||||
modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
|
||||
// modifyCH *ModifyMsgConsumerHandler // 负责消费修改消息通知的consumer, 订阅的topic: msg_to_modify
|
||||
}
|
||||
|
||||
func StartTransfer(prometheusPort int) error {
|
||||
@@ -59,23 +58,20 @@ func StartTransfer(prometheusPort int) error {
|
||||
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
msgModel := cache.NewMsgCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
|
||||
extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
|
||||
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
|
||||
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
|
||||
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
|
||||
groupRpcClient := rpcclient.NewGroupRpcClient(client)
|
||||
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, &conversationRpcClient, &groupRpcClient)
|
||||
msgTransfer := NewMsgTransfer(chatLogDatabase, msgDatabase, &conversationRpcClient, &groupRpcClient)
|
||||
msgTransfer.initPrometheus()
|
||||
return msgTransfer.Start(prometheusPort)
|
||||
}
|
||||
|
||||
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
|
||||
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase,
|
||||
msgDatabase controller.CommonMsgDatabase,
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer {
|
||||
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
|
||||
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
|
||||
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase)}
|
||||
}
|
||||
|
||||
func (m *MsgTransfer) initPrometheus() {
|
||||
@@ -100,7 +96,7 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
|
||||
}
|
||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
|
||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
|
||||
go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
|
||||
// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
|
||||
err := prome.StartPrometheusSrv(prometheusPort)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -1,113 +0,0 @@
|
||||
package msgtransfer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
|
||||
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"github.com/Shopify/sarama"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type ModifyMsgConsumerHandler struct {
|
||||
modifyMsgConsumerGroup *kfk.MConsumerGroup
|
||||
|
||||
extendMsgDatabase controller.ExtendMsgDatabase
|
||||
extendSetMsgModel unRelationTb.ExtendMsgSetModel
|
||||
}
|
||||
|
||||
func NewModifyMsgConsumerHandler(database controller.ExtendMsgDatabase) *ModifyMsgConsumerHandler {
|
||||
return &ModifyMsgConsumerHandler{
|
||||
modifyMsgConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToModify.Topic},
|
||||
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToModify),
|
||||
extendMsgDatabase: database,
|
||||
}
|
||||
}
|
||||
|
||||
func (ModifyMsgConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (ModifyMsgConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||
func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
|
||||
claim sarama.ConsumerGroupClaim) error {
|
||||
for msg := range claim.Messages() {
|
||||
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg)
|
||||
log.ZDebug(ctx, "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
|
||||
if len(msg.Value) != 0 {
|
||||
mmc.ModifyMsg(ctx, msg, string(msg.Key), sess)
|
||||
} else {
|
||||
log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key)
|
||||
}
|
||||
sess.MarkMessage(msg, "")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
|
||||
msgFromMQ := pbMsg.MsgDataToModifyByMQ{}
|
||||
operationID := mcontext.GetOperationID(ctx)
|
||||
err := proto.Unmarshal(cMsg.Value, &msgFromMQ)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "msg_transfer Unmarshal msg err", err, "msg", string(cMsg.Value))
|
||||
return
|
||||
}
|
||||
log.ZDebug(ctx, "proto.Unmarshal MsgDataToMQ", "msgs", msgFromMQ.String())
|
||||
for _, msg := range msgFromMQ.Messages {
|
||||
isReactionFromCache := utils.GetSwitchFromOptions(msg.Options, constant.IsReactionFromCache)
|
||||
if !isReactionFromCache {
|
||||
continue
|
||||
}
|
||||
ctx = mcontext.SetOperationID(ctx, operationID)
|
||||
if msg.ContentType == constant.ReactionMessageModifier {
|
||||
notification := &sdkws.ReactionMessageModifierNotification{}
|
||||
if err := json.Unmarshal(msg.Content, notification); err != nil {
|
||||
continue
|
||||
}
|
||||
if notification.IsExternalExtensions {
|
||||
continue
|
||||
}
|
||||
if !notification.IsReact {
|
||||
// first time to modify
|
||||
var reactionExtensionList = make(map[string]unRelationTb.KeyValueModel)
|
||||
extendMsg := unRelationTb.ExtendMsgModel{
|
||||
ReactionExtensionList: reactionExtensionList,
|
||||
ClientMsgID: notification.ClientMsgID,
|
||||
MsgFirstModifyTime: notification.MsgFirstModifyTime,
|
||||
}
|
||||
for _, v := range notification.SuccessReactionExtensions {
|
||||
reactionExtensionList[v.TypeKey] = unRelationTb.KeyValueModel{
|
||||
TypeKey: v.TypeKey,
|
||||
Value: v.Value,
|
||||
LatestUpdateTime: v.LatestUpdateTime,
|
||||
}
|
||||
}
|
||||
|
||||
if err := mmc.extendMsgDatabase.InsertExtendMsg(ctx, notification.ConversationID, notification.SessionType, &extendMsg); err != nil {
|
||||
// log.ZError(ctx, "MsgFirstModify InsertExtendMsg failed", notification.ConversationID, notification.SessionType, extendMsg, err.Error())
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
if err := mmc.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
|
||||
// log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
}
|
||||
}
|
||||
} else if msg.ContentType == constant.ReactionMessageDeleter {
|
||||
notification := &sdkws.ReactionMessageDeleteNotification{}
|
||||
if err := json.Unmarshal(msg.Content, notification); err != nil {
|
||||
continue
|
||||
}
|
||||
if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil {
|
||||
// log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,359 +0,0 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
)
|
||||
|
||||
func (m *msgServer) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
|
||||
resp = &msg.SetMessageReactionExtensionsResp{}
|
||||
resp.ClientMsgID = req.ClientMsgID
|
||||
resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
if err := callbackSetMessageReactionExtensions(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if req.IsExternalExtensions {
|
||||
resp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.conversationID, req.SessionType, req, &resp, !req.IsReact, false)
|
||||
return resp, nil
|
||||
}
|
||||
// isExists, err := m.MsgDatabase.JudgeMessageReactionExist(ctx, req.ClientMsgID, req.SessionType)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
//
|
||||
// if !isExists {
|
||||
// if !req.IsReact {
|
||||
// resp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
|
||||
// for k, v := range req.ReactionExtensions {
|
||||
// err := m.MessageLocker.LockMessageTypeKey(ctx, req.ClientMsgID, k)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
// if err := m.MsgDatabase.SetMessageTypeKeyValue(ctx, req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v)); err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// }
|
||||
// resp.IsReact = true
|
||||
// _, err := m.MsgDatabase.SetMessageReactionExpire(ctx, req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// } else {
|
||||
// err := m.MessageLocker.LockGlobalMessage(ctx, req.ClientMsgID)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// mongoValue, err := m.MsgDatabase.GetExtendMsg(ctx, req.conversationID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// setValue := make(map[string]*sdkws.KeyValue)
|
||||
// for k, v := range req.ReactionExtensions {
|
||||
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// if vv, ok := mongoValue.ReactionExtensions[k]; ok {
|
||||
// utils.CopyStructFields(temp, &vv)
|
||||
// if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||
// setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
// continue
|
||||
// }
|
||||
// }
|
||||
// temp.TypeKey = k
|
||||
// temp.Value = v.Value
|
||||
// temp.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
// setValue[k] = temp
|
||||
// }
|
||||
// err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.conversationID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
||||
// if err != nil {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// resp.Result = append(resp.Result, temp)
|
||||
// }
|
||||
// } else {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// resp.Result = append(resp.Result, temp)
|
||||
// }
|
||||
// }
|
||||
// lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
||||
// if lockErr != nil {
|
||||
// log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
||||
// }
|
||||
// }
|
||||
|
||||
// } else {
|
||||
// log.Debug(req.OperationID, "redis handle secondly", req.String())
|
||||
|
||||
// for k, v := range req.Pb2Model {
|
||||
// err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
|
||||
// if err != nil {
|
||||
// setKeyResultInfo(&resp, 100, err.Error(), req.ClientMsgID, k, v)
|
||||
// continue
|
||||
// }
|
||||
// redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
|
||||
// if err != nil && err != go_redis.Nil {
|
||||
// setKeyResultInfo(&resp, 200, err.Error(), req.ClientMsgID, k, v)
|
||||
// continue
|
||||
// }
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// utils.JsonStringToStruct(redisValue, temp)
|
||||
// if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
// setKeyResultInfo(&resp, 300, "message have update", req.ClientMsgID, k, temp)
|
||||
// continue
|
||||
// } else {
|
||||
// v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
|
||||
// newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
|
||||
// if newerr != nil {
|
||||
// setKeyResultInfo(&resp, 201, newerr.Error(), req.ClientMsgID, k, temp)
|
||||
// continue
|
||||
// }
|
||||
// setKeyResultInfo(&resp, 0, "", req.ClientMsgID, k, v)
|
||||
// }
|
||||
|
||||
// }
|
||||
// }
|
||||
// if !isExists {
|
||||
// if !req.IsReact {
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.conversationID, req.SessionType, req, &resp, true, true)
|
||||
// } else {
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.conversationID, req.SessionType, req, &resp, false, false)
|
||||
// }
|
||||
// } else {
|
||||
// notification.ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.conversationID, req.SessionType, req, &resp, false, true)
|
||||
// }
|
||||
// log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", resp.String())
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
func (m *msgServer) setKeyResultInfo(ctx context.Context, r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = keyValue
|
||||
temp.ErrCode = errCode
|
||||
temp.ErrMsg = errMsg
|
||||
r.Result = append(r.Result, temp)
|
||||
_ = m.MessageLocker.UnLockMessageTypeKey(ctx, clientMsgID, typeKey)
|
||||
}
|
||||
func (m *msgServer) setDeleteKeyResultInfo(ctx context.Context, r *msg.DeleteMessagesReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *sdkws.KeyValue) {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = keyValue
|
||||
temp.ErrCode = errCode
|
||||
temp.ErrMsg = errMsg
|
||||
r.Result = append(r.Result, temp)
|
||||
_ = m.MessageLocker.UnLockMessageTypeKey(ctx, clientMsgID, typeKey)
|
||||
}
|
||||
|
||||
func (m *msgServer) GetMessagesReactionExtensions(ctx context.Context, req *msg.GetMessagesReactionExtensionsReq) (resp *msg.GetMessagesReactionExtensionsResp, err error) {
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
//var rResp msg.GetMessageListReactionExtensionsResp
|
||||
//for _, messageValue := range req.MessageReactionKeyList {
|
||||
// var oneMessage msg.SingleMessageExtensionResult
|
||||
// oneMessage.ClientMsgID = messageValue.ClientMsgID
|
||||
//
|
||||
// isExists, err := db.DB.JudgeMessageReactionExist(messageValue.ClientMsgID, req.SessionType)
|
||||
// if err != nil {
|
||||
// rResp.ErrCode = 100
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// return &rResp, nil
|
||||
// }
|
||||
// if isExists {
|
||||
// redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType)
|
||||
// if err != nil {
|
||||
// oneMessage.ErrCode = 100
|
||||
// oneMessage.ErrMsg = err.Error()
|
||||
// rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
// continue
|
||||
// }
|
||||
// keyMap := make(map[string]*sdkws.KeyValue)
|
||||
//
|
||||
// for k, v := range redisValue {
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// utils.JsonStringToStruct(v, temp)
|
||||
// keyMap[k] = temp
|
||||
// }
|
||||
// oneMessage.Pb2Model = keyMap
|
||||
//
|
||||
// } else {
|
||||
// mongoValue, err := db.DB.GetExtendMsg(req.conversationID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime)
|
||||
// if err != nil {
|
||||
// oneMessage.ErrCode = 100
|
||||
// oneMessage.ErrMsg = err.Error()
|
||||
// rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
// continue
|
||||
// }
|
||||
// keyMap := make(map[string]*sdkws.KeyValue)
|
||||
//
|
||||
// for k, v := range mongoValue.Pb2Model {
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// temp.TypeKey = v.TypeKey
|
||||
// temp.Value = v.Value
|
||||
// temp.LatestUpdateTime = v.LatestUpdateTime
|
||||
// keyMap[k] = temp
|
||||
// }
|
||||
// oneMessage.Pb2Model = keyMap
|
||||
// }
|
||||
// rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
|
||||
//}
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
|
||||
return resp, nil
|
||||
|
||||
}
|
||||
|
||||
func (m *msgServer) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (m *msgServer) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessagesReactionExtensionsReq) (resp *msg.DeleteMessagesReactionExtensionsResp, err error) {
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m args is:", req.String())
|
||||
//var rResp msg.DeleteMessagesReactionExtensionsResp
|
||||
//callbackResp := notification.callbackDeleteMessageReactionExtensions(req)
|
||||
//if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
||||
// rResp.ErrCode = int32(callbackResp.ErrCode)
|
||||
// rResp.ErrMsg = callbackResp.ErrMsg
|
||||
// for _, value := range req.Pb2Model {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = callbackResp.ErrMsg
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
//}
|
||||
////if ExternalExtension
|
||||
//if req.IsExternalExtensions {
|
||||
// rResp.Result = callbackResp.ResultReactionExtensionList
|
||||
// notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.conversationID, req.SessionType, req, &rResp, false, false)
|
||||
// return &rResp, nil
|
||||
//
|
||||
//}
|
||||
//for _, v := range callbackResp.ResultReactionExtensions {
|
||||
// if v.ErrCode != 0 {
|
||||
// func(req *[]*sdkws.KeyValue, typeKey string) {
|
||||
// for i := 0; i < len(*req); i++ {
|
||||
// if (*req)[i].TypeKey == typeKey {
|
||||
// *req = append((*req)[:i], (*req)[i+1:]...)
|
||||
// }
|
||||
// }
|
||||
// }(&req.Pb2Model, v.KeyValue.TypeKey)
|
||||
// rResp.Result = append(rResp.Result, v)
|
||||
// }
|
||||
//}
|
||||
//isExists, err := db.DB.JudgeMessageReactionExist(req.ClientMsgID, req.SessionType)
|
||||
//if err != nil {
|
||||
// rResp.ErrCode = 100
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// for _, value := range req.Pb2Model {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
//}
|
||||
//
|
||||
//if isExists {
|
||||
// log.Debug(req.OperationID, "redis handle this delete", req.String())
|
||||
// for _, v := range req.Pb2Model {
|
||||
// err := m.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
|
||||
// if err != nil {
|
||||
// setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||
// continue
|
||||
// }
|
||||
//
|
||||
// redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||
// if err != nil && err != go_redis.Nil {
|
||||
// setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v)
|
||||
// continue
|
||||
// }
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// utils.JsonStringToStruct(redisValue, temp)
|
||||
// if v.LatestUpdateTime != temp.LatestUpdateTime {
|
||||
// setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
||||
// continue
|
||||
// } else {
|
||||
// newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey)
|
||||
// if newErr != nil {
|
||||
// setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp)
|
||||
// continue
|
||||
// }
|
||||
// setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v)
|
||||
// }
|
||||
// }
|
||||
//} else {
|
||||
// err := m.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
|
||||
// if err != nil {
|
||||
// rResp.ErrCode = 100
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// for _, value := range req.Pb2Model {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
// }
|
||||
// mongoValue, err := db.DB.GetExtendMsg(req.conversationID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
|
||||
// if err != nil {
|
||||
// rResp.ErrCode = 200
|
||||
// rResp.ErrMsg = err.Error()
|
||||
// for _, value := range req.Pb2Model {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// return &rResp, nil
|
||||
// }
|
||||
// setValue := make(map[string]*sdkws.KeyValue)
|
||||
// for _, v := range req.Pb2Model {
|
||||
//
|
||||
// temp := new(sdkws.KeyValue)
|
||||
// if vv, ok := mongoValue.Pb2Model[v.TypeKey]; ok {
|
||||
// utils.CopyStructFields(temp, &vv)
|
||||
// if v.LatestUpdateTime != vv.LatestUpdateTime {
|
||||
// setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
|
||||
// continue
|
||||
// }
|
||||
// } else {
|
||||
// setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v)
|
||||
// continue
|
||||
// }
|
||||
// temp.TypeKey = v.TypeKey
|
||||
// setValue[v.TypeKey] = temp
|
||||
// }
|
||||
// err = db.DB.DeleteReactionExtendMsgSet(req.conversationID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
|
||||
// if err != nil {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// temp.ErrMsg = err.Error()
|
||||
// temp.ErrCode = 100
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// } else {
|
||||
// for _, value := range setValue {
|
||||
// temp := new(msg.KeyValueResp)
|
||||
// temp.KeyValue = value
|
||||
// rResp.Result = append(rResp.Result, temp)
|
||||
// }
|
||||
// }
|
||||
// lockErr := m.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
|
||||
// if lockErr != nil {
|
||||
// log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
|
||||
// }
|
||||
//
|
||||
//}
|
||||
//notification.ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.conversationID, req.SessionType, req, &rResp, false, isExists)
|
||||
//log.Debug(req.OperationID, utils.GetSelfFuncName(), "m return is:", rResp.String())
|
||||
return resp, nil
|
||||
}
|
||||
@@ -1,88 +0,0 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
cbapi "github.com/OpenIMSDK/Open-IM-Server/pkg/callbackstruct"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
)
|
||||
|
||||
func callbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMessageReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
req := &cbapi.CallbackBeforeSetMessageReactionExtReq{
|
||||
OperationID: mcontext.GetOperationID(ctx),
|
||||
CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand,
|
||||
ConversationID: setReq.ConversationID,
|
||||
OpUserID: mcontext.GetOpUserID(ctx),
|
||||
SessionType: setReq.SessionType,
|
||||
ReactionExtensionList: setReq.ReactionExtensions,
|
||||
ClientMsgID: setReq.ClientMsgID,
|
||||
IsReact: setReq.IsReact,
|
||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||
}
|
||||
resp := &cbapi.CallbackBeforeSetMessageReactionExtResp{}
|
||||
if err := http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg); err != nil {
|
||||
return err
|
||||
}
|
||||
setReq.MsgFirstModifyTime = resp.MsgFirstModifyTime
|
||||
return nil
|
||||
}
|
||||
|
||||
func callbackDeleteMessageReactionExtensions(ctx context.Context, setReq *msg.DeleteMessagesReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
req := &cbapi.CallbackDeleteMessageReactionExtReq{
|
||||
OperationID: setReq.OperationID,
|
||||
CallbackCommand: constant.CallbackBeforeDeleteMessageReactionExtensionsCommand,
|
||||
ConversationID: setReq.ConversationID,
|
||||
OpUserID: setReq.OpUserID,
|
||||
SessionType: setReq.SessionType,
|
||||
ReactionExtensionList: setReq.ReactionExtensions,
|
||||
ClientMsgID: setReq.ClientMsgID,
|
||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||
}
|
||||
resp := &cbapi.CallbackDeleteMessageReactionExtResp{}
|
||||
return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
|
||||
func callbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
req := &cbapi.CallbackGetMessageListReactionExtReq{
|
||||
OperationID: mcontext.GetOperationID(ctx),
|
||||
CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand,
|
||||
ConversationID: getReq.ConversationID,
|
||||
OpUserID: mcontext.GetOperationID(ctx),
|
||||
SessionType: getReq.SessionType,
|
||||
TypeKeyList: getReq.TypeKeys,
|
||||
}
|
||||
resp := &cbapi.CallbackGetMessageListReactionExtResp{}
|
||||
return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
|
||||
func callbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error {
|
||||
req := &cbapi.CallbackAddMessageReactionExtReq{
|
||||
OperationID: mcontext.GetOperationID(ctx),
|
||||
CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
|
||||
ConversationID: setReq.ConversationID,
|
||||
OpUserID: mcontext.GetOperationID(ctx),
|
||||
SessionType: setReq.SessionType,
|
||||
ReactionExtensionList: setReq.ReactionExtensions,
|
||||
ClientMsgID: setReq.ClientMsgID,
|
||||
IsReact: setReq.IsReact,
|
||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||
}
|
||||
resp := &cbapi.CallbackAddMessageReactionExtResp{}
|
||||
return http.CallBackPostReturn(ctx, cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
@@ -21,7 +20,6 @@ type MessageInterceptorChain []MessageInterceptorFunc
|
||||
type msgServer struct {
|
||||
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
|
||||
MsgDatabase controller.CommonMsgDatabase
|
||||
ExtendMsgDatabase controller.ExtendMsgDatabase
|
||||
Group *rpcclient.GroupRpcClient
|
||||
User *rpcclient.UserRpcClient
|
||||
Conversation *rpcclient.ConversationRpcClient
|
||||
@@ -62,9 +60,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
}
|
||||
cacheModel := cache.NewMsgCacheModel(rdb)
|
||||
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
|
||||
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
|
||||
extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCacheModel, tx.NewMongo(mongo.GetClient()))
|
||||
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, cacheModel)
|
||||
conversationClient := rpcclient.NewConversationRpcClient(client)
|
||||
userRpcClient := rpcclient.NewUserRpcClient(client)
|
||||
@@ -75,7 +70,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
User: &userRpcClient,
|
||||
Group: &groupRpcClient,
|
||||
MsgDatabase: msgDatabase,
|
||||
ExtendMsgDatabase: extendMsgDatabase,
|
||||
RegisterCenter: client,
|
||||
GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
|
||||
ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
|
||||
|
||||
@@ -26,8 +26,10 @@ func (c *MsgTool) ConversationsDestructMsgs() {
|
||||
log.ZError(ctx, "updateUsersConversationFiled failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
continue
|
||||
}
|
||||
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
|
||||
log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
if len(seqs) > 0 {
|
||||
if err := c.msgNotificationSender.UserDeleteMsgsNotification(ctx, conversation.OwnerUserID, conversation.ConversationID, seqs); err != nil {
|
||||
log.ZError(ctx, "userDeleteMsgsNotification failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user