Merge remote-tracking branch 'origin/v2.3.0release' into v2.3.0release

This commit is contained in:
Gordon
2022-08-21 19:07:03 +08:00
45 changed files with 3587 additions and 4983 deletions
+2 -2
View File
@@ -28,7 +28,7 @@ type UserRegisterResp struct {
type UserTokenReq struct {
Secret string `json:"secret" binding:"required,max=32"`
Platform int32 `json:"platform" binding:"required,min=1,max=8"`
Platform int32 `json:"platform" binding:"required,min=1,max=9"`
UserID string `json:"userID" binding:"required,min=1,max=64"`
LoginIp string `json:"loginIp"`
OperationID string `json:"operationID" binding:"required"`
@@ -40,7 +40,7 @@ type UserTokenResp struct {
}
type ForceLogoutReq struct {
Platform int32 `json:"platform" binding:"required,min=1,max=8"`
Platform int32 `json:"platform" binding:"required,min=1,max=9"`
FromUserID string `json:"fromUserID" binding:"required,min=1,max=64"`
OperationID string `json:"operationID" binding:"required"`
}
+1
View File
@@ -138,6 +138,7 @@ type config struct {
OpenImOrganizationPort []int `yaml:"openImOrganizationPort"`
OpenImConversationPort []int `yaml:"openImConversationPort"`
OpenImCachePort []int `yaml:"openImCachePort"`
OpenImRealTimeCommPort []int `yaml:"openImRealTimeCommPort"`
}
RpcRegisterName struct {
OpenImStatisticsName string `yaml:"openImStatisticsName"`
+8 -9
View File
@@ -11,11 +11,12 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"
go_redis "github.com/go-redis/redis/v8"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"strconv"
"time"
)
const (
@@ -230,16 +231,14 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string,
}
func (d *DataBases) DeleteMessageFromCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error {
ctx := context.Background()
var keys []string
for _, msg := range msgList {
key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq))
keys = append(keys, key)
err := d.RDB.Del(ctx, key).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, uid, err.Error(), msgList)
}
}
err := d.RDB.Del(ctx, keys...).Err()
if err != nil {
log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", keys, uid, err.Error(), msgList)
}
return err
return nil
}
func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error {
@@ -6,16 +6,18 @@ import (
"Open_IM/pkg/utils"
)
func SetConversation(conversation db.Conversation) error {
func SetConversation(conversation db.Conversation) (bool, error) {
var isUpdate bool
newConversation := conversation
if db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Find(&newConversation).RowsAffected == 0 {
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "not exist in db, create")
return db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error
return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Create(conversation).Error
// if exist, then update record
} else {
log.NewDebug("", utils.GetSelfFuncName(), "conversation", conversation, "exist in db, update")
//force update
return db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).
isUpdate = true
return isUpdate, db.DB.MysqlDB.DefaultGormDB().Model(conversation).Where("owner_user_id = ? and conversation_id = ?", conversation.OwnerUserID, conversation.ConversationID).
Updates(map[string]interface{}{"recv_msg_opt": conversation.RecvMsgOpt, "is_pinned": conversation.IsPinned, "is_private_chat": conversation.IsPrivateChat,
"group_at_type": conversation.GroupAtType, "is_not_in_group": conversation.IsNotInGroup}).Error
}
@@ -83,12 +85,20 @@ func GetConversations(OwnerUserID string, conversationIDs []string) ([]db.Conver
err := db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Where("conversation_id IN (?) and owner_user_id=?", conversationIDs, OwnerUserID).Find(&conversations).Error
return conversations, err
}
func GetConversationsByConversationIDMultipleOwner(OwnerUserIDList []string, conversationID string) ([]db.Conversation, error) {
var conversations []db.Conversation
err := db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Where("owner_user_id IN (?) and conversation_id=?", OwnerUserIDList, conversationID).Find(&conversations).Error
return conversations, err
}
func UpdateColumnsConversations(ownerUserIDList []string, conversationID string, args map[string]interface{}) error {
return db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Where("owner_user_id IN (?) and conversation_id=?", ownerUserIDList, conversationID).Updates(args).Error
}
func GetConversationIDListByUserID(userID string) ([]string, error) {
var IDList []string
err := db.DB.MysqlDB.DefaultGormDB().Model(&db.Conversation{}).Where("user_id=?", userID).Pluck("conversation_id", &IDList).Error
return IDList, err
}
@@ -12,6 +12,11 @@ func GetRegister(account, areaCode, userID string) (*db.Register, error) {
userID, "", account, account, areaCode).Take(&r).Error
}
func GetRegisterInfo(userID string) (*db.Register, error) {
var r db.Register
return &r, db.DB.MysqlDB.DefaultGormDB().Table("registers").Where("user_id = ?", userID).Take(&r).Error
}
func SetPassword(account, password, ex, userID, areaCode, ip string) error {
r := db.Register{
Account: account,
+74
View File
@@ -32,6 +32,8 @@ const (
joinedSuperGroupListCache = "JOINED_SUPER_GROUP_LIST_CACHE:"
groupMemberListHashCache = "GROUP_MEMBER_LIST_HASH_CACHE:"
groupMemberNumCache = "GROUP_MEMBER_NUM_CACHE:"
conversationCache = "CONVERSATION_CACHE:"
conversationIDListCache = "CONVERSATION_ID_LIST_CACHE:"
)
func init() {
@@ -431,3 +433,75 @@ func GetGroupMemberNumFromCache(groupID string) (int64, error) {
func DelGroupMemberNumFromCache(groupID string) error {
return db.DB.Rc.TagAsDeleted(groupMemberNumCache + groupID)
}
func GetUserConversationIDListFromCache(userID string) ([]string, error) {
getConversationIDList := func() (string, error) {
conversationIDList, err := imdb.GetConversationIDListByUserID(userID)
if err != nil {
return "", utils.Wrap(err, "getConversationIDList failed")
}
bytes, err := json.Marshal(conversationIDList)
return string(bytes), utils.Wrap(err, "")
}
conversationIDListStr, err := db.DB.Rc.Fetch(conversationIDListCache+userID, time.Second*30*60, getConversationIDList)
var conversationIDList []string
err = json.Unmarshal([]byte(conversationIDListStr), &conversationIDList)
if err != nil {
return nil, err
}
return conversationIDList, nil
}
func DelUserConversationIDListFromCache(userID string) error {
return db.DB.Rc.TagAsDeleted(conversationIDListCache + userID)
}
func GetConversationFromCache(ownerUserID, conversationID string) (*db.Conversation, error) {
getConversation := func() (string, error) {
conversation, err := imdb.GetConversation(ownerUserID, conversationID)
if err != nil {
return "", utils.Wrap(err, "")
}
bytes, err := json.Marshal(conversation)
return string(bytes), utils.Wrap(err, "")
}
conversationStr, err := db.DB.Rc.Fetch(conversationCache+ownerUserID+":"+conversationID, time.Second*30*60, getConversation)
conversation := db.Conversation{}
err = json.Unmarshal([]byte(conversationStr), &conversation)
if err != nil {
return nil, err
}
return &conversation, nil
}
func GetConversationsFromCache(ownerUserID string, conversationIDList []string) ([]db.Conversation, error) {
var conversationList []db.Conversation
for _, conversationID := range conversationIDList {
conversation, err := GetConversationFromCache(ownerUserID, conversationID)
if err != nil {
return nil, utils.Wrap(err, "GetConversationFromCache failed")
}
conversationList = append(conversationList, *conversation)
}
return conversationList, nil
}
func GetUserAllConversationList(ownerUserID string) ([]db.Conversation, error) {
IDList, err := GetUserConversationIDListFromCache(ownerUserID)
if err != nil {
return nil, err
}
var conversationList []db.Conversation
for _, conversationID := range IDList {
conversation, err := GetConversationFromCache(ownerUserID, conversationID)
if err != nil {
return nil, utils.Wrap(err, "GetConversationFromCache failed")
}
conversationList = append(conversationList, *conversation)
}
return conversationList, nil
}
func DelConversationFromCache(ownerUserID, conversationID string) error {
return db.DB.Rc.TagAsDeleted(conversationCache + ownerUserID + ":" + conversationID)
}
+141 -7
View File
@@ -1,21 +1,24 @@
package getcdv3
import (
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/log"
"Open_IM/pkg/utils"
"context"
"fmt"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
//"go.etcd.io/etcd/mvcc/mvccpb"
//"google.golang.org/genproto/googleapis/ads/googleads/v1/services"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/resolver"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/resolver"
)
type Resolver struct {
@@ -66,7 +69,7 @@ func (r1 *Resolver) ResolveNow(rn resolver.ResolveNowOptions) {
func (r1 *Resolver) Close() {
}
func GetConn(schema, etcdaddr, serviceName string, operationID string) *grpc.ClientConn {
func getConn(schema, etcdaddr, serviceName string, operationID string) *grpc.ClientConn {
rwNameResolverMutex.RLock()
r, ok := nameResolver[schema+serviceName]
rwNameResolverMutex.RUnlock()
@@ -96,6 +99,99 @@ func GetConn(schema, etcdaddr, serviceName string, operationID string) *grpc.Cli
return r.grpcClientConn
}
func GetConfigConn(serviceName string, operationID string) *grpc.ClientConn {
rpcRegisterIP := config.Config.RpcRegisterIP
var err error
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error(operationID, "GetLocalIP failed ", err.Error())
return nil
}
}
var configPortList []int
//1
if config.Config.RpcRegisterName.OpenImUserName == serviceName {
configPortList = config.Config.RpcPort.OpenImUserPort
}
//2
if config.Config.RpcRegisterName.OpenImFriendName == serviceName {
configPortList = config.Config.RpcPort.OpenImFriendPort
}
//3
if config.Config.RpcRegisterName.OpenImMsgName == serviceName {
configPortList = config.Config.RpcPort.OpenImMessagePort
}
//4
if config.Config.RpcRegisterName.OpenImPushName == serviceName {
configPortList = config.Config.RpcPort.OpenImPushPort
}
//5
if config.Config.RpcRegisterName.OpenImRelayName == serviceName {
configPortList = config.Config.RpcPort.OpenImMessageGatewayPort
}
//6
if config.Config.RpcRegisterName.OpenImGroupName == serviceName {
configPortList = config.Config.RpcPort.OpenImGroupPort
}
//7
if config.Config.RpcRegisterName.OpenImAuthName == serviceName {
configPortList = config.Config.RpcPort.OpenImAuthPort
}
//8
if config.Config.RpcRegisterName.OpenImMessageCMSName == serviceName {
configPortList = config.Config.RpcPort.OpenImMessageCmsPort
}
//9
if config.Config.RpcRegisterName.OpenImAdminCMSName == serviceName {
configPortList = config.Config.RpcPort.OpenImAdminCmsPort
}
//10
if config.Config.RpcRegisterName.OpenImOfficeName == serviceName {
configPortList = config.Config.RpcPort.OpenImOfficePort
}
//11
if config.Config.RpcRegisterName.OpenImOrganizationName == serviceName {
configPortList = config.Config.RpcPort.OpenImOrganizationPort
}
//12
if config.Config.RpcRegisterName.OpenImConversationName == serviceName {
configPortList = config.Config.RpcPort.OpenImConversationPort
}
//13
if config.Config.RpcRegisterName.OpenImCacheName == serviceName {
configPortList = config.Config.RpcPort.OpenImCachePort
}
//14
if config.Config.RpcRegisterName.OpenImRealTimeCommName == serviceName {
configPortList = config.Config.RpcPort.OpenImRealTimeCommPort
}
if len(configPortList) == 0 {
log.Error(operationID, "len(configPortList) == 0 ")
return nil
}
target := rpcRegisterIP + ":" + utils.Int32ToString(int32(configPortList[0]))
log.Info(operationID, "rpcRegisterIP ", rpcRegisterIP, " port ", configPortList, " grpc target: ", target, " serviceName: ", serviceName)
conn, err := grpc.Dial(target, grpc.WithInsecure())
if err != nil {
log.Error(operationID, "grpc.Dail failed ", err.Error())
return nil
}
log.NewDebug(operationID, utils.GetSelfFuncName(), serviceName, conn)
return conn
}
func GetDefaultConn(schema, etcdaddr, serviceName string, operationID string) *grpc.ClientConn {
con := getConn(schema, etcdaddr, serviceName, operationID)
if con != nil {
return con
}
log.NewWarn(operationID, utils.GetSelfFuncName(), "conn is nil !!!!!", schema, etcdaddr, serviceName, operationID)
con = GetConfigConn(serviceName, operationID)
return con
}
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if r.cli == nil {
return nil, fmt.Errorf("etcd clientv3 client failed, etcd:%s", target)
@@ -181,7 +277,45 @@ func (r *Resolver) watch(prefix string, addrList []resolver.Address) {
}
}
func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn {
func GetDefaultGatewayConn4Unique(schema, etcdaddr, operationID string) []*grpc.ClientConn {
grpcConns := getConn4Unique(schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName)
if len(grpcConns) > 0 {
return grpcConns
}
log.NewWarn(operationID, utils.GetSelfFuncName(), " len(grpcConns) < 0 ", schema, etcdaddr, config.Config.RpcRegisterName.OpenImRelayName)
grpcConns = GetDefaultGatewayConn4UniqueFromcfg(operationID)
log.NewDebug(operationID, utils.GetSelfFuncName(), config.Config.RpcRegisterName.OpenImRelayName, grpcConns)
return grpcConns
}
func GetDefaultGatewayConn4UniqueFromcfg(operationID string) []*grpc.ClientConn {
rpcRegisterIP := config.Config.RpcRegisterIP
var err error
if config.Config.RpcRegisterIP == "" {
rpcRegisterIP, err = utils.GetLocalIP()
if err != nil {
log.Error("", "GetLocalIP failed ", err.Error())
return nil
}
}
var conns []*grpc.ClientConn
configPortList := config.Config.RpcPort.OpenImMessageGatewayPort
for _, port := range configPortList {
target := rpcRegisterIP + ":" + utils.Int32ToString(int32(port))
log.Info(operationID, "rpcRegisterIP ", rpcRegisterIP, " port ", configPortList, " grpc target: ", target, " serviceName: ", "msgGateway")
conn, err := grpc.Dial(target, grpc.WithInsecure())
if err != nil {
log.Error(operationID, "grpc.Dail failed ", err.Error())
continue
}
conns = append(conns, conn)
}
return conns
}
func getConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn {
gEtcdCli, err := clientv3.New(clientv3.Config{Endpoints: strings.Split(etcdaddr, ",")})
if err != nil {
log.Error("clientv3.New failed", err.Error())
@@ -215,7 +349,7 @@ func GetConn4Unique(schema, etcdaddr, servicename string) []*grpc.ClientConn {
allConn := make([]*grpc.ClientConn, 0)
for _, v := range allService {
r := GetConn(schema, etcdaddr, v, "0")
r := getConn(schema, etcdaddr, v, "0")
allConn = append(allConn, r)
}
@@ -228,7 +362,7 @@ var (
)
func GetconnFactory(schema, etcdaddr, servicename string) (*grpc.ClientConn, error) {
c := GetConn(schema, etcdaddr, servicename, "0")
c := getConn(schema, etcdaddr, servicename, "0")
if c != nil {
return c, nil
} else {
+2893 -4567
View File
File diff suppressed because it is too large Load Diff
+1
View File
@@ -188,6 +188,7 @@ message User{
int32 Gender = 12;
int32 LoginLimit = 13;
bool IsBlock = 14;
string RegisterIp = 15;
}
message GetUserByIdResp{