* refactor: refactor workflows contents.

* add tool workflows.

* update field.

* fix: remove chat error.

* Fix err.

* fix error.

* remove Chinese comments.

* update workflows files.

* update infra config.

* move workflows.

* feat: update bot.

* fix: solve incorrect outdated msg get.

* update get docIDs logic.

* update

* update skip logic.

* fix

* update.

* fix: delay deleteObject func.

* remove unused content.

* update log type.

* feat: implement request batch count limit.

* update

* update

* feat: add rocksTimeout

* feat: wrap logs

* feat: add logs

* feat: listen config

* feat: enable listening on TIME_WAIT ports

* feat: add logs

* feat: cache batch

* chore: enable fullUserCache

* feat: push rpc num

* feat: push err

* feat: with operationID

* feat: sleep

* feat: change 1s

* feat: change log

* feat: implement GetBatch in rpcCache.

* feat: print getOnline cost

* feat: change log

* feat: change kafka and push config

* feat: del interface

* feat: fix err

* feat: change config

* feat: go mod

* feat: change config

* feat: change config

* feat: add sleep in push

* feat: warn logs

* feat: logs

* feat: logs

* feat: change port

* feat: start config

* feat: remove port reuse

* feat: prometheus config

* feat: prometheus config

* feat: prometheus config

* feat: add long time send msg to grafana

* feat: init

* feat: init

* feat: implement offline push.

* feat: batch get user online

* feat: implement batch Push split

* update go mod

* Revert "feat: change port"

This reverts commit 06d5e944

* feat: change port

* feat: change config

* feat: implement kafka producer and consumer.

* update format.

* add PushMQ log.

* feat: get all online users and init push

* feat: lock in online cache

* feat: config

* fix: init online status

* fix: add logs

* fix: userIDs

* fix: add logs

* feat: update Handler logic.

* update MQ logic.

* update

* update

* fix: method name

* fix: update OfflinePushConsumerHandler.

* fix: prommetrics

* fix: add logs

* fix: ctx

* fix: log

* fix: config

* feat: change port

* fix: atomic online cache status

---------

Co-authored-by: Monet Lee <monet_lee@163.com>
Author: icey-yu
Committed by: GitHub
Date: 2024-09-12 10:38:17 +08:00
Parent: b703449615
Commit: 0276c7df60
43 changed files with 2639 additions and 2105 deletions
File diff (+43 -22):
@@ -27,6 +27,9 @@ import (
 	"github.com/openimsdk/tools/utils/timeutil"
 	"github.com/redis/go-redis/v9"
 	"google.golang.org/protobuf/proto"
+	"math/rand"
+	"strconv"
+	"time"
 )
 
 type ConsumerHandler struct {
@@ -55,6 +58,7 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin
 	}
 
 	userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
+
 	consumerHandler.offlinePusher = offlinePusher
 	consumerHandler.onlinePusher = NewOnlinePusher(client, config)
 	consumerHandler.groupRpcClient = rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
@@ -65,7 +69,10 @@ func NewConsumerHandler(config *Config, database controller.PushDatabase, offlin
 	consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
 	consumerHandler.config = config
 	consumerHandler.pushDatabase = database
-	consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
+	consumerHandler.onlineCache, err = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil)
+	if err != nil {
+		return nil, err
+	}
 
 	return &consumerHandler, nil
 }
@@ -108,6 +115,14 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }
 func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
 
 func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	c.onlineCache.Lock.Lock()
+	for c.onlineCache.CurrentPhase.Load() < rpccache.DoSubscribeOver {
+		c.onlineCache.Cond.Wait()
+	}
+	c.onlineCache.Lock.Unlock()
+
+	ctx := mcontext.SetOperationID(context.TODO(), strconv.FormatInt(time.Now().UnixNano()+int64(rand.Uint32()), 10))
+	log.ZInfo(ctx, "begin consume messages")
 	for msg := range claim.Messages() {
 		ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
 		c.handleMs2PsChat(ctx, msg.Value)
@@ -118,20 +133,27 @@ func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim s
 // Push2User Suitable for two types of conversations, one is SingleChatType and the other is NotificationChatType.
 func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *sdkws.MsgData) (err error) {
-	log.ZDebug(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
+	log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
+	defer func(duration time.Time) {
+		t := time.Since(duration)
+		log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "msg", msg.String(), "time cost", t)
+	}(time.Now())
 	if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
 		return err
 	}
