This commit is contained in:
skiffer-git
2022-05-21 10:59:44 +08:00
parent ac840323be
commit 17b4a0525d
4 changed files with 6 additions and 0 deletions
+1
View File
@@ -147,6 +147,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
sendMsgAllCount++ sendMsgAllCount++
sendMsgAllCountLock.Unlock() sendMsgAllCountLock.Unlock()
log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data) log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data)
nReply := new(pbChat.SendMsgResp) nReply := new(pbChat.SendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
if isPass { if isPass {
@@ -105,6 +105,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) {
isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync)
if isHistory { if isHistory {
storageMsgList = append(storageMsgList, v) storageMsgList = append(storageMsgList, v)
log.NewWarn(triggerID, "storageMsgList to mongodb client msgID: ", v.MsgData.ClientMsgID)
} }
if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) { if !(!isSenderSync && msgChannelValue.userID == v.MsgData.SendID) {
pushMsgList = append(pushMsgList, v) pushMsgList = append(pushMsgList, v)
+2
View File
@@ -372,6 +372,8 @@ func (rpc *rpcChat) sendMsgToKafka(m *pbChat.MsgDataToMQ, key string, status str
pid, offset, err := rpc.onlineProducer.SendMessage(m, key) pid, offset, err := rpc.onlineProducer.SendMessage(m, key)
if err != nil { if err != nil {
log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status) log.Error(m.OperationID, "kafka send failed", "send data", m.String(), "pid", pid, "offset", offset, "err", err.Error(), "key", key, status)
} else {
log.NewWarn(m.OperationID, "sendMsgToKafka client msgID ", m.MsgData.ClientMsgID)
} }
return err return err
case constant.OfflineStatus: case constant.OfflineStatus:
+2
View File
@@ -48,6 +48,8 @@ func (d *DataBases) BatchInsertChat(userID string, msgList []*pbMsg.MsgDataToMQ,
seqUid := "" seqUid := ""
seqUidNext := "" seqUidNext := ""
log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList)) log.Debug(operationID, "remain ", remain, "insertCounter ", insertCounter, "currentMaxSeq ", currentMaxSeq, userID, len(msgList))
//4998 remain ==1
//4999
for _, m := range msgList { for _, m := range msgList {
log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID)
currentMaxSeq++ currentMaxSeq++