Compare commits

..

15 Commits

Author SHA1 Message Date
Xinwei Xiong (cubxxw) e2286f03e5 fix: fix openim zk env set
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-28 01:59:45 +08:00
Xinwei Xiong (cubxxw) f937419175 fix: fix openim zk env set
Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-28 01:40:23 +08:00
Xinwei Xiong 47dd6b17f6 Update openimci.yml (#1610)
* Update openimci.yml

* Update Makefile
2023-12-26 04:16:55 +00:00
Brabem 7389639f17 feat: add the notificationAccount (#1602)
* feat: add notification API

* fix: fix the script

* fix: fix the error
2023-12-26 02:15:15 +00:00
Gordon de451d4cea fix: online notifications do not push to herself. (#1534)
* fix: online notifications do not push to herself.

* fix: online notifications do not push to herself.

* fix: online notifications do not push to herself.
2023-12-25 08:30:18 +00:00
Xinwei Xiong 11a147792d Update prometheus.yml (#1586) 2023-12-25 08:28:42 +00:00
Xinwei Xiong f10528010b Update check-all.sh (#1591) 2023-12-25 08:26:59 +00:00
xuexihuang 34ed032af1 K8s environment supports multiple msggateway by consistent hash (#1600)
* feature:support multi msggateway

* feature:support multi msggateway

* feature:support multi msggateway by hash method

* fix:fix log

* change to consistent hash

* change go.mod

* fix:fix go routine values error

* fix:fix push filter logic

* fix:fix push filter logic

---------

Co-authored-by: lin.huang <lin.huang@apulis.com>
2023-12-25 15:32:07 +08:00
Brabem ed5f012c0d fix: fix the output format (#1585)
* fix: update the component output format

* fix: fix the successful tiops

* fix: update the error format
2023-12-20 02:22:55 +00:00
AndrewZuo01 87610568ae add crud for general function user process, add pinFriend (#1532)
* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* fix bugs

* fix bugs

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* Update go.mod

* Update friend.go

* debug

* debug

* debug

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* I cannot solve todo in test.sh

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* Update go.mod

* fix group notification

* fix group notification

* update openimsdk tools

* update openim server remove duplicate code

* update openim server remove duplicate code

* update user command get

* update user command get

* update response of callback response error

* update black ex

* update join group ex

* update user pb2map

* update go sum

* update go sum

---------

Co-authored-by: Xinwei Xiong <3293172751@qq.com>
2023-12-18 10:39:58 +00:00
Xinwei Xiong c8463a0a80 Update check-all.sh (#1573) 2023-12-18 04:30:38 +00:00
Xinwei Xiong f1c9686ada feat: Add OpenIM server, environment support for Docker Compose, and Kubernetes deployment. (#1559)
* feat: add openim server code

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim mongo and redis env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add zk and redis mongo env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add kafka and redis mongo env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim copyright

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: docker compose

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: remove openim chat config file

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim config set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim config set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: fix Security vulnerability

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: fix Security vulnerability

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: docker compose

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* Update kubernetes.go

* Update discoveryregister.go

* fix: copyright-add

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

---------

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-18 02:24:12 +00:00
xuexihuang c5c5b2fd8e support multipe msggateway services in k8s deployments (#1565)
* feature:support multi msggateway

* feature:support multi msggateway

---------

Co-authored-by: lin.huang <lin.huang@apulis.com>
2023-12-17 09:40:04 +00:00
Xinwei Xiong c2dfc37b0b feat: support mac deployment and optimization make check (#1570)
* feat: add openim server code

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim env

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add openim scripts check and mac support ss comment

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: add mac os

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

---------

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-16 15:01:15 +00:00
AndrewZuo01 2cef0f62b3 fix group notification (#1557) 2023-12-15 09:14:43 +00:00
53 changed files with 995 additions and 257 deletions
+55 -6
View File
@@ -130,14 +130,14 @@ jobs:
sudo make install sudo make install
execute-scripts: execute-scripts:
name: Execute OpenIM script on ${{ matrix.os }} name: Execute OpenIM Script On ${{ matrix.os }}
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
environment: environment:
name: openim name: openim
strategy: strategy:
matrix: matrix:
go_version: ["1.20"] go_version: ["1.20"]
os: ["ubuntu-latest"] os: ["ubuntu-latest", "macos-latest"]
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v4 uses: actions/checkout@v4
@@ -154,18 +154,52 @@ jobs:
version: '3.x' # If available, use the latest major version that's compatible version: '3.x' # If available, use the latest major version that's compatible
repo-token: ${{ secrets.GITHUB_TOKEN }} repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Docker Operations # - name: Install latest Bash (macOS only)
# if: runner.os == 'macOS'
# run: |
# /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
# brew update
# brew install bash
# brew install gnu-sed
# echo "/usr/local/bin" >> $GITHUB_PATH
# echo "$(brew --prefix)/opt/gnu-sed/libexec/gnubin" >> $GITHUB_PATH
# continue-on-error: true
- name: Set up Docker for Ubuntu
if: runner.os == 'Linux'
run: | run: |
sudo make init sudo make init
sudo docker compose up -d sudo docker compose up -d
sudo sleep 20 sudo sleep 20
- name: Module Operations # - name: Set up Docker for macOS
# if: runner.os == 'macOS'
# run: |
# brew install --cask docker
# open /Applications/Docker.app
# sleep 10
# docker-compose --version || brew install docker-compose
# docker-compose up -d
# sleep 20
- name: Module Operations for Ubuntu
if: runner.os == 'Linux'
run: | run: |
sudo make tidy sudo make tidy
sudo make tools.verify.go-gitlint sudo make tools.verify.go-gitlint
- name: Build, Start, Check Services and Print Logs # - name: Module Operations for macOS
# if: runner.os == 'macOS'
# run: |
# make tidy
# make tools.verify.go-gitlint
- name: Build, Start, Check Services and Print Logs for Ubuntu
if: runner.os == 'Linux'
run: | run: |
sudo make init && \ sudo make init && \
sudo make build && \ sudo make build && \
@@ -173,6 +207,21 @@ jobs:
sudo make check || \ sudo make check || \
(echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null) (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)
- name: Restart Services and Print Logs for Ubuntu
if: runner.os == 'Linux'
run: |
sudo make restart
sudo make check
# - name: Build, Start, Check Services and Print Logs for macOS
# if: runner.os == 'macOS'
# run: |
# make init && \
# make build && \
# make start && \
# make check || \
# (echo "An error occurred, printing logs:" && sudo cat ./_output/logs/* 2>/dev/null)
openim-test-build-image: openim-test-build-image:
name: Build OpenIM Docker Image name: Build OpenIM Docker Image
runs-on: ubuntu-latest runs-on: ubuntu-latest
@@ -196,4 +245,4 @@ jobs:
- name: Test Docker Build - name: Test Docker Build
run: | run: |
sudo make init sudo make init
sudo make image sudo make image
+1 -1
View File
@@ -10,7 +10,7 @@ ENV GOPROXY=$GOPROXY
# Set up the working directory # Set up the working directory
WORKDIR /openim/openim-server WORKDIR /openim/openim-server
COPY go.mod go.sum ./dd COPY go.mod go.sum ./
RUN go mod download RUN go mod download
# Copy all files to the container # Copy all files to the container
+1 -1
View File
@@ -95,7 +95,7 @@ stop:
## restart: Restart openim (make init configuration file is initialized) ✨ ## restart: Restart openim (make init configuration file is initialized) ✨
.PHONY: restart .PHONY: restart
restart: clean stop build init start check restart: clean stop build start check
## multiarch: Build binaries for multiple platforms. See option PLATFORMS. ✨ ## multiarch: Build binaries for multiple platforms. See option PLATFORMS. ✨
.PHONY: multiarch .PHONY: multiarch
+5 -4
View File
@@ -210,8 +210,9 @@ API_OPENIM_PORT=10002
# ====================================== # ======================================
# Branch name for OpenIM chat. # Branch name for OpenIM chat.
# Default: CHAT_BRANCH=main # Default: CHAT_IMAGE_VERSION=main
CHAT_BRANCH=main # https://github.com/openimsdk/open-im-server/blob/main/docs/contrib/version.md
CHAT_IMAGE_VERSION=main
# Address or hostname for the OpenIM chat service. # Address or hostname for the OpenIM chat service.
# Default: OPENIM_CHAT_ADDRESS=172.28.0.1 # Default: OPENIM_CHAT_ADDRESS=172.28.0.1
@@ -231,8 +232,8 @@ OPENIM_CHAT_DATA_DIR=./openim-chat/main
# ====================================== # ======================================
# Branch name for OpenIM server. # Branch name for OpenIM server.
# Default: SERVER_BRANCH=main # Default: SERVER_IMAGE_VERSION=main
SERVER_BRANCH=main SERVER_IMAGE_VERSION=main
# Port for the OpenIM admin API. # Port for the OpenIM admin API.
# Default: OPENIM_ADMIN_API_PORT=10009 # Default: OPENIM_ADMIN_API_PORT=10009
+4 -4
View File
@@ -210,8 +210,8 @@ API_OPENIM_PORT=${API_OPENIM_PORT}
# ====================================== # ======================================
# Branch name for OpenIM chat. # Branch name for OpenIM chat.
# Default: CHAT_BRANCH=main # Default: CHAT_IMAGE_VERSION=main
CHAT_BRANCH=${CHAT_BRANCH} CHAT_IMAGE_VERSION=${CHAT_IMAGE_VERSION}
# Address or hostname for the OpenIM chat service. # Address or hostname for the OpenIM chat service.
# Default: OPENIM_CHAT_ADDRESS=172.28.0.1 # Default: OPENIM_CHAT_ADDRESS=172.28.0.1
@@ -231,8 +231,8 @@ OPENIM_CHAT_DATA_DIR=${OPENIM_CHAT_DATA_DIR}
# ====================================== # ======================================
# Branch name for OpenIM server. # Branch name for OpenIM server.
# Default: SERVER_BRANCH=main # Default: SERVER_IMAGE_VERSION=main
SERVER_BRANCH=${SERVER_BRANCH} SERVER_IMAGE_VERSION=${SERVER_IMAGE_VERSION}
# Port for the OpenIM admin API. # Port for the OpenIM admin API.
# Default: OPENIM_ADMIN_API_PORT=10009 # Default: OPENIM_ADMIN_API_PORT=10009
+8
View File
@@ -247,6 +247,14 @@ manager:
userID: [ "${MANAGER_USERID_1}", "${MANAGER_USERID_2}", "${MANAGER_USERID_3}" ] userID: [ "${MANAGER_USERID_1}", "${MANAGER_USERID_2}", "${MANAGER_USERID_3}" ]
nickname: [ "${NICKNAME_1}", "${NICKNAME_2}", "${NICKNAME_3}" ] nickname: [ "${NICKNAME_1}", "${NICKNAME_2}", "${NICKNAME_3}" ]
# chatAdmin, use for send notification
#
# Built-in app system notification account ID
# Built-in app system notification account nickname
im-admin:
userID: [ "${IM_ADMIN_USERID}" ]
nickname: [ "${IM_ADMIN_NAME}" ]
# Multi-platform login policy # Multi-platform login policy
# For each platform(Android, iOS, Windows, Mac, web), only one can be online at a time # For each platform(Android, iOS, Windows, Mac, web), only one can be online at a time
multiLoginPolicy: ${MULTILOGIN_POLICY} multiLoginPolicy: ${MULTILOGIN_POLICY}
+10 -10
View File
@@ -44,12 +44,12 @@ scrape_configs:
# prometheus fetches application services # prometheus fetches application services
- job_name: 'openimserver-openim-api' - job_name: 'openimserver-openim-api'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${API_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${API_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-msggateway' - job_name: 'openimserver-openim-msggateway'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${MSG_GATEWAY_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MSG_GATEWAY_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-msgtransfer' - job_name: 'openimserver-openim-msgtransfer'
@@ -59,41 +59,41 @@ scrape_configs:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-push' - job_name: 'openimserver-openim-push'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${PUSH_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${PUSH_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-auth' - job_name: 'openimserver-openim-rpc-auth'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${AUTH_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${AUTH_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-conversation' - job_name: 'openimserver-openim-rpc-conversation'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${CONVERSATION_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${CONVERSATION_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-friend' - job_name: 'openimserver-openim-rpc-friend'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${FRIEND_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${FRIEND_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-group' - job_name: 'openimserver-openim-rpc-group'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${GROUP_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${GROUP_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-msg' - job_name: 'openimserver-openim-rpc-msg'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${MESSAGE_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${MESSAGE_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-third' - job_name: 'openimserver-openim-rpc-third'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${THIRD_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${THIRD_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
- job_name: 'openimserver-openim-rpc-user' - job_name: 'openimserver-openim-rpc-user'
static_configs: static_configs:
- targets: [ '${OPENIM_SERVER_ADDRESS}:${USER_PROM_PORT}' ] - targets: [ '${DOCKER_BRIDGE_GATEWAY}:${USER_PROM_PORT}' ]
labels: labels:
namespace: 'default' namespace: 'default'
+43 -41
View File
@@ -104,18 +104,18 @@ Docker deployment offers a slightly more intricate template. Within the [openim-
Configuration file modifications can be made by specifying corresponding environment variables, for instance: Configuration file modifications can be made by specifying corresponding environment variables, for instance:
```bash ```bash
export CHAT_BRANCH="main" export CHAT_IMAGE_VERSION="main"
export SERVER_BRANCH="main" export SERVER_IMAGE_VERSION="main"
``` ```
These variables are stored within the [`environment.sh`](https://github.com/OpenIMSDK/openim-docker/blob/main/scripts/install/environment.sh) configuration: These variables are stored within the [`environment.sh`](https://github.com/OpenIMSDK/openim-docker/blob/main/scripts/install/environment.sh) configuration:
```bash ```bash
readonly CHAT_BRANCH=${CHAT_BRANCH:-'main'} readonly CHAT_IMAGE_VERSION=${CHAT_IMAGE_VERSION:-'main'}
readonly SERVER_BRANCH=${SERVER_BRANCH:-'main'} readonly SERVER_IMAGE_VERSION=${SERVER_IMAGE_VERSION:-'main'}
``` ```
Setting a variable, e.g., `export CHAT_BRANCH="release-v1.3"`, will prioritize `CHAT_BRANCH="release-v1.3"` as the variable value. Ultimately, the chosen image version is determined, and rendering is achieved through `make init` (or `./scripts/init-config.sh`). Setting a variable, e.g., `export CHAT_IMAGE_VERSION="release-v1.3"`, will prioritize `CHAT_IMAGE_VERSION="release-v1.3"` as the variable value. Ultimately, the chosen image version is determined, and rendering is achieved through `make init` (or `./scripts/init-config.sh`).
> Note: Direct modifications to the `config.yaml` file are also permissible without utilizing `make init`. > Note: Direct modifications to the `config.yaml` file are also permissible without utilizing `make init`.
@@ -453,43 +453,45 @@ This section involves configuring the log settings, including storage location,
This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat. This section involves setting up additional configuration variables for Websocket, Push Notifications, and Chat.
| Parameter | Example Value | Description | | Parameter | Example Value | Description |
|-------------------------|-------------------|------------------------------------| |-------------------------|-------------------|----------------------------------|
| WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections | | WEBSOCKET_MAX_CONN_NUM | "100000" | Maximum Websocket connections |
| WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length | | WEBSOCKET_MAX_MSG_LEN | "4096" | Maximum Websocket message length |
| WEBSOCKET_TIMEOUT | "10" | Websocket timeout | | WEBSOCKET_TIMEOUT | "10" | Websocket timeout |
| PUSH_ENABLE | "getui" | Push notification enable status | | PUSH_ENABLE | "getui" | Push notification enable status |
| GETUI_PUSH_URL | [Generated URL] | GeTui Push Notification URL | | GETUI_PUSH_URL | [Generated URL] | GeTui Push Notification URL |
| GETUI_MASTER_SECRET | [User Defined] | GeTui Master Secret | | GETUI_MASTER_SECRET | [User Defined] | GeTui Master Secret |
| GETUI_APP_KEY | [User Defined] | GeTui Application Key | | GETUI_APP_KEY | [User Defined] | GeTui Application Key |
| GETUI_INTENT | [User Defined] | GeTui Push Intent | | GETUI_INTENT | [User Defined] | GeTui Push Intent |
| GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID | | GETUI_CHANNEL_ID | [User Defined] | GeTui Channel ID |
| GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name | | GETUI_CHANNEL_NAME | [User Defined] | GeTui Channel Name |
| FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account | | FCM_SERVICE_ACCOUNT | "x.json" | FCM Service Account |
| JPNS_APP_KEY | [User Defined] | JPNS Application Key | | JPNS_APP_KEY | [User Defined] | JPNS Application Key |
| JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret | | JPNS_MASTER_SECRET | [User Defined] | JPNS Master Secret |
| JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL | | JPNS_PUSH_URL | [User Defined] | JPNS Push Notification URL |
| JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent | | JPNS_PUSH_INTENT | [User Defined] | JPNS Push Intent |
| MANAGER_USERID_1 | "openIM123456" | Administrator ID 1 | | MANAGER_USERID_1 | "openIM123456" | Administrator ID 1 |
| MANAGER_USERID_2 | "openIM654321" | Administrator ID 2 | | MANAGER_USERID_2 | "openIM654321" | Administrator ID 2 |
| MANAGER_USERID_3 | "openIMAdmin" | Administrator ID 3 | | MANAGER_USERID_3 | "openIMAdmin" | Administrator ID 3 |
| NICKNAME_1 | "system1" | Nickname 1 | | NICKNAME_1 | "system1" | Nickname 1 |
| NICKNAME_2 | "system2" | Nickname 2 | | NICKNAME_2 | "system2" | Nickname 2 |
| NICKNAME_3 | "system3" | Nickname 3 | | NICKNAME_3 | "system3" | Nickname 3 |
| MULTILOGIN_POLICY | "1" | Multi-login Policy | | IM_ADMIN_USERID | "imAdmin" | IM Administrator ID |
| CHAT_PERSISTENCE_MYSQL | "true" | Chat Persistence in MySQL | | IM_ADMIN_NAME | "imAdmin" | IM Administrator Nickname |
| MSG_CACHE_TIMEOUT | "86400" | Message Cache Timeout | | MULTILOGIN_POLICY | "1" | Multi-login Policy |
| GROUP_MSG_READ_RECEIPT | "true" | Group Message Read Receipt Enable | | CHAT_PERSISTENCE_MYSQL | "true" | Chat Persistence in MySQL |
| MSG_CACHE_TIMEOUT | "86400" | Message Cache Timeout |
| GROUP_MSG_READ_RECEIPT | "true" | Group Message Read Receipt Enable |
| SINGLE_MSG_READ_RECEIPT | "true" | Single Message Read Receipt Enable | | SINGLE_MSG_READ_RECEIPT | "true" | Single Message Read Receipt Enable |
| RETAIN_CHAT_RECORDS | "365" | Retain Chat Records (in days) | | RETAIN_CHAT_RECORDS | "365" | Retain Chat Records (in days) |
| CHAT_RECORDS_CLEAR_TIME | [Cron Expression] | Chat Records Clear Time | | CHAT_RECORDS_CLEAR_TIME | [Cron Expression] | Chat Records Clear Time |
| MSG_DESTRUCT_TIME | [Cron Expression] | Message Destruct Time | | MSG_DESTRUCT_TIME | [Cron Expression] | Message Destruct Time |
| SECRET | "${PASSWORD}" | Secret Key | | SECRET | "${PASSWORD}" | Secret Key |
| TOKEN_EXPIRE | "90" | Token Expiry Time | | TOKEN_EXPIRE | "90" | Token Expiry Time |
| FRIEND_VERIFY | "false" | Friend Verification Enable | | FRIEND_VERIFY | "false" | Friend Verification Enable |
| IOS_PUSH_SOUND | "xxx" | iOS | | IOS_PUSH_SOUND | "xxx" | iOS |
| CALLBACK_ENABLE | "false" | Enable callback | | CALLBACK_ENABLE | "false" | Enable callback |
| CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call | | CALLBACK_TIMEOUT | "5" | Maximum timeout for callback call |
| CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step | | CALLBACK_FAILED_CONTINUE| "true" | fails to continue to the next step |
### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration ### 2.20. <a name='PrometheusConfiguration-1'></a>Prometheus Configuration
+6 -2
View File
@@ -4,8 +4,6 @@ go 1.19
require ( require (
firebase.google.com/go v3.13.0+incompatible firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.31
github.com/OpenIMSDK/tools v0.0.20
github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1 github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1 github.com/gin-gonic/gin v1.9.1
@@ -35,9 +33,12 @@ require github.com/google/uuid v1.3.1
require ( require (
github.com/IBM/sarama v1.41.3 github.com/IBM/sarama v1.41.3
github.com/OpenIMSDK/protocol v0.0.36
github.com/OpenIMSDK/tools v0.0.21
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible
github.com/redis/go-redis/v9 v9.2.1 github.com/redis/go-redis/v9 v9.2.1
github.com/stathat/consistent v1.0.0
github.com/tencentyun/cos-go-sdk-v5 v0.7.45 github.com/tencentyun/cos-go-sdk-v5 v0.7.45
go.uber.org/automaxprocs v1.5.3 go.uber.org/automaxprocs v1.5.3
golang.org/x/sync v0.4.0 golang.org/x/sync v0.4.0
@@ -141,6 +142,7 @@ require (
gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect gopkg.in/src-d/go-billy.v4 v4.3.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect
gorm.io/gorm v1.23.8 // indirect gorm.io/gorm v1.23.8 // indirect
stathat.com/c/consistent v1.0.0 // indirect
) )
require ( require (
@@ -154,3 +156,5 @@ require (
golang.org/x/crypto v0.14.0 // indirect golang.org/x/crypto v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
) )
replace github.com/OpenIMSDK/protocol v0.0.36 => github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5
+8 -4
View File
@@ -18,10 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE= github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48=
github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k=
github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs= github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
@@ -227,6 +225,8 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw= github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w= github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w= github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5 h1:nmrJmAgQsCAxKgw109kaTcBV4rMWDRvqOson0ehw708=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
@@ -308,6 +308,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4= github.com/src-d/gcfg v1.4.0 h1:xXbNR5AlLSA315x2UO+fTSSAXCDf+Ar38/6oyGbDKQ4=
github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI= github.com/src-d/gcfg v1.4.0/go.mod h1:p/UMsR43ujA89BJY9duynAwIpvqEujIH/jFlfL7jWoI=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@@ -536,3 +538,5 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
stathat.com/c/consistent v1.0.0 h1:ezyc51EGcRPJUxfHGSgJjWzJdj3NiMU9pNfLNGiXV0c=
stathat.com/c/consistent v1.0.0/go.mod h1:QkzMWzcbB+yQBL2AttO6sgsQS/JSTapcDISJalmCDS0=
+2 -3
View File
@@ -169,9 +169,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
case constant.OANotification: case constant.OANotification:
data = apistruct.OANotificationElem{} data = apistruct.OANotificationElem{}
req.SessionType = constant.NotificationChatType req.SessionType = constant.NotificationChatType
if !authverify.IsManagerUserID(req.SendID) { if err = m.userRpcClient.GetNotificationByID(c, req.SendID); err != nil {
return nil, errs.ErrNoPermission. return nil, err
Wrap("only app manager can as sender send OANotificationElem")
} }
default: default:
return nil, errs.ErrArgs.WithDetail("not support err contentType") return nil, errs.ErrArgs.WithDetail("not support err contentType")
+10
View File
@@ -77,6 +77,15 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus) userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus)
userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus) userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus)
userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus) userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus)
userRouterGroup.POST("/process_user_command_add", ParseToken, u.ProcessUserCommandAdd)
userRouterGroup.POST("/process_user_command_delete", ParseToken, u.ProcessUserCommandDelete)
userRouterGroup.POST("/process_user_command_update", ParseToken, u.ProcessUserCommandUpdate)
userRouterGroup.POST("/process_user_command_get", ParseToken, u.ProcessUserCommandGet)
userRouterGroup.POST("/add_notification_account", ParseToken, u.AddNotificationAccount)
userRouterGroup.POST("/update_notification_account", ParseToken, u.UpdateNotificationAccountInfo)
userRouterGroup.POST("/search_notification_account", ParseToken, u.SearchNotificationAccount)
} }
// friend routing group // friend routing group
friendRouterGroup := r.Group("/friend", ParseToken) friendRouterGroup := r.Group("/friend", ParseToken)
@@ -98,6 +107,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
friendRouterGroup.POST("/is_friend", f.IsFriend) friendRouterGroup.POST("/is_friend", f.IsFriend)
friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs) friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs)
friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo) friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo)
//friendRouterGroup.POST("/set_pin_friend", f.SetPinFriends)
} }
g := NewGroupApi(*groupRpc) g := NewGroupApi(*groupRpc)
groupRouterGroup := r.Group("/group", ParseToken) groupRouterGroup := r.Group("/group", ParseToken)
+33 -2
View File
@@ -15,8 +15,6 @@
package api package api
import ( import (
"github.com/gin-gonic/gin"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/protocol/msggateway" "github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/protocol/user" "github.com/OpenIMSDK/protocol/user"
@@ -24,6 +22,7 @@ import (
"github.com/OpenIMSDK/tools/apiresp" "github.com/OpenIMSDK/tools/apiresp"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log" "github.com/OpenIMSDK/tools/log"
"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
@@ -199,3 +198,35 @@ func (u *UserApi) GetUserStatus(c *gin.Context) {
func (u *UserApi) GetSubscribeUsersStatus(c *gin.Context) { func (u *UserApi) GetSubscribeUsersStatus(c *gin.Context) {
a2r.Call(user.UserClient.GetSubscribeUsersStatus, u.Client, c) a2r.Call(user.UserClient.GetSubscribeUsersStatus, u.Client, c)
} }
// ProcessUserCommandAdd user general function add
func (u *UserApi) ProcessUserCommandAdd(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandAdd, u.Client, c)
}
// ProcessUserCommandDelete user general function delete
func (u *UserApi) ProcessUserCommandDelete(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandDelete, u.Client, c)
}
// ProcessUserCommandUpdate user general function update
func (u *UserApi) ProcessUserCommandUpdate(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandUpdate, u.Client, c)
}
// ProcessUserCommandGet user general function get
func (u *UserApi) ProcessUserCommandGet(c *gin.Context) {
a2r.Call(user.UserClient.ProcessUserCommandGet, u.Client, c)
}
func (u *UserApi) AddNotificationAccount(c *gin.Context) {
a2r.Call(user.UserClient.AddNotificationAccount, u.Client, c)
}
func (u *UserApi) UpdateNotificationAccountInfo(c *gin.Context) {
a2r.Call(user.UserClient.UpdateNotificationAccountInfo, u.Client, c)
}
func (u *UserApi) SearchNotificationAccount(c *gin.Context) {
a2r.Call(user.UserClient.SearchNotificationAccount, u.Client, c)
}
+7 -6
View File
@@ -288,12 +288,13 @@ func (ws *WsServer) registerClient(client *Client) {
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) if config.Config.Envs.Discovery == "zookeeper" {
go func() { wg.Add(1)
defer wg.Done() go func() {
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) defer wg.Done()
}() _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()
}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
+6 -5
View File
@@ -67,13 +67,14 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
case constant.SuperGroupChatType: case constant.SuperGroupChatType:
err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData) err = c.pusher.Push2SuperGroup(ctx, pbData.MsgData.GroupID, pbData.MsgData)
default: default:
var pushUserIDs []string var pushUserIDList []string
if pbData.MsgData.SendID != pbData.MsgData.RecvID { isSenderSync := utils.GetSwitchFromOptions(pbData.MsgData.Options, constant.IsSenderSync)
pushUserIDs = []string{pbData.MsgData.SendID, pbData.MsgData.RecvID} if !isSenderSync || pbData.MsgData.SendID == pbData.MsgData.RecvID {
pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID)
} else { } else {
pushUserIDs = []string{pbData.MsgData.SendID} pushUserIDList = append(pushUserIDList, pbData.MsgData.RecvID, pbData.MsgData.SendID)
} }
err = c.pusher.Push2User(ctx, pushUserIDs, pbData.MsgData) err = c.pusher.Push2User(ctx, pushUserIDList, pbData.MsgData)
} }
if err != nil { if err != nil {
if err == errNoOfflinePusher { if err == errNoOfflinePusher {
+1 -2
View File
@@ -16,9 +16,8 @@ package push
import ( import (
"context" "context"
"sync"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"sync"
"google.golang.org/grpc" "google.golang.org/grpc"
+118 -21
View File
@@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"google.golang.org/grpc"
"sync" "sync"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@@ -142,6 +143,47 @@ func (p *Pusher) UnmarshalNotificationElem(bytes []byte, t any) error {
return json.Unmarshal([]byte(notification.Detail), t) return json.Unmarshal([]byte(notification.Detail), t)
} }
/*
k8s deployment,offline push group messages function
*/
func (p *Pusher) k8sOfflinePush2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData, wsResults []*msggateway.SingleMsgToUserResults) error {
var needOfflinePushUserIDs []string
for _, v := range wsResults {
if !v.OnlinePush {
needOfflinePushUserIDs = append(needOfflinePushUserIDs, v.UserID)
}
}
if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string
err := callbackOfflinePush(ctx, needOfflinePushUserIDs, msg, &offlinePushUserIDs)
if err != nil {
return err
}
if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs
}
if msg.ContentType != constant.SignalingNotification {
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
ctx,
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
)
if err != nil {
return err
}
if len(resp.UserIDs) > 0 {
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
}
}
}
return nil
}
func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) log.ZDebug(ctx, "Get super group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string var pushToUserIDs []string
@@ -205,7 +247,10 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg) log.ZDebug(ctx, "get conn and online push success", "result", wsResults, "msg", msg)
isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush) isOfflinePush := utils.GetSwitchFromOptions(msg.Options, constant.IsOfflinePush)
if isOfflinePush { if isOfflinePush && config.Config.Envs.Discovery == "k8s" {
return p.k8sOfflinePush2SuperGroup(ctx, groupID, msg, wsResults)
}
if isOfflinePush && config.Config.Envs.Discovery == "zookeeper" {
var ( var (
onlineSuccessUserIDs = []string{msg.SendID} onlineSuccessUserIDs = []string{msg.SendID}
webAndPcBackgroundUserIDs []string webAndPcBackgroundUserIDs []string
@@ -239,14 +284,7 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
} }
needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs) needOfflinePushUserIDs := utils.DifferenceString(onlineSuccessUserIDs, pushToUserIDs)
if msg.ContentType != constant.SignalingNotification {
notNotificationUserIDs, err := p.conversationLocalCache.GetRecvMsgNotNotifyUserIDs(ctx, groupID)
if err != nil {
return err
}
needOfflinePushUserIDs = utils.SliceSub(needOfflinePushUserIDs, notNotificationUserIDs)
}
// Use offline push messaging // Use offline push messaging
if len(needOfflinePushUserIDs) > 0 { if len(needOfflinePushUserIDs) > 0 {
var offlinePushUserIDs []string var offlinePushUserIDs []string
@@ -258,30 +296,89 @@ func (p *Pusher) Push2SuperGroup(ctx context.Context, groupID string, msg *sdkws
if len(offlinePushUserIDs) > 0 { if len(offlinePushUserIDs) > 0 {
needOfflinePushUserIDs = offlinePushUserIDs needOfflinePushUserIDs = offlinePushUserIDs
} }
resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs( if msg.ContentType != constant.SignalingNotification {
ctx, resp, err := p.conversationRpcClient.Client.GetConversationOfflinePushUserIDs(
&conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs}, ctx,
) &conversation.GetConversationOfflinePushUserIDsReq{ConversationID: utils.GenGroupConversationID(groupID), UserIDs: needOfflinePushUserIDs},
if err != nil { )
return err
}
if len(resp.UserIDs) > 0 {
err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
if err != nil { if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err return err
} }
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil { if len(resp.UserIDs) > 0 {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs)) err = p.offlinePushMsg(ctx, groupID, msg, resp.UserIDs)
return err if err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg)
return err
}
if _, err := p.GetConnsAndOnlinePush(ctx, msg, utils.IntersectString(resp.UserIDs, webAndPcBackgroundUserIDs)); err != nil {
log.ZError(ctx, "offlinePushMsg failed", err, "groupID", groupID, "msg", msg, "userIDs", utils.IntersectString(needOfflinePushUserIDs, webAndPcBackgroundUserIDs))
return err
}
} }
} }
} }
} }
return nil return nil
} }
func (p *Pusher) k8sOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
var usersHost = make(map[string][]string)
for _, v := range pushToUserIDs {
tHost, err := p.discov.GetUserIdHashGatewayHost(ctx, v)
if err != nil {
log.ZError(ctx, "get msggateway hash error", err)
return nil, err
}
tUsers, tbl := usersHost[tHost]
if tbl {
tUsers = append(tUsers, v)
usersHost[tHost] = tUsers
} else {
usersHost[tHost] = []string{v}
}
}
log.ZDebug(ctx, "genUsers send hosts struct:", "usersHost", usersHost)
var usersConns = make(map[*grpc.ClientConn][]string)
for host, userIds := range usersHost {
tconn, _ := p.discov.GetConn(ctx, host)
usersConns[tconn] = userIds
}
var (
mu sync.Mutex
wg = errgroup.Group{}
maxWorkers = config.Config.Push.MaxConcurrentWorkers
)
if maxWorkers < 3 {
maxWorkers = 3
}
wg.SetLimit(maxWorkers)
for conn, userIds := range usersConns {
tcon := conn
tuserIds := userIds
wg.Go(func() error {
input := &msggateway.OnlineBatchPushOneMsgReq{MsgData: msg, PushToUserIDs: tuserIds}
msgClient := msggateway.NewMsgGatewayClient(tcon)
reply, err := msgClient.SuperGroupOnlineBatchPushOneMsg(ctx, input)
if err != nil {
return nil
}
log.ZDebug(ctx, "push result", "reply", reply)
if reply != nil && reply.SinglePushResult != nil {
mu.Lock()
wsResults = append(wsResults, reply.SinglePushResult...)
mu.Unlock()
}
return nil
})
}
_ = wg.Wait()
return wsResults, nil
}
func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) { func (p *Pusher) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) (wsResults []*msggateway.SingleMsgToUserResults, err error) {
if config.Config.Envs.Discovery == "k8s" {
return p.k8sOnlinePush(ctx, msg, pushToUserIDs)
}
conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) conns, err := p.discov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
log.ZDebug(ctx, "get gateway conn", "conn length", len(conns)) log.ZDebug(ctx, "get gateway conn", "conn length", len(conns))
if err != nil { if err != nil {
+1
View File
@@ -79,6 +79,7 @@ func (s *friendServer) AddBlack(ctx context.Context, req *pbfriend.AddBlackReq)
BlockUserID: req.BlackUserID, BlockUserID: req.BlackUserID,
OperatorUserID: mcontext.GetOpUserID(ctx), OperatorUserID: mcontext.GetOpUserID(ctx),
CreateTime: time.Now(), CreateTime: time.Now(),
Ex: req.Ex,
} }
if err := s.blackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil { if err := s.blackDatabase.Create(ctx, []*relation.BlackModel{&black}); err != nil {
return nil, err return nil, err
+37 -2
View File
@@ -53,8 +53,9 @@ type friendServer struct {
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
} }
func (s *friendServer) PinFriends(ctx context.Context, req *pbfriend.PinFriendsReq) (*pbfriend.PinFriendsResp, error) { func (s *friendServer) UpdateFriends(ctx context.Context, req *pbfriend.UpdateFriendsReq) (*pbfriend.UpdateFriendsResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented") //TODO implement me
panic("implement me")
} }
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
@@ -411,6 +412,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
} }
var friendInfo *sdkws.FriendInfo var friendInfo *sdkws.FriendInfo
if friend := friendMap[userID]; friend != nil { if friend := friendMap[userID]; friend != nil {
friendInfo = &sdkws.FriendInfo{ friendInfo = &sdkws.FriendInfo{
OwnerUserID: friend.OwnerUserID, OwnerUserID: friend.OwnerUserID,
Remark: friend.Remark, Remark: friend.Remark,
@@ -418,6 +420,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
AddSource: friend.AddSource, AddSource: friend.AddSource,
OperatorUserID: friend.OperatorUserID, OperatorUserID: friend.OperatorUserID,
Ex: friend.Ex, Ex: friend.Ex,
IsPinned: friend.IsPinned,
} }
} }
var blackInfo *sdkws.BlackInfo var blackInfo *sdkws.BlackInfo
@@ -438,3 +441,35 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
} }
return resp, nil return resp, nil
} }
func (s *friendServer) PinFriends(
ctx context.Context,
req *pbfriend.UpdateFriendsReq,
) (*pbfriend.UpdateFriendsResp, error) {
if len(req.FriendUserIDs) == 0 {
return nil, errs.ErrArgs.Wrap("friendIDList is empty")
}
if utils.Duplicate(req.FriendUserIDs) {
return nil, errs.ErrArgs.Wrap("friendIDList repeated")
}
var isPinned bool
if req.IsPinned != nil {
isPinned = req.IsPinned.Value
} else {
return nil, errs.ErrArgs.Wrap("isPinned is nil")
}
//check whther in friend list
_, err := s.friendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, err
}
//set friendslist friend pin status to isPinned
for _, friendID := range req.FriendUserIDs {
if err := s.friendDatabase.UpdateFriendPinStatus(ctx, req.OwnerUserID, friendID, isPinned); err != nil {
return nil, err
}
}
resp := &pbfriend.UpdateFriendsResp{}
return resp, nil
}
+4 -4
View File
@@ -327,8 +327,6 @@ func CallbackBeforeInviteUserToGroup(ctx context.Context, req *group.InviteUserT
// Handle the scenario where certain members are refused // Handle the scenario where certain members are refused
// You might want to update the req.Members list or handle it as per your business logic // You might want to update the req.Members list or handle it as per your business logic
} }
utils.StructFieldNotNilReplace(req, resp)
return nil return nil
} }
@@ -395,7 +393,10 @@ func CallbackBeforeSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq)
if resp.ApplyMemberFriend != nil { if resp.ApplyMemberFriend != nil {
req.GroupInfoForSet.ApplyMemberFriend = wrapperspb.Int32(*resp.ApplyMemberFriend) req.GroupInfoForSet.ApplyMemberFriend = wrapperspb.Int32(*resp.ApplyMemberFriend)
} }
utils.StructFieldNotNilReplace(req, resp) utils.NotNilReplace(&req.GroupInfoForSet.GroupID, &resp.GroupID)
utils.NotNilReplace(&req.GroupInfoForSet.GroupName, &resp.GroupName)
utils.NotNilReplace(&req.GroupInfoForSet.FaceURL, &resp.FaceURL)
utils.NotNilReplace(&req.GroupInfoForSet.Introduction, &resp.Introduction)
return nil return nil
} }
func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error { func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq) error {
@@ -426,6 +427,5 @@ func CallbackAfterSetGroupInfo(ctx context.Context, req *group.SetGroupInfoReq)
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil { if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterSetGroupInfo); err != nil {
return err return err
} }
utils.StructFieldNotNilReplace(req, resp)
return nil return nil
} }
+2 -1
View File
@@ -279,7 +279,6 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
break break
} }
} }
s.Notification.GroupCreatedNotification(ctx, tips)
if req.GroupInfo.GroupType == constant.SuperGroup { if req.GroupInfo.GroupType == constant.SuperGroup {
go func() { go func() {
for _, userID := range userIDs { for _, userID := range userIDs {
@@ -803,6 +802,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
GroupType: string(group.GroupType), GroupType: string(group.GroupType),
ApplyID: req.InviterUserID, ApplyID: req.InviterUserID,
ReqMessage: req.ReqMessage, ReqMessage: req.ReqMessage,
Ex: req.Ex,
} }
if err = CallbackApplyJoinGroupBefore(ctx, reqCall); err != nil { if err = CallbackApplyJoinGroupBefore(ctx, reqCall); err != nil {
@@ -849,6 +849,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
JoinSource: req.JoinSource, JoinSource: req.JoinSource,
ReqTime: time.Now(), ReqTime: time.Now(),
HandledTime: time.Unix(0, 0), HandledTime: time.Unix(0, 0),
Ex: req.Ex,
} }
if err := s.db.CreateGroupRequest(ctx, []*relationtb.GroupRequestModel{&groupRequest}); err != nil { if err := s.db.CreateGroupRequest(ctx, []*relationtb.GroupRequestModel{&groupRequest}); err != nil {
return nil, err return nil, err
-1
View File
@@ -202,6 +202,5 @@ func CallbackAfterRevokeMsg(ctx context.Context, req *pbchat.RevokeMsgReq) error
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterRevokeMsg); err != nil { if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, callbackReq, resp, config.Config.Callback.CallbackAfterRevokeMsg); err != nil {
return err return err
} }
utils.StructFieldNotNilReplace(req, resp)
return nil return nil
} }
+10
View File
@@ -101,6 +101,16 @@ type thirdServer struct {
defaultExpire time.Duration defaultExpire time.Duration
} }
func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateFormDataReq) (*third.InitiateFormDataResp, error) {
//TODO implement me
panic("implement me")
}
func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteFormDataReq) (*third.CompleteFormDataResp, error) {
//TODO implement me
panic("implement me")
}
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) { func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime) err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil { if err != nil {
+184 -16
View File
@@ -17,6 +17,7 @@ package user
import ( import (
"context" "context"
"errors" "errors"
"math/rand"
"strings" "strings"
"time" "time"
@@ -56,22 +57,6 @@ type userServer struct {
RegisterCenter registry.SvcDiscoveryRegistry RegisterCenter registry.SvcDiscoveryRegistry
} }
func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.ProcessUserCommandAddReq) (*pbuser.ProcessUserCommandAddResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.ProcessUserCommandUpdateReq) (*pbuser.ProcessUserCommandUpdateResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.ProcessUserCommandDeleteReq) (*pbuser.ProcessUserCommandDeleteResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) {
return nil, errs.ErrInternalServer.Wrap("not implemented")
}
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error { func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
@@ -88,6 +73,12 @@ func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
for k, v := range config.Config.Manager.UserID { for k, v := range config.Config.Manager.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin}) users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.Manager.Nickname[k], AppMangerLevel: constant.AppAdmin})
} }
if len(config.Config.IMAdmin.UserID) != len(config.Config.IMAdmin.Nickname) {
return errors.New("len(config.Config.AppNotificationAdmin.AppManagerUid) != len(config.Config.AppNotificationAdmin.Nickname)")
}
for k, v := range config.Config.IMAdmin.UserID {
users = append(users, &tablerelation.UserModel{UserID: v, Nickname: config.Config.IMAdmin.Nickname[k], AppMangerLevel: constant.AppNotificationAdmin})
}
userDB, err := mgo.NewUserMongo(mongo.GetDatabase()) userDB, err := mgo.NewUserMongo(mongo.GetDatabase())
if err != nil { if err != nil {
return err return err
@@ -350,3 +341,180 @@ func (s *userServer) GetSubscribeUsersStatus(ctx context.Context,
} }
return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil return &pbuser.GetSubscribeUsersStatusResp{StatusList: onlineStatusList}, nil
} }
// ProcessUserCommandAdd user general function add
func (s *userServer) ProcessUserCommandAdd(ctx context.Context, req *pbuser.ProcessUserCommandAddReq) (*pbuser.ProcessUserCommandAddResp, error) {
// Assuming you have a method in s.UserDatabase to add a user command
err := s.UserDatabase.AddUserCommand(ctx, req.UserID, req.Type, req.Uuid, req.Value)
if err != nil {
return nil, err
}
return &pbuser.ProcessUserCommandAddResp{}, nil
}
// ProcessUserCommandDelete user general function delete
func (s *userServer) ProcessUserCommandDelete(ctx context.Context, req *pbuser.ProcessUserCommandDeleteReq) (*pbuser.ProcessUserCommandDeleteResp, error) {
// Assuming you have a method in s.UserDatabase to delete a user command
err := s.UserDatabase.DeleteUserCommand(ctx, req.UserID, req.Type, req.Uuid)
if err != nil {
return nil, err
}
return &pbuser.ProcessUserCommandDeleteResp{}, nil
}
// ProcessUserCommandUpdate user general function update
func (s *userServer) ProcessUserCommandUpdate(ctx context.Context, req *pbuser.ProcessUserCommandUpdateReq) (*pbuser.ProcessUserCommandUpdateResp, error) {
// Assuming you have a method in s.UserDatabase to update a user command
err := s.UserDatabase.UpdateUserCommand(ctx, req.UserID, req.Type, req.Uuid, req.Value)
if err != nil {
return nil, err
}
return &pbuser.ProcessUserCommandUpdateResp{}, nil
}
func (s *userServer) ProcessUserCommandGet(ctx context.Context, req *pbuser.ProcessUserCommandGetReq) (*pbuser.ProcessUserCommandGetResp, error) {
// Fetch user commands from the database
commands, err := s.UserDatabase.GetUserCommands(ctx, req.UserID, req.Type)
if err != nil {
return nil, err
}
// Initialize commandInfoSlice as an empty slice
commandInfoSlice := make([]*pbuser.CommandInfoResp, 0, len(commands))
for _, command := range commands {
// No need to use index since command is already a pointer
commandInfoSlice = append(commandInfoSlice, &pbuser.CommandInfoResp{
Uuid: command.Uuid,
Value: command.Value,
CreateTime: command.CreateTime,
})
}
// Return the response with the slice
return &pbuser.ProcessUserCommandGetResp{KVArray: commandInfoSlice}, nil
}
func (s *userServer) AddNotificationAccount(ctx context.Context, req *pbuser.AddNotificationAccountReq) (*pbuser.AddNotificationAccountResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
var userID string
for i := 0; i < 20; i++ {
userId := s.genUserID()
_, err := s.UserDatabase.FindWithError(ctx, []string{userId})
if err == nil {
continue
}
userID = userId
break
}
if userID == "" {
return nil, errs.ErrInternalServer.Wrap("gen user id failed")
}
user := &tablerelation.UserModel{
UserID: userID,
Nickname: req.NickName,
FaceURL: req.FaceURL,
CreateTime: time.Now(),
AppMangerLevel: constant.AppNotificationAdmin,
}
if err := s.UserDatabase.Create(ctx, []*tablerelation.UserModel{user}); err != nil {
return nil, err
}
return &pbuser.AddNotificationAccountResp{}, nil
}
func (s *userServer) UpdateNotificationAccountInfo(ctx context.Context, req *pbuser.UpdateNotificationAccountInfoReq) (*pbuser.UpdateNotificationAccountInfoResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
if _, err := s.UserDatabase.FindWithError(ctx, []string{req.UserID}); err != nil {
return nil, errs.ErrArgs.Wrap()
}
user := map[string]interface{}{}
if req.NickName != "" {
user["nickname"] = req.NickName
}
if req.FaceURL != "" {
user["face_url"] = req.FaceURL
}
if err := s.UserDatabase.UpdateByMap(ctx, req.UserID, user); err != nil {
return nil, err
}
return &pbuser.UpdateNotificationAccountInfoResp{}, nil
}
func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser.SearchNotificationAccountReq) (*pbuser.SearchNotificationAccountResp, error) {
if err := authverify.CheckIMAdmin(ctx); err != nil {
return nil, err
}
_, users, err := s.UserDatabase.Page(ctx, req.Pagination)
if err != nil {
return nil, err
}
var total int64
accounts := make([]*pbuser.NotificationAccountInfo, 0, len(users))
for _, v := range users {
if v.AppMangerLevel != constant.AppNotificationAdmin {
continue
}
temp := &pbuser.NotificationAccountInfo{
UserID: v.UserID,
FaceURL: v.FaceURL,
NickName: v.Nickname,
}
accounts = append(accounts, temp)
total += 1
}
return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: accounts}, nil
}
func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (*pbuser.UpdateUserInfoExResp, error) {
//TODO implement me
panic("implement me")
}
func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.GetNotificationAccountReq) (*pbuser.GetNotificationAccountResp, error) {
if req.UserID == "" {
return nil, errs.ErrArgs.Wrap("userID is empty")
}
user, err := s.UserDatabase.GetUserByID(ctx, req.UserID)
if err != nil {
return nil, errs.ErrUserIDNotFound.Wrap()
}
if user.AppMangerLevel == constant.AppAdmin || user.AppMangerLevel == constant.AppNotificationAdmin {
return &pbuser.GetNotificationAccountResp{}, nil
}
return nil, errs.ErrNoPermission.Wrap("notification messages cannot be sent for this ID")
}
func (s *userServer) genUserID() string {
const l = 10
data := make([]byte, l)
rand.Read(data)
chars := []byte("0123456789")
for i := 0; i < len(data); i++ {
if i == 0 {
data[i] = chars[1:][data[i]%9]
} else {
data[i] = chars[data[i]%10]
}
}
return string(data)
}
+1 -1
View File
@@ -87,7 +87,7 @@ type OANotificationElem struct {
NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"` NotificationType int32 `mapstructure:"notificationType" json:"notificationType" validate:"required"`
Text string `mapstructure:"text" json:"text" validate:"required"` Text string `mapstructure:"text" json:"text" validate:"required"`
Url string `mapstructure:"url" json:"url"` Url string `mapstructure:"url" json:"url"`
MixType int32 `mapstructure:"mixType" json:"mixType" validate:"required"` MixType int32 `mapstructure:"mixType" json:"mixType"`
PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"` PictureElem *PictureElem `mapstructure:"pictureElem" json:"pictureElem"`
SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"` SoundElem *SoundElem `mapstructure:"soundElem" json:"soundElem"`
VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"` VideoElem *VideoElem `mapstructure:"videoElem" json:"videoElem"`
+9
View File
@@ -54,6 +54,15 @@ func CheckAdmin(ctx context.Context) error {
} }
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx))) return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not admin userID", mcontext.GetOpUserID(ctx)))
} }
func CheckIMAdmin(ctx context.Context) error {
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.IMAdmin.UserID) {
return nil
}
if utils.IsContain(mcontext.GetOpUserID(ctx), config.Config.Manager.UserID) {
return nil
}
return errs.ErrNoPermission.Wrap(fmt.Sprintf("user %s is not CheckIMAdmin userID", mcontext.GetOpUserID(ctx)))
}
func ParseRedisInterfaceToken(redisToken any) (*tokenverify.Claims, error) { func ParseRedisInterfaceToken(redisToken any) (*tokenverify.Claims, error) {
return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret()) return tokenverify.GetClaimFromToken(string(redisToken.([]uint8)), Secret())
+1
View File
@@ -148,6 +148,7 @@ type CallbackJoinGroupReq struct {
GroupType string `json:"groupType"` GroupType string `json:"groupType"`
ApplyID string `json:"applyID"` ApplyID string `json:"applyID"`
ReqMessage string `json:"reqMessage"` ReqMessage string `json:"reqMessage"`
Ex string `json:"ex"`
} }
type CallbackJoinGroupResp struct { type CallbackJoinGroupResp struct {
+5
View File
@@ -236,6 +236,11 @@ type configStruct struct {
Nickname []string `yaml:"nickname"` Nickname []string `yaml:"nickname"`
} `yaml:"manager"` } `yaml:"manager"`
IMAdmin struct {
UserID []string `yaml:"userID"`
Nickname []string `yaml:"nickname"`
} `yaml:"im-admin"`
MultiLoginPolicy int `yaml:"multiLoginPolicy"` MultiLoginPolicy int `yaml:"multiLoginPolicy"`
ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"` ChatPersistenceMysql bool `yaml:"chatPersistenceMysql"`
MsgCacheTimeout int `yaml:"msgCacheTimeout"` MsgCacheTimeout int `yaml:"msgCacheTimeout"`
+2 -1
View File
@@ -17,7 +17,6 @@ package convert
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/OpenIMSDK/protocol/sdkws" "github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
@@ -62,6 +61,7 @@ func FriendsDB2Pb(
for _, friendDB := range friendsDB { for _, friendDB := range friendsDB {
userID = append(userID, friendDB.FriendUserID) userID = append(userID, friendDB.FriendUserID)
} }
users, err := getUsers(ctx, userID) users, err := getUsers(ctx, userID)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -74,6 +74,7 @@ func FriendsDB2Pb(
friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL friendPb.FriendUser.FaceURL = users[friend.FriendUserID].FaceURL
friendPb.FriendUser.Ex = users[friend.FriendUserID].Ex friendPb.FriendUser.Ex = users[friend.FriendUserID].Ex
friendPb.CreateTime = friend.CreateTime.Unix() friendPb.CreateTime = friend.CreateTime.Unix()
friendPb.IsPinned = friend.IsPinned
friendsPb = append(friendsPb, friendPb) friendsPb = append(friendsPb, friendPb)
} }
return friendsPb, nil return friendsPb, nil
+1 -1
View File
@@ -64,7 +64,7 @@ func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
"global_recv_msg_opt": user.GlobalRecvMsgOpt, "global_recv_msg_opt": user.GlobalRecvMsgOpt,
} }
for key, value := range fields { for key, value := range fields {
if v, ok := value.(string); ok && v != "" { if v, ok := value.(string); ok {
val[key] = v val[key] = v
} else if v, ok := value.(int32); ok && v != 0 { } else if v, ok := value.(int32); ok && v != 0 {
val[key] = v val[key] = v
+9 -1
View File
@@ -87,7 +87,15 @@ func NewRedis() (redis.UniversalClient, error) {
// overrideConfigFromEnv overrides configuration fields with environment variables if present. // overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv() { func overrideConfigFromEnv() {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" { if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated if envPort := os.Getenv("REDIS_PORT"); envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + envPort
}
config.Config.Redis.Address = addresses
} else {
config.Config.Redis.Address = strings.Split(envAddr, ",")
}
} }
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" { if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Config.Redis.Username = envUser config.Config.Redis.Username = envUser
+7
View File
@@ -58,6 +58,7 @@ type FriendDatabase interface {
FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error)
FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error)
FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error)
UpdateFriendPinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error)
} }
type friendDatabase struct { type friendDatabase struct {
@@ -298,3 +299,9 @@ func (f *friendDatabase) FindFriendUserIDs(ctx context.Context, ownerUserID stri
func (f *friendDatabase) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error) { func (f *friendDatabase) FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error) {
return f.friendRequest.FindBothFriendRequests(ctx, fromUserID, toUserID) return f.friendRequest.FindBothFriendRequests(ctx, fromUserID, toUserID)
} }
func (f *friendDatabase) UpdateFriendPinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error) {
if err := f.friend.UpdatePinStatus(ctx, ownerUserID, friendUserID, isPinned); err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
+25
View File
@@ -50,6 +50,8 @@ type UserDatabase interface {
IsExist(ctx context.Context, userIDs []string) (exist bool, err error) IsExist(ctx context.Context, userIDs []string) (exist bool, err error)
// GetAllUserID Get all user IDs // GetAllUserID Get all user IDs
GetAllUserID(ctx context.Context, pagination pagination.Pagination) (int64, []string, error) GetAllUserID(ctx context.Context, pagination pagination.Pagination) (int64, []string, error)
// Get user by userID
GetUserByID(ctx context.Context, userID string) (user *relation.UserModel, err error)
// InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it // InitOnce Inside the function, first query whether it exists in the db, if it exists, do nothing; if it does not exist, insert it
InitOnce(ctx context.Context, users []*relation.UserModel) (err error) InitOnce(ctx context.Context, users []*relation.UserModel) (err error)
// CountTotal Get the total number of users // CountTotal Get the total number of users
@@ -68,6 +70,12 @@ type UserDatabase interface {
GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) GetUserStatus(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error)
// SetUserStatus Set the user status and store the user status in redis // SetUserStatus Set the user status and store the user status in redis
SetUserStatus(ctx context.Context, userID string, status, platformID int32) error SetUserStatus(ctx context.Context, userID string, status, platformID int32) error
//CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error
UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
GetUserCommands(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error)
} }
type userDatabase struct { type userDatabase struct {
@@ -177,6 +185,10 @@ func (u *userDatabase) GetAllUserID(ctx context.Context, pagination pagination.P
return u.userDB.GetAllUserID(ctx, pagination) return u.userDB.GetAllUserID(ctx, pagination)
} }
func (u *userDatabase) GetUserByID(ctx context.Context, userID string) (user *relation.UserModel, err error) {
return u.userDB.Take(ctx, userID)
}
// CountTotal Get the total number of users. // CountTotal Get the total number of users.
func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) { func (u *userDatabase) CountTotal(ctx context.Context, before *time.Time) (count int64, err error) {
return u.userDB.CountTotal(ctx, before) return u.userDB.CountTotal(ctx, before)
@@ -227,3 +239,16 @@ func (u *userDatabase) GetUserStatus(ctx context.Context, userIDs []string) ([]*
func (u *userDatabase) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error { func (u *userDatabase) SetUserStatus(ctx context.Context, userID string, status, platformID int32) error {
return u.cache.SetUserStatus(ctx, userID, status, platformID) return u.cache.SetUserStatus(ctx, userID, status, platformID)
} }
func (u *userDatabase) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
return u.userDB.AddUserCommand(ctx, userID, Type, UUID, value)
}
func (u *userDatabase) DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error {
return u.userDB.DeleteUserCommand(ctx, userID, Type, UUID)
}
func (u *userDatabase) UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
return u.userDB.UpdateUserCommand(ctx, userID, Type, UUID, value)
}
func (u *userDatabase) GetUserCommands(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error) {
commands, err := u.userDB.GetUserCommand(ctx, userID, Type)
return commands, err
}
+18 -1
View File
@@ -16,7 +16,7 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/mgoutil" "github.com/OpenIMSDK/tools/mgoutil"
"github.com/OpenIMSDK/tools/pagination" "github.com/OpenIMSDK/tools/pagination"
"go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/options"
@@ -143,3 +143,20 @@ func (f *FriendMgo) FindFriendUserIDs(ctx context.Context, ownerUserID string) (
filter := bson.M{"owner_user_id": ownerUserID} filter := bson.M{"owner_user_id": ownerUserID}
return mgoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "friend_user_id": 1})) return mgoutil.Find[string](ctx, f.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "friend_user_id": 1}))
} }
// UpdatePinStatus update friend's pin status
func (f *FriendMgo) UpdatePinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error) {
filter := bson.M{"owner_user_id": ownerUserID, "friend_user_id": friendUserID}
// Create an update operation to set the "is_pinned" field to isPinned for all documents.
update := bson.M{"$set": bson.M{"is_pinned": isPinned}}
// Perform the update operation for all documents in the collection.
_, err = f.coll.UpdateMany(ctx, filter, update)
if err != nil {
return errs.Wrap(err, "update pin error")
}
return nil
}
+73
View File
@@ -16,6 +16,7 @@ package mgo
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/user"
"time" "time"
"github.com/OpenIMSDK/tools/mgoutil" "github.com/OpenIMSDK/tools/mgoutil"
@@ -87,6 +88,78 @@ func (u *UserMgo) CountTotal(ctx context.Context, before *time.Time) (count int6
return mgoutil.Count(ctx, u.coll, bson.M{"create_time": bson.M{"$lt": before}}) return mgoutil.Count(ctx, u.coll, bson.M{"create_time": bson.M{"$lt": before}})
} }
func (u *UserMgo) AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
collection := u.coll.Database().Collection("userCommands")
// Create a new document instead of updating an existing one
doc := bson.M{
"userID": userID,
"type": Type,
"uuid": UUID,
"createTime": time.Now().Unix(), // assuming you want the creation time in Unix timestamp
"value": value,
}
_, err := collection.InsertOne(ctx, doc)
return err
}
func (u *UserMgo) DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error {
collection := u.coll.Database().Collection("userCommands")
filter := bson.M{"userID": userID, "type": Type, "uuid": UUID}
_, err := collection.DeleteOne(ctx, filter)
return err
}
func (u *UserMgo) UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error {
collection := u.coll.Database().Collection("userCommands")
filter := bson.M{"userID": userID, "type": Type, "uuid": UUID}
update := bson.M{"$set": bson.M{"value": value}}
_, err := collection.UpdateOne(ctx, filter, update)
return err
}
func (u *UserMgo) GetUserCommand(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error) {
collection := u.coll.Database().Collection("userCommands")
filter := bson.M{"userID": userID, "type": Type}
cursor, err := collection.Find(ctx, filter)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
// Initialize commands as a slice of pointers
commands := []*user.CommandInfoResp{}
for cursor.Next(ctx) {
var document struct {
UUID string `bson:"uuid"`
Value string `bson:"value"`
CreateTime int64 `bson:"createTime"`
}
if err := cursor.Decode(&document); err != nil {
return nil, err
}
commandInfo := &user.CommandInfoResp{ // Change here: use a pointer to the struct
Uuid: document.UUID,
Value: document.Value,
CreateTime: document.CreateTime,
}
commands = append(commands, commandInfo)
}
if err := cursor.Err(); err != nil {
return nil, err
}
return commands, nil
}
func (u *UserMgo) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) { func (u *UserMgo) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) {
pipeline := bson.A{ pipeline := bson.A{
bson.M{ bson.M{
+3
View File
@@ -30,6 +30,7 @@ type FriendModel struct {
AddSource int32 `bson:"add_source"` AddSource int32 `bson:"add_source"`
OperatorUserID string `bson:"operator_user_id"` OperatorUserID string `bson:"operator_user_id"`
Ex string `bson:"ex"` Ex string `bson:"ex"`
IsPinned bool `bson:"is_pinned"`
} }
// FriendModelInterface defines the operations for managing friends in MongoDB. // FriendModelInterface defines the operations for managing friends in MongoDB.
@@ -56,4 +57,6 @@ type FriendModelInterface interface {
FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*FriendModel, err error) FindInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*FriendModel, err error)
// FindFriendUserIDs retrieves a list of friend user IDs for a given owner. // FindFriendUserIDs retrieves a list of friend user IDs for a given owner.
FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error)
// UpdatePinStatus update friend's pin status
UpdatePinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error)
} }
+6
View File
@@ -16,6 +16,7 @@ package relation
import ( import (
"context" "context"
"github.com/OpenIMSDK/protocol/user"
"time" "time"
"github.com/OpenIMSDK/tools/pagination" "github.com/OpenIMSDK/tools/pagination"
@@ -60,4 +61,9 @@ type UserModelInterface interface {
CountTotal(ctx context.Context, before *time.Time) (count int64, err error) CountTotal(ctx context.Context, before *time.Time) (count int64, err error)
// 获取范围内用户增量 // 获取范围内用户增量
CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error) CountRangeEverydayTotal(ctx context.Context, start time.Time, end time.Time) (map[string]int64, error)
//CRUD user command
AddUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
DeleteUserCommand(ctx context.Context, userID string, Type int32, UUID string) error
UpdateUserCommand(ctx context.Context, userID string, Type int32, UUID string, value string) error
GetUserCommand(ctx context.Context, userID string, Type int32) ([]*user.CommandInfoResp, error)
} }
@@ -24,7 +24,8 @@ import (
func setupTestEnvironment() { func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim") os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181") os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1")
os.Setenv("ZOOKEEPER_PORT", "12181")
os.Setenv("ZOOKEEPER_USERNAME", "") os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "") os.Setenv("ZOOKEEPER_PASSWORD", "")
} }
@@ -16,91 +16,153 @@ package kubernetes
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"github.com/stathat/consistent"
"os"
"strconv"
"strings"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/OpenIMSDK/tools/discoveryregistry" "github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
// K8sDR represents the Kubernetes service discovery and registration client. // K8sDR represents the Kubernetes service discovery and registration client.
type K8sDR struct { type K8sDR struct {
options []grpc.DialOption options []grpc.DialOption
rpcRegisterAddr string rpcRegisterAddr string
gatewayHostConsistent *consistent.Consistent
} }
// NewK8sDiscoveryRegister creates a new instance of K8sDR for Kubernetes service discovery and registration.
func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) { func NewK8sDiscoveryRegister() (discoveryregistry.SvcDiscoveryRegistry, error) {
gatewayConsistent := consistent.New()
return &K8sDR{}, nil gatewayHosts := getMsgGatewayHost(context.Background())
for _, v := range gatewayHosts {
gatewayConsistent.Add(v)
}
return &K8sDR{gatewayHostConsistent: gatewayConsistent}, nil
} }
// Register registers a service with Kubernetes.
func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error { func (cli *K8sDR) Register(serviceName, host string, port int, opts ...grpc.DialOption) error {
cli.rpcRegisterAddr = serviceName if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
cli.rpcRegisterAddr = serviceName
} else {
cli.rpcRegisterAddr = getSelfHost(context.Background())
}
return nil return nil
} }
// UnRegister removes a service registration from Kubernetes.
func (cli *K8sDR) UnRegister() error { func (cli *K8sDR) UnRegister() error {
return nil return nil
} }
// CreateRpcRootNodes creates root nodes for RPC in Kubernetes.
func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error { func (cli *K8sDR) CreateRpcRootNodes(serviceNames []string) error {
return nil return nil
} }
// RegisterConf2Registry registers a configuration to the registry.
func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error { func (cli *K8sDR) RegisterConf2Registry(key string, conf []byte) error {
return nil return nil
} }
// GetConfFromRegistry retrieves a configuration from the registry.
func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) { func (cli *K8sDR) GetConfFromRegistry(key string) ([]byte, error) {
return nil, nil return nil, nil
} }
func (cli *K8sDR) GetUserIdHashGatewayHost(ctx context.Context, userId string) (string, error) {
// GetConns returns a list of gRPC client connections for a given service. host, err := cli.gatewayHostConsistent.Get(userId)
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) { if err != nil {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) log.ZError(ctx, "GetUserIdHashGatewayHost error", err)
return []*grpc.ClientConn{conn}, err }
return host, err
}
func getSelfHost(ctx context.Context) string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
ns := os.Getenv("MY_POD_NAMESPACE")
statefuleIndex := 0
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
count := len(podInfo)
statefuleIndex, _ = strconv.Atoi(podInfo[count-1])
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, statefuleIndex, instance, ns, port)
return host
} }
// GetConn returns a single gRPC client connection for a given service. // like openimserver-openim-msggateway-0.openimserver-openim-msggateway-headless.openim-lin.svc.cluster.local:88
func getMsgGatewayHost(ctx context.Context) []string {
port := 88
instance := "openimserver"
selfPodName := os.Getenv("MY_POD_NAME")
replicas := os.Getenv("MY_MSGGATEWAY_REPLICACOUNT")
ns := os.Getenv("MY_POD_NAMESPACE")
gatewayEnds := strings.Split(config.Config.RpcRegisterName.OpenImMessageGatewayName, ":")
if len(gatewayEnds) != 2 {
log.ZError(ctx, "msggateway RpcRegisterName is error:config.Config.RpcRegisterName.OpenImMessageGatewayName", errors.New("config error"))
} else {
port, _ = strconv.Atoi(gatewayEnds[1])
}
nReplicas, _ := strconv.Atoi(replicas)
podInfo := strings.Split(selfPodName, "-")
instance = podInfo[0]
var ret []string
for i := 0; i < nReplicas; i++ {
host := fmt.Sprintf("%s-openim-msggateway-%d.%s-openim-msggateway-headless.%s.svc.cluster.local:%d", instance, i, instance, ns, port)
ret = append(ret, host)
}
log.ZInfo(ctx, "getMsgGatewayHost", "instance", instance, "selfPodName", selfPodName, "replicas", replicas, "ns", ns, "ret", ret)
return ret
}
func (cli *K8sDR) GetConns(ctx context.Context, serviceName string, opts ...grpc.DialOption) ([]*grpc.ClientConn, error) {
if serviceName != config.Config.RpcRegisterName.OpenImMessageGatewayName {
conn, err := grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
return []*grpc.ClientConn{conn}, err
} else {
var ret []*grpc.ClientConn
gatewayHosts := getMsgGatewayHost(ctx)
for _, host := range gatewayHosts {
conn, err := grpc.DialContext(ctx, host, append(cli.options, opts...)...)
if err != nil {
return nil, err
} else {
ret = append(ret, conn)
}
}
return ret, nil
}
}
func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { func (cli *K8sDR) GetConn(ctx context.Context, serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...) return grpc.DialContext(ctx, serviceName, append(cli.options, opts...)...)
} }
// GetSelfConnTarget returns the connection target of the client itself.
func (cli *K8sDR) GetSelfConnTarget() string { func (cli *K8sDR) GetSelfConnTarget() string {
return cli.rpcRegisterAddr return cli.rpcRegisterAddr
} }
// AddOption adds gRPC dial options to the client.
func (cli *K8sDR) AddOption(opts ...grpc.DialOption) { func (cli *K8sDR) AddOption(opts ...grpc.DialOption) {
cli.options = append(cli.options, opts...) cli.options = append(cli.options, opts...)
} }
// CloseConn closes a given gRPC client connection.
func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) { func (cli *K8sDR) CloseConn(conn *grpc.ClientConn) {
conn.Close() conn.Close()
} }
// do not use this method for call rpc. // do not use this method for call rpc
func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn { func (cli *K8sDR) GetClientLocalConns() map[string][]*grpc.ClientConn {
fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!") fmt.Println("should not call this function!!!!!!!!!!!!!!!!!!!!!!!!!")
return nil return nil
} }
// Close closes the K8sDR client.
func (cli *K8sDR) Close() { func (cli *K8sDR) Close() {
// Close any open resources here (if applicable)
return return
} }
@@ -52,10 +52,18 @@ func getEnv(key, fallback string) string {
return fallback return fallback
} }
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value. // getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getZkAddrFromEnv(fallback []string) []string { func getZkAddrFromEnv(fallback []string) []string {
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists { address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS")
return strings.Split(value, ",") port, portExists := os.LookupEnv("ZOOKEEPER_PORT")
if addrExists && portExists {
addresses := strings.Split(address, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + port
}
return addresses
} }
return fallback return fallback
} }
+2 -2
View File
@@ -112,7 +112,6 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
//v.Set(constant.CallbackCommand, command) //v.Set(constant.CallbackCommand, command)
//url = url + "/" + v.Encode() //url = url + "/" + v.Encode()
url = url + "/" + command url = url + "/" + command
b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut) b, err := Post(ctx, url, nil, input, callbackConfig.CallbackTimeOut)
if err != nil { if err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
@@ -121,13 +120,14 @@ func callBackPostReturn(ctx context.Context, url, command string, input interfac
} }
return errs.ErrNetwork.Wrap(err.Error()) return errs.ErrNetwork.Wrap(err.Error())
} }
defer log.ZDebug(ctx, "callback", "data", string(b))
if err = json.Unmarshal(b, output); err != nil { if err = json.Unmarshal(b, output); err != nil {
if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue { if callbackConfig.CallbackFailedContinue != nil && *callbackConfig.CallbackFailedContinue {
log.ZWarn(ctx, "callback failed but continue", err, "url", url) log.ZWarn(ctx, "callback failed but continue", err, "url", url)
return nil return nil
} }
return errs.ErrData.Wrap(err.Error()) return errs.ErrData.WithDetail(err.Error() + "response format error")
} }
return output.Parse() return output.Parse()
+6 -7
View File
@@ -31,15 +31,14 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
const ( const maxRetry = 10 // number of retries
maxRetry = 10 // Maximum number of retries for producer creation
)
var errEmptyMsg = errors.New("binary msg is empty") var errEmptyMsg = errors.New("kafka binary msg is empty")
// Producer represents a Kafka producer.
type Producer struct { type Producer struct {
topic string
addr []string addr []string
topic string
config *sarama.Config config *sarama.Config
producer sarama.SyncProducer producer sarama.SyncProducer
} }
@@ -68,7 +67,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
// Get Kafka configuration from environment variables or fallback to config file // Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username) kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password) kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function
// Configure SASL authentication if credentials are provided // Configure SASL authentication if credentials are provided
if kafkaUsername != "" && kafkaPassword != "" { if kafkaUsername != "" && kafkaPassword != "" {
@@ -78,7 +77,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
} }
// Set the Kafka address // Set the Kafka address
p.addr = []string{kafkaAddr} p.addr = kafkaAddr
// Set up TLS configuration (if required) // Set up TLS configuration (if required)
SetupTLSConfig(p.config) SetupTLSConfig(p.config)
+19
View File
@@ -15,7 +15,9 @@
package kafka package kafka
import ( import (
"fmt"
"os" "os"
"strings"
"github.com/IBM/sarama" "github.com/IBM/sarama"
@@ -44,3 +46,20 @@ func getEnvOrConfig(envName string, configValue string) string {
} }
return configValue return configValue
} }
// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getKafkaAddrFromEnv(fallback []string) []string {
envAddr := os.Getenv("KAFKA_ADDRESS")
envPort := os.Getenv("KAFKA_PORT")
if envAddr != "" && envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = fmt.Sprintf("%s:%s", addr, envPort)
}
return addresses
}
return fallback
}
+7
View File
@@ -179,3 +179,10 @@ func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status
}) })
return err return err
} }
func (u *UserRpcClient) GetNotificationByID(ctx context.Context, userID string) error {
_, err := u.Client.GetNotificationAccount(ctx, &user.GetNotificationAccountReq{
UserID: userID,
})
return err
}
+4 -5
View File
@@ -33,21 +33,20 @@ openim::log::info "\n# Begin to check all openim service"
# OpenIM status # OpenIM status
# Elegant printing function # Elegant printing function
print_services_and_ports() { print_services_and_ports() {
declare -g service_names=("${!1}") service_names=("$1[@]")
declare -g service_ports=("${!2}") service_ports=("$2[@]")
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
echo "| Service Name | Port |" echo "| Service Name | Port |"
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
for index in "${!service_names[@]}"; do for index in "${!service_names}"; do
printf "| %-23s | %-8s |\n" "${service_names[$index]}" "${service_ports[$index]}" printf "| %-23s | %-8s |\n" "${!service_names[$index]}" "${!service_ports[$index]}"
done done
echo "+-------------------------+----------+" echo "+-------------------------+----------+"
} }
# Print out services and their ports # Print out services and their ports
print_services_and_ports OPENIM_SERVER_NAME_TARGETS OPENIM_SERVER_PORT_TARGETS print_services_and_ports OPENIM_SERVER_NAME_TARGETS OPENIM_SERVER_PORT_TARGETS
+1 -1
View File
@@ -102,7 +102,7 @@ print_color "Deleted Files: ${deleted_files}" "${BACKGROUND_GREEN}"
if [[ ! $local_branch =~ $valid_branch_regex ]] if [[ ! $local_branch =~ $valid_branch_regex ]]
then then
printError "There is something wrong with your branch name. Branch names in this project must adhere to this contract: $valid_branch_regex. printError "There is something wrong with your branch name. Branch names in this project must adhere to this contract: $valid_branch_regex.
Your commit will be rejected. You should rename your branch to a valid name(feat/name OR bug/name) and try again." Your commit will be rejected. You should rename your branch to a valid name(feat/name OR fix/name) and try again."
printError "For more on this, read on: https://gist.github.com/cubxxw/126b72104ac0b0ca484c9db09c3e5694" printError "For more on this, read on: https://gist.github.com/cubxxw/126b72104ac0b0ca484c9db09c3e5694"
exit 1 exit 1
fi fi
+19 -14
View File
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# This script automatically initializes various configuration files and can generate example files. # This script automatically initializes various configuration files and can generate example files.
set -o errexit set -o errexit
@@ -111,6 +110,7 @@ generate_example_files() {
local example_file="${COPY_EXAMPLES[$template]}" local example_file="${COPY_EXAMPLES[$template]}"
process_file "$template" "$example_file" false process_file "$template" "$example_file" false
done done
} }
# Function to process a single file, either by generating or copying # Function to process a single file, either by generating or copying
@@ -146,10 +146,17 @@ process_file() {
openim::log::error "genconfig.sh script not found" openim::log::error "genconfig.sh script not found"
exit 1 exit 1
fi fi
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || { if [[ -n "${env_cmd}" ]]; then
openim::log::error "Error processing template file ${template}" eval "$env_cmd ${OPENIM_ROOT}/scripts/genconfig.sh '${ENV_FILE}' '${template}' > '${output_file}'" || {
exit 1 openim::log::error "Error processing template file ${template}"
} exit 1
}
else
"${OPENIM_ROOT}/scripts/genconfig.sh" "${ENV_FILE}" "${template}" > "${output_file}" || {
openim::log::error "Error processing template file ${template}"
exit 1
}
fi
else else
openim::log::info "📋 Copying ${template} to ${output_file}..." openim::log::info "📋 Copying ${template} to ${output_file}..."
cp "${template}" "${output_file}" || { cp "${template}" "${output_file}" || {
@@ -161,15 +168,10 @@ process_file() {
sleep 0.5 sleep 0.5
} }
declare -A env_vars=(
["OPENIM_IP"]="172.28.0.1"
["DATA_DIR"]="./"
["LOG_STORAGE_LOCATION"]="../logs/"
)
# Function to clean configuration files
clean_config_files() { clean_config_files() {
for output_file in "${TEMPLATES[@]}"; do local all_templates=("${TEMPLATES[@]}" "${COPY_TEMPLATES[@]}")
for output_file in "${all_templates[@]}"; do
if [[ -f "${output_file}" ]]; then if [[ -f "${output_file}" ]]; then
rm -f "${output_file}" rm -f "${output_file}"
openim::log::info "Removed configuration file: ${output_file}" openim::log::info "Removed configuration file: ${output_file}"
@@ -179,7 +181,10 @@ clean_config_files() {
# Function to clean example files # Function to clean example files
clean_example_files() { clean_example_files() {
for example_file in "${EXAMPLES[@]}"; do # 合并 EXAMPLES 和 COPY_EXAMPLES 数组
local all_examples=("${EXAMPLES[@]}" "${COPY_EXAMPLES[@]}")
for example_file in "${all_examples[@]}"; do
if [[ -f "${example_file}" ]]; then if [[ -f "${example_file}" ]]; then
rm -f "${example_file}" rm -f "${example_file}"
openim::log::info "Removed example file: ${example_file}" openim::log::info "Removed example file: ${example_file}"
+5 -3
View File
@@ -66,8 +66,8 @@ def "ENV_FILE" ""${OPENIM_ROOT}"/scripts/install/environment.sh"
###################### Docker compose ################### ###################### Docker compose ###################
# OPENIM AND CHAT # OPENIM AND CHAT
def "CHAT_BRANCH" "main" def "CHAT_IMAGE_VERSION" "main"
def "SERVER_BRANCH" "main" def "SERVER_IMAGE_VERSION" "main"
# Choose the appropriate image address, the default is GITHUB image, # Choose the appropriate image address, the default is GITHUB image,
# you can choose docker hub, for Chinese users can choose Ali Cloud # you can choose docker hub, for Chinese users can choose Ali Cloud
@@ -139,7 +139,7 @@ readonly API_OPENIM_PORT=${API_OPENIM_PORT:-'10002'}
def "API_LISTEN_IP" "0.0.0.0" # API的监听IP def "API_LISTEN_IP" "0.0.0.0" # API的监听IP
###################### openim-chat 配置信息 ###################### ###################### openim-chat 配置信息 ######################
def "OPENIM_CHAT_DATA_DIR" "./openim-chat/${CHAT_BRANCH}" def "OPENIM_CHAT_DATA_DIR" "./openim-chat/${CHAT_IMAGE_VERSION}"
def "OPENIM_CHAT_ADDRESS" "${DOCKER_BRIDGE_GATEWAY}" # OpenIM服务地址 def "OPENIM_CHAT_ADDRESS" "${DOCKER_BRIDGE_GATEWAY}" # OpenIM服务地址
def "OPENIM_CHAT_API_PORT" "10008" # OpenIM API端口 def "OPENIM_CHAT_API_PORT" "10008" # OpenIM API端口
def "CHAT_API_LISTEN_IP" "" # OpenIM API的监听IP def "CHAT_API_LISTEN_IP" "" # OpenIM API的监听IP
@@ -353,6 +353,8 @@ def "MANAGER_USERID_3" "openIMAdmin" # 管理员ID 3
def "NICKNAME_1" "system1" # 昵称1 def "NICKNAME_1" "system1" # 昵称1
def "NICKNAME_2" "system2" # 昵称2 def "NICKNAME_2" "system2" # 昵称2
def "NICKNAME_3" "system3" # 昵称3 def "NICKNAME_3" "system3" # 昵称3
def "IM_ADMIN_USERID" "imAdmin" # IM管理员ID
def "IM_ADMIN_NAME" "imAdmin" # IM管理员昵称
def "MULTILOGIN_POLICY" "1" # 多登录策略 def "MULTILOGIN_POLICY" "1" # 多登录策略
def "CHAT_PERSISTENCE_MYSQL" "true" # 聊天持久化MySQL def "CHAT_PERSISTENCE_MYSQL" "true" # 聊天持久化MySQL
def "MSG_CACHE_TIMEOUT" "86400" # 消息缓存超时 def "MSG_CACHE_TIMEOUT" "86400" # 消息缓存超时
+47 -3
View File
@@ -70,14 +70,16 @@ function openim::test::auth() {
#################################### Auth Module #################################### #################################### Auth Module ####################################
# Define a function to get a token (Admin Token) # Define a function to get a token for a specific user
openim::test::get_token() { openim::test::get_token() {
local user_id="${1:-openIM123456}" # Default user ID if not provided
token_response=$(${CCURL} "${OperationID}" "${Header}" ${INSECURE_OPENIMAPI}/auth/user_token \ token_response=$(${CCURL} "${OperationID}" "${Header}" ${INSECURE_OPENIMAPI}/auth/user_token \
-d'{"secret": "'"$SECRET"'","platformID": 1,"userID": "openIM123456"}') -d'{"secret": "'"$SECRET"'","platformID": 1,"userID": "'$user_id'"}')
token=$(echo $token_response | grep -Po 'token[" :]+\K[^"]+') token=$(echo $token_response | grep -Po 'token[" :]+\K[^"]+')
echo "$token" echo "$token"
} }
Header="-HContent-Type: application/json" Header="-HContent-Type: application/json"
OperationID="-HoperationID: 1646445464564" OperationID="-HoperationID: 1646445464564"
Token="-Htoken: $(openim::test::get_token)" Token="-Htoken: $(openim::test::get_token)"
@@ -530,6 +532,36 @@ EOF
openim::test::check_error "$response" openim::test::check_error "$response"
} }
# Updates the pin status of multiple friends.
openim::test::update_pin_status() {
local ownerUserID="${1}"
shift # Shift the arguments to skip the first one (ownerUserID)
local isPinned="${1}"
shift # Shift the arguments to skip the isPinned argument
# Constructing the list of friendUserIDs
local friendUserIDsArray=()
for friendUserID in "$@"; do
friendUserIDsArray+=("\"${friendUserID}\"")
done
local friendUserIDs=$(IFS=,; echo "${friendUserIDsArray[*]}")
local request_body=$(cat <<EOF
{
"ownerUserID": "${ownerUserID}",
"friendUserIDs": [${friendUserIDs}],
"isPinned": ${isPinned}
}
EOF
)
echo "Requesting to update pin status: $request_body"
local response=$(${CCURL} "${Token}" "${OperationID}" "${Header}" "${INSECURE_OPENIMAPI}/friend/update_pin_status" -d "${request_body}")
echo "Response: $response"
openim::test::check_error "$response"
}
# [openim::test::friend function description] # [openim::test::friend function description]
# The `openim::test::friend` function serves as a test suite for friend-related operations. # The `openim::test::friend` function serves as a test suite for friend-related operations.
# It sequentially invokes all friend-related test functions to ensure the API's friend operations are functioning correctly. # It sequentially invokes all friend-related test functions to ensure the API's friend operations are functioning correctly.
@@ -542,17 +574,22 @@ function openim::test::friend() {
openim::test::user_register "${TEST_USER_ID}" "user01" "new_face_url" openim::test::user_register "${TEST_USER_ID}" "user01" "new_face_url"
openim::test::user_register "${FRIEND_USER_ID}" "frient01" "new_face_url" openim::test::user_register "${FRIEND_USER_ID}" "frient01" "new_face_url"
openim::test::user_register "${BLACK_USER_ID}" "frient02" "new_face_url" openim::test::user_register "${BLACK_USER_ID}" "frient02" "new_face_url"
# 1. Check if two users are friends. # 1. Check if two users are friends.
openim::test::is_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}" openim::test::is_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}"
# 2. Send a friend request from one user to another. # 2. Send a friend request from one user to another.
openim::test::add_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}" openim::test::add_friend "${TEST_USER_ID}" "${FRIEND_USER_ID}"
local original_token=$Token
# Switch to FRIEND_USER_ID's token
local friend_token="-Htoken: $(openim::test::get_token "${FRIEND_USER_ID}")"
# 3. Respond to a friend request. # 3. Respond to a friend request.
# TODO # TODO
# openim::test::add_friend_response "${FRIEND_USER_ID}" "${TEST_USER_ID}" # openim::test::add_friend_response "${FRIEND_USER_ID}" "${TEST_USER_ID}"
Token=$original_token
# 4. Retrieve the friend list of the test user. # 4. Retrieve the friend list of the test user.
openim::test::get_friend_list "${TEST_USER_ID}" openim::test::get_friend_list "${TEST_USER_ID}"
@@ -583,6 +620,13 @@ function openim::test::friend() {
# TODO # TODO
# openim::test::import_friend "${TEST_USER_ID}" "11111114" "11111115" # openim::test::import_friend "${TEST_USER_ID}" "11111114" "11111115"
# 13. pin Friend
# Add this call to your test suite where appropriate
# TODO
# openim::test::update_pin_status "${TEST_USER_ID}" true "${FRIEND_USER_ID}"
#
# openim::test::update_pin_status "${TEST_USER_ID}" false "${FRIEND_USER_ID}"
# Log the completion of the friend test suite. # Log the completion of the friend test suite.
openim::log::success "Friend test suite completed successfully." openim::log::success "Friend test suite completed successfully."
} }
+1 -1
View File
@@ -20,7 +20,7 @@ OPENIM_VERBOSE="${OPENIM_VERBOSE:-5}"
ENABLE_LOGGING="${ENABLE_LOGGING:-true}" ENABLE_LOGGING="${ENABLE_LOGGING:-true}"
# If OPENIM_OUTPUT is not set, set it to the default value # If OPENIM_OUTPUT is not set, set it to the default value
if [[ ! -v OPENIM_OUTPUT ]]; then if [ -z "${OPENIM_OUTPUT+x}" ]; then
OPENIM_OUTPUT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../_output" && pwd -P)" OPENIM_OUTPUT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../_output" && pwd -P)"
fi fi
+20 -3
View File
@@ -302,8 +302,12 @@ openim::util::check_ports() {
# Iterate over each given port. # Iterate over each given port.
for port in "$@"; do for port in "$@"; do
# Use the `ss` command to find process information related to the given port. # Use the `ss` command to find process information related to the given port.
local info=$(ss -ltnp | grep ":$port" || true) if command -v ss > /dev/null 2>&1; then
info=$(ss -ltnp | grep ":$port" || true)
else
info=$(netstat -ltnp | grep ":$port" || true)
fi
# If there's no process information, it means the process associated with the port is not running. # If there's no process information, it means the process associated with the port is not running.
if [[ -z $info ]]; then if [[ -z $info ]]; then
not_started+=($port) not_started+=($port)
@@ -364,6 +368,18 @@ openim::util::check_ports() {
# openim::util::check_process_names nginx mysql redis # openim::util::check_process_names nginx mysql redis
# The function returns a status of 1 if any of the processes is not running. # The function returns a status of 1 if any of the processes is not running.
openim::util::check_process_names() { openim::util::check_process_names() {
# Function to get the port of a process
get_port() {
local pid=$1
if command -v ss > /dev/null 2>&1; then
# used ss comment
ss -ltnp 2>/dev/null | grep $pid | awk '{print $4}' | cut -d ':' -f2
else
# used netstat comment replace ss
netstat -ltnp 2>/dev/null | grep $pid | awk '{print $4}' | sed 's/.*://'
fi
}
# Arrays to collect details of processes # Arrays to collect details of processes
local not_started=() local not_started=()
local started=() local started=()
@@ -382,7 +398,7 @@ openim::util::check_process_names() {
for pid in "${pids[@]}"; do for pid in "${pids[@]}"; do
local command=$(ps -p $pid -o cmd=) local command=$(ps -p $pid -o cmd=)
local start_time=$(ps -p $pid -o lstart=) local start_time=$(ps -p $pid -o lstart=)
local port=$(ss -ltnp 2>/dev/null | grep $pid | awk '{print $4}' | cut -d ':' -f2) local port=$(get_port $pid)
# Check if port information was found for the PID # Check if port information was found for the PID
if [[ -z $port ]]; then if [[ -z $port ]]; then
@@ -419,6 +435,7 @@ openim::util::check_process_names() {
return 0 return 0
fi fi
} }
# openim::util::check_process_names docker-pr # openim::util::check_process_names docker-pr
# The `openim::util::stop_services_on_ports` function stops services running on specified ports. # The `openim::util::stop_services_on_ports` function stops services running on specified ports.
+42 -30
View File
@@ -72,7 +72,7 @@ func initCfg() error {
type checkFunc struct { type checkFunc struct {
name string name string
function func() error function func() (string, error)
} }
func main() { func main() {
@@ -101,13 +101,13 @@ func main() {
allSuccess := true allSuccess := true
for _, check := range checks { for _, check := range checks {
err := check.function() str, err := check.function()
if err != nil { if err != nil {
errorPrint(fmt.Sprintf("Starting %s failed: %v", check.name, err)) errorPrint(fmt.Sprintf("Starting %s failed, %v", check.name, err))
allSuccess = false allSuccess = false
break break
} else { } else {
successPrint(fmt.Sprintf("%s starts successfully", check.name)) successPrint(fmt.Sprintf("%s connected successfully, %s", check.name, str))
} }
} }
@@ -142,21 +142,22 @@ func getEnv(key, fallback string) string {
} }
// checkMongo checks the MongoDB connection // checkMongo checks the MongoDB connection
func checkMongo() error { func checkMongo() (string, error) {
// Use environment variables or fallback to config // Use environment variables or fallback to config
uri := getEnv("MONGO_URI", buildMongoURI()) uri := getEnv("MONGO_URI", buildMongoURI())
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
str := "ths addr is:" + strings.Join(config.Config.Mongo.Address, ",")
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
defer client.Disconnect(context.TODO()) defer client.Disconnect(context.TODO())
if err = client.Ping(context.TODO(), nil); err != nil { if err = client.Ping(context.TODO(), nil); err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
return nil return str, nil
} }
// buildMongoURI constructs the MongoDB URI using configuration settings // buildMongoURI constructs the MongoDB URI using configuration settings
@@ -178,10 +179,10 @@ func buildMongoURI() string {
} }
// checkMinio checks the MinIO connection // checkMinio checks the MinIO connection
func checkMinio() error { func checkMinio() (string, error) {
// Check if MinIO is enabled // Check if MinIO is enabled
if config.Config.Object.Enable != "minio" { if config.Config.Object.Enable != "minio" {
return nil return "", nil
} }
// Prioritize environment variables // Prioritize environment variables
@@ -191,13 +192,14 @@ func checkMinio() error {
useSSL := getEnv("MINIO_USE_SSL", "false") // Assuming SSL is not used by default useSSL := getEnv("MINIO_USE_SSL", "false") // Assuming SSL is not used by default
if endpoint == "" || accessKeyID == "" || secretAccessKey == "" { if endpoint == "" || accessKeyID == "" || secretAccessKey == "" {
return ErrConfig.Wrap("MinIO configuration missing") return "", ErrConfig.Wrap("MinIO configuration missing")
} }
// Parse endpoint URL to determine if SSL is enabled // Parse endpoint URL to determine if SSL is enabled
u, err := url.Parse(endpoint) u, err := url.Parse(endpoint)
if err != nil { if err != nil {
return errs.Wrap(err) str := "the endpoint is:" + endpoint
return "", errs.Wrap(errStr(err, str))
} }
secure := u.Scheme == "https" || useSSL == "true" secure := u.Scheme == "https" || useSSL == "true"
@@ -206,31 +208,34 @@ func checkMinio() error {
Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
Secure: secure, Secure: secure,
}) })
str := "ths addr is:" + u.Host
if err != nil { if err != nil {
return errs.Wrap(err) strs := fmt.Sprintf("%v;host:%s,accessKeyID:%s,secretAccessKey:%s,Secure:%v", err, u.Host, accessKeyID, secretAccessKey, secure)
return "", errs.Wrap(err, strs)
} }
// Perform health check // Perform health check
cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second) cancel, err := minioClient.HealthCheck(time.Duration(minioHealthCheckDuration) * time.Second)
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
defer cancel() defer cancel()
if minioClient.IsOffline() { if minioClient.IsOffline() {
return ErrComponentStart.Wrap("Minio server is offline") str := fmt.Sprintf("Minio server is offline;%s", str)
return "", ErrComponentStart.Wrap(str)
} }
// Check for localhost in API URL and Minio SignEndpoint // Check for localhost in API URL and Minio SignEndpoint
if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.SignEndpoint) == "127.0.0.1" { if exactIP(config.Config.Object.ApiURL) == "127.0.0.1" || exactIP(config.Config.Object.Minio.SignEndpoint) == "127.0.0.1" {
return ErrConfig.Wrap("apiURL or Minio SignEndpoint endpoint contain 127.0.0.1") return "", ErrConfig.Wrap("apiURL or Minio SignEndpoint endpoint contain 127.0.0.1")
} }
return nil return str, nil
} }
// checkRedis checks the Redis connection // checkRedis checks the Redis connection
func checkRedis() error { func checkRedis() (string, error) {
// Prioritize environment variables // Prioritize environment variables
address := getEnv("REDIS_ADDRESS", strings.Join(config.Config.Redis.Address, ",")) address := getEnv("REDIS_ADDRESS", strings.Join(config.Config.Redis.Address, ","))
username := getEnv("REDIS_USERNAME", config.Config.Redis.Username) username := getEnv("REDIS_USERNAME", config.Config.Redis.Username)
@@ -259,15 +264,16 @@ func checkRedis() error {
// Ping Redis to check connectivity // Ping Redis to check connectivity
_, err := redisClient.Ping(context.Background()).Result() _, err := redisClient.Ping(context.Background()).Result()
str := "the addr is:" + strings.Join(redisAddresses, ",")
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
return nil return str, nil
} }
// checkZookeeper checks the Zookeeper connection // checkZookeeper checks the Zookeeper connection
func checkZookeeper() error { func checkZookeeper() (string, error) {
// Prioritize environment variables // Prioritize environment variables
schema := getEnv("ZOOKEEPER_SCHEMA", "digest") schema := getEnv("ZOOKEEPER_SCHEMA", "digest")
address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ",")) address := getEnv("ZOOKEEPER_ADDRESS", strings.Join(config.Config.Zookeeper.ZkAddr, ","))
@@ -278,30 +284,31 @@ func checkZookeeper() error {
zookeeperAddresses := strings.Split(address, ",") zookeeperAddresses := strings.Split(address, ",")
// Connect to Zookeeper // Connect to Zookeeper
str := "the addr is:" + address
c, _, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary c, _, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
defer c.Close() defer c.Close()
// Set authentication if username and password are provided // Set authentication if username and password are provided
if username != "" && password != "" { if username != "" && password != "" {
if err := c.AddAuth(schema, []byte(username+":"+password)); err != nil { if err := c.AddAuth(schema, []byte(username+":"+password)); err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
} }
// Check if Zookeeper is reachable // Check if Zookeeper is reachable
_, _, err = c.Get("/") _, _, err = c.Get("/")
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(err, str)
} }
return nil return str, nil
} }
// checkKafka checks the Kafka connection // checkKafka checks the Kafka connection
func checkKafka() error { func checkKafka() (string, error) {
// Prioritize environment variables // Prioritize environment variables
username := getEnv("KAFKA_USERNAME", config.Config.Kafka.Username) username := getEnv("KAFKA_USERNAME", config.Config.Kafka.Username)
password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password) password := getEnv("KAFKA_PASSWORD", config.Config.Kafka.Password)
@@ -321,16 +328,17 @@ func checkKafka() error {
// kafka.SetupTLSConfig(cfg) // kafka.SetupTLSConfig(cfg)
// Create Kafka client // Create Kafka client
str := "the addr is:" + address
kafkaClient, err := sarama.NewClient(kafkaAddresses, cfg) kafkaClient, err := sarama.NewClient(kafkaAddresses, cfg)
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(errStr(err, str))
} }
defer kafkaClient.Close() defer kafkaClient.Close()
// Verify if necessary topics exist // Verify if necessary topics exist
topics, err := kafkaClient.Topics() topics, err := kafkaClient.Topics()
if err != nil { if err != nil {
return errs.Wrap(err) return "", errs.Wrap(err)
} }
requiredTopics := []string{ requiredTopics := []string{
@@ -341,11 +349,11 @@ func checkKafka() error {
for _, requiredTopic := range requiredTopics { for _, requiredTopic := range requiredTopics {
if !isTopicPresent(requiredTopic, topics) { if !isTopicPresent(requiredTopic, topics) {
return ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic)) return "", ErrComponentStart.Wrap(fmt.Sprintf("Kafka doesn't contain topic: %v", requiredTopic))
} }
} }
return nil return str, nil
} }
// isTopicPresent checks if a topic is present in the list of topics // isTopicPresent checks if a topic is present in the list of topics
@@ -373,3 +381,7 @@ func successPrint(s string) {
func warningPrint(s string) { func warningPrint(s string) {
colorPrint(colorYellow, "Warning: But %v", s) colorPrint(colorYellow, "Warning: But %v", s)
} }
func errStr(err error, str string) error {
return fmt.Errorf("%v;%s", err, str)
}
-12
View File
@@ -21,22 +21,10 @@ import (
"time" "time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
) )
func TestCheckMysql(t *testing.T) {
err := mockInitCfg()
assert.NoError(t, err, "Initialization should not produce errors")
err = checkMysql()
if err != nil {
// You might expect an error if MySQL isn't running locally with the mock credentials.
t.Logf("Expected error due to mock configuration: %v", err)
}
}
// Mock for initCfg for testing purpose // Mock for initCfg for testing purpose
func mockInitCfg() error { func mockInitCfg() error {
config.Config.Mysql.Username = "root" config.Config.Mysql.Username = "root"