mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-12 13:05:58 +08:00
merge v3dev into main (#504)
* statistics user register * refactor: router change * minio init * UserRegisterCount * push use local conn * refactor: user pb update * remove online push close grpc conn * refactor: user pb update * refactor:pb file * msgs statistics * msgs statistics * revoke userID * refactor: errcode update * active user * active user * active user * refactor: errcode update * feat: conn update token * active user * active user * feat: conn update token * active user * feat: conn update token * feat: conn update token * feat: conn update token * add tx_oss cos * active user * active user * group create * group create * feat: group notification show to conversation * feat: group notification show to conversation * group active * user active * sendNotificationWithName * withname * privateChat * a2r call option * grpc with detail return error * change log error * chain unary interceptor * api nil slice map * fix sync has read * fix: text update * fix: update add model * set conversations update * set privateChat * fix: content update * remove unuse rpc * msgDestruct * cron use rpc mw * set IsMsgDestruct * msg destruct * msgDestruct * s3 minio, cos, oss support * feat: add implement of GetUsersOnlineStatus, #472 (#477) * s3 minio, cos, oss support * s3 route * remove extendMsg code * s3 route * remove unuse code * s3 pb * s3 pb * s3 pb * s3 presigned put * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * s3 presigned test * Update .gitignore (#482) * s3 debug log * s3 debug log * cron add log and fix cron * add log * cron * s3 config * fix kick user bug * s3 cos * add kick log * s3 cos test * s3 cos test * s3 cos test * kick user log * kickuserlog * s3 cos copy * s3 cos copy * s3 url * s3 url * s3 AccessURL * log * s3 InitiateMultipartUpload add ExpireTime --------- Co-authored-by: withchao <993506633@qq.com> Co-authored-by: wangchuxiao <wangchuxiao97@outlook.com> Co-authored-by: BanTanger <88583317+BanTanger@users.noreply.github.com> Co-authored-by: withchao <48119764+withchao@users.noreply.github.com> Co-authored-by: Alan <68671759+hanzhixiao@users.noreply.github.com>
This commit is contained in:
+14
-104
@@ -1,17 +1,3 @@
|
||||
// Copyright © 2023 OpenIM. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package push
|
||||
|
||||
import (
|
||||
@@ -52,16 +38,9 @@ type Pusher struct {
|
||||
|
||||
var errNoOfflinePusher = errors.New("no offlinePusher is configured")
|
||||
|
||||
func NewPusher(
|
||||
discov discoveryregistry.SvcDiscoveryRegistry,
|
||||
offlinePusher offlinepush.OfflinePusher,
|
||||
database controller.PushDatabase,
|
||||
groupLocalCache *localcache.GroupLocalCache,
|
||||
conversationLocalCache *localcache.ConversationLocalCache,
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient,
|
||||
groupRpcClient *rpcclient.GroupRpcClient,
|
||||
msgRpcClient *rpcclient.MessageRpcClient,
|
||||
) *Pusher {
|
||||
func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
|
||||
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache,
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient) *Pusher {
|
||||
return &Pusher{
|
||||
discov: discov,
|
||||
database: database,
|
||||
@@ -108,18 +87,7 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
|
||||
return err
|
||||
}
|
||||
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
|
||||
log.ZDebug(
|
||||
ctx,
|
||||
"push_result",
|
||||
"ws push result",
|
||||
wsResults,
|
||||
"sendData",
|
||||
msg,
|
||||
"isOfflinePush",
|
||||
isOfflinePush,
|
||||
"push_to_userID",
|
||||
userIDs,
|
||||
)
|
||||
log.ZDebug(ctx, "push_result", "ws push result", wsResults, "sendData", msg, "isOfflinePush", isOfflinePush, "push_to_userID", userIDs)
|
||||
p.successCount++
|
||||
for _, userID := range userIDs {
|
||||
if isOfflinePush && userID != msg.SendID {
|
||||
@@ -170,15 +138,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
||||
}
|
||||
defer func(groupID string, userIDs []string) {
|
||||
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
||||
log.ZError(
|
||||
ctx,
|
||||
"MemberQuitNotification DeleteMemberAndSetConversationSeq",
|
||||
err,
|
||||
"groupID",
|
||||
groupID,
|
||||
"userIDs",
|
||||
userIDs,
|
||||
)
|
||||
log.ZError(ctx, "MemberQuitNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
|
||||
}
|
||||
}(groupID, []string{tips.QuitUser.UserID})
|
||||
pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID)
|
||||
@@ -187,21 +147,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
||||
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
kickedUsers := utils.Slice(
|
||||
tips.KickedUserList,
|
||||
func(e *sdkws.GroupMemberFullInfo) string { return e.UserID },
|
||||
)
|
||||
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
|
||||
defer func(groupID string, userIDs []string) {
|
||||
if err := p.DeleteMemberAndSetConversationSeq(ctx, groupID, userIDs); err != nil {
|
||||
log.ZError(
|
||||
ctx,
|
||||
"MemberKickedNotification DeleteMemberAndSetConversationSeq",
|
||||
err,
|
||||
"groupID",
|
||||
groupID,
|
||||
"userIDs",
|
||||
userIDs,
|
||||
)
|
||||
log.ZError(ctx, "MemberKickedNotification DeleteMemberAndSetConversationSeq", err, "groupID", groupID, "userIDs", userIDs)
|
||||
}
|
||||
}(groupID, kickedUsers)
|
||||
pushToUserIDs = append(pushToUserIDs, kickedUsers...)
|
||||
@@ -211,16 +160,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
||||
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
|
||||
return err
|
||||
}
|
||||
log.ZInfo(
|
||||
ctx,
|
||||
"GroupDismissedNotificationInfo****",
|
||||
"groupID",
|
||||
groupID,
|
||||
"num",
|
||||
len(pushToUserIDs),
|
||||
"list",
|
||||
pushToUserIDs,
|
||||
)
|
||||
log.ZInfo(ctx, "GroupDismissedNotificationInfo****", "groupID", groupID, "num", len(pushToUserIDs), "list", pushToUserIDs)
|
||||
if len(config.Config.Manager.UserID) > 0 {
|
||||
ctx = mcontext.WithOpUserIDContext(ctx, config.Config.Manager.UserID[0])
|
||||
}
|
||||
@@ -284,23 +224,9 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
||||
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
|
||||
return err
|
||||
}
|
||||
_, err := p.GetConnsAndOnlinePush(
|
||||
ctx,
|
||||
msg,
|
||||
utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs),
|
||||
)
|
||||
_, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
|
||||
if err != nil {
|
||||
log.ZError(
|
||||
ctx,
|
||||
"offlinePushMsg failed",
|
||||
err,
|
||||
"groupID",
|
||||
groupID,
|
||||
"msg",
|
||||
msg,
|
||||
"userIDs",
|
||||
utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs),
|
||||
)
|
||||
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, WebAndPcBackgroundUserIDs))
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -308,11 +234,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pusher) GetConnsAndOnlinePush(
|
||||
ctx context.Context,
|
||||
msg *sdkws.MsgData,
|
||||
pushToUserIDs []string,
|
||||
) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
|
||||
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
|
||||
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
|
||||
if err != nil {
|
||||
@@ -321,11 +243,7 @@ func (p *Pusher) GetConnsAndOnlinePush(
|
||||
//Online push message
|
||||
for _, v := range conns {
|
||||
msgClient := msggateway.NewMsgGatewayClient(v)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(
|
||||
ctx,
|
||||
&msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs},
|
||||
)
|
||||
p.discov.CloseConn(v)
|
||||
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: pushToUserIDs})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@@ -338,12 +256,7 @@ func (p *Pusher) GetConnsAndOnlinePush(
|
||||
return wsResults, nil
|
||||
}
|
||||
|
||||
func (p *Pusher) offlinePushMsg(
|
||||
ctx context.Context,
|
||||
conversationID string,
|
||||
msg *sdkws.MsgData,
|
||||
offlinePushUserIDs []string,
|
||||
) error {
|
||||
func (p *Pusher) offlinePushMsg(ctx context.Context, conversationID string, msg *sdkws.MsgData, offlinePushUserIDs []string) error {
|
||||
title, content, opts, err := p.getOfflinePushInfos(conversationID, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -377,10 +290,7 @@ func (p *Pusher) GetOfflinePushOpts(msg *sdkws.MsgData) (opts *offlinepush.Opts,
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
func (p *Pusher) getOfflinePushInfos(
|
||||
conversationID string,
|
||||
msg *sdkws.MsgData,
|
||||
) (title, content string, opts *offlinepush.Opts, err error) {
|
||||
func (p *Pusher) getOfflinePushInfos(conversationID string, msg *sdkws.MsgData) (title, content string, opts *offlinepush.Opts, err error) {
|
||||
if p.offlinePusher == nil {
|
||||
err = errNoOfflinePusher
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user