feat: Implement webhook in createConversation (#3228)

* update test method args.

* feat: implement createConversations webhook function.

* improve webhookCreateConversations Implement

* implement createconversation webhook.

* remove unused paramaters.
This commit is contained in:
Monet Lee
2025-03-14 16:46:29 +08:00
committed by GitHub
parent 0b9dbd301c
commit b969827b9a
10 changed files with 411 additions and 131 deletions
+117
View File
@@ -0,0 +1,117 @@
package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/tools/utils/datautil"
)
func (c *conversationServer) webhookBeforeCreateSingleChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeCreateSingleChatConversationsReq{
CallbackCommand: callbackstruct.CallbackBeforeCreateSingleChatConversationsCommand,
OwnerUserID: req.OwnerUserID,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
UserID: req.UserID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
resp := &callbackstruct.CallbackBeforeCreateSingleChatConversationsResp{}
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
datautil.NotNilReplace(&req.Ex, resp.Ex)
return nil
})
}
func (c *conversationServer) webhookAfterCreateSingleChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
cbReq := &callbackstruct.CallbackAfterCreateSingleChatConversationsReq{
CallbackCommand: callbackstruct.CallbackAfterCreateSingleChatConversationsCommand,
OwnerUserID: req.OwnerUserID,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
UserID: req.UserID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateSingleChatConversationsResp{}, after)
return nil
}
func (c *conversationServer) webhookBeforeCreateGroupChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeCreateGroupChatConversationsReq{
CallbackCommand: callbackstruct.CallbackBeforeCreateGroupChatConversationsCommand,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
GroupID: req.GroupID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
resp := &callbackstruct.CallbackBeforeCreateGroupChatConversationsResp{}
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
datautil.NotNilReplace(&req.Ex, resp.Ex)
return nil
})
}
func (c *conversationServer) webhookAfterCreateGroupChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
cbReq := &callbackstruct.CallbackAfterCreateGroupChatConversationsReq{
CallbackCommand: callbackstruct.CallbackAfterCreateGroupChatConversationsCommand,
ConversationID: req.ConversationID,
ConversationType: req.ConversationType,
GroupID: req.GroupID,
RecvMsgOpt: req.RecvMsgOpt,
IsPinned: req.IsPinned,
IsPrivateChat: req.IsPrivateChat,
BurnDuration: req.BurnDuration,
GroupAtType: req.GroupAtType,
AttachedInfo: req.AttachedInfo,
Ex: req.Ex,
}
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupChatConversationsResp{}, after)
return nil
}
+57 -18
View File
@@ -30,6 +30,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/constant"
@@ -49,9 +50,10 @@ type conversationServer struct {
conversationNotificationSender *ConversationNotificationSender
config *Config
userClient *rpcli.UserClient
msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient
webhookClient *webhook.Client
userClient *rpcli.UserClient
msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient
}
type Config struct {
@@ -60,6 +62,7 @@ type Config struct {
MongodbConfig config.Mongo
NotificationConfig config.Notification
Share config.Share
WebhooksConfig config.Webhooks
LocalCacheConfig config.LocalCache
Discovery config.Discovery
}
@@ -90,16 +93,25 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
if err != nil {
return err
}
msgClient := rpcli.NewMsgClient(msgConn)
cs := conversationServer{
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
}
cs.conversationNotificationSender = NewConversationNotificationSender(&config.NotificationConfig, msgClient)
cs.conversationDatabase = controller.NewConversationDatabase(
conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB),
mgocli.GetTx())
localcache.InitLocalCache(&config.LocalCacheConfig)
pbconversation.RegisterConversationServer(server, &conversationServer{
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
conversationDatabase: controller.NewConversationDatabase(conversationDB,
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()),
userClient: rpcli.NewUserClient(userConn),
groupClient: rpcli.NewGroupClient(groupConn),
msgClient: msgClient,
})
pbconversation.RegisterConversationServer(server, &cs)
return nil
}
@@ -326,49 +338,76 @@ func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
req *pbconversation.CreateSingleChatConversationsReq,
) (*pbconversation.CreateSingleChatConversationsResp, error) {
var conversation dbModel.Conversation
switch req.ConversationType {
case constant.SingleChatType:
var conversation dbModel.Conversation
// sendUser create
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.SendID
conversation.UserID = req.RecvID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
}
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
// recvUser create
conversation2 := conversation
conversation2.OwnerUserID = req.RecvID
conversation2.UserID = req.SendID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation2)
case constant.NotificationChatType:
var conversation dbModel.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.RecvID
conversation.UserID = req.SendID
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
}
return &pbconversation.CreateSingleChatConversationsResp{}, nil
}
func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) {
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs)
var conversation dbModel.Conversation
conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
conversation.GroupID = req.GroupID
conversation.ConversationType = constant.ReadGroupChatType
if err := c.webhookBeforeCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateGroupChatConversations, &conversation); err != nil {
return nil, err
}
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs, &conversation)
if err != nil {
return nil, err
}
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
return nil, err
}
c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, &conversation)
return &pbconversation.CreateGroupChatConversationsResp{}, nil
}
+1 -1
View File
@@ -46,7 +46,7 @@ func TestName(t *testing.T) {
srv := &cronServer{
ctx: ctx,
config: &CronTaskConfig{
config: &Config{
CronTask: config.CronTask{
RetainChatRecords: 1,
FileExpireTime: 1,