mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-21 01:09:01 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
@@ -58,16 +58,16 @@ func StartTransfer(prometheusPort int) error {
|
||||
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
|
||||
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
|
||||
conversationRpcClient := rpcclient.NewConversationClient(client)
|
||||
|
||||
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient)
|
||||
groupRpcClient := rpcclient.NewGroupClient(client)
|
||||
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient, groupRpcClient)
|
||||
msgTransfer.initPrometheus()
|
||||
return msgTransfer.Start(prometheusPort)
|
||||
}
|
||||
|
||||
func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase,
|
||||
extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase,
|
||||
conversationRpcClient *rpcclient.ConversationClient) *MsgTransfer {
|
||||
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient),
|
||||
conversationRpcClient *rpcclient.ConversationClient, groupRpcClient *rpcclient.GroupClient) *MsgTransfer {
|
||||
return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
|
||||
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)}
|
||||
}
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ type OnlineHistoryRedisConsumerHandler struct {
|
||||
groupRpcClient *rpcclient.GroupClient
|
||||
}
|
||||
|
||||
func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient) *OnlineHistoryRedisConsumerHandler {
|
||||
func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient, groupRpcClient *rpcclient.GroupClient) *OnlineHistoryRedisConsumerHandler {
|
||||
var och OnlineHistoryRedisConsumerHandler
|
||||
och.msgDatabase = database
|
||||
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
|
||||
@@ -73,6 +73,7 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase,
|
||||
go och.Run(i)
|
||||
}
|
||||
och.conversationRpcClient = conversationRpcClient
|
||||
och.groupRpcClient = groupRpcClient
|
||||
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
|
||||
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.Ws2mschat.Topic},
|
||||
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
|
||||
@@ -90,11 +91,11 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||
ctxMsgList := msgChannelValue.ctxMsgList
|
||||
ctx := msgChannelValue.ctx
|
||||
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "conversationID", msgChannelValue.conversationID)
|
||||
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.conversationID, ctxMsgList)
|
||||
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
|
||||
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
|
||||
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
|
||||
och.handleMsg(ctx, msgChannelValue.conversationID, storageMsgList, notStorageMsgList)
|
||||
och.handleNotification(ctx, msgChannelValue.conversationID, storageNotificationList, notStorageNotificationList)
|
||||
och.handleMsg(ctx, utils.GetChatConversationIDByMsg(ctxMsgList[0].message), storageMsgList, notStorageMsgList)
|
||||
och.handleNotification(ctx, utils.GetNotificationConversationID(ctxMsgList[0].message), storageNotificationList, notStorageNotificationList)
|
||||
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.conversationID, modifyMsgList); err != nil {
|
||||
log.ZError(ctx, "msg to modify mq error", err, "conversationID", msgChannelValue.conversationID, "modifyMsgList", modifyMsgList)
|
||||
}
|
||||
@@ -104,7 +105,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
|
||||
}
|
||||
|
||||
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
|
||||
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
|
||||
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
|
||||
isStorage := func(msg *sdkws.MsgData) bool {
|
||||
options2 := utils.Options(msg.Options)
|
||||
if options2.IsHistory() {
|
||||
@@ -119,19 +120,27 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(conversation
|
||||
for _, v := range totalMsgs {
|
||||
options := utils.Options(v.message.Options)
|
||||
if !options.IsNotNotification() {
|
||||
// 原通知
|
||||
notificationMsg := proto.Clone(v.message).(*sdkws.MsgData)
|
||||
// clone msg from notificationMsg
|
||||
if options.IsSendMsg() {
|
||||
msg := proto.Clone(v.message).(*sdkws.MsgData)
|
||||
// 消息
|
||||
if v.message.Options != nil {
|
||||
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotification(false), utils.WithSendMsg(false))
|
||||
msg.Options = utils.NewMsgOptions()
|
||||
}
|
||||
storageMsgList = append(storageMsgList, v.message)
|
||||
if options.IsOfflinePush() {
|
||||
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithOfflinePush(false))
|
||||
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithOfflinePush(true))
|
||||
}
|
||||
if options.IsUnreadCount() {
|
||||
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithUnreadCount(false))
|
||||
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithUnreadCount(true))
|
||||
}
|
||||
storageMsgList = append(storageMsgList, msg)
|
||||
}
|
||||
if isStorage(notificationMsg) {
|
||||
storageNotificatoinList = append(storageNotificatoinList, notificationMsg)
|
||||
if isStorage(v.message) {
|
||||
storageNotificatoinList = append(storageNotificatoinList, v.message)
|
||||
} else {
|
||||
notStorageNotificationList = append(notStorageNotificationList, notificationMsg)
|
||||
notStorageNotificationList = append(notStorageNotificationList, v.message)
|
||||
}
|
||||
} else {
|
||||
if isStorage(v.message) {
|
||||
@@ -155,7 +164,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
||||
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageList)
|
||||
return
|
||||
}
|
||||
log.ZDebug(ctx, "success to next topic")
|
||||
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
|
||||
och.msgDatabase.MsgToMongoMQ(ctx, conversationID, storageList, lastSeq)
|
||||
och.toPushTopic(ctx, conversationID, storageList)
|
||||
}
|
||||
|
||||
@@ -195,9 +195,6 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbConver
|
||||
if req.Conversation.DraftTextTime != nil {
|
||||
m["draft_text_time"] = req.Conversation.DraftTextTime.Value
|
||||
}
|
||||
if req.Conversation.UnreadCount != nil {
|
||||
m["unread_count"] = req.Conversation.UnreadCount.Value
|
||||
}
|
||||
if req.Conversation.AttachedInfo != nil {
|
||||
m["attached_info"] = req.Conversation.AttachedInfo.Value
|
||||
}
|
||||
@@ -283,7 +280,8 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
|
||||
|
||||
func (c *conversationServer) DelGroupChatConversations(ctx context.Context, req *pbConversation.DelGroupChatConversationsReq) (*pbConversation.DelGroupChatConversationsResp, error) {
|
||||
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, req.OwnerUserID,
|
||||
utils.GetConversationIDBySessionType(constant.SuperGroupChatType, req.GroupID), map[string]interface{}{"max_seq": req.MaxSeq}); err != nil {
|
||||
utils.GetConversationIDBySessionType(constant.SuperGroupChatType, req.GroupID),
|
||||
map[string]interface{}{"max_seq": req.MaxSeq}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbConversation.DelGroupChatConversationsResp{}, nil
|
||||
@@ -315,3 +313,11 @@ func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Contex
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req *pbConversation.GetUserConversationIDsHashReq) (*pbConversation.GetUserConversationIDsHashResp, error) {
|
||||
hash, err := c.conversationDatabase.GetUserConversationIDsHash(ctx, req.OwnerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pbConversation.GetUserConversationIDsHashResp{Hash: hash}, nil
|
||||
}
|
||||
|
||||
@@ -117,7 +117,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
|
||||
return nil, err
|
||||
}
|
||||
|
||||
friendRequest := tablerelation.FriendRequestModel{FromUserID: req.ToUserID, ToUserID: req.FromUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
|
||||
friendRequest := tablerelation.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
|
||||
if req.HandleResult == constant.FriendResponseAgree {
|
||||
err := s.friendDatabase.AgreeFriendRequest(ctx, &friendRequest)
|
||||
if err != nil {
|
||||
@@ -222,8 +222,6 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *p
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.ZInfo(ctx, "test log out")
|
||||
log.ZInfo(ctx, "GetPaginationFriendsApplyFrom.xxx", "total", total, "friendRequests", len(friendRequests), "userRpcClient", s.userRpcClient == nil)
|
||||
resp.FriendRequests, err = convert.FriendRequestDB2Pb(ctx, friendRequests, s.userRpcClient.GetUsersInfoMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -62,6 +62,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
return utils.Slice(users, func(e *sdkws.UserInfo) rpcclient.CommonUser { return e }), nil
|
||||
}),
|
||||
conversationRpcClient: rpcclient.NewConversationClient(client),
|
||||
msgRpcClient: rpcclient.NewMsgClient(client),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@@ -751,11 +752,12 @@ func (s *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
maxSeq, ok := resp.MaxSeqs[conevrsationID]
|
||||
if !ok {
|
||||
return errs.ErrInternalServer.Wrap("get max seq error")
|
||||
}
|
||||
return s.conversationRpcClient.DelGroupChatConversations(ctx, userIDs, groupID, maxSeq)
|
||||
//log.ZInfo(ctx, "deleteMemberAndSetConversationSeq.GetMaxSeq", "maxSeqs", resp.MaxSeqs, "conevrsationID", conevrsationID)
|
||||
//maxSeq, ok := resp.MaxSeqs[conevrsationID]
|
||||
//if !ok {
|
||||
// return errs.ErrInternalServer.Wrap("get max seq error")
|
||||
//}
|
||||
return s.conversationRpcClient.DelGroupChatConversations(ctx, userIDs, groupID, resp.MaxSeqs[conevrsationID])
|
||||
}
|
||||
|
||||
func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInfoReq) (*pbGroup.SetGroupInfoResp, error) {
|
||||
|
||||
@@ -62,10 +62,14 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var isSend bool = true
|
||||
conversationID := utils.GetConversationIDByMsg(req.MsgData)
|
||||
isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if !utils.IsNotification(conversationID) {
|
||||
isSend, err = m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if isSend {
|
||||
err = m.MsgDatabase.MsgToMQ(ctx, conversationID, req.MsgData)
|
||||
|
||||
+26
-23
@@ -23,22 +23,24 @@ import (
|
||||
|
||||
type MessageInterceptorChain []MessageInterceptorFunc
|
||||
type msgServer struct {
|
||||
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
|
||||
MsgDatabase controller.CommonMsgDatabase
|
||||
ExtendMsgDatabase controller.ExtendMsgDatabase
|
||||
Group *rpcclient.GroupClient
|
||||
User *rpcclient.UserClient
|
||||
Conversation *rpcclient.ConversationClient
|
||||
friend *rpcclient.FriendClient
|
||||
black *rpcclient.BlackClient
|
||||
GroupLocalCache *localcache.GroupLocalCache
|
||||
MessageLocker MessageLocker
|
||||
Handlers MessageInterceptorChain
|
||||
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
|
||||
MsgDatabase controller.CommonMsgDatabase
|
||||
ExtendMsgDatabase controller.ExtendMsgDatabase
|
||||
Group *rpcclient.GroupClient
|
||||
User *rpcclient.UserClient
|
||||
Conversation *rpcclient.ConversationClient
|
||||
friend *rpcclient.FriendClient
|
||||
black *rpcclient.BlackClient
|
||||
GroupLocalCache *localcache.GroupLocalCache
|
||||
ConversationLocalCache *localcache.ConversationLocalCache
|
||||
MessageLocker MessageLocker
|
||||
Handlers MessageInterceptorChain
|
||||
}
|
||||
|
||||
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
|
||||
m.Handlers = append(m.Handlers, interceptorFunc...)
|
||||
}
|
||||
|
||||
func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsgReq) error {
|
||||
for _, handler := range m.Handlers {
|
||||
msgData, err := handler(ctx, req)
|
||||
@@ -49,6 +51,7 @@ func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsg
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
|
||||
rdb, err := cache.NewRedis()
|
||||
if err != nil {
|
||||
@@ -66,16 +69,17 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
|
||||
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, cacheModel)
|
||||
|
||||
s := &msgServer{
|
||||
Conversation: rpcclient.NewConversationClient(client),
|
||||
User: rpcclient.NewUserClient(client),
|
||||
Group: rpcclient.NewGroupClient(client),
|
||||
MsgDatabase: msgDatabase,
|
||||
ExtendMsgDatabase: extendMsgDatabase,
|
||||
RegisterCenter: client,
|
||||
GroupLocalCache: localcache.NewGroupLocalCache(client),
|
||||
black: rpcclient.NewBlackClient(client),
|
||||
friend: rpcclient.NewFriendClient(client),
|
||||
MessageLocker: NewLockerMessage(cacheModel),
|
||||
Conversation: rpcclient.NewConversationClient(client),
|
||||
User: rpcclient.NewUserClient(client),
|
||||
Group: rpcclient.NewGroupClient(client),
|
||||
MsgDatabase: msgDatabase,
|
||||
ExtendMsgDatabase: extendMsgDatabase,
|
||||
RegisterCenter: client,
|
||||
GroupLocalCache: localcache.NewGroupLocalCache(client),
|
||||
ConversationLocalCache: localcache.NewConversationLocalCache(client),
|
||||
black: rpcclient.NewBlackClient(client),
|
||||
friend: rpcclient.NewFriendClient(client),
|
||||
MessageLocker: NewLockerMessage(cacheModel),
|
||||
}
|
||||
s.addInterceptorHandler(MessageHasReadEnabled, MessageModifyCallback)
|
||||
s.initPrometheus()
|
||||
@@ -127,7 +131,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
|
||||
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationIDs, err := m.Conversation.GetConversationIDs(ctx, req.UserID)
|
||||
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -168,7 +172,6 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag
|
||||
continue
|
||||
}
|
||||
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs}
|
||||
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
|
||||
Reference in New Issue
Block a user