Files
open-im-server/internal/rpc/conversation/conversaion.go
T

228 lines
9.9 KiB
Go
Raw Normal View History

2022-04-24 11:23:54 +08:00
package conversation
import (
2022-04-24 15:55:19 +08:00
chat "Open_IM/internal/rpc/msg"
2022-04-24 11:23:54 +08:00
"Open_IM/pkg/common/constant"
2022-04-24 15:55:19 +08:00
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
2022-08-21 17:33:40 +08:00
rocksCache "Open_IM/pkg/common/db/rocks_cache"
2022-04-24 11:23:54 +08:00
"Open_IM/pkg/common/log"
2022-09-15 01:22:20 +08:00
promePkg "Open_IM/pkg/common/prometheus"
2022-04-24 11:23:54 +08:00
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbConversation "Open_IM/pkg/proto/conversation"
"Open_IM/pkg/utils"
"context"
"net"
"strconv"
"strings"
2022-09-15 16:27:36 +08:00
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
2022-04-24 11:23:54 +08:00
"Open_IM/pkg/common/config"
"google.golang.org/grpc"
)
type rpcConversation struct {
rpcPort int
rpcRegisterName string
etcdSchema string
etcdAddr []string
}
func (rpc *rpcConversation) ModifyConversationField(c context.Context, req *pbConversation.ModifyConversationFieldReq) (*pbConversation.ModifyConversationFieldResp, error) {
2022-04-24 15:55:19 +08:00
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbConversation.ModifyConversationFieldResp{}
var err error
2022-08-05 12:08:00 +08:00
isSyncConversation := true
2022-04-24 15:55:19 +08:00
if req.Conversation.ConversationType == constant.GroupChatType {
groupInfo, err := imdb.GetGroupInfoByGroupID(req.Conversation.GroupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", req.Conversation.GroupID, err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
2022-08-10 18:29:49 +08:00
if groupInfo.Status == constant.GroupStatusDismissed && !req.Conversation.IsNotInGroup && req.FieldType != constant.FieldUnread {
2022-04-24 15:55:19 +08:00
errMsg := "group status is dismissed"
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: errMsg}
return resp, nil
}
}
var conversation db.Conversation
if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "CopyStructFields failed", *req.Conversation, err.Error())
}
2022-04-25 19:56:46 +08:00
haveUserID, _ := imdb.GetExistConversationUserIDList(req.UserIDList, req.Conversation.ConversationID)
2022-04-24 15:55:19 +08:00
switch req.FieldType {
2022-04-24 17:27:26 +08:00
case constant.FieldRecvMsgOpt:
2022-04-24 15:55:19 +08:00
for _, v := range req.UserIDList {
if err = db.DB.SetSingleConversationRecvMsgOpt(v, req.Conversation.ConversationID, req.Conversation.RecvMsgOpt); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "cache failed, rpc return", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
}
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt})
2022-04-24 17:27:26 +08:00
case constant.FieldGroupAtType:
2022-04-24 15:55:19 +08:00
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"group_at_type": conversation.GroupAtType})
2022-04-24 17:27:26 +08:00
case constant.FieldIsNotInGroup:
2022-04-24 15:55:19 +08:00
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_not_in_group": conversation.IsNotInGroup})
2022-04-24 17:27:26 +08:00
case constant.FieldIsPinned:
2022-04-24 15:55:19 +08:00
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_pinned": conversation.IsPinned})
2022-04-24 17:27:26 +08:00
case constant.FieldIsPrivateChat:
2022-04-24 15:55:19 +08:00
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"is_private_chat": conversation.IsPrivateChat})
2022-04-24 17:27:26 +08:00
case constant.FieldEx:
2022-04-24 15:55:19 +08:00
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"ex": conversation.Ex})
2022-04-24 17:27:26 +08:00
case constant.FieldAttachedInfo:
2022-04-24 15:55:19 +08:00
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"attached_info": conversation.AttachedInfo})
2022-07-26 21:02:11 +08:00
case constant.FieldUnread:
2022-08-05 12:08:00 +08:00
isSyncConversation = false
2022-09-01 21:05:16 +08:00
err = imdb.UpdateColumnsConversations(haveUserID, req.Conversation.ConversationID, map[string]interface{}{"update_unread_count_time": conversation.UpdateUnreadCountTime})
2022-04-24 15:55:19 +08:00
}
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "UpdateColumnsConversations error", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
for _, v := range utils.DifferenceString(haveUserID, req.UserIDList) {
conversation.OwnerUserID = v
2022-08-22 20:00:05 +08:00
err = rocksCache.DelUserConversationIDListFromCache(v)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
2022-04-24 15:55:19 +08:00
err := imdb.SetOneConversation(conversation)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
}
2022-08-21 17:33:40 +08:00
2022-04-24 15:55:19 +08:00
// notification
2022-04-24 17:27:26 +08:00
if req.Conversation.ConversationType == constant.SingleChatType && req.FieldType == constant.FieldIsPrivateChat {
2022-04-24 15:55:19 +08:00
//sync peer user conversation if conversation is singleChatType
if err := syncPeerUserConversation(req.Conversation, req.OperationID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "syncPeerUserConversation", err.Error())
resp.CommonResp = &pbConversation.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}
return resp, nil
}
2022-08-21 17:33:40 +08:00
2022-04-24 15:55:19 +08:00
} else {
2022-08-05 12:08:00 +08:00
if isSyncConversation {
for _, v := range req.UserIDList {
2022-08-21 17:33:40 +08:00
if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
2022-08-05 12:08:00 +08:00
chat.ConversationChangeNotification(req.OperationID, v)
}
} else {
for _, v := range req.UserIDList {
2022-08-21 17:33:40 +08:00
if err = rocksCache.DelConversationFromCache(v, req.Conversation.ConversationID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), v, req.Conversation.ConversationID, err.Error())
}
2022-09-05 19:07:16 +08:00
chat.ConversationUnreadChangeNotification(req.OperationID, v, req.Conversation.ConversationID, conversation.UpdateUnreadCountTime)
2022-08-05 12:08:00 +08:00
}
2022-04-24 15:55:19 +08:00
}
2022-08-05 12:08:00 +08:00
2022-04-24 15:55:19 +08:00
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return", resp.String())
resp.CommonResp = &pbConversation.CommonResp{}
return resp, nil
}
func syncPeerUserConversation(conversation *pbConversation.Conversation, operationID string) error {
peerUserConversation := db.Conversation{
OwnerUserID: conversation.UserID,
ConversationID: utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType),
ConversationType: constant.SingleChatType,
UserID: conversation.OwnerUserID,
GroupID: "",
RecvMsgOpt: 0,
UnreadCount: 0,
DraftTextTime: 0,
IsPinned: false,
IsPrivateChat: conversation.IsPrivateChat,
AttachedInfo: "",
Ex: "",
}
err := imdb.PeerUserSetConversation(peerUserConversation)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "SetConversation error", err.Error())
return err
}
2022-08-21 17:33:40 +08:00
err = rocksCache.DelConversationFromCache(conversation.UserID, utils.GetConversationIDBySessionType(conversation.OwnerUserID, constant.SingleChatType))
if err != nil {
2022-09-14 18:31:56 +08:00
log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error(), conversation.OwnerUserID, conversation.ConversationID)
}
err = rocksCache.DelConversationFromCache(conversation.OwnerUserID, conversation.ConversationID)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), "DelConversationFromCache failed", err.Error(), conversation.OwnerUserID, conversation.ConversationID)
2022-08-21 17:33:40 +08:00
}
2022-04-24 15:55:19 +08:00
chat.ConversationSetPrivateNotification(operationID, conversation.OwnerUserID, conversation.UserID, conversation.IsPrivateChat)
return nil
2022-04-24 11:23:54 +08:00
}
func NewRpcConversationServer(port int) *rpcConversation {
log.NewPrivateLog(constant.LogFileName)
return &rpcConversation{
rpcPort: port,
rpcRegisterName: config.Config.RpcRegisterName.OpenImConversationName,
etcdSchema: config.Config.Etcd.EtcdSchema,
etcdAddr: config.Config.Etcd.EtcdAddr,
}
}
func (rpc *rpcConversation) Run() {
log.NewInfo("0", "rpc conversation start...")
2022-05-07 17:05:05 +08:00
listenIP := ""
if config.Config.ListenIP == "" {
listenIP = "0.0.0.0"
} else {
listenIP = config.Config.ListenIP
}
address := listenIP + ":" + strconv.Itoa(rpc.rpcPort)
2022-04-24 11:23:54 +08:00
listener, err := net.Listen("tcp", address)
if err != nil {
2022-05-10 09:09:37 +08:00
panic("listening err:" + err.Error() + rpc.rpcRegisterName)
2022-04-24 11:23:54 +08:00
}
log.NewInfo("0", "listen network success, ", address, listener)
//grpc server
2022-09-15 01:22:20 +08:00
var grpcOpts []grpc.ServerOption
if config.Config.Prometheus.Enable {
2022-09-15 08:45:10 +08:00
promePkg.NewGrpcRequestCounter()
promePkg.NewGrpcRequestFailedCounter()
promePkg.NewGrpcRequestSuccessCounter()
2022-09-15 16:27:36 +08:00
grpcOpts = append(grpcOpts, []grpc.ServerOption{
2022-09-15 16:39:49 +08:00
// grpc.UnaryInterceptor(promePkg.UnaryServerInterceptorProme),
2022-09-15 16:27:36 +08:00
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcPrometheus.UnaryServerInterceptor),
}...)
2022-09-15 01:22:20 +08:00
}
srv := grpc.NewServer(grpcOpts...)
2022-04-24 11:23:54 +08:00
defer srv.GracefulStop()
//service registers with etcd
pbConversation.RegisterConversationServer(srv, rpc)
2022-06-23 09:24:05 +08:00
rpcRegisterIP := config.Config.RpcRegisterIP
2022-05-07 17:05:05 +08:00
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
}
}
2022-05-29 19:44:22 +08:00
log.NewInfo("", "rpcRegisterIP", rpcRegisterIP)
2022-05-07 17:05:05 +08:00
err = getcdv3.RegisterEtcd(rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName, 10)
2022-04-24 11:23:54 +08:00
if err != nil {
log.NewError("0", "RegisterEtcd failed ", err.Error(),
2022-05-07 17:05:05 +08:00
rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
2022-08-26 17:41:58 +08:00
panic(utils.Wrap(err, "register conversation module rpc to etcd err"))
2022-04-24 11:23:54 +08:00
}
2022-05-07 17:05:05 +08:00
log.NewInfo("0", "RegisterConversationServer ok ", rpc.etcdSchema, strings.Join(rpc.etcdAddr, ","), rpcRegisterIP, rpc.rpcPort, rpc.rpcRegisterName)
2022-04-24 11:23:54 +08:00
err = srv.Serve(listener)
if err != nil {
log.NewError("0", "Serve failed ", err.Error())
return
}
log.NewInfo("0", "rpc conversation ok")
}