Merge remote-tracking branch 'origin/errcode' into errcode

# Conflicts:
#	cmd/msggateway/main.go
#	go.sum
#	internal/msggateway/init.go
This commit is contained in:
Gordon
2023-03-08 18:42:25 +08:00
87 changed files with 2538 additions and 2232 deletions
+5 -5
View File
@@ -3,24 +3,24 @@ package api
import (
"OpenIM/internal/api/a2r"
"OpenIM/pkg/common/config"
"OpenIM/pkg/discoveryregistry"
auth "OpenIM/pkg/proto/auth"
"context"
"github.com/OpenIMSDK/openKeeper"
"github.com/gin-gonic/gin"
)
var _ context.Context // 解决goland编辑器bug
func NewAuth(zk *openKeeper.ZkClient) *Auth {
return &Auth{zk: zk}
func NewAuth(c discoveryregistry.SvcDiscoveryRegistry) *Auth {
return &Auth{c: c}
}
type Auth struct {
zk *openKeeper.ZkClient
c discoveryregistry.SvcDiscoveryRegistry
}
func (o *Auth) client() (auth.AuthClient, error) {
conn, err := o.zk.GetDefaultConn(config.Config.RpcRegisterName.OpenImAuthName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImAuthName)
if err != nil {
return nil, err
}
+5 -5
View File
@@ -3,24 +3,24 @@ package api
import (
"OpenIM/internal/api/a2r"
"OpenIM/pkg/common/config"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/proto/conversation"
"context"
"github.com/OpenIMSDK/openKeeper"
"github.com/gin-gonic/gin"
)
var _ context.Context // 解决goland编辑器bug
func NewConversation(zk *openKeeper.ZkClient) *Conversation {
return &Conversation{zk: zk}
func NewConversation(c discoveryregistry.SvcDiscoveryRegistry) *Conversation {
return &Conversation{c: c}
}
type Conversation struct {
zk *openKeeper.ZkClient
c discoveryregistry.SvcDiscoveryRegistry
}
func (o *Conversation) client() (conversation.ConversationClient, error) {
conn, err := o.zk.GetDefaultConn(config.Config.RpcRegisterName.OpenImConversationName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImConversationName)
if err != nil {
return nil, err
}
+6 -5
View File
@@ -3,24 +3,25 @@ package api
import (
"OpenIM/internal/api/a2r"
"OpenIM/pkg/common/config"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/proto/friend"
"context"
"github.com/OpenIMSDK/openKeeper"
"github.com/gin-gonic/gin"
)
var _ context.Context // 解决goland编辑器bug
func NewFriend(zk *openKeeper.ZkClient) *Friend {
return &Friend{zk: zk}
func NewFriend(c discoveryregistry.SvcDiscoveryRegistry) *Friend {
return &Friend{c: c}
}
type Friend struct {
zk *openKeeper.ZkClient
c discoveryregistry.SvcDiscoveryRegistry
}
func (o *Friend) client() (friend.FriendClient, error) {
conn, err := o.zk.GetDefaultConn(config.Config.RpcRegisterName.OpenImFriendName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImFriendName)
if err != nil {
return nil, err
}
+6 -5
View File
@@ -3,24 +3,25 @@ package api
import (
"OpenIM/internal/api/a2r"
"OpenIM/pkg/common/config"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/proto/group"
"context"
"github.com/OpenIMSDK/openKeeper"
"github.com/gin-gonic/gin"
)
var _ context.Context // 解决goland编辑器bug
func NewGroup(zk *openKeeper.ZkClient) *Group {
return &Group{zk: zk}
func NewGroup(c discoveryregistry.SvcDiscoveryRegistry) *Group {
return &Group{c: c}
}
type Group struct {
zk *openKeeper.ZkClient
c discoveryregistry.SvcDiscoveryRegistry
}
func (o *Group) client() (group.GroupClient, error) {
conn, err := o.zk.GetDefaultConn(config.Config.RpcRegisterName.OpenImGroupName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImGroupName)
if err != nil {
return nil, err
}
+6 -6
View File
@@ -7,13 +7,13 @@ import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/log"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/errs"
"OpenIM/pkg/proto/msg"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"errors"
"github.com/OpenIMSDK/openKeeper"
"github.com/gin-gonic/gin"
"github.com/go-playground/validator/v10"
"github.com/golang/protobuf/proto"
@@ -22,12 +22,12 @@ import (
var _ context.Context // 解决goland编辑器bug
func NewMsg(zk *openKeeper.ZkClient) *Msg {
return &Msg{zk: zk, validate: validator.New()}
func NewMsg(c discoveryregistry.SvcDiscoveryRegistry) *Msg {
return &Msg{c: c, validate: validator.New()}
}
type Msg struct {
zk *openKeeper.ZkClient
c discoveryregistry.SvcDiscoveryRegistry
validate *validator.Validate
}
@@ -107,7 +107,7 @@ func newUserSendMsgReq(params *apistruct.ManagementSendMsgReq) *msg.SendMsgReq {
}
func (o *Msg) client() (msg.MsgClient, error) {
conn, err := o.zk.GetDefaultConn(config.Config.RpcRegisterName.OpenImMsgName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImMsgName)
if err != nil {
return nil, err
}
@@ -214,7 +214,7 @@ func (o *Msg) ManagementSendMsg(c *gin.Context) {
}
log.NewInfo(params.OperationID, "Ws call success to ManagementSendMsgReq", params)
pbData := newUserSendMsgReq(&params)
conn, err := o.zk.GetConn(config.Config.RpcRegisterName.OpenImMsgName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImMsgName)
if err != nil {
apiresp.GinError(c, errs.ErrInternalServer)
return
+3 -7
View File
@@ -5,15 +5,13 @@ import (
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/mw"
"OpenIM/pkg/common/prome"
"github.com/OpenIMSDK/openKeeper"
"OpenIM/pkg/discoveryregistry"
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"io"
"os"
)
func NewGinRouter() *gin.Engine {
openKeeper.DefaultOptions = []grpc.DialOption{mw.GrpcClient()} // 默认RPC中间件
func NewGinRouter(zk discoveryregistry.SvcDiscoveryRegistry) *gin.Engine {
gin.SetMode(gin.ReleaseMode)
f, _ := os.Create("../logs/api.log")
gin.DefaultWriter = io.MultiWriter(f)
@@ -28,9 +26,7 @@ func NewGinRouter() *gin.Engine {
r.Use(prome.PrometheusMiddleware)
r.GET("/metrics", prome.PrometheusHandler())
}
var zk *openKeeper.ZkClient
zk.AddOption(mw.GrpcClient()) // 默认RPC中间件
userRouterGroup := r.Group("/user")
{
u := NewUser(zk)
+6 -5
View File
@@ -3,24 +3,25 @@ package api
import (
"OpenIM/internal/api/a2r"
"OpenIM/pkg/common/config"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/proto/third"
"context"
"github.com/OpenIMSDK/openKeeper"
"github.com/gin-gonic/gin"
)
var _ context.Context // 解决goland编辑器bug
func NewThird(zk *openKeeper.ZkClient) *Third {
return &Third{zk: zk}
func NewThird(c discoveryregistry.SvcDiscoveryRegistry) *Third {
return &Third{c: c}
}
type Third struct {
zk *openKeeper.ZkClient
c discoveryregistry.SvcDiscoveryRegistry
}
func (o *Third) client() (third.ThirdClient, error) {
conn, err := o.zk.GetDefaultConn(config.Config.RpcRegisterName.OpenImThirdName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImThirdName)
if err != nil {
return nil, err
}
+5 -5
View File
@@ -3,24 +3,24 @@ package api
import (
"OpenIM/internal/api/a2r"
"OpenIM/pkg/common/config"
"OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/proto/user"
"context"
"github.com/OpenIMSDK/openKeeper"
"github.com/gin-gonic/gin"
)
var _ context.Context // 解决goland编辑器bug
func NewUser(zk *openKeeper.ZkClient) *User {
return &User{zk: zk}
func NewUser(client discoveryregistry.SvcDiscoveryRegistry) *User {
return &User{c: client}
}
type User struct {
zk *openKeeper.ZkClient
c discoveryregistry.SvcDiscoveryRegistry
}
func (o *User) client() (user.UserClient, error) {
conn, err := o.zk.GetDefaultConn(config.Config.RpcRegisterName.OpenImUserName)
conn, err := o.c.GetConn(config.Config.RpcRegisterName.OpenImUserName)
if err != nil {
return nil, err
}
+6 -19
View File
@@ -2,9 +2,7 @@ package check
import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
discoveryRegistry "OpenIM/pkg/discoveryregistry"
"OpenIM/pkg/errs"
"OpenIM/pkg/proto/friend"
sdkws "OpenIM/pkg/proto/sdkws"
"context"
@@ -51,26 +49,15 @@ func (f *FriendChecker) IsFriend(ctx context.Context, possibleFriendUserID, user
}
func (f *FriendChecker) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) {
func (f *FriendChecker) GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) {
cc, err := f.getConn()
if err != nil {
return nil, err
}
page := int32(0)
req := friend.GetPaginationFriendsReq{UserID: ownerUserID}
for {
req.Pagination = &sdkws.RequestPagination{PageNumber: page, ShowNumber: constant.ShowNumber}
tmp, err := friend.NewFriendClient(cc).GetPaginationFriends(ctx, &req)
if err != nil {
return nil, err
}
if len(tmp.FriendsInfo) == 0 {
if tmp.Total == int32(len(resp)) {
return resp, nil
}
return nil, errs.ErrData.Wrap("The total number of results and expectations are different, but result is nil")
}
resp = append(resp, tmp.FriendsInfo...)
page++
req := friend.GetFriendIDsReq{UserID: ownerUserID}
resp, err := friend.NewFriendClient(cc).GetFriendIDs(ctx, &req)
if err != nil {
return nil, err
}
return resp.FriendIDs, err
}
-18
View File
@@ -47,21 +47,3 @@ func (m *MsgCheck) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMess
resp, err := msg.NewMsgClient(cc).PullMessageBySeqs(ctx, req)
return resp, err
}
//func (m *MsgCheck) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
// cc, err := m.getConn()
// if err != nil {
// return nil, err
// }
// resp, err := msg.NewMsgClient(cc).SendMsg(ctx, req)
// return resp, err
//}
//
//func (m *MsgCheck) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) {
// cc, err := m.getConn()
// if err != nil {
// return nil, err
// }
// resp, err := msg.NewMsgClient(cc).SendMsg(ctx, req)
// return resp, err
//}
+4
View File
@@ -9,6 +9,7 @@ import (
"OpenIM/pkg/common/db/unrelation"
"OpenIM/pkg/common/prome"
"fmt"
"sync"
)
type MsgTransfer struct {
@@ -64,6 +65,8 @@ func (m *MsgTransfer) initPrometheus() {
}
func (m *MsgTransfer) Start(prometheusPort int) error {
var wg sync.WaitGroup
wg.Add(4)
if config.Config.ChatPersistenceMysql {
go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
} else {
@@ -76,5 +79,6 @@ func (m *MsgTransfer) Start(prometheusPort int) error {
if err != nil {
return err
}
wg.Wait()
return nil
}
@@ -242,7 +242,7 @@ func (och *OnlineHistoryRedisConsumerHandler) ConsumeClaim(sess sarama.ConsumerG
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Context, message *pbMsg.MsgDataToMQ, pushToUserID string) {
mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: message.MsgData, SourceID: pushToUserID}
pid, offset, err := och.producerToPush.SendMessage(ctx, &mqPushMsg, mqPushMsg.SourceID)
pid, offset, err := och.producerToPush.SendMessage(ctx, mqPushMsg.SourceID, &mqPushMsg)
if err != nil {
log.Error(tracelog.GetOperationID(ctx), "kafka send failed", "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
}
@@ -251,7 +251,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToPushMQ(ctx context.Co
func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ) {
if len(messages) > 0 {
pid, offset, err := och.producerToModify.SendMessage(ctx, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
pid, offset, err := och.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
}
@@ -260,7 +260,7 @@ func (och *OnlineHistoryRedisConsumerHandler) sendMessageToModifyMQ(ctx context.
func (och *OnlineHistoryRedisConsumerHandler) SendMessageToMongoCH(ctx context.Context, aggregationID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) {
if len(messages) > 0 {
pid, offset, err := och.producerToMongo.SendMessage(ctx, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID}, aggregationID)
pid, offset, err := och.producerToMongo.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages, TriggerID: triggerID})
if err != nil {
log.Error(triggerID, "kafka send failed", "send data", len(messages), "pid", pid, "offset", offset, "err", err.Error(), "key", aggregationID)
}
@@ -24,7 +24,7 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.MsgDatabase) *Onli
mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
}
return mc
+2 -2
View File
@@ -83,13 +83,13 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
friends, err := s.friendCheck.GetAllPageFriends(ctx, req.UserInfo.UserID)
friends, err := s.friendCheck.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
go func() {
for _, v := range friends {
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v.FriendUser.UserID, tracelog.GetOpUserID(ctx))
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, tracelog.GetOpUserID(ctx))
}
}()
s.notification.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID)
+1 -1
View File
@@ -20,7 +20,7 @@ func start(rpcPort int, rpcRegisterName string, prometheusPorts int, rpcFn func(
flagRpcPort := flag.Int("port", rpcPort, "get RpcGroupPort from cmd,default 16000 as port")
flagPrometheusPort := flag.Int("prometheus_port", prometheusPorts, "groupPrometheusPort default listen port")
flag.Parse()
fmt.Println("start group rpc server, port: ", *flagRpcPort, ", OpenIM version: ", constant.CurrentVersion)
fmt.Println("start group rpc server, port: ", *flagRpcPort, ", OpenIM version: ", config.Version)
log.NewPrivateLog(constant.LogFileName)
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.Config.ListenIP, *flagRpcPort))
if err != nil {
+13 -6
View File
@@ -11,6 +11,7 @@ import (
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/utils"
"context"
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"math"
@@ -22,6 +23,8 @@ type MsgTool struct {
groupDatabase controller.GroupDatabase
}
var errSeq = errors.New("cache max seq and mongo max seq is diff > 10")
func NewMsgTool(msgDatabase controller.MsgDatabase, userDatabase controller.UserDatabase, groupDatabase controller.GroupDatabase) *MsgTool {
return &MsgTool{
msgDatabase: msgDatabase,
@@ -125,7 +128,9 @@ func (c *MsgTool) fixGroupSeq(ctx context.Context, groupID string, userIDs []str
continue
}
}
c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
if err := c.CheckMaxSeqWithMongo(ctx, groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", groupID, maxSeqCache, maxSeqMongo, constant.WriteDiffusion)
}
return nil
}
@@ -162,10 +167,11 @@ func (c *MsgTool) GetAndFixGroupUserSeq(ctx context.Context, userID string, grou
return minSeqCache, nil
}
func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) {
func (c *MsgTool) CheckMaxSeqWithMongo(ctx context.Context, sourceID string, maxSeqCache, maxSeqMongo int64, diffusionType int) error {
if math.Abs(float64(maxSeqMongo-maxSeqCache)) > 10 {
log.NewWarn(tracelog.GetOperationID(ctx), "cache max seq and mongo max seq is diff > 10", sourceID, maxSeqCache, maxSeqMongo, diffusionType)
return errSeq
}
return nil
}
func (c *MsgTool) ShowUserSeqs(ctx context.Context, userID string) {
@@ -180,10 +186,10 @@ func (c *MsgTool) ShowSuperGroupUserSeqs(ctx context.Context, groupID, userID st
}
func (c *MsgTool) FixAllSeq(ctx context.Context) {
func (c *MsgTool) FixAllSeq(ctx context.Context) error {
userIDs, err := c.userDatabase.GetAllUserID(ctx)
if err != nil {
panic(err.Error())
return err
}
for _, userID := range userIDs {
userCurrentMinSeq, err := c.msgDatabase.GetUserMinSeq(ctx, userID)
@@ -204,7 +210,7 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) {
fmt.Println("fix users seq success")
groupIDs, err := c.groupDatabase.GetGroupIDsByGroupType(ctx, constant.WorkingGroup)
if err != nil {
panic(err.Error())
return err
}
for _, groupID := range groupIDs {
maxSeq, err := c.msgDatabase.GetGroupMaxSeq(ctx, groupID)
@@ -232,4 +238,5 @@ func (c *MsgTool) FixAllSeq(ctx context.Context) {
}
}
fmt.Println("fix all seq finished")
return nil
}
+274 -48
View File
@@ -2,22 +2,24 @@ package tools
import (
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache"
"OpenIM/pkg/common/tracelog"
"OpenIM/pkg/proto/sdkws"
"OpenIM/pkg/utils"
"context"
"fmt"
"github.com/golang/protobuf/proto"
"go.mongodb.org/mongo-driver/bson"
"strconv"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/db/unrelation"
"testing"
"time"
)
func GenUserChat(startSeq, stopSeq, delSeq, index int64, userID string) *mongo2.UserChat {
chat := &mongo2.UserChat{UID: userID + strconv.Itoa(int(index))}
func GenMsgDoc(startSeq, stopSeq, delSeq, index int64, userID string) *unRelationTb.MsgDocModel {
msgDoc := &unRelationTb.MsgDocModel{DocID: userID + strconv.Itoa(int(index))}
for i := startSeq; i <= stopSeq; i++ {
msg := sdkws.MsgData{
SendID: "sendID1",
@@ -31,57 +33,281 @@ func GenUserChat(startSeq, stopSeq, delSeq, index int64, userID string) *mongo2.
SessionType: 1,
MsgFrom: 100,
ContentType: 101,
Content: []byte("testFaceURL.com"),
Content: []byte("testFaceURL"),
Seq: i,
SendTime: time.Now().Unix(),
CreateTime: time.Now().Unix(),
Status: 1,
}
bytes, _ := proto.Marshal(&msg)
sendTime := 0
chat.Msg = append(chat.Msg, mongo2.MsgInfo{SendTime: int64(sendTime), Msg: bytes})
var sendTime int64
if i <= delSeq {
sendTime = 10000
} else {
sendTime = utils.GetCurrentTimestampByMill()
}
msgDoc.Msg = append(msgDoc.Msg, unRelationTb.MsgInfoModel{SendTime: int64(sendTime), Msg: bytes})
}
return chat
return msgDoc
}
func SetUserMaxSeq(userID string, seq int) error {
return redisClient.Set(context.Background(), "REDIS_USER_INCR_SEQ"+userID, seq, 0).Err()
}
func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) {
operationID := "test"
func CreateChat(userChat *mongo2.UserChat) error {
_, err := mongoClient.InsertOne(context.Background(), userChat)
return err
}
func TestDeleteUserMsgsAndSetMinSeq(t *testing.T) {
operationID := getCronTaskOperationID()
redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:16379",
Password: "openIM123", // no password set
DB: 13, // use default DB
})
mongoUri := fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
"root", "openIM123", "127.0.0.1:37017",
"openIM", 100)
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoUri))
mongoClient = client.Database("openIM").Collection("msg")
testUID1 := "test_del_id1"
//testUID2 := "test_del_id2"
//testUID3 := "test_del_id3"
//testUID4 := "test_del_id4"
//testUID5 := "test_del_id5"
//testUID6 := "test_del_id6"
err = SetUserMaxSeq(testUID1, 600)
userChat := GenUserChat(1, 500, 200, 0, testUID1)
err = CreateChat(userChat)
if err := DeleteUserMsgsAndSetMinSeq(operationID, testUID1); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID1)
}
if err := checkMaxSeqWithMongo(operationID, testUID1, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID1)
}
rdb, err := cache.NewRedis()
if err != nil {
t.Error("err is not nil", testUID1, err.Error())
return
}
mgo, err := unrelation.NewMongo()
if err != nil {
return
}
cacheModel := cache.NewCacheModel(rdb)
mongoClient := mgo.GetDatabase().Collection(unRelationTb.MsgDocModel{}.TableName())
ctx := context.Background()
tracelog.SetOperationID(ctx, operationID)
testUID1 := "test_del_id1"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID1 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("DeleteOne failed")
return
}
err = cacheModel.SetUserMaxSeq(ctx, testUID1, 600)
if err != nil {
t.Error("SetUserMaxSeq failed")
}
msgDoc := GenMsgDoc(1, 600, 200, 0, testUID1)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID1)
}
msgTools, err := InitMsgTool()
if err != nil {
t.Error("init failed")
return
}
msgTools.ClearUsersMsg(ctx, []string{testUID1})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err := msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID1)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID1, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID1)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 201 {
t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201)
}
/////// uid2
testUID2 := "test_del_id2"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID2 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID2 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID2, 7000)
if err != nil {
t.Error("SetUserMaxSeq failed")
}
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID2)
msgDoc2 := GenMsgDoc(5000, 7000, 6000, 1, testUID2)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID1)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID1)
}
msgTools.ClearUsersMsg(ctx, []string{testUID2})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID2)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID2, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID2)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 6001 {
t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201)
}
/////// uid3
testUID3 := "test_del_id3"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID3 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID3, 4999)
if err != nil {
t.Error("SetUserMaxSeq failed")
}
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID3)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID3)
}
msgTools.ClearUsersMsg(ctx, []string{testUID3})
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID3)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID3, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID3)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 5000 {
t.Error("test1 is not the same", "minSeq:", minSeqCache, "targetSeq", 201)
}
//// uid4
testUID4 := "test_del_id4"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID4 + ":" + strconv.Itoa(2)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID4, 12000)
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID4)
msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID4)
msgDoc3 := GenMsgDoc(10000, 12000, 11000, 2, testUID4)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc3); err != nil {
t.Error("InsertOne failed", testUID4)
}
msgTools.ClearUsersMsg(ctx, []string{testUID4})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID4)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID4, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID4)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 5000 {
t.Error("test1 is not the same", "minSeq:", minSeqCache)
}
testUID5 := "test_del_id5"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID5 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID5 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
err = cacheModel.SetUserMaxSeq(ctx, testUID5, 9999)
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID5)
msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID5)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID5)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID5)
}
msgTools.ClearUsersMsg(ctx, []string{testUID5})
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID5)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID5, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID5)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 10000 {
t.Error("test1 is not the same", "minSeq:", minSeqCache)
}
testUID6 := "test_del_id6"
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(0)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(1)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(2)})
if err != nil {
t.Error("delete failed")
}
_, err = mongoClient.DeleteOne(ctx, bson.M{"uid": testUID6 + ":" + strconv.Itoa(3)})
if err != nil {
t.Error("delete failed")
}
msgDoc = GenMsgDoc(1, 4999, 5000, 0, testUID6)
msgDoc2 = GenMsgDoc(5000, 9999, 10000, 1, testUID6)
msgDoc3 = GenMsgDoc(10000, 14999, 13000, 2, testUID6)
msgDoc4 := GenMsgDoc(15000, 19999, 0, 3, testUID6)
if _, err := mongoClient.InsertOne(ctx, msgDoc); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc2); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc3); err != nil {
t.Error("InsertOne failed", testUID4)
}
if _, err := mongoClient.InsertOne(ctx, msgDoc4); err != nil {
t.Error("InsertOne failed", testUID4)
}
minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache, err = msgTools.msgDatabase.GetUserMinMaxSeqInMongoAndCache(ctx, testUID6)
if err != nil {
t.Error("GetSuperGroupMinMaxSeqInMongoAndCache failed")
return
}
if err := msgTools.CheckMaxSeqWithMongo(ctx, testUID6, maxSeqCache, maxSeqMongo, constant.WriteDiffusion); err != nil {
t.Error("checkMaxSeqWithMongo failed", testUID6)
}
if minSeqMongo != minSeqCache {
t.Error("minSeqMongo != minSeqCache", minSeqMongo, minSeqCache)
}
if minSeqCache != 13001 {
t.Error("test1 is not the same", "minSeq:", minSeqCache)
}
}