Compare commits

..

1 Commits

Author SHA1 Message Date
mo3et d32cd465fd Update CHANGELOG for release v3.8.3-patch.8 2025-08-13 10:28:27 +00:00
9 changed files with 42 additions and 60 deletions
+4 -6
View File
@@ -1,10 +1,8 @@
## [v3.8.3-patch.9](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.9) (2025-08-19) ## [v3.8.3-patch.8](https://github.com/openimsdk/open-im-server/releases/tag/v3.8.3-patch.8) (2025-08-13)
### New Features
* feat: enable redis aof-use-rdb-preamble && disable auto rdb [Created [#3535](https://github.com/openimsdk/open-im-server/pull/3535)
### Bug Fixes ### Bug Fixes
* fix: fill in the most recent sendTime for a gap message to prevent th… [#3523](https://github.com/openimsdk/open-im-server/pull/3523) * fix: fix incorrect kicked logic and PCAndOther Login policy In v3.8.3-patch [#3511](https://github.com/openimsdk/open-im-server/pull/3511)
* fix: solve batch incorrect error in Find DocIDs in v3.8.3-patch branch. [#3515](https://github.com/openimsdk/open-im-server/pull/3515)
**Full Changelog**: [v3.8.3-patch.8...v3.8.3-patch.9](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.8...v3.8.3-patch.9) **Full Changelog**: [v3.8.3-patch.7...v3.8.3-patch.8](https://github.com/openimsdk/open-im-server/compare/v3.8.3-patch.7...v3.8.3-patch.8)
+1 -6
View File
@@ -63,12 +63,7 @@ services:
restart: always restart: always
sysctls: sysctls:
net.core.somaxconn: 1024 net.core.somaxconn: 1024
command: > command: redis-server /usr/local/redis/config/redis.conf --requirepass openIM123 --appendonly yes
redis-server
--requirepass openIM123
--appendonly yes
--aof-use-rdb-preamble yes
--save ""
networks: networks:
- openim - openim
+1 -1
View File
@@ -24,7 +24,7 @@ var (
func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache { func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache {
initMemMsgCache.Do(func() { initMemMsgCache.Do(func() {
memMsgCache = lru.NewLazyLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil) memMsgCache = lru.NewLayLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil)
}) })
return &msgCache{ return &msgCache{
cache: cache, cache: cache,
+6 -7
View File
@@ -5,11 +5,6 @@ import (
"fmt" "fmt"
"time" "time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
@@ -19,6 +14,10 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/jsonutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
) )
func NewMsgMongo(db *mongo.Database) (database.Msg, error) { func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
@@ -1155,7 +1154,7 @@ func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
for i := len(res) - 1; i >= 0; i-- { for i := len(res) - 1; i > 0; i-- {
v := res[i] v := res[i]
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 { if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
@@ -1170,7 +1169,7 @@ func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string,
limit := int64(-1) limit := int64(-1)
if first { if first {
first = false first = false
limit = m.model.GetLimitForSingleDoc(seq) limit = m.model.GetMsgIndex(seq)
} }
docID := m.model.BuildDocIDByIndex(conversationID, i) docID := m.model.BuildDocIDByIndex(conversationID, i)
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit) msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
-4
View File
@@ -132,10 +132,6 @@ func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
return (seq - 1) % singleGocMsgNum return (seq - 1) % singleGocMsgNum
} }
func (*MsgDocModel) GetLimitForSingleDoc(seq int64) int64 {
return seq % singleGocMsgNum
}
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10) return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
} }
+1 -1
View File
@@ -49,7 +49,7 @@ func New[V any](opts ...Option) Cache[V] {
if opt.expirationEvict { if opt.expirationEvict {
return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
} else { } else {
return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
} }
} }
if opt.localSlotNum == 1 { if opt.localSlotNum == 1 {
+25 -31
View File
@@ -21,25 +21,25 @@ import (
"github.com/hashicorp/golang-lru/v2/simplelru" "github.com/hashicorp/golang-lru/v2/simplelru"
) )
type lazyLruItem[V any] struct { type layLruItem[V any] struct {
lock sync.Mutex lock sync.Mutex
expires int64 expires int64
err error err error
value V value V
} }
func NewLazyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LazyLRU[K, V] { func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] {
var cb simplelru.EvictCallback[K, *lazyLruItem[V]] var cb simplelru.EvictCallback[K, *layLruItem[V]]
if onEvict != nil { if onEvict != nil {
cb = func(key K, value *lazyLruItem[V]) { cb = func(key K, value *layLruItem[V]) {
onEvict(key, value.value) onEvict(key, value.value)
} }
} }
core, err := simplelru.NewLRU[K, *lazyLruItem[V]](size, cb) core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return &LazyLRU[K, V]{ return &LayLRU[K, V]{
core: core, core: core,
successTTL: successTTL, successTTL: successTTL,
failedTTL: failedTTL, failedTTL: failedTTL,
@@ -47,15 +47,15 @@ func NewLazyLRU[K comparable, V any](size int, successTTL, failedTTL time.Durati
} }
} }
type LazyLRU[K comparable, V any] struct { type LayLRU[K comparable, V any] struct {
lock sync.Mutex lock sync.Mutex
core *simplelru.LRU[K, *lazyLruItem[V]] core *simplelru.LRU[K, *layLruItem[V]]
successTTL time.Duration successTTL time.Duration
failedTTL time.Duration failedTTL time.Duration
target Target target Target
} }
func (x *LazyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock() x.lock.Lock()
v, ok := x.core.Get(key) v, ok := x.core.Get(key)
if ok { if ok {
@@ -68,7 +68,7 @@ func (x *LazyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return value, err return value, err
} }
} else { } else {
v = &lazyLruItem[V]{} v = &layLruItem[V]{}
x.core.Add(key, v) x.core.Add(key, v)
v.lock.Lock() v.lock.Lock()
x.lock.Unlock() x.lock.Unlock()
@@ -88,15 +88,15 @@ func (x *LazyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return v.value, v.err return v.value, v.err
} }
func (x *LazyLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
var ( var (
err error err error
once sync.Once once sync.Once
) )
res := make(map[K]V) res := make(map[K]V)
queries := make([]K, 0, len(keys)) queries := make([]K, 0)
setVs := make(map[K]*layLruItem[V])
for _, key := range keys { for _, key := range keys {
x.lock.Lock() x.lock.Lock()
v, ok := x.core.Get(key) v, ok := x.core.Get(key)
@@ -118,20 +118,14 @@ func (x *LazyLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
} }
queries = append(queries, key) queries = append(queries, key)
} }
values, err1 := fetch(queries)
if len(queries) == 0 { if err1 != nil {
return res, err
}
values, fetchErr := fetch(queries)
if fetchErr != nil {
once.Do(func() { once.Do(func() {
err = fetchErr err = err1
}) })
} }
for key, val := range values { for key, val := range values {
v := &lazyLruItem[V]{} v := &layLruItem[V]{}
v.value = val v.value = val
if err == nil { if err == nil {
@@ -141,7 +135,7 @@ func (x *LazyLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
v.expires = time.Now().Add(x.failedTTL).UnixMilli() v.expires = time.Now().Add(x.failedTTL).UnixMilli()
x.target.IncrGetFailed() x.target.IncrGetFailed()
} }
setVs[key] = v
x.lock.Lock() x.lock.Lock()
x.core.Add(key, v) x.core.Add(key, v)
x.lock.Unlock() x.lock.Unlock()
@@ -151,29 +145,29 @@ func (x *LazyLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
return res, err return res, err
} }
//func (x *LazyLRU[K, V]) Has(key K) bool { //func (x *LayLRU[K, V]) Has(key K) bool {
// x.lock.Lock() // x.lock.Lock()
// defer x.lock.Unlock() // defer x.lock.Unlock()
// return x.core.Contains(key) // return x.core.Contains(key)
//} //}
func (x *LazyLRU[K, V]) Set(key K, value V) { func (x *LayLRU[K, V]) Set(key K, value V) {
x.lock.Lock() x.lock.Lock()
defer x.lock.Unlock() defer x.lock.Unlock()
x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
} }
func (x *LazyLRU[K, V]) SetHas(key K, value V) bool { func (x *LayLRU[K, V]) SetHas(key K, value V) bool {
x.lock.Lock() x.lock.Lock()
defer x.lock.Unlock() defer x.lock.Unlock()
if x.core.Contains(key) { if x.core.Contains(key) {
x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
return true return true
} }
return false return false
} }
func (x *LazyLRU[K, V]) Del(key K) bool { func (x *LayLRU[K, V]) Del(key K) bool {
x.lock.Lock() x.lock.Lock()
ok := x.core.Remove(key) ok := x.core.Remove(key)
x.lock.Unlock() x.lock.Unlock()
@@ -185,6 +179,6 @@ func (x *LazyLRU[K, V]) Del(key K) bool {
return ok return ok
} }
func (x *LazyLRU[K, V]) Stop() { func (x *LayLRU[K, V]) Stop() {
} }
+3 -3
View File
@@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct {
func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
var ( var (
slotKeys = make(map[uint64][]K) slotKeys = make(map[uint64][]K)
kVs = make(map[K]V) vs = make(map[K]V)
) )
for _, k := range keys { for _, k := range keys {
@@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
return nil, err return nil, err
} }
for key, value := range batches { for key, value := range batches {
kVs[key] = value vs[key] = value
} }
} }
return kVs, nil return vs, nil
} }
func (x *slotLRU[K, V]) getIndex(k K) uint64 { func (x *slotLRU[K, V]) getIndex(k K) uint64 {
+1 -1
View File
@@ -64,7 +64,7 @@ func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis.
case false: case false:
log.ZDebug(ctx, "fullUserCache is false") log.ZDebug(ctx, "fullUserCache is false")
x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
return lru.NewLazyLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
}) })
x.CurrentPhase.Store(DoSubscribeOver) x.CurrentPhase.Store(DoSubscribeOver)
x.Cond.Broadcast() x.Cond.Broadcast()