feat: local cache

This commit is contained in:
withchao
2024-01-12 17:51:01 +08:00
parent 9908e2c658
commit a81bc3fc23
22 changed files with 245 additions and 316 deletions
-70
View File
@@ -1,70 +0,0 @@
package localcache
import (
"context"
"encoding/json"
"github.com/OpenIMSDK/tools/log"
"github.com/dtm-labs/rockscache"
"github.com/redis/go-redis/v9"
)
func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option {
return WithDeleteLocal(func(fn func(key ...string)) {
if fn == nil {
log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic)
return
}
msg := cli.Subscribe(context.Background(), topic).Channel()
for m := range msg {
log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload)
var key []string
if err := json.Unmarshal([]byte(m.Payload), &key); err != nil {
log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload)
continue
}
if len(key) == 0 {
continue
}
fn(key...)
}
})
}
func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option {
return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
data, err := json.Marshal(key)
if err != nil {
log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key)
return
}
if err := cli.Publish(ctx, topic, data).Err(); err != nil {
log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key)
} else {
log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key)
}
})
}
func WithRedisDelete(cli redis.UniversalClient) Option {
return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
for _, s := range key {
if err := cli.Del(ctx, s).Err(); err != nil {
log.ZError(ctx, "redis delete error", err, "key", s)
} else {
log.ZDebug(ctx, "redis delete success", "key", s)
}
}
})
}
func WithRocksCacheDelete(cli *rockscache.Client) Option {
return WithDeleteKeyBefore(func(ctx context.Context, key ...string) {
for _, k := range key {
if err := cli.TagAsDeleted2(ctx, k); err != nil {
log.ZError(ctx, "rocksdb delete error", err, "key", k)
} else {
log.ZDebug(ctx, "rocksdb delete success", "key", k)
}
}
})
}
-85
View File
@@ -1,85 +0,0 @@
package localcache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local"
lopt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
)
type Cache[V any] interface {
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error)
Del(ctx context.Context, key ...string)
}
func New[V any](opts ...Option) Cache[V] {
opt := defaultOption()
for _, o := range opts {
o(opt)
}
c := cache[V]{opt: opt}
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
go func() {
c.opt.delCh(c.del)
}()
if opt.linkSlotNum > 0 {
c.link = link.New(opt.linkSlotNum)
}
}
return &c
}
type cache[V any] struct {
opt *option
link link.Link
local local.Cache[V]
}
func (c *cache[V]) onEvict(key string, value V) {
if c.link != nil {
lks := c.link.Del(key)
for k := range lks {
if key != k { // prevent deadlock
c.local.Del(k)
}
}
}
}
func (c *cache[V]) del(key ...string) {
for _, k := range key {
lks := c.link.Del(k)
c.local.Del(k)
for k := range lks {
c.local.Del(k)
}
}
}
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) {
if c.local != nil {
return c.local.Get(key, func() (V, error) {
if c.link != nil {
for _, o := range opts {
c.link.Link(key, o.Link...)
}
}
return fetch(ctx)
})
} else {
return fetch(ctx)
}
}
func (c *cache[V]) Del(ctx context.Context, key ...string) {
if len(key) == 0 {
return
}
for _, fn := range c.opt.delFn {
fn(ctx, key...)
}
if c.local != nil {
c.del(key...)
}
}
-5
View File
@@ -1,5 +0,0 @@
package localcache
import "github.com/hashicorp/golang-lru/v2/simplelru"
type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V]
-109
View File
@@ -1,109 +0,0 @@
package link
import (
"hash/fnv"
"sync"
"unsafe"
)
type Link interface {
Link(key string, link ...string)
Del(key string) map[string]struct{}
}
func newLinkKey() *linkKey {
return &linkKey{
data: make(map[string]map[string]struct{}),
}
}
type linkKey struct {
lock sync.Mutex
data map[string]map[string]struct{}
}
func (x *linkKey) link(key string, link ...string) {
x.lock.Lock()
defer x.lock.Unlock()
v, ok := x.data[key]
if !ok {
v = make(map[string]struct{})
x.data[key] = v
}
for _, k := range link {
v[k] = struct{}{}
}
}
func (x *linkKey) del(key string) map[string]struct{} {
x.lock.Lock()
defer x.lock.Unlock()
ks, ok := x.data[key]
if !ok {
return nil
}
delete(x.data, key)
return ks
}
func New(n int) Link {
if n <= 0 {
panic("must be greater than 0")
}
slots := make([]*linkKey, n)
for i := 0; i < len(slots); i++ {
slots[i] = newLinkKey()
}
return &slot{
n: uint64(n),
slots: slots,
}
}
type slot struct {
n uint64
slots []*linkKey
}
func (x *slot) index(s string) uint64 {
h := fnv.New64a()
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s)))
return h.Sum64() % x.n
}
func (x *slot) Link(key string, link ...string) {
if len(link) == 0 {
return
}
mk := key
lks := make([]string, len(link))
for i, k := range link {
lks[i] = k
}
x.slots[x.index(mk)].link(mk, lks...)
for _, lk := range lks {
x.slots[x.index(lk)].link(lk, mk)
}
}
func (x *slot) Del(key string) map[string]struct{} {
return x.delKey(key)
}
func (x *slot) delKey(k string) map[string]struct{} {
del := make(map[string]struct{})
stack := []string{k}
for len(stack) > 0 {
curr := stack[len(stack)-1]
stack = stack[:len(stack)-1]
if _, ok := del[curr]; ok {
continue
}
del[curr] = struct{}{}
childKeys := x.slots[x.index(curr)].del(curr)
for ck := range childKeys {
stack = append(stack, ck)
}
}
return del
}
-20
View File
@@ -1,20 +0,0 @@
package link
import (
"testing"
)
func TestName(t *testing.T) {
v := New(1)
//v.Link("a:1", "b:1", "c:1", "d:1")
v.Link("a:1", "b:1", "c:1")
v.Link("z:1", "b:1")
//v.DelKey("a:1")
v.Del("z:1")
t.Log(v)
}
-89
View File
@@ -1,89 +0,0 @@
package localcache
import (
"github.com/hashicorp/golang-lru/v2/simplelru"
"sync"
"time"
)
type waitItem[V any] struct {
lock sync.Mutex
expires int64
active bool
err error
value V
}
func NewLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LRU[K, V] {
var cb simplelru.EvictCallback[K, *waitItem[V]]
if onEvict != nil {
cb = func(key K, value *waitItem[V]) {
onEvict(key, value.value)
}
}
core, err := simplelru.NewLRU[K, *waitItem[V]](size, cb)
if err != nil {
panic(err)
}
return &LRU[K, V]{
core: core,
successTTL: successTTL,
failedTTL: failedTTL,
target: target,
s: NewSingleFlight[K, V](),
}
}
type LRU[K comparable, V any] struct {
lock sync.Mutex
core *simplelru.LRU[K, *waitItem[V]]
successTTL time.Duration
failedTTL time.Duration
target Target
s *SingleFlight[K, V]
}
func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock()
v, ok := x.core.Get(key)
if ok {
x.lock.Unlock()
v.lock.Lock()
expires, value, err := v.expires, v.value, v.err
if expires != 0 && expires > time.Now().UnixMilli() {
v.lock.Unlock()
x.target.IncrGetHit()
return value, err
}
} else {
v = &waitItem[V]{}
x.core.Add(key, v)
v.lock.Lock()
x.lock.Unlock()
}
defer v.lock.Unlock()
if v.expires > time.Now().UnixMilli() {
return v.value, v.err
}
v.value, v.err = fetch()
if v.err == nil {
v.expires = time.Now().Add(x.successTTL).UnixMilli()
x.target.IncrGetSuccess()
} else {
v.expires = time.Now().Add(x.failedTTL).UnixMilli()
x.target.IncrGetFailed()
}
return v.value, v.err
}
func (x *LRU[K, V]) Del(key K) bool {
x.lock.Lock()
ok := x.core.Remove(key)
x.lock.Unlock()
if ok {
x.target.IncrDelHit()
} else {
x.target.IncrDelNotFound()
}
return ok
}
-55
View File
@@ -1,55 +0,0 @@
package localcache
//func TestName(t *testing.T) {
// target := &cacheTarget{}
// l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil)
// //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target)
//
// fn := func(key string, n int, fetch func() (string, error)) {
// for i := 0; i < n; i++ {
// //v, err := l.Get(key, fetch)
// //if err == nil {
// // t.Log("key", key, "value", v)
// //} else {
// // t.Error("key", key, err)
// //}
// l.Get(key, fetch)
// //time.Sleep(time.Second / 100)
// }
// }
//
// tmp := make(map[string]struct{})
//
// var wg sync.WaitGroup
// for i := 0; i < 10000; i++ {
// wg.Add(1)
// key := fmt.Sprintf("key_%d", i%200)
// tmp[key] = struct{}{}
// go func() {
// defer wg.Done()
// //t.Log(key)
// fn(key, 10000, func() (string, error) {
// //time.Sleep(time.Second * 3)
// //t.Log(time.Now(), "key", key, "fetch")
// //if rand.Uint32()%5 == 0 {
// // return "value_" + key, nil
// //}
// //return "", errors.New("rand error")
// return "value_" + key, nil
// })
// }()
//
// //wg.Add(1)
// //go func() {
// // defer wg.Done()
// // for i := 0; i < 10; i++ {
// // l.Del(key)
// // time.Sleep(time.Second / 3)
// // }
// //}()
// }
// wg.Wait()
// t.Log(len(tmp))
// t.Log(target.String())
//
//}
-115
View File
@@ -1,115 +0,0 @@
package localcache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local"
"time"
)
func defaultOption() *option {
return &option{
localSlotNum: 500,
localSlotSize: 20000,
linkSlotNum: 500,
localSuccessTTL: time.Minute,
localFailedTTL: time.Second * 5,
delFn: make([]func(ctx context.Context, key ...string), 0, 2),
target: emptyTarget{},
}
}
type option struct {
localSlotNum int
localSlotSize int
linkSlotNum int
localSuccessTTL time.Duration
localFailedTTL time.Duration
delFn []func(ctx context.Context, key ...string)
delCh func(fn func(key ...string))
target local.Target
}
type Option func(o *option)
func WithLocalDisable() Option {
return WithLinkSlotNum(0)
}
func WithLinkDisable() Option {
return WithLinkSlotNum(0)
}
func WithLinkSlotNum(linkSlotNum int) Option {
return func(o *option) {
o.linkSlotNum = linkSlotNum
}
}
func WithLocalSlotNum(localSlotNum int) Option {
return func(o *option) {
o.localSlotNum = localSlotNum
}
}
func WithLocalSlotSize(localSlotSize int) Option {
return func(o *option) {
o.localSlotSize = localSlotSize
}
}
func WithLocalSuccessTTL(localSuccessTTL time.Duration) Option {
if localSuccessTTL < 0 {
panic("localSuccessTTL should be greater than 0")
}
return func(o *option) {
o.localSuccessTTL = localSuccessTTL
}
}
func WithLocalFailedTTL(localFailedTTL time.Duration) Option {
if localFailedTTL < 0 {
panic("localFailedTTL should be greater than 0")
}
return func(o *option) {
o.localFailedTTL = localFailedTTL
}
}
func WithTarget(target local.Target) Option {
if target == nil {
panic("target should not be nil")
}
return func(o *option) {
o.target = target
}
}
func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option {
if fn == nil {
panic("fn should not be nil")
}
return func(o *option) {
o.delFn = append(o.delFn, fn)
}
}
func WithDeleteLocal(fn func(fn func(key ...string))) Option {
if fn == nil {
panic("fn should not be nil")
}
return func(o *option) {
o.delCh = fn
}
}
type emptyTarget struct{}
func (e emptyTarget) IncrGetHit() {}
func (e emptyTarget) IncrGetSuccess() {}
func (e emptyTarget) IncrGetFailed() {}
func (e emptyTarget) IncrDelHit() {}
func (e emptyTarget) IncrDelNotFound() {}
-20
View File
@@ -1,20 +0,0 @@
package option
func NewOption() *Option {
return &Option{}
}
type Option struct {
Link []string
}
func (o *Option) WithLink(key ...string) *Option {
if len(key) > 0 {
if len(o.Link) == 0 {
o.Link = key
} else {
o.Link = append(o.Link, key...)
}
}
return o
}
-43
View File
@@ -1,43 +0,0 @@
package localcache
import "sync"
type call[K comparable, V any] struct {
wg sync.WaitGroup
val V
err error
}
type SingleFlight[K comparable, V any] struct {
mu sync.Mutex
m map[K]*call[K, V]
}
func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V] {
return &SingleFlight[K, V]{m: make(map[K]*call[K, V])}
}
func (r *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (V, error) {
r.mu.Lock()
if r.m == nil {
r.m = make(map[K]*call[K, V])
}
if c, ok := r.m[key]; ok {
r.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call[K, V])
c.wg.Add(1)
r.m[key] = c
r.mu.Unlock()
c.val, c.err = fn()
c.wg.Done()
r.mu.Lock()
delete(r.m, key)
r.mu.Unlock()
return c.val, c.err
}
-59
View File
@@ -1,59 +0,0 @@
package localcache
import (
"fmt"
"sync/atomic"
)
type Target interface {
IncrGetHit()
IncrGetSuccess()
IncrGetFailed()
IncrDelHit()
IncrDelNotFound()
}
type cacheTarget struct {
getHit int64
getSuccess int64
getFailed int64
delHit int64
delNotFound int64
}
func (r *cacheTarget) IncrGetHit() {
atomic.AddInt64(&r.getHit, 1)
}
func (r *cacheTarget) IncrGetSuccess() {
atomic.AddInt64(&r.getSuccess, 1)
}
func (r *cacheTarget) IncrGetFailed() {
atomic.AddInt64(&r.getFailed, 1)
}
func (r *cacheTarget) IncrDelHit() {
atomic.AddInt64(&r.delHit, 1)
}
func (r *cacheTarget) IncrDelNotFound() {
atomic.AddInt64(&r.delNotFound, 1)
}
func (r *cacheTarget) String() string {
return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound)
}
type emptyTarget struct{}
func (e emptyTarget) IncrGetHit() {}
func (e emptyTarget) IncrGetSuccess() {}
func (e emptyTarget) IncrGetFailed() {}
func (e emptyTarget) IncrDelHit() {}
func (e emptyTarget) IncrDelNotFound() {}
-71
View File
@@ -1,71 +0,0 @@
package localcache
import (
"sync"
"time"
)
type Execute[K comparable, V any] func(K, V)
type Task[K comparable, V any] struct {
key K
value V
}
type TimeWheel[K comparable, V any] struct {
ticker *time.Ticker
slots [][]Task[K, V]
currentPos int
size int
slotMutex sync.Mutex
execute Execute[K, V]
}
func NewTimeWheel[K comparable, V any](size int, tickDuration time.Duration, execute Execute[K, V]) *TimeWheel[K, V] {
return &TimeWheel[K, V]{
ticker: time.NewTicker(tickDuration),
slots: make([][]Task[K, V], size),
currentPos: 0,
size: size,
execute: execute,
}
}
func (tw *TimeWheel[K, V]) Start() {
for range tw.ticker.C {
tw.tick()
}
}
func (tw *TimeWheel[K, V]) Stop() {
tw.ticker.Stop()
}
func (tw *TimeWheel[K, V]) tick() {
tw.slotMutex.Lock()
defer tw.slotMutex.Unlock()
tasks := tw.slots[tw.currentPos]
tw.slots[tw.currentPos] = nil
if len(tasks) > 0 {
go func(tasks []Task[K, V]) {
for _, task := range tasks {
tw.execute(task.key, task.value)
}
}(tasks)
}
tw.currentPos = (tw.currentPos + 1) % tw.size
}
func (tw *TimeWheel[K, V]) AddTask(delay int, task Task[K, V]) {
if delay < 0 || delay >= tw.size {
return
}
tw.slotMutex.Lock()
defer tw.slotMutex.Unlock()
pos := (tw.currentPos + delay) % tw.size
tw.slots[pos] = append(tw.slots[pos], task)
}
-9
View File
@@ -1,9 +0,0 @@
package localcache
func AnyValue[V any](v any, err error) (V, error) {
if err != nil {
var zero V
return zero, err
}
return v.(V), nil
}