Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao
2023-03-09 14:42:21 +08:00
81 changed files with 769 additions and 2001 deletions
+5 -5
View File
@@ -2,6 +2,7 @@ package cmd
import (
"OpenIM/internal/msggateway"
//"OpenIM/internal/msggateway"
"OpenIM/pkg/common/constant"
"github.com/spf13/cobra"
)
@@ -23,14 +24,13 @@ func (m *MsgGatewayCmd) getWsPortFlag(cmd *cobra.Command) int {
return port
}
func (m *MsgGatewayCmd) addRun() {
m.Command.Run = func(cmd *cobra.Command, args []string) {
msggateway.Init(m.getPortFlag(cmd), m.getWsPortFlag(cmd))
msggateway.Run(m.getPrometheusPortFlag(cmd))
func (m *MsgGatewayCmd) addRunE() {
m.Command.RunE = func(cmd *cobra.Command, args []string) error {
return msggateway.RunWsAndServer(m.getPortFlag(cmd), m.getWsPortFlag(cmd), m.getPrometheusPortFlag(cmd))
}
}
func (m *MsgGatewayCmd) Exec() error {
m.addRun()
m.addRunE()
return m.Execute()
}
+4 -4
View File
@@ -8,15 +8,15 @@ import (
)
type PushCmd struct {
*RpcCmd
*AuthCmd
}
func NewPushCmd() *PushCmd {
return &PushCmd{NewRpcCmd(config.Config.RpcRegisterName.OpenImPushName)}
return &PushCmd{NewAuthCmd()}
}
func (r *RpcCmd) AddPush() {
func (r *PushCmd) AddPush() {
r.Command.RunE = func(cmd *cobra.Command, args []string) error {
return startrpc.Start(r.getPortFlag(cmd), r.rpcRegisterName, r.getPrometheusPortFlag(cmd), push.Start)
return startrpc.Start(r.getPortFlag(cmd), config.Config.RpcRegisterName.OpenImPushName, r.getPrometheusPortFlag(cmd), push.Start)
}
}
+12 -2
View File
@@ -7,7 +7,9 @@ import (
)
type RootCmd struct {
Command cobra.Command
Command cobra.Command
port int
prometheusPort int
}
func NewRootCmd() (rootCmd *RootCmd) {
@@ -50,8 +52,12 @@ func (r *RootCmd) getPortFlag(cmd *cobra.Command) int {
return port
}
func (r *RootCmd) GetPortFlag() int {
return r.port
}
func (r *RootCmd) AddPrometheusPortFlag() {
r.Command.Flags().StringP(constant.FlagPrometheusPort, "pp", "", "server prometheus listen port")
r.Command.Flags().String(constant.FlagPrometheusPort, "", "server prometheus listen port")
}
func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int {
@@ -59,6 +65,10 @@ func (r *RootCmd) getPrometheusPortFlag(cmd *cobra.Command) int {
return port
}
func (r *RootCmd) GetPrometheusPortFlag() int {
return r.prometheusPort
}
func (r *RootCmd) getConfFromCmdAndInit(cmdLines *cobra.Command) error {
configFolderPath, _ := cmdLines.Flags().GetString(constant.FlagConf)
return config.InitConfig(configFolderPath)
+11 -10
View File
@@ -7,22 +7,23 @@ import (
"google.golang.org/grpc"
)
type RpcCmd struct {
type AuthCmd struct {
*RootCmd
rpcRegisterName string
}
func NewRpcCmd(rpcRegisterName string) *RpcCmd {
return &RpcCmd{NewRootCmd(), rpcRegisterName}
func NewAuthCmd() *AuthCmd {
authCmd := &AuthCmd{NewRootCmd()}
return authCmd
}
func (r *RpcCmd) AddRpc(rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) {
r.Command.RunE = func(cmd *cobra.Command, args []string) error {
return startrpc.Start(r.getPortFlag(cmd), r.rpcRegisterName, r.getPrometheusPortFlag(cmd), rpcFn)
func (a *AuthCmd) Exec() error {
a.Command.Run = func(cmd *cobra.Command, args []string) {
a.port = a.getPortFlag(cmd)
a.prometheusPort = a.getPrometheusPortFlag(cmd)
}
return a.Execute()
}
func (r *RpcCmd) Exec(rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error {
r.AddRpc(rpcFn)
return r.Execute()
func (a *AuthCmd) StartSvr(name string, rpcFn func(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error) error {
return startrpc.Start(a.GetPortFlag(), name, a.GetPrometheusPortFlag(), rpcFn)
}
+4 -3
View File
@@ -176,7 +176,7 @@ type config struct {
ZkAddr []string `yaml:"zkAddr"`
UserName string `yaml:"userName"`
Password string `yaml:"password"`
}
} `yaml:"zookeeper"`
Log struct {
StorageLocation string `yaml:"storageLocation"`
RotationTime int `yaml:"rotationTime"`
@@ -515,9 +515,10 @@ func (c *config) initConfig(config interface{}, configName, configFolderPath str
return err
}
configPath = filepath.Join(Root, "config", configName)
fmt.Println(configPath, "not exist, use", configPath)
fmt.Println("use", configPath)
} else {
Root = filepath.Dir(configPath)
}
Root = filepath.Dir(configPath)
return c.unmarshalConfig(config, configPath)
}
-10
View File
@@ -1,16 +1,6 @@
package constant
const (
//Websocket Protocol
WSGetNewestSeq = 1001
WSPullMsgBySeqList = 1002
WSSendMsg = 1003
WSSendSignalMsg = 1004
WSPushMsg = 2001
WSKickOnlineMsg = 2002
WsLogoutMsg = 2003
WsSetBackgroundStatus = 2004
WSDataError = 3001
///ContentType
//UserRelated
+1
View File
@@ -5,6 +5,7 @@ import (
"context"
)
// for mongoDB
type ExtendMsgDatabase interface {
CreateExtendMsgSet(ctx context.Context, set *unRelationTb.ExtendMsgSetModel) error
GetAllExtendMsgSet(ctx context.Context, ID string, opts *unRelationTb.GetAllExtendMsgSetOpts) (sets []*unRelationTb.ExtendMsgSetModel, err error)
+4 -4
View File
@@ -76,6 +76,9 @@ func NewGroupDatabase(
}
func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.Database) GroupDatabase {
rcOptions := rockscache.NewDefaultOptions()
rcOptions.StrongConsistency = true
rcOptions.RandomExpireAdjustment = 0.2
return NewGroupDatabase(
relation.NewGroupDB(db),
relation.NewGroupMemberDB(db),
@@ -83,10 +86,7 @@ func InitGroupDatabase(db *gorm.DB, rdb redis.UniversalClient, database *mongo.D
tx.NewGorm(db),
tx.NewMongo(database.Client()),
unrelation.NewSuperGroupMongoDriver(database),
cache.NewGroupCacheRedis(rdb, relation.NewGroupDB(db), relation.NewGroupMemberDB(db), relation.NewGroupRequest(db), unrelation.NewSuperGroupMongoDriver(database), rockscache.Options{
StrongConsistency: true,
RandomExpireAdjustment: 2.0,
}),
cache.NewGroupCacheRedis(rdb, relation.NewGroupDB(db), relation.NewGroupMemberDB(db), relation.NewGroupRequest(db), unrelation.NewSuperGroupMongoDriver(database), rcOptions),
)
}
+9 -4
View File
@@ -1,10 +1,12 @@
package controller
import (
"OpenIM/pkg/common/config"
"OpenIM/pkg/common/constant"
"OpenIM/pkg/common/db/cache"
unRelationTb "OpenIM/pkg/common/db/table/unrelation"
"OpenIM/pkg/common/db/unrelation"
"OpenIM/pkg/common/kafka"
"OpenIM/pkg/common/log"
"OpenIM/pkg/common/prome"
"OpenIM/pkg/common/tracelog"
@@ -68,7 +70,7 @@ type MsgDatabase interface {
DeleteReactionExtendMsgSet(ctx context.Context, sourceID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error
MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error
GetUserMaxSeq(ctx context.Context, userID string) (int64, error)
GetUserMinSeq(ctx context.Context, userID string) (int64, error)
GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error)
@@ -79,6 +81,7 @@ func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel ca
return &msgDatabase{
msgDocDatabase: msgDocModel,
cache: cacheModel,
producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic),
}
}
@@ -93,6 +96,8 @@ type msgDatabase struct {
msgDocDatabase unRelationTb.MsgDocModelInterface
extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface
cache cache.Model
producer *kafka.Producer
// model
msg unRelationTb.MsgDocModel
extendMsgSetModel unRelationTb.ExtendMsgSetModel
}
@@ -165,9 +170,9 @@ func (db *msgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32,
return db.cache.GetSendMsgStatus(ctx, id)
}
func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, mq *pbMsg.MsgDataToMQ) error {
//TODO implement me
panic("implement me")
func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error {
_, _, err := db.producer.SendMessage(ctx, key, msg2mq)
return err
}
func (db *msgDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) {
+2 -2
View File
@@ -10,7 +10,7 @@ import (
const (
singleGocMsgNum = 5000
CChat = "msg"
Msg = "msg"
OldestList = 0
NewestList = -1
)
@@ -38,7 +38,7 @@ type MsgDocModelInterface interface {
}
func (MsgDocModel) TableName() string {
return CChat
return Msg
}
func (MsgDocModel) GetSingleGocMsgNum() int64 {
+1 -1
View File
@@ -61,7 +61,7 @@ func (m *Mongo) GetDatabase() *mongo.Database {
}
func (m *Mongo) CreateMsgIndex() error {
return m.createMongoIndex(unrelation.CChat, false, "uid")
return m.createMongoIndex(unrelation.Msg, false, "uid")
}
func (m *Mongo) CreateSuperGroupIndex() error {
+1 -1
View File
@@ -46,7 +46,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
return &p
}
func (p *Producer) SendMessage(ctx context.Context, m proto.Message, key string) (int32, int64, error) {
func (p *Producer) SendMessage(ctx context.Context, key string, m proto.Message) (int32, int64, error) {
operationID := tracelog.GetOperationID(ctx)
log.Info(operationID, "SendMessage", "key ", key, m.String(), p.producer)
kMsg := &sarama.ProducerMessage{}
+3
View File
@@ -90,3 +90,6 @@ func ParseRedisInterfaceToken(redisToken interface{}) (*Claims, error) {
func IsManagerUserID(opUserID string) bool {
return utils.IsContain(opUserID, config.Config.Manager.AppManagerUid)
}
func WsVerifyToken(token, userID, platformID string) error {
return nil
}