Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release

This commit is contained in:
Gordon
2022-08-08 17:42:39 +08:00
18 changed files with 365 additions and 129 deletions
+48 -2
View File
@@ -1251,7 +1251,7 @@ func SetGroupMemberInfo(c *gin.Context) {
if req.RoleLevel != nil {
reqPb.RoleLevel = &wrappers.Int32Value{Value: *req.RoleLevel}
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " api args ", reqPb.String())
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
@@ -1269,6 +1269,52 @@ func SetGroupMemberInfo(c *gin.Context) {
resp.ErrMsg = respPb.CommonResp.ErrMsg
resp.ErrCode = respPb.CommonResp.ErrCode
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " api args ", resp)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " api return ", resp)
c.JSON(http.StatusOK, resp)
}
func GetGroupAbstractInfo(c *gin.Context) {
var (
req api.GetGroupAbstractInfoReq
resp api.GetGroupAbstractInfoResp
)
if err := c.BindJSON(&req); err != nil {
log.NewError("0", "BindJSON failed ", err.Error())
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
ok, opUserID, errInfo := token_verify.GetUserIDFromToken(c.Request.Header.Get("token"), req.OperationID)
if !ok {
errMsg := req.OperationID + " " + "GetUserIDFromToken failed " + errInfo + " token:" + c.Request.Header.Get("token")
log.NewError(req.OperationID, errMsg)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 500, "errMsg": errMsg})
return
}
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName, req.OperationID)
if etcdConn == nil {
errMsg := req.OperationID + "getcdv3.GetConn == nil"
log.NewError(req.OperationID, errMsg)
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": errMsg})
return
}
client := rpc.NewGroupClient(etcdConn)
respPb, err := client.GetGroupAbstractInfo(context.Background(), &rpc.GetGroupAbstractInfoReq{
GroupID: req.GroupID,
OpUserID: opUserID,
OperationID: req.OperationID,
})
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " api args ", respPb.String())
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), " failed ", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": err.Error()})
return
}
resp.ErrMsg = respPb.CommonResp.ErrMsg
resp.ErrCode = respPb.CommonResp.ErrCode
resp.GroupMemberNumber = respPb.GroupMemberNumber
resp.GroupMemberListHash = respPb.GroupMemberListHash
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), " api return ", resp)
c.JSON(http.StatusOK, resp)
return
}
+2 -2
View File
@@ -36,8 +36,8 @@ var (
func Init() {
cmdCh = make(chan Cmd2Value, 10000)
w = new(sync.Mutex)
persistentCH.Init()
historyCH.Init(cmdCh)
persistentCH.Init() // 订阅ws2mschat 消费到 mysql
historyCH.Init(cmdCh) // 订阅ws2mschat 如果可靠性存储 消费到 incrseq 再存入mongo 再push || 非可靠性 直接incr再push 初始化ws2mschat
historyMongoCH.Init()
onlineTopicStatus = OnlineTopicVacancy
//offlineHistoryCH.Init(cmdCh)
+2 -2
View File
@@ -179,7 +179,7 @@ func (g *Getui) request(url string, content interface{}, token string, returnStr
return err
}
client := &http.Client{}
log.Debug(operationID, utils.GetSelfFuncName(), "json:", string(con))
log.Debug(operationID, utils.GetSelfFuncName(), "json:", string(con), "token:", token)
req, err := http.NewRequest("POST", config.Config.Push.Getui.PushUrl+url, bytes.NewBuffer(con))
if err != nil {
return err
@@ -197,7 +197,7 @@ func (g *Getui) request(url string, content interface{}, token string, returnStr
if err != nil {
return err
}
log.NewInfo(operationID, "getui", utils.GetSelfFuncName(), "resp, ", string(result))
log.NewDebug(operationID, "getui", utils.GetSelfFuncName(), "resp, ", string(result))
commonResp := GetuiCommonResp{}
commonResp.Data = returnStruct
if err := json.Unmarshal(result, &commonResp); err != nil {
+79 -66
View File
@@ -34,10 +34,6 @@ type groupServer struct {
etcdAddr []string
}
func (s *groupServer) GetGroupAbstractInfo(c context.Context, req *pbGroup.GetGroupAbstractInfoReq) (*pbGroup.GetGroupAbstractInfoResp, error) {
panic("implement me")
}
func NewGroupServer(port int) *groupServer {
log.NewPrivateLog(constant.LogFileName)
return &groupServer{
@@ -148,7 +144,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
if req.GroupInfo.GroupType != constant.SuperGroup {
//to group member
for _, user := range req.InitMemberList {
us, err := imdb.GetUserByUserID(user.UserID)
us, err := rocksCache.GetUserInfoFromCache(user.UserID)
if err != nil {
log.NewError(req.OperationID, "GetUserByUserID failed ", err.Error(), user.UserID)
continue
@@ -168,7 +164,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
}
okUserIDList = append(okUserIDList, user.UserID)
}
group, err := imdb.GetGroupInfoByGroupID(groupId)
group, err := rocksCache.GetGroupInfoFromCache(groupId)
if err != nil {
log.NewError(req.OperationID, "GetGroupInfoByGroupID failed ", err.Error(), groupId)
resp.ErrCode = constant.ErrDB.ErrCode
@@ -176,7 +172,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbGroup.CreateGroupR
return resp, nil
}
utils.CopyStructFields(resp.GroupInfo, group)
memberCount, err := imdb.GetGroupMemberNumByGroupID(groupId)
memberCount, err := rocksCache.GetGroupMemberNumFromCache(groupId)
resp.GroupInfo.MemberCount = uint32(memberCount)
if err != nil {
log.NewError(req.OperationID, "GetGroupMemberNumByGroupID failed ", err.Error(), groupId)
@@ -245,7 +241,7 @@ func (s *groupServer) GetJoinedGroupList(ctx context.Context, req *pbGroup.GetJo
var resp pbGroup.GetJoinedGroupListResp
for _, v := range joinedGroupList {
var groupNode open_im_sdk.GroupInfo
num, err := imdb.GetGroupMemberNumByGroupID(v)
num, err := rocksCache.GetGroupMemberNumFromCache(v)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), v)
continue
@@ -463,7 +459,12 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.InviteUserToGroupResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
if err := rocksCache.DelGroupMemberNumFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
if groupInfo.GroupType != constant.SuperGroup {
for _, userID := range okUserIDList {
err = rocksCache.DelJoinedGroupIDListFromCache(userID)
@@ -687,6 +688,9 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.KickGroupMemberResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}, nil
}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
if groupInfo.GroupType != constant.SuperGroup {
for _, userID := range okUserIDList {
@@ -698,7 +702,9 @@ func (s *groupServer) KickGroupMember(ctx context.Context, req *pbGroup.KickGrou
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
}
if err := rocksCache.DelGroupMemberNumFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.MemberKickedNotification(req, okUserIDList)
} else {
for _, userID := range okUserIDList {
@@ -913,19 +919,15 @@ func (s *groupServer) GroupApplicationResponse(_ context.Context, req *pbGroup.G
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.GroupApplicationResponseResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//group, err := rocksCache.GetGroupInfoFromCache(req.GroupID)
//if err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
//}
//if group != nil {
// if group.GroupType != constant.SuperGroup {
// if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
// }
// }
//}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
if err := rocksCache.DelJoinedGroupIDListFromCache(req.FromUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.FromUserID, err.Error())
}
if err := rocksCache.DelGroupMemberNumFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.GroupApplicationAcceptedNotification(req)
chat.MemberEnterNotification(req)
} else if req.HandleResult == constant.GroupResponseRefuse {
@@ -991,22 +993,16 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbGroup.JoinGroupReq)
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//for _, userID := range okUserIDList {
// err = rocksCache.DelJoinedGroupIDListFromCache(userID)
// if err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), userID)
// }
//}
err = rocksCache.DelJoinedGroupIDListFromCache(req.OpUserID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
}
//err = rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID)
//if err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error())
//}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
if err := rocksCache.DelGroupMemberNumFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.MemberEnterDirectlyNotification(req.GroupID, req.OpUserID, req.OperationID)
log.NewInfo(req.OperationID, "JoinGroup rpc return ")
return &pbGroup.JoinGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
@@ -1113,17 +1109,19 @@ func (s *groupServer) QuitGroup(ctx context.Context, req *pbGroup.QuitGroupReq)
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.QuitGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
}
if groupInfo.GroupType != constant.SuperGroup {
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
//}
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.OpUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
if err := rocksCache.DelJoinedGroupIDListFromCache(req.OpUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.OpUserID)
}
if err := rocksCache.DelGroupMemberNumFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.OpUserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
chat.MemberQuitNotification(req)
} else {
if err := rocksCache.DelJoinedSuperGroupIDListFromCache(req.OpUserID); err != nil {
@@ -1322,9 +1320,6 @@ func (s *groupServer) TransferGroupOwner(_ context.Context, req *pbGroup.Transfe
log.NewError(req.OperationID, "UpdateGroupMemberInfo failed ", groupMemberInfo)
return &pbGroup.TransferGroupOwnerResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), req.GroupID, err.Error())
//}
err = rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.NewOwnerUserID)
if err != nil {
log.NewError(req.OperationID, "DelGroupMemberInfoFromCache failed ", req.GroupID, req.NewOwnerUserID)
@@ -1607,9 +1602,13 @@ func (s *groupServer) RemoveGroupMembersCMS(_ context.Context, req *pbGroup.Remo
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return resp, http.WrapError(constant.ErrDB)
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId)
//}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupId); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId)
}
if err := rocksCache.DelGroupMemberNumFromCache(req.GroupId); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId)
}
for _, userID := range resp.Success {
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupId, userID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupId, userID)
@@ -1674,9 +1673,12 @@ func (s *groupServer) AddGroupMembersCMS(_ context.Context, req *pbGroup.AddGrou
log.NewError(req.OperationId, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return resp, http.WrapError(constant.ErrDB)
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupId); err != nil {
// log.NewError(req.OperationId, utils.GetSelfFuncName(), err.Error(), req.GroupId)
//}
if err := rocksCache.DelGroupMemberListHashFromCache(req.GroupId); err != nil {
log.NewError(req.OperationId, utils.GetSelfFuncName(), err.Error(), req.GroupId)
}
if err := rocksCache.DelGroupMemberNumFromCache(req.GroupId); err != nil {
log.NewError(req.OperationId, utils.GetSelfFuncName(), err.Error(), req.GroupId)
}
chat.MemberInvitedNotification(req.OperationId, req.GroupId, req.OpUserId, "admin add you to group", resp.Success)
return resp, nil
@@ -1798,9 +1800,9 @@ func (s *groupServer) DismissGroup(ctx context.Context, req *pbGroup.DismissGrou
log.NewError(req.OperationID, "DelGroupMemberIDListFromCache rpc logic call failed ", cacheResp.String())
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: cacheResp.CommonResp.ErrCode, ErrMsg: cacheResp.CommonResp.ErrMsg}}, nil
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
//}
if err := rocksCache.DelGroupInfoFromCache(req.GroupID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "rpc return ", pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""})
return &pbGroup.DismissGroupResp{CommonResp: &pbGroup.CommonResp{ErrCode: 0, ErrMsg: ""}}, nil
}
@@ -1846,9 +1848,6 @@ func (s *groupServer) MuteGroupMember(ctx context.Context, req *pbGroup.MuteGrou
log.Error(req.OperationID, "UpdateGroupMemberInfo failed ", err.Error(), groupMemberInfo)
return &pbGroup.MuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
//}
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
@@ -1893,9 +1892,6 @@ func (s *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbGroup.Ca
log.Error(req.OperationID, "UpdateGroupMemberInfo failed ", err.Error(), groupMemberInfo)
return &pbGroup.CancelMuteGroupMemberResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
//}
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
@@ -2016,9 +2012,6 @@ func (s *groupServer) SetGroupMemberNickname(ctx context.Context, req *pbGroup.S
log.Error(req.OperationID, errMsg)
return &pbGroup.SetGroupMemberNicknameResp{CommonResp: &pbGroup.CommonResp{ErrCode: constant.ErrDB.ErrCode, ErrMsg: constant.ErrDB.ErrMsg}}, nil
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
//}
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
}
@@ -2054,9 +2047,6 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
resp.CommonResp.ErrMsg = constant.ErrDB.ErrMsg + ":" + err.Error()
return resp, nil
}
//if err := rocksCache.DelAllGroupMembersInfoFromCache(req.GroupID); err != nil {
// log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID)
//}
if err := rocksCache.DelGroupMemberInfoFromCache(req.GroupID, req.UserID); err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), err.Error(), req.GroupID, req.UserID)
}
@@ -2075,3 +2065,26 @@ func (s *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbGroup.SetGr
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "resp: ", resp.String())
return resp, nil
}
func (s *groupServer) GetGroupAbstractInfo(c context.Context, req *pbGroup.GetGroupAbstractInfoReq) (*pbGroup.GetGroupAbstractInfoResp, error) {
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", req.String())
resp := &pbGroup.GetGroupAbstractInfoResp{CommonResp: &pbGroup.CommonResp{}}
hashCode, err := rocksCache.GetGroupMemberListHashFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupMemberListHashFromCache failed", req.GroupID, err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
resp.GroupMemberListHash = hashCode
num, err := rocksCache.GetGroupMemberNumFromCache(req.GroupID)
if err != nil {
log.NewError(req.OperationID, utils.GetSelfFuncName(), "GetGroupMemberNumByGroupID failed", req.GroupID, err.Error())
resp.CommonResp.ErrCode = constant.ErrDB.ErrCode
resp.CommonResp.ErrMsg = err.Error()
return resp, nil
}
resp.GroupMemberNumber = int32(num)
log.NewInfo(req.OperationID, utils.GetSelfFuncName(), "req: ", resp.String())
return resp, nil
}
+89
View File
@@ -0,0 +1,89 @@
package timedTask
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
server_api_params "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"github.com/golang/protobuf/proto"
"strconv"
"strings"
)
const oldestList = 0
const newestList = -1
func DeleteMongoMsgAndResetRedisSeq(operationID, ID string, diffusionType int) error {
// -1 表示从当前最早的一个开始
var delMsgIDList []string
minSeq, err := deleteMongoMsg(operationID, ID, oldestList, &delMsgIDList)
if err != nil {
return utils.Wrap(err, "")
}
log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList: ", delMsgIDList)
if diffusionType == constant.WriteDiffusion {
err = db.DB.SetUserMinSeq(ID, minSeq)
} else if diffusionType == constant.ReadDiffusion {
err = db.DB.SetGroupMinSeq(ID, minSeq)
}
return err
}
// recursion
func deleteMongoMsg(operationID string, ID string, index int64, IDList *[]string) (uint32, error) {
// 从最旧的列表开始找
msgs, err := db.DB.GetUserMsgListByIndex(ID, index)
if err != nil {
return 0, utils.Wrap(err, "GetUserMsgListByIndex failed")
}
log.NewDebug(operationID, utils.GetSelfFuncName(), "get msgs: ", msgs.UID)
for i, msg := range msgs.Msg {
// 找到列表中不需要删除的消息了
if msg.SendTime+int64(config.Config.Mongo.DBRetainChatRecords) > utils.GetCurrentTimestampByMill() {
if len(*IDList) > 0 {
err := db.DB.DelMongoMsgs(*IDList)
if err != nil {
return 0, utils.Wrap(err, "DelMongoMsgs failed")
}
}
minSeq := getDelMaxSeqByIDList(*IDList)
if i > 0 {
msgPb := &server_api_params.MsgData{}
err = proto.Unmarshal(msg.Msg, msgPb)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), ID, index)
} else {
err = db.DB.ReplaceMsgToBlankByIndex(msgs.UID, i-1)
if err != nil {
log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), msgs.UID, i)
return minSeq, nil
}
minSeq = msgPb.Seq - 1
}
}
return minSeq, nil
}
}
*IDList = append(*IDList, msgs.UID)
// 没有找到 代表需要全部删除掉 继续查找下一个比较旧的列表
seq, err := deleteMongoMsg(operationID, utils.GetSelfFuncName(), index-1, IDList)
if err != nil {
return 0, utils.Wrap(err, "deleteMongoMsg failed")
}
return seq, nil
}
func getDelMaxSeqByIDList(IDList []string) uint32 {
if len(IDList) == 0 {
return 0
}
l := strings.Split(IDList[len(IDList)-1], ":")
index, _ := strconv.Atoi(l[len(l)-1])
if index == 0 {
// 4999
return uint32(db.GetSingleGocMsgNum()) - 1
} // 5000
return (uint32(db.GetSingleGocMsgNum()) - 1) + uint32(index*db.GetSingleGocMsgNum())
}
+1 -26
View File
@@ -1,26 +1 @@
package timed_task
type TimeTask struct {
delMgoChatChan chan bool
}
var timeTask TimeTask
func GetInstance() *TimeTask {
if timeTask.delMgoChatChan == nil {
timeTask.delMgoChatChan = make(chan bool)
go func() {
timeTask.delMgoChatChan <- true
}()
}
return &timeTask
}
func (t *TimeTask) Run() {
for {
select {
case <-t.delMgoChatChan:
t.timedDeleteUserChat()
}
}
}
package timedTask
+17 -20
View File
@@ -1,26 +1,23 @@
package timed_task
package timedTask
import (
"Open_IM/pkg/common/db"
"time"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"github.com/robfig/cron/v3"
)
func (t *TimeTask) timedDeleteUserChat() {
now := time.Now()
next := now.Add(time.Hour * 24)
next = time.Date(next.Year(), next.Month(), next.Day(), 0, 0, 0, 0, next.Location())
tm := time.NewTimer(next.Sub(now))
<-tm.C
count, _ := db.DB.MgoUserCount()
for i := 0; i < count; i++ {
time.Sleep(10 * time.Millisecond)
uid, _ := db.DB.MgoSkipUID(i)
db.DB.DelUserChatMongo2(uid)
func main() {
log.NewInfo(utils.OperationIDGenerator(), "start cron task")
c := cron.New()
_, err := c.AddFunc("30 3-6,20-23 * * *", func() {
operationID := utils.OperationIDGenerator()
if err := DeleteMongoMsgAndResetRedisSeq(operationID, "", constant.ReadDiffusion); err != nil {
log.NewError(operationID)
}
})
if err != nil {
panic(err)
}
go func() {
t.delMgoChatChan <- true
}()
c.Start()
}