Files
open-im-server/internal/rpc/msg/send_pull.go
T

220 lines
6.5 KiB
Go
Raw Normal View History

2023-02-10 22:10:37 +08:00
package msg
import (
"context"
2023-04-28 18:33:33 +08:00
"sync"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
promePkg "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2023-04-28 18:33:33 +08:00
"google.golang.org/protobuf/proto"
2023-02-10 22:10:37 +08:00
)
2023-02-13 15:52:41 +08:00
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
2023-02-23 11:26:46 +08:00
resp = &msg.SendMsgResp{}
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
2023-02-13 15:52:41 +08:00
if _, err = m.messageVerification(ctx, req); err != nil {
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
2023-02-13 15:52:41 +08:00
return nil, err
}
2023-05-04 15:54:04 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.GroupID, req.MsgData)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, err
2023-02-10 22:10:37 +08:00
}
// callback
2023-03-07 12:19:30 +08:00
if err = CallbackAfterSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
2023-05-04 15:54:04 +08:00
resp.SendTime = req.MsgData.SendTime
resp.ServerMsgID = req.MsgData.ServerMsgID
resp.ClientMsgID = req.MsgData.ClientMsgID
2023-02-13 15:52:41 +08:00
return resp, nil
}
2023-05-05 12:19:04 +08:00
2023-02-13 15:52:41 +08:00
func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
2023-05-04 15:54:04 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, req.MsgData)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-05-04 15:54:04 +08:00
if req.MsgData.SendID != req.MsgData.RecvID { //Filter messages sent to yourself
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, req.MsgData)
2023-02-13 15:52:41 +08:00
if err != nil {
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}
2023-03-03 18:08:26 +08:00
resp = &msg.SendMsgResp{
2023-05-04 15:54:04 +08:00
ServerMsgID: req.MsgData.ServerMsgID,
ClientMsgID: req.MsgData.ClientMsgID,
SendTime: req.MsgData.SendTime,
2023-03-03 18:08:26 +08:00
}
2023-02-13 15:52:41 +08:00
return resp, nil
}
func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.SingleChatMsgRecvSuccessCounter)
2023-02-13 15:52:41 +08:00
_, err = m.messageVerification(ctx, req)
if err != nil {
return nil, err
}
2023-05-16 18:37:14 +08:00
var isSend bool = true
2023-05-10 17:18:04 +08:00
conversationID := utils.GetConversationIDByMsg(req.MsgData)
2023-05-16 18:37:14 +08:00
if utils.MsgIsNotification(req.MsgData) {
isSend, err = m.modifyMessageByUserMessageReceiveOpt(ctx, req.MsgData.RecvID, conversationID, constant.SingleChatType, req)
if err != nil {
return nil, err
}
2023-02-13 15:52:41 +08:00
}
if isSend {
2023-05-10 17:18:04 +08:00
err = m.MsgDatabase.MsgToMQ(ctx, conversationID, req.MsgData)
2023-02-13 15:52:41 +08:00
if err != nil {
2023-03-07 12:19:30 +08:00
return nil, errs.ErrInternalServer.Wrap("insert to mq")
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}
2023-02-14 22:04:03 +08:00
err = CallbackAfterSendSingleMsg(ctx, req)
2023-03-07 12:19:30 +08:00
if err != nil && err != errs.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.SingleChatMsgProcessSuccessCounter)
2023-03-03 18:08:26 +08:00
resp = &msg.SendMsgResp{
2023-05-04 15:54:04 +08:00
ServerMsgID: req.MsgData.ServerMsgID,
ClientMsgID: req.MsgData.ClientMsgID,
SendTime: req.MsgData.SendTime,
2023-03-03 18:08:26 +08:00
}
2023-02-13 15:52:41 +08:00
return resp, nil
}
func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) {
// callback
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.GroupChatMsgRecvSuccessCounter)
2023-02-13 15:52:41 +08:00
var memberUserIDList []string
if memberUserIDList, err = m.messageVerification(ctx, req); err != nil {
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.GroupChatMsgProcessFailedCounter)
2023-02-13 15:52:41 +08:00
return nil, err
}
var addUidList []string
switch req.MsgData.ContentType {
case constant.MemberKickedNotification:
var tips sdkws.TipsComm
var memberKickedTips sdkws.MemberKickedTips
err := proto.Unmarshal(req.MsgData.Content, &tips)
if err != nil {
2023-02-14 10:57:52 +08:00
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
err = proto.Unmarshal(tips.Detail, &memberKickedTips)
if err != nil {
2023-02-14 10:57:52 +08:00
return nil, err
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
for _, v := range memberKickedTips.KickedUserList {
addUidList = append(addUidList, v.UserID)
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
case constant.MemberQuitNotification:
addUidList = append(addUidList, req.MsgData.SendID)
default:
}
if len(addUidList) > 0 {
memberUserIDList = append(memberUserIDList, addUidList...)
}
//split parallel send
var wg sync.WaitGroup
var split = 20
msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData}
2023-02-13 16:15:16 +08:00
mErr := make([]error, 0)
var mutex sync.RWMutex
2023-02-13 15:52:41 +08:00
remain := len(memberUserIDList) % split
for i := 0; i < len(memberUserIDList)/split; i++ {
wg.Add(1)
tmp := valueCopy(req)
2023-05-08 12:39:45 +08:00
go func(i int) {
2023-02-13 16:16:47 +08:00
err := m.sendMsgToGroupOptimization(ctx, memberUserIDList[i*split:(i+1)*split], tmp, &wg)
2023-02-13 16:15:16 +08:00
if err != nil {
mutex.Lock()
mErr = append(mErr, err)
mutex.Unlock()
}
2023-05-08 12:39:45 +08:00
}(i)
2023-02-13 15:52:41 +08:00
}
if remain > 0 {
wg.Add(1)
tmp := valueCopy(req)
2023-02-13 16:16:47 +08:00
go m.sendMsgToGroupOptimization(ctx, memberUserIDList[split*(len(memberUserIDList)/split):], tmp, &wg)
2023-02-13 15:52:41 +08:00
}
wg.Wait()
// callback
2023-02-14 22:04:03 +08:00
err = CallbackAfterSendGroupMsg(ctx, req)
2023-03-07 12:19:30 +08:00
if err != nil && err != errs.ErrCallbackContinue {
2023-02-13 15:52:41 +08:00
return nil, err
}
for _, v := range mErr {
if v != nil {
return nil, v
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}
if req.MsgData.ContentType == constant.AtText {
go func() {
var conversationReq pbConversation.ModifyConversationFieldReq
var tag bool
var atUserID []string
conversation := pbConversation.Conversation{
OwnerUserID: req.MsgData.SendID,
2023-05-04 15:06:23 +08:00
ConversationID: utils.GetConversationIDBySessionType(constant.GroupChatType, req.MsgData.GroupID),
2023-02-13 15:52:41 +08:00
ConversationType: constant.GroupChatType,
GroupID: req.MsgData.GroupID,
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
conversationReq.Conversation = &conversation
conversationReq.FieldType = constant.FieldGroupAtType
tagAll := utils.IsContain(constant.AtAllString, req.MsgData.AtUserIDList)
if tagAll {
2023-02-13 16:15:16 +08:00
atUserID = utils.DifferenceString([]string{constant.AtAllString}, req.MsgData.AtUserIDList)
2023-02-13 15:52:41 +08:00
if len(atUserID) == 0 { //just @everyone
conversationReq.UserIDList = memberUserIDList
conversation.GroupAtType = constant.AtAll
} else { //@Everyone and @other people
conversationReq.UserIDList = atUserID
conversation.GroupAtType = constant.AtAllAtMe
tag = true
}
} else {
conversationReq.UserIDList = req.MsgData.AtUserIDList
conversation.GroupAtType = constant.AtMe
}
2023-02-13 16:15:16 +08:00
2023-02-14 16:33:18 +08:00
err := m.Conversation.ModifyConversationField(ctx, &conversationReq)
2023-02-10 22:10:37 +08:00
if err != nil {
2023-02-13 15:52:41 +08:00
return
2023-02-10 22:10:37 +08:00
}
2023-02-13 16:15:16 +08:00
2023-02-13 15:52:41 +08:00
if tag {
conversationReq.UserIDList = utils.DifferenceString(atUserID, memberUserIDList)
conversation.GroupAtType = constant.AtAll
2023-02-14 16:33:18 +08:00
err := m.Conversation.ModifyConversationField(ctx, &conversationReq)
2023-02-13 15:52:41 +08:00
if err != nil {
2023-02-13 16:15:16 +08:00
return
2023-02-13 15:52:41 +08:00
}
2023-02-10 22:10:37 +08:00
}
2023-02-13 15:52:41 +08:00
}()
}
2023-02-13 16:15:16 +08:00
//
2023-02-10 22:10:37 +08:00
2023-02-24 11:13:16 +08:00
promePkg.Inc(promePkg.GroupChatMsgProcessSuccessCounter)
2023-02-13 15:52:41 +08:00
resp.SendTime = msgToMQSingle.MsgData.SendTime
resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID
resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID
return resp, nil
2023-02-13 16:15:16 +08:00
}