Merge remote-tracking branch 'origin/tuoyun' into tuoyun

This commit is contained in:
wenxu12345
2022-02-08 11:54:52 +08:00
25 changed files with 388 additions and 966 deletions
+1 -2
View File
@@ -18,9 +18,8 @@ const (
//Websocket Protocol
WSGetNewestSeq = 1001
WSPullMsg = 1002
WSPullMsgBySeqList = 1002
WSSendMsg = 1003
WSPullMsgBySeqList = 1004
WSPushMsg = 2001
WSKickOnlineMsg = 2002
WSDataError = 3001
+19
View File
@@ -165,3 +165,22 @@ type Black struct {
OperatorUserID string `gorm:"column:operator_user_id;size:64"`
Ex string `gorm:"column:ex;size:1024"`
}
type ChatLog struct {
ServerMsgID string `gorm:"column:server_msg_id;primary_key;type:char(64)" json:"serverMsgID"`
ClientMsgID string `gorm:"column:client_msg_id;type:char(64)" json:"clientMsgID"`
SendID string `gorm:"column:send_id;type:char(64)" json:"sendID"`
RecvID string `gorm:"column:recv_id;type:char(64)" json:"recvID"`
SenderPlatformID int32 `gorm:"column:sender_platform_id" json:"senderPlatformID"`
SenderNickname string `gorm:"column:sender_nick_name;type:varchar(255)" json:"senderNickname"`
SenderFaceURL string `gorm:"column:sender_face_url;type:varchar(255)" json:"senderFaceURL"`
SessionType int32 `gorm:"column:session_type" json:"sessionType"`
MsgFrom int32 `gorm:"column:msg_from" json:"msgFrom"`
ContentType int32 `gorm:"column:content_type" json:"contentType"`
Content string `gorm:"column:content;type:varchar(1000)" json:"content"`
Status int32 `gorm:"column:status" json:"status"`
Seq uint32 `gorm:"column:seq;index:index_seq;default:0" json:"seq"`
SendTime time.Time `gorm:"column:send_time" json:"sendTime"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
}
+39 -80
View File
@@ -2,10 +2,10 @@ package db
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
pbMsg "Open_IM/pkg/proto/chat"
open_im_sdk "Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils"
"errors"
"github.com/garyburd/redigo/redis"
"github.com/golang/protobuf/proto"
@@ -33,51 +33,8 @@ type GroupMember_x struct {
UIDList []string
}
func (d *DataBases) GetMsgBySeqRange(uid string, seqBegin, seqEnd int64) (SingleMsg []*open_im_sdk.MsgData, GroupMsg []*open_im_sdk.MsgData, MaxSeq int64, MinSeq int64, err error) {
var count int64
session := d.mgoSession.Clone()
if session == nil {
return nil, nil, MaxSeq, MinSeq, errors.New("session == nil")
}
defer session.Close()
c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
sChat := UserChat{}
if err = c.Find(bson.M{"uid": uid}).One(&sChat); err != nil {
return nil, nil, MaxSeq, MinSeq, err
}
for i := 0; i < len(sChat.Msg); i++ {
msg := new(open_im_sdk.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
return nil, nil, MaxSeq, MinSeq, err
}
if msg.Seq >= seqBegin && msg.Seq <= seqEnd {
if msg.Seq > MaxSeq {
MaxSeq = msg.Seq
}
if count == 0 {
MinSeq = msg.Seq
}
if msg.Seq < MinSeq {
MinSeq = msg.Seq
}
if msg.SessionType == constant.SingleChatType {
SingleMsg = append(SingleMsg, msg)
} else {
GroupMsg = append(GroupMsg, msg)
}
count++
if count == (seqEnd - seqBegin + 1) {
break
}
}
}
return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil
}
func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq int64, err error) {
var i int64
func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq uint32, err error) {
var i, NB uint32
var seqUid string
session := d.mgoSession.Clone()
if session == nil {
@@ -89,7 +46,7 @@ func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq int64, err error) {
if err != nil && err != redis.ErrNil {
return MinSeq, err
}
NB := MaxSeq / singleGocMsgNum
NB = uint32(MaxSeq / singleGocMsgNum)
for i = 0; i <= NB; i++ {
seqUid = indexGen(uid, i)
n, err := c.Find(bson.M{"uid": seqUid}).Count()
@@ -97,29 +54,28 @@ func (d *DataBases) GetMinSeqFromMongo(uid string) (MinSeq int64, err error) {
if i == 0 {
MinSeq = 1
} else {
MinSeq = i * singleGocMsgNum
MinSeq = uint32(i * singleGocMsgNum)
}
break
}
}
return MinSeq, nil
}
func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*open_im_sdk.MsgData, GroupMsg []*open_im_sdk.MsgData, MaxSeq int64, MinSeq int64, err error) {
allCount := 0
func (d *DataBases) GetMsgBySeqList(uid string, seqList []uint32, operationID string) (seqMsg []*open_im_sdk.MsgData, err error) {
var hasSeqList []uint32
singleCount := 0
session := d.mgoSession.Clone()
if session == nil {
return nil, nil, MaxSeq, MinSeq, errors.New("session == nil")
return nil, errors.New("session == nil")
}
defer session.Close()
c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
m := func(uid string, seqList []int64) map[string][]int64 {
t := make(map[string][]int64)
m := func(uid string, seqList []uint32) map[string][]uint32 {
t := make(map[string][]uint32)
for i := 0; i < len(seqList); i++ {
seqUid := getSeqUid(uid, seqList[i])
if value, ok := t[seqUid]; !ok {
var temp []int64
var temp []uint32
t[seqUid] = append(temp, seqList[i])
} else {
t[seqUid] = append(value, seqList[i])
@@ -130,32 +86,19 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*o
sChat := UserChat{}
for seqUid, value := range m {
if err = c.Find(bson.M{"uid": seqUid}).One(&sChat); err != nil {
log.NewError("", "not find seqUid", seqUid, value, uid, seqList)
log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error())
continue
}
singleCount = 0
for i := 0; i < len(sChat.Msg); i++ {
msg := new(open_im_sdk.MsgData)
if err = proto.Unmarshal(sChat.Msg[i].Msg, msg); err != nil {
log.NewError("", "not find seqUid", seqUid, value, uid, seqList)
return nil, nil, MaxSeq, MinSeq, err
log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error())
return nil, err
}
if isContainInt64(msg.Seq, value) {
if msg.Seq > MaxSeq {
MaxSeq = msg.Seq
}
if allCount == 0 {
MinSeq = msg.Seq
}
if msg.Seq < MinSeq {
MinSeq = msg.Seq
}
if msg.SessionType == constant.SingleChatType {
SingleMsg = append(SingleMsg, msg)
} else {
GroupMsg = append(GroupMsg, msg)
}
allCount++
if isContainInt32(msg.Seq, value) {
seqMsg = append(seqMsg, msg)
hasSeqList = append(hasSeqList, msg.Seq)
singleCount++
if singleCount == len(value) {
break
@@ -163,8 +106,24 @@ func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*o
}
}
}
return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil
if len(hasSeqList) != len(seqList) {
var diff []uint32
diff = utils.Difference(hasSeqList, seqList)
exceptionMSg := genExceptionMessageBySeqList(diff)
seqMsg = append(seqMsg, exceptionMSg...)
}
return seqMsg, nil
}
func genExceptionMessageBySeqList(seqList []uint32) (exceptionMsg []*open_im_sdk.MsgData) {
for _, v := range seqList {
msg := new(open_im_sdk.MsgData)
msg.Seq = v
exceptionMsg = append(exceptionMsg, msg)
}
return exceptionMsg
}
func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToDB) error {
var seqUid string
newTime := getCurrentTimestampByMill()
@@ -183,7 +142,7 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m *pbMsg.MsgDataToD
log.NewDebug("", "find mgo uid cost time", getCurrentTimestampByMill()-newTime)
sMsg := MsgInfo{}
sMsg.SendTime = sendTime
if sMsg.Msg, err = proto.Marshal(m); err != nil {
if sMsg.Msg, err = proto.Marshal(m.MsgData); err != nil {
return err
}
if n == 0 {
@@ -318,11 +277,11 @@ func (d *DataBases) DelGroupMember(groupID, uid string) error {
func getCurrentTimestampByMill() int64 {
return time.Now().UnixNano() / 1e6
}
func getSeqUid(uid string, seq int64) string {
func getSeqUid(uid string, seq uint32) string {
seqSuffix := seq / singleGocMsgNum
return indexGen(uid, seqSuffix)
}
func isContainInt64(target int64, List []int64) bool {
func isContainInt32(target uint32, List []uint32) bool {
for _, element := range List {
@@ -333,6 +292,6 @@ func isContainInt64(target int64, List []int64) bool {
return false
}
func indexGen(uid string, seqSuffix int64) string {
return uid + ":" + strconv.FormatInt(seqSuffix, 10)
func indexGen(uid string, seqSuffix uint32) string {
return uid + ":" + strconv.FormatInt(int64(seqSuffix), 10)
}
+1 -1
View File
@@ -58,7 +58,7 @@ func initMysqlDB() {
&GroupMember{},
&GroupRequest{},
&User{},
&Black{})
&Black{}, &ChatLog{})
db.Set("gorm:table_options", "CHARSET=utf8")
//
@@ -11,45 +11,24 @@ import (
"Open_IM/pkg/common/db"
pbMsg "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils"
"database/sql"
"time"
"github.com/jinzhu/copier"
)
// ChatLog Chat information table structure
type ChatLog struct {
MsgId string `gorm:"primary_key"` // Chat history primary key ID
SendID string `gorm:"column:send_id"` // Send ID
RecvID string `gorm:"column:recv_id"` //Receive ID
SendTime time.Time `gorm:"column:send_time"` // Send time
SessionType int32 `gorm:"column:session_type"` // Session type
ContentType int32 `gorm:"column:content_type"` // Message content type
MsgFrom int32 `gorm:"column:msg_from"` // Source, user, system
Content string `gorm:"column:content"` // Chat content
SenderPlatformID int32 `gorm:"column:sender_platform_id"` //The sender's platform ID
Remark sql.NullString `gorm:"column:remark"` // remark
}
func InsertMessageToChatLog(msg pbMsg.MsgDataToMQ) error {
dbConn, err := db.DB.MysqlDB.DefaultGormDB()
if err != nil {
return err
}
chatLog := ChatLog{
MsgId: msg.MsgData.ServerMsgID,
SendID: msg.MsgData.SendID,
SendTime: utils.UnixNanoSecondToTime(msg.MsgData.SendTime),
SessionType: msg.MsgData.SessionType,
ContentType: msg.MsgData.ContentType,
MsgFrom: msg.MsgData.MsgFrom,
Content: string(msg.MsgData.Content),
SenderPlatformID: msg.MsgData.SenderPlatformID,
}
chatLog := new(db.ChatLog)
copier.Copy(chatLog, msg.MsgData)
switch msg.MsgData.SessionType {
case constant.GroupChatType:
chatLog.RecvID = msg.MsgData.GroupID
case constant.SingleChatType:
chatLog.RecvID = msg.MsgData.RecvID
}
return dbConn.Table("chat_log").Create(chatLog).Error
chatLog.Content = string(msg.MsgData.Content)
chatLog.CreateTime = utils.UnixMillSecondToTime(msg.MsgData.CreateTime)
chatLog.SendTime = utils.UnixMillSecondToTime(msg.MsgData.SendTime)
return dbConn.Table("chat_logs").Create(chatLog).Error
}
+7 -7
View File
@@ -35,28 +35,28 @@ func (d *DataBases) Exec(cmd string, key interface{}, args ...interface{}) (inte
}
//Perform seq auto-increment operation of user messages
func (d *DataBases) IncrUserSeq(uid string) (int64, error) {
func (d *DataBases) IncrUserSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
return redis.Int64(d.Exec("INCR", key))
return redis.Uint64(d.Exec("INCR", key))
}
//Get the largest Seq
func (d *DataBases) GetUserMaxSeq(uid string) (int64, error) {
func (d *DataBases) GetUserMaxSeq(uid string) (uint64, error) {
key := userIncrSeq + uid
return redis.Int64(d.Exec("GET", key))
return redis.Uint64(d.Exec("GET", key))
}
//Set the user's minimum seq
func (d *DataBases) SetUserMinSeq(uid string, minSeq int64) (err error) {
func (d *DataBases) SetUserMinSeq(uid string, minSeq uint32) (err error) {
key := userMinSeq + uid
_, err = d.Exec("SET", key, minSeq)
return err
}
//Get the smallest Seq
func (d *DataBases) GetUserMinSeq(uid string) (int64, error) {
func (d *DataBases) GetUserMinSeq(uid string) (uint64, error) {
key := userMinSeq + uid
return redis.Int64(d.Exec("GET", key))
return redis.Uint64(d.Exec("GET", key))
}
//Store Apple's device token to redis