Merge remote-tracking branch 'origin/errcode' into errcode

This commit is contained in:
Gordon
2023-05-19 17:10:30 +08:00
12 changed files with 145 additions and 53 deletions
+3 -3
View File
@@ -40,12 +40,12 @@ func (ModifyMsgConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { r
func (mmc *ModifyMsgConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg)
log.ZDebug(ctx, "kafka get info to mysql", "ModifyMsgConsumerHandler", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 {
ctx := mmc.modifyMsgConsumerGroup.GetContextFromMsg(msg)
mmc.ModifyMsg(ctx, msg, string(msg.Key), sess)
} else {
log.Error("", "msg get from kafka but is nil", msg.Key)
log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key)
}
sess.MarkMessage(msg, "")
}
+8 -10
View File
@@ -14,7 +14,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext"
pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
@@ -38,16 +37,15 @@ func NewPersistentConsumerHandler(database controller.ChatLogDatabase) *Persiste
func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMsg *sarama.ConsumerMessage, msgKey string, _ sarama.ConsumerGroupSession) {
msg := cMsg.Value
operationID := mcontext.GetOperationID(ctx)
log.NewInfo("msg come here mysql!!!", "", "msg", string(msg), msgKey)
var tag bool
msgFromMQ := pbMsg.MsgDataToMQ{}
err := proto.Unmarshal(msg, &msgFromMQ)
if err != nil {
log.NewError(operationID, "msg_transfer Unmarshal msg err", "msg", string(msg), "err", err.Error())
log.ZError(ctx, "msg_transfer Unmarshal msg err", err)
return
}
log.Debug(operationID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String())
return
log.ZDebug(ctx, "handleChatWs2Mysql", "msg", msgFromMQ.MsgData)
//Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent)
//Only process receiver data
@@ -65,9 +63,9 @@ func (pc *PersistentConsumerHandler) handleChatWs2Mysql(ctx context.Context, cMs
tag = true
}
if tag {
log.NewInfo(operationID, "msg_transfer msg persisting", string(msg))
log.ZInfo(ctx, "msg_transfer msg persisting", "msg", string(msg))
if err = pc.chatLogDatabase.CreateChatLog(&msgFromMQ); err != nil {
log.NewError(operationID, "Message insert failed", "err", err.Error(), "msg", msgFromMQ.String())
log.ZError(ctx, "Message insert failed", err, "msg", msgFromMQ.String())
return
}
}
@@ -77,12 +75,12 @@ func (PersistentConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
func (PersistentConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (pc *PersistentConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg)
log.ZDebug(ctx, "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value), "key", string(msg.Key))
if len(msg.Value) != 0 {
ctx := pc.persistentConsumerGroup.GetContextFromMsg(msg)
pc.handleChatWs2Mysql(ctx, msg, string(msg.Key), sess)
} else {
log.Error("", "msg get from kafka but is nil", msg.Key)
log.ZError(ctx, "msg get from kafka but is nil", nil, "key", msg.Key)
}
sess.MarkMessage(msg, "")
}
+26
View File
@@ -2,6 +2,7 @@ package push
import (
"context"
"encoding/json"
"errors"
"github.com/OpenIMSDK/Open-IM-Server/internal/push/offlinepush"
@@ -101,6 +102,16 @@ func (p *Pusher) Push2User(ctx context.Context, userIDs []string, msg *sdkws.Msg
return nil
}
func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t interface{}) error {
var notificationElem struct {
Detail string `json:"detail,omitempty"`
}
if err := json.Unmarshal(bytes, &notificationElem); err != nil {
return err
}
return json.Unmarshal([]byte(notificationElem.Detail), t)
}
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
operationID := mcontext.GetOperationID(ctx)
log.Debug(operationID, "Get super group msg from msg_transfer And push msg", msg.String(), groupID)
@@ -113,6 +124,21 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if err != nil {
return err
}
switch msg.ContentType {
case constant.MemberQuitNotification:
var tips sdkws.MemberQuitTips
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err
}
pushToUserIDs = append(pushToUserIDs, tips.QuitUser.UserID)
case constant.MemberKickedNotification:
var tips sdkws.MemberKickedTips
if p.UnmarshalNotificationElem(msg.Content, &tips) != nil {
return err
}
kickedUsers := utils.Slice(tips.KickedUserList, func(e *sdkws.GroupMemberFullInfo) string { return e.UserID })
pushToUserIDs = append(pushToUserIDs, kickedUsers...)
}
}
wsResults, err := p.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
if err != nil {
+7 -7
View File
@@ -50,7 +50,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
}
func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) {
resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}}
conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID})
if err != nil {
return nil, err
@@ -58,16 +57,17 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers
if len(conversations) < 1 {
return nil, errs.ErrRecordNotFound.Wrap("conversation not found")
}
resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}}
resp.Conversation = convert.ConversationDB2Pb(conversations[0])
return resp, nil
}
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) {
resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
resp.Conversations = convert.ConversationsDB2Pb(conversations)
return resp, nil
}
@@ -297,7 +297,7 @@ func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbConv
}
func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *pbConversation.GetConversationsHasReadAndMaxSeqReq) (*pbConversation.GetConversationsHasReadAndMaxSeqResp, error) {
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.UserID)
conversations, err := c.conversationDatabase.GetUserAllHasReadSeqs(ctx, req.UserID)
if err != nil {
return nil, err
}
@@ -306,10 +306,10 @@ func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Contex
return nil, err
}
resp := &pbConversation.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*pbConversation.Seqs)}
for _, v := range conversations {
resp.Seqs[v.ConversationID] = &pbConversation.Seqs{
HasReadSeq: v.HasReadSeq,
MaxSeq: maxSeqs.MaxSeqs[v.ConversationID],
for conversationID, seq := range conversations {
resp.Seqs[conversationID] = &pbConversation.Seqs{
HasReadSeq: seq,
MaxSeq: maxSeqs.MaxSeqs[conversationID],
}
}
return resp, nil
+1 -1
View File
@@ -21,7 +21,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter)
return nil, err
}
err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.GroupID, req.MsgData)
err = m.MsgDatabase.MsgToMQ(ctx, utils.GetConversationIDByMsg(req.MsgData), req.MsgData)
if err != nil {
return nil, err
}