rocks cache

This commit is contained in:
wangchuxiao
2022-07-14 12:08:28 +08:00
parent 9e0e2fe124
commit b064244907
34 changed files with 1103 additions and 2447 deletions
+24 -230
View File
@@ -3,20 +3,16 @@ package cache
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbCache "Open_IM/pkg/proto/cache"
commonPb "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"context"
"google.golang.org/grpc"
"net"
"strconv"
"strings"
"sync"
"google.golang.org/grpc"
)
type cacheServer struct {
@@ -54,13 +50,6 @@ func (s *cacheServer) Run() {
defer listener.Close()
//grpc server
//to cache
err = SyncDB2Cache()
if err != nil {
log.NewError("", err.Error(), "db to cache failed")
panic(err.Error())
}
srv := grpc.NewServer()
defer srv.GracefulStop()
pbCache.RegisterCacheServer(srv, s)
@@ -86,167 +75,10 @@ func (s *cacheServer) Run() {
log.NewInfo("0", "message cms rpc success")
}
func SyncDB2Cache() error {
var err error
log.NewInfo("0", utils.GetSelfFuncName())
userList, err := imdb.GetAllUser()
log.NewDebug("", utils.GetSelfFuncName(), "userList", userList)
if err != nil {
return utils.Wrap(err, "")
}
//err = updateAllUserToCache(userList)
wg := &sync.WaitGroup{}
wg.Add(3)
go func() {
defer wg.Done()
err = updateAllFriendToCache(userList)
}()
go func() {
defer wg.Done()
err = updateAllBlackListToCache(userList)
}()
go func() {
defer wg.Done()
err = updateAllGroupMemberListToCache()
}()
wg.Wait()
return utils.Wrap(err, "")
}
func DelRelationCache() {}
func updateAllUserToCache(userList []db.User) error {
wg := &sync.WaitGroup{}
wg.Add(len(userList))
for _, userInfo := range userList {
go func() {
defer wg.Done()
userInfoPb := &commonPb.UserInfo{
UserID: userInfo.UserID,
Nickname: userInfo.Nickname,
FaceURL: userInfo.FaceURL,
Gender: userInfo.Gender,
PhoneNumber: userInfo.PhoneNumber,
Birth: uint32(userInfo.Birth.Unix()),
Email: userInfo.Email,
Ex: userInfo.Ex,
CreateTime: uint32(userInfo.CreateTime.Unix()),
AppMangerLevel: userInfo.AppMangerLevel,
}
m, err := utils.Pb2Map(userInfoPb)
if err != nil {
log.NewWarn("", utils.GetSelfFuncName(), err.Error())
}
if err := db.DB.SetUserInfoToCache(userInfo.UserID, m); err != nil {
log.NewWarn("0", utils.GetSelfFuncName(), "set userInfo to cache failed", err.Error())
}
}()
}
wg.Wait()
log.NewInfo("0", utils.GetSelfFuncName(), "ok")
return nil
}
func updateAllGroupMemberListToCache() error {
log.NewInfo("0", utils.GetSelfFuncName())
groupIDList, err := imdb.GetAllGroupIDList()
if err != nil {
log.NewWarn("0", utils.GetSelfFuncName(), "getAllGroupIDList failed", err.Error())
panic(err.Error())
}
for _, groupID := range groupIDList {
groupMemberIDList, err := imdb.GetGroupMemberIDListByGroupID(groupID)
if err != nil {
log.NewWarn("", utils.GetSelfFuncName(), "GetGroupMemberIDListByGroupID", err.Error())
continue
}
if len(groupMemberIDList) > 0 {
if err := db.DB.AddGroupMemberToCache(groupID, groupMemberIDList...); err != nil {
log.NewWarn("", utils.GetSelfFuncName(), "AddGroupMemberToCache", err.Error())
}
}
}
log.NewInfo("0", utils.GetSelfFuncName(), "ok")
return nil
}
func updateAllFriendToCache(userList []db.User) error {
log.NewInfo("0", utils.GetSelfFuncName())
for _, user := range userList {
friendIDList, err := imdb.GetFriendIDListByUserID(user.UserID)
if err != nil {
log.NewWarn("0", utils.GetSelfFuncName(), err.Error())
continue
}
if len(friendIDList) > 0 {
if err := db.DB.AddFriendToCache(user.UserID, friendIDList...); err != nil {
log.NewWarn("0", utils.GetSelfFuncName(), err.Error(), friendIDList, user.UserID)
}
}
}
log.NewInfo("0", utils.GetSelfFuncName(), "ok")
return nil
}
func updateAllBlackListToCache(userList []db.User) error {
log.NewInfo("0", utils.GetSelfFuncName())
for _, user := range userList {
blackIDList, err := imdb.GetBlackIDListByUserID(user.UserID)
if err != nil {
log.NewWarn("", utils.GetSelfFuncName(), err.Error())
continue
}
if len(blackIDList) > 0 {
if err := db.DB.AddBlackUserToCache(user.UserID, blackIDList...); err != nil {
log.NewWarn("0", utils.GetSelfFuncName(), err.Error())
}
}
}
log.NewInfo("0", utils.GetSelfFuncName(), "ok")
return nil
}
func (s *cacheServer) GetUserInfoFromCache(_ context.Context, req *pbCache.GetUserInfoFromCacheReq) (resp *pbCache.GetUserInfoFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.GetUserInfoFromCacheResp{
CommonResp: &pbCache.CommonResp{},
}
for _, userID := range req.UserIDList {
userInfo, err := db.DB.GetUserInfoFromCache(userID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "get userInfo from cache failed", err.Error())
continue
}
resp.UserInfoList = append(resp.UserInfoList, userInfo)
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *cacheServer) UpdateUserInfoToCache(_ context.Context, req *pbCache.UpdateUserInfoToCacheReq) (resp *pbCache.UpdateUserInfoToCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.UpdateUserInfoToCacheResp{
CommonResp: &pbCache.CommonResp{},
}
for _, userInfo := range req.UserInfoList {
m, err := utils.Pb2Map(userInfo)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), *userInfo)
}
if err := db.DB.SetUserInfoToCache(userInfo.UserID, m); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "set userInfo to cache failed", err.Error())
}
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *cacheServer) GetFriendIDListFromCache(_ context.Context, req *pbCache.GetFriendIDListFromCacheReq) (resp *pbCache.GetFriendIDListFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.GetFriendIDListFromCacheResp{CommonResp: &pbCache.CommonResp{}}
friendIDList, err := db.DB.GetFriendIDListFromCache(req.UserID)
friendIDList, err := rocksCache.GetFriendIDListFromCache(req.UserID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetFriendIDListFromCache", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
@@ -259,36 +91,24 @@ func (s *cacheServer) GetFriendIDListFromCache(_ context.Context, req *pbCache.G
return resp, nil
}
func (s *cacheServer) AddFriendToCache(_ context.Context, req *pbCache.AddFriendToCacheReq) (resp *pbCache.AddFriendToCacheResp, err error) {
// this is for dtm call
func (s *cacheServer) DelFriendIDListFromCache(_ context.Context, req *pbCache.DelFriendIDListFromCacheReq) (resp *pbCache.DelFriendIDListFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.AddFriendToCacheResp{CommonResp: &pbCache.CommonResp{}}
if err := db.DB.AddFriendToCache(req.UserID, req.FriendID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed", err.Error())
resp = &pbCache.DelFriendIDListFromCacheResp{}
if err := rocksCache.DelFriendIDListFromCache(req.UserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "args: ", req.UserID, err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *cacheServer) ReduceFriendFromCache(_ context.Context, req *pbCache.ReduceFriendFromCacheReq) (resp *pbCache.ReduceFriendFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.ReduceFriendFromCacheResp{CommonResp: &pbCache.CommonResp{}}
if err := db.DB.ReduceFriendToCache(req.UserID, req.FriendID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", resp.String())
return resp, nil
}
func (s *cacheServer) GetBlackIDListFromCache(_ context.Context, req *pbCache.GetBlackIDListFromCacheReq) (resp *pbCache.GetBlackIDListFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.GetBlackIDListFromCacheResp{CommonResp: &pbCache.CommonResp{}}
blackUserIDList, err := db.DB.GetBlackListFromCache(req.UserID)
blackUserIDList, err := rocksCache.GetBlackListFromCache(req.UserID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
@@ -301,29 +121,16 @@ func (s *cacheServer) GetBlackIDListFromCache(_ context.Context, req *pbCache.Ge
return resp, nil
}
func (s *cacheServer) AddBlackUserToCache(_ context.Context, req *pbCache.AddBlackUserToCacheReq) (resp *pbCache.AddBlackUserToCacheResp, err error) {
func (s *cacheServer) DelBlackIDListFromCache(_ context.Context, req *pbCache.DelBlackIDListFromCacheReq) (resp *pbCache.DelBlackIDListFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.AddBlackUserToCacheResp{CommonResp: &pbCache.CommonResp{}}
if err := db.DB.AddBlackUserToCache(req.UserID, req.BlackUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
resp = &pbCache.DelBlackIDListFromCacheResp{}
if err := rocksCache.DelBlackIDListFromCache(req.UserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "args: ", req.UserID, err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *cacheServer) ReduceBlackUserFromCache(_ context.Context, req *pbCache.ReduceBlackUserFromCacheReq) (resp *pbCache.ReduceBlackUserFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.ReduceBlackUserFromCacheResp{CommonResp: &pbCache.CommonResp{}}
if err := db.DB.ReduceBlackUserFromCache(req.UserID, req.BlackUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", resp.String())
return resp, nil
}
@@ -332,7 +139,7 @@ func (s *cacheServer) GetGroupMemberIDListFromCache(_ context.Context, req *pbCa
resp = &pbCache.GetGroupMemberIDListFromCacheResp{
CommonResp: &pbCache.CommonResp{},
}
userIDList, err := db.DB.GetGroupMemberIDListFromCache(req.GroupID)
userIDList, err := rocksCache.GetGroupMemberIDListFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupMemberIDListFromCache failed", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
@@ -344,28 +151,15 @@ func (s *cacheServer) GetGroupMemberIDListFromCache(_ context.Context, req *pbCa
return resp, nil
}
func (s *cacheServer) AddGroupMemberToCache(_ context.Context, req *pbCache.AddGroupMemberToCacheReq) (resp *pbCache.AddGroupMemberToCacheResp, err error) {
func (s *cacheServer) DelGroupMemberIDListFromCache(_ context.Context, req *pbCache.DelGroupMemberIDListFromCacheReq) (resp *pbCache.DelGroupMemberIDListFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.AddGroupMemberToCacheResp{CommonResp: &pbCache.CommonResp{}}
if err := db.DB.AddGroupMemberToCache(req.GroupID, req.UserIDList...); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "AddGroupMemberToCache failed", err.Error())
resp = &pbCache.DelGroupMemberIDListFromCacheResp{}
if err := rocksCache.DelGroupMemberIDListFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "args: ", req.GroupID, err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *cacheServer) ReduceGroupMemberFromCache(_ context.Context, req *pbCache.ReduceGroupMemberFromCacheReq) (resp *pbCache.ReduceGroupMemberFromCacheResp, err error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp = &pbCache.ReduceGroupMemberFromCacheResp{CommonResp: &pbCache.CommonResp{}}
if err := db.DB.ReduceGroupMemberFromCache(req.GroupID, req.UserIDList...); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "ReduceGroupMemberFromCache failed", err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg
return resp, nil
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", resp.String())
return resp, nil
}
@@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
cp "Open_IM/pkg/common/utils"
@@ -95,10 +96,10 @@ func (s *friendServer) AddBlacklist(ctx context.Context, req *pbFriend.AddBlackl
err := imdb.InsertInToUserBlackList(black)
if err != nil {
log.NewError(req.CommID.OperationID, "InsertInToUserBlackList failed ", err.Error())
return &pbFriend.AddBlacklistResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
return &pbFriend.AddBlacklistResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}}, nil
}
log.NewInfo(req.CommID.OperationID, "AddBlacklist rpc ok ", req.CommID.FromUserID, req.CommID.ToUserID)
reqAddBlackUserToCache := &pbCache.AddBlackUserToCacheReq{UserID: req.CommID.FromUserID, BlackUserID: req.CommID.ToUserID, OperationID: req.CommID.OperationID}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.CommID.OperationID)
if etcdConn == nil {
errMsg := req.CommID.OperationID + "getcdv3.GetConn == nil"
@@ -106,15 +107,16 @@ func (s *friendServer) AddBlacklist(ctx context.Context, req *pbFriend.AddBlackl
return &pbFriend.AddBlacklistResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}}, nil
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.AddBlackUserToCache(context.Background(), reqAddBlackUserToCache)
cacheResp, err := cacheClient.DelBlackIDListFromCache(context.Background(), &pbCache.DelBlackIDListFromCacheReq{UserID: req.CommID.FromUserID, OperationID: req.CommID.OperationID})
if err != nil {
log.NewError(req.CommID.OperationID, "AddBlackUserToCache rpc call failed ", err.Error())
log.NewError(req.CommID.OperationID, "DelBlackIDListFromCache rpc call failed ", err.Error())
return &pbFriend.AddBlacklistResp{CommonResp: &pbFriend.CommonResp{ErrCode: 500, ErrMsg: "AddBlackUserToCache rpc call failed"}}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.CommID.OperationID, "AddBlackUserToCache rpc logic call failed ", cacheResp.String())
log.NewError(req.CommID.OperationID, "DelBlackIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbFriend.AddBlacklistResp{CommonResp: &pbFriend.CommonResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}, nil
}
chat.BlackAddedNotification(req)
return &pbFriend.AddBlacklistResp{CommonResp: &pbFriend.CommonResp{}}, nil
}
@@ -208,6 +210,30 @@ func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFri
}
}
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
log.NewError(req.OperationID, errMsg)
resp.CommonResp.ErrMsg = errMsg
resp.CommonResp.ErrCode = 500
return &resp, nil
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.DelFriendIDListFromCache(context.Background(), &pbCache.DelFriendIDListFromCacheReq{UserID: req.FromUserID, OperationID: req.OperationID})
if err != nil {
log.NewError(req.OperationID, "DelBlackIDListFromCache rpc call failed ", err.Error())
resp.CommonResp.ErrCode = 500
resp.CommonResp.ErrMsg = err.Error()
return &resp, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "DelBlackIDListFromCache rpc logic call failed ", cacheResp.String())
resp.CommonResp.ErrCode = 500
resp.CommonResp.ErrMsg = cacheResp.CommonResp.ErrMsg
return &resp, nil
}
resp.CommonResp.ErrCode = 0
log.NewInfo(req.OperationID, "ImportFriend rpc ok ", resp.String())
return &resp, nil
@@ -267,7 +293,7 @@ func (s *friendServer) AddFriendResponse(ctx context.Context, req *pbFriend.AddF
return &pbFriend.AddFriendResponseResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
// cache rpc
addFriendToCacheReq := &pbCache.AddFriendToCacheReq{OperationID: req.CommID.OperationID}
delFriendIDListFromCacheReq := &pbCache.DelFriendIDListFromCacheReq{OperationID: req.CommID.OperationID}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.CommID.OperationID)
if etcdConn == nil {
errMsg := req.CommID.OperationID + "getcdv3.GetConn == nil"
@@ -275,29 +301,26 @@ func (s *friendServer) AddFriendResponse(ctx context.Context, req *pbFriend.AddF
return &pbFriend.AddFriendResponseResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}}, nil
}
client := pbCache.NewCacheClient(etcdConn)
addFriendToCacheReq.UserID = req.CommID.ToUserID
addFriendToCacheReq.FriendID = req.CommID.FromUserID
respPb, err := client.AddFriendToCache(context.Background(), addFriendToCacheReq)
delFriendIDListFromCacheReq.UserID = req.CommID.ToUserID
respPb, err := client.DelFriendIDListFromCache(context.Background(), delFriendIDListFromCacheReq)
if err != nil {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed", err.Error(), addFriendToCacheReq.String())
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "DelFriendIDListFromCache failed", err.Error())
return &pbFriend.AddFriendResponseResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrServer.ErrCode, ErrMsg: constant.ErrServer.ErrMsg}}, nil
}
if respPb.CommonResp.ErrCode != 0 {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed", addFriendToCacheReq.String())
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "DelFriendIDListFromCache failed", respPb.CommonResp.String())
return &pbFriend.AddFriendResponseResp{CommonResp: &pbFriend.CommonResp{ErrCode: respPb.CommonResp.ErrCode, ErrMsg: respPb.CommonResp.ErrMsg}}, nil
}
addFriendToCacheReq.UserID = req.CommID.FromUserID
addFriendToCacheReq.FriendID = req.CommID.ToUserID
respPb, err = client.AddFriendToCache(context.Background(), addFriendToCacheReq)
delFriendIDListFromCacheReq.UserID = req.CommID.FromUserID
respPb, err = client.DelFriendIDListFromCache(context.Background(), delFriendIDListFromCacheReq)
if err != nil {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed", err.Error(), addFriendToCacheReq.String())
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "DelFriendIDListFromCache failed", err.Error())
return &pbFriend.AddFriendResponseResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrServer.ErrCode, ErrMsg: constant.ErrServer.ErrMsg}}, nil
}
if respPb.CommonResp.ErrCode != 0 {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed", addFriendToCacheReq.String())
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "DelFriendIDListFromCache failed", respPb.CommonResp.String())
return &pbFriend.AddFriendResponseResp{CommonResp: &pbFriend.CommonResp{ErrCode: respPb.CommonResp.ErrCode, ErrMsg: respPb.CommonResp.ErrMsg}}, nil
}
chat.FriendAddedNotification(req.CommID.OperationID, req.CommID.OpUserID, req.CommID.FromUserID, req.CommID.ToUserID)
}
}
@@ -327,7 +350,7 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri
return &pbFriend.DeleteFriendResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}}, nil
}
log.NewInfo(req.CommID.OperationID, "DeleteFriend rpc ok")
reduceFriendFromCache := &pbCache.ReduceFriendFromCacheReq{OperationID: req.CommID.OperationID, UserID: req.CommID.FromUserID, FriendID: req.CommID.ToUserID}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.CommID.OperationID)
if etcdConn == nil {
errMsg := req.CommID.OperationID + "getcdv3.GetConn == nil"
@@ -335,13 +358,13 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri
return &pbFriend.DeleteFriendResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}}, nil
}
client := pbCache.NewCacheClient(etcdConn)
respPb, err := client.ReduceFriendFromCache(context.Background(), reduceFriendFromCache)
respPb, err := client.DelFriendIDListFromCache(context.Background(), &pbCache.DelFriendIDListFromCacheReq{OperationID: req.CommID.OperationID, UserID: req.CommID.FromUserID})
if err != nil {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "AddFriendToCache rpc failed", err.Error())
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "DelFriendIDListFromCache rpc failed", err.Error())
return &pbFriend.DeleteFriendResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrServer.ErrCode, ErrMsg: constant.ErrServer.ErrMsg}}, nil
}
if respPb.CommonResp.ErrCode != 0 {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "AddFriendToCache failed")
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "DelFriendIDListFromCache failed", respPb.CommonResp.String())
return &pbFriend.DeleteFriendResp{CommonResp: &pbFriend.CommonResp{ErrCode: respPb.CommonResp.ErrCode, ErrMsg: respPb.CommonResp.ErrMsg}}, nil
}
chat.FriendDeletedNotification(req)
@@ -351,13 +374,12 @@ func (s *friendServer) DeleteFriend(ctx context.Context, req *pbFriend.DeleteFri
func (s *friendServer) GetBlacklist(ctx context.Context, req *pbFriend.GetBlacklistReq) (*pbFriend.GetBlacklistResp, error) {
log.NewInfo(req.CommID.OperationID, "GetBlacklist args ", req.String())
//Parse token, to find current user information
if !token_verify.CheckAccess(req.CommID.OpUserID, req.CommID.FromUserID) {
log.NewError(req.CommID.OperationID, "CheckAccess false ", req.CommID.OpUserID, req.CommID.FromUserID)
return &pbFriend.GetBlacklistResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
blackListInfo, err := imdb.GetBlackListByUserID(req.CommID.FromUserID)
blackIDList, err := rocksCache.GetBlackListFromCache(req.CommID.FromUserID)
if err != nil {
log.NewError(req.CommID.OperationID, "GetBlackListByUID failed ", err.Error(), req.CommID.FromUserID)
return &pbFriend.GetBlacklistResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
@@ -366,15 +388,14 @@ func (s *friendServer) GetBlacklist(ctx context.Context, req *pbFriend.GetBlackl
var (
userInfoList []*sdkws.PublicUserInfo
)
for _, blackUser := range blackListInfo {
var blackUserInfo sdkws.PublicUserInfo
//Find black user information
us, err := imdb.GetUserByUserID(blackUser.BlockUserID)
for _, userID := range blackIDList {
user, err := rocksCache.GetUserInfoFromCache(userID)
if err != nil {
log.NewError(req.CommID.OperationID, "GetUserByUserID failed ", err.Error(), blackUser.BlockUserID)
log.NewError(req.CommID.OperationID, "GetUserByUserID failed ", err.Error(), userID)
continue
}
utils.CopyStructFields(&blackUserInfo, us)
var blackUserInfo sdkws.PublicUserInfo
utils.CopyStructFields(&blackUserInfo, user)
userInfoList = append(userInfoList, &blackUserInfo)
}
log.NewInfo(req.CommID.OperationID, "rpc GetBlacklist ok ", pbFriend.GetBlacklistResp{BlackUserInfoList: userInfoList})
@@ -391,10 +412,15 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *pbFriend.SetFri
err := imdb.UpdateFriendComment(req.CommID.FromUserID, req.CommID.ToUserID, req.Remark)
if err != nil {
log.NewError(req.CommID.OperationID, "UpdateFriendComment failed ", req.CommID.FromUserID, req.CommID.ToUserID, req.Remark)
log.NewError(req.CommID.OperationID, "UpdateFriendComment failed ", req.CommID.FromUserID, req.CommID.ToUserID, req.Remark, err.Error())
return &pbFriend.SetFriendRemarkResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
log.NewInfo(req.CommID.OperationID, "rpc SetFriendComment ok")
err = rocksCache.DelAllFriendsInfoFromCache(req.CommID.FromUserID)
if err != nil {
log.NewError(req.CommID.OperationID, "DelAllFriendInfoFromCache failed ", req.CommID.FromUserID, err.Error())
return &pbFriend.SetFriendRemarkResp{CommonResp: &pbFriend.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
chat.FriendRemarkSetNotification(req.CommID.OperationID, req.CommID.OpUserID, req.CommID.FromUserID, req.CommID.ToUserID)
return &pbFriend.SetFriendRemarkResp{CommonResp: &pbFriend.CommonResp{}}, nil
}
@@ -413,7 +439,7 @@ func (s *friendServer) RemoveBlacklist(ctx context.Context, req *pbFriend.Remove
}
log.NewInfo(req.CommID.OperationID, "rpc RemoveBlacklist ok ")
reqReduceBlackUserFromCache := &pbCache.ReduceBlackUserFromCacheReq{UserID: req.CommID.FromUserID, BlackUserID: req.CommID.ToUserID, OperationID: req.CommID.OperationID}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.CommID.OperationID)
if etcdConn == nil {
errMsg := req.CommID.OperationID + "getcdv3.GetConn == nil"
@@ -422,13 +448,13 @@ func (s *friendServer) RemoveBlacklist(ctx context.Context, req *pbFriend.Remove
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.ReduceBlackUserFromCache(context.Background(), reqReduceBlackUserFromCache)
cacheResp, err := cacheClient.DelBlackIDListFromCache(context.Background(), &pbCache.DelBlackIDListFromCacheReq{UserID: req.CommID.FromUserID, OperationID: req.CommID.OperationID})
if err != nil {
log.NewError(req.CommID.OperationID, "ReduceBlackUserFromCache rpc call failed ", err.Error())
log.NewError(req.CommID.OperationID, "DelBlackIDListFromCache rpc call failed ", err.Error())
return &pbFriend.RemoveBlacklistResp{CommonResp: &pbFriend.CommonResp{ErrCode: 500, ErrMsg: "ReduceBlackUserFromCache rpc call failed"}}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.CommID.OperationID, "ReduceBlackUserFromCache rpc logic call failed ", cacheResp.String())
log.NewError(req.CommID.OperationID, "DelBlackIDListFromCache rpc logic call failed ", cacheResp.CommonResp.String())
return &pbFriend.RemoveBlacklistResp{CommonResp: &pbFriend.CommonResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}, nil
}
chat.BlackDeletedNotification(req)
@@ -441,10 +467,13 @@ func (s *friendServer) IsInBlackList(ctx context.Context, req *pbFriend.IsInBlac
log.NewError(req.CommID.OperationID, "CheckAccess false ", req.CommID.OpUserID, req.CommID.FromUserID)
return &pbFriend.IsInBlackListResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
var isInBlacklist = false
err := imdb.CheckBlack(req.CommID.FromUserID, req.CommID.ToUserID)
if err == nil {
blackIDList, err := rocksCache.GetBlackListFromCache(req.CommID.FromUserID)
if err != nil {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), err.Error(), req.CommID.FromUserID)
return &pbFriend.IsInBlackListResp{ErrMsg: err.Error(), ErrCode: constant.ErrDB.ErrCode}, nil
}
var isInBlacklist bool
if utils.IsContain(req.CommID.ToUserID, blackIDList) {
isInBlacklist = true
}
log.NewInfo(req.CommID.OperationID, "IsInBlackList rpc ok ", pbFriend.IsInBlackListResp{Response: isInBlacklist})
@@ -453,16 +482,18 @@ func (s *friendServer) IsInBlackList(ctx context.Context, req *pbFriend.IsInBlac
func (s *friendServer) IsFriend(ctx context.Context, req *pbFriend.IsFriendReq) (*pbFriend.IsFriendResp, error) {
log.NewInfo(req.CommID.OperationID, req.String())
var isFriend bool
if !token_verify.CheckAccess(req.CommID.OpUserID, req.CommID.FromUserID) {
log.NewError(req.CommID.OperationID, "CheckAccess false ", req.CommID.OpUserID, req.CommID.FromUserID)
return &pbFriend.IsFriendResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
_, err := imdb.GetFriendRelationshipFromFriend(req.CommID.FromUserID, req.CommID.ToUserID)
if err == nil {
friendIDList, err := rocksCache.GetFriendIDListFromCache(req.CommID.FromUserID)
if err != nil {
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), err.Error(), req.CommID.FromUserID)
return &pbFriend.IsFriendResp{ErrMsg: err.Error(), ErrCode: constant.ErrDB.ErrCode}, nil
}
var isFriend bool
if utils.IsContain(req.CommID.ToUserID, friendIDList) {
isFriend = true
} else {
isFriend = false
}
log.NewInfo(req.CommID.OperationID, pbFriend.IsFriendResp{Response: isFriend})
return &pbFriend.IsFriendResp{Response: isFriend}, nil
@@ -474,16 +505,16 @@ func (s *friendServer) GetFriendList(ctx context.Context, req *pbFriend.GetFrien
log.NewError(req.CommID.OperationID, "CheckAccess false ", req.CommID.OpUserID, req.CommID.FromUserID)
return &pbFriend.GetFriendListResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
friends, err := imdb.GetFriendListByUserID(req.CommID.FromUserID)
friendList, err := rocksCache.GetAllFriendsInfoFromCache(req.CommID.FromUserID)
if err != nil {
log.NewError(req.CommID.OperationID, "FindUserInfoFromFriend failed ", err.Error(), req.CommID.FromUserID)
log.NewError(req.CommID.OperationID, utils.GetSelfFuncName(), "GetFriendIDListFromCache failed", err.Error(), req.CommID.FromUserID)
return &pbFriend.GetFriendListResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
var userInfoList []*sdkws.FriendInfo
for _, friendUser := range friends {
for _, friendUser := range friendList {
friendUserInfo := sdkws.FriendInfo{FriendUser: &sdkws.UserInfo{}}
cp.FriendDBCopyOpenIM(&friendUserInfo, &friendUser)
cp.FriendDBCopyOpenIM(&friendUserInfo, friendUser)
log.NewDebug(req.CommID.OperationID, "friends : ", friendUser, "openim friends: ", friendUserInfo)
userInfoList = append(userInfoList, &friendUserInfo)
}
+198 -128
View File
@@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/rocks_cache"
"Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
@@ -159,7 +160,8 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
return resp, nil
}
utils.CopyStructFields(resp.GroupInfo, group)
resp.GroupInfo.MemberCount, err = imdb.GetGroupMemberNumByGroupID(groupId)
memberCount, err := imdb.GetGroupMemberNumByGroupID(groupId)
resp.GroupInfo.MemberCount = uint32(memberCount)
if err != nil {
log.NewError(req.OperationID, "GetGroupMemberNumByGroupID failed ", err.Error(), groupId)
resp.ErrCode = constant.ErrDB.ErrCode
@@ -182,28 +184,6 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
}
if len(okUserIDList) != 0 {
addGroupMemberToCacheReq := &pbCache.AddGroupMemberToCacheReq{
UserIDList: okUserIDList,
GroupID: groupId,
OperationID: req.OperationID,
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.CreateGroupResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}, nil
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.AddGroupMemberToCache(context.Background(), addGroupMemberToCacheReq)
if err != nil {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc call failed ", err.Error())
return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc logic call failed ", cacheResp.String())
return &pbGroup.CreateGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
log.NewInfo(req.OperationID, "rpc CreateGroup return ", resp.String())
if req.GroupInfo.GroupType != constant.SuperGroup {
chat.GroupCreatedNotification(req.OperationID, req.OpUserID, groupId, okUserIDList)
@@ -227,10 +207,10 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo
log.NewError(req.OperationID, "CheckAccess false ", req.OpUserID, req.FromUserID)
return &pbGroup.GetJoinedGroupListResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}, nil
}
//group list
joinedGroupList, err := imdb.GetJoinedGroupIDListByUserID(req.FromUserID)
joinedGroupList, err := rocksCache.GetJoinedGroupIDListFromCache(req.FromUserID)
if err != nil {
log.NewError(req.OperationID, "GetJoinedGroupIDListByUserID failed ", err.Error(), req.FromUserID)
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetJoinedGroupIDListFromCache failed", err.Error(), req.FromUserID)
return &pbGroup.GetJoinedGroupListResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
@@ -238,8 +218,14 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo
for _, v := range joinedGroupList {
var groupNode open_im_sdk.GroupInfo
num, err := imdb.GetGroupMemberNumByGroupID(v)
owner, err2 := imdb.GetGroupOwnerInfoByGroupID(v)
group, err := imdb.GetGroupInfoByGroupID(v)
groupOwnerID, err2 := rocksCache.GetGroupOwnerFromCache(v)
owner, err2 := rocksCache.GetGroupMemberInfoFromCache(v, groupOwnerID)
//owner, err2 := imdb.GetGroupOwnerInfoByGroupID(v)
group, err := rocksCache.GetGroupInfoFromCache(v)
//group, err := imdb.GetGroupInfoByGroupID(v)
if num > 0 && owner != nil && err2 == nil && group != nil && err == nil {
utils.CopyStructFields(&groupNode, group)
groupNode.CreateTime = uint32(group.CreateTime.Unix())
@@ -416,11 +402,7 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
}
}
addGroupMemberToCacheReq := &pbCache.AddGroupMemberToCacheReq{
UserIDList: okUserIDList,
GroupID: req.GroupID,
OperationID: req.OperationID,
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
@@ -428,17 +410,23 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}, nil
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.AddGroupMemberToCache(context.Background(), addGroupMemberToCacheReq)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{
GroupID: req.GroupID,
OperationID: req.OperationID,
})
if err != nil {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc call failed ", err.Error())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc logic call failed ", cacheResp.String())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
if groupInfo.GroupType != constant.SuperGroup {
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.MemberInvitedNotification(req.OperationID, req.GroupID, req.OpUserID, req.Reason, okUserIDList)
} else {
go func() {
@@ -455,7 +443,8 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGroupAllMemberReq) (*pbGroup.GetGroupAllMemberResp, error) {
log.NewInfo(req.OperationID, "GetGroupAllMember, args ", req.String())
var resp pbGroup.GetGroupAllMemberResp
groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID)
//groupInfo, err := imdb.GetGroupInfoByGroupID(req.GroupID)
groupInfo, err := rocksCache.GetGroupInfoFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
resp.ErrCode = constant.ErrDB.ErrCode
@@ -463,18 +452,23 @@ func (s *groupServer) GetGroupAllMember(ctx context.Context, req *pbGroup.GetGro
return &resp, nil
}
if groupInfo.GroupType != constant.SuperGroup {
memberList, err := imdb.GetGroupMemberListByGroupID(req.GroupID)
memberList, err := rocksCache.GetAllGroupMembersInfoFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
resp.ErrCode = constant.ErrDB.ErrCode
resp.ErrMsg = constant.ErrDB.ErrMsg
log.NewError(req.OperationID, "GetGroupMemberListByGroupID failed,", err.Error(), req.GroupID)
return &resp, nil
}
//memberList, err := imdb.GetGroupMemberListByGroupID(req.GroupID)
//if err != nil {
// log.NewError(req.OperationID, "GetGroupMemberListByGroupID failed,", err.Error(), req.GroupID)
// resp.ErrCode = constant.ErrDB.ErrCode
// resp.ErrMsg = constant.ErrDB.ErrMsg
// return &resp, nil
//}
for _, v := range memberList {
//log.Debug(req.OperationID, v)
var node open_im_sdk.GroupMemberFullInfo
cp.GroupMemberDBCopyOpenIM(&node, &v)
cp.GroupMemberDBCopyOpenIM(&node, v)
//log.Debug(req.OperationID, "db value:", v.MuteEndTime, "seconds: ", v.MuteEndTime.Unix())
//log.Debug(req.OperationID, "cp value: ", node)
resp.MemberList = append(resp.MemberList, &node)
@@ -506,7 +500,6 @@ func (s *groupServer) GetGroupMemberList(ctx context.Context, req *pbGroup.GetGr
} else {
resp.NextSeq = req.NextSeq + int32(len(memberList))
}
resp.ErrCode = 0
log.NewInfo(req.OperationID, "GetGroupMemberList rpc return ", resp.String())
return &resp, nil
@@ -635,11 +628,6 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
}
}
reduceGroupMemberFromCacheReq := &pbCache.ReduceGroupMemberFromCacheReq{
UserIDList: okUserIDList,
GroupID: req.GroupID,
OperationID: req.OperationID,
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
@@ -647,16 +635,23 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}, nil
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.ReduceGroupMemberFromCache(context.Background(), reduceGroupMemberFromCacheReq)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{
GroupID: req.GroupID,
OperationID: req.OperationID,
})
if err != nil {
log.NewError(req.OperationID, "ReduceGroupMemberFromCache rpc call failed ", err.Error())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "ReduceGroupMemberFromCache rpc logic call failed ", cacheResp.String())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
if groupInfo.GroupType != constant.SuperGroup {
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.MemberKickedNotification(req, okUserIDList)
} else {
go func() {
@@ -664,8 +659,8 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
chat.SuperGroupNotification(req.OperationID, v, v)
}
}()
}
log.NewInfo(req.OperationID, "GetGroupMemberList rpc return ", resp.String())
return &resp, nil
}
@@ -674,20 +669,38 @@ func (s *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbGroup.GetG
log.NewInfo(req.OperationID, "GetGroupMembersInfo args ", req.String())
var resp pbGroup.GetGroupMembersInfoResp
resp.MemberList = []*open_im_sdk.GroupMemberFullInfo{}
for _, v := range req.MemberList {
var memberNode open_im_sdk.GroupMemberFullInfo
memberInfo, err := imdb.GetMemberInfoByID(req.GroupID, v)
memberNode.UserID = v
if err != nil {
log.NewError(req.OperationID, "GetMemberInfoById failed ", err.Error(), req.GroupID, v)
continue
} else {
utils.CopyStructFields(&memberNode, memberInfo)
memberNode.JoinTime = int32(memberInfo.JoinTime.Unix())
//for _, v := range req.MemberList {
// var memberNode open_im_sdk.GroupMemberFullInfo
// memberInfo, err := imdb.GetMemberInfoByID(req.GroupID, v)
// memberNode.UserID = v
// if err != nil {
// log.NewError(req.OperationID, "GetMemberInfoById failed ", err.Error(), req.GroupID, v)
// continue
// } else {
// utils.CopyStructFields(&memberNode, memberInfo)
// memberNode.JoinTime = int32(memberInfo.JoinTime.Unix())
// resp.MemberList = append(resp.MemberList, &memberNode)
// }
//}
groupMembers, err := rocksCache.GetAllGroupMembersInfoFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
resp.ErrCode = constant.ErrDB.ErrCode
resp.ErrMsg = constant.ErrDB.ErrMsg
return &resp, nil
}
for _, member := range groupMembers {
if utils.IsContain(member.UserID, req.MemberList) {
var memberNode open_im_sdk.GroupMemberFullInfo
utils.CopyStructFields(&memberNode, member)
memberNode.JoinTime = int32(member.JoinTime.Unix())
resp.MemberList = append(resp.MemberList, &memberNode)
}
}
resp.ErrCode = 0
log.NewInfo(req.OperationID, "GetGroupMembersInfo rpc return ", resp.String())
return &resp, nil
@@ -730,16 +743,17 @@ func (s *groupServer) GetGroupsInfo(ctx context.Context, req *pbGroup.GetGroupsI
log.NewInfo(req.OperationID, "GetGroupsInfo args ", req.String())
groupsInfoList := make([]*open_im_sdk.GroupInfo, 0)
for _, groupID := range req.GroupIDList {
groupInfoFromMysql, err := imdb.GetGroupInfoByGroupID(groupID)
//groupInfoFromMysql, err := imdb.GetGroupInfoByGroupID(groupID)
groupInfoFromRedis, err := rocksCache.GetGroupInfoFromCache(groupID)
if err != nil {
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", err.Error(), groupID)
continue
}
var groupInfo open_im_sdk.GroupInfo
cp.GroupDBCopyOpenIM(&groupInfo, groupInfoFromMysql)
cp.GroupDBCopyOpenIM(&groupInfo, groupInfoFromRedis)
//groupInfo.NeedVerification
groupInfo.NeedVerification = groupInfoFromMysql.NeedVerification
groupInfo.NeedVerification = groupInfoFromRedis.NeedVerification
groupsInfoList = append(groupsInfoList, &groupInfo)
}
@@ -823,23 +837,36 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G
} else {
log.NewDebug(req.OperationID, utils.GetSelfFuncName(), "SetConversation success", respPb.String())
}
addGroupMemberToCacheReq := &pbCache.AddGroupMemberToCacheReq{OperationID: req.OperationID, GroupID: req.GroupID, UserIDList: []string{req.FromUserID}}
etcdCacheConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
if etcdCacheConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}}, nil
}
cacheClient := pbCache.NewCacheClient(etcdCacheConn)
cacheResp, err := cacheClient.AddGroupMemberToCache(context.Background(), addGroupMemberToCacheReq)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{OperationID: req.OperationID, GroupID: req.GroupID})
if err != nil {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc call failed ", err.Error())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc logic call failed ", cacheResp.String())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
group, err := rocksCache.GetGroupInfoFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
if group != nil {
if group.GroupType != constant.SuperGroup {
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
}
}
chat.GroupApplicationAcceptedNotification(req)
chat.MemberEnterNotification(req)
} else if req.HandleResult == constant.GroupResponseRefuse {
@@ -872,24 +899,19 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
}
if groupInfo.NeedVerification == constant.Directly {
us, err := imdb.GetUserByUserID(req.OpUserID)
if err != nil {
log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), req.OpUserID)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//to group member
groupMember := db.GroupMember{GroupID: req.GroupID, RoleLevel: constant.GroupOwner, OperatorUserID: req.OpUserID}
utils.CopyStructFields(&groupMember, us)
err = imdb.InsertIntoGroupMember(groupMember)
if err != nil {
log.NewError(req.OperationID, "InsertIntoGroupMember failed ", err.Error(), groupMember)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if groupInfo.GroupType != constant.SuperGroup {
addGroupMemberToCacheReq := &pbCache.AddGroupMemberToCacheReq{
UserIDList: []string{req.OpUserID},
GroupID: req.GroupID,
OperationID: req.OperationID,
us, err := imdb.GetUserByUserID(req.OpUserID)
if err != nil {
log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), req.OpUserID)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//to group member
groupMember := db.GroupMember{GroupID: req.GroupID, RoleLevel: constant.GroupOwner, OperatorUserID: req.OpUserID}
utils.CopyStructFields(&groupMember, us)
err = imdb.InsertIntoGroupMember(groupMember)
if err != nil {
log.NewError(req.OperationID, "InsertIntoGroupMember failed ", err.Error(), groupMember)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
@@ -898,20 +920,23 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: constant.ErrInternal.ErrMsg}}, nil
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.AddGroupMemberToCache(context.Background(), addGroupMemberToCacheReq)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{
GroupID: req.GroupID,
OperationID: req.OperationID,
})
if err != nil {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc call failed ", err.Error())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "AddGroupMemberToCache rpc logic call failed ", cacheResp.String())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
chat.MemberEnterDirectlyNotification(req.GroupID, req.OpUserID, req.OperationID)
log.NewInfo(req.OperationID, "JoinGroup rpc return ")
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
} else {
log.Error(req.OperationID, "JoinGroup rpc failed, group type: ", groupInfo.GroupType)
log.Error(req.OperationID, "JoinGroup rpc failed, group type: ", groupInfo.GroupType, "not support directly")
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrArgs.ErrCode, ErrMsg: constant.ErrArgs.ErrMsg}}, nil
}
}
@@ -926,7 +951,6 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//
//_, err = imdb.GetGroupMemberListByGroupIDAndRoleLevel(req.GroupID, constant.GroupOwner)
//if err != nil {
// log.NewError(req.OperationID, "GetGroupMemberListByGroupIDAndRoleLevel failed ", err.Error(), req.GroupID, constant.GroupOwner)
@@ -994,25 +1018,23 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq)
}
}
reduceGroupMemberFromCacheReq := &pbCache.ReduceGroupMemberFromCacheReq{
UserIDList: []string{req.OpUserID},
GroupID: req.GroupID,
OperationID: req.OperationID,
}
etcdConnCache := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConnCache == nil {
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: errMsg}}, nil
return &pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrInternal.ErrCode, ErrMsg: constant.ErrInternal.ErrMsg}}, nil
}
cacheClient := pbCache.NewCacheClient(etcdConnCache)
cacheResp, err := cacheClient.ReduceGroupMemberFromCache(context.Background(), reduceGroupMemberFromCacheReq)
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{
GroupID: req.GroupID,
OperationID: req.OperationID,
})
if err != nil {
log.NewError(req.OperationID, "ReduceGroupMemberFromCache rpc call failed ", err.Error())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return &pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "ReduceGroupMemberFromCache rpc logic call failed ", cacheResp.String())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
@@ -1105,6 +1127,11 @@ func (s *groupServer) SetGroupInfo(ctx context.Context, req *pbGroup.SetGroupInf
log.NewError(req.OperationID, "SetGroupInfo failed ", err.Error(), groupInfo)
return &pbGroup.SetGroupInfoResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, http.WrapError(constant.ErrDB)
}
if err := rocksCache.DelGroupInfoFromCache(req.GroupInfoForSet.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "DelGroupInfoFromCache failed ", err.Error(), req.GroupInfoForSet.GroupID)
return &pbGroup.SetGroupInfoResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, http.WrapError(constant.ErrDB)
}
log.NewInfo(req.OperationID, "SetGroupInfo rpc return ", pbGroup.SetGroupInfoResp{CommonResp: &pbGroup.CommonResp{}})
if changedType != 0 {
chat.GroupInfoSetNotification(req.OperationID, req.OpUserID, req.GroupInfoForSet.GroupID, groupName, notification, introduction, faceURL, req.GroupInfoForSet.NeedVerification)
@@ -1187,8 +1214,10 @@ func (s *groupServer) TransferGroupOwner(_ context.Context, req *pbGroup.Transfe
log.NewError(req.OperationID, "UpdateGroupMemberInfo failed ", groupMemberInfo)
return &pbGroup.TransferGroupOwnerResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
chat.GroupOwnerTransferredNotification(req)
return &pbGroup.TransferGroupOwnerResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
}
@@ -1379,7 +1408,7 @@ func (s *groupServer) GetGroupMembersCMS(_ context.Context, req *pbGroup.GetGrou
return resp, http.WrapError(constant.ErrDB)
}
log.NewInfo(req.OperationID, groupMembersCount)
resp.MemberNums = groupMembersCount
resp.MemberNums = int32(groupMembersCount)
for _, groupMember := range groupMembers {
resp.Members = append(resp.Members, &open_im_sdk.GroupMemberFullInfo{
GroupID: req.GroupId,
@@ -1443,27 +1472,28 @@ func (s *groupServer) RemoveGroupMembersCMS(_ context.Context, req *pbGroup.Remo
}
}
reduceGroupMemberFromCacheReq := &pbCache.ReduceGroupMemberFromCacheReq{
UserIDList: resp.Success,
GroupID: req.GroupId,
OperationID: req.OperationID,
}
etcdConnCache := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConnCache == nil {
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
log.NewError(req.OperationID, errMsg)
return resp, http.WrapError(constant.ErrInternal)
return resp, http.WrapError(constant.ErrDB)
}
cacheClient := pbCache.NewCacheClient(etcdConnCache)
cacheResp, err := cacheClient.ReduceGroupMemberFromCache(context.Background(), reduceGroupMemberFromCacheReq)
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{
GroupID: req.GroupId,
OperationID: req.OperationID,
})
if err != nil {
log.NewError(req.OperationID, "ReduceGroupMemberFromCache rpc call failed ", err.Error())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return resp, http.WrapError(constant.ErrDB)
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "ReduceGroupMemberFromCache rpc logic call failed ", cacheResp.String())
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return resp, http.WrapError(constant.ErrDB)
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId)
}
chat.MemberKickedNotification(reqKick, resp.Success)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp)
@@ -1504,27 +1534,28 @@ func (s *groupServer) AddGroupMembersCMS(_ context.Context, req *pbGroup.AddGrou
}
}
addGroupMemberToCacheReq := &pbCache.AddGroupMemberToCacheReq{
UserIDList: resp.Success,
GroupID: req.GroupId,
OperationID: req.OperationId,
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationId)
if etcdConn == nil {
errMsg := req.OperationId + "getcdv3.GetConn == nil"
log.NewError(req.OperationId, errMsg)
return resp, http.WrapError(constant.ErrInternal)
return resp, http.WrapError(constant.ErrDB)
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.AddGroupMemberToCache(context.Background(), addGroupMemberToCacheReq)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{
GroupID: req.GroupId,
OperationID: req.OperationId,
})
if err != nil {
log.NewError(req.OperationId, "AddGroupMemberToCache rpc call failed ", err.Error())
log.NewError(req.OperationId, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return resp, http.WrapError(constant.ErrDB)
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationId, "AddGroupMemberToCache rpc logic call failed ", cacheResp.String())
log.NewError(req.OperationId, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return resp, http.WrapError(constant.ErrDB)
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil {
log.NewError(req.OperationId, utils.GetSelfFuncName(), err.Error(), req.GroupId)
}
chat.MemberInvitedNotification(req.OperationId, req.GroupId, req.OpUserId, "admin add you to group", resp.Success)
return resp, nil
@@ -1627,6 +1658,28 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou
}
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
log.NewError(req.OperationID, errMsg)
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 500, ErrMsg: errMsg}}, nil
}
cacheClient := pbCache.NewCacheClient(etcdConn)
cacheResp, err := cacheClient.DelGroupMemberIDListFromCache(context.Background(), &pbCache.DelGroupMemberIDListFromCacheReq{
GroupID: req.GroupID,
OperationID: req.OperationID,
})
if err != nil {
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc call failed ", err.Error())
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 500, ErrMsg: err.Error()}}, nil
}
if cacheResp.CommonResp.ErrCode != 0 {
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}, nil
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
}
@@ -1672,6 +1725,9 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou
log.Error(req.OperationID, "UpdateGroupMemberInfo failed ", err.Error(), groupMemberInfo)
return &pbGroup.MuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.GroupMemberMutedNotification(req.OperationID, req.OpUserID, req.GroupID, req.UserID, req.MutedSeconds)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.MuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
@@ -1713,6 +1769,9 @@ func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.Ca
log.Error(req.OperationID, "UpdateGroupMemberInfo failed ", err.Error(), groupMemberInfo)
return &pbGroup.CancelMuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.GroupMemberCancelMutedNotification(req.OperationID, req.OpUserID, req.GroupID, req.UserID)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.CancelMuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
@@ -1752,6 +1811,9 @@ func (s *groupServer) MuteGroup(ctx context.Context, req *pbGroup.MuteGroupReq)
log.Error(req.OperationID, "OperateGroupStatus failed ", err.Error(), req.GroupID, constant.GroupStatusMuted)
return &pbGroup.MuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if err := rocksCache.DelGroupInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.GroupMutedNotification(req.OperationID, req.OpUserID, req.GroupID)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.MuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
@@ -1791,6 +1853,9 @@ func (s *groupServer) CancelMuteGroup(ctx context.Context, req *pbGroup.CancelMu
log.Error(req.OperationID, "UpdateGroupInfoDefaultZero failed ", err.Error(), req.GroupID)
return &pbGroup.CancelMuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if err := rocksCache.DelGroupInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.GroupCancelMutedNotification(req.OperationID, req.OpUserID, req.GroupID)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.CancelMuteGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
@@ -1803,7 +1868,6 @@ func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.S
log.Error(req.OperationID, errMsg)
return &pbGroup.SetGroupMemberNicknameResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrAccess.ErrCode, ErrMsg: constant.ErrAccess.ErrMsg}}, nil
}
groupMemberInfo := db.GroupMember{}
groupMemberInfo.UserID = req.UserID
groupMemberInfo.GroupID = req.GroupID
@@ -1824,6 +1888,9 @@ func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.S
log.Error(req.OperationID, errMsg)
return &pbGroup.SetGroupMemberNicknameResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.GroupMemberInfoSetNotification(req.OperationID, req.OpUserID, req.GroupID, req.UserID)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.SetGroupMemberNicknameResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
@@ -1856,6 +1923,9 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg + ":" + err.Error()
return resp, nil
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
if req.RoleLevel != nil {
switch req.RoleLevel.Value {
case constant.GroupOrdinaryUsers:
+2 -2
View File
@@ -182,7 +182,7 @@ func (s *organizationServer) UpdateDepartment(ctx context.Context, req *rpc.Upda
func (s *organizationServer) GetSubDepartment(ctx context.Context, req *rpc.GetSubDepartmentReq) (*rpc.GetSubDepartmentResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String())
err, departmentList := imdb.GetSubDepartmentList(req.DepartmentID)
departmentList, err := imdb.GetSubDepartmentList(req.DepartmentID)
if err != nil {
errMsg := req.OperationID + " " + "GetDepartment failed " + err.Error()
log.Error(req.OperationID, errMsg)
@@ -477,7 +477,7 @@ func (s *organizationServer) DeleteOrganizationUser(ctx context.Context, req *rp
func (s *organizationServer) GetDepartmentMember(ctx context.Context, req *rpc.GetDepartmentMemberReq) (*rpc.GetDepartmentMemberResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " rpc args ", req.String())
err, departmentMemberList := imdb.GetDepartmentMemberList(req.DepartmentID)
departmentMemberList, err := imdb.GetDepartmentMemberList(req.DepartmentID)
if err != nil {
errMsg := req.OperationID + " " + req.OpUserID + " is not app manager"
log.Error(req.OperationID, errMsg)
+21 -25
View File
@@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
imdb "Open_IM/pkg/common/db/mysql_model/im_mysql_model"
"Open_IM/pkg/common/db/rocks_cache"
errors "Open_IM/pkg/common/http"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
@@ -112,7 +113,7 @@ func (s *userServer) GetUserInfo(ctx context.Context, req *pbUser.GetUserInfoReq
if len(req.UserIDList) > 0 {
for _, userID := range req.UserIDList {
var userInfo sdkws.UserInfo
user, err := imdb.GetUserByUserID(userID)
user, err := rocksCache.GetUserInfoFromCache(userID)
if err != nil {
log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), userID)
continue
@@ -401,36 +402,24 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbUser.UpdateUserI
CommID: &pbFriend.CommID{OperationID: req.OperationID, FromUserID: req.UserInfo.UserID, OpUserID: req.OpUserID},
}
RpcResp, err := client.GetFriendList(context.Background(), newReq)
rpcResp, err := client.GetFriendList(context.Background(), newReq)
if err != nil {
log.NewError(req.OperationID, "GetFriendList failed ", err.Error(), newReq)
return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{}}, nil
return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: 500, ErrMsg: err.Error()}}, nil
}
for _, v := range RpcResp.FriendInfoList {
for _, v := range rpcResp.FriendInfoList {
log.Info(req.OperationID, "UserInfoUpdatedNotification ", req.UserInfo.UserID, v.FriendUser.UserID)
chat.UserInfoUpdatedNotification(req.OperationID, req.UserInfo.UserID, v.FriendUser.UserID)
}
if err := rocksCache.DelUserInfoFromCache(user.UserID); err != nil {
log.NewError(req.OperationID, "GetFriendList failed ", err.Error(), newReq)
return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: err.Error()}}, nil
}
chat.UserInfoUpdatedNotification(req.OperationID, req.UserInfo.UserID, req.OpUserID)
log.Info(req.OperationID, "UserInfoUpdatedNotification ", req.UserInfo.UserID, req.OpUserID)
if req.UserInfo.FaceURL != "" {
go s.SyncJoinedGroupMemberFaceURL(req.UserInfo.UserID, req.UserInfo.FaceURL, req.OperationID, req.OpUserID)
}
//updateUserInfoToCacheReq := &cache.UpdateUserInfoToCacheReq{
// OperationID: req.OperationID,
// UserInfoList: []*sdkws.UserInfo{req.UserInfo},
//}
//cacheEtcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName)
//cacheClient := cache.NewCacheClient(cacheEtcdConn)
//resp, err := cacheClient.UpdateUserInfoToCache(context.Background(), updateUserInfoToCacheReq)
//if err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), updateUserInfoToCacheReq.String())
// return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrServer.ErrCode, ErrMsg: err.Error()}}, nil
//}
//if resp.CommonResp.ErrCode != 0 {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), resp.String())
// return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{ErrCode: constant.ErrServer.ErrCode, ErrMsg: resp.CommonResp.ErrMsg}}, nil
//}
return &pbUser.UpdateUserInfoResp{CommonResp: &pbUser.CommonResp{}}, nil
}
func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbUser.SetGlobalRecvMessageOptReq) (*pbUser.SetGlobalRecvMessageOptResp, error) {
@@ -456,15 +445,22 @@ func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbUser.Se
return &pbUser.SetGlobalRecvMessageOptResp{CommonResp: &pbUser.CommonResp{}}, nil
}
func (s *userServer) SyncJoinedGroupMemberFaceURL(userID string, faceURL string, operationID string, opUserID string) {
joinedGroupIDList, err := imdb.GetJoinedGroupIDListByUserID(userID)
joinedGroupIDList, err := rocksCache.GetJoinedGroupIDListFromCache(userID)
if err != nil {
log.NewWarn(operationID, "GetJoinedGroupIDListByUserID failed ", userID, err.Error())
return
}
for _, v := range joinedGroupIDList {
groupMemberInfo := db.GroupMember{UserID: userID, GroupID: v, FaceURL: faceURL}
imdb.UpdateGroupMemberInfo(groupMemberInfo)
chat.GroupMemberInfoSetNotification(operationID, opUserID, v, userID)
for _, groupID := range joinedGroupIDList {
groupMemberInfo := db.GroupMember{UserID: userID, GroupID: groupID, FaceURL: faceURL}
if err := imdb.UpdateGroupMemberInfo(groupMemberInfo); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupMemberInfo)
continue
}
if err := rocksCache.DelAllGroupMembersInfoFromCache(groupID); err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID)
continue
}
chat.GroupMemberInfoSetNotification(operationID, opUserID, groupID, userID)
}
}