Merge remote-tracking branch 'origin/modify' into v2.3.0release

# Conflicts:
#	config/config.yaml
#	pkg/common/constant/constant.go
#	pkg/proto/sdk_ws/ws.pb.go
#	pkg/proto/sdk_ws/ws.proto
This commit is contained in:
Gordon
2022-12-23 16:16:28 +08:00
24 changed files with 2741 additions and 217 deletions
-6
View File
@@ -221,9 +221,3 @@ func callbackMsgModify(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp {
log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content))
return callbackResp
}
func CallbackBeforeExtendMsgModify() cbApi.CommonCallbackResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: ""}
return callbackResp
}
+423
View File
@@ -1 +1,424 @@
package msg
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
go_redis "github.com/go-redis/redis/v8"
"time"
)
func (rpc *rpcChat) SetMessageReactionExtensions(ctx context.Context, req *msg.SetMessageReactionExtensionsReq) (resp *msg.SetMessageReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.SetMessageReactionExtensionsResp
rResp.ClientMsgID = req.ClientMsgID
rResp.MsgFirstModifyTime = req.MsgFirstModifyTime
callbackResp := callbackSetMessageReactionExtensions(req)
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
rResp.ErrCode = int32(callbackResp.ErrCode)
rResp.ErrMsg = callbackResp.ErrMsg
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = callbackResp.ErrMsg
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
//if ExternalExtension
if req.IsExternalExtensions {
var isHistory bool
if req.IsReact {
isHistory = false
} else {
isHistory = true
}
rResp.MsgFirstModifyTime = callbackResp.MsgFirstModifyTime
rResp.Result = callbackResp.ResultReactionExtensionList
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, isHistory, false)
return &rResp, nil
}
for _, v := range callbackResp.ResultReactionExtensionList {
if v.ErrCode == 0 {
req.ReactionExtensionList[v.KeyValue.TypeKey] = v.KeyValue
} else {
delete(req.ReactionExtensionList, v.KeyValue.TypeKey)
rResp.Result = append(rResp.Result, v)
}
}
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
if !isExists {
if !req.IsReact {
log.Debug(req.OperationID, "redis handle firstly", req.String())
rResp.MsgFirstModifyTime = utils.GetCurrentTimestampByMill()
for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
}
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil {
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, v)
continue
}
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
}
rResp.IsReact = true
_, err := db.DB.SetMessageReactionExpire(req.ClientMsgID, req.SessionType, time.Duration(24*3)*time.Hour)
if err != nil {
log.Error(req.OperationID, "SetMessageReactionExpire err:", err.Error(), req.String())
}
} else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
if err != nil {
rResp.ErrCode = 200
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
setValue := make(map[string]*server_api_params.KeyValue)
for k, v := range req.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
if vv, ok := mongoValue.ReactionExtensionList[k]; ok {
utils.CopyStructFields(temp, &vv)
if v.LatestUpdateTime != vv.LatestUpdateTime {
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
continue
}
}
temp.TypeKey = k
temp.Value = v.Value
temp.LatestUpdateTime = utils.GetCurrentTimestampByMill()
setValue[k] = temp
}
err = db.DB.InsertOrUpdateReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
if err != nil {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
} else {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
rResp.Result = append(rResp.Result, temp)
}
}
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
}
}
} else {
log.Debug(req.OperationID, "redis handle secondly", req.String())
for k, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, k)
if err != nil {
setKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, k, v)
continue
}
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k)
if err != nil && err != go_redis.Nil {
setKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, k, v)
continue
}
temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(redisValue, temp)
if v.LatestUpdateTime != temp.LatestUpdateTime {
setKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, k, temp)
continue
} else {
v.LatestUpdateTime = utils.GetCurrentTimestampByMill()
newerr := db.DB.SetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, k, utils.StructToJsonString(v))
if newerr != nil {
setKeyResultInfo(&rResp, 201, newerr.Error(), req.ClientMsgID, k, temp)
continue
}
setKeyResultInfo(&rResp, 0, "", req.ClientMsgID, k, v)
}
}
}
if !isExists {
if !req.IsReact {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, true, true)
} else {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
}
} else {
ExtendMessageUpdatedNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, true)
}
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil
}
func setKeyResultInfo(r *msg.SetMessageReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) {
temp := new(msg.KeyValueResp)
temp.KeyValue = keyValue
temp.ErrCode = errCode
temp.ErrMsg = errMsg
r.Result = append(r.Result, temp)
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
}
func setDeleteKeyResultInfo(r *msg.DeleteMessageListReactionExtensionsResp, errCode int32, errMsg, clientMsgID, typeKey string, keyValue *server_api_params.KeyValue) {
temp := new(msg.KeyValueResp)
temp.KeyValue = keyValue
temp.ErrCode = errCode
temp.ErrMsg = errMsg
r.Result = append(r.Result, temp)
_ = db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
}
func (rpc *rpcChat) GetMessageListReactionExtensions(ctx context.Context, req *msg.GetMessageListReactionExtensionsReq) (resp *msg.GetMessageListReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.GetMessageListReactionExtensionsResp
for _, messageValue := range req.MessageReactionKeyList {
var oneMessage msg.SingleMessageExtensionResult
oneMessage.ClientMsgID = messageValue.ClientMsgID
isExists, err := db.DB.JudgeMessageReactionEXISTS(messageValue.ClientMsgID, req.SessionType)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
return &rResp, nil
}
if isExists {
redisValue, err := db.DB.GetOneMessageAllReactionList(messageValue.ClientMsgID, req.SessionType)
if err != nil {
oneMessage.ErrCode = 100
oneMessage.ErrMsg = err.Error()
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
continue
}
keyMap := make(map[string]*server_api_params.KeyValue)
for k, v := range redisValue {
temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(v, temp)
keyMap[k] = temp
}
oneMessage.ReactionExtensionList = keyMap
} else {
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, messageValue.ClientMsgID, messageValue.MsgFirstModifyTime)
if err != nil {
oneMessage.ErrCode = 100
oneMessage.ErrMsg = err.Error()
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
continue
}
keyMap := make(map[string]*server_api_params.KeyValue)
for k, v := range mongoValue.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
temp.TypeKey = v.TypeKey
temp.Value = v.Value
temp.LatestUpdateTime = v.LatestUpdateTime
keyMap[k] = temp
}
oneMessage.ReactionExtensionList = keyMap
}
rResp.SingleMessageResult = append(rResp.SingleMessageResult, &oneMessage)
}
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil
}
func (rpc *rpcChat) AddMessageReactionExtensions(ctx context.Context, req *msg.ModifyMessageReactionExtensionsReq) (resp *msg.ModifyMessageReactionExtensionsResp, err error) {
return
}
func (rpc *rpcChat) DeleteMessageReactionExtensions(ctx context.Context, req *msg.DeleteMessageListReactionExtensionsReq) (resp *msg.DeleteMessageListReactionExtensionsResp, err error) {
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc args is:", req.String())
var rResp msg.DeleteMessageListReactionExtensionsResp
callbackResp := callbackDeleteMessageReactionExtensions(req)
if callbackResp.ActionCode != constant.ActionAllow || callbackResp.ErrCode != 0 {
rResp.ErrCode = int32(callbackResp.ErrCode)
rResp.ErrMsg = callbackResp.ErrMsg
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = callbackResp.ErrMsg
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
//if ExternalExtension
if req.IsExternalExtensions {
rResp.Result = callbackResp.ResultReactionExtensionList
ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, false)
return &rResp, nil
}
for _, v := range callbackResp.ResultReactionExtensionList {
if v.ErrCode != 0 {
func(req *[]*server_api_params.KeyValue, typeKey string) {
for i := 0; i < len(*req); i++ {
if (*req)[i].TypeKey == typeKey {
*req = append((*req)[:i], (*req)[i+1:]...)
}
}
}(&req.ReactionExtensionList, v.KeyValue.TypeKey)
rResp.Result = append(rResp.Result, v)
}
}
isExists, err := db.DB.JudgeMessageReactionEXISTS(req.ClientMsgID, req.SessionType)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
if isExists {
log.Debug(req.OperationID, "redis handle this delete", req.String())
for _, v := range req.ReactionExtensionList {
err := rpc.dMessageLocker.LockMessageTypeKey(req.ClientMsgID, v.TypeKey)
if err != nil {
setDeleteKeyResultInfo(&rResp, 100, err.Error(), req.ClientMsgID, v.TypeKey, v)
continue
}
redisValue, err := db.DB.GetMessageTypeKeyValue(req.ClientMsgID, req.SessionType, v.TypeKey)
if err != nil && err != go_redis.Nil {
setDeleteKeyResultInfo(&rResp, 200, err.Error(), req.ClientMsgID, v.TypeKey, v)
continue
}
temp := new(server_api_params.KeyValue)
utils.JsonStringToStruct(redisValue, temp)
if v.LatestUpdateTime != temp.LatestUpdateTime {
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
continue
} else {
newErr := db.DB.DeleteOneMessageKey(req.ClientMsgID, req.SessionType, v.TypeKey)
if newErr != nil {
setDeleteKeyResultInfo(&rResp, 201, newErr.Error(), req.ClientMsgID, v.TypeKey, temp)
continue
}
setDeleteKeyResultInfo(&rResp, 0, "", req.ClientMsgID, v.TypeKey, v)
}
}
} else {
err := rpc.dMessageLocker.LockGlobalMessage(req.ClientMsgID)
if err != nil {
rResp.ErrCode = 100
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
mongoValue, err := db.DB.GetExtendMsg(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime)
if err != nil {
rResp.ErrCode = 200
rResp.ErrMsg = err.Error()
for _, value := range req.ReactionExtensionList {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
return &rResp, nil
}
setValue := make(map[string]*server_api_params.KeyValue)
for _, v := range req.ReactionExtensionList {
temp := new(server_api_params.KeyValue)
if vv, ok := mongoValue.ReactionExtensionList[v.TypeKey]; ok {
utils.CopyStructFields(temp, &vv)
if v.LatestUpdateTime != vv.LatestUpdateTime {
setDeleteKeyResultInfo(&rResp, 300, "message have update", req.ClientMsgID, v.TypeKey, temp)
continue
}
} else {
setDeleteKeyResultInfo(&rResp, 400, "key not in", req.ClientMsgID, v.TypeKey, v)
continue
}
temp.TypeKey = v.TypeKey
setValue[v.TypeKey] = temp
}
err = db.DB.DeleteReactionExtendMsgSet(req.SourceID, req.SessionType, req.ClientMsgID, req.MsgFirstModifyTime, setValue)
if err != nil {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
temp.ErrMsg = err.Error()
temp.ErrCode = 100
rResp.Result = append(rResp.Result, temp)
}
} else {
for _, value := range setValue {
temp := new(msg.KeyValueResp)
temp.KeyValue = value
rResp.Result = append(rResp.Result, temp)
}
}
lockErr := rpc.dMessageLocker.UnLockGlobalMessage(req.ClientMsgID)
if lockErr != nil {
log.Error(req.OperationID, "UnLockGlobalMessage err:", lockErr.Error())
}
}
ExtendMessageDeleteNotification(req.OperationID, req.OpUserID, req.SourceID, req.SessionType, req, &rResp, false, isExists)
log.Debug(req.OperationID, utils.GetSelfFuncName(), "rpc return is:", rResp.String())
return &rResp, nil
}
+106
View File
@@ -0,0 +1,106 @@
package msg
import (
"Open_IM/pkg/base_info"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/msg"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"strings"
)
func ExtendMessageUpdatedNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.SetMessageReactionExtensionsReq, resp *msg.SetMessageReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
var m base_info.ReactionMessageModifierNotification
m.SourceID = req.SourceID
m.OpUserID = req.OpUserID
m.SessionType = req.SessionType
keyMap := make(map[string]*open_im_sdk.KeyValue)
for _, valueResp := range resp.Result {
if valueResp.ErrCode == 0 {
keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue
}
}
if len(keyMap) == 0 {
log.NewWarn(operationID, "all key set failed can not send notification", *req)
return
}
m.SuccessReactionExtensionList = keyMap
m.ClientMsgID = req.ClientMsgID
m.IsReact = resp.IsReact
m.IsExternalExtensions = req.IsExternalExtensions
m.MsgFirstModifyTime = resp.MsgFirstModifyTime
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageModifier, utils.StructToJsonString(m), isHistory, isReactionFromCache)
}
func ExtendMessageDeleteNotification(operationID, sendID string, sourceID string, sessionType int32,
req *msg.DeleteMessageListReactionExtensionsReq, resp *msg.DeleteMessageListReactionExtensionsResp, isHistory bool, isReactionFromCache bool) {
var m base_info.ReactionMessageDeleteNotification
m.SourceID = req.SourceID
m.OpUserID = req.OpUserID
m.SessionType = req.SessionType
keyMap := make(map[string]*open_im_sdk.KeyValue)
for _, valueResp := range resp.Result {
if valueResp.ErrCode == 0 {
keyMap[valueResp.KeyValue.TypeKey] = valueResp.KeyValue
}
}
if len(keyMap) == 0 {
log.NewWarn(operationID, "all key set failed can not send notification", *req)
return
}
m.SuccessReactionExtensionList = keyMap
m.ClientMsgID = req.ClientMsgID
m.MsgFirstModifyTime = req.MsgFirstModifyTime
messageReactionSender(operationID, sendID, sourceID, sessionType, constant.ReactionMessageDeleter, utils.StructToJsonString(m), isHistory, isReactionFromCache)
}
func messageReactionSender(operationID, sendID string, sourceID string, sessionType, contentType int32, content string, isHistory bool, isReactionFromCache bool) {
options := make(map[string]bool, 5)
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false)
utils.SetSwitchFromOptions(options, constant.IsSenderConversationUpdate, false)
utils.SetSwitchFromOptions(options, constant.IsUnreadCount, false)
utils.SetSwitchFromOptions(options, constant.IsReactionFromCache, isReactionFromCache)
if !isHistory {
utils.SetSwitchFromOptions(options, constant.IsHistory, false)
utils.SetSwitchFromOptions(options, constant.IsPersistent, false)
}
pbData := msg.SendMsgReq{
OperationID: operationID,
MsgData: &open_im_sdk.MsgData{
SendID: sendID,
ClientMsgID: utils.GetMsgID(sendID),
SessionType: sessionType,
MsgFrom: constant.SysMsgType,
ContentType: contentType,
Content: []byte(content),
// ForceList: params.ForceList,
CreateTime: utils.GetCurrentTimestampByMill(),
Options: options,
},
}
switch sessionType {
case constant.SingleChatType, constant.NotificationChatType:
pbData.MsgData.RecvID = sourceID
case constant.GroupChatType, constant.SuperGroupChatType:
pbData.MsgData.GroupID = sourceID
}
etcdConn := getcdv3.GetDefaultConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImMsgName, operationID)
if etcdConn == nil {
errMsg := operationID + "getcdv3.GetDefaultConn == nil"
log.NewError(operationID, errMsg)
return
}
client := msg.NewMsgClient(etcdConn)
reply, err := client.SendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(operationID, "SendMsg rpc failed, ", pbData.String(), err.Error())
} else if reply.ErrCode != 0 {
log.NewError(operationID, "SendMsg rpc failed, ", pbData.String(), reply.ErrCode, reply.ErrMsg)
}
}
+60
View File
@@ -0,0 +1,60 @@
package msg
import (
cbApi "Open_IM/pkg/call_back_struct"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
http2 "net/http"
)
func callbackSetMessageReactionExtensions(setReq *msg.SetMessageReactionExtensionsReq) *cbApi.CallbackBeforeSetMessageReactionExtResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
req := cbApi.CallbackBeforeSetMessageReactionExtReq{
OperationID: setReq.OperationID,
CallbackCommand: constant.CallbackBeforeSetMessageReactionExtensionCommand,
SourceID: setReq.SourceID,
OpUserID: setReq.OpUserID,
SessionType: setReq.SessionType,
ReactionExtensionList: setReq.ReactionExtensionList,
ClientMsgID: setReq.ClientMsgID,
IsReact: setReq.IsReact,
IsExternalExtensions: setReq.IsExternalExtensions,
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbApi.CallbackBeforeSetMessageReactionExtResp{CommonCallbackResp: &callbackResp}
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeSetMessageReactionExtensionCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return resp
}
func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessageListReactionExtensionsReq) *cbApi.CallbackDeleteMessageReactionExtResp {
callbackResp := cbApi.CommonCallbackResp{OperationID: setReq.OperationID}
log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), setReq.String())
req := cbApi.CallbackDeleteMessageReactionExtReq{
OperationID: setReq.OperationID,
CallbackCommand: constant.CallbackBeforeDeleteMessageReactionExtensionsCommand,
SourceID: setReq.SourceID,
OpUserID: setReq.OpUserID,
SessionType: setReq.SessionType,
ReactionExtensionList: setReq.ReactionExtensionList,
ClientMsgID: setReq.ClientMsgID,
IsExternalExtensions: setReq.IsExternalExtensions,
MsgFirstModifyTime: setReq.MsgFirstModifyTime,
}
resp := &cbApi.CallbackDeleteMessageReactionExtResp{CommonCallbackResp: &callbackResp}
defer log.NewDebug(setReq.OperationID, utils.GetSelfFuncName(), req, *resp)
if err := http.CallBackPostReturn(config.Config.Callback.CallbackUrl, constant.CallbackBeforeDeleteMessageReactionExtensionsCommand, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil {
callbackResp.ErrCode = http2.StatusInternalServerError
callbackResp.ErrMsg = err.Error()
}
return resp
}
+52
View File
@@ -0,0 +1,52 @@
package msg
import (
"Open_IM/pkg/common/db"
"time"
)
const GlOBLLOCK = "GLOBAL_LOCK"
type MessageLocker interface {
LockMessageTypeKey(clientMsgID, typeKey string) (err error)
UnLockMessageTypeKey(clientMsgID string, typeKey string) error
LockGlobalMessage(clientMsgID string) (err error)
UnLockGlobalMessage(clientMsgID string) (err error)
}
type LockerMessage struct{}
func NewLockerMessage() *LockerMessage {
return &LockerMessage{}
}
func (l *LockerMessage) LockMessageTypeKey(clientMsgID, typeKey string) (err error) {
for i := 0; i < 3; i++ {
err = db.DB.LockMessageTypeKey(clientMsgID, typeKey)
if err != nil {
time.Sleep(time.Millisecond * 100)
continue
} else {
break
}
}
return err
}
func (l *LockerMessage) LockGlobalMessage(clientMsgID string) (err error) {
for i := 0; i < 3; i++ {
err = db.DB.LockMessageTypeKey(clientMsgID, GlOBLLOCK)
if err != nil {
time.Sleep(time.Millisecond * 100)
continue
} else {
break
}
}
return err
}
func (l *LockerMessage) UnLockMessageTypeKey(clientMsgID string, typeKey string) error {
return db.DB.UnLockMessageTypeKey(clientMsgID, typeKey)
}
func (l *LockerMessage) UnLockGlobalMessage(clientMsgID string) error {
return db.DB.UnLockMessageTypeKey(clientMsgID, GlOBLLOCK)
}
+3 -1
View File
@@ -31,7 +31,8 @@ type rpcChat struct {
etcdAddr []string
messageWriter MessageWriter
//offlineProducer *kafka.Producer
delMsgCh chan deleteMsg
delMsgCh chan deleteMsg
dMessageLocker MessageLocker
}
type deleteMsg struct {
@@ -48,6 +49,7 @@ func NewRpcChatServer(port int) *rpcChat {
rpcRegisterName: config.Config.RpcRegisterName.OpenImMsgName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
dMessageLocker: NewLockerMessage(),
}
rc.messageWriter = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
//rc.offlineProducer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschatOffline.Addr, config.Config.Kafka.Ws2mschatOffline.Topic)