Files
open-im-server/pkg/common/db/controller/conversation.go
T

224 lines
9.4 KiB
Go
Raw Normal View History

2023-02-02 19:47:21 +08:00
package controller
import (
"context"
2023-03-17 19:41:44 +08:00
2023-05-04 20:07:20 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
2023-03-24 16:41:11 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
2023-03-16 10:46:06 +08:00
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
2023-02-02 19:47:21 +08:00
)
2023-03-01 15:32:26 +08:00
type ConversationDatabase interface {
2023-02-02 19:47:21 +08:00
//UpdateUserConversationFiled 更新用户该会话的属性信息
2023-03-23 19:02:20 +08:00
UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error
2023-02-02 19:47:21 +08:00
//CreateConversation 创建一批新的会话
2023-02-08 18:29:11 +08:00
CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error
2023-02-02 19:47:21 +08:00
//SyncPeerUserPrivateConversation 同步对端私聊会话内部保证事务操作
2023-05-12 16:45:50 +08:00
SyncPeerUserPrivateConversationTx(ctx context.Context, conversation []*relationTb.ConversationModel) error
2023-02-02 19:47:21 +08:00
//FindConversations 根据会话ID获取某个用户的多个会话
2023-02-10 20:57:45 +08:00
FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error)
//FindRecvMsgNotNotifyUserIDs 获取超级大群开启免打扰的用户ID
FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
2023-02-02 19:47:21 +08:00
//GetUserAllConversation 获取一个用户在服务器上所有的会话
2023-02-08 18:29:11 +08:00
GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error)
2023-02-02 19:47:21 +08:00
//SetUserConversations 设置用户多个会话属性,如果会话不存在则创建,否则更新,内部保证原子性
2023-02-08 18:29:11 +08:00
SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error
2023-02-10 20:57:45 +08:00
//SetUsersConversationFiledTx 设置多个用户会话关于某个字段的更新操作,如果会话不存在则创建,否则更新,内部保证事务操作
2023-03-23 19:02:20 +08:00
SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
2023-05-04 20:07:20 +08:00
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error
2023-05-05 21:30:32 +08:00
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
2023-02-02 19:47:21 +08:00
}
2023-02-10 20:57:45 +08:00
2023-03-15 18:35:06 +08:00
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
2023-02-22 14:31:30 +08:00
return &ConversationDataBase{
conversationDB: conversation,
cache: cache,
tx: tx,
}
}
2023-02-10 20:57:45 +08:00
2023-02-02 19:47:21 +08:00
type ConversationDataBase struct {
2023-03-15 18:35:06 +08:00
conversationDB relationTb.ConversationModelInterface
2023-02-10 20:57:45 +08:00
cache cache.ConversationCache
2023-02-22 14:31:30 +08:00
tx tx.Tx
2023-02-10 20:57:45 +08:00
}
2023-03-23 19:02:20 +08:00
func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error {
2023-02-22 14:31:30 +08:00
return c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
2023-04-21 11:26:48 +08:00
haveUserIDs, err := conversationTx.FindUserID(ctx, userIDs, []string{conversation.ConversationID})
2023-02-10 20:57:45 +08:00
if err != nil {
return err
}
2023-04-21 14:51:57 +08:00
cache := c.cache.NewCache()
2023-03-23 19:02:20 +08:00
if len(haveUserIDs) > 0 {
2023-04-21 14:51:57 +08:00
_, err = conversationTx.UpdateByMap(ctx, haveUserIDs, conversation.ConversationID, filedMap)
2023-02-10 20:57:45 +08:00
if err != nil {
return err
}
2023-04-21 14:51:57 +08:00
cache = cache.DelUsersConversation(conversation.ConversationID, haveUserIDs...)
2023-02-10 20:57:45 +08:00
}
2023-03-23 19:02:20 +08:00
NotUserIDs := utils.DifferenceString(haveUserIDs, userIDs)
2023-03-24 16:41:11 +08:00
log.ZDebug(ctx, "SetUsersConversationFiledTx", "NotUserIDs", NotUserIDs, "haveUserIDs", haveUserIDs, "userIDs", userIDs)
2023-04-21 14:51:57 +08:00
var conversations []*relationTb.ConversationModel
2023-03-23 19:02:20 +08:00
for _, v := range NotUserIDs {
2023-02-10 20:57:45 +08:00
temp := new(relationTb.ConversationModel)
if err := utils.CopyStructFields(temp, conversation); err != nil {
return err
}
temp.OwnerUserID = v
2023-04-21 14:51:57 +08:00
conversations = append(conversations, temp)
2023-02-10 20:57:45 +08:00
}
2023-04-21 14:51:57 +08:00
if len(conversations) > 0 {
err = conversationTx.Create(ctx, conversations)
2023-03-24 17:34:33 +08:00
if err != nil {
return err
}
cache = cache.DelConversationIDs(NotUserIDs)
2023-02-10 20:57:45 +08:00
}
2023-03-23 19:02:20 +08:00
// clear cache
2023-03-27 17:20:36 +08:00
log.ZDebug(ctx, "SetUsersConversationFiledTx", "cache", cache.GetPreDelKeys(), "addr", &cache)
2023-04-21 14:51:57 +08:00
return cache.ExecDel(ctx)
2023-02-22 14:31:30 +08:00
})
2023-02-02 19:47:21 +08:00
}
2023-03-23 19:02:20 +08:00
func (c *ConversationDataBase) UpdateUsersConversationFiled(ctx context.Context, userIDs []string, conversationID string, args map[string]interface{}) error {
2023-04-21 14:51:57 +08:00
_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args)
2023-03-23 19:02:20 +08:00
if err != nil {
return err
}
2023-04-21 14:51:57 +08:00
return c.cache.DelUsersConversation(conversationID, userIDs...).ExecDel(ctx)
2023-02-02 19:47:21 +08:00
}
2023-02-22 14:31:30 +08:00
func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversations []*relationTb.ConversationModel) error {
2023-05-12 16:45:50 +08:00
if err := c.conversationDB.Create(ctx, conversations); err != nil {
return err
}
return nil
2023-02-02 19:47:21 +08:00
}
2023-05-12 16:45:50 +08:00
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
2023-02-22 14:31:30 +08:00
return c.tx.Transaction(func(tx any) error {
conversationTx := c.conversationDB.NewTx(tx)
2023-04-21 14:51:57 +08:00
cache := c.cache.NewCache()
2023-05-12 16:45:50 +08:00
for _, conversation := range conversations {
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
haveUserIDs, err := conversationTx.FindUserID(ctx, []string{v[0]}, []string{conversation.ConversationID})
2023-04-21 15:06:29 +08:00
if err != nil {
return err
}
2023-05-12 16:45:50 +08:00
if len(haveUserIDs) > 0 {
_, err := conversationTx.UpdateByMap(ctx, []string{v[0]}, conversation.ConversationID, map[string]interface{}{"is_private_chat": conversation.IsPrivateChat})
if err != nil {
return err
}
cache = cache.DelUsersConversation(conversation.ConversationID, v[0])
} else {
newConversation := *conversation
newConversation.OwnerUserID = v[0]
newConversation.UserID = v[1]
newConversation.ConversationID = conversation.ConversationID
newConversation.IsPrivateChat = conversation.IsPrivateChat
if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
return err
}
cache = cache.DelConversationIDs([]string{v[0]})
2023-02-10 20:57:45 +08:00
}
}
}
2023-04-21 14:51:57 +08:00
return c.cache.ExecDel(ctx)
2023-02-10 20:57:45 +08:00
})
2023-02-02 19:47:21 +08:00
}
2023-02-22 14:31:30 +08:00
func (c *ConversationDataBase) FindConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) {
2023-03-23 19:02:20 +08:00
return c.cache.GetConversations(ctx, ownerUserID, conversationIDs)
2023-02-10 20:57:45 +08:00
}
2023-02-22 14:31:30 +08:00
func (c *ConversationDataBase) GetConversation(ctx context.Context, ownerUserID string, conversationID string) (*relationTb.ConversationModel, error) {
2023-03-23 19:02:20 +08:00
return c.cache.GetConversation(ctx, ownerUserID, conversationID)
2023-02-02 19:47:21 +08:00
}
2023-02-22 14:31:30 +08:00
func (c *ConversationDataBase) GetUserAllConversation(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) {
2023-03-23 19:02:20 +08:00
return c.cache.GetUserAllConversations(ctx, ownerUserID)
2023-02-02 19:47:21 +08:00
}
2023-02-22 14:31:30 +08:00
func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationTb.ConversationModel) error {
return c.tx.Transaction(func(tx any) error {
2023-03-23 19:02:20 +08:00
var conversationIDs []string
2023-02-10 20:57:45 +08:00
for _, conversation := range conversations {
2023-03-23 19:02:20 +08:00
conversationIDs = append(conversationIDs, conversation.ConversationID)
2023-02-10 20:57:45 +08:00
}
2023-02-22 14:31:30 +08:00
conversationTx := c.conversationDB.NewTx(tx)
2023-03-23 19:02:20 +08:00
existConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDs)
2023-02-10 20:57:45 +08:00
if err != nil {
return err
}
2023-03-23 19:02:20 +08:00
if len(existConversations) > 0 {
2023-04-20 16:23:41 +08:00
for _, conversation := range conversations {
err = conversationTx.Update(ctx, conversation)
if err != nil {
return err
}
2023-02-10 20:57:45 +08:00
}
}
2023-03-23 19:02:20 +08:00
var existConversationIDs []string
for _, conversation := range existConversations {
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
2023-02-10 20:57:45 +08:00
}
2023-02-02 19:47:21 +08:00
2023-03-23 19:02:20 +08:00
var notExistConversations []*relationTb.ConversationModel
2023-02-10 20:57:45 +08:00
for _, conversation := range conversations {
2023-03-23 19:02:20 +08:00
if !utils.IsContain(conversation.ConversationID, existConversationIDs) {
notExistConversations = append(notExistConversations, conversation)
2023-02-10 20:57:45 +08:00
}
}
2023-03-23 19:02:20 +08:00
if len(notExistConversations) > 0 {
err = c.conversationDB.Create(ctx, notExistConversations)
2023-02-10 20:57:45 +08:00
if err != nil {
return err
}
}
2023-03-23 19:02:20 +08:00
cache := c.cache.NewCache()
if len(notExistConversations) > 0 {
cache = cache.DelConversationIDs([]string{ownerUserID})
2023-02-10 20:57:45 +08:00
}
2023-03-23 19:02:20 +08:00
return cache.DelConvsersations(ownerUserID, existConversationIDs).ExecDel(ctx)
2023-02-10 20:57:45 +08:00
})
2023-02-02 19:47:21 +08:00
}
func (c *ConversationDataBase) FindRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
2023-03-23 19:02:20 +08:00
return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID)
}
2023-05-04 20:07:20 +08:00
func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error {
conversationID := utils.GetConversationIDBySessionType(constant.SuperGroupChatType, groupID)
return c.tx.Transaction(func(tx any) error {
existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{groupID})
if err != nil {
return err
}
notExistUserIDs := utils.DifferenceString(userIDs, existConversationUserIDs)
var conversations []*relationTb.ConversationModel
for _, v := range notExistUserIDs {
conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
conversations = append(conversations, &conversation)
}
err = c.conversationDB.Create(ctx, conversations)
if err != nil {
return err
}
_, err = c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, map[string]interface{}{"max_seq": 0})
return err
})
}
2023-05-05 21:30:32 +08:00
func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
return c.cache.GetUserConversationIDs(ctx, userID)
}