Compare commits

...

7 Commits

Author SHA1 Message Date
Xinwei Xiong (cubxxw) ad47590e13 feat: add openim docker
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:39:54 +08:00
Xinwei Xiong (cubxxw) a42a44e0a3 feat: add openim docker
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) d8838ee6b8 feat: add kafka and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) f480f52e2d feat: add zk and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) a23cbf13cf feat: add openim mongo and redis env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) 498e26a942 feat: add openim env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) e1990c179e feat: add openim server code
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
45 changed files with 599 additions and 890 deletions
@@ -1,139 +0,0 @@
# Copyright © 2023 OpenIM open source community. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Build OpenIM Web Docker image
on:
# schedule:
# - cron: '30 3 * * *'
push:
branches:
# - main
- release-*
tags:
- v*
workflow_dispatch:
env:
# Common versions
GO_VERSION: "1.20"
jobs:
build-openim-web-dockerhub:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# docker.io/openim/openim-web:latest
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v5.0.0
with:
images: openim/openim-web
# generate Docker tags based on the following events/attributes
tags: |
type=schedule
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./build/images/openim-tools/openim-web/Dockerfile
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
build-openim-web-aliyun:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-web:latest
- name: Extract metadata (tags, labels) for Docker
id: meta2
uses: docker/metadata-action@v5.0.0
with:
images: registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-web
- name: Log in to AliYun Docker Hub
uses: docker/login-action@v3
with:
registry: registry.cn-hangzhou.aliyuncs.com
username: ${{ secrets.ALIREGISTRY_USERNAME }}
password: ${{ secrets.ALIREGISTRY_TOKEN }}
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./build/images/openim-tools/openim-web/Dockerfile
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta2.outputs.tags }}
labels: ${{ steps.meta2.outputs.labels }}
build-openim-web-ghcr:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
# ghcr.io/openimsdk/openim-web:latest
- name: Extract metadata (tags, labels) for Docker
id: meta2
uses: docker/metadata-action@v5.0.0
with:
images: ghcr.io/openimsdk/openim-web
- name: Log in to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: ./build/images/openim-tools/openim-web/Dockerfile
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta2.outputs.tags }}
labels: ${{ steps.meta2.outputs.labels }}
+12 -5
View File
@@ -37,13 +37,20 @@ zookeeper:
###################### Mongo ######################
# MongoDB configuration
# If uri is not empty, it will be used directly
#
# MongoDB address for standalone setup, Mongos address for sharded cluster setup
# Default MongoDB database name
# Maximum connection pool size
# If uri is not empty, it will be used directly for the MongoDB connection.
# This is a complete MongoDB URI string.
# Example: mongodb://user:password@host1:port1,host2:port2/dbname?options
mongo:
uri: ${MONGO_URI}
# List of MongoDB server addresses.
# Used for constructing the MongoDB URI if 'uri' above is empty.
# For a standalone setup, specify the address of the single server.
# For a sharded cluster, specify the addresses of the Mongos servers.
# Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ]
# Default MongoDB database name
# Maximum connection pool size
address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ]
database: ${MONGO_DATABASE}
username: ${MONGO_USERNAME}
-5
View File
@@ -34,7 +34,6 @@ services:
ipv4_address: ${MONGO_NETWORK_ADDRESS:-172.28.0.2}
redis:
# image: redis:7.0.0
image: redis:${REDIS_IMAGE_VERSION:-7.0.0}
container_name: redis
ports:
@@ -53,7 +52,6 @@ services:
ipv4_address: ${REDIS_NETWORK_ADDRESS:-172.28.0.3}
zookeeper:
# image: bitnami/zookeeper:3.8
image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION:-3.8}
container_name: zookeeper
ports:
@@ -69,7 +67,6 @@ services:
ipv4_address: ${ZOOKEEPER_NETWORK_ADDRESS:-172.28.0.5}
kafka:
# image: 'bitnami/kafka:3.5.1'
image: 'bitnami/kafka:${KAFKA_IMAGE_VERSION:-3.5.1}'
container_name: kafka
restart: always
@@ -95,7 +92,6 @@ services:
ipv4_address: ${KAFKA_NETWORK_ADDRESS:-172.28.0.4}
minio:
# image: minio/minio
image: minio/minio:${MINIO_IMAGE_VERSION:-latest}
ports:
- "${MINIO_PORT:-10005}:9000"
@@ -114,7 +110,6 @@ services:
ipv4_address: ${MINIO_NETWORK_ADDRESS:-172.28.0.6}
openim-web:
# image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:latest
image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-web:${OPENIM_WEB_IMAGE_VERSION:-latest}
container_name: openim-web
environment:
+1
View File
@@ -27,6 +27,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/go-playground/validator/v10"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+2 -1
View File
@@ -16,9 +16,10 @@ package push
import (
"context"
"github.com/OpenIMSDK/tools/utils"
"sync"
"github.com/OpenIMSDK/tools/utils"
"google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant"
+2
View File
@@ -16,8 +16,10 @@ package friend
import (
"context"
pbfriend "github.com/OpenIMSDK/protocol/friend"
"github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/http"
+1
View File
@@ -16,6 +16,7 @@ package friend
import (
"context"
"github.com/OpenIMSDK/tools/tx"
"github.com/OpenIMSDK/protocol/sdkws"
+2 -1
View File
@@ -16,9 +16,10 @@ package group
import (
"context"
"github.com/OpenIMSDK/tools/log"
"time"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/protocol/wrapperspb"
+2 -1
View File
@@ -17,13 +17,14 @@ package group
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"math/big"
"math/rand"
"strconv"
"strings"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
pbconversation "github.com/OpenIMSDK/protocol/conversation"
"github.com/OpenIMSDK/protocol/wrapperspb"
"github.com/OpenIMSDK/tools/tx"
+5 -3
View File
@@ -16,16 +16,18 @@ package msg
import (
"context"
utils2 "github.com/OpenIMSDK/tools/utils"
"github.com/redis/go-redis/v9"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msg"
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
)
func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) {
@@ -173,7 +175,7 @@ func (m *msgServer) MarkConversationAsRead(
m.conversationAndGetRecvID(conversation, req.UserID), seqs, hasReadSeq); err != nil {
return nil, err
}
} else if conversation.ConversationType == constant.SuperGroupChatType ||
conversation.ConversationType == constant.NotificationChatType {
if req.HasReadSeq > hasReadSeq {
@@ -222,4 +224,4 @@ func (m *msgServer) sendMarkAsReadNotification(
log.ZWarn(ctx, "send has read Receipt err", err)
}
return nil
}
}
+2
View File
@@ -16,6 +16,7 @@ package msg
import (
"context"
"github.com/OpenIMSDK/protocol/sdkws"
"google.golang.org/protobuf/proto"
@@ -24,6 +25,7 @@ import (
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
+1
View File
@@ -16,6 +16,7 @@ package user
import (
"context"
pbuser "github.com/OpenIMSDK/protocol/user"
"github.com/OpenIMSDK/tools/utils"
+18 -18
View File
@@ -16,11 +16,11 @@ package apistruct
type PictureBaseInfo struct {
UUID string `mapstructure:"uuid"`
Type string `mapstructure:"type" validate:"required"`
Type string `mapstructure:"type" validate:"required"`
Size int64 `mapstructure:"size"`
Width int32 `mapstructure:"width" validate:"required"`
Width int32 `mapstructure:"width" validate:"required"`
Height int32 `mapstructure:"height" validate:"required"`
Url string `mapstructure:"url" validate:"required"`
Url string `mapstructure:"url" validate:"required"`
}
type PictureElem struct {
@@ -34,28 +34,28 @@ type SoundElem struct {
SoundPath string `mapstructure:"soundPath"`
SourceURL string `mapstructure:"sourceUrl" validate:"required"`
DataSize int64 `mapstructure:"dataSize"`
Duration int64 `mapstructure:"duration" validate:"required,min=1"`
Duration int64 `mapstructure:"duration" validate:"required,min=1"`
}
type VideoElem struct {
VideoPath string `mapstructure:"videoPath" `
VideoPath string `mapstructure:"videoPath"`
VideoUUID string `mapstructure:"videoUUID"`
VideoURL string `mapstructure:"videoUrl" validate:"required"`
VideoType string `mapstructure:"videoType" validate:"required"`
VideoSize int64 `mapstructure:"videoSize" validate:"required"`
Duration int64 `mapstructure:"duration" validate:"required"`
VideoURL string `mapstructure:"videoUrl" validate:"required"`
VideoType string `mapstructure:"videoType" validate:"required"`
VideoSize int64 `mapstructure:"videoSize" validate:"required"`
Duration int64 `mapstructure:"duration" validate:"required"`
SnapshotPath string `mapstructure:"snapshotPath"`
SnapshotUUID string `mapstructure:"snapshotUUID"`
SnapshotSize int64 `mapstructure:"snapshotSize"`
SnapshotURL string `mapstructure:"snapshotUrl" validate:"required"`
SnapshotWidth int32 `mapstructure:"snapshotWidth" validate:"required"`
SnapshotURL string `mapstructure:"snapshotUrl" validate:"required"`
SnapshotWidth int32 `mapstructure:"snapshotWidth" validate:"required"`
SnapshotHeight int32 `mapstructure:"snapshotHeight" validate:"required"`
}
type FileElem struct {
FilePath string `mapstructure:"filePath" `
FilePath string `mapstructure:"filePath"`
UUID string `mapstructure:"uuid"`
SourceURL string `mapstructure:"sourceUrl" validate:"required"`
FileName string `mapstructure:"fileName" validate:"required"`
FileSize int64 `mapstructure:"fileSize" validate:"required"`
FileName string `mapstructure:"fileName" validate:"required"`
FileSize int64 `mapstructure:"fileSize" validate:"required"`
}
type AtElem struct {
Text string `mapstructure:"text"`
@@ -63,9 +63,9 @@ type AtElem struct {
IsAtSelf bool `mapstructure:"isAtSelf"`
}
type LocationElem struct {
Description string `mapstructure:"description" `
Longitude float64 `mapstructure:"longitude" validate:"required"`
Latitude float64 `mapstructure:"latitude" validate:"required"`
Description string `mapstructure:"description"`
Longitude float64 `mapstructure:"longitude" validate:"required"`
Latitude float64 `mapstructure:"latitude" validate:"required"`
}
type CustomElem struct {
Data string `mapstructure:"data" validate:"required"`
@@ -87,7 +87,7 @@ type OANotificationElem struct {
NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"`
Text string `mapstructure:"text" json:"text" validate:"required"`
Url string `mapstructure:"url" json:"url"`
MixType int32 `mapstructure:"mixType" json:"mixType" validate:"required"`
MixType int32 `mapstructure:"mixType" json:"mixType" validate:"required"`
PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"`
SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"`
VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"`
+1
View File
@@ -16,6 +16,7 @@ package config
import (
"bytes"
"github.com/OpenIMSDK/tools/discoveryregistry"
"gopkg.in/yaml.v3"
)
+2 -2
View File
@@ -35,7 +35,7 @@ const (
DefaultFolderPath = "../config/"
)
// return absolude path join ../config/, this is k8s container config path
// return absolude path join ../config/, this is k8s container config path.
func GetDefaultConfigPath() string {
b, err := filepath.Abs(os.Args[0])
if err != nil {
@@ -45,7 +45,7 @@ func GetDefaultConfigPath() string {
return filepath.Join(filepath.Dir(b), "../config/")
}
// getProjectRoot returns the absolute path of the project root directory
// getProjectRoot returns the absolute path of the project root directory.
func GetProjectRoot() string {
b, _ := filepath.Abs(os.Args[0])
+2 -1
View File
@@ -15,9 +15,10 @@
package convert
import (
"github.com/OpenIMSDK/protocol/sdkws"
"time"
"github.com/OpenIMSDK/protocol/sdkws"
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
)
+1 -1
View File
@@ -39,7 +39,7 @@ const (
groupMemberIDsKey = "GROUP_MEMBER_IDS:"
groupMembersHashKey = "GROUP_MEMBERS_HASH2:"
groupMemberInfoKey = "GROUP_MEMBER_INFO:"
//groupOwnerInfoKey = "GROUP_OWNER_INFO:"
//groupOwnerInfoKey = "GROUP_OWNER_INFO:".
joinedGroupsKey = "JOIN_GROUPS_KEY:"
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
+21 -3
View File
@@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/redis/go-redis/v9"
@@ -43,6 +45,9 @@ func NewRedis() (redis.UniversalClient, error) {
return redisClient, nil
}
// Read configuration from environment variables
overrideConfigFromEnv()
if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty")
}
@@ -60,9 +65,9 @@ func NewRedis() (redis.UniversalClient, error) {
rdb = redis.NewClient(&redis.Options{
Addr: config.Config.Redis.Address[0],
Username: config.Config.Redis.Username,
Password: config.Config.Redis.Password, // no password set
DB: 0, // use default DB
PoolSize: 100, // connection pool size
Password: config.Config.Redis.Password,
DB: 0, // use default DB
PoolSize: 100, // connection pool size
MaxRetries: maxRetry,
})
}
@@ -78,3 +83,16 @@ func NewRedis() (redis.UniversalClient, error) {
redisClient = rdb
return rdb, err
}
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv() {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated
}
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Config.Redis.Username = envUser
}
if envPass := os.Getenv("REDIS_PASSWORD"); envPass != "" {
config.Config.Redis.Password = envPass
}
}
+85 -65
View File
@@ -1,27 +1,13 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package unrelation
import (
"context"
"fmt"
"os"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -34,7 +20,8 @@ import (
)
const (
maxRetry = 10 // number of retries
maxRetry = 10 // number of retries
mongoConnTimeout = 10 * time.Second
)
type Mongo struct {
@@ -44,90 +31,123 @@ type Mongo struct {
// NewMongo Initialize MongoDB connection.
func NewMongo() (*Mongo, error) {
specialerror.AddReplace(mongo.ErrNoDocuments, errs.ErrRecordNotFound)
uri := "mongodb://sample.host:27017/?maxPoolSize=20&w=majority"
if config.Config.Mongo.Uri != "" {
uri = config.Config.Mongo.Uri
} else {
mongodbHosts := ""
for i, v := range config.Config.Mongo.Address {
if i == len(config.Config.Mongo.Address)-1 {
mongodbHosts += v
} else {
mongodbHosts += v + ","
}
}
if config.Config.Mongo.Password != "" && config.Config.Mongo.Username != "" {
uri = fmt.Sprintf("mongodb://%s:%s@%s/%s?maxPoolSize=%d&authSource=admin",
config.Config.Mongo.Username, config.Config.Mongo.Password, mongodbHosts,
config.Config.Mongo.Database, config.Config.Mongo.MaxPoolSize)
} else {
uri = fmt.Sprintf("mongodb://%s/%s/?maxPoolSize=%d&authSource=admin",
mongodbHosts, config.Config.Mongo.Database,
config.Config.Mongo.MaxPoolSize)
}
}
uri := buildMongoURI()
fmt.Println("mongo:", uri)
var mongoClient *mongo.Client
var err error = nil
var err error
// Retry connecting to MongoDB
for i := 0; i <= maxRetry; i++ {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), mongoConnTimeout)
defer cancel()
mongoClient, err = mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err == nil {
return &Mongo{db: mongoClient}, nil
}
if cmdErr, ok := err.(mongo.CommandError); ok {
if cmdErr.Code == 13 || cmdErr.Code == 18 {
return nil, err
} else {
fmt.Printf("Failed to connect to MongoDB: %s\n", err)
}
if shouldRetry(err) {
fmt.Printf("Failed to connect to MongoDB, retrying: %s\n", err)
time.Sleep(time.Second) // exponential backoff could be implemented here
continue
}
return nil, err
}
return nil, err
}
func buildMongoURI() string {
uri := os.Getenv("MONGO_URI")
if uri != "" {
return uri
}
username := os.Getenv("MONGO_USERNAME")
password := os.Getenv("MONGO_PASSWORD")
address := os.Getenv("MONGO_ADDRESS")
port := os.Getenv("MONGO_PORT")
database := os.Getenv("MONGO_DATABASE")
maxPoolSize := os.Getenv("MONGO_MAX_POOL_SIZE")
if username == "" {
username = config.Config.Mongo.Username
}
if password == "" {
password = config.Config.Mongo.Password
}
if address == "" {
address = strings.Join(config.Config.Mongo.Address, ",")
} else if port != "" {
address = fmt.Sprintf("%s:%s", address, port)
}
if database == "" {
database = config.Config.Mongo.Database
}
if maxPoolSize == "" {
maxPoolSize = fmt.Sprint(config.Config.Mongo.MaxPoolSize)
}
uriFormat := "mongodb://%s/%s?maxPoolSize=%s&authSource=admin"
if username != "" && password != "" {
uriFormat = "mongodb://%s:%s@%s/%s?maxPoolSize=%s&authSource=admin"
return fmt.Sprintf(uriFormat, username, password, address, database, maxPoolSize)
}
return fmt.Sprintf(uriFormat, address, database, maxPoolSize)
}
func shouldRetry(err error) bool {
if cmdErr, ok := err.(mongo.CommandError); ok {
return cmdErr.Code != 13 && cmdErr.Code != 18
}
return true
}
// GetClient returns the MongoDB client.
func (m *Mongo) GetClient() *mongo.Client {
return m.db
}
// GetDatabase returns the specific database from MongoDB.
func (m *Mongo) GetDatabase() *mongo.Database {
return m.db.Database(config.Config.Mongo.Database)
}
// CreateMsgIndex creates an index for messages in MongoDB.
func (m *Mongo) CreateMsgIndex() error {
return m.createMongoIndex(unrelation.Msg, true, "doc_id")
}
// createMongoIndex creates an index in a MongoDB collection.
func (m *Mongo) createMongoIndex(collection string, isUnique bool, keys ...string) error {
db := m.db.Database(config.Config.Mongo.Database).Collection(collection)
db := m.GetDatabase().Collection(collection)
opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
indexView := db.Indexes()
keysDoc := bson.D{}
// create composite indexes
for _, key := range keys {
if strings.HasPrefix(key, "-") {
keysDoc = append(keysDoc, bson.E{Key: strings.TrimLeft(key, "-"), Value: -1})
// keysDoc = keysDoc.Append(strings.TrimLeft(key, "-"), bsonx.Int32(-1))
} else {
keysDoc = append(keysDoc, bson.E{Key: key, Value: 1})
// keysDoc = keysDoc.Append(key, bsonx.Int32(1))
}
}
// create index
keysDoc := buildIndexKeys(keys)
index := mongo.IndexModel{
Keys: keysDoc,
}
if isUnique {
index.Options = options.Index().SetUnique(true)
}
result, err := indexView.CreateOne(
context.Background(),
index,
opts,
)
_, err := indexView.CreateOne(context.Background(), index, opts)
if err != nil {
return utils.Wrap(err, result)
return utils.Wrap(err, "CreateIndex")
}
return nil
}
// buildIndexKeys builds the BSON document for index keys.
func buildIndexKeys(keys []string) bson.D {
keysDoc := bson.D{}
for _, key := range keys {
direction := 1 // default direction is ascending
if strings.HasPrefix(key, "-") {
direction = -1 // descending order for prefixed with "-"
key = strings.TrimLeft(key, "-")
}
keysDoc = append(keysDoc, bson.E{Key: key, Value: direction})
}
return keysDoc
}
@@ -1,93 +1,22 @@
package discoveryregister
import (
"context"
"errors"
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes"
"github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/zookeeper"
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type.
func NewDiscoveryRegister(envType string) (discoveryregistry.SvcDiscoveryRegistry, error) {
var client discoveryregistry.SvcDiscoveryRegistry
var err error
switch envType {
case "zookeeper":
client, err = openkeeper.NewClient(config.Config.Zookeeper.ZkAddr, config.Config.Zookeeper.Schema,
openkeeper.WithFreq(time.Hour), openkeeper.WithUserNameAndPassword(
config.Config.Zookeeper.Username,
config.Config.Zookeeper.Password,
), openkeeper.WithRoundRobin(), openkeeper.WithTimeout(10), openkeeper.WithLogger(log.NewZkLogger()))
return zookeeper.NewZookeeperDiscoveryRegister()
case "k8s":
client, err = NewK8sDiscoveryRegister()
return kubernetes.NewK8sDiscoveryRegister()
default:
client = nil
err = errors.New("envType not correct")
return nil, errors.New("envType not correct")
}
return client, err
}
type K8sDR struct {
options []grpc.DialOption
rpcRegisterAddr string
}
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
return &K8sDR{}, nil
}
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
cli.rpcRegisterAddr = serviceName
return nil
}
func (cli *K8sDR) UnRegister() error {
return nil
}
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
}
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
}
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...)
}
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}
func (cli *K8sDR) Close() {
return
}
@@ -1,407 +1,45 @@
package discoveryregister
import (
"context"
"reflect"
"os"
"testing"
"github.com/OpenIMSDK/tools/discoveryregistry"
"google.golang.org/grpc"
"github.com/stretchr/testify/assert"
)
func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "")
}
func TestNewDiscoveryRegister(t *testing.T) {
type args struct {
envType string
}
tests := []struct {
name string
args args
want discoveryregistry.SvcDiscoveryRegistry
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewDiscoveryRegister(tt.args.envType)
if (err != nil) != tt.wantErr {
t.Errorf("NewDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewDiscoveryRegister() = %v, want %v", got, tt.want)
}
})
}
}
setupTestEnvironment()
func TestNewK8sDiscoveryRegister(t *testing.T) {
tests := []struct {
name string
want discoveryregistry.SvcDiscoveryRegistry
wantErr bool
envType string
expectedError bool
expectedResult bool
}{
// TODO: Add test cases.
{"zookeeper", false, true},
{"k8s", false, true}, // 假设 k8s 配置也已正确设置
{"invalid", true, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewK8sDiscoveryRegister()
if (err != nil) != tt.wantErr {
t.Errorf("NewK8sDiscoveryRegister() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewK8sDiscoveryRegister() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_Register(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
serviceName string
host string
port int
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.Register(tt.args.serviceName, tt.args.host, tt.args.port, tt.args.opts...); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.Register() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
for _, test := range tests {
client, err := NewDiscoveryRegister(test.envType)
func TestK8sDR_UnRegister(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
if test.expectedResult {
assert.Implements(t, (*discoveryregistry.SvcDiscoveryRegistry)(nil), client)
} else {
assert.Nil(t, client)
}
if err := cli.UnRegister(); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.UnRegister() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_CreateRpcRootNodes(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
serviceNames []string
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.CreateRpcRootNodes(tt.args.serviceNames); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.CreateRpcRootNodes() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_RegisterConf2Registry(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
key string
conf []byte
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if err := cli.RegisterConf2Registry(tt.args.key, tt.args.conf); (err != nil) != tt.wantErr {
t.Errorf("K8sDR.RegisterConf2Registry() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestK8sDR_GetConfFromRegistry(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
key string
}
tests := []struct {
name string
fields fields
args args
want []byte
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConfFromRegistry(tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConfFromRegistry() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConfFromRegistry() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetConns(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
ctx context.Context
serviceName string
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
want []*grpc.ClientConn
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConns(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConns() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConns() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetConn(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
ctx context.Context
serviceName string
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
want *grpc.ClientConn
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
got, err := cli.GetConn(tt.args.ctx, tt.args.serviceName, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("K8sDR.GetConn() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetConn() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_GetSelfConnTarget(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
want string
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if got := cli.GetSelfConnTarget(); got != tt.want {
t.Errorf("K8sDR.GetSelfConnTarget() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_AddOption(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
opts []grpc.DialOption
}
tests := []struct {
name string
fields fields
args args
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.AddOption(tt.args.opts...)
})
}
}
func TestK8sDR_CloseConn(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
type args struct {
conn *grpc.ClientConn
}
tests := []struct {
name string
fields fields
args args
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.CloseConn(tt.args.conn)
})
}
}
func TestK8sDR_GetClientLocalConns(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
want map[string][]*grpc.ClientConn
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
if got := cli.GetClientLocalConns(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("K8sDR.GetClientLocalConns() = %v, want %v", got, tt.want)
}
})
}
}
func TestK8sDR_Close(t *testing.T) {
type fields struct {
options []grpc.DialOption
rpcRegisterAddr string
}
tests := []struct {
name string
fields fields
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cli := &K8sDR{
options: tt.fields.options,
rpcRegisterAddr: tt.fields.rpcRegisterAddr,
}
cli.Close()
})
}
}
}
@@ -0,0 +1,92 @@
package kubernetes
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/OpenIMSDK/tools/discoveryregistry"
)
// K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct {
options []grpc.DialOption
rpcRegisterAddr string
}
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
return &K8sDR{}, nil
}
// Register registers a service with Kubernetes.
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
cli.rpcRegisterAddr = serviceName
return nil
}
// UnRegister removes a service registration from Kubernetes.
func (cli *K8sDR) UnRegister() error {
return nil
}
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil
}
// RegisterConf2Registry registers a configuration to the registry.
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil
}
// GetConfFromRegistry retrieves a configuration from the registry.
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil
}
// GetConns returns a list of gRPC client connections for a given service.
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
}
// GetConn returns a single gRPC client connection for a given service.
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
}
// GetSelfConnTarget returns the connection target of the client itself.
func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr
}
// AddOption adds gRPC dial options to the client.
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...)
}
// CloseConn closes a given gRPC client connection.
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close()
}
// do not use this method for call rpc.
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil
}
// Close closes the K8sDR client.
func (cli *K8sDR) Close() {
// Close any open resources here (if applicable)
return
}
@@ -0,0 +1,47 @@
package zookeeper
import (
"os"
"strings"
"time"
"github.com/OpenIMSDK/tools/discoveryregistry"
openkeeper "github.com/OpenIMSDK/tools/discoveryregistry/zookeeper"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
// NewZookeeperDiscoveryRegister creates a new instance of ZookeeperDR for Zookeeper service discovery and registration.
func NewZookeeperDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
schema := getEnv("ZOOKEEPER_SCHEMA", config.Config.Zookeeper.Schema)
zkAddr := getZkAddrFromEnv(config.Config.Zookeeper.ZkAddr)
username := getEnv("ZOOKEEPER_USERNAME", config.Config.Zookeeper.Username)
password := getEnv("ZOOKEEPER_PASSWORD", config.Config.Zookeeper.Password)
return openkeeper.NewClient(
zkAddr,
schema,
openkeeper.WithFreq(time.Hour),
openkeeper.WithUserNameAndPassword(username, password),
openkeeper.WithRoundRobin(),
openkeeper.WithTimeout(10),
openkeeper.WithLogger(log.NewZkLogger()),
)
}
// getEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
func getEnv(key, fallback string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return fallback
}
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
func getZkAddrFromEnv(fallback []string) []string {
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
return strings.Split(value, ",")
}
return fallback
}
+87 -65
View File
@@ -1,17 +1,3 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
@@ -21,19 +7,18 @@ import (
"strings"
"time"
"github.com/IBM/sarama"
"github.com/OpenIMSDK/protocol/constant"
log "github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/log"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/tools/utils"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/IBM/sarama"
"google.golang.org/protobuf/proto"
)
const (
maxRetry = 10 // number of retries
maxRetry = 10 // Maximum number of retries for producer creation
)
var errEmptyMsg = errors.New("binary msg is empty")
@@ -45,62 +30,85 @@ type Producer struct {
producer sarama.SyncProducer
}
// NewKafkaProducer Initialize kafka producer.
// NewKafkaProducer initializes a new Kafka producer.
func NewKafkaProducer(addr []string, topic string) *Producer {
p := Producer{}
p.config = sarama.NewConfig() // Instantiate a sarama Config
p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully
p := Producer{
addr: addr,
topic: topic,
config: sarama.NewConfig(),
}
// Set producer return flags
p.config.Producer.Return.Successes = true
p.config.Producer.Return.Errors = true
p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
var producerAck = sarama.WaitForAll // default: WaitForAll
switch strings.ToLower(config.Config.Kafka.ProducerAck) {
case "no_response":
producerAck = sarama.NoResponse
case "wait_for_local":
producerAck = sarama.WaitForLocal
case "wait_for_all":
producerAck = sarama.WaitForAll
}
p.config.Producer.RequiredAcks = producerAck
// Set partitioner strategy
p.config.Producer.Partitioner = sarama.NewHashPartitioner
var compress = sarama.CompressionNone // default: no compress
_ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType)))
p.config.Producer.Compression = compress
// Configure producer acknowledgement level
configureProducerAck(&p, config.Config.Kafka.ProducerAck)
if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" {
// Configure message compression
configureCompression(&p, config.Config.Kafka.CompressType)
// Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
// Configure SASL authentication if credentials are provided
if kafkaUsername != "" && kafkaPassword != "" {
p.config.Net.SASL.Enable = true
p.config.Net.SASL.User = config.Config.Kafka.Username
p.config.Net.SASL.Password = config.Config.Kafka.Password
p.config.Net.SASL.User = kafkaUsername
p.config.Net.SASL.Password = kafkaPassword
}
p.addr = addr
p.topic = topic
// Set the Kafka address
p.addr = []string{kafkaAddr}
// Set up TLS configuration (if required)
SetupTLSConfig(p.config)
var producer sarama.SyncProducer
// Create the producer with retries
var err error
for i := 0; i <= maxRetry; i++ {
producer, err = sarama.NewSyncProducer(p.addr, p.config) // Initialize the client
p.producer, err = sarama.NewSyncProducer(p.addr, p.config)
if err == nil {
p.producer = producer
return &p
}
//TODO If the password is wrong, exit directly
//if packetErr, ok := err.(*sarama.PacketEncodingError); ok {
//if _, ok := packetErr.Err.(sarama.AuthenticationError); ok {
// fmt.Println("Kafka password is wrong.")
//}
//} else {
// fmt.Printf("Failed to create Kafka producer: %v\n", err)
//}
time.Sleep(time.Duration(1) * time.Second)
time.Sleep(1 * time.Second) // Wait before retrying
}
// Panic if unable to create producer after retries
if err != nil {
panic(err.Error())
panic("Failed to create Kafka producer: " + err.Error())
}
p.producer = producer
return &p
}
// configureProducerAck configures the producer's acknowledgement level.
func configureProducerAck(p *Producer, ackConfig string) {
switch strings.ToLower(ackConfig) {
case "no_response":
p.config.Producer.RequiredAcks = sarama.NoResponse
case "wait_for_local":
p.config.Producer.RequiredAcks = sarama.WaitForLocal
case "wait_for_all":
p.config.Producer.RequiredAcks = sarama.WaitForAll
default:
p.config.Producer.RequiredAcks = sarama.WaitForAll
}
}
// configureCompression configures the message compression type for the producer.
func configureCompression(p *Producer, compressType string) {
var compress sarama.CompressionCodec = sarama.CompressionNone
compress.UnmarshalText(bytes.ToLower([]byte(compressType)))
p.config.Producer.Compression = compress
}
// GetMQHeaderWithContext extracts message queue headers from the context.
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
if err != nil {
@@ -111,22 +119,23 @@ func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error)
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
{Key: []byte(constant.ConnID), Value: []byte(connID)},
}, err
}, nil
}
// GetContextWithMQHeader creates a context from message queue headers.
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
var values []string
for _, recordHeader := range header {
values = append(values, string(recordHeader.Value))
}
return mcontext.WithMustInfoCtx(values) // TODO
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
}
// SendMessage sends a message to the Kafka topic configured in the Producer.
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
log.ZDebug(ctx, "SendMessage", "msg", msg, "topic", p.topic, "key", key)
kMsg := &sarama.ProducerMessage{}
kMsg.Topic = p.topic
kMsg.Key = sarama.StringEncoder(key)
// Marshal the protobuf message
bMsg, err := proto.Marshal(msg)
if err != nil {
return 0, 0, utils.Wrap(err, "kafka proto Marshal err")
@@ -134,20 +143,33 @@ func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Messag
if len(bMsg) == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "")
}
kMsg.Value = sarama.ByteEncoder(bMsg)
// Prepare Kafka message
kMsg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(bMsg),
}
// Validate message key and value
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
return 0, 0, utils.Wrap(errEmptyMsg, "")
}
kMsg.Metadata = ctx
// Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx)
if err != nil {
return 0, 0, utils.Wrap(err, "")
}
kMsg.Headers = header
// Send the message
partition, offset, err := p.producer.SendMessage(kMsg)
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key ", kMsg.Key, "key length", kMsg.Value.Length())
if err != nil {
log.ZWarn(ctx, "p.producer.SendMessage error", err)
return 0, 0, utils.Wrap(err, "")
}
return partition, offset, utils.Wrap(err, "")
log.ZDebug(ctx, "ByteEncoder SendMessage end", "key", kMsg.Key, "key length", kMsg.Value.Length())
return partition, offset, nil
}
+11
View File
@@ -15,6 +15,8 @@
package kafka
import (
"os"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -33,3 +35,12 @@ func SetupTLSConfig(cfg *sarama.Config) {
)
}
}
// getEnvOrConfig returns the value of the environment variable if it exists,
// otherwise, it returns the value from the configuration file.
func getEnvOrConfig(envName string, configValue string) string {
if value, exists := os.LookupEnv(envName); exists {
return value
}
return configValue
}
+1 -1
View File
@@ -4,7 +4,7 @@ import ginProm "github.com/openimsdk/open-im-server/v3/pkg/common/ginprometheus"
/*
labels := prometheus.Labels{"label_one": "any", "label_two": "value"}
ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc()
ApiCustomCnt.MetricCollector.(*prometheus.CounterVec).With(labels).Inc().
*/
var (
ApiCustomCnt = &ginProm.Metric{
+1 -1
View File
@@ -15,7 +15,7 @@ package version
// When releasing a new Kubernetes version, this file is updated by
// build/mark_new_version.sh to reflect the new version, and then a
// git annotated tag (using format vX.Y where X == Major version and Y
// == Minor version) is created to point to the commit that updates
// == Minor version) is created to point to the commit that updates.
var (
// TODO: Deprecate gitMajor and gitMinor, use only gitVersion
// instead. First step in deprecation, keep the fields but make
+2 -2
View File
@@ -25,7 +25,7 @@ func Get() Info {
}
}
// GetClientVersion returns the git version of the OpenIM client repository
// GetClientVersion returns the git version of the OpenIM client repository.
func GetClientVersion() (*OpenIMClientVersion, error) {
clientVersion, err := getClientVersion()
if err != nil {
@@ -52,7 +52,7 @@ func getClientVersion() (string, error) {
return ref.Hash().String(), nil
}
// GetSingleVersion returns single version of sealer
// GetSingleVersion returns single version of sealer.
func GetSingleVersion() string {
return gitVersion
}
+2
View File
@@ -28,6 +28,8 @@ openim::log::info "\n# Use Docker to start all openim service"
trap 'openim::util::onCtrlC' INT
"${OPENIM_ROOT}"/scripts/init-config.sh --skip
"${OPENIM_ROOT}"/scripts/start-all.sh
sleep 5
+102 -69
View File
@@ -1,17 +1,4 @@
#!/usr/bin/env bash
# Copyright © 2023 OpenIM. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script automatically initializes various configuration files and can generate example files.
@@ -48,7 +35,8 @@ declare -A EXAMPLES=(
FORCE_OVERWRITE=false
SKIP_EXISTING=false
GENERATE_EXAMPLES=false
CLEAN_ENV_EXAMPLES=false
CLEAN_CONFIG=false
CLEAN_EXAMPLES=false
# Function to display help information
show_help() {
@@ -58,59 +46,45 @@ show_help() {
echo " --force Overwrite existing files without prompt"
echo " --skip Skip generation if file exists"
echo " --examples Generate example files"
echo " --clean-env-examples Generate example files in a clean environment"
echo " --clean-config Clean all configuration files"
echo " --clean-examples Clean all example files"
}
# Function to generate configuration files
generate_config_files() {
# Loop through each template in TEMPLATES
for template in "${!TEMPLATES[@]}"; do
# Read the corresponding output files for the template
IFS=';' read -ra OUTPUT_FILES <<< "${TEMPLATES[$template]}"
for output_file in "${OUTPUT_FILES[@]}"; do
# Check if the output file already exists
if [[ -f "${output_file}" ]]; then
# Handle existing file based on command-line options
if [[ "${FORCE_OVERWRITE}" == true ]]; then
openim::log::info "Force overwriting ${output_file}."
elif [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "Skipping generation of ${output_file} as it already exists."
local output_file="${TEMPLATES[$template]}"
if [[ -f "${output_file}" ]]; then
if [[ "${FORCE_OVERWRITE}" == true ]]; then
openim::log::info "Force overwriting ${output_file}."
elif [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "Skipping generation of ${output_file} as it already exists."
continue
else
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
read -r -n 1 REPLY
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
openim::log::info "Skipping generation of ${output_file}."
continue
else
# Ask user for confirmation to overwrite
echo -n "File ${output_file} already exists. Overwrite? (Y/N): "
read -r -n 1 REPLY
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
openim::log::info "Skipping generation of ${output_file}."
continue
fi
fi
fi
# Process the template file to generate the output file
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
openim::log::error "genconfig.sh script not found"
exit 1
else
if [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "Generating ${output_file} as it does not exist."
fi
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
openim::log::error "Error processing template file ${template}"
exit 1
}
sleep 0.5
done
done
}
# Function to generate example files
generate_example_files() {
for template in "${!EXAMPLES[@]}"; do
local example_file="${EXAMPLES[$template]}"
if [[ ! -f "${example_file}" ]]; then
openim::log::info "Generating example file: ${example_file} from ${template}..."
cp "${template}" "${example_file}"
fi
openim::log::info "⌚ Working with template file: ${template} to generate ${output_file}..."
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
openim::log::error "genconfig.sh script not found"
exit 1
fi
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
openim::log::error "Error processing template file ${template}"
exit 1
}
sleep 0.5
done
}
@@ -120,7 +94,8 @@ declare -A env_vars=(
["LOG_STORAGE_LOCATION"]="../logs/"
)
generate_clean_environment_examples() {
# Function to generate example files
generate_example_files() {
env_cmd="env -i"
for var in "${!env_vars[@]}"; do
env_cmd+=" $var='${env_vars[$var]}'"
@@ -128,12 +103,56 @@ generate_clean_environment_examples() {
for template in "${!EXAMPLES[@]}"; do
local example_file="${EXAMPLES[$template]}"
openim::log::info "Generating example file: ${example_file} from ${template}..."
if [[ -f "${example_file}" ]]; then
if [[ "${FORCE_OVERWRITE}" == true ]]; then
openim::log::info "Force overwriting example file: ${example_file}."
elif [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "Skipping generation of example file: ${example_file} as it already exists."
continue
else
echo -n "Example file ${example_file} already exists. Overwrite? (Y/N): "
read -r -n 1 REPLY
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
openim::log::info "Skipping generation of example file: ${example_file}."
continue
fi
fi
elif [[ "${SKIP_EXISTING}" == true ]]; then
openim::log::info "Generating example file: ${example_file} as it does not exist."
fi
openim::log::info "⌚ Working with template file: ${template} to generate example file: ${example_file}..."
if [[ ! -f "${OPENIM_ROOT}/scripts/genconfig.sh" ]]; then
openim::log::error "genconfig.sh script not found"
exit 1
fi
eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${example_file}'" || {
openim::log::error "Error processing template file ${template}"
exit 1
}
sleep 0.5
done
}
# Function to clean configuration files
clean_config_files() {
for output_file in "${TEMPLATES[@]}"; do
if [[ -f "${output_file}" ]]; then
rm -f "${output_file}"
openim::log::info "Removed configuration file: ${output_file}"
fi
done
}
# Function to clean example files
clean_example_files() {
for example_file in "${EXAMPLES[@]}"; do
if [[ -f "${example_file}" ]]; then
rm -f "${example_file}"
openim::log::info "Removed example file: ${example_file}"
fi
done
}
@@ -155,8 +174,12 @@ while [[ $# -gt 0 ]]; do
GENERATE_EXAMPLES=true
shift
;;
--clean-env-examples)
CLEAN_ENV_EXAMPLES=true
--clean-config)
CLEAN_CONFIG=true
shift
;;
--clean-examples)
CLEAN_EXAMPLES=true
shift
;;
*)
@@ -167,19 +190,29 @@ while [[ $# -gt 0 ]]; do
esac
done
# Clean configuration files if --clean-config option is provided
if [[ "${CLEAN_CONFIG}" == true ]]; then
clean_config_files
fi
# Clean example files if --clean-examples option is provided
if [[ "${CLEAN_EXAMPLES}" == true ]]; then
clean_example_files
fi
# Generate configuration files if requested
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]]; then
if [[ "${FORCE_OVERWRITE}" == true || "${SKIP_EXISTING}" == false ]] && [[ "${CLEAN_CONFIG}" == false ]]; then
generate_config_files
fi
# Generate configuration files if requested
if [[ "${SKIP_EXISTING}" == true ]]; then
generate_config_files
fi
# Generate example files if --examples option is provided
if [[ "${GENERATE_EXAMPLES}" == true ]]; then
if [[ "${GENERATE_EXAMPLES}" == true ]] && [[ "${CLEAN_EXAMPLES}" == false ]]; then
generate_example_files
fi
# Generate example files in a clean environment if --clean-env-examples option is provided
if [[ "${CLEAN_ENV_EXAMPLES}" == true ]]; then
generate_clean_environment_examples
fi
openim::log::success "Configuration and example files generation complete!"
openim::log::success "Configuration and example files operation complete!"
+7 -7
View File
@@ -9,7 +9,7 @@ import (
"net/http"
)
// API endpoints and other constants
// API endpoints and other constants.
const (
APIHost = "http://127.0.0.1:10002"
UserTokenURL = APIHost + "/auth/user_token"
@@ -18,27 +18,27 @@ const (
OperationID = "1646445464564"
)
// UserTokenRequest represents a request to get a user token
// UserTokenRequest represents a request to get a user token.
type UserTokenRequest struct {
Secret string `json:"secret"`
PlatformID int `json:"platformID"`
UserID string `json:"userID"`
}
// UserTokenResponse represents a response containing a user token
// UserTokenResponse represents a response containing a user token.
type UserTokenResponse struct {
Token string `json:"token"`
ErrCode int `json:"errCode"`
}
// User represents user data for registration
// User represents user data for registration.
type User struct {
UserID string `json:"userID"`
Nickname string `json:"nickname"`
FaceURL string `json:"faceURL"`
}
// UserRegisterRequest represents a request to register a user
// UserRegisterRequest represents a request to register a user.
type UserRegisterRequest struct {
Secret string `json:"secret"`
Users []User `json:"users"`
@@ -58,7 +58,7 @@ func main() {
}
}
// GetUserToken requests a user token from the API
// GetUserToken requests a user token from the API.
func GetUserToken(userID string) (string, error) {
reqBody := UserTokenRequest{
Secret: SecretKey,
@@ -88,7 +88,7 @@ func GetUserToken(userID string) (string, error) {
return tokenResp.Token, nil
}
// RegisterUser registers a new user using the API
// RegisterUser registers a new user using the API.
func RegisterUser(token, userID, nickname, faceURL string) error {
user := User{
UserID: userID,
+5 -5
View File
@@ -7,18 +7,18 @@ import (
"github.com/openimsdk/open-im-server/v3/test/e2e/framework/config"
)
// UserInfoRequest represents a request to get or update user information
// UserInfoRequest represents a request to get or update user information.
type UserInfoRequest struct {
UserIDs []string `json:"userIDs,omitempty"`
UserInfo *gettoken.User `json:"userInfo,omitempty"`
}
// GetUsersOnlineStatusRequest represents a request to get users' online status
// GetUsersOnlineStatusRequest represents a request to get users' online status.
type GetUsersOnlineStatusRequest struct {
UserIDs []string `json:"userIDs"`
}
// GetUsersInfo retrieves detailed information for a list of user IDs
// GetUsersInfo retrieves detailed information for a list of user IDs.
func GetUsersInfo(token string, userIDs []string) error {
url := fmt.Sprintf("http://%s:%s/user/get_users_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
@@ -29,7 +29,7 @@ func GetUsersInfo(token string, userIDs []string) error {
return sendPostRequestWithToken(url, token, requestBody)
}
// UpdateUserInfo updates the information for a user
// UpdateUserInfo updates the information for a user.
func UpdateUserInfo(token, userID, nickname, faceURL string) error {
url := fmt.Sprintf("http://%s:%s/user/update_user_info", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
@@ -44,7 +44,7 @@ func UpdateUserInfo(token, userID, nickname, faceURL string) error {
return sendPostRequestWithToken(url, token, requestBody)
}
// GetUsersOnlineStatus retrieves the online status for a list of user IDs
// GetUsersOnlineStatus retrieves the online status for a list of user IDs.
func GetUsersOnlineStatus(token string, userIDs []string) error {
url := fmt.Sprintf("http://%s:%s/user/get_users_online_status", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
+8 -8
View File
@@ -11,29 +11,29 @@ import (
"github.com/openimsdk/open-im-server/v3/test/e2e/framework/config"
)
// ForceLogoutRequest represents a request to force a user logout
// ForceLogoutRequest represents a request to force a user logout.
type ForceLogoutRequest struct {
PlatformID int `json:"platformID"`
UserID string `json:"userID"`
}
// CheckUserAccountRequest represents a request to check a user account
// CheckUserAccountRequest represents a request to check a user account.
type CheckUserAccountRequest struct {
CheckUserIDs []string `json:"checkUserIDs"`
}
// GetUsersRequest represents a request to get a list of users
// GetUsersRequest represents a request to get a list of users.
type GetUsersRequest struct {
Pagination Pagination `json:"pagination"`
}
// Pagination specifies the page number and number of items per page
// Pagination specifies the page number and number of items per page.
type Pagination struct {
PageNumber int `json:"pageNumber"`
ShowNumber int `json:"showNumber"`
}
// ForceLogout forces a user to log out
// ForceLogout forces a user to log out.
func ForceLogout(token, userID string, platformID int) error {
url := fmt.Sprintf("http://%s:%s/auth/force_logout", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
@@ -45,7 +45,7 @@ func ForceLogout(token, userID string, platformID int) error {
return sendPostRequestWithToken(url, token, requestBody)
}
// CheckUserAccount checks if the user accounts exist
// CheckUserAccount checks if the user accounts exist.
func CheckUserAccount(token string, userIDs []string) error {
url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
@@ -56,7 +56,7 @@ func CheckUserAccount(token string, userIDs []string) error {
return sendPostRequestWithToken(url, token, requestBody)
}
// GetUsers retrieves a list of users with pagination
// GetUsers retrieves a list of users with pagination.
func GetUsers(token string, pageNumber, showNumber int) error {
url := fmt.Sprintf("http://%s:%s/user/account_check", config.LoadConfig().APIHost, config.LoadConfig().APIPort)
@@ -70,7 +70,7 @@ func GetUsers(token string, pageNumber, showNumber int) error {
return sendPostRequestWithToken(url, token, requestBody)
}
// sendPostRequestWithToken sends a POST request with a token in the header
// sendPostRequestWithToken sends a POST request with a token in the header.
func sendPostRequestWithToken(url, token string, body any) error {
reqBytes, err := json.Marshal(body)
if err != nil {
+4 -4
View File
@@ -10,7 +10,7 @@ import (
)
var (
// The default template version
// The default template version.
defaultTemplateVersion = "v1.3.0"
)
@@ -84,7 +84,7 @@ func main() {
select {}
}
// getLatestVersion fetches the latest version number from a given URL
// getLatestVersion fetches the latest version number from a given URL.
func getLatestVersion(url string) (string, error) {
resp, err := http.Get(url)
if err != nil {
@@ -102,7 +102,7 @@ func getLatestVersion(url string) (string, error) {
return latestVersion, nil
}
// downloadAndExtract downloads a file from a URL and extracts it to a destination directory
// downloadAndExtract downloads a file from a URL and extracts it to a destination directory.
func downloadAndExtract(url, destDir string) error {
resp, err := http.Get(url)
if err != nil {
@@ -141,7 +141,7 @@ func downloadAndExtract(url, destDir string) error {
return cmd.Run()
}
// startProcess starts a process and prints any errors encountered
// startProcess starts a process and prints any errors encountered.
func startProcess(cmdPath string) {
cmd := exec.Command(cmdPath)
cmd.Stdout = os.Stdout
+2 -1
View File
@@ -1,11 +1,12 @@
package pkg
import (
"time"
mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3"
mongoModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
mysqlModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mysql"
"time"
)
type convert struct{}
@@ -2,13 +2,15 @@ package mgo
import (
"context"
"time"
"github.com/OpenIMSDK/tools/mgoutil"
"github.com/OpenIMSDK/tools/pagination"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
)
func NewMeeting(db *mongo.Database) (table.MeetingInterface, error) {
@@ -2,14 +2,16 @@ package mgo
import (
"context"
"time"
"github.com/OpenIMSDK/tools/mgoutil"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
)
func NewMeetingInvitation(db *mongo.Database) (table.MeetingInvitationInterface, error) {
@@ -55,7 +57,12 @@ func (x *meetingInvitation) CreateMeetingInvitationInfo(ctx context.Context, roo
func (x *meetingInvitation) GetUserInvitedMeetingIDs(ctx context.Context, userID string) (meetingIDs []string, err error) {
fiveDaysAgo := time.Now().AddDate(0, 0, -5)
return mgoutil.Find[string](ctx, x.coll, bson.M{"user_id": userID, "create_time": bson.M{"$gte": fiveDaysAgo}}, options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1}))
return mgoutil.Find[string](
ctx,
x.coll,
bson.M{"user_id": userID, "create_time": bson.M{"$gte": fiveDaysAgo}},
options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1}),
)
}
func (x *meetingInvitation) Delete(ctx context.Context, roomIDs []string) error {
@@ -2,10 +2,12 @@ package mgo
import (
"context"
"github.com/OpenIMSDK/tools/mgoutil"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
)
func NewMeetingRecord(db *mongo.Database) (table.MeetingRecordInterface, error) {
@@ -2,13 +2,15 @@ package mgo
import (
"context"
"time"
"github.com/OpenIMSDK/tools/mgoutil"
"github.com/OpenIMSDK/tools/pagination"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
)
func NewSignal(db *mongo.Database) (table.SignalInterface, error) {
@@ -2,14 +2,16 @@ package mgo
import (
"context"
"time"
"github.com/OpenIMSDK/tools/mgoutil"
"github.com/OpenIMSDK/tools/pagination"
"github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
)
func NewSignalInvitation(db *mongo.Database) (table.SignalInvitationInterface, error) {
@@ -2,8 +2,9 @@ package table
import (
"context"
"github.com/OpenIMSDK/tools/pagination"
"time"
"github.com/OpenIMSDK/tools/pagination"
)
type MeetingInfo struct {
@@ -2,11 +2,12 @@ package table
import (
"context"
"time"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/pagination"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/mongo"
"time"
)
type SignalModel struct {
+1 -1
View File
@@ -16,7 +16,7 @@ type SignalModel struct {
SessionType int32 `gorm:"column:sesstion_type"`
InitiateTime time.Time `gorm:"column:initiate_time"`
EndTime time.Time `gorm:"column:end_time"`
FileURL string `gorm:"column:file_url" json:"-"`
FileURL string `gorm:"column:file_url" json:"-"`
Title string `gorm:"column:title;size:128"`
Desc string `gorm:"column:desc;size:1024"`
+2 -1
View File
@@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"gopkg.in/yaml.v3"
"log"
"os"
"reflect"
"strconv"
"gopkg.in/yaml.v3"
"github.com/go-sql-driver/mysql"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
+2 -1
View File
@@ -2,9 +2,10 @@ package main
import (
"flag"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg"
"log"
"os"
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg"
)
func main() {