Merge remote-tracking branch 'origin/tuoyun' into tuoyun

This commit is contained in:
wenxu12345
2021-10-26 12:10:31 +08:00
9 changed files with 75 additions and 46 deletions
@@ -31,7 +31,7 @@ func UserNewestSeq(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"})
return
}
pbData := pbMsg.GetNewSeqReq{}
pbData := pbMsg.GetMaxAndMinSeqReq{}
pbData.UserID = params.SendID
pbData.OperationID = params.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
@@ -40,7 +40,7 @@ func UserNewestSeq(c *gin.Context) {
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", params)
}
msgClient := pbMsg.NewChatClient(grpcConn)
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
return
@@ -52,7 +52,8 @@ func UserNewestSeq(c *gin.Context) {
"msgIncr": params.MsgIncr,
"reqIdentifier": params.ReqIdentifier,
"data": gin.H{
"seq": reply.Seq,
"maxSeq": reply.MaxSeq,
"minSeq": reply.MinSeq,
},
})
+16 -2
View File
@@ -9,6 +9,7 @@ const (
userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq
appleDeviceToken = "DEVICE_TOKEN"
lastGetSeq = "LAST_GET_SEQ"
userMinSeq = "REDIS_USER_Min_SEQ:"
)
func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (interface{}, error) {
@@ -37,12 +38,25 @@ func (d *DataBases) IncrUserSeq(uid string) (int64, error) {
return redis.Int64(d.Exec("INCR", key))
}
//获取最新的seq
func (d *DataBases) GetUserSeq(uid string) (int64, error) {
//获取最大的Seq
func (d *DataBases) GetUserMaxSeq(uid string) (int64, error) {
key := userIncrSeq + uid
return redis.Int64(d.Exec("GET", key))
}
//设置用户最小的seq
func (d *DataBases) SetUserMinSeq(uid string) (err error) {
key := userMinSeq + uid
_, err = d.Exec("SET", key)
return err
}
//获取最小的Seq
func (d *DataBases) GetUserMinSeq(uid string) (int64, error) {
key := userMinSeq + uid
return redis.Int64(d.Exec("GET", key))
}
//存储苹果的设备token到redis
func (d *DataBases) SetAppleDeviceToken(accountAddress, value string) (err error) {
key := appleDeviceToken + accountAddress
+6 -5
View File
@@ -68,9 +68,10 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
log.NewInfo("", "goroutine num is ", runtime.NumGoroutine())
}
func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) {
var mReplyData pbWs.GetNewSeqResp
mReplyData.Seq = pb.GetSeq()
func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetMaxAndMinSeqResp) {
var mReplyData pbWs.GetMaxAndMinSeqResp
mReplyData.MaxSeq = pb.GetMaxSeq()
mReplyData.MinSeq = pb.GetMinSeq()
b, _ := proto.Marshal(&mReplyData)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
@@ -84,7 +85,7 @@ func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqRes
}
func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m)
pbData := pbChat.GetNewSeqReq{}
pbData := pbChat.GetMaxAndMinSeqReq{}
pbData.UserID = m.SendID
pbData.OperationID = m.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
@@ -92,7 +93,7 @@ func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
log.ErrorByKv("get grpcConn err", pbData.OperationID, "args", m)
}
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.GetNewSeq(context.Background(), &pbData)
reply, err := msgClient.GetMaxAndMinSeq(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("rpc call failed to getNewSeq", pbData.OperationID, "err", err, "pbData", pbData.String())
return
+20 -13
View File
@@ -13,22 +13,30 @@ import (
pbMsg "Open_IM/src/proto/chat"
)
func (rpc *rpcChat) GetNewSeq(_ context.Context, in *pbMsg.GetNewSeqReq) (*pbMsg.GetNewSeqResp, error) {
log.InfoByKv("rpc getNewSeq is arriving", in.OperationID, in.String())
func (rpc *rpcChat) GetMaxAndMinSeq(_ context.Context, in *pbMsg.GetMaxAndMinSeqReq) (*pbMsg.GetMaxAndMinSeqResp, error) {
log.InfoByKv("rpc getMaxAndMinSeq is arriving", in.OperationID, in.String())
//seq, err := model.GetBiggestSeqFromReceive(in.UserID)
seq, err := commonDB.DB.GetUserSeq(in.UserID)
resp := new(pbMsg.GetNewSeqResp)
if err == nil {
resp.Seq = seq
maxSeq, err1 := commonDB.DB.GetUserMaxSeq(in.UserID)
minSeq, err2 := commonDB.DB.GetUserMinSeq(in.UserID)
resp := new(pbMsg.GetMaxAndMinSeqResp)
if err1 == nil && err2 == nil {
resp.MaxSeq = maxSeq
resp.MinSeq = minSeq
resp.ErrCode = 0
resp.ErrMsg = ""
return resp, err
return resp, nil
} else {
if err == redis.ErrNil {
resp.Seq = 0
} else {
log.ErrorByKv("getSeq from redis error", in.OperationID, "args", in.String(), "err", err.Error())
resp.Seq = -1
if err1 == redis.ErrNil {
resp.MaxSeq = 0
} else if err1 != nil {
log.NewInfo(in.OperationID, "getMaxSeq from redis error", in.String(), err1.Error())
resp.MaxSeq = -1
}
if err2 == redis.ErrNil {
resp.MinSeq = 0
} else if err2 != nil {
log.NewInfo(in.OperationID, "getMinSeq from redis error", in.String(), err2.Error())
resp.MinSeq = -1
}
resp.ErrCode = 0
resp.ErrMsg = ""
@@ -81,7 +89,6 @@ func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *pbMsg.PullMessag
SingleUserMsg: respSingleMsgFormat,
GroupUserMsg: respGroupMsgFormat,
}, nil
panic("implement me")
}
func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat {
var userid string