feat: add msgDBSave webhook when data save to DB. (#3578)

* feat: add msgDBSave webhook when data save to DB.

* update callback args condition.

* remove unused contents.
This commit is contained in:
Monet Lee
2025-11-04 12:18:06 +08:00
committed by GitHub
parent 7d6682ca4b
commit b36f00f2ad
8 changed files with 68 additions and 70 deletions
+10 -23
View File
@@ -51,37 +51,24 @@ func GetContent(msg *sdkws.MsgData) string {
}
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendSingleMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing {
return
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterMsgSaveDB(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if !filterAfterMsg(msg, after) {
return
}
cbReq := &cbapi.CallbackAfterSendSingleMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendSingleMsgCommand),
RecvID: msg.RecvID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendSingleMsgResp{}, after, buildKeyMsgDataQuery(msg))
}
func (mc *OnlineHistoryMongoConsumerHandler) webhookAfterSendGroupMsg(ctx context.Context, after *config.AfterConfig, msg *sdkws.MsgData) {
if msg.ContentType == constant.Typing {
return
cbReq := &cbapi.CallbackAfterMsgSaveDBReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterMsgSaveDBCommand),
}
if !filterAfterMsg(msg, after) {
return
switch msg.SessionType {
case constant.SingleChatType, constant.NotificationChatType:
cbReq.RecvID = msg.RecvID
case constant.ReadGroupChatType:
cbReq.GroupID = msg.GroupID
default:
}
cbReq := &cbapi.CallbackAfterSendGroupMsgReq{
CommonCallbackReq: toCommonCallback(ctx, msg, cbapi.CallbackAfterSendGroupMsgCommand),
GroupID: msg.GroupID,
}
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterSendGroupMsgResp{}, after, buildKeyMsgDataQuery(msg))
mc.webhookClient.AsyncPostWithQuery(ctx, cbReq.GetCallbackCommand(), cbReq, &cbapi.CallbackAfterMsgSaveDBResp{}, after, buildKeyMsgDataQuery(msg))
}
func buildKeyMsgDataQuery(msg *sdkws.MsgData) map[string]string {
@@ -15,7 +15,6 @@
package msgtransfer
import (
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mq"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
@@ -57,7 +56,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
log.ZDebug(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.String())
err = mc.msgTransferDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq)
if err != nil {
log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
log.ZError(ctx, "batch data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID)
prommetrics.MsgInsertMongoFailedCounter.Inc()
} else {
prommetrics.MsgInsertMongoSuccessCounter.Inc()
@@ -65,12 +64,7 @@ func (mc *OnlineHistoryMongoConsumerHandler) HandleChatWs2Mongo(val mq.Message)
}
for _, msgData := range msgFromMQ.MsgData {
switch msgData.SessionType {
case constant.SingleChatType:
mc.webhookAfterSendSingleMsg(ctx, &mc.config.WebhooksConfig.AfterSendSingleMsg, msgData)
case constant.ReadGroupChatType:
mc.webhookAfterSendGroupMsg(ctx, &mc.config.WebhooksConfig.AfterSendGroupMsg, msgData)
}
mc.webhookAfterMsgSaveDB(ctx, &mc.config.WebhooksConfig.AfterMsgSaveDB, msgData)
}
//var seqs []int64