Merge branch 'errcode' of github.com:OpenIMSDK/Open-IM-Server into errcode

This commit is contained in:
wangchuxiao
2023-03-23 19:05:20 +08:00
70 changed files with 1040 additions and 535 deletions
+4 -4
View File
@@ -6,7 +6,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tracelog"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
)
@@ -23,7 +23,7 @@ func callbackOfflinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDa
UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: constant.CallbackOfflinePushCommand,
OperationID: tracelog.GetOperationID(ctx),
OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
},
@@ -60,7 +60,7 @@ func callbackOnlinePush(ctx context.Context, userIDs []string, msg *sdkws.MsgDat
UserStatusBatchCallbackReq: callbackstruct.UserStatusBatchCallbackReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: constant.CallbackOnlinePushCommand,
OperationID: tracelog.GetOperationID(ctx),
OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
},
@@ -85,7 +85,7 @@ func callbackBeforeSuperGroupOnlinePush(ctx context.Context, groupID string, msg
req := callbackstruct.CallbackBeforeSuperGroupOnlinePushReq{
UserStatusBaseCallback: callbackstruct.UserStatusBaseCallback{
CallbackCommand: constant.CallbackSuperGroupOnlinePushCommand,
OperationID: tracelog.GetOperationID(ctx),
OperationID: mcontext.GetOperationID(ctx),
PlatformID: int(msg.SenderPlatformID),
Platform: constant.PlatformIDToName(int(msg.SenderPlatformID)),
},
+1 -11
View File
@@ -1,17 +1,7 @@
/*
** description("").
** copyright('open-im,www.open-im.io').
** author("fg,Gordon@open-im.io").
** time(2021/3/22 15:33).
*/
package push
import (
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/statistics"
)
type Consumer struct {
@@ -31,6 +21,6 @@ func (c *Consumer) initPrometheus() {
}
func (c *Consumer) Start() {
statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
//statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&c.pushCh)
}
+3 -3
View File
@@ -6,7 +6,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
http2 "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tracelog"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils/splitter"
"github.com/go-redis/redis/v8"
"sync"
@@ -71,7 +71,7 @@ func (g *Client) Push(ctx context.Context, userIDs []string, title, content stri
go func(index int, userIDs []string) {
defer wg.Done()
if err2 := g.batchPush(ctx, token, userIDs, pushReq); err2 != nil {
log.NewError(tracelog.GetOperationID(ctx), "batchPush failed", i, token, pushReq)
log.NewError(mcontext.GetOperationID(ctx), "batchPush failed", i, token, pushReq)
err = err2
}
}(i, v.Item)
@@ -132,7 +132,7 @@ func (g *Client) batchPush(ctx context.Context, token string, userIDs []string,
}
func (g *Client) singlePush(ctx context.Context, token, userID string, pushReq PushReq) error {
operationID := tracelog.GetOperationID(ctx)
operationID := mcontext.GetOperationID(ctx)
pushReq.RequestID = &operationID
pushReq.Audience = &Audience{Alias: []string{userID}}
return g.request(ctx, pushURL, pushReq, token, nil)
+3 -13
View File
@@ -1,9 +1,3 @@
/*
** description("").
** copyright('OpenIM,www.OpenIM.io').
** author("fg,Gordon@tuoyun.net").
** time(2021/5/13 10:33).
*/
package push
import (
@@ -13,7 +7,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tracelog"
pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@@ -36,10 +29,9 @@ func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
}
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
msgFromMQ := pbChat.PushMsgDataToMQ{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
log.Error("", "push Unmarshal msg err", "msg", string(msg), "err", err.Error())
log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg))
return
}
pbData := &pbPush.PushMsgReq{
@@ -51,7 +43,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
if nowSec-sec > 10 {
return
}
tracelog.SetOperationID(ctx, "")
var err error
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
@@ -60,7 +51,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData)
}
if err != nil {
log.NewError("", "push failed", pbData)
log.ZError(ctx, "push failed", err, "msg", pbData.String())
}
}
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
@@ -68,8 +59,7 @@ func (ConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil
func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
ctx := c.pushConsumerGroup.GetContextFromMsg(msg, "push consumer")
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
c.handleMs2PsChat(ctx, msg.Value)
sess.MarkMessage(msg, "")
}
-1
View File
@@ -22,7 +22,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
return err
}
cacheModel := cache.NewCacheModel(rdb)
offlinePusher := NewOfflinePusher(cacheModel)
database := controller.NewPushDatabase(cacheModel)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client))
+6 -12
View File
@@ -1,9 +1,3 @@
/*
** description("").
** copyright('open-im,www.open-im.io').
** author("fg,Gordon@open-im.io").
** time(2021/3/5 14:31).
*/
package push
import (
@@ -19,8 +13,8 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tracelog"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
@@ -64,9 +58,8 @@ func NewOfflinePusher(cache cache.Model) offlinepush.OfflinePusher {
}
func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgData) error {
operationID := tracelog.GetOperationID(ctx)
var userIDs = []string{userID}
log.Debug(operationID, "Get msg from msg_transfer And push msg", msg.String(), userID)
log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userID", userID, "msg", msg.String())
// callback
if err := callbackOnlinePush(ctx, userIDs, msg); err != nil && err != errs.ErrCallbackContinue {
return err
@@ -77,7 +70,8 @@ func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgDat
return err
}
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
log.NewInfo(operationID, "push_result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush)
//log.NewInfo(operationID, "push_result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush)
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userID)
p.successCount++
if isOfflinePush && userID != msg.SendID {
// save invitation info for offline push
@@ -107,7 +101,7 @@ func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgDat
}
func (p *Pusher) MsgToSuperGroupUser(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
operationID := tracelog.GetOperationID(ctx)
operationID := mcontext.GetOperationID(ctx)
log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID)
var pushToUserIDs []string
if err := callbackBeforeSuperGroupOnlinePush(ctx, groupID, msg, &pushToUserIDs); err != nil && err != errs.ErrCallbackContinue {
@@ -183,6 +177,7 @@ func (p *Pusher) MsgToSuperGroupUser(ctx context.Context, groupID string, msg *s
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
conns, err := p.client.GetConns(config.Config.RpcRegisterName.OpenImMessageGatewayName)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if err != nil {
return nil, err
}
@@ -191,7 +186,6 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
msgClient := msggateway.NewMsgGatewayClient(v)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs})
if err != nil {
log.NewError(tracelog.GetOperationID(ctx), msg, len(pushToUserIDs), "err", err)
continue
}
if reply != nil && reply.SinglePushResult != nil {