msg update

This commit is contained in:
Gordon
2023-03-21 12:28:21 +08:00
parent 7eb28e7d0a
commit c036d27b36
40 changed files with 215 additions and 188 deletions
+3 -3
View File
@@ -9,7 +9,7 @@ package kafka
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tracelog"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/Shopify/sarama"
)
@@ -43,14 +43,14 @@ func NewMConsumerGroup(consumerConfig *MConsumerGroupConfig, topics, addrs []str
}
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage, rootFuncName string) context.Context {
ctx := tracelog.NewCtx(rootFuncName)
ctx := mcontext.NewCtx(rootFuncName)
var operationID string
for _, v := range cMsg.Headers {
if string(v.Key) == constant.OperationID {
operationID = string(v.Value)
}
}
tracelog.SetOperationID(ctx, operationID)
mcontext.SetOperationID(ctx, operationID)
return ctx
}
+17 -12
View File
@@ -6,7 +6,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
log "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tracelog"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
@@ -15,6 +15,8 @@ import (
prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
)
var emptyMsg = errors.New("binary msg is empty")
type Producer struct {
topic string
addr []string
@@ -47,30 +49,33 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
}
func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) {
operationID := tracelog.GetOperationID(ctx)
log.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
log.ZDebug(ctx, "SendMessage", "key ", key, "msg", m.String())
kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic
kMsg.Key = sarama.StringEncoder(key)
bMsg, err := proto.Marshal(m)
if err != nil {
log.Error(operationID, "", "proto marshal err = %s", err.Error())
return -1, -1, err
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
}
if len(bMsg) == 0 {
log.Error(operationID, "len(bMsg) == 0 ")
return 0, 0, errors.New("len(bMsg) == 0 ")
return 0, 0, utils.Wrap(emptyMsg, "")
}
kMsg.Value = sarama.ByteEncoder(bMsg)
log.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length())
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
log.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg)
return -1, -1, errors.New("key or value == 0")
return 0, 0, utils.Wrap(emptyMsg, "")
}
kMsg.Metadata = ctx
kMsg.Headers = []sarama.RecordHeader{{Key: []byte(constant.OperationID), Value: []byte(operationID)}}
operationID, opUserID, platform, connID, err := mcontext.GetMustCtxInfo(ctx)
if err != nil {
return 0, 0, utils.Wrap(err, "")
}
kMsg.Headers = []sarama.RecordHeader{
{Key: []byte(constant.OperationID), Value: []byte(operationID)},
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
{Key: []byte(constant.ConnID), Value: []byte(connID)}}
partition, offset, err := p.producer.SendMessage(kMsg)
log.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer)
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), "key length", kMsg.Value.Length())
if err == nil {
prome.Inc(prome.SendMsgCounter)
}