Merge remote-tracking branch 'origin/main'

This commit is contained in:
withchao
2024-12-17 11:59:19 +08:00
203 changed files with 2676 additions and 7931 deletions
+5 -5
View File
@@ -26,7 +26,7 @@ import (
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
@@ -41,9 +41,9 @@ import (
)
type Config struct {
API config.API
Share config.Share
Discovery config.Discovery
API conf.API
Share conf.Share
Discovery conf.Discovery
RuntimeEnv string
}
@@ -86,7 +86,7 @@ func Start(ctx context.Context, index int, config *Config) error {
return listener, port, nil
}
if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != kdisc.Etcd {
if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
}
+2
View File
@@ -173,6 +173,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
data = apistruct.AtElem{}
case constant.Custom:
data = apistruct.CustomElem{}
case constant.Quote:
data = apistruct.QuoteElem{}
case constant.Stream:
data = apistruct.StreamMsgElem{}
case constant.OANotification:
+5 -4
View File
@@ -2,8 +2,10 @@ package api
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/discovery"
@@ -11,7 +13,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
clientv3 "go.etcd.io/etcd/client/v3"
"net/http"
)
type PrometheusDiscoveryApi struct {
@@ -23,14 +24,14 @@ func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegi
api := &PrometheusDiscoveryApi{
config: config,
}
if config.Discovery.Enable == discoveryregister.Etcd {
if config.Discovery.Enable == conf.ETCD {
api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
}
return api
}
func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) {
if p.config.Discovery.Enable != discoveryregister.Etcd {
if p.config.Discovery.Enable != conf.ETCD {
c.JSON(http.StatusOK, []struct{}{})
c.Abort()
}
+1 -2
View File
@@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/openimsdk/tools/mw"
"runtime/debug"
"sync"
"sync/atomic"
@@ -378,7 +377,7 @@ func (c *Client) activeHeartbeat(ctx context.Context) {
go func() {
defer func() {
if r := recover(); r != nil {
mw.PanicStackToLog(ctx, r)
log.ZPanic(ctx, "activeHeartbeat Panic", r)
}
}()
log.ZDebug(ctx, "server initiative send heartbeat start.")
+5 -4
View File
@@ -3,15 +3,16 @@ package msggateway
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
pbAuth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/mcontext"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
pbAuth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/mcontext"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
+10 -10
View File
@@ -37,7 +37,7 @@ import (
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
@@ -64,13 +64,13 @@ type MsgTransfer struct {
}
type Config struct {
MsgTransfer config.MsgTransfer
RedisConfig config.Redis
MongodbConfig config.Mongo
KafkaConfig config.Kafka
Share config.Share
WebhooksConfig config.Webhooks
Discovery config.Discovery
MsgTransfer conf.MsgTransfer
RedisConfig conf.Redis
MongodbConfig conf.Mongo
KafkaConfig conf.Kafka
Share conf.Share
WebhooksConfig conf.Webhooks
Discovery conf.Discovery
}
func Start(ctx context.Context, index int, config *Config) error {
@@ -168,7 +168,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
return listener, port, nil
}
if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != kdisc.Etcd {
if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD {
return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap()
}
@@ -204,7 +204,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
go func() {
defer func() {
if r := recover(); r != nil {
mw.PanicStackToLog(m.ctx, r)
log.ZPanic(m.ctx, "MsgTransfer Start Panic", r)
}
}()
if err := prommetrics.TransferInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
@@ -18,13 +18,13 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/mw"
"strconv"
"strings"
"sync"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -349,7 +349,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
mw.PanicStackToLog(ctx, r)
log.ZPanic(ctx, "HandleUserHasReadSeqMessages Panic", r)
}
}()
+13 -7
View File
@@ -2,14 +2,19 @@ package push
import (
"context"
"errors"
"sync"
"github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"sync"
conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
type OnlinePusher interface {
@@ -37,15 +42,16 @@ func (u emptyOnlinePusher) GetOnlinePushFailedUserIDs(ctx context.Context, msg *
}
func NewOnlinePusher(disCov discovery.SvcDiscoveryRegistry, config *Config) OnlinePusher {
switch config.Discovery.Enable {
case "k8s":
return NewK8sStaticConsistentHash(disCov, config)
case "zookeeper":
if config.runTimeEnv == conf.KUBERNETES {
return NewDefaultAllNode(disCov, config)
case "etcd":
}
switch config.Discovery.Enable {
case conf.ETCD:
return NewDefaultAllNode(disCov, config)
default:
return newEmptyOnlinePusher()
log.ZError(context.Background(), "NewOnlinePusher is error", errs.Wrap(errors.New("unsupported discovery type")), "type", config.Discovery.Enable)
return nil
}
}
+5
View File
@@ -10,6 +10,7 @@ import (
pbpush "github.com/openimsdk/protocol/push"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
)
@@ -32,6 +33,8 @@ type Config struct {
LocalCacheConfig config.LocalCache
Discovery config.Discovery
FcmConfigPath string
runTimeEnv string
}
func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
@@ -48,6 +51,8 @@ func (p pushServer) DelUserPushToken(ctx context.Context,
}
func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()
rdb, err := redisutil.NewRedisClient(ctx, config.RedisConfig.Build())
if err != nil {
return err
+1 -1
View File
@@ -130,7 +130,7 @@ func (s *authServer) GetUserToken(ctx context.Context, req *pbauth.GetUserTokenR
func (s *authServer) parseToken(ctx context.Context, tokensString string) (claims *tokenverify.Claims, err error) {
claims, err = tokenverify.GetClaimFromToken(tokensString, authverify.Secret(s.config.Share.Secret))
if err != nil {
return nil, errs.Wrap(err)
return nil, err
}
isAdmin := authverify.IsManagerUserID(claims.UserID, s.config.Share.IMAdminUserID)
if isAdmin {
+1 -2
View File
@@ -16,7 +16,6 @@ package msg
import (
"context"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -89,7 +88,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
defer func() {
if r := recover(); r != nil {
mw.PanicStackToLog(nctx, r)
log.ZPanic(nctx, "setConversationAtInfo Panic", r)
}
}()
+3 -1
View File
@@ -92,7 +92,7 @@ func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq
NotificationMsgs: make(map[string]*sdkws.PullMsgs),
}
for _, conv := range req.Conversations {
_, _, msgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, req.UserID, conv.ConversationID, conv.Seqs)
isEnd, endSeq, msgs, err := m.MsgDatabase.GetMessagesBySeqWithBounds(ctx, req.UserID, conv.ConversationID, conv.Seqs, req.GetOrder())
if err != nil {
return nil, err
}
@@ -111,6 +111,8 @@ func (m *msgServer) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq
}
}
pullMsgs.Msgs = append(pullMsgs.Msgs, msgs...)
pullMsgs.IsEnd = isEnd
pullMsgs.EndSeq = endSeq
}
return resp, nil
}
+8 -8
View File
@@ -50,14 +50,14 @@ func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq)
platform := constant.PlatformID2Name[int(req.Platform)]
for _, fileURL := range req.FileURLs {
log := relationtb.Log{
Platform: platform,
UserID: userID,
CreateTime: time.Now(),
Url: fileURL.URL,
FileName: fileURL.Filename,
SystemType: req.AppFramework,
Version: req.Version,
Ex: req.Ex,
Platform: platform,
UserID: userID,
CreateTime: time.Now(),
Url: fileURL.URL,
FileName: fileURL.Filename,
AppFramework: req.AppFramework,
Version: req.Version,
Ex: req.Ex,
}
for i := 0; i < 20; i++ {
id := genLogID()