Files
open-im-server/internal/rpc/msg/send_pull.go
T

306 lines
9.3 KiB
Go
Raw Normal View History

2023-02-10 22:10:37 +08:00
package msg
import (
2023-02-23 19:15:30 +08:00
"OpenIM/pkg/common/constant"
promePkg "OpenIM/pkg/common/prome"
pbConversation "OpenIM/pkg/proto/conversation"
"OpenIM/pkg/proto/msg"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
2023-02-10 22:10:37 +08:00
"context"
2023-02-13 15:52:41 +08:00
"github.com/golang/protobuf/proto"
2023-02-10 22:10:37 +08:00
"sync"
)
2023-02-13 15:52:41 +08:00
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
2023-02-23 11:26:46 +08:00
resp = &msg.SendMsgResp{}
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
2023-02-13 15:52:41 +08:00
// callback
2023-02-23 17:28:57 +08:00
if err = CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
2023-02-10 22:10:37 +08:00
2023-02-13 15:52:41 +08:00
if _, err = m.messageVerification(ctx, req); err != nil {
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
2023-02-13 15:52:41 +08:00
return nil, err
}
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
2023-02-24 11:01:33 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, err
2023-02-10 22:10:37 +08:00
}
// callback
2023-03-01 15:32:26 +08:00
if err = CallbackAfterSendGroupMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
2023-02-13 15:52:41 +08:00
resp.SendTime = msgToMQSingle.MsgData.SendTime
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil
}
func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
2023-02-24 11:01:33 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
2023-02-24 11:01:33 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}
resp.SendTime = msgToMQSingle.MsgData.SendTime
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.SingleChatMsgRecvSuccessCounter)
2023-02-23 17:28:57 +08:00
if err = CallbackBeforeSendSingleMsg(ctx, req); err != nil && err != constant.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
_, err = m.messageVerification(ctx, req)
if err != nil {
return nil, err
}
2023-02-21 16:48:10 +08:00
isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, req.MsgData.SendID, constant.SingleChatType, req)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, err
}
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
if isSend {
2023-02-24 11:01:33 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, constant.ErrInternalServer.Wrap("insert to mq")
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}
if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself
2023-02-24 11:01:33 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, constant.ErrInternalServer.Wrap("insert to mq")
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}
2023-02-14 22:04:03 +08:00
err = CallbackAfterSendSingleMsg(ctx, req)
2023-02-23 17:28:57 +08:00
if err != nil && err != constant.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.SingleChatMsgProcessSuccessCounter)
2023-02-13 15:52:41 +08:00
resp.SendTime = msgToMQSingle.MsgData.SendTime
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil
}
func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
// callback
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.GroupChatMsgRecvSuccessCounter)
2023-02-14 22:04:03 +08:00
err = CallbackBeforeSendGroupMsg(ctx, req)
2023-02-23 17:28:57 +08:00
if err != nil && err != constant.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
var memberUserIDList []string
if memberUserIDList, err = m.messageVerification(ctx, req); err != nil {
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.GroupChatMsgProcessFailedCounter)
2023-02-13 15:52:41 +08:00
return nil, err
}
var addUidList []string
switch req.MsgData.ContentType {
case constant.MemberKickedNotification:
var tips sdkws.TipsComm
var memberKickedTips sdkws.MemberKickedTips
err := proto.Unmarshal(req.MsgData.Content, &tips)
if err != nil {
2023-02-14 10:57:52 +08:00
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
err = proto.Unmarshal(tips.Detail, &memberKickedTips)
if err != nil {
2023-02-14 10:57:52 +08:00
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
for _, v := range memberKickedTips.KickedUserList {
addUidList = append(addUidList, v.UserID)
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
case constant.MemberQuitNotification:
addUidList = append(addUidList, req.MsgData.SendID)
default:
}
if len(addUidList) > 0 {
memberUserIDList = append(memberUserIDList, addUidList...)
}
//split parallel send
var wg sync.WaitGroup
var split = 20
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
2023-02-13 16:15:16 +08:00
mErr := make([]error, 0)
var mutex sync.RWMutex
2023-02-13 15:52:41 +08:00
remain := len(memberUserIDList) % split
for i := 0; i < len(memberUserIDList)/split; i++ {
wg.Add(1)
tmp := valueCopy(req)
2023-02-13 16:15:16 +08:00
go func() {
2023-02-13 16:16:47 +08:00
err := m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &wg)
2023-02-13 16:15:16 +08:00
if err != nil {
mutex.Lock()
mErr = append(mErr, err)
mutex.Unlock()
}
}()
2023-02-13 15:52:41 +08:00
}
if remain > 0 {
wg.Add(1)
tmp := valueCopy(req)
2023-02-13 16:16:47 +08:00
go m.sendMsgToGroupOptimization(ctx, memberUserIDList[split*(len(memberUserIDList)/split):], tmp, &wg)
2023-02-13 15:52:41 +08:00
}
wg.Wait()
// callback
2023-02-14 22:04:03 +08:00
err = CallbackAfterSendGroupMsg(ctx, req)
2023-03-01 15:32:26 +08:00
if err != nil && err != constant.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
for _, v := range mErr {
if v != nil {
return nil, v
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}
if req.MsgData.ContentType == constant.AtText {
go func() {
var conversationReq pbConversation.ModifyConversationFieldReq
var tag bool
var atUserID []string
conversation := pbConversation.Conversation{
OwnerUserID: req.MsgData.SendID,
2023-02-13 16:15:16 +08:00
ConversationID: utils.GetConversationIDBySessionType(req.MsgData.GroupID, constant.GroupChatType),
2023-02-13 15:52:41 +08:00
ConversationType: constant.GroupChatType,
GroupID: req.MsgData.GroupID,
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
conversationReq.Conversation = &conversation
conversationReq.FieldType = constant.FieldGroupAtType
tagAll := utils.IsContain(constant.AtAllString, req.MsgData.AtUserIDList)
if tagAll {
2023-02-13 16:15:16 +08:00
atUserID = utils.DifferenceString([]string{constant.AtAllString}, req.MsgData.AtUserIDList)
2023-02-13 15:52:41 +08:00
if len(atUserID) == 0 { //just @everyone
conversationReq.UserIDList = memberUserIDList
conversation.GroupAtType = constant.AtAll
} else { //@Everyone and @other people
conversationReq.UserIDList = atUserID
conversation.GroupAtType = constant.AtAllAtMe
tag = true
}
} else {
conversationReq.UserIDList = req.MsgData.AtUserIDList
conversation.GroupAtType = constant.AtMe
}
2023-02-13 16:15:16 +08:00
2023-02-14 16:33:18 +08:00
err := m.Conversation.ModifyConversationField(ctx, &conversationReq)
2023-02-10 22:10:37 +08:00
if err != nil {
2023-02-13 15:52:41 +08:00
return
2023-02-10 22:10:37 +08:00
}
2023-02-13 16:15:16 +08:00
2023-02-13 15:52:41 +08:00
if tag {
conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList)
conversation.GroupAtType = constant.AtAll
2023-02-14 16:33:18 +08:00
err := m.Conversation.ModifyConversationField(ctx, &conversationReq)
2023-02-13 15:52:41 +08:00
if err != nil {
2023-02-13 16:15:16 +08:00
return
2023-02-13 15:52:41 +08:00
}
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}()
}
2023-02-13 16:15:16 +08:00
//
2023-02-10 22:10:37 +08:00
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.GroupChatMsgProcessSuccessCounter)
2023-02-13 15:52:41 +08:00
resp.SendTime = msgToMQSingle.MsgData.SendTime
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil
2023-02-13 16:15:16 +08:00
}
2023-02-13 15:52:41 +08:00
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
resp = &msg.SendMsgResp{}
flag := isMessageHasReadEnabled(req.MsgData)
if !flag {
return nil, constant.ErrMessageHasReadDisable.Wrap()
}
m.encapsulateMsgData(req.MsgData)
2023-02-23 17:28:57 +08:00
if err := CallbackMsgModify(ctx, req); err != nil && err != constant.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
switch req.MsgData.SessionType {
case constant.SingleChatType:
return m.sendMsgSingleChat(ctx, req)
case constant.GroupChatType:
return m.sendMsgGroupChat(ctx, req)
2023-02-10 22:10:37 +08:00
case constant.NotificationChatType:
2023-02-13 15:52:41 +08:00
return m.sendMsgNotification(ctx, req)
2023-02-10 22:10:37 +08:00
case constant.SuperGroupChatType:
2023-02-13 15:52:41 +08:00
return m.sendMsgSuperGroupChat(ctx, req)
2023-02-10 22:10:37 +08:00
default:
2023-02-13 15:52:41 +08:00
return nil, constant.ErrArgs.Wrap("unknown sessionType")
2023-02-10 22:10:37 +08:00
}
}
2023-02-13 16:36:05 +08:00
func (m *msgServer) GetMaxAndMinSeq(ctx context.Context, req *sdkws.GetMaxAndMinSeqReq) (*sdkws.GetMaxAndMinSeqResp, error) {
2023-02-10 22:10:37 +08:00
resp := new(sdkws.GetMaxAndMinSeqResp)
2023-02-13 16:36:05 +08:00
m2 := make(map[string]*sdkws.MaxAndMinSeq)
2023-02-24 11:01:33 +08:00
maxSeq, err := m.MsgDatabase.GetUserMaxSeq(ctx, req.UserID)
2023-02-13 16:36:05 +08:00
if err != nil {
return nil, err
}
2023-02-24 11:01:33 +08:00
minSeq, err := m.MsgDatabase.GetUserMinSeq(ctx, req.UserID)
2023-02-13 16:36:05 +08:00
if err != nil {
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 16:36:05 +08:00
resp.MaxSeq = maxSeq
resp.MinSeq = minSeq
if len(req.GroupIDList) > 0 {
resp.GroupMaxAndMinSeq = make(map[string]*sdkws.MaxAndMinSeq)
for _, groupID := range req.GroupIDList {
2023-02-24 11:01:33 +08:00
maxSeq, err := m.MsgDatabase.GetGroupMaxSeq(ctx, groupID)
2023-02-13 16:36:05 +08:00
if err != nil {
return nil, err
}
2023-02-24 11:01:33 +08:00
minSeq, err := m.MsgDatabase.GetGroupMinSeq(ctx, groupID)
2023-02-13 16:36:05 +08:00
if err != nil {
return nil, err
}
m2[groupID] = &sdkws.MaxAndMinSeq{
MaxSeq: maxSeq,
MinSeq: minSeq,
}
}
2023-02-10 22:10:37 +08:00
}
return resp, nil
}
2023-02-13 16:36:05 +08:00
func (m *msgServer) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqListReq) (*sdkws.PullMessageBySeqListResp, error) {
resp := &sdkws.PullMessageBySeqListResp{GroupMsgDataList: make(map[string]*sdkws.MsgDataList)}
2023-02-24 11:01:33 +08:00
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, req.Seqs)
2023-02-10 22:10:37 +08:00
if err != nil {
2023-02-13 16:36:05 +08:00
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 16:36:05 +08:00
resp.List = msgs
for userID, list := range req.GroupSeqList {
2023-02-24 11:01:33 +08:00
msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, userID, req.Seqs)
2023-02-10 22:10:37 +08:00
if err != nil {
2023-02-13 16:36:05 +08:00
return nil, err
}
resp.GroupMsgDataList[userID] = &sdkws.MsgDataList{
MsgDataList: msgs,
2023-02-10 22:10:37 +08:00
}
}
return resp, nil
}