mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-10 03:56:00 +08:00
Merge remote-tracking branch 'origin/errcode' into errcode
This commit is contained in:
@@ -63,9 +63,9 @@ type CommonMsgDatabase interface {
|
||||
|
||||
// to mq
|
||||
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
|
||||
MsgToModifyMQ(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) error
|
||||
MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
MsgToMongoMQ(ctx context.Context, conversationID string, msgs []*sdkws.MsgData, lastSeq int64) error
|
||||
MsgToModifyMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData) error
|
||||
MsgToPushMQ(ctx context.Context, key, conversarionID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
MsgToMongoMQ(ctx context.Context, key, conversarionID string, msgs []*sdkws.MsgData, lastSeq int64) error
|
||||
|
||||
// modify
|
||||
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
|
||||
@@ -114,26 +114,26 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd
|
||||
return err
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error {
|
||||
func (db *commonMsgDatabase) MsgToModifyMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
||||
_, _, err := db.producerToModify.SendMessage(ctx, key, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||
partition, offset, err := db.producerToPush.SendMessage(ctx, conversationID, &pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
||||
func (db *commonMsgDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
||||
if err != nil {
|
||||
log.ZError(ctx, "MsgToPushMQ", err, "key", conversationID, "msg2mq", msg2mq)
|
||||
log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq)
|
||||
return 0, 0, err
|
||||
}
|
||||
return partition, offset, nil
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
func (db *commonMsgDatabase) MsgToMongoMQ(ctx context.Context, key, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error {
|
||||
if len(messages) > 0 {
|
||||
_, _, err := db.producerToMongo.SendMessage(ctx, conversationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||||
_, _, err := db.producerToMongo.SendMessage(ctx, key, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages})
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
)
|
||||
|
||||
@@ -28,14 +29,12 @@ func NewKafkaConsumer(addr []string, topic string) *Consumer {
|
||||
consumer, err := sarama.NewConsumer(p.addr, consumerConfig)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
return nil
|
||||
}
|
||||
p.Consumer = consumer
|
||||
|
||||
partitionList, err := consumer.Partitions(p.Topic)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
return nil
|
||||
}
|
||||
p.PartitionList = partitionList
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ package kafka
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
@@ -17,7 +16,7 @@ import (
|
||||
prome "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||
)
|
||||
|
||||
var emptyMsg = errors.New("binary msg is empty")
|
||||
var errEmptyMsg = errors.New("binary msg is empty")
|
||||
|
||||
type Producer struct {
|
||||
topic string
|
||||
@@ -40,15 +39,14 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
||||
}
|
||||
p.addr = addr
|
||||
p.topic = topic
|
||||
|
||||
producer, err := sarama.NewSyncProducer(p.addr, p.config) //Initialize the client
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
return nil
|
||||
}
|
||||
p.producer = producer
|
||||
return &p
|
||||
}
|
||||
|
||||
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
|
||||
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
|
||||
if err != nil {
|
||||
@@ -60,6 +58,7 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)
|
||||
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
|
||||
{Key: []byte(constant.ConnID), Value: []byte(connID)}}, err
|
||||
}
|
||||
|
||||
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
||||
var values []string
|
||||
for _, recordHeader := range header {
|
||||
@@ -67,31 +66,28 @@ func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
|
||||
}
|
||||
return mcontext.WithMustInfoCtx(values) // TODO
|
||||
}
|
||||
func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) {
|
||||
log.ZDebug(ctx, "SendMessage", "key ", key, "msg", m, "topic", p.topic)
|
||||
|
||||
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
|
||||
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
|
||||
kMsg := &sarama.ProducerMessage{}
|
||||
kMsg.Topic = p.topic
|
||||
kMsg.Key = sarama.StringEncoder(key)
|
||||
bMsg, err := proto.Marshal(m)
|
||||
bMsg, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
|
||||
}
|
||||
if len(bMsg) == 0 {
|
||||
return 0, 0, utils.Wrap(emptyMsg, "")
|
||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||
}
|
||||
kMsg.Value = sarama.ByteEncoder(bMsg)
|
||||
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||
return 0, 0, utils.Wrap(emptyMsg, "")
|
||||
return 0, 0, utils.Wrap(errEmptyMsg, "")
|
||||
}
|
||||
kMsg.Metadata = ctx
|
||||
header, err := GetMQHeaderWithContext(ctx)
|
||||
if err != nil {
|
||||
return 0, 0, utils.Wrap(err, "")
|
||||
}
|
||||
var arr []string
|
||||
for i, header := range header {
|
||||
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
|
||||
}
|
||||
kMsg.Headers = header
|
||||
partition, offset, err := p.producer.SendMessage(kMsg)
|
||||
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())
|
||||
|
||||
Reference in New Issue
Block a user