mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-07 18:45:58 +08:00
feat: ver3 branch
Signed-off-by: kubbot & kubecub <3293172751ysy@gmail.com>
This commit is contained in:
@@ -215,9 +215,9 @@ type config struct {
|
||||
SingleMessageHasReadReceiptEnable bool `yaml:"singleMessageHasReadReceiptEnable"`
|
||||
RetainChatRecords int `yaml:"retainChatRecords"`
|
||||
ChatRecordsClearTime string `yaml:"chatRecordsClearTime"`
|
||||
Secret string `yaml:"secret"`
|
||||
TokenPolicy struct {
|
||||
AccessSecret string `yaml:"accessSecret"`
|
||||
AccessExpire int64 `yaml:"accessExpire"`
|
||||
Expire int64 `yaml:"expire"`
|
||||
} `yaml:"tokenPolicy"`
|
||||
MessageVerify struct {
|
||||
FriendVerify *bool `yaml:"friendVerify"`
|
||||
@@ -301,3 +301,8 @@ type notification struct {
|
||||
ConversationChanged NotificationConf `yaml:"conversationChanged"`
|
||||
ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"`
|
||||
}
|
||||
|
||||
func GetServiceNames() []string {
|
||||
return []string{Config.RpcRegisterName.OpenImUserName, Config.RpcRegisterName.OpenImFriendName, Config.RpcRegisterName.OpenImMsgName, Config.RpcRegisterName.OpenImPushName, Config.RpcRegisterName.OpenImMessageGatewayName,
|
||||
Config.RpcRegisterName.OpenImGroupName, Config.RpcRegisterName.OpenImAuthName, Config.RpcRegisterName.OpenImConversationName, Config.RpcRegisterName.OpenImThirdName}
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ func (c *conversationDatabase) SetUsersConversationFiledTx(ctx context.Context,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...)
|
||||
cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...).DelConvsersations(conversation.ConversationID, NotUserIDs...)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
@@ -171,6 +171,7 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
|
||||
var conversationIDs []string
|
||||
for _, conversation := range conversations {
|
||||
conversationIDs = append(conversationIDs, conversation.ConversationID)
|
||||
cache = cache.DelConvsersations(conversation.OwnerUserID, conversation.ConversationID)
|
||||
}
|
||||
conversationTx := c.conversationDB.NewTx(tx)
|
||||
existConversations, err := conversationTx.Find(ctx, ownerUserID, conversationIDs)
|
||||
@@ -203,7 +204,6 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs
|
||||
}
|
||||
cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID)
|
||||
}
|
||||
cache = cache.DelConvsersations(ownerUserID, existConversationIDs...)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
@@ -228,6 +228,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context,
|
||||
for _, v := range notExistUserIDs {
|
||||
conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
|
||||
conversations = append(conversations, &conversation)
|
||||
cache = cache.DelConvsersations(v, conversationID)
|
||||
}
|
||||
cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...)
|
||||
if len(conversations) > 0 {
|
||||
|
||||
@@ -19,9 +19,9 @@ import (
|
||||
|
||||
func Test_BatchInsertChat2DB(t *testing.T) {
|
||||
config.Config.Mongo.Address = []string{"192.168.44.128:37017"}
|
||||
config.Config.Mongo.Timeout = 60
|
||||
// config.Config.Mongo.Timeout = 60
|
||||
config.Config.Mongo.Database = "openIM"
|
||||
config.Config.Mongo.Source = "admin"
|
||||
// config.Config.Mongo.Source = "admin"
|
||||
config.Config.Mongo.Username = "root"
|
||||
config.Config.Mongo.Password = "openIM123"
|
||||
config.Config.Mongo.MaxPoolSize = 100
|
||||
@@ -130,9 +130,9 @@ func Test_BatchInsertChat2DB(t *testing.T) {
|
||||
|
||||
func GetDB() *commonMsgDatabase {
|
||||
config.Config.Mongo.Address = []string{"192.168.44.128:37017"}
|
||||
config.Config.Mongo.Timeout = 60
|
||||
// config.Config.Mongo.Timeout = 60
|
||||
config.Config.Mongo.Database = "openIM"
|
||||
config.Config.Mongo.Source = "admin"
|
||||
// config.Config.Mongo.Source = "admin"
|
||||
config.Config.Mongo.Username = "root"
|
||||
config.Config.Mongo.Password = "openIM123"
|
||||
config.Config.Mongo.MaxPoolSize = 100
|
||||
|
||||
@@ -105,7 +105,6 @@ func (c *s3Database) urlName(name string) string {
|
||||
Host: c.url.Host,
|
||||
Path: c.url.Path,
|
||||
RawPath: c.url.RawPath,
|
||||
OmitHost: c.url.OmitHost,
|
||||
ForceQuery: c.url.ForceQuery,
|
||||
RawQuery: c.url.RawQuery,
|
||||
Fragment: c.url.Fragment,
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||
)
|
||||
@@ -13,7 +12,7 @@ type ConversationLocalCache struct {
|
||||
lock sync.Mutex
|
||||
superGroupRecvMsgNotNotifyUserIDs map[string]Hash
|
||||
conversationIDs map[string]Hash
|
||||
client *rpcclient.Conversation
|
||||
client *rpcclient.ConversationRpcClient
|
||||
}
|
||||
|
||||
type Hash struct {
|
||||
@@ -21,11 +20,11 @@ type Hash struct {
|
||||
ids []string
|
||||
}
|
||||
|
||||
func NewConversationLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache {
|
||||
func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache {
|
||||
return &ConversationLocalCache{
|
||||
superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
|
||||
conversationIDs: make(map[string]Hash),
|
||||
client: rpcclient.NewConversation(discov),
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||
@@ -13,7 +12,7 @@ import (
|
||||
type GroupLocalCache struct {
|
||||
lock sync.Mutex
|
||||
cache map[string]GroupMemberIDsHash
|
||||
client *rpcclient.Group
|
||||
client *rpcclient.GroupRpcClient
|
||||
}
|
||||
|
||||
type GroupMemberIDsHash struct {
|
||||
@@ -21,8 +20,7 @@ type GroupMemberIDsHash struct {
|
||||
userIDs []string
|
||||
}
|
||||
|
||||
func NewGroupLocalCache(discov discoveryregistry.SvcDiscoveryRegistry) *GroupLocalCache {
|
||||
client := rpcclient.NewGroup(discov)
|
||||
func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache {
|
||||
return &GroupLocalCache{
|
||||
cache: make(map[string]GroupMemberIDsHash, 0),
|
||||
client: client,
|
||||
|
||||
@@ -26,7 +26,7 @@ func init() {
|
||||
|
||||
func initAesKey() {
|
||||
once.Do(func() {
|
||||
key := md5.Sum([]byte("openim:" + config.Config.TokenPolicy.AccessSecret))
|
||||
key := md5.Sum([]byte("openim:" + config.Config.Secret))
|
||||
var err error
|
||||
block, err = aes.NewCipher(key[:])
|
||||
if err != nil {
|
||||
|
||||
@@ -2,12 +2,11 @@ package mw
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCheck(t *testing.T) {
|
||||
config.Config.TokenPolicy.Secret = "123456"
|
||||
// config.Config.TokenPolicy.Secret = "123456"
|
||||
|
||||
args := []string{"1", "2", "3"}
|
||||
|
||||
|
||||
+20
-77
@@ -1,10 +1,7 @@
|
||||
package mw
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/apiresp"
|
||||
@@ -19,54 +16,22 @@ import (
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type GinMwOptions func(*gin.RouterGroup)
|
||||
|
||||
func WithRecovery() GinMwOptions {
|
||||
return func(group *gin.RouterGroup) {
|
||||
group.Use(gin.Recovery())
|
||||
}
|
||||
}
|
||||
|
||||
func WithCorsHandler() GinMwOptions {
|
||||
return func(group *gin.RouterGroup) {
|
||||
group.Use(CorsHandler())
|
||||
}
|
||||
}
|
||||
|
||||
func WithGinParseOperationID() GinMwOptions {
|
||||
return func(group *gin.RouterGroup) {
|
||||
group.Use(GinParseOperationID())
|
||||
}
|
||||
}
|
||||
|
||||
func WithGinParseToken(rdb redis.UniversalClient) GinMwOptions {
|
||||
return func(group *gin.RouterGroup) {
|
||||
group.Use(GinParseToken(rdb))
|
||||
}
|
||||
}
|
||||
|
||||
func NewRouterGroup(routerGroup *gin.RouterGroup, route string, options ...GinMwOptions) *gin.RouterGroup {
|
||||
routerGroup = routerGroup.Group(route)
|
||||
for _, option := range options {
|
||||
option(routerGroup)
|
||||
}
|
||||
return routerGroup
|
||||
}
|
||||
|
||||
func CorsHandler() gin.HandlerFunc {
|
||||
return func(context *gin.Context) {
|
||||
context.Writer.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
context.Header("Access-Control-Allow-Methods", "*")
|
||||
context.Header("Access-Control-Allow-Headers", "*")
|
||||
context.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar") // 跨域关键设置 让浏览器可以解析
|
||||
context.Header("Access-Control-Max-Age", "172800") // 缓存请求信息 单位为秒
|
||||
context.Header("Access-Control-Allow-Credentials", "false") // 跨域请求是否需要带cookie信息 默认设置为true
|
||||
context.Header("content-type", "application/json") // 设置返回格式是json
|
||||
return func(c *gin.Context) {
|
||||
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
c.Header("Access-Control-Allow-Methods", "*")
|
||||
c.Header("Access-Control-Allow-Headers", "*")
|
||||
c.Header("Access-Control-Expose-Headers", "Content-Length, Access-Control-Allow-Origin, Access-Control-Allow-Headers,Cache-Control,Content-Language,Content-Type,Expires,Last-Modified,Pragma,FooBar") // 跨域关键设置 让浏览器可以解析
|
||||
c.Header("Access-Control-Max-Age", "172800") // 缓存请求信息 单位为秒
|
||||
c.Header("Access-Control-Allow-Credentials", "false") // 跨域请求是否需要带cookie信息 默认设置为true
|
||||
c.Header("content-type", "application/json") // 设置返回格式是json
|
||||
//Release all option pre-requests
|
||||
if context.Request.Method == http.MethodOptions {
|
||||
context.JSON(http.StatusOK, "Options Request!")
|
||||
if c.Request.Method == http.MethodOptions {
|
||||
c.JSON(http.StatusOK, "Options Request!")
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
context.Next()
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,42 +40,19 @@ func GinParseOperationID() gin.HandlerFunc {
|
||||
if c.Request.Method == http.MethodPost {
|
||||
operationID := c.Request.Header.Get(constant.OperationID)
|
||||
if operationID == "" {
|
||||
body, err := io.ReadAll(c.Request.Body)
|
||||
if err != nil {
|
||||
log.ZWarn(c, "read request body error", errs.ErrArgs.Wrap("read request body error: "+err.Error()))
|
||||
apiresp.GinError(c, errs.ErrArgs.Wrap("read request body error: "+err.Error()))
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
req := struct {
|
||||
OperationID string `json:"operationID"`
|
||||
}{}
|
||||
if err := json.Unmarshal(body, &req); err != nil {
|
||||
log.ZWarn(c, "json unmarshal error", errs.ErrArgs.Wrap(err.Error()))
|
||||
apiresp.GinError(c, errs.ErrArgs.Wrap("json unmarshal error"+err.Error()))
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
if req.OperationID == "" {
|
||||
err := errors.New("header must have operationID")
|
||||
log.ZWarn(c, "header must have operationID", err)
|
||||
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
c.Request.Body = io.NopCloser(bytes.NewReader(body))
|
||||
operationID = req.OperationID
|
||||
c.Request.Header.Set(constant.OperationID, operationID)
|
||||
err := errors.New("header must have operationID")
|
||||
apiresp.GinError(c, errs.ErrArgs.Wrap(err.Error()))
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
c.Set(constant.OperationID, operationID)
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
|
||||
dataBase := controller.NewAuthDatabase(cache.NewMsgCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire)
|
||||
dataBase := controller.NewAuthDatabase(cache.NewMsgCacheModel(rdb), config.Config.Secret, config.Config.TokenPolicy.Expire)
|
||||
return func(c *gin.Context) {
|
||||
switch c.Request.Method {
|
||||
case http.MethodPost:
|
||||
@@ -157,6 +99,7 @@ func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc {
|
||||
}
|
||||
} else {
|
||||
apiresp.GinError(c, errs.ErrTokenNotExist.Wrap())
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
c.Set(constant.OpUserPlatform, constant.PlatformIDToName(claims.PlatformID))
|
||||
|
||||
@@ -23,7 +23,6 @@ func RpcClientInterceptor(ctx context.Context, method string, req, resp interfac
|
||||
if ctx == nil {
|
||||
return errs.ErrInternalServer.Wrap("call rpc request context is nil")
|
||||
}
|
||||
log.ZInfo(ctx, "rpc client req", "funcName", method, "req", rpcString(req), "invoker", invoker, "invoker_type", fmt.Sprintf("%T", invoker))
|
||||
ctx, err = getRpcContext(ctx, method)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -54,6 +53,7 @@ func RpcClientInterceptor(ctx context.Context, method string, req, resp interfac
|
||||
}
|
||||
|
||||
func getRpcContext(ctx context.Context, method string) (context.Context, error) {
|
||||
// ctx, _ = context.WithTimeout(ctx, time.Second*5)
|
||||
md := metadata.Pairs()
|
||||
if keys, _ := ctx.Value(constant.RpcCustomHeader).([]string); len(keys) > 0 {
|
||||
for _, key := range keys {
|
||||
|
||||
@@ -3,6 +3,7 @@ package mw
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/checker"
|
||||
"math"
|
||||
"runtime"
|
||||
"strings"
|
||||
@@ -92,7 +93,12 @@ func RpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary
|
||||
}
|
||||
}
|
||||
log.ZInfo(ctx, "rpc server req", "funcName", funcName, "req", rpcString(req))
|
||||
resp, err = handler(ctx, req)
|
||||
resp, err = func() (interface{}, error) {
|
||||
if err := checker.Validate(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return handler(ctx, req)
|
||||
}()
|
||||
if err == nil {
|
||||
log.ZInfo(ctx, "rpc server resp", "funcName", funcName, "resp", rpcString(resp))
|
||||
return resp, nil
|
||||
|
||||
@@ -32,7 +32,7 @@ func BuildClaims(uid string, platformID int, ttl int64) Claims {
|
||||
|
||||
func secret() jwt.Keyfunc {
|
||||
return func(token *jwt.Token) (interface{}, error) {
|
||||
return []byte(config.Config.TokenPolicy.AccessSecret), nil
|
||||
return []byte(config.Config.Secret), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,10 +9,10 @@ import (
|
||||
)
|
||||
|
||||
func Test_ParseToken(t *testing.T) {
|
||||
config.Config.TokenPolicy.AccessSecret = "OpenIM_server"
|
||||
config.Config.Secret = "OpenIM_server"
|
||||
claims1 := BuildClaims("123456", constant.AndroidPadPlatformID, 10)
|
||||
token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims1)
|
||||
tokenString, err := token.SignedString([]byte(config.Config.TokenPolicy.AccessSecret))
|
||||
tokenString, err := token.SignedString([]byte(config.Config.Secret))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user