mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-07 18:45:58 +08:00
feat: add rpc interface permission check (#3366)
* pb
* fix: Modifying other fields while setting IsPrivateChat does not take effect
* fix: quote message error revoke
* refactoring scheduled tasks
* refactoring scheduled tasks
* refactoring scheduled tasks
* refactoring scheduled tasks
* refactoring scheduled tasks
* refactoring scheduled tasks
* upgrading pkg tools
* fix
* fix
* optimize log output
* feat: support GetLastMessage
* feat: support GetLastMessage
* feat: s3 switch
* feat: s3 switch
* fix: GetUsersOnline
* feat: SendBusinessNotification supported configuration parameters
* feat: SendBusinessNotification supported configuration parameters
* feat: SendBusinessNotification supported configuration parameters
* feat: seq conversion failed without exiting
* fix: DeleteDoc crash
* fix: fill send time
* fix: fill send time
* fix: crash caused by withdrawing messages from users who have left the group
* fix: user msg timestamp
* seq read config
* seq read config
* fix: the source message of the reference is withdrawn, and the referenced message is deleted
* feat: optimize the default notification.yml
* fix: shouldPushOffline
* fix: the sorting is wrong after canceling the administrator in group settings
* feat: Sending messages supports returning fields modified by webhook
* feat: Sending messages supports returning fields modified by webhook
* feat: Sending messages supports returning fields modified by webhook
* fix: oss specifies content-type when uploading
* fix: the version number contains a line break
* fix: the version number contains a line break
* feat: GetConversationsHasReadAndMaxSeq support pinned
* feat: GetConversationsHasReadAndMaxSeq support pinned
* feat: GetConversationsHasReadAndMaxSeq support pinned
* fix: transferring the group owner to a muted member, incremental version error
* feat: unified conversion code
* feat: update gomake
* fix: in standalone mode, the user online status is wrong
* fix: add permission check
* fix: add permission check
(cherry picked from commit 748d783d36)
# Conflicts:
# internal/rpc/conversation/conversation.go
# internal/rpc/group/cache.go
# internal/rpc/group/statistics.go
# internal/rpc/msg/send.go
This commit is contained in:
@@ -0,0 +1,117 @@
|
||||
package conversation
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
)
|
||||
|
||||
func (c *conversationServer) webhookBeforeCreateSingleChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
|
||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||
cbReq := &callbackstruct.CallbackBeforeCreateSingleChatConversationsReq{
|
||||
CallbackCommand: callbackstruct.CallbackBeforeCreateSingleChatConversationsCommand,
|
||||
OwnerUserID: req.OwnerUserID,
|
||||
ConversationID: req.ConversationID,
|
||||
ConversationType: req.ConversationType,
|
||||
UserID: req.UserID,
|
||||
RecvMsgOpt: req.RecvMsgOpt,
|
||||
IsPinned: req.IsPinned,
|
||||
IsPrivateChat: req.IsPrivateChat,
|
||||
BurnDuration: req.BurnDuration,
|
||||
GroupAtType: req.GroupAtType,
|
||||
AttachedInfo: req.AttachedInfo,
|
||||
Ex: req.Ex,
|
||||
}
|
||||
|
||||
resp := &callbackstruct.CallbackBeforeCreateSingleChatConversationsResp{}
|
||||
|
||||
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
|
||||
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
|
||||
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
|
||||
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
|
||||
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
|
||||
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
|
||||
datautil.NotNilReplace(&req.Ex, resp.Ex)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (c *conversationServer) webhookAfterCreateSingleChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
|
||||
cbReq := &callbackstruct.CallbackAfterCreateSingleChatConversationsReq{
|
||||
CallbackCommand: callbackstruct.CallbackAfterCreateSingleChatConversationsCommand,
|
||||
OwnerUserID: req.OwnerUserID,
|
||||
ConversationID: req.ConversationID,
|
||||
ConversationType: req.ConversationType,
|
||||
UserID: req.UserID,
|
||||
RecvMsgOpt: req.RecvMsgOpt,
|
||||
IsPinned: req.IsPinned,
|
||||
IsPrivateChat: req.IsPrivateChat,
|
||||
BurnDuration: req.BurnDuration,
|
||||
GroupAtType: req.GroupAtType,
|
||||
AttachedInfo: req.AttachedInfo,
|
||||
Ex: req.Ex,
|
||||
}
|
||||
|
||||
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateSingleChatConversationsResp{}, after)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) webhookBeforeCreateGroupChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error {
|
||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||
cbReq := &callbackstruct.CallbackBeforeCreateGroupChatConversationsReq{
|
||||
CallbackCommand: callbackstruct.CallbackBeforeCreateGroupChatConversationsCommand,
|
||||
ConversationID: req.ConversationID,
|
||||
ConversationType: req.ConversationType,
|
||||
GroupID: req.GroupID,
|
||||
RecvMsgOpt: req.RecvMsgOpt,
|
||||
IsPinned: req.IsPinned,
|
||||
IsPrivateChat: req.IsPrivateChat,
|
||||
BurnDuration: req.BurnDuration,
|
||||
GroupAtType: req.GroupAtType,
|
||||
AttachedInfo: req.AttachedInfo,
|
||||
Ex: req.Ex,
|
||||
}
|
||||
|
||||
resp := &callbackstruct.CallbackBeforeCreateGroupChatConversationsResp{}
|
||||
|
||||
if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt)
|
||||
datautil.NotNilReplace(&req.IsPinned, resp.IsPinned)
|
||||
datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat)
|
||||
datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration)
|
||||
datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType)
|
||||
datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo)
|
||||
datautil.NotNilReplace(&req.Ex, resp.Ex)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (c *conversationServer) webhookAfterCreateGroupChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error {
|
||||
cbReq := &callbackstruct.CallbackAfterCreateGroupChatConversationsReq{
|
||||
CallbackCommand: callbackstruct.CallbackAfterCreateGroupChatConversationsCommand,
|
||||
ConversationID: req.ConversationID,
|
||||
ConversationType: req.ConversationType,
|
||||
GroupID: req.GroupID,
|
||||
RecvMsgOpt: req.RecvMsgOpt,
|
||||
IsPinned: req.IsPinned,
|
||||
IsPrivateChat: req.IsPrivateChat,
|
||||
BurnDuration: req.BurnDuration,
|
||||
GroupAtType: req.GroupAtType,
|
||||
AttachedInfo: req.AttachedInfo,
|
||||
Ex: req.Ex,
|
||||
}
|
||||
|
||||
c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupChatConversationsResp{}, after)
|
||||
return nil
|
||||
}
|
||||
@@ -19,9 +19,12 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/dbbuild"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||
@@ -30,6 +33,7 @@ import (
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
@@ -39,7 +43,6 @@ import (
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type conversationServer struct {
|
||||
@@ -49,9 +52,10 @@ type conversationServer struct {
|
||||
conversationNotificationSender *ConversationNotificationSender
|
||||
config *Config
|
||||
|
||||
userClient *rpcli.UserClient
|
||||
msgClient *rpcli.MsgClient
|
||||
groupClient *rpcli.GroupClient
|
||||
webhookClient *webhook.Client
|
||||
userClient *rpcli.UserClient
|
||||
msgClient *rpcli.MsgClient
|
||||
groupClient *rpcli.GroupClient
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@@ -60,6 +64,7 @@ type Config struct {
|
||||
MongodbConfig config.Mongo
|
||||
NotificationConfig config.Notification
|
||||
Share config.Share
|
||||
WebhooksConfig config.Webhooks
|
||||
LocalCacheConfig config.LocalCache
|
||||
Discovery config.Discovery
|
||||
}
|
||||
@@ -90,20 +95,32 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgClient := rpcli.NewMsgClient(msgConn)
|
||||
|
||||
cs := conversationServer{
|
||||
config: config,
|
||||
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
|
||||
userClient: rpcli.NewUserClient(userConn),
|
||||
groupClient: rpcli.NewGroupClient(groupConn),
|
||||
msgClient: msgClient,
|
||||
}
|
||||
|
||||
cs.conversationNotificationSender = NewConversationNotificationSender(&config.NotificationConfig, msgClient)
|
||||
cs.conversationDatabase = controller.NewConversationDatabase(
|
||||
conversationDB,
|
||||
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB),
|
||||
mgocli.GetTx())
|
||||
|
||||
localcache.InitLocalCache(&config.LocalCacheConfig)
|
||||
pbconversation.RegisterConversationServer(server, &conversationServer{
|
||||
conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient),
|
||||
conversationDatabase: controller.NewConversationDatabase(conversationDB,
|
||||
redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()),
|
||||
userClient: rpcli.NewUserClient(userConn),
|
||||
groupClient: rpcli.NewGroupClient(groupConn),
|
||||
msgClient: msgClient,
|
||||
})
|
||||
pbconversation.RegisterConversationServer(server, &cs)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversation(ctx context.Context, req *pbconversation.GetConversationReq) (*pbconversation.GetConversationResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -117,7 +134,9 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) {
|
||||
log.ZDebug(ctx, "GetSortedConversationList", "seqs", req, "userID", req.UserID)
|
||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var conversationIDs []string
|
||||
if len(req.ConversationIDs) == 0 {
|
||||
conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
|
||||
@@ -190,6 +209,9 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbconversation.GetAllConversationsReq) (*pbconversation.GetAllConversationsResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -200,6 +222,9 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversations, err := c.getConversations(ctx, req.OwnerUserID, req.ConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -220,6 +245,9 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
|
||||
}
|
||||
|
||||
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.GetConversation().GetUserID()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var conversation dbModel.Conversation
|
||||
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
|
||||
return nil, err
|
||||
@@ -234,8 +262,10 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers
|
||||
}
|
||||
|
||||
func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq) (*pbconversation.SetConversationsResp, error) {
|
||||
if req.Conversation == nil {
|
||||
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
|
||||
for _, userID := range req.UserIDs {
|
||||
if err := authverify.CheckAccess(ctx, userID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if req.Conversation.ConversationType == constant.WriteGroupChatType {
|
||||
groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID)
|
||||
@@ -271,109 +301,29 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
||||
conversation.UserID = req.Conversation.UserID
|
||||
conversation.GroupID = req.Conversation.GroupID
|
||||
|
||||
m := make(map[string]any)
|
||||
|
||||
setConversationFieldsFunc := func() {
|
||||
if req.Conversation.RecvMsgOpt != nil {
|
||||
conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value
|
||||
m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value
|
||||
}
|
||||
if req.Conversation.AttachedInfo != nil {
|
||||
conversation.AttachedInfo = req.Conversation.AttachedInfo.Value
|
||||
m["attached_info"] = req.Conversation.AttachedInfo.Value
|
||||
}
|
||||
if req.Conversation.Ex != nil {
|
||||
conversation.Ex = req.Conversation.Ex.Value
|
||||
m["ex"] = req.Conversation.Ex.Value
|
||||
}
|
||||
if req.Conversation.IsPinned != nil {
|
||||
conversation.IsPinned = req.Conversation.IsPinned.Value
|
||||
m["is_pinned"] = req.Conversation.IsPinned.Value
|
||||
}
|
||||
if req.Conversation.GroupAtType != nil {
|
||||
conversation.GroupAtType = req.Conversation.GroupAtType.Value
|
||||
m["group_at_type"] = req.Conversation.GroupAtType.Value
|
||||
}
|
||||
if req.Conversation.MsgDestructTime != nil {
|
||||
conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value
|
||||
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
|
||||
}
|
||||
if req.Conversation.IsMsgDestruct != nil {
|
||||
conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value
|
||||
m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value
|
||||
}
|
||||
if req.Conversation.BurnDuration != nil {
|
||||
conversation.BurnDuration = req.Conversation.BurnDuration.Value
|
||||
m["burn_duration"] = req.Conversation.BurnDuration.Value
|
||||
}
|
||||
m, conversation, err := UpdateConversationsMap(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// set need set field in conversation
|
||||
setConversationFieldsFunc()
|
||||
|
||||
for userID := range conversationMap {
|
||||
unequal := len(m)
|
||||
unequal := UserUpdateCheckMap(ctx, userID, req.Conversation, conversationMap[userID])
|
||||
|
||||
if req.Conversation.RecvMsgOpt != nil {
|
||||
if req.Conversation.RecvMsgOpt.Value == conversationMap[userID].RecvMsgOpt {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
|
||||
if req.Conversation.AttachedInfo != nil {
|
||||
if req.Conversation.AttachedInfo.Value == conversationMap[userID].AttachedInfo {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
|
||||
if req.Conversation.Ex != nil {
|
||||
if req.Conversation.Ex.Value == conversationMap[userID].Ex {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
if req.Conversation.IsPinned != nil {
|
||||
if req.Conversation.IsPinned.Value == conversationMap[userID].IsPinned {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
|
||||
if req.Conversation.GroupAtType != nil {
|
||||
if req.Conversation.GroupAtType.Value == conversationMap[userID].GroupAtType {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
|
||||
if req.Conversation.MsgDestructTime != nil {
|
||||
if req.Conversation.MsgDestructTime.Value == conversationMap[userID].MsgDestructTime {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
|
||||
if req.Conversation.IsMsgDestruct != nil {
|
||||
if req.Conversation.IsMsgDestruct.Value == conversationMap[userID].IsMsgDestruct {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
|
||||
if req.Conversation.BurnDuration != nil {
|
||||
if req.Conversation.BurnDuration.Value == conversationMap[userID].BurnDuration {
|
||||
unequal--
|
||||
}
|
||||
}
|
||||
|
||||
if unequal > 0 {
|
||||
if unequal {
|
||||
needUpdateUsersList = append(needUpdateUsersList, userID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(m) != 0 && len(needUpdateUsersList) != 0 {
|
||||
if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, needUpdateUsersList, &conversation, m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, v := range needUpdateUsersList {
|
||||
c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID})
|
||||
for _, userID := range needUpdateUsersList {
|
||||
c.conversationNotificationSender.ConversationChangeNotification(ctx, userID, []string{req.Conversation.ConversationID})
|
||||
}
|
||||
}
|
||||
|
||||
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
|
||||
var conversations []*dbModel.Conversation
|
||||
for _, ownerUserID := range req.UserIDs {
|
||||
@@ -396,58 +346,94 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
|
||||
return &pbconversation.SetConversationsResp{}, nil
|
||||
}
|
||||
|
||||
// Get user IDs with "Do Not Disturb" enabled in super large groups.
|
||||
func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) {
|
||||
return nil, errs.New("deprecated")
|
||||
func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := make(map[string]any)
|
||||
if req.Ex != nil {
|
||||
m["ex"] = req.Ex.Value
|
||||
}
|
||||
if len(m) > 0 {
|
||||
if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &pbconversation.UpdateConversationsByUserResp{}, nil
|
||||
}
|
||||
|
||||
// create conversation without notification for msg redis transfer.
|
||||
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
|
||||
req *pbconversation.CreateSingleChatConversationsReq,
|
||||
) (*pbconversation.CreateSingleChatConversationsResp, error) {
|
||||
func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, req *pbconversation.CreateSingleChatConversationsReq) (*pbconversation.CreateSingleChatConversationsResp, error) {
|
||||
var conversation dbModel.Conversation
|
||||
switch req.ConversationType {
|
||||
case constant.SingleChatType:
|
||||
var conversation dbModel.Conversation
|
||||
// sendUser create
|
||||
conversation.ConversationID = req.ConversationID
|
||||
conversation.ConversationType = req.ConversationType
|
||||
conversation.OwnerUserID = req.SendID
|
||||
conversation.UserID = req.RecvID
|
||||
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
|
||||
}
|
||||
|
||||
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
|
||||
|
||||
// recvUser create
|
||||
conversation2 := conversation
|
||||
conversation2.OwnerUserID = req.RecvID
|
||||
conversation2.UserID = req.SendID
|
||||
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
||||
}
|
||||
|
||||
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation2)
|
||||
case constant.NotificationChatType:
|
||||
var conversation dbModel.Conversation
|
||||
conversation.ConversationID = req.ConversationID
|
||||
conversation.ConversationType = req.ConversationType
|
||||
conversation.OwnerUserID = req.RecvID
|
||||
conversation.UserID = req.SendID
|
||||
if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
|
||||
}
|
||||
|
||||
c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation)
|
||||
}
|
||||
|
||||
return &pbconversation.CreateSingleChatConversationsResp{}, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) {
|
||||
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs)
|
||||
var conversation dbModel.Conversation
|
||||
|
||||
conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
|
||||
conversation.GroupID = req.GroupID
|
||||
conversation.ConversationType = constant.ReadGroupChatType
|
||||
|
||||
if err := c.webhookBeforeCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateGroupChatConversations, &conversation); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs, &conversation)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
|
||||
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, &conversation)
|
||||
return &pbconversation.CreateGroupChatConversationsResp{}, nil
|
||||
}
|
||||
|
||||
@@ -480,6 +466,9 @@ func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbc
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconversation.GetConversationIDsReq) (*pbconversation.GetConversationIDsResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -488,6 +477,9 @@ func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconv
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req *pbconversation.GetUserConversationIDsHashReq) (*pbconversation.GetUserConversationIDsHashResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hash, err := c.conversationDatabase.GetUserConversationIDsHash(ctx, req.OwnerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -495,10 +487,7 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req
|
||||
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetConversationsByConversationID(
|
||||
ctx context.Context,
|
||||
req *pbconversation.GetConversationsByConversationIDReq,
|
||||
) (*pbconversation.GetConversationsByConversationIDResp, error) {
|
||||
func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) {
|
||||
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -552,10 +541,7 @@ func (c *conversationServer) conversationSort(conversations map[int64]string, re
|
||||
resp.ConversationElems = append(resp.ConversationElems, cons...)
|
||||
}
|
||||
|
||||
func (c *conversationServer) getConversationInfo(
|
||||
ctx context.Context,
|
||||
chatLogs map[string]*sdkws.MsgData,
|
||||
userID string) (map[string]*pbconversation.ConversationElem, error) {
|
||||
func (c *conversationServer) getConversationInfo(ctx context.Context, chatLogs map[string]*sdkws.MsgData, userID string) (map[string]*pbconversation.ConversationElem, error) {
|
||||
var (
|
||||
sendIDs []string
|
||||
groupIDs []string
|
||||
@@ -641,6 +627,11 @@ func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context
|
||||
}
|
||||
|
||||
func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconversation.UpdateConversationReq) (*pbconversation.UpdateConversationResp, error) {
|
||||
for _, userID := range req.UserIDs {
|
||||
if err := authverify.CheckAccess(ctx, userID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
m := make(map[string]any)
|
||||
if req.RecvMsgOpt != nil {
|
||||
m["recv_msg_opt"] = req.RecvMsgOpt.Value
|
||||
@@ -687,6 +678,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbconversation.GetOwnerConversationReq) (*pbconversation.GetOwnerConversationResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
total, conversations, err := c.conversationDatabase.GetOwnerConversation(ctx, req.UserID, req.Pagination)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -748,6 +742,9 @@ func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -756,6 +753,9 @@ func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, re
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
package conversation
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/protocol/conversation"
|
||||
)
|
||||
|
||||
func UpdateConversationsMap(ctx context.Context, req *conversation.SetConversationsReq) (m map[string]any, conversation dbModel.Conversation, err error) {
|
||||
m = make(map[string]any)
|
||||
|
||||
conversation.ConversationID = req.Conversation.ConversationID
|
||||
conversation.ConversationType = req.Conversation.ConversationType
|
||||
conversation.UserID = req.Conversation.UserID
|
||||
conversation.GroupID = req.Conversation.GroupID
|
||||
|
||||
if req.Conversation.RecvMsgOpt != nil {
|
||||
conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value
|
||||
m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value
|
||||
}
|
||||
|
||||
if req.Conversation.AttachedInfo != nil {
|
||||
conversation.AttachedInfo = req.Conversation.AttachedInfo.Value
|
||||
m["attached_info"] = req.Conversation.AttachedInfo.Value
|
||||
}
|
||||
|
||||
if req.Conversation.Ex != nil {
|
||||
conversation.Ex = req.Conversation.Ex.Value
|
||||
m["ex"] = req.Conversation.Ex.Value
|
||||
}
|
||||
if req.Conversation.IsPinned != nil {
|
||||
conversation.IsPinned = req.Conversation.IsPinned.Value
|
||||
m["is_pinned"] = req.Conversation.IsPinned.Value
|
||||
}
|
||||
if req.Conversation.GroupAtType != nil {
|
||||
conversation.GroupAtType = req.Conversation.GroupAtType.Value
|
||||
m["group_at_type"] = req.Conversation.GroupAtType.Value
|
||||
}
|
||||
if req.Conversation.MsgDestructTime != nil {
|
||||
conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value
|
||||
m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value
|
||||
}
|
||||
if req.Conversation.IsMsgDestruct != nil {
|
||||
conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value
|
||||
m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value
|
||||
}
|
||||
if req.Conversation.BurnDuration != nil {
|
||||
conversation.BurnDuration = req.Conversation.BurnDuration.Value
|
||||
m["burn_duration"] = req.Conversation.BurnDuration.Value
|
||||
}
|
||||
|
||||
return m, conversation, nil
|
||||
}
|
||||
|
||||
func UserUpdateCheckMap(ctx context.Context, userID string, req *conversation.ConversationReq, conversation *dbModel.Conversation) (unequal bool) {
|
||||
unequal = false
|
||||
|
||||
if req.RecvMsgOpt != nil && conversation.RecvMsgOpt != req.RecvMsgOpt.Value {
|
||||
unequal = true
|
||||
}
|
||||
if req.AttachedInfo != nil && conversation.AttachedInfo != req.AttachedInfo.Value {
|
||||
unequal = true
|
||||
}
|
||||
if req.Ex != nil && conversation.Ex != req.Ex.Value {
|
||||
unequal = true
|
||||
}
|
||||
if req.IsPinned != nil && conversation.IsPinned != req.IsPinned.Value {
|
||||
unequal = true
|
||||
}
|
||||
if req.GroupAtType != nil && conversation.GroupAtType != req.GroupAtType.Value {
|
||||
unequal = true
|
||||
}
|
||||
if req.MsgDestructTime != nil && conversation.MsgDestructTime != req.MsgDestructTime.Value {
|
||||
unequal = true
|
||||
}
|
||||
if req.IsMsgDestruct != nil && conversation.IsMsgDestruct != req.IsMsgDestruct.Value {
|
||||
unequal = true
|
||||
}
|
||||
if req.BurnDuration != nil && conversation.BurnDuration != req.BurnDuration.Value {
|
||||
unequal = true
|
||||
}
|
||||
|
||||
return unequal
|
||||
}
|
||||
@@ -17,11 +17,11 @@ package conversation
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/notification"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/notification"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
)
|
||||
|
||||
@@ -35,6 +35,9 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re
|
||||
}
|
||||
|
||||
func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req *conversation.GetIncrementalConversationReq) (*conversation.GetIncrementalConversationResp, error) {
|
||||
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opt := incrversion.Option[*conversation.Conversation, conversation.GetIncrementalConversationResp]{
|
||||
Ctx: ctx,
|
||||
VersionKey: req.UserID,
|
||||
|
||||
Reference in New Issue
Block a user