mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-10 20:15:59 +08:00
Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release
This commit is contained in:
@@ -17,8 +17,8 @@ import (
|
||||
|
||||
func SetMessageReactionExtensions(c *gin.Context) {
|
||||
var (
|
||||
req api.SetMessageReactionExtensionsCallbackReq
|
||||
resp api.SetMessageReactionExtensionsCallbackResp
|
||||
req api.SetMessageReactionExtensionsReq
|
||||
resp api.SetMessageReactionExtensionsResp
|
||||
reqPb rpc.SetMessageReactionExtensionsReq
|
||||
)
|
||||
|
||||
@@ -115,8 +115,9 @@ func AddMessageReactionExtensions(c *gin.Context) {
|
||||
var (
|
||||
req api.AddMessageReactionExtensionsReq
|
||||
resp api.AddMessageReactionExtensionsResp
|
||||
reqPb rpc.ModifyMessageReactionExtensionsReq
|
||||
reqPb rpc.AddMessageReactionExtensionsReq
|
||||
)
|
||||
|
||||
if err := c.BindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
@@ -152,6 +153,9 @@ func AddMessageReactionExtensions(c *gin.Context) {
|
||||
}
|
||||
resp.ErrCode = respPb.ErrCode
|
||||
resp.ErrMsg = respPb.ErrMsg
|
||||
resp.Data.ResultKeyValue = respPb.Result
|
||||
resp.Data.MsgFirstModifyTime = reqPb.MsgFirstModifyTime
|
||||
resp.Data.IsReact = reqPb.IsReact
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp)
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
|
||||
@@ -743,6 +743,27 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
|
||||
resp.ErrMsg = constant.ErrDB.ErrMsg
|
||||
return &resp, nil
|
||||
}
|
||||
reqPb := pbConversation.ModifyConversationFieldReq{Conversation: &pbConversation.Conversation{}}
|
||||
reqPb.OperationID = req.OperationID
|
||||
reqPb.UserIDList = okUserIDList
|
||||
reqPb.FieldType = constant.FieldUnread
|
||||
reqPb.Conversation.GroupID = req.GroupID
|
||||
reqPb.Conversation.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, constant.SuperGroupChatType)
|
||||
reqPb.Conversation.ConversationType = int32(constant.SuperGroupChatType)
|
||||
reqPb.Conversation.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill()
|
||||
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImConversationName, req.OperationID)
|
||||
if etcdConn == nil {
|
||||
errMsg := req.OperationID + "getcdv3.GetDefaultConn == nil"
|
||||
log.NewError(req.OperationID, errMsg)
|
||||
}
|
||||
client := pbConversation.NewConversationClient(etcdConn)
|
||||
respPb, err := client.ModifyConversationField(context.Background(), &reqPb)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "ModifyConversationField rpc failed, ", reqPb.String(), err.Error())
|
||||
} else {
|
||||
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "ModifyConversationField success", respPb.String())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if groupInfo.GroupType != constant.SuperGroup {
|
||||
@@ -776,7 +797,15 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
|
||||
var resp pbGroup.GetGroupMembersInfoResp
|
||||
resp.MemberList = []*open_im_sdk.GroupMemberFullInfo{}
|
||||
for _, userID := range req.MemberList {
|
||||
groupMember, err := rocksCache.GetGroupMemberInfoFromCache(req.GroupID, userID)
|
||||
var (
|
||||
groupMember *db.GroupMember
|
||||
err error
|
||||
)
|
||||
if req.NoCache {
|
||||
groupMember, err = imdb.GetGroupMemberInfoByGroupIDAndUserID(req.GroupID, userID)
|
||||
} else {
|
||||
groupMember, err = rocksCache.GetGroupMemberInfoFromCache(req.GroupID, userID)
|
||||
}
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, userID, err.Error())
|
||||
continue
|
||||
|
||||
@@ -282,8 +282,47 @@ func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *m
|
||||
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) {
|
||||
return
|
||||
func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.AddMessageReactionExtensionsReq) (resp *msg.AddMessageReactionExtensionsResp, err error) {
|
||||
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
|
||||
var rResp msg.AddMessageReactionExtensionsResp
|
||||
rResp.ClientMsgID = req.ClientMsgID
|
||||
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
|
||||
callbackResp := callbackAddMessageReactionExtensions(req)
|
||||
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
|
||||
rResp.ErrCode = int32(callbackResp.ErrCode)
|
||||
rResp.ErrMsg = callbackResp.ErrMsg
|
||||
for _, value := range req.ReactionExtensionList {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = callbackResp.ErrMsg
|
||||
temp.ErrCode = 100
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
return &rResp, nil
|
||||
}
|
||||
if !req.IsExternalExtensions {
|
||||
rResp.ErrCode = 200
|
||||
rResp.ErrMsg = "only extenalextensions message can be used"
|
||||
for _, value := range req.ReactionExtensionList {
|
||||
temp := new(msg.KeyValueResp)
|
||||
temp.KeyValue = value
|
||||
temp.ErrMsg = callbackResp.ErrMsg
|
||||
temp.ErrCode = 100
|
||||
rResp.Result = append(rResp.Result, temp)
|
||||
}
|
||||
return &rResp, nil
|
||||
}
|
||||
//if ExternalExtension
|
||||
var isHistory bool
|
||||
if req.IsReact {
|
||||
isHistory = false
|
||||
} else {
|
||||
isHistory = true
|
||||
}
|
||||
rResp.MsgFirstModifyTime = callbackResp.MsgFirstModifyTime
|
||||
rResp.Result = callbackResp.ResultReactionExtensionList
|
||||
ExtendMessageAddedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, isHistory, false)
|
||||
return &rResp, nil
|
||||
}
|
||||
|
||||
func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
|
||||
|
||||
@@ -18,6 +18,31 @@ func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID strin
|
||||
var m base_info.ReactionMessageModifierNotification
|
||||
m.SourceID = req.SourceID
|
||||
m.OpUserID = req.OpUserID
|
||||
m.Operation = constant.SetMessageExtensions
|
||||
m.SessionType = req.SessionType
|
||||
keyMap := make(map[string]*open_im_sdk.KeyValue)
|
||||
for _, valueResp := range resp.Result {
|
||||
if valueResp.ErrCode == 0 {
|
||||
keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue
|
||||
}
|
||||
}
|
||||
if len(keyMap) == 0 {
|
||||
log.NewWarn(operationID, "all key set failed can not send notification", *req)
|
||||
return
|
||||
}
|
||||
m.SuccessReactionExtensionList = keyMap
|
||||
m.ClientMsgID = req.ClientMsgID
|
||||
m.IsReact = resp.IsReact
|
||||
m.IsExternalExtensions = req.IsExternalExtensions
|
||||
m.MsgFirstModifyTime = resp.MsgFirstModifyTime
|
||||
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory, isReactionFromCache)
|
||||
}
|
||||
func ExtendMessageAddedNotification(operationID, sendID string, sourceID string, sessionType int32,
|
||||
req *msg.AddMessageReactionExtensionsReq, resp *msg.AddMessageReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
|
||||
var m base_info.ReactionMessageModifierNotification
|
||||
m.SourceID = req.SourceID
|
||||
m.OpUserID = req.OpUserID
|
||||
m.Operation = constant.AddMessageExtensions
|
||||
m.SessionType = req.SessionType
|
||||
keyMap := make(map[string]*open_im_sdk.KeyValue)
|
||||
for _, valueResp := range resp.Result {
|
||||
|
||||
@@ -58,22 +58,47 @@ func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReacti
|
||||
}
|
||||
return resp
|
||||
}
|
||||
func callbackGetMessageListReactionExtensions(setReq *msg.GetMessageListReactionExtensionsReq) *cbApi.CallbackGetMessageListReactionExtResp {
|
||||
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
|
||||
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
||||
func callbackGetMessageListReactionExtensions(getReq *msg.GetMessageListReactionExtensionsReq) *cbApi.CallbackGetMessageListReactionExtResp {
|
||||
callbackResp := cbApi.CommonCallbackResp{OperationID: getReq.OperationID}
|
||||
log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), getReq.String())
|
||||
req := cbApi.CallbackGetMessageListReactionExtReq{
|
||||
OperationID: setReq.OperationID,
|
||||
OperationID: getReq.OperationID,
|
||||
CallbackCommand: constant.CallbackGetMessageListReactionExtensionsCommand,
|
||||
SourceID: setReq.SourceID,
|
||||
OpUserID: setReq.OpUserID,
|
||||
SessionType: setReq.SessionType,
|
||||
MessageKeyList: setReq.MessageReactionKeyList,
|
||||
SourceID: getReq.SourceID,
|
||||
OpUserID: getReq.OpUserID,
|
||||
SessionType: getReq.SessionType,
|
||||
TypeKeyList: getReq.TypeKeyList,
|
||||
MessageKeyList: getReq.MessageReactionKeyList,
|
||||
}
|
||||
resp := &cbApi.CallbackGetMessageListReactionExtResp{CommonCallbackResp: &callbackResp}
|
||||
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||
defer log.NewDebug(getReq.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackGetMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
|
||||
callbackResp.ErrCode = http2.StatusInternalServerError
|
||||
callbackResp.ErrMsg = err.Error()
|
||||
}
|
||||
return resp
|
||||
}
|
||||
func callbackAddMessageReactionExtensions(setReq *msg.AddMessageReactionExtensionsReq) *cbApi.CallbackAddMessageReactionExtResp {
|
||||
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
|
||||
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
|
||||
req := cbApi.CallbackAddMessageReactionExtReq{
|
||||
OperationID: setReq.OperationID,
|
||||
CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
|
||||
SourceID: setReq.SourceID,
|
||||
OpUserID: setReq.OpUserID,
|
||||
SessionType: setReq.SessionType,
|
||||
ReactionExtensionList: setReq.ReactionExtensionList,
|
||||
ClientMsgID: setReq.ClientMsgID,
|
||||
IsReact: setReq.IsReact,
|
||||
IsExternalExtensions: setReq.IsExternalExtensions,
|
||||
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
|
||||
}
|
||||
resp := &cbApi.CallbackAddMessageReactionExtResp{CommonCallbackResp: &callbackResp}
|
||||
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
|
||||
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackAddMessageListReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
|
||||
callbackResp.ErrCode = http2.StatusInternalServerError
|
||||
callbackResp.ErrMsg = err.Error()
|
||||
}
|
||||
return resp
|
||||
|
||||
}
|
||||
|
||||
@@ -145,14 +145,9 @@ func (rpc *rpcChat) runCh() {
|
||||
select {
|
||||
case msg := <-rpc.delMsgCh:
|
||||
log.NewInfo(msg.OperationID, utils.GetSelfFuncName(), "delmsgch recv new: ", msg)
|
||||
db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID)
|
||||
unexistSeqList, err := db.DB.DelMsgBySeqList(msg.UserID, msg.SeqList, msg.OperationID)
|
||||
if err != nil {
|
||||
log.NewError(msg.OperationID, utils.GetSelfFuncName(), "DelMsgBySeqList args: ", msg.UserID, msg.SeqList, msg.OperationID, err.Error())
|
||||
continue
|
||||
}
|
||||
if len(unexistSeqList) > 0 {
|
||||
DeleteMessageNotification(msg.OpUserID, msg.UserID, unexistSeqList, msg.OperationID)
|
||||
if len(msg.SeqList) > 0 {
|
||||
db.DB.DelMsgFromCache(msg.UserID, msg.SeqList, msg.OperationID)
|
||||
DeleteMessageNotification(msg.OpUserID, msg.UserID, msg.SeqList, msg.OperationID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user