mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-02 16:15:59 +08:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 13b929444a |
@@ -4,80 +4,45 @@ on:
|
|||||||
push:
|
push:
|
||||||
branches:
|
branches:
|
||||||
- release-*
|
- release-*
|
||||||
|
# tags:
|
||||||
|
# - 'v*'
|
||||||
|
|
||||||
release:
|
release:
|
||||||
types: [published]
|
types: [published]
|
||||||
|
|
||||||
workflow_dispatch:
|
workflow_dispatch:
|
||||||
inputs:
|
inputs:
|
||||||
tag:
|
tag:
|
||||||
description: "Tag version to be used for Docker image"
|
description: "Tag version to be used for Docker image"
|
||||||
required: true
|
required: true
|
||||||
default: "v3.8.3"
|
default: "v3.8.0"
|
||||||
|
|
||||||
env:
|
|
||||||
GO_VERSION: "1.22"
|
|
||||||
IMAGE_NAME: "openim-server"
|
|
||||||
# IMAGE_NAME: ${{ github.event.repository.name }}
|
|
||||||
DOCKER_BUILDKIT: 1
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
publish-docker-images:
|
build-and-test:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.merged == false) }}
|
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout main repository
|
- uses: actions/checkout@v4
|
||||||
uses: actions/checkout@v4
|
|
||||||
with:
|
with:
|
||||||
path: main-repo
|
path: main-repo
|
||||||
|
|
||||||
- name: Set up QEMU
|
- name: Set up QEMU
|
||||||
uses: docker/setup-qemu-action@v3.3.0
|
uses: docker/setup-qemu-action@v3
|
||||||
|
|
||||||
- name: Set up Docker Buildx
|
- name: Set up Docker Buildx
|
||||||
id: buildx
|
|
||||||
uses: docker/setup-buildx-action@v3
|
uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
|
- name: Build Docker image
|
||||||
|
id: build
|
||||||
|
uses: docker/build-push-action@v5
|
||||||
with:
|
with:
|
||||||
driver-opts: network=host
|
context: ./main-repo
|
||||||
|
load: true
|
||||||
|
tags: "openim/openim-server:local"
|
||||||
|
cache-from: type=gha
|
||||||
|
cache-to: type=gha,mode=max
|
||||||
|
|
||||||
- name: Extract metadata for Docker
|
- name: Save Docker image to file
|
||||||
id: meta
|
run: docker save -o image.tar openim/openim-server:local
|
||||||
uses: docker/metadata-action@v5.6.0
|
|
||||||
with:
|
|
||||||
images: |
|
|
||||||
${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}
|
|
||||||
ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}
|
|
||||||
registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}
|
|
||||||
tags: |
|
|
||||||
type=ref,event=tag
|
|
||||||
type=schedule
|
|
||||||
type=ref,event=branch
|
|
||||||
type=ref,event=pr
|
|
||||||
type=semver,pattern={{version}}
|
|
||||||
type=semver,pattern=v{{version}}
|
|
||||||
type=semver,pattern={{major}}.{{minor}}
|
|
||||||
type=semver,pattern={{major}}
|
|
||||||
type=sha
|
|
||||||
|
|
||||||
- name: Install skopeo
|
|
||||||
run: |
|
|
||||||
sudo apt-get update && sudo apt-get install -y skopeo
|
|
||||||
|
|
||||||
- name: Build multi-arch images as OCI
|
|
||||||
run: |
|
|
||||||
mkdir -p /tmp/oci-image /tmp/docker-cache
|
|
||||||
|
|
||||||
# Build multi-architecture image and save in OCI format
|
|
||||||
docker buildx build \
|
|
||||||
--platform linux/amd64,linux/arm64 \
|
|
||||||
--output type=oci,dest=/tmp/oci-image/multi-arch.tar \
|
|
||||||
--cache-to type=local,dest=/tmp/docker-cache \
|
|
||||||
--cache-from type=gha \
|
|
||||||
./main-repo
|
|
||||||
|
|
||||||
# Use skopeo to convert the amd64 image from OCI format to Docker format and load it
|
|
||||||
skopeo copy --override-arch amd64 oci-archive:/tmp/oci-image/multi-arch.tar docker-daemon:${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local
|
|
||||||
|
|
||||||
# check image
|
|
||||||
docker image ls | grep openim
|
|
||||||
|
|
||||||
- name: Checkout compose repository
|
- name: Checkout compose repository
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
@@ -90,19 +55,18 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
IP=$(hostname -I | awk '{print $1}')
|
IP=$(hostname -I | awk '{print $1}')
|
||||||
echo "The IP Address is: $IP"
|
echo "The IP Address is: $IP"
|
||||||
echo "ip=$IP" >> $GITHUB_OUTPUT
|
echo "::set-output name=ip::$IP"
|
||||||
|
|
||||||
- name: Update .env to use the local image
|
- name: Update .env to use the local image
|
||||||
run: |
|
run: |
|
||||||
sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local|' ${{ github.workspace }}/compose-repo/.env
|
sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=openim/openim-server:local|' ${{ github.workspace }}/compose-repo/.env
|
||||||
sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env
|
sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env
|
||||||
|
|
||||||
- name: Start services using Docker Compose
|
- name: Start services using Docker Compose
|
||||||
run: |
|
run: |
|
||||||
cd ${{ github.workspace }}/compose-repo
|
cd ${{ github.workspace }}/compose-repo
|
||||||
docker compose up -d
|
docker compose up -d
|
||||||
|
sleep 60
|
||||||
docker compose ps
|
|
||||||
|
|
||||||
# - name: Check openim-server health
|
# - name: Check openim-server health
|
||||||
# run: |
|
# run: |
|
||||||
@@ -133,62 +97,54 @@ jobs:
|
|||||||
# exit 0
|
# exit 0
|
||||||
# fi
|
# fi
|
||||||
|
|
||||||
|
- name: Load Docker image from file
|
||||||
|
run: docker load -i image.tar
|
||||||
|
|
||||||
|
- name: Extract metadata for Docker (tags, labels)
|
||||||
|
id: meta
|
||||||
|
uses: docker/metadata-action@v5.5.1
|
||||||
|
with:
|
||||||
|
images: |
|
||||||
|
openim/openim-server
|
||||||
|
ghcr.io/openimsdk/openim-server
|
||||||
|
registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server
|
||||||
|
tags: |
|
||||||
|
type=ref,event=tag
|
||||||
|
type=schedule
|
||||||
|
type=ref,event=branch
|
||||||
|
type=semver,pattern={{version}}
|
||||||
|
type=semver,pattern=v{{version}}
|
||||||
|
type=semver,pattern={{major}}.{{minor}}
|
||||||
|
type=semver,pattern={{major}}
|
||||||
|
type=semver,pattern=release-{{raw}}
|
||||||
|
type=sha
|
||||||
|
type=raw,value=${{ github.event.inputs.tag }}
|
||||||
|
|
||||||
- name: Log in to Docker Hub
|
- name: Log in to Docker Hub
|
||||||
uses: docker/login-action@v3.3.0
|
uses: docker/login-action@v2
|
||||||
with:
|
with:
|
||||||
username: ${{ secrets.DOCKER_USERNAME }}
|
username: ${{ secrets.DOCKER_USERNAME }}
|
||||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
password: ${{ secrets.DOCKER_PASSWORD }}
|
||||||
|
|
||||||
- name: Log in to GitHub Container Registry
|
- name: Log in to GitHub Container Registry
|
||||||
uses: docker/login-action@v3.3.0
|
uses: docker/login-action@v2
|
||||||
with:
|
with:
|
||||||
registry: ghcr.io
|
registry: ghcr.io
|
||||||
username: ${{ github.repository_owner }}
|
username: ${{ github.repository_owner }}
|
||||||
password: ${{ secrets.GITHUB_TOKEN }}
|
password: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
|
||||||
- name: Log in to Aliyun Container Registry
|
- name: Log in to Aliyun Container Registry
|
||||||
uses: docker/login-action@v3.3.0
|
uses: docker/login-action@v2
|
||||||
with:
|
with:
|
||||||
registry: registry.cn-hangzhou.aliyuncs.com
|
registry: registry.cn-hangzhou.aliyuncs.com
|
||||||
username: ${{ secrets.ALIREGISTRY_USERNAME }}
|
username: ${{ secrets.ALIREGISTRY_USERNAME }}
|
||||||
password: ${{ secrets.ALIREGISTRY_TOKEN }}
|
password: ${{ secrets.ALIREGISTRY_TOKEN }}
|
||||||
|
|
||||||
- name: Push multi-architecture images
|
- name: Push Docker images
|
||||||
if: success()
|
uses: docker/build-push-action@v5
|
||||||
run: |
|
with:
|
||||||
docker buildx build \
|
context: ./main-repo
|
||||||
--platform linux/amd64,linux/arm64 \
|
push: true
|
||||||
$(echo "${{ steps.meta.outputs.tags }}" | sed 's/,/ --tag /g' | sed 's/^/--tag /') \
|
platforms: linux/amd64,linux/arm64
|
||||||
--cache-from type=local,src=/tmp/docker-cache \
|
tags: ${{ steps.meta.outputs.tags }}
|
||||||
--push \
|
labels: ${{ steps.meta.outputs.labels }}
|
||||||
./main-repo
|
|
||||||
|
|
||||||
- name: Verify multi-platform support
|
|
||||||
run: |
|
|
||||||
images=(
|
|
||||||
"${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}"
|
|
||||||
"ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}"
|
|
||||||
"registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}"
|
|
||||||
)
|
|
||||||
|
|
||||||
for image in "${images[@]}"; do
|
|
||||||
for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n' | cut -d':' -f2); do
|
|
||||||
echo "Verifying multi-arch support for $image:$tag"
|
|
||||||
manifest=$(docker manifest inspect "$image:$tag" || echo "error")
|
|
||||||
if [[ "$manifest" == "error" ]]; then
|
|
||||||
echo "Manifest not found for $image:$tag"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
amd64_found=$(echo "$manifest" | jq '.manifests[] | select(.platform.architecture == "amd64")')
|
|
||||||
arm64_found=$(echo "$manifest" | jq '.manifests[] | select(.platform.architecture == "arm64")')
|
|
||||||
if [[ -z "$amd64_found" ]]; then
|
|
||||||
echo "Multi-platform support check failed for $image:$tag - missing amd64"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
if [[ -z "$arm64_found" ]]; then
|
|
||||||
echo "Multi-platform support check failed for $image:$tag - missing arm64"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
echo "✅ $image:$tag supports both amd64 and arm64 architectures"
|
|
||||||
done
|
|
||||||
done
|
|
||||||
|
|||||||
@@ -8,40 +8,19 @@ jobs:
|
|||||||
update-version:
|
update-version:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
env:
|
env:
|
||||||
TAG_VERSION: ${{ github.event.release.tag_name }}
|
TAG_VERSION: ${{ github.event.release.tag_name }}
|
||||||
steps:
|
steps:
|
||||||
# Step 1: Checkout the original repository's code
|
# Step 1: Checkout the original repository's code
|
||||||
- name: Checkout code
|
- name: Checkout code
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
with:
|
with:
|
||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
# submodules: "recursive"
|
|
||||||
|
|
||||||
- name: Safe submodule initialization
|
|
||||||
run: |
|
|
||||||
echo "Checking for submodules..."
|
|
||||||
if [ -f .gitmodules ]; then
|
|
||||||
if [ -s .gitmodules ]; then
|
|
||||||
echo "Initializing submodules..."
|
|
||||||
if git submodule sync --recursive 2>/dev/null; then
|
|
||||||
git submodule update --init --force --recursive || {
|
|
||||||
echo "Warning: Some submodules failed to initialize, continuing anyway..."
|
|
||||||
}
|
|
||||||
else
|
|
||||||
echo "Warning: Submodule sync failed, continuing without submodules..."
|
|
||||||
fi
|
|
||||||
else
|
|
||||||
echo ".gitmodules exists but is empty, skipping submodule initialization"
|
|
||||||
fi
|
|
||||||
else
|
|
||||||
echo "No .gitmodules file found, no submodules to initialize"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Step 2: Set up Git with official account
|
# Step 2: Set up Git with official account
|
||||||
- name: Set up Git
|
- name: Set up Git
|
||||||
run: |
|
run: |
|
||||||
git config --global user.name "github-actions[bot]"
|
git config user.name "github-actions[bot]"
|
||||||
git config --global user.email "github-actions[bot]@users.noreply.github.com"
|
git config user.email "github-actions[bot]@users.noreply.github.com"
|
||||||
|
|
||||||
# Step 3: Check and delete existing tag
|
# Step 3: Check and delete existing tag
|
||||||
- name: Check and delete existing tag
|
- name: Check and delete existing tag
|
||||||
@@ -54,8 +33,7 @@ jobs:
|
|||||||
# Step 4: Update version file
|
# Step 4: Update version file
|
||||||
- name: Update version file
|
- name: Update version file
|
||||||
run: |
|
run: |
|
||||||
mkdir -p version
|
echo "${{ env.TAG_VERSION }}" > version/version
|
||||||
echo -n "${{ env.TAG_VERSION }}" > version/version
|
|
||||||
|
|
||||||
# Step 5: Commit and push changes
|
# Step 5: Commit and push changes
|
||||||
- name: Commit and push changes
|
- name: Commit and push changes
|
||||||
@@ -64,56 +42,43 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
git add version/version
|
git add version/version
|
||||||
git commit -m "Update version to ${{ env.TAG_VERSION }}"
|
git commit -m "Update version to ${{ env.TAG_VERSION }}"
|
||||||
|
git push origin HEAD:${{ github.ref }}
|
||||||
|
|
||||||
# Step 6: Update tag
|
# Step 6: Create and push tag
|
||||||
- name: Update tag
|
- name: Create and push tag
|
||||||
|
env:
|
||||||
|
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||||
run: |
|
run: |
|
||||||
git tag -fa ${{ env.TAG_VERSION }} -m "Update version to ${{ env.TAG_VERSION }}"
|
git tag ${{ env.TAG_VERSION }}
|
||||||
git push origin ${{ env.TAG_VERSION }} --force
|
git push origin ${{ env.TAG_VERSION }}
|
||||||
|
|
||||||
# Step 7: Find and Publish Draft Release
|
# Step 7: Find and Publish Draft Release
|
||||||
- name: Find and Publish Draft Release
|
- name: Find and Publish Draft Release
|
||||||
uses: actions/github-script@v7
|
uses: actions/github-script@v6
|
||||||
with:
|
with:
|
||||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||||
script: |
|
script: |
|
||||||
const { owner, repo } = context.repo;
|
// Get the list of releases
|
||||||
const tagName = process.env.TAG_VERSION;
|
const releases = await github.rest.repos.listReleases({
|
||||||
|
owner: context.repo.owner,
|
||||||
|
repo: context.repo.repo
|
||||||
|
});
|
||||||
|
|
||||||
try {
|
// Find the draft release where the title and tag_name are the same
|
||||||
let release;
|
const draftRelease = releases.data.find(release =>
|
||||||
try {
|
release.draft && release.name === release.tag_name
|
||||||
const response = await github.rest.repos.getReleaseByTag({
|
);
|
||||||
owner,
|
|
||||||
repo,
|
if (draftRelease) {
|
||||||
tag: tagName
|
// Publish the draft release using the release_id
|
||||||
});
|
|
||||||
release = response.data;
|
|
||||||
} catch (tagError) {
|
|
||||||
core.info(`Release not found by tag, searching all releases...`);
|
|
||||||
const releases = await github.rest.repos.listReleases({
|
|
||||||
owner,
|
|
||||||
repo,
|
|
||||||
per_page: 100
|
|
||||||
});
|
|
||||||
|
|
||||||
release = releases.data.find(r => r.draft && r.tag_name === tagName);
|
|
||||||
if (!release) {
|
|
||||||
throw new Error(`No release found with tag ${tagName}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await github.rest.repos.updateRelease({
|
await github.rest.repos.updateRelease({
|
||||||
owner,
|
owner: context.repo.owner,
|
||||||
repo,
|
repo: context.repo.repo,
|
||||||
release_id: release.id,
|
release_id: draftRelease.id, // Use release_id
|
||||||
draft: false,
|
draft: false
|
||||||
prerelease: release.prerelease
|
|
||||||
});
|
});
|
||||||
|
|
||||||
const status = release.draft ? "was draft" : "was already published";
|
core.info(`Draft Release ${draftRelease.tag_name} published successfully.`);
|
||||||
core.info(`Release ${tagName} ensured to be published (${status}).`);
|
} else {
|
||||||
|
core.info("No matching draft release found.");
|
||||||
} catch (error) {
|
}
|
||||||
core.warning(`Could not find or update release for tag ${tagName}: ${error.message}`);
|
|
||||||
}
|
|
||||||
+4
-56
@@ -63,12 +63,7 @@ services:
|
|||||||
restart: always
|
restart: always
|
||||||
sysctls:
|
sysctls:
|
||||||
net.core.somaxconn: 1024
|
net.core.somaxconn: 1024
|
||||||
command: >
|
command: redis-server /usr/local/redis/config/redis.conf --requirepass openIM123 --appendonly yes
|
||||||
redis-server
|
|
||||||
--requirepass openIM123
|
|
||||||
--appendonly yes
|
|
||||||
--aof-use-rdb-preamble yes
|
|
||||||
--save ""
|
|
||||||
networks:
|
networks:
|
||||||
- openim
|
- openim
|
||||||
|
|
||||||
@@ -104,62 +99,15 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
|
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
|
||||||
TZ: Asia/Shanghai
|
TZ: Asia/Shanghai
|
||||||
# Unique identifier for the Kafka node (required in controller mode)
|
|
||||||
KAFKA_CFG_NODE_ID: 0
|
KAFKA_CFG_NODE_ID: 0
|
||||||
# Defines the roles this Kafka node plays: broker, controller, or both
|
|
||||||
KAFKA_CFG_PROCESS_ROLES: controller,broker
|
KAFKA_CFG_PROCESS_ROLES: controller,broker
|
||||||
# Specifies which nodes are controller nodes for quorum voting.
|
|
||||||
# The syntax follows the KRaft mode (no ZooKeeper): node.id@host:port
|
|
||||||
# The controller listener endpoint here is kafka:9093
|
|
||||||
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
|
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
|
||||||
# Specifies which listener is used for controller-to-controller communication
|
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
|
||||||
|
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094
|
||||||
|
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
||||||
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
||||||
# Default number of partitions for new topics
|
|
||||||
KAFKA_NUM_PARTITIONS: 8
|
KAFKA_NUM_PARTITIONS: 8
|
||||||
# Whether to enable automatic topic creation
|
|
||||||
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
|
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
|
||||||
# Kafka internal listeners; Kafka supports multiple ports with different protocols
|
|
||||||
# Each port is used for a specific purpose: INTERNAL for internal broker communication,
|
|
||||||
# CONTROLLER for controller communication, EXTERNAL for external client connections.
|
|
||||||
# These logical listener names are mapped to actual protocols via KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
|
|
||||||
# In short, Kafka is listening on three logical ports: 9092 for internal communication,
|
|
||||||
# 9093 for controller traffic, and 9094 for external access.
|
|
||||||
KAFKA_CFG_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
|
|
||||||
# Addresses advertised to clients. INTERNAL://kafka:9092 uses the internal Docker service name 'kafka',
|
|
||||||
# so other containers can access Kafka via kafka:9092.
|
|
||||||
# EXTERNAL://localhost:19094 is the address external clients (e.g., in the LAN) should use to connect.
|
|
||||||
# If Kafka is deployed on a different machine than IM, 'localhost' should be replaced with the LAN IP.
|
|
||||||
KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:19094"
|
|
||||||
# Maps logical listener names to actual protocols.
|
|
||||||
# Supported protocols include: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
|
|
||||||
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT"
|
|
||||||
# Defines which listener is used for inter-broker communication within the Kafka cluster
|
|
||||||
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
|
|
||||||
|
|
||||||
|
|
||||||
# Authentication configuration variables - comment out to disable auth
|
|
||||||
# KAFKA_USERNAME: "openIM"
|
|
||||||
# KAFKA_PASSWORD: "openIM123"
|
|
||||||
command: >
|
|
||||||
/bin/sh -c '
|
|
||||||
if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then
|
|
||||||
echo "=== Kafka SASL Authentication ENABLED ==="
|
|
||||||
echo "Username: $${KAFKA_USERNAME}"
|
|
||||||
|
|
||||||
# Set environment variables for SASL authentication
|
|
||||||
export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
|
|
||||||
export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
|
|
||||||
export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
|
|
||||||
export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN"
|
|
||||||
export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN"
|
|
||||||
export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT"
|
|
||||||
export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}"
|
|
||||||
export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Start Kafka with the configured environment
|
|
||||||
exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
|
|
||||||
'
|
|
||||||
networks:
|
networks:
|
||||||
- openim
|
- openim
|
||||||
|
|
||||||
|
|||||||
+21
-61
@@ -2,14 +2,10 @@ package jssdk
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
|
||||||
"github.com/openimsdk/protocol/constant"
|
|
||||||
"github.com/openimsdk/tools/log"
|
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/conversation"
|
"github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/protocol/jssdk"
|
"github.com/openimsdk/protocol/jssdk"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
@@ -113,7 +109,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if len(conversationIDs) == 0 {
|
if len(conversationIDs) == 0 {
|
||||||
return &jssdk.GetActiveConversationsResp{}, nil
|
return &jssdk.GetActiveConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
|
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
|
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -121,10 +120,6 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if len(activeConversation) == 0 {
|
if len(activeConversation) == 0 {
|
||||||
return &jssdk.GetActiveConversationsResp{}, nil
|
return &jssdk.GetActiveConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
sortConversations := sortActiveConversations{
|
sortConversations := sortActiveConversations{
|
||||||
Conversation: activeConversation,
|
Conversation: activeConversation,
|
||||||
}
|
}
|
||||||
@@ -152,7 +147,6 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
|
|
||||||
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
||||||
return c.ConversationID
|
return c.ConversationID
|
||||||
})
|
})
|
||||||
@@ -162,15 +156,16 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
var lastMsg *sdkws.MsgData
|
||||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||||
resp = append(resp, &jssdk.ConversationMsg{
|
lastMsg = msgList.Msgs[0]
|
||||||
Conversation: conv,
|
|
||||||
LastMsg: msgList.Msgs[0],
|
|
||||||
MaxSeq: c.MaxSeq,
|
|
||||||
ReadSeq: readSeq[c.ConversationID],
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
resp = append(resp, &jssdk.ConversationMsg{
|
||||||
|
Conversation: conv,
|
||||||
|
LastMsg: lastMsg,
|
||||||
|
MaxSeq: c.MaxSeq,
|
||||||
|
ReadSeq: readSeq[c.ConversationID],
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if err := x.fillConversations(ctx, resp); err != nil {
|
if err := x.fillConversations(ctx, resp); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -224,18 +219,18 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
|
|
||||||
resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
|
resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
|
||||||
for _, c := range conversations {
|
for _, c := range conversations {
|
||||||
|
var lastMsg *sdkws.MsgData
|
||||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||||
resp = append(resp, &jssdk.ConversationMsg{
|
lastMsg = msgList.Msgs[0]
|
||||||
Conversation: c,
|
|
||||||
LastMsg: msgList.Msgs[0],
|
|
||||||
MaxSeq: maxSeqs[c.ConversationID],
|
|
||||||
ReadSeq: readSeqs[c.ConversationID],
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
resp = append(resp, &jssdk.ConversationMsg{
|
||||||
|
Conversation: c,
|
||||||
|
LastMsg: lastMsg,
|
||||||
|
MaxSeq: maxSeqs[c.ConversationID],
|
||||||
|
ReadSeq: readSeqs[c.ConversationID],
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if err := x.fillConversations(ctx, resp); err != nil {
|
if err := x.fillConversations(ctx, resp); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -252,38 +247,3 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
|||||||
UnreadCount: unreadCount,
|
UnreadCount: unreadCount,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function checks whether the latest MaxSeq message is valid.
|
|
||||||
// If not, it needs to fetch a valid message again.
|
|
||||||
func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) {
|
|
||||||
var conversationIDs []string
|
|
||||||
|
|
||||||
for conversationID, message := range messages {
|
|
||||||
allInValid := true
|
|
||||||
for _, data := range message.Msgs {
|
|
||||||
if data.Status < constant.MsgStatusHasDeleted {
|
|
||||||
allInValid = false
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// when the conversation has been deleted by the user, the length of message.Msgs is empty
|
|
||||||
if allInValid && len(message.Msgs) > 0 {
|
|
||||||
conversationIDs = append(conversationIDs, conversationID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(conversationIDs) > 0 {
|
|
||||||
resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
|
|
||||||
UserID: userID,
|
|
||||||
ConversationIDs: conversationIDs,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for conversationID, message := range resp.Msgs {
|
|
||||||
messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -244,7 +244,6 @@ func (s *Server) MultiTerminalLoginCheck(ctx context.Context, req *msggateway.Mu
|
|||||||
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
|
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
|
||||||
client := &Client{}
|
client := &Client{}
|
||||||
client.ctx = tempUserCtx
|
client.ctx = tempUserCtx
|
||||||
client.token = req.Token
|
|
||||||
client.UserID = req.UserID
|
client.UserID = req.UserID
|
||||||
client.PlatformID = int(req.PlatformID)
|
client.PlatformID = int(req.PlatformID)
|
||||||
i := &kickHandler{
|
i := &kickHandler{
|
||||||
|
|||||||
@@ -337,51 +337,17 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If reconnect: When multiple msgGateway instances are deployed, a client may disconnect from instance A and reconnect to instance B.
|
|
||||||
// During this process, instance A might still be executing, resulting in two clients with the same token existing simultaneously.
|
|
||||||
// This situation needs to be filtered to prevent duplicate clients.
|
|
||||||
checkSameTokenFunc := func(oldClients []*Client) []*Client {
|
|
||||||
var clientsNeedToKick []*Client
|
|
||||||
|
|
||||||
for _, c := range oldClients {
|
|
||||||
if c.token == newClient.token {
|
|
||||||
log.ZDebug(newClient.ctx, "token is same, not kick",
|
|
||||||
"userID", newClient.UserID,
|
|
||||||
"platformID", newClient.PlatformID,
|
|
||||||
"token", newClient.token)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
clientsNeedToKick = append(clientsNeedToKick, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
return clientsNeedToKick
|
|
||||||
}
|
|
||||||
|
|
||||||
switch ws.msgGatewayConfig.Share.MultiLogin.Policy {
|
switch ws.msgGatewayConfig.Share.MultiLogin.Policy {
|
||||||
case constant.DefalutNotKick:
|
case constant.DefalutNotKick:
|
||||||
case constant.PCAndOther:
|
case constant.PCAndOther:
|
||||||
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
|
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
clients, ok := ws.clients.GetAll(newClient.UserID)
|
|
||||||
clientOK = ok
|
|
||||||
oldClients = make([]*Client, 0, len(clients))
|
|
||||||
for _, c := range clients {
|
|
||||||
if constant.PlatformIDToClass(c.PlatformID) == constant.TerminalPC {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
oldClients = append(oldClients, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
fallthrough
|
fallthrough
|
||||||
case constant.AllLoginButSameTermKick:
|
case constant.AllLoginButSameTermKick:
|
||||||
if !clientOK {
|
if !clientOK {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
oldClients = checkSameTokenFunc(oldClients)
|
|
||||||
|
|
||||||
ws.clients.DeleteClients(newClient.UserID, oldClients)
|
ws.clients.DeleteClients(newClient.UserID, oldClients)
|
||||||
for _, c := range oldClients {
|
for _, c := range oldClients {
|
||||||
err := c.KickOnlineMessage()
|
err := c.KickOnlineMessage()
|
||||||
@@ -407,15 +373,14 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
|
|||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
var (
|
||||||
var kickClients []*Client
|
kickClients []*Client
|
||||||
|
)
|
||||||
for _, client := range clients {
|
for _, client := range clients {
|
||||||
if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) {
|
if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) {
|
||||||
kickClients = append(kickClients, client)
|
kickClients = append(kickClients, client)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
kickClients = checkSameTokenFunc(kickClients)
|
|
||||||
|
|
||||||
kickTokenFunc(kickClients)
|
kickTokenFunc(kickClients)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,12 +26,10 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
|
|
||||||
"github.com/go-redis/redis"
|
|
||||||
"google.golang.org/protobuf/proto"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
"github.com/IBM/sarama"
|
||||||
|
"github.com/go-redis/redis"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
@@ -43,6 +41,7 @@ import (
|
|||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/stringutil"
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -141,48 +140,53 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
|
|||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
||||||
|
|
||||||
// Outer map: conversationID -> (userID -> maxHasReadSeq)
|
var conversationID string
|
||||||
conversationUserSeq := make(map[string]map[string]int64)
|
var userSeqMap map[string]int64
|
||||||
|
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
if msg.message.ContentType != constant.HasReadReceipt {
|
if msg.message.ContentType != constant.HasReadReceipt {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var elem sdkws.NotificationElem
|
var elem sdkws.NotificationElem
|
||||||
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
||||||
log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
|
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var tips sdkws.MarkAsReadTips
|
var tips sdkws.MarkAsReadTips
|
||||||
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
||||||
log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
|
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
|
//The conversation ID for each batch of messages processed by the batcher is the same.
|
||||||
continue
|
conversationID = tips.ConversationID
|
||||||
}
|
if len(tips.Seqs) > 0 {
|
||||||
|
for _, seq := range tips.Seqs {
|
||||||
// Calculate the max seq from tips.Seqs
|
if tips.HasReadSeq < seq {
|
||||||
for _, seq := range tips.Seqs {
|
tips.HasReadSeq = seq
|
||||||
if tips.HasReadSeq < seq {
|
}
|
||||||
tips.HasReadSeq = seq
|
|
||||||
}
|
}
|
||||||
|
clear(tips.Seqs)
|
||||||
|
tips.Seqs = nil
|
||||||
|
}
|
||||||
|
if tips.HasReadSeq < 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if userSeqMap == nil {
|
||||||
|
userSeqMap = make(map[string]int64)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
|
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
|
||||||
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
|
continue
|
||||||
}
|
|
||||||
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
|
|
||||||
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
|
|
||||||
}
|
}
|
||||||
|
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
|
||||||
}
|
}
|
||||||
log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
|
if userSeqMap == nil {
|
||||||
|
return
|
||||||
// persist to db
|
}
|
||||||
for convID, userSeqMap := range conversationUserSeq {
|
if len(conversationID) == 0 {
|
||||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
|
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
|
||||||
log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
|
}
|
||||||
}
|
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
|
||||||
|
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -103,24 +103,22 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
|||||||
}
|
}
|
||||||
userClient := rpcli.NewUserClient(userConn)
|
userClient := rpcli.NewUserClient(userConn)
|
||||||
|
|
||||||
database := controller.NewFriendDatabase(
|
|
||||||
friendMongoDB,
|
|
||||||
friendRequestMongoDB,
|
|
||||||
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, redis.GetRocksCacheOptions()),
|
|
||||||
mgocli.GetTx(),
|
|
||||||
)
|
|
||||||
// Initialize notification sender
|
// Initialize notification sender
|
||||||
notificationSender := NewFriendNotificationSender(
|
notificationSender := NewFriendNotificationSender(
|
||||||
&config.NotificationConfig,
|
&config.NotificationConfig,
|
||||||
rpcli.NewMsgClient(msgConn),
|
rpcli.NewMsgClient(msgConn),
|
||||||
WithRpcFunc(userClient.GetUsersInfo),
|
WithRpcFunc(userClient.GetUsersInfo),
|
||||||
WithFriendDB(database),
|
|
||||||
)
|
)
|
||||||
localcache.InitLocalCache(&config.LocalCacheConfig)
|
localcache.InitLocalCache(&config.LocalCacheConfig)
|
||||||
|
|
||||||
// Register Friend server with refactored MongoDB and Redis integrations
|
// Register Friend server with refactored MongoDB and Redis integrations
|
||||||
relation.RegisterFriendServer(server, &friendServer{
|
relation.RegisterFriendServer(server, &friendServer{
|
||||||
db: database,
|
db: controller.NewFriendDatabase(
|
||||||
|
friendMongoDB,
|
||||||
|
friendRequestMongoDB,
|
||||||
|
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, redis.GetRocksCacheOptions()),
|
||||||
|
mgocli.GetTx(),
|
||||||
|
),
|
||||||
blackDatabase: controller.NewBlackDatabase(
|
blackDatabase: controller.NewBlackDatabase(
|
||||||
blackMongoDB,
|
blackMongoDB,
|
||||||
redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()),
|
redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()),
|
||||||
@@ -194,7 +192,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr
|
|||||||
FromUserID: req.OwnerUserID,
|
FromUserID: req.OwnerUserID,
|
||||||
ToUserID: userID,
|
ToUserID: userID,
|
||||||
HandleResult: constant.FriendResponseAgree,
|
HandleResult: constant.FriendResponseAgree,
|
||||||
}, false)
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
|
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
|
||||||
@@ -223,7 +221,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
|
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
|
||||||
s.notificationSender.FriendApplicationAgreedNotification(ctx, req, true)
|
s.notificationSender.FriendApplicationAgreedNotification(ctx, req)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
if req.HandleResult == constant.FriendResponseRefuse {
|
if req.HandleResult == constant.FriendResponseRefuse {
|
||||||
|
|||||||
@@ -171,17 +171,11 @@ func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.
|
|||||||
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
|
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq, checkReq bool) {
|
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq) {
|
||||||
var (
|
request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
|
||||||
request *sdkws.FriendRequest
|
if err != nil {
|
||||||
err error
|
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
|
||||||
)
|
return
|
||||||
if checkReq {
|
|
||||||
request, err = f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
|
|
||||||
if err != nil {
|
|
||||||
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
tips := sdkws.FriendApplicationApprovedTips{
|
tips := sdkws.FriendApplicationApprovedTips{
|
||||||
FromToUserID: &sdkws.FromToUserID{
|
FromToUserID: &sdkws.FromToUserID{
|
||||||
|
|||||||
@@ -17,13 +17,8 @@ package third
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||||
"github.com/openimsdk/tools/s3/aws"
|
"time"
|
||||||
"github.com/openimsdk/tools/s3/disable"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||||
@@ -39,6 +34,7 @@ import (
|
|||||||
"github.com/openimsdk/tools/s3/kodo"
|
"github.com/openimsdk/tools/s3/kodo"
|
||||||
"github.com/openimsdk/tools/s3/minio"
|
"github.com/openimsdk/tools/s3/minio"
|
||||||
"github.com/openimsdk/tools/s3/oss"
|
"github.com/openimsdk/tools/s3/oss"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type thirdServer struct {
|
type thirdServer struct {
|
||||||
@@ -94,10 +90,6 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
|
|||||||
o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build())
|
o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build())
|
||||||
case "kodo":
|
case "kodo":
|
||||||
o, err = kodo.NewKodo(*config.RpcConfig.Object.Kodo.Build())
|
o, err = kodo.NewKodo(*config.RpcConfig.Object.Kodo.Build())
|
||||||
case "aws":
|
|
||||||
o, err = aws.NewAws(*config.RpcConfig.Object.Aws.Build())
|
|
||||||
case "":
|
|
||||||
o = disable.NewDisable()
|
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("invalid object enable: %s", enable)
|
err = fmt.Errorf("invalid object enable: %s", enable)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -358,7 +358,7 @@ type Redis struct {
|
|||||||
Username string `mapstructure:"username"`
|
Username string `mapstructure:"username"`
|
||||||
Password string `mapstructure:"password"`
|
Password string `mapstructure:"password"`
|
||||||
ClusterMode bool `mapstructure:"clusterMode"`
|
ClusterMode bool `mapstructure:"clusterMode"`
|
||||||
DB int `mapstructure:"db"`
|
DB int `mapstructure:"storage"`
|
||||||
MaxRetry int `mapstructure:"maxRetry"`
|
MaxRetry int `mapstructure:"maxRetry"`
|
||||||
PoolSize int `mapstructure:"poolSize"`
|
PoolSize int `mapstructure:"poolSize"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -675,19 +675,12 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
|
|||||||
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
|
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
|
||||||
totalMsgs := make(map[string]*sdkws.MsgData)
|
totalMsgs := make(map[string]*sdkws.MsgData)
|
||||||
for _, conversationID := range conversationIDs {
|
for _, conversationID := range conversationIDs {
|
||||||
seq, ok := seqs[conversationID]
|
seq := seqs[conversationID]
|
||||||
if !ok {
|
|
||||||
log.ZWarn(ctx, "seq not found for conversationID", errs.New("seq not found for conversation"), "conversationID", conversationID)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
docID := db.msgTable.GetDocID(conversationID, seq)
|
docID := db.msgTable.GetDocID(conversationID, seq)
|
||||||
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "FindOneByDocID failed", err, "conversationID", conversationID, "docID", docID, "seq", seq)
|
return nil, err
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
index := db.msgTable.GetMsgIndex(seq)
|
index := db.msgTable.GetMsgIndex(seq)
|
||||||
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
|
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,11 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
@@ -19,6 +14,10 @@ import (
|
|||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"github.com/openimsdk/tools/utils/jsonutil"
|
"github.com/openimsdk/tools/utils/jsonutil"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
|
func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
|
||||||
@@ -1150,7 +1149,7 @@ func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
for i := len(res) - 1; i >= 0; i-- {
|
for i := len(res) - 1; i > 0; i-- {
|
||||||
v := res[i]
|
v := res[i]
|
||||||
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
|
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
|
||||||
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
|
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
|
||||||
@@ -1165,7 +1164,7 @@ func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string,
|
|||||||
limit := int64(-1)
|
limit := int64(-1)
|
||||||
if first {
|
if first {
|
||||||
first = false
|
first = false
|
||||||
limit = m.model.GetLimitForSingleDoc(seq)
|
limit = m.model.GetMsgIndex(seq)
|
||||||
}
|
}
|
||||||
docID := m.model.BuildDocIDByIndex(conversationID, i)
|
docID := m.model.BuildDocIDByIndex(conversationID, i)
|
||||||
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
|
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
|
||||||
|
|||||||
@@ -120,11 +120,15 @@ func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
|
|||||||
|
|
||||||
func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
|
||||||
t := make(map[string][]int64)
|
t := make(map[string][]int64)
|
||||||
for _, seq := range seqs {
|
for i := 0; i < len(seqs); i++ {
|
||||||
docID := m.GetDocID(conversationID, seq)
|
docID := m.GetDocID(conversationID, seqs[i])
|
||||||
t[docID] = append(t[docID], seq)
|
if value, ok := t[docID]; !ok {
|
||||||
|
var temp []int64
|
||||||
|
t[docID] = append(temp, seqs[i])
|
||||||
|
} else {
|
||||||
|
t[docID] = append(value, seqs[i])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -132,10 +136,6 @@ func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
|
|||||||
return (seq - 1) % singleGocMsgNum
|
return (seq - 1) % singleGocMsgNum
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*MsgDocModel) GetLimitForSingleDoc(seq int64) int64 {
|
|
||||||
return seq % singleGocMsgNum
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
|
||||||
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-3
@@ -2,11 +2,9 @@ package rpcli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {
|
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {
|
||||||
|
|||||||
+1
-1
@@ -1 +1 @@
|
|||||||
v3.8.3-patch.10
|
v3.8.3-patch.5
|
||||||
|
|||||||
+1
-9
@@ -1,14 +1,6 @@
|
|||||||
package version
|
package version
|
||||||
|
|
||||||
import (
|
import _ "embed"
|
||||||
_ "embed"
|
|
||||||
"strings"
|
|
||||||
)
|
|
||||||
|
|
||||||
//go:embed version
|
//go:embed version
|
||||||
var Version string
|
var Version string
|
||||||
|
|
||||||
func init() {
|
|
||||||
Version = strings.Trim(Version, "\n")
|
|
||||||
Version = strings.TrimSpace(Version)
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user