Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release

This commit is contained in:
Gordon
2023-02-01 17:37:47 +08:00
84 changed files with 8085 additions and 5713 deletions
+15
View File
@@ -61,6 +61,21 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
if req.Conversation.ConversationType == constant.SuperGroupChatType {
if req.Conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
if err = db.DB.SetSuperGroupUserReceiveNotNotifyMessage(req.Conversation.GroupID, v); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), req.Conversation.GroupID, v)
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
} else {
if err = db.DB.SetSuperGroupUserReceiveNotifyMessage(req.Conversation.GroupID, v); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), req.Conversation.GroupID, v)
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
}
}
}
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt})
case constant.FieldGroupAtType:
+55
View File
@@ -10,6 +10,8 @@ import (
pbGroup "Open_IM/pkg/proto/group"
"Open_IM/pkg/utils"
http2 "net/http"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func callbackBeforeCreateGroup(req *pbGroup.CreateGroupReq) cbApi.CommonCallbackResp {
@@ -126,3 +128,56 @@ func CallbackBeforeMemberJoinGroup(operationID string, groupMember *db.GroupMemb
}
return callbackResp
}
func CallbackBeforeSetGroupMemberInfo(req *pbGroup.SetGroupMemberInfoReq) cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: req.OperationID}
if !config.Config.Callback.CallbackBeforeSetGroupMemberInfo.Enable {
return callbackResp
}
callbackReq := cbApi.CallbackBeforeSetGroupMemberInfoReq{
CallbackCommand: constant.CallbackBeforeSetGroupMemberInfoCommand,
OperationID: req.OperationID,
GroupID: req.GroupID,
UserID: req.UserID,
}
if req.Nickname != nil {
callbackReq.Nickname = req.Nickname.Value
}
if req.FaceURL != nil {
callbackReq.FaceURL = req.FaceURL.Value
}
if req.RoleLevel != nil {
callbackReq.RoleLevel = req.RoleLevel.Value
}
if req.Ex != nil {
callbackReq.Ex = req.Ex.Value
}
resp := &cbApi.CallbackBeforeSetGroupMemberInfoResp{
CommonCallbackResp: &callbackResp,
}
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetGroupMemberInfoCommand, callbackReq, resp, config.Config.Callback.CallbackBeforeSetGroupMemberInfo.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
if !config.Config.Callback.CallbackBeforeSetGroupMemberInfo.CallbackFailedContinue {
callbackResp.ActionCode = constant.ActionForbidden
return callbackResp
} else {
callbackResp.ActionCode = constant.ActionAllow
return callbackResp
}
}
if resp.FaceURL != nil {
req.FaceURL = &wrapperspb.StringValue{Value: *resp.FaceURL}
}
if resp.Nickname != nil {
req.Nickname = &wrapperspb.StringValue{Value: *resp.Nickname}
}
if resp.RoleLevel != nil {
req.RoleLevel = &wrapperspb.Int32Value{Value: *resp.RoleLevel}
}
if resp.Ex != nil {
req.Ex = &wrapperspb.StringValue{Value: *resp.Ex}
}
return callbackResp
}
+161 -7
View File
@@ -30,6 +30,7 @@ import (
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"
"gorm.io/gorm"
)
@@ -364,6 +365,14 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
var resp pbGroup.InviteUserToGroupResp
joinReq := pbGroup.JoinGroupReq{}
for _, v := range req.InvitedUserIDList {
if imdb.IsExistGroupMember(req.GroupID, v) {
log.NewError(req.OperationID, "IsExistGroupMember ", req.GroupID, v)
var resultNode pbGroup.Id2Result
resultNode.Result = -1
resultNode.UserID = v
resp.Id2ResultList = append(resp.Id2ResultList, &resultNode)
continue
}
var groupRequest db.GroupRequest
groupRequest.UserID = v
groupRequest.GroupID = req.GroupID
@@ -451,8 +460,19 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
resp.Id2ResultList = append(resp.Id2ResultList, &resultNode)
}
} else {
okUserIDList = req.InvitedUserIDList
if err := db.DB.AddUserToSuperGroup(req.GroupID, req.InvitedUserIDList); err != nil {
for _, v := range req.InvitedUserIDList {
if imdb.IsExistGroupMember(req.GroupID, v) {
log.NewError(req.OperationID, "IsExistGroupMember ", req.GroupID, v)
var resultNode pbGroup.Id2Result
resultNode.Result = -1
resp.Id2ResultList = append(resp.Id2ResultList, &resultNode)
continue
} else {
okUserIDList = append(okUserIDList, v)
}
}
//okUserIDList = req.InvitedUserIDList
if err := db.DB.AddUserToSuperGroup(req.GroupID, okUserIDList); err != nil {
log.NewError(req.OperationID, "AddUserToSuperGroup failed ", req.GroupID, err)
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil
}
@@ -723,6 +743,33 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
resp.ErrMsg = constant.ErrDB.ErrMsg
return &resp, nil
}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
if err := rocksCache.DelGroupMemberIDListFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
reqPb := pbConversation.ModifyConversationFieldReq{Conversation: &pbConversation.Conversation{}}
reqPb.OperationID = req.OperationID
reqPb.UserIDList = okUserIDList
reqPb.FieldType = constant.FieldUnread
reqPb.Conversation.GroupID = req.GroupID
reqPb.Conversation.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, constant.SuperGroupChatType)
reqPb.Conversation.ConversationType = int32(constant.SuperGroupChatType)
reqPb.Conversation.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill()
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetDefaultConn == nil"
log.NewError(req.OperationID, errMsg)
}
client := pbConversation.NewConversationClient(etcdConn)
respPb, err := client.ModifyConversationField(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "ModifyConversationField rpc failed, ", reqPb.String(), err.Error())
} else {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "ModifyConversationField success", respPb.String())
}
}
if groupInfo.GroupType != constant.SuperGroup {
@@ -756,7 +803,15 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
var resp pbGroup.GetGroupMembersInfoResp
resp.MemberList = []*open_im_sdk.GroupMemberFullInfo{}
for _, userID := range req.MemberList {
groupMember, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, userID)
var (
groupMember *db.GroupMember
err error
)
if req.NoCache {
groupMember, err = imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, userID)
} else {
groupMember, err = rocksCache.GetGroupMemberInfoFromCache(req.GroupID, userID)
}
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, userID, err.Error())
continue
@@ -861,6 +916,10 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G
log.NewError(req.OperationID, "GroupApplicationResponse failed ", err.Error(), req.FromUserID)
return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if imdb.IsExistGroupMember(req.GroupID, req.FromUserID) {
log.NewInfo(req.OperationID, "GroupApplicationResponse user in group", req.GroupID, req.FromUserID)
return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{}}, nil
}
member := db.GroupMember{}
member.GroupID = req.GroupID
member.UserID = req.FromUserID
@@ -976,12 +1035,15 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G
func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq) (*pbGroup.JoinGroupResp, error) {
log.NewInfo(req.OperationID, "JoinGroup args ", req.String())
if imdb.IsExistGroupMember(req.GroupID, req.OpUserID) {
log.NewInfo(req.OperationID, "IsExistGroupMember", req.GroupID, req.OpUserID)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{}}, nil
}
_, err := imdb.GetUserByUserID(req.OpUserID)
if err != nil {
log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), req.OpUserID)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.GroupID, err)
@@ -1274,7 +1336,8 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf
}
log.NewInfo(req.OperationID, "SetGroupInfo rpc return ", pbGroup.SetGroupInfoResp{CommonResp: &pbGroup.CommonResp{}})
if changedType != 0 {
chat.GroupInfoSetNotification(req.OperationID, req.OpUserID, req.GroupInfoForSet.GroupID, groupName, notification, introduction, faceURL, req.GroupInfoForSet.NeedVerification)
chat.GroupInfoSetNotification(req.OperationID, req.OpUserID, req.GroupInfoForSet.GroupID, groupName, notification,
introduction, faceURL, req.GroupInfoForSet.NeedVerification, req.GroupInfoForSet.ApplyMemberFriend, req.GroupInfoForSet.LookMemberInfo)
}
if req.GroupInfoForSet.Notification != "" {
//get group member user id
@@ -1765,11 +1828,35 @@ func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.S
log.Error(req.OperationID, errMsg)
return &pbGroup.SetGroupMemberNicknameResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}}, nil
}
cbReq := &pbGroup.SetGroupMemberInfoReq{
GroupID: req.GroupID,
UserID: req.UserID,
OperationID: req.OperationID,
OpUserID: req.OpUserID,
Nickname: &wrapperspb.StringValue{Value: req.Nickname},
}
callbackResp := CallbackBeforeSetGroupMemberInfo(cbReq)
if callbackResp.ErrCode != 0 {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup resp: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
if callbackResp.ErrCode == 0 {
callbackResp.ErrCode = 201
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup result", "end rpc and return", callbackResp)
return &pbGroup.SetGroupMemberNicknameResp{
CommonResp: &pbGroup.CommonResp{
ErrCode: int32(callbackResp.ErrCode),
ErrMsg: callbackResp.ErrMsg,
},
}, nil
}
nickName := cbReq.Nickname.Value
groupMemberInfo := db.GroupMember{}
groupMemberInfo.UserID = req.UserID
groupMemberInfo.GroupID = req.GroupID
if req.Nickname == "" {
if nickName == "" {
userNickname, err := imdb.GetUserNameByUserID(groupMemberInfo.UserID)
if err != nil {
errMsg := req.OperationID + " GetUserNameByUserID failed " + err.Error()
@@ -1778,7 +1865,7 @@ func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.S
}
groupMemberInfo.Nickname = userNickname
} else {
groupMemberInfo.Nickname = req.Nickname
groupMemberInfo.Nickname = nickName
}
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil {
@@ -1805,6 +1892,23 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
callbackResp := CallbackBeforeSetGroupMemberInfo(req)
if callbackResp.ErrCode != 0 {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup resp: ", callbackResp)
}
if callbackResp.ActionCode != constant.ActionAllow {
if callbackResp.ErrCode == 0 {
callbackResp.ErrCode = 201
}
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CallbackBeforeMemberJoinGroup result", "end rpc and return", callbackResp)
return &pbGroup.SetGroupMemberInfoResp{
CommonResp: &pbGroup.CommonResp{
ErrCode: int32(callbackResp.ErrCode),
ErrMsg: callbackResp.ErrMsg,
},
}, nil
}
groupMember := db.GroupMember{
GroupID: req.GroupID,
UserID: req.UserID,
@@ -1913,3 +2017,53 @@ func (s *groupServer) DelGroupAndUserCache(operationID, groupID string, userIDLi
}
return nil
}
func (s *groupServer) GroupIsExist(c context.Context, req *pbGroup.GroupIsExistReq) (*pbGroup.GroupIsExistResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbGroup.GroupIsExistResp{CommonResp: &pbGroup.CommonResp{}}
groups, err := imdb.GetGroupInfoByGroupIDList(req.GroupIDList)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), "args:", req.GroupIDList)
resp.CommonResp.ErrMsg = err.Error()
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
return resp, nil
}
var m = make(map[string]bool)
for _, groupID := range req.GroupIDList {
m[groupID] = false
for _, group := range groups {
if groupID == group.GroupID {
m[groupID] = true
break
}
}
}
resp.IsExistMap = m
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", req.String())
return resp, nil
}
func (s *groupServer) UserIsInGroup(c context.Context, req *pbGroup.UserIsInGroupReq) (*pbGroup.UserIsInGroupResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbGroup.UserIsInGroupResp{}
groupMemberList, err := imdb.GetGroupMemberByUserIDList(req.GroupID, req.UserIDList)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), "args:", req.GroupID, req.UserIDList)
resp.CommonResp.ErrMsg = err.Error()
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
return resp, nil
}
var m = make(map[string]bool)
for _, userID := range req.UserIDList {
m[userID] = false
for _, user := range groupMemberList {
if userID == user.UserID {
m[userID] = true
break
}
}
}
resp.IsExistMap = m
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", req.String())
return resp, nil
}
-6
View File
@@ -221,9 +221,3 @@ func callbackMsgModify(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content))
return callbackResp
}
func CallbackBeforeExtendMsgModify() cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: ""}
return callbackResp
}
+5 -5
View File
@@ -5,8 +5,8 @@ import (
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/proto/msg"
commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"time"
@@ -39,18 +39,18 @@ func (rpc *rpcChat) DelSuperGroupMsg(_ context.Context, req *msg.DelSuperGroupMs
resp := &msg.DelSuperGroupMsgResp{}
groupMaxSeq, err := db.DB.GetGroupMaxSeq(req.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupMaxSeq false ", req.OpUserID, req.UserID,req.GroupID)
log.NewError(req.OperationID, "GetGroupMaxSeq false ", req.OpUserID, req.UserID, req.GroupID)
resp.ErrCode = constant.ErrDB.ErrCode
resp.ErrMsg = err.Error()
return resp, nil
}
err = db.DB.SetGroupUserMinSeq(req.GroupID,req.UserID, groupMaxSeq)
err = db.DB.SetGroupUserMinSeq(req.GroupID, req.UserID, groupMaxSeq+1)
if err != nil {
log.NewError(req.OperationID, "SetGroupUserMinSeq false ", req.OpUserID, req.UserID,req.GroupID)
log.NewError(req.OperationID, "SetGroupUserMinSeq false ", req.OpUserID, req.UserID, req.GroupID)
resp.ErrCode = constant.ErrDB.ErrCode
resp.ErrMsg = err.Error()
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
}
+476
View File
@@ -1 +1,477 @@
package msg
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
go_redis "github.com/go-redis/redis/v8"
"time"
)
func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.SetMessageReactionExtensionsResp
rResp.ClientMsgID = req.ClientMsgID
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
callbackResp := callbackSetMessageReactionExtensions(req)
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
rResp.ErrCode = int32(callbackResp.ErrCode)
rResp.ErrMsg = callbackResp.ErrMsg
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = callbackResp.ErrMsg
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
//if ExternalExtension
if req.IsExternalExtensions {
var isHistory bool
if req.IsReact {
isHistory = false
} else {
isHistory = true
}
rResp.MsgFirstModifyTime = callbackResp.MsgFirstModifyTime
rResp.Result = callbackResp.ResultReactionExtensionList
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, isHistory, false)
return &rResp, nil
}
for _, v := range callbackResp.ResultReactionExtensionList {
if v.ErrCode == 0 {
req.ReactionExtensionList[v.KeyValue.TypeKey] = v.KeyValue
} else {
delete(req.ReactionExtensionList, v.KeyValue.TypeKey)
rResp.Result = append(rResp.Result, v)
}
}
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
if !isExists {
if !req.IsReact {
log.Debug(req.OperationID, "redis handle firstly", req.String())
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
}
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil {
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, v)
continue
}
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
}
rResp.IsReact = true
_, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
if err != nil {
log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String())
}
} else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
if err != nil {
rResp.ErrCode = 200
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
setValue := make(map[string]*server_api_params.KeyValue)
for k, v := range req.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
if vv, ok := mongoValue.ReactionExtensionList[k]; ok {
utils.CopyStructFields(temp, &vv)
if v.LatestUpdateTime != vv.LatestUpdateTime {
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
continue
}
}
temp.TypeKey = k
temp.Value = v.Value
temp.LatestUpdateTime = utils.GetCurrentTimestampByMill()
setValue[k] = temp
}
err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
if err != nil {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
} else {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
rResp.Result = append(rResp.Result, temp)
}
}
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
}
}
} else {
log.Debug(req.OperationID, "redis handle secondly", req.String())
for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
}
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
if err != nil && err != go_redis.Nil {
setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v)
continue
}
temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(redisValue, temp)
if v.LatestUpdateTime != temp.LatestUpdateTime {
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
continue
} else {
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil {
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp)
continue
}
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
}
}
}
if !isExists {
if !req.IsReact {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true, true)
} else {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
}
} else {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true)
}
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil
}
func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) {
temp := new(msg.KeyValueResp)
temp.KeyValue = keyValue
temp.ErrCode = errCode
temp.ErrMsg = errMsg
r.Result = append(r.Result, temp)
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
}
func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) {
temp := new(msg.KeyValueResp)
temp.KeyValue = keyValue
temp.ErrCode = errCode
temp.ErrMsg = errMsg
r.Result = append(r.Result, temp)
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
}
func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.GetMessageListReactionExtensionsResp
if req.IsExternalExtensions {
callbackResp := callbackGetMessageListReactionExtensions(req)
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
rResp.ErrCode = int32(callbackResp.ErrCode)
rResp.ErrMsg = callbackResp.ErrMsg
return &rResp, nil
} else {
rResp.SingleMessageResult = callbackResp.MessageResultList
return &rResp, nil
}
}
for _, messageValue := range req.MessageReactionKeyList {
var oneMessage msg.SingleMessageExtensionResult
oneMessage.ClientMsgID = messageValue.ClientMsgID
isExists, err := db.DB.JudgeMessageReactionEXISTS(messageValue.ClientMsgID, req.SessionType)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
return &rResp, nil
}
if isExists {
redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType)
if err != nil {
oneMessage.ErrCode = 100
oneMessage.ErrMsg = err.Error()
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
continue
}
keyMap := make(map[string]*server_api_params.KeyValue)
for k, v := range redisValue {
temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(v, temp)
keyMap[k] = temp
}
oneMessage.ReactionExtensionList = keyMap
} else {
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime)
if err != nil {
oneMessage.ErrCode = 100
oneMessage.ErrMsg = err.Error()
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
continue
}
keyMap := make(map[string]*server_api_params.KeyValue)
for k, v := range mongoValue.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
temp.TypeKey = v.TypeKey
temp.Value = v.Value
temp.LatestUpdateTime = v.LatestUpdateTime
keyMap[k] = temp
}
oneMessage.ReactionExtensionList = keyMap
}
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
}
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil
}
func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.AddMessageReactionExtensionsReq) (resp *msg.AddMessageReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.AddMessageReactionExtensionsResp
rResp.ClientMsgID = req.ClientMsgID
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
callbackResp := callbackAddMessageReactionExtensions(req)
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
rResp.ErrCode = int32(callbackResp.ErrCode)
rResp.ErrMsg = callbackResp.ErrMsg
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = callbackResp.ErrMsg
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
//if !req.IsExternalExtensions {
// rResp.ErrCode = 200
// rResp.ErrMsg = "only extenalextensions message can be used"
// for _, value := range req.ReactionExtensionList {
// temp := new(msg.KeyValueResp)
// temp.KeyValue = value
// temp.ErrMsg = callbackResp.ErrMsg
// temp.ErrCode = 100
// rResp.Result = append(rResp.Result, temp)
// }
// return &rResp, nil
//}
//if ExternalExtension
var isHistory bool
if req.IsReact {
isHistory = false
} else {
isHistory = true
}
rResp.MsgFirstModifyTime = callbackResp.MsgFirstModifyTime
rResp.Result = callbackResp.ResultReactionExtensionList
rResp.IsReact = callbackResp.IsReact
ExtendMessageAddedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, isHistory, false)
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", resp.String())
return &rResp, nil
}
func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.DeleteMessageListReactionExtensionsResp
callbackResp := callbackDeleteMessageReactionExtensions(req)
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
rResp.ErrCode = int32(callbackResp.ErrCode)
rResp.ErrMsg = callbackResp.ErrMsg
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = callbackResp.ErrMsg
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
//if ExternalExtension
if req.IsExternalExtensions {
rResp.Result = callbackResp.ResultReactionExtensionList
ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
return &rResp, nil
}
for _, v := range callbackResp.ResultReactionExtensionList {
if v.ErrCode != 0 {
func(req *[]*server_api_params.KeyValue, typeKey string) {
for i := 0; i < len(*req); i++ {
if (*req)[i].TypeKey == typeKey {
*req = append((*req)[:i], (*req)[i+1:]...)
}
}
}(&req.ReactionExtensionList, v.KeyValue.TypeKey)
rResp.Result = append(rResp.Result, v)
}
}
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
if isExists {
log.Debug(req.OperationID, "redis handle this delete", req.String())
for _, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
if err != nil {
setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
continue
}
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey)
if err != nil && err != go_redis.Nil {
setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v)
continue
}
temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(redisValue, temp)
if v.LatestUpdateTime != temp.LatestUpdateTime {
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
continue
} else {
newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey)
if newErr != nil {
setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp)
continue
}
setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v)
}
}
} else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
if err != nil {
rResp.ErrCode = 200
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
setValue := make(map[string]*server_api_params.KeyValue)
for _, v := range req.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok {
utils.CopyStructFields(temp, &vv)
if v.LatestUpdateTime != vv.LatestUpdateTime {
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
continue
}
} else {
setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v)
continue
}
temp.TypeKey = v.TypeKey
setValue[v.TypeKey] = temp
}
err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
if err != nil {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
} else {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
rResp.Result = append(rResp.Result, temp)
}
}
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
}
}
ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists)
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil
}
+131
View File
@@ -0,0 +1,131 @@
package msg
import (
"Open_IM/pkg/base_info"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/msg"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"strings"
)
func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
var m base_info.ReactionMessageModifierNotification
m.SourceID = req.SourceID
m.OpUserID = req.OpUserID
m.Operation = constant.SetMessageExtensions
m.SessionType = req.SessionType
keyMap := make(map[string]*open_im_sdk.KeyValue)
for _, valueResp := range resp.Result {
if valueResp.ErrCode == 0 {
keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue
}
}
if len(keyMap) == 0 {
log.NewWarn(operationID, "all key set failed can not send notification", *req)
return
}
m.SuccessReactionExtensionList = keyMap
m.ClientMsgID = req.ClientMsgID
m.IsReact = resp.IsReact
m.IsExternalExtensions = req.IsExternalExtensions
m.MsgFirstModifyTime = resp.MsgFirstModifyTime
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory, isReactionFromCache)
}
func ExtendMessageAddedNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.AddMessageReactionExtensionsReq, resp *msg.AddMessageReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
var m base_info.ReactionMessageModifierNotification
m.SourceID = req.SourceID
m.OpUserID = req.OpUserID
m.Operation = constant.AddMessageExtensions
m.SessionType = req.SessionType
keyMap := make(map[string]*open_im_sdk.KeyValue)
for _, valueResp := range resp.Result {
if valueResp.ErrCode == 0 {
keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue
}
}
if len(keyMap) == 0 {
log.NewWarn(operationID, "all key set failed can not send notification", *req)
return
}
m.SuccessReactionExtensionList = keyMap
m.ClientMsgID = req.ClientMsgID
m.IsReact = resp.IsReact
m.IsExternalExtensions = req.IsExternalExtensions
m.MsgFirstModifyTime = resp.MsgFirstModifyTime
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory, isReactionFromCache)
}
func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
var m base_info.ReactionMessageDeleteNotification
m.SourceID = req.SourceID
m.OpUserID = req.OpUserID
m.SessionType = req.SessionType
keyMap := make(map[string]*open_im_sdk.KeyValue)
for _, valueResp := range resp.Result {
if valueResp.ErrCode == 0 {
keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue
}
}
if len(keyMap) == 0 {
log.NewWarn(operationID, "all key set failed can not send notification", *req)
return
}
m.SuccessReactionExtensionList = keyMap
m.ClientMsgID = req.ClientMsgID
m.MsgFirstModifyTime = req.MsgFirstModifyTime
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory, isReactionFromCache)
}
func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool, isReactionFromCache bool) {
options := make(map[string]bool, 5)
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false)
utils.SetSwitchFromOptions(options, constant.IsSenderConversationUpdate, false)
utils.SetSwitchFromOptions(options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(options, constant.IsReactionFromCache, isReactionFromCache)
if !isHistory {
utils.SetSwitchFromOptions(options, constant.IsHistory, false)
utils.SetSwitchFromOptions(options, constant.IsPersistent, false)
}
pbData := msg.SendMsgReq{
OperationID: operationID,
MsgData: &open_im_sdk.MsgData{
SendID: sendID,
ClientMsgID: utils.GetMsgID(sendID),
SessionType: sessionType,
MsgFrom: constant.SysMsgType,
ContentType: contentType,
Content: []byte(content),
// ForceList: params.ForceList,
CreateTime: utils.GetCurrentTimestampByMill(),
Options: options,
},
}
switch sessionType {
case constant.SingleChatType, constant.NotificationChatType:
pbData.MsgData.RecvID = sourceID
case constant.GroupChatType, constant.SuperGroupChatType:
pbData.MsgData.GroupID = sourceID
}
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, operationID)
if etcdConn == nil {
errMsg := operationID + "getcdv3.GetDefaultConn == nil"
log.NewError(operationID, errMsg)
return
}
client := msg.NewMsgClient(etcdConn)
reply, err := client.SendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(operationID, "SendMsg rpc failed, ", pbData.String(), err.Error())
} else if reply.ErrCode != 0 {
log.NewError(operationID, "SendMsg rpc failed, ", pbData.String(), reply.ErrCode, reply.ErrMsg)
}
}
+104
View File
@@ -0,0 +1,104 @@
package msg
import (
cbApi "Open_IM/pkg/call_back_struct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
http2 "net/http"
)
func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cbApi.CallbackBeforeSetMessageReactionExtResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
req := cbApi.CallbackBeforeSetMessageReactionExtReq{
OperationID: setReq.OperationID,
CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand,
SourceID: setReq.SourceID,
OpUserID: setReq.OpUserID,
SessionType: setReq.SessionType,
ReactionExtensionList: setReq.ReactionExtensionList,
ClientMsgID: setReq.ClientMsgID,
IsReact: setReq.IsReact,
IsExternalExtensions: setReq.IsExternalExtensions,
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbApi.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp}
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetMessageReactionExtensionCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return resp
}
func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cbApi.CallbackDeleteMessageReactionExtResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
req := cbApi.CallbackDeleteMessageReactionExtReq{
OperationID: setReq.OperationID,
CallbackCommand: constant.CallbackBeforeDeleteMessageReactionExtensionsCommand,
SourceID: setReq.SourceID,
OpUserID: setReq.OpUserID,
SessionType: setReq.SessionType,
ReactionExtensionList: setReq.ReactionExtensionList,
ClientMsgID: setReq.ClientMsgID,
IsExternalExtensions: setReq.IsExternalExtensions,
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbApi.CallbackDeleteMessageReactionExtResp{CommonCallbackResp: &callbackResp}
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeDeleteMessageReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return resp
}
func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cbApi.CallbackGetMessageListReactionExtResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: getReq.OperationID}
log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), getReq.String())
req := cbApi.CallbackGetMessageListReactionExtReq{
OperationID: getReq.OperationID,
CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand,
SourceID: getReq.SourceID,
OpUserID: getReq.OpUserID,
SessionType: getReq.SessionType,
TypeKeyList: getReq.TypeKeyList,
MessageKeyList: getReq.MessageReactionKeyList,
}
resp := &cbApi.CallbackGetMessageListReactionExtResp{CommonCallbackResp: &callbackResp}
defer log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackGetMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return resp
}
func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cbApi.CallbackAddMessageReactionExtResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
req := cbApi.CallbackAddMessageReactionExtReq{
OperationID: setReq.OperationID,
CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
SourceID: setReq.SourceID,
OpUserID: setReq.OpUserID,
SessionType: setReq.SessionType,
ReactionExtensionList: setReq.ReactionExtensionList,
ClientMsgID: setReq.ClientMsgID,
IsReact: setReq.IsReact,
IsExternalExtensions: setReq.IsExternalExtensions,
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbApi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp}
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp, *resp.CommonCallbackResp, resp.IsReact, resp.MsgFirstModifyTime)
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return resp
}
+35 -1
View File
@@ -245,12 +245,15 @@ func GroupCreatedNotification(operationID, opUserID, groupID string, initMemberL
// notification := ""
// introduction := ""
// faceURL := ""
func GroupInfoSetNotification(operationID, opUserID, groupID string, groupName, notification, introduction, faceURL string, needVerification *wrapperspb.Int32Value) {
func GroupInfoSetNotification(operationID, opUserID, groupID string, groupName, notification, introduction, faceURL string, needVerification, applyMemberFriend, lookMemberInfo *wrapperspb.Int32Value) {
GroupInfoChangedTips := open_im_sdk.GroupInfoSetTips{Group: &open_im_sdk.GroupInfo{}, OpUser: &open_im_sdk.GroupMemberFullInfo{}}
if err := setGroupInfo(groupID, GroupInfoChangedTips.Group); err != nil {
log.Error(operationID, "setGroupInfo failed ", err.Error(), groupID)
return
}
GroupInfoChangedTips.Group.NeedVerification = 0
GroupInfoChangedTips.Group.LookMemberInfo = 0
GroupInfoChangedTips.Group.ApplyMemberFriend = 0
GroupInfoChangedTips.Group.GroupName = groupName
GroupInfoChangedTips.Group.Notification = notification
GroupInfoChangedTips.Group.Introduction = introduction
@@ -258,6 +261,12 @@ func GroupInfoSetNotification(operationID, opUserID, groupID string, groupName,
if needVerification != nil {
GroupInfoChangedTips.Group.NeedVerification = needVerification.Value
}
if applyMemberFriend != nil {
GroupInfoChangedTips.Group.ApplyMemberFriend = applyMemberFriend.Value
}
if lookMemberInfo != nil {
GroupInfoChangedTips.Group.LookMemberInfo = lookMemberInfo.Value
}
if err := setOpUserInfo(opUserID, groupID, GroupInfoChangedTips.OpUser); err != nil {
log.Error(operationID, "setOpUserInfo failed ", err.Error(), opUserID, groupID)
@@ -435,7 +444,20 @@ func GroupApplicationAcceptedNotification(req *pbGroup.GroupApplicationResponseR
log.Error(req.OperationID, "setOpUserInfo failed", req.OpUserID, req.GroupID, GroupApplicationAcceptedTips.OpUser)
return
}
groupNotification(constant.GroupApplicationAcceptedNotification, &GroupApplicationAcceptedTips, req.OpUserID, "", req.FromUserID, req.OperationID)
adminList, err := imdb.GetOwnerManagerByGroupID(req.GroupID)
if err != nil {
log.Error(req.OperationID, "GetOwnerManagerByGroupID failed", req.GroupID)
return
}
for _, v := range adminList {
if v.UserID == req.OpUserID {
continue
}
GroupApplicationAcceptedTips.ReceiverAs = 1
groupNotification(constant.GroupApplicationAcceptedNotification, &GroupApplicationAcceptedTips, req.OpUserID, "", v.UserID, req.OperationID)
}
}
func GroupApplicationRejectedNotification(req *pbGroup.GroupApplicationResponseReq) {
@@ -449,6 +471,18 @@ func GroupApplicationRejectedNotification(req *pbGroup.GroupApplicationResponseR
return
}
groupNotification(constant.GroupApplicationRejectedNotification, &GroupApplicationRejectedTips, req.OpUserID, "", req.FromUserID, req.OperationID)
adminList, err := imdb.GetOwnerManagerByGroupID(req.GroupID)
if err != nil {
log.Error(req.OperationID, "GetOwnerManagerByGroupID failed", req.GroupID)
return
}
for _, v := range adminList {
if v.UserID == req.OpUserID {
continue
}
GroupApplicationRejectedTips.ReceiverAs = 1
groupNotification(constant.GroupApplicationRejectedNotification, &GroupApplicationRejectedTips, req.OpUserID, "", v.UserID, req.OperationID)
}
}
func GroupOwnerTransferredNotification(req *pbGroup.TransferGroupOwnerReq) {
+52
View File
@@ -0,0 +1,52 @@
package msg
import (
"Open_IM/pkg/common/db"
"time"
)
const GlOBLLOCK = "GLOBAL_LOCK"
type MessageLocker interface {
LockMessageTypeKey(clientMsgID, typeKey string) (err error)
UnLockMessageTypeKey(clientMsgID string, typeKey string) error
LockGlobalMessage(clientMsgID string) (err error)
UnLockGlobalMessage(clientMsgID string) (err error)
}
type LockerMessage struct{}
func NewLockerMessage() *LockerMessage {
return &LockerMessage{}
}
func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err error) {
for i := 0; i < 3; i++ {
err = db.DB.LockMessageTypeKey(clientMsgID, typeKey)
if err != nil {
time.Sleep(time.Millisecond * 100)
continue
} else {
break
}
}
return err
}
func (l *LockerMessage) LockGlobalMessage(clientMsgID string) (err error) {
for i := 0; i < 3; i++ {
err = db.DB.LockMessageTypeKey(clientMsgID, GlOBLLOCK)
if err != nil {
time.Sleep(time.Millisecond * 100)
continue
} else {
break
}
}
return err
}
func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error {
return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
}
func (l *LockerMessage) UnLockGlobalMessage(clientMsgID string) error {
return db.DB.UnLockMessageTypeKey(clientMsgID, GlOBLLOCK)
}
+6 -9
View File
@@ -31,7 +31,8 @@ type rpcChat struct {
etcdAddr []string
messageWriter MessageWriter
//offlineProducer *kafka.Producer
delMsgCh chan deleteMsg
delMsgCh chan deleteMsg
dMessageLocker MessageLocker
}
type deleteMsg struct {
@@ -48,6 +49,7 @@ func NewRpcChatServer(port int) *rpcChat {
rpcRegisterName: config.Config.RpcRegisterName.OpenImMsgName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
dMessageLocker: NewLockerMessage(),
}
rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
//rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic)
@@ -143,14 +145,9 @@ func (rpc *rpcChat) runCh() {
select {
case msg := <-rpc.delMsgCh:
log.NewInfo(msg.OperationID, utils.GetSelfFuncName(), "delmsgch recv new: ", msg)
db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID)
unexistSeqList, err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID)
if err != nil {
log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error())
continue
}
if len(unexistSeqList) > 0 {
DeleteMessageNotification(msg.OpUserID, msg.UserID, unexistSeqList, msg.OperationID)
if len(msg.SeqList) > 0 {
db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID)
DeleteMessageNotification(msg.OpUserID, msg.UserID, msg.SeqList, msg.OperationID)
}
}
}
+2 -1
View File
@@ -25,6 +25,7 @@ import (
"time"
promePkg "Open_IM/pkg/common/prometheus"
go_redis "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
)
@@ -125,7 +126,7 @@ func (rpc *rpcChat) messageVerification(data *pbChat.SendMsgReq) (bool, int32, s
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
return true, 0, "", nil
}
if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin {
if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin && data.MsgData.ContentType != constant.SignalingNotification {
return true, 0, "", nil
}
log.NewDebug(data.OperationID, *config.Config.MessageVerify.FriendVerify)
+50 -1
View File
@@ -163,6 +163,22 @@ func (s *userServer) BatchSetConversations(ctx context.Context, req *pbUser.Batc
return resp, nil
}
if v.ConversationType == constant.SuperGroupChatType {
if v.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
if err := db.DB.SetSuperGroupUserReceiveNotNotifyMessage(v.GroupID, v.OwnerUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), v.GroupID, v.OwnerUserID)
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}
return resp, nil
}
} else {
if err := db.DB.SetSuperGroupUserReceiveNotifyMessage(v.GroupID, v.OwnerUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), v.GroupID, err.Error())
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}
return resp, nil
}
}
}
isUpdate, err := imdb.SetConversation(conversation)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
@@ -251,7 +267,7 @@ func (s *userServer) SetConversation(ctx context.Context, req *pbUser.SetConvers
if req.NotificationType == 0 {
req.NotificationType = constant.ConversationOptChangeNotification
}
if req.Conversation.ConversationType == constant.GroupChatType {
if req.Conversation.ConversationType == constant.GroupChatType || req.Conversation.ConversationType == constant.SuperGroupChatType {
groupInfo, err := imdb.GetGroupInfoByGroupID(req.Conversation.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.Conversation.GroupID, err.Error())
@@ -264,7 +280,24 @@ func (s *userServer) SetConversation(ctx context.Context, req *pbUser.SetConvers
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: errMsg}
return resp, nil
}
if req.Conversation.ConversationType == constant.SuperGroupChatType {
if req.Conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
if err = db.DB.SetSuperGroupUserReceiveNotNotifyMessage(req.Conversation.GroupID, req.Conversation.OwnerUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), req.Conversation.GroupID, req.Conversation.OwnerUserID)
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}
return resp, nil
}
} else {
if err = db.DB.SetSuperGroupUserReceiveNotifyMessage(req.Conversation.GroupID, req.Conversation.OwnerUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), req.Conversation.GroupID, err.Error())
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}
return resp, nil
}
}
}
}
var conversation db.Conversation
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CopyStructFields failed", *req.Conversation, err.Error())
@@ -326,6 +359,21 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp
case "group":
conversation.GroupID = stringList[1]
conversation.ConversationType = constant.GroupChatType
case "super_group":
conversation.GroupID = stringList[1]
if req.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
if err := db.DB.SetSuperGroupUserReceiveNotNotifyMessage(conversation.GroupID, req.OwnerUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), conversation.GroupID, req.OwnerUserID)
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
} else {
if err := db.DB.SetSuperGroupUserReceiveNotifyMessage(conversation.GroupID, req.OwnerUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error(), conversation.GroupID, req.OwnerUserID)
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
}
}
}
isUpdate, err := imdb.SetRecvMsgOpt(conversation)
@@ -334,6 +382,7 @@ func (s *userServer) SetRecvMsgOpt(ctx context.Context, req *pbUser.SetRecvMsgOp
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
if isUpdate {
err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID)
} else {