Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao
2023-05-22 10:21:57 +08:00
16 changed files with 21 additions and 431 deletions
+1 -2
View File
@@ -9,7 +9,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
@@ -98,7 +97,7 @@ func (m Message) newUserSendMsgReq(c *gin.Context, params *apistruct.ManagementS
tips.JsonDetail = utils.StructToJsonString(params.Content)
pbData.MsgData.Content, err = proto.Marshal(&tips)
if err != nil {
log.Error(mcontext.GetOperationID(c), "Marshal failed ", err.Error(), tips.String())
log.ZError(c, "Marshal failed ", err, "tips", tips.String())
}
}
return &pbData
+2 -4
View File
@@ -2,14 +2,12 @@ package msggateway
import (
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
)
func RunWsAndServer(rpcPort, wsPort, prometheusPort int) error {
log.NewPrivateLog(constant.LogFileName)
fmt.Println("start rpc/msg_gateway server, port: ", rpcPort, wsPort, prometheusPort, ", OpenIM version: ", config.Version)
longServer, err := NewWsServer(
WithPort(wsPort),
@@ -30,12 +30,12 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase)
return mc
}
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, conversationID string, session sarama.ConsumerGroupSession) {
func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, key string, session sarama.ConsumerGroupSession) {
msg := cMsg.Value
msgFromMQ := pbMsg.MsgDataToMongoByMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.ZError(ctx, "unmarshall failed", err, "conversationID", conversationID, "len", len(msg))
log.ZError(ctx, "unmarshall failed", err, "key", key, "len", len(msg))
return
}
if len(msgFromMQ.MsgData) == 0 {
+1 -1
View File
@@ -75,7 +75,7 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
go func(index int, userIDs []string) {
defer wg.Done()
if err2 := g.batchPush(ctx, token, userIDs, pushReq); err2 != nil {
log.NewError(mcontext.GetOperationID(ctx), "batchPush failed", i, token, pushReq)
log.ZError(ctx, "batchPush failed", err2, "index", index, "token", token, "req", pushReq)
err = err2
}
}(i, v.Item)
+6 -6
View File
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/fcm"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush/getui"
@@ -127,8 +128,7 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error {
}
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
operationID := mcontext.GetOperationID(ctx)
log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID)
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil && err != errs.ErrCallbackContinue {
return err
@@ -176,7 +176,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if err != nil {
return err
}
log.Debug(operationID, "push_result", wsResults, "sendData", msg)
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
p.successCount++
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush {
@@ -204,7 +204,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
log.Error(operationID, utils.GetSelfFuncName(), "GetRecvMsgNotNotifyUserIDs failed", groupID)
log.ZError(ctx, "GetRecvMsgNotNotifyUserIDs failed", err, "groupID", groupID)
return err
}
needOfflinePushUserIDs = utils.DifferenceString(notNotificationUserIDs, needOfflinePushUserIDs)
@@ -221,12 +221,12 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
}
err = p.offlinePushMsg(ctx, groupID, msg, offlinePushUserIDs)
if err != nil {
log.NewError(operationID, "offlinePushMsg failed", groupID)
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
_, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
if err != nil {
log.NewError(operationID, "offlinePushMsg failed", groupID)
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
return err
}
}
+2 -2
View File
@@ -1,6 +1,7 @@
package tools
import (
"context"
"fmt"
"sync"
@@ -12,8 +13,7 @@ import (
const moduleName = "cron"
func StartCronTask() error {
log.NewPrivateLog(moduleName)
log.NewInfo("StartCronTask", "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
log.ZInfo(context.Background(), "start cron task", "cron config", config.Config.Mongo.ChatRecordsClearTime)
fmt.Println("cron task start, config", config.Config.Mongo.ChatRecordsClearTime)
msgTool, err := InitMsgTool()
if err != nil {