feat: msg local cache

This commit is contained in:
withchao
2024-01-08 15:39:39 +08:00
parent f27b1e43f5
commit d9921248a7
23 changed files with 778 additions and 68 deletions
+51
View File
@@ -0,0 +1,51 @@
package local
import (
"hash/fnv"
"time"
"unsafe"
)
type Cache[V any] interface {
Get(key string, fetch func() (V, error)) (V, error)
Del(key string) bool
}
func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target) Cache[V] {
c := &cache[V]{
n: uint64(slotNum),
slots: make([]*LRU[string, V], slotNum),
target: target,
}
for i := 0; i < slotNum; i++ {
c.slots[i] = NewLRU[string, V](slotSize, successTTL, failedTTL, c.target)
}
return c
}
type cache[V any] struct {
n uint64
slots []*LRU[string, V]
target Target
}
func (c *cache[V]) index(s string) uint64 {
h := fnv.New64a()
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s)))
//_, _ = h.Write([]byte(s))
return h.Sum64() % c.n
}
func (c *cache[V]) Get(key string, fetch func() (V, error)) (V, error) {
return c.slots[c.index(key)].Get(key, fetch)
}
func (c *cache[V]) Del(key string) bool {
if c.slots[c.index(key)].Del(key) {
c.target.IncrDelHit()
return true
} else {
c.target.IncrDelNotFound()
return false
}
}
+76
View File
@@ -0,0 +1,76 @@
package local
import (
"github.com/hashicorp/golang-lru/v2/simplelru"
"sync"
"time"
)
type waitItem[V any] struct {
lock sync.Mutex
expires int64
active bool
err error
value V
}
func NewLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target) *LRU[K, V] {
core, err := simplelru.NewLRU[K, *waitItem[V]](size, nil)
if err != nil {
panic(err)
}
return &LRU[K, V]{
core: core,
successTTL: successTTL,
failedTTL: failedTTL,
target: target,
}
}
type LRU[K comparable, V any] struct {
lock sync.Mutex
core *simplelru.LRU[K, *waitItem[V]]
successTTL time.Duration
failedTTL time.Duration
target Target
}
func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock()
v, ok := x.core.Get(key)
if ok {
x.lock.Unlock()
v.lock.Lock()
expires, value, err := v.expires, v.value, v.err
if expires != 0 && expires > time.Now().UnixMilli() {
v.lock.Unlock()
x.target.IncrGetHit()
return value, err
}
} else {
v = &waitItem[V]{}
x.core.Add(key, v)
v.lock.Lock()
x.lock.Unlock()
}
defer v.lock.Unlock()
if v.expires > time.Now().UnixMilli() {
return v.value, v.err
}
v.value, v.err = fetch()
if v.err == nil {
v.expires = time.Now().Add(x.successTTL).UnixMilli()
x.target.IncrGetSuccess()
} else {
v.expires = time.Now().Add(x.failedTTL).UnixMilli()
x.target.IncrGetFailed()
}
return v.value, v.err
}
func (x *LRU[K, V]) Del(key K) bool {
x.lock.Lock()
ok := x.core.Remove(key)
x.lock.Unlock()
return ok
}
+95
View File
@@ -0,0 +1,95 @@
package local
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)
type cacheTarget struct {
getHit int64
getSuccess int64
getFailed int64
delHit int64
delNotFound int64
}
func (r *cacheTarget) IncrGetHit() {
atomic.AddInt64(&r.getHit, 1)
}
func (r *cacheTarget) IncrGetSuccess() {
atomic.AddInt64(&r.getSuccess, 1)
}
func (r *cacheTarget) IncrGetFailed() {
atomic.AddInt64(&r.getFailed, 1)
}
func (r *cacheTarget) IncrDelHit() {
atomic.AddInt64(&r.delHit, 1)
}
func (r *cacheTarget) IncrDelNotFound() {
atomic.AddInt64(&r.delNotFound, 1)
}
func (r *cacheTarget) String() string {
return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound)
}
func TestName(t *testing.T) {
target := &cacheTarget{}
l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target)
//l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target)
fn := func(key string, n int, fetch func() (string, error)) {
for i := 0; i < n; i++ {
//v, err := l.Get(key, fetch)
//if err == nil {
// t.Log("key", key, "value", v)
//} else {
// t.Error("key", key, err)
//}
l.Get(key, fetch)
//time.Sleep(time.Second / 100)
}
}
tmp := make(map[string]struct{})
var wg sync.WaitGroup
for i := 0; i < 10000; i++ {
wg.Add(1)
key := fmt.Sprintf("key_%d", i%200)
tmp[key] = struct{}{}
go func() {
defer wg.Done()
//t.Log(key)
fn(key, 10000, func() (string, error) {
//time.Sleep(time.Second * 3)
//t.Log(time.Now(), "key", key, "fetch")
//if rand.Uint32()%5 == 0 {
// return "value_" + key, nil
//}
//return "", errors.New("rand error")
return "value_" + key, nil
})
}()
//wg.Add(1)
//go func() {
// defer wg.Done()
// for i := 0; i < 10; i++ {
// l.Del(key)
// time.Sleep(time.Second / 3)
// }
//}()
}
wg.Wait()
t.Log(len(tmp))
t.Log(target.String())
}
+10
View File
@@ -0,0 +1,10 @@
package local
type Target interface {
IncrGetHit()
IncrGetSuccess()
IncrGetFailed()
IncrDelHit()
IncrDelNotFound()
}