msg update

This commit is contained in:
Gordon
2023-03-22 18:35:21 +08:00
parent b07bb1765e
commit 11ebe4e234
7 changed files with 134 additions and 81 deletions
+3 -5
View File
@@ -6,7 +6,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbChat "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@@ -29,10 +28,10 @@ func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
}
func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
log.NewDebug("", "msg come from kafka And push!!!", "msg", string(msg))
log.ZDebug(ctx, "msg come from kafka And push!!!", "msg", string(msg))
msgFromMQ := pbChat.PushMsgDataToMQ{}
if err := proto.Unmarshal(msg, &msgFromMQ); err != nil {
log.Error("", "push Unmarshal msg err", "msg", string(msg), "err", err.Error())
log.ZError(ctx, "push Unmarshal msg err", err, "msg", string(msg))
return
}
pbData := &pbPush.PushMsgReq{
@@ -44,7 +43,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
if nowSec-sec > 10 {
return
}
mcontext.SetOperationID(ctx, "")
var err error
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
@@ -53,7 +51,7 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
err = c.pusher.MsgToUser(ctx, pbData.SourceID, pbData.MsgData)
}
if err != nil {
log.NewError("", "push failed", *pbData)
log.ZError(ctx, "push failed", err, "msg", pbData.String())
}
}
func (ConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }