mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-10 03:56:00 +08:00
Compare commits
23 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c97e39fc09 | |||
| 515d7bb318 | |||
| cdf928d64e | |||
| 2440575093 | |||
| 817d2b11a5 | |||
| 4b4dce80cf | |||
| 52e15fc9d4 | |||
| 7dbaa1b8c1 | |||
| 4215d65aa7 | |||
| 62a6195a4f | |||
| 51622e77d4 | |||
| 2e14e0f767 | |||
| b219f8542b | |||
| 6d67bb9d65 | |||
| af37abb394 | |||
| dfd028625d | |||
| a87c16de0d | |||
| 082de2b6b7 | |||
| 7f19f2f046 | |||
| 71873f7f0e | |||
| e124a26c52 | |||
| 09b501f7df | |||
| a86980657c |
+3
-3
@@ -74,7 +74,7 @@ credential:
|
|||||||
rpcport:
|
rpcport:
|
||||||
openImUserPort: [ 10100 ]
|
openImUserPort: [ 10100 ]
|
||||||
openImFriendPort: [ 10200 ]
|
openImFriendPort: [ 10200 ]
|
||||||
openImOfflineMessagePort: [ 10300 ]
|
openImOfflineMessagePort: [ 10300]
|
||||||
openImOnlineRelayPort: [ 10400 ]
|
openImOnlineRelayPort: [ 10400 ]
|
||||||
openImGroupPort: [ 10500 ]
|
openImGroupPort: [ 10500 ]
|
||||||
openImAuthPort: [ 10600 ]
|
openImAuthPort: [ 10600 ]
|
||||||
@@ -120,8 +120,8 @@ push:
|
|||||||
accessID: 111
|
accessID: 111
|
||||||
secretKey: 111
|
secretKey: 111
|
||||||
jpns:
|
jpns:
|
||||||
appKey: 2783339cee4de379cc798fe1
|
appKey: cf47465a368f24c659608e7e
|
||||||
masterSecret: 66e5f309e032c68cc668c28a
|
masterSecret: 02204efe3f3832947a236ee5
|
||||||
pushUrl: "https://api.jpush.cn/v3/push"
|
pushUrl: "https://api.jpush.cn/v3/push"
|
||||||
manager:
|
manager:
|
||||||
appManagerUid: ["openIM123456","openIM654321"]
|
appManagerUid: ["openIM123456","openIM654321"]
|
||||||
|
|||||||
+1
-1
@@ -33,7 +33,7 @@ done
|
|||||||
#Check launched service process
|
#Check launched service process
|
||||||
check=$(ps aux | grep -w ./${msg_transfer_name} | grep -v grep | wc -l)
|
check=$(ps aux | grep -w ./${msg_transfer_name} | grep -v grep | wc -l)
|
||||||
if [ $check -eq ${msg_transfer_service_num} ]; then
|
if [ $check -eq ${msg_transfer_service_num} ]; then
|
||||||
echo -e ${GREEN_PREFIX}"service has been starting,belongs service is openImMsgTransfer"${COLOR_SUFFIX}
|
echo -e ${GREEN_PREFIX}"none port service has been starting,belongs service is openImMsgTransfer"${COLOR_SUFFIX}
|
||||||
else
|
else
|
||||||
echo -e ${RED_PREFIX}"openImMsgTransfer service does not start normally, num err"${COLOR_SUFFIX}
|
echo -e ${RED_PREFIX}"openImMsgTransfer service does not start normally, num err"${COLOR_SUFFIX}
|
||||||
echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX}
|
echo -e ${RED_PREFIX}"please check ../logs/openIM.log "${COLOR_SUFFIX}
|
||||||
|
|||||||
@@ -49,6 +49,10 @@ func UserRegister(c *gin.Context) {
|
|||||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if params.Secret != config.Config.Secret {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": "not authorized"})
|
||||||
|
return
|
||||||
|
}
|
||||||
pbData := newUserRegisterReq(¶ms)
|
pbData := newUserRegisterReq(¶ms)
|
||||||
|
|
||||||
log.Info("", "", "api user_register is server, [data: %s]", pbData.String())
|
log.Info("", "", "api user_register is server, [data: %s]", pbData.String())
|
||||||
|
|||||||
@@ -37,6 +37,10 @@ func UserToken(c *gin.Context) {
|
|||||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if params.Secret != config.Config.Secret {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": "not authorized"})
|
||||||
|
return
|
||||||
|
}
|
||||||
pbData := newUserTokenReq(¶ms)
|
pbData := newUserTokenReq(¶ms)
|
||||||
|
|
||||||
log.Info("", "", "api user_token is server, [data: %s]", pbData.String())
|
log.Info("", "", "api user_token is server, [data: %s]", pbData.String())
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ func UserSendMsg(c *gin.Context) {
|
|||||||
|
|
||||||
token := c.Request.Header.Get("token")
|
token := c.Request.Header.Get("token")
|
||||||
|
|
||||||
log.InfoByKv("Ws call success to sendMsgReq", params.OperationID, "Parameters", params)
|
log.InfoByKv("api call success to sendMsgReq", params.OperationID, "Parameters", params)
|
||||||
|
|
||||||
pbData := newUserSendMsgReq(token, ¶ms)
|
pbData := newUserSendMsgReq(token, ¶ms)
|
||||||
log.Info("", "", "api UserSendMsg call start..., [data: %s]", pbData.String())
|
log.Info("", "", "api UserSendMsg call start..., [data: %s]", pbData.String())
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ type paramsManagementSendMsg struct {
|
|||||||
SessionType int32 `json:"sessionType" binding:"required"`
|
SessionType int32 `json:"sessionType" binding:"required"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func newUserSendMsgReq(token string, params *paramsManagementSendMsg) *pbChat.UserSendMsgReq {
|
func newUserSendMsgReq(params *paramsManagementSendMsg) *pbChat.UserSendMsgReq {
|
||||||
var newContent string
|
var newContent string
|
||||||
switch params.ContentType {
|
switch params.ContentType {
|
||||||
case constant.Text:
|
case constant.Text:
|
||||||
@@ -53,7 +53,6 @@ func newUserSendMsgReq(token string, params *paramsManagementSendMsg) *pbChat.Us
|
|||||||
}
|
}
|
||||||
pbData := pbChat.UserSendMsgReq{
|
pbData := pbChat.UserSendMsgReq{
|
||||||
ReqIdentifier: constant.WSSendMsg,
|
ReqIdentifier: constant.WSSendMsg,
|
||||||
Token: token,
|
|
||||||
SendID: params.SendID,
|
SendID: params.SendID,
|
||||||
SenderNickName: params.SenderNickName,
|
SenderNickName: params.SenderNickName,
|
||||||
SenderFaceURL: params.SenderFaceURL,
|
SenderFaceURL: params.SenderFaceURL,
|
||||||
@@ -103,15 +102,19 @@ func ManagementSendMsg(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
token := c.Request.Header.Get("token")
|
token := c.Request.Header.Get("token")
|
||||||
if !utils.IsContain(params.SendID, config.Config.Manager.AppManagerUid) {
|
claims, err := utils.ParseToken(token)
|
||||||
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not appManager", "sendTime": 0, "MsgID": ""})
|
if err != nil {
|
||||||
|
log.NewError(params.OperationID, "parse token failed", err.Error())
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "parse token failed", "sendTime": 0, "MsgID": ""})
|
||||||
|
}
|
||||||
|
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""})
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params)
|
log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params)
|
||||||
|
|
||||||
pbData := newUserSendMsgReq(token, ¶ms)
|
pbData := newUserSendMsgReq(¶ms)
|
||||||
log.Info("", "", "api ManagementSendMsg call start..., [data: %s]", pbData.String())
|
log.Info("", "", "api ManagementSendMsg call start..., [data: %s]", pbData.String())
|
||||||
|
|
||||||
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
|
||||||
|
|||||||
@@ -26,15 +26,18 @@ const (
|
|||||||
|
|
||||||
///ContentType
|
///ContentType
|
||||||
//UserRelated
|
//UserRelated
|
||||||
Text = 101
|
Text = 101
|
||||||
Picture = 102
|
Picture = 102
|
||||||
Voice = 103
|
Voice = 103
|
||||||
Video = 104
|
Video = 104
|
||||||
File = 105
|
File = 105
|
||||||
AtText = 106
|
AtText = 106
|
||||||
Custom = 110
|
Custom = 110
|
||||||
|
HasReadReceipt = 112
|
||||||
|
Typing = 113
|
||||||
|
Common = 200
|
||||||
|
GroupMsg = 201
|
||||||
|
|
||||||
SyncSenderMsg = 108
|
|
||||||
//SysRelated
|
//SysRelated
|
||||||
AcceptFriendApplicationTip = 201
|
AcceptFriendApplicationTip = 201
|
||||||
AddFriendTip = 202
|
AddFriendTip = 202
|
||||||
@@ -66,10 +69,14 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var ContentType2PushContent = map[int64]string{
|
var ContentType2PushContent = map[int64]string{
|
||||||
Picture: "[picture]",
|
Picture: "[图片]",
|
||||||
Voice: "[voice]",
|
Voice: "[语音]",
|
||||||
Video: "[video]",
|
Video: "[视频]",
|
||||||
File: "[file]",
|
File: "[文件]",
|
||||||
|
Text: "你收到了一条文本消息",
|
||||||
|
AtText: "[有人@你]",
|
||||||
|
GroupMsg: "你收到一条群聊消息",
|
||||||
|
Common: "你收到一条新消息",
|
||||||
}
|
}
|
||||||
|
|
||||||
const FriendAcceptTip = "You have successfully become friends, so start chatting"
|
const FriendAcceptTip = "You have successfully become friends, so start chatting"
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ import (
|
|||||||
|
|
||||||
const cChat = "chat"
|
const cChat = "chat"
|
||||||
const cGroup = "group"
|
const cGroup = "group"
|
||||||
const singleGocMsgNum = 10000
|
const singleGocMsgNum = 5000
|
||||||
|
|
||||||
type MsgInfo struct {
|
type MsgInfo struct {
|
||||||
SendTime int64
|
SendTime int64
|
||||||
@@ -310,6 +310,14 @@ func (d *DataBases) DelGroupMember(groupID, uid string) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getCurrentTimestampByMill() int64 {
|
||||||
|
return time.Now().UnixNano() / 1e6
|
||||||
|
}
|
||||||
|
func getSeqUid(uid string, seq int64) string {
|
||||||
|
seqSuffix := seq / singleGocMsgNum
|
||||||
|
return uid + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||||
|
}
|
||||||
func isContainInt64(target int64, List []int64) bool {
|
func isContainInt64(target int64, List []int64) bool {
|
||||||
|
|
||||||
for _, element := range List {
|
for _, element := range List {
|
||||||
@@ -321,10 +329,3 @@ func isContainInt64(target int64, List []int64) bool {
|
|||||||
return false
|
return false
|
||||||
|
|
||||||
}
|
}
|
||||||
func getCurrentTimestampByMill() int64 {
|
|
||||||
return time.Now().UnixNano() / 1e6
|
|
||||||
}
|
|
||||||
func getSeqUid(uid string, seq int64) string {
|
|
||||||
seqSuffix := seq / singleGocMsgNum
|
|
||||||
return uid + ":" + strconv.FormatInt(seqSuffix, 10)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"Open_IM/src/common/db"
|
"Open_IM/src/common/db"
|
||||||
pbAuth "Open_IM/src/proto/auth"
|
pbAuth "Open_IM/src/proto/auth"
|
||||||
"Open_IM/src/utils"
|
"Open_IM/src/utils"
|
||||||
|
"fmt"
|
||||||
_ "github.com/jinzhu/gorm/dialects/mysql"
|
_ "github.com/jinzhu/gorm/dialects/mysql"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -18,7 +19,7 @@ func init() {
|
|||||||
pb.Name = "AppManager" + utils.IntToString(k+1)
|
pb.Name = "AppManager" + utils.IntToString(k+1)
|
||||||
err := UserRegister(&pb)
|
err := UserRegister(&pb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
fmt.Println("AppManager insert error", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
|
|||||||
enc := gob.NewEncoder(&replyBytes)
|
enc := gob.NewEncoder(&replyBytes)
|
||||||
err := enc.Encode(mReply)
|
err := enc.Encode(mReply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
log.NewError(in.OperationID, "data encode err", err.Error())
|
||||||
}
|
}
|
||||||
switch in.GetSessionType() {
|
switch in.GetSessionType() {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
@@ -92,10 +92,11 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
|
|||||||
RecvID = strings.Split(in.GetRecvID(), " ")[0]
|
RecvID = strings.Split(in.GetRecvID(), " ")[0]
|
||||||
}
|
}
|
||||||
var tag bool
|
var tag bool
|
||||||
|
var UIDAndPID []string
|
||||||
userIDList := genUidPlatformArray(RecvID)
|
userIDList := genUidPlatformArray(RecvID)
|
||||||
for _, v := range userIDList {
|
for _, v := range userIDList {
|
||||||
|
UIDAndPID = strings.Split(v, " ")
|
||||||
if conn := ws.getUserConn(v); conn != nil {
|
if conn := ws.getUserConn(v); conn != nil {
|
||||||
UIDAndPID := strings.Split(v, " ")
|
|
||||||
tag = true
|
tag = true
|
||||||
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, UIDAndPID[1], UIDAndPID[0])
|
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, UIDAndPID[1], UIDAndPID[0])
|
||||||
temp := &pbRelay.SingleMsgToUser{
|
temp := &pbRelay.SingleMsgToUser{
|
||||||
@@ -104,10 +105,17 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
|
|||||||
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
|
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
|
||||||
}
|
}
|
||||||
resp = append(resp, temp)
|
resp = append(resp, temp)
|
||||||
|
} else {
|
||||||
|
temp := &pbRelay.SingleMsgToUser{
|
||||||
|
ResultCode: -1,
|
||||||
|
RecvID: UIDAndPID[0],
|
||||||
|
RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
|
||||||
|
}
|
||||||
|
resp = append(resp, temp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !tag {
|
if !tag {
|
||||||
log.NewError(in.OperationID, "push err ,ws conn not in map", in.String())
|
log.NewError(in.OperationID, "push err ,no matched ws conn not in map", in.String())
|
||||||
}
|
}
|
||||||
return &pbRelay.MsgToUserResp{
|
return &pbRelay.MsgToUserResp{
|
||||||
Resp: resp,
|
Resp: resp,
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ func (mc *HistoryConsumerHandler) Init() {
|
|||||||
|
|
||||||
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
|
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
|
||||||
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
|
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
|
||||||
time := utils.GetCurrentTimestampBySecond()
|
time := utils.GetCurrentTimestampByNano()
|
||||||
pbData := pbMsg.WSToMsgSvrChatMsg{}
|
pbData := pbMsg.WSToMsgSvrChatMsg{}
|
||||||
err := proto.Unmarshal(msg, &pbData)
|
err := proto.Unmarshal(msg, &pbData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -59,47 +59,53 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
|
|||||||
isHistory := utils.GetSwitchFromOptions(Options, "history")
|
isHistory := utils.GetSwitchFromOptions(Options, "history")
|
||||||
//Control whether to store history messages (mysql)
|
//Control whether to store history messages (mysql)
|
||||||
isPersist := utils.GetSwitchFromOptions(Options, "persistent")
|
isPersist := utils.GetSwitchFromOptions(Options, "persistent")
|
||||||
if pbData.SessionType == constant.SingleChatType {
|
switch pbData.SessionType {
|
||||||
log.Info("", "", "msg_transfer chat type = SingleChatType", isHistory, isPersist)
|
case constant.SingleChatType:
|
||||||
|
log.NewDebug(pbSaveData.OperationID, "msg_transfer chat type = SingleChatType", isHistory, isPersist)
|
||||||
if isHistory {
|
if isHistory {
|
||||||
if msgKey == pbSaveData.RecvID {
|
if msgKey == pbSaveData.RecvID {
|
||||||
err := saveUserChat(pbData.RecvID, &pbSaveData)
|
err := saveUserChat(pbData.RecvID, &pbSaveData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
|
log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if msgKey == pbSaveData.SendID {
|
} else if msgKey == pbSaveData.SendID {
|
||||||
err := saveUserChat(pbData.SendID, &pbSaveData)
|
err := saveUserChat(pbData.SendID, &pbSaveData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
|
log.NewError(pbSaveData.OperationID, "single data insert to mongo err", err.Error(), pbSaveData.String())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.NewInfo(pbSaveData.OperationID, "saveUserChat cost time ", utils.GetCurrentTimestampBySecond()-time)
|
log.NewDebug(pbSaveData.OperationID, "saveUserChat cost time ", utils.GetCurrentTimestampByNano()-time)
|
||||||
}
|
}
|
||||||
if msgKey == pbSaveData.RecvID {
|
if msgKey == pbSaveData.RecvID {
|
||||||
pbSaveData.Options = pbData.Options
|
pbSaveData.Options = pbData.Options
|
||||||
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
||||||
go sendMessageToPush(&pbSaveData)
|
go sendMessageToPush(&pbSaveData)
|
||||||
log.NewInfo(pbSaveData.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampBySecond()-time)
|
log.NewDebug(pbSaveData.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampByNano()-time)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.InfoByKv("msg_transfer handle topic success...", "", "")
|
case constant.GroupChatType:
|
||||||
} else if pbData.SessionType == constant.GroupChatType {
|
log.NewDebug(pbSaveData.OperationID, "msg_transfer chat type = GroupChatType", isHistory, isPersist)
|
||||||
log.Info("", "", "msg_transfer chat type = GroupChatType")
|
|
||||||
if isHistory {
|
if isHistory {
|
||||||
uidAndGroupID := strings.Split(pbData.RecvID, " ")
|
uidAndGroupID := strings.Split(pbData.RecvID, " ")
|
||||||
saveUserChat(uidAndGroupID[0], &pbSaveData)
|
err := saveUserChat(uidAndGroupID[0], &pbSaveData)
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(pbSaveData.OperationID, "group data insert to mongo err", pbSaveData.String(), uidAndGroupID[0], err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pbSaveData.Options = pbData.Options
|
pbSaveData.Options = pbData.Options
|
||||||
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
pbSaveData.OfflineInfo = pbData.OfflineInfo
|
||||||
sendMessageToPush(&pbSaveData)
|
go sendMessageToPush(&pbSaveData)
|
||||||
log.InfoByKv("msg_transfer handle topic success...", "", "")
|
default:
|
||||||
} else {
|
log.NewError(pbSaveData.OperationID, "SessionType error", pbSaveData.String())
|
||||||
log.Error("", "", "msg_transfer recv chat err, chat.MsgFrom = %d", pbData.SessionType)
|
return
|
||||||
}
|
}
|
||||||
|
log.NewDebug(pbSaveData.OperationID, "msg_transfer handle topic data to database success...", pbSaveData.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
func (HistoryConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
|
||||||
|
|||||||
@@ -20,6 +20,6 @@ func Init() {
|
|||||||
}
|
}
|
||||||
func Run() {
|
func Run() {
|
||||||
//register mysqlConsumerHandler to
|
//register mysqlConsumerHandler to
|
||||||
//go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
|
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
|
||||||
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
|
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
|
||||||
}
|
}
|
||||||
|
|||||||
+10
-5
@@ -6,22 +6,23 @@ import (
|
|||||||
"Open_IM/src/push/jpush/requestBody"
|
"Open_IM/src/push/jpush/requestBody"
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JPushResp struct {
|
type JPushResp struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func JGAccountListPush(accounts []string, jsonCustomContent string, platform string) (*http.Response, error) {
|
func JGAccountListPush(accounts []string, content, detailContent, platform string) ([]byte, error) {
|
||||||
|
|
||||||
var pf requestBody.Platform
|
var pf requestBody.Platform
|
||||||
_ = pf.SetPlatform(platform)
|
_ = pf.SetPlatform(platform)
|
||||||
var au requestBody.Audience
|
var au requestBody.Audience
|
||||||
au.SetAlias(accounts)
|
au.SetAlias(accounts)
|
||||||
var no requestBody.Notification
|
var no requestBody.Notification
|
||||||
no.SetAlert(jsonCustomContent)
|
no.SetAlert(content)
|
||||||
var me requestBody.Message
|
var me requestBody.Message
|
||||||
me.SetMsgContent(jsonCustomContent)
|
me.SetMsgContent(detailContent)
|
||||||
var po requestBody.PushObj
|
var po requestBody.PushObj
|
||||||
po.SetPlatform(&pf)
|
po.SetPlatform(&pf)
|
||||||
po.SetAudience(&au)
|
po.SetAudience(&au)
|
||||||
@@ -42,9 +43,13 @@ func JGAccountListPush(accounts []string, jsonCustomContent string, platform str
|
|||||||
req.Header.Set("Authorization", common.GetAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret))
|
req.Header.Set("Authorization", common.GetAuthorization(config.Config.Push.Jpns.AppKey, config.Config.Push.Jpns.MasterSecret))
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
defer resp.Body.Close()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return resp, nil
|
defer resp.Body.Close()
|
||||||
|
result, err := ioutil.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,16 +22,20 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
type EChatContent struct {
|
type OpenIMContent struct {
|
||||||
SessionType int `json:"chatType"`
|
SessionType int `json:"sessionType"`
|
||||||
From string `json:"from"`
|
From string `json:"from"`
|
||||||
To string `json:"to"`
|
To string `json:"to"`
|
||||||
Seq int64 `json:"seq"`
|
Seq int64 `json:"seq"`
|
||||||
}
|
}
|
||||||
|
type AtContent struct {
|
||||||
|
Text string `json:"text"`
|
||||||
|
AtUserList []string `json:"atUserList"`
|
||||||
|
IsAtSelf bool `json:"isAtSelf"`
|
||||||
|
}
|
||||||
|
|
||||||
func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
|
func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
|
||||||
var wsResult []*pbRelay.SingleMsgToUser
|
var wsResult []*pbRelay.SingleMsgToUser
|
||||||
isShouldOfflinePush := true
|
|
||||||
MOptions := utils.JsonStringToMap(Options) //Control whether to push message to sender's other terminal
|
MOptions := utils.JsonStringToMap(Options) //Control whether to push message to sender's other terminal
|
||||||
//isSenderSync := utils.GetSwitchFromOptions(MOptions, "senderSync")
|
//isSenderSync := utils.GetSwitchFromOptions(MOptions, "senderSync")
|
||||||
isOfflinePush := utils.GetSwitchFromOptions(MOptions, "offlinePush")
|
isOfflinePush := utils.GetSwitchFromOptions(MOptions, "offlinePush")
|
||||||
@@ -50,49 +54,60 @@ func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult, "sendData", sendPbData)
|
log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult, "sendData", sendPbData)
|
||||||
if isOfflinePush {
|
if sendPbData.ContentType != constant.Typing && sendPbData.ContentType != constant.HasReadReceipt {
|
||||||
|
if isOfflinePush {
|
||||||
for _, t := range pushTerminal {
|
|
||||||
for _, v := range wsResult {
|
for _, v := range wsResult {
|
||||||
if v.RecvPlatFormID == t && v.ResultCode == 0 {
|
if v.ResultCode == 0 {
|
||||||
isShouldOfflinePush = false
|
continue
|
||||||
break
|
}
|
||||||
|
//supported terminal
|
||||||
|
for _, t := range pushTerminal {
|
||||||
|
if v.RecvPlatFormID == t {
|
||||||
|
//Use offline push messaging
|
||||||
|
var UIDList []string
|
||||||
|
UIDList = append(UIDList, v.RecvID)
|
||||||
|
customContent := OpenIMContent{
|
||||||
|
SessionType: int(sendPbData.SessionType),
|
||||||
|
From: sendPbData.SendID,
|
||||||
|
To: sendPbData.RecvID,
|
||||||
|
Seq: sendPbData.RecvSeq,
|
||||||
|
}
|
||||||
|
bCustomContent, _ := json.Marshal(customContent)
|
||||||
|
jsonCustomContent := string(bCustomContent)
|
||||||
|
var content string
|
||||||
|
switch sendPbData.ContentType {
|
||||||
|
case constant.Text:
|
||||||
|
content = constant.ContentType2PushContent[constant.Text]
|
||||||
|
case constant.Picture:
|
||||||
|
content = constant.ContentType2PushContent[constant.Picture]
|
||||||
|
case constant.Voice:
|
||||||
|
content = constant.ContentType2PushContent[constant.Voice]
|
||||||
|
case constant.Video:
|
||||||
|
content = constant.ContentType2PushContent[constant.Video]
|
||||||
|
case constant.File:
|
||||||
|
content = constant.ContentType2PushContent[constant.File]
|
||||||
|
case constant.AtText:
|
||||||
|
a := AtContent{}
|
||||||
|
_ = utils.JsonStringToStruct(sendPbData.Content, &a)
|
||||||
|
if utils.IsContain(v.RecvID, a.AtUserList) {
|
||||||
|
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
|
||||||
|
} else {
|
||||||
|
content = constant.ContentType2PushContent[constant.GroupMsg]
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, utils.PlatformIDToName(t))
|
||||||
|
if err != nil {
|
||||||
|
log.NewError(sendPbData.OperationID, "offline push error", sendPbData.String(), err.Error(), t)
|
||||||
|
} else {
|
||||||
|
log.NewDebug(sendPbData.OperationID, "offline push return result is ", string(pushResult), sendPbData, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if isShouldOfflinePush {
|
|
||||||
//Use offline push messaging
|
|
||||||
var UIDList []string
|
|
||||||
UIDList = append(UIDList, sendPbData.RecvID)
|
|
||||||
customContent := EChatContent{
|
|
||||||
SessionType: int(sendPbData.SessionType),
|
|
||||||
From: sendPbData.SendID,
|
|
||||||
To: sendPbData.RecvID,
|
|
||||||
Seq: sendPbData.RecvSeq,
|
|
||||||
}
|
|
||||||
bCustomContent, _ := json.Marshal(customContent)
|
|
||||||
|
|
||||||
jsonCustomContent := string(bCustomContent)
|
|
||||||
//switch sendPbData.ContentType {
|
|
||||||
//case constant.Text:
|
|
||||||
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, sendPbData.Content, jsonCustomContent)
|
|
||||||
//case constant.Picture:
|
|
||||||
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.Picture], jsonCustomContent)
|
|
||||||
//case constant.Voice:
|
|
||||||
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.Voice], jsonCustomContent)
|
|
||||||
//case constant.Video:
|
|
||||||
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.Video], jsonCustomContent)
|
|
||||||
//case constant.File:
|
|
||||||
// IOSAccountListPush(UIDList, sendPbData.SenderNickName, constant.ContentType2PushContent[constant.File], jsonCustomContent)
|
|
||||||
//default:
|
|
||||||
//
|
|
||||||
//}
|
|
||||||
push.JGAccountListPush(UIDList, jsonCustomContent, utils.PlatformIDToName(t))
|
|
||||||
|
|
||||||
} else {
|
|
||||||
isShouldOfflinePush = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,12 +42,9 @@ type MsgCallBackResp struct {
|
|||||||
|
|
||||||
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
|
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
|
||||||
replay := pbChat.UserSendMsgResp{}
|
replay := pbChat.UserSendMsgResp{}
|
||||||
log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String())
|
log.NewDebug(pb.OperationID, "rpc sendMsg come here", pb.String())
|
||||||
time := utils.GetCurrentTimestampByMill()
|
|
||||||
//if !utils.VerifyToken(pb.Token, pb.SendID) {
|
//if !utils.VerifyToken(pb.Token, pb.SendID) {
|
||||||
// return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
|
// return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
|
||||||
//}
|
|
||||||
log.NewInfo(pb.OperationID, "VerifyToken cost time ", utils.GetCurrentTimestampByMill()-time)
|
|
||||||
serverMsgID := GetMsgID(pb.SendID)
|
serverMsgID := GetMsgID(pb.SendID)
|
||||||
pbData := pbChat.WSToMsgSvrChatMsg{}
|
pbData := pbChat.WSToMsgSvrChatMsg{}
|
||||||
pbData.MsgFrom = pb.MsgFrom
|
pbData.MsgFrom = pb.MsgFrom
|
||||||
@@ -99,10 +96,8 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
|
|||||||
}
|
}
|
||||||
switch pbData.SessionType {
|
switch pbData.SessionType {
|
||||||
case constant.SingleChatType:
|
case constant.SingleChatType:
|
||||||
time := utils.GetCurrentTimestampByMill()
|
|
||||||
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
|
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
|
||||||
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
|
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
|
||||||
log.NewInfo(pb.OperationID, "send kafka cost time ", utils.GetCurrentTimestampByMill()-time)
|
|
||||||
if err1 != nil || err2 != nil {
|
if err1 != nil || err2 != nil {
|
||||||
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,25 +55,33 @@ func (s *friendServer) AddFriend(ctx context.Context, req *pbFriend.AddFriendReq
|
|||||||
func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFriendReq) (*pbFriend.ImportFriendResp, error) {
|
func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFriendReq) (*pbFriend.ImportFriendResp, error) {
|
||||||
log.Info(req.Token, req.OperationID, "ImportFriend come here,args=%s", req.String())
|
log.Info(req.Token, req.OperationID, "ImportFriend come here,args=%s", req.String())
|
||||||
var resp pbFriend.ImportFriendResp
|
var resp pbFriend.ImportFriendResp
|
||||||
|
var c pbFriend.CommonResp
|
||||||
//Parse token, to find current user information
|
//Parse token, to find current user information
|
||||||
claims, err := utils.ParseToken(req.Token)
|
claims, err := utils.ParseToken(req.Token)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(req.Token, req.OperationID, "err=%s,parse token failed", err.Error())
|
log.Error(req.Token, req.OperationID, "err=%s,parse token failed", err.Error())
|
||||||
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: config.ErrParseToken.ErrMsg}, FailedUidList: req.UidList}, nil
|
c.ErrorCode = config.ErrAddFriend.ErrCode
|
||||||
|
c.ErrorMsg = config.ErrParseToken.ErrMsg
|
||||||
|
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
|
if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) {
|
||||||
log.Error(req.Token, req.OperationID, "not magager uid", claims.UID)
|
log.Error(req.Token, req.OperationID, "not manager uid", claims.UID)
|
||||||
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: "not authorized"}, FailedUidList: req.UidList}, nil
|
c.ErrorCode = config.ErrAddFriend.ErrCode
|
||||||
|
c.ErrorMsg = "not authorized"
|
||||||
|
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
|
||||||
}
|
}
|
||||||
if _, err = im_mysql_model.FindUserByUID(req.OwnerUid); err != nil {
|
if _, err = im_mysql_model.FindUserByUID(req.OwnerUid); err != nil {
|
||||||
log.Error(req.Token, req.OperationID, "this user not exists,cant not add friend", req.OwnerUid)
|
log.Error(req.Token, req.OperationID, "this user not exists,cant not add friend", req.OwnerUid)
|
||||||
return &pbFriend.ImportFriendResp{CommonResp: &pbFriend.CommonResp{ErrorCode: config.ErrAddFriend.ErrCode, ErrorMsg: "this user not exists,cant not add friend"}, FailedUidList: req.UidList}, nil
|
c.ErrorCode = config.ErrAddFriend.ErrCode
|
||||||
|
c.ErrorMsg = "this user not exists,cant not add friend"
|
||||||
|
return &pbFriend.ImportFriendResp{CommonResp: &c, FailedUidList: req.UidList}, nil
|
||||||
}
|
}
|
||||||
for _, v := range req.UidList {
|
for _, v := range req.UidList {
|
||||||
if _, err = im_mysql_model.FindUserByUID(v); err != nil {
|
if _, fErr := im_mysql_model.FindUserByUID(v); fErr != nil {
|
||||||
resp.CommonResp.ErrorMsg = "some uid establish failed"
|
c.ErrorMsg = "some uid establish failed"
|
||||||
resp.CommonResp.ErrorCode = 408
|
c.ErrorCode = 408
|
||||||
|
resp.CommonResp = &c
|
||||||
resp.FailedUidList = append(resp.FailedUidList, v)
|
resp.FailedUidList = append(resp.FailedUidList, v)
|
||||||
} else {
|
} else {
|
||||||
if _, err = im_mysql_model.FindFriendRelationshipFromFriend(req.OwnerUid, v); err != nil {
|
if _, err = im_mysql_model.FindFriendRelationshipFromFriend(req.OwnerUid, v); err != nil {
|
||||||
@@ -81,18 +89,18 @@ func (s *friendServer) ImportFriend(ctx context.Context, req *pbFriend.ImportFri
|
|||||||
err1 := im_mysql_model.InsertToFriend(req.OwnerUid, v, 1)
|
err1 := im_mysql_model.InsertToFriend(req.OwnerUid, v, 1)
|
||||||
if err1 != nil {
|
if err1 != nil {
|
||||||
resp.FailedUidList = append(resp.FailedUidList, v)
|
resp.FailedUidList = append(resp.FailedUidList, v)
|
||||||
log.Error(req.Token, req.OperationID, "err=%s,create friendship failed", err.Error())
|
log.NewError(req.OperationID, "err1,create friendship failed", req.OwnerUid, v, err1.Error())
|
||||||
}
|
}
|
||||||
err2 := im_mysql_model.InsertToFriend(v, req.OwnerUid, 1)
|
err2 := im_mysql_model.InsertToFriend(v, req.OwnerUid, 1)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
log.Error(req.Token, req.OperationID, "err=%s,create friendship failed", err.Error())
|
log.NewError(req.OperationID, "err2,create friendship failed", v, req.OwnerUid, err2.Error())
|
||||||
}
|
}
|
||||||
if err1 == nil && err2 == nil {
|
if err1 == nil && err2 == nil {
|
||||||
var name, faceUrl string
|
var name, faceUrl string
|
||||||
n := content_struct.NotificationContent{1, constant.FriendAcceptTip, ""}
|
n := content_struct.NotificationContent{IsDisplay: 1, DefaultTips: constant.FriendAcceptTip}
|
||||||
r, err := im_mysql_model.FindUserByUID(v)
|
r, err := im_mysql_model.FindUserByUID(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ErrorByKv("get info failed", req.OperationID, "err", err.Error(), "req", req.String())
|
log.NewError(req.OperationID, "get info failed", err.Error(), v)
|
||||||
}
|
}
|
||||||
if r != nil {
|
if r != nil {
|
||||||
name, faceUrl = r.Name, r.Icon
|
name, faceUrl = r.Name, r.Icon
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ func IsContain(target string, List []string) bool {
|
|||||||
return false
|
return false
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func InterfaceArrayToStringArray(data []interface{}) (i []string) {
|
func InterfaceArrayToStringArray(data []interface{}) (i []string) {
|
||||||
for _, param := range data {
|
for _, param := range data {
|
||||||
i = append(i, param.(string))
|
i = append(i, param.(string))
|
||||||
|
|||||||
Reference in New Issue
Block a user