add redis pipeline

This commit is contained in:
wangchuxiao
2023-03-24 11:29:39 +08:00
parent 39c35e840d
commit 59fe9f0ca9
2 changed files with 45 additions and 32 deletions
+42 -30
View File
@@ -13,8 +13,9 @@ import (
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/jsonpb"
"google.golang.org/protobuf/proto"
"github.com/go-redis/redis/v8"
)
@@ -212,28 +213,30 @@ func (c *cache) DeleteTokenByUidPid(ctx context.Context, userID string, platform
return utils.Wrap1(c.rdb.HDel(ctx, key, fields...).Err())
}
func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) {
var errResult error
for _, v := range seqList {
func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
pipe := c.rdb.Pipeline()
for _, v := range seqs {
//MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1
key := messageCache + userID + "_" + strconv.Itoa(int(v))
result, err := c.rdb.Get(ctx, key).Result()
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(result, &msg)
if err != nil {
errResult = err
failedSeqList = append(failedSeqList, v)
} else {
seqMsg = append(seqMsg, &msg)
}
if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil {
return nil, nil, err
}
}
return seqMsg, failedSeqList, errResult
result, err := pipe.Exec(ctx)
for i, v := range result {
if v.Err() != nil {
failedSeqs = append(failedSeqs, seqs[i])
} else {
msg := sdkws.MsgData{}
err = jsonpb.UnmarshalString(v.String(), &msg)
if err != nil {
failedSeqs = append(failedSeqs, seqs[i])
} else {
seqMsgs = append(seqMsgs, &msg)
}
}
}
return seqMsgs, failedSeqs, err
}
func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) (int, error) {
@@ -258,12 +261,14 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []
}
func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*pbMsg.MsgDataToMQ) error {
pipe := c.rdb.Pipeline()
for _, v := range msgList {
if err := c.rdb.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(v.MsgData.Seq))).Err(); err != nil {
if err := pipe.Del(ctx, messageCache+userID+"_"+strconv.Itoa(int(v.MsgData.Seq))).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
_, err := pipe.Exec(ctx)
return utils.Wrap1(err)
}
func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
@@ -275,12 +280,14 @@ func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error {
if err != nil {
return utils.Wrap1(err)
}
pipe := c.rdb.Pipeline()
for _, v := range vals {
if err := c.rdb.Del(ctx, v).Err(); err != nil {
if err := pipe.Del(ctx, v).Err(); err != nil {
return utils.Wrap1(err)
}
}
return nil
_, err = pipe.Exec(ctx)
return utils.Wrap1(err)
}
func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) {
@@ -306,26 +313,31 @@ func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, push
return false, nil
}
if isInviteSignal {
pipe := c.rdb.Pipeline()
for _, userID := range inviteeUserIDs {
timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout)
if err != nil {
return false, utils.Wrap1(err)
}
keyList := signalListCache + userID
err = c.rdb.LPush(ctx, keyList, msg.ClientMsgID).Err()
keys := signalListCache + userID
err = pipe.LPush(ctx, keys, msg.ClientMsgID).Err()
if err != nil {
return false, utils.Wrap1(err)
}
err = c.rdb.Expire(ctx, keyList, time.Duration(timeout)*time.Second).Err()
err = pipe.Expire(ctx, keys, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, utils.Wrap1(err)
}
key := signalCache + msg.ClientMsgID
err = c.rdb.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
err = pipe.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err()
if err != nil {
return false, utils.Wrap1(err)
}
}
_, err := pipe.Exec(ctx)
if err != nil {
return false, utils.Wrap1(err)
}
}
return true, nil
}
@@ -367,8 +379,8 @@ func (c *cache) DelUserSignalList(ctx context.Context, userID string) error {
return utils.Wrap1(c.rdb.Del(ctx, signalListCache+userID).Err())
}
func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error {
for _, seq := range seqList {
func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error {
for _, seq := range seqs {
key := messageCache + userID + "_" + strconv.Itoa(int(seq))
result, err := c.rdb.Get(ctx, key).Result()
if err != nil {