feat: implement true batchGetIncrGroupMember RPC method and dependency methods. (#2417)

* update wip contents.

* update protocol pkg.

* feat: add BatchOption struct and method.

* fix: remove unnecessary field.

* feat: implement true BatchGetIncrGroupMember RPC method and corresponding dependency methods.

* fix: update mongo version collection have unique index.

* optimize method structures.

* update resp in add sortVersion field.

* fix uncorrect condition.

* add errs pkg.
This commit is contained in:
Monet Lee
2024-07-19 17:20:11 +08:00
committed by GitHub
parent 6c8ac45137
commit d0d33b6b78
13 changed files with 454 additions and 61 deletions
+1
View File
@@ -64,5 +64,6 @@ type GroupCache interface {
DelMaxGroupMemberVersion(groupIDs ...string) GroupCache
DelMaxJoinGroupVersion(userIDs ...string) GroupCache
FindMaxGroupMemberVersion(ctx context.Context, groupID string) (*model.VersionLog, error)
BatchFindMaxGroupMemberVersion(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error)
FindMaxJoinGroupVersion(ctx context.Context, userID string) (*model.VersionLog, error)
}
+17 -1
View File
@@ -17,6 +17,8 @@ package redis
import (
"context"
"fmt"
"time"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
@@ -28,7 +30,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"time"
)
const (
@@ -390,6 +391,21 @@ func (g *GroupCacheRedis) FindMaxGroupMemberVersion(ctx context.Context, groupID
})
}
func (g *GroupCacheRedis) BatchFindMaxGroupMemberVersion(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error) {
return batchGetCache2(ctx, g.rcClient, g.expireTime, groupIDs,
func(groupID string) string {
return g.getGroupMemberMaxVersionKey(groupID)
}, func(versionLog *model.VersionLog) string {
return versionLog.DID
}, func(ctx context.Context, groupIDs []string) ([]*model.VersionLog, error) {
// create two slices with len is groupIDs, just need 0
versions := make([]uint, len(groupIDs))
limits := make([]int, len(groupIDs))
return g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, versions, limits)
})
}
func (g *GroupCacheRedis) FindMaxJoinGroupVersion(ctx context.Context, userID string) (*model.VersionLog, error) {
return getCache(ctx, g.rcClient, g.getJoinGroupMaxVersionKey(userID), g.expireTime, func(ctx context.Context) (*model.VersionLog, error) {
return g.groupMemberDB.FindJoinIncrVersion(ctx, userID, 0, 0)
+42 -1
View File
@@ -16,17 +16,19 @@ package controller
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
redis2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/common"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/pagination"
"github.com/openimsdk/tools/db/tx"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
)
@@ -108,6 +110,7 @@ type GroupDatabase interface {
DeleteGroupMemberHash(ctx context.Context, groupIDs []string) error
FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error)
BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error)
FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error
@@ -115,6 +118,7 @@ type GroupDatabase interface {
//FindSortJoinGroupIDs(ctx context.Context, userID string) ([]string, error)
FindMaxGroupMemberVersionCache(ctx context.Context, groupID string) (*model.VersionLog, error)
BatchFindMaxGroupMemberVersionCache(ctx context.Context, groupIDs []string) (map[string]*model.VersionLog, error)
FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error)
SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error)
@@ -498,6 +502,29 @@ func (g *groupDatabase) FindMemberIncrVersion(ctx context.Context, groupID strin
return g.groupMemberDB.FindMemberIncrVersion(ctx, groupID, version, limit)
}
func (g *groupDatabase) BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint64, limits []int) (map[string]*model.VersionLog, error) {
if len(groupIDs) == 0 {
return nil, errs.Wrap(errs.New("groupIDs is nil."))
}
// convert []uint64 to []uint
var uintVersions []uint
for _, version := range versions {
uintVersions = append(uintVersions, uint(version))
}
versionLogs, err := g.groupMemberDB.BatchFindMemberIncrVersion(ctx, groupIDs, uintVersions, limits)
if err != nil {
return nil, errs.Wrap(err)
}
groupMemberIncrVersionsMap := datautil.SliceToMap(versionLogs, func(e *model.VersionLog) string {
return e.DID
})
return groupMemberIncrVersionsMap, nil
}
func (g *groupDatabase) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
return g.groupMemberDB.FindJoinIncrVersion(ctx, userID, version, limit)
}
@@ -506,6 +533,20 @@ func (g *groupDatabase) FindMaxGroupMemberVersionCache(ctx context.Context, grou
return g.cache.FindMaxGroupMemberVersion(ctx, groupID)
}
func (g *groupDatabase) BatchFindMaxGroupMemberVersionCache(ctx context.Context, groupIDs []string) (map[string]*model.VersionLog, error) {
if len(groupIDs) == 0 {
return nil, errs.Wrap(errs.New("groupIDs is nil in Cache."))
}
versionLogs, err := g.cache.BatchFindMaxGroupMemberVersion(ctx, groupIDs)
if err != nil {
return nil, errs.Wrap(err)
}
maxGroupMemberVersionsMap := datautil.SliceToMap(versionLogs, func(e *model.VersionLog) string {
return e.DID
})
return maxGroupMemberVersionsMap, nil
}
func (g *groupDatabase) FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error) {
return g.cache.FindMaxJoinGroupVersion(ctx, userID)
}
@@ -16,6 +16,7 @@ package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
)
@@ -40,5 +41,6 @@ type GroupMember interface {
JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error
MemberGroupIncrVersion(ctx context.Context, groupID string, userIDs []string, state int32) error
FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error)
BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint, limits []int) ([]*model.VersionLog, error)
FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
}
@@ -16,6 +16,7 @@ package mgo
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/log"
@@ -230,6 +231,11 @@ func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID stri
return g.member.FindChangeLog(ctx, groupID, version, limit)
}
func (g *GroupMemberMgo) BatchFindMemberIncrVersion(ctx context.Context, groupIDs []string, versions []uint, limits []int) ([]*model.VersionLog, error) {
log.ZDebug(ctx, "Batch find member incr version", "groupIDs", groupIDs, "versions", versions)
return g.member.BatchFindChangeLog(ctx, groupIDs, versions, limits)
}
func (g *GroupMemberMgo) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
log.ZDebug(ctx, "find join incr version", "userID", userID, "version", version)
return g.join.FindChangeLog(ctx, userID, version, limit)
+23 -1
View File
@@ -3,6 +3,8 @@ package mgo
import (
"context"
"errors"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
@@ -13,7 +15,6 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)
func NewVersionLog(coll *mongo.Collection) (database.VersionLog, error) {
@@ -35,6 +36,7 @@ func (l *VersionLogMgo) initIndex(ctx context.Context) error {
},
Options: options.Index().SetUnique(true),
})
return err
}
@@ -198,6 +200,26 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u
}
}
func (l *VersionLogMgo) BatchFindChangeLog(ctx context.Context, dIds []string, versions []uint, limits []int) (vLogs []*model.VersionLog, err error) {
for i := 0; i < len(dIds); i++ {
if vLog, err := l.findChangeLog(ctx, dIds[i], versions[i], limits[i]); err == nil {
vLogs = append(vLogs, vLog)
} else if !errors.Is(err, mongo.ErrNoDocuments) {
log.ZError(ctx, "findChangeLog error:", errs.Wrap(err))
}
log.ZDebug(ctx, "init doc", "dId", dIds[i])
if res, err := l.initDoc(ctx, dIds[i], nil, 0, time.Now()); err == nil {
log.ZDebug(ctx, "init doc success", "dId", dIds[i])
vLogs = append(vLogs, res)
} else if mongo.IsDuplicateKeyError(err) {
l.findChangeLog(ctx, dIds[i], versions[i], limits[i])
} else {
log.ZError(ctx, "init doc error:", errs.Wrap(err))
}
}
return vLogs, errs.Wrap(err)
}
func (l *VersionLogMgo) findChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) {
if version == 0 && limit == 0 {
return l.findDoc(ctx, dId)
+3 -1
View File
@@ -2,8 +2,9 @@ package database
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)
const (
@@ -14,6 +15,7 @@ const (
type VersionLog interface {
IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error
FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error)
BatchFindChangeLog(ctx context.Context, dIds []string, versions []uint, limits []int) ([]*model.VersionLog, error)
DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error
Delete(ctx context.Context, dId string) error
}