delete sync

This commit is contained in:
wangchuxiao
2023-06-02 16:00:38 +08:00
parent ff7b6e200c
commit a918281625
12 changed files with 348 additions and 300 deletions
+24 -18
View File
@@ -47,6 +47,7 @@ type MsgModel interface {
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMinSeq(ctx context.Context, conversationID string) (int64, error)
GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
@@ -162,6 +163,23 @@ func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64,
func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return c.setSeq(ctx, conversationID, minSeq, c.getMinSeqKey)
}
func (c *msgCache) setSeqs(ctx context.Context, seqs map[string]int64, getkey func(key string) string) error {
pipe := c.rdb.Pipeline()
for k, seq := range seqs {
err := pipe.Set(ctx, getkey(k), seq, 0).Err()
if err != nil {
return errs.Wrap(err)
}
}
_, err := pipe.Exec(ctx)
return err
}
func (c *msgCache) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return c.setSeqs(ctx, seqs, c.getMinSeqKey)
}
func (c *msgCache) GetMinSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
return c.getSeqs(ctx, conversationIDs, c.getMinSeqKey)
}
@@ -187,27 +205,15 @@ func (c *msgCache) SetConversationUserMinSeq(ctx context.Context, conversationID
}
func (c *msgCache) SetConversationUserMinSeqs(ctx context.Context, conversationID string, seqs map[string]int64) (err error) {
pipe := c.rdb.Pipeline()
for userID, minSeq := range seqs {
err = pipe.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()
if err != nil {
return errs.Wrap(err)
}
}
_, err = pipe.Exec(ctx)
return err
return c.setSeqs(ctx, seqs, func(userID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) {
pipe := c.rdb.Pipeline()
for conversationID, minSeq := range seqs {
err = pipe.Set(ctx, c.getConversationUserMinSeqKey(conversationID, userID), minSeq, 0).Err()
if err != nil {
return errs.Wrap(err)
}
}
_, err = pipe.Exec(ctx)
return err
return c.setSeqs(ctx, seqs, func(conversationID string) string {
return c.getConversationUserMinSeqKey(conversationID, userID)
})
}
func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error {