mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-28 14:29:19 +08:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1f2c120074 | |||
| 57710bb9e3 | |||
| 0006bce0a2 | |||
| 0bb6f610ef | |||
| 82d133588e | |||
| d70dc7b16b | |||
| 9f796e78c1 | |||
| efcd76318e | |||
| d7af353e42 | |||
| ab1691865f |
@@ -17,6 +17,8 @@ OPENIM_ADMIN_FRONT_IMAGE=openim/openim-admin-front:release-v1.8.4
|
||||
|
||||
DATA_DIR=./
|
||||
|
||||
MONGO_BACKUP_DIR=${DATA_DIR}components/backup/mongo/
|
||||
|
||||
PROMETHEUS_PORT=19091
|
||||
ALERTMANAGER_PORT=19093
|
||||
GRAFANA_PORT=13000
|
||||
|
||||
@@ -37,6 +37,7 @@ services:
|
||||
- "${DATA_DIR}/components/mongodb/data/db:/data/db"
|
||||
- "${DATA_DIR}/components/mongodb/data/logs:/data/logs"
|
||||
- "${DATA_DIR}/components/mongodb/data/conf:/etc/mongo"
|
||||
- "${MONGO_BACKUP_DIR}:/data/backup"
|
||||
environment:
|
||||
- TZ=Asia/Shanghai
|
||||
- wiredTigerCacheSizeGB=1
|
||||
|
||||
@@ -13,7 +13,7 @@ require (
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.71
|
||||
github.com/openimsdk/tools v0.0.50-alpha.67
|
||||
github.com/openimsdk/tools v0.0.50-alpha.72
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
|
||||
@@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.71 h1:R3utzOlqepaJWTAmnfJi4ccUM/XIoFasSyjQMOipM70=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.71/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.67 h1:K7kguqvPbjldHAi7pGhcG2ERkctCqG9ZFlteT7UKaxM=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.67/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.72 h1:d/vaZjIfvrNp3EeRJEIiamBO7HiPx6CP4wiuq8NpfzY=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.72/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
|
||||
|
||||
@@ -3,11 +3,12 @@ package push
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
|
||||
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
|
||||
@@ -152,24 +153,24 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
|
||||
log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "userIDs", userIDs, "msg", msg.String())
|
||||
defer func(duration time.Time) {
|
||||
t := time.Since(duration)
|
||||
log.ZInfo(ctx, "Get msg from msg_transfer And push msg", "msg", msg.String(), "time cost", t)
|
||||
log.ZInfo(ctx, "Get msg from msg_transfer And push msg end", "msg", msg.String(), "time cost", t)
|
||||
}(time.Now())
|
||||
if err := c.webhookBeforeOnlinePush(ctx, &c.config.WebhooksConfig.BeforeOnlinePush, userIDs, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
log.ZInfo(ctx, "webhookBeforeOnlinePush end")
|
||||
|
||||
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, userIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.ZInfo(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
|
||||
log.ZDebug(ctx, "single and notification push result", "result", wsResults, "msg", msg, "push_to_userID", userIDs)
|
||||
log.ZInfo(ctx, "single and notification push end")
|
||||
|
||||
if !c.shouldPushOffline(ctx, msg) {
|
||||
return nil
|
||||
}
|
||||
log.ZInfo(ctx, "shouldPushOffline end")
|
||||
log.ZInfo(ctx, "pushOffline start")
|
||||
|
||||
for _, v := range wsResults {
|
||||
//message sender do not need offline push
|
||||
@@ -188,14 +189,14 @@ func (c *ConsumerHandler) Push2User(ctx context.Context, userIDs []string, msg *
|
||||
if err = c.webhookBeforeOfflinePush(ctx, &c.config.WebhooksConfig.BeforeOfflinePush, needOfflinePushUserID, msg, &offlinePushUserID); err != nil {
|
||||
return err
|
||||
}
|
||||
log.ZInfo(ctx, "webhookBeforeOfflinePush end")
|
||||
|
||||
if len(offlinePushUserID) > 0 {
|
||||
needOfflinePushUserID = offlinePushUserID
|
||||
}
|
||||
err = c.offlinePushMsg(ctx, msg, needOfflinePushUserID)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg)
|
||||
log.ZDebug(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID", needOfflinePushUserID, "msg", msg)
|
||||
log.ZWarn(ctx, "offlinePushMsg failed", err, "needOfflinePushUserID length", len(needOfflinePushUserID), "msg", msg)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -250,26 +251,24 @@ func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *s
|
||||
&pushToUserIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
log.ZInfo(ctx, "webhookBeforeGroupOnlinePush end")
|
||||
|
||||
err = c.groupMessagesHandler(ctx, groupID, &pushToUserIDs, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.ZInfo(ctx, "groupMessagesHandler end")
|
||||
|
||||
wsResults, err := c.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.ZInfo(ctx, "group push result", "result", wsResults, "msg", msg)
|
||||
log.ZDebug(ctx, "group push result", "result", wsResults, "msg", msg)
|
||||
log.ZInfo(ctx, "online group push end")
|
||||
|
||||
if !c.shouldPushOffline(ctx, msg) {
|
||||
return nil
|
||||
}
|
||||
needOfflinePushUserIDs := c.onlinePusher.GetOnlinePushFailedUserIDs(ctx, msg, wsResults, &pushToUserIDs)
|
||||
log.ZInfo(ctx, "GetOnlinePushFailedUserIDs end")
|
||||
//filter some user, like don not disturb or don't need offline push etc.
|
||||
needOfflinePushUserIDs, err = c.filterGroupMessageOfflinePush(ctx, groupID, msg, needOfflinePushUserIDs)
|
||||
if err != nil {
|
||||
@@ -297,9 +296,11 @@ func (c *ConsumerHandler) asyncOfflinePush(ctx context.Context, needOfflinePushU
|
||||
needOfflinePushUserIDs = offlinePushUserIDs
|
||||
}
|
||||
if err := c.pushDatabase.MsgToOfflinePushMQ(ctx, conversationutil.GenConversationUniqueKeyForSingle(msg.SendID, msg.RecvID), needOfflinePushUserIDs, msg); err != nil {
|
||||
log.ZError(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs",
|
||||
log.ZDebug(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs",
|
||||
needOfflinePushUserIDs, "msg", msg)
|
||||
prommetrics.SingleChatMsgProcessFailedCounter.Inc()
|
||||
log.ZWarn(ctx, "Msg To OfflinePush MQ error", err, "needOfflinePushUserIDs length",
|
||||
len(needOfflinePushUserIDs), "msg", msg)
|
||||
prommetrics.GroupChatMsgProcessFailedCounter.Inc()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,10 +16,11 @@ package conversation
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||
@@ -771,7 +772,7 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
|
||||
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
|
||||
continue
|
||||
}
|
||||
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-conversation.MsgDestructTime)
|
||||
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-(conversation.MsgDestructTime*1000))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -17,9 +17,10 @@ package msg
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
@@ -79,8 +80,10 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
|
||||
switch members[req.UserID].RoleLevel {
|
||||
case constant.GroupOwner:
|
||||
case constant.GroupAdmin:
|
||||
if members[msgs[0].SendID].RoleLevel != constant.GroupOrdinaryUsers {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("no permission")
|
||||
if sendMember, ok := members[msgs[0].SendID]; ok {
|
||||
if sendMember.RoleLevel != constant.GroupOrdinaryUsers {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("no permission")
|
||||
}
|
||||
}
|
||||
default:
|
||||
return nil, errs.ErrNoPermission.WrapMsg("no permission")
|
||||
|
||||
@@ -15,8 +15,10 @@
|
||||
package config
|
||||
|
||||
const (
|
||||
ConfKey = "conf"
|
||||
ETCD = "etcd"
|
||||
MountConfigFilePath = "CONFIG_PATH"
|
||||
DeploymentType = "DEPLOYMENT_TYPE"
|
||||
KUBERNETES = "kubernetes"
|
||||
ETCD = "etcd"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
package config
|
||||
|
||||
import "strings"
|
||||
|
||||
var EnvPrefixMap map[string]string
|
||||
|
||||
func init() {
|
||||
EnvPrefixMap = make(map[string]string)
|
||||
fileNames := []string{
|
||||
FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName,
|
||||
KafkaConfigFileName, RedisConfigFileName,
|
||||
MongodbConfigFileName, MinioConfigFileName, LogConfigFileName,
|
||||
OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName,
|
||||
OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName,
|
||||
OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName,
|
||||
OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename,
|
||||
}
|
||||
|
||||
for _, fileName := range fileNames {
|
||||
envKey := strings.TrimSuffix(strings.TrimSuffix(fileName, ".yml"), ".yaml")
|
||||
envKey = "IMENV_" + envKey
|
||||
envKey = strings.ToUpper(strings.ReplaceAll(envKey, "-", "_"))
|
||||
EnvPrefixMap[fileName] = envKey
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
FlagConf = "config_folder_path"
|
||||
FlagTransferIndex = "index"
|
||||
)
|
||||
@@ -59,16 +59,15 @@ func (a *authDatabase) BatchSetTokenMapByUidPid(ctx context.Context, tokens []st
|
||||
setMap := make(map[string]map[string]any)
|
||||
for _, token := range tokens {
|
||||
claims, err := tokenverify.GetClaimFromToken(token, authverify.Secret(a.accessSecret))
|
||||
key := cachekey.GetTokenKey(claims.UserID, claims.PlatformID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
key := cachekey.GetTokenKey(claims.UserID, claims.PlatformID)
|
||||
if v, ok := setMap[key]; ok {
|
||||
v[token] = constant.KickedToken
|
||||
} else {
|
||||
if v, ok := setMap[key]; ok {
|
||||
v[token] = constant.KickedToken
|
||||
} else {
|
||||
setMap[key] = map[string]any{
|
||||
token: constant.KickedToken,
|
||||
}
|
||||
setMap[key] = map[string]any{
|
||||
token: constant.KickedToken,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,11 +18,12 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/openimsdk/tools/utils/jsonutil"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/tools/utils/jsonutil"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
|
||||
@@ -722,13 +723,13 @@ func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error
|
||||
if index <= 0 {
|
||||
return errs.ErrInternalServer.WrapMsg("docID is invalid", "docID", docID)
|
||||
}
|
||||
index, err := strconv.Atoi(docID[index+1:])
|
||||
docIndex, err := strconv.Atoi(docID[index+1:])
|
||||
if err != nil {
|
||||
return errs.WrapMsg(err, "strconv.Atoi", "docID", docID)
|
||||
}
|
||||
conversationID := docID[:index]
|
||||
seqs := make([]int64, db.msgTable.GetSingleGocMsgNum())
|
||||
minSeq := db.msgTable.GetMinSeq(index)
|
||||
minSeq := db.msgTable.GetMinSeq(docIndex)
|
||||
for i := range seqs {
|
||||
seqs[i] = minSeq + int64(i)
|
||||
}
|
||||
|
||||
@@ -243,7 +243,14 @@ func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, li
|
||||
"$add": []any{
|
||||
bson.M{
|
||||
"$toLong": "$latest_msg_destruct_time",
|
||||
}, "$msg_destruct_time"},
|
||||
},
|
||||
bson.M{
|
||||
"$multiply": []any{
|
||||
"$msg_destruct_time",
|
||||
1000, // convert to milliseconds
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -1091,22 +1091,148 @@ func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []i
|
||||
return msgDocModel[0].Msg, nil
|
||||
}
|
||||
|
||||
//func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
|
||||
// if len(seqs) == 0 {
|
||||
// return nil, nil
|
||||
// }
|
||||
// result := make([]*model.MsgInfoModel, 0, len(seqs))
|
||||
// for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
|
||||
// res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
// for i, re := range res {
|
||||
// if re == nil || re.Msg == nil {
|
||||
// continue
|
||||
// }
|
||||
// result = append(result, res[i])
|
||||
// }
|
||||
// }
|
||||
// return result, nil
|
||||
//}
|
||||
|
||||
func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit int64) (int64, int64, error) {
|
||||
if limit == 0 {
|
||||
return 0, 0, nil
|
||||
}
|
||||
pipeline := []bson.M{
|
||||
{
|
||||
"$match": bson.M{
|
||||
"doc_id": docID,
|
||||
},
|
||||
},
|
||||
{
|
||||
"$project": bson.M{
|
||||
"_id": 0,
|
||||
"doc_id": 0,
|
||||
},
|
||||
},
|
||||
{
|
||||
"$unwind": "$msgs",
|
||||
},
|
||||
{
|
||||
"$project": bson.M{
|
||||
//"_id": 0,
|
||||
//"doc_id": 0,
|
||||
"msgs.msg.send_time": 1,
|
||||
"msgs.msg.seq": 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
if limit > 0 {
|
||||
pipeline = append(pipeline, bson.M{"$limit": limit})
|
||||
}
|
||||
type Result struct {
|
||||
Msgs *model.MsgInfoModel `bson:"msgs"`
|
||||
}
|
||||
res, err := mongoutil.Aggregate[Result](ctx, m.coll, pipeline)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
for i := len(res) - 1; i > 0; i-- {
|
||||
v := res[i]
|
||||
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
|
||||
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
|
||||
}
|
||||
}
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string, seq int64) (int64, int64, error) {
|
||||
first := true
|
||||
for i := m.model.GetDocIndex(seq); i >= 0; i-- {
|
||||
limit := int64(-1)
|
||||
if first {
|
||||
first = false
|
||||
limit = m.model.GetMsgIndex(seq)
|
||||
}
|
||||
docID := m.model.BuildDocIDByIndex(conversationID, i)
|
||||
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
if msgSendTime > 0 {
|
||||
return msgSeq, msgSendTime, nil
|
||||
}
|
||||
}
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
|
||||
if len(seqs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var abnormalSeq []int64
|
||||
result := make([]*model.MsgInfoModel, 0, len(seqs))
|
||||
for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
|
||||
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex))
|
||||
for docID, docSeqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) {
|
||||
res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(docSeqs, m.model.GetMsgIndex))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(res) == 0 {
|
||||
abnormalSeq = append(abnormalSeq, docSeqs...)
|
||||
continue
|
||||
}
|
||||
for i, re := range res {
|
||||
if re == nil || re.Msg == nil {
|
||||
if re == nil || re.Msg == nil || re.Msg.SendTime == 0 {
|
||||
abnormalSeq = append(abnormalSeq, docSeqs[i])
|
||||
continue
|
||||
}
|
||||
result = append(result, res[i])
|
||||
}
|
||||
}
|
||||
if len(abnormalSeq) > 0 {
|
||||
datautil.Sort(abnormalSeq, false)
|
||||
sendTime := make(map[int64]int64)
|
||||
var (
|
||||
lastSeq int64
|
||||
lastSendTime int64
|
||||
)
|
||||
for _, seq := range abnormalSeq {
|
||||
if lastSendTime > 0 && lastSeq <= seq {
|
||||
sendTime[seq] = lastSendTime
|
||||
continue
|
||||
}
|
||||
msgSeq, msgSendTime, err := m.findBeforeSendTime(ctx, conversationID, seq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if msgSendTime <= 0 {
|
||||
break
|
||||
}
|
||||
sendTime[seq] = msgSendTime
|
||||
lastSeq = msgSeq
|
||||
lastSendTime = msgSendTime
|
||||
}
|
||||
for _, seq := range abnormalSeq {
|
||||
result = append(result, &model.MsgInfoModel{
|
||||
Msg: &model.MsgDataModel{
|
||||
Seq: seq,
|
||||
Status: constant.MsgStatusHasDeleted,
|
||||
SendTime: sendTime[seq],
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -15,9 +15,10 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -108,6 +109,10 @@ func (m *MsgDocModel) IsFull() bool {
|
||||
return m.Msg[len(m.Msg)-1].Msg != nil
|
||||
}
|
||||
|
||||
func (m *MsgDocModel) GetDocIndex(seq int64) int64 {
|
||||
return (seq - 1) / singleGocMsgNum
|
||||
}
|
||||
|
||||
func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
||||
seqSuffix := (seq - 1) / singleGocMsgNum
|
||||
return m.indexGen(conversationID, seqSuffix)
|
||||
@@ -135,6 +140,10 @@ func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||
}
|
||||
|
||||
func (*MsgDocModel) BuildDocIDByIndex(conversationID string, index int64) string {
|
||||
return conversationID + ":" + strconv.FormatInt(index, 10)
|
||||
}
|
||||
|
||||
func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) {
|
||||
for _, v := range seqs {
|
||||
msgModel := new(sdkws.MsgData)
|
||||
|
||||
+26
-13
@@ -5,16 +5,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"github.com/openimsdk/tools/db/redisutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"gopkg.in/yaml.v3"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
@@ -24,6 +14,19 @@ import (
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
|
||||
"github.com/openimsdk/tools/db/mongoutil"
|
||||
"github.com/openimsdk/tools/db/redisutil"
|
||||
"github.com/openimsdk/tools/utils/runtimeenv"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/spf13/viper"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -41,12 +44,22 @@ const (
|
||||
)
|
||||
|
||||
func readConfig[T any](dir string, name string) (*T, error) {
|
||||
data, err := os.ReadFile(filepath.Join(dir, name))
|
||||
if err != nil {
|
||||
if runtimeenv.PrintRuntimeEnvironment() == config.KUBERNETES {
|
||||
dir = os.Getenv(config.MountConfigFilePath)
|
||||
}
|
||||
v := viper.New()
|
||||
v.SetEnvPrefix(config.EnvPrefixMap[name])
|
||||
v.AutomaticEnv()
|
||||
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||
v.SetConfigFile(filepath.Join(dir, name))
|
||||
if err := v.ReadInConfig(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fn := func(config *mapstructure.DecoderConfig) {
|
||||
config.TagName = "mapstructure"
|
||||
}
|
||||
var conf T
|
||||
if err := yaml.Unmarshal(data, &conf); err != nil {
|
||||
if err := v.Unmarshal(&conf, fn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &conf, nil
|
||||
|
||||
+5
-1
@@ -3,8 +3,10 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/seq/internal"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/tools/seq/internal"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -17,6 +19,8 @@ func main() {
|
||||
flag.Parse()
|
||||
if err := internal.Main(config, time.Duration(second)*time.Second); err != nil {
|
||||
fmt.Println("seq task", err)
|
||||
os.Exit(1)
|
||||
return
|
||||
}
|
||||
fmt.Println("seq task success!")
|
||||
}
|
||||
|
||||
+1
-1
@@ -1 +1 @@
|
||||
v3.8.3
|
||||
v3.8.3-patch.1
|
||||
|
||||
Reference in New Issue
Block a user