Compare commits

..

10 Commits

Author SHA1 Message Date
github-actions[bot] 1f2c120074 Update version to v3.8.3-patch.1 2025-02-25 11:19:47 +00:00
withchao 57710bb9e3 resolving merge conflicts 2025-02-25 18:16:57 +08:00
chao 0006bce0a2 fix: seq conversion not reading env in docker environment (#3130)
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* fix: DeleteDoc crash

* fix: fill send time

* fix: fill send time

* fix: crash caused by withdrawing messages from users who have left the group

* fix: user msg timestamp

* seq read config

* seq read config
2025-02-25 18:08:40 +08:00
chao 0bb6f610ef fix: the user sets the conversation timer cleanup timestamp unit incorrectly (#3102)
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* fix: DeleteDoc crash

* fix: fill send time

* fix: fill send time

* fix: crash caused by withdrawing messages from users who have left the group

* fix: user msg timestamp
2025-02-25 18:08:22 +08:00
chao 82d133588e fix: crash caused by withdrawing messages from users who have left the group (#3100)
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* fix: DeleteDoc crash

* fix: fill send time

* fix: fill send time

* fix: crash caused by withdrawing messages from users who have left the group
2025-02-25 18:07:46 +08:00
chao d70dc7b16b fix: the abnormal message has no sending time, causing the SDK to be abnormal (#3087)
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* fix: DeleteDoc crash

* fix: fill send time

* fix: fill send time
2025-02-25 18:07:26 +08:00
chao 9f796e78c1 fix: DeleteDoc crash (#3078)
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting

* fix: DeleteDoc crash
2025-02-25 18:05:55 +08:00
icey-yu efcd76318e fix: check error in BatchSetTokenMapByUidPid (#3076) 2025-02-25 18:05:43 +08:00
icey-yu d7af353e42 feat: add backup volume && optimize log print (#3066)
* feat: backup

* feat: backup

* feat: backup

* feat: optimize log print
2025-02-25 18:04:25 +08:00
chao ab1691865f fix: seq conversion failed without exiting (#3052)
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* fix

* fix

* optimize log output

* feat: support GetLastMessage

* feat: support GetLastMessage

* feat: s3 switch

* feat: s3 switch

* fix: GetUsersOnline

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: SendBusinessNotification supported configuration parameters

* feat: seq conversion failed without exiting
2025-02-25 18:03:35 +08:00
21 changed files with 261 additions and 102 deletions
+2
View File
@@ -17,6 +17,8 @@ OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.8.4
DATA_DIR=./
MONGO_BACKUP_DIR=${DATA_DIR}components/backup/mongo/
PROMETHEUS_PORT=19091
ALERTMANAGER_PORT=19093
GRAFANA_PORT=13000
+1
View File
@@ -37,6 +37,7 @@ services:
- "${DATA_DIR}/components/mongodb/data/db:/data/db"
- "${DATA_DIR}/components/mongodb/data/logs:/data/logs"
- "${DATA_DIR}/components/mongodb/data/conf:/etc/mongo"
- "${MONGO_BACKUP_DIR}:/data/backup"
environment:
- TZ=Asia/Shanghai
- wiredTigerCacheSizeGB=1
+1 -1
View File
@@ -13,7 +13,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.71
github.com/openimsdk/tools v0.0.50-alpha.67
github.com/openimsdk/tools v0.0.50-alpha.72
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
+2 -2
View File
@@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70=
github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.67 h1:K7kguqvPbjldHAi7pGhcG2ERkctCqG9ZFlteT7UKaxM=
github.com/openimsdk/tools v0.0.50-alpha.67/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY=
github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
+3 -5
View File
@@ -16,13 +16,11 @@ package fcm
import (
"context"
"errors"
"fmt"
"path/filepath"
"strings"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
"github.com/openimsdk/tools/utils/httputil"
"path/filepath"
"strings"
firebase "firebase.google.com/go/v4"
"firebase.google.com/go/v4/messaging"
@@ -135,7 +133,7 @@ func (f *Fcm) Push(ctx context.Context, userIDs []string, title, content string,
unreadCountSum, err := f.cache.GetUserBadgeUnreadCountSum(ctx, userID)
if err == nil && unreadCountSum != 0 {
apns.Payload.Aps.Badge = &unreadCountSum
} else if errors.Is(err, redis.Nil) || unreadCountSum == 0 {
} else if err == redis.Nil || unreadCountSum == 0 {
zero := 1
apns.Payload.Aps.Badge = &zero
} else {
+2 -36
View File
@@ -18,16 +18,6 @@ import (
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil"
)
var (
incOne = datautil.ToPtr("+1")
addNum = "1"
defaultStrategy = strategy{
Default: 1,
}
msgCategory = "CATEGORY_MESSAGE"
)
type Resp struct {
@@ -68,24 +58,7 @@ type TaskResp struct {
}
type Settings struct {
TTL *int64 `json:"ttl"`
Strategy strategy `json:"strategy"`
}
type strategy struct {
Default int64 `json:"default"`
//IOS int64 `json:"ios"`
//St int64 `json:"st"`
//Hw int64 `json:"hw"`
//Ho int64 `json:"ho"`
//XM int64 `json:"xm"`
//XMG int64 `json:"xmg"`
//VV int64 `json:"vv"`
//Op int64 `json:"op"`
//OpG int64 `json:"opg"`
//MZ int64 `json:"mz"`
//HosHw int64 `json:"hoshw"`
//WX int64 `json:"wx"`
TTL *int64 `json:"ttl"`
}
type Audience struct {
@@ -139,8 +112,6 @@ type Notification struct {
ChannelID string `json:"channelID"`
ChannelName string `json:"ChannelName"`
ClickType string `json:"click_type"`
BadgeAddNum string `json:"badge_add_num"`
Category string `json:"category"`
}
type Options struct {
@@ -149,7 +120,6 @@ type Options struct {
ChannelID string `json:"/message/android/notification/channel_id"`
Sound string `json:"/message/android/notification/sound"`
Importance string `json:"/message/android/notification/importance"`
Category string `json:"/message/android/category"`
} `json:"HW"`
XM struct {
ChannelID string `json:"/extra.channel_id"`
@@ -170,8 +140,6 @@ func newPushReq(pushConf *config.Push, title, content string) PushReq {
ClickType: "startapp",
ChannelID: pushConf.GeTui.ChannelID,
ChannelName: pushConf.GeTui.ChannelName,
BadgeAddNum: addNum,
Category: msgCategory,
}}}
return pushReq
}
@@ -188,7 +156,6 @@ func (pushReq *PushReq) setPushChannel(title string, body string) {
notify := "notify"
pushReq.PushChannel.Ios.NotificationType = &notify
pushReq.PushChannel.Ios.Aps.Sound = "default"
pushReq.PushChannel.Ios.AutoBadge = incOne
pushReq.PushChannel.Ios.Aps.Alert = Alert{
Title: title,
Body: body,
@@ -205,8 +172,7 @@ func (pushReq *PushReq) setPushChannel(title string, body string) {
ChannelID string `json:"/message/android/notification/channel_id"`
Sound string `json:"/message/android/notification/sound"`
Importance string `json:"/message/android/notification/importance"`
Category string `json:"/message/android/category"`
}{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL", Category: "IM"},
}{ChannelID: "RingRing4", Sound: "/raw/ring001", Importance: "NORMAL"},
XM: struct {
ChannelID string `json:"/extra.channel_id"`
}{ChannelID: "high_system"},
+3 -5
View File
@@ -18,7 +18,6 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"strconv"
"sync"
"time"
@@ -71,7 +70,7 @@ func NewClient(pushConf *config.Push, cache cache.ThirdCache) *Client {
func (g *Client) Push(ctx context.Context, userIDs []string, title, content string, opts *options.Opts) error {
token, err := g.cache.GetGetuiToken(ctx)
if err != nil {
if errors.Is(err, redis.Nil) {
if errs.Unwrap(err) == redis.Nil {
log.ZDebug(ctx, "getui token not exist in redis")
token, err = g.getTokenAndSave2Redis(ctx)
if err != nil {
@@ -145,7 +144,7 @@ func (g *Client) Auth(ctx context.Context, timeStamp int64) (token string, expir
func (g *Client) GetTaskID(ctx context.Context, token string, pushReq PushReq) (string, error) {
respTask := TaskResp{}
ttl := int64(1000 * 60 * 5)
pushReq.Settings = &Settings{TTL: &ttl, Strategy: defaultStrategy}
pushReq.Settings = &Settings{TTL: &ttl}
err := g.request(ctx, taskURL, pushReq, token, &respTask)
if err != nil {
return "", errs.Wrap(err)
@@ -189,7 +188,6 @@ func (g *Client) postReturn(
if err != nil {
return err
}
log.ZDebug(ctx, "postReturn", "url", url, "header", header, "input", input, "timeout", timeout, "output", output)
return output.parseError()
}
@@ -206,7 +204,7 @@ func (g *Client) getTokenAndSave2Redis(ctx context.Context) (token string, err e
}
func (g *Client) GetTaskIDAndSave2Redis(ctx context.Context, token string, pushReq PushReq) (taskID string, err error) {
pushReq.Settings = &Settings{TTL: &g.taskIDTTL, Strategy: defaultStrategy}
pushReq.Settings = &Settings{TTL: &g.taskIDTTL}
taskID, err = g.GetTaskID(ctx, token, pushReq)
if err != nil {
return
+14 -13
View File
@@ -3,11 +3,12 @@ package push
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"math/rand"
"strconv"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
@@ -152,24 +153,24 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
defer func(duration time.Time) {
t := time.Since(duration)
log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "msg", msg.String(), "time cost", t)
log.ZInfo(ctx, "Get msg from msg_transfer And push msg end", "msg", msg.String(), "time cost", t)
}(time.Now())
if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
return err
}
log.ZInfo(ctx, "webhookBeforeOnlinePush end")
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs)
if err != nil {
return err
}
log.ZInfo(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
log.ZInfo(ctx, "single and notification push end")
if !c.shouldPushOffline(ctx, msg) {
return nil
}
log.ZInfo(ctx, "shouldPushOffline end")
log.ZInfo(ctx, "pushOffline start")
for _, v := range wsResults {
//message sender do not need offline push
@@ -188,14 +189,14 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserID, msg, &offlinePushUserID); err != nil {
return err
}
log.ZInfo(ctx, "webhookBeforeOfflinePush end")
if len(offlinePushUserID) > 0 {
needOfflinePushUserID = offlinePushUserID
}
err = c.offlinePushMsg(ctx, msg, needOfflinePushUserID)
if err != nil {
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg)
log.ZDebug(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg)
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID length", len(needOfflinePushUserID), "msg", msg)
return nil
}
@@ -250,26 +251,24 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
&pushToUserIDs); err != nil {
return err
}
log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end")
err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg)
if err != nil {
return err
}
log.ZInfo(ctx, "groupMessagesHandler end")
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
if err != nil {
return err
}
log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg)
log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg)
log.ZInfo(ctx, "online group push end")
if !c.shouldPushOffline(ctx, msg) {
return nil
}
needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs)
log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end")
//filter some user, like don not disturb or don't need offline push etc.
needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs)
if err != nil {
@@ -297,9 +296,11 @@ func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushU
needOfflinePushUserIDs = offlinePushUserIDs
}
if err := c.pushDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil {
log.ZError(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs",
log.ZDebug(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs",
needOfflinePushUserIDs, "msg", msg)
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
log.ZWarn(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs length",
len(needOfflinePushUserIDs), "msg", msg)
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
return
}
}
+3 -2
View File
@@ -16,10 +16,11 @@ package conversation
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
@@ -771,7 +772,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue
}
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-conversation.MsgDestructTime)
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
if err != nil {
return nil, err
}
+6 -3
View File
@@ -17,9 +17,10 @@ package msg
import (
"context"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/protocol/constant"
@@ -79,8 +80,10 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
switch members[req.UserID].RoleLevel {
case constant.GroupOwner:
case constant.GroupAdmin:
if members[msgs[0].SendID].RoleLevel != constant.GroupOrdinaryUsers {
return nil, errs.ErrNoPermission.WrapMsg("no permission")
if sendMember, ok := members[msgs[0].SendID]; ok {
if sendMember.RoleLevel != constant.GroupOrdinaryUsers {
return nil, errs.ErrNoPermission.WrapMsg("no permission")
}
}
default:
return nil, errs.ErrNoPermission.WrapMsg("no permission")
+4 -2
View File
@@ -15,8 +15,10 @@
package config
const (
ConfKey = "conf"
ETCD = "etcd"
MountConfigFilePath = "CONFIG_PATH"
DeploymentType = "DEPLOYMENT_TYPE"
KUBERNETES = "kubernetes"
ETCD = "etcd"
)
const (
+30
View File
@@ -0,0 +1,30 @@
package config
import "strings"
var EnvPrefixMap map[string]string
func init() {
EnvPrefixMap = make(map[string]string)
fileNames := []string{
FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName,
KafkaConfigFileName, RedisConfigFileName,
MongodbConfigFileName, MinioConfigFileName, LogConfigFileName,
OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName,
OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName,
OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName,
OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename,
}
for _, fileName := range fileNames {
envKey := strings.TrimSuffix(strings.TrimSuffix(fileName, ".yml"), ".yaml")
envKey = "IMENV_" + envKey
envKey = strings.ToUpper(strings.ReplaceAll(envKey, "-", "_"))
EnvPrefixMap[fileName] = envKey
}
}
const (
FlagConf = "config_folder_path"
FlagTransferIndex = "index"
)
+1 -3
View File
@@ -2,9 +2,7 @@ package redis
import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@@ -58,7 +56,7 @@ func callLua(ctx context.Context, rdb redis.Scripter, script *redis.Script, keys
}
}
v, err := r.Result()
if errors.Is(err, redis.Nil) {
if err == redis.Nil {
err = nil
}
return v, errs.WrapMsg(err, "call lua err", "scriptHash", script.Hash(), "keys", keys, "args", args)
+6 -7
View File
@@ -59,16 +59,15 @@ func (a *authDatabase) BatchSetTokenMapByUidPid(ctx context.Context, tokens []st
setMap := make(map[string]map[string]any)
for _, token := range tokens {
claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(a.accessSecret))
key := cachekey.GetTokenKey(claims.UserID, claims.PlatformID)
if err != nil {
continue
}
key := cachekey.GetTokenKey(claims.UserID, claims.PlatformID)
if v, ok := setMap[key]; ok {
v[token] = constant.KickedToken
} else {
if v, ok := setMap[key]; ok {
v[token] = constant.KickedToken
} else {
setMap[key] = map[string]any{
token: constant.KickedToken,
}
setMap[key] = map[string]any{
token: constant.KickedToken,
}
}
}
+4 -3
View File
@@ -18,11 +18,12 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/tools/utils/jsonutil"
"strconv"
"strings"
"time"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@@ -722,13 +723,13 @@ func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error
if index <= 0 {
return errs.ErrInternalServer.WrapMsg("docID is invalid", "docID", docID)
}
index, err := strconv.Atoi(docID[index+1:])
docIndex, err := strconv.Atoi(docID[index+1:])
if err != nil {
return errs.WrapMsg(err, "strconv.Atoi", "docID", docID)
}
conversationID := docID[:index]
seqs := make([]int64, db.msgTable.GetSingleGocMsgNum())
minSeq := db.msgTable.GetMinSeq(index)
minSeq := db.msgTable.GetMinSeq(docIndex)
for i := range seqs {
seqs[i] = minSeq + int64(i)
}
@@ -243,7 +243,14 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
"$add": []any{
bson.M{
"$toLong": "$latest_msg_destruct_time",
}, "$msg_destruct_time"},
},
bson.M{
"$multiply": []any{
"$msg_destruct_time",
1000, // convert to milliseconds
},
},
},
},
},
},
+129 -3
View File
@@ -1091,22 +1091,148 @@ func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []i
return msgDocModel[0].Msg, nil
}
//func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
// if len(seqs) == 0 {
// return nil, nil
// }
// result := make([]*model.MsgInfoModel, 0, len(seqs))
// for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
// res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
// if err != nil {
// return nil, err
// }
// for i, re := range res {
// if re == nil || re.Msg == nil {
// continue
// }
// result = append(result, res[i])
// }
// }
// return result, nil
//}
func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit int64) (int64, int64, error) {
if limit == 0 {
return 0, 0, nil
}
pipeline := []bson.M{
{
"$match": bson.M{
"doc_id": docID,
},
},
{
"$project": bson.M{
"_id": 0,
"doc_id": 0,
},
},
{
"$unwind": "$msgs",
},
{
"$project": bson.M{
//"_id": 0,
//"doc_id": 0,
"msgs.msg.send_time": 1,
"msgs.msg.seq": 1,
},
},
}
if limit > 0 {
pipeline = append(pipeline, bson.M{"$limit": limit})
}
type Result struct {
Msgs *model.MsgInfoModel `bson:"msgs"`
}
res, err := mongoutil.Aggregate[Result](ctx, m.coll, pipeline)
if err != nil {
return 0, 0, err
}
for i := len(res) - 1; i > 0; i-- {
v := res[i]
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
}
}
return 0, 0, nil
}
func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string, seq int64) (int64, int64, error) {
first := true
for i := m.model.GetDocIndex(seq); i >= 0; i-- {
limit := int64(-1)
if first {
first = false
limit = m.model.GetMsgIndex(seq)
}
docID := m.model.BuildDocIDByIndex(conversationID, i)
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
if err != nil {
return 0, 0, err
}
if msgSendTime > 0 {
return msgSeq, msgSendTime, nil
}
}
return 0, 0, nil
}
func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
if len(seqs) == 0 {
return nil, nil
}
var abnormalSeq []int64
result := make([]*model.MsgInfoModel, 0, len(seqs))
for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
for docID, docSeqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(docSeqs, m.model.GetMsgIndex))
if err != nil {
return nil, err
}
if len(res) == 0 {
abnormalSeq = append(abnormalSeq, docSeqs...)
continue
}
for i, re := range res {
if re == nil || re.Msg == nil {
if re == nil || re.Msg == nil || re.Msg.SendTime == 0 {
abnormalSeq = append(abnormalSeq, docSeqs[i])
continue
}
result = append(result, res[i])
}
}
if len(abnormalSeq) > 0 {
datautil.Sort(abnormalSeq, false)
sendTime := make(map[int64]int64)
var (
lastSeq int64
lastSendTime int64
)
for _, seq := range abnormalSeq {
if lastSendTime > 0 && lastSeq <= seq {
sendTime[seq] = lastSendTime
continue
}
msgSeq, msgSendTime, err := m.findBeforeSendTime(ctx, conversationID, seq)
if err != nil {
return nil, err
}
if msgSendTime <= 0 {
break
}
sendTime[seq] = msgSendTime
lastSeq = msgSeq
lastSendTime = msgSendTime
}
for _, seq := range abnormalSeq {
result = append(result, &model.MsgInfoModel{
Msg: &model.MsgDataModel{
Seq: seq,
Status: constant.MsgStatusHasDeleted,
SendTime: sendTime[seq],
},
})
}
}
return result, nil
}
+10 -1
View File
@@ -15,9 +15,10 @@
package model
import (
"strconv"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"strconv"
)
const (
@@ -108,6 +109,10 @@ func (m *MsgDocModel) IsFull() bool {
return m.Msg[len(m.Msg)-1].Msg != nil
}
func (m *MsgDocModel) GetDocIndex(seq int64) int64 {
return (seq - 1) / singleGocMsgNum
}
func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
seqSuffix := (seq - 1) / singleGocMsgNum
return m.indexGen(conversationID, seqSuffix)
@@ -135,6 +140,10 @@ func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
}
func (*MsgDocModel) BuildDocIDByIndex(conversationID string, index int64) string {
return conversationID + ":" + strconv.FormatInt(index, 10)
}
func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
for _, v := range seqs {
msgModel := new(sdkws.MsgData)
+26 -13
View File
@@ -5,16 +5,6 @@ import (
"context"
"errors"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"gopkg.in/yaml.v3"
"os"
"os/signal"
"path/filepath"
@@ -24,6 +14,19 @@ import (
"sync/atomic"
"syscall"
"time"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/redis/go-redis/v9"
"github.com/spf13/viper"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const (
@@ -41,12 +44,22 @@ const (
)
func readConfig[T any](dir string, name string) (*T, error) {
data, err := os.ReadFile(filepath.Join(dir, name))
if err != nil {
if runtimeenv.PrintRuntimeEnvironment() == config.KUBERNETES {
dir = os.Getenv(config.MountConfigFilePath)
}
v := viper.New()
v.SetEnvPrefix(config.EnvPrefixMap[name])
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.SetConfigFile(filepath.Join(dir, name))
if err := v.ReadInConfig(); err != nil {
return nil, err
}
fn := func(config *mapstructure.DecoderConfig) {
config.TagName = "mapstructure"
}
var conf T
if err := yaml.Unmarshal(data, &conf); err != nil {
if err := v.Unmarshal(&conf, fn); err != nil {
return nil, err
}
return &conf, nil
+5 -1
View File
@@ -3,8 +3,10 @@ package main
import (
"flag"
"fmt"
"github.com/openimsdk/open-im-server/v3/tools/seq/internal"
"os"
"time"
"github.com/openimsdk/open-im-server/v3/tools/seq/internal"
)
func main() {
@@ -17,6 +19,8 @@ func main() {
flag.Parse()
if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
fmt.Println("seq task", err)
os.Exit(1)
return
}
fmt.Println("seq task success!")
}
+1 -1
View File
@@ -1 +1 @@
v3.8.3
v3.8.3-patch.1