mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-29 06:49:19 +08:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 047fa33704 | |||
| caf5d5c2f3 | |||
| 7fa2d08636 | |||
| 7b5c18b549 | |||
| 0a565070b8 | |||
| 43bc87ce99 | |||
| c4fe659c69 | |||
| 59c4c7575d |
@@ -0,0 +1,65 @@
|
||||
name: Cleanup After Milestone PRs Merged
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types:
|
||||
- closed
|
||||
|
||||
jobs:
|
||||
handle_pr:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4.2.0
|
||||
|
||||
- name: Get the PR title and extract PR numbers
|
||||
id: extract_pr_numbers
|
||||
run: |
|
||||
# Get the PR title
|
||||
PR_TITLE="${{ github.event.pull_request.title }}"
|
||||
|
||||
echo "PR Title: $PR_TITLE"
|
||||
|
||||
# Extract PR numbers from the title
|
||||
PR_NUMBERS=$(echo "$PR_TITLE" | grep -oE "#[0-9]+" | tr -d '#' | tr '\n' ' ')
|
||||
echo "Extracted PR Numbers: $PR_NUMBERS"
|
||||
|
||||
# Save PR numbers to a file
|
||||
echo "$PR_NUMBERS" > pr_numbers.txt
|
||||
echo "Saved PR Numbers to pr_numbers.txt"
|
||||
|
||||
# Check if the title matches a specific pattern
|
||||
if echo "$PR_TITLE" | grep -qE "^deps: Merge( #[0-9]+)+ PRs into .+"; then
|
||||
echo "proceed=true" >> $GITHUB_OUTPUT
|
||||
else
|
||||
echo "proceed=false" >> $GITHUB_OUTPUT
|
||||
fi
|
||||
|
||||
- name: Use extracted PR numbers and label PRs
|
||||
if: (steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')) && github.event.pull_request.merged == true
|
||||
run: |
|
||||
# Read the previously saved PR numbers
|
||||
PR_NUMBERS=$(cat pr_numbers.txt)
|
||||
echo "Using extracted PR Numbers: $PR_NUMBERS"
|
||||
|
||||
# Loop through each PR number and add label
|
||||
for PR_NUMBER in $PR_NUMBERS; do
|
||||
echo "Adding 'cherry-picked' label to PR #$PR_NUMBER"
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
https://api.github.com/repos/${{ github.repository }}/issues/$PR_NUMBER/labels \
|
||||
-d '{"labels":["cherry-picked"]}'
|
||||
done
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Delete branch after PR close
|
||||
if: steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')
|
||||
run: |
|
||||
BRANCH_NAME="${{ github.event.pull_request.head.ref }}"
|
||||
echo "Branch to delete: $BRANCH_NAME"
|
||||
git push origin --delete "$BRANCH_NAME"
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
@@ -2,11 +2,7 @@ name: Go Build Test
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
pull_request:
|
||||
branches:
|
||||
- main
|
||||
paths-ignore:
|
||||
- '**/*.md'
|
||||
|
||||
|
||||
@@ -0,0 +1,218 @@
|
||||
name: Create Pre-Release PR from Milestone
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
pull-requests: write
|
||||
issues: write
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
milestone_name:
|
||||
description: 'Milestone name to collect closed PRs from'
|
||||
required: true
|
||||
default: 'v3.8.2'
|
||||
target_branch:
|
||||
description: 'Target branch to merge the consolidated PR'
|
||||
required: true
|
||||
default: 'pre-release-v3.8.2'
|
||||
|
||||
env:
|
||||
MILESTONE_NAME: ${{ github.event.inputs.milestone_name || 'v3.8.2' }}
|
||||
TARGET_BRANCH: ${{ github.event.inputs.target_branch || 'pre-release-v3.8.2' }}
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
|
||||
LABEL_NAME: cherry-picked
|
||||
TEMP_DIR: /tmp # Using /tmp as the temporary directory
|
||||
|
||||
jobs:
|
||||
cherry_pick_milestone_prs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Setup temp directory
|
||||
run: |
|
||||
# Create the temporary directory and initialize necessary files
|
||||
mkdir -p ${{ env.TEMP_DIR }}
|
||||
touch ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
touch ${{ env.TEMP_DIR }}/commit_hashes.txt
|
||||
touch ${{ env.TEMP_DIR }}/pr_title.txt
|
||||
touch ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
touch ${{ env.TEMP_DIR }}/created_pr_number.txt
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
token: ${{ secrets.BOT_TOKEN }}
|
||||
|
||||
- name: Setup Git User for OpenIM-Robot
|
||||
run: |
|
||||
# Set up Git credentials for the bot
|
||||
git config --global user.email "OpenIM-Robot@users.noreply.github.com"
|
||||
git config --global user.name "OpenIM-Robot"
|
||||
|
||||
- name: Fetch Milestone ID and Filter PR Numbers
|
||||
env:
|
||||
MILESTONE_NAME: ${{ env.MILESTONE_NAME }}
|
||||
run: |
|
||||
# Fetch milestone details and extract milestone ID
|
||||
milestones=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/milestones")
|
||||
milestone_id=$(echo "$milestones" | grep -B3 "\"title\": \"$MILESTONE_NAME\"" | grep '"number":' | head -n1 | grep -o '[0-9]\+')
|
||||
if [ -z "$milestone_id" ]; then
|
||||
echo "Milestone '$MILESTONE_NAME' not found. Exiting."
|
||||
exit 1
|
||||
fi
|
||||
echo "Milestone ID: $milestone_id"
|
||||
echo "MILESTONE_ID=$milestone_id" >> $GITHUB_ENV
|
||||
|
||||
# Fetch issues for the milestone
|
||||
issues=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/issues?milestone=$milestone_id&state=closed&per_page=100")
|
||||
|
||||
> ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
|
||||
# Filter PRs that do not have the 'cherry-picked' label
|
||||
for pr_number in $(echo "$issues" | jq -r '.[] | select(.pull_request != null) | .number'); do
|
||||
labels=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" | jq -r '.[].name')
|
||||
|
||||
if ! echo "$labels" | grep -q "${LABEL_NAME}"; then
|
||||
echo "PR #$pr_number does not have the 'cherry-picked' label. Adding to the list."
|
||||
echo "$pr_number" >> ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
else
|
||||
echo "PR #$pr_number already has the 'cherry-picked' label. Skipping."
|
||||
fi
|
||||
done
|
||||
|
||||
# Sort the filtered PR numbers
|
||||
sort -n ${{ env.TEMP_DIR }}/pr_numbers.txt -o ${{ env.TEMP_DIR }}/pr_numbers.txt
|
||||
|
||||
echo "Filtered and sorted PR numbers:"
|
||||
cat ${{ env.TEMP_DIR }}/pr_numbers.txt || echo "No closed PR numbers found for milestone."
|
||||
|
||||
- name: Fetch Merge Commits for PRs and Generate Title and Body
|
||||
run: |
|
||||
# Ensure the files are initialized
|
||||
> ${{ env.TEMP_DIR }}/commit_hashes.txt
|
||||
> ${{ env.TEMP_DIR }}/pr_title.txt
|
||||
> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
|
||||
# Write description to the PR body
|
||||
echo "### Description:" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
echo "Merging PRs from milestone \`$MILESTONE_NAME\` into target branch \`$TARGET_BRANCH\`." >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
echo "" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
echo "### Need Merge PRs:" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
|
||||
pr_numbers_in_title=""
|
||||
|
||||
# Process sorted PR numbers and generate commit hashes
|
||||
for pr_number in $(cat ${{ env.TEMP_DIR }}/pr_numbers.txt); do
|
||||
echo "Processing PR #$pr_number"
|
||||
pr_details=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
"https://api.github.com/repos/${{ github.repository }}/pulls/$pr_number")
|
||||
pr_title=$(echo "$pr_details" | jq -r '.title')
|
||||
merge_commit=$(echo "$pr_details" | jq -r '.merge_commit_sha')
|
||||
short_commit_hash=$(echo "$merge_commit" | cut -c 1-7)
|
||||
|
||||
# Append PR details to the body
|
||||
echo "- $pr_title: (#$pr_number) ($short_commit_hash)" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
||||
|
||||
if [ "$merge_commit" != "null" ];then
|
||||
echo "$merge_commit" >> ${{ env.TEMP_DIR }}/commit_hashes.txt
|
||||
echo "#$pr_number" >> ${{ env.TEMP_DIR }}/pr_title.txt
|
||||
pr_numbers_in_title="$pr_numbers_in_title #$pr_number"
|
||||
fi
|
||||
done
|
||||
|
||||
commit_hashes=$(cat ${{ env.TEMP_DIR }}/commit_hashes.txt | tr '\n' ' ')
|
||||
first_commit_hash=$(head -n 1 ${{ env.TEMP_DIR }}/commit_hashes.txt)
|
||||
cherry_pick_branch="cherry-pick-${first_commit_hash:0:7}"
|
||||
echo "COMMIT_HASHES=$commit_hashes" >> $GITHUB_ENV
|
||||
echo "CHERRY_PICK_BRANCH=$cherry_pick_branch" >> $GITHUB_ENV
|
||||
echo "pr_numbers_in_title=$pr_numbers_in_title" >> $GITHUB_ENV
|
||||
|
||||
- name: Pull and Cherry-pick Commits, Then Push
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
|
||||
run: |
|
||||
# Fetch and pull the latest changes from the target branch
|
||||
git fetch origin
|
||||
git checkout $TARGET_BRANCH
|
||||
git pull origin $TARGET_BRANCH
|
||||
|
||||
# Create a new branch for cherry-picking
|
||||
git checkout -b $CHERRY_PICK_BRANCH
|
||||
|
||||
# Cherry-pick the commits and handle conflicts
|
||||
for commit_hash in $COMMIT_HASHES; do
|
||||
echo "Attempting to cherry-pick commit $commit_hash"
|
||||
if ! git cherry-pick "$commit_hash" --strategy=recursive -X theirs; then
|
||||
echo "Conflict detected for $commit_hash. Resolving with incoming changes."
|
||||
conflict_files=$(git diff --name-only --diff-filter=U)
|
||||
echo "Conflicting files:"
|
||||
echo "$conflict_files"
|
||||
|
||||
for file in $conflict_files; do
|
||||
if [ -f "$file" ]; then
|
||||
echo "Resolving conflict for $file"
|
||||
git add "$file"
|
||||
else
|
||||
echo "File $file has been deleted. Skipping."
|
||||
git rm "$file"
|
||||
fi
|
||||
done
|
||||
|
||||
echo "Conflicts resolved. Continuing cherry-pick."
|
||||
git cherry-pick --continue
|
||||
else
|
||||
echo "Cherry-pick successful for commit $commit_hash."
|
||||
fi
|
||||
done
|
||||
|
||||
# Push the cherry-pick branch to the repository
|
||||
git remote set-url origin "https://${BOT_TOKEN}@github.com/${{ github.repository }}.git"
|
||||
git push origin $CHERRY_PICK_BRANCH --force
|
||||
|
||||
- name: Create Pull Request
|
||||
run: |
|
||||
# Prepare and create the PR
|
||||
pr_title="deps: Merge ${{ env.pr_numbers_in_title }} PRs into $TARGET_BRANCH"
|
||||
pr_body=$(cat ${{ env.TEMP_DIR }}/pr_body.txt)
|
||||
|
||||
echo "Prepared PR title:"
|
||||
echo "$pr_title"
|
||||
echo "Prepared PR body:"
|
||||
echo "$pr_body"
|
||||
|
||||
# Create the PR using the GitHub API
|
||||
response=$(curl -s -X POST -H "Authorization: token $BOT_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
https://api.github.com/repos/${{ github.repository }}/pulls \
|
||||
-d "$(jq -n --arg title "$pr_title" \
|
||||
--arg head "$CHERRY_PICK_BRANCH" \
|
||||
--arg base "$TARGET_BRANCH" \
|
||||
--arg body "$pr_body" \
|
||||
'{title: $title, head: $head, base: $base, body: $body}')")
|
||||
|
||||
pr_number=$(echo "$response" | jq -r '.number')
|
||||
echo "$pr_number" > ${{ env.TEMP_DIR }}/created_pr_number.txt
|
||||
echo "Created PR #$pr_number"
|
||||
|
||||
- name: Add Label to Created Pull Request
|
||||
run: |
|
||||
# Add 'milestone-merge' label to the created PR
|
||||
pr_number=$(cat ${{ env.TEMP_DIR }}/created_pr_number.txt)
|
||||
echo "Adding label to PR #$pr_number"
|
||||
|
||||
curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
-d '{"labels": ["milestone-merge"]}' \
|
||||
"https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels"
|
||||
|
||||
echo "Added 'milestone-merge' label to PR #$pr_number."
|
||||
@@ -1,62 +0,0 @@
|
||||
# Version logging for OpenIM
|
||||
|
||||
<!-- BEGIN MUNGE: GENERATED_TOC -->
|
||||
|
||||
<!-- END MUNGE: GENERATED_TOC -->
|
||||
|
||||
{{ if .Versions -}}
|
||||
<a name="unreleased"></a>
|
||||
## [Unreleased]
|
||||
|
||||
{{ if .Unreleased.CommitGroups -}}
|
||||
{{ range .Unreleased.CommitGroups -}}
|
||||
### {{ .Title }}
|
||||
{{ range .Commits -}}
|
||||
- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }}
|
||||
{{ end }}
|
||||
{{ end -}}
|
||||
{{ end -}}
|
||||
{{ end -}}
|
||||
|
||||
{{ range .Versions }}
|
||||
<a name="{{ .Tag.Name }}"></a>
|
||||
## {{ if .Tag.Previous }}[{{ .Tag.Name }}]{{ else }}{{ .Tag.Name }}{{ end }} - {{ datetime "2006-01-02" .Tag.Date }}
|
||||
{{ range .CommitGroups -}}
|
||||
### {{ .Title }}
|
||||
{{ range .Commits -}}
|
||||
- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }}
|
||||
{{ end }}
|
||||
{{ end -}}
|
||||
|
||||
{{- if .RevertCommits -}}
|
||||
### Reverts
|
||||
{{ range .RevertCommits -}}
|
||||
- {{ .Revert.Header }}
|
||||
{{ end }}
|
||||
{{ end -}}
|
||||
|
||||
{{- if .MergeCommits -}}
|
||||
### Pull Requests
|
||||
{{ range .MergeCommits -}}
|
||||
- {{ .Header }}
|
||||
{{ end }}
|
||||
{{ end -}}
|
||||
|
||||
{{- if .NoteGroups -}}
|
||||
{{ range .NoteGroups -}}
|
||||
### {{ .Title }}
|
||||
{{ range .Notes }}
|
||||
{{ .Body }}
|
||||
{{ end }}
|
||||
{{ end -}}
|
||||
{{ end -}}
|
||||
{{ end -}}
|
||||
|
||||
{{- if .Versions }}
|
||||
[Unreleased]: {{ .Info.RepositoryURL }}/compare/{{ $latest := index .Versions 0 }}{{ $latest.Tag.Name }}...HEAD
|
||||
{{ range .Versions -}}
|
||||
{{ if .Tag.Previous -}}
|
||||
[{{ .Tag.Name }}]: {{ $.Info.RepositoryURL }}/compare/{{ .Tag.Previous.Name }}...{{ .Tag.Name }}
|
||||
{{ end -}}
|
||||
{{ end -}}
|
||||
{{ end -}}
|
||||
@@ -1,67 +0,0 @@
|
||||
bin: git
|
||||
style: github
|
||||
template: CHANGELOG.tpl.md
|
||||
info:
|
||||
title: CHANGELOG
|
||||
repository_url: https://github.com/openimsdk/open-im-server
|
||||
options:
|
||||
tag_filter_pattern: '^v'
|
||||
sort: "date"
|
||||
|
||||
commits:
|
||||
filters:
|
||||
Type:
|
||||
- feat
|
||||
- fix
|
||||
- perf
|
||||
- refactor
|
||||
- docs
|
||||
- test
|
||||
- chore
|
||||
- ci
|
||||
- build
|
||||
sort_by: Scope
|
||||
|
||||
commit_groups:
|
||||
group_by: Type
|
||||
sort_by: Title
|
||||
title_order:
|
||||
- feat
|
||||
- fix
|
||||
- perf
|
||||
- refactor
|
||||
- docs
|
||||
- test
|
||||
- chore
|
||||
- ci
|
||||
- build
|
||||
title_maps:
|
||||
feat: Features
|
||||
|
||||
header:
|
||||
pattern: "<regexp>"
|
||||
pattern_maps:
|
||||
- PropName
|
||||
|
||||
issues:
|
||||
prefix:
|
||||
- #
|
||||
|
||||
refs:
|
||||
actions:
|
||||
- Closes
|
||||
- Fixes
|
||||
|
||||
merges:
|
||||
pattern: "^Merge branch '(\\w+)'$"
|
||||
pattern_maps:
|
||||
- Source
|
||||
|
||||
reverts:
|
||||
pattern: "^Revert \"([\\s\\S]*)\"$"
|
||||
pattern_maps:
|
||||
- Header
|
||||
|
||||
notes:
|
||||
keywords:
|
||||
- BREAKING CHANGE
|
||||
@@ -53,15 +53,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-cmdutils
|
||||
id: openim-cmdutils
|
||||
@@ -71,15 +64,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-crontask
|
||||
id: openim-crontask
|
||||
@@ -89,15 +75,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-msggateway
|
||||
id: openim-msggateway
|
||||
@@ -107,15 +86,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-msgtransfer
|
||||
id: openim-msgtransfer
|
||||
@@ -125,15 +97,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-push
|
||||
id: openim-push
|
||||
@@ -143,15 +108,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-rpc-auth
|
||||
id: openim-rpc-auth
|
||||
@@ -161,15 +119,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-rpc-conversation
|
||||
id: openim-rpc-conversation
|
||||
@@ -179,15 +130,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-rpc-friend
|
||||
id: openim-rpc-friend
|
||||
@@ -197,15 +141,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-rpc-group
|
||||
id: openim-rpc-group
|
||||
@@ -215,15 +152,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-rpc-msg
|
||||
id: openim-rpc-msg
|
||||
@@ -233,15 +163,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-rpc-third
|
||||
id: openim-rpc-third
|
||||
@@ -251,15 +174,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
- binary: openim-rpc-user
|
||||
id: openim-rpc-user
|
||||
@@ -269,15 +185,8 @@ builds:
|
||||
- windows
|
||||
- linux
|
||||
goarch:
|
||||
- s390x
|
||||
- mips64
|
||||
- mips64le
|
||||
- amd64
|
||||
- ppc64le
|
||||
- arm64
|
||||
goarm:
|
||||
- "6"
|
||||
- "7"
|
||||
|
||||
|
||||
# TODO:Need a script, such as the init - release to help binary to find the right directory
|
||||
|
||||
@@ -15,9 +15,10 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
_ "net/http/pprof"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
||||
"github.com/openimsdk/tools/system/program"
|
||||
_ "net/http/pprof"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
+3
-20
@@ -1,20 +1,3 @@
|
||||
# Copyright © 2023 OpenIM. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# Determines if a message should be sent. If set to false, it triggers a silent sync without a message. If true, it requires triggering a conversation.
|
||||
# For rpc notification, send twice: once as a message and once as a notification.
|
||||
# The options field 'isNotification' indicates if it's a notification.
|
||||
groupCreated:
|
||||
isSendMsg: true
|
||||
# Reliability level of the message sending.
|
||||
@@ -309,9 +292,9 @@ userInfoUpdated:
|
||||
unreadCount: false
|
||||
offlinePush:
|
||||
enable: true
|
||||
title: Remove a blocked user
|
||||
desc: Remove a blocked user
|
||||
ext: Remove a blocked user
|
||||
title: userInfo updated
|
||||
desc: userInfo updated
|
||||
ext: userInfo updated
|
||||
|
||||
userStatusChanged:
|
||||
isSendMsg: false
|
||||
|
||||
@@ -12,7 +12,7 @@ require (
|
||||
github.com/gorilla/websocket v1.5.1
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.46
|
||||
github.com/openimsdk/protocol v0.0.72-pre-release-v3.8.2-alpha.5
|
||||
github.com/openimsdk/tools v0.0.50-alpha.16
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.18.0
|
||||
|
||||
@@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.46 h1:1LZlfEHLzw1F4afFmqBczmXKJWm5rUQ+yr8rJ4oyEAc=
|
||||
github.com/openimsdk/protocol v0.0.72-alpha.46/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/protocol v0.0.72-pre-release-v3.8.2-alpha.5 h1:b0JAuBhzIYirHeXp7asB04bE1q+KhU3dpAaAroc/Am0=
|
||||
github.com/openimsdk/protocol v0.0.72-pre-release-v3.8.2-alpha.5/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc=
|
||||
github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
|
||||
+32
-90
@@ -1,15 +1,11 @@
|
||||
package jssdk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/group"
|
||||
"github.com/openimsdk/protocol/jssdk"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/relation"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/protocol/user"
|
||||
"github.com/openimsdk/tools/a2r"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"sort"
|
||||
@@ -20,22 +16,16 @@ const (
|
||||
defaultGetActiveConversation = 100
|
||||
)
|
||||
|
||||
func NewJSSdkApi(user user.UserClient, friend relation.FriendClient, group group.GroupClient, msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk {
|
||||
func NewJSSdkApi(msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk {
|
||||
return &JSSdk{
|
||||
user: user,
|
||||
friend: friend,
|
||||
group: group,
|
||||
msg: msg,
|
||||
conv: conv,
|
||||
msg: msg,
|
||||
conv: conv,
|
||||
}
|
||||
}
|
||||
|
||||
type JSSdk struct {
|
||||
user user.UserClient
|
||||
friend relation.FriendClient
|
||||
group group.GroupClient
|
||||
msg msg.MsgClient
|
||||
conv conversation.ConversationClient
|
||||
msg msg.MsgClient
|
||||
conv conversation.ConversationClient
|
||||
}
|
||||
|
||||
func (x *JSSdk) GetActiveConversations(c *gin.Context) {
|
||||
@@ -46,71 +36,25 @@ func (x *JSSdk) GetConversations(c *gin.Context) {
|
||||
call(c, x.getConversations)
|
||||
}
|
||||
|
||||
func (x *JSSdk) fillConversations(ctx context.Context, conversations []*jssdk.ConversationMsg) error {
|
||||
if len(conversations) == 0 {
|
||||
return nil
|
||||
func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
||||
req, err := a2r.ParseRequest[ActiveConversationsReq](ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
userIDs []string
|
||||
groupIDs []string
|
||||
)
|
||||
for _, c := range conversations {
|
||||
if c.Conversation.GroupID == "" {
|
||||
userIDs = append(userIDs, c.Conversation.UserID)
|
||||
} else {
|
||||
groupIDs = append(groupIDs, c.Conversation.GroupID)
|
||||
}
|
||||
}
|
||||
var (
|
||||
userMap map[string]*sdkws.UserInfo
|
||||
friendMap map[string]*relation.FriendInfoOnly
|
||||
groupMap map[string]*sdkws.GroupInfo
|
||||
)
|
||||
if len(userIDs) > 0 {
|
||||
users, err := field(ctx, x.user.GetDesignateUsers, &user.GetDesignateUsersReq{UserIDs: userIDs}, (*user.GetDesignateUsersResp).GetUsersInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
friends, err := field(ctx, x.friend.GetFriendInfo, &relation.GetFriendInfoReq{OwnerUserID: conversations[0].Conversation.OwnerUserID, FriendUserIDs: userIDs}, (*relation.GetFriendInfoResp).GetFriendInfos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
userMap = datautil.SliceToMap(users, (*sdkws.UserInfo).GetUserID)
|
||||
friendMap = datautil.SliceToMap(friends, (*relation.FriendInfoOnly).GetFriendUserID)
|
||||
}
|
||||
if len(groupIDs) > 0 {
|
||||
resp, err := x.group.GetGroupsInfo(ctx, &group.GetGroupsInfoReq{GroupIDs: groupIDs})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
groupMap = datautil.SliceToMap(resp.GroupInfos, (*sdkws.GroupInfo).GetGroupID)
|
||||
}
|
||||
for _, c := range conversations {
|
||||
if c.Conversation.GroupID == "" {
|
||||
c.User = userMap[c.Conversation.UserID]
|
||||
c.Friend = friendMap[c.Conversation.UserID]
|
||||
} else {
|
||||
c.Group = groupMap[c.Conversation.GroupID]
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActiveConversationsReq) (*jssdk.GetActiveConversationsResp, error) {
|
||||
if req.Count <= 0 || req.Count > maxGetActiveConversation {
|
||||
req.Count = defaultGetActiveConversation
|
||||
}
|
||||
req.OwnerUserID = mcontext.GetOpUserID(ctx)
|
||||
opUserID := mcontext.GetOpUserID(ctx)
|
||||
conversationIDs, err := field(ctx, x.conv.GetConversationIDs,
|
||||
&conversation.GetConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs)
|
||||
&conversation.GetConversationIDsReq{UserID: opUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(conversationIDs) == 0 {
|
||||
return &jssdk.GetActiveConversationsResp{}, nil
|
||||
return &ConversationsResp{}, nil
|
||||
}
|
||||
readSeq, err := field(ctx, x.msg.GetHasReadSeqs,
|
||||
&msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
|
||||
&msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -120,24 +64,24 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
return nil, err
|
||||
}
|
||||
if len(activeConversation) == 0 {
|
||||
return &jssdk.GetActiveConversationsResp{}, nil
|
||||
return &ConversationsResp{}, nil
|
||||
}
|
||||
sortConversations := sortActiveConversations{
|
||||
Conversation: activeConversation,
|
||||
}
|
||||
if len(activeConversation) > 1 {
|
||||
pinnedConversationIDs, err := field(ctx, x.conv.GetPinnedConversationIDs,
|
||||
&conversation.GetPinnedConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs)
|
||||
&conversation.GetPinnedConversationIDsReq{UserID: opUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs)
|
||||
}
|
||||
sort.Sort(&sortConversations)
|
||||
sortList := sortConversations.Top(int(req.Count))
|
||||
sortList := sortConversations.Top(req.Count)
|
||||
conversations, err := field(ctx, x.conv.GetConversations,
|
||||
&conversation.GetConversationsReq{
|
||||
OwnerUserID: req.OwnerUserID,
|
||||
OwnerUserID: opUserID,
|
||||
ConversationIDs: datautil.Slice(sortList, func(c *msg.ActiveConversation) string {
|
||||
return c.ConversationID
|
||||
})}, (*conversation.GetConversationsResp).GetConversations)
|
||||
@@ -146,7 +90,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
}
|
||||
msgs, err := field(ctx, x.msg.GetSeqMessage,
|
||||
&msg.GetSeqMessageReq{
|
||||
UserID: req.OwnerUserID,
|
||||
UserID: opUserID,
|
||||
Conversations: datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs {
|
||||
return &msg.ConversationSeqs{
|
||||
ConversationID: c.ConversationID,
|
||||
@@ -160,7 +104,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
||||
return c.ConversationID
|
||||
})
|
||||
resp := make([]*jssdk.ConversationMsg, 0, len(sortList))
|
||||
resp := make([]ConversationMsg, 0, len(sortList))
|
||||
for _, c := range sortList {
|
||||
conv, ok := conversationMap[c.ConversationID]
|
||||
if !ok {
|
||||
@@ -170,16 +114,13 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||
lastMsg = msgList.Msgs[0]
|
||||
}
|
||||
resp = append(resp, &jssdk.ConversationMsg{
|
||||
resp = append(resp, ConversationMsg{
|
||||
Conversation: conv,
|
||||
LastMsg: lastMsg,
|
||||
MaxSeq: c.MaxSeq,
|
||||
ReadSeq: readSeq[c.ConversationID],
|
||||
})
|
||||
}
|
||||
if err := x.fillConversations(ctx, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var unreadCount int64
|
||||
for _, c := range activeConversation {
|
||||
count := c.MaxSeq - readSeq[c.ConversationID]
|
||||
@@ -187,20 +128,24 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
unreadCount += count
|
||||
}
|
||||
}
|
||||
return &jssdk.GetActiveConversationsResp{
|
||||
return &ConversationsResp{
|
||||
Conversations: resp,
|
||||
UnreadCount: unreadCount,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversationsReq) (*jssdk.GetConversationsResp, error) {
|
||||
func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
||||
req, err := a2r.ParseRequest[conversation.GetConversationsReq](ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.OwnerUserID = mcontext.GetOpUserID(ctx)
|
||||
conversations, err := field(ctx, x.conv.GetConversations, &conversation.GetConversationsReq{OwnerUserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*conversation.GetConversationsResp).GetConversations)
|
||||
conversations, err := field(ctx, x.conv.GetConversations, req, (*conversation.GetConversationsResp).GetConversations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(conversations) == 0 {
|
||||
return &jssdk.GetConversationsResp{}, nil
|
||||
return &ConversationsResp{}, nil
|
||||
}
|
||||
req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string {
|
||||
return c.ConversationID
|
||||
@@ -232,22 +177,19 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
|
||||
resp := make([]ConversationMsg, 0, len(conversations))
|
||||
for _, c := range conversations {
|
||||
var lastMsg *sdkws.MsgData
|
||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||
lastMsg = msgList.Msgs[0]
|
||||
}
|
||||
resp = append(resp, &jssdk.ConversationMsg{
|
||||
resp = append(resp, ConversationMsg{
|
||||
Conversation: c,
|
||||
LastMsg: lastMsg,
|
||||
MaxSeq: maxSeqs[c.ConversationID],
|
||||
ReadSeq: readSeqs[c.ConversationID],
|
||||
})
|
||||
}
|
||||
if err := x.fillConversations(ctx, resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var unreadCount int64
|
||||
for conversationID, maxSeq := range maxSeqs {
|
||||
count := maxSeq - readSeqs[conversationID]
|
||||
@@ -255,7 +197,7 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
||||
unreadCount += count
|
||||
}
|
||||
}
|
||||
return &jssdk.GetConversationsResp{
|
||||
return &ConversationsResp{
|
||||
Conversations: resp,
|
||||
UnreadCount: unreadCount,
|
||||
}, nil
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package jssdk
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
)
|
||||
|
||||
type ActiveConversationsReq struct {
|
||||
Count int `json:"count"`
|
||||
}
|
||||
|
||||
type ConversationMsg struct {
|
||||
Conversation *conversation.Conversation `json:"conversation"`
|
||||
LastMsg *sdkws.MsgData `json:"lastMsg"`
|
||||
MaxSeq int64 `json:"maxSeq"`
|
||||
ReadSeq int64 `json:"readSeq"`
|
||||
}
|
||||
|
||||
type ConversationsResp struct {
|
||||
UnreadCount int64 `json:"unreadCount"`
|
||||
Conversations []ConversationMsg `json:"conversations"`
|
||||
}
|
||||
@@ -3,14 +3,8 @@ package jssdk
|
||||
import (
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/openimsdk/tools/a2r"
|
||||
"github.com/openimsdk/tools/apiresp"
|
||||
"github.com/openimsdk/tools/checker"
|
||||
"github.com/openimsdk/tools/errs"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
|
||||
@@ -22,56 +16,11 @@ func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A
|
||||
return get(resp), nil
|
||||
}
|
||||
|
||||
func call[A, B any](c *gin.Context, fn func(ctx context.Context, req *A) (*B, error)) {
|
||||
var isJSON bool
|
||||
switch contentType := c.GetHeader("Content-Type"); {
|
||||
case contentType == "":
|
||||
isJSON = true
|
||||
case strings.Contains(contentType, "application/json"):
|
||||
isJSON = true
|
||||
case strings.Contains(contentType, "application/protobuf"):
|
||||
case strings.Contains(contentType, "application/x-protobuf"):
|
||||
default:
|
||||
apiresp.GinError(c, errs.ErrArgs.WrapMsg("unsupported content type"))
|
||||
return
|
||||
}
|
||||
var req *A
|
||||
if isJSON {
|
||||
var err error
|
||||
req, err = a2r.ParseRequest[A](c)
|
||||
if err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
body, err := io.ReadAll(c.Request.Body)
|
||||
if err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
req = new(A)
|
||||
if err := proto.Unmarshal(body, any(req).(proto.Message)); err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
if err := checker.Validate(&req); err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
resp, err := fn(c, req)
|
||||
func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) {
|
||||
resp, err := fn(c)
|
||||
if err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
if isJSON {
|
||||
apiresp.GinSuccess(c, resp)
|
||||
return
|
||||
}
|
||||
body, err := proto.Marshal(any(resp).(proto.Message))
|
||||
if err != nil {
|
||||
apiresp.GinError(c, err)
|
||||
return
|
||||
}
|
||||
apiresp.GinSuccess(c, body)
|
||||
apiresp.GinSuccess(c, resp)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestName(t *testing.T) {
|
||||
val := sortActiveConversations{
|
||||
Conversation: []*msg.ActiveConversation{
|
||||
{
|
||||
ConversationID: "100",
|
||||
LastTime: 100,
|
||||
},
|
||||
{
|
||||
ConversationID: "200",
|
||||
LastTime: 200,
|
||||
},
|
||||
{
|
||||
ConversationID: "300",
|
||||
LastTime: 300,
|
||||
},
|
||||
{
|
||||
ConversationID: "400",
|
||||
LastTime: 400,
|
||||
},
|
||||
},
|
||||
//PinnedConversationIDs: map[string]struct{}{
|
||||
// "100": {},
|
||||
// "300": {},
|
||||
//},
|
||||
}
|
||||
sort.Sort(&val)
|
||||
t.Log(val)
|
||||
|
||||
}
|
||||
@@ -77,7 +77,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
||||
r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
|
||||
u := NewUserApi(*userRpc)
|
||||
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
|
||||
j := jssdk.NewJSSdkApi(userRpc.Client, friendRpc.Client, groupRpc.Client, messageRpc.Client, conversationRpc.Client)
|
||||
j := jssdk.NewJSSdkApi(messageRpc.Client, conversationRpc.Client)
|
||||
userRouterGroup := r.Group("/user")
|
||||
{
|
||||
userRouterGroup.POST("/user_register", u.UserRegister)
|
||||
|
||||
@@ -15,7 +15,8 @@
|
||||
package msggateway
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
|
||||
"github.com/openimsdk/tools/errs"
|
||||
)
|
||||
@@ -32,17 +33,19 @@ func NewGobEncoder() *GobEncoder {
|
||||
}
|
||||
|
||||
func (g *GobEncoder) Encode(data any) ([]byte, error) {
|
||||
b, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return nil, errs.New("Encoder.Encode failed", "action", "encode")
|
||||
buff := bytes.Buffer{}
|
||||
enc := gob.NewEncoder(&buff)
|
||||
if err := enc.Encode(data); err != nil {
|
||||
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode")
|
||||
}
|
||||
return b, nil
|
||||
return buff.Bytes(), nil
|
||||
}
|
||||
|
||||
func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
|
||||
err := json.Unmarshal(encodeData, decodeData)
|
||||
if err != nil {
|
||||
return errs.New("Encoder.Decode failed", "action", "decode")
|
||||
buff := bytes.NewBuffer(encodeData)
|
||||
dec := gob.NewDecoder(buff)
|
||||
if err := dec.Decode(decodeData); err != nil {
|
||||
return errs.WrapMsg(err, "GobEncoder.Decode failed", "action", "decode")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,48 +0,0 @@
|
||||
package msggateway
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGobEncoder_Encode(t *testing.T) {
|
||||
encoder := NewGobEncoder()
|
||||
|
||||
// 测试用例1: 编码 []byte 数据
|
||||
inputData := []byte("test data")
|
||||
encodedData, err := encoder.Encode(inputData)
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error, got %v", err)
|
||||
}
|
||||
if string(encodedData) != string(inputData) {
|
||||
t.Fatalf("expected encoded data to be '%s', got '%s'", inputData, encodedData)
|
||||
}
|
||||
|
||||
// 测试用例2: 编码非 []byte 数据
|
||||
nonByteData := "string data"
|
||||
_, err = encoder.Encode(nonByteData)
|
||||
if err == nil {
|
||||
t.Fatalf("expected an error when encoding non-byte data, got none")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGobEncoder_Decode(t *testing.T) {
|
||||
encoder := NewGobEncoder()
|
||||
|
||||
// 测试用例1: 解码到 []byte 数据
|
||||
encodedData := []byte("test data")
|
||||
var decodedData []byte
|
||||
err := encoder.Decode(encodedData, &decodedData)
|
||||
if err != nil {
|
||||
t.Fatalf("expected no error, got %v", err)
|
||||
}
|
||||
if string(decodedData) != string(encodedData) {
|
||||
t.Fatalf("expected decoded data to be '%s', got '%s'", encodedData, decodedData)
|
||||
}
|
||||
|
||||
// 测试用例2: 解码到非 []byte 数据
|
||||
var nonByteData string
|
||||
err = encoder.Decode(encodedData, &nonByteData)
|
||||
if err == nil {
|
||||
t.Fatalf("expected an error when decoding to non-byte data, got none")
|
||||
}
|
||||
}
|
||||
@@ -155,7 +155,6 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M
|
||||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
||||
err := client.PushMessage(ctx, msgData)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID)
|
||||
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
|
||||
} else {
|
||||
if _, ok := s.pushTerminal[client.PlatformID]; ok {
|
||||
|
||||
@@ -128,6 +128,7 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
|
||||
|
||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH)
|
||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH)
|
||||
go m.historyCH.HandleUserHasReadSeqMessages(m.ctx)
|
||||
err := m.historyCH.redisMessageBatches.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -157,12 +158,14 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
|
||||
// graceful close kafka client.
|
||||
m.cancel()
|
||||
m.historyCH.redisMessageBatches.Close()
|
||||
m.historyCH.Close()
|
||||
m.historyCH.historyConsumerGroup.Close()
|
||||
m.historyMongoCH.historyConsumerGroup.Close()
|
||||
return nil
|
||||
case <-netDone:
|
||||
m.cancel()
|
||||
m.historyCH.redisMessageBatches.Close()
|
||||
m.historyCH.Close()
|
||||
m.historyCH.historyConsumerGroup.Close()
|
||||
m.historyMongoCH.historyConsumerGroup.Close()
|
||||
close(netDone)
|
||||
|
||||
@@ -18,8 +18,10 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
@@ -40,11 +42,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
size = 500
|
||||
mainDataBuffer = 500
|
||||
subChanBuffer = 50
|
||||
worker = 50
|
||||
interval = 100 * time.Millisecond
|
||||
size = 500
|
||||
mainDataBuffer = 500
|
||||
subChanBuffer = 50
|
||||
worker = 50
|
||||
interval = 100 * time.Millisecond
|
||||
hasReadChanBuffer = 1000
|
||||
)
|
||||
|
||||
type ContextMsg struct {
|
||||
@@ -52,14 +55,23 @@ type ContextMsg struct {
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// This structure is used for asynchronously writing the sender’s read sequence (seq) regarding a message into MongoDB.
|
||||
// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10.
|
||||
type userHasReadSeq struct {
|
||||
conversationID string
|
||||
userHasReadMap map[string]int64
|
||||
}
|
||||
|
||||
type OnlineHistoryRedisConsumerHandler struct {
|
||||
historyConsumerGroup *kafka.MConsumerGroup
|
||||
|
||||
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
|
||||
|
||||
msgTransferDatabase controller.MsgTransferDatabase
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
msgTransferDatabase controller.MsgTransferDatabase
|
||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||
groupRpcClient *rpcclient.GroupRpcClient
|
||||
conversationUserHasReadChan chan *userHasReadSeq
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
|
||||
@@ -70,6 +82,8 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
|
||||
}
|
||||
var och OnlineHistoryRedisConsumerHandler
|
||||
och.msgTransferDatabase = database
|
||||
och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer)
|
||||
och.wg.Add(1)
|
||||
|
||||
b := batcher.New[sarama.ConsumerMessage](
|
||||
batcher.WithSize(size),
|
||||
@@ -115,25 +129,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
||||
type seqKey struct {
|
||||
conversationID string
|
||||
userID string
|
||||
}
|
||||
var readSeq map[seqKey]int64
|
||||
|
||||
var conversationID string
|
||||
var userSeqMap map[string]int64
|
||||
for _, msg := range msgs {
|
||||
if msg.message.ContentType != constant.HasReadReceipt {
|
||||
continue
|
||||
}
|
||||
var elem sdkws.NotificationElem
|
||||
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
||||
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
||||
continue
|
||||
}
|
||||
var tips sdkws.MarkAsReadTips
|
||||
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
||||
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
||||
continue
|
||||
}
|
||||
//The conversation ID for each batch of messages processed by the batcher is the same.
|
||||
conversationID = tips.ConversationID
|
||||
if len(tips.Seqs) > 0 {
|
||||
for _, seq := range tips.Seqs {
|
||||
if tips.HasReadSeq < seq {
|
||||
@@ -146,26 +160,25 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context,
|
||||
if tips.HasReadSeq < 0 {
|
||||
continue
|
||||
}
|
||||
if readSeq == nil {
|
||||
readSeq = make(map[seqKey]int64)
|
||||
if userSeqMap == nil {
|
||||
userSeqMap = make(map[string]int64)
|
||||
}
|
||||
key := seqKey{
|
||||
conversationID: tips.ConversationID,
|
||||
userID: tips.MarkAsReadUserID,
|
||||
}
|
||||
if readSeq[key] > tips.HasReadSeq {
|
||||
|
||||
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
|
||||
continue
|
||||
}
|
||||
readSeq[key] = tips.HasReadSeq
|
||||
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
|
||||
}
|
||||
if readSeq == nil {
|
||||
if userSeqMap == nil {
|
||||
return
|
||||
}
|
||||
for key, seq := range readSeq {
|
||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
|
||||
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
|
||||
}
|
||||
if len(conversationID) == 0 {
|
||||
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
|
||||
}
|
||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
|
||||
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
|
||||
@@ -250,12 +263,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
||||
}
|
||||
if len(storageMessageList) > 0 {
|
||||
msg := storageMessageList[0]
|
||||
lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
|
||||
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
||||
log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
||||
return
|
||||
}
|
||||
log.ZInfo(ctx, "BatchInsertChat2Cache end")
|
||||
err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||
prommetrics.SeqSetFailedCounter.Inc()
|
||||
}
|
||||
och.conversationUserHasReadChan <- &userHasReadSeq{
|
||||
conversationID: conversationID,
|
||||
userHasReadMap: userSeqMap,
|
||||
}
|
||||
|
||||
if isNewConversation {
|
||||
switch msg.SessionType {
|
||||
@@ -308,7 +330,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
||||
storageMessageList = append(storageMessageList, msg.message)
|
||||
}
|
||||
if len(storageMessageList) > 0 {
|
||||
lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
|
||||
"storageList", storageMessageList)
|
||||
@@ -323,6 +345,21 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
||||
och.toPushTopic(ctx, key, conversationID, storageList)
|
||||
}
|
||||
}
|
||||
func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) {
|
||||
defer och.wg.Done()
|
||||
|
||||
for msg := range och.conversationUserHasReadChan {
|
||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil {
|
||||
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap)
|
||||
}
|
||||
}
|
||||
|
||||
log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages")
|
||||
}
|
||||
func (och *OnlineHistoryRedisConsumerHandler) Close() {
|
||||
close(och.conversationUserHasReadChan)
|
||||
och.wg.Wait()
|
||||
}
|
||||
|
||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
||||
for _, v := range msgs {
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
)
|
||||
|
||||
func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
||||
@@ -70,7 +69,7 @@ func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *
|
||||
|
||||
func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error {
|
||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||
if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
|
||||
if msg.ContentType == constant.Typing {
|
||||
return nil
|
||||
}
|
||||
req := callbackstruct.CallbackBeforePushReq{
|
||||
|
||||
@@ -1026,7 +1026,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
|
||||
}
|
||||
num := len(update)
|
||||
if req.GroupInfoForSet.Notification != "" {
|
||||
num--
|
||||
num -= 3
|
||||
func() {
|
||||
conversation := &pbconversation.ConversationReq{
|
||||
ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID),
|
||||
@@ -1133,8 +1133,9 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI
|
||||
}
|
||||
|
||||
num := len(updatedData)
|
||||
|
||||
if req.Notification != nil {
|
||||
num--
|
||||
num -= 3
|
||||
|
||||
if req.Notification.Value != "" {
|
||||
func() {
|
||||
@@ -1180,36 +1181,53 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if group.Status == constant.GroupStatusDismissed {
|
||||
return nil, servererrs.ErrDismissedAlready.Wrap()
|
||||
}
|
||||
|
||||
if req.OldOwnerUserID == req.NewOwnerUserID {
|
||||
return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID == NewOwnerUserID")
|
||||
}
|
||||
|
||||
members, err := g.db.FindGroupMembers(ctx, req.GroupID, []string{req.OldOwnerUserID, req.NewOwnerUserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := g.PopulateGroupMember(ctx, members...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
memberMap := datautil.SliceToMap(members, func(e *model.GroupMember) string { return e.UserID })
|
||||
if ids := datautil.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, datautil.Keys(memberMap)); len(ids) > 0 {
|
||||
return nil, errs.ErrArgs.WrapMsg("user not in group " + strings.Join(ids, ","))
|
||||
}
|
||||
|
||||
oldOwner := memberMap[req.OldOwnerUserID]
|
||||
if oldOwner == nil {
|
||||
return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID not in group " + req.NewOwnerUserID)
|
||||
}
|
||||
|
||||
newOwner := memberMap[req.NewOwnerUserID]
|
||||
if newOwner == nil {
|
||||
return nil, errs.ErrArgs.WrapMsg("NewOwnerUser not in group " + req.NewOwnerUserID)
|
||||
}
|
||||
|
||||
if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
|
||||
if !(mcontext.GetOpUserID(ctx) == oldOwner.UserID && oldOwner.RoleLevel == constant.GroupOwner) {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("no permission transfer group owner")
|
||||
}
|
||||
}
|
||||
|
||||
if newOwner.MuteEndTime.After(time.Now()) {
|
||||
if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{
|
||||
GroupID: group.GroupID,
|
||||
UserID: req.NewOwnerUserID}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := g.db.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1217,6 +1235,7 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
|
||||
g.webhookAfterTransferGroupOwner(ctx, &g.config.WebhooksConfig.AfterTransferGroupOwner, req)
|
||||
|
||||
g.notification.GroupOwnerTransferredNotification(ctx, req)
|
||||
|
||||
return &pbgroup.TransferGroupOwnerResp{}, nil
|
||||
}
|
||||
|
||||
@@ -1425,32 +1444,38 @@ func (g *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbgroup.Ca
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := g.PopulateGroupMember(ctx, member); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
|
||||
opMember, err := g.db.TakeGroupMember(ctx, req.GroupID, mcontext.GetOpUserID(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch member.RoleLevel {
|
||||
case constant.GroupOwner:
|
||||
return nil, errs.ErrNoPermission.WrapMsg("set group owner mute")
|
||||
return nil, errs.ErrNoPermission.WrapMsg("Can not set group owner unmute")
|
||||
case constant.GroupAdmin:
|
||||
if opMember.RoleLevel != constant.GroupOwner {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("set group admin mute")
|
||||
return nil, errs.ErrNoPermission.WrapMsg("Can not set group admin unmute")
|
||||
}
|
||||
case constant.GroupOrdinaryUsers:
|
||||
if !(opMember.RoleLevel == constant.GroupAdmin || opMember.RoleLevel == constant.GroupOwner) {
|
||||
return nil, errs.ErrNoPermission.WrapMsg("set group ordinary users mute")
|
||||
return nil, errs.ErrNoPermission.WrapMsg("Can not set group ordinary users unmute")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data := UpdateGroupMemberMutedTimeMap(time.Unix(0, 0))
|
||||
if err := g.db.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
g.notification.GroupMemberCancelMutedNotification(ctx, req.GroupID, req.UserID)
|
||||
|
||||
return &pbgroup.CancelMuteGroupMemberResp{}, nil
|
||||
}
|
||||
|
||||
@@ -1621,7 +1646,7 @@ func (g *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr
|
||||
g.notification.GroupMemberSetToOrdinaryUserNotification(ctx, member.GroupID, member.UserID)
|
||||
}
|
||||
}
|
||||
if member.Nickname != nil || member.FaceURL != nil || member.Ex != nil || member.RoleLevel != nil {
|
||||
if member.Nickname != nil || member.FaceURL != nil || member.Ex != nil {
|
||||
g.notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
"github.com/openimsdk/tools/log"
|
||||
"github.com/openimsdk/tools/mcontext"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
"github.com/openimsdk/tools/utils/stringutil"
|
||||
)
|
||||
|
||||
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
|
||||
@@ -80,13 +79,17 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq)
|
||||
|
||||
func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) {
|
||||
log.ZDebug(nctx, "setConversationAtInfo", "msg", msg)
|
||||
|
||||
ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx))
|
||||
|
||||
var atUserID []string
|
||||
|
||||
conversation := &pbconversation.ConversationReq{
|
||||
ConversationID: msgprocessor.GetConversationIDByMsg(msg),
|
||||
ConversationType: msg.SessionType,
|
||||
GroupID: msg.GroupID,
|
||||
}
|
||||
|
||||
tagAll := datautil.Contain(constant.AtAllString, msg.AtUserIDList...)
|
||||
if tagAll {
|
||||
memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID)
|
||||
@@ -94,25 +97,35 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
|
||||
log.ZWarn(ctx, "GetGroupMemberIDs", err)
|
||||
return
|
||||
}
|
||||
atUserID = stringutil.DifferenceString([]string{constant.AtAllString}, msg.AtUserIDList)
|
||||
|
||||
memberUserIDList = datautil.DeleteElems(memberUserIDList, msg.SendID)
|
||||
|
||||
atUserID = datautil.Single([]string{constant.AtAllString}, msg.AtUserIDList)
|
||||
|
||||
if len(atUserID) == 0 { // just @everyone
|
||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
|
||||
} else { // @Everyone and @other people
|
||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe}
|
||||
|
||||
err = m.Conversation.SetConversations(ctx, atUserID, conversation)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation)
|
||||
}
|
||||
memberUserIDList = stringutil.DifferenceString(atUserID, memberUserIDList)
|
||||
|
||||
memberUserIDList = datautil.Single(atUserID, memberUserIDList)
|
||||
}
|
||||
|
||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
|
||||
|
||||
err = m.Conversation.SetConversations(ctx, memberUserIDList, conversation)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe}
|
||||
|
||||
err := m.Conversation.SetConversations(ctx, msg.AtUserIDList, conversation)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation)
|
||||
|
||||
@@ -273,14 +273,7 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFri
|
||||
return &relation.SetFriendRemarkResp{}, nil
|
||||
}
|
||||
|
||||
func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFriendInfoReq) (*relation.GetFriendInfoResp, error) {
|
||||
friends, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &relation.GetFriendInfoResp{FriendInfos: convert.FriendOnlyDB2PbOnly(friends)}, nil
|
||||
}
|
||||
|
||||
// ok.
|
||||
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *relation.GetDesignatedFriendsReq) (resp *relation.GetDesignatedFriendsResp, err error) {
|
||||
resp = &relation.GetDesignatedFriendsResp{}
|
||||
if datautil.Duplicate(req.FriendUserIDs) {
|
||||
|
||||
@@ -116,18 +116,17 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
|
||||
|
||||
func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) {
|
||||
resp = &pbuser.GetDesignateUsersResp{}
|
||||
users, err := s.db.FindWithError(ctx, req.UserIDs)
|
||||
users, err := s.db.Find(ctx, req.UserIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.UsersInfo = convert.UsersDB2Pb(users)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// deprecated:
|
||||
|
||||
//UpdateUserInfo
|
||||
|
||||
// UpdateUserInfo
|
||||
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
|
||||
resp = &pbuser.UpdateUserInfoResp{}
|
||||
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID)
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"github.com/openimsdk/protocol/relation"
|
||||
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"github.com/openimsdk/tools/utils/datautil"
|
||||
@@ -36,7 +35,9 @@ func FriendPb2DB(friend *sdkws.FriendInfo) *model.Friend {
|
||||
return dbFriend
|
||||
}
|
||||
|
||||
func FriendDB2Pb(ctx context.Context, friendDB *model.Friend, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) (*sdkws.FriendInfo, error) {
|
||||
func FriendDB2Pb(ctx context.Context, friendDB *model.Friend,
|
||||
getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error),
|
||||
) (*sdkws.FriendInfo, error) {
|
||||
users, err := getUsers(ctx, []string{friendDB.FriendUserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -52,7 +53,11 @@ func FriendDB2Pb(ctx context.Context, friendDB *model.Friend, getUsers func(ctx
|
||||
}, nil
|
||||
}
|
||||
|
||||
func FriendsDB2Pb(ctx context.Context, friendsDB []*model.Friend, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) (friendsPb []*sdkws.FriendInfo, err error) {
|
||||
func FriendsDB2Pb(
|
||||
ctx context.Context,
|
||||
friendsDB []*model.Friend,
|
||||
getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error),
|
||||
) (friendsPb []*sdkws.FriendInfo, err error) {
|
||||
if len(friendsDB) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
@@ -81,21 +86,7 @@ func FriendsDB2Pb(ctx context.Context, friendsDB []*model.Friend, getUsers func(
|
||||
friendsPb = append(friendsPb, friendPb)
|
||||
}
|
||||
return friendsPb, nil
|
||||
}
|
||||
|
||||
func FriendOnlyDB2PbOnly(friendsDB []*model.Friend) []*relation.FriendInfoOnly {
|
||||
return datautil.Slice(friendsDB, func(f *model.Friend) *relation.FriendInfoOnly {
|
||||
return &relation.FriendInfoOnly{
|
||||
OwnerUserID: f.OwnerUserID,
|
||||
FriendUserID: f.FriendUserID,
|
||||
Remark: f.Remark,
|
||||
CreateTime: f.CreateTime.UnixMilli(),
|
||||
AddSource: f.AddSource,
|
||||
OperatorUserID: f.OperatorUserID,
|
||||
Ex: f.Ex,
|
||||
IsPinned: f.IsPinned,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func FriendRequestDB2Pb(ctx context.Context, friendRequests []*model.FriendRequest, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) ([]*sdkws.FriendRequest, error) {
|
||||
|
||||
@@ -16,9 +16,10 @@ package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||
"time"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||
@@ -194,7 +195,7 @@ func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Con
|
||||
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||
cache := c.cache.CloneConversationCache()
|
||||
for _, conversation := range conversations {
|
||||
cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID)
|
||||
cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID, conversation.UserID)
|
||||
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
|
||||
ownerUserID := v[0]
|
||||
userID := v[1]
|
||||
|
||||
@@ -24,8 +24,11 @@ type MsgTransferDatabase interface {
|
||||
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
|
||||
|
||||
// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
|
||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
|
||||
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, userHasReadMap map[string]int64, err error)
|
||||
|
||||
SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error
|
||||
|
||||
SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error
|
||||
|
||||
// to mq
|
||||
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||
@@ -219,18 +222,18 @@ func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conv
|
||||
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
||||
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) {
|
||||
lenList := len(msgs)
|
||||
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
|
||||
return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
|
||||
return 0, false, nil, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
|
||||
}
|
||||
if lenList < 1 {
|
||||
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
|
||||
return 0, false, nil, errs.New("no messages to insert", "minCount", 1).Wrap()
|
||||
}
|
||||
currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
|
||||
if err != nil {
|
||||
log.ZError(ctx, "storage.seq.Malloc", err)
|
||||
return 0, false, err
|
||||
return 0, false, nil, err
|
||||
}
|
||||
isNew = currentMaxSeq == 0
|
||||
lastMaxSeq := currentMaxSeq
|
||||
@@ -248,25 +251,25 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
|
||||
} else {
|
||||
prommetrics.MsgInsertRedisSuccessCounter.Inc()
|
||||
}
|
||||
err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||
prommetrics.SeqSetFailedCounter.Inc()
|
||||
}
|
||||
return lastMaxSeq, isNew, errs.Wrap(err)
|
||||
return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||||
func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||||
for userID, seq := range userSeqMap {
|
||||
if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil {
|
||||
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
|
||||
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
|
||||
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||||
for userID, seq := range userSeqMap {
|
||||
if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||
|
||||
@@ -16,6 +16,7 @@ package rpccache
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||
@@ -97,6 +98,7 @@ func (u *UserLocalCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]
|
||||
user, err := u.GetUserInfo(ctx, userID)
|
||||
if err != nil {
|
||||
if errs.ErrRecordNotFound.Is(err) {
|
||||
log.ZWarn(ctx, "User info notFound", err, "userID", userID)
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user