Resolving code conflicts after project directory changes and Add user subscription to the operation from rpc to db layer (#684)

* Resolving code conflicts after project directory changes and Add subscribe and unsubscribe mongodb operations

* Organize and update module dependencies
This commit is contained in:
pluto
2023-07-27 19:46:44 +08:00
committed by GitHub
parent 0b9ac4bd87
commit 4d816978ec
9 changed files with 295 additions and 39 deletions
+61 -28
View File
@@ -16,6 +16,8 @@ package controller
import (
"context"
unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/protocol/constant"
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@@ -26,38 +28,45 @@ import (
)
type UserDatabase interface {
// 获取指定用户的信息 如有userID未找到 也返回错误
// FindWithError Get the information of the specified user. If the userID is not found, it will also return an error
FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// 获取指定用户的信息 如有userID未找到 不返回错误
// Find Get the information of the specified user If the userID is not found, no error will be returned
Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// 插入多条 外部保证userID 不重复 且在db中不存在
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db
Create(ctx context.Context, users []*relation.UserModel) (err error)
// 更新(非零值) 外部保证userID存在
// Update update (non-zero value) external guarantee userID exists
Update(ctx context.Context, user *relation.UserModel) (err error)
// 更新(零值) 外部保证userID存在
// UpdateByMap update (zero value) external guarantee userID exists
UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error)
// 如果没找到,不返回错误
// Page If not found, no error is returned
Page(ctx context.Context, pageNumber, showNumber int32) (users []*relation.UserModel, count int64, err error)
// 只要有一个存在就为true
// IsExist true as long as one exists
IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
// 获取所有用户ID
// GetAllUserID Get all user IDs
GetAllUserID(ctx context.Context, pageNumber, showNumber int32) ([]string, error)
// 函数内部先查询db中是否存在,存在则什么都不做;不存在则插入
// InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it
InitOnce(ctx context.Context, users []*relation.UserModel) (err error)
// 获取用户总数
// CountTotal Get the total number of users
CountTotal(ctx context.Context, before *time.Time) (int64, error)
// 获取范围内用户增量
// CountRangeEverydayTotal Get the user increment in the range
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
//SubscribeOrCancelUsersStatus Subscribe or unsubscribe a user's presence status
SubscribeOrCancelUsersStatus(ctx context.Context, userID string, userIDs []string, genre int32) error
// GetAllSubscribeList Get a list of all subscriptions
GetAllSubscribeList(ctx context.Context, userID string) ([]string, error)
// GetSubscribedList Get all subscribed lists
GetSubscribedList(ctx context.Context, userID string) ([]string, error)
}
type userDatabase struct {
userDB relation.UserModelInterface
cache cache.UserCache
tx tx.Tx
userDB relation.UserModelInterface
cache cache.UserCache
tx tx.Tx
mongoDB unRelationTb.UserModelInterface
}
func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.Tx) UserDatabase {
return &userDatabase{userDB: userDB, cache: cache, tx: tx}
func NewUserDatabase(userDB relation.UserModelInterface, cache cache.UserCache, tx tx.Tx, mongoDB unRelationTb.UserModelInterface) UserDatabase {
return &userDatabase{userDB: userDB, cache: cache, tx: tx, mongoDB: mongoDB}
}
func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel) (err error) {
@@ -75,7 +84,7 @@ func (u *userDatabase) InitOnce(ctx context.Context, users []*relation.UserModel
return nil
}
// 获取指定用户的信息 如有userID未找到 也返回错误.
// FindWithError Get the information of the specified user and return an error if the userID is not found.
func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
users, err = u.cache.GetUsersInfo(ctx, userIDs)
if err != nil {
@@ -87,13 +96,13 @@ func (u *userDatabase) FindWithError(ctx context.Context, userIDs []string) (use
return
}
// 获取指定用户的信息 如有userID未找到 不返回错误.
// Find Get the information of the specified user. If the userID is not found, no error will be returned.
func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error) {
users, err = u.cache.GetUsersInfo(ctx, userIDs)
return
}
// 插入多条 外部保证userID 不重复 且在db中不存在.
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db.
func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
if err := u.tx.Transaction(func(tx any) error {
err = u.userDB.Create(ctx, users)
@@ -111,7 +120,7 @@ func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel)
return u.cache.DelUsersInfo(userIDs...).ExecDel(ctx)
}
// 更新(非零值) 外部保证userID存在.
// Update (non-zero value) externally guarantees that userID exists.
func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (err error) {
if err := u.userDB.Update(ctx, user); err != nil {
return err
@@ -119,7 +128,7 @@ func (u *userDatabase) Update(ctx context.Context, user *relation.UserModel) (er
return u.cache.DelUsersInfo(user.UserID).ExecDel(ctx)
}
// 更新(零值) 外部保证userID存在.
// UpdateByMap update (zero value) externally guarantees that userID exists.
func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[string]interface{}) (err error) {
if err := u.userDB.UpdateByMap(ctx, userID, args); err != nil {
return err
@@ -127,7 +136,7 @@ func (u *userDatabase) UpdateByMap(ctx context.Context, userID string, args map[
return u.cache.DelUsersInfo(userID).ExecDel(ctx)
}
// 获取,如果没找到,不返回错误.
// Page Gets, returns no error if not found.
func (u *userDatabase) Page(
ctx context.Context,
pageNumber, showNumber int32,
@@ -135,7 +144,7 @@ func (u *userDatabase) Page(
return u.userDB.Page(ctx, pageNumber, showNumber)
}
// userIDs是否存在 只要有一个存在就为true.
// IsExist Does userIDs exist? As long as there is one, it will be true.
func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist bool, err error) {
users, err := u.userDB.Find(ctx, userIDs)
if err != nil {
@@ -147,18 +156,42 @@ func (u *userDatabase) IsExist(ctx context.Context, userIDs []string) (exist boo
return false, nil
}
// GetAllUserID Get all user IDs
func (u *userDatabase) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (userIDs []string, err error) {
return u.userDB.GetAllUserID(ctx, pageNumber, showNumber)
}
// CountTotal Get the total number of users
func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
return u.userDB.CountTotal(ctx, before)
}
func (u *userDatabase) CountRangeEverydayTotal(
ctx context.Context,
start time.Time,
end time.Time,
) (map[string]int64, error) {
// CountRangeEverydayTotal Get the user increment in the range
func (u *userDatabase) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) {
return u.userDB.CountRangeEverydayTotal(ctx, start, end)
}
//SubscribeOrCancelUsersStatus Subscribe or unsubscribe a user's presence status
func (u *userDatabase) SubscribeOrCancelUsersStatus(ctx context.Context, userID string, userIDs []string, genre int32) error {
var err error
if genre == constant.SubscriberUser {
err = u.mongoDB.AddSubscriptionList(ctx, userID, userIDs)
} else if genre == constant.Unsubscribe {
err = u.mongoDB.UnsubscriptionList(ctx, userID, userIDs)
}
return err
}
// GetAllSubscribeList Get a list of all subscriptions.
func (u *userDatabase) GetAllSubscribeList(ctx context.Context, userID string) ([]string, error) {
//TODO 获取所有订阅
return nil, nil
}
// GetSubscribedList Get all subscribed lists
func (u *userDatabase) GetSubscribedList(ctx context.Context, userID string) ([]string, error) {
//TODO 获取所有被订阅
return nil, nil
}
+42
View File
@@ -0,0 +1,42 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation
import "context"
// SubscribeUser collection constant.
const (
SubscribeUser = "subscribe_user"
)
// UserModel collection structure.
type UserModel struct {
UserID string `bson:"user_id" json:"userID"`
UserIDList []string `bson:"user_id_list" json:"userIDList"`
}
func (UserModel) TableName() string {
return SubscribeUser
}
// UserModelInterface Operation interface of user mongodb.
type UserModelInterface interface {
// AddSubscriptionList Subscriber's handling of thresholds.
AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error
// UnsubscriptionList Handling of unsubscribe.
UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error
}
+141
View File
@@ -0,0 +1,141 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation"
"github.com/OpenIMSDK/tools/utils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"log"
)
// prefixes and suffixes.
const (
SubscriptionPrefix = "subscription_prefix"
SubscribedPrefix = "subscribed_prefix"
)
// MaximumSubscription Maximum number of subscriptions.
const (
MaximumSubscription = 3000
)
func NewUserMongoDriver(database *mongo.Database) unrelation.UserModelInterface {
return &UserMongoDriver{
userCollection: database.Collection(unrelation.SubscribeUser),
}
}
type UserMongoDriver struct {
userCollection *mongo.Collection
}
// AddSubscriptionList Subscriber's handling of thresholds.
func (u *UserMongoDriver) AddSubscriptionList(ctx context.Context, userID string, userIDList []string) error {
// Check the number of lists in the key.
filter := bson.M{SubscriptionPrefix + userID: bson.M{"$size": 1}}
result, err := u.userCollection.Find(context.Background(), filter)
if err != nil {
return err
}
var newUserIDList []string
for result.Next(context.Background()) {
err := result.Decode(&newUserIDList)
if err != nil {
log.Fatal(err)
}
}
// If the threshold is exceeded, pop out the previous MaximumSubscription - len(userIDList) and insert it.
if len(newUserIDList)+len(userIDList) > MaximumSubscription {
newUserIDList = newUserIDList[MaximumSubscription-len(userIDList):]
_, err := u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscriptionPrefix + userID},
bson.M{"$set": bson.M{"user_id_list": newUserIDList}},
)
if err != nil {
return err
}
//for i := 1; i <= MaximumSubscription-len(userIDList); i++ {
// _, err := u.userCollection.UpdateOne(
// ctx,
// bson.M{"user_id": SubscriptionPrefix + userID},
// bson.M{SubscriptionPrefix + userID: bson.M{"$pop": -1}},
// )
// if err != nil {
// return err
// }
//}
}
upsert := true
opts := &options.UpdateOptions{
Upsert: &upsert,
}
_, err = u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscriptionPrefix + userID},
bson.M{"$addToSet": bson.M{"user_id_list": bson.M{"$each": userIDList}}},
opts,
)
if err != nil {
return err
}
for _, user := range userIDList {
_, err = u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscribedPrefix + user},
bson.M{"$addToSet": bson.M{"user_id_list": userID}},
opts,
)
if err != nil {
return utils.Wrap(err, "transaction failed")
}
}
return nil
}
// UnsubscriptionList Handling of unsubscribe.
func (u *UserMongoDriver) UnsubscriptionList(ctx context.Context, userID string, userIDList []string) error {
_, err := u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": SubscriptionPrefix + userID},
bson.M{"$pull": bson.M{"user_id_list": bson.M{"$in": userIDList}}},
)
if err != nil {
return err
}
err = u.RemoveSubscribedListFromUser(ctx, userID, userIDList)
if err != nil {
return err
}
return nil
}
// RemoveSubscribedListFromUser Among the unsubscribed users, delete the user from the subscribed list.
func (u *UserMongoDriver) RemoveSubscribedListFromUser(ctx context.Context, userID string, userIDList []string) error {
var newUserIDList []string
for _, value := range userIDList {
newUserIDList = append(newUserIDList, SubscribedPrefix+value)
}
_, err := u.userCollection.UpdateOne(
ctx,
bson.M{"user_id": bson.M{"$in": newUserIDList}},
bson.M{"$pull": bson.M{"user_id_list": userID}},
)
return utils.Wrap(err, "")
}