Compare commits

...

23 Commits

Author SHA1 Message Date
Gordon c97e39fc09 user change 2021-11-04 21:01:25 +08:00
Gordon 515d7bb318 config change 2021-11-04 18:07:39 +08:00
Gordon cdf928d64e push content modify 2021-11-04 17:27:12 +08:00
Gordon 2440575093 push content modify 2021-11-04 16:52:48 +08:00
Gordon 817d2b11a5 push modify 2021-11-04 10:19:33 +08:00
Gordon 4b4dce80cf push modify 2021-11-03 21:16:46 +08:00
Gordon 52e15fc9d4 push modify 2021-11-03 20:03:43 +08:00
Gordon 7dbaa1b8c1 compose commit 2021-11-03 17:54:31 +08:00
Gordon 4215d65aa7 compose commit 2021-11-03 17:51:53 +08:00
Gordon 62a6195a4f push log add 2021-11-03 16:10:24 +08:00
Gordon 51622e77d4 push log add 2021-11-03 15:59:08 +08:00
Gordon 2e14e0f767 jpush secret 2021-11-03 14:13:41 +08:00
Gordon b219f8542b push fix 2021-11-03 09:37:26 +08:00
Gordon 6d67bb9d65 docker-compose modify 2021-11-02 18:00:43 +08:00
Gordon af37abb394 docker-compose modify 2021-11-02 17:37:51 +08:00
Gordon dfd028625d fix secret check and management 2021-11-02 16:12:55 +08:00
Gordon a87c16de0d push fix 2021-11-02 12:00:24 +08:00
Gordon 082de2b6b7 group message fix bug 2021-11-01 17:16:47 +08:00
Gordon 7f19f2f046 fix import friend panic 2021-11-01 16:14:39 +08:00
Gordon 71873f7f0e docker-compose.yaml change 2021-10-29 14:14:27 +08:00
Gordon e124a26c52 docker-compose.yaml change 2021-10-29 12:00:10 +08:00
Gordon 09b501f7df msg split num change 2021-10-29 10:41:02 +08:00
Gordon a86980657c remove note 2021-10-29 09:53:39 +08:00
17 changed files with 173 additions and 115 deletions
+3 -3
View File
@@ -74,7 +74,7 @@ credential:
rpcport: rpcport:
openImUserPort: [ 10100 ] openImUserPort: [ 10100 ]
openImFriendPort: [ 10200 ] openImFriendPort: [ 10200 ]
openImOfflineMessagePort: [ 10300 ] openImOfflineMessagePort: [ 10300]
openImOnlineRelayPort: [ 10400 ] openImOnlineRelayPort: [ 10400 ]
openImGroupPort: [ 10500 ] openImGroupPort: [ 10500 ]
openImAuthPort: [ 10600 ] openImAuthPort: [ 10600 ]
@@ -120,8 +120,8 @@ push:
accessID: 111 accessID: 111
secretKey: 111 secretKey: 111
jpns: jpns:
appKey: 2783339cee4de379cc798fe1 appKey: cf47465a368f24c659608e7e
masterSecret: 66e5f309e032c68cc668c28a masterSecret: 02204efe3f3832947a236ee5
pushUrl: "https://api.jpush.cn/v3/push" pushUrl: "https://api.jpush.cn/v3/push"
manager: manager:
appManagerUid: ["openIM123456","openIM654321"] appManagerUid: ["openIM123456","openIM654321"]
+1 -1
View File
@@ -33,7 +33,7 @@ done
#Check launched service process #Check launched service process
check=$(ps aux | grep -w ./${msg_transfer_name} | grep -v grep | wc -l) check=$(ps aux | grep -w ./${msg_transfer_name} | grep -v grep | wc -l)
if [ $check -eq ${msg_transfer_service_num} ]; then if [ $check -eq ${msg_transfer_service_num} ]; then
echo -e ${GREEN_PREFIX}"service has been starting,belongs service is openImMsgTransfer"${COLOR_SUFFIX} echo -e ${GREEN_PREFIX}"none port service has been starting,belongs service is openImMsgTransfer"${COLOR_SUFFIX}
else else
echo -e ${RED_PREFIX}"openImMsgTransfer service does not start normally, num err"${COLOR_SUFFIX} echo -e ${RED_PREFIX}"openImMsgTransfer service does not start normally, num err"${COLOR_SUFFIX}
echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX} echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX}
+4
View File
@@ -49,6 +49,10 @@ func UserRegister(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return return
} }
if params.Secret != config.Config.Secret {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": "not authorized"})
return
}
pbData := newUserRegisterReq(&params) pbData := newUserRegisterReq(&params)
log.Info("", "", "api user_register is server, [data: %s]", pbData.String()) log.Info("", "", "api user_register is server, [data: %s]", pbData.String())
+4
View File
@@ -37,6 +37,10 @@ func UserToken(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return return
} }
if params.Secret != config.Config.Secret {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": "not authorized"})
return
}
pbData := newUserTokenReq(&params) pbData := newUserTokenReq(&params)
log.Info("", "", "api user_token is server, [data: %s]", pbData.String()) log.Info("", "", "api user_token is server, [data: %s]", pbData.String())
+1 -1
View File
@@ -67,7 +67,7 @@ func UserSendMsg(c *gin.Context) {
token := c.Request.Header.Get("token") token := c.Request.Header.Get("token")
log.InfoByKv("Ws call success to sendMsgReq", params.OperationID, "Parameters", params) log.InfoByKv("api call success to sendMsgReq", params.OperationID, "Parameters", params)
pbData := newUserSendMsgReq(token, &params) pbData := newUserSendMsgReq(token, &params)
log.Info("", "", "api UserSendMsg call start..., [data: %s]", pbData.String()) log.Info("", "", "api UserSendMsg call start..., [data: %s]", pbData.String())
+9 -6
View File
@@ -35,7 +35,7 @@ type paramsManagementSendMsg struct {
SessionType int32 `json:"sessionType" binding:"required"` SessionType int32 `json:"sessionType" binding:"required"`
} }
func newUserSendMsgReq(token string, params *paramsManagementSendMsg) *pbChat.UserSendMsgReq { func newUserSendMsgReq(params *paramsManagementSendMsg) *pbChat.UserSendMsgReq {
var newContent string var newContent string
switch params.ContentType { switch params.ContentType {
case constant.Text: case constant.Text:
@@ -53,7 +53,6 @@ func newUserSendMsgReq(token string, params *paramsManagementSendMsg) *pbChat.Us
} }
pbData := pbChat.UserSendMsgReq{ pbData := pbChat.UserSendMsgReq{
ReqIdentifier: constant.WSSendMsg, ReqIdentifier: constant.WSSendMsg,
Token: token,
SendID: params.SendID, SendID: params.SendID,
SenderNickName: params.SenderNickName, SenderNickName: params.SenderNickName,
SenderFaceURL: params.SenderFaceURL, SenderFaceURL: params.SenderFaceURL,
@@ -103,15 +102,19 @@ func ManagementSendMsg(c *gin.Context) {
} }
token := c.Request.Header.Get("token") token := c.Request.Header.Get("token")
if !utils.IsContain(params.SendID, config.Config.Manager.AppManagerUid) { claims, err := utils.ParseToken(token)
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not appManager", "sendTime": 0, "MsgID": ""}) if err != nil {
log.NewError(params.OperationID, "parse token failed", err.Error())
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "parse token failed", "sendTime": 0, "MsgID": ""})
}
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""})
return return
} }
log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params) log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params)
pbData := newUserSendMsgReq(token, &params) pbData := newUserSendMsgReq(&params)
log.Info("", "", "api ManagementSendMsg call start..., [data: %s]", pbData.String()) log.Info("", "", "api ManagementSendMsg call start..., [data: %s]", pbData.String())
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
+19 -12
View File
@@ -26,15 +26,18 @@ const (
///ContentType ///ContentType
//UserRelated //UserRelated
Text = 101 Text = 101
Picture = 102 Picture = 102
Voice = 103 Voice = 103
Video = 104 Video = 104
File = 105 File = 105
AtText = 106 AtText = 106
Custom = 110 Custom = 110
HasReadReceipt = 112
Typing = 113
Common = 200
GroupMsg = 201
SyncSenderMsg = 108
//SysRelated //SysRelated
AcceptFriendApplicationTip = 201 AcceptFriendApplicationTip = 201
AddFriendTip = 202 AddFriendTip = 202
@@ -66,10 +69,14 @@ const (
) )
var ContentType2PushContent = map[int64]string{ var ContentType2PushContent = map[int64]string{
Picture: "[picture]", Picture: "[图片]",
Voice: "[voice]", Voice: "[语音]",
Video: "[video]", Video: "[视频]",
File: "[file]", File: "[文件]",
Text: "你收到了一条文本消息",
AtText: "[有人@你]",
GroupMsg: "你收到一条群聊消息",
Common: "你收到一条新消息",
} }
const FriendAcceptTip = "You have successfully become friends, so start chatting" const FriendAcceptTip = "You have successfully become friends, so start chatting"
+9 -8
View File
@@ -14,7 +14,7 @@ import (
const cChat = "chat" const cChat = "chat"
const cGroup = "group" const cGroup = "group"
const singleGocMsgNum = 10000 const singleGocMsgNum = 5000
type MsgInfo struct { type MsgInfo struct {
SendTime int64 SendTime int64
@@ -310,6 +310,14 @@ func (d *DataBases) DelGroupMember(groupID, uid string) error {
return nil return nil
} }
func getCurrentTimestampByMill() int64 {
return time.Now().UnixNano() / 1e6
}
func getSeqUid(uid string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return uid + ":" + strconv.FormatInt(seqSuffix, 10)
}
func isContainInt64(target int64, List []int64) bool { func isContainInt64(target int64, List []int64) bool {
for _, element := range List { for _, element := range List {
@@ -321,10 +329,3 @@ func isContainInt64(target int64, List []int64) bool {
return false return false
} }
func getCurrentTimestampByMill() int64 {
return time.Now().UnixNano() / 1e6
}
func getSeqUid(uid string, seq int64) string {
seqSuffix := seq / singleGocMsgNum
return uid + ":" + strconv.FormatInt(seqSuffix, 10)
}
@@ -5,6 +5,7 @@ import (
"Open_IM/src/common/db" "Open_IM/src/common/db"
pbAuth "Open_IM/src/proto/auth" pbAuth "Open_IM/src/proto/auth"
"Open_IM/src/utils" "Open_IM/src/utils"
"fmt"
_ "github.com/jinzhu/gorm/dialects/mysql" _ "github.com/jinzhu/gorm/dialects/mysql"
"time" "time"
) )
@@ -18,7 +19,7 @@ func init() {
pb.Name = "AppManager" + utils.IntToString(k+1) pb.Name = "AppManager" + utils.IntToString(k+1)
err := UserRegister(&pb) err := UserRegister(&pb)
if err != nil { if err != nil {
panic(err) fmt.Println("AppManager insert error", err.Error())
} }
} }
} }
+11 -3
View File
@@ -83,7 +83,7 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
enc := gob.NewEncoder(&replyBytes) enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply) err := enc.Encode(mReply)
if err != nil { if err != nil {
fmt.Println(err) log.NewError(in.OperationID, "data encode err", err.Error())
} }
switch in.GetSessionType() { switch in.GetSessionType() {
case constant.SingleChatType: case constant.SingleChatType:
@@ -92,10 +92,11 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
RecvID = strings.Split(in.GetRecvID(), " ")[0] RecvID = strings.Split(in.GetRecvID(), " ")[0]
} }
var tag bool var tag bool
var UIDAndPID []string
userIDList := genUidPlatformArray(RecvID) userIDList := genUidPlatformArray(RecvID)
for _, v := range userIDList { for _, v := range userIDList {
UIDAndPID = strings.Split(v, " ")
if conn := ws.getUserConn(v); conn != nil { if conn := ws.getUserConn(v); conn != nil {
UIDAndPID := strings.Split(v, " ")
tag = true tag = true
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, UIDAndPID[1], UIDAndPID[0]) resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, UIDAndPID[1], UIDAndPID[0])
temp := &pbRelay.SingleMsgToUser{ temp := &pbRelay.SingleMsgToUser{
@@ -104,10 +105,17 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]), RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
} }
resp = append(resp, temp) resp = append(resp, temp)
} else {
temp := &pbRelay.SingleMsgToUser{
ResultCode: -1,
RecvID: UIDAndPID[0],
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
}
resp = append(resp, temp)
} }
} }
if !tag { if !tag {
log.NewError(in.OperationID, "push err ,ws conn not in map", in.String()) log.NewError(in.OperationID, "push err ,no matched ws conn not in map", in.String())
} }
return &pbRelay.MsgToUserResp{ return &pbRelay.MsgToUserResp{
Resp: resp, Resp: resp,
+22 -16
View File
@@ -33,7 +33,7 @@ func (mc *HistoryConsumerHandler) Init() {
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg)) log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
time := utils.GetCurrentTimestampBySecond() time := utils.GetCurrentTimestampByNano()
pbData := pbMsg.WSToMsgSvrChatMsg{} pbData := pbMsg.WSToMsgSvrChatMsg{}
err := proto.Unmarshal(msg, &pbData) err := proto.Unmarshal(msg, &pbData)
if err != nil { if err != nil {
@@ -59,47 +59,53 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
isHistory := utils.GetSwitchFromOptions(Options, "history") isHistory := utils.GetSwitchFromOptions(Options, "history")
//Control whether to store history messages (mysql) //Control whether to store history messages (mysql)
isPersist := utils.GetSwitchFromOptions(Options, "persistent") isPersist := utils.GetSwitchFromOptions(Options, "persistent")
if pbData.SessionType == constant.SingleChatType { switch pbData.SessionType {
log.Info("", "", "msg_transfer chat type = SingleChatType", isHistory, isPersist) case constant.SingleChatType:
log.NewDebug(pbSaveData.OperationID, "msg_transfer chat type = SingleChatType", isHistory, isPersist)
if isHistory { if isHistory {
if msgKey == pbSaveData.RecvID { if msgKey == pbSaveData.RecvID {
err := saveUserChat(pbData.RecvID, &pbSaveData) err := saveUserChat(pbData.RecvID, &pbSaveData)
if err != nil { if err != nil {
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error()) log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
return
} }
} else if msgKey == pbSaveData.SendID { } else if msgKey == pbSaveData.SendID {
err := saveUserChat(pbData.SendID, &pbSaveData) err := saveUserChat(pbData.SendID, &pbSaveData)
if err != nil { if err != nil {
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error()) log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
return
} }
} }
log.NewInfo(pbSaveData.OperationID, "saveUserChat cost time ", utils.GetCurrentTimestampBySecond()-time) log.NewDebug(pbSaveData.OperationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time)
} }
if msgKey == pbSaveData.RecvID { if msgKey == pbSaveData.RecvID {
pbSaveData.Options = pbData.Options pbSaveData.Options = pbData.Options
pbSaveData.OfflineInfo = pbData.OfflineInfo pbSaveData.OfflineInfo = pbData.OfflineInfo
go sendMessageToPush(&pbSaveData) go sendMessageToPush(&pbSaveData)
log.NewInfo(pbSaveData.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampBySecond()-time) log.NewDebug(pbSaveData.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
} }
log.InfoByKv("msg_transfer handle topic success...", "", "") case constant.GroupChatType:
} else if pbData.SessionType == constant.GroupChatType { log.NewDebug(pbSaveData.OperationID, "msg_transfer chat type = GroupChatType", isHistory, isPersist)
log.Info("", "", "msg_transfer chat type = GroupChatType")
if isHistory { if isHistory {
uidAndGroupID := strings.Split(pbData.RecvID, " ") uidAndGroupID := strings.Split(pbData.RecvID, " ")
saveUserChat(uidAndGroupID[0], &pbSaveData) err := saveUserChat(uidAndGroupID[0], &pbSaveData)
if err != nil {
log.NewError(pbSaveData.OperationID, "group data insert to mongo err", pbSaveData.String(), uidAndGroupID[0], err.Error())
return
}
} }
pbSaveData.Options = pbData.Options pbSaveData.Options = pbData.Options
pbSaveData.OfflineInfo = pbData.OfflineInfo pbSaveData.OfflineInfo = pbData.OfflineInfo
sendMessageToPush(&pbSaveData) go sendMessageToPush(&pbSaveData)
log.InfoByKv("msg_transfer handle topic success...", "", "") default:
} else { log.NewError(pbSaveData.OperationID, "SessionType error", pbSaveData.String())
log.Error("", "", "msg_transfer recv chat err, chat.MsgFrom = %d", pbData.SessionType) return
} }
log.NewDebug(pbSaveData.OperationID, "msg_transfer handle topic data to database success...", pbSaveData.String())
} }
func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
+1 -1
View File
@@ -20,6 +20,6 @@ func Init() {
} }
func Run() { func Run() {
//register mysqlConsumerHandler to //register mysqlConsumerHandler to
//go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
} }
+10 -5
View File
@@ -6,22 +6,23 @@ import (
"Open_IM/src/push/jpush/requestBody" "Open_IM/src/push/jpush/requestBody"
"bytes" "bytes"
"encoding/json" "encoding/json"
"io/ioutil"
"net/http" "net/http"
) )
type JPushResp struct { type JPushResp struct {
} }
func JGAccountListPush(accounts []string, jsonCustomContent string, platform string) (*http.Response, error) { func JGAccountListPush(accounts []string, content, detailContent, platform string) ([]byte, error) {
var pf requestBody.Platform var pf requestBody.Platform
_ = pf.SetPlatform(platform) _ = pf.SetPlatform(platform)
var au requestBody.Audience var au requestBody.Audience
au.SetAlias(accounts) au.SetAlias(accounts)
var no requestBody.Notification var no requestBody.Notification
no.SetAlert(jsonCustomContent) no.SetAlert(content)
var me requestBody.Message var me requestBody.Message
me.SetMsgContent(jsonCustomContent) me.SetMsgContent(detailContent)
var po requestBody.PushObj var po requestBody.PushObj
po.SetPlatform(&pf) po.SetPlatform(&pf)
po.SetAudience(&au) po.SetAudience(&au)
@@ -42,9 +43,13 @@ func JGAccountListPush(accounts []string, jsonCustomContent string, platform str
req.Header.Set("Authorization", common.GetAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret)) req.Header.Set("Authorization", common.GetAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret))
resp, err := client.Do(req) resp, err := client.Do(req)
defer resp.Body.Close()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return resp, nil defer resp.Body.Close()
result, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return result, nil
} }
+56 -41
View File
@@ -22,16 +22,20 @@ import (
"strings" "strings"
) )
type EChatContent struct { type OpenIMContent struct {
SessionType int `json:"chatType"` SessionType int `json:"sessionType"`
From string `json:"from"` From string `json:"from"`
To string `json:"to"` To string `json:"to"`
Seq int64 `json:"seq"` Seq int64 `json:"seq"`
} }
type AtContent struct {
Text string `json:"text"`
AtUserList []string `json:"atUserList"`
IsAtSelf bool `json:"isAtSelf"`
}
func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) { func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
var wsResult []*pbRelay.SingleMsgToUser var wsResult []*pbRelay.SingleMsgToUser
isShouldOfflinePush := true
MOptions := utils.JsonStringToMap(Options) //Control whether to push message to sender's other terminal MOptions := utils.JsonStringToMap(Options) //Control whether to push message to sender's other terminal
//isSenderSync := utils.GetSwitchFromOptions(MOptions, "senderSync") //isSenderSync := utils.GetSwitchFromOptions(MOptions, "senderSync")
isOfflinePush := utils.GetSwitchFromOptions(MOptions, "offlinePush") isOfflinePush := utils.GetSwitchFromOptions(MOptions, "offlinePush")
@@ -50,49 +54,60 @@ func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
} }
} }
log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult, "sendData", sendPbData) log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult, "sendData", sendPbData)
if isOfflinePush { if sendPbData.ContentType != constant.Typing && sendPbData.ContentType != constant.HasReadReceipt {
if isOfflinePush {
for _, t := range pushTerminal {
for _, v := range wsResult { for _, v := range wsResult {
if v.RecvPlatFormID == t && v.ResultCode == 0 { if v.ResultCode == 0 {
isShouldOfflinePush = false continue
break }
//supported terminal
for _, t := range pushTerminal {
if v.RecvPlatFormID == t {
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, v.RecvID)
customContent := OpenIMContent{
SessionType: int(sendPbData.SessionType),
From: sendPbData.SendID,
To: sendPbData.RecvID,
Seq: sendPbData.RecvSeq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
var content string
switch sendPbData.ContentType {
case constant.Text:
content = constant.ContentType2PushContent[constant.Text]
case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice]
case constant.Video:
content = constant.ContentType2PushContent[constant.Video]
case constant.File:
content = constant.ContentType2PushContent[constant.File]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(sendPbData.Content, &a)
if utils.IsContain(v.RecvID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
content = constant.ContentType2PushContent[constant.GroupMsg]
}
default:
}
pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, utils.PlatformIDToName(t))
if err != nil {
log.NewError(sendPbData.OperationID, "offline push error", sendPbData.String(), err.Error(), t)
} else {
log.NewDebug(sendPbData.OperationID, "offline push return result is ", string(pushResult), sendPbData, t)
}
}
} }
} }
if isShouldOfflinePush {
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, sendPbData.RecvID)
customContent := EChatContent{
SessionType: int(sendPbData.SessionType),
From: sendPbData.SendID,
To: sendPbData.RecvID,
Seq: sendPbData.RecvSeq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
//switch sendPbData.ContentType {
//case constant.Text:
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, sendPbData.Content, jsonCustomContent)
//case constant.Picture:
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.Picture], jsonCustomContent)
//case constant.Voice:
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.Voice], jsonCustomContent)
//case constant.Video:
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.Video], jsonCustomContent)
//case constant.File:
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.File], jsonCustomContent)
//default:
//
//}
push.JGAccountListPush(UIDList, jsonCustomContent, utils.PlatformIDToName(t))
} else {
isShouldOfflinePush = true
}
} }
} }
} }
+1 -6
View File
@@ -42,12 +42,9 @@ type MsgCallBackResp struct {
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) { func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
replay := pbChat.UserSendMsgResp{} replay := pbChat.UserSendMsgResp{}
log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String()) log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
time := utils.GetCurrentTimestampByMill()
//if !utils.VerifyToken(pb.Token, pb.SendID) { //if !utils.VerifyToken(pb.Token, pb.SendID) {
// return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) // return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
//}
log.NewInfo(pb.OperationID, "VerifyToken cost time ", utils.GetCurrentTimestampByMill()-time)
serverMsgID := GetMsgID(pb.SendID) serverMsgID := GetMsgID(pb.SendID)
pbData := pbChat.WSToMsgSvrChatMsg{} pbData := pbChat.WSToMsgSvrChatMsg{}
pbData.MsgFrom = pb.MsgFrom pbData.MsgFrom = pb.MsgFrom
@@ -99,10 +96,8 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
} }
switch pbData.SessionType { switch pbData.SessionType {
case constant.SingleChatType: case constant.SingleChatType:
time := utils.GetCurrentTimestampByMill()
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
log.NewInfo(pb.OperationID, "send kafka cost time ", utils.GetCurrentTimestampByMill()-time)
if err1 != nil || err2 != nil { if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
} }
+19 -11
View File
@@ -55,25 +55,33 @@ func (s *friendServer) AddFriend(ctx context.Context, req *pbFriend.AddFriendReq
func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFriendReq) (*pbFriend.ImportFriendResp, error) { func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFriendReq) (*pbFriend.ImportFriendResp, error) {
log.Info(req.Token, req.OperationID, "ImportFriend come here,args=%s", req.String()) log.Info(req.Token, req.OperationID, "ImportFriend come here,args=%s", req.String())
var resp pbFriend.ImportFriendResp var resp pbFriend.ImportFriendResp
var c pbFriend.CommonResp
//Parse token, to find current user information //Parse token, to find current user information
claims, err := utils.ParseToken(req.Token) claims, err := utils.ParseToken(req.Token)
if err != nil { if err != nil {
log.Error(req.Token, req.OperationID, "err=%s,parse token failed", err.Error()) log.Error(req.Token, req.OperationID, "err=%s,parse token failed", err.Error())
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: config.ErrParseToken.ErrMsg}, FailedUidList: req.UidList}, nil c.ErrorCode = config.ErrAddFriend.ErrCode
c.ErrorMsg = config.ErrParseToken.ErrMsg
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
} }
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) { if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
log.Error(req.Token, req.OperationID, "not magager uid", claims.UID) log.Error(req.Token, req.OperationID, "not manager uid", claims.UID)
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: "not authorized"}, FailedUidList: req.UidList}, nil c.ErrorCode = config.ErrAddFriend.ErrCode
c.ErrorMsg = "not authorized"
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
} }
if _, err = im_mysql_model.FindUserByUID(req.OwnerUid); err != nil { if _, err = im_mysql_model.FindUserByUID(req.OwnerUid); err != nil {
log.Error(req.Token, req.OperationID, "this user not exists,cant not add friend", req.OwnerUid) log.Error(req.Token, req.OperationID, "this user not exists,cant not add friend", req.OwnerUid)
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: "this user not exists,cant not add friend"}, FailedUidList: req.UidList}, nil c.ErrorCode = config.ErrAddFriend.ErrCode
c.ErrorMsg = "this user not exists,cant not add friend"
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
} }
for _, v := range req.UidList { for _, v := range req.UidList {
if _, err = im_mysql_model.FindUserByUID(v); err != nil { if _, fErr := im_mysql_model.FindUserByUID(v); fErr != nil {
resp.CommonResp.ErrorMsg = "some uid establish failed" c.ErrorMsg = "some uid establish failed"
resp.CommonResp.ErrorCode = 408 c.ErrorCode = 408
resp.CommonResp = &c
resp.FailedUidList = append(resp.FailedUidList, v) resp.FailedUidList = append(resp.FailedUidList, v)
} else { } else {
if _, err = im_mysql_model.FindFriendRelationshipFromFriend(req.OwnerUid, v); err != nil { if _, err = im_mysql_model.FindFriendRelationshipFromFriend(req.OwnerUid, v); err != nil {
@@ -81,18 +89,18 @@ func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFri
err1 := im_mysql_model.InsertToFriend(req.OwnerUid, v, 1) err1 := im_mysql_model.InsertToFriend(req.OwnerUid, v, 1)
if err1 != nil { if err1 != nil {
resp.FailedUidList = append(resp.FailedUidList, v) resp.FailedUidList = append(resp.FailedUidList, v)
log.Error(req.Token, req.OperationID, "err=%s,create friendship failed", err.Error()) log.NewError(req.OperationID, "err1,create friendship failed", req.OwnerUid, v, err1.Error())
} }
err2 := im_mysql_model.InsertToFriend(v, req.OwnerUid, 1) err2 := im_mysql_model.InsertToFriend(v, req.OwnerUid, 1)
if err2 != nil { if err2 != nil {
log.Error(req.Token, req.OperationID, "err=%s,create friendship failed", err.Error()) log.NewError(req.OperationID, "err2,create friendship failed", v, req.OwnerUid, err2.Error())
} }
if err1 == nil && err2 == nil { if err1 == nil && err2 == nil {
var name, faceUrl string var name, faceUrl string
n := content_struct.NotificationContent{1, constant.FriendAcceptTip, ""} n := content_struct.NotificationContent{IsDisplay: 1, DefaultTips: constant.FriendAcceptTip}
r, err := im_mysql_model.FindUserByUID(v) r, err := im_mysql_model.FindUserByUID(v)
if err != nil { if err != nil {
log.ErrorByKv("get info failed", req.OperationID, "err", err.Error(), "req", req.String()) log.NewError(req.OperationID, "get info failed", err.Error(), v)
} }
if r != nil { if r != nil {
name, faceUrl = r.Name, r.Icon name, faceUrl = r.Name, r.Icon
+1
View File
@@ -37,6 +37,7 @@ func IsContain(target string, List []string) bool {
return false return false
} }
func InterfaceArrayToStringArray(data []interface{}) (i []string) { func InterfaceArrayToStringArray(data []interface{}) (i []string) {
for _, param := range data { for _, param := range data {
i = append(i, param.(string)) i = append(i, param.(string))