Merge branch 'v2.3.0release' of github.com:OpenIMSDK/Open-IM-Server into v2.3.0release

This commit is contained in:
wangchuxiao
2022-07-29 14:36:33 +08:00
16 changed files with 367 additions and 181 deletions
+16 -14
View File
@@ -29,10 +29,10 @@ import (
var validate *validator.Validate
func SetOptions(options map[string]bool, value bool) {
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, value)
utils.SetSwitchFromOptions(options, constant.IsHistory, value)
utils.SetSwitchFromOptions(options, constant.IsPersistent, value)
utils.SetSwitchFromOptions(options, constant.IsSenderSync, value)
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, value)
}
func newUserSendMsgReq(params *api.ManagementSendMsgReq) *pbChat.SendMsgReq {
@@ -59,10 +59,12 @@ func newUserSendMsgReq(params *api.ManagementSendMsgReq) *pbChat.SendMsgReq {
if params.IsOnlineOnly {
SetOptions(options, false)
}
if params.ContentType == constant.CustomMsgOnlineOnly {
SetOptions(options, false)
} else if params.ContentType == constant.CustomMsgNotTriggerConversation {
if params.NotOfflinePush {
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
}
if params.ContentType == constant.CustomOnlineOnly {
SetOptions(options, false)
} else if params.ContentType == constant.CustomNotTriggerConversation {
utils.SetSwitchFromOptions(options, constant.IsConversationUpdate, false)
}
@@ -146,9 +148,9 @@ func ManagementSendMsg(c *gin.Context) {
case constant.OANotification:
data = OANotificationElem{}
params.SessionType = constant.NotificationChatType
case constant.CustomMsgNotTriggerConversation:
case constant.CustomNotTriggerConversation:
data = CustomElem{}
case constant.CustomMsgOnlineOnly:
case constant.CustomOnlineOnly:
data = CustomElem{}
//case constant.HasReadReceipt:
//case constant.Typing:
@@ -277,9 +279,9 @@ func ManagementBatchSendMsg(c *gin.Context) {
case constant.OANotification:
data = OANotificationElem{}
params.SessionType = constant.NotificationChatType
case constant.CustomMsgNotTriggerConversation:
case constant.CustomNotTriggerConversation:
data = CustomElem{}
case constant.CustomMsgOnlineOnly:
case constant.CustomOnlineOnly:
data = CustomElem{}
//case constant.HasReadReceipt:
//case constant.Typing:
@@ -322,12 +324,11 @@ func ManagementBatchSendMsg(c *gin.Context) {
return
}
client := pbChat.NewMsgClient(etcdConn)
req := &api.ManagementSendMsgReq{
ManagementSendMsg: params.ManagementSendMsg,
}
pbData := newUserSendMsgReq(req)
for _, recvID := range params.RecvIDList {
req := &api.ManagementSendMsgReq{
ManagementSendMsg: params.ManagementSendMsg,
RecvID: recvID,
}
pbData := newUserSendMsgReq(req)
pbData.MsgData.RecvID = recvID
log.Info(params.OperationID, "", "api ManagementSendMsg call start..., ", pbData.String())
@@ -344,10 +345,11 @@ func ManagementBatchSendMsg(c *gin.Context) {
msgSendFailedFlag = true
continue
}
resp.Data.ResultList = append(resp.Data.ResultList, server_api_params.UserSendMsgResp{
resp.Data.ResultList = append(resp.Data.ResultList, &api.SingleReturnResult{
ServerMsgID: rpcResp.ServerMsgID,
ClientMsgID: rpcResp.ClientMsgID,
SendTime: rpcResp.SendTime,
RecvID: recvID,
})
}
if msgSendFailedFlag {
+31 -14
View File
@@ -78,15 +78,18 @@ func DelSuperGroupMsg(c *gin.Context) {
req api.DelSuperGroupMsgReq
resp api.DelSuperGroupMsgResp
)
rpcReq := &rpc.DelSuperGroupMsgReq{}
utils.CopyStructFields(req, &req)
if err := c.BindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req:", req)
ok, opUserID, errInfo := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
var ok bool
var errInfo string
ok, rpcReq.OpUserID, errInfo = token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
if !ok {
errMsg := req.OperationID + " " + opUserID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
errMsg := req.OperationID + " " + rpcReq.OpUserID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
log.NewError(req.OperationID, errMsg)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 500, "errMsg": errMsg})
return
@@ -137,18 +140,32 @@ func DelSuperGroupMsg(c *gin.Context) {
client := rpc.NewMsgClient(etcdConn)
log.Info(req.OperationID, "", "api DelSuperGroupMsg call, api call rpc...")
RpcResp, err := client.SendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(req.OperationID, "call delete UserSendMsg rpc server failed", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"})
return
if req.IsAllDelete {
RpcResp, err := client.DelSuperGroupMsg(context.Background(),rpcReq)
if err != nil {
log.NewError(req.OperationID, "call delete DelSuperGroupMsg rpc server failed", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call DelSuperGroupMsg rpc server failed"})
return
}
log.Info(req.OperationID, "", "api DelSuperGroupMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
resp.ErrCode = RpcResp.ErrCode
resp.ErrMsg = RpcResp.ErrMsg
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp)
c.JSON(http.StatusOK, resp)
}else{
RpcResp, err := client.SendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(req.OperationID, "call delete UserSendMsg rpc server failed", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"})
return
}
log.Info(req.OperationID, "", "api DelSuperGroupMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
resp.ErrCode = RpcResp.ErrCode
resp.ErrMsg = RpcResp.ErrMsg
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp)
c.JSON(http.StatusOK, resp)
}
log.Info(req.OperationID, "", "api DelSuperGroupMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
resp.ErrCode = RpcResp.ErrCode
resp.ErrMsg = RpcResp.ErrMsg
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), resp)
c.JSON(http.StatusOK, resp)
}
// @Summary 清空用户消息
+1
View File
@@ -417,6 +417,7 @@ func GetUsersOnlineStatus(c *gin.Context) {
flag := false
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRelayName)
for _, v := range grpcCons {
log.Debug(params.OperationID, "get node ", *v, v.Target())
client := pbRelay.NewRelayClient(v)
reply, err := client.GetUsersOnlineStatus(context.Background(), req)
if err != nil {
@@ -5,8 +5,6 @@ const (
Address = "127.0.0.1:11300"
)
//var roomClient *lksdk.RoomServiceClient
type Media struct {
}
@@ -14,45 +12,11 @@ func NewMedia() *Media {
return &Media{}
}
//func (m *Media) GetJoinToken(room, identity string, operationID string, data *open_im_sdk.ParticipantMetaData) (string, string, error) {
// var newData pbRtc.ParticipantMetaData
// copier.Copy(&newData, data)
// conn, err := grpc.Dial(Address, grpc.WithInsecure())
// if err != nil {
// return "", "", err
// }
// defer conn.Close()
// c := pbRtc.NewRtcServiceClient(conn)
// req := &pbRtc.GetJoinTokenReq{Room: room, OperationID: operationID, Identity: identity, MetaData: &newData}
// resp, err := c.GetJoinToken(context.Background(), req)
// if err != nil {
// return "", "", err
// }
// if resp.CommonResp.ErrCode != 0 {
// return "", "", errors.New(resp.CommonResp.ErrMsg)
// }
// return resp.Jwt, resp.LiveURL, nil
// //at := auth.NewAccessToken(m.ApiKey, m.ApiSecret)
// //grant := &auth.VideoGrant{
// // RoomJoin: true,
// // Room: room,
// //}
// //at.AddGrant(grant).
// // SetIdentity(identity).
// // SetValidFor(time.Hour)
// //
// //return at.ToJWT()
//}
func init() {
//roomClient = lksdk.NewRoomServiceClient(MediaAddress, ApiKey, ApiSecret)
}
func (m *Media) CreateRoom(roomName string) (error, error) {
return nil, nil
//return roomClient.CreateRoom(context.Background(), &livekit.CreateRoomRequest{
// Name: roomName,
// EmptyTimeout: 60 * 3,
//})
}
@@ -36,7 +36,12 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(cMsg *sarama.Con
}
err = db.DB.BatchInsertChat2DB(msgFromMQ.AggregationID, msgFromMQ.MessageList, msgFromMQ.TriggerID, msgFromMQ.LastSeq)
if err != nil {
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList)
log.NewError(msgFromMQ.TriggerID, "single data insert to mongo err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
} else {
err = db.DB.DeleteMessageFromCache(msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.GetTriggerID())
if err != nil {
log.NewError(msgFromMQ.TriggerID, "remove cache msg from redis err", err.Error(), msgFromMQ.MessageList, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
}
}
for _, v := range msgFromMQ.MessageList {
if v.MsgData.ContentType == constant.DeleteMessageNotification {
+13 -20
View File
@@ -1,36 +1,30 @@
package push
package fcm
import (
"Open_IM/internal/push"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"context"
"log"
"path/filepath"
"strconv"
firebase "firebase.google.com/go"
"firebase.google.com/go/messaging"
"google.golang.org/api/option"
"path/filepath"
"strconv"
)
type Fcm struct {
}
var (
FcmClient *Fcm
FcmMsgCli *messaging.Client
)
func init() {
//FcmClient = newFcmClient()
}
func NewFcm() *Fcm {
return newFcmClient()
}
func newFcmClient() *Fcm {
opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount))
fcmApp, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil {
log.Println("error initializing app: %v\n", err)
log.Debug("", "error initializing app: ", err.Error())
return nil
}
//授权
@@ -41,13 +35,12 @@ func newFcmClient() *Fcm {
// }
// log.Printf("%#v\r\n", fcmClient)
ctx := context.Background()
FcmMsgCli, err = fcmApp.Messaging(ctx)
fcmMsgClient, err := fcmApp.Messaging(ctx)
if err != nil {
log.Fatalf("error getting Messaging client: %v\n", err)
panic(err.Error())
return nil
}
log.Println(FcmMsgCli)
return &Fcm{}
return &Fcm{FcmMsgCli: fcmMsgClient}
}
func (f *Fcm) Push(accounts []string, alert, detailContent, operationID string, opts push.PushOpts) (string, error) {
@@ -87,9 +80,9 @@ func (f *Fcm) Push(accounts []string, alert, detailContent, operationID string,
//An error from SendMulticast indicates a total failure -- i.e.
//the message could not be sent to any of the recipients.
//Partial failures are indicated by a `BatchResponse` return value.
response, err := FcmMsgCli.SendMulticast(ctx, Msg)
response, err := f.FcmMsgCli.SendMulticast(ctx, Msg)
if err != nil {
log.Fatalln(err)
return "", err
}
Success = Success + response.SuccessCount
Fail = Fail + response.FailureCount
+1 -1
View File
@@ -44,7 +44,7 @@ func init() {
}
if config.Config.Push.Fcm.Enable {
offlinePusher = fcm.FcmClient
offlinePusher = fcm.NewFcm()
}
}
+1 -1
View File
@@ -280,7 +280,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.GroupID, err)
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}, nil
}
if groupInfo.Status == constant.GroupStatusDismissed {
errMsg := " group status is dismissed "
+1 -1
View File
@@ -53,7 +53,7 @@ func (rpc *rpcChat) SetMsgMinSeq(_ context.Context, req *pbChat.SetMsgMinSeqReq)
}
return &pbChat.SetMsgMinSeqResp{}, nil
}
err := db.DB.SetGroupUserMinSeq(req.GroupID, req.UserID, req.MinSeq)
err := db.DB.SetGroupUserMinSeq(req.GroupID, req.UserID, uint64(req.MinSeq))
if err != nil {
errMsg := "SetGroupUserMinSeq failed " + err.Error() + req.OperationID + req.GroupID + req.UserID + utils.Uint32ToString(req.MinSeq)
log.Error(req.OperationID, errMsg)
+27
View File
@@ -2,8 +2,11 @@ package msg
import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/proto/msg"
"Open_IM/pkg/utils"
"context"
"time"
@@ -27,3 +30,27 @@ func (rpc *rpcChat) DelMsgList(_ context.Context, req *commonPb.DelMsgListReq) (
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (rpc *rpcChat) DelSuperGroupMsg(_ context.Context, req *msg.DelSuperGroupMsgReq) (*msg.DelSuperGroupMsgResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
if !token_verify.CheckAccess(req.OpUserID, req.UserID) {
log.NewError(req.OperationID, "CheckAccess false ", req.OpUserID, req.UserID)
return &msg.DelSuperGroupMsgResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
resp := &msg.DelSuperGroupMsgResp{}
groupMaxSeq, err := db.DB.GetGroupMaxSeq(req.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupMaxSeq false ", req.OpUserID, req.UserID,req.GroupID)
resp.ErrCode = constant.ErrDB.ErrCode
resp.ErrMsg = err.Error()
return resp, nil
}
err = db.DB.SetGroupUserMinSeq(req.GroupID,req.UserID, groupMaxSeq)
if err != nil {
log.NewError(req.OperationID, "SetGroupUserMinSeq false ", req.OpUserID, req.UserID,req.GroupID)
resp.ErrCode = constant.ErrDB.ErrCode
resp.ErrMsg = err.Error()
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}