Compare commits

..

12 Commits

Author SHA1 Message Date
github-actions[bot] aa970c0d95 Update version to v3.8.3-patch.10 2025-09-01 02:55:53 +00:00
Monet Lee 4aea6bb4ff build: improve publish docker image workflow in v3.8.3-patch branch. (#3551) 2025-09-01 10:49:17 +08:00
OpenIM-Robot 2aab7034b0 feat: enable redis aof-use-rdb-preamble && disable auto rdb (#3529) (#3535)
Co-authored-by: icey-yu <119291641+icey-yu@users.noreply.github.com>
2025-08-19 16:36:59 +08:00
OpenIM-Gordon 7a9c336a96 fix: fill in the most recent sendTime for a gap message to prevent the client from repeatedly retrieving the same message due to sendTime being 0. (#3523) 2025-08-14 13:43:34 +08:00
Monet Lee 02025278b3 fix: solve batch incorrect error in Find DocIDs in v3.8.3-patch branch. (#3515) 2025-08-13 17:24:23 +08:00
Monet Lee 90596b1a02 fix: fix incorrect kicked logic and PCAndOther Login policy In v3.8.3-patch (#3511) 2025-08-12 16:31:41 +08:00
Gordon 482284a0fb Merge remote-tracking branch 'upstream/3.8.3-patch' into 3.8.3-patch 2025-07-29 16:00:51 +08:00
Gordon a893141ae6 eat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. 2025-07-29 16:00:08 +08:00
Monet Lee 9eccfee997 fix: solve redis config db field in v3.8.3-patch (#3490) 2025-07-29 14:11:43 +08:00
Monet Lee f950dbc5e7 fix: import friends send notification in v3.8.3-patch (#3488) 2025-07-29 11:11:42 +08:00
OpenIM-Gordon 55113e5277 feat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. (#3239) (#3247) (#3483)
Co-authored-by: OpenIM-Robot <openim4@gmail.com>
2025-07-28 14:43:58 +08:00
OpenIM-Robot 7fdc438500 fix: correctly aggregate read seqs by conversation and user before DB update. [Created by @FGadvancer from #3442] (#3482)
* fix: correctly aggregate read seqs by conversation and user before DB update. (#3442)

* build: docker compose file add some comments.

* fix: correctly aggregate read seqs by conversation and user before DB update.

* solve declarate pkg issue.

---------

Co-authored-by: OpenIM-Gordon <1432970085@qq.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
2025-07-28 12:36:48 +08:00
15 changed files with 328 additions and 143 deletions
+100 -56
View File
@@ -4,45 +4,80 @@ on:
push: push:
branches: branches:
- release-* - release-*
# tags:
# - 'v*'
release: release:
types: [published] types: [published]
workflow_dispatch: workflow_dispatch:
inputs: inputs:
tag: tag:
description: "Tag version to be used for Docker image" description: "Tag version to be used for Docker image"
required: true required: true
default: "v3.8.0" default: "v3.8.3"
env:
GO_VERSION: "1.22"
IMAGE_NAME: "openim-server"
# IMAGE_NAME: ${{ github.event.repository.name }}
DOCKER_BUILDKIT: 1
jobs: jobs:
build-and-test: publish-docker-images:
runs-on: ubuntu-latest runs-on: ubuntu-latest
if: ${{ !(github.event_name == 'pull_request' && github.event.pull_request.merged == false) }}
steps: steps:
- uses: actions/checkout@v4 - name: Checkout main repository
uses: actions/checkout@v4
with: with:
path: main-repo path: main-repo
- name: Set up QEMU - name: Set up QEMU
uses: docker/setup-qemu-action@v3 uses: docker/setup-qemu-action@v3.3.0
- name: Set up Docker Buildx - name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v3 uses: docker/setup-buildx-action@v3
- name: Build Docker image
id: build
uses: docker/build-push-action@v5
with: with:
context: ./main-repo driver-opts: network=host
load: true
tags: "openim/openim-server:local"
cache-from: type=gha
cache-to: type=gha,mode=max
- name: Save Docker image to file - name: Extract metadata for Docker
run: docker save -o image.tar openim/openim-server:local id: meta
uses: docker/metadata-action@v5.6.0
with:
images: |
${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}
ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}
registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=tag
type=schedule
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern=v{{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=sha
- name: Install skopeo
run: |
sudo apt-get update && sudo apt-get install -y skopeo
- name: Build multi-arch images as OCI
run: |
mkdir -p /tmp/oci-image /tmp/docker-cache
# Build multi-architecture image and save in OCI format
docker buildx build \
--platform linux/amd64,linux/arm64 \
--output type=oci,dest=/tmp/oci-image/multi-arch.tar \
--cache-to type=local,dest=/tmp/docker-cache \
--cache-from type=gha \
./main-repo
# Use skopeo to convert the amd64 image from OCI format to Docker format and load it
skopeo copy --override-arch amd64 oci-archive:/tmp/oci-image/multi-arch.tar docker-daemon:${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local
# check image
docker image ls | grep openim
- name: Checkout compose repository - name: Checkout compose repository
uses: actions/checkout@v4 uses: actions/checkout@v4
@@ -55,18 +90,19 @@ jobs:
run: | run: |
IP=$(hostname -I | awk '{print $1}') IP=$(hostname -I | awk '{print $1}')
echo "The IP Address is: $IP" echo "The IP Address is: $IP"
echo "::set-output name=ip::$IP" echo "ip=$IP" >> $GITHUB_OUTPUT
- name: Update .env to use the local image - name: Update .env to use the local image
run: | run: |
sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=openim/openim-server:local|' ${{ github.workspace }}/compose-repo/.env sed -i 's|OPENIM_SERVER_IMAGE=.*|OPENIM_SERVER_IMAGE=${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}:local|' ${{ github.workspace }}/compose-repo/.env
sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env sed -i 's|MINIO_EXTERNAL_ADDRESS=.*|MINIO_EXTERNAL_ADDRESS=http://${{ steps.get-ip.outputs.ip }}:10005|' ${{ github.workspace }}/compose-repo/.env
- name: Start services using Docker Compose - name: Start services using Docker Compose
run: | run: |
cd ${{ github.workspace }}/compose-repo cd ${{ github.workspace }}/compose-repo
docker compose up -d docker compose up -d
sleep 60
docker compose ps
# - name: Check openim-server health # - name: Check openim-server health
# run: | # run: |
@@ -97,54 +133,62 @@ jobs:
# exit 0 # exit 0
# fi # fi
- name: Load Docker image from file
run: docker load -i image.tar
- name: Extract metadata for Docker (tags, labels)
id: meta
uses: docker/metadata-action@v5.5.1
with:
images: |
openim/openim-server
ghcr.io/openimsdk/openim-server
registry.cn-hangzhou.aliyuncs.com/openimsdk/openim-server
tags: |
type=ref,event=tag
type=schedule
type=ref,event=branch
type=semver,pattern={{version}}
type=semver,pattern=v{{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
type=semver,pattern=release-{{raw}}
type=sha
type=raw,value=${{ github.event.inputs.tag }}
- name: Log in to Docker Hub - name: Log in to Docker Hub
uses: docker/login-action@v2 uses: docker/login-action@v3.3.0
with: with:
username: ${{ secrets.DOCKER_USERNAME }} username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }} password: ${{ secrets.DOCKER_PASSWORD }}
- name: Log in to GitHub Container Registry - name: Log in to GitHub Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3.3.0
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Log in to Aliyun Container Registry - name: Log in to Aliyun Container Registry
uses: docker/login-action@v2 uses: docker/login-action@v3.3.0
with: with:
registry: registry.cn-hangzhou.aliyuncs.com registry: registry.cn-hangzhou.aliyuncs.com
username: ${{ secrets.ALIREGISTRY_USERNAME }} username: ${{ secrets.ALIREGISTRY_USERNAME }}
password: ${{ secrets.ALIREGISTRY_TOKEN }} password: ${{ secrets.ALIREGISTRY_TOKEN }}
- name: Push Docker images - name: Push multi-architecture images
uses: docker/build-push-action@v5 if: success()
with: run: |
context: ./main-repo docker buildx build \
push: true --platform linux/amd64,linux/arm64 \
platforms: linux/amd64,linux/arm64 $(echo "${{ steps.meta.outputs.tags }}" | sed 's/,/ --tag /g' | sed 's/^/--tag /') \
tags: ${{ steps.meta.outputs.tags }} --cache-from type=local,src=/tmp/docker-cache \
labels: ${{ steps.meta.outputs.labels }} --push \
./main-repo
- name: Verify multi-platform support
run: |
images=(
"${{ secrets.DOCKER_USERNAME }}/${{ env.IMAGE_NAME }}"
"ghcr.io/${{ github.repository_owner }}/${{ env.IMAGE_NAME }}"
"registry.cn-hangzhou.aliyuncs.com/openimsdk/${{ env.IMAGE_NAME }}"
)
for image in "${images[@]}"; do
for tag in $(echo "${{ steps.meta.outputs.tags }}" | tr ',' '\n' | cut -d':' -f2); do
echo "Verifying multi-arch support for $image:$tag"
manifest=$(docker manifest inspect "$image:$tag" || echo "error")
if [[ "$manifest" == "error" ]]; then
echo "Manifest not found for $image:$tag"
exit 1
fi
amd64_found=$(echo "$manifest" | jq '.manifests[] | select(.platform.architecture == "amd64")')
arm64_found=$(echo "$manifest" | jq '.manifests[] | select(.platform.architecture == "arm64")')
if [[ -z "$amd64_found" ]]; then
echo "Multi-platform support check failed for $image:$tag - missing amd64"
exit 1
fi
if [[ -z "$arm64_found" ]]; then
echo "Multi-platform support check failed for $image:$tag - missing arm64"
exit 1
fi
echo "✅ $image:$tag supports both amd64 and arm64 architectures"
done
done
+56 -4
View File
@@ -63,7 +63,12 @@ services:
restart: always restart: always
sysctls: sysctls:
net.core.somaxconn: 1024 net.core.somaxconn: 1024
command: redis-server /usr/local/redis/config/redis.conf --requirepass openIM123 --appendonly yes command: >
redis-server
--requirepass openIM123
--appendonly yes
--aof-use-rdb-preamble yes
--save ""
networks: networks:
- openim - openim
@@ -99,15 +104,62 @@ services:
environment: environment:
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m" #KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
TZ: Asia/Shanghai TZ: Asia/Shanghai
# Unique identifier for the Kafka node (required in controller mode)
KAFKA_CFG_NODE_ID: 0 KAFKA_CFG_NODE_ID: 0
# Defines the roles this Kafka node plays: broker, controller, or both
KAFKA_CFG_PROCESS_ROLES: controller,broker KAFKA_CFG_PROCESS_ROLES: controller,broker
# Specifies which nodes are controller nodes for quorum voting.
# The syntax follows the KRaft mode (no ZooKeeper): node.id@host:port
# The controller listener endpoint here is kafka:9093
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 # Specifies which listener is used for controller-to-controller communication
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
# Default number of partitions for new topics
KAFKA_NUM_PARTITIONS: 8 KAFKA_NUM_PARTITIONS: 8
# Whether to enable automatic topic creation
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
# Kafka internal listeners; Kafka supports multiple ports with different protocols
# Each port is used for a specific purpose: INTERNAL for internal broker communication,
# CONTROLLER for controller communication, EXTERNAL for external client connections.
# These logical listener names are mapped to actual protocols via KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
# In short, Kafka is listening on three logical ports: 9092 for internal communication,
# 9093 for controller traffic, and 9094 for external access.
KAFKA_CFG_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
# Addresses advertised to clients. INTERNAL://kafka:9092 uses the internal Docker service name 'kafka',
# so other containers can access Kafka via kafka:9092.
# EXTERNAL://localhost:19094 is the address external clients (e.g., in the LAN) should use to connect.
# If Kafka is deployed on a different machine than IM, 'localhost' should be replaced with the LAN IP.
KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:19094"
# Maps logical listener names to actual protocols.
# Supported protocols include: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT"
# Defines which listener is used for inter-broker communication within the Kafka cluster
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
# Authentication configuration variables - comment out to disable auth
# KAFKA_USERNAME: "openIM"
# KAFKA_PASSWORD: "openIM123"
command: >
/bin/sh -c '
if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then
echo "=== Kafka SASL Authentication ENABLED ==="
echo "Username: $${KAFKA_USERNAME}"
# Set environment variables for SASL authentication
export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN"
export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN"
export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT"
export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}"
export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}"
fi
# Start Kafka with the configured environment
exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
'
networks: networks:
- openim - openim
+61 -21
View File
@@ -2,10 +2,14 @@ package jssdk
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort" "sort"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/jssdk" "github.com/openimsdk/protocol/jssdk"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
@@ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if len(conversationIDs) == 0 { if len(conversationIDs) == 0 {
return &jssdk.GetActiveConversationsResp{}, nil return &jssdk.GetActiveConversationsResp{}, nil
} }
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if len(activeConversation) == 0 { if len(activeConversation) == 0 {
return &jssdk.GetActiveConversationsResp{}, nil return &jssdk.GetActiveConversationsResp{}, nil
} }
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
sortConversations := sortActiveConversations{ sortConversations := sortActiveConversations{
Conversation: activeConversation, Conversation: activeConversation,
} }
@@ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if err != nil { if err != nil {
return nil, err return nil, err
} }
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
return c.ConversationID return c.ConversationID
}) })
@@ -156,16 +162,15 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if !ok { if !ok {
continue continue
} }
var lastMsg *sdkws.MsgData
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
lastMsg = msgList.Msgs[0] resp = append(resp, &jssdk.ConversationMsg{
Conversation: conv,
LastMsg: msgList.Msgs[0],
MaxSeq: c.MaxSeq,
ReadSeq: readSeq[c.ConversationID],
})
} }
resp = append(resp, &jssdk.ConversationMsg{
Conversation: conv,
LastMsg: lastMsg,
MaxSeq: c.MaxSeq,
ReadSeq: readSeq[c.ConversationID],
})
} }
if err := x.fillConversations(ctx, resp); err != nil { if err := x.fillConversations(ctx, resp); err != nil {
return nil, err return nil, err
@@ -219,18 +224,18 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
return nil, err return nil, err
} }
} }
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
for _, c := range conversations { for _, c := range conversations {
var lastMsg *sdkws.MsgData
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
lastMsg = msgList.Msgs[0] resp = append(resp, &jssdk.ConversationMsg{
Conversation: c,
LastMsg: msgList.Msgs[0],
MaxSeq: maxSeqs[c.ConversationID],
ReadSeq: readSeqs[c.ConversationID],
})
} }
resp = append(resp, &jssdk.ConversationMsg{
Conversation: c,
LastMsg: lastMsg,
MaxSeq: maxSeqs[c.ConversationID],
ReadSeq: readSeqs[c.ConversationID],
})
} }
if err := x.fillConversations(ctx, resp); err != nil { if err := x.fillConversations(ctx, resp); err != nil {
return nil, err return nil, err
@@ -247,3 +252,38 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
UnreadCount: unreadCount, UnreadCount: unreadCount,
}, nil }, nil
} }
// This function checks whether the latest MaxSeq message is valid.
// If not, it needs to fetch a valid message again.
func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) {
var conversationIDs []string
for conversationID, message := range messages {
allInValid := true
for _, data := range message.Msgs {
if data.Status < constant.MsgStatusHasDeleted {
allInValid = false
break
}
}
// when the conversation has been deleted by the user, the length of message.Msgs is empty
if allInValid && len(message.Msgs) > 0 {
conversationIDs = append(conversationIDs, conversationID)
}
}
if len(conversationIDs) > 0 {
resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
UserID: userID,
ConversationIDs: conversationIDs,
})
if err != nil {
log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs)
return
}
for conversationID, message := range resp.Msgs {
messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}}
}
}
}
+1
View File
@@ -244,6 +244,7 @@ func (s *Server) MultiTerminalLoginCheck(ctx context.Context, req *msggateway.Mu
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx)) tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
client := &Client{} client := &Client{}
client.ctx = tempUserCtx client.ctx = tempUserCtx
client.token = req.Token
client.UserID = req.UserID client.UserID = req.UserID
client.PlatformID = int(req.PlatformID) client.PlatformID = int(req.PlatformID)
i := &kickHandler{ i := &kickHandler{
+38 -3
View File
@@ -337,17 +337,51 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
} }
} }
// If reconnect: When multiple msgGateway instances are deployed, a client may disconnect from instance A and reconnect to instance B.
// During this process, instance A might still be executing, resulting in two clients with the same token existing simultaneously.
// This situation needs to be filtered to prevent duplicate clients.
checkSameTokenFunc := func(oldClients []*Client) []*Client {
var clientsNeedToKick []*Client
for _, c := range oldClients {
if c.token == newClient.token {
log.ZDebug(newClient.ctx, "token is same, not kick",
"userID", newClient.UserID,
"platformID", newClient.PlatformID,
"token", newClient.token)
continue
}
clientsNeedToKick = append(clientsNeedToKick, c)
}
return clientsNeedToKick
}
switch ws.msgGatewayConfig.Share.MultiLogin.Policy { switch ws.msgGatewayConfig.Share.MultiLogin.Policy {
case constant.DefalutNotKick: case constant.DefalutNotKick:
case constant.PCAndOther: case constant.PCAndOther:
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC { if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
return return
} }
clients, ok := ws.clients.GetAll(newClient.UserID)
clientOK = ok
oldClients = make([]*Client, 0, len(clients))
for _, c := range clients {
if constant.PlatformIDToClass(c.PlatformID) == constant.TerminalPC {
continue
}
oldClients = append(oldClients, c)
}
fallthrough fallthrough
case constant.AllLoginButSameTermKick: case constant.AllLoginButSameTermKick:
if !clientOK { if !clientOK {
return return
} }
oldClients = checkSameTokenFunc(oldClients)
ws.clients.DeleteClients(newClient.UserID, oldClients) ws.clients.DeleteClients(newClient.UserID, oldClients)
for _, c := range oldClients { for _, c := range oldClients {
err := c.KickOnlineMessage() err := c.KickOnlineMessage()
@@ -373,14 +407,15 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
if !ok { if !ok {
return return
} }
var (
kickClients []*Client var kickClients []*Client
)
for _, client := range clients { for _, client := range clients {
if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) { if constant.PlatformIDToClass(client.PlatformID) == constant.PlatformIDToClass(newClient.PlatformID) {
kickClients = append(kickClients, client) kickClients = append(kickClients, client)
} }
} }
kickClients = checkSameTokenFunc(kickClients)
kickTokenFunc(kickClients) kickTokenFunc(kickClients)
} }
} }
@@ -26,10 +26,12 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -41,7 +43,6 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
) )
const ( const (
@@ -140,53 +141,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
var conversationID string // Outer map: conversationID -> (userID -> maxHasReadSeq)
var userSeqMap map[string]int64 conversationUserSeq := make(map[string]map[string]int64)
for _, msg := range msgs { for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt { if msg.message.ContentType != constant.HasReadReceipt {
continue continue
} }
var elem sdkws.NotificationElem var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil { if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
continue continue
} }
var tips sdkws.MarkAsReadTips var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
continue continue
} }
//The conversation ID for each batch of messages processed by the batcher is the same. if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
conversationID = tips.ConversationID
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue continue
} }
if userSeqMap == nil {
userSeqMap = make(map[string]int64)
}
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { // Calculate the max seq from tips.Seqs
continue for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
}
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
} }
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
} }
if userSeqMap == nil { log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
return
} // persist to db
if len(conversationID) == 0 { for convID, userSeqMap := range conversationUserSeq {
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
} log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { }
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
} }
} }
+2 -2
View File
@@ -194,7 +194,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr
FromUserID: req.OwnerUserID, FromUserID: req.OwnerUserID,
ToUserID: userID, ToUserID: userID,
HandleResult: constant.FriendResponseAgree, HandleResult: constant.FriendResponseAgree,
}) }, false)
} }
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req) s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
@@ -223,7 +223,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res
return nil, err return nil, err
} }
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req) s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
s.notificationSender.FriendApplicationAgreedNotification(ctx, req) s.notificationSender.FriendApplicationAgreedNotification(ctx, req, true)
return resp, nil return resp, nil
} }
if req.HandleResult == constant.FriendResponseRefuse { if req.HandleResult == constant.FriendResponseRefuse {
+11 -5
View File
@@ -171,11 +171,17 @@ func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips) f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
} }
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq) { func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq, checkReq bool) {
request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID) var (
if err != nil { request *sdkws.FriendRequest
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID) err error
return )
if checkReq {
request, err = f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
if err != nil {
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
return
}
} }
tips := sdkws.FriendApplicationApprovedTips{ tips := sdkws.FriendApplicationApprovedTips{
FromToUserID: &sdkws.FromToUserID{ FromToUserID: &sdkws.FromToUserID{
+2 -1
View File
@@ -23,6 +23,8 @@ import (
"github.com/openimsdk/tools/s3/aws" "github.com/openimsdk/tools/s3/aws"
"github.com/openimsdk/tools/s3/disable" "github.com/openimsdk/tools/s3/disable"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
@@ -37,7 +39,6 @@ import (
"github.com/openimsdk/tools/s3/kodo" "github.com/openimsdk/tools/s3/kodo"
"github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/s3/minio"
"github.com/openimsdk/tools/s3/oss" "github.com/openimsdk/tools/s3/oss"
"google.golang.org/grpc"
) )
type thirdServer struct { type thirdServer struct {
+1 -1
View File
@@ -358,7 +358,7 @@ type Redis struct {
Username string `mapstructure:"username"` Username string `mapstructure:"username"`
Password string `mapstructure:"password"` Password string `mapstructure:"password"`
ClusterMode bool `mapstructure:"clusterMode"` ClusterMode bool `mapstructure:"clusterMode"`
DB int `mapstructure:"storage"` DB int `mapstructure:"db"`
MaxRetry int `mapstructure:"maxRetry"` MaxRetry int `mapstructure:"maxRetry"`
PoolSize int `mapstructure:"poolSize"` PoolSize int `mapstructure:"poolSize"`
} }
+9 -2
View File
@@ -675,12 +675,19 @@ func (db *commonMsgDatabase) SearchMessage(ctx context.Context, req *pbmsg.Searc
func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) { func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) {
totalMsgs := make(map[string]*sdkws.MsgData) totalMsgs := make(map[string]*sdkws.MsgData)
for _, conversationID := range conversationIDs { for _, conversationID := range conversationIDs {
seq := seqs[conversationID] seq, ok := seqs[conversationID]
if !ok {
log.ZWarn(ctx, "seq not found for conversationID", errs.New("seq not found for conversation"), "conversationID", conversationID)
continue
}
docID := db.msgTable.GetDocID(conversationID, seq) docID := db.msgTable.GetDocID(conversationID, seq)
msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) msgs, err := db.msgDocDatabase.FindOneByDocID(ctx, docID)
if err != nil { if err != nil {
return nil, err log.ZWarn(ctx, "FindOneByDocID failed", err, "conversationID", conversationID, "docID", docID, "seq", seq)
continue
} }
index := db.msgTable.GetMsgIndex(seq) index := db.msgTable.GetMsgIndex(seq)
totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg) totalMsgs[conversationID] = convert.MsgDB2Pb(msgs.Msg[index].Msg)
} }
+7 -6
View File
@@ -5,6 +5,11 @@ import (
"fmt" "fmt"
"time" "time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/constant"
@@ -14,10 +19,6 @@ import (
"github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/jsonutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
) )
func NewMsgMongo(db *mongo.Database) (database.Msg, error) { func NewMsgMongo(db *mongo.Database) (database.Msg, error) {
@@ -1149,7 +1150,7 @@ func (m *MsgMgo) findBeforeDocSendTime(ctx context.Context, docID string, limit
if err != nil { if err != nil {
return 0, 0, err return 0, 0, err
} }
for i := len(res) - 1; i > 0; i-- { for i := len(res) - 1; i >= 0; i-- {
v := res[i] v := res[i]
if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 { if v.Msgs != nil && v.Msgs.Msg != nil && v.Msgs.Msg.SendTime > 0 {
return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil return v.Msgs.Msg.Seq, v.Msgs.Msg.SendTime, nil
@@ -1164,7 +1165,7 @@ func (m *MsgMgo) findBeforeSendTime(ctx context.Context, conversationID string,
limit := int64(-1) limit := int64(-1)
if first { if first {
first = false first = false
limit = m.model.GetMsgIndex(seq) limit = m.model.GetLimitForSingleDoc(seq)
} }
docID := m.model.BuildDocIDByIndex(conversationID, i) docID := m.model.BuildDocIDByIndex(conversationID, i)
msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit) msgSeq, msgSendTime, err := m.findBeforeDocSendTime(ctx, docID, limit)
+8 -8
View File
@@ -120,15 +120,11 @@ func (m *MsgDocModel) GetDocID(conversationID string, seq int64) string {
func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 { func (m *MsgDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 {
t := make(map[string][]int64) t := make(map[string][]int64)
for i := 0; i < len(seqs); i++ { for _, seq := range seqs {
docID := m.GetDocID(conversationID, seqs[i]) docID := m.GetDocID(conversationID, seq)
if value, ok := t[docID]; !ok { t[docID] = append(t[docID], seq)
var temp []int64
t[docID] = append(temp, seqs[i])
} else {
t[docID] = append(value, seqs[i])
}
} }
return t return t
} }
@@ -136,6 +132,10 @@ func (*MsgDocModel) GetMsgIndex(seq int64) int64 {
return (seq - 1) % singleGocMsgNum return (seq - 1) % singleGocMsgNum
} }
func (*MsgDocModel) GetLimitForSingleDoc(seq int64) int64 {
return seq % singleGocMsgNum
}
func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string { func (*MsgDocModel) indexGen(conversationID string, seqSuffix int64) string {
return conversationID + ":" + strconv.FormatInt(seqSuffix, 10) return conversationID + ":" + strconv.FormatInt(seqSuffix, 10)
} }
+3 -1
View File
@@ -2,9 +2,11 @@ package rpcli
import ( import (
"context" "context"
"google.golang.org/grpc"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"google.golang.org/grpc"
) )
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {
+1 -1
View File
@@ -1 +1 @@
v3.8.3-patch.6 v3.8.3-patch.10