Compare commits

...

14 Commits

Author SHA1 Message Date
github-actions[bot] 391eafb402 Update version to v3.8.3-patch.7 2025-07-29 09:07:09 +00:00
Gordon 482284a0fb Merge remote-tracking branch 'upstream/3.8.3-patch' into 3.8.3-patch 2025-07-29 16:00:51 +08:00
Gordon a893141ae6 eat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. 2025-07-29 16:00:08 +08:00
Monet Lee 9eccfee997 fix: solve redis config db field in v3.8.3-patch (#3490) 2025-07-29 14:11:43 +08:00
Monet Lee f950dbc5e7 fix: import friends send notification in v3.8.3-patch (#3488) 2025-07-29 11:11:42 +08:00
OpenIM-Gordon 55113e5277 feat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. (#3239) (#3247) (#3483)
Co-authored-by: OpenIM-Robot <openim4@gmail.com>
2025-07-28 14:43:58 +08:00
OpenIM-Robot 7fdc438500 fix: correctly aggregate read seqs by conversation and user before DB update. [Created by @FGadvancer from #3442] (#3482)
* fix: correctly aggregate read seqs by conversation and user before DB update. (#3442)

* build: docker compose file add some comments.

* fix: correctly aggregate read seqs by conversation and user before DB update.

* solve declarate pkg issue.

---------

Co-authored-by: OpenIM-Gordon <1432970085@qq.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
2025-07-28 12:36:48 +08:00
Monet Lee 2804d90020 fix: use safe submodule init in workflows in v3.8.3-patch. (#3469) 2025-07-22 10:59:16 +08:00
chao 4ca3f2dc0c Merge pull request #3454 from withchao/3.8.3-patch
fix: s3 aws init
2025-07-10 15:03:58 +08:00
withchao b92c5ab821 Merge remote-tracking branch 'origin/3.8.3-patch' into 3.8.3-patch 2025-07-10 14:59:46 +08:00
withchao 9c4b6e50ef fix: s3 aws init 2025-07-10 14:59:33 +08:00
Monet Lee 54e189d80f fix: remove update version file workflows have new line in 3.8.3-patch branch. (#3452) 2025-07-09 17:27:59 +08:00
icey-yu 1b6d70e4a7 Merge pull request #3438 from icey-yu/add-friend
fix: Add friend DB in notification sender
2025-06-25 16:39:23 +08:00
chao 4c9cf55867 fix: add friend db in notification sender 2025-06-25 16:35:53 +08:00
11 changed files with 252 additions and 108 deletions
@@ -8,19 +8,40 @@ jobs:
update-version: update-version:
runs-on: ubuntu-latest runs-on: ubuntu-latest
env: env:
TAG_VERSION: ${{ github.event.release.tag_name }} TAG_VERSION: ${{ github.event.release.tag_name }}
steps: steps:
# Step 1: Checkout the original repository's code # Step 1: Checkout the original repository's code
- name: Checkout code - name: Checkout code
uses: actions/checkout@v4 uses: actions/checkout@v4
with: with:
fetch-depth: 0 fetch-depth: 0
# submodules: "recursive"
- name: Safe submodule initialization
run: |
echo "Checking for submodules..."
if [ -f .gitmodules ]; then
if [ -s .gitmodules ]; then
echo "Initializing submodules..."
if git submodule sync --recursive 2>/dev/null; then
git submodule update --init --force --recursive || {
echo "Warning: Some submodules failed to initialize, continuing anyway..."
}
else
echo "Warning: Submodule sync failed, continuing without submodules..."
fi
else
echo ".gitmodules exists but is empty, skipping submodule initialization"
fi
else
echo "No .gitmodules file found, no submodules to initialize"
fi
# Step 2: Set up Git with official account # Step 2: Set up Git with official account
- name: Set up Git - name: Set up Git
run: | run: |
git config user.name "github-actions[bot]" git config --global user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com" git config --global user.email "github-actions[bot]@users.noreply.github.com"
# Step 3: Check and delete existing tag # Step 3: Check and delete existing tag
- name: Check and delete existing tag - name: Check and delete existing tag
@@ -33,7 +54,8 @@ jobs:
# Step 4: Update version file # Step 4: Update version file
- name: Update version file - name: Update version file
run: | run: |
echo "${{ env.TAG_VERSION }}" > version/version mkdir -p version
echo -n "${{ env.TAG_VERSION }}" > version/version
# Step 5: Commit and push changes # Step 5: Commit and push changes
- name: Commit and push changes - name: Commit and push changes
@@ -42,43 +64,56 @@ jobs:
run: | run: |
git add version/version git add version/version
git commit -m "Update version to ${{ env.TAG_VERSION }}" git commit -m "Update version to ${{ env.TAG_VERSION }}"
git push origin HEAD:${{ github.ref }}
# Step 6: Create and push tag # Step 6: Update tag
- name: Create and push tag - name: Update tag
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: | run: |
git tag ${{ env.TAG_VERSION }} git tag -fa ${{ env.TAG_VERSION }} -m "Update version to ${{ env.TAG_VERSION }}"
git push origin ${{ env.TAG_VERSION }} git push origin ${{ env.TAG_VERSION }} --force
# Step 7: Find and Publish Draft Release # Step 7: Find and Publish Draft Release
- name: Find and Publish Draft Release - name: Find and Publish Draft Release
uses: actions/github-script@v6 uses: actions/github-script@v7
with: with:
github-token: ${{ secrets.GITHUB_TOKEN }} github-token: ${{ secrets.GITHUB_TOKEN }}
script: | script: |
// Get the list of releases const { owner, repo } = context.repo;
const releases = await github.rest.repos.listReleases({ const tagName = process.env.TAG_VERSION;
owner: context.repo.owner,
repo: context.repo.repo
});
// Find the draft release where the title and tag_name are the same try {
const draftRelease = releases.data.find(release => let release;
release.draft && release.name === release.tag_name try {
); const response = await github.rest.repos.getReleaseByTag({
owner,
if (draftRelease) { repo,
// Publish the draft release using the release_id tag: tagName
});
release = response.data;
} catch (tagError) {
core.info(`Release not found by tag, searching all releases...`);
const releases = await github.rest.repos.listReleases({
owner,
repo,
per_page: 100
});
release = releases.data.find(r => r.draft && r.tag_name === tagName);
if (!release) {
throw new Error(`No release found with tag ${tagName}`);
}
}
await github.rest.repos.updateRelease({ await github.rest.repos.updateRelease({
owner: context.repo.owner, owner,
repo: context.repo.repo, repo,
release_id: draftRelease.id, // Use release_id release_id: release.id,
draft: false draft: false,
prerelease: release.prerelease
}); });
core.info(`Draft Release ${draftRelease.tag_name} published successfully.`); const status = release.draft ? "was draft" : "was already published";
} else { core.info(`Release ${tagName} ensured to be published (${status}).`);
core.info("No matching draft release found.");
} } catch (error) {
core.warning(`Could not find or update release for tag ${tagName}: ${error.message}`);
}
+50 -3
View File
@@ -99,15 +99,62 @@ services:
environment: environment:
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m" #KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
TZ: Asia/Shanghai TZ: Asia/Shanghai
# Unique identifier for the Kafka node (required in controller mode)
KAFKA_CFG_NODE_ID: 0 KAFKA_CFG_NODE_ID: 0
# Defines the roles this Kafka node plays: broker, controller, or both
KAFKA_CFG_PROCESS_ROLES: controller,broker KAFKA_CFG_PROCESS_ROLES: controller,broker
# Specifies which nodes are controller nodes for quorum voting.
# The syntax follows the KRaft mode (no ZooKeeper): node.id@host:port
# The controller listener endpoint here is kafka:9093
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 # Specifies which listener is used for controller-to-controller communication
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
# Default number of partitions for new topics
KAFKA_NUM_PARTITIONS: 8 KAFKA_NUM_PARTITIONS: 8
# Whether to enable automatic topic creation
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
# Kafka internal listeners; Kafka supports multiple ports with different protocols
# Each port is used for a specific purpose: INTERNAL for internal broker communication,
# CONTROLLER for controller communication, EXTERNAL for external client connections.
# These logical listener names are mapped to actual protocols via KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
# In short, Kafka is listening on three logical ports: 9092 for internal communication,
# 9093 for controller traffic, and 9094 for external access.
KAFKA_CFG_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
# Addresses advertised to clients. INTERNAL://kafka:9092 uses the internal Docker service name 'kafka',
# so other containers can access Kafka via kafka:9092.
# EXTERNAL://localhost:19094 is the address external clients (e.g., in the LAN) should use to connect.
# If Kafka is deployed on a different machine than IM, 'localhost' should be replaced with the LAN IP.
KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:19094"
# Maps logical listener names to actual protocols.
# Supported protocols include: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT"
# Defines which listener is used for inter-broker communication within the Kafka cluster
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
# Authentication configuration variables - comment out to disable auth
# KAFKA_USERNAME: "openIM"
# KAFKA_PASSWORD: "openIM123"
command: >
/bin/sh -c '
if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then
echo "=== Kafka SASL Authentication ENABLED ==="
echo "Username: $${KAFKA_USERNAME}"
# Set environment variables for SASL authentication
export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN"
export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN"
export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT"
export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}"
export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}"
fi
# Start Kafka with the configured environment
exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
'
networks: networks:
- openim - openim
+61 -21
View File
@@ -2,10 +2,14 @@ package jssdk
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort" "sort"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/log"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/jssdk" "github.com/openimsdk/protocol/jssdk"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
@@ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if len(conversationIDs) == 0 { if len(conversationIDs) == 0 {
return &jssdk.GetActiveConversationsResp{}, nil return &jssdk.GetActiveConversationsResp{}, nil
} }
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if len(activeConversation) == 0 { if len(activeConversation) == 0 {
return &jssdk.GetActiveConversationsResp{}, nil return &jssdk.GetActiveConversationsResp{}, nil
} }
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
sortConversations := sortActiveConversations{ sortConversations := sortActiveConversations{
Conversation: activeConversation, Conversation: activeConversation,
} }
@@ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if err != nil { if err != nil {
return nil, err return nil, err
} }
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
return c.ConversationID return c.ConversationID
}) })
@@ -156,16 +162,15 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
if !ok { if !ok {
continue continue
} }
var lastMsg *sdkws.MsgData
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
lastMsg = msgList.Msgs[0] resp = append(resp, &jssdk.ConversationMsg{
Conversation: conv,
LastMsg: msgList.Msgs[0],
MaxSeq: c.MaxSeq,
ReadSeq: readSeq[c.ConversationID],
})
} }
resp = append(resp, &jssdk.ConversationMsg{
Conversation: conv,
LastMsg: lastMsg,
MaxSeq: c.MaxSeq,
ReadSeq: readSeq[c.ConversationID],
})
} }
if err := x.fillConversations(ctx, resp); err != nil { if err := x.fillConversations(ctx, resp); err != nil {
return nil, err return nil, err
@@ -219,18 +224,18 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
return nil, err return nil, err
} }
} }
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
for _, c := range conversations { for _, c := range conversations {
var lastMsg *sdkws.MsgData
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
lastMsg = msgList.Msgs[0] resp = append(resp, &jssdk.ConversationMsg{
Conversation: c,
LastMsg: msgList.Msgs[0],
MaxSeq: maxSeqs[c.ConversationID],
ReadSeq: readSeqs[c.ConversationID],
})
} }
resp = append(resp, &jssdk.ConversationMsg{
Conversation: c,
LastMsg: lastMsg,
MaxSeq: maxSeqs[c.ConversationID],
ReadSeq: readSeqs[c.ConversationID],
})
} }
if err := x.fillConversations(ctx, resp); err != nil { if err := x.fillConversations(ctx, resp); err != nil {
return nil, err return nil, err
@@ -247,3 +252,38 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
UnreadCount: unreadCount, UnreadCount: unreadCount,
}, nil }, nil
} }
// This function checks whether the latest MaxSeq message is valid.
// If not, it needs to fetch a valid message again.
func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) {
var conversationIDs []string
for conversationID, message := range messages {
allInValid := true
for _, data := range message.Msgs {
if data.Status < constant.MsgStatusHasDeleted {
allInValid = false
break
}
}
// when the conversation has been deleted by the user, the length of message.Msgs is empty
if allInValid && len(message.Msgs) > 0 {
conversationIDs = append(conversationIDs, conversationID)
}
}
if len(conversationIDs) > 0 {
resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
UserID: userID,
ConversationIDs: conversationIDs,
})
if err != nil {
log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs)
return
}
for conversationID, message := range resp.Msgs {
messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}}
}
}
}
@@ -26,10 +26,12 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery"
"github.com/go-redis/redis"
"google.golang.org/protobuf/proto"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/IBM/sarama" "github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -41,7 +43,6 @@ import (
"github.com/openimsdk/tools/log" "github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil" "github.com/openimsdk/tools/utils/stringutil"
"google.golang.org/protobuf/proto"
) )
const ( const (
@@ -140,53 +141,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
var conversationID string // Outer map: conversationID -> (userID -> maxHasReadSeq)
var userSeqMap map[string]int64 conversationUserSeq := make(map[string]map[string]int64)
for _, msg := range msgs { for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt { if msg.message.ContentType != constant.HasReadReceipt {
continue continue
} }
var elem sdkws.NotificationElem var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil { if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
continue continue
} }
var tips sdkws.MarkAsReadTips var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
continue continue
} }
//The conversation ID for each batch of messages processed by the batcher is the same. if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
conversationID = tips.ConversationID
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue continue
} }
if userSeqMap == nil {
userSeqMap = make(map[string]int64)
}
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq { // Calculate the max seq from tips.Seqs
continue for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
}
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
} }
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
} }
if userSeqMap == nil { log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
return
} // persist to db
if len(conversationID) == 0 { for convID, userSeqMap := range conversationUserSeq {
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID) if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
} log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil { }
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
} }
} }
+10 -8
View File
@@ -103,22 +103,24 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
} }
userClient := rpcli.NewUserClient(userConn) userClient := rpcli.NewUserClient(userConn)
database := controller.NewFriendDatabase(
friendMongoDB,
friendRequestMongoDB,
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, redis.GetRocksCacheOptions()),
mgocli.GetTx(),
)
// Initialize notification sender // Initialize notification sender
notificationSender := NewFriendNotificationSender( notificationSender := NewFriendNotificationSender(
&config.NotificationConfig, &config.NotificationConfig,
rpcli.NewMsgClient(msgConn), rpcli.NewMsgClient(msgConn),
WithRpcFunc(userClient.GetUsersInfo), WithRpcFunc(userClient.GetUsersInfo),
WithFriendDB(database),
) )
localcache.InitLocalCache(&config.LocalCacheConfig) localcache.InitLocalCache(&config.LocalCacheConfig)
// Register Friend server with refactored MongoDB and Redis integrations // Register Friend server with refactored MongoDB and Redis integrations
relation.RegisterFriendServer(server, &friendServer{ relation.RegisterFriendServer(server, &friendServer{
db: controller.NewFriendDatabase( db: database,
friendMongoDB,
friendRequestMongoDB,
redis.NewFriendCacheRedis(rdb, &config.LocalCacheConfig, friendMongoDB, redis.GetRocksCacheOptions()),
mgocli.GetTx(),
),
blackDatabase: controller.NewBlackDatabase( blackDatabase: controller.NewBlackDatabase(
blackMongoDB, blackMongoDB,
redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()), redis.NewBlackCacheRedis(rdb, &config.LocalCacheConfig, blackMongoDB, redis.GetRocksCacheOptions()),
@@ -192,7 +194,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr
FromUserID: req.OwnerUserID, FromUserID: req.OwnerUserID,
ToUserID: userID, ToUserID: userID,
HandleResult: constant.FriendResponseAgree, HandleResult: constant.FriendResponseAgree,
}) }, false)
} }
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req) s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
@@ -221,7 +223,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res
return nil, err return nil, err
} }
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req) s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
s.notificationSender.FriendApplicationAgreedNotification(ctx, req) s.notificationSender.FriendApplicationAgreedNotification(ctx, req, true)
return resp, nil return resp, nil
} }
if req.HandleResult == constant.FriendResponseRefuse { if req.HandleResult == constant.FriendResponseRefuse {
+11 -5
View File
@@ -171,11 +171,17 @@ func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips) f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
} }
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq) { func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq, checkReq bool) {
request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID) var (
if err != nil { request *sdkws.FriendRequest
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID) err error
return )
if checkReq {
request, err = f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
if err != nil {
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
return
}
} }
tips := sdkws.FriendApplicationApprovedTips{ tips := sdkws.FriendApplicationApprovedTips{
FromToUserID: &sdkws.FromToUserID{ FromToUserID: &sdkws.FromToUserID{
+10 -2
View File
@@ -17,9 +17,14 @@ package third
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"time" "time"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/s3/aws"
"github.com/openimsdk/tools/s3/disable"
"google.golang.org/grpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
@@ -34,7 +39,6 @@ import (
"github.com/openimsdk/tools/s3/kodo" "github.com/openimsdk/tools/s3/kodo"
"github.com/openimsdk/tools/s3/minio" "github.com/openimsdk/tools/s3/minio"
"github.com/openimsdk/tools/s3/oss" "github.com/openimsdk/tools/s3/oss"
"google.golang.org/grpc"
) )
type thirdServer struct { type thirdServer struct {
@@ -90,6 +94,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build()) o, err = oss.NewOSS(*config.RpcConfig.Object.Oss.Build())
case "kodo": case "kodo":
o, err = kodo.NewKodo(*config.RpcConfig.Object.Kodo.Build()) o, err = kodo.NewKodo(*config.RpcConfig.Object.Kodo.Build())
case "aws":
o, err = aws.NewAws(*config.RpcConfig.Object.Aws.Build())
case "":
o = disable.NewDisable()
default: default:
err = fmt.Errorf("invalid object enable: %s", enable) err = fmt.Errorf("invalid object enable: %s", enable)
} }
+1 -1
View File
@@ -358,7 +358,7 @@ type Redis struct {
Username string `mapstructure:"username"` Username string `mapstructure:"username"`
Password string `mapstructure:"password"` Password string `mapstructure:"password"`
ClusterMode bool `mapstructure:"clusterMode"` ClusterMode bool `mapstructure:"clusterMode"`
DB int `mapstructure:"storage"` DB int `mapstructure:"db"`
MaxRetry int `mapstructure:"maxRetry"` MaxRetry int `mapstructure:"maxRetry"`
PoolSize int `mapstructure:"poolSize"` PoolSize int `mapstructure:"poolSize"`
} }
+3 -1
View File
@@ -2,9 +2,11 @@ package rpcli
import ( import (
"context" "context"
"google.golang.org/grpc"
"github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/sdkws"
"google.golang.org/grpc"
) )
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {
+1 -1
View File
@@ -1 +1 @@
v3.8.3 v3.8.3-patch.7
+9 -1
View File
@@ -1,6 +1,14 @@
package version package version
import _ "embed" import (
_ "embed"
"strings"
)
//go:embed version //go:embed version
var Version string var Version string
func init() {
Version = strings.Trim(Version, "\n")
Version = strings.TrimSpace(Version)
}