* feat: add EnableHistoryForNewMembers

* feat: change name

* refactor: move first create conversation

* fix: set min seq
This commit is contained in:
icey-yu
2024-08-23 17:34:58 +08:00
committed by GitHub
parent d938754ff6
commit 7deb45e35d
8 changed files with 77 additions and 51 deletions
+20 -15
View File
@@ -105,13 +105,20 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.user = userRpcClient
gs.notification = NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
})
gs.notification = NewGroupNotificationSender(
database,
&msgRpcClient,
&userRpcClient,
&conversationRpcClient,
config,
func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
users, err := userRpcClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return nil, err
}
return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil
},
)
localcache.InitLocalCache(&config.LocalCacheConfig)
gs.conversationRpcClient = conversationRpcClient
gs.msgRpcClient = msgRpcClient
@@ -450,10 +457,10 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
if err := s.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, req.InvitedUserIDs); err != nil {
if err = s.notification.MemberEnterNotification(ctx, req.GroupID, req.InvitedUserIDs...); err != nil {
return nil, err
}
s.notification.MemberInvitedNotification(ctx, req.GroupID, req.Reason, req.InvitedUserIDs)
return &pbgroup.InviteUserToGroupResp{}, nil
}
@@ -822,14 +829,13 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
}
switch req.HandleResult {
case constant.GroupResponseAgree:
if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.FromUserID}); err != nil {
return nil, err
}
s.notification.GroupApplicationAcceptedNotification(ctx, req)
if member == nil {
log.ZDebug(ctx, "GroupApplicationResponse", "member is nil")
} else {
s.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID)
if err = s.notification.MemberEnterNotification(ctx, req.GroupID, req.FromUserID); err != nil {
return nil, err
}
}
case constant.GroupResponseRefuse:
s.notification.GroupApplicationRejectedNotification(ctx, req)
@@ -889,10 +895,9 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
return nil, err
}
if err := s.conversationRpcClient.GroupChatFirstCreateConversation(ctx, req.GroupID, []string{req.InviterUserID}); err != nil {
if err = s.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID); err != nil {
return nil, err
}
s.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID)
s.webhookAfterJoinGroup(ctx, &s.config.WebhooksConfig.AfterJoinGroup, req)
return &pbgroup.JoinGroupResp{}, nil
+35 -28
View File
@@ -21,6 +21,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient/notification"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
@@ -43,12 +44,22 @@ const (
adminReceiver
)
func NewGroupNotificationSender(db controller.GroupDatabase, msgRpcClient *rpcclient.MessageRpcClient, userRpcClient *rpcclient.UserRpcClient, config *Config, fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)) *GroupNotificationSender {
func NewGroupNotificationSender(
db controller.GroupDatabase,
msgRpcClient *rpcclient.MessageRpcClient,
userRpcClient *rpcclient.UserRpcClient,
conversationRpcClient *rpcclient.ConversationRpcClient,
config *Config,
fn func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error),
) *GroupNotificationSender {
return &GroupNotificationSender{
NotificationSender: rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithRpcClient(msgRpcClient), rpcclient.WithUserRpcClient(userRpcClient)),
getUsersInfo: fn,
db: db,
config: config,
conversationRpcClient: conversationRpcClient,
msgRpcClient: msgRpcClient,
}
}
@@ -57,6 +68,9 @@ type GroupNotificationSender struct {
getUsersInfo func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error)
db controller.GroupDatabase
config *Config
conversationRpcClient *rpcclient.ConversationRpcClient
msgRpcClient *rpcclient.MessageRpcClient
}
func (g *GroupNotificationSender) PopulateGroupMember(ctx context.Context, members ...*model.GroupMember) error {
@@ -494,50 +508,43 @@ func (g *GroupNotificationSender) MemberKickedNotification(ctx context.Context,
g.Notification(ctx, mcontext.GetOpUserID(ctx), tips.Group.GroupID, constant.MemberKickedNotification, tips)
}
func (g *GroupNotificationSender) MemberInvitedNotification(ctx context.Context, groupID, reason string, invitedUserIDList []string) {
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID ...string) error {
var err error
defer func() {
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
}
}()
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return
}
var users []*sdkws.GroupMemberFullInfo
users, err = g.getGroupMembers(ctx, groupID, invitedUserIDList)
if err != nil {
return
}
tips := &sdkws.MemberInvitedTips{Group: group, InvitedUserList: users}
err = g.fillOpUser(ctx, &tips.OpUser, tips.Group.GroupID)
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberInvitedNotification, tips)
}
func (g *GroupNotificationSender) MemberEnterNotification(ctx context.Context, groupID string, entrantUserID string) {
var err error
defer func() {
if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := g.msgRpcClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
log.ZError(ctx, stringutil.GetFuncName(1)+" failed", err)
return err
}
}()
err = g.conversationRpcClient.SetConversationMinSeq(ctx, entrantUserID, conversationID, maxSeq)
if err != nil {
return err
}
}
if err := g.conversationRpcClient.GroupChatFirstCreateConversation(ctx, groupID, entrantUserID); err != nil {
return err
}
var group *sdkws.GroupInfo
group, err = g.getGroupInfo(ctx, groupID)
if err != nil {
return
return err
}
var user *sdkws.GroupMemberFullInfo
user, err = g.getGroupMember(ctx, groupID, entrantUserID)
users, err := g.getGroupMembers(ctx, groupID, entrantUserID)
if err != nil {
return
return err
}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUser: user}
tips := &sdkws.MemberEnterTips{Group: group, EntrantUsers: users}
g.setVersion(ctx, &tips.GroupMemberVersion, &tips.GroupMemberVersionID, database.GroupMemberVersionName, tips.Group.GroupID)
g.Notification(ctx, mcontext.GetOpUserID(ctx), group.GroupID, constant.MemberEnterNotification, tips)
return nil
}
func (g *GroupNotificationSender) GroupDismissedNotification(ctx context.Context, tips *sdkws.GroupDismissedTips) {