This commit is contained in:
wangchuxiao
2023-06-06 18:46:06 +08:00
parent adb572a88f
commit 85da9e0ac8
14 changed files with 844 additions and 826 deletions
+22 -1
View File
@@ -26,6 +26,7 @@ const (
maxSeq = "MAX_SEQ:"
minSeq = "MIN_SEQ:"
conversationUserMinSeq = "CON_USER_MIN_SEQ:"
hasReadSeq = "HAS_READ_SEQ:"
appleDeviceToken = "DEVICE_TOKEN"
getuiToken = "GETUI_TOKEN"
@@ -42,7 +43,7 @@ const (
uidPidToken = "UID_PID_TOKEN_STATUS:"
)
type MsgModel interface {
type SeqCache interface {
SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
@@ -57,7 +58,13 @@ type MsgModel interface {
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
// seqs map: key conversationID value minSeq
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
// has read seq
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
}
type MsgModel interface {
SeqCache
AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error
GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error)
SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error
@@ -116,6 +123,10 @@ func (c *msgCache) getMinSeqKey(conversationID string) string {
return minSeq + conversationID
}
func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string {
return hasReadSeq + userID + ":" + conversationID
}
func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error {
return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err())
}
@@ -216,6 +227,16 @@ func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID strin
})
}
func (c *msgCache) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return utils.Wrap1(c.rdb.Set(ctx, c.getHasReadSeqKey(conversationID, userID), hasReadSeq, 0).Err())
}
func (c *msgCache) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, func(conversationID string) string {
return c.getHasReadSeqKey(conversationID, userID)
})
}
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {
key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID)
return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err())
+10
View File
@@ -65,6 +65,9 @@ type CommonMsgDatabase interface {
SetConversationUserMinSeq(ctx context.Context, conversationID string, userID string, minSeq int64) error
SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error)
SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (maxSeq, minSeq int64, err error)
GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error)
SetSendMsgStatus(ctx context.Context, id string, status int32) error
@@ -704,6 +707,13 @@ func (db *commonMsgDatabase) SetUserConversationsMinSeqs(ctx context.Context, us
return db.cache.SetUserConversationsMinSeqs(ctx, userID, seqs)
}
func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.cache.SetHasReadSeq(ctx, userID, conversationID, hasReadSeq)
}
func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.cache.GetHasReadSeqs(ctx, userID, conversationIDs)
}
func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return db.cache.SetSendMsgStatus(ctx, id, status)
}
+6 -6
View File
@@ -11,8 +11,8 @@ import (
type ConversationLocalCache struct {
lock sync.Mutex
SuperGroupRecvMsgNotNotifyUserIDs map[string]Hash
ConversationIDs map[string]Hash
superGroupRecvMsgNotNotifyUserIDs map[string]Hash
conversationIDs map[string]Hash
client discoveryregistry.SvcDiscoveryRegistry
}
@@ -23,8 +23,8 @@ type Hash struct {
func NewConversationLocalCache(client discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache {
return &ConversationLocalCache{
SuperGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
ConversationIDs: make(map[string]Hash),
superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
conversationIDs: make(map[string]Hash),
client: client,
}
}
@@ -58,7 +58,7 @@ func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID
}
g.lock.Lock()
defer g.lock.Unlock()
hash, ok := g.ConversationIDs[userID]
hash, ok := g.conversationIDs[userID]
if !ok || hash.hash != resp.Hash {
conversationIDsResp, err := client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{
UserID: userID,
@@ -66,7 +66,7 @@ func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID
if err != nil {
return nil, err
}
g.ConversationIDs[userID] = Hash{
g.conversationIDs[userID] = Hash{
hash: resp.Hash,
ids: conversationIDsResp.ConversationIDs,
}