mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-05-03 16:45:59 +08:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b04ab20455 | |||
| b5ef71f5c2 | |||
| 43919bc5fe | |||
| 0d03b46ac8 | |||
| a84f7bd217 | |||
| 9e8a389698 | |||
| a2110e416a | |||
| 0b612c13c6 | |||
| e7c7bf3bd1 | |||
| 3167f9943f | |||
| 598750e8c7 | |||
| 87f79d3cee | |||
| 7f44319b9b | |||
| 758606f627 | |||
| 71f328ef94 | |||
| 9b94063d60 | |||
| 165eecf037 | |||
| 6890da44c9 |
@@ -1,65 +0,0 @@
|
|||||||
name: Cleanup After Milestone PRs Merged
|
|
||||||
|
|
||||||
on:
|
|
||||||
pull_request:
|
|
||||||
types:
|
|
||||||
- closed
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
handle_pr:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- name: Checkout repository
|
|
||||||
uses: actions/checkout@v4.2.0
|
|
||||||
|
|
||||||
- name: Get the PR title and extract PR numbers
|
|
||||||
id: extract_pr_numbers
|
|
||||||
run: |
|
|
||||||
# Get the PR title
|
|
||||||
PR_TITLE="${{ github.event.pull_request.title }}"
|
|
||||||
|
|
||||||
echo "PR Title: $PR_TITLE"
|
|
||||||
|
|
||||||
# Extract PR numbers from the title
|
|
||||||
PR_NUMBERS=$(echo "$PR_TITLE" | grep -oE "#[0-9]+" | tr -d '#' | tr '\n' ' ')
|
|
||||||
echo "Extracted PR Numbers: $PR_NUMBERS"
|
|
||||||
|
|
||||||
# Save PR numbers to a file
|
|
||||||
echo "$PR_NUMBERS" > pr_numbers.txt
|
|
||||||
echo "Saved PR Numbers to pr_numbers.txt"
|
|
||||||
|
|
||||||
# Check if the title matches a specific pattern
|
|
||||||
if echo "$PR_TITLE" | grep -qE "^deps: Merge( #[0-9]+)+ PRs into .+"; then
|
|
||||||
echo "proceed=true" >> $GITHUB_OUTPUT
|
|
||||||
else
|
|
||||||
echo "proceed=false" >> $GITHUB_OUTPUT
|
|
||||||
fi
|
|
||||||
|
|
||||||
- name: Use extracted PR numbers and label PRs
|
|
||||||
if: (steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')) && github.event.pull_request.merged == true
|
|
||||||
run: |
|
|
||||||
# Read the previously saved PR numbers
|
|
||||||
PR_NUMBERS=$(cat pr_numbers.txt)
|
|
||||||
echo "Using extracted PR Numbers: $PR_NUMBERS"
|
|
||||||
|
|
||||||
# Loop through each PR number and add label
|
|
||||||
for PR_NUMBER in $PR_NUMBERS; do
|
|
||||||
echo "Adding 'cherry-picked' label to PR #$PR_NUMBER"
|
|
||||||
curl -X POST \
|
|
||||||
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
|
|
||||||
-H "Accept: application/vnd.github+json" \
|
|
||||||
https://api.github.com/repos/${{ github.repository }}/issues/$PR_NUMBER/labels \
|
|
||||||
-d '{"labels":["cherry-picked"]}'
|
|
||||||
done
|
|
||||||
env:
|
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
|
||||||
|
|
||||||
- name: Delete branch after PR close
|
|
||||||
if: steps.extract_pr_numbers.outputs.proceed == 'true' || contains(github.event.pull_request.labels.*.name, 'milestone-merge')
|
|
||||||
run: |
|
|
||||||
BRANCH_NAME="${{ github.event.pull_request.head.ref }}"
|
|
||||||
echo "Branch to delete: $BRANCH_NAME"
|
|
||||||
git push origin --delete "$BRANCH_NAME"
|
|
||||||
env:
|
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
|
||||||
@@ -2,7 +2,11 @@ name: Go Build Test
|
|||||||
|
|
||||||
on:
|
on:
|
||||||
push:
|
push:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
pull_request:
|
pull_request:
|
||||||
|
branches:
|
||||||
|
- main
|
||||||
paths-ignore:
|
paths-ignore:
|
||||||
- '**/*.md'
|
- '**/*.md'
|
||||||
|
|
||||||
|
|||||||
@@ -1,218 +0,0 @@
|
|||||||
name: Create Pre-Release PR from Milestone
|
|
||||||
|
|
||||||
permissions:
|
|
||||||
contents: write
|
|
||||||
pull-requests: write
|
|
||||||
issues: write
|
|
||||||
|
|
||||||
on:
|
|
||||||
workflow_dispatch:
|
|
||||||
inputs:
|
|
||||||
milestone_name:
|
|
||||||
description: 'Milestone name to collect closed PRs from'
|
|
||||||
required: true
|
|
||||||
default: 'v3.8.2'
|
|
||||||
target_branch:
|
|
||||||
description: 'Target branch to merge the consolidated PR'
|
|
||||||
required: true
|
|
||||||
default: 'pre-release-v3.8.2'
|
|
||||||
|
|
||||||
env:
|
|
||||||
MILESTONE_NAME: ${{ github.event.inputs.milestone_name || 'v3.8.2' }}
|
|
||||||
TARGET_BRANCH: ${{ github.event.inputs.target_branch || 'pre-release-v3.8.2' }}
|
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
|
||||||
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
|
|
||||||
LABEL_NAME: cherry-picked
|
|
||||||
TEMP_DIR: /tmp # Using /tmp as the temporary directory
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
cherry_pick_milestone_prs:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- name: Setup temp directory
|
|
||||||
run: |
|
|
||||||
# Create the temporary directory and initialize necessary files
|
|
||||||
mkdir -p ${{ env.TEMP_DIR }}
|
|
||||||
touch ${{ env.TEMP_DIR }}/pr_numbers.txt
|
|
||||||
touch ${{ env.TEMP_DIR }}/commit_hashes.txt
|
|
||||||
touch ${{ env.TEMP_DIR }}/pr_title.txt
|
|
||||||
touch ${{ env.TEMP_DIR }}/pr_body.txt
|
|
||||||
touch ${{ env.TEMP_DIR }}/created_pr_number.txt
|
|
||||||
|
|
||||||
- name: Checkout repository
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
with:
|
|
||||||
fetch-depth: 0
|
|
||||||
token: ${{ secrets.BOT_TOKEN }}
|
|
||||||
|
|
||||||
- name: Setup Git User for OpenIM-Robot
|
|
||||||
run: |
|
|
||||||
# Set up Git credentials for the bot
|
|
||||||
git config --global user.email "OpenIM-Robot@users.noreply.github.com"
|
|
||||||
git config --global user.name "OpenIM-Robot"
|
|
||||||
|
|
||||||
- name: Fetch Milestone ID and Filter PR Numbers
|
|
||||||
env:
|
|
||||||
MILESTONE_NAME: ${{ env.MILESTONE_NAME }}
|
|
||||||
run: |
|
|
||||||
# Fetch milestone details and extract milestone ID
|
|
||||||
milestones=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
|
||||||
-H "Accept: application/vnd.github+json" \
|
|
||||||
"https://api.github.com/repos/${{ github.repository }}/milestones")
|
|
||||||
milestone_id=$(echo "$milestones" | grep -B3 "\"title\": \"$MILESTONE_NAME\"" | grep '"number":' | head -n1 | grep -o '[0-9]\+')
|
|
||||||
if [ -z "$milestone_id" ]; then
|
|
||||||
echo "Milestone '$MILESTONE_NAME' not found. Exiting."
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
echo "Milestone ID: $milestone_id"
|
|
||||||
echo "MILESTONE_ID=$milestone_id" >> $GITHUB_ENV
|
|
||||||
|
|
||||||
# Fetch issues for the milestone
|
|
||||||
issues=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
|
||||||
-H "Accept: application/vnd.github+json" \
|
|
||||||
"https://api.github.com/repos/${{ github.repository }}/issues?milestone=$milestone_id&state=closed&per_page=100")
|
|
||||||
|
|
||||||
> ${{ env.TEMP_DIR }}/pr_numbers.txt
|
|
||||||
|
|
||||||
# Filter PRs that do not have the 'cherry-picked' label
|
|
||||||
for pr_number in $(echo "$issues" | jq -r '.[] | select(.pull_request != null) | .number'); do
|
|
||||||
labels=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
|
||||||
-H "Accept: application/vnd.github+json" \
|
|
||||||
"https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels" | jq -r '.[].name')
|
|
||||||
|
|
||||||
if ! echo "$labels" | grep -q "${LABEL_NAME}"; then
|
|
||||||
echo "PR #$pr_number does not have the 'cherry-picked' label. Adding to the list."
|
|
||||||
echo "$pr_number" >> ${{ env.TEMP_DIR }}/pr_numbers.txt
|
|
||||||
else
|
|
||||||
echo "PR #$pr_number already has the 'cherry-picked' label. Skipping."
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
# Sort the filtered PR numbers
|
|
||||||
sort -n ${{ env.TEMP_DIR }}/pr_numbers.txt -o ${{ env.TEMP_DIR }}/pr_numbers.txt
|
|
||||||
|
|
||||||
echo "Filtered and sorted PR numbers:"
|
|
||||||
cat ${{ env.TEMP_DIR }}/pr_numbers.txt || echo "No closed PR numbers found for milestone."
|
|
||||||
|
|
||||||
- name: Fetch Merge Commits for PRs and Generate Title and Body
|
|
||||||
run: |
|
|
||||||
# Ensure the files are initialized
|
|
||||||
> ${{ env.TEMP_DIR }}/commit_hashes.txt
|
|
||||||
> ${{ env.TEMP_DIR }}/pr_title.txt
|
|
||||||
> ${{ env.TEMP_DIR }}/pr_body.txt
|
|
||||||
|
|
||||||
# Write description to the PR body
|
|
||||||
echo "### Description:" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
|
||||||
echo "Merging PRs from milestone \`$MILESTONE_NAME\` into target branch \`$TARGET_BRANCH\`." >> ${{ env.TEMP_DIR }}/pr_body.txt
|
|
||||||
echo "" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
|
||||||
echo "### Need Merge PRs:" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
|
||||||
|
|
||||||
pr_numbers_in_title=""
|
|
||||||
|
|
||||||
# Process sorted PR numbers and generate commit hashes
|
|
||||||
for pr_number in $(cat ${{ env.TEMP_DIR }}/pr_numbers.txt); do
|
|
||||||
echo "Processing PR #$pr_number"
|
|
||||||
pr_details=$(curl -s -H "Authorization: token $BOT_TOKEN" \
|
|
||||||
-H "Accept: application/vnd.github+json" \
|
|
||||||
"https://api.github.com/repos/${{ github.repository }}/pulls/$pr_number")
|
|
||||||
pr_title=$(echo "$pr_details" | jq -r '.title')
|
|
||||||
merge_commit=$(echo "$pr_details" | jq -r '.merge_commit_sha')
|
|
||||||
short_commit_hash=$(echo "$merge_commit" | cut -c 1-7)
|
|
||||||
|
|
||||||
# Append PR details to the body
|
|
||||||
echo "- $pr_title: (#$pr_number) ($short_commit_hash)" >> ${{ env.TEMP_DIR }}/pr_body.txt
|
|
||||||
|
|
||||||
if [ "$merge_commit" != "null" ];then
|
|
||||||
echo "$merge_commit" >> ${{ env.TEMP_DIR }}/commit_hashes.txt
|
|
||||||
echo "#$pr_number" >> ${{ env.TEMP_DIR }}/pr_title.txt
|
|
||||||
pr_numbers_in_title="$pr_numbers_in_title #$pr_number"
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
commit_hashes=$(cat ${{ env.TEMP_DIR }}/commit_hashes.txt | tr '\n' ' ')
|
|
||||||
first_commit_hash=$(head -n 1 ${{ env.TEMP_DIR }}/commit_hashes.txt)
|
|
||||||
cherry_pick_branch="cherry-pick-${first_commit_hash:0:7}"
|
|
||||||
echo "COMMIT_HASHES=$commit_hashes" >> $GITHUB_ENV
|
|
||||||
echo "CHERRY_PICK_BRANCH=$cherry_pick_branch" >> $GITHUB_ENV
|
|
||||||
echo "pr_numbers_in_title=$pr_numbers_in_title" >> $GITHUB_ENV
|
|
||||||
|
|
||||||
- name: Pull and Cherry-pick Commits, Then Push
|
|
||||||
env:
|
|
||||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
|
||||||
BOT_TOKEN: ${{ secrets.BOT_TOKEN }}
|
|
||||||
run: |
|
|
||||||
# Fetch and pull the latest changes from the target branch
|
|
||||||
git fetch origin
|
|
||||||
git checkout $TARGET_BRANCH
|
|
||||||
git pull origin $TARGET_BRANCH
|
|
||||||
|
|
||||||
# Create a new branch for cherry-picking
|
|
||||||
git checkout -b $CHERRY_PICK_BRANCH
|
|
||||||
|
|
||||||
# Cherry-pick the commits and handle conflicts
|
|
||||||
for commit_hash in $COMMIT_HASHES; do
|
|
||||||
echo "Attempting to cherry-pick commit $commit_hash"
|
|
||||||
if ! git cherry-pick "$commit_hash" --strategy=recursive -X theirs; then
|
|
||||||
echo "Conflict detected for $commit_hash. Resolving with incoming changes."
|
|
||||||
conflict_files=$(git diff --name-only --diff-filter=U)
|
|
||||||
echo "Conflicting files:"
|
|
||||||
echo "$conflict_files"
|
|
||||||
|
|
||||||
for file in $conflict_files; do
|
|
||||||
if [ -f "$file" ]; then
|
|
||||||
echo "Resolving conflict for $file"
|
|
||||||
git add "$file"
|
|
||||||
else
|
|
||||||
echo "File $file has been deleted. Skipping."
|
|
||||||
git rm "$file"
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
echo "Conflicts resolved. Continuing cherry-pick."
|
|
||||||
git cherry-pick --continue
|
|
||||||
else
|
|
||||||
echo "Cherry-pick successful for commit $commit_hash."
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
|
|
||||||
# Push the cherry-pick branch to the repository
|
|
||||||
git remote set-url origin "https://${BOT_TOKEN}@github.com/${{ github.repository }}.git"
|
|
||||||
git push origin $CHERRY_PICK_BRANCH --force
|
|
||||||
|
|
||||||
- name: Create Pull Request
|
|
||||||
run: |
|
|
||||||
# Prepare and create the PR
|
|
||||||
pr_title="deps: Merge ${{ env.pr_numbers_in_title }} PRs into $TARGET_BRANCH"
|
|
||||||
pr_body=$(cat ${{ env.TEMP_DIR }}/pr_body.txt)
|
|
||||||
|
|
||||||
echo "Prepared PR title:"
|
|
||||||
echo "$pr_title"
|
|
||||||
echo "Prepared PR body:"
|
|
||||||
echo "$pr_body"
|
|
||||||
|
|
||||||
# Create the PR using the GitHub API
|
|
||||||
response=$(curl -s -X POST -H "Authorization: token $BOT_TOKEN" \
|
|
||||||
-H "Accept: application/vnd.github+json" \
|
|
||||||
https://api.github.com/repos/${{ github.repository }}/pulls \
|
|
||||||
-d "$(jq -n --arg title "$pr_title" \
|
|
||||||
--arg head "$CHERRY_PICK_BRANCH" \
|
|
||||||
--arg base "$TARGET_BRANCH" \
|
|
||||||
--arg body "$pr_body" \
|
|
||||||
'{title: $title, head: $head, base: $base, body: $body}')")
|
|
||||||
|
|
||||||
pr_number=$(echo "$response" | jq -r '.number')
|
|
||||||
echo "$pr_number" > ${{ env.TEMP_DIR }}/created_pr_number.txt
|
|
||||||
echo "Created PR #$pr_number"
|
|
||||||
|
|
||||||
- name: Add Label to Created Pull Request
|
|
||||||
run: |
|
|
||||||
# Add 'milestone-merge' label to the created PR
|
|
||||||
pr_number=$(cat ${{ env.TEMP_DIR }}/created_pr_number.txt)
|
|
||||||
echo "Adding label to PR #$pr_number"
|
|
||||||
|
|
||||||
curl -s -X POST -H "Authorization: token $GITHUB_TOKEN" \
|
|
||||||
-H "Accept: application/vnd.github+json" \
|
|
||||||
-d '{"labels": ["milestone-merge"]}' \
|
|
||||||
"https://api.github.com/repos/${{ github.repository }}/issues/$pr_number/labels"
|
|
||||||
|
|
||||||
echo "Added 'milestone-merge' label to PR #$pr_number."
|
|
||||||
@@ -0,0 +1,62 @@
|
|||||||
|
# Version logging for OpenIM
|
||||||
|
|
||||||
|
<!-- BEGIN MUNGE: GENERATED_TOC -->
|
||||||
|
|
||||||
|
<!-- END MUNGE: GENERATED_TOC -->
|
||||||
|
|
||||||
|
{{ if .Versions -}}
|
||||||
|
<a name="unreleased"></a>
|
||||||
|
## [Unreleased]
|
||||||
|
|
||||||
|
{{ if .Unreleased.CommitGroups -}}
|
||||||
|
{{ range .Unreleased.CommitGroups -}}
|
||||||
|
### {{ .Title }}
|
||||||
|
{{ range .Commits -}}
|
||||||
|
- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }}
|
||||||
|
{{ end }}
|
||||||
|
{{ end -}}
|
||||||
|
{{ end -}}
|
||||||
|
{{ end -}}
|
||||||
|
|
||||||
|
{{ range .Versions }}
|
||||||
|
<a name="{{ .Tag.Name }}"></a>
|
||||||
|
## {{ if .Tag.Previous }}[{{ .Tag.Name }}]{{ else }}{{ .Tag.Name }}{{ end }} - {{ datetime "2006-01-02" .Tag.Date }}
|
||||||
|
{{ range .CommitGroups -}}
|
||||||
|
### {{ .Title }}
|
||||||
|
{{ range .Commits -}}
|
||||||
|
- {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }}
|
||||||
|
{{ end }}
|
||||||
|
{{ end -}}
|
||||||
|
|
||||||
|
{{- if .RevertCommits -}}
|
||||||
|
### Reverts
|
||||||
|
{{ range .RevertCommits -}}
|
||||||
|
- {{ .Revert.Header }}
|
||||||
|
{{ end }}
|
||||||
|
{{ end -}}
|
||||||
|
|
||||||
|
{{- if .MergeCommits -}}
|
||||||
|
### Pull Requests
|
||||||
|
{{ range .MergeCommits -}}
|
||||||
|
- {{ .Header }}
|
||||||
|
{{ end }}
|
||||||
|
{{ end -}}
|
||||||
|
|
||||||
|
{{- if .NoteGroups -}}
|
||||||
|
{{ range .NoteGroups -}}
|
||||||
|
### {{ .Title }}
|
||||||
|
{{ range .Notes }}
|
||||||
|
{{ .Body }}
|
||||||
|
{{ end }}
|
||||||
|
{{ end -}}
|
||||||
|
{{ end -}}
|
||||||
|
{{ end -}}
|
||||||
|
|
||||||
|
{{- if .Versions }}
|
||||||
|
[Unreleased]: {{ .Info.RepositoryURL }}/compare/{{ $latest := index .Versions 0 }}{{ $latest.Tag.Name }}...HEAD
|
||||||
|
{{ range .Versions -}}
|
||||||
|
{{ if .Tag.Previous -}}
|
||||||
|
[{{ .Tag.Name }}]: {{ $.Info.RepositoryURL }}/compare/{{ .Tag.Previous.Name }}...{{ .Tag.Name }}
|
||||||
|
{{ end -}}
|
||||||
|
{{ end -}}
|
||||||
|
{{ end -}}
|
||||||
@@ -0,0 +1,67 @@
|
|||||||
|
bin: git
|
||||||
|
style: github
|
||||||
|
template: CHANGELOG.tpl.md
|
||||||
|
info:
|
||||||
|
title: CHANGELOG
|
||||||
|
repository_url: https://github.com/openimsdk/open-im-server
|
||||||
|
options:
|
||||||
|
tag_filter_pattern: '^v'
|
||||||
|
sort: "date"
|
||||||
|
|
||||||
|
commits:
|
||||||
|
filters:
|
||||||
|
Type:
|
||||||
|
- feat
|
||||||
|
- fix
|
||||||
|
- perf
|
||||||
|
- refactor
|
||||||
|
- docs
|
||||||
|
- test
|
||||||
|
- chore
|
||||||
|
- ci
|
||||||
|
- build
|
||||||
|
sort_by: Scope
|
||||||
|
|
||||||
|
commit_groups:
|
||||||
|
group_by: Type
|
||||||
|
sort_by: Title
|
||||||
|
title_order:
|
||||||
|
- feat
|
||||||
|
- fix
|
||||||
|
- perf
|
||||||
|
- refactor
|
||||||
|
- docs
|
||||||
|
- test
|
||||||
|
- chore
|
||||||
|
- ci
|
||||||
|
- build
|
||||||
|
title_maps:
|
||||||
|
feat: Features
|
||||||
|
|
||||||
|
header:
|
||||||
|
pattern: "<regexp>"
|
||||||
|
pattern_maps:
|
||||||
|
- PropName
|
||||||
|
|
||||||
|
issues:
|
||||||
|
prefix:
|
||||||
|
- #
|
||||||
|
|
||||||
|
refs:
|
||||||
|
actions:
|
||||||
|
- Closes
|
||||||
|
- Fixes
|
||||||
|
|
||||||
|
merges:
|
||||||
|
pattern: "^Merge branch '(\\w+)'$"
|
||||||
|
pattern_maps:
|
||||||
|
- Source
|
||||||
|
|
||||||
|
reverts:
|
||||||
|
pattern: "^Revert \"([\\s\\S]*)\"$"
|
||||||
|
pattern_maps:
|
||||||
|
- Header
|
||||||
|
|
||||||
|
notes:
|
||||||
|
keywords:
|
||||||
|
- BREAKING CHANGE
|
||||||
@@ -53,8 +53,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-cmdutils
|
- binary: openim-cmdutils
|
||||||
id: openim-cmdutils
|
id: openim-cmdutils
|
||||||
@@ -64,8 +71,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-crontask
|
- binary: openim-crontask
|
||||||
id: openim-crontask
|
id: openim-crontask
|
||||||
@@ -75,8 +89,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-msggateway
|
- binary: openim-msggateway
|
||||||
id: openim-msggateway
|
id: openim-msggateway
|
||||||
@@ -86,8 +107,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-msgtransfer
|
- binary: openim-msgtransfer
|
||||||
id: openim-msgtransfer
|
id: openim-msgtransfer
|
||||||
@@ -97,8 +125,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-push
|
- binary: openim-push
|
||||||
id: openim-push
|
id: openim-push
|
||||||
@@ -108,8 +143,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-rpc-auth
|
- binary: openim-rpc-auth
|
||||||
id: openim-rpc-auth
|
id: openim-rpc-auth
|
||||||
@@ -119,8 +161,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-rpc-conversation
|
- binary: openim-rpc-conversation
|
||||||
id: openim-rpc-conversation
|
id: openim-rpc-conversation
|
||||||
@@ -130,8 +179,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-rpc-friend
|
- binary: openim-rpc-friend
|
||||||
id: openim-rpc-friend
|
id: openim-rpc-friend
|
||||||
@@ -141,8 +197,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-rpc-group
|
- binary: openim-rpc-group
|
||||||
id: openim-rpc-group
|
id: openim-rpc-group
|
||||||
@@ -152,8 +215,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-rpc-msg
|
- binary: openim-rpc-msg
|
||||||
id: openim-rpc-msg
|
id: openim-rpc-msg
|
||||||
@@ -163,8 +233,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-rpc-third
|
- binary: openim-rpc-third
|
||||||
id: openim-rpc-third
|
id: openim-rpc-third
|
||||||
@@ -174,8 +251,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
- binary: openim-rpc-user
|
- binary: openim-rpc-user
|
||||||
id: openim-rpc-user
|
id: openim-rpc-user
|
||||||
@@ -185,8 +269,15 @@ builds:
|
|||||||
- windows
|
- windows
|
||||||
- linux
|
- linux
|
||||||
goarch:
|
goarch:
|
||||||
|
- s390x
|
||||||
|
- mips64
|
||||||
|
- mips64le
|
||||||
- amd64
|
- amd64
|
||||||
|
- ppc64le
|
||||||
- arm64
|
- arm64
|
||||||
|
goarm:
|
||||||
|
- "6"
|
||||||
|
- "7"
|
||||||
|
|
||||||
|
|
||||||
# TODO:Need a script, such as the init - release to help binary to find the right directory
|
# TODO:Need a script, such as the init - release to help binary to find the right directory
|
||||||
|
|||||||
@@ -15,10 +15,9 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
_ "net/http/pprof"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
|
||||||
"github.com/openimsdk/tools/system/program"
|
"github.com/openimsdk/tools/system/program"
|
||||||
|
_ "net/http/pprof"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
+20
-3
@@ -1,3 +1,20 @@
|
|||||||
|
# Copyright © 2023 OpenIM. All rights reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# Determines if a message should be sent. If set to false, it triggers a silent sync without a message. If true, it requires triggering a conversation.
|
||||||
|
# For rpc notification, send twice: once as a message and once as a notification.
|
||||||
|
# The options field 'isNotification' indicates if it's a notification.
|
||||||
groupCreated:
|
groupCreated:
|
||||||
isSendMsg: true
|
isSendMsg: true
|
||||||
# Reliability level of the message sending.
|
# Reliability level of the message sending.
|
||||||
@@ -292,9 +309,9 @@ userInfoUpdated:
|
|||||||
unreadCount: false
|
unreadCount: false
|
||||||
offlinePush:
|
offlinePush:
|
||||||
enable: true
|
enable: true
|
||||||
title: userInfo updated
|
title: Remove a blocked user
|
||||||
desc: userInfo updated
|
desc: Remove a blocked user
|
||||||
ext: userInfo updated
|
ext: Remove a blocked user
|
||||||
|
|
||||||
userStatusChanged:
|
userStatusChanged:
|
||||||
isSendMsg: false
|
isSendMsg: false
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ require (
|
|||||||
github.com/gorilla/websocket v1.5.1
|
github.com/gorilla/websocket v1.5.1
|
||||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
|
||||||
github.com/mitchellh/mapstructure v1.5.0
|
github.com/mitchellh/mapstructure v1.5.0
|
||||||
github.com/openimsdk/protocol v0.0.72-pre-release-v3.8.2-alpha.5
|
github.com/openimsdk/protocol v0.0.72-alpha.46
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.16
|
github.com/openimsdk/tools v0.0.50-alpha.16
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
github.com/prometheus/client_golang v1.18.0
|
github.com/prometheus/client_golang v1.18.0
|
||||||
|
|||||||
@@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
|
|||||||
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
|
||||||
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
|
||||||
github.com/openimsdk/protocol v0.0.72-pre-release-v3.8.2-alpha.5 h1:b0JAuBhzIYirHeXp7asB04bE1q+KhU3dpAaAroc/Am0=
|
github.com/openimsdk/protocol v0.0.72-alpha.46 h1:1LZlfEHLzw1F4afFmqBczmXKJWm5rUQ+yr8rJ4oyEAc=
|
||||||
github.com/openimsdk/protocol v0.0.72-pre-release-v3.8.2-alpha.5/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
github.com/openimsdk/protocol v0.0.72-alpha.46/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc=
|
github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc=
|
||||||
github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||||
|
|||||||
+90
-32
@@ -1,11 +1,15 @@
|
|||||||
package jssdk
|
package jssdk
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/openimsdk/protocol/conversation"
|
"github.com/openimsdk/protocol/conversation"
|
||||||
|
"github.com/openimsdk/protocol/group"
|
||||||
|
"github.com/openimsdk/protocol/jssdk"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
|
"github.com/openimsdk/protocol/relation"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/a2r"
|
"github.com/openimsdk/protocol/user"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
"sort"
|
"sort"
|
||||||
@@ -16,16 +20,22 @@ const (
|
|||||||
defaultGetActiveConversation = 100
|
defaultGetActiveConversation = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewJSSdkApi(msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk {
|
func NewJSSdkApi(user user.UserClient, friend relation.FriendClient, group group.GroupClient, msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk {
|
||||||
return &JSSdk{
|
return &JSSdk{
|
||||||
msg: msg,
|
user: user,
|
||||||
conv: conv,
|
friend: friend,
|
||||||
|
group: group,
|
||||||
|
msg: msg,
|
||||||
|
conv: conv,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type JSSdk struct {
|
type JSSdk struct {
|
||||||
msg msg.MsgClient
|
user user.UserClient
|
||||||
conv conversation.ConversationClient
|
friend relation.FriendClient
|
||||||
|
group group.GroupClient
|
||||||
|
msg msg.MsgClient
|
||||||
|
conv conversation.ConversationClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *JSSdk) GetActiveConversations(c *gin.Context) {
|
func (x *JSSdk) GetActiveConversations(c *gin.Context) {
|
||||||
@@ -36,25 +46,71 @@ func (x *JSSdk) GetConversations(c *gin.Context) {
|
|||||||
call(c, x.getConversations)
|
call(c, x.getConversations)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
func (x *JSSdk) fillConversations(ctx context.Context, conversations []*jssdk.ConversationMsg) error {
|
||||||
req, err := a2r.ParseRequest[ActiveConversationsReq](ctx)
|
if len(conversations) == 0 {
|
||||||
if err != nil {
|
return nil
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
var (
|
||||||
|
userIDs []string
|
||||||
|
groupIDs []string
|
||||||
|
)
|
||||||
|
for _, c := range conversations {
|
||||||
|
if c.Conversation.GroupID == "" {
|
||||||
|
userIDs = append(userIDs, c.Conversation.UserID)
|
||||||
|
} else {
|
||||||
|
groupIDs = append(groupIDs, c.Conversation.GroupID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
userMap map[string]*sdkws.UserInfo
|
||||||
|
friendMap map[string]*relation.FriendInfoOnly
|
||||||
|
groupMap map[string]*sdkws.GroupInfo
|
||||||
|
)
|
||||||
|
if len(userIDs) > 0 {
|
||||||
|
users, err := field(ctx, x.user.GetDesignateUsers, &user.GetDesignateUsersReq{UserIDs: userIDs}, (*user.GetDesignateUsersResp).GetUsersInfo)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
friends, err := field(ctx, x.friend.GetFriendInfo, &relation.GetFriendInfoReq{OwnerUserID: conversations[0].Conversation.OwnerUserID, FriendUserIDs: userIDs}, (*relation.GetFriendInfoResp).GetFriendInfos)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
userMap = datautil.SliceToMap(users, (*sdkws.UserInfo).GetUserID)
|
||||||
|
friendMap = datautil.SliceToMap(friends, (*relation.FriendInfoOnly).GetFriendUserID)
|
||||||
|
}
|
||||||
|
if len(groupIDs) > 0 {
|
||||||
|
resp, err := x.group.GetGroupsInfo(ctx, &group.GetGroupsInfoReq{GroupIDs: groupIDs})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
groupMap = datautil.SliceToMap(resp.GroupInfos, (*sdkws.GroupInfo).GetGroupID)
|
||||||
|
}
|
||||||
|
for _, c := range conversations {
|
||||||
|
if c.Conversation.GroupID == "" {
|
||||||
|
c.User = userMap[c.Conversation.UserID]
|
||||||
|
c.Friend = friendMap[c.Conversation.UserID]
|
||||||
|
} else {
|
||||||
|
c.Group = groupMap[c.Conversation.GroupID]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActiveConversationsReq) (*jssdk.GetActiveConversationsResp, error) {
|
||||||
if req.Count <= 0 || req.Count > maxGetActiveConversation {
|
if req.Count <= 0 || req.Count > maxGetActiveConversation {
|
||||||
req.Count = defaultGetActiveConversation
|
req.Count = defaultGetActiveConversation
|
||||||
}
|
}
|
||||||
opUserID := mcontext.GetOpUserID(ctx)
|
req.OwnerUserID = mcontext.GetOpUserID(ctx)
|
||||||
conversationIDs, err := field(ctx, x.conv.GetConversationIDs,
|
conversationIDs, err := field(ctx, x.conv.GetConversationIDs,
|
||||||
&conversation.GetConversationIDsReq{UserID: opUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs)
|
&conversation.GetConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(conversationIDs) == 0 {
|
if len(conversationIDs) == 0 {
|
||||||
return &ConversationsResp{}, nil
|
return &jssdk.GetActiveConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
readSeq, err := field(ctx, x.msg.GetHasReadSeqs,
|
readSeq, err := field(ctx, x.msg.GetHasReadSeqs,
|
||||||
&msg.GetHasReadSeqsReq{UserID: opUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
|
&msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -64,24 +120,24 @@ func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, er
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(activeConversation) == 0 {
|
if len(activeConversation) == 0 {
|
||||||
return &ConversationsResp{}, nil
|
return &jssdk.GetActiveConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
sortConversations := sortActiveConversations{
|
sortConversations := sortActiveConversations{
|
||||||
Conversation: activeConversation,
|
Conversation: activeConversation,
|
||||||
}
|
}
|
||||||
if len(activeConversation) > 1 {
|
if len(activeConversation) > 1 {
|
||||||
pinnedConversationIDs, err := field(ctx, x.conv.GetPinnedConversationIDs,
|
pinnedConversationIDs, err := field(ctx, x.conv.GetPinnedConversationIDs,
|
||||||
&conversation.GetPinnedConversationIDsReq{UserID: opUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs)
|
&conversation.GetPinnedConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs)
|
sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs)
|
||||||
}
|
}
|
||||||
sort.Sort(&sortConversations)
|
sort.Sort(&sortConversations)
|
||||||
sortList := sortConversations.Top(req.Count)
|
sortList := sortConversations.Top(int(req.Count))
|
||||||
conversations, err := field(ctx, x.conv.GetConversations,
|
conversations, err := field(ctx, x.conv.GetConversations,
|
||||||
&conversation.GetConversationsReq{
|
&conversation.GetConversationsReq{
|
||||||
OwnerUserID: opUserID,
|
OwnerUserID: req.OwnerUserID,
|
||||||
ConversationIDs: datautil.Slice(sortList, func(c *msg.ActiveConversation) string {
|
ConversationIDs: datautil.Slice(sortList, func(c *msg.ActiveConversation) string {
|
||||||
return c.ConversationID
|
return c.ConversationID
|
||||||
})}, (*conversation.GetConversationsResp).GetConversations)
|
})}, (*conversation.GetConversationsResp).GetConversations)
|
||||||
@@ -90,7 +146,7 @@ func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, er
|
|||||||
}
|
}
|
||||||
msgs, err := field(ctx, x.msg.GetSeqMessage,
|
msgs, err := field(ctx, x.msg.GetSeqMessage,
|
||||||
&msg.GetSeqMessageReq{
|
&msg.GetSeqMessageReq{
|
||||||
UserID: opUserID,
|
UserID: req.OwnerUserID,
|
||||||
Conversations: datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs {
|
Conversations: datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs {
|
||||||
return &msg.ConversationSeqs{
|
return &msg.ConversationSeqs{
|
||||||
ConversationID: c.ConversationID,
|
ConversationID: c.ConversationID,
|
||||||
@@ -104,7 +160,7 @@ func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, er
|
|||||||
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
||||||
return c.ConversationID
|
return c.ConversationID
|
||||||
})
|
})
|
||||||
resp := make([]ConversationMsg, 0, len(sortList))
|
resp := make([]*jssdk.ConversationMsg, 0, len(sortList))
|
||||||
for _, c := range sortList {
|
for _, c := range sortList {
|
||||||
conv, ok := conversationMap[c.ConversationID]
|
conv, ok := conversationMap[c.ConversationID]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -114,13 +170,16 @@ func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, er
|
|||||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||||
lastMsg = msgList.Msgs[0]
|
lastMsg = msgList.Msgs[0]
|
||||||
}
|
}
|
||||||
resp = append(resp, ConversationMsg{
|
resp = append(resp, &jssdk.ConversationMsg{
|
||||||
Conversation: conv,
|
Conversation: conv,
|
||||||
LastMsg: lastMsg,
|
LastMsg: lastMsg,
|
||||||
MaxSeq: c.MaxSeq,
|
MaxSeq: c.MaxSeq,
|
||||||
ReadSeq: readSeq[c.ConversationID],
|
ReadSeq: readSeq[c.ConversationID],
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
if err := x.fillConversations(ctx, resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
var unreadCount int64
|
var unreadCount int64
|
||||||
for _, c := range activeConversation {
|
for _, c := range activeConversation {
|
||||||
count := c.MaxSeq - readSeq[c.ConversationID]
|
count := c.MaxSeq - readSeq[c.ConversationID]
|
||||||
@@ -128,24 +187,20 @@ func (x *JSSdk) getActiveConversations(ctx *gin.Context) (*ConversationsResp, er
|
|||||||
unreadCount += count
|
unreadCount += count
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &ConversationsResp{
|
return &jssdk.GetActiveConversationsResp{
|
||||||
Conversations: resp,
|
Conversations: resp,
|
||||||
UnreadCount: unreadCount,
|
UnreadCount: unreadCount,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversationsReq) (*jssdk.GetConversationsResp, error) {
|
||||||
req, err := a2r.ParseRequest[conversation.GetConversationsReq](ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req.OwnerUserID = mcontext.GetOpUserID(ctx)
|
req.OwnerUserID = mcontext.GetOpUserID(ctx)
|
||||||
conversations, err := field(ctx, x.conv.GetConversations, req, (*conversation.GetConversationsResp).GetConversations)
|
conversations, err := field(ctx, x.conv.GetConversations, &conversation.GetConversationsReq{OwnerUserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*conversation.GetConversationsResp).GetConversations)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if len(conversations) == 0 {
|
if len(conversations) == 0 {
|
||||||
return &ConversationsResp{}, nil
|
return &jssdk.GetConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string {
|
req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string {
|
||||||
return c.ConversationID
|
return c.ConversationID
|
||||||
@@ -177,19 +232,22 @@ func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
resp := make([]ConversationMsg, 0, len(conversations))
|
resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
|
||||||
for _, c := range conversations {
|
for _, c := range conversations {
|
||||||
var lastMsg *sdkws.MsgData
|
var lastMsg *sdkws.MsgData
|
||||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||||
lastMsg = msgList.Msgs[0]
|
lastMsg = msgList.Msgs[0]
|
||||||
}
|
}
|
||||||
resp = append(resp, ConversationMsg{
|
resp = append(resp, &jssdk.ConversationMsg{
|
||||||
Conversation: c,
|
Conversation: c,
|
||||||
LastMsg: lastMsg,
|
LastMsg: lastMsg,
|
||||||
MaxSeq: maxSeqs[c.ConversationID],
|
MaxSeq: maxSeqs[c.ConversationID],
|
||||||
ReadSeq: readSeqs[c.ConversationID],
|
ReadSeq: readSeqs[c.ConversationID],
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
if err := x.fillConversations(ctx, resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
var unreadCount int64
|
var unreadCount int64
|
||||||
for conversationID, maxSeq := range maxSeqs {
|
for conversationID, maxSeq := range maxSeqs {
|
||||||
count := maxSeq - readSeqs[conversationID]
|
count := maxSeq - readSeqs[conversationID]
|
||||||
@@ -197,7 +255,7 @@ func (x *JSSdk) getConversations(ctx *gin.Context) (*ConversationsResp, error) {
|
|||||||
unreadCount += count
|
unreadCount += count
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &ConversationsResp{
|
return &jssdk.GetConversationsResp{
|
||||||
Conversations: resp,
|
Conversations: resp,
|
||||||
UnreadCount: unreadCount,
|
UnreadCount: unreadCount,
|
||||||
}, nil
|
}, nil
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
package jssdk
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/openimsdk/protocol/conversation"
|
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
|
||||||
)
|
|
||||||
|
|
||||||
type ActiveConversationsReq struct {
|
|
||||||
Count int `json:"count"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ConversationMsg struct {
|
|
||||||
Conversation *conversation.Conversation `json:"conversation"`
|
|
||||||
LastMsg *sdkws.MsgData `json:"lastMsg"`
|
|
||||||
MaxSeq int64 `json:"maxSeq"`
|
|
||||||
ReadSeq int64 `json:"readSeq"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ConversationsResp struct {
|
|
||||||
UnreadCount int64 `json:"unreadCount"`
|
|
||||||
Conversations []ConversationMsg `json:"conversations"`
|
|
||||||
}
|
|
||||||
@@ -3,8 +3,14 @@ package jssdk
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
"github.com/openimsdk/tools/a2r"
|
||||||
"github.com/openimsdk/tools/apiresp"
|
"github.com/openimsdk/tools/apiresp"
|
||||||
|
"github.com/openimsdk/tools/checker"
|
||||||
|
"github.com/openimsdk/tools/errs"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
|
func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
|
||||||
@@ -16,11 +22,56 @@ func field[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A
|
|||||||
return get(resp), nil
|
return get(resp), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func call[R any](c *gin.Context, fn func(ctx *gin.Context) (R, error)) {
|
func call[A, B any](c *gin.Context, fn func(ctx context.Context, req *A) (*B, error)) {
|
||||||
resp, err := fn(c)
|
var isJSON bool
|
||||||
|
switch contentType := c.GetHeader("Content-Type"); {
|
||||||
|
case contentType == "":
|
||||||
|
isJSON = true
|
||||||
|
case strings.Contains(contentType, "application/json"):
|
||||||
|
isJSON = true
|
||||||
|
case strings.Contains(contentType, "application/protobuf"):
|
||||||
|
case strings.Contains(contentType, "application/x-protobuf"):
|
||||||
|
default:
|
||||||
|
apiresp.GinError(c, errs.ErrArgs.WrapMsg("unsupported content type"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req *A
|
||||||
|
if isJSON {
|
||||||
|
var err error
|
||||||
|
req, err = a2r.ParseRequest[A](c)
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
body, err := io.ReadAll(c.Request.Body)
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req = new(A)
|
||||||
|
if err := proto.Unmarshal(body, any(req).(proto.Message)); err != nil {
|
||||||
|
apiresp.GinError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := checker.Validate(&req); err != nil {
|
||||||
|
apiresp.GinError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp, err := fn(c, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
apiresp.GinError(c, err)
|
apiresp.GinError(c, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
apiresp.GinSuccess(c, resp)
|
if isJSON {
|
||||||
|
apiresp.GinSuccess(c, resp)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
body, err := proto.Marshal(any(resp).(proto.Message))
|
||||||
|
if err != nil {
|
||||||
|
apiresp.GinError(c, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
apiresp.GinSuccess(c, body)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,37 +0,0 @@
|
|||||||
package api
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/openimsdk/protocol/msg"
|
|
||||||
"sort"
|
|
||||||
"testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestName(t *testing.T) {
|
|
||||||
val := sortActiveConversations{
|
|
||||||
Conversation: []*msg.ActiveConversation{
|
|
||||||
{
|
|
||||||
ConversationID: "100",
|
|
||||||
LastTime: 100,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ConversationID: "200",
|
|
||||||
LastTime: 200,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ConversationID: "300",
|
|
||||||
LastTime: 300,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ConversationID: "400",
|
|
||||||
LastTime: 400,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
//PinnedConversationIDs: map[string]struct{}{
|
|
||||||
// "100": {},
|
|
||||||
// "300": {},
|
|
||||||
//},
|
|
||||||
}
|
|
||||||
sort.Sort(&val)
|
|
||||||
t.Log(val)
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -77,7 +77,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
|
|||||||
r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
|
r.Use(prommetricsGin(), gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(authRpc))
|
||||||
u := NewUserApi(*userRpc)
|
u := NewUserApi(*userRpc)
|
||||||
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
|
m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID)
|
||||||
j := jssdk.NewJSSdkApi(messageRpc.Client, conversationRpc.Client)
|
j := jssdk.NewJSSdkApi(userRpc.Client, friendRpc.Client, groupRpc.Client, messageRpc.Client, conversationRpc.Client)
|
||||||
userRouterGroup := r.Group("/user")
|
userRouterGroup := r.Group("/user")
|
||||||
{
|
{
|
||||||
userRouterGroup.POST("/user_register", u.UserRegister)
|
userRouterGroup.POST("/user_register", u.UserRegister)
|
||||||
|
|||||||
@@ -15,8 +15,7 @@
|
|||||||
package msggateway
|
package msggateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"encoding/json"
|
||||||
"encoding/gob"
|
|
||||||
|
|
||||||
"github.com/openimsdk/tools/errs"
|
"github.com/openimsdk/tools/errs"
|
||||||
)
|
)
|
||||||
@@ -33,19 +32,17 @@ func NewGobEncoder() *GobEncoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *GobEncoder) Encode(data any) ([]byte, error) {
|
func (g *GobEncoder) Encode(data any) ([]byte, error) {
|
||||||
buff := bytes.Buffer{}
|
b, err := json.Marshal(data)
|
||||||
enc := gob.NewEncoder(&buff)
|
if err != nil {
|
||||||
if err := enc.Encode(data); err != nil {
|
return nil, errs.New("Encoder.Encode failed", "action", "encode")
|
||||||
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode")
|
|
||||||
}
|
}
|
||||||
return buff.Bytes(), nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
|
func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
|
||||||
buff := bytes.NewBuffer(encodeData)
|
err := json.Unmarshal(encodeData, decodeData)
|
||||||
dec := gob.NewDecoder(buff)
|
if err != nil {
|
||||||
if err := dec.Decode(decodeData); err != nil {
|
return errs.New("Encoder.Decode failed", "action", "decode")
|
||||||
return errs.WrapMsg(err, "GobEncoder.Decode failed", "action", "decode")
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,48 @@
|
|||||||
|
package msggateway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGobEncoder_Encode(t *testing.T) {
|
||||||
|
encoder := NewGobEncoder()
|
||||||
|
|
||||||
|
// 测试用例1: 编码 []byte 数据
|
||||||
|
inputData := []byte("test data")
|
||||||
|
encodedData, err := encoder.Encode(inputData)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, got %v", err)
|
||||||
|
}
|
||||||
|
if string(encodedData) != string(inputData) {
|
||||||
|
t.Fatalf("expected encoded data to be '%s', got '%s'", inputData, encodedData)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试用例2: 编码非 []byte 数据
|
||||||
|
nonByteData := "string data"
|
||||||
|
_, err = encoder.Encode(nonByteData)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected an error when encoding non-byte data, got none")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGobEncoder_Decode(t *testing.T) {
|
||||||
|
encoder := NewGobEncoder()
|
||||||
|
|
||||||
|
// 测试用例1: 解码到 []byte 数据
|
||||||
|
encodedData := []byte("test data")
|
||||||
|
var decodedData []byte
|
||||||
|
err := encoder.Decode(encodedData, &decodedData)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected no error, got %v", err)
|
||||||
|
}
|
||||||
|
if string(decodedData) != string(encodedData) {
|
||||||
|
t.Fatalf("expected decoded data to be '%s', got '%s'", encodedData, decodedData)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 测试用例2: 解码到非 []byte 数据
|
||||||
|
var nonByteData string
|
||||||
|
err = encoder.Decode(encodedData, &nonByteData)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected an error when decoding to non-byte data, got none")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -155,6 +155,7 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M
|
|||||||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
|
||||||
err := client.PushMessage(ctx, msgData)
|
err := client.PushMessage(ctx, msgData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID)
|
||||||
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
|
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
|
||||||
} else {
|
} else {
|
||||||
if _, ok := s.pushTerminal[client.PlatformID]; ok {
|
if _, ok := s.pushTerminal[client.PlatformID]; ok {
|
||||||
|
|||||||
@@ -128,7 +128,6 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
|
|||||||
|
|
||||||
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH)
|
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyCH)
|
||||||
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH)
|
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.ctx, m.historyMongoCH)
|
||||||
go m.historyCH.HandleUserHasReadSeqMessages(m.ctx)
|
|
||||||
err := m.historyCH.redisMessageBatches.Start()
|
err := m.historyCH.redisMessageBatches.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -158,14 +157,12 @@ func (m *MsgTransfer) Start(index int, config *Config) error {
|
|||||||
// graceful close kafka client.
|
// graceful close kafka client.
|
||||||
m.cancel()
|
m.cancel()
|
||||||
m.historyCH.redisMessageBatches.Close()
|
m.historyCH.redisMessageBatches.Close()
|
||||||
m.historyCH.Close()
|
|
||||||
m.historyCH.historyConsumerGroup.Close()
|
m.historyCH.historyConsumerGroup.Close()
|
||||||
m.historyMongoCH.historyConsumerGroup.Close()
|
m.historyMongoCH.historyConsumerGroup.Close()
|
||||||
return nil
|
return nil
|
||||||
case <-netDone:
|
case <-netDone:
|
||||||
m.cancel()
|
m.cancel()
|
||||||
m.historyCH.redisMessageBatches.Close()
|
m.historyCH.redisMessageBatches.Close()
|
||||||
m.historyCH.Close()
|
|
||||||
m.historyCH.historyConsumerGroup.Close()
|
m.historyCH.historyConsumerGroup.Close()
|
||||||
m.historyMongoCH.historyConsumerGroup.Close()
|
m.historyMongoCH.historyConsumerGroup.Close()
|
||||||
close(netDone)
|
close(netDone)
|
||||||
|
|||||||
@@ -18,10 +18,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
"github.com/IBM/sarama"
|
||||||
@@ -42,12 +40,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
size = 500
|
size = 500
|
||||||
mainDataBuffer = 500
|
mainDataBuffer = 500
|
||||||
subChanBuffer = 50
|
subChanBuffer = 50
|
||||||
worker = 50
|
worker = 50
|
||||||
interval = 100 * time.Millisecond
|
interval = 100 * time.Millisecond
|
||||||
hasReadChanBuffer = 1000
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ContextMsg struct {
|
type ContextMsg struct {
|
||||||
@@ -55,23 +52,14 @@ type ContextMsg struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// This structure is used for asynchronously writing the sender’s read sequence (seq) regarding a message into MongoDB.
|
|
||||||
// For example, if the sender sends a message with a seq of 10, then their own read seq for this conversation should be set to 10.
|
|
||||||
type userHasReadSeq struct {
|
|
||||||
conversationID string
|
|
||||||
userHasReadMap map[string]int64
|
|
||||||
}
|
|
||||||
|
|
||||||
type OnlineHistoryRedisConsumerHandler struct {
|
type OnlineHistoryRedisConsumerHandler struct {
|
||||||
historyConsumerGroup *kafka.MConsumerGroup
|
historyConsumerGroup *kafka.MConsumerGroup
|
||||||
|
|
||||||
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
|
redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage]
|
||||||
|
|
||||||
msgTransferDatabase controller.MsgTransferDatabase
|
msgTransferDatabase controller.MsgTransferDatabase
|
||||||
conversationRpcClient *rpcclient.ConversationRpcClient
|
conversationRpcClient *rpcclient.ConversationRpcClient
|
||||||
groupRpcClient *rpcclient.GroupRpcClient
|
groupRpcClient *rpcclient.GroupRpcClient
|
||||||
conversationUserHasReadChan chan *userHasReadSeq
|
|
||||||
wg sync.WaitGroup
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
|
func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase,
|
||||||
@@ -82,8 +70,6 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
|
|||||||
}
|
}
|
||||||
var och OnlineHistoryRedisConsumerHandler
|
var och OnlineHistoryRedisConsumerHandler
|
||||||
och.msgTransferDatabase = database
|
och.msgTransferDatabase = database
|
||||||
och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer)
|
|
||||||
och.wg.Add(1)
|
|
||||||
|
|
||||||
b := batcher.New[sarama.ConsumerMessage](
|
b := batcher.New[sarama.ConsumerMessage](
|
||||||
batcher.WithSize(size),
|
batcher.WithSize(size),
|
||||||
@@ -129,25 +115,25 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
||||||
|
type seqKey struct {
|
||||||
var conversationID string
|
conversationID string
|
||||||
var userSeqMap map[string]int64
|
userID string
|
||||||
|
}
|
||||||
|
var readSeq map[seqKey]int64
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
if msg.message.ContentType != constant.HasReadReceipt {
|
if msg.message.ContentType != constant.HasReadReceipt {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var elem sdkws.NotificationElem
|
var elem sdkws.NotificationElem
|
||||||
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
||||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var tips sdkws.MarkAsReadTips
|
var tips sdkws.MarkAsReadTips
|
||||||
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
||||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
//The conversation ID for each batch of messages processed by the batcher is the same.
|
|
||||||
conversationID = tips.ConversationID
|
|
||||||
if len(tips.Seqs) > 0 {
|
if len(tips.Seqs) > 0 {
|
||||||
for _, seq := range tips.Seqs {
|
for _, seq := range tips.Seqs {
|
||||||
if tips.HasReadSeq < seq {
|
if tips.HasReadSeq < seq {
|
||||||
@@ -160,25 +146,26 @@ func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context,
|
|||||||
if tips.HasReadSeq < 0 {
|
if tips.HasReadSeq < 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if userSeqMap == nil {
|
if readSeq == nil {
|
||||||
userSeqMap = make(map[string]int64)
|
readSeq = make(map[seqKey]int64)
|
||||||
}
|
}
|
||||||
|
key := seqKey{
|
||||||
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
|
conversationID: tips.ConversationID,
|
||||||
|
userID: tips.MarkAsReadUserID,
|
||||||
|
}
|
||||||
|
if readSeq[key] > tips.HasReadSeq {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
|
readSeq[key] = tips.HasReadSeq
|
||||||
}
|
}
|
||||||
if userSeqMap == nil {
|
if readSeq == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(conversationID) == 0 {
|
for key, seq := range readSeq {
|
||||||
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
|
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
|
||||||
|
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
|
|
||||||
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
|
func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
|
||||||
@@ -263,21 +250,12 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
|
|||||||
}
|
}
|
||||||
if len(storageMessageList) > 0 {
|
if len(storageMessageList) > 0 {
|
||||||
msg := storageMessageList[0]
|
msg := storageMessageList[0]
|
||||||
lastSeq, isNewConversation, userSeqMap, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
lastSeq, isNewConversation, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||||
if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
|
if err != nil && !errors.Is(errs.Unwrap(err), redis.Nil) {
|
||||||
log.ZWarn(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageMessageList)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.ZInfo(ctx, "BatchInsertChat2Cache end")
|
log.ZInfo(ctx, "BatchInsertChat2Cache end")
|
||||||
err = och.msgTransferDatabase.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
|
||||||
if err != nil {
|
|
||||||
log.ZWarn(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
|
||||||
prommetrics.SeqSetFailedCounter.Inc()
|
|
||||||
}
|
|
||||||
och.conversationUserHasReadChan <- &userHasReadSeq{
|
|
||||||
conversationID: conversationID,
|
|
||||||
userHasReadMap: userSeqMap,
|
|
||||||
}
|
|
||||||
|
|
||||||
if isNewConversation {
|
if isNewConversation {
|
||||||
switch msg.SessionType {
|
switch msg.SessionType {
|
||||||
@@ -330,7 +308,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
|||||||
storageMessageList = append(storageMessageList, msg.message)
|
storageMessageList = append(storageMessageList, msg.message)
|
||||||
}
|
}
|
||||||
if len(storageMessageList) > 0 {
|
if len(storageMessageList) > 0 {
|
||||||
lastSeq, _, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
lastSeq, _, err := och.msgTransferDatabase.BatchInsertChat2Cache(ctx, conversationID, storageMessageList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
|
log.ZError(ctx, "notification batch insert to redis error", err, "conversationID", conversationID,
|
||||||
"storageList", storageMessageList)
|
"storageList", storageMessageList)
|
||||||
@@ -345,21 +323,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con
|
|||||||
och.toPushTopic(ctx, key, conversationID, storageList)
|
och.toPushTopic(ctx, key, conversationID, storageList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) HandleUserHasReadSeqMessages(ctx context.Context) {
|
|
||||||
defer och.wg.Done()
|
|
||||||
|
|
||||||
for msg := range och.conversationUserHasReadChan {
|
|
||||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, msg.conversationID, msg.userHasReadMap); err != nil {
|
|
||||||
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", msg.conversationID, "userSeqMap", msg.userHasReadMap)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.ZInfo(ctx, "Channel closed, exiting handleUserHasReadSeqMessages")
|
|
||||||
}
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) Close() {
|
|
||||||
close(och.conversationUserHasReadChan)
|
|
||||||
och.wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, key, conversationID string, msgs []*ContextMsg) {
|
||||||
for _, v := range msgs {
|
for _, v := range msgs {
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/openimsdk/protocol/constant"
|
"github.com/openimsdk/protocol/constant"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData, offlinePushUserIDs *[]string) error {
|
||||||
@@ -69,7 +70,7 @@ func (c *ConsumerHandler) webhookBeforeOfflinePush(ctx context.Context, before *
|
|||||||
|
|
||||||
func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error {
|
func (c *ConsumerHandler) webhookBeforeOnlinePush(ctx context.Context, before *config.BeforeConfig, userIDs []string, msg *sdkws.MsgData) error {
|
||||||
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
|
||||||
if msg.ContentType == constant.Typing {
|
if datautil.Contain(msg.SendID, userIDs...) || msg.ContentType == constant.Typing {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
req := callbackstruct.CallbackBeforePushReq{
|
req := callbackstruct.CallbackBeforePushReq{
|
||||||
|
|||||||
@@ -1026,7 +1026,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
|
|||||||
}
|
}
|
||||||
num := len(update)
|
num := len(update)
|
||||||
if req.GroupInfoForSet.Notification != "" {
|
if req.GroupInfoForSet.Notification != "" {
|
||||||
num -= 3
|
num--
|
||||||
func() {
|
func() {
|
||||||
conversation := &pbconversation.ConversationReq{
|
conversation := &pbconversation.ConversationReq{
|
||||||
ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID),
|
ConversationID: msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupInfoForSet.GroupID),
|
||||||
@@ -1133,9 +1133,8 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI
|
|||||||
}
|
}
|
||||||
|
|
||||||
num := len(updatedData)
|
num := len(updatedData)
|
||||||
|
|
||||||
if req.Notification != nil {
|
if req.Notification != nil {
|
||||||
num -= 3
|
num--
|
||||||
|
|
||||||
if req.Notification.Value != "" {
|
if req.Notification.Value != "" {
|
||||||
func() {
|
func() {
|
||||||
@@ -1181,53 +1180,36 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if group.Status == constant.GroupStatusDismissed {
|
if group.Status == constant.GroupStatusDismissed {
|
||||||
return nil, servererrs.ErrDismissedAlready.Wrap()
|
return nil, servererrs.ErrDismissedAlready.Wrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.OldOwnerUserID == req.NewOwnerUserID {
|
if req.OldOwnerUserID == req.NewOwnerUserID {
|
||||||
return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID == NewOwnerUserID")
|
return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID == NewOwnerUserID")
|
||||||
}
|
}
|
||||||
|
|
||||||
members, err := g.db.FindGroupMembers(ctx, req.GroupID, []string{req.OldOwnerUserID, req.NewOwnerUserID})
|
members, err := g.db.FindGroupMembers(ctx, req.GroupID, []string{req.OldOwnerUserID, req.NewOwnerUserID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := g.PopulateGroupMember(ctx, members...); err != nil {
|
if err := g.PopulateGroupMember(ctx, members...); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
memberMap := datautil.SliceToMap(members, func(e *model.GroupMember) string { return e.UserID })
|
memberMap := datautil.SliceToMap(members, func(e *model.GroupMember) string { return e.UserID })
|
||||||
if ids := datautil.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, datautil.Keys(memberMap)); len(ids) > 0 {
|
if ids := datautil.Single([]string{req.OldOwnerUserID, req.NewOwnerUserID}, datautil.Keys(memberMap)); len(ids) > 0 {
|
||||||
return nil, errs.ErrArgs.WrapMsg("user not in group " + strings.Join(ids, ","))
|
return nil, errs.ErrArgs.WrapMsg("user not in group " + strings.Join(ids, ","))
|
||||||
}
|
}
|
||||||
|
|
||||||
oldOwner := memberMap[req.OldOwnerUserID]
|
oldOwner := memberMap[req.OldOwnerUserID]
|
||||||
if oldOwner == nil {
|
if oldOwner == nil {
|
||||||
return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID not in group " + req.NewOwnerUserID)
|
return nil, errs.ErrArgs.WrapMsg("OldOwnerUserID not in group " + req.NewOwnerUserID)
|
||||||
}
|
}
|
||||||
|
|
||||||
newOwner := memberMap[req.NewOwnerUserID]
|
newOwner := memberMap[req.NewOwnerUserID]
|
||||||
if newOwner == nil {
|
if newOwner == nil {
|
||||||
return nil, errs.ErrArgs.WrapMsg("NewOwnerUser not in group " + req.NewOwnerUserID)
|
return nil, errs.ErrArgs.WrapMsg("NewOwnerUser not in group " + req.NewOwnerUserID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
|
if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
|
||||||
if !(mcontext.GetOpUserID(ctx) == oldOwner.UserID && oldOwner.RoleLevel == constant.GroupOwner) {
|
if !(mcontext.GetOpUserID(ctx) == oldOwner.UserID && oldOwner.RoleLevel == constant.GroupOwner) {
|
||||||
return nil, errs.ErrNoPermission.WrapMsg("no permission transfer group owner")
|
return nil, errs.ErrNoPermission.WrapMsg("no permission transfer group owner")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if newOwner.MuteEndTime.After(time.Now()) {
|
|
||||||
if _, err := g.CancelMuteGroupMember(ctx, &pbgroup.CancelMuteGroupMemberReq{
|
|
||||||
GroupID: group.GroupID,
|
|
||||||
UserID: req.NewOwnerUserID}); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := g.db.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil {
|
if err := g.db.TransferGroupOwner(ctx, req.GroupID, req.OldOwnerUserID, req.NewOwnerUserID, newOwner.RoleLevel); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -1235,7 +1217,6 @@ func (g *groupServer) TransferGroupOwner(ctx context.Context, req *pbgroup.Trans
|
|||||||
g.webhookAfterTransferGroupOwner(ctx, &g.config.WebhooksConfig.AfterTransferGroupOwner, req)
|
g.webhookAfterTransferGroupOwner(ctx, &g.config.WebhooksConfig.AfterTransferGroupOwner, req)
|
||||||
|
|
||||||
g.notification.GroupOwnerTransferredNotification(ctx, req)
|
g.notification.GroupOwnerTransferredNotification(ctx, req)
|
||||||
|
|
||||||
return &pbgroup.TransferGroupOwnerResp{}, nil
|
return &pbgroup.TransferGroupOwnerResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1444,38 +1425,32 @@ func (g *groupServer) CancelMuteGroupMember(ctx context.Context, req *pbgroup.Ca
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := g.PopulateGroupMember(ctx, member); err != nil {
|
if err := g.PopulateGroupMember(ctx, member); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
|
if !authverify.IsAppManagerUid(ctx, g.config.Share.IMAdminUserID) {
|
||||||
opMember, err := g.db.TakeGroupMember(ctx, req.GroupID, mcontext.GetOpUserID(ctx))
|
opMember, err := g.db.TakeGroupMember(ctx, req.GroupID, mcontext.GetOpUserID(ctx))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch member.RoleLevel {
|
switch member.RoleLevel {
|
||||||
case constant.GroupOwner:
|
case constant.GroupOwner:
|
||||||
return nil, errs.ErrNoPermission.WrapMsg("Can not set group owner unmute")
|
return nil, errs.ErrNoPermission.WrapMsg("set group owner mute")
|
||||||
case constant.GroupAdmin:
|
case constant.GroupAdmin:
|
||||||
if opMember.RoleLevel != constant.GroupOwner {
|
if opMember.RoleLevel != constant.GroupOwner {
|
||||||
return nil, errs.ErrNoPermission.WrapMsg("Can not set group admin unmute")
|
return nil, errs.ErrNoPermission.WrapMsg("set group admin mute")
|
||||||
}
|
}
|
||||||
case constant.GroupOrdinaryUsers:
|
case constant.GroupOrdinaryUsers:
|
||||||
if !(opMember.RoleLevel == constant.GroupAdmin || opMember.RoleLevel == constant.GroupOwner) {
|
if !(opMember.RoleLevel == constant.GroupAdmin || opMember.RoleLevel == constant.GroupOwner) {
|
||||||
return nil, errs.ErrNoPermission.WrapMsg("Can not set group ordinary users unmute")
|
return nil, errs.ErrNoPermission.WrapMsg("set group ordinary users mute")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
data := UpdateGroupMemberMutedTimeMap(time.Unix(0, 0))
|
data := UpdateGroupMemberMutedTimeMap(time.Unix(0, 0))
|
||||||
if err := g.db.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil {
|
if err := g.db.UpdateGroupMember(ctx, member.GroupID, member.UserID, data); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
g.notification.GroupMemberCancelMutedNotification(ctx, req.GroupID, req.UserID)
|
g.notification.GroupMemberCancelMutedNotification(ctx, req.GroupID, req.UserID)
|
||||||
|
|
||||||
return &pbgroup.CancelMuteGroupMemberResp{}, nil
|
return &pbgroup.CancelMuteGroupMemberResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1646,7 +1621,7 @@ func (g *groupServer) SetGroupMemberInfo(ctx context.Context, req *pbgroup.SetGr
|
|||||||
g.notification.GroupMemberSetToOrdinaryUserNotification(ctx, member.GroupID, member.UserID)
|
g.notification.GroupMemberSetToOrdinaryUserNotification(ctx, member.GroupID, member.UserID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if member.Nickname != nil || member.FaceURL != nil || member.Ex != nil {
|
if member.Nickname != nil || member.FaceURL != nil || member.Ex != nil || member.RoleLevel != nil {
|
||||||
g.notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID)
|
g.notification.GroupMemberInfoSetNotification(ctx, member.GroupID, member.UserID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
|
func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) {
|
||||||
@@ -79,17 +80,13 @@ func (m *msgServer) sendMsgGroupChat(ctx context.Context, req *pbmsg.SendMsgReq)
|
|||||||
|
|
||||||
func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) {
|
func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgData) {
|
||||||
log.ZDebug(nctx, "setConversationAtInfo", "msg", msg)
|
log.ZDebug(nctx, "setConversationAtInfo", "msg", msg)
|
||||||
|
|
||||||
ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx))
|
ctx := mcontext.NewCtx("@@@" + mcontext.GetOperationID(nctx))
|
||||||
|
|
||||||
var atUserID []string
|
var atUserID []string
|
||||||
|
|
||||||
conversation := &pbconversation.ConversationReq{
|
conversation := &pbconversation.ConversationReq{
|
||||||
ConversationID: msgprocessor.GetConversationIDByMsg(msg),
|
ConversationID: msgprocessor.GetConversationIDByMsg(msg),
|
||||||
ConversationType: msg.SessionType,
|
ConversationType: msg.SessionType,
|
||||||
GroupID: msg.GroupID,
|
GroupID: msg.GroupID,
|
||||||
}
|
}
|
||||||
|
|
||||||
tagAll := datautil.Contain(constant.AtAllString, msg.AtUserIDList...)
|
tagAll := datautil.Contain(constant.AtAllString, msg.AtUserIDList...)
|
||||||
if tagAll {
|
if tagAll {
|
||||||
memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID)
|
memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID)
|
||||||
@@ -97,35 +94,25 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa
|
|||||||
log.ZWarn(ctx, "GetGroupMemberIDs", err)
|
log.ZWarn(ctx, "GetGroupMemberIDs", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
atUserID = stringutil.DifferenceString([]string{constant.AtAllString}, msg.AtUserIDList)
|
||||||
memberUserIDList = datautil.DeleteElems(memberUserIDList, msg.SendID)
|
|
||||||
|
|
||||||
atUserID = datautil.Single([]string{constant.AtAllString}, msg.AtUserIDList)
|
|
||||||
|
|
||||||
if len(atUserID) == 0 { // just @everyone
|
if len(atUserID) == 0 { // just @everyone
|
||||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
|
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
|
||||||
} else { // @Everyone and @other people
|
} else { // @Everyone and @other people
|
||||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe}
|
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe}
|
||||||
|
|
||||||
err = m.Conversation.SetConversations(ctx, atUserID, conversation)
|
err = m.Conversation.SetConversations(ctx, atUserID, conversation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation)
|
log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation)
|
||||||
}
|
}
|
||||||
|
memberUserIDList = stringutil.DifferenceString(atUserID, memberUserIDList)
|
||||||
memberUserIDList = datautil.Single(atUserID, memberUserIDList)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
|
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll}
|
||||||
|
|
||||||
err = m.Conversation.SetConversations(ctx, memberUserIDList, conversation)
|
err = m.Conversation.SetConversations(ctx, memberUserIDList, conversation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation)
|
log.ZWarn(ctx, "SetConversations", err, "userID", memberUserIDList, "conversation", conversation)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe}
|
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe}
|
||||||
|
|
||||||
err := m.Conversation.SetConversations(ctx, msg.AtUserIDList, conversation)
|
err := m.Conversation.SetConversations(ctx, msg.AtUserIDList, conversation)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation)
|
log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation)
|
||||||
|
|||||||
@@ -273,7 +273,14 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFri
|
|||||||
return &relation.SetFriendRemarkResp{}, nil
|
return &relation.SetFriendRemarkResp{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ok.
|
func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFriendInfoReq) (*relation.GetFriendInfoResp, error) {
|
||||||
|
friends, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &relation.GetFriendInfoResp{FriendInfos: convert.FriendOnlyDB2PbOnly(friends)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *relation.GetDesignatedFriendsReq) (resp *relation.GetDesignatedFriendsResp, err error) {
|
func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *relation.GetDesignatedFriendsReq) (resp *relation.GetDesignatedFriendsResp, err error) {
|
||||||
resp = &relation.GetDesignatedFriendsResp{}
|
resp = &relation.GetDesignatedFriendsResp{}
|
||||||
if datautil.Duplicate(req.FriendUserIDs) {
|
if datautil.Duplicate(req.FriendUserIDs) {
|
||||||
|
|||||||
@@ -116,17 +116,18 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
|
|||||||
|
|
||||||
func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) {
|
func (s *userServer) GetDesignateUsers(ctx context.Context, req *pbuser.GetDesignateUsersReq) (resp *pbuser.GetDesignateUsersResp, err error) {
|
||||||
resp = &pbuser.GetDesignateUsersResp{}
|
resp = &pbuser.GetDesignateUsersResp{}
|
||||||
users, err := s.db.Find(ctx, req.UserIDs)
|
users, err := s.db.FindWithError(ctx, req.UserIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.UsersInfo = convert.UsersDB2Pb(users)
|
resp.UsersInfo = convert.UsersDB2Pb(users)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deprecated:
|
// deprecated:
|
||||||
// UpdateUserInfo
|
|
||||||
|
//UpdateUserInfo
|
||||||
|
|
||||||
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
|
func (s *userServer) UpdateUserInfo(ctx context.Context, req *pbuser.UpdateUserInfoReq) (resp *pbuser.UpdateUserInfoResp, err error) {
|
||||||
resp = &pbuser.UpdateUserInfoResp{}
|
resp = &pbuser.UpdateUserInfoResp{}
|
||||||
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID)
|
err = authverify.CheckAccessV3(ctx, req.UserInfo.UserID, s.config.Share.IMAdminUserID)
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"github.com/openimsdk/protocol/relation"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"github.com/openimsdk/tools/utils/datautil"
|
"github.com/openimsdk/tools/utils/datautil"
|
||||||
@@ -35,9 +36,7 @@ func FriendPb2DB(friend *sdkws.FriendInfo) *model.Friend {
|
|||||||
return dbFriend
|
return dbFriend
|
||||||
}
|
}
|
||||||
|
|
||||||
func FriendDB2Pb(ctx context.Context, friendDB *model.Friend,
|
func FriendDB2Pb(ctx context.Context, friendDB *model.Friend, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) (*sdkws.FriendInfo, error) {
|
||||||
getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error),
|
|
||||||
) (*sdkws.FriendInfo, error) {
|
|
||||||
users, err := getUsers(ctx, []string{friendDB.FriendUserID})
|
users, err := getUsers(ctx, []string{friendDB.FriendUserID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -53,11 +52,7 @@ func FriendDB2Pb(ctx context.Context, friendDB *model.Friend,
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func FriendsDB2Pb(
|
func FriendsDB2Pb(ctx context.Context, friendsDB []*model.Friend, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) (friendsPb []*sdkws.FriendInfo, err error) {
|
||||||
ctx context.Context,
|
|
||||||
friendsDB []*model.Friend,
|
|
||||||
getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error),
|
|
||||||
) (friendsPb []*sdkws.FriendInfo, err error) {
|
|
||||||
if len(friendsDB) == 0 {
|
if len(friendsDB) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
@@ -86,7 +81,21 @@ func FriendsDB2Pb(
|
|||||||
friendsPb = append(friendsPb, friendPb)
|
friendsPb = append(friendsPb, friendPb)
|
||||||
}
|
}
|
||||||
return friendsPb, nil
|
return friendsPb, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func FriendOnlyDB2PbOnly(friendsDB []*model.Friend) []*relation.FriendInfoOnly {
|
||||||
|
return datautil.Slice(friendsDB, func(f *model.Friend) *relation.FriendInfoOnly {
|
||||||
|
return &relation.FriendInfoOnly{
|
||||||
|
OwnerUserID: f.OwnerUserID,
|
||||||
|
FriendUserID: f.FriendUserID,
|
||||||
|
Remark: f.Remark,
|
||||||
|
CreateTime: f.CreateTime.UnixMilli(),
|
||||||
|
AddSource: f.AddSource,
|
||||||
|
OperatorUserID: f.OperatorUserID,
|
||||||
|
Ex: f.Ex,
|
||||||
|
IsPinned: f.IsPinned,
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func FriendRequestDB2Pb(ctx context.Context, friendRequests []*model.FriendRequest, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) ([]*sdkws.FriendRequest, error) {
|
func FriendRequestDB2Pb(ctx context.Context, friendRequests []*model.FriendRequest, getUsers func(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error)) ([]*sdkws.FriendRequest, error) {
|
||||||
|
|||||||
@@ -16,10 +16,9 @@ package controller
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
|
||||||
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
@@ -195,7 +194,7 @@ func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Con
|
|||||||
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
return c.tx.Transaction(ctx, func(ctx context.Context) error {
|
||||||
cache := c.cache.CloneConversationCache()
|
cache := c.cache.CloneConversationCache()
|
||||||
for _, conversation := range conversations {
|
for _, conversation := range conversations {
|
||||||
cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID, conversation.UserID)
|
cache = cache.DelConversationVersionUserIDs(conversation.OwnerUserID)
|
||||||
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
|
for _, v := range [][2]string{{conversation.OwnerUserID, conversation.UserID}, {conversation.UserID, conversation.OwnerUserID}} {
|
||||||
ownerUserID := v[0]
|
ownerUserID := v[0]
|
||||||
userID := v[1]
|
userID := v[1]
|
||||||
|
|||||||
@@ -24,11 +24,8 @@ type MsgTransferDatabase interface {
|
|||||||
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
|
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
|
||||||
|
|
||||||
// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
|
// BatchInsertChat2Cache increments the sequence number and then batch inserts messages into the cache.
|
||||||
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, userHasReadMap map[string]int64, err error)
|
BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNewConversation bool, err error)
|
||||||
|
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
|
||||||
SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error
|
|
||||||
|
|
||||||
SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error
|
|
||||||
|
|
||||||
// to mq
|
// to mq
|
||||||
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error)
|
||||||
@@ -222,18 +219,18 @@ func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conv
|
|||||||
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
|
return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) {
|
func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
||||||
lenList := len(msgs)
|
lenList := len(msgs)
|
||||||
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
|
if int64(lenList) > db.msgTable.GetSingleGocMsgNum() {
|
||||||
return 0, false, nil, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
|
return 0, false, errs.New("message count exceeds limit", "limit", db.msgTable.GetSingleGocMsgNum()).Wrap()
|
||||||
}
|
}
|
||||||
if lenList < 1 {
|
if lenList < 1 {
|
||||||
return 0, false, nil, errs.New("no messages to insert", "minCount", 1).Wrap()
|
return 0, false, errs.New("no messages to insert", "minCount", 1).Wrap()
|
||||||
}
|
}
|
||||||
currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
|
currentMaxSeq, err := db.seqConversation.Malloc(ctx, conversationID, int64(len(msgs)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ZError(ctx, "storage.seq.Malloc", err)
|
log.ZError(ctx, "storage.seq.Malloc", err)
|
||||||
return 0, false, nil, err
|
return 0, false, err
|
||||||
}
|
}
|
||||||
isNew = currentMaxSeq == 0
|
isNew = currentMaxSeq == 0
|
||||||
lastMaxSeq := currentMaxSeq
|
lastMaxSeq := currentMaxSeq
|
||||||
@@ -251,19 +248,15 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
|
|||||||
} else {
|
} else {
|
||||||
prommetrics.MsgInsertRedisSuccessCounter.Inc()
|
prommetrics.MsgInsertRedisSuccessCounter.Inc()
|
||||||
}
|
}
|
||||||
return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err)
|
err = db.setHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||||
}
|
if err != nil {
|
||||||
|
log.ZError(ctx, "SetHasReadSeqs error", err, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||||
func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
prommetrics.SeqSetFailedCounter.Inc()
|
||||||
for userID, seq := range userSeqMap {
|
|
||||||
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
return lastMaxSeq, isNew, errs.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
func (db *msgTransferDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
|
||||||
for userID, seq := range userSeqMap {
|
for userID, seq := range userSeqMap {
|
||||||
if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil {
|
if err := db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, seq); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -272,6 +265,10 @@ func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, conversati
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *msgTransferDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
|
||||||
|
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
|
||||||
|
}
|
||||||
|
|
||||||
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
func (db *msgTransferDatabase) MsgToPushMQ(ctx context.Context, key, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) {
|
||||||
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
partition, offset, err := db.producerToPush.SendMessage(ctx, key, &pbmsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ package rpccache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
|
||||||
@@ -98,7 +97,6 @@ func (u *UserLocalCache) GetUsersInfo(ctx context.Context, userIDs []string) ([]
|
|||||||
user, err := u.GetUserInfo(ctx, userID)
|
user, err := u.GetUserInfo(ctx, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errs.ErrRecordNotFound.Is(err) {
|
if errs.ErrRecordNotFound.Is(err) {
|
||||||
log.ZWarn(ctx, "User info notFound", err, "userID", userID)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
Reference in New Issue
Block a user