mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-06 01:55:58 +08:00
feat: support message cache (#3007)
* pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache
This commit is contained in:
+4
-38
@@ -15,50 +15,16 @@
|
||||
package cachekey
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const (
|
||||
messageCache = "MESSAGE_CACHE:"
|
||||
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
|
||||
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
|
||||
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
|
||||
exTypeKeyLocker = "EX_LOCK:"
|
||||
reactionExSingle = "EX_SINGLE_"
|
||||
reactionWriteGroup = "EX_GROUP_"
|
||||
reactionReadGroup = "EX_SUPER_GROUP_"
|
||||
reactionNotification = "EX_NOTIFICATION_"
|
||||
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
|
||||
messageCache = "MSG_CACHE:"
|
||||
)
|
||||
|
||||
func GetMessageCacheKey(conversationID string, seq int64) string {
|
||||
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
|
||||
}
|
||||
|
||||
func GetMessageDelUserListKey(conversationID string, seq int64) string {
|
||||
return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq))
|
||||
}
|
||||
|
||||
func GetUserDelListKey(conversationID, userID string) string {
|
||||
return userDelMessagesList + conversationID + ":" + userID
|
||||
}
|
||||
|
||||
func GetMessageReactionExKey(clientMsgID string, sessionType int32) string {
|
||||
switch sessionType {
|
||||
case constant.SingleChatType:
|
||||
return reactionExSingle + clientMsgID
|
||||
case constant.WriteGroupChatType:
|
||||
return reactionWriteGroup + clientMsgID
|
||||
case constant.ReadGroupChatType:
|
||||
return reactionReadGroup + clientMsgID
|
||||
case constant.NotificationChatType:
|
||||
return reactionNotification + clientMsgID
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
func GetLockMessageTypeKey(clientMsgID string, TypeKey string) string {
|
||||
return exTypeKeyLocker + clientMsgID + "_" + TypeKey
|
||||
func GetMsgCacheKey(conversationID string, seq int64) string {
|
||||
return messageCache + conversationID + ":" + strconv.Itoa(int(seq))
|
||||
}
|
||||
|
||||
func GetSendMsgKey(id string) string {
|
||||
|
||||
Vendored
+5
-14
@@ -16,23 +16,14 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
)
|
||||
|
||||
type MsgCache interface {
|
||||
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
|
||||
SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
|
||||
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
|
||||
SetSendMsgStatus(ctx context.Context, id string, status int32) error
|
||||
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
|
||||
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
|
||||
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
|
||||
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
|
||||
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
|
||||
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
|
||||
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
|
||||
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
|
||||
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
|
||||
|
||||
GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
|
||||
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
|
||||
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error
|
||||
}
|
||||
|
||||
+53
-129
@@ -2,10 +2,12 @@ package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
@@ -13,76 +15,26 @@ import (
|
||||
) //
|
||||
|
||||
// msgCacheTimeout is expiration time of message cache, 86400 seconds
|
||||
const msgCacheTimeout = 86400
|
||||
const msgCacheTimeout = time.Hour * 24
|
||||
|
||||
func NewMsgCache(client redis.UniversalClient) cache.MsgCache {
|
||||
return &msgCache{rdb: client}
|
||||
func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache {
|
||||
return &msgCache{
|
||||
rdb: client,
|
||||
rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()),
|
||||
msgDocDatabase: db,
|
||||
}
|
||||
}
|
||||
|
||||
type msgCache struct {
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
|
||||
func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
|
||||
return cachekey.GetMessageCacheKey(conversationID, seq)
|
||||
}
|
||||
func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string {
|
||||
return cachekey.GetMessageDelUserListKey(conversationID, seq)
|
||||
}
|
||||
|
||||
func (c *msgCache) getUserDelList(conversationID, userID string) string {
|
||||
return cachekey.GetUserDelListKey(conversationID, userID)
|
||||
rdb redis.UniversalClient
|
||||
rcClient *rockscache.Client
|
||||
msgDocDatabase database.Msg
|
||||
}
|
||||
|
||||
func (c *msgCache) getSendMsgKey(id string) string {
|
||||
return cachekey.GetSendMsgKey(id)
|
||||
}
|
||||
|
||||
func (c *msgCache) getLockMessageTypeKey(clientMsgID string, TypeKey string) string {
|
||||
return cachekey.GetLockMessageTypeKey(clientMsgID, TypeKey)
|
||||
}
|
||||
|
||||
func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string {
|
||||
return cachekey.GetMessageReactionExKey(clientMsgID, sessionType)
|
||||
}
|
||||
|
||||
func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
|
||||
msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string {
|
||||
return c.getMessageCacheKey(conversationID, msg.Seq)
|
||||
})
|
||||
keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string {
|
||||
return c.getMessageCacheKey(conversationID, msg.Seq)
|
||||
})
|
||||
err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
|
||||
var values []string
|
||||
for _, key := range keys {
|
||||
if msg, ok := msgMap[key]; ok {
|
||||
s, err := msgprocessor.Pb2String(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
values = append(values, s)
|
||||
}
|
||||
}
|
||||
return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, msgCacheTimeout)
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(msgs), nil
|
||||
}
|
||||
|
||||
func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
|
||||
var keys []string
|
||||
for _, seq := range seqs {
|
||||
keys = append(keys, c.getMessageCacheKey(conversationID, seq))
|
||||
}
|
||||
|
||||
return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
|
||||
return LuaDeleteBatch(ctx, c.rdb, keys)
|
||||
})
|
||||
}
|
||||
|
||||
func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
|
||||
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
|
||||
}
|
||||
@@ -92,81 +44,53 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
|
||||
return int32(result), errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
|
||||
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
|
||||
return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err())
|
||||
func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
|
||||
if len(seqs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
getKey := func(seq int64) string {
|
||||
return cachekey.GetMsgCacheKey(conversationID, seq)
|
||||
}
|
||||
getMsgID := func(msg *model.MsgInfoModel) int64 {
|
||||
return msg.Msg.Seq
|
||||
}
|
||||
find := func(ctx context.Context, seqs []int64) ([]*model.MsgInfoModel, error) {
|
||||
return c.msgDocDatabase.FindSeqs(ctx, conversationID, seqs)
|
||||
}
|
||||
return batchGetCache2(ctx, c.rcClient, msgCacheTimeout, seqs, getKey, getMsgID, find)
|
||||
}
|
||||
|
||||
func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error {
|
||||
key := c.getLockMessageTypeKey(clientMsgID, TypeKey)
|
||||
return errs.Wrap(c.rdb.Del(ctx, key).Err())
|
||||
}
|
||||
|
||||
func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) {
|
||||
n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
|
||||
func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error {
|
||||
if len(seqs) == 0 {
|
||||
return nil
|
||||
}
|
||||
keys := datautil.Slice(seqs, func(seq int64) string {
|
||||
return cachekey.GetMsgCacheKey(conversationID, seq)
|
||||
})
|
||||
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys)
|
||||
if err != nil {
|
||||
return false, errs.Wrap(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return n > 0, nil
|
||||
}
|
||||
|
||||
func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error {
|
||||
return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err())
|
||||
}
|
||||
|
||||
func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) {
|
||||
val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()
|
||||
return val, errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) {
|
||||
val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()
|
||||
return val, errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) {
|
||||
val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()
|
||||
return val, errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error {
|
||||
return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err())
|
||||
}
|
||||
|
||||
func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
|
||||
var keys []string
|
||||
keySeqMap := make(map[string]int64, 10)
|
||||
for _, seq := range seqs {
|
||||
key := c.getMessageCacheKey(conversationID, seq)
|
||||
keys = append(keys, key)
|
||||
keySeqMap[key] = seq
|
||||
for _, keys := range slotKeys {
|
||||
if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
|
||||
result, err := LuaGetBatch(ctx, c.rdb, keys)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error {
|
||||
for _, msg := range msgs {
|
||||
if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 {
|
||||
continue
|
||||
}
|
||||
data, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i, value := range result {
|
||||
seq := keySeqMap[keys[i]]
|
||||
if value == nil {
|
||||
failedSeqs = append(failedSeqs, seq)
|
||||
continue
|
||||
}
|
||||
|
||||
msg := &sdkws.MsgData{}
|
||||
msgString, ok := value.(string)
|
||||
if !ok || msgprocessor.String2Pb(msgString, msg) != nil {
|
||||
failedSeqs = append(failedSeqs, seq)
|
||||
continue
|
||||
}
|
||||
seqMsgs = append(seqMsgs, msg)
|
||||
|
||||
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return seqMsgs, failedSeqs, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
-133
@@ -1,133 +0,0 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_msgCache_SetMessagesToCache(t *testing.T) {
|
||||
type fields struct {
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
conversationID string
|
||||
msgs []*sdkws.MsgData
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want int
|
||||
wantErr assert.ErrorAssertionFunc
|
||||
}{
|
||||
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Username: "", Password: "openIM123", DB: 0})}, args{context.Background(),
|
||||
"cid", []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}}, 3, assert.NoError},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &msgCache{
|
||||
rdb: tt.fields.rdb,
|
||||
}
|
||||
got, err := c.SetMessagesToCache(tt.args.ctx, tt.args.conversationID, tt.args.msgs)
|
||||
if !tt.wantErr(t, err, fmt.Sprintf("SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)) {
|
||||
return
|
||||
}
|
||||
assert.Equalf(t, tt.want, got, "SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_msgCache_GetMessagesBySeq(t *testing.T) {
|
||||
type fields struct {
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
conversationID string
|
||||
seqs []int64
|
||||
}
|
||||
var failedSeq []int64
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantSeqMsgs []*sdkws.MsgData
|
||||
wantFailedSeqs []int64
|
||||
wantErr assert.ErrorAssertionFunc
|
||||
}{
|
||||
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})},
|
||||
args{context.Background(), "cid", []int64{1, 2, 3}},
|
||||
[]*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}, failedSeq, assert.NoError},
|
||||
{"test2", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})},
|
||||
args{context.Background(), "cid", []int64{4, 5, 6}},
|
||||
nil, []int64{4, 5, 6}, assert.NoError},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &msgCache{
|
||||
rdb: tt.fields.rdb,
|
||||
}
|
||||
gotSeqMsgs, gotFailedSeqs, err := c.GetMessagesBySeq(tt.args.ctx, tt.args.conversationID, tt.args.seqs)
|
||||
if !tt.wantErr(t, err, fmt.Sprintf("GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) {
|
||||
return
|
||||
}
|
||||
equalMsgDataSlices(t, tt.wantSeqMsgs, gotSeqMsgs)
|
||||
assert.Equalf(t, tt.wantFailedSeqs, gotFailedSeqs, "GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func equalMsgDataSlices(t *testing.T, expected, actual []*sdkws.MsgData) {
|
||||
assert.Equal(t, len(expected), len(actual), "Slices have different lengths")
|
||||
for i := range expected {
|
||||
assert.True(t, proto.Equal(expected[i], actual[i]), "Element %d not equal: expected %v, got %v", i, expected[i], actual[i])
|
||||
}
|
||||
}
|
||||
|
||||
func Test_msgCache_DeleteMessagesFromCache(t *testing.T) {
|
||||
type fields struct {
|
||||
rdb redis.UniversalClient
|
||||
}
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
conversationID string
|
||||
seqs []int64
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr assert.ErrorAssertionFunc
|
||||
}{
|
||||
{"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123"})},
|
||||
args{context.Background(), "cid", []int64{1, 2, 3}}, assert.NoError},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &msgCache{
|
||||
rdb: tt.fields.rdb,
|
||||
}
|
||||
tt.wantErr(t, c.DeleteMessagesFromCache(tt.args.ctx, tt.args.conversationID, tt.args.seqs),
|
||||
fmt.Sprintf("DeleteMessagesFromCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs))
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user