mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 14:29:19 +08:00
feat: add long time push msg in prometheus (#2584)
* feat: add long time push msg in prometheus * fix: log print * fix: go mod * fix: log msg * fix: log init * feat: push msg * feat: go mod ,remove cgo package * feat: remove error log * feat: test dummy push * feat:redis pool config * feat: push to kafka log
This commit is contained in:
@@ -237,6 +237,10 @@ func (och *OnlineHistoryRedisConsumerHandler) categorizeMessageLists(totalMsgs [
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key, conversationID string, storageList, notStorageList []*ContextMsg) {
|
||||
for _, storageMsg := range storageList {
|
||||
log.ZDebug(ctx, "handle storage msg", "msg", storageMsg.message.String())
|
||||
}
|
||||
|
||||
och.toPushTopic(ctx, key, conversationID, notStorageList)
|
||||
var storageMessageList []*sdkws.MsgData
|
||||
for _, msg := range storageList {
|
||||
@@ -311,8 +315,9 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
||||
}
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(_ context.Context, key, conversationID string, msgs []*ContextMsg) {
|
||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
||||
for _, v := range msgs {
|
||||
log.ZDebug(ctx, "push msg to topic", "msg", v.message.String())
|
||||
och.msgDatabase.MsgToPushMQ(v.ctx, key, conversationID, v.message)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ package dummy
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||
"github.com/openimsdk/tools/log"
|
||||
)
|
||||
|
||||
func NewClient() *Dummy {
|
||||
@@ -27,5 +28,6 @@ type Dummy struct {
|
||||
}
|
||||
|
||||
func (d *Dummy) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
|
||||
log.ZInfo(ctx, "dummy push")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -93,7 +93,8 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
|
||||
nowSec := timeutil.GetCurrentTimestampBySecond()
|
||||
|
||||
if nowSec-sec > 10 {
|
||||
log.ZWarn(ctx, "long time push msg", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
|
||||
prommetrics.MsgLoneTimePushCounter.Inc()
|
||||
log.ZWarn(ctx, "it’s been a while since the message was sent", nil, "msg", pbData.String(), "sec", sec, "nowSec", nowSec, "nowSec-sec", nowSec-sec)
|
||||
}
|
||||
var err error
|
||||
switch msgFromMQ.MsgData.SessionType {
|
||||
|
||||
Reference in New Issue
Block a user