Merge branch 'errcode' of https://github.com/OpenIMSDK/Open-IM-Server into errcode

This commit is contained in:
withchao
2023-03-24 14:48:17 +08:00
46 changed files with 1005 additions and 879 deletions
+20 -10
View File
@@ -4,9 +4,12 @@ import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"github.com/golang/protobuf/proto"
"runtime/debug"
"sync"
)
@@ -37,7 +40,7 @@ type Client struct {
isCompress bool
userID string
isBackground bool
connID string
ctx *UserConnContext
onlineAt int64 // 上线时间戳(毫秒)
longConnServer LongConnServer
closed bool
@@ -50,7 +53,7 @@ func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
platformID: utils.StringToInt(ctx.GetPlatformID()),
isCompress: isCompress,
userID: ctx.GetUserID(),
connID: ctx.GetConnID(),
ctx: ctx,
onlineAt: utils.GetCurrentTimestampByMill(),
}
}
@@ -60,7 +63,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress boo
c.platformID = utils.StringToInt(ctx.GetPlatformID())
c.isCompress = isCompress
c.userID = ctx.GetUserID()
c.connID = ctx.GetConnID()
c.ctx = ctx
c.onlineAt = utils.GetCurrentTimestampByMill()
c.longConnServer = longConnServer
}
@@ -69,7 +72,7 @@ func (c *Client) readMessage() {
if r := recover(); r != nil {
fmt.Println("socket have panic err:", r, string(debug.Stack()))
}
//c.close()
c.close()
}()
//var returnErr error
for {
@@ -119,11 +122,7 @@ func (c *Client) handleMessage(message []byte) error {
if binaryReq.SendID != c.userID {
return errors.New("exception conn userID not same to req userID")
}
ctx := context.Background()
ctx = context.WithValue(ctx, ConnID, c.connID)
ctx = context.WithValue(ctx, OperationID, binaryReq.OperationID)
ctx = context.WithValue(ctx, CommonUserID, binaryReq.SendID)
ctx = context.WithValue(ctx, PlatformID, c.platformID)
ctx := mcontext.WithMustInfoCtx([]string{binaryReq.OperationID, binaryReq.SendID, constant.PlatformIDToName(c.platformID), c.ctx.GetConnID()})
var messageErr error
var resp []byte
switch binaryReq.ReqIdentifier {
@@ -161,6 +160,7 @@ func (c *Client) setAppBackgroundStatus(ctx context.Context, req Req) ([]byte, e
func (c *Client) close() {
c.w.Lock()
defer c.w.Unlock()
c.closed = true
c.conn.Close()
c.longConnServer.UnRegister(c)
@@ -175,7 +175,17 @@ func (c *Client) replyMessage(binaryReq *Req, err error, resp []byte) {
_ = c.writeMsg(mReply)
}
func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error {
return nil
data, err := proto.Marshal(msgData)
if err != nil {
return err
}
resp := Resp{
ReqIdentifier: WSPushMsg,
OperationID: mcontext.GetOperationID(ctx),
Data: data,
}
return c.writeMsg(resp)
}
func (c *Client) KickOnlineMessage(ctx context.Context) error {
+37 -1
View File
@@ -1,9 +1,11 @@
package msggateway
import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"net/http"
"strconv"
"time"
)
type UserConnContext struct {
@@ -12,6 +14,36 @@ type UserConnContext struct {
Path string
Method string
RemoteAddr string
ConnID string
}
func (c *UserConnContext) Deadline() (deadline time.Time, ok bool) {
return
}
func (c *UserConnContext) Done() <-chan struct{} {
return nil
}
func (c *UserConnContext) Err() error {
return nil
}
func (c *UserConnContext) Value(key any) any {
switch key {
case constant.OpUserID:
return c.GetUserID()
case constant.OperationID:
return c.GetOperationID()
case constant.ConnID:
return c.GetConnID()
case constant.OpUserPlatform:
return constant.PlatformIDToName(utils.StringToInt(c.GetPlatformID()))
case constant.RemoteAddr:
return c.RemoteAddr
default:
return ""
}
}
func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnContext {
@@ -21,6 +53,7 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
Path: req.URL.Path,
Method: req.Method,
RemoteAddr: req.RemoteAddr,
ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))),
}
}
func (c *UserConnContext) Query(key string) (string, bool) {
@@ -44,7 +77,7 @@ func (c *UserConnContext) ErrReturn(error string, code int) {
http.Error(c.RespWriter, error, code)
}
func (c *UserConnContext) GetConnID() string {
return utils.Md5(c.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill())))
return c.ConnID
}
func (c *UserConnContext) GetUserID() string {
return c.Req.URL.Query().Get(WsUserID)
@@ -52,3 +85,6 @@ func (c *UserConnContext) GetUserID() string {
func (c *UserConnContext) GetPlatformID() string {
return c.Req.URL.Query().Get(PlatformID)
}
func (c *UserConnContext) GetOperationID() string {
return c.Req.URL.Query().Get(OperationID)
}
+1 -2
View File
@@ -31,7 +31,6 @@ type Server struct {
prometheusPort int
LongConnServer LongConnServer
pushTerminal []int
//rpcServer *RpcServer
}
func (s *Server) SetLongConnServer(LongConnServer LongConnServer) {
@@ -67,7 +66,7 @@ func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUs
ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(client.platformID)
ps.Status = constant.OnlineStatus
ps.ConnID = client.connID
ps.ConnID = client.ctx.GetConnID()
ps.IsBackground = client.isBackground
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
+4 -9
View File
@@ -2,7 +2,7 @@ package msggateway
import (
"errors"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
@@ -97,7 +97,6 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
clients: newUserMap(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
//handler: NewGrpcHandler(validate),
}, nil
}
func (ws *WsServer) Run() error {
@@ -127,8 +126,7 @@ func (ws *WsServer) registerClient(client *Client) {
ws.clients.Set(client.userID, client)
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
} else {
if clientOK { //已经有同平台的连接存在
ws.clients.Set(client.userID, client)
@@ -136,11 +134,9 @@ func (ws *WsServer) registerClient(client *Client) {
} else {
ws.clients.Set(client.userID, client)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
}
}
log.ZInfo(client.ctx, "user online", "online user Num", ws.onlineUserNum, "online user conn Num", ws.onlineUserConnNum)
}
func (ws *WsServer) multiTerminalLoginChecker(client []*Client) {
@@ -153,8 +149,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
atomic.AddInt64(&ws.onlineUserNum, -1)
}
atomic.AddInt64(&ws.onlineUserConnNum, -1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
log.ZInfo(client.ctx, "user offline", "online user Num", ws.onlineUserNum, "online user conn Num", ws.onlineUserConnNum)
}
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
+5 -3
View File
@@ -2,14 +2,16 @@ package msgtransfer
import (
"fmt"
"sync"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"sync"
)
type MsgTransfer struct {
@@ -38,9 +40,9 @@ func StartTransfer(prometheusPort int) error {
cacheModel := cache.NewCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db))
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel)
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase)
@@ -8,6 +8,7 @@ package msgtransfer
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -65,7 +66,7 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs
}
if tag {
log.NewInfo(operationID, "msg_transfer msg persisting", string(msg))
if err = pc.chatLogDatabase.CreateChatLog(msgFromMQ); err != nil {
if err = pc.chatLogDatabase.CreateChatLog(&msgFromMQ); err != nil {
log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
return
}
+1
View File
@@ -2,6 +2,7 @@ package push
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
-2
View File
@@ -9,7 +9,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
pbPush "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/push"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sync"
)
@@ -23,7 +22,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
return err
}
cacheModel := cache.NewCacheModel(rdb)
client.AddOption(grpc.WithTransportCredentials(insecure.NewCredentials()))
offlinePusher := NewOfflinePusher(cacheModel)
database := controller.NewPushDatabase(cacheModel)
pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client))
+2 -2
View File
@@ -71,7 +71,7 @@ func (p *Pusher) MsgToUser(ctx context.Context, userID string, msg *sdkws.MsgDat
}
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
//log.NewInfo(operationID, "push_result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush)
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush)
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userID)
p.successCount++
if isOfflinePush && userID != msg.SendID {
// save invitation info for offline push
@@ -177,6 +177,7 @@ func (p *Pusher) MsgToSuperGroupUser(ctx context.Context, groupID string, msg *s
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
conns, err := p.client.GetConns(config.Config.RpcRegisterName.OpenImMessageGatewayName)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if err != nil {
return nil, err
}
@@ -185,7 +186,6 @@ func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData,
msgClient := msggateway.NewMsgGatewayClient(v)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs})
if err != nil {
continue
}
if reply != nil && reply.SinglePushResult != nil {
+22 -6
View File
@@ -10,6 +10,7 @@ import (
tableRelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
@@ -35,9 +36,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
if err != nil {
return err
}
conversationDB := relation.NewConversationGorm(db)
pbConversation.RegisterConversationServer(server, &conversationServer{
groupChecker: check.NewGroupChecker(client),
ConversationDatabase: controller.NewConversationDatabase(relation.NewConversationGorm(db), cache.NewConversationRedis(rdb, cache.GetDefaultOpt()), tx.NewGorm(db)),
ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)),
})
return nil
}
@@ -54,7 +56,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers
}
return resp, nil
}
return nil, nil
return nil, errs.ErrRecordNotFound.Wrap("conversation not found")
}
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
@@ -70,11 +72,11 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbCon
}
func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) {
resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.ConversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs)
if err != nil {
return nil, err
}
resp := &pbConversation.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
if err := utils.CopyStructFields(&resp.Conversations, conversations); err != nil {
return nil, err
}
@@ -82,7 +84,6 @@ func (c *conversationServer) GetConversations(ctx context.Context, req *pbConver
}
func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) {
resp := &pbConversation.BatchSetConversationsResp{}
var conversations []*tableRelation.ConversationModel
if err := utils.CopyStructFields(&conversations, req.Conversations); err != nil {
return nil, err
@@ -92,15 +93,30 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC
return nil, err
}
c.notify.ConversationChangeNotification(ctx, req.OwnerUserID)
resp := &pbConversation.BatchSetConversationsResp{}
return resp, nil
}
func (c *conversationServer) SetConversation(ctx context.Context, req *pbConversation.SetConversationReq) (*pbConversation.SetConversationResp, error) {
panic("implement me")
var conversation tableRelation.ConversationModel
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err
}
err := c.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tableRelation.ConversationModel{&conversation})
if err != nil {
return nil, err
}
c.notify.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID)
resp := &pbConversation.SetConversationResp{}
return resp, nil
}
func (c *conversationServer) SetRecvMsgOpt(ctx context.Context, req *pbConversation.SetRecvMsgOptReq) (*pbConversation.SetRecvMsgOptResp, error) {
panic("implement me")
conversation := tableRelation.ConversationModel{OwnerUserID: req.OwnerUserID, ConversationID: req.ConversationID, RecvMsgOpt: req.RecvMsgOpt}
if err := c.SetUsersConversationFiledTx(ctx, []string{req.OwnerUserID}, &conversation, map[string]interface{}{"recv_msg_opt": req.RecvMsgOpt}); err != nil {
return nil, err
}
return &pbConversation.SetRecvMsgOptResp{}, nil
}
func (c *conversationServer) ModifyConversationField(ctx context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) {
+9 -2
View File
@@ -4,6 +4,7 @@ import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/relation"
tablerelation "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
@@ -35,9 +36,15 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
if err := db.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil {
return err
}
rdb, err := cache.NewRedis()
if err != nil {
return err
}
blackDB := relation.NewBlackGorm(db)
friendDB := relation.NewFriendGorm(db)
pbfriend.RegisterFriendServer(server, &friendServer{
FriendDatabase: controller.NewFriendDatabase(relation.NewFriendGorm(db), relation.NewFriendRequestGorm(db), tx.NewGorm(db)),
BlackDatabase: controller.NewBlackDatabase(relation.NewBlackGorm(db)),
FriendDatabase: controller.NewFriendDatabase(friendDB, relation.NewFriendRequestGorm(db), cache.NewFriendCacheRedis(rdb, friendDB, cache.GetDefaultOpt()), tx.NewGorm(db)),
BlackDatabase: controller.NewBlackDatabase(blackDB, cache.NewBlackCacheRedis(rdb, blackDB, cache.GetDefaultOpt())),
notification: notification.NewCheck(client),
userCheck: check.NewUserCheck(client),
RegisterCenter: client,
+3 -2
View File
@@ -1,9 +1,10 @@
package group
import (
pbGroup "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"time"
pbGroup "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
)
func UpdateGroupInfoMap(group *sdkws.GroupInfoForSet) map[string]any {
+3 -2
View File
@@ -6,6 +6,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
@@ -58,8 +59,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
cacheModel := cache.NewCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase())
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel)
extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt())
extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCacheModel, tx.NewMongo(mongo.GetClient()))
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
s := &msgServer{