Merge branch 'shichuang' of github.com:OpenIMSDK/Open-IM-Server into shichuang

This commit is contained in:
wangchuxiao
2022-09-01 21:59:09 +08:00
15 changed files with 2895 additions and 4625 deletions
+1 -1
View File
@@ -26,7 +26,7 @@ func SetConversation(c *gin.Context) {
return
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req)
reqPb.Conversation = &pbUser.Conversation{}
reqPb.Conversation = &pbConversation.Conversation{}
err := utils.CopyStructFields(&reqPb, req)
err = utils.CopyStructFields(reqPb.Conversation, req.Conversation)
if err != nil {
+1 -1
View File
@@ -367,7 +367,7 @@ func GetSelfUserInfo(c *gin.Context) {
log.NewInfo(req.OperationID, "GetUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
} else {
resp := api.GetSelfUserInfoResp{CommResp: api.CommResp{ErrCode: RpcResp.CommonResp.ErrCode, ErrMsg: RpcResp.CommonResp.ErrMsg}}
resp := api.GetSelfUserInfoResp{CommResp: api.CommResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}
log.NewInfo(req.OperationID, "GetUserInfo api return ", resp)
c.JSON(http.StatusOK, resp)
}
+1
View File
@@ -134,6 +134,7 @@ func (ws *WServer) MultiTerminalLoginRemoteChecker(userID string, platformID int
resp, err := client.MultiTerminalLoginCheck(context.Background(), req)
if err != nil {
log.Error(operationID, "MultiTerminalLoginCheck failed ", err.Error())
continue
}
if resp.ErrCode != 0 {
log.Error(operationID, "MultiTerminalLoginCheck errCode, errMsg: ", resp.ErrCode, resp.ErrMsg)
+9 -1
View File
@@ -102,7 +102,10 @@ type Options struct {
} `json:"HW"`
XM struct {
ChannelID string `json:"/extra.channel_id"`
} `json:""`
} `json:"XM"`
VV struct {
Classification int `json:"/classification"`
} `json:"VV"`
}
type PushResp struct {
@@ -156,6 +159,11 @@ func (g *Getui) Push(userIDList []string, alert, detailContent, operationID stri
XM: struct {
ChannelID string `json:"/extra.channel_id"`
}{ChannelID: "high_system"},
VV: struct {
Classification int "json:\"/classification\""
}{
Classification: 1,
},
}
pushResp := PushResp{}
err = g.request(PushURL, pushReq, token, &pushResp, operationID)
+7
View File
@@ -13,6 +13,7 @@ import (
"Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/msg"
pbPush "Open_IM/pkg/proto/push"
"Open_IM/pkg/utils"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
@@ -43,6 +44,11 @@ func (ms *PushConsumerHandler) handleMs2PsChat(msg []byte) {
MsgData: msgFromMQ.MsgData,
PushToUserID: msgFromMQ.PushToUserID,
}
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := utils.GetCurrentTimestampBySecond()
if nowSec-sec > 10 {
return
}
switch msgFromMQ.MsgData.SessionType {
case constant.SuperGroupChatType:
MsgToSuperGroupUser(pbData)
@@ -59,6 +65,7 @@ func (ms *PushConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
for msg := range claim.Messages() {
log.NewDebug("", "kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
ms.msgHandle[msg.Topic](msg.Value)
sess.MarkMessage(msg, "")
}
return nil
}
+6
View File
@@ -74,6 +74,7 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"attached_info": conversation.AttachedInfo})
case constant.FieldUnread:
isSyncConversation = false
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"update_unread_count_time": utils.GetCurrentTimestampByMill()})
}
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "UpdateColumnsConversations error", err.Error())
@@ -82,6 +83,11 @@ func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbCo
}
for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) {
conversation.OwnerUserID = v
conversation.UpdateUnreadCountTime = utils.GetCurrentTimestampByMill()
err = rocksCache.DelUserConversationIDListFromCache(v)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
err := imdb.SetOneConversation(conversation)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
+9 -8
View File
@@ -6,7 +6,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/rocks_cache"
rocksCache "Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
@@ -19,12 +19,13 @@ import (
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"context"
"google.golang.org/grpc"
"math/big"
"net"
"strconv"
"strings"
"time"
"google.golang.org/grpc"
)
type groupServer struct {
@@ -381,7 +382,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
haveConUserID = append(haveConUserID, v.OwnerUserID)
}
var reqPb pbUser.SetConversationReq
var c pbUser.Conversation
var c pbConversation.Conversation
for _, v := range conversations {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v.OwnerUserID
@@ -634,7 +635,7 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
}
}
var reqPb pbUser.SetConversationReq
var c pbUser.Conversation
var c pbConversation.Conversation
for _, v := range okUserIDList {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v
@@ -867,7 +868,7 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G
}
var reqPb pbUser.SetConversationReq
reqPb.OperationID = req.OperationID
var c pbUser.Conversation
var c pbConversation.Conversation
conversation, err := imdb.GetConversation(req.FromUserID, utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType))
if err != nil {
c.OwnerUserID = req.FromUserID
@@ -1061,7 +1062,7 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq)
}
//modify quitter conversation info
var reqPb pbUser.SetConversationReq
var c pbUser.Conversation
var c pbConversation.Conversation
reqPb.OperationID = req.OperationID
c.OwnerUserID = req.OpUserID
c.ConversationID = utils.GetConversationIDBySessionType(req.GroupID, constant.GroupChatType)
@@ -1515,7 +1516,7 @@ func (s *groupServer) RemoveGroupMembersCMS(_ context.Context, req *pbGroup.Remo
OpUserID: req.OpUserID,
}
var reqPb pbUser.SetConversationReq
var c pbUser.Conversation
var c pbConversation.Conversation
for _, v := range resp.Success {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v
@@ -1700,7 +1701,7 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou
}
//modify quitter conversation info
var reqPb pbUser.SetConversationReq
var c pbUser.Conversation
var c pbConversation.Conversation
for _, v := range memberList {
reqPb.OperationID = req.OperationID
c.OwnerUserID = v.UserID
+6 -5
View File
@@ -11,6 +11,7 @@ import (
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
pbFriend "Open_IM/pkg/proto/friend"
sdkws "Open_IM/pkg/proto/sdk_ws"
pbUser "Open_IM/pkg/proto/user"
@@ -83,7 +84,7 @@ func (s *userServer) Run() {
log.NewInfo("0", "rpc user success")
}
func syncPeerUserConversation(conversation *pbUser.Conversation, operationID string) error {
func syncPeerUserConversation(conversation *pbConversation.Conversation, operationID string) error {
peerUserConversation := db.Conversation{
OwnerUserID: conversation.UserID,
ConversationID: utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType),
@@ -178,7 +179,7 @@ func (s *userServer) BatchSetConversations(ctx context.Context, req *pbUser.Batc
func (s *userServer) GetAllConversations(ctx context.Context, req *pbUser.GetAllConversationsReq) (*pbUser.GetAllConversationsResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbUser.GetAllConversationsResp{Conversations: []*pbUser.Conversation{}}
resp := &pbUser.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := rocksCache.GetUserAllConversationList(req.OwnerUserID)
log.NewDebug(req.OperationID, "conversations: ", conversations)
if err != nil {
@@ -196,7 +197,7 @@ func (s *userServer) GetAllConversations(ctx context.Context, req *pbUser.GetAll
func (s *userServer) GetConversation(ctx context.Context, req *pbUser.GetConversationReq) (*pbUser.GetConversationResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbUser.GetConversationResp{Conversation: &pbUser.Conversation{}}
resp := &pbUser.GetConversationResp{Conversation: &pbConversation.Conversation{}}
conversation, err := rocksCache.GetConversationFromCache(req.OwnerUserID, req.ConversationID)
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation)
if err != nil {
@@ -214,7 +215,7 @@ func (s *userServer) GetConversation(ctx context.Context, req *pbUser.GetConvers
func (s *userServer) GetConversations(ctx context.Context, req *pbUser.GetConversationsReq) (*pbUser.GetConversationsResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbUser.GetConversationsResp{Conversations: []*pbUser.Conversation{}}
resp := &pbUser.GetConversationsResp{Conversations: []*pbConversation.Conversation{}}
conversations, err := rocksCache.GetConversationsFromCache(req.OwnerUserID, req.ConversationIDs)
log.NewDebug("", utils.GetSelfFuncName(), "conversations", conversations)
if err != nil {
@@ -683,7 +684,7 @@ func (s *userServer) AlterUser(ctx context.Context, req *pbUser.AlterUserReq) (*
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbUser.AlterUserResp{}
user := db.User{
PhoneNumber: strconv.FormatInt(req.PhoneNumber, 10),
PhoneNumber: req.PhoneNumber,
Nickname: req.Nickname,
Email: req.Email,
UserID: req.UserId,