group update batch

This commit is contained in:
withchao
2023-02-09 19:40:45 +08:00
parent 5da1195274
commit d6d6afb788
22 changed files with 446 additions and 486 deletions
-23
View File
@@ -148,29 +148,6 @@ func (c *ConversationRedis) GetUserConversationIDs1(ctx context.Context, ownerUs
// return t, nil
//}
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
v, err := rcClient.Fetch(key, expire, func() (string, error) {
v, err := fn(ctx)
if err != nil {
return "", err
}
bs, err := json.Marshal(v)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bs), nil
})
var t T
if err != nil {
return t, err
}
err = json.Unmarshal([]byte(v), &t)
if err != nil {
return t, utils.Wrap(err, "")
}
return t, nil
}
func (c *ConversationRedis) DelUserConversationIDs(ctx context.Context, ownerUserID string) (err error) {
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "ownerUserID", ownerUserID)
+11 -34
View File
@@ -56,16 +56,16 @@ type GroupCache interface {
}
type GroupCacheRedis struct {
group *relation.GroupGorm
groupMember *relation.GroupMemberGorm
groupRequest *relation.GroupRequestGorm
group relationTb.GroupModelInterface
groupMember relationTb.GroupMemberModelInterface
groupRequest relationTb.GroupRequestModelInterface
mongoDB *unrelation.SuperGroupMongoDriver
expireTime time.Duration
redisClient *RedisClient
rcClient *rockscache.Client
}
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB *relation.GroupGorm, groupMemberDB *relation.GroupMemberGorm, groupRequestDB *relation.GroupRequestGorm, mongoClient *unrelation.SuperGroupMongoDriver, opts rockscache.Options) *GroupCacheRedis {
func NewGroupCacheRedis(rdb redis.UniversalClient, groupDB relationTb.GroupModelInterface, groupMemberDB relationTb.GroupMemberModelInterface, groupRequestDB relationTb.GroupRequestModelInterface, mongoClient *unrelation.SuperGroupMongoDriver, opts rockscache.Options) *GroupCacheRedis {
return &GroupCacheRedis{rcClient: rockscache.NewClient(rdb, opts), expireTime: groupExpireTime,
group: groupDB, groupMember: groupMemberDB, groupRequest: groupRequestDB, redisClient: NewRedisClient(rdb),
mongoDB: mongoClient,
@@ -105,39 +105,16 @@ func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string {
}
// / groupInfo
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relation.Group, err error) {
for _, groupID := range groupIDs {
group, err := g.GetGroupInfo(ctx, groupID)
if err != nil {
return nil, err
}
groups = append(groups, group)
}
return groups, nil
func (g *GroupCacheRedis) GetGroupsInfo(ctx context.Context, groupIDs []string) (groups []*relationTb.GroupModel, err error) {
return GetCacheFor(ctx, g.rcClient, groupIDs, func(ctx context.Context, groupID string) (*relationTb.GroupModel, error) {
return g.GetGroupInfo(ctx, groupID)
})
}
func (g *GroupCacheRedis) GetGroupInfo(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error) {
getGroup := func() (string, error) {
groupInfo, err := g.group.Take(ctx, groupID)
if err != nil {
return "", utils.Wrap(err, "")
}
bytes, err := json.Marshal(groupInfo)
if err != nil {
return "", utils.Wrap(err, "")
}
return string(bytes), nil
}
group = &relationTb.GroupModel{}
defer func() {
tracelog.SetCtxDebug(ctx, utils.GetFuncName(1), err, "groupID", groupID, "group", *group)
}()
groupStr, err := g.rcClient.Fetch(g.getGroupInfoKey(groupID), g.expireTime, getGroup)
if err != nil {
return nil, err
}
err = json.Unmarshal([]byte(groupStr), group)
return group, utils.Wrap(err, "")
return GetCache(ctx, g.rcClient, g.getGroupInfoKey(groupID), g.expireTime, func(ctx context.Context) (*relationTb.GroupModel, error) {
return g.group.Take(ctx, groupID)
})
}
func (g *GroupCacheRedis) DelGroupInfo(ctx context.Context, groupID string) (err error) {
+49
View File
@@ -0,0 +1,49 @@
package cache
import (
"Open_IM/pkg/utils"
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"time"
)
func GetCache[T any](ctx context.Context, rcClient *rockscache.Client, key string, expire time.Duration, fn func(ctx context.Context) (T, error)) (T, error) {
var t T
var write bool
v, err := rcClient.Fetch(key, expire, func() (s string, err error) {
t, err = fn(ctx)
if err != nil {
return "", err
}
bs, err := json.Marshal(t)
if err != nil {
return "", utils.Wrap(err, "")
}
write = true
return string(bs), nil
})
if err != nil {
return t, err
}
if write {
return t, nil
}
err = json.Unmarshal([]byte(v), &t)
if err != nil {
return t, utils.Wrap(err, "")
}
return t, nil
}
func GetCacheFor[E any, T any](ctx context.Context, rcClient *rockscache.Client, list []E, fn func(ctx context.Context, item E) (T, error)) ([]T, error) {
rs := make([]T, 0, len(list))
for _, e := range list {
r, err := fn(ctx, e)
if err != nil {
return nil, err
}
rs = append(rs, r)
}
return rs, nil
}
+31 -8
View File
@@ -19,6 +19,12 @@ import (
//type GroupInterface GroupDataBaseInterface
type BatchUpdateGroupMember struct {
GroupID string
UserID string
Map map[string]any
}
type GroupInterface interface {
CreateGroup(ctx context.Context, groups []*relationTb.GroupModel, groupMembers []*relationTb.GroupMemberModel) error
TakeGroup(ctx context.Context, groupID string) (group *relationTb.GroupModel, err error)
@@ -38,7 +44,8 @@ type GroupInterface interface {
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string][]string, error)
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
UpdateGroupMember(ctx context.Context, groupID, userID string, data map[string]any) error
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error
// GroupRequest
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
@@ -130,7 +137,11 @@ func (g *GroupController) TransferGroupOwner(ctx context.Context, groupID string
return g.database.TransferGroupOwner(ctx, groupID, oldOwnerUserID, newOwnerUserID, roleLevel)
}
func (g *GroupController) UpdateGroupMember(ctx context.Context, groupID, userID string, data map[string]any) error {
func (g *GroupController) UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error {
return g.database.UpdateGroupMembers(ctx, data)
}
func (g *GroupController) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error {
return g.database.UpdateGroupMember(ctx, groupID, userID, data)
}
@@ -146,9 +157,6 @@ func (g *GroupController) PageGroupRequestUser(ctx context.Context, userID strin
return g.database.PageGroupRequestUser(ctx, userID, pageNumber, showNumber)
}
// func (g *GroupController) TakeSuperGroup(ctx context.Context, groupID string) (superGroup *unrelationTb.SuperGroupModel, err error) {
// return g.database.TakeSuperGroup(ctx, groupID)
// }
func (g *GroupController) FindSuperGroup(ctx context.Context, groupIDs []string) ([]*unrelationTb.SuperGroupModel, error) {
return g.database.FindSuperGroup(ctx, groupIDs)
}
@@ -192,7 +200,8 @@ type GroupDataBaseInterface interface {
MapGroupMemberUserID(ctx context.Context, groupIDs []string) (map[string][]string, error)
MapGroupMemberNum(ctx context.Context, groupIDs []string) (map[string]uint32, error)
TransferGroupOwner(ctx context.Context, groupID string, oldOwnerUserID, newOwnerUserID string, roleLevel int32) error // 转让群
UpdateGroupMember(ctx context.Context, groupID, userID string, data map[string]any) error
UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error
UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error
// GroupRequest
CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error
TakeGroupRequest(ctx context.Context, groupID string, userID string) (*relationTb.GroupRequestModel, error)
@@ -217,7 +226,7 @@ func NewGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, mgoClient *mongo.C
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
db: &newDB,
cache: cache.NewGroupCache(rdb, groupDB, groupMemberDB, groupRequestDB, SuperGroupMongoDriver, rockscache.Options{
cache: cache.NewGroupCacheRedis(rdb, groupDB, groupMemberDB, groupRequestDB, SuperGroupMongoDriver, rockscache.Options{
RandomExpireAdjustment: 0.2,
DisableCacheRead: false,
DisableCacheDelete: false,
@@ -408,7 +417,7 @@ func (g *GroupDataBase) TransferGroupOwner(ctx context.Context, groupID string,
})
}
func (g *GroupDataBase) UpdateGroupMember(ctx context.Context, groupID, userID string, data map[string]any) error {
func (g *GroupDataBase) UpdateGroupMember(ctx context.Context, groupID string, userID string, data map[string]any) error {
return g.db.Transaction(func(tx *gorm.DB) error {
if err := g.groupMemberDB.Update(ctx, groupID, userID, data, tx); err != nil {
return err
@@ -420,6 +429,20 @@ func (g *GroupDataBase) UpdateGroupMember(ctx context.Context, groupID, userID s
})
}
func (g *GroupDataBase) UpdateGroupMembers(ctx context.Context, data []*BatchUpdateGroupMember) error {
return g.db.Transaction(func(tx *gorm.DB) error {
for _, item := range data {
if err := g.groupMemberDB.Update(ctx, item.GroupID, item.UserID, item.Map, tx); err != nil {
return err
}
if err := g.cache.DelGroupMemberInfo(ctx, item.GroupID, item.UserID); err != nil {
return err
}
}
return nil
})
}
func (g *GroupDataBase) CreateGroupRequest(ctx context.Context, requests []*relationTb.GroupRequestModel) error {
return g.groupRequestDB.Create(ctx, requests)
}