Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

This commit is contained in:
wangchuxiao
2022-03-31 14:50:17 +08:00
29 changed files with 127 additions and 75 deletions
+6 -6
View File
@@ -140,7 +140,7 @@ func ManagementSendMsg(c *gin.Context) {
}
log.NewInfo("", data, params)
token := c.Request.Header.Get("token")
claims, err := token_verify.ParseToken(token)
claims, err := token_verify.ParseToken(token, params.OperationID)
if err != nil {
log.NewError(params.OperationID, "parse token failed", err.Error())
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "parse token failed", "sendTime": 0, "MsgID": ""})
@@ -289,16 +289,16 @@ type OANotificationElem struct {
Url string `mapstructure:"url"`
MixType int32 `mapstructure:"mixType"`
Image struct {
SourceUrl string `mapstructure:"sourceUrl"`
SnapshotUrl string `mapstructure:"snapshotUrl"`
SourceUrl string `mapstructure:"sourceURL"`
SnapshotUrl string `mapstructure:"snapshotURL"`
} `mapstructure:"image"`
Video struct {
SourceUrl string `mapstructure:"sourceUrl"`
SnapshotUrl string `mapstructure:"snapshotUrl"`
SourceUrl string `mapstructure:"sourceURL"`
SnapshotUrl string `mapstructure:"snapshotURL"`
Duration int64 `mapstructure:"duration"`
} `mapstructure:"video"`
File struct {
SourceUrl string `mapstructure:"sourceUrl"`
SourceUrl string `mapstructure:"sourceURL"`
FileName string `mapstructure:"fileName"`
FileSize int64 `mapstructure:"fileSize"`
} `mapstructure:"file"`
+2 -1
View File
@@ -2,6 +2,7 @@ package gate
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/statistics"
"fmt"
@@ -20,7 +21,7 @@ var (
func Init(rpcPort, wsPort int) {
//log initialization
log.NewPrivateLog(config.Config.ModuleName.LongConnSvrName)
log.NewPrivateLog(constant.LogFileName)
rwLock = new(sync.RWMutex)
validate = validator.New()
statistics.NewStatistics(&sendMsgCount, config.Config.ModuleName.LongConnSvrName, fmt.Sprintf("%d second recv to msg_gateway sendMsgCount", sendMsgCount), 300)
@@ -80,6 +80,22 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
mc.groupMsgCount++
}
go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID)
case constant.NotificationChatType:
log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist)
if isHistory {
err := saveUserChat(msgKey, &msgFromMQ)
if err != nil {
log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String())
return
}
mc.singleMsgCount++
log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
}
if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID {
} else {
go sendMessageToPush(&msgFromMQ, msgKey)
}
log.NewDebug(operationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time)
default:
log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String())
return
+2 -1
View File
@@ -2,6 +2,7 @@ package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
)
@@ -13,7 +14,7 @@ var (
)
func Init() {
log.NewPrivateLog(config.Config.ModuleName.MsgTransferName)
log.NewPrivateLog(constant.LogFileName)
persistentCH.Init()
historyCH.Init()
producer = kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic)
@@ -34,26 +34,31 @@ func (pc *PersistentConsumerHandler) Init() {
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(msg []byte, msgKey string) {
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg))
var tag bool
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.ErrorByKv("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
log.NewError(msgFromMQ.OperationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
return
}
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
//Only process receiver data
if isPersist {
if msgKey == msgFromMQ.MsgData.RecvID && msgFromMQ.MsgData.SessionType == constant.SingleChatType {
log.InfoByKv("msg_transfer msg persisting", msgFromMQ.OperationID)
if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil {
log.ErrorByKv("Message insert failed", msgFromMQ.OperationID, "err", err.Error(), "msg", msgFromMQ.String())
return
switch msgFromMQ.MsgData.SessionType {
case constant.SingleChatType, constant.NotificationChatType:
if msgKey == msgFromMQ.MsgData.RecvID {
tag = true
}
} else if msgFromMQ.MsgData.SessionType == constant.GroupChatType && msgKey == msgFromMQ.MsgData.SendID {
log.InfoByKv("msg_transfer msg persisting", msgFromMQ.OperationID)
case constant.GroupChatType:
if msgKey == msgFromMQ.MsgData.SendID || utils.IsContain(msgFromMQ.MsgData.SendID, config.Config.Manager.AppManagerUid) {
tag = true
}
}
if tag {
log.NewInfo(msgFromMQ.OperationID, "msg_transfer msg persisting", string(msg))
if err = im_mysql_msg_model.InsertMessageToChatLog(msgFromMQ); err != nil {
log.ErrorByKv("Message insert failed", msgFromMQ.OperationID, "err", err.Error(), "msg", msgFromMQ.String())
log.NewError(msgFromMQ.OperationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
return
}
}
+1 -1
View File
@@ -24,7 +24,7 @@ var (
)
func Init(rpcPort int) {
log.NewPrivateLog(config.Config.ModuleName.PushName)
log.NewPrivateLog(constant.LogFileName)
rpcServer.Init(rpcPort)
pushCh.Init()
pushTerminal = []int32{constant.IOSPlatformID, constant.AndroidPlatformID}
+4 -4
View File
@@ -24,7 +24,7 @@ type adminCMSServer struct {
}
func NewAdminCMSServer(port int) *adminCMSServer {
log.NewPrivateLog("AdminCMS")
log.NewPrivateLog(constant.LogFileName)
return &adminCMSServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImAdminCMSName,
@@ -66,12 +66,12 @@ func (s *adminCMSServer) Run() {
func (s *adminCMSServer) AdminLogin(_ context.Context, req *pbAdminCMS.AdminLoginReq) (*pbAdminCMS.AdminLoginResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbAdminCMS.AdminLoginResp{}
for i, adminID := range config.Config.Manager.AppManagerUid{
for i, adminID := range config.Config.Manager.AppManagerUid {
if adminID == req.AdminID && config.Config.Manager.Secrets[i] == req.Secret {
token, expTime, err := token_verify.CreateToken(adminID, constant.SingleChatType)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "generate token success", "token: ", token, "expTime:", expTime)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "generate token failed", "adminID: ", adminID, err.Error())
log.NewError(req.OperationID, utils.GetSelfFuncName(), "generate token failed", "adminID: ", adminID, err.Error())
return resp, openIMHttp.WrapError(constant.ErrTokenUnknown)
}
resp.Token = token
@@ -84,4 +84,4 @@ func (s *adminCMSServer) AdminLogin(_ context.Context, req *pbAdminCMS.AdminLogi
return resp, openIMHttp.WrapError(constant.ErrTokenMalformed)
}
return resp, nil
}
}
+1 -1
View File
@@ -63,7 +63,7 @@ type rpcAuth struct {
}
func NewRpcAuthServer(port int) *rpcAuth {
log.NewPrivateLog("auth")
log.NewPrivateLog(constant.LogFileName)
return &rpcAuth{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImAuthName,
+1 -1
View File
@@ -29,7 +29,7 @@ type friendServer struct {
}
func NewFriendServer(port int) *friendServer {
log.NewPrivateLog("friend")
log.NewPrivateLog(constant.LogFileName)
return &friendServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImFriendName,
+4 -5
View File
@@ -31,7 +31,7 @@ type groupServer struct {
}
func NewGroupServer(port int) *groupServer {
log.NewPrivateLog("group")
log.NewPrivateLog(constant.LogFileName)
return &groupServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImGroupName,
@@ -276,7 +276,7 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro
log.Debug(req.OperationID, v)
var node open_im_sdk.GroupMemberFullInfo
cp.GroupMemberDBCopyOpenIM(&node, &v)
log.Debug(req.OperationID, "db value:", v)
log.Debug(req.OperationID, "db value:", v.MuteEndTime, "seconds: ", v.MuteEndTime.Unix())
log.Debug(req.OperationID, "cp value: ", node)
resp.MemberList = append(resp.MemberList, &node)
}
@@ -1029,11 +1029,10 @@ func (s *groupServer) CancelMuteGroup(ctx context.Context, req *pbGroup.CancelMu
log.Error(req.OperationID, "verify failed ", req.OpUserID, req.GroupID)
return &pbGroup.CancelMuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}}, nil
}
groupInfo := db.Group{GroupID: req.GroupID}
err := imdb.UpdateGroupInfoDefaultZero(groupInfo, map[string]interface{}{"status": constant.GroupOk})
err := imdb.UpdateGroupInfoDefaultZero(req.GroupID, map[string]interface{}{"status": constant.GroupOk})
if err != nil {
log.Error(req.OperationID, "UpdateGroupInfoDefaultZero failed ", err.Error(), groupInfo)
log.Error(req.OperationID, "UpdateGroupInfoDefaultZero failed ", err.Error(), req.GroupID)
return &pbGroup.CancelMuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
chat.GroupCancelMutedNotification(req.OperationID, req.OpUserID, req.GroupID)
+10 -10
View File
@@ -31,7 +31,7 @@ type messageCMSServer struct {
}
func NewMessageCMSServer(port int) *messageCMSServer {
log.NewPrivateLog("MessageCMS")
log.NewPrivateLog(constant.LogFileName)
return &messageCMSServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImMessageCMSName,
@@ -84,8 +84,8 @@ func (s *messageCMSServer) GetChatLogs(_ context.Context, req *pbMessageCMS.GetC
log.NewError(req.OperationID, utils.GetSelfFuncName(), "time string parse error", err.Error())
}
chatLog := db.ChatLog{
Content: req.Content,
SendTime: time,
Content: req.Content,
SendTime: time,
ContentType: req.ContentType,
SessionType: req.SessionType,
}
@@ -101,20 +101,20 @@ func (s *messageCMSServer) GetChatLogs(_ context.Context, req *pbMessageCMS.GetC
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLogCount", err.Error())
}
chatLogs, err := imdb.GetChatLog(chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber)
chatLogs, err := imdb.GetChatLog(chatLog, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetChatLog", err.Error())
return resp, errors.WrapError(constant.ErrDB)
}
for _, chatLog := range chatLogs {
pbChatLog := &pbMessageCMS.ChatLogs{
SessionType: chatLog.SessionType,
ContentType: chatLog.ContentType,
SearchContent: req.Content,
WholeContent: chatLog.Content,
Date: chatLog.CreateTime.String(),
SessionType: chatLog.SessionType,
ContentType: chatLog.ContentType,
SearchContent: req.Content,
WholeContent: chatLog.Content,
Date: chatLog.CreateTime.String(),
SenderNickName: chatLog.SenderNickname,
SenderId: chatLog.SendID,
SenderId: chatLog.SendID,
}
if chatLog.SenderNickname == "" {
sendUser, err := imdb.GetUserByUserID(chatLog.SendID)
+2 -1
View File
@@ -2,6 +2,7 @@ package msg
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/kafka"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
@@ -22,7 +23,7 @@ type rpcChat struct {
}
func NewRpcChatServer(port int) *rpcChat {
log.NewPrivateLog("msg")
log.NewPrivateLog(constant.LogFileName)
rc := rpcChat{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImOfflineMessageName,
+18 -1
View File
@@ -281,6 +281,23 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error())
}
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
case constant.NotificationChatType:
msgToMQ.MsgData = pb.MsgData
log.NewInfo(msgToMQ.OperationID, msgToMQ)
err1 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.RecvID)
if err1 != nil {
log.NewError(msgToMQ.OperationID, "kafka send msg err:RecvID", msgToMQ.MsgData.RecvID, msgToMQ.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
if msgToMQ.MsgData.SendID != msgToMQ.MsgData.RecvID { //Filter messages sent to yourself
err2 := rpc.sendMsgToKafka(&msgToMQ, msgToMQ.MsgData.SendID)
if err2 != nil {
log.NewError(msgToMQ.OperationID, "kafka send msg err:SendID", msgToMQ.MsgData.SendID, msgToMQ.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
return returnMsg(&replay, pb, 0, "", msgToMQ.MsgData.ServerMsgID, msgToMQ.MsgData.SendTime)
default:
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
}
@@ -571,6 +588,6 @@ func Notification(n *NotificationMsg) {
if err != nil {
log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), err.Error())
} else if reply.ErrCode != 0 {
log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String())
log.NewError(req.OperationID, "SendMsg rpc failed, ", req.String(), reply.ErrCode, reply.ErrMsg)
}
}
+5 -5
View File
@@ -26,7 +26,7 @@ type officeServer struct {
}
func NewOfficeServer(port int) *officeServer {
log.NewPrivateLog("office")
log.NewPrivateLog(constant.LogFileName)
return &officeServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImOfficeName,
@@ -103,7 +103,7 @@ func (s *officeServer) GetUserTags(_ context.Context, req *pbOffice.GetUserTagsR
func (s *officeServer) CreateTag(_ context.Context, req *pbOffice.CreateTagReq) (resp *pbOffice.CreateTagResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "CreateTag req", req.String())
userIDList := utils.RemoveUserIDRepByMap(req.UserIDList)
userIDList := utils.RemoveRepeatedStringInList(req.UserIDList)
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "userIDList: ", userIDList)
resp = &pbOffice.CreateTagResp{CommonResp: &pbOffice.CommonResp{}}
if err := db.DB.CreateTag(req.UserID, req.TagName, userIDList); err != nil {
@@ -132,8 +132,8 @@ func (s *officeServer) DeleteTag(_ context.Context, req *pbOffice.DeleteTagReq)
func (s *officeServer) SetTag(_ context.Context, req *pbOffice.SetTagReq) (resp *pbOffice.SetTagResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbOffice.SetTagResp{CommonResp: &pbOffice.CommonResp{}}
IncreaseUserIDList := utils.RemoveUserIDRepByMap(req.IncreaseUserIDList)
reduceUserIDList := utils.RemoveUserIDRepByMap(req.ReduceUserIDList)
IncreaseUserIDList := utils.RemoveRepeatedStringInList(req.IncreaseUserIDList)
reduceUserIDList := utils.RemoveRepeatedStringInList(req.ReduceUserIDList)
if err := db.DB.SetTag(req.UserID, req.TagID, req.NewName, IncreaseUserIDList, reduceUserIDList); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetTag failed", err.Error())
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
@@ -171,7 +171,7 @@ func (s *officeServer) SendMsg2Tag(_ context.Context, req *pbOffice.SendMsg2TagR
userIDList = append(userIDList, tagUserIDList...)
userIDList = append(userIDList, groupUserIDList...)
userIDList = append(userIDList, req.UserList...)
userIDList = utils.RemoveUserIDRepByMap(userIDList)
userIDList = utils.RemoveRepeatedStringInList(userIDList)
for i, userID := range userIDList {
if userID == req.SendID || userID == "" {
userIDList = append(userIDList[:i], userIDList[i+1:]...)
+4 -4
View File
@@ -19,10 +19,10 @@ import (
//open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
//"context"
errors "Open_IM/pkg/common/http"
"net"
"strconv"
"strings"
errors "Open_IM/pkg/common/http"
"google.golang.org/grpc"
)
@@ -35,7 +35,7 @@ type statisticsServer struct {
}
func NewStatisticsServer(port int) *statisticsServer {
log.NewPrivateLog("Statistics")
log.NewPrivateLog(constant.LogFileName)
return &statisticsServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImStatisticsName,
@@ -164,7 +164,7 @@ func GetRangeDate(from, to time.Time) [][2]time.Time {
}
// month
case !isInOneMonth(from, to):
if to.Sub(from) < time.Hour * 24 * 30 {
if to.Sub(from) < time.Hour*24*30 {
for i := 0; ; i++ {
fromTime := from.Add(time.Hour * 24 * time.Duration(i))
toTime := from.Add(time.Hour * 24 * time.Duration(i+1))
@@ -251,7 +251,7 @@ func (s *statisticsServer) GetGroupStatistics(_ context.Context, req *pbStatisti
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetIncreaseGroupNum", v, err.Error())
}
resp.TotalGroupNumList[index] = &pbStatistics.DateNumList{
Date: v[0].String(),
Date: v[0].String(),
Num: num,
}
}(wg, i, v)
+1 -1
View File
@@ -32,7 +32,7 @@ type userServer struct {
}
func NewUserServer(port int) *userServer {
log.NewPrivateLog("user")
log.NewPrivateLog(constant.LogFileName)
return &userServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImUserName,
+1 -1
View File
@@ -76,7 +76,7 @@ func Test_ParseToken(t *testing.T) {
uid := "1"
platform := int32(1)
tokenString, _, _ := token_verify.CreateToken(uid, platform)
claims, err := token_verify.ParseToken(tokenString)
claims, err := token_verify.ParseToken(tokenString, "")
if err == nil {
assert.Equal(t, claims.UID, uid)
}