mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-05 09:36:00 +08:00
Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release
This commit is contained in:
@@ -29,10 +29,10 @@ import (
|
||||
var validate *validator.Validate
|
||||
|
||||
func SetOptions(options map[string]bool, value bool) {
|
||||
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, value)
|
||||
utils.SetSwitchFromOptions(options, constant.IsHistory, value)
|
||||
utils.SetSwitchFromOptions(options, constant.IsPersistent, value)
|
||||
utils.SetSwitchFromOptions(options, constant.IsSenderSync, value)
|
||||
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
|
||||
}
|
||||
|
||||
func newUserSendMsgReq(params *api.ManagementSendMsgReq) *pbChat.SendMsgReq {
|
||||
@@ -59,10 +59,12 @@ func newUserSendMsgReq(params *api.ManagementSendMsgReq) *pbChat.SendMsgReq {
|
||||
if params.IsOnlineOnly {
|
||||
SetOptions(options, false)
|
||||
}
|
||||
if params.ContentType == constant.CustomMsgOnlineOnly {
|
||||
SetOptions(options, false)
|
||||
} else if params.ContentType == constant.CustomMsgNotTriggerConversation {
|
||||
if params.NotOfflinePush {
|
||||
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
|
||||
}
|
||||
if params.ContentType == constant.CustomOnlineOnly {
|
||||
SetOptions(options, false)
|
||||
} else if params.ContentType == constant.CustomNotTriggerConversation {
|
||||
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false)
|
||||
}
|
||||
|
||||
@@ -146,9 +148,9 @@ func ManagementSendMsg(c *gin.Context) {
|
||||
case constant.OANotification:
|
||||
data = OANotificationElem{}
|
||||
params.SessionType = constant.NotificationChatType
|
||||
case constant.CustomMsgNotTriggerConversation:
|
||||
case constant.CustomNotTriggerConversation:
|
||||
data = CustomElem{}
|
||||
case constant.CustomMsgOnlineOnly:
|
||||
case constant.CustomOnlineOnly:
|
||||
data = CustomElem{}
|
||||
//case constant.HasReadReceipt:
|
||||
//case constant.Typing:
|
||||
@@ -277,9 +279,9 @@ func ManagementBatchSendMsg(c *gin.Context) {
|
||||
case constant.OANotification:
|
||||
data = OANotificationElem{}
|
||||
params.SessionType = constant.NotificationChatType
|
||||
case constant.CustomMsgNotTriggerConversation:
|
||||
case constant.CustomNotTriggerConversation:
|
||||
data = CustomElem{}
|
||||
case constant.CustomMsgOnlineOnly:
|
||||
case constant.CustomOnlineOnly:
|
||||
data = CustomElem{}
|
||||
//case constant.HasReadReceipt:
|
||||
//case constant.Typing:
|
||||
@@ -322,12 +324,11 @@ func ManagementBatchSendMsg(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
client := pbChat.NewMsgClient(etcdConn)
|
||||
req := &api.ManagementSendMsgReq{
|
||||
ManagementSendMsg: params.ManagementSendMsg,
|
||||
}
|
||||
pbData := newUserSendMsgReq(req)
|
||||
for _, recvID := range params.RecvIDList {
|
||||
req := &api.ManagementSendMsgReq{
|
||||
ManagementSendMsg: params.ManagementSendMsg,
|
||||
RecvID: recvID,
|
||||
}
|
||||
pbData := newUserSendMsgReq(req)
|
||||
pbData.MsgData.RecvID = recvID
|
||||
log.Info(params.OperationID, "", "api ManagementSendMsg call start..., ", pbData.String())
|
||||
|
||||
@@ -344,10 +345,11 @@ func ManagementBatchSendMsg(c *gin.Context) {
|
||||
msgSendFailedFlag = true
|
||||
continue
|
||||
}
|
||||
resp.Data.ResultList = append(resp.Data.ResultList, server_api_params.UserSendMsgResp{
|
||||
resp.Data.ResultList = append(resp.Data.ResultList, &api.SingleReturnResult{
|
||||
ServerMsgID: rpcResp.ServerMsgID,
|
||||
ClientMsgID: rpcResp.ClientMsgID,
|
||||
SendTime: rpcResp.SendTime,
|
||||
RecvID: recvID,
|
||||
})
|
||||
}
|
||||
if msgSendFailedFlag {
|
||||
|
||||
+31
-14
@@ -78,15 +78,18 @@ func DelSuperGroupMsg(c *gin.Context) {
|
||||
req api.DelSuperGroupMsgReq
|
||||
resp api.DelSuperGroupMsgResp
|
||||
)
|
||||
rpcReq := &rpc.DelSuperGroupMsgReq{}
|
||||
utils.CopyStructFields(req, &req)
|
||||
if err := c.BindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||
return
|
||||
}
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req:", req)
|
||||
|
||||
ok, opUserID, errInfo := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
|
||||
var ok bool
|
||||
var errInfo string
|
||||
ok, rpcReq.OpUserID, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
|
||||
if !ok {
|
||||
errMsg := req.OperationID + " " + opUserID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
|
||||
errMsg := req.OperationID + " " + rpcReq.OpUserID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
|
||||
log.NewError(req.OperationID, errMsg)
|
||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 500, "errMsg": errMsg})
|
||||
return
|
||||
@@ -137,18 +140,32 @@ func DelSuperGroupMsg(c *gin.Context) {
|
||||
client := rpc.NewMsgClient(etcdConn)
|
||||
|
||||
log.Info(req.OperationID, "", "api DelSuperGroupMsg call, api call rpc...")
|
||||
|
||||
RpcResp, err := client.SendMsg(context.Background(), &pbData)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "call delete UserSendMsg rpc server failed", err.Error())
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"})
|
||||
return
|
||||
if req.IsAllDelete {
|
||||
RpcResp, err := client.DelSuperGroupMsg(context.Background(),rpcReq)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "call delete DelSuperGroupMsg rpc server failed", err.Error())
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call DelSuperGroupMsg rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.Info(req.OperationID, "", "api DelSuperGroupMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
|
||||
resp.ErrCode = RpcResp.ErrCode
|
||||
resp.ErrMsg = RpcResp.ErrMsg
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp)
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}else{
|
||||
RpcResp, err := client.SendMsg(context.Background(), &pbData)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "call delete UserSendMsg rpc server failed", err.Error())
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.Info(req.OperationID, "", "api DelSuperGroupMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
|
||||
resp.ErrCode = RpcResp.ErrCode
|
||||
resp.ErrMsg = RpcResp.ErrMsg
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp)
|
||||
c.JSON(http.StatusOK, resp)
|
||||
}
|
||||
log.Info(req.OperationID, "", "api DelSuperGroupMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
|
||||
resp.ErrCode = RpcResp.ErrCode
|
||||
resp.ErrMsg = RpcResp.ErrMsg
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp)
|
||||
c.JSON(http.StatusOK, resp)
|
||||
|
||||
}
|
||||
|
||||
// @Summary 清空用户消息
|
||||
|
||||
@@ -160,6 +160,7 @@ func MinioStorageCredential(c *gin.Context) {
|
||||
resp.AccessKeyID = v.AccessKeyID
|
||||
resp.BucketName = config.Config.Credential.Minio.Bucket
|
||||
resp.StsEndpointURL = config.Config.Credential.Minio.Endpoint
|
||||
resp.StorageTime = config.Config.Credential.Minio.StorageTime
|
||||
c.JSON(http.StatusOK, gin.H{"errCode": 0, "errMsg": "", "data": resp})
|
||||
}
|
||||
|
||||
|
||||
@@ -17,11 +17,13 @@ func callbackUserOnline(operationID, userID string, platformID int, token string
|
||||
callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{
|
||||
Token: token,
|
||||
UserStatusCallbackReq: cbApi.UserStatusCallbackReq{
|
||||
CallbackCommand: constant.CallbackUserOnlineCommand,
|
||||
OperationID: operationID,
|
||||
UserID: userID,
|
||||
PlatformID: int32(platformID),
|
||||
Platform: constant.PlatformIDToName(platformID),
|
||||
UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
|
||||
CallbackCommand: constant.CallbackUserOnlineCommand,
|
||||
OperationID: operationID,
|
||||
PlatformID: int32(platformID),
|
||||
Platform: constant.PlatformIDToName(platformID),
|
||||
},
|
||||
UserID: userID,
|
||||
},
|
||||
Seq: int(time.Now().UnixNano() / 1e6),
|
||||
}
|
||||
@@ -40,11 +42,13 @@ func callbackUserOffline(operationID, userID string, platformID int) cbApi.Commo
|
||||
}
|
||||
callbackOfflineReq := cbApi.CallbackUserOfflineReq{
|
||||
UserStatusCallbackReq: cbApi.UserStatusCallbackReq{
|
||||
CallbackCommand: constant.CallbackUserOfflineCommand,
|
||||
OperationID: operationID,
|
||||
UserID: userID,
|
||||
PlatformID: int32(platformID),
|
||||
Platform: constant.PlatformIDToName(platformID),
|
||||
UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
|
||||
CallbackCommand: constant.CallbackUserOfflineCommand,
|
||||
OperationID: operationID,
|
||||
PlatformID: int32(platformID),
|
||||
Platform: constant.PlatformIDToName(platformID),
|
||||
},
|
||||
UserID: userID,
|
||||
},
|
||||
Seq: int(time.Now().UnixNano() / 1e6),
|
||||
}
|
||||
|
||||
@@ -36,7 +36,12 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
|
||||
}
|
||||
err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList)
|
||||
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
} else {
|
||||
err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
|
||||
if err != nil {
|
||||
log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
|
||||
}
|
||||
}
|
||||
for _, v := range msgFromMQ.MessageList {
|
||||
if v.MsgData.ContentType == constant.DeleteMessageNotification {
|
||||
|
||||
+13
-20
@@ -1,36 +1,30 @@
|
||||
package push
|
||||
package fcm
|
||||
|
||||
import (
|
||||
"Open_IM/internal/push"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/db"
|
||||
"Open_IM/pkg/common/log"
|
||||
"context"
|
||||
"log"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
||||
firebase "firebase.google.com/go"
|
||||
"firebase.google.com/go/messaging"
|
||||
"google.golang.org/api/option"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type Fcm struct {
|
||||
}
|
||||
|
||||
var (
|
||||
FcmClient *Fcm
|
||||
FcmMsgCli *messaging.Client
|
||||
)
|
||||
|
||||
func init() {
|
||||
//FcmClient = newFcmClient()
|
||||
}
|
||||
|
||||
func NewFcm() *Fcm {
|
||||
return newFcmClient()
|
||||
}
|
||||
func newFcmClient() *Fcm {
|
||||
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
|
||||
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
|
||||
if err != nil {
|
||||
log.Println("error initializing app: %v\n", err)
|
||||
log.Debug("", "error initializing app: ", err.Error())
|
||||
return nil
|
||||
}
|
||||
//授权
|
||||
@@ -41,13 +35,12 @@ func newFcmClient() *Fcm {
|
||||
// }
|
||||
// log.Printf("%#v\r\n", fcmClient)
|
||||
ctx := context.Background()
|
||||
FcmMsgCli, err = fcmApp.Messaging(ctx)
|
||||
fcmMsgClient, err := fcmApp.Messaging(ctx)
|
||||
if err != nil {
|
||||
log.Fatalf("error getting Messaging client: %v\n", err)
|
||||
panic(err.Error())
|
||||
return nil
|
||||
}
|
||||
log.Println(FcmMsgCli)
|
||||
return &Fcm{}
|
||||
return &Fcm{FcmMsgCli: fcmMsgClient}
|
||||
}
|
||||
|
||||
func (f *Fcm) Push(accounts []string, alert, detailContent, operationID string, opts push.PushOpts) (string, error) {
|
||||
@@ -87,9 +80,9 @@ func (f *Fcm) Push(accounts []string, alert, detailContent, operationID string,
|
||||
//An error from SendMulticast indicates a total failure -- i.e.
|
||||
//the message could not be sent to any of the recipients.
|
||||
//Partial failures are indicated by a `BatchResponse` return value.
|
||||
response, err := FcmMsgCli.SendMulticast(ctx, Msg)
|
||||
response, err := f.FcmMsgCli.SendMulticast(ctx, Msg)
|
||||
if err != nil {
|
||||
log.Fatalln(err)
|
||||
return "", err
|
||||
}
|
||||
Success = Success + response.SuccessCount
|
||||
Fail = Fail + response.FailureCount
|
||||
|
||||
@@ -5,22 +5,22 @@ import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/http"
|
||||
"Open_IM/pkg/common/log"
|
||||
commonPb "Open_IM/pkg/proto/sdk_ws"
|
||||
"Open_IM/pkg/utils"
|
||||
http2 "net/http"
|
||||
)
|
||||
|
||||
func callbackOfflinePush(operationID, userID string, msg *commonPb.MsgData) cbApi.CommonCallbackResp {
|
||||
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
|
||||
if !config.Config.Callback.CallbackOfflinePush.Enable {
|
||||
return callbackResp
|
||||
}
|
||||
callbackOfflinePushReq := cbApi.CallbackOfflinePushReq{
|
||||
UserStatusCallbackReq: cbApi.UserStatusCallbackReq{
|
||||
CallbackCommand: constant.CallbackOfflinePushCommand,
|
||||
OperationID: operationID,
|
||||
UserID: userID,
|
||||
PlatformID: msg.SenderPlatformID,
|
||||
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
|
||||
func callbackBeforePush(operationID string, userIDList []string, msg *commonPb.MsgData, command string, callbackResp cbApi.CommonCallbackResp, timeOut int) cbApi.CommonCallbackResp {
|
||||
req := cbApi.CallbackBeforePushReq{
|
||||
UserStatusBatchCallbackReq: cbApi.UserStatusBatchCallbackReq{
|
||||
UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
|
||||
CallbackCommand: command,
|
||||
OperationID: operationID,
|
||||
PlatformID: msg.SenderPlatformID,
|
||||
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
|
||||
},
|
||||
UserIDList: userIDList,
|
||||
},
|
||||
OfflinePushInfo: msg.OfflinePushInfo,
|
||||
SendID: msg.SendID,
|
||||
@@ -30,8 +30,8 @@ func callbackOfflinePush(operationID, userID string, msg *commonPb.MsgData) cbAp
|
||||
AtUserIDList: msg.AtUserIDList,
|
||||
Content: string(msg.Content),
|
||||
}
|
||||
callbackOfflinePushResp := &cbApi.CallbackOfflinePushResp{CommonCallbackResp: &callbackResp}
|
||||
if err := http.PostReturn(config.Config.Callback.CallbackUrl, callbackOfflinePushReq, callbackOfflinePushResp, config.Config.Callback.CallbackOfflinePush.CallbackTimeOut); err != nil {
|
||||
resp := &cbApi.CallbackBeforePushResp{CommonCallbackResp: &callbackResp}
|
||||
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, timeOut); err != nil {
|
||||
callbackResp.ErrCode = http2.StatusInternalServerError
|
||||
callbackResp.ErrMsg = err.Error()
|
||||
if !config.Config.Callback.CallbackOfflinePush.CallbackFailedContinue {
|
||||
@@ -44,3 +44,59 @@ func callbackOfflinePush(operationID, userID string, msg *commonPb.MsgData) cbAp
|
||||
}
|
||||
return callbackResp
|
||||
}
|
||||
|
||||
func callbackOfflinePush(operationID string, userIDList []string, msg *commonPb.MsgData) cbApi.CommonCallbackResp {
|
||||
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
|
||||
if !config.Config.Callback.CallbackOfflinePush.Enable {
|
||||
return callbackResp
|
||||
}
|
||||
return callbackBeforePush(operationID, userIDList, msg, constant.CallbackOfflinePushCommand, callbackResp, config.Config.Callback.CallbackOfflinePush.CallbackTimeOut)
|
||||
}
|
||||
|
||||
func callbackOnlinePush(operationID string, userIDList []string, msg *commonPb.MsgData) cbApi.CommonCallbackResp {
|
||||
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
|
||||
if !config.Config.Callback.CallbackOnlinePush.Enable {
|
||||
return callbackResp
|
||||
}
|
||||
return callbackBeforePush(operationID, userIDList, msg, constant.CallbackOnlinePushCommand, callbackResp, config.Config.Callback.CallbackOnlinePush.CallbackTimeOut)
|
||||
}
|
||||
|
||||
func callbackBeforeSuperGroupOnlinePush(operationID string, groupID string, msg *commonPb.MsgData, pushToUserList *[]string) cbApi.CommonCallbackResp {
|
||||
callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
|
||||
if !config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable {
|
||||
return callbackResp
|
||||
}
|
||||
req := cbApi.CallbackBeforeSuperGroupOnlinePushReq{
|
||||
UserStatusBaseCallback: cbApi.UserStatusBaseCallback{
|
||||
CallbackCommand: constant.CallbackSuperGroupOnlinePushCommand,
|
||||
OperationID: operationID,
|
||||
PlatformID: msg.SenderPlatformID,
|
||||
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
|
||||
},
|
||||
OfflinePushInfo: msg.OfflinePushInfo,
|
||||
SendID: msg.SendID,
|
||||
GroupID: groupID,
|
||||
ContentType: msg.ContentType,
|
||||
SessionType: msg.SessionType,
|
||||
AtUserIDList: msg.AtUserIDList,
|
||||
Content: string(msg.Content),
|
||||
}
|
||||
resp := &cbApi.CallbackBeforeSuperGroupOnlinePushResp{CommonCallbackResp: &callbackResp}
|
||||
if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.CallbackTimeOut); err != nil {
|
||||
callbackResp.ErrCode = http2.StatusInternalServerError
|
||||
callbackResp.ErrMsg = err.Error()
|
||||
if !config.Config.Callback.CallbackOfflinePush.CallbackFailedContinue {
|
||||
callbackResp.ActionCode = constant.ActionForbidden
|
||||
return callbackResp
|
||||
} else {
|
||||
callbackResp.ActionCode = constant.ActionAllow
|
||||
return callbackResp
|
||||
}
|
||||
}
|
||||
if resp.ErrCode == constant.CallbackHandleSuccess && resp.ActionCode == constant.ActionAllow && len(resp.UserIDList) != 0 {
|
||||
*pushToUserList = resp.UserIDList
|
||||
}
|
||||
log.NewDebug(operationID, utils.GetSelfFuncName(), pushToUserList, resp.UserIDList)
|
||||
return callbackResp
|
||||
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ func init() {
|
||||
}
|
||||
|
||||
if config.Config.Push.Fcm.Enable {
|
||||
offlinePusher = fcm.FcmClient
|
||||
offlinePusher = fcm.NewFcm()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -47,6 +47,18 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||
log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ")
|
||||
grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRelayName)
|
||||
}
|
||||
|
||||
var UIDList = []string{pushMsg.PushToUserID}
|
||||
callbackResp := callbackOnlinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData)
|
||||
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush callback Resp")
|
||||
if callbackResp.ErrCode != 0 {
|
||||
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOnlinePush result: ", callbackResp)
|
||||
}
|
||||
if callbackResp.ActionCode != constant.ActionAllow {
|
||||
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "OnlinePush stop")
|
||||
return
|
||||
}
|
||||
|
||||
//Online push message
|
||||
log.Debug(pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
|
||||
for _, v := range grpcCons {
|
||||
@@ -75,9 +87,6 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||
return
|
||||
}
|
||||
}
|
||||
//Use offline push messaging
|
||||
var UIDList []string
|
||||
UIDList = append(UIDList, pushMsg.PushToUserID)
|
||||
customContent := OpenIMContent{
|
||||
SessionType: int(pushMsg.MsgData.SessionType),
|
||||
From: pushMsg.MsgData.SendID,
|
||||
@@ -118,7 +127,7 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||
}
|
||||
}
|
||||
|
||||
callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList[0], pushMsg.MsgData)
|
||||
callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList, pushMsg.MsgData)
|
||||
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
|
||||
if callbackResp.ErrCode != 0 {
|
||||
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
|
||||
@@ -148,32 +157,49 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
|
||||
var wsResult []*pbRelay.SingelMsgToUserResultList
|
||||
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
|
||||
log.Debug(pushMsg.OperationID, "Get super group msg from msg_transfer And push msg", pushMsg.String())
|
||||
getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID}
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID)
|
||||
if etcdConn == nil {
|
||||
errMsg := pushMsg.OperationID + "getcdv3.GetConn == nil"
|
||||
log.NewError(pushMsg.OperationID, errMsg)
|
||||
return
|
||||
}
|
||||
client := pbCache.NewCacheClient(etcdConn)
|
||||
cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq)
|
||||
if err != nil {
|
||||
log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error())
|
||||
return
|
||||
}
|
||||
if cacheResp.CommonResp.ErrCode != 0 {
|
||||
log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
|
||||
return
|
||||
var pushToUserIDList []string
|
||||
if config.Config.Callback.CallbackBeforeSuperGroupOnlinePush.Enable {
|
||||
callbackResp := callbackBeforeSuperGroupOnlinePush(pushMsg.OperationID, pushMsg.PushToUserID, pushMsg.MsgData, &pushToUserIDList)
|
||||
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
|
||||
if callbackResp.ErrCode != 0 {
|
||||
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
|
||||
}
|
||||
if callbackResp.ActionCode != constant.ActionAllow {
|
||||
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "onlinePush stop")
|
||||
return
|
||||
}
|
||||
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "callback userIDList Resp", pushToUserIDList)
|
||||
} else {
|
||||
getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pushMsg.OperationID, GroupID: pushMsg.MsgData.GroupID}
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, pushMsg.OperationID)
|
||||
if etcdConn == nil {
|
||||
errMsg := pushMsg.OperationID + "getcdv3.GetConn == nil"
|
||||
log.NewError(pushMsg.OperationID, errMsg)
|
||||
return
|
||||
}
|
||||
client := pbCache.NewCacheClient(etcdConn)
|
||||
cacheResp, err := client.GetGroupMemberIDListFromCache(context.Background(), getGroupMemberIDListFromCacheReq)
|
||||
if err != nil {
|
||||
log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc call failed ", err.Error())
|
||||
return
|
||||
}
|
||||
if cacheResp.CommonResp.ErrCode != 0 {
|
||||
log.NewError(pushMsg.OperationID, "GetGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
|
||||
return
|
||||
}
|
||||
pushToUserIDList = cacheResp.UserIDList
|
||||
}
|
||||
|
||||
if len(grpcCons) == 0 {
|
||||
log.NewWarn(pushMsg.OperationID, "first GetConn4Unique ")
|
||||
grpcCons = getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRelayName)
|
||||
}
|
||||
|
||||
//Online push message
|
||||
log.Debug("test", pushMsg.OperationID, "len grpc", len(grpcCons), "data", pushMsg.String())
|
||||
for _, v := range grpcCons {
|
||||
msgClient := pbRelay.NewRelayClient(v)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: cacheResp.UserIDList})
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(context.Background(), &pbRelay.OnlineBatchPushOneMsgReq{OperationID: pushMsg.OperationID, MsgData: pushMsg.MsgData, PushToUserIDList: pushToUserIDList})
|
||||
if err != nil {
|
||||
log.NewError("push data to client rpc err", pushMsg.OperationID, "err", err)
|
||||
continue
|
||||
@@ -192,7 +218,7 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
|
||||
onlineSuccessUserIDList = append(onlineSuccessUserIDList, v.UserID)
|
||||
}
|
||||
}
|
||||
onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, cacheResp.UserIDList)
|
||||
onlineFailedUserIDList := utils.DifferenceString(onlineSuccessUserIDList, pushToUserIDList)
|
||||
//Use offline push messaging
|
||||
customContent := OpenIMContent{
|
||||
SessionType: int(pushMsg.MsgData.SessionType),
|
||||
@@ -234,7 +260,7 @@ func MsgToSuperGroupUser(pushMsg *pbPush.PushMsgReq) {
|
||||
}
|
||||
}
|
||||
if len(onlineFailedUserIDList) > 0 {
|
||||
callbackResp := callbackOfflinePush(pushMsg.OperationID, onlineFailedUserIDList[0], pushMsg.MsgData)
|
||||
callbackResp := callbackOfflinePush(pushMsg.OperationID, onlineFailedUserIDList, pushMsg.MsgData)
|
||||
log.NewDebug(pushMsg.OperationID, utils.GetSelfFuncName(), "offline callback Resp")
|
||||
if callbackResp.ErrCode != 0 {
|
||||
log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp)
|
||||
|
||||
+45
-59
@@ -178,6 +178,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
||||
resp.GroupInfo.OwnerUserID = req.OwnerUserID
|
||||
okUserIDList = append(okUserIDList, req.OwnerUserID)
|
||||
}
|
||||
// superGroup stored in mongodb
|
||||
} else {
|
||||
for _, v := range req.InitMemberList {
|
||||
okUserIDList = append(okUserIDList, v.UserID)
|
||||
@@ -186,20 +187,25 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
|
||||
log.NewError(req.OperationID, "GetGroupMemberNumByGroupID failed ", err.Error(), groupId)
|
||||
resp.ErrCode = constant.ErrDB.ErrCode
|
||||
resp.ErrMsg = err.Error() + ": CreateSuperGroup failed"
|
||||
}
|
||||
}
|
||||
|
||||
for _, userID := range okUserIDList {
|
||||
if err := rocksCache.DelJoinedGroupIDListFromCache(userID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), userID, err.Error())
|
||||
return resp, nil
|
||||
}
|
||||
}
|
||||
|
||||
if len(okUserIDList) != 0 {
|
||||
log.NewInfo(req.OperationID, "rpc CreateGroup return ", resp.String())
|
||||
if req.GroupInfo.GroupType != constant.SuperGroup {
|
||||
for _, userID := range okUserIDList {
|
||||
if err := rocksCache.DelJoinedGroupIDListFromCache(userID); err != nil {
|
||||
log.NewWarn(req.OperationID, utils.GetSelfFuncName(), userID, err.Error())
|
||||
}
|
||||
}
|
||||
chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, okUserIDList)
|
||||
} else {
|
||||
for _, userID := range okUserIDList {
|
||||
if err := rocksCache.DelJoinedSuperGroupIDListFromCache(userID); err != nil {
|
||||
log.NewWarn(req.OperationID, utils.GetSelfFuncName(), userID, err.Error())
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
for _, v := range okUserIDList {
|
||||
chat.SuperGroupNotification(req.OperationID, v, v)
|
||||
@@ -274,7 +280,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
|
||||
groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.GroupID, err)
|
||||
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
|
||||
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil
|
||||
}
|
||||
if groupInfo.Status == constant.GroupStatusDismissed {
|
||||
errMsg := " group status is dismissed "
|
||||
@@ -426,7 +432,6 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
|
||||
log.NewError(req.OperationID, "AddUserToSuperGroup failed ", req.GroupID, err)
|
||||
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
|
||||
@@ -449,19 +454,23 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
|
||||
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
|
||||
}
|
||||
|
||||
for _, userID := range okUserIDList {
|
||||
err = rocksCache.DelJoinedGroupIDListFromCache(userID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID)
|
||||
}
|
||||
}
|
||||
|
||||
if groupInfo.GroupType != constant.SuperGroup {
|
||||
for _, userID := range okUserIDList {
|
||||
err = rocksCache.DelJoinedGroupIDListFromCache(userID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID)
|
||||
}
|
||||
}
|
||||
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
|
||||
}
|
||||
chat.MemberInvitedNotification(req.OperationID, req.GroupID, req.OpUserID, req.Reason, okUserIDList)
|
||||
} else {
|
||||
for _, v := range req.InvitedUserIDList {
|
||||
if err := rocksCache.DelJoinedSuperGroupIDListFromCache(v); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
for _, v := range req.InvitedUserIDList {
|
||||
chat.SuperGroupNotification(req.OperationID, v, v)
|
||||
@@ -492,18 +501,9 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro
|
||||
resp.ErrMsg = constant.ErrDB.ErrMsg
|
||||
return &resp, nil
|
||||
}
|
||||
//memberList, err := imdb.GetGroupMemberListByGroupID(req.GroupID)
|
||||
//if err != nil {
|
||||
// log.NewError(req.OperationID, "GetGroupMemberListByGroupID failed,", err.Error(), req.GroupID)
|
||||
// resp.ErrCode = constant.ErrDB.ErrCode
|
||||
// resp.ErrMsg = constant.ErrDB.ErrMsg
|
||||
// return &resp, nil
|
||||
//}
|
||||
for _, v := range memberList {
|
||||
var node open_im_sdk.GroupMemberFullInfo
|
||||
cp.GroupMemberDBCopyOpenIM(&node, v)
|
||||
//log.Debug(req.OperationID, "db value:", v.MuteEndTime, "seconds: ", v.MuteEndTime.Unix())
|
||||
//log.Debug(req.OperationID, "cp value: ", node)
|
||||
resp.MemberList = append(resp.MemberList, &node)
|
||||
}
|
||||
}
|
||||
@@ -681,22 +681,24 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
|
||||
return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
|
||||
}
|
||||
|
||||
for _, userID := range okUserIDList {
|
||||
err = rocksCache.DelJoinedGroupIDListFromCache(userID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID)
|
||||
}
|
||||
}
|
||||
if err = rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
|
||||
}
|
||||
|
||||
if groupInfo.GroupType != constant.SuperGroup {
|
||||
for _, userID := range okUserIDList {
|
||||
err = rocksCache.DelJoinedGroupIDListFromCache(userID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID)
|
||||
}
|
||||
}
|
||||
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
|
||||
}
|
||||
chat.MemberKickedNotification(req, okUserIDList)
|
||||
} else {
|
||||
for _, userID := range okUserIDList {
|
||||
err = rocksCache.DelJoinedSuperGroupIDListFromCache(userID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID)
|
||||
}
|
||||
}
|
||||
go func() {
|
||||
for _, v := range req.KickedUserIDList {
|
||||
chat.SuperGroupNotification(req.OperationID, v, v)
|
||||
@@ -711,24 +713,8 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
|
||||
|
||||
func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetGroupMembersInfoReq) (*pbGroup.GetGroupMembersInfoResp, error) {
|
||||
log.NewInfo(req.OperationID, "GetGroupMembersInfo args ", req.String())
|
||||
|
||||
var resp pbGroup.GetGroupMembersInfoResp
|
||||
resp.MemberList = []*open_im_sdk.GroupMemberFullInfo{}
|
||||
|
||||
//for _, v := range req.MemberList {
|
||||
// var memberNode open_im_sdk.GroupMemberFullInfo
|
||||
// memberInfo, err := imdb.GetMemberInfoByID(req.GroupID, v)
|
||||
// memberNode.UserID = v
|
||||
// if err != nil {
|
||||
// log.NewError(req.OperationID, "GetMemberInfoById failed ", err.Error(), req.GroupID, v)
|
||||
// continue
|
||||
// } else {
|
||||
// utils.CopyStructFields(&memberNode, memberInfo)
|
||||
// memberNode.JoinTime = int32(memberInfo.JoinTime.Unix())
|
||||
// resp.MemberList = append(resp.MemberList, &memberNode)
|
||||
// }
|
||||
//}
|
||||
|
||||
groupMembers, err := rocksCache.GetAllGroupMembersInfoFromCache(req.GroupID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
|
||||
@@ -791,7 +777,6 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsI
|
||||
log.NewInfo(req.OperationID, "GetGroupsInfo args ", req.String())
|
||||
groupsInfoList := make([]*open_im_sdk.GroupInfo, 0)
|
||||
for _, groupID := range req.GroupIDList {
|
||||
//groupInfoFromMysql, err := imdb.GetGroupInfoByGroupID(groupID)
|
||||
groupInfoFromRedis, err := rocksCache.GetGroupInfoFromCache(groupID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", err.Error(), groupID)
|
||||
@@ -1021,7 +1006,6 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
|
||||
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
|
||||
}
|
||||
|
||||
//
|
||||
//_, err = imdb.GetGroupMemberListByGroupIDAndRoleLevel(req.GroupID, constant.GroupOwner)
|
||||
//if err != nil {
|
||||
// log.NewError(req.OperationID, "GetGroupMemberListByGroupIDAndRoleLevel failed ", err.Error(), req.GroupID, constant.GroupOwner)
|
||||
@@ -1110,17 +1094,19 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq)
|
||||
}
|
||||
|
||||
if groupInfo.GroupType != constant.SuperGroup {
|
||||
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
|
||||
}
|
||||
if err := rocksCache.DelJoinedGroupIDListFromCache(req.OpUserID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.OpUserID)
|
||||
}
|
||||
chat.MemberQuitNotification(req)
|
||||
} else {
|
||||
if err := rocksCache.DelJoinedSuperGroupIDListFromCache(req.OpUserID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.OpUserID)
|
||||
}
|
||||
chat.SuperGroupNotification(req.OperationID, req.OpUserID, req.OpUserID)
|
||||
}
|
||||
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
|
||||
}
|
||||
if err := rocksCache.DelJoinedGroupIDListFromCache(req.OpUserID); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.OpUserID)
|
||||
}
|
||||
|
||||
log.NewInfo(req.OperationID, "rpc QuitGroup return ", pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}})
|
||||
return &pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
|
||||
}
|
||||
|
||||
@@ -2,47 +2,42 @@ package group
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
|
||||
rocksCache "Open_IM/pkg/common/db/rocks_cache"
|
||||
"Open_IM/pkg/common/log"
|
||||
cp "Open_IM/pkg/common/utils"
|
||||
pbGroup "Open_IM/pkg/proto/group"
|
||||
commonPb "Open_IM/pkg/proto/sdk_ws"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
func (s *groupServer) GetJoinedSuperGroupList(ctx context.Context, req *pbGroup.GetJoinedSuperGroupListReq) (*pbGroup.GetJoinedSuperGroupListResp, error) {
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
||||
resp := &pbGroup.GetJoinedSuperGroupListResp{CommonResp: &pbGroup.CommonResp{}}
|
||||
userToSuperGroup, err := db.DB.GetSuperGroupByUserID(req.UserID)
|
||||
if err == mongo.ErrNoDocuments {
|
||||
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "GetSuperGroupByUserID failed ", err.Error(), req.UserID)
|
||||
return resp, nil
|
||||
}
|
||||
//userToSuperGroup, err := db.DB.GetSuperGroupByUserID(req.UserID)
|
||||
groupIDList, err := rocksCache.GetJoinedSuperGroupListFromCache(req.UserID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetSuperGroupByUserID failed ", err.Error(), req.UserID)
|
||||
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
|
||||
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
|
||||
return resp, nil
|
||||
}
|
||||
for _, groupID := range userToSuperGroup.GroupIDList {
|
||||
groupInfoDB, err := imdb.GetGroupInfoByGroupID(groupID)
|
||||
for _, groupID := range groupIDList {
|
||||
groupInfoFromCache, err := rocksCache.GetGroupInfoFromCache(groupID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupInfoByGroupID failed", groupID, err.Error())
|
||||
continue
|
||||
}
|
||||
groupInfo := &commonPb.GroupInfo{}
|
||||
if err := utils.CopyStructFields(groupInfo, groupInfoDB); err != nil {
|
||||
if err := utils.CopyStructFields(groupInfo, groupInfoFromCache); err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
|
||||
}
|
||||
group, err := db.DB.GetSuperGroup(groupID)
|
||||
groupMemberIDList, err := rocksCache.GetGroupMemberIDListFromCache(groupID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetSuperGroup failed", groupID, err.Error())
|
||||
continue
|
||||
}
|
||||
groupInfo.MemberCount = uint32(len(group.MemberIDList))
|
||||
groupInfo.MemberCount = uint32(len(groupMemberIDList))
|
||||
resp.GroupList = append(resp.GroupList, groupInfo)
|
||||
}
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
|
||||
@@ -54,13 +49,13 @@ func (s *groupServer) GetSuperGroupsInfo(_ context.Context, req *pbGroup.GetSupe
|
||||
resp = &pbGroup.GetSuperGroupsInfoResp{CommonResp: &pbGroup.CommonResp{}}
|
||||
groupsInfoList := make([]*commonPb.GroupInfo, 0)
|
||||
for _, groupID := range req.GroupIDList {
|
||||
groupInfoFromMysql, err := imdb.GetGroupInfoByGroupID(groupID)
|
||||
groupInfoFromRedis, err := rocksCache.GetGroupInfoFromCache(groupID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", err.Error(), groupID)
|
||||
continue
|
||||
}
|
||||
var groupInfo commonPb.GroupInfo
|
||||
cp.GroupDBCopyOpenIM(&groupInfo, groupInfoFromMysql)
|
||||
cp.GroupDBCopyOpenIM(&groupInfo, groupInfoFromRedis)
|
||||
groupsInfoList = append(groupsInfoList, &groupInfo)
|
||||
}
|
||||
resp.GroupInfoList = groupsInfoList
|
||||
|
||||
@@ -53,7 +53,7 @@ func (rpc *rpcChat) SetMsgMinSeq(_ context.Context, req *pbChat.SetMsgMinSeqReq)
|
||||
}
|
||||
return &pbChat.SetMsgMinSeqResp{}, nil
|
||||
}
|
||||
err := db.DB.SetGroupUserMinSeq(req.GroupID, req.UserID, req.MinSeq)
|
||||
err := db.DB.SetGroupUserMinSeq(req.GroupID, req.UserID, uint64(req.MinSeq))
|
||||
if err != nil {
|
||||
errMsg := "SetGroupUserMinSeq failed " + err.Error() + req.OperationID + req.GroupID + req.UserID + utils.Uint32ToString(req.MinSeq)
|
||||
log.Error(req.OperationID, errMsg)
|
||||
|
||||
@@ -2,8 +2,11 @@ package msg
|
||||
|
||||
import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/db"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/common/token_verify"
|
||||
commonPb "Open_IM/pkg/proto/sdk_ws"
|
||||
"Open_IM/pkg/proto/msg"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"time"
|
||||
@@ -27,3 +30,27 @@ func (rpc *rpcChat) DelMsgList(_ context.Context, req *commonPb.DelMsgListReq) (
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
|
||||
return resp, nil
|
||||
}
|
||||
func (rpc *rpcChat) DelSuperGroupMsg(_ context.Context, req *msg.DelSuperGroupMsgReq) (*msg.DelSuperGroupMsgResp, error) {
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
|
||||
if !token_verify.CheckAccess(req.OpUserID, req.UserID) {
|
||||
log.NewError(req.OperationID, "CheckAccess false ", req.OpUserID, req.UserID)
|
||||
return &msg.DelSuperGroupMsgResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
|
||||
}
|
||||
resp := &msg.DelSuperGroupMsgResp{}
|
||||
groupMaxSeq, err := db.DB.GetGroupMaxSeq(req.GroupID)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "GetGroupMaxSeq false ", req.OpUserID, req.UserID,req.GroupID)
|
||||
resp.ErrCode = constant.ErrDB.ErrCode
|
||||
resp.ErrMsg = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
err = db.DB.SetGroupUserMinSeq(req.GroupID,req.UserID, groupMaxSeq)
|
||||
if err != nil {
|
||||
log.NewError(req.OperationID, "SetGroupUserMinSeq false ", req.OpUserID, req.UserID,req.GroupID)
|
||||
resp.ErrCode = constant.ErrDB.ErrCode
|
||||
resp.ErrMsg = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
|
||||
return resp, nil
|
||||
}
|
||||
@@ -74,7 +74,7 @@ func messageVerification(data *pbChat.SendMsgReq) (bool, int32, string, []string
|
||||
if utils.IsContain(data.MsgData.SendID, config.Config.Manager.AppManagerUid) {
|
||||
return true, 0, "", nil
|
||||
}
|
||||
if data.MsgData.ContentType <= constant.FriendApplicationNotification && data.MsgData.ContentType >= constant.FriendApplicationApprovedNotification {
|
||||
if data.MsgData.ContentType <= constant.NotificationEnd && data.MsgData.ContentType >= constant.NotificationBegin {
|
||||
return true, 0, "", nil
|
||||
}
|
||||
log.NewDebug(data.OperationID, config.Config.MessageVerify.FriendVerify)
|
||||
|
||||
Reference in New Issue
Block a user