notification

This commit is contained in:
wangchuxiao
2023-04-23 19:50:42 +08:00
parent c80babaef6
commit 94b5703399
32 changed files with 286 additions and 1126 deletions
+5 -4
View File
@@ -2,6 +2,7 @@ package auth
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@@ -12,14 +13,14 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
pbAuth "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/auth"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msggateway"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"google.golang.org/grpc"
)
type authServer struct {
authDatabase controller.AuthDatabase
userCheck *check.UserCheck
userRpcClient *rpcclient.UserClient
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
}
@@ -29,7 +30,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
return err
}
pbAuth.RegisterAuthServer(server, &authServer{
userCheck: check.NewUserCheck(client),
userRpcClient: rpcclient.NewUserClient(client),
RegisterCenter: client,
authDatabase: controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire),
})
@@ -38,7 +39,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
func (s *authServer) UserToken(ctx context.Context, req *pbAuth.UserTokenReq) (*pbAuth.UserTokenResp, error) {
resp := pbAuth.UserTokenResp{}
if _, err := s.userCheck.GetUserInfo(ctx, req.UserID); err != nil {
if _, err := s.userRpcClient.GetUserInfo(ctx, req.UserID); err != nil {
return nil, err
}
token, err := s.authDatabase.CreateToken(ctx, req.UserID, constant.PlatformIDToName(int(req.PlatformID)))
+10 -11
View File
@@ -12,16 +12,15 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
notification "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification2"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"google.golang.org/grpc"
)
type conversationServer struct {
groupChecker *check.GroupChecker
// groupChecker *check.GroupChecker
controller.ConversationDatabase
notify *notification.Check
conversationNotificationSender *notification.ConversationNotificationSender
}
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -38,8 +37,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
}
conversationDB := relation.NewConversationGorm(db)
pbConversation.RegisterConversationServer(server, &conversationServer{
notify: notification.NewCheck(client),
groupChecker: check.NewGroupChecker(client),
conversationNotificationSender: notification.NewConversationNotificationSender(client),
// groupChecker: check.NewGroupChecker(client),
ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)),
})
return nil
@@ -93,7 +92,7 @@ func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbC
if err != nil {
return nil, err
}
c.notify.ConversationChangeNotification(ctx, req.OwnerUserID)
c.conversationNotificationSender.ConversationChangeNotification(ctx, req.OwnerUserID)
resp := &pbConversation.BatchSetConversationsResp{}
return resp, nil
}
@@ -107,7 +106,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbConvers
if err != nil {
return nil, err
}
c.notify.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID)
c.conversationNotificationSender.ConversationChangeNotification(ctx, req.Conversation.OwnerUserID)
resp := &pbConversation.SetConversationResp{}
return resp, nil
}
@@ -142,7 +141,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
if err != nil {
return nil, err
}
c.notify.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
c.conversationNotificationSender.ConversationSetPrivateNotification(ctx, req.Conversation.OwnerUserID, req.Conversation.UserID, req.Conversation.IsPrivateChat)
return resp, nil
}
filedMap := make(map[string]interface{})
@@ -172,11 +171,11 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
if isSyncConversation {
for _, v := range req.UserIDList {
c.notify.ConversationChangeNotification(ctx, v)
c.conversationNotificationSender.ConversationChangeNotification(ctx, v)
}
} else {
for _, v := range req.UserIDList {
c.notify.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
c.conversationNotificationSender.ConversationUnreadChangeNotification(ctx, v, req.Conversation.ConversationID, req.Conversation.UpdateUnreadCountTime)
}
}
return resp, nil
+10 -9
View File
@@ -10,6 +10,7 @@ import (
"time"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mw/specialerror"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
@@ -48,19 +49,19 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
return err
}
pbGroup.RegisterGroupServer(server, &groupServer{
GroupDatabase: controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()),
UserCheck: check.NewUserCheck(client),
Notification: notification.NewCheck(client),
ConversationChecker: check.NewConversationChecker(client),
GroupDatabase: controller.InitGroupDatabase(db, rdb, mongo.GetDatabase()),
UserCheck: check.NewUserCheck(client),
Notification: notification.NewCheck(client),
conversationRpcClient: rpcclient.NewConversationClient(client),
})
return nil
}
type groupServer struct {
GroupDatabase controller.GroupDatabase
UserCheck *check.UserCheck
Notification *notification.Check
ConversationChecker *check.ConversationChecker
GroupDatabase controller.GroupDatabase
UserCheck *check.UserCheck
Notification *notification.Check
conversationRpcClient *rpcclient.ConversationClient
}
func (s *groupServer) CheckGroupAdmin(ctx context.Context, groupID string) error {
@@ -750,7 +751,7 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf
FieldType: constant.FieldGroupAtType,
UserIDList: userIDs,
}
if err := s.ConversationChecker.ModifyConversationField(ctx, args); err != nil {
if err := s.conversationRpcClient.ModifyConversationField(ctx, args); err != nil {
log.ZWarn(ctx, "modifyConversationField failed", err, "args", args)
}
}
+12 -11
View File
@@ -2,6 +2,7 @@ package msg
import (
"context"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -14,7 +15,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/go-redis/redis/v8"
"google.golang.org/grpc"
)
@@ -24,12 +25,12 @@ type msgServer struct {
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
MsgDatabase controller.MsgDatabase
ExtendMsgDatabase controller.ExtendMsgDatabase
Group *check.GroupChecker
User *check.UserCheck
Conversation *check.ConversationChecker
friend *check.FriendChecker
// Group *check.GroupChecker
User *rpcclient.UserClient
Conversation *rpcclient.ConversationClient
friend *rpcclient.FriendClient
*localcache.GroupLocalCache
black *check.BlackChecker
black *rpcclient.BlackClient
MessageLocker MessageLocker
Handlers MessageInterceptorChain
}
@@ -64,15 +65,15 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel)
s := &msgServer{
Conversation: check.NewConversationChecker(client),
User: check.NewUserCheck(client),
Group: check.NewGroupChecker(client),
Conversation: rpcclient.NewConversationClient(client),
User: rpcclient.NewUserClient(client),
// Group: check.NewGroupChecker(client),
MsgDatabase: msgDatabase,
ExtendMsgDatabase: extendMsgDatabase,
RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(client),
black: check.NewBlackChecker(client),
friend: check.NewFriendChecker(client),
black: rpcclient.NewBlackClient(client),
friend: rpcclient.NewFriendClient(client),
MessageLocker: NewLockerMessage(cacheModel),
}
s.addInterceptorHandler(MessageHasReadEnabled, MessageModifyCallback)
+5 -4
View File
@@ -2,6 +2,8 @@ package third
import (
"context"
"net/url"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
@@ -10,9 +12,8 @@ import (
relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation"
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/third"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"google.golang.org/grpc"
"net/url"
)
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -37,7 +38,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
}
third.RegisterThirdServer(server, &thirdServer{
thirdDatabase: controller.NewThirdDatabase(cache.NewCacheModel(rdb)),
userCheck: check.NewUserCheck(client),
userRpcClient: rpcclient.NewUserClient(client),
s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db), u),
})
return nil
@@ -46,7 +47,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
type thirdServer struct {
thirdDatabase controller.ThirdDatabase
s3dataBase controller.S3Database
userCheck *check.UserCheck
userRpcClient *rpcclient.UserClient
}
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
+25 -29
View File
@@ -17,20 +17,18 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
pbuser "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/user"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/check"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/convert"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
notification "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification2"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"google.golang.org/grpc"
)
type userServer struct {
controller.UserDatabase
notification *notification.Check
userCheck *check.UserCheck
conversationChecker *check.ConversationChecker
RegisterCenter registry.SvcDiscoveryRegistry
friendCheck *check.FriendChecker
notificationSender *notification.FriendNotificationSender
friendRpcClient *rpcclient.FriendClient
RegisterCenter registry.SvcDiscoveryRegistry
}
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -54,13 +52,12 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
}
userDB := relation.NewUserGorm(db)
cache := cache.NewUserCacheRedis(rdb, userDB, cache.GetDefaultOpt())
database := controller.NewUserDatabase(userDB, cache, tx.NewGorm(db))
u := &userServer{
UserDatabase: controller.NewUserDatabase(userDB, cache, tx.NewGorm(db)),
notification: notification.NewCheck(client),
userCheck: check.NewUserCheck(client),
friendCheck: check.NewFriendChecker(client),
conversationChecker: check.NewConversationChecker(client),
RegisterCenter: client,
UserDatabase: database,
RegisterCenter: client,
friendRpcClient: rpcclient.NewFriendClient(client),
notificationSender: notification.NewFriendNotificationSender(client, notification.WithDBFunc(database.FindWithError)),
}
pbuser.RegisterUserServer(server, u)
return u.UserDatabase.InitOnce(context.Background(), users)
@@ -73,7 +70,7 @@ func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesig
if err != nil {
return nil, err
}
resp.UsersInfo, err = (*convert.DBUser)(nil).DB2PB(users)
resp.UsersInfo = convert.UsersDB2Pb(users)
if err != nil {
return nil, err
}
@@ -87,7 +84,7 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
user, err := convert.NewPBUser(req.UserInfo).Convert()
user := convert.UserPb2DB(req.UserInfo)
if err != nil {
return nil, err
}
@@ -95,16 +92,16 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
if err != nil {
return nil, err
}
friends, err := s.friendCheck.GetFriendIDs(ctx, req.UserInfo.UserID)
friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
go func() {
for _, v := range friends {
s.notification.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, mcontext.GetOpUserID(ctx))
s.notificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, v, mcontext.GetOpUserID(ctx))
}
}()
s.notification.UserInfoUpdatedNotification(ctx, mcontext.GetOpUserID(ctx), req.UserInfo.UserID)
s.notificationSender.UserInfoUpdatedNotification(ctx, mcontext.GetOpUserID(ctx), req.UserInfo.UserID)
return resp, nil
}
@@ -119,7 +116,7 @@ func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.Se
if err := s.UpdateByMap(ctx, req.UserID, m); err != nil {
return nil, err
}
s.notification.UserInfoUpdatedNotification(ctx, req.UserID, req.UserID)
s.notificationSender.UserInfoUpdatedNotification(ctx, req.UserID, req.UserID)
return resp, nil
}
@@ -155,14 +152,16 @@ func (s *userServer) AccountCheck(ctx context.Context, req *pbuser.AccountCheckR
// ok
func (s *userServer) GetPaginationUsers(ctx context.Context, req *pbuser.GetPaginationUsersReq) (resp *pbuser.GetPaginationUsersResp, err error) {
resp = &pbuser.GetPaginationUsersResp{}
usersDB, total, err := s.Page(ctx, req.Pagination.PageNumber, req.Pagination.ShowNumber)
var pageNumber, showNumber int32
if req.Pagination != nil {
pageNumber = req.Pagination.PageNumber
showNumber = req.Pagination.ShowNumber
}
users, total, err := s.Page(ctx, pageNumber, showNumber)
if err != nil {
return nil, err
}
resp.Total = int32(total)
resp.Users, err = (*convert.DBUser)(nil).DB2PB(usersDB)
return resp, err
return &pbuser.GetPaginationUsersResp{Total: int32(total), Users: convert.UsersDB2Pb(users)}, err
}
// ok
@@ -208,13 +207,11 @@ func (s *userServer) UserRegister(ctx context.Context, req *pbuser.UserRegisterR
}
func (s *userServer) GetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.GetGlobalRecvMessageOptReq) (resp *pbuser.GetGlobalRecvMessageOptResp, err error) {
resp = &pbuser.GetGlobalRecvMessageOptResp{}
user, err := s.FindWithError(ctx, []string{req.UserID})
if err != nil {
return nil, err
}
resp.GlobalRecvMsgOpt = user[0].GlobalRecvMsgOpt
return resp, nil
return &pbuser.GetGlobalRecvMessageOptResp{GlobalRecvMsgOpt: user[0].GlobalRecvMsgOpt}, nil
}
func (s *userServer) GetAllUserID(ctx context.Context, req *pbuser.GetAllUserIDReq) (resp *pbuser.GetAllUserIDResp, err error) {
@@ -222,6 +219,5 @@ func (s *userServer) GetAllUserID(ctx context.Context, req *pbuser.GetAllUserIDR
if err != nil {
return nil, err
}
resp = &pbuser.GetAllUserIDResp{UserIDs: userIDs}
return resp, nil
return &pbuser.GetAllUserIDResp{UserIDs: userIDs}, nil
}