mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-15 06:25:58 +08:00
Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun
This commit is contained in:
@@ -7,12 +7,14 @@
|
||||
package manage
|
||||
|
||||
import (
|
||||
api "Open_IM/pkg/base_info"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/common/token_verify"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbChat "Open_IM/pkg/proto/chat"
|
||||
"Open_IM/pkg/proto/sdk_ws"
|
||||
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
@@ -40,11 +42,13 @@ func newUserSendMsgReq(params *ManagementSendMsgReq) *pbChat.SendMsgReq {
|
||||
newContent = utils.StructToJsonString(params.Content)
|
||||
default:
|
||||
}
|
||||
options := make(map[string]bool, 2)
|
||||
var options map[string]bool
|
||||
if params.IsOnlineOnly {
|
||||
options = make(map[string]bool, 5)
|
||||
utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
|
||||
utils.SetSwitchFromOptions(options, constant.IsHistory, false)
|
||||
utils.SetSwitchFromOptions(options, constant.IsPersistent, false)
|
||||
utils.SetSwitchFromOptions(options, constant.IsSenderSync, false)
|
||||
}
|
||||
pbData := pbChat.SendMsgReq{
|
||||
OperationID: params.OperationID,
|
||||
@@ -152,20 +156,16 @@ func ManagementSendMsg(c *gin.Context) {
|
||||
|
||||
log.Info("", "", "api ManagementSendMsg call, api call rpc...")
|
||||
|
||||
reply, err := client.SendMsg(context.Background(), pbData)
|
||||
RpcResp, err := client.SendMsg(context.Background(), pbData)
|
||||
if err != nil {
|
||||
log.NewError(params.OperationID, "call delete UserSendMsg rpc server failed", err.Error())
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"})
|
||||
return
|
||||
}
|
||||
log.Info("", "", "api ManagementSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String())
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"errCode": reply.ErrCode,
|
||||
"errMsg": reply.ErrMsg,
|
||||
"sendTime": reply.SendTime,
|
||||
"msgID": reply.ClientMsgID,
|
||||
})
|
||||
log.Info("", "", "api ManagementSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), RpcResp.String())
|
||||
resp := api.ManagementSendMsgResp{CommResp: api.CommResp{ErrCode: RpcResp.ErrCode, ErrMsg: RpcResp.ErrMsg}, ResultList: server_api_params.UserSendMsgResp{ServerMsgID: RpcResp.ServerMsgID, ClientMsgID: RpcResp.ClientMsgID, SendTime: RpcResp.SendTime}}
|
||||
log.Info(params.OperationID, "ManagementSendMsg return", resp)
|
||||
c.JSON(http.StatusOK, resp)
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ package gate
|
||||
import (
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/statistics"
|
||||
"fmt"
|
||||
"github.com/go-playground/validator/v10"
|
||||
"sync"
|
||||
)
|
||||
@@ -12,6 +14,7 @@ var (
|
||||
validate *validator.Validate
|
||||
ws WServer
|
||||
rpcSvr RPCServer
|
||||
count uint64
|
||||
)
|
||||
|
||||
func Init(rpcPort, wsPort int) {
|
||||
@@ -19,6 +22,7 @@ func Init(rpcPort, wsPort int) {
|
||||
log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName)
|
||||
rwLock = new(sync.RWMutex)
|
||||
validate = validator.New()
|
||||
statistics.NewStatistics(&count, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway count", count), 10)
|
||||
ws.onInit(wsPort)
|
||||
rpcSvr.onInit(rpcPort)
|
||||
}
|
||||
|
||||
@@ -142,6 +142,7 @@ func (ws *WServer) pullMsgBySeqListResp(conn *UserConn, m *Req, pb *sdk_ws.PullM
|
||||
|
||||
}
|
||||
func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
|
||||
count++
|
||||
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID)
|
||||
nReply := new(pbChat.SendMsgResp)
|
||||
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbMsg "Open_IM/pkg/proto/chat"
|
||||
pbPush "Open_IM/pkg/proto/push"
|
||||
"Open_IM/pkg/statistics"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"github.com/Shopify/sarama"
|
||||
@@ -20,9 +21,14 @@ type fcb func(msg []byte, msgKey string)
|
||||
type HistoryConsumerHandler struct {
|
||||
msgHandle map[string]fcb
|
||||
historyConsumerGroup *kfk.MConsumerGroup
|
||||
singleMsgCount uint64
|
||||
groupMsgCount uint64
|
||||
}
|
||||
|
||||
func (mc *HistoryConsumerHandler) Init() {
|
||||
statistics.NewStatistics(&mc.singleMsgCount, config.Config.ModuleName.MsgTransferName, "singleMsgCount insert to mongo ", 10)
|
||||
statistics.NewStatistics(&mc.groupMsgCount, config.Config.ModuleName.MsgTransferName, "groupMsgCount insert to mongo ", 10)
|
||||
|
||||
mc.msgHandle = make(map[string]fcb)
|
||||
mc.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = mc.handleChatWs2Mongo
|
||||
mc.historyConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V0_10_2_0,
|
||||
@@ -55,6 +61,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
||||
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
|
||||
return
|
||||
}
|
||||
mc.singleMsgCount++
|
||||
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
|
||||
}
|
||||
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
|
||||
@@ -70,6 +77,7 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
||||
log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error())
|
||||
return
|
||||
}
|
||||
mc.groupMsgCount++
|
||||
}
|
||||
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
|
||||
default:
|
||||
|
||||
@@ -20,6 +20,6 @@ func Init() {
|
||||
}
|
||||
func Run() {
|
||||
//register mysqlConsumerHandler to
|
||||
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
|
||||
//go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
|
||||
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/kafka"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/statistics"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -18,6 +20,7 @@ var (
|
||||
pushCh PushConsumerHandler
|
||||
pushTerminal []int32
|
||||
producer *kafka.Producer
|
||||
count uint64
|
||||
)
|
||||
|
||||
func Init(rpcPort int) {
|
||||
@@ -28,6 +31,7 @@ func Init(rpcPort int) {
|
||||
}
|
||||
func init() {
|
||||
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic)
|
||||
statistics.NewStatistics(&count, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", 10), 10)
|
||||
}
|
||||
|
||||
func Run() {
|
||||
|
||||
@@ -7,16 +7,12 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
push "Open_IM/internal/push/jpush"
|
||||
"Open_IM/pkg/common/config"
|
||||
"Open_IM/pkg/common/constant"
|
||||
"Open_IM/pkg/common/log"
|
||||
"Open_IM/pkg/grpc-etcdv3/getcdv3"
|
||||
pbPush "Open_IM/pkg/proto/push"
|
||||
pbRelay "Open_IM/pkg/proto/relay"
|
||||
"Open_IM/pkg/utils"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -34,7 +30,7 @@ type AtContent struct {
|
||||
|
||||
func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||
var wsResult []*pbRelay.SingleMsgToUser
|
||||
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
|
||||
//isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
|
||||
log.InfoByKv("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String())
|
||||
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
|
||||
//Online push message
|
||||
@@ -51,66 +47,67 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
|
||||
}
|
||||
}
|
||||
log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData)
|
||||
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
|
||||
for _, v := range wsResult {
|
||||
if v.ResultCode == 0 {
|
||||
continue
|
||||
}
|
||||
//supported terminal
|
||||
for _, t := range pushTerminal {
|
||||
if v.RecvPlatFormID == t {
|
||||
//Use offline push messaging
|
||||
var UIDList []string
|
||||
UIDList = append(UIDList, v.RecvID)
|
||||
customContent := OpenIMContent{
|
||||
SessionType: int(pushMsg.MsgData.SessionType),
|
||||
From: pushMsg.MsgData.SendID,
|
||||
To: pushMsg.MsgData.RecvID,
|
||||
Seq: pushMsg.MsgData.Seq,
|
||||
}
|
||||
bCustomContent, _ := json.Marshal(customContent)
|
||||
jsonCustomContent := string(bCustomContent)
|
||||
var content string
|
||||
if pushMsg.MsgData.OfflinePushInfo != nil {
|
||||
content = pushMsg.MsgData.OfflinePushInfo.Title
|
||||
|
||||
} else {
|
||||
switch pushMsg.MsgData.ContentType {
|
||||
case constant.Text:
|
||||
content = constant.ContentType2PushContent[constant.Text]
|
||||
case constant.Picture:
|
||||
content = constant.ContentType2PushContent[constant.Picture]
|
||||
case constant.Voice:
|
||||
content = constant.ContentType2PushContent[constant.Voice]
|
||||
case constant.Video:
|
||||
content = constant.ContentType2PushContent[constant.Video]
|
||||
case constant.File:
|
||||
content = constant.ContentType2PushContent[constant.File]
|
||||
case constant.AtText:
|
||||
a := AtContent{}
|
||||
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
|
||||
if utils.IsContain(v.RecvID, a.AtUserList) {
|
||||
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
|
||||
} else {
|
||||
content = constant.ContentType2PushContent[constant.GroupMsg]
|
||||
}
|
||||
default:
|
||||
content = constant.ContentType2PushContent[constant.Common]
|
||||
}
|
||||
}
|
||||
|
||||
pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t))
|
||||
if err != nil {
|
||||
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t))
|
||||
} else {
|
||||
log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
count++
|
||||
//if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
|
||||
// for _, v := range wsResult {
|
||||
// if v.ResultCode == 0 {
|
||||
// continue
|
||||
// }
|
||||
// //supported terminal
|
||||
// for _, t := range pushTerminal {
|
||||
// if v.RecvPlatFormID == t {
|
||||
// //Use offline push messaging
|
||||
// var UIDList []string
|
||||
// UIDList = append(UIDList, v.RecvID)
|
||||
// customContent := OpenIMContent{
|
||||
// SessionType: int(pushMsg.MsgData.SessionType),
|
||||
// From: pushMsg.MsgData.SendID,
|
||||
// To: pushMsg.MsgData.RecvID,
|
||||
// Seq: pushMsg.MsgData.Seq,
|
||||
// }
|
||||
// bCustomContent, _ := json.Marshal(customContent)
|
||||
// jsonCustomContent := string(bCustomContent)
|
||||
// var content string
|
||||
// if pushMsg.MsgData.OfflinePushInfo != nil {
|
||||
// content = pushMsg.MsgData.OfflinePushInfo.Title
|
||||
//
|
||||
// } else {
|
||||
// switch pushMsg.MsgData.ContentType {
|
||||
// case constant.Text:
|
||||
// content = constant.ContentType2PushContent[constant.Text]
|
||||
// case constant.Picture:
|
||||
// content = constant.ContentType2PushContent[constant.Picture]
|
||||
// case constant.Voice:
|
||||
// content = constant.ContentType2PushContent[constant.Voice]
|
||||
// case constant.Video:
|
||||
// content = constant.ContentType2PushContent[constant.Video]
|
||||
// case constant.File:
|
||||
// content = constant.ContentType2PushContent[constant.File]
|
||||
// case constant.AtText:
|
||||
// a := AtContent{}
|
||||
// _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
|
||||
// if utils.IsContain(v.RecvID, a.AtUserList) {
|
||||
// content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
|
||||
// } else {
|
||||
// content = constant.ContentType2PushContent[constant.GroupMsg]
|
||||
// }
|
||||
// default:
|
||||
// content = constant.ContentType2PushContent[constant.Common]
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t))
|
||||
// if err != nil {
|
||||
// log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t))
|
||||
// } else {
|
||||
// log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t))
|
||||
// }
|
||||
//
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
//}
|
||||
}
|
||||
|
||||
//func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) {
|
||||
|
||||
@@ -245,7 +245,7 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string) error {
|
||||
}
|
||||
func GetMsgID(sendID string) string {
|
||||
t := time.Now().Format("2006-01-02 15:04:05")
|
||||
return t + "-" + sendID + "-" + strconv.Itoa(rand.Int())
|
||||
return utils.Md5(t + "-" + sendID + "-" + strconv.Itoa(rand.Int()))
|
||||
}
|
||||
|
||||
func returnMsg(replay *pbChat.SendMsgResp, pb *pbChat.SendMsgReq, errCode int32, errMsg, serverMsgID string, sendTime int64) (*pbChat.SendMsgResp, error) {
|
||||
|
||||
Reference in New Issue
Block a user