Compare commits

...

13 Commits

Author SHA1 Message Date
Xinwei Xiong bed112d037 Update check-all.sh (#1643) 2023-12-30 03:12:19 +00:00
xuexihuang a7138cb3b7 feature:grafana web inside (#1636)
* feature:grafana web inside

* fix: using protocal v0.0.42

* fix: review values(but not use)
2023-12-30 02:40:42 +00:00
Xinwei Xiong 959c5a7ae1 feat: add openim scripts fix and optimize config about openim source code deployment for this PR? openim ci and scripts (#1641)
* fix: fix openim zk env set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: fix openim zk env set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker prom

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker prom

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

---------

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-29 22:29:52 +08:00
Xinwei Xiong 1e52376d7b fix: fix openim scripts and ci add openim check (#1632)
* fix: fix openim zk env set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: fix openim zk env set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* feat: add openim docker prom

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

---------

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-29 12:43:58 +00:00
AndrewZuo01 c68a61d8c2 fix setUserInfoEx (#1635)
* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* fix bugs

* fix bugs

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* Update go.mod

* Update friend.go

* debug

* debug

* debug

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* I cannot solve todo in test.sh

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* Update go.mod

* fix group notification

* fix group notification

* update openimsdk tools

* update openim server remove duplicate code

* update openim server remove duplicate code

* update user command get

* update user command get

* update response of callback response error

* update black ex

* update join group ex

* update user pb2map

* update go sum

* update go sum

* update updateUserInfoEx

* update updateUserInfoEx

* update updateUserInfoEx add callback functions

* fix dismiss group function

* fix dismiss group function

* fix dismiss group function

* fix dismiss group function

* update pin friend to update friend

* fix go mod

* fix err golangci-lint

* fix UserPb2DBMap

* update comments, update go.mod check for register username

* update comments, update go.mod check for register username

* update comments, update go.mod check for register username

* update comments, update go.mod check for register username

* fix callback

* fix go.mod

* fix debug

* fix bugs

* update notification

* update notification

* update notification

* update notification

* update notification

* update notification

* update notification

* update notification

* fix updateUserInfoEx

* fix updateUserInfoEx

* modify go.mod

* fix updateUserInfoEx

* fix updateUserInfoEx

---------

Co-authored-by: Xinwei Xiong <3293172751@qq.com>
2023-12-29 20:38:43 +08:00
Brabem 7ddb84f7fc fix: add the GetconversationAPI 1542 (#1604)
* feat: add GetConversationsUnreadSeqAndMaxSeq func

* feat: add GetConversationsUnreadSeqAndMaxSeq func

* feat: add GetConversationsUnreadSeqAndMaxSeq func

* fix: fix the conflect

* feat: add GetConversationList Api

* fix: fix the error

* fix: move the GetConversationList to conversation folder

* fix: fix the go.mod

* fix: add InitiateFormData and CompleteFormData

* fix: fix the error

* fix: find error

* fix: test

* feat: add notification API

* fix: find error

* fix: fix the error

* fix: fix the PinFriend error

* fix: fix the Ex error

* fix: find the error

* fix: fix the rpc error

* fix: fix the error

* fix: fix the log error

* fix: fix the error1

* fix: fix the error

* fix: fix the script

* fix: fix the error

* fix: fix the error

* fix: fix the error of tag

* fix: fix the protocol

* fix: fix the error

* fix: fix the error

* fix: fix the err not wrap

* fix: fix the error

* fix: fix GetGroupMembers by nickname

* fix: fix the error

* fix: fix the FindOneByDocIDs

* fix: fix the protocol version

* fix: fit the protocol version
2023-12-29 18:14:15 +08:00
a3d21 cfde7bb0ff fix: mongo uri connect (#1611) 2023-12-29 02:25:03 +00:00
AndrewZuo01 998d4a3504 Add updates friend, set user ex (#1592)
* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* update set pin friends

* fix bugs

* fix bugs

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* debug

* Update go.mod

* Update friend.go

* debug

* debug

* debug

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* add pin friend test

* I cannot solve todo in test.sh

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* update user command

* Update go.mod

* fix group notification

* fix group notification

* update openimsdk tools

* update openim server remove duplicate code

* update openim server remove duplicate code

* update user command get

* update user command get

* update response of callback response error

* update black ex

* update join group ex

* update user pb2map

* update go sum

* update go sum

* update updateUserInfoEx

* update updateUserInfoEx

* update updateUserInfoEx add callback functions

* fix dismiss group function

* fix dismiss group function

* fix dismiss group function

* fix dismiss group function

* update pin friend to update friend

* fix go mod

* fix err golangci-lint

* fix UserPb2DBMap

* update comments, update go.mod check for register username

* update comments, update go.mod check for register username

* update comments, update go.mod check for register username

* update comments, update go.mod check for register username

* fix callback

* fix go.mod

* fix debug

* fix bugs

* update notification

* update notification

* update notification

* update notification

* update notification

* update notification

* update notification

* update notification

---------

Co-authored-by: Xinwei Xiong <3293172751@qq.com>
2023-12-28 06:45:27 +00:00
Xinwei Xiong b90b8a1a55 fix: fix openim zk env set (#1623)
* fix: fix openim zk env set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

* fix: fix openim zk env set

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>

---------

Signed-off-by: Xinwei Xiong (cubxxw) <3293172751nss@gmail.com>
2023-12-28 06:17:27 +00:00
Gordon 53a3f475f3 fix: add notifications for some notifications. (#1609) 2023-12-27 16:41:51 +00:00
Brabem cff90a3099 fix: fix the searchNotificationAccout by userID or nickname (#1617)
* feat: add notification API

* fix: fix the script

* fix: fix the error

* fix: fix the searchNotificationAccount

* fix: fix the protocol version

* fix: fix the proto version
2023-12-27 16:29:44 +00:00
Xinwei Xiong 2220645429 Update README.md (#1615) 2023-12-26 10:03:07 +00:00
chao cd1235fb32 feat: s3 FormData upload (#1614)
* upgrade package and rtc convert

* upgrade package and rtc convert

* upgrade package and rtc convert

* upgrade package and rtc convert

* friend user

* s3 form data

* s3 form data

* s3 form data

* s3 form data

* s3 form data

* s3 form data

* s3 form data

* s3 form data

* s3 form data
2023-12-26 09:29:42 +00:00
59 changed files with 1105 additions and 244 deletions
-1
View File
@@ -201,7 +201,6 @@ jobs:
- name: Build, Start, Check Services and Print Logs for Ubuntu
if: runner.os == 'Linux'
run: |
sudo make init && \
sudo make build && \
sudo make start && \
sudo make check || \
+4
View File
@@ -97,6 +97,10 @@ It's crafted in Golang and supports cross-platform deployment, ensuring a cohere
## :rocket: Quick Start
We support many platforms. Here are the addresses for quick experience on the web side
👉 **[OpenIM online web demo](https://web-enterprise.rentsoft.cn/)**
You can quickly learn OpenIM engineering solutions, all it takes is one simple command:
```bash
+9 -1
View File
@@ -312,6 +312,14 @@ callback:
enable: false
timeout: 5
failedContinue: true
beforeUpdateUserInfoEx:
enable: false
timeout: 5
failedContinue: true
afterUpdateUserInfoEx:
enable: false
timeout: 5
failedContinue: true
afterSendSingleMsg:
enable: false
timeout: 5
@@ -498,7 +506,7 @@ callback:
# The number of ports needs to be consistent with msg_transfer_service_num in script/path_info.sh
prometheus:
enable: false
prometheusUrl: 172.28.0.1:13000
grafanaUrl: 172.28.0.1:13000
apiPrometheusPort: [20100]
userPrometheusPort: [ 20110 ]
friendPrometheusPort: [ 20120 ]
+65 -97
View File
@@ -1,42 +1,23 @@
# Copyright © 2023 OpenIM. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# General Configuration
# This section contains general configuration options for the entire environment.
# These options can be set via environment variables. If both environment variables
# and settings in this .env file exist, the environment variables take precedence.
# -----------------------------------------------------------------------------
# ==========================
# General Configuration
# ==========================
# These settings apply to the overall environment.
# ======================================
# ========= Basic Configuration ========
# ======================================
# The user for authentication or system operations.
# Default: OPENIM_USER=root
USER=${OPENIM_USER}
# Password associated with the specified user for authentication.
# Default: PASSWORD=openIM123
PASSWORD=${PASSWORD}
# Base URL for the application programming interface (API).
# Default: API_URL=http://172.28.0.1:10002
API_URL=${API_URL}
# Directory path for storing data files or related information.
# Default: DATA_DIR=./
# Data storage directory for persistent data.
# Example: DATA_DIR=/path/to/data
DATA_DIR=${DATA_DIR}
# Choose the appropriate image address, the default is GITHUB image,
# you can choose docker hub, for Chinese users can choose Ali Cloud
# export IMAGE_REGISTRY="ghcr.io/openimsdk"
# export IMAGE_REGISTRY="openim"
# export IMAGE_REGISTRY="registry.cn-hangzhou.aliyuncs.com/openimsdk"
# Docker image registry. Uncomment the preferred one.
# Options: ghcr.io/openimsdk, openim, registry.cn-hangzhou.aliyuncs.com/openimsdk
# IMAGE_REGISTRY="ghcr.io/openimsdk"
# IMAGE_REGISTRY="openim"
# IMAGE_REGISTRY="registry.cn-hangzhou.aliyuncs.com/openimsdk"
IMAGE_REGISTRY=${IMAGE_REGISTRY}
# ======================================
@@ -47,10 +28,9 @@ IMAGE_REGISTRY=${IMAGE_REGISTRY}
# Default: DOCKER_BRIDGE_SUBNET=172.28.0.0/16
DOCKER_BRIDGE_SUBNET=${DOCKER_BRIDGE_SUBNET}
# Gateway for the Docker network.
# Default: DOCKER_BRIDGE_GATEWAY=172.28.0.1
# Set and specify the IP addresses of some containers. Generally speaking,
# you do not need to modify these configurations to facilitate debugging
DOCKER_BRIDGE_GATEWAY=${DOCKER_BRIDGE_GATEWAY}
MONGO_NETWORK_ADDRESS=${MONGO_NETWORK_ADDRESS}
REDIS_NETWORK_ADDRESS=${REDIS_NETWORK_ADDRESS}
KAFKA_NETWORK_ADDRESS=${KAFKA_NETWORK_ADDRESS}
@@ -65,25 +45,45 @@ NODE_EXPORTER_NETWORK_ADDRESS=${NODE_EXPORTER_NETWORK_ADDRESS}
OPENIM_ADMIN_FRONT_NETWORK_ADDRESS=${OPENIM_ADMIN_FRONT_NETWORK_ADDRESS}
ALERT_MANAGER_NETWORK_ADDRESS=${ALERT_MANAGER_NETWORK_ADDRESS}
# ===============================================
# = Component Extension Configuration =
# ===============================================
# ==============================================================================
# Configuration Update Instructions
# ==============================================================================
# This header outlines the methods to update common variables in config.yaml and .env files.
# These instructions are vital for maintaining the OpenIM environment's configuration.
#
# METHOD 1: Regenerate All Configurations
# ----------------------------------------
# Use this method to regenerate all configurations.
# Steps:
# 1. Delete existing config files:
# - openim-server/config/config.yaml
# - openim-chat/config/config.yaml
# 2. Modify the .env file as required.
# 3. Run 'docker compose up -d'. This will regenerate:
# - config/config.yaml
#
# METHOD 2: Modify Individual Configuration Files
# -----------------------------------------------
# Use this method to update specific configuration files.
# Steps:
# 1. Modify the .env file as necessary.
# 2. Update the corresponding entries in:
# - config/config.yaml
# 3. Restart the services with 'docker compose up -d'.
# 4. Special Note: If you modify OPENIM_IP, API_OPENIM_PORT, or MINIO_PORT in .env,
# ensure to update the corresponding services and configurations accordingly.
#
# It is essential to follow these methods to ensure consistent and correct application behavior.
# ==============================================================================
# Local IP address of the service. Modify if necessary.
# Example: OPENIM_IP=172.28.0.1
OPENIM_IP=${OPENIM_IP}
# ============ Component Extension Configuration ==========
# ----- ZooKeeper Configuration -----
# Address or hostname for the ZooKeeper service.
# Default: ZOOKEEPER_ADDRESS=172.28.0.1
ZOOKEEPER_ADDRESS=${ZOOKEEPER_NETWORK_ADDRESS}
# Port for ZooKeeper service.
# Default: ZOOKEEPER_PORT=12181
ZOOKEEPER_PORT=${ZOOKEEPER_PORT}
# ----- MongoDB Configuration -----
# Address or hostname for the MongoDB service.
# Default: MONGO_ADDRESS=172.28.0.1
MONGO_ADDRESS=${MONGO_NETWORK_ADDRESS}
# Port on which MongoDB service is running.
# Default: MONGO_PORT=37017
# MONGO_PORT=${MONGO_PORT}
@@ -101,9 +101,6 @@ MONGO_PASSWORD=${MONGO_PASSWORD}
MONGO_DATABASE=${MONGO_DATABASE}
# ----- Redis Configuration -----
# Address or hostname for the Redis service.
# Default: REDIS_ADDRESS=172.28.0.1
REDIS_ADDRESS=${REDIS_NETWORK_ADDRESS}
# Port on which Redis in-memory data structure store is running.
# Default: REDIS_PORT=16379
@@ -113,11 +110,6 @@ REDIS_PORT=${REDIS_PORT}
# Default: REDIS_PASSWORD=openIM123
REDIS_PASSWORD=${REDIS_PASSWORD}
# ----- Kafka Configuration -----
# Address or hostname for the Kafka service.
# Default: KAFKA_ADDRESS=172.28.0.1
KAFKA_ADDRESS=${KAFKA_NETWORK_ADDRESS}
# Kakfa username to authenticate with the Kafka service.
# KAFKA_USERNAME=${KAFKA_USERNAME}
@@ -129,20 +121,13 @@ KAFKA_PORT=${KAFKA_PORT}
# Default: KAFKA_LATESTMSG_REDIS_TOPIC=latestMsgToRedis
KAFKA_LATESTMSG_REDIS_TOPIC=${KAFKA_LATESTMSG_REDIS_TOPIC}
# Topic in Kafka for pushing messages (e.g. notifications or updates).
# Default: KAFKA_MSG_PUSH_TOPIC=msgToPush
KAFKA_MSG_PUSH_TOPIC=${KAFKA_MSG_PUSH_TOPIC}
# Topic in Kafka for storing offline messages in MongoDB.
# Default: KAFKA_OFFLINEMSG_MONGO_TOPIC=offlineMsgToMongoMysql
KAFKA_OFFLINEMSG_MONGO_TOPIC=${KAFKA_OFFLINEMSG_MONGO_TOPIC}
# ----- MinIO Configuration ----
# Address or hostname for the MinIO object storage service.
# Default: MINIO_ADDRESS=172.28.0.1
MINIO_ADDRESS=${MINIO_NETWORK_ADDRESS}
# Port on which MinIO object storage service is running.
# MINIO_PORT
# ----------
# MINIO_PORT sets the port for the MinIO object storage service.
# Upon changing this port, the MinIO endpoint URLs in the `config/config.yaml` file must be updated
# to reflect this change. The endpoints include both the 'endpoint' and 'signEndpoint'
# under the MinIO configuration.
#
# Default: MINIO_PORT=10005
MINIO_PORT=${MINIO_PORT}
@@ -155,19 +140,11 @@ MINIO_PORT=${MINIO_PORT}
MINIO_SECRET_KEY=${MINIO_SECRET_KEY}
# ----- Prometheus Configuration -----
# Address or hostname for the Prometheus service.
# Default: PROMETHEUS_ADDRESS=172.28.0.1
PROMETHEUS_ADDRESS=${PROMETHEUS_NETWORK_ADDRESS}
# Port on which Prometheus service is running.
# Default: PROMETHEUS_PORT=19090
PROMETHEUS_PORT=${PROMETHEUS_PORT}
# ----- Grafana Configuration -----
# Address or hostname for the Grafana service.
# Default: GRAFANA_ADDRESS=172.28.0.1
GRAFANA_ADDRESS=${GRAFANA_NETWORK_ADDRESS}
# Port on which Grafana service is running.
# Default: GRAFANA_PORT=13000
GRAFANA_PORT=${GRAFANA_PORT}
@@ -184,23 +161,19 @@ OPENIM_WEB_DIST_PATH=${OPENIM_WEB_DIST_PATH}
# Default: OPENIM_WEB_PORT=11001
OPENIM_WEB_PORT=${OPENIM_WEB_PORT}
# Address or hostname for the OpenIM web service.
# Default: OPENIM_WEB_ADDRESS=172.28.0.1
OPENIM_WEB_ADDRESS=${OPENIM_WEB_NETWORK_ADDRESS}
# ======================================
# ========= OpenIM Server ==============
# ======================================
# Address or hostname for the OpenIM server.
# Default: OPENIM_SERVER_ADDRESS=172.28.0.1
OPENIM_SERVER_ADDRESS=${OPENIM_SERVER_NETWORK_ADDRESS}
# Port for the OpenIM WebSockets.
# Default: OPENIM_WS_PORT=10001
OPENIM_WS_PORT=${OPENIM_WS_PORT}
# Port for the OpenIM API.
# API_OPENIM_PORT
# ---------------
# This variable defines the port on which the OpenIM API service will listen.
# When changing this port, it's essential to update the apiURL in the config.yaml file
# to ensure the API service is accessible at the new port.
#
# Default: API_OPENIM_PORT=10002
API_OPENIM_PORT=${API_OPENIM_PORT}
@@ -213,10 +186,6 @@ API_OPENIM_PORT=${API_OPENIM_PORT}
# Default: CHAT_IMAGE_VERSION=main
CHAT_IMAGE_VERSION=${CHAT_IMAGE_VERSION}
# Address or hostname for the OpenIM chat service.
# Default: OPENIM_CHAT_ADDRESS=172.28.0.1
OPENIM_CHAT_ADDRESS=${OPENIM_CHAT_NETWORK_ADDRESS}
# Port for the OpenIM chat API.
# Default: OPENIM_CHAT_API_PORT=10008
OPENIM_CHAT_API_PORT=${OPENIM_CHAT_API_PORT}
@@ -225,7 +194,6 @@ OPENIM_CHAT_API_PORT=${OPENIM_CHAT_API_PORT}
# Default: OPENIM_CHAT_DATA_DIR=./openim-chat/main
OPENIM_CHAT_DATA_DIR=${OPENIM_CHAT_DATA_DIR}
# ======================================
# ========== OpenIM Admin ==============
# ======================================
+1 -1
View File
@@ -506,7 +506,7 @@ callback:
# The number of ports needs to be consistent with msg_transfer_service_num in script/path_info.sh
prometheus:
enable: ${PROMETHEUS_ENABLE}
prometheusUrl: ${PROMETHEUS_URL}
grafanaUrl: ${GRAFANA_URL}
apiPrometheusPort: [${API_PROM_PORT}]
userPrometheusPort: [ ${USER_PROM_PORT} ]
friendPrometheusPort: [ ${FRIEND_PROM_PORT} ]
+7 -1
View File
@@ -122,7 +122,7 @@ services:
server:
ipv4_address: ${OPENIM_WEB_NETWORK_ADDRESS:-172.28.0.7}
# Uncomment and configure the following services as needed
## Uncomment and configure the following services as needed
# openim-admin:
# image: ${IMAGE_REGISTRY:-ghcr.io/openimsdk}/openim-admin-front:v3.4.0
# container_name: openim-admin
@@ -167,6 +167,12 @@ services:
# hostname: grafana
# user: root
# restart: always
# environment:
# - GF_SECURITY_ALLOW_EMBEDDING=true
# - GF_SESSION_COOKIE_SAMESITE=none
# - GF_SESSION_COOKIE_SECURE=true
# - GF_AUTH_ANONYMOUS_ENABLED=true
# - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
# ports:
# - "${GRAFANA_PORT:-13000}:3000"
# volumes:
+4 -6
View File
@@ -4,6 +4,8 @@ go 1.19
require (
firebase.google.com/go v3.13.0+incompatible
github.com/OpenIMSDK/protocol v0.0.42
github.com/OpenIMSDK/tools v0.0.21
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/dtm-labs/rockscache v0.1.1
github.com/gin-gonic/gin v1.9.1
@@ -33,8 +35,6 @@ require github.com/google/uuid v1.3.1
require (
github.com/IBM/sarama v1.41.3
github.com/OpenIMSDK/protocol v0.0.36
github.com/OpenIMSDK/tools v0.0.21
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
github.com/go-redis/redis v6.15.9+incompatible
github.com/redis/go-redis/v9 v9.2.1
@@ -133,7 +133,7 @@ require (
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
@@ -155,6 +155,4 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
replace github.com/OpenIMSDK/protocol v0.0.36 => github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5
)
+4 -4
View File
@@ -18,6 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/OpenIMSDK/protocol v0.0.42 h1:vIWXqZJZZ1ddleJA25fxhjZ1GyEHATpYM3wVWh4/+PY=
github.com/OpenIMSDK/protocol v0.0.42/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.21 h1:iTapc2mIEVH/xl5Nd6jfwPub11Pgp44tVcE1rjB3a48=
github.com/OpenIMSDK/tools v0.0.21/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
@@ -225,8 +227,6 @@ github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205Ah
github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5 h1:nmrJmAgQsCAxKgw109kaTcBV4rMWDRvqOson0ehw708=
github.com/luhaoling/protocol v0.0.0-20231222100538-d625562d53d5/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
@@ -453,8 +453,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+4
View File
@@ -33,6 +33,10 @@ func (o *ConversationApi) GetAllConversations(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetAllConversations, o.Client, c)
}
func (o *ConversationApi) GetConversationsList(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetConversationList, o.Client, c)
}
func (o *ConversationApi) GetConversation(c *gin.Context) {
a2r.Call(conversation.ConversationClient.GetConversation, o.Client, c)
}
+3
View File
@@ -92,3 +92,6 @@ func (o *FriendApi) GetFriendIDs(c *gin.Context) {
func (o *FriendApi) GetSpecifiedFriendsInfo(c *gin.Context) {
a2r.Call(friend.FriendClient.GetSpecifiedFriendsInfo, o.Client, c)
}
func (o *FriendApi) UpdateFriends(c *gin.Context) {
a2r.Call(friend.FriendClient.UpdateFriends, o.Client, c)
}
+29 -2
View File
@@ -172,6 +172,7 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
if err = m.userRpcClient.GetNotificationByID(c, req.SendID); err != nil {
return nil, err
}
default:
return nil, errs.ErrArgs.WithDetail("not support err contentType")
}
@@ -185,38 +186,63 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
return m.newUserSendMsgReq(c, &req), nil
}
// SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework.
func (m *MessageApi) SendMessage(c *gin.Context) {
// Initialize a request struct for sending a message.
req := apistruct.SendMsgReq{}
// Bind the JSON request body to the request struct.
if err := c.BindJSON(&req); err != nil {
// Respond with an error if request body binding fails.
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
// Check if the user has the app manager role.
if !authverify.IsAppManagerUid(c) {
// Respond with a permission error if the user is not an app manager.
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return
}
// Prepare the message request with additional required data.
sendMsgReq, err := m.getSendMsgReq(c, req.SendMsg)
if err != nil {
// Log and respond with an error if preparation fails.
log.ZError(c, "decodeData failed", err)
apiresp.GinError(c, err)
return
}
// Set the receiver ID in the message data.
sendMsgReq.MsgData.RecvID = req.RecvID
// Declare a variable to store the message sending status.
var status int
// Attempt to send the message using the client.
respPb, err := m.Client.SendMsg(c, sendMsgReq)
if err != nil {
// Set the status to failed and respond with an error if sending fails.
status = constant.MsgSendFailed
log.ZError(c, "send message err", err)
apiresp.GinError(c, err)
return
}
// Set the status to successful if the message is sent.
status = constant.MsgSendSuccessed
// Attempt to update the message sending status in the system.
_, err = m.Client.SetSendMsgStatus(c, &msg.SetSendMsgStatusReq{
Status: int32(status),
})
if err != nil {
// Log the error if updating the status fails.
log.ZError(c, "SetSendMsgStatus failed", err)
}
// Respond with a success message and the response payload.
apiresp.GinSuccess(c, respPb)
}
@@ -224,13 +250,14 @@ func (m *MessageApi) SendBusinessNotification(c *gin.Context) {
req := struct {
Key string `json:"key"`
Data string `json:"data"`
SendUserID string `json:"sendUserID"`
RecvUserID string `json:"recvUserID"`
SendUserID string `json:"sendUserID" binding:"required"`
RecvUserID string `json:"recvUserID" binding:"required"`
}{}
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
if !authverify.IsAppManagerUid(c) {
apiresp.GinError(c, errs.ErrNoPermission.Wrap("only app manager can send message"))
return
+5 -1
View File
@@ -67,6 +67,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
{
userRouterGroup.POST("/user_register", u.UserRegister)
userRouterGroup.POST("/update_user_info", ParseToken, u.UpdateUserInfo)
userRouterGroup.POST("/update_user_info_ex", ParseToken, u.UpdateUserInfoEx)
userRouterGroup.POST("/set_global_msg_recv_opt", ParseToken, u.SetGlobalRecvMessageOpt)
userRouterGroup.POST("/get_users_info", ParseToken, u.GetUsersPublicInfo)
userRouterGroup.POST("/get_all_users_uid", ParseToken, u.GetAllUsersID)
@@ -107,7 +108,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
friendRouterGroup.POST("/is_friend", f.IsFriend)
friendRouterGroup.POST("/get_friend_id", f.GetFriendIDs)
friendRouterGroup.POST("/get_specified_friends_info", f.GetSpecifiedFriendsInfo)
//friendRouterGroup.POST("/set_pin_friend", f.SetPinFriends)
friendRouterGroup.POST("/update_friends", f.UpdateFriends)
}
g := NewGroupApi(*groupRpc)
groupRouterGroup := r.Group("/group", ParseToken)
@@ -171,6 +172,8 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
objectGroup.POST("/auth_sign", t.AuthSign)
objectGroup.POST("/complete_multipart_upload", t.CompleteMultipartUpload)
objectGroup.POST("/access_url", t.AccessURL)
objectGroup.POST("/initiate_form_data", t.InitiateFormData)
objectGroup.POST("/complete_form_data", t.CompleteFormData)
objectGroup.GET("/*name", t.ObjectRedirect)
}
// Message
@@ -201,6 +204,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive
conversationGroup := r.Group("/conversation", ParseToken)
{
c := NewConversationApi(*conversationRpc)
conversationGroup.POST("/get_conversations_list", c.GetConversationsList)
conversationGroup.POST("/get_all_conversations", c.GetAllConversations)
conversationGroup.POST("/get_conversation", c.GetConversation)
conversationGroup.POST("/get_conversations", c.GetConversations)
+9 -1
View File
@@ -71,6 +71,14 @@ func (o *ThirdApi) AccessURL(c *gin.Context) {
a2r.Call(third.ThirdClient.AccessURL, o.Client, c)
}
func (o *ThirdApi) InitiateFormData(c *gin.Context) {
a2r.Call(third.ThirdClient.InitiateFormData, o.Client, c)
}
func (o *ThirdApi) CompleteFormData(c *gin.Context) {
a2r.Call(third.ThirdClient.CompleteFormData, o.Client, c)
}
func (o *ThirdApi) ObjectRedirect(c *gin.Context) {
name := c.Param("name")
if name == "" {
@@ -122,5 +130,5 @@ func (o *ThirdApi) SearchLogs(c *gin.Context) {
}
func GetPrometheus(c *gin.Context) {
c.Redirect(http.StatusFound, config2.Config.Prometheus.PrometheusUrl)
c.Redirect(http.StatusFound, config2.Config.Prometheus.GrafanaUrl)
}
+3 -1
View File
@@ -41,7 +41,9 @@ func (u *UserApi) UserRegister(c *gin.Context) {
func (u *UserApi) UpdateUserInfo(c *gin.Context) {
a2r.Call(user.UserClient.UpdateUserInfo, u.Client, c)
}
func (u *UserApi) UpdateUserInfoEx(c *gin.Context) {
a2r.Call(user.UserClient.UpdateUserInfoEx, u.Client, c)
}
func (u *UserApi) SetGlobalRecvMessageOpt(c *gin.Context) {
a2r.Call(user.UserClient.SetGlobalRecvMessageOpt, u.Client, c)
}
+4
View File
@@ -87,6 +87,7 @@ func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
}
}
// ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient(
ctx *UserConnContext,
conn LongConn,
@@ -108,11 +109,13 @@ func (c *Client) ResetClient(
c.token = token
}
// pingHandler handles ping messages and sends pong responses.
func (c *Client) pingHandler(_ string) error {
_ = c.conn.SetReadDeadline(pongWait)
return c.writePongMsg()
}
// readMessage continuously reads messages from the connection.
func (c *Client) readMessage() {
defer func() {
if r := recover(); r != nil {
@@ -164,6 +167,7 @@ func (c *Client) readMessage() {
}
}
// handleMessage processes a single message received by the client.
func (c *Client) handleMessage(message []byte) error {
if c.IsCompress {
var err error
+173
View File
@@ -17,6 +17,8 @@ package conversation
import (
"context"
"errors"
"github.com/OpenIMSDK/protocol/sdkws"
"sort"
"github.com/OpenIMSDK/tools/tx"
@@ -41,6 +43,8 @@ import (
)
type conversationServer struct {
msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase
conversationNotificationSender *notification.ConversationNotificationSender
@@ -61,7 +65,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
}
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgRpcClient := rpcclient.NewMessageRpcClient(client)
userRpcClient := rpcclient.NewUserRpcClient(client)
pbconversation.RegisterConversationServer(server, &conversationServer{
msgRpcClient: &msgRpcClient,
user: &userRpcClient,
conversationNotificationSender: notification.NewConversationNotificationSender(&msgRpcClient),
groupRpcClient: &groupRpcClient,
conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewMongo(mongo.GetClient())),
@@ -82,6 +89,73 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers
return resp, nil
}
func (m *conversationServer) GetConversationList(ctx context.Context, req *pbconversation.GetConversationListReq) (resp *pbconversation.GetConversationListResp, err error) {
log.ZDebug(ctx, "GetConversationList", "seqs", req, "userID", req.UserID)
var conversationIDs []string
if len(req.ConversationIDs) == 0 {
conversationIDs, err = m.conversationDatabase.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}
} else {
conversationIDs = req.ConversationIDs
}
conversations, err := m.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
if len(conversations) == 0 {
return nil, errs.ErrRecordNotFound.Wrap()
}
maxSeqs, err := m.msgRpcClient.GetMaxSeqs(ctx, conversationIDs)
if err != nil {
return nil, err
}
chatLogs, err := m.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
if err != nil {
return nil, err
}
conversationMsg, err := m.getConversationInfo(ctx, chatLogs, req.UserID)
if err != nil {
return nil, err
}
hasReadSeqs, err := m.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs)
if err != nil {
return nil, err
}
conversation_unreadCount := make(map[string]int64)
for conversationID, maxSeq := range maxSeqs {
conversation_unreadCount[conversationID] = maxSeq - hasReadSeqs[conversationID]
}
conversation_isPinkTime := make(map[int64]string)
conversation_notPinkTime := make(map[int64]string)
for _, v := range conversations {
conversationID := v.ConversationID
time := conversationMsg[conversationID].MsgInfo.LatestMsgRecvTime
conversationMsg[conversationID].RecvMsgOpt = v.RecvMsgOpt
if v.IsPinned {
conversationMsg[conversationID].IsPinned = v.IsPinned
conversation_isPinkTime[time] = conversationID
continue
}
conversation_notPinkTime[time] = conversationID
}
resp = &pbconversation.GetConversationListResp{
ConversationElems: []*pbconversation.ConversationElem{},
}
m.conversationSort(conversation_isPinkTime, resp, conversation_unreadCount, conversationMsg)
m.conversationSort(conversation_notPinkTime, resp, conversation_unreadCount, conversationMsg)
return resp, nil
}
func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbconversation.GetAllConversationsReq) (*pbconversation.GetAllConversationsResp, error) {
conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID)
if err != nil {
@@ -348,3 +422,102 @@ func (c *conversationServer) GetConversationOfflinePushUserIDs(
}
return &pbconversation.GetConversationOfflinePushUserIDsResp{UserIDs: utils.Keys(userIDSet)}, nil
}
func (c *conversationServer) conversationSort(
conversations map[int64]string,
resp *pbconversation.GetConversationListResp,
conversation_unreadCount map[string]int64,
conversationMsg map[string]*pbconversation.ConversationElem,
) {
keys := []int64{}
for key := range conversations {
keys = append(keys, key)
}
sort.Slice(keys[:], func(i, j int) bool {
return keys[i] > keys[j]
})
index := 0
cons := make([]*pbconversation.ConversationElem, len(conversations))
for _, v := range keys {
conversationID := conversations[v]
conversationElem := conversationMsg[conversationID]
conversationElem.UnreadCount = conversation_unreadCount[conversationID]
cons[index] = conversationElem
index++
}
resp.ConversationElems = append(resp.ConversationElems, cons...)
}
func (c *conversationServer) getConversationInfo(
ctx context.Context,
chatLogs map[string]*sdkws.MsgData,
userID string) (map[string]*pbconversation.ConversationElem, error) {
var (
sendIDs []string
groupIDs []string
sendMap = make(map[string]*sdkws.UserInfo)
groupMap = make(map[string]*sdkws.GroupInfo)
conversationMsg = make(map[string]*pbconversation.ConversationElem)
)
for _, chatLog := range chatLogs {
switch chatLog.SessionType {
case constant.SingleChatType:
if chatLog.SendID == userID {
sendIDs = append(sendIDs, chatLog.RecvID)
}
sendIDs = append(sendIDs, chatLog.SendID)
case constant.GroupChatType, constant.SuperGroupChatType:
groupIDs = append(groupIDs, chatLog.GroupID)
sendIDs = append(sendIDs, chatLog.SendID)
}
}
if len(sendIDs) != 0 {
sendInfos, err := c.user.GetUsersInfo(ctx, sendIDs)
if err != nil {
return nil, err
}
for _, sendInfo := range sendInfos {
sendMap[sendInfo.UserID] = sendInfo
}
}
if len(groupIDs) != 0 {
groupInfos, err := c.groupRpcClient.GetGroupInfos(ctx, groupIDs, false)
if err != nil {
return nil, err
}
for _, groupInfo := range groupInfos {
groupMap[groupInfo.GroupID] = groupInfo
}
}
for conversationID, chatLog := range chatLogs {
pbchatLog := &pbconversation.ConversationElem{}
msgInfo := &pbconversation.MsgInfo{}
if err := utils.CopyStructFields(msgInfo, chatLog); err != nil {
return nil, err
}
switch chatLog.SessionType {
case constant.SingleChatType:
if chatLog.SendID == userID {
msgInfo.FaceURL = sendMap[chatLog.RecvID].FaceURL
msgInfo.SenderName = sendMap[chatLog.RecvID].Nickname
break
}
msgInfo.FaceURL = sendMap[chatLog.SendID].FaceURL
msgInfo.SenderName = sendMap[chatLog.SendID].Nickname
case constant.GroupChatType, constant.SuperGroupChatType:
msgInfo.GroupName = groupMap[chatLog.GroupID].GroupName
msgInfo.GroupFaceURL = groupMap[chatLog.GroupID].FaceURL
msgInfo.GroupMemberCount = groupMap[chatLog.GroupID].MemberCount
msgInfo.GroupID = chatLog.GroupID
msgInfo.GroupType = groupMap[chatLog.GroupID].GroupType
msgInfo.SenderName = sendMap[chatLog.SendID].Nickname
}
pbchatLog.ConversationID = conversationID
msgInfo.LatestMsgRecvTime = chatLog.SendTime
pbchatLog.MsgInfo = msgInfo
conversationMsg[conversationID] = pbchatLog
}
return conversationMsg, nil
}
+21 -16
View File
@@ -53,11 +53,6 @@ type friendServer struct {
RegisterCenter registry.SvcDiscoveryRegistry
}
func (s *friendServer) UpdateFriends(ctx context.Context, req *pbfriend.UpdateFriendsReq) (*pbfriend.UpdateFriendsResp, error) {
//TODO implement me
panic("implement me")
}
func Start(client registry.SvcDiscoveryRegistry, server *grpc.Server) error {
// Initialize MongoDB
mongo, err := unrelation.NewMongo()
@@ -441,7 +436,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *pbfrien
}
return resp, nil
}
func (s *friendServer) PinFriends(
func (s *friendServer) UpdateFriends(
ctx context.Context,
req *pbfriend.UpdateFriendsReq,
) (*pbfriend.UpdateFriendsResp, error) {
@@ -451,25 +446,35 @@ func (s *friendServer) PinFriends(
if utils.Duplicate(req.FriendUserIDs) {
return nil, errs.ErrArgs.Wrap("friendIDList repeated")
}
var isPinned bool
if req.IsPinned != nil {
isPinned = req.IsPinned.Value
} else {
return nil, errs.ErrArgs.Wrap("isPinned is nil")
}
//check whther in friend list
_, err := s.friendDatabase.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, err
}
//set friendslist friend pin status to isPinned
for _, friendID := range req.FriendUserIDs {
if err := s.friendDatabase.UpdateFriendPinStatus(ctx, req.OwnerUserID, friendID, isPinned); err != nil {
return nil, err
if req.IsPinned != nil {
if err = s.friendDatabase.UpdateFriendPinStatus(ctx, req.OwnerUserID, friendID, req.IsPinned.Value); err != nil {
return nil, err
}
}
if req.Remark != nil {
if err = s.friendDatabase.UpdateFriendRemark(ctx, req.OwnerUserID, friendID, req.Remark.Value); err != nil {
return nil, err
}
}
if req.Ex != nil {
if err = s.friendDatabase.UpdateFriendEx(ctx, req.OwnerUserID, friendID, req.Ex.Value); err != nil {
return nil, err
}
}
}
resp := &pbfriend.UpdateFriendsResp{}
err = s.notificationSender.FriendsInfoUpdateNotification(ctx, req.OwnerUserID, req.FriendUserIDs)
if err != nil {
return nil, errs.Wrap(err, "FriendsInfoUpdateNotification Error")
}
return resp, nil
}
+1 -2
View File
@@ -18,6 +18,7 @@ import (
"context"
utils2 "github.com/OpenIMSDK/tools/utils"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/redis/go-redis/v9"
@@ -26,8 +27,6 @@ import (
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/tools/errs"
"github.com/OpenIMSDK/tools/log"
cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
)
func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (resp *msg.GetConversationsHasReadAndMaxSeqResp, err error) {
+1 -1
View File
@@ -100,7 +100,7 @@ func callbackAfterSendSingleMsg(ctx context.Context, msg *pbchat.SendMsgReq) err
}
func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbchat.SendMsgReq) error {
if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable {
if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable {
return nil
}
req := &cbapi.CallbackBeforeSendGroupMsgReq{
+24 -1
View File
@@ -16,7 +16,6 @@ package msg
import (
"context"
pbmsg "github.com/OpenIMSDK/protocol/msg"
)
@@ -30,3 +29,27 @@ func (m *msgServer) GetConversationMaxSeq(
}
return &pbmsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil
}
func (m *msgServer) GetMaxSeqs(ctx context.Context, req *pbmsg.GetMaxSeqsReq) (*pbmsg.SeqsInfoResp, error) {
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbmsg.SeqsInfoResp{MaxSeqs: maxSeqs}, nil
}
func (m *msgServer) GetHasReadSeqs(ctx context.Context, req *pbmsg.GetHasReadSeqsReq) (*pbmsg.SeqsInfoResp, error) {
hasReadSeqs, err := m.MsgDatabase.GetHasReadSeqs(ctx, req.UserID, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbmsg.SeqsInfoResp{MaxSeqs: hasReadSeqs}, nil
}
func (m *msgServer) GetMsgByConversationIDs(ctx context.Context, req *pbmsg.GetMsgByConversationIDsReq) (*pbmsg.GetMsgByConversationIDsResp, error) {
Msgs, err := m.MsgDatabase.FindOneByDocIDs(ctx, req.ConversationIDs, req.MaxSeqs)
if err != nil {
return nil, err
}
return &pbmsg.GetMsgByConversationIDsResp{MsgDatas: Msgs}, nil
}
+113
View File
@@ -16,6 +16,12 @@ package third
import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"path"
"strconv"
"time"
@@ -179,6 +185,113 @@ func (t *thirdServer) AccessURL(ctx context.Context, req *third.AccessURLReq) (*
}, nil
}
func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateFormDataReq) (*third.InitiateFormDataResp, error) {
if req.Name == "" {
return nil, errs.ErrArgs.Wrap("name is empty")
}
if req.Size <= 0 {
return nil, errs.ErrArgs.Wrap("size must be greater than 0")
}
if err := checkUploadName(ctx, req.Name); err != nil {
return nil, err
}
var duration time.Duration
opUserID := mcontext.GetOpUserID(ctx)
var key string
if authverify.IsManagerUserID(opUserID) {
if req.Millisecond <= 0 {
duration = time.Minute * 10
} else {
duration = time.Millisecond * time.Duration(req.Millisecond)
}
if req.Absolute {
key = req.Name
}
} else {
duration = time.Minute * 10
}
uid, err := uuid.NewRandom()
if err != nil {
return nil, err
}
if key == "" {
date := time.Now().Format("20060102")
key = path.Join(cont.DirectPath, date, opUserID, hex.EncodeToString(uid[:])+path.Ext(req.Name))
}
mate := FormDataMate{
Name: req.Name,
Size: req.Size,
ContentType: req.ContentType,
Group: req.Group,
Key: key,
}
mateData, err := json.Marshal(&mate)
if err != nil {
return nil, err
}
resp, err := t.s3dataBase.FormData(ctx, key, req.Size, req.ContentType, duration)
if err != nil {
return nil, err
}
return &third.InitiateFormDataResp{
Id: base64.RawStdEncoding.EncodeToString(mateData),
Url: resp.URL,
File: resp.File,
Header: toPbMapArray(resp.Header),
FormData: resp.FormData,
Expires: resp.Expires.UnixMilli(),
SuccessCodes: utils.Slice(resp.SuccessCodes, func(code int) int32 {
return int32(code)
}),
}, nil
}
func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteFormDataReq) (*third.CompleteFormDataResp, error) {
if req.Id == "" {
return nil, errs.ErrArgs.Wrap("id is empty")
}
data, err := base64.RawStdEncoding.DecodeString(req.Id)
if err != nil {
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
var mate FormDataMate
if err := json.Unmarshal(data, &mate); err != nil {
return nil, errs.ErrArgs.Wrap("invalid id " + err.Error())
}
if err := checkUploadName(ctx, mate.Name); err != nil {
return nil, err
}
info, err := t.s3dataBase.StatObject(ctx, mate.Key)
if err != nil {
return nil, err
}
if info.Size > 0 && info.Size != mate.Size {
return nil, errs.ErrData.Wrap("file size mismatch")
}
obj := &relation.ObjectModel{
Name: mate.Name,
UserID: mcontext.GetOpUserID(ctx),
Hash: "etag_" + info.ETag,
Key: info.Key,
Size: info.Size,
ContentType: mate.ContentType,
Group: mate.Group,
CreateTime: time.Now(),
}
if err := t.s3dataBase.SetObject(ctx, obj); err != nil {
return nil, err
}
return &third.CompleteFormDataResp{Url: t.apiAddress(mate.Name)}, nil
}
func (t *thirdServer) apiAddress(name string) string {
return t.apiURL + name
}
type FormDataMate struct {
Name string `json:"name"`
Size int64 `json:"size"`
ContentType string `json:"contentType"`
Group string `json:"group"`
Key string `json:"key"`
}
-10
View File
@@ -101,16 +101,6 @@ type thirdServer struct {
defaultExpire time.Duration
}
func (t *thirdServer) InitiateFormData(ctx context.Context, req *third.InitiateFormDataReq) (*third.InitiateFormDataResp, error) {
//TODO implement me
panic("implement me")
}
func (t *thirdServer) CompleteFormData(ctx context.Context, req *third.CompleteFormDataReq) (*third.CompleteFormDataResp, error) {
//TODO implement me
panic("implement me")
}
func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil {
+3
View File
@@ -29,6 +29,9 @@ import (
)
func toPbMapArray(m map[string][]string) []*third.KeyValues {
if len(m) == 0 {
return nil
}
res := make([]*third.KeyValues, 0, len(m))
for key := range m {
res = append(res, &third.KeyValues{
+35 -2
View File
@@ -16,7 +16,6 @@ package user
import (
"context"
pbuser "github.com/OpenIMSDK/protocol/user"
"github.com/OpenIMSDK/tools/utils"
@@ -44,7 +43,6 @@ func CallbackBeforeUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInf
utils.NotNilReplace(&req.UserInfo.Nickname, resp.Nickname)
return nil
}
func CallbackAfterUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) error {
if !config.Config.Callback.CallbackAfterUpdateUserInfo.Enable {
return nil
@@ -61,6 +59,41 @@ func CallbackAfterUpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfo
}
return nil
}
func CallbackBeforeUpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) error {
if !config.Config.Callback.CallbackBeforeUpdateUserInfoEx.Enable {
return nil
}
cbReq := &cbapi.CallbackBeforeUpdateUserInfoExReq{
CallbackCommand: cbapi.CallbackBeforeUpdateUserInfoExCommand,
UserID: req.UserInfo.UserID,
FaceURL: req.UserInfo.FaceURL,
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackBeforeUpdateUserInfoExResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfoEx); err != nil {
return err
}
utils.NotNilReplace(req.UserInfo.FaceURL, resp.FaceURL)
utils.NotNilReplace(req.UserInfo.Ex, resp.Ex)
utils.NotNilReplace(req.UserInfo.Nickname, resp.Nickname)
return nil
}
func CallbackAfterUpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) error {
if !config.Config.Callback.CallbackAfterUpdateUserInfoEx.Enable {
return nil
}
cbReq := &cbapi.CallbackAfterUpdateUserInfoExReq{
CallbackCommand: cbapi.CallbackAfterUpdateUserInfoExCommand,
UserID: req.UserInfo.UserID,
FaceURL: req.UserInfo.FaceURL,
Nickname: req.UserInfo.Nickname,
}
resp := &cbapi.CallbackAfterUpdateUserInfoExResp{}
if err := http.CallBackPostReturn(ctx, config.Config.Callback.CallbackUrl, cbReq, resp, config.Config.Callback.CallbackBeforeUpdateUserInfoEx); err != nil {
return err
}
return nil
}
func CallbackBeforeUserRegister(ctx context.Context, req *pbuser.UserRegisterReq) error {
if !config.Config.Callback.CallbackBeforeUserRegister.Enable {
+72 -20
View File
@@ -17,6 +17,7 @@ package user
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
"math/rand"
"strings"
"time"
@@ -148,7 +149,41 @@ func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserI
}
return resp, nil
}
func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (resp *pbuser.UpdateUserInfoExResp, err error) {
resp = &pbuser.UpdateUserInfoExResp{}
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
if err = CallbackBeforeUpdateUserInfoEx(ctx, req); err != nil {
return nil, err
}
data := convert.UserPb2DBMapEx(req.UserInfo)
if err = s.UpdateByMap(ctx, req.UserInfo.UserID, data); err != nil {
return nil, err
}
_ = s.friendNotificationSender.UserInfoUpdatedNotification(ctx, req.UserInfo.UserID)
friends, err := s.friendRpcClient.GetFriendIDs(ctx, req.UserInfo.UserID)
if err != nil {
return nil, err
}
if req.UserInfo.Nickname != nil || req.UserInfo.FaceURL != nil {
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
log.ZError(ctx, "NotificationUserInfoUpdate", err)
}
}
for _, friendID := range friends {
s.friendNotificationSender.FriendInfoUpdatedNotification(ctx, req.UserInfo.UserID, friendID)
}
if err := CallbackAfterUpdateUserInfoEx(ctx, req); err != nil {
return nil, err
}
if err := s.groupRpcClient.NotificationUserInfoUpdate(ctx, req.UserInfo.UserID); err != nil {
log.ZError(ctx, "NotificationUserInfoUpdate", err, "userID", req.UserInfo.UserID)
}
return resp, nil
}
func (s *userServer) SetGlobalRecvMessageOpt(ctx context.Context, req *pbuser.SetGlobalRecvMessageOptReq) (resp *pbuser.SetGlobalRecvMessageOptResp, err error) {
resp = &pbuser.SetGlobalRecvMessageOptResp{}
if _, err := s.FindWithError(ctx, []string{req.UserID}); err != nil {
@@ -462,31 +497,31 @@ func (s *userServer) SearchNotificationAccount(ctx context.Context, req *pbuser.
return nil, err
}
if req.NickName != "" {
users, err := s.UserDatabase.FindByNickname(ctx, req.NickName)
if err != nil {
return nil, err
}
resp := s.userModelToResp(users)
return resp, nil
}
if req.UserID != "" {
users, err := s.UserDatabase.Find(ctx, []string{req.UserID})
if err != nil {
return nil, err
}
resp := s.userModelToResp(users)
return resp, nil
}
_, users, err := s.UserDatabase.Page(ctx, req.Pagination)
if err != nil {
return nil, err
}
var total int64
accounts := make([]*pbuser.NotificationAccountInfo, 0, len(users))
for _, v := range users {
if v.AppMangerLevel != constant.AppNotificationAdmin {
continue
}
temp := &pbuser.NotificationAccountInfo{
UserID: v.UserID,
FaceURL: v.FaceURL,
NickName: v.Nickname,
}
accounts = append(accounts, temp)
total += 1
}
return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: accounts}, nil
}
func (s *userServer) UpdateUserInfoEx(ctx context.Context, req *pbuser.UpdateUserInfoExReq) (*pbuser.UpdateUserInfoExResp, error) {
//TODO implement me
panic("implement me")
resp := s.userModelToResp(users)
return resp, nil
}
func (s *userServer) GetNotificationAccount(ctx context.Context, req *pbuser.GetNotificationAccountReq) (*pbuser.GetNotificationAccountResp, error) {
@@ -518,3 +553,20 @@ func (s *userServer) genUserID() string {
}
return string(data)
}
func (s *userServer) userModelToResp(users []*relation.UserModel) *pbuser.SearchNotificationAccountResp {
accounts := make([]*pbuser.NotificationAccountInfo, 0)
var total int64
for _, v := range users {
if v.AppMangerLevel == constant.AppNotificationAdmin || v.AppMangerLevel == constant.AppAdmin {
temp := &pbuser.NotificationAccountInfo{
UserID: v.UserID,
FaceURL: v.FaceURL,
NickName: v.Nickname,
}
accounts = append(accounts, temp)
total += 1
}
}
return &pbuser.SearchNotificationAccountResp{Total: total, NotificationAccounts: accounts}
}
+24
View File
@@ -64,6 +64,30 @@ type SendMsgReq struct {
SendMsg
}
type GetConversationListReq struct {
// userID uniquely identifies the user.
UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty" binding:"required"`
// ConversationIDs contains a list of unique identifiers for conversations.
ConversationIDs []string `protobuf:"bytes,2,rep,name=conversationIDs,proto3" json:"conversationIDs,omitempty"`
}
type GetConversationListResp struct {
// ConversationElems is a map that associates conversation IDs with their respective details.
ConversationElems map[string]*ConversationElem `protobuf:"bytes,1,rep,name=conversationElems,proto3" json:"conversationElems,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
type ConversationElem struct {
// MaxSeq represents the maximum sequence number within the conversation.
MaxSeq int64 `protobuf:"varint,1,opt,name=maxSeq,proto3" json:"maxSeq,omitempty"`
// UnreadSeq represents the number of unread messages in the conversation.
UnreadSeq int64 `protobuf:"varint,2,opt,name=unreadSeq,proto3" json:"unreadSeq,omitempty"`
// LastSeqTime represents the timestamp of the last sequence in the conversation.
LastSeqTime int64 `protobuf:"varint,3,opt,name=LastSeqTime,proto3" json:"LastSeqTime,omitempty"`
}
// BatchSendMsgReq defines the structure for sending a message to multiple recipients.
type BatchSendMsgReq struct {
SendMsg
+2
View File
@@ -37,6 +37,8 @@ const (
CallbackGroupMsgReadCommand = "callbackGroupMsgReadCommand"
CallbackMsgModifyCommand = "callbackMsgModifyCommand"
CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand"
CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand"
CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand"
CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand"
CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand"
CallbackTransferGroupOwnerAfter = "callbackTransferGroupOwnerAfter"
+29 -1
View File
@@ -14,7 +14,10 @@
package callbackstruct
import "github.com/OpenIMSDK/protocol/sdkws"
import (
"github.com/OpenIMSDK/protocol/sdkws"
"github.com/OpenIMSDK/protocol/wrapperspb"
)
type CallbackBeforeUpdateUserInfoReq struct {
CallbackCommand `json:"callbackCommand"`
@@ -41,6 +44,31 @@ type CallbackAfterUpdateUserInfoResp struct {
CommonCallbackResp
}
type CallbackBeforeUpdateUserInfoExReq struct {
CallbackCommand `json:"callbackCommand"`
UserID string `json:"userID"`
Nickname *wrapperspb.StringValue `json:"nickName"`
FaceURL *wrapperspb.StringValue `json:"faceURL"`
Ex *wrapperspb.StringValue `json:"ex"`
}
type CallbackBeforeUpdateUserInfoExResp struct {
CommonCallbackResp
Nickname *wrapperspb.StringValue `json:"nickName"`
FaceURL *wrapperspb.StringValue `json:"faceURL"`
Ex *wrapperspb.StringValue `json:"ex"`
}
type CallbackAfterUpdateUserInfoExReq struct {
CallbackCommand `json:"callbackCommand"`
UserID string `json:"userID"`
Nickname *wrapperspb.StringValue `json:"nickName"`
FaceURL *wrapperspb.StringValue `json:"faceURL"`
Ex *wrapperspb.StringValue `json:"ex"`
}
type CallbackAfterUpdateUserInfoExResp struct {
CommonCallbackResp
}
type CallbackBeforeUserRegisterReq struct {
CallbackCommand `json:"callbackCommand"`
Secret string `json:"secret"`
+1 -1
View File
@@ -45,7 +45,7 @@ type CmdOpts struct {
func WithCronTaskLogName() func(*CmdOpts) {
return func(opts *CmdOpts) {
opts.loggerPrefixName = "OpenIM.CronTask.log.all"
opts.loggerPrefixName = "openim.crontask.log.all"
}
}
+3 -1
View File
@@ -282,6 +282,8 @@ type configStruct struct {
CallbackBeforeSetFriendRemark CallBackConfig `yaml:"callbackBeforeSetFriendRemark"`
CallbackAfterSetFriendRemark CallBackConfig `yaml:"callbackAfterSetFriendRemark"`
CallbackBeforeUpdateUserInfo CallBackConfig `yaml:"beforeUpdateUserInfo"`
CallbackBeforeUpdateUserInfoEx CallBackConfig `yaml:"beforeUpdateUserInfoEx"`
CallbackAfterUpdateUserInfoEx CallBackConfig `yaml:"afterUpdateUserInfoEx"`
CallbackBeforeUserRegister CallBackConfig `yaml:"beforeUserRegister"`
CallbackAfterUpdateUserInfo CallBackConfig `yaml:"updateUserInfo"`
CallbackAfterUserRegister CallBackConfig `yaml:"afterUserRegister"`
@@ -312,7 +314,7 @@ type configStruct struct {
Prometheus struct {
Enable bool `yaml:"enable"`
PrometheusUrl string `yaml:"prometheusUrl"`
GrafanaUrl string `yaml:"grafanaUrl"`
ApiPrometheusPort []int `yaml:"apiPrometheusPort"`
UserPrometheusPort []int `yaml:"userPrometheusPort"`
FriendPrometheusPort []int `yaml:"friendPrometheusPort"`
+23 -1
View File
@@ -64,7 +64,7 @@ func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
"global_recv_msg_opt": user.GlobalRecvMsgOpt,
}
for key, value := range fields {
if v, ok := value.(string); ok {
if v, ok := value.(string); ok && v != "" {
val[key] = v
} else if v, ok := value.(int32); ok && v != 0 {
val[key] = v
@@ -72,3 +72,25 @@ func UserPb2DBMap(user *sdkws.UserInfo) map[string]any {
}
return val
}
func UserPb2DBMapEx(user *sdkws.UserInfoWithEx) map[string]any {
if user == nil {
return nil
}
val := make(map[string]any)
// Map fields from UserInfoWithEx to val
if user.Nickname != nil {
val["nickname"] = user.Nickname.Value
}
if user.FaceURL != nil {
val["face_url"] = user.FaceURL.Value
}
if user.Ex != nil {
val["ex"] = user.Ex.Value
}
if user.GlobalRecvMsgOpt != nil {
val["global_recv_msg_opt"] = user.GlobalRecvMsgOpt.Value
}
return val
}
+9 -1
View File
@@ -87,7 +87,15 @@ func NewRedis() (redis.UniversalClient, error) {
// overrideConfigFromEnv overrides configuration fields with environment variables if present.
func overrideConfigFromEnv() {
if envAddr := os.Getenv("REDIS_ADDRESS"); envAddr != "" {
config.Config.Redis.Address = strings.Split(envAddr, ",") // Assuming addresses are comma-separated
if envPort := os.Getenv("REDIS_PORT"); envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + envPort
}
config.Config.Redis.Address = addresses
} else {
config.Config.Redis.Address = strings.Split(envAddr, ",")
}
}
if envUser := os.Getenv("REDIS_USERNAME"); envUser != "" {
config.Config.Redis.Username = envUser
+48 -12
View File
@@ -32,33 +32,57 @@ import (
)
type FriendDatabase interface {
// 检查user2是否在user1的好友列表中(inUser1Friends==true) 检查user1是否在user2的好友列表中(inUser2Friends==true)
// CheckIn checks if user2 is in user1's friend list (inUser1Friends==true) and if user1 is in user2's friend list (inUser2Friends==true)
CheckIn(ctx context.Context, user1, user2 string) (inUser1Friends bool, inUser2Friends bool, err error)
// 增加或者更新好友申请
// AddFriendRequest adds or updates a friend request
AddFriendRequest(ctx context.Context, fromUserID, toUserID string, reqMsg string, ex string) (err error)
// 先判断是否在好友表,如果在则不插入
// BecomeFriends first checks if the users are already in the friends table; if not, it inserts them as friends
BecomeFriends(ctx context.Context, ownerUserID string, friendUserIDs []string, addSource int32) (err error)
// 拒绝好友申请
// RefuseFriendRequest refuses a friend request
RefuseFriendRequest(ctx context.Context, friendRequest *relation.FriendRequestModel) (err error)
// 同意好友申请
// AgreeFriendRequest accepts a friend request
AgreeFriendRequest(ctx context.Context, friendRequest *relation.FriendRequestModel) (err error)
// 删除好友
// Delete removes a friend or friends from the owner's friend list
Delete(ctx context.Context, ownerUserID string, friendUserIDs []string) (err error)
// 更新好友备注
// UpdateRemark updates the remark for a friend
UpdateRemark(ctx context.Context, ownerUserID, friendUserID, remark string) (err error)
// 获取ownerUserID的好友列表
// PageOwnerFriends retrieves the friend list of ownerUserID with pagination
PageOwnerFriends(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendModel, err error)
// friendUserID在哪些人的好友列表中
// PageInWhoseFriends finds the users who have friendUserID in their friend list with pagination
PageInWhoseFriends(ctx context.Context, friendUserID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendModel, err error)
// 获取我发出去的好友申请
// PageFriendRequestFromMe retrieves the friend requests sent by the user with pagination
PageFriendRequestFromMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendRequestModel, err error)
// 获取我收到的的好友申请
// PageFriendRequestToMe retrieves the friend requests received by the user with pagination
PageFriendRequestToMe(ctx context.Context, userID string, pagination pagination.Pagination) (total int64, friends []*relation.FriendRequestModel, err error)
// 获取某人指定好友的信息
// FindFriendsWithError fetches specified friends of a user and returns an error if any do not exist
FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*relation.FriendModel, err error)
// FindFriendUserIDs retrieves the friend IDs of a user
FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error)
// FindBothFriendRequests finds friend requests sent and received
FindBothFriendRequests(ctx context.Context, fromUserID, toUserID string) (friends []*relation.FriendRequestModel, err error)
// UpdateFriendPinStatus updates the pinned status of a friend
UpdateFriendPinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error)
// UpdateFriendRemark updates the remark for a friend
UpdateFriendRemark(ctx context.Context, ownerUserID string, friendUserID string, remark string) (err error)
// UpdateFriendEx updates the 'ex' field for a friend
UpdateFriendEx(ctx context.Context, ownerUserID string, friendUserID string, ex string) (err error)
}
type friendDatabase struct {
@@ -305,3 +329,15 @@ func (f *friendDatabase) UpdateFriendPinStatus(ctx context.Context, ownerUserID
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
func (f *friendDatabase) UpdateFriendRemark(ctx context.Context, ownerUserID string, friendUserID string, remark string) (err error) {
if err := f.friend.UpdateFriendRemark(ctx, ownerUserID, friendUserID, remark); err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
func (f *friendDatabase) UpdateFriendEx(ctx context.Context, ownerUserID string, friendUserID string, ex string) (err error) {
if err := f.friend.UpdateFriendEx(ctx, ownerUserID, friendUserID, ex); err != nil {
return err
}
return f.cache.DelFriend(ownerUserID, friendUserID).ExecDel(ctx)
}
+16
View File
@@ -98,6 +98,7 @@ type CommonMsgDatabase interface {
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int32, msgData []*sdkws.MsgData, err error)
FindOneByDocIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error)
// to mq
MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error
@@ -1051,6 +1052,21 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
return total, totalMsgs, nil
}
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
totalMsgs := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs {
seq := seqs[conversationID]
docID := db.msg.GetDocID(conversationID, seq)
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil {
return nil, err
}
index := db.msg.GetMsgIndex(seq)
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
}
return totalMsgs, nil
}
func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) {
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}
+10
View File
@@ -35,6 +35,8 @@ type S3Database interface {
CompleteMultipartUpload(ctx context.Context, uploadID string, parts []string) (*cont.UploadResult, error)
AccessURL(ctx context.Context, name string, expire time.Duration, opt *s3.AccessURLOption) (time.Time, string, error)
SetObject(ctx context.Context, info *relation.ObjectModel) error
StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error)
}
func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj relation.ObjectInfoModelInterface) S3Database {
@@ -100,3 +102,11 @@ func (s *s3Database) AccessURL(ctx context.Context, name string, expire time.Dur
}
return expireTime, rawURL, nil
}
func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) {
return s.s3.StatObject(ctx, name)
}
func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return s.s3.FormData(ctx, name, size, contentType, duration)
}
+7
View File
@@ -38,6 +38,8 @@ type UserDatabase interface {
FindWithError(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// Find Get the information of the specified user If the userID is not found, no error will be returned
Find(ctx context.Context, userIDs []string) (users []*relation.UserModel, err error)
// Find userInfo By Nickname
FindByNickname(ctx context.Context, nickname string) (users []*relation.UserModel, err error)
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db
Create(ctx context.Context, users []*relation.UserModel) (err error)
// Update update (non-zero value) external guarantee userID exists
@@ -133,6 +135,11 @@ func (u *userDatabase) Find(ctx context.Context, userIDs []string) (users []*rel
return u.cache.GetUsersInfo(ctx, userIDs)
}
// Find userInfo By Nickname
func (u *userDatabase) FindByNickname(ctx context.Context, nickname string) (users []*relation.UserModel, err error) {
return u.userDB.TakeByNickname(ctx, nickname)
}
// Create Insert multiple external guarantees that the userID is not repeated and does not exist in the db.
func (u *userDatabase) Create(ctx context.Context, users []*relation.UserModel) (err error) {
return u.tx.Transaction(ctx, func(ctx context.Context) error {
+30
View File
@@ -160,3 +160,33 @@ func (f *FriendMgo) UpdatePinStatus(ctx context.Context, ownerUserID string, fri
return nil
}
func (f *FriendMgo) UpdateFriendRemark(ctx context.Context, ownerUserID string, friendUserID string, remark string) (err error) {
filter := bson.M{"owner_user_id": ownerUserID, "friend_user_id": friendUserID}
// Create an update operation to set the "is_pinned" field to isPinned for all documents.
update := bson.M{"$set": bson.M{"remark": remark}}
// Perform the update operation for all documents in the collection.
_, err = f.coll.UpdateMany(ctx, filter, update)
if err != nil {
return errs.Wrap(err, "update remark error")
}
return nil
}
func (f *FriendMgo) UpdateFriendEx(ctx context.Context, ownerUserID string, friendUserID string, ex string) (err error) {
filter := bson.M{"owner_user_id": ownerUserID, "friend_user_id": friendUserID}
// Create an update operation to set the "is_pinned" field to isPinned for all documents.
update := bson.M{"$set": bson.M{"ex": ex}}
// Perform the update operation for all documents in the collection.
_, err = f.coll.UpdateMany(ctx, filter, update)
if err != nil {
return errs.Wrap(err, "update ex error")
}
return nil
}
+4
View File
@@ -65,6 +65,10 @@ func (u *UserMgo) Take(ctx context.Context, userID string) (user *relation.UserM
return mgoutil.FindOne[*relation.UserModel](ctx, u.coll, bson.M{"user_id": userID})
}
func (u *UserMgo) TakeByNickname(ctx context.Context, nickname string) (user []*relation.UserModel, err error) {
return mgoutil.Find[*relation.UserModel](ctx, u.coll, bson.M{"nickname": nickname})
}
func (u *UserMgo) Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*relation.UserModel, err error) {
return mgoutil.FindPage[*relation.UserModel](ctx, u.coll, bson.M{}, pagination)
}
+1
View File
@@ -17,6 +17,7 @@ package cont
const (
hashPath = "openim/data/hash/"
tempPath = "openim/temp/"
DirectPath = "openim/direct"
UploadTypeMultipart = 1 // 分片上传
UploadTypePresigned = 2 // 预签名上传
partSeparator = ","
+4
View File
@@ -279,3 +279,7 @@ func (c *Controller) AccessURL(ctx context.Context, name string, expire time.Dur
}
return c.impl.AccessURL(ctx, name, expire, opt)
}
func (c *Controller) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
return c.impl.FormData(ctx, name, size, contentType, duration)
}
+69
View File
@@ -16,6 +16,11 @@ package cos
import (
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
@@ -44,6 +49,8 @@ const (
imageWebp = "webp"
)
const successCode = http.StatusOK
const (
videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg"
@@ -326,3 +333,65 @@ func (c *Cos) getPresignedURL(ctx context.Context, name string, expire time.Dura
}
return c.client.Object.GetObjectURL(name), nil
}
func (c *Cos) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
// https://cloud.tencent.com/document/product/436/14690
now := time.Now()
expiration := now.Add(duration)
keyTime := fmt.Sprintf("%d;%d", now.Unix(), expiration.Unix())
conditions := []any{
map[string]string{"q-sign-algorithm": "sha1"},
map[string]string{"q-ak": c.credential.SecretID},
map[string]string{"q-sign-time": keyTime},
map[string]string{"key": name},
}
if contentType != "" {
conditions = append(conditions, map[string]string{"Content-Type": contentType})
}
policy := map[string]any{
"expiration": expiration.Format("2006-01-02T15:04:05.000Z"),
"conditions": conditions,
}
policyJson, err := json.Marshal(policy)
if err != nil {
return nil, err
}
signKey := hmacSha1val(c.credential.SecretKey, keyTime)
strToSign := sha1val(string(policyJson))
signature := hmacSha1val(signKey, strToSign)
fd := &s3.FormData{
URL: c.client.BaseURL.BucketURL.String(),
File: "file",
Expires: expiration,
FormData: map[string]string{
"policy": base64.StdEncoding.EncodeToString(policyJson),
"q-sign-algorithm": "sha1",
"q-ak": c.credential.SecretID,
"q-key-time": keyTime,
"q-signature": signature,
"key": name,
"success_action_status": strconv.Itoa(successCode),
},
SuccessCodes: []int{successCode},
}
if contentType != "" {
fd.FormData["Content-Type"] = contentType
}
if c.credential.SessionToken != "" {
fd.FormData["x-cos-security-token"] = c.credential.SessionToken
}
return fd, nil
}
func hmacSha1val(key, msg string) string {
v := hmac.New(sha1.New, []byte(key))
v.Write([]byte(msg))
return hex.EncodeToString(v.Sum(nil))
}
func sha1val(msg string) string {
sha1Hash := sha1.New()
sha1Hash.Write([]byte(msg))
return hex.EncodeToString(sha1Hash.Sum(nil))
}
+50
View File
@@ -57,6 +57,8 @@ const (
imageThumbnailPath = "openim/thumbnail"
)
const successCode = http.StatusOK
func NewMinio(cache cache.MinioCache) (s3.Interface, error) {
u, err := url.Parse(config.Config.Object.Minio.Endpoint)
if err != nil {
@@ -441,3 +443,51 @@ func (m *Minio) getObjectData(ctx context.Context, name string, limit int64) ([]
}
return io.ReadAll(io.LimitReader(object, limit))
}
func (m *Minio) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
if err := m.initMinio(ctx); err != nil {
return nil, err
}
policy := minio.NewPostPolicy()
if err := policy.SetKey(name); err != nil {
return nil, err
}
expires := time.Now().Add(duration)
if err := policy.SetExpires(expires); err != nil {
return nil, err
}
if size > 0 {
if err := policy.SetContentLengthRange(0, size); err != nil {
return nil, err
}
}
if err := policy.SetSuccessStatusAction(strconv.Itoa(successCode)); err != nil {
return nil, err
}
if contentType != "" {
if err := policy.SetContentType(contentType); err != nil {
return nil, err
}
}
if err := policy.SetBucket(m.bucket); err != nil {
return nil, err
}
u, fd, err := m.core.PresignedPostPolicy(ctx, policy)
if err != nil {
return nil, err
}
sign, err := url.Parse(m.signEndpoint)
if err != nil {
return nil, err
}
u.Scheme = sign.Scheme
u.Host = sign.Host
return &s3.FormData{
URL: u.String(),
File: "file",
Header: nil,
FormData: fd,
Expires: expires,
SuccessCodes: []int{successCode},
}, nil
}
+49
View File
@@ -16,8 +16,13 @@ package oss
import (
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"reflect"
@@ -45,6 +50,8 @@ const (
imageWebp = "webp"
)
const successCode = http.StatusOK
const (
videoSnapshotImagePng = "png"
videoSnapshotImageJpg = "jpg"
@@ -327,3 +334,45 @@ func (o *OSS) AccessURL(ctx context.Context, name string, expire time.Duration,
params := getURLParams(*o.bucket.Client.Conn, rawParams)
return getURL(o.um, o.bucket.BucketName, name, params).String(), nil
}
func (o *OSS) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) {
// https://help.aliyun.com/zh/oss/developer-reference/postobject?spm=a2c4g.11186623.0.0.1cb83cebkP55nn
expires := time.Now().Add(duration)
conditions := []any{
map[string]string{"bucket": o.bucket.BucketName},
map[string]string{"key": name},
}
if size > 0 {
conditions = append(conditions, []any{"content-length-range", 0, size})
}
policy := map[string]any{
"expiration": expires.Format("2006-01-02T15:04:05.000Z"),
"conditions": conditions,
}
policyJson, err := json.Marshal(policy)
if err != nil {
return nil, err
}
policyStr := base64.StdEncoding.EncodeToString(policyJson)
h := hmac.New(sha1.New, []byte(o.credentials.GetAccessKeySecret()))
if _, err := io.WriteString(h, policyStr); err != nil {
return nil, err
}
fd := &s3.FormData{
URL: o.bucketURL,
File: "file",
Expires: expires,
FormData: map[string]string{
"key": name,
"policy": policyStr,
"OSSAccessKeyId": o.credentials.GetAccessKeyID(),
"success_action_status": strconv.Itoa(successCode),
"signature": base64.StdEncoding.EncodeToString(h.Sum(nil)),
},
SuccessCodes: []int{successCode},
}
if contentType != "" {
fd.FormData["x-oss-content-type"] = contentType
}
return fd, nil
}
+11
View File
@@ -74,6 +74,15 @@ type CopyObjectInfo struct {
ETag string `json:"etag"`
}
type FormData struct {
URL string `json:"url"`
File string `json:"file"`
Header http.Header `json:"header"`
FormData map[string]string `json:"form"`
Expires time.Time `json:"expires"`
SuccessCodes []int `json:"successActionStatus"`
}
type SignPart struct {
PartNumber int `json:"partNumber"`
URL string `json:"url"`
@@ -152,4 +161,6 @@ type Interface interface {
ListUploadedParts(ctx context.Context, uploadID string, name string, partNumberMarker int, maxParts int) (*ListUploadedPartsResult, error)
AccessURL(ctx context.Context, name string, expire time.Duration, opt *AccessURLOption) (string, error)
FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*FormData, error)
}
+4
View File
@@ -59,4 +59,8 @@ type FriendModelInterface interface {
FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error)
// UpdatePinStatus update friend's pin status
UpdatePinStatus(ctx context.Context, ownerUserID string, friendUserID string, isPinned bool) (err error)
// UpdateFriendRemark update friend's remark
UpdateFriendRemark(ctx context.Context, ownerUserID string, friendUserID string, remark string) (err error)
// UpdateFriendEx update friend's ex
UpdateFriendEx(ctx context.Context, ownerUserID string, friendUserID string, ex string) (err error)
}
+1
View File
@@ -53,6 +53,7 @@ type UserModelInterface interface {
UpdateByMap(ctx context.Context, userID string, args map[string]any) (err error)
Find(ctx context.Context, userIDs []string) (users []*UserModel, err error)
Take(ctx context.Context, userID string) (user *UserModel, err error)
TakeByNickname(ctx context.Context, nickname string) (user []*UserModel, err error)
Page(ctx context.Context, pagination pagination.Pagination) (count int64, users []*UserModel, err error)
Exist(ctx context.Context, userID string) (exist bool, err error)
GetAllUserID(ctx context.Context, pagination pagination.Pagination) (count int64, userIDs []string, err error)
+4
View File
@@ -74,6 +74,10 @@ func buildMongoURI() string {
return uri
}
if config.Config.Mongo.Uri != "" {
return config.Config.Mongo.Uri
}
username := os.Getenv("MONGO_USERNAME")
password := os.Getenv("MONGO_PASSWORD")
address := os.Getenv("MONGO_ADDRESS")
@@ -24,7 +24,8 @@ import (
func setupTestEnvironment() {
os.Setenv("ZOOKEEPER_SCHEMA", "openim")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1:12181")
os.Setenv("ZOOKEEPER_ADDRESS", "172.28.0.1")
os.Setenv("ZOOKEEPER_PORT", "12181")
os.Setenv("ZOOKEEPER_USERNAME", "")
os.Setenv("ZOOKEEPER_PASSWORD", "")
}
@@ -52,10 +52,18 @@ func getEnv(key, fallback string) string {
return fallback
}
// getZkAddrFromEnv returns the value of an environment variable if it exists, otherwise it returns the fallback value.
// getZkAddrFromEnv returns the Zookeeper addresses combined from the ZOOKEEPER_ADDRESS and ZOOKEEPER_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getZkAddrFromEnv(fallback []string) []string {
if value, exists := os.LookupEnv("ZOOKEEPER_ADDRESS"); exists {
return strings.Split(value, ",")
address, addrExists := os.LookupEnv("ZOOKEEPER_ADDRESS")
port, portExists := os.LookupEnv("ZOOKEEPER_PORT")
if addrExists && portExists {
addresses := strings.Split(address, ",")
for i, addr := range addresses {
addresses[i] = addr + ":" + port
}
return addresses
}
return fallback
}
+1 -1
View File
@@ -432,7 +432,7 @@ func computeApproximateRequestSize(r *http.Request) int {
}
s += len(r.Host)
// r.Form and r.MultipartForm are assumed to be included in r.URL.
// r.FormData and r.MultipartForm are assumed to be included in r.URL.
if r.ContentLength != -1 {
s += int(r.ContentLength)
+6 -7
View File
@@ -31,15 +31,14 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
const (
maxRetry = 10 // Maximum number of retries for producer creation
)
const maxRetry = 10 // number of retries
var errEmptyMsg = errors.New("binary msg is empty")
var errEmptyMsg = errors.New("kafka binary msg is empty")
// Producer represents a Kafka producer.
type Producer struct {
topic string
addr []string
topic string
config *sarama.Config
producer sarama.SyncProducer
}
@@ -68,7 +67,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
// Get Kafka configuration from environment variables or fallback to config file
kafkaUsername := getEnvOrConfig("KAFKA_USERNAME", config.Config.Kafka.Username)
kafkaPassword := getEnvOrConfig("KAFKA_PASSWORD", config.Config.Kafka.Password)
kafkaAddr := getEnvOrConfig("KAFKA_ADDRESS", addr[0]) // Assuming addr[0] contains address from config
kafkaAddr := getKafkaAddrFromEnv(addr) // Updated to use the new function
// Configure SASL authentication if credentials are provided
if kafkaUsername != "" && kafkaPassword != "" {
@@ -78,7 +77,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
}
// Set the Kafka address
p.addr = []string{kafkaAddr}
p.addr = kafkaAddr
// Set up TLS configuration (if required)
SetupTLSConfig(p.config)
+19
View File
@@ -15,7 +15,9 @@
package kafka
import (
"fmt"
"os"
"strings"
"github.com/IBM/sarama"
@@ -44,3 +46,20 @@ func getEnvOrConfig(envName string, configValue string) string {
}
return configValue
}
// getKafkaAddrFromEnv returns the Kafka addresses combined from the KAFKA_ADDRESS and KAFKA_PORT environment variables.
// If the environment variables are not set, it returns the fallback value.
func getKafkaAddrFromEnv(fallback []string) []string {
envAddr := os.Getenv("KAFKA_ADDRESS")
envPort := os.Getenv("KAFKA_PORT")
if envAddr != "" && envPort != "" {
addresses := strings.Split(envAddr, ",")
for i, addr := range addresses {
addresses[i] = fmt.Sprintf("%s:%s", addr, envPort)
}
return addresses
}
return fallback
}
+1 -12
View File
@@ -30,10 +30,9 @@ func NewOptions(opts ...OptionsOpt) Options {
options[constant.IsOfflinePush] = false
options[constant.IsUnreadCount] = false
options[constant.IsConversationUpdate] = false
options[constant.IsSenderSync] = false
options[constant.IsSenderSync] = true
options[constant.IsNotPrivate] = false
options[constant.IsSenderConversationUpdate] = false
options[constant.IsSenderNotificationPush] = false
options[constant.IsReactionFromCache] = false
for _, opt := range opts {
opt(options)
@@ -114,12 +113,6 @@ func WithSenderConversationUpdate() OptionsOpt {
}
}
func WithSenderNotificationPush() OptionsOpt {
return func(options Options) {
options[constant.IsSenderNotificationPush] = true
}
}
func WithReactionFromCache() OptionsOpt {
return func(options Options) {
options[constant.IsReactionFromCache] = true
@@ -174,10 +167,6 @@ func (o Options) IsSenderConversationUpdate() bool {
return o.Is(constant.IsSenderConversationUpdate)
}
func (o Options) IsSenderNotificationPush() bool {
return o.Is(constant.IsSenderNotificationPush)
}
func (o Options) IsReactionFromCache() bool {
return o.Is(constant.IsReactionFromCache)
}
+35 -1
View File
@@ -17,7 +17,6 @@ package rpcclient
import (
"context"
"encoding/json"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
@@ -68,6 +67,7 @@ func newContentTypeConf() map[int32]config.NotificationConf {
constant.BlackAddedNotification: config.Config.Notification.BlackAdded,
constant.BlackDeletedNotification: config.Config.Notification.BlackDeleted,
constant.FriendInfoUpdatedNotification: config.Config.Notification.FriendInfoUpdated,
constant.FriendsInfoUpdateNotification: config.Config.Notification.FriendInfoUpdated, //use the same FriendInfoUpdated
// conversation
constant.ConversationChangeNotification: config.Config.Notification.ConversationChanged,
constant.ConversationUnreadNotification: config.Config.Notification.ConversationChanged,
@@ -115,6 +115,7 @@ func newSessionTypeConf() map[int32]int32 {
constant.BlackAddedNotification: constant.SingleChatType,
constant.BlackDeletedNotification: constant.SingleChatType,
constant.FriendInfoUpdatedNotification: constant.SingleChatType,
constant.FriendsInfoUpdateNotification: constant.SingleChatType,
// conversation
constant.ConversationChangeNotification: constant.SingleChatType,
constant.ConversationUnreadNotification: constant.SingleChatType,
@@ -155,6 +156,30 @@ func (m *MessageRpcClient) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqRe
return resp, err
}
func (m *MessageRpcClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) {
log.ZDebug(ctx, "GetMaxSeqs", "conversationIDs", conversationIDs)
resp, err := m.Client.GetMaxSeqs(ctx, &msg.GetMaxSeqsReq{
ConversationIDs: conversationIDs,
})
return resp.MaxSeqs, err
}
func (m *MessageRpcClient) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
resp, err := m.Client.GetHasReadSeqs(ctx, &msg.GetHasReadSeqsReq{
UserID: userID,
ConversationIDs: conversationIDs,
})
return resp.MaxSeqs, err
}
func (m *MessageRpcClient) GetMsgByConversationIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
resp, err := m.Client.GetMsgByConversationIDs(ctx, &msg.GetMsgByConversationIDsReq{
ConversationIDs: docIDs,
MaxSeqs: seqs,
})
return resp.MsgDatas, err
}
func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
resp, err := m.Client.PullMessageBySeqs(ctx, req)
return resp, err
@@ -256,6 +281,7 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
optionsConfig.ReliabilityLevel = constant.UnreliableNotification
}
options := config.GetOptionsByNotification(optionsConfig)
s.SetOptionsByContentType(ctx, options, contentType)
msg.Options = options
offlineInfo.Title = title
offlineInfo.Desc = desc
@@ -274,3 +300,11 @@ func (s *NotificationSender) NotificationWithSesstionType(ctx context.Context, s
func (s *NotificationSender) Notification(ctx context.Context, sendID, recvID string, contentType int32, m proto.Message, opts ...NotificationOptions) error {
return s.NotificationWithSesstionType(ctx, sendID, recvID, contentType, s.sessionTypeConf[contentType], m, opts...)
}
func (s *NotificationSender) SetOptionsByContentType(_ context.Context, options map[string]bool, contentType int32) {
switch contentType {
case constant.UserStatusChangeNotification:
options[constant.IsSenderSync] = false
default:
}
}
+6 -1
View File
@@ -196,7 +196,12 @@ func (f *FriendNotificationSender) FriendRemarkSetNotification(ctx context.Conte
tips.FromToUserID.ToUserID = toUserID
return f.Notification(ctx, fromUserID, toUserID, constant.FriendRemarkSetNotification, &tips)
}
func (f *FriendNotificationSender) FriendsInfoUpdateNotification(ctx context.Context, toUserID string, friendIDs []string) error {
tips := sdkws.FriendsInfoUpdateTips{}
tips.FromToUserID.ToUserID = toUserID
tips.FriendIDs = friendIDs
return f.Notification(ctx, toUserID, toUserID, constant.FriendsInfoUpdateNotification, &tips)
}
func (f *FriendNotificationSender) BlackAddedNotification(ctx context.Context, req *pbfriend.AddBlackReq) error {
tips := sdkws.BlackAddedTips{FromToUserID: &sdkws.FromToUserID{}}
tips.FromToUserID.FromUserID = req.OwnerUserID
+12 -9
View File
@@ -30,29 +30,32 @@ OPENIM_VERBOSE=4
openim::log::info "\n# Begin to check all openim service"
# OpenIM status
# Elegant printing function
# Elegant printing function
print_services_and_ports() {
service_names=("$1[@]")
service_ports=("$2[@]")
local service_names=("$@")
local half_length=$((${#service_names[@]} / 2))
local service_ports=("${service_names[@]:half_length}")
echo "+-------------------------+----------+"
echo "| Service Name | Port |"
echo "+-------------------------+----------+"
for index in "${!service_names}"; do
printf "| %-23s | %-8s |\n" "${!service_names[$index]}" "${!service_ports[$index]}"
for ((index=0; index < half_length; index++)); do
printf "| %-23s | %-8s |\n" "${service_names[$index]}" "${service_ports[$index]}"
done
echo "+-------------------------+----------+"
}
# Assuming OPENIM_SERVER_NAME_TARGETS and OPENIM_SERVER_PORT_TARGETS are defined
# Similarly for OPENIM_DEPENDENCY_TARGETS and OPENIM_DEPENDENCY_PORT_TARGETS
# Print out services and their ports
print_services_and_ports OPENIM_SERVER_NAME_TARGETS OPENIM_SERVER_PORT_TARGETS
print_services_and_ports "${OPENIM_SERVER_NAME_TARGETS[@]}" "${OPENIM_SERVER_PORT_TARGETS[@]}"
# Print out dependencies and their ports
print_services_and_ports OPENIM_DEPENDENCY_TARGETS OPENIM_DEPENDENCY_PORT_TARGETS
print_services_and_ports "${OPENIM_DEPENDENCY_TARGETS[@]}" "${OPENIM_DEPENDENCY_PORT_TARGETS[@]}"
# OpenIM check
echo "++ The port being checked: ${OPENIM_SERVER_PORT_LISTARIES[@]}"
@@ -89,4 +92,4 @@ else
echo "++++ Check all openim service ports successfully !"
fi
set -e
set -e
+2 -3
View File
@@ -285,7 +285,6 @@ readonly ALERTMANAGER_SEND_RESOLVED=${ALERTMANAGER_SEND_RESOLVED:-"{SEND_RESOLVE
###################### Grafana 配置信息 ######################
def "GRAFANA_PORT" "13000" # Grafana的端口
def "GRAFANA_ADDRESS" "${DOCKER_BRIDGE_GATEWAY}" # Grafana的地址
###################### RPC Port Configuration Variables ######################
# For launching multiple programs, just fill in multiple ports separated by commas
# For example:
@@ -378,8 +377,8 @@ def "CALLBACK_TIMEOUT" "5" # 最长超时时间
def "CALLBACK_FAILED_CONTINUE" "true" # 失败后是否继续
###################### Prometheus 配置信息 ######################
# 是否启用 Prometheus
readonly PROMETHEUS_ENABLE=${PROMETHEUS_ENABLE:-'false'}
def "PROMETHEUS_URL" "${GRAFANA_ADDRESS}:${GRAFANA_PORT}"
readonly PROMETHEUS_ENABLE=${PROMETHEUS_ENABLE:-'true'}
readonly GRAFANA_URL=${GRAFANA_URL:-"http://${OPENIM_IP}:${GRAFANA_PORT}/"}
# Api 服务的 Prometheus 端口
readonly API_PROM_PORT=${API_PROM_PORT:-'20100'}
# User 服务的 Prometheus 端口
+17 -9
View File
@@ -16,6 +16,7 @@ package main
import (
"context"
"errors"
"flag"
"fmt"
"net"
@@ -222,8 +223,8 @@ func checkMinio() (string, error) {
defer cancel()
if minioClient.IsOffline() {
str := fmt.Sprintf("Minio server is offline;%s", str)
return "", ErrComponentStart.Wrap(str)
// str := fmt.Sprintf("Minio server is offline;%s", str)
// return "", ErrComponentStart.Wrap(str)
}
// Check for localhost in API URL and Minio SignEndpoint
@@ -285,10 +286,23 @@ func checkZookeeper() (string, error) {
// Connect to Zookeeper
str := "the addr is:" + address
c, _, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary
c, eventChan, err := zk.Connect(zookeeperAddresses, time.Second) // Adjust the timeout as necessary
if err != nil {
return "", errs.Wrap(errStr(err, str))
}
timeout := time.After(5 * time.Second)
for {
select {
case event := <-eventChan:
if event.State == zk.StateConnected {
fmt.Println("Connected to Zookeeper")
goto Connected
}
case <-timeout:
return "", errs.Wrap(errors.New("timeout waiting for Zookeeper connection"), "Zookeeper Addr: "+strings.Join(config.Config.Zookeeper.ZkAddr, " "))
}
}
Connected:
defer c.Close()
// Set authentication if username and password are provided
@@ -298,12 +312,6 @@ func checkZookeeper() (string, error) {
}
}
// Check if Zookeeper is reachable
_, _, err = c.Get("/")
if err != nil {
return "", errs.Wrap(err, str)
}
return str, nil
}
-12
View File
@@ -21,22 +21,10 @@ import (
"time"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
)
func TestCheckMysql(t *testing.T) {
err := mockInitCfg()
assert.NoError(t, err, "Initialization should not produce errors")
err = checkMysql()
if err != nil {
// You might expect an error if MySQL isn't running locally with the mock credentials.
t.Logf("Expected error due to mock configuration: %v", err)
}
}
// Mock for initCfg for testing purpose
func mockInitCfg() error {
config.Config.Mysql.Username = "root"