mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-10 20:15:59 +08:00
feat: support stream message (#2824)
* fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * update gomake version * update gomake version * fix: seq conversion bug * fix: redis pipe exec * fix: ImportFriends * fix: A large number of logs keysAndValues length is not even * feat: mark read aggregate write * feat: online status supports redis cluster * feat: online status supports redis cluster * feat: online status supports redis cluster * merge * merge * read seq is written to mongo * read seq is written to mongo * fix: invitation to join group notification * fix: friend op_user_id * feat: optimizing asynchronous context * feat: optimizing memamq size * feat: add GetSeqMessage * feat: GroupApplicationAgreeMemberEnterNotification * feat: GroupApplicationAgreeMemberEnterNotification * feat: go.mod * feat: go.mod * feat: join group notification and get seq * feat: join group notification and get seq * feat: avoid pulling messages from sessions with a large number of max seq values of 0 * feat: API supports gzip * go.mod * fix: nil pointer error on close * fix: listen error * fix: listen error * update go.mod * feat: add log * fix: token parse token value * fix: GetMsgBySeqs boundary issues * fix: sn_ not sort * fix: sn_ not sort * fix: sn_ not sort * fix: jssdk add * fix: jssdk support * fix: jssdk support * fix: jssdk support * fix: the message I sent is not set to read seq in mongodb * fix: cannot modify group member avatars * fix: MemberEnterNotification * fix: MemberEnterNotification * fix: MsgData status * feat: stream msg * feat: support stream messages --------- Co-authored-by: withchao <withchao@users.noreply.github.com>
This commit is contained in:
@@ -48,3 +48,7 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv
|
||||
}
|
||||
m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips)
|
||||
}
|
||||
|
||||
func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) {
|
||||
m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips)
|
||||
}
|
||||
|
||||
@@ -34,6 +34,11 @@ import (
|
||||
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
|
||||
if req.MsgData != nil {
|
||||
m.encapsulateMsgData(req.MsgData)
|
||||
if req.MsgData.ContentType == constant.Stream {
|
||||
if err := m.handlerStreamMsg(ctx, req.MsgData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
switch req.MsgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
return m.sendMsgSingleChat(ctx, req)
|
||||
|
||||
@@ -42,8 +42,9 @@ type (
|
||||
|
||||
// MsgServer encapsulates dependencies required for message handling.
|
||||
msgServer struct {
|
||||
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
|
||||
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
|
||||
RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration.
|
||||
MsgDatabase controller.CommonMsgDatabase // Interface for message database operations.
|
||||
StreamMsgDatabase controller.StreamMsgDatabase
|
||||
Conversation *rpcclient.ConversationRpcClient // RPC client for conversation service.
|
||||
UserLocalCache *rpccache.UserLocalCache // Local cache for user data.
|
||||
FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data.
|
||||
@@ -101,6 +102,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
streamMsg, err := mgo.NewStreamMsgMongo(mgocli.GetDB())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
seqUserCache := redis.NewSeqUserCacheRedis(rdb, seqUser)
|
||||
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel, seqUserCache, seqConversationCache, &config.KafkaConfig)
|
||||
if err != nil {
|
||||
@@ -109,6 +114,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
||||
s := &msgServer{
|
||||
Conversation: &conversationClient,
|
||||
MsgDatabase: msgDatabase,
|
||||
StreamMsgDatabase: controller.NewStreamMsgDatabase(streamMsg),
|
||||
RegisterCenter: client,
|
||||
UserLocalCache: rpccache.NewUserLocalCache(userRpcClient, &config.LocalCacheConfig, rdb),
|
||||
GroupLocalCache: rpccache.NewGroupLocalCache(groupRpcClient, &config.LocalCacheConfig, rdb),
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"time"
|
||||
)
|
||||
|
||||
const StreamDeadlineTime = time.Second * 60 * 10
|
||||
|
||||
func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error {
|
||||
now := time.Now()
|
||||
val := &model.StreamMsg{
|
||||
ClientMsgID: msgData.ClientMsgID,
|
||||
ConversationID: msgprocessor.GetConversationIDByMsg(msgData),
|
||||
UserID: msgData.SendID,
|
||||
CreateTime: now,
|
||||
DeadlineTime: now.Add(StreamDeadlineTime),
|
||||
}
|
||||
return m.StreamMsgDatabase.CreateStreamMsg(ctx, val)
|
||||
}
|
||||
|
||||
func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) {
|
||||
res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
now := time.Now()
|
||||
if !res.End && res.DeadlineTime.Before(now) {
|
||||
res.End = true
|
||||
res.DeadlineTime = now
|
||||
_ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) {
|
||||
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.End {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("stream msg is end")
|
||||
}
|
||||
if len(res.Packets) < int(req.StartIndex) {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("start index is invalid")
|
||||
}
|
||||
if val := len(res.Packets) - int(req.StartIndex); val > 0 {
|
||||
exist := res.Packets[int(req.StartIndex):]
|
||||
for i, s := range exist {
|
||||
if len(req.Packets) == 0 {
|
||||
break
|
||||
}
|
||||
if s != req.Packets[i] {
|
||||
return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i))
|
||||
}
|
||||
req.StartIndex++
|
||||
req.Packets = req.Packets[1:]
|
||||
}
|
||||
}
|
||||
if len(req.Packets) == 0 && res.End == req.End {
|
||||
return &msg.AppendStreamMsgResp{}, nil
|
||||
}
|
||||
deadlineTime := time.Now().Add(StreamDeadlineTime)
|
||||
if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversation, err := m.Conversation.GetConversation(ctx, res.UserID, res.ConversationID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tips := &sdkws.StreamMsgTips{
|
||||
ConversationID: res.ConversationID,
|
||||
ClientMsgID: res.ClientMsgID,
|
||||
StartIndex: req.StartIndex,
|
||||
Packets: req.Packets,
|
||||
End: req.End,
|
||||
}
|
||||
var (
|
||||
recvID string
|
||||
sessionType int32
|
||||
)
|
||||
if conversation.GroupID == "" {
|
||||
sessionType = constant.SingleChatType
|
||||
recvID = conversation.UserID
|
||||
} else {
|
||||
sessionType = constant.ReadGroupChatType
|
||||
recvID = conversation.GroupID
|
||||
}
|
||||
m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips)
|
||||
return &msg.AppendStreamMsgResp{}, nil
|
||||
}
|
||||
|
||||
func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) {
|
||||
res, err := m.getStreamMsg(ctx, req.ClientMsgID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &msg.GetStreamMsgResp{
|
||||
ClientMsgID: res.ClientMsgID,
|
||||
ConversationID: res.ConversationID,
|
||||
UserID: res.UserID,
|
||||
Packets: res.Packets,
|
||||
End: res.End,
|
||||
CreateTime: res.CreateTime.UnixMilli(),
|
||||
DeadlineTime: res.DeadlineTime.UnixMilli(),
|
||||
}, nil
|
||||
}
|
||||
Reference in New Issue
Block a user