Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
withchao
2023-02-20 17:44:28 +08:00
14 changed files with 291 additions and 342 deletions
+20 -111
View File
@@ -2,132 +2,41 @@ package conversation
import (
"Open_IM/internal/common/check"
chat "Open_IM/internal/rpc/msg"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/cache"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
tableRelation "Open_IM/pkg/common/db/table/relation"
"github.com/OpenIMSDK/openKeeper"
"Open_IM/pkg/common/log"
promePkg "Open_IM/pkg/common/prome"
"Open_IM/internal/common/notification"
pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/utils"
"context"
"github.com/dtm-labs/rockscache"
"net"
"strconv"
"strings"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"Open_IM/pkg/common/config"
"google.golang.org/grpc"
)
type conversationServer struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
groupChecker *check.GroupChecker
groupChecker *check.GroupChecker
controller.ConversationInterface
notify *notification.Check
}
func NewConversationServer(port int) *conversationServer {
log.NewPrivateLog(constant.LogFileName)
c := conversationServer{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImConversationName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
groupChecker: check.NewGroupChecker(),
}
var cDB relation.Conversation
var cCache cache.ConversationCache
//mysql init
var mysql relation.Mysql
err := mysql.InitConn().AutoMigrateModel(&tableRelation.ConversationModel{})
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
db, err := relation.NewGormDB()
if err != nil {
panic("db init err:" + err.Error())
return err
}
if mysql.GormConn() != nil {
//get gorm model
cDB = relation.NewConversationGorm(mysql.GormConn())
} else {
panic("db init err:" + "conn is nil")
if err := db.AutoMigrate(&tableRelation.ConversationModel{}); err != nil {
return err
}
//redis init
var redis cache.RedisClient
redis.InitRedis()
rcClient := rockscache.NewClient(redis.GetClient(), rockscache.Options{
RandomExpireAdjustment: 0.2,
DisableCacheRead: false,
DisableCacheDelete: false,
StrongConsistency: true,
pbConversation.RegisterConversationServer(server, &conversationServer{
groupChecker: check.NewGroupChecker(client),
ConversationInterface: controller.NewConversationController(controller.NewConversationDataBase(controller.NewConversationGorm(db), cache.NewConversationRedis(nil))),
})
cCache = cache.NewConversationRedis(rcClient)
database := controller.NewConversationDataBase(cDB, cCache)
c.ConversationInterface = controller.NewConversationController(database)
return &c
}
func (c *conversationServer) Run() {
log.NewInfo("0", "rpc conversation start...")
listenIP := ""
if config.Config.ListenIP == "" {
listenIP = "0.0.0.0"
} else {
listenIP = config.Config.ListenIP
}
address := listenIP + ":" + strconv.Itoa(c.rpcPort)
listener, err := net.Listen("tcp", address)
if err != nil {
panic("listening err:" + err.Error() + c.rpcRegisterName)
}
log.NewInfo("0", "listen network success, ", address, listener)
//grpc server
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
grpcOpts = append(grpcOpts, []grpc.ServerOption{
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
}
srv := grpc.NewServer(grpcOpts...)
defer srv.GracefulStop()
//service registers with etcd
pbConversation.RegisterConversationServer(srv, c)
rpcRegisterIP := config.Config.RpcRegisterIP
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
}
}
log.NewInfo("", "rpcRegisterIP", rpcRegisterIP)
err = rpc.RegisterEtcd(c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName, 10, "")
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(),
c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
panic(utils.Wrap(err, "register conversation module rpc to etcd err"))
}
log.NewInfo("0", "RegisterConversationServer ok ", c.etcdSchema, strings.Join(c.etcdAddr, ","), rpcRegisterIP, c.rpcPort, c.rpcRegisterName)
err = srv.Serve(listener)
if err != nil {
log.NewError("0", "Serve failed ", err.Error())
return
}
log.NewInfo("0", "rpc conversation ok")
controller.NewConversationDataBase()
controller.NewConversationController()
return nil
}
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
@@ -179,7 +88,7 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC
if err != nil {
return nil, err
}
chat.ConversationChangeNotification(ctx, req.OwnerUserID)
c.notify.ConversationChangeNotification(ctx, req.OwnerUserID)
return resp, nil
}
@@ -196,7 +105,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
var err error
isSyncConversation := true
if req.Conversation.ConversationType == constant.GroupChatType {
groupInfo, err := c.groupChecker.GetGroupInfo(req.Conversation.GroupID)
groupInfo, err := c.groupChecker.GetGroupInfo(ctx, req.Conversation.GroupID)
if err != nil {
return nil, err
}
@@ -213,7 +122,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
if err != nil {
return nil, err
}
chat.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
return resp, nil
}
//haveUserID, err := c.ConversationInterface.GetUserIDExistConversation(ctx, req.UserIDList, req.Conversation.ConversationID)
@@ -247,11 +156,11 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
if isSyncConversation {
for _, v := range req.UserIDList {
chat.ConversationChangeNotification(ctx, v)
c.notify.ConversationChangeNotification(ctx, v)
}
} else {
for _, v := range req.UserIDList {
chat.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
c.notify.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
}
}
return resp, nil
+18 -15
View File
@@ -7,10 +7,10 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db/controller"
"Open_IM/pkg/common/db/relation"
relationTb "Open_IM/pkg/common/db/table/relation"
tablerelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
registry "Open_IM/pkg/discoveryregistry"
pbfriend "Open_IM/pkg/proto/friend"
"Open_IM/pkg/utils"
"context"
@@ -23,7 +23,7 @@ type friendServer struct {
controller.BlackInterface
notification *notification.Check
userCheck *check.UserCheck
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
RegisterCenter registry.SvcDiscoveryRegistry
}
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
@@ -31,7 +31,7 @@ func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
if err != nil {
return err
}
if err := mysql.AutoMigrate(&relationTb.FriendModel{}, &relationTb.FriendRequestModel{}, &relationTb.BlackModel{}); err != nil {
if err := mysql.AutoMigrate(&tablerelation.FriendModel{}, &tablerelation.FriendRequestModel{}, &tablerelation.BlackModel{}); err != nil {
return err
}
pbfriend.RegisterFriendServer(server, &friendServer{
@@ -102,7 +102,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *pbfriend.Res
if err := s.userCheck.Access(ctx, req.ToUserID); err != nil {
return nil, err
}
friendRequest := relationTb.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
friendRequest := tablerelation.FriendRequestModel{FromUserID: req.FromUserID, ToUserID: req.ToUserID, HandleMsg: req.HandleMsg, HandleResult: req.HandleResult}
if req.HandleResult == constant.FriendResponseAgree {
err := s.AgreeFriendRequest(ctx, &friendRequest)
if err != nil {
@@ -158,20 +158,21 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbfriend.SetFri
// ok
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *pbfriend.GetDesignatedFriendsReq) (resp *pbfriend.GetDesignatedFriendsResp, err error) {
resp = &pbfriend.GetDesignatedFriendsResp{}
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
if utils.Duplicate(req.FriendUserIDs) {
return nil, constant.ErrArgs.Wrap("friend userID repeated")
}
friends, total, err := s.FriendInterface.PageOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
friends, err := s.FriendInterface.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, err
}
resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends)
if err != nil {
if resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends); err != nil {
return nil, err
}
resp.Total = int32(total)
return resp, nil
}
// ok 获取接收到的好友申请(即别人主动申请的)
@@ -223,15 +224,17 @@ func (s *friendServer) IsFriend(ctx context.Context, req *pbfriend.IsFriendReq)
// ok
func (s *friendServer) GetPaginationFriends(ctx context.Context, req *pbfriend.GetPaginationFriendsReq) (resp *pbfriend.GetPaginationFriendsResp, err error) {
resp = &pbfriend.GetPaginationFriendsResp{}
if utils.Duplicate(req.FriendUserIDs) {
return nil, constant.ErrArgs.Wrap("friend userID repeated")
if err := s.userCheck.Access(ctx, req.UserID); err != nil {
return nil, err
}
friends, err := s.FriendInterface.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
friends, total, err := s.FriendInterface.PageOwnerFriends(ctx, req.UserID, req.Pagination.PageNumber, req.Pagination.ShowNumber)
if err != nil {
return nil, err
}
if resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends); err != nil {
resp.FriendsInfo, err = (*convert.NewDBFriend(nil, s.RegisterCenter)).DB2PB(ctx, friends)
if err != nil {
return nil, err
}
resp.Total = int32(total)
return resp, nil
}
+14 -17
View File
@@ -10,7 +10,7 @@ import (
tablerelation "Open_IM/pkg/common/db/table/relation"
"Open_IM/pkg/common/tokenverify"
"Open_IM/pkg/common/tracelog"
discoveryRegistry "Open_IM/pkg/discoveryregistry"
registry "Open_IM/pkg/discoveryregistry"
"Open_IM/pkg/proto/sdkws"
pbuser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
@@ -24,7 +24,8 @@ type userServer struct {
notification *notification.Check
userCheck *check.UserCheck
ConversationChecker *check.ConversationChecker
RegisterCenter discoveryRegistry.SvcDiscoveryRegistry
RegisterCenter registry.SvcDiscoveryRegistry
friendCheck *check.FriendChecker
}
func Start(client *openKeeper.ZkClient, server *grpc.Server) error {
@@ -58,10 +59,6 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
return resp, nil
}
func (s *userServer) GetAllPageFriends(ctx context.Context, ownerUserID string) (resp []*sdkws.FriendInfo, err error) {
return
}
// ok
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
resp = &pbuser.UpdateUserInfoResp{}
@@ -69,14 +66,6 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
//oldNickname := ""
//if req.UserInfo.Nickname != "" {
// u, err := s.FindWithError(ctx, []string{req.UserInfo.UserID})
// if err != nil {
// return nil, err
// }
// oldNickname = u[0].Nickname
//}
user, err := convert.NewPBUser(req.UserInfo).Convert()
if err != nil {
return nil, err
@@ -85,7 +74,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
friends, err := s.GetAllPageFriends(ctx, req.UserInfo.UserID)
friends, err := s.friendCheck.GetAllPageFriends(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
@@ -94,9 +83,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v.FriendUser.UserID, tracelog.GetOpUserID(ctx))
}
}()
s.notification.UserInfoUpdatedNotification(ctx, tracelog.GetOpUserID(ctx), req.UserInfo.UserID)
return resp, nil
}
@@ -184,3 +171,13 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
}
return resp, nil
}
func (s *userServer) GetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.GetGlobalRecvMessageOptReq) (resp *pbuser.GetGlobalRecvMessageOptResp, err error) {
resp = &pbuser.GetGlobalRecvMessageOptResp{}
user, err := s.FindWithError(ctx, []string{req.UserID})
if err != nil {
return nil, err
}
resp.GlobalRecvMsgOpt = user[0].GlobalRecvMsgOpt
return resp, nil
}