Compare commits

..

15 Commits

Author SHA1 Message Date
Xinwei Xiong (cubxxw) 080dfb222a fix: fix Security vulnerability
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-15 21:09:41 +08:00
Xinwei Xiong (cubxxw) 10646f26c0 fix: fix Security vulnerability
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-15 19:36:21 +08:00
Xinwei Xiong (cubxxw) 0992a36602 feat: add openim config set
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-15 19:04:41 +08:00
Xinwei Xiong (cubxxw) 9774f550b9 feat: add openim config set
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-15 15:03:00 +08:00
Xinwei Xiong (cubxxw) 61630275c9 fix: remove openim chat config file
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-15 11:32:22 +08:00
Xinwei Xiong (cubxxw) a1eebcaeba fix: docker compose
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 21:25:39 +08:00
Xinwei Xiong (cubxxw) b17b212866 feat: add openim copyright
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 17:07:59 +08:00
Xinwei Xiong (cubxxw) e1422ec8f4 feat: add openim docker
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 15:11:45 +08:00
Xinwei Xiong (cubxxw) ad47590e13 feat: add openim docker
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:39:54 +08:00
Xinwei Xiong (cubxxw) a42a44e0a3 feat: add openim docker
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) d8838ee6b8 feat: add kafka and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) f480f52e2d feat: add zk and redis mongo env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) a23cbf13cf feat: add openim mongo and redis env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) 498e26a942 feat: add openim env
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
Xinwei Xiong (cubxxw) e1990c179e feat: add openim server code
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-14 11:21:00 +08:00
53 changed files with 253 additions and 979 deletions
+6 -55
View File
@@ -130,14 +130,14 @@ jobs:
sudo make install sudo make install
execute-scripts: execute-scripts:
name: Execute OpenIM Script On ${{ matrix.os }} name: Execute OpenIM script on ${{ matrix.os }}
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
environment: environment:
name: openim name: openim
strategy: strategy:
matrix: matrix:
go_version: ["1.20"] go_version: ["1.20"]
os: ["ubuntu-latest", "macos-latest"] os: ["ubuntu-latest"]
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v4 uses: actions/checkout@v4
@@ -154,52 +154,18 @@ jobs:
version: '3.x' # If available, use the latest major version that's compatible version: '3.x' # If available, use the latest major version that's compatible
repo-token: ${{ secrets.GITHUB_TOKEN }} repo-token: ${{ secrets.GITHUB_TOKEN }}
# - name: Install latest Bash (macOS only) - name: Docker Operations
# if: runner.os == 'macOS'
# run: |
# /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
# brew update
# brew install bash
# brew install gnu-sed
# echo "/usr/local/bin" >> $GITHUB_PATH
# echo "$(brew --prefix)/opt/gnu-sed/libexec/gnubin" >> $GITHUB_PATH
# continue-on-error: true
- name: Set up Docker for Ubuntu
if: runner.os == 'Linux'
run: | run: |
sudo make init sudo make init
sudo docker compose up -d sudo docker compose up -d
sudo sleep 20 sudo sleep 20
# - name: Set up Docker for macOS - name: Module Operations
# if: runner.os == 'macOS'
# run: |
# brew install --cask docker
# open /Applications/Docker.app
# sleep 10
# docker-compose --version || brew install docker-compose
# docker-compose up -d
# sleep 20
- name: Module Operations for Ubuntu
if: runner.os == 'Linux'
run: | run: |
sudo make tidy sudo make tidy
sudo make tools.verify.go-gitlint sudo make tools.verify.go-gitlint
# - name: Module Operations for macOS - name: Build, Start, Check Services and Print Logs
# if: runner.os == 'macOS'
# run: |
# make tidy
# make tools.verify.go-gitlint
- name: Build, Start, Check Services and Print Logs for Ubuntu
if: runner.os == 'Linux'
run: | run: |
sudo make init && \ sudo make init && \
sudo make build && \ sudo make build && \
@@ -207,21 +173,6 @@ jobs:
sudo make check || \ sudo make check || \
(echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null) (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)
- name: Restart Services and Print Logs for Ubuntu
if: runner.os == 'Linux'
run: |
sudo make restart
sudo make check
# - name: Build, Start, Check Services and Print Logs for macOS
# if: runner.os == 'macOS'
# run: |
# make init && \
# make build && \
# make start && \
# make check || \
# (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)
openim-test-build-image: openim-test-build-image:
name: Build OpenIM Docker Image name: Build OpenIM Docker Image
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -245,4 +196,4 @@ jobs:
- name: Test Docker Build - name: Test Docker Build
run: | run: |
sudo make init sudo make init
sudo make image sudo make image
+1 -1
View File
@@ -95,7 +95,7 @@ stop:
## restart: Restart openim (make init configuration file is initialized) ✨ ## restart: Restart openim (make init configuration file is initialized) ✨
.PHONY: restart .PHONY: restart
restart: clean stop build start check restart: clean stop build init start check
## multiarch: Build binaries for multiple platforms. See option PLATFORMS. ✨ ## multiarch: Build binaries for multiple platforms. See option PLATFORMS. ✨
.PHONY: multiarch .PHONY: multiarch
+3 -3
View File
@@ -122,14 +122,14 @@ api:
# minio.signEndpoint is minio public network address # minio.signEndpoint is minio public network address
object: object:
enable: "minio" enable: "minio"
apiURL: "http://14.155.64.202:10002" apiURL: "http://172.28.0.1:10002"
minio: minio:
bucket: "openim" bucket: "openim"
endpoint: "http://172.28.0.1:10005" endpoint: "http://172.28.0.1:10005"
accessKeyID: "root" accessKeyID: "root"
secretAccessKey: "openIM123" secretAccessKey: "openIM123"
sessionToken: '' sessionToken: ''
signEndpoint: "http://14.155.64.202:10005" signEndpoint: "http://172.28.0.1:10005"
publicRead: false publicRead: false
cos: cos:
bucketURL: https://temp-1252357374.cos.ap-chengdu.myqcloud.com bucketURL: https://temp-1252357374.cos.ap-chengdu.myqcloud.com
@@ -193,7 +193,7 @@ rpcRegisterName:
# Whether to output in json format # Whether to output in json format
# Whether to include stack trace in logs # Whether to include stack trace in logs
log: log:
storageLocation: /data/workspaces/open-im-server/logs/ storageLocation: ../logs/
rotationTime: 24 rotationTime: 24
remainRotationCount: 2 remainRotationCount: 2
remainLogLevel: 6 remainLogLevel: 6
+6 -7
View File
@@ -26,11 +26,11 @@ PASSWORD=openIM123
# Base URL for the application programming interface (API). # Base URL for the application programming interface (API).
# Default: API_URL=http://172.28.0.1:10002 # Default: API_URL=http://172.28.0.1:10002
API_URL=http://14.155.64.202:10002 API_URL=http://172.28.0.1:10002
# Directory path for storing data files or related information. # Directory path for storing data files or related information.
# Default: DATA_DIR=./ # Default: DATA_DIR=./
DATA_DIR=/data/workspaces/open-im-server DATA_DIR=./
# Choose the appropriate image address, the default is GITHUB image, # Choose the appropriate image address, the default is GITHUB image,
# you can choose docker hub, for Chinese users can choose Ali Cloud # you can choose docker hub, for Chinese users can choose Ali Cloud
@@ -210,9 +210,8 @@ API_OPENIM_PORT=10002
# ====================================== # ======================================
# Branch name for OpenIM chat. # Branch name for OpenIM chat.
# Default: CHAT_IMAGE_VERSION=main # Default: CHAT_BRANCH=main
# https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/version.md CHAT_BRANCH=main
CHAT_IMAGE_VERSION=main
# Address or hostname for the OpenIM chat service. # Address or hostname for the OpenIM chat service.
# Default: OPENIM_CHAT_ADDRESS=172.28.0.1 # Default: OPENIM_CHAT_ADDRESS=172.28.0.1
@@ -232,8 +231,8 @@ OPENIM_CHAT_DATA_DIR=./openim-chat/main
# ====================================== # ======================================
# Branch name for OpenIM server. # Branch name for OpenIM server.
# Default: SERVER_IMAGE_VERSION=main # Default: SERVER_BRANCH=main
SERVER_IMAGE_VERSION=main SERVER_BRANCH=main
# Port for the OpenIM admin API. # Port for the OpenIM admin API.
# Default: OPENIM_ADMIN_API_PORT=10009 # Default: OPENIM_ADMIN_API_PORT=10009
+4 -4
View File
@@ -210,8 +210,8 @@ API_OPENIM_PORT=${API_OPENIM_PORT}
# ====================================== # ======================================
# Branch name for OpenIM chat. # Branch name for OpenIM chat.
# Default: CHAT_IMAGE_VERSION=main # Default: CHAT_BRANCH=main
CHAT_IMAGE_VERSION=${CHAT_IMAGE_VERSION} CHAT_BRANCH=${CHAT_BRANCH}
# Address or hostname for the OpenIM chat service. # Address or hostname for the OpenIM chat service.
# Default: OPENIM_CHAT_ADDRESS=172.28.0.1 # Default: OPENIM_CHAT_ADDRESS=172.28.0.1
@@ -231,8 +231,8 @@ OPENIM_CHAT_DATA_DIR=${OPENIM_CHAT_DATA_DIR}
# ====================================== # ======================================
# Branch name for OpenIM server. # Branch name for OpenIM server.
# Default: SERVER_IMAGE_VERSION=main # Default: SERVER_BRANCH=main
SERVER_IMAGE_VERSION=${SERVER_IMAGE_VERSION} SERVER_BRANCH=${SERVER_BRANCH}
# Port for the OpenIM admin API. # Port for the OpenIM admin API.
# Default: OPENIM_ADMIN_API_PORT=10009 # Default: OPENIM_ADMIN_API_PORT=10009
-8
View File
@@ -247,14 +247,6 @@ manager:
userID: [ "${MANAGER_USERID_1}", "${MANAGER_USERID_2}", "${MANAGER_USERID_3}" ] userID: [ "${MANAGER_USERID_1}", "${MANAGER_USERID_2}", "${MANAGER_USERID_3}" ]
nickname: [ "${NICKNAME_1}", "${NICKNAME_2}", "${NICKNAME_3}" ] nickname: [ "${NICKNAME_1}", "${NICKNAME_2}", "${NICKNAME_3}" ]
# chatAdmin, use for send notification
#
# Built-in app system notification account ID
# Built-in app system notification account nickname
im-admin:
userID: [ "${IM_ADMIN_USERID}" ]
nickname: [ "${IM_ADMIN_NAME}" ]
# Multi-platform login policy # Multi-platform login policy
# For each platform(Android, iOS, Windows, Mac, web), only one can be online at a time # For each platform(Android, iOS, Windows, Mac, web), only one can be online at a time
multiLoginPolicy: ${MULTILOGIN_POLICY} multiLoginPolicy: ${MULTILOGIN_POLICY}
+10 -10
View File
@@ -44,12 +44,12 @@ scrape_configs:
# prometheus fetches application services # prometheus fetches application services
- job_name: 'openimserver-openim-api' - job_name: 'openimserver-openim-api'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${API_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${API_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-msggateway' - job_name: 'openimserver-openim-msggateway'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MSG_GATEWAY_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${MSG_GATEWAY_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-msgtransfer' - job_name: 'openimserver-openim-msgtransfer'
@@ -59,41 +59,41 @@ scrape_configs:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-push' - job_name: 'openimserver-openim-push'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${PUSH_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${PUSH_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-auth' - job_name: 'openimserver-openim-rpc-auth'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${AUTH_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${AUTH_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-conversation' - job_name: 'openimserver-openim-rpc-conversation'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${CONVERSATION_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${CONVERSATION_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-friend' - job_name: 'openimserver-openim-rpc-friend'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${FRIEND_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${FRIEND_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-group' - job_name: 'openimserver-openim-rpc-group'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${GROUP_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${GROUP_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-msg' - job_name: 'openimserver-openim-rpc-msg'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MESSAGE_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${MESSAGE_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-third' - job_name: 'openimserver-openim-rpc-third'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${THIRD_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${THIRD_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-user' - job_name: 'openimserver-openim-rpc-user'
static_configs: static_configs:
- targets: [ '${DOCKER_BRIDGE_GATEWAY}:${USER_PROM_PORT}' ] - targets: [ '${OPENIM_SERVER_ADDRESS}:${USER_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
+41 -43
View File
@@ -104,18 +104,18 @@ Docker deployment offers a slightly more intricate template. Within the [openim-
Configuration file modifications can be made by specifying corresponding environment variables, for instance: Configuration file modifications can be made by specifying corresponding environment variables, for instance:
```bash ```bash
export CHAT_IMAGE_VERSION="main" export CHAT_BRANCH="main"
export SERVER_IMAGE_VERSION="main" export SERVER_BRANCH="main"
``` ```
These variables are stored within the [`environment.sh`](https://github.com/OpenIMSDK/openim-docker/blob/main/scripts/install/environment.sh) configuration: These variables are stored within the [`environment.sh`](https://github.com/OpenIMSDK/openim-docker/blob/main/scripts/install/environment.sh) configuration:
```bash ```bash
readonly CHAT_IMAGE_VERSION=${CHAT_IMAGE_VERSION:-'main'} readonly CHAT_BRANCH=${CHAT_BRANCH:-'main'}
readonly SERVER_IMAGE_VERSION=${SERVER_IMAGE_VERSION:-'main'} readonly SERVER_BRANCH=${SERVER_BRANCH:-'main'}
``` ```
Setting a variable, e.g., `export CHAT_IMAGE_VERSION="release-v1.3"`, will prioritize `CHAT_IMAGE_VERSION="release-v1.3"` as the variable value. Ultimately, the chosen image version is determined, and rendering is achieved through `make init` (or `./scripts/init-config.sh`). Setting a variable, e.g., `export CHAT_BRANCH="release-v1.3"`, will prioritize `CHAT_BRANCH="release-v1.3"` as the variable value. Ultimately, the chosen image version is determined, and rendering is achieved through `make init` (or `./scripts/init-config.sh`).
> Note: Direct modifications to the `config.yaml` file are also permissible without utilizing `make init`. > Note: Direct modifications to the `config.yaml` file are also permissible without utilizing `make init`.
@@ -453,45 +453,43 @@ This section involves configuring the log settings, including storage location,
This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat. This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat.
| Parameter | Example Value | Description | | Parameter | Example Value | Description |
|-------------------------|-------------------|----------------------------------| |-------------------------|-------------------|------------------------------------|
| WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections | | WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections |
| WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length | | WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length |
| WEBSOCKET_TIMEOUT | "10" | Websocket timeout | | WEBSOCKET_TIMEOUT | "10" | Websocket timeout |
| PUSH_ENABLE | "getui" | Push notification enable status | | PUSH_ENABLE | "getui" | Push notification enable status |
| GETUI_PUSH_URL | [Generated URL] | GeTui Push Notification URL | | GETUI_PUSH_URL | [Generated URL] | GeTui Push Notification URL |
| GETUI_MASTER_SECRET | [User Defined] | GeTui Master Secret | | GETUI_MASTER_SECRET | [User Defined] | GeTui Master Secret |
| GETUI_APP_KEY | [User Defined] | GeTui Application Key | | GETUI_APP_KEY | [User Defined] | GeTui Application Key |
| GETUI_INTENT | [User Defined] | GeTui Push Intent | | GETUI_INTENT | [User Defined] | GeTui Push Intent |
| GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID | | GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID |
| GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name | | GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name |
| FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account | | FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account |
| JPNS_APP_KEY | [User Defined] | JPNS Application Key | | JPNS_APP_KEY | [User Defined] | JPNS Application Key |
| JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret | | JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret |
| JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL | | JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL |
| JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent | | JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent |
| MANAGER_USERID_1 | "openIM123456" | Administrator ID 1 | | MANAGER_USERID_1 | "openIM123456" | Administrator ID 1 |
| MANAGER_USERID_2 | "openIM654321" | Administrator ID 2 | | MANAGER_USERID_2 | "openIM654321" | Administrator ID 2 |
| MANAGER_USERID_3 | "openIMAdmin" | Administrator ID 3 | | MANAGER_USERID_3 | "openIMAdmin" | Administrator ID 3 |
| NICKNAME_1 | "system1" | Nickname 1 | | NICKNAME_1 | "system1" | Nickname 1 |
| NICKNAME_2 | "system2" | Nickname 2 | | NICKNAME_2 | "system2" | Nickname 2 |
| NICKNAME_3 | "system3" | Nickname 3 | | NICKNAME_3 | "system3" | Nickname 3 |
| IM_ADMIN_USERID | "imAdmin" | IM Administrator ID | | MULTILOGIN_POLICY | "1" | Multi-login Policy |
| IM_ADMIN_NAME | "imAdmin" | IM Administrator Nickname | | CHAT_PERSISTENCE_MYSQL | "true" | Chat Persistence in MySQL |
| MULTILOGIN_POLICY | "1" | Multi-login Policy | | MSG_CACHE_TIMEOUT | "86400" | Message Cache Timeout |
| CHAT_PERSISTENCE_MYSQL | "true" | Chat Persistence in MySQL | | GROUP_MSG_READ_RECEIPT | "true" | Group Message Read Receipt Enable |
| MSG_CACHE_TIMEOUT | "86400" | Message Cache Timeout |
| GROUP_MSG_READ_RECEIPT | "true" | Group Message Read Receipt Enable |
| SINGLE_MSG_READ_RECEIPT | "true" | Single Message Read Receipt Enable | | SINGLE_MSG_READ_RECEIPT | "true" | Single Message Read Receipt Enable |
| RETAIN_CHAT_RECORDS | "365" | Retain Chat Records (in days) | | RETAIN_CHAT_RECORDS | "365" | Retain Chat Records (in days) |
| CHAT_RECORDS_CLEAR_TIME | [Cron Expression] | Chat Records Clear Time | | CHAT_RECORDS_CLEAR_TIME | [Cron Expression] | Chat Records Clear Time |
| MSG_DESTRUCT_TIME | [Cron Expression] | Message Destruct Time | | MSG_DESTRUCT_TIME | [Cron Expression] | Message Destruct Time |
| SECRET | "${PASSWORD}" | Secret Key | | SECRET | "${PASSWORD}" | Secret Key |
| TOKEN_EXPIRE | "90" | Token Expiry Time | | TOKEN_EXPIRE | "90" | Token Expiry Time |
| FRIEND_VERIFY | "false" | Friend Verification Enable | | FRIEND_VERIFY | "false" | Friend Verification Enable |
| IOS_PUSH_SOUND | "xxx" | iOS | | IOS_PUSH_SOUND | "xxx" | iOS |
| CALLBACK_ENABLE | "false" | Enable callback | | CALLBACK_ENABLE | "false" | Enable callback |
| CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call | | CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call |
| CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step | | CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step |
### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration ### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration
+2 -6
View File
@@ -4,6 +4,8 @@ go 1.19
require ( require (
firebase.google.com/go v3.13.0+incompatible firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.31
github.com/OpenIMSDK/tools v0.0.20
github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1 github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
@@ -33,12 +35,9 @@ require github.com/google/uuid v1.3.1
require ( require (
github.com/IBM/sarama v1.41.3 github.com/IBM/sarama v1.41.3
github.com/OpenIMSDK/protocol v0.0.36
github.com/OpenIMSDK/tools v0.0.21
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible
github.com/redis/go-redis/v9 v9.2.1 github.com/redis/go-redis/v9 v9.2.1
github.com/stathat/consistent v1.0.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.45 github.com/tencentyun/cos-go-sdk-v5 v0.7.45
go.uber.org/automaxprocs v1.5.3 go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.4.0 golang.org/x/sync v0.4.0
@@ -142,7 +141,6 @@ require (
gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect
gorm.io/gorm v1.23.8 // indirect gorm.io/gorm v1.23.8 // indirect
stathat.com/c/consistent v1.0.0 // indirect
) )
require ( require (
@@ -156,5 +154,3 @@ require (
golang.org/x/crypto v0.14.0 // indirect golang.org/x/crypto v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )
replace github.com/OpenIMSDK/protocol v0.0.36 => github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5
+4 -8
View File
@@ -18,8 +18,10 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48= github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE=
github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k=
github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
@@ -225,8 +227,6 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5 h1:nmrJmAgQsCAxKgw109kaTcBV4rMWDRvqOson0ehw708=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
@@ -308,8 +308,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4= github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4=
github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@@ -538,5 +536,3 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
+3 -2
View File
@@ -169,8 +169,9 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
case constant.OANotification: case constant.OANotification:
data = apistruct.OANotificationElem{} data = apistruct.OANotificationElem{}
req.SessionType = constant.NotificationChatType req.SessionType = constant.NotificationChatType
if err = m.userRpcClient.GetNotificationByID(c, req.SendID); err != nil { if !authverify.IsManagerUserID(req.SendID) {
return nil, err return nil, errs.ErrNoPermission.
Wrap("only app manager can as sender send OANotificationElem")
} }
default: default:
return nil, errs.ErrArgs.WithDetail("not support err contentType") return nil, errs.ErrArgs.WithDetail("not support err contentType")
-10
View File
@@ -77,15 +77,6 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus) userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus)
userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus) userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus)
userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus) userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus)
userRouterGroup.POST("/process_user_command_add", ParseToken, u.ProcessUserCommandAdd)
userRouterGroup.POST("/process_user_command_delete", ParseToken, u.ProcessUserCommandDelete)
userRouterGroup.POST("/process_user_command_update", ParseToken, u.ProcessUserCommandUpdate)
userRouterGroup.POST("/process_user_command_get", ParseToken, u.ProcessUserCommandGet)
userRouterGroup.POST("/add_notification_account", ParseToken, u.AddNotificationAccount)
userRouterGroup.POST("/update_notification_account", ParseToken, u.UpdateNotificationAccountInfo)
userRouterGroup.POST("/search_notification_account", ParseToken, u.SearchNotificationAccount)
} }
// friend routing group // friend routing group
friendRouterGroup := r.Group("/friend", ParseToken) friendRouterGroup := r.Group("/friend", ParseToken)
@@ -107,7 +98,6 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
friendRouterGroup.POST("/is_friend", f.IsFriend) friendRouterGroup.POST("/is_friend", f.IsFriend)
friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs) friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs)
friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo) friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo)
//friendRouterGroup.POST("/set_pin_friend", f.SetPinFriends)
} }
g := NewGroupApi(*groupRpc) g := NewGroupApi(*groupRpc)
groupRouterGroup := r.Group("/group", ParseToken) groupRouterGroup := r.Group("/group", ParseToken)
+2 -33
View File
@@ -15,6 +15,8 @@
package api package api
import ( import (
"github.com/gin-gonic/gin"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway" "github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/protocol/user" "github.com/OpenIMSDK/protocol/user"
@@ -22,7 +24,6 @@ import (
"github.com/OpenIMSDK/tools/apiresp" "github.com/OpenIMSDK/tools/apiresp"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
@@ -198,35 +199,3 @@ func (u *UserApi) GetUserStatus(c *gin.Context) {
func (u *UserApi) GetSubscribeUsersStatus(c *gin.Context) { func (u *UserApi) GetSubscribeUsersStatus(c *gin.Context) {
a2r.Call(user.UserClient.GetSubscribeUsersStatus, u.Client, c) a2r.Call(user.UserClient.GetSubscribeUsersStatus, u.Client, c)
} }
// ProcessUserCommandAdd user general function add
func (u *UserApi) ProcessUserCommandAdd(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandAdd, u.Client, c)
}
// ProcessUserCommandDelete user general function delete
func (u *UserApi) ProcessUserCommandDelete(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandDelete, u.Client, c)
}
// ProcessUserCommandUpdate user general function update
func (u *UserApi) ProcessUserCommandUpdate(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandUpdate, u.Client, c)
}
// ProcessUserCommandGet user general function get
func (u *UserApi) ProcessUserCommandGet(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandGet, u.Client, c)
}
func (u *UserApi) AddNotificationAccount(c *gin.Context) {
a2r.Call(user.UserClient.AddNotificationAccount, u.Client, c)
}
func (u *UserApi) UpdateNotificationAccountInfo(c *gin.Context) {
a2r.Call(user.UserClient.UpdateNotificationAccountInfo, u.Client, c)
}
func (u *UserApi) SearchNotificationAccount(c *gin.Context) {
a2r.Call(user.UserClient.SearchNotificationAccount, u.Client, c)
}
+6 -7
View File
@@ -288,13 +288,12 @@ func (ws *WsServer) registerClient(client *Client) {
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
if config.Config.Envs.Discovery == "zookeeper" { wg.Add(1)
wg.Add(1) go func() {
go func() { defer wg.Done()
defer wg.Done() _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) }()
}()
}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
+5 -6
View File
@@ -67,14 +67,13 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default: default:
var pushUserIDList []string var pushUserIDs []string
isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync) if pbData.MsgData.SendID != pbData.MsgData.RecvID {
if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID { pushUserIDs = []string{pbData.MsgData.SendID, pbData.MsgData.RecvID}
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
} else { } else {
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID) pushUserIDs = []string{pbData.MsgData.SendID}
} }
err = c.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData) err = c.pusher.Push2User(ctx, pushUserIDs, pbData.MsgData)
} }
if err != nil { if err != nil {
if err == errNoOfflinePusher { if err == errNoOfflinePusher {
+2 -1
View File
@@ -16,9 +16,10 @@ package push
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/utils"
"sync" "sync"
"github.com/OpenIMSDK/tools/utils"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
+21 -118
View File
@@ -18,7 +18,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"google.golang.org/grpc"
"sync" "sync"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@@ -143,47 +142,6 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error {
return json.Unmarshal([]byte(notification.Detail), t) return json.Unmarshal([]byte(notification.Detail), t)
} }
/*
k8s deployment,offline push group messages function
*/
func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error {
var needOfflinePushUserIDs []string
for _, v := range wsResults {
if !v.OnlinePush {
needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID)
}
}
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
if msg.ContentType != constant.SignalingNotification {
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
ctx,
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
)
if err != nil {
return err
}
if len(resp.UserIDs) > 0 {
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
}
}
}
return nil
}
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string var pushToUserIDs []string
@@ -247,10 +205,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush && config.Config.Envs.Discovery == "k8s" { if isOfflinePush {
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
}
if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" {
var ( var (
onlineSuccessUserIDs = []string{msg.SendID} onlineSuccessUserIDs = []string{msg.SendID}
webAndPcBackgroundUserIDs []string webAndPcBackgroundUserIDs []string
@@ -284,7 +239,14 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
} }
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
return err
}
needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs)
}
// Use offline push messaging // Use offline push messaging
if len(needOfflinePushUserIDs) > 0 { if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string var offlinePushUserIDs []string
@@ -296,89 +258,30 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if len(offlinePushUserIDs) > 0 { if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs needOfflinePushUserIDs = offlinePushUserIDs
} }
if msg.ContentType != constant.SignalingNotification { resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( ctx,
ctx, &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, )
) if err != nil {
return err
}
if len(resp.UserIDs) > 0 {
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
if err != nil { if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err return err
} }
if len(resp.UserIDs) > 0 { if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs) log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
if err != nil { return err
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
return err
}
} }
} }
} }
} }
return nil return nil
} }
func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
var usersHost = make(map[string][]string)
for _, v := range pushToUserIDs {
tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v)
if err != nil {
log.ZError(ctx, "get msggateway hash error", err)
return nil, err
}
tUsers, tbl := usersHost[tHost]
if tbl {
tUsers = append(tUsers, v)
usersHost[tHost] = tUsers
} else {
usersHost[tHost] = []string{v}
}
}
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
var usersConns = make(map[*grpc.ClientConn][]string)
for host, userIds := range usersHost {
tconn, _ := p.discov.GetConn(ctx, host)
usersConns[tconn] = userIds
}
var (
mu sync.Mutex
wg = errgroup.Group{}
maxWorkers = config.Config.Push.MaxConcurrentWorkers
)
if maxWorkers < 3 {
maxWorkers = 3
}
wg.SetLimit(maxWorkers)
for conn, userIds := range usersConns {
tcon := conn
tuserIds := userIds
wg.Go(func() error {
input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds}
msgClient := msggateway.NewMsgGatewayClient(tcon)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
if err != nil {
return nil
}
log.ZDebug(ctx, "push result", "reply", reply)
if reply != nil && reply.SinglePushResult != nil {
mu.Lock()
wsResults = append(wsResults, reply.SinglePushResult...)
mu.Unlock()
}
return nil
})
}
_ = wg.Wait()
return wsResults, nil
}
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
if config.Config.Envs.Discovery == "k8s" {
return p.k8sOnlinePush(ctx, msg, pushToUserIDs)
}
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if err != nil { if err != nil {
-1
View File
@@ -79,7 +79,6 @@ func (s *friendServer) AddBlack(ctx context.Context, req *pbfriend.AddBlackReq)
BlockUserID: req.BlackUserID, BlockUserID: req.BlackUserID,
OperatorUserID: mcontext.GetOpUserID(ctx), OperatorUserID: mcontext.GetOpUserID(ctx),
CreateTime: time.Now(), CreateTime: time.Now(),
Ex: req.Ex,
} }
if err := s.blackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil { if err := s.blackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil {
return nil, err return nil, err
+2 -37
View File
@@ -53,9 +53,8 @@ type friendServer struct {
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
} }
func (s *friendServer) UpdateFriends(ctx context.Context, req *pbfriend.UpdateFriendsReq) (*pbfriend.UpdateFriendsResp, error) { func (s *friendServer) PinFriends(ctx context.Context, req *pbfriend.PinFriendsReq) (*pbfriend.PinFriendsResp, error) {
//TODO implement me return nil, errs.ErrInternalServer.Wrap("not implemented")
panic("implement me")
} }
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -412,7 +411,6 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
} }
var friendInfo *sdkws.FriendInfo var friendInfo *sdkws.FriendInfo
if friend := friendMap[userID]; friend != nil { if friend := friendMap[userID]; friend != nil {
friendInfo = &sdkws.FriendInfo{ friendInfo = &sdkws.FriendInfo{
OwnerUserID: friend.OwnerUserID, OwnerUserID: friend.OwnerUserID,
Remark: friend.Remark, Remark: friend.Remark,
@@ -420,7 +418,6 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
AddSource: friend.AddSource, AddSource: friend.AddSource,
OperatorUserID: friend.OperatorUserID, OperatorUserID: friend.OperatorUserID,
Ex: friend.Ex, Ex: friend.Ex,
IsPinned: friend.IsPinned,
} }
} }
var blackInfo *sdkws.BlackInfo var blackInfo *sdkws.BlackInfo
@@ -441,35 +438,3 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
} }
return resp, nil return resp, nil
} }
func (s *friendServer) PinFriends(
ctx context.Context,
req *pbfriend.UpdateFriendsReq,
) (*pbfriend.UpdateFriendsResp, error) {
if len(req.FriendUserIDs) == 0 {
return nil, errs.ErrArgs.Wrap("friendIDList is empty")
}
if utils.Duplicate(req.FriendUserIDs) {
return nil, errs.ErrArgs.Wrap("friendIDList repeated")
}
var isPinned bool
if req.IsPinned != nil {
isPinned = req.IsPinned.Value
} else {
return nil, errs.ErrArgs.Wrap("isPinned is nil")
}
//check whther in friend list
_, err := s.friendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, err
}
//set friendslist friend pin status to isPinned
for _, friendID := range req.FriendUserIDs {
if err := s.friendDatabase.UpdateFriendPinStatus(ctx, req.OwnerUserID, friendID, isPinned); err != nil {
return nil, err
}
}
resp := &pbfriend.UpdateFriendsResp{}
return resp, nil
}
+4 -4
View File
@@ -327,6 +327,8 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, req *group.InviteUserT
// Handle the scenario where certain members are refused // Handle the scenario where certain members are refused
// You might want to update the req.Members list or handle it as per your business logic // You might want to update the req.Members list or handle it as per your business logic
} }
utils.StructFieldNotNilReplace(req, resp)
return nil return nil
} }
@@ -393,10 +395,7 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq)
if resp.ApplyMemberFriend != nil { if resp.ApplyMemberFriend != nil {
req.GroupInfoForSet.ApplyMemberFriend = wrapperspb.Int32(*resp.ApplyMemberFriend) req.GroupInfoForSet.ApplyMemberFriend = wrapperspb.Int32(*resp.ApplyMemberFriend)
} }
utils.NotNilReplace(&req.GroupInfoForSet.GroupID, &resp.GroupID) utils.StructFieldNotNilReplace(req, resp)
utils.NotNilReplace(&req.GroupInfoForSet.GroupName, &resp.GroupName)
utils.NotNilReplace(&req.GroupInfoForSet.FaceURL, &resp.FaceURL)
utils.NotNilReplace(&req.GroupInfoForSet.Introduction, &resp.Introduction)
return nil return nil
} }
func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error { func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error {
@@ -427,5 +426,6 @@ func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq)
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil { if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil {
return err return err
} }
utils.StructFieldNotNilReplace(req, resp)
return nil return nil
} }
+1 -2
View File
@@ -279,6 +279,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
break break
} }
} }
s.Notification.GroupCreatedNotification(ctx, tips)
if req.GroupInfo.GroupType == constant.SuperGroup { if req.GroupInfo.GroupType == constant.SuperGroup {
go func() { go func() {
for _, userID := range userIDs { for _, userID := range userIDs {
@@ -802,7 +803,6 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
GroupType: string(group.GroupType), GroupType: string(group.GroupType),
ApplyID: req.InviterUserID, ApplyID: req.InviterUserID,
ReqMessage: req.ReqMessage, ReqMessage: req.ReqMessage,
Ex: req.Ex,
} }
if err = CallbackApplyJoinGroupBefore(ctx, reqCall); err != nil { if err = CallbackApplyJoinGroupBefore(ctx, reqCall); err != nil {
@@ -849,7 +849,6 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
JoinSource: req.JoinSource, JoinSource: req.JoinSource,
ReqTime: time.Now(), ReqTime: time.Now(),
HandledTime: time.Unix(0, 0), HandledTime: time.Unix(0, 0),
Ex: req.Ex,
} }
if err := s.db.CreateGroupRequest(ctx, []*relationtb.GroupRequestModel{&groupRequest}); err != nil { if err := s.db.CreateGroupRequest(ctx, []*relationtb.GroupRequestModel{&groupRequest}); err != nil {
return nil, err return nil, err
+1
View File
@@ -202,5 +202,6 @@ func CallbackAfterRevokeMsg(ctx context.Context, req *pbchat.RevokeMsgReq) error
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterRevokeMsg); err != nil { if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterRevokeMsg); err != nil {
return err return err
} }
utils.StructFieldNotNilReplace(req, resp)
return nil return nil
} }
-10
View File
@@ -101,16 +101,6 @@ type thirdServer struct {
defaultExpire time.Duration defaultExpire time.Duration
} }
func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateFormDataReq) (*third.InitiateFormDataResp, error) {
//TODO implement me
panic("implement me")
}
func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteFormDataReq) (*third.CompleteFormDataResp, error) {
//TODO implement me
panic("implement me")
}
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) { func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime) err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil { if err != nil {
+16 -184
View File
@@ -17,7 +17,6 @@ package user
import ( import (
"context" "context"
"errors" "errors"
"math/rand"
"strings" "strings"
"time" "time"
@@ -57,6 +56,22 @@ type userServer struct {
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
} }
func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.ProcessUserCommandAddReq) (*pbuser.ProcessUserCommandAddResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.ProcessUserCommandUpdateReq) (*pbuser.ProcessUserCommandUpdateResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.ProcessUserCommandDeleteReq) (*pbuser.ProcessUserCommandDeleteResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
@@ -73,12 +88,6 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
for k, v := range config.Config.Manager.UserID { for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin}) users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
} }
if len(config.Config.IMAdmin.UserID) != len(config.Config.IMAdmin.Nickname) {
return errors.New("len(config.Config.AppNotificationAdmin.AppManagerUid) != len(config.Config.AppNotificationAdmin.Nickname)")
}
for k, v := range config.Config.IMAdmin.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
}
userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) userDB, err := mgo.NewUserMongo(mongo.GetDatabase())
if err != nil { if err != nil {
return err return err
@@ -341,180 +350,3 @@ func (s *userServer) GetSubscribeUsersStatus(ctx context.Context,
} }
return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil
} }
// ProcessUserCommandAdd user general function add
func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.ProcessUserCommandAddReq) (*pbuser.ProcessUserCommandAddResp, error) {
// Assuming you have a method in s.UserDatabase to add a user command
err := s.UserDatabase.AddUserCommand(ctx, req.UserID, req.Type, req.Uuid, req.Value)
if err != nil {
return nil, err
}
return &pbuser.ProcessUserCommandAddResp{}, nil
}
// ProcessUserCommandDelete user general function delete
func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.ProcessUserCommandDeleteReq) (*pbuser.ProcessUserCommandDeleteResp, error) {
// Assuming you have a method in s.UserDatabase to delete a user command
err := s.UserDatabase.DeleteUserCommand(ctx, req.UserID, req.Type, req.Uuid)
if err != nil {
return nil, err
}
return &pbuser.ProcessUserCommandDeleteResp{}, nil
}
// ProcessUserCommandUpdate user general function update
func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.ProcessUserCommandUpdateReq) (*pbuser.ProcessUserCommandUpdateResp, error) {
// Assuming you have a method in s.UserDatabase to update a user command
err := s.UserDatabase.UpdateUserCommand(ctx, req.UserID, req.Type, req.Uuid, req.Value)
if err != nil {
return nil, err
}
return &pbuser.ProcessUserCommandUpdateResp{}, nil
}
func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) {
// Fetch user commands from the database
commands, err := s.UserDatabase.GetUserCommands(ctx, req.UserID, req.Type)
if err != nil {
return nil, err
}
// Initialize commandInfoSlice as an empty slice
commandInfoSlice := make([]*pbuser.CommandInfoResp, 0, len(commands))
for _, command := range commands {
// No need to use index since command is already a pointer
commandInfoSlice = append(commandInfoSlice, &pbuser.CommandInfoResp{
Uuid: command.Uuid,
Value: command.Value,
CreateTime: command.CreateTime,
})
}
// Return the response with the slice
return &pbuser.ProcessUserCommandGetResp{KVArray: commandInfoSlice}, nil
}
func (s *userServer) AddNotificationAccount(ctx context.Context, req *pbuser.AddNotificationAccountReq) (*pbuser.AddNotificationAccountResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
var userID string
for i := 0; i < 20; i++ {
userId := s.genUserID()
_, err := s.UserDatabase.FindWithError(ctx, []string{userId})
if err == nil {
continue
}
userID = userId
break
}
if userID == "" {
return nil, errs.ErrInternalServer.Wrap("gen user id failed")
}
user := &tablerelation.UserModel{
UserID: userID,
Nickname: req.NickName,
FaceURL: req.FaceURL,
CreateTime: time.Now(),
AppMangerLevel: constant.AppNotificationAdmin,
}
if err := s.UserDatabase.Create(ctx, []*tablerelation.UserModel{user}); err != nil {
return nil, err
}
return &pbuser.AddNotificationAccountResp{}, nil
}
func (s *userServer) UpdateNotificationAccountInfo(ctx context.Context, req *pbuser.UpdateNotificationAccountInfoReq) (*pbuser.UpdateNotificationAccountInfoResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
if _, err := s.UserDatabase.FindWithError(ctx, []string{req.UserID}); err != nil {
return nil, errs.ErrArgs.Wrap()
}
user := map[string]interface{}{}
if req.NickName != "" {
user["nickname"] = req.NickName
}
if req.FaceURL != "" {
user["face_url"] = req.FaceURL
}
if err := s.UserDatabase.UpdateByMap(ctx, req.UserID, user); err != nil {
return nil, err
}
return &pbuser.UpdateNotificationAccountInfoResp{}, nil
}
func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser.SearchNotificationAccountReq) (*pbuser.SearchNotificationAccountResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
_, users, err := s.UserDatabase.Page(ctx, req.Pagination)
if err != nil {
return nil, err
}
var total int64
accounts := make([]*pbuser.NotificationAccountInfo, 0, len(users))
for _, v := range users {
if v.AppMangerLevel != constant.AppNotificationAdmin {
continue
}
temp := &pbuser.NotificationAccountInfo{
UserID: v.UserID,
FaceURL: v.FaceURL,
NickName: v.Nickname,
}
accounts = append(accounts, temp)
total += 1
}
return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: accounts}, nil
}
func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (*pbuser.UpdateUserInfoExResp, error) {
//TODO implement me
panic("implement me")
}
func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.GetNotificationAccountReq) (*pbuser.GetNotificationAccountResp, error) {
if req.UserID == "" {
return nil, errs.ErrArgs.Wrap("userID is empty")
}
user, err := s.UserDatabase.GetUserByID(ctx, req.UserID)
if err != nil {
return nil, errs.ErrUserIDNotFound.Wrap()
}
if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel == constant.AppNotificationAdmin {
return &pbuser.GetNotificationAccountResp{}, nil
}
return nil, errs.ErrNoPermission.Wrap("notification messages cannot be sent for this ID")
}
func (s *userServer) genUserID() string {
const l = 10
data := make([]byte, l)
rand.Read(data)
chars := []byte("0123456789")
for i := 0; i < len(data); i++ {
if i == 0 {
data[i] = chars[1:][data[i]%9]
} else {
data[i] = chars[data[i]%10]
}
}
return string(data)
}
+1 -1
View File
@@ -87,7 +87,7 @@ type OANotificationElem struct {
NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"` NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"`
Text string `mapstructure:"text" json:"text" validate:"required"` Text string `mapstructure:"text" json:"text" validate:"required"`
Url string `mapstructure:"url" json:"url"` Url string `mapstructure:"url" json:"url"`
MixType int32 `mapstructure:"mixType" json:"mixType"` MixType int32 `mapstructure:"mixType" json:"mixType" validate:"required"`
PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"` PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"`
SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"` SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"`
VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"` VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"`
-9
View File
@@ -54,15 +54,6 @@ func CheckAdmin(ctx context.Context) error {
} }
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
} }
func CheckIMAdmin(ctx context.Context) error {
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) {
return nil
}
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) {
return nil
}
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx)))
}
func ParseRedisInterfaceToken(redisToken any) (*tokenverify.Claims, error) { func ParseRedisInterfaceToken(redisToken any) (*tokenverify.Claims, error) {
return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret()) return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret())
-1
View File
@@ -148,7 +148,6 @@ type CallbackJoinGroupReq struct {
GroupType string `json:"groupType"` GroupType string `json:"groupType"`
ApplyID string `json:"applyID"` ApplyID string `json:"applyID"`
ReqMessage string `json:"reqMessage"` ReqMessage string `json:"reqMessage"`
Ex string `json:"ex"`
} }
type CallbackJoinGroupResp struct { type CallbackJoinGroupResp struct {
-5
View File
@@ -236,11 +236,6 @@ type configStruct struct {
Nickname []string `yaml:"nickname"` Nickname []string `yaml:"nickname"`
} `yaml:"manager"` } `yaml:"manager"`
IMAdmin struct {
UserID []string `yaml:"userID"`
Nickname []string `yaml:"nickname"`
} `yaml:"im-admin"`
MultiLoginPolicy int `yaml:"multiLoginPolicy"` MultiLoginPolicy int `yaml:"multiLoginPolicy"`
ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"` ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"`
MsgCacheTimeout int `yaml:"msgCacheTimeout"` MsgCacheTimeout int `yaml:"msgCacheTimeout"`
+1 -2
View File
@@ -17,6 +17,7 @@ package convert
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
@@ -61,7 +62,6 @@ func FriendsDB2Pb(
for _, friendDB := range friendsDB { for _, friendDB := range friendsDB {
userID = append(userID, friendDB.FriendUserID) userID = append(userID, friendDB.FriendUserID)
} }
users, err := getUsers(ctx, userID) users, err := getUsers(ctx, userID)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -74,7 +74,6 @@ func FriendsDB2Pb(
friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL
friendPb.FriendUser.Ex = users[friend.FriendUserID].Ex friendPb.FriendUser.Ex = users[friend.FriendUserID].Ex
friendPb.CreateTime = friend.CreateTime.Unix() friendPb.CreateTime = friend.CreateTime.Unix()
friendPb.IsPinned = friend.IsPinned
friendsPb = append(friendsPb, friendPb) friendsPb = append(friendsPb, friendPb)
} }
return friendsPb, nil return friendsPb, nil
+1 -1
View File
@@ -64,7 +64,7 @@ func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
"global_recv_msg_opt": user.GlobalRecvMsgOpt, "global_recv_msg_opt": user.GlobalRecvMsgOpt,
} }
for key, value := range fields { for key, value := range fields {
if v, ok := value.(string); ok { if v, ok := value.(string); ok && v != "" {
val[key] = v val[key] = v
} else if v, ok := value.(int32); ok && v != 0 { } else if v, ok := value.(int32); ok && v != 0 {
val[key] = v val[key] = v
+1 -9
View File
@@ -87,15 +87,7 @@ func NewRedis() (redis.UniversalClient, error) {
// overrideConfigFromEnv overrides configuration fields with environment variables if present. // overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv() { func overrideConfigFromEnv() {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
if envPort := os.Getenv("REDIS_PORT"); envPort != "" { config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + envPort
}
config.Config.Redis.Address = addresses
} else {
config.Config.Redis.Address = strings.Split(envAddr, ",")
}
} }
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Config.Redis.Username = envUser config.Config.Redis.Username = envUser
-7
View File
@@ -58,7 +58,6 @@ type FriendDatabase interface {
FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error)
FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error)
FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error)
UpdateFriendPinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error)
} }
type friendDatabase struct { type friendDatabase struct {
@@ -299,9 +298,3 @@ func (f *friendDatabase) FindFriendUserIDs(ctx context.Context, ownerUserID stri
func (f *friendDatabase) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error) { func (f *friendDatabase) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error) {
return f.friendRequest.FindBothFriendRequests(ctx, fromUserID, toUserID) return f.friendRequest.FindBothFriendRequests(ctx, fromUserID, toUserID)
} }
func (f *friendDatabase) UpdateFriendPinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error) {
if err := f.friend.UpdatePinStatus(ctx, ownerUserID, friendUserID, isPinned); err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
-25
View File
@@ -50,8 +50,6 @@ type UserDatabase interface {
IsExist(ctx context.Context, userIDs []string) (exist bool, err error) IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
// GetAllUserID Get all user IDs // GetAllUserID Get all user IDs
GetAllUserID(ctx context.Context, pagination pagination.Pagination) (int64, []string, error) GetAllUserID(ctx context.Context, pagination pagination.Pagination) (int64, []string, error)
// Get user by userID
GetUserByID(ctx context.Context, userID string) (user *relation.UserModel, err error)
// InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it // InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it
InitOnce(ctx context.Context, users []*relation.UserModel) (err error) InitOnce(ctx context.Context, users []*relation.UserModel) (err error)
// CountTotal Get the total number of users // CountTotal Get the total number of users
@@ -70,12 +68,6 @@ type UserDatabase interface {
GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error)
// SetUserStatus Set the user status and store the user status in redis // SetUserStatus Set the user status and store the user status in redis
SetUserStatus(ctx context.Context, userID string, status, platformID int32) error SetUserStatus(ctx context.Context, userID string, status, platformID int32) error
//CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error
UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
GetUserCommands(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error)
} }
type userDatabase struct { type userDatabase struct {
@@ -185,10 +177,6 @@ func (u *userDatabase) GetAllUserID(ctx context.Context, pagination pagination.P
return u.userDB.GetAllUserID(ctx, pagination) return u.userDB.GetAllUserID(ctx, pagination)
} }
func (u *userDatabase) GetUserByID(ctx context.Context, userID string) (user *relation.UserModel, err error) {
return u.userDB.Take(ctx, userID)
}
// CountTotal Get the total number of users. // CountTotal Get the total number of users.
func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) { func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
return u.userDB.CountTotal(ctx, before) return u.userDB.CountTotal(ctx, before)
@@ -239,16 +227,3 @@ func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*
func (u *userDatabase) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error { func (u *userDatabase) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error {
return u.cache.SetUserStatus(ctx, userID, status, platformID) return u.cache.SetUserStatus(ctx, userID, status, platformID)
} }
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value)
}
func (u *userDatabase) DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error {
return u.userDB.DeleteUserCommand(ctx, userID, Type, UUID)
}
func (u *userDatabase) UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
return u.userDB.UpdateUserCommand(ctx, userID, Type, UUID, value)
}
func (u *userDatabase) GetUserCommands(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error) {
commands, err := u.userDB.GetUserCommand(ctx, userID, Type)
return commands, err
}
+1 -18
View File
@@ -16,7 +16,7 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mgoutil" "github.com/OpenIMSDK/tools/mgoutil"
"github.com/OpenIMSDK/tools/pagination" "github.com/OpenIMSDK/tools/pagination"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
@@ -143,20 +143,3 @@ func (f *FriendMgo) FindFriendUserIDs(ctx context.Context, ownerUserID string) (
filter := bson.M{"owner_user_id": ownerUserID} filter := bson.M{"owner_user_id": ownerUserID}
return mgoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "friend_user_id": 1})) return mgoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "friend_user_id": 1}))
} }
// UpdatePinStatus update friend's pin status
func (f *FriendMgo) UpdatePinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error) {
filter := bson.M{"owner_user_id": ownerUserID, "friend_user_id": friendUserID}
// Create an update operation to set the "is_pinned" field to isPinned for all documents.
update := bson.M{"$set": bson.M{"is_pinned": isPinned}}
// Perform the update operation for all documents in the collection.
_, err = f.coll.UpdateMany(ctx, filter, update)
if err != nil {
return errs.Wrap(err, "update pin error")
}
return nil
}
-73
View File
@@ -16,7 +16,6 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/user"
"time" "time"
"github.com/OpenIMSDK/tools/mgoutil" "github.com/OpenIMSDK/tools/mgoutil"
@@ -88,78 +87,6 @@ func (u *UserMgo) CountTotal(ctx context.Context, before *time.Time) (count int6
return mgoutil.Count(ctx, u.coll, bson.M{"create_time": bson.M{"$lt": before}}) return mgoutil.Count(ctx, u.coll, bson.M{"create_time": bson.M{"$lt": before}})
} }
func (u *UserMgo) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
collection := u.coll.Database().Collection("userCommands")
// Create a new document instead of updating an existing one
doc := bson.M{
"userID": userID,
"type": Type,
"uuid": UUID,
"createTime": time.Now().Unix(), // assuming you want the creation time in Unix timestamp
"value": value,
}
_, err := collection.InsertOne(ctx, doc)
return err
}
func (u *UserMgo) DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error {
collection := u.coll.Database().Collection("userCommands")
filter := bson.M{"userID": userID, "type": Type, "uuid": UUID}
_, err := collection.DeleteOne(ctx, filter)
return err
}
func (u *UserMgo) UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
collection := u.coll.Database().Collection("userCommands")
filter := bson.M{"userID": userID, "type": Type, "uuid": UUID}
update := bson.M{"$set": bson.M{"value": value}}
_, err := collection.UpdateOne(ctx, filter, update)
return err
}
func (u *UserMgo) GetUserCommand(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error) {
collection := u.coll.Database().Collection("userCommands")
filter := bson.M{"userID": userID, "type": Type}
cursor, err := collection.Find(ctx, filter)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
// Initialize commands as a slice of pointers
commands := []*user.CommandInfoResp{}
for cursor.Next(ctx) {
var document struct {
UUID string `bson:"uuid"`
Value string `bson:"value"`
CreateTime int64 `bson:"createTime"`
}
if err := cursor.Decode(&document); err != nil {
return nil, err
}
commandInfo := &user.CommandInfoResp{ // Change here: use a pointer to the struct
Uuid: document.UUID,
Value: document.Value,
CreateTime: document.CreateTime,
}
commands = append(commands, commandInfo)
}
if err := cursor.Err(); err != nil {
return nil, err
}
return commands, nil
}
func (u *UserMgo) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) { func (u *UserMgo) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) {
pipeline := bson.A{ pipeline := bson.A{
bson.M{ bson.M{
-3
View File
@@ -30,7 +30,6 @@ type FriendModel struct {
AddSource int32 `bson:"add_source"` AddSource int32 `bson:"add_source"`
OperatorUserID string `bson:"operator_user_id"` OperatorUserID string `bson:"operator_user_id"`
Ex string `bson:"ex"` Ex string `bson:"ex"`
IsPinned bool `bson:"is_pinned"`
} }
// FriendModelInterface defines the operations for managing friends in MongoDB. // FriendModelInterface defines the operations for managing friends in MongoDB.
@@ -57,6 +56,4 @@ type FriendModelInterface interface {
FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*FriendModel, err error) FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*FriendModel, err error)
// FindFriendUserIDs retrieves a list of friend user IDs for a given owner. // FindFriendUserIDs retrieves a list of friend user IDs for a given owner.
FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error)
// UpdatePinStatus update friend's pin status
UpdatePinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error)
} }
-6
View File
@@ -16,7 +16,6 @@ package relation
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/user"
"time" "time"
"github.com/OpenIMSDK/tools/pagination" "github.com/OpenIMSDK/tools/pagination"
@@ -61,9 +60,4 @@ type UserModelInterface interface {
CountTotal(ctx context.Context, before *time.Time) (count int64, err error) CountTotal(ctx context.Context, before *time.Time) (count int64, err error)
// 获取范围内用户增量 // 获取范围内用户增量
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
//CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error
UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
GetUserCommand(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error)
} }
@@ -24,8 +24,7 @@ import (
func setupTestEnvironment() { func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim") os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1") os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
os.Setenv("ZOOKEEPER_PORT", "12181")
os.Setenv("ZOOKEEPER_USERNAME", "") os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "") os.Setenv("ZOOKEEPER_PASSWORD", "")
} }
@@ -16,153 +16,91 @@ package kubernetes
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/stathat/consistent"
"os"
"strconv"
"strings"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
// K8sDR represents the Kubernetes service discovery and registration client. // K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct { type K8sDR struct {
options []grpc.DialOption options []grpc.DialOption
rpcRegisterAddr string rpcRegisterAddr string
gatewayHostConsistent *consistent.Consistent
} }
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
gatewayConsistent := consistent.New()
gatewayHosts := getMsgGatewayHost(context.Background()) return &K8sDR{}, nil
for _, v := range gatewayHosts {
gatewayConsistent.Add(v)
}
return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil
} }
// Register registers a service with Kubernetes.
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { cli.rpcRegisterAddr = serviceName
cli.rpcRegisterAddr = serviceName
} else {
cli.rpcRegisterAddr = getSelfHost(context.Background())
}
return nil return nil
} }
// UnRegister removes a service registration from Kubernetes.
func (cli *K8sDR) UnRegister() error { func (cli *K8sDR) UnRegister() error {
return nil return nil
} }
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil return nil
} }
// RegisterConf2Registry registers a configuration to the registry.
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil return nil
} }
// GetConfFromRegistry retrieves a configuration from the registry.
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil return nil, nil
} }
func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
host, err := cli.gatewayHostConsistent.Get(userId)
if err != nil {
log.ZError(ctx, "GetUserIdHashGatewayHost error", err)
}
return host, err
}
func getSelfHost(ctx context.Context) string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
ns := os.Getenv("MY_POD_NAMESPACE")
statefuleIndex := 0
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
count := len(podInfo)
statefuleIndex, _ = strconv.Atoi(podInfo[count-1])
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port)
return host
}
// like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88 // GetConns returns a list of gRPC client connections for a given service.
func getMsgGatewayHost(ctx context.Context) []string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT")
ns := os.Getenv("MY_POD_NAMESPACE")
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
nReplicas, _ := strconv.Atoi(replicas)
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
var ret []string
for i := 0; i < nReplicas; i++ {
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port)
ret = append(ret, host)
}
log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret)
return ret
}
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName { return []*grpc.ClientConn{conn}, err
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
} else {
var ret []*grpc.ClientConn
gatewayHosts := getMsgGatewayHost(ctx)
for _, host := range gatewayHosts {
conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
if err != nil {
return nil, err
} else {
ret = append(ret, conn)
}
}
return ret, nil
}
} }
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
// GetConn returns a single gRPC client connection for a given service.
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
} }
// GetSelfConnTarget returns the connection target of the client itself.
func (cli *K8sDR) GetSelfConnTarget() string { func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr return cli.rpcRegisterAddr
} }
// AddOption adds gRPC dial options to the client.
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...) cli.options = append(cli.options, opts...)
} }
// CloseConn closes a given gRPC client connection.
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close() conn.Close()
} }
// do not use this method for call rpc // do not use this method for call rpc.
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil return nil
} }
// Close closes the K8sDR client.
func (cli *K8sDR) Close() { func (cli *K8sDR) Close() {
// Close any open resources here (if applicable)
return return
} }
@@ -52,18 +52,10 @@ func getEnv(key, fallback string) string {
return fallback return fallback
} }
// getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables. // getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
// If the environment variables are not set, it returns the fallback value.
func getZkAddrFromEnv(fallback []string) []string { func getZkAddrFromEnv(fallback []string) []string {
address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS") if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
port, portExists := os.LookupEnv("ZOOKEEPER_PORT") return strings.Split(value, ",")
if addrExists && portExists {
addresses := strings.Split(address, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + port
}
return addresses
} }
return fallback return fallback
} }
+2 -2
View File
@@ -112,6 +112,7 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
//v.Set(constant.CallbackCommand, command) //v.Set(constant.CallbackCommand, command)
//url = url + "/" + v.Encode() //url = url + "/" + v.Encode()
url = url + "/" + command url = url + "/" + command
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
if err != nil { if err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
@@ -120,14 +121,13 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
} }
return errs.ErrNetwork.Wrap(err.Error()) return errs.ErrNetwork.Wrap(err.Error())
} }
defer log.ZDebug(ctx, "callback", "data", string(b))
if err = json.Unmarshal(b, output); err != nil { if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url) log.ZWarn(ctx, "callback failed but continue", err, "url", url)
return nil return nil
} }
return errs.ErrData.WithDetail(err.Error() + "response format error") return errs.ErrData.Wrap(err.Error())
} }
return output.Parse() return output.Parse()
+7 -6
View File
@@ -31,14 +31,15 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
const maxRetry = 10 // number of retries const (
maxRetry = 10 // Maximum number of retries for producer creation
)
var errEmptyMsg = errors.New("kafka binary msg is empty") var errEmptyMsg = errors.New("binary msg is empty")
// Producer represents a Kafka producer.
type Producer struct { type Producer struct {
addr []string
topic string topic string
addr []string
config *sarama.Config config *sarama.Config
producer sarama.SyncProducer producer sarama.SyncProducer
} }
@@ -67,7 +68,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
// Get Kafka configuration from environment variables or fallback to config file // Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username) kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password) kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
// Configure SASL authentication if credentials are provided // Configure SASL authentication if credentials are provided
if kafkaUsername != "" && kafkaPassword != "" { if kafkaUsername != "" && kafkaPassword != "" {
@@ -77,7 +78,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
} }
// Set the Kafka address // Set the Kafka address
p.addr = kafkaAddr p.addr = []string{kafkaAddr}
// Set up TLS configuration (if required) // Set up TLS configuration (if required)
SetupTLSConfig(p.config) SetupTLSConfig(p.config)
-19
View File
@@ -15,9 +15,7 @@
package kafka package kafka
import ( import (
"fmt"
"os" "os"
"strings"
"github.com/IBM/sarama" "github.com/IBM/sarama"
@@ -46,20 +44,3 @@ func getEnvOrConfig(envName string, configValue string) string {
} }
return configValue return configValue
} }
// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getKafkaAddrFromEnv(fallback []string) []string {
envAddr := os.Getenv("KAFKA_ADDRESS")
envPort := os.Getenv("KAFKA_PORT")
if envAddr != "" && envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = fmt.Sprintf("%s:%s", addr, envPort)
}
return addresses
}
return fallback
}
-7
View File
@@ -179,10 +179,3 @@ func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status
}) })
return err return err
} }
func (u *UserRpcClient) GetNotificationByID(ctx context.Context, userID string) error {
_, err := u.Client.GetNotificationAccount(ctx, &user.GetNotificationAccountReq{
UserID: userID,
})
return err
}
+5 -4
View File
@@ -33,20 +33,21 @@ openim::log::info "\n# Begin to check all openim service"
# OpenIM status # OpenIM status
# Elegant printing function # Elegant printing function
print_services_and_ports() { print_services_and_ports() {
service_names=("$1[@]") declare -g service_names=("${!1}")
service_ports=("$2[@]") declare -g service_ports=("${!2}")
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
echo "| Service Name | Port |" echo "| Service Name | Port |"
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
for index in "${!service_names}"; do for index in "${!service_names[@]}"; do
printf "| %-23s | %-8s |\n" "${!service_names[$index]}" "${!service_ports[$index]}" printf "| %-23s | %-8s |\n" "${service_names[$index]}" "${service_ports[$index]}"
done done
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
} }
# Print out services and their ports # Print out services and their ports
print_services_and_ports OPENIM_SERVER_NAME_TARGETS OPENIM_SERVER_PORT_TARGETS print_services_and_ports OPENIM_SERVER_NAME_TARGETS OPENIM_SERVER_PORT_TARGETS
+1 -1
View File
@@ -102,7 +102,7 @@ print_color "Deleted Files: ${deleted_files}" "${BACKGROUND_GREEN}"
if [[ ! $local_branch =~ $valid_branch_regex ]] if [[ ! $local_branch =~ $valid_branch_regex ]]
then then
printError "There is something wrong with your branch name. Branch names in this project must adhere to this contract: $valid_branch_regex. printError "There is something wrong with your branch name. Branch names in this project must adhere to this contract: $valid_branch_regex.
Your commit will be rejected. You should rename your branch to a valid name(feat/name OR fix/name) and try again." Your commit will be rejected. You should rename your branch to a valid name(feat/name OR bug/name) and try again."
printError "For more on this, read on: https://gist.github.com/cubxxw/126b72104ac0b0ca484c9db09c3e5694" printError "For more on this, read on: https://gist.github.com/cubxxw/126b72104ac0b0ca484c9db09c3e5694"
exit 1 exit 1
fi fi
+7
View File
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# This script automatically initializes various configuration files and can generate example files. # This script automatically initializes various configuration files and can generate example files.
set -o errexit set -o errexit
@@ -168,6 +169,12 @@ process_file() {
sleep 0.5 sleep 0.5
} }
declare -A env_vars=(
["OPENIM_IP"]="172.28.0.1"
["DATA_DIR"]="./"
["LOG_STORAGE_LOCATION"]="../logs/"
)
clean_config_files() { clean_config_files() {
local all_templates=("${TEMPLATES[@]}" "${COPY_TEMPLATES[@]}") local all_templates=("${TEMPLATES[@]}" "${COPY_TEMPLATES[@]}")
+3 -5
View File
@@ -66,8 +66,8 @@ def "ENV_FILE" ""${OPENIM_ROOT}"/scripts/install/environment.sh"
###################### Docker compose ################### ###################### Docker compose ###################
# OPENIM AND CHAT # OPENIM AND CHAT
def "CHAT_IMAGE_VERSION" "main" def "CHAT_BRANCH" "main"
def "SERVER_IMAGE_VERSION" "main" def "SERVER_BRANCH" "main"
# Choose the appropriate image address, the default is GITHUB image, # Choose the appropriate image address, the default is GITHUB image,
# you can choose docker hub, for Chinese users can choose Ali Cloud # you can choose docker hub, for Chinese users can choose Ali Cloud
@@ -139,7 +139,7 @@ readonly API_OPENIM_PORT=${API_OPENIM_PORT:-'10002'}
def "API_LISTEN_IP" "0.0.0.0" # API的监听IP def "API_LISTEN_IP" "0.0.0.0" # API的监听IP
###################### openim-chat 配置信息 ###################### ###################### openim-chat 配置信息 ######################
def "OPENIM_CHAT_DATA_DIR" "./openim-chat/${CHAT_IMAGE_VERSION}" def "OPENIM_CHAT_DATA_DIR" "./openim-chat/${CHAT_BRANCH}"
def "OPENIM_CHAT_ADDRESS" "${DOCKER_BRIDGE_GATEWAY}" # OpenIM服务地址 def "OPENIM_CHAT_ADDRESS" "${DOCKER_BRIDGE_GATEWAY}" # OpenIM服务地址
def "OPENIM_CHAT_API_PORT" "10008" # OpenIM API端口 def "OPENIM_CHAT_API_PORT" "10008" # OpenIM API端口
def "CHAT_API_LISTEN_IP" "" # OpenIM API的监听IP def "CHAT_API_LISTEN_IP" "" # OpenIM API的监听IP
@@ -353,8 +353,6 @@ def "MANAGER_USERID_3" "openIMAdmin" # 管理员ID 3
def "NICKNAME_1" "system1" # 昵称1 def "NICKNAME_1" "system1" # 昵称1
def "NICKNAME_2" "system2" # 昵称2 def "NICKNAME_2" "system2" # 昵称2
def "NICKNAME_3" "system3" # 昵称3 def "NICKNAME_3" "system3" # 昵称3
def "IM_ADMIN_USERID" "imAdmin" # IM管理员ID
def "IM_ADMIN_NAME" "imAdmin" # IM管理员昵称
def "MULTILOGIN_POLICY" "1" # 多登录策略 def "MULTILOGIN_POLICY" "1" # 多登录策略
def "CHAT_PERSISTENCE_MYSQL" "true" # 聊天持久化MySQL def "CHAT_PERSISTENCE_MYSQL" "true" # 聊天持久化MySQL
def "MSG_CACHE_TIMEOUT" "86400" # 消息缓存超时 def "MSG_CACHE_TIMEOUT" "86400" # 消息缓存超时
+3 -47
View File
@@ -70,16 +70,14 @@ function openim::test::auth() {
#################################### Auth Module #################################### #################################### Auth Module ####################################
# Define a function to get a token for a specific user # Define a function to get a token (Admin Token)
openim::test::get_token() { openim::test::get_token() {
local user_id="${1:-openIM123456}" # Default user ID if not provided
token_response=$(${CCURL} "${OperationID}" "${Header}" ${INSECURE_OPENIMAPI}/auth/user_token \ token_response=$(${CCURL} "${OperationID}" "${Header}" ${INSECURE_OPENIMAPI}/auth/user_token \
-d'{"secret": "'"$SECRET"'","platformID": 1,"userID": "'$user_id'"}') -d'{"secret": "'"$SECRET"'","platformID": 1,"userID": "openIM123456"}')
token=$(echo $token_response | grep -Po 'token[" :]+\K[^"]+') token=$(echo $token_response | grep -Po 'token[" :]+\K[^"]+')
echo "$token" echo "$token"
} }
Header="-HContent-Type: application/json" Header="-HContent-Type: application/json"
OperationID="-HoperationID: 1646445464564" OperationID="-HoperationID: 1646445464564"
Token="-Htoken: $(openim::test::get_token)" Token="-Htoken: $(openim::test::get_token)"
@@ -532,36 +530,6 @@ EOF
openim::test::check_error "$response" openim::test::check_error "$response"
} }
# Updates the pin status of multiple friends.
openim::test::update_pin_status() {
local ownerUserID="${1}"
shift # Shift the arguments to skip the first one (ownerUserID)
local isPinned="${1}"
shift # Shift the arguments to skip the isPinned argument
# Constructing the list of friendUserIDs
local friendUserIDsArray=()
for friendUserID in "$@"; do
friendUserIDsArray+=("\"${friendUserID}\"")
done
local friendUserIDs=$(IFS=,; echo "${friendUserIDsArray[*]}")
local request_body=$(cat <<EOF
{
"ownerUserID": "${ownerUserID}",
"friendUserIDs": [${friendUserIDs}],
"isPinned": ${isPinned}
}
EOF
)
echo "Requesting to update pin status: $request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/update_pin_status" -d "${request_body}")
echo "Response: $response"
openim::test::check_error "$response"
}
# [openim::test::friend function description] # [openim::test::friend function description]
# The `openim::test::friend` function serves as a test suite for friend-related operations. # The `openim::test::friend` function serves as a test suite for friend-related operations.
# It sequentially invokes all friend-related test functions to ensure the API's friend operations are functioning correctly. # It sequentially invokes all friend-related test functions to ensure the API's friend operations are functioning correctly.
@@ -574,22 +542,17 @@ function openim::test::friend() {
openim::test::user_register "${TEST_USER_ID}" "user01" "new_face_url" openim::test::user_register "${TEST_USER_ID}" "user01" "new_face_url"
openim::test::user_register "${FRIEND_USER_ID}" "frient01" "new_face_url" openim::test::user_register "${FRIEND_USER_ID}" "frient01" "new_face_url"
openim::test::user_register "${BLACK_USER_ID}" "frient02" "new_face_url" openim::test::user_register "${BLACK_USER_ID}" "frient02" "new_face_url"
# 1. Check if two users are friends. # 1. Check if two users are friends.
openim::test::is_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}" openim::test::is_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}"
# 2. Send a friend request from one user to another. # 2. Send a friend request from one user to another.
openim::test::add_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}" openim::test::add_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}"
local original_token=$Token
# Switch to FRIEND_USER_ID's token
local friend_token="-Htoken: $(openim::test::get_token "${FRIEND_USER_ID}")"
# 3. Respond to a friend request. # 3. Respond to a friend request.
# TODO # TODO
# openim::test::add_friend_response "${FRIEND_USER_ID}" "${TEST_USER_ID}" # openim::test::add_friend_response "${FRIEND_USER_ID}" "${TEST_USER_ID}"
Token=$original_token
# 4. Retrieve the friend list of the test user. # 4. Retrieve the friend list of the test user.
openim::test::get_friend_list "${TEST_USER_ID}" openim::test::get_friend_list "${TEST_USER_ID}"
@@ -620,13 +583,6 @@ function openim::test::friend() {
# TODO # TODO
# openim::test::import_friend "${TEST_USER_ID}" "11111114" "11111115" # openim::test::import_friend "${TEST_USER_ID}" "11111114" "11111115"
# 13. pin Friend
# Add this call to your test suite where appropriate
# TODO
# openim::test::update_pin_status "${TEST_USER_ID}" true "${FRIEND_USER_ID}"
#
# openim::test::update_pin_status "${TEST_USER_ID}" false "${FRIEND_USER_ID}"
# Log the completion of the friend test suite. # Log the completion of the friend test suite.
openim::log::success "Friend test suite completed successfully." openim::log::success "Friend test suite completed successfully."
} }
+1 -1
View File
@@ -20,7 +20,7 @@ OPENIM_VERBOSE="${OPENIM_VERBOSE:-5}"
ENABLE_LOGGING="${ENABLE_LOGGING:-true}" ENABLE_LOGGING="${ENABLE_LOGGING:-true}"
# If OPENIM_OUTPUT is not set, set it to the default value # If OPENIM_OUTPUT is not set, set it to the default value
if [ -z "${OPENIM_OUTPUT+x}" ]; then if [[ ! -v OPENIM_OUTPUT ]]; then
OPENIM_OUTPUT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../_output" && pwd -P)" OPENIM_OUTPUT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../_output" && pwd -P)"
fi fi
+3 -20
View File
@@ -302,12 +302,8 @@ openim::util::check_ports() {
# Iterate over each given port. # Iterate over each given port.
for port in "$@"; do for port in "$@"; do
# Use the `ss` command to find process information related to the given port. # Use the `ss` command to find process information related to the given port.
if command -v ss > /dev/null 2>&1; then local info=$(ss -ltnp | grep ":$port" || true)
info=$(ss -ltnp | grep ":$port" || true)
else
info=$(netstat -ltnp | grep ":$port" || true)
fi
# If there's no process information, it means the process associated with the port is not running. # If there's no process information, it means the process associated with the port is not running.
if [[ -z $info ]]; then if [[ -z $info ]]; then
not_started+=($port) not_started+=($port)
@@ -368,18 +364,6 @@ openim::util::check_ports() {
# openim::util::check_process_names nginx mysql redis # openim::util::check_process_names nginx mysql redis
# The function returns a status of 1 if any of the processes is not running. # The function returns a status of 1 if any of the processes is not running.
openim::util::check_process_names() { openim::util::check_process_names() {
# Function to get the port of a process
get_port() {
local pid=$1
if command -v ss > /dev/null 2>&1; then
# used ss comment
ss -ltnp 2>/dev/null | grep $pid | awk '{print $4}' | cut -d ':' -f2
else
# used netstat comment replace ss
netstat -ltnp 2>/dev/null | grep $pid | awk '{print $4}' | sed 's/.*://'
fi
}
# Arrays to collect details of processes # Arrays to collect details of processes
local not_started=() local not_started=()
local started=() local started=()
@@ -398,7 +382,7 @@ openim::util::check_process_names() {
for pid in "${pids[@]}"; do for pid in "${pids[@]}"; do
local command=$(ps -p $pid -o cmd=) local command=$(ps -p $pid -o cmd=)
local start_time=$(ps -p $pid -o lstart=) local start_time=$(ps -p $pid -o lstart=)
local port=$(get_port $pid) local port=$(ss -ltnp 2>/dev/null | grep $pid | awk '{print $4}' | cut -d ':' -f2)
# Check if port information was found for the PID # Check if port information was found for the PID
if [[ -z $port ]]; then if [[ -z $port ]]; then
@@ -435,7 +419,6 @@ openim::util::check_process_names() {
return 0 return 0
fi fi
} }
# openim::util::check_process_names docker-pr # openim::util::check_process_names docker-pr
# The `openim::util::stop_services_on_ports` function stops services running on specified ports. # The `openim::util::stop_services_on_ports` function stops services running on specified ports.
+30 -42
View File
@@ -72,7 +72,7 @@ func initCfg() error {
type checkFunc struct { type checkFunc struct {
name string name string
function func() (string, error) function func() error
} }
func main() { func main() {
@@ -101,13 +101,13 @@ func main() {
allSuccess := true allSuccess := true
for _, check := range checks { for _, check := range checks {
str, err := check.function() err := check.function()
if err != nil { if err != nil {
errorPrint(fmt.Sprintf("Starting %s failed, %v", check.name, err)) errorPrint(fmt.Sprintf("Starting %s failed: %v", check.name, err))
allSuccess = false allSuccess = false
break break
} else { } else {
successPrint(fmt.Sprintf("%s connected successfully, %s", check.name, str)) successPrint(fmt.Sprintf("%s starts successfully", check.name))
} }
} }
@@ -142,22 +142,21 @@ func getEnv(key, fallback string) string {
} }
// checkMongo checks the MongoDB connection // checkMongo checks the MongoDB connection
func checkMongo() (string, error) { func checkMongo() error {
// Use environment variables or fallback to config // Use environment variables or fallback to config
uri := getEnv("MONGO_URI", buildMongoURI()) uri := getEnv("MONGO_URI", buildMongoURI())
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
str := "ths addr is:" + strings.Join(config.Config.Mongo.Address, ",")
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return errs.Wrap(err)
} }
defer client.Disconnect(context.TODO()) defer client.Disconnect(context.TODO())
if err = client.Ping(context.TODO(), nil); err != nil { if err = client.Ping(context.TODO(), nil); err != nil {
return "", errs.Wrap(errStr(err, str)) return errs.Wrap(err)
} }
return str, nil return nil
} }
// buildMongoURI constructs the MongoDB URI using configuration settings // buildMongoURI constructs the MongoDB URI using configuration settings
@@ -179,10 +178,10 @@ func buildMongoURI() string {
} }
// checkMinio checks the MinIO connection // checkMinio checks the MinIO connection
func checkMinio() (string, error) { func checkMinio() error {
// Check if MinIO is enabled // Check if MinIO is enabled
if config.Config.Object.Enable != "minio" { if config.Config.Object.Enable != "minio" {
return "", nil return nil
} }
// Prioritize environment variables // Prioritize environment variables
@@ -192,14 +191,13 @@ func checkMinio() (string, error) {
useSSL := getEnv("MINIO_USE_SSL", "false") // Assuming SSL is not used by default useSSL := getEnv("MINIO_USE_SSL", "false") // Assuming SSL is not used by default
if endpoint == "" || accessKeyID == "" || secretAccessKey == "" { if endpoint == "" || accessKeyID == "" || secretAccessKey == "" {
return "", ErrConfig.Wrap("MinIO configuration missing") return ErrConfig.Wrap("MinIO configuration missing")
} }
// Parse endpoint URL to determine if SSL is enabled // Parse endpoint URL to determine if SSL is enabled
u, err := url.Parse(endpoint) u, err := url.Parse(endpoint)
if err != nil { if err != nil {
str := "the endpoint is:" + endpoint return errs.Wrap(err)
return "", errs.Wrap(errStr(err, str))
} }
secure := u.Scheme == "https" || useSSL == "true" secure := u.Scheme == "https" || useSSL == "true"
@@ -208,34 +206,31 @@ func checkMinio() (string, error) {
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure, Secure: secure,
}) })
str := "ths addr is:" + u.Host
if err != nil { if err != nil {
strs := fmt.Sprintf("%v;host:%s,accessKeyID:%s,secretAccessKey:%s,Secure:%v", err, u.Host, accessKeyID, secretAccessKey, secure) return errs.Wrap(err)
return "", errs.Wrap(err, strs)
} }
// Perform health check // Perform health check
cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second) cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second)
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return errs.Wrap(err)
} }
defer cancel() defer cancel()
if minioClient.IsOffline() { if minioClient.IsOffline() {
str := fmt.Sprintf("Minio server is offline;%s", str) return ErrComponentStart.Wrap("Minio server is offline")
return "", ErrComponentStart.Wrap(str)
} }
// Check for localhost in API URL and Minio SignEndpoint // Check for localhost in API URL and Minio SignEndpoint
if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.SignEndpoint) == "127.0.0.1" { if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.SignEndpoint) == "127.0.0.1" {
return "", ErrConfig.Wrap("apiURL or Minio SignEndpoint endpoint contain 127.0.0.1") return ErrConfig.Wrap("apiURL or Minio SignEndpoint endpoint contain 127.0.0.1")
} }
return str, nil return nil
} }
// checkRedis checks the Redis connection // checkRedis checks the Redis connection
func checkRedis() (string, error) { func checkRedis() error {
// Prioritize environment variables // Prioritize environment variables
address := getEnv("REDIS_ADDRESS", strings.Join(config.Config.Redis.Address, ",")) address := getEnv("REDIS_ADDRESS", strings.Join(config.Config.Redis.Address, ","))
username := getEnv("REDIS_USERNAME", config.Config.Redis.Username) username := getEnv("REDIS_USERNAME", config.Config.Redis.Username)
@@ -264,16 +259,15 @@ func checkRedis() (string, error) {
// Ping Redis to check connectivity // Ping Redis to check connectivity
_, err := redisClient.Ping(context.Background()).Result() _, err := redisClient.Ping(context.Background()).Result()
str := "the addr is:" + strings.Join(redisAddresses, ",")
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return errs.Wrap(err)
} }
return str, nil return nil
} }
// checkZookeeper checks the Zookeeper connection // checkZookeeper checks the Zookeeper connection
func checkZookeeper() (string, error) { func checkZookeeper() error {
// Prioritize environment variables // Prioritize environment variables
schema := getEnv("ZOOKEEPER_SCHEMA", "digest") schema := getEnv("ZOOKEEPER_SCHEMA", "digest")
address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ",")) address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ","))
@@ -284,31 +278,30 @@ func checkZookeeper() (string, error) {
zookeeperAddresses := strings.Split(address, ",") zookeeperAddresses := strings.Split(address, ",")
// Connect to Zookeeper // Connect to Zookeeper
str := "the addr is:" + address
c, _, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary c, _, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return errs.Wrap(err)
} }
defer c.Close() defer c.Close()
// Set authentication if username and password are provided // Set authentication if username and password are provided
if username != "" && password != "" { if username != "" && password != "" {
if err := c.AddAuth(schema, []byte(username+":"+password)); err != nil { if err := c.AddAuth(schema, []byte(username+":"+password)); err != nil {
return "", errs.Wrap(errStr(err, str)) return errs.Wrap(err)
} }
} }
// Check if Zookeeper is reachable // Check if Zookeeper is reachable
_, _, err = c.Get("/") _, _, err = c.Get("/")
if err != nil { if err != nil {
return "", errs.Wrap(err, str) return errs.Wrap(err)
} }
return str, nil return nil
} }
// checkKafka checks the Kafka connection // checkKafka checks the Kafka connection
func checkKafka() (string, error) { func checkKafka() error {
// Prioritize environment variables // Prioritize environment variables
username := getEnv("KAFKA_USERNAME", config.Config.Kafka.Username) username := getEnv("KAFKA_USERNAME", config.Config.Kafka.Username)
password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password)
@@ -328,17 +321,16 @@ func checkKafka() (string, error) {
// kafka.SetupTLSConfig(cfg) // kafka.SetupTLSConfig(cfg)
// Create Kafka client // Create Kafka client
str := "the addr is:" + address
kafkaClient, err := sarama.NewClient(kafkaAddresses, cfg) kafkaClient, err := sarama.NewClient(kafkaAddresses, cfg)
if err != nil { if err != nil {
return "", errs.Wrap(errStr(err, str)) return errs.Wrap(err)
} }
defer kafkaClient.Close() defer kafkaClient.Close()
// Verify if necessary topics exist // Verify if necessary topics exist
topics, err := kafkaClient.Topics() topics, err := kafkaClient.Topics()
if err != nil { if err != nil {
return "", errs.Wrap(err) return errs.Wrap(err)
} }
requiredTopics := []string{ requiredTopics := []string{
@@ -349,11 +341,11 @@ func checkKafka() (string, error) {
for _, requiredTopic := range requiredTopics { for _, requiredTopic := range requiredTopics {
if !isTopicPresent(requiredTopic, topics) { if !isTopicPresent(requiredTopic, topics) {
return "", ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) return ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic))
} }
} }
return str, nil return nil
} }
// isTopicPresent checks if a topic is present in the list of topics // isTopicPresent checks if a topic is present in the list of topics
@@ -381,7 +373,3 @@ func successPrint(s string) {
func warningPrint(s string) { func warningPrint(s string) {
colorPrint(colorYellow, "Warning: But %v", s) colorPrint(colorYellow, "Warning: But %v", s)
} }
func errStr(err error, str string) error {
return fmt.Errorf("%v;%s", err, str)
}
+12
View File
@@ -21,10 +21,22 @@ import (
"time" "time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
func TestCheckMysql(t *testing.T) {
err := mockInitCfg()
assert.NoError(t, err, "Initialization should not produce errors")
err = checkMysql()
if err != nil {
// You might expect an error if MySQL isn't running locally with the mock credentials.
t.Logf("Expected error due to mock configuration: %v", err)
}
}
// Mock for initCfg for testing purpose // Mock for initCfg for testing purpose
func mockInitCfg() error { func mockInitCfg() error {
config.Config.Mysql.Username = "root" config.Config.Mysql.Username = "root"