Merge branch 'tuoyun' of github.com:OpenIMSDK/Open-IM-Server into tuoyun

This commit is contained in:
wangchuxiao
2022-04-21 10:43:46 +08:00
12 changed files with 279 additions and 201 deletions
+5 -4
View File
@@ -23,8 +23,9 @@ func UserRegister(c *gin.Context) {
}
if params.Secret != config.Config.Secret {
log.NewError(params.OperationID, "params.Secret != config.Config.Secret", params.Secret, config.Config.Secret)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": "not authorized"})
errMsg := " params.Secret != config.Config.Secret "
log.NewError(params.OperationID, errMsg, params.Secret, config.Config.Secret)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": errMsg})
return
}
req := &rpc.UserRegisterReq{UserInfo: &open_im_sdk.UserInfo{}}
@@ -36,7 +37,7 @@ func UserRegister(c *gin.Context) {
client := rpc.NewAuthClient(etcdConn)
reply, err := client.UserRegister(context.Background(), req)
if err != nil {
log.NewError(req.OperationID, "call rpc err ", err)
log.NewError(req.OperationID, "call rpc err ", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "internal service err"})
return
}
@@ -86,6 +87,6 @@ func UserToken(c *gin.Context) {
}
resp := api.UserTokenResp{CommResp: api.CommResp{ErrCode: reply.CommonResp.ErrCode, ErrMsg: reply.CommonResp.ErrMsg},
UserToken: api.UserTokenInfo{UserID: req.FromUserID, Token: reply.Token, ExpiredTime: reply.ExpiredTime}}
log.NewInfo(req.OperationID, "UserRegister return ", resp)
log.NewInfo(req.OperationID, "UserToken return ", resp)
c.JSON(http.StatusOK, resp)
}
+6 -2
View File
@@ -2,8 +2,8 @@ package logic
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/kafka"
"fmt"
)
var (
@@ -20,6 +20,10 @@ func Init() {
}
func Run() {
//register mysqlConsumerHandler to
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
if config.Config.ChatPersistenceMysql {
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
} else {
fmt.Println("not start mysql consumer")
}
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
}
+40 -1
View File
@@ -13,6 +13,7 @@ import (
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbGroup "Open_IM/pkg/proto/group"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
pbUser "Open_IM/pkg/proto/user"
"Open_IM/pkg/utils"
"context"
"net"
@@ -576,6 +577,22 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq)
}
chat.MemberQuitNotification(req)
//modify quitter conversation info
var reqPb pbUser.SetConversationReq
reqPb.OperationID = req.OperationID
reqPb.Conversation.OwnerUserID = req.OpUserID
reqPb.Conversation.ConversationID = utils.GetConversationIDBySessionType(req.OpUserID, constant.GroupChatType)
reqPb.Conversation.ConversationType = constant.GroupChatType
reqPb.Conversation.GroupID = req.GroupID
reqPb.Conversation.IsNotInGroup = true
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
client := pbUser.NewUserClient(etcdConn)
respPb, err := client.SetConversation(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation rpc failed, ", reqPb.String(), err.Error())
} else {
log.NewDebug(req.OpUserID, utils.GetSelfFuncName(), respPb.String())
}
log.NewInfo(req.OperationID, "rpc QuitGroup return ", pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}})
return &pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
}
@@ -988,13 +1005,35 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
chat.GroupDismissedNotification(req)
memberList, err := imdb.GetGroupMemberListByGroupID(req.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupMemberListByGroupID failed,", err.Error(), req.GroupID)
}
//modify quitter conversation info
var reqPb pbUser.SetConversationReq
for _, v := range memberList {
reqPb.OperationID = req.OperationID
reqPb.Conversation.OwnerUserID = v.UserID
reqPb.Conversation.ConversationID = utils.GetConversationIDBySessionType(v.UserID, constant.GroupChatType)
reqPb.Conversation.ConversationType = constant.GroupChatType
reqPb.Conversation.GroupID = req.GroupID
reqPb.Conversation.IsNotInGroup = true
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImUserName)
client := pbUser.NewUserClient(etcdConn)
respPb, err := client.SetConversation(context.Background(), &reqPb)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation rpc failed, ", reqPb.String(), err.Error(), v.UserID)
} else {
log.NewDebug(req.OpUserID, utils.GetSelfFuncName(), respPb.String(), v.UserID)
}
}
err = imdb.DeleteGroupMemberByGroupID(req.GroupID)
if err != nil {
log.NewError(req.OperationID, "DeleteGroupMemberByGroupID failed ", req.GroupID)
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
}
+2 -1
View File
@@ -75,7 +75,7 @@ func (s *userServer) Run() {
func syncPeerUserConversation(conversation *pbUser.Conversation, operationID string) error {
peerUserConversation := db.Conversation{
OwnerUserID: conversation.UserID,
ConversationID: "single_" + conversation.OwnerUserID,
ConversationID: utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType),
ConversationType: constant.SingleChatType,
UserID: conversation.OwnerUserID,
GroupID: "",
@@ -129,6 +129,7 @@ func (s *userServer) BatchSetConversations(ctx context.Context, req *pbUser.Batc
if err := utils.CopyStructFields(&conversation, v); err != nil {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), v.String(), "CopyStructFields failed", err.Error())
}
//redis op
if err := db.DB.SetSingleConversationRecvMsgOpt(req.OwnerUserID, v.ConversationID, v.RecvMsgOpt); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error())
resp.CommonResp = &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}