+	log.ZInfo(ctx, "webhookBeforeOnlinePush end")
 
 	wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs)
 	if err != nil {
 		return err
 	}
 
-	log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
+	log.ZInfo(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
 
 	if !c.shouldPushOffline(ctx, msg) {
 		return nil
 	}
+	log.ZInfo(ctx, "shouldPushOffline end")
 
 	for _, v := range wsResults {
 		//message sender do not need offline push
@@ -150,7 +172,7 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
 			offlinePushUserID, msg, nil); err != nil {
 			return err
 		}
-
+		log.ZInfo(ctx, "webhookBeforeOfflinePush end")
 		err = c.offlinePushMsg(ctx, msg, offlinePushUserID)
 		if err != nil {
 			log.ZWarn(ctx, "offlinePushMsg failed", err, "offlinePushUserID", offlinePushUserID, "msg", msg)
@@ -172,21 +194,11 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat
 }
 
 func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) {
-	var (
-		onlineUserIDs  []string
-		offlineUserIDs []string
-	)
-	for _, userID := range pushToUserIDs {
-		online, err := c.onlineCache.GetUserOnline(ctx, userID)
-		if err != nil {
-			return nil, err
-		}
-		if online {
-			onlineUserIDs = append(onlineUserIDs, userID)
-		} else {
-			offlineUserIDs = append(offlineUserIDs, userID)
-		}
-	}
+	onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs)
+	if err != nil {
+		return nil, err
+	}
+
 	log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
 	var result []*msggateway.SingleMsgToUserResults
 	if len(onlineUserIDs) > 0 {
@@ -205,35 +217,42 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
 }
 
 func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
-	log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
+	log.ZInfo(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
+	defer func(duration time.Time) {
+		t := time.Since(duration)
+		log.ZInfo(ctx, "Get group msg from msg_transfer and push msg end", "msg", msg.String(), "groupID", groupID, "time cost", t)
+	}(time.Now())
 	var pushToUserIDs []string
 	if err = c.webhookBeforeGroupOnlinePush(ctx, &c.config.WebhooksConfig.BeforeGroupOnlinePush, groupID, msg,
 		&pushToUserIDs); err != nil {
 		return err
 	}
+	log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end")
 
 	err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg)
 	if err != nil {
 		return err
 	}
+	log.ZInfo(ctx, "groupMessagesHandler end")
 
 	wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
 	if err != nil {
 		return err
 	}
 
-	log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg)
+	log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg)
 
 	if !c.shouldPushOffline(ctx, msg) {
 		return nil
 	}
 	needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs)
+	log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end")
 
 	//filter some user, like don not disturb or don't need offline push etc.
 	needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs)
 	if err != nil {
 		return err
 	}
+	log.ZInfo(ctx, "filterGroupMessageOfflinePush end")
 
 	// Use offline push messaging
 	if len(needOfflinePushUserIDs) > 0 {
@@ -295,7 +314,7 @@ func (c *ConsumerHandler) groupMessagesHandler(ctx context.Context, groupID stri
 			if unmarshalNotificationElem(msg.Content, &tips) != nil {
 				return err
 			}
-			log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs)
+			log.ZDebug(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(*pushToUserIDs), "list", pushToUserIDs)
 			if len(c.config.Share.IMAdminUserID) > 0 {
 				ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0])
 			}
@@ -379,6 +398,7 @@ func (c *ConsumerHandler) getOfflinePushInfos(msg *sdkws.MsgData) (title, conten
 	}
 	return
 }
+
 func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
 	conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
 	maxSeq, err := c.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
@@ -387,6 +407,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context,
 	}
 	return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq)
 }
+
 func unmarshalNotificationElem(bytes []byte, t any) error {
 	var notification sdkws.NotificationElem
 	if err := json.Unmarshal(bytes, &notification); err != nil {