style: add format

Signed-off-by: Xinwei Xiong(cubxxw-openim) <3293172751nss@gmail.com>
This commit is contained in:
Xinwei Xiong(cubxxw-openim)
2023-07-03 16:29:22 +08:00
parent 187ee9a375
commit 166ddb1e34
170 changed files with 4816 additions and 1279 deletions
@@ -9,6 +9,10 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/Shopify/sarama"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -18,9 +22,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/Shopify/sarama"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
)
const ConsumerMsgs = 3
@@ -63,7 +64,11 @@ type OnlineHistoryRedisConsumerHandler struct {
groupRpcClient *rpcclient.GroupRpcClient
}
func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *OnlineHistoryRedisConsumerHandler {
func NewOnlineHistoryRedisConsumerHandler(
database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient,
groupRpcClient *rpcclient.GroupRpcClient,
) *OnlineHistoryRedisConsumerHandler {
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel
@@ -77,7 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(database controller.CommonMsgDatabase,
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
//statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d
// second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
return &och
}
@@ -90,16 +96,53 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
msgChannelValue := cmd.Value.(MsgChannelValue)
ctxMsgList := msgChannelValue.ctxMsgList
ctx := msgChannelValue.ctx
log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "uniqueKey", msgChannelValue.uniqueKey)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(ctxMsgList)
log.ZDebug(ctx, "msg lens", "storageMsgList", len(storageMsgList), "notStorageMsgList", len(notStorageMsgList),
"storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList), "modifyMsgList", len(modifyMsgList))
log.ZDebug(
ctx,
"msg arrived channel",
"channel id",
channelID,
"msgList length",
len(ctxMsgList),
"uniqueKey",
msgChannelValue.uniqueKey,
)
storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(
ctxMsgList,
)
log.ZDebug(
ctx,
"msg lens",
"storageMsgList",
len(storageMsgList),
"notStorageMsgList",
len(notStorageMsgList),
"storageNotificationList",
len(storageNotificationList),
"notStorageNotificationList",
len(notStorageNotificationList),
"modifyMsgList",
len(modifyMsgList),
)
conversationIDMsg := utils.GetChatConversationIDByMsg(ctxMsgList[0].message)
conversationIDNotification := utils.GetNotificationConversationID(ctxMsgList[0].message)
och.handleMsg(ctx, msgChannelValue.uniqueKey, conversationIDMsg, storageMsgList, notStorageMsgList)
och.handleNotification(ctx, msgChannelValue.uniqueKey, conversationIDNotification, storageNotificationList, notStorageNotificationList)
och.handleNotification(
ctx,
msgChannelValue.uniqueKey,
conversationIDNotification,
storageNotificationList,
notStorageNotificationList,
)
if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.uniqueKey, conversationIDNotification, modifyMsgList); err != nil {
log.ZError(ctx, "msg to modify mq error", err, "uniqueKey", msgChannelValue.uniqueKey, "modifyMsgList", modifyMsgList)
log.ZError(
ctx,
"msg to modify mq error",
err,
"uniqueKey",
msgChannelValue.uniqueKey,
"modifyMsgList",
modifyMsgList,
)
}
}
}
@@ -107,7 +150,9 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
}
// 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(
totalMsgs []*ContextMsg,
) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) {
isStorage := func(msg *sdkws.MsgData) bool {
options2 := utils.Options(msg.Options)
if options2.IsHistory() {
@@ -130,11 +175,17 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []
msg.Options = utils.NewMsgOptions()
}
if options.IsOfflinePush() {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithOfflinePush(false))
v.message.Options = utils.WithOptions(
utils.Options(v.message.Options),
utils.WithOfflinePush(false),
)
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithOfflinePush(true))
}
if options.IsUnreadCount() {
v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithUnreadCount(false))
v.message.Options = utils.WithOptions(
utils.Options(v.message.Options),
utils.WithUnreadCount(false),
)
msg.Options = utils.WithOptions(utils.Options(msg.Options), utils.WithUnreadCount(true))
}
storageMsgList = append(storageMsgList, msg)
@@ -151,19 +202,32 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(totalMsgs []
notStorageMsgList = append(notStorageMsgList, v.message)
}
}
if v.message.ContentType == constant.ReactionMessageModifier || v.message.ContentType == constant.ReactionMessageDeleter {
if v.message.ContentType == constant.ReactionMessageModifier ||
v.message.ContentType == constant.ReactionMessageDeleter {
modifyMsgList = append(modifyMsgList, v.message)
}
}
return
}
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, key, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
ctx context.Context,
key, conversationID string,
storageList, notStorageList []*sdkws.MsgData,
) {
och.toPushTopic(ctx, key, conversationID, notStorageList)
if len(storageList) > 0 {
lastSeq, _, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
if err != nil {
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID, "storageList", storageList)
log.ZError(
ctx,
"notification batch insert to redis error",
err,
"conversationID",
conversationID,
"storageList",
storageList,
)
return
}
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
@@ -172,13 +236,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
}
}
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*sdkws.MsgData) {
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(
ctx context.Context,
key, conversationID string,
msgs []*sdkws.MsgData,
) {
for _, v := range msgs {
och.msgDatabase.MsgToPushMQ(ctx, key, conversationID, v)
}
}
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*sdkws.MsgData) {
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
ctx context.Context,
key, conversationID string,
storageList, notStorageList []*sdkws.MsgData,
) {
och.toPushTopic(ctx, key, conversationID, notStorageList)
if len(storageList) > 0 {
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
@@ -240,11 +312,26 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
for i, header := range consumerMessages[i].Headers {
arr = append(arr, strconv.Itoa(i), string(header.Key), string(header.Value))
}
log.ZInfo(ctx, "consumer.kafka.GetContextWithMQHeader", "len", len(consumerMessages[i].Headers), "header", strings.Join(arr, ", "))
log.ZInfo(
ctx,
"consumer.kafka.GetContextWithMQHeader",
"len",
len(consumerMessages[i].Headers),
"header",
strings.Join(arr, ", "),
)
ctxMsg.ctx = kafka.GetContextWithMQHeader(consumerMessages[i].Headers)
ctxMsg.message = msgFromMQ
log.ZDebug(ctx, "single msg come to distribution center", "message", msgFromMQ, "key", string(consumerMessages[i].Key))
//aggregationMsgs[string(consumerMessages[i].Key)] = append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
log.ZDebug(
ctx,
"single msg come to distribution center",
"message",
msgFromMQ,
"key",
string(consumerMessages[i].Key),
)
// aggregationMsgs[string(consumerMessages[i].Key)] =
// append(aggregationMsgs[string(consumerMessages[i].Key)], ctxMsg)
if oldM, ok := aggregationMsgs[string(consumerMessages[i].Key)]; ok {
oldM = append(oldM, ctxMsg)
aggregationMsgs[string(consumerMessages[i].Key)] = oldM
@@ -260,7 +347,16 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
hashCode := utils.GetHashCode(uniqueKey)
channelID := hashCode % ChannelNum
newCtx := withAggregationCtx(ctx, v)
log.ZDebug(newCtx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "uniqueKey", uniqueKey)
log.ZDebug(
newCtx,
"generate channelID",
"hashCode",
hashCode,
"channelID",
channelID,
"uniqueKey",
uniqueKey,
)
och.chArrays[channelID] <- Cmd2Value{Cmd: SourceMessages, Value: MsgChannelValue{uniqueKey: uniqueKey, ctxMsgList: v, ctx: newCtx}}
}
}
@@ -288,7 +384,10 @@ func (och *OnlineHistoryRedisConsumerHandler) Cleanup(_ sarama.ConsumerGroupSess
return nil
}
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group
func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error { // a instance in the consumer group
for {
if sess == nil {
log.ZWarn(context.Background(), "sess == nil, waiting", nil)