This commit is contained in:
withchao
2023-01-11 14:29:37 +08:00
parent 46c3b5a6ea
commit 1fc7c4434c
5 changed files with 128 additions and 46 deletions
+4 -45
View File
@@ -9,25 +9,20 @@ import (
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/middleware"
promePkg "Open_IM/pkg/common/prometheus"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/common/trace_log"
cp "Open_IM/pkg/common/utils"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"github.com/OpenIMSDK/getcdv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"path"
"runtime/debug"
pbCache "Open_IM/pkg/proto/cache"
pbConversation "Open_IM/pkg/proto/conversation"
pbGroup "Open_IM/pkg/proto/group"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"context"
"errors"
"github.com/OpenIMSDK/getcdv3"
"math/big"
"net"
"strconv"
@@ -58,42 +53,6 @@ func NewGroupServer(port int) *groupServer {
}
}
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.NewError("", info.FullMethod, "panic", r, "stack", string(debug.Stack()))
}
}()
funcName := path.Base(info.FullMethod)
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errors.New("not metadata")
}
operationID := md.Get("operationID")[0]
opUserID := md.Get("opUserID")[0]
ctx = trace_log.NewRpcCtx(ctx, funcName, operationID)
defer trace_log.ShowLog(ctx)
_ = opUserID
trace_log.SetContextInfo(ctx, funcName, err, "rpc req", req.(interface{ String() string }).String())
resp, err = handler(ctx, req)
if err != nil {
errInfo := constant.ToAPIErrWithErr(err)
var code codes.Code
if errInfo.ErrCode == 0 {
code = codes.Unknown
} else {
code = codes.Code(errInfo.ErrCode)
}
sta, err := status.New(code, errInfo.ErrMsg).WithDetails(wrapperspb.String(errInfo.DetailErrMsg))
if err != nil {
return nil, err
}
return nil, sta.Err()
}
trace_log.SetContextInfo(ctx, funcName, err, "rpc resp", resp.(interface{ String() string }).String())
return
}
func (s *groupServer) Run() {
log.NewInfo("", "group rpc start ")
listenIP := ""
@@ -116,7 +75,7 @@ func (s *groupServer) Run() {
var grpcOpts = []grpc.ServerOption{
grpc.MaxRecvMsgSize(recvSize),
grpc.MaxSendMsgSize(sendSize),
grpc.UnaryInterceptor(UnaryServerInterceptor),
grpc.UnaryInterceptor(middleware.RpcServerInterceptor),
}
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()