mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-29 14:59:19 +08:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 391eafb402 | |||
| 482284a0fb | |||
| a893141ae6 | |||
| 9eccfee997 | |||
| f950dbc5e7 | |||
| 55113e5277 | |||
| 7fdc438500 |
+50
-3
@@ -99,15 +99,62 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
|
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
|
||||||
TZ: Asia/Shanghai
|
TZ: Asia/Shanghai
|
||||||
|
# Unique identifier for the Kafka node (required in controller mode)
|
||||||
KAFKA_CFG_NODE_ID: 0
|
KAFKA_CFG_NODE_ID: 0
|
||||||
|
# Defines the roles this Kafka node plays: broker, controller, or both
|
||||||
KAFKA_CFG_PROCESS_ROLES: controller,broker
|
KAFKA_CFG_PROCESS_ROLES: controller,broker
|
||||||
|
# Specifies which nodes are controller nodes for quorum voting.
|
||||||
|
# The syntax follows the KRaft mode (no ZooKeeper): node.id@host:port
|
||||||
|
# The controller listener endpoint here is kafka:9093
|
||||||
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
|
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
|
||||||
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
|
# Specifies which listener is used for controller-to-controller communication
|
||||||
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094
|
|
||||||
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
|
|
||||||
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
|
||||||
|
# Default number of partitions for new topics
|
||||||
KAFKA_NUM_PARTITIONS: 8
|
KAFKA_NUM_PARTITIONS: 8
|
||||||
|
# Whether to enable automatic topic creation
|
||||||
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
|
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
|
||||||
|
# Kafka internal listeners; Kafka supports multiple ports with different protocols
|
||||||
|
# Each port is used for a specific purpose: INTERNAL for internal broker communication,
|
||||||
|
# CONTROLLER for controller communication, EXTERNAL for external client connections.
|
||||||
|
# These logical listener names are mapped to actual protocols via KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
|
||||||
|
# In short, Kafka is listening on three logical ports: 9092 for internal communication,
|
||||||
|
# 9093 for controller traffic, and 9094 for external access.
|
||||||
|
KAFKA_CFG_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
|
||||||
|
# Addresses advertised to clients. INTERNAL://kafka:9092 uses the internal Docker service name 'kafka',
|
||||||
|
# so other containers can access Kafka via kafka:9092.
|
||||||
|
# EXTERNAL://localhost:19094 is the address external clients (e.g., in the LAN) should use to connect.
|
||||||
|
# If Kafka is deployed on a different machine than IM, 'localhost' should be replaced with the LAN IP.
|
||||||
|
KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:19094"
|
||||||
|
# Maps logical listener names to actual protocols.
|
||||||
|
# Supported protocols include: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
|
||||||
|
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT"
|
||||||
|
# Defines which listener is used for inter-broker communication within the Kafka cluster
|
||||||
|
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
|
||||||
|
|
||||||
|
|
||||||
|
# Authentication configuration variables - comment out to disable auth
|
||||||
|
# KAFKA_USERNAME: "openIM"
|
||||||
|
# KAFKA_PASSWORD: "openIM123"
|
||||||
|
command: >
|
||||||
|
/bin/sh -c '
|
||||||
|
if [ -n "$${KAFKA_USERNAME}" ] && [ -n "$${KAFKA_PASSWORD}" ]; then
|
||||||
|
echo "=== Kafka SASL Authentication ENABLED ==="
|
||||||
|
echo "Username: $${KAFKA_USERNAME}"
|
||||||
|
|
||||||
|
# Set environment variables for SASL authentication
|
||||||
|
export KAFKA_CFG_LISTENERS="SASL_PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
|
||||||
|
export KAFKA_CFG_ADVERTISED_LISTENERS="SASL_PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
|
||||||
|
export KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
|
||||||
|
export KAFKA_CFG_SASL_ENABLED_MECHANISMS="PLAIN"
|
||||||
|
export KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL="PLAIN"
|
||||||
|
export KAFKA_CFG_INTER_BROKER_LISTENER_NAME="SASL_PLAINTEXT"
|
||||||
|
export KAFKA_CLIENT_USERS="$${KAFKA_USERNAME}"
|
||||||
|
export KAFKA_CLIENT_PASSWORDS="$${KAFKA_PASSWORD}"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Start Kafka with the configured environment
|
||||||
|
exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
|
||||||
|
'
|
||||||
networks:
|
networks:
|
||||||
- openim
|
- openim
|
||||||
|
|
||||||
|
|||||||
+61
-21
@@ -2,10 +2,14 @@ package jssdk
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||||
|
"github.com/openimsdk/protocol/constant"
|
||||||
|
"github.com/openimsdk/tools/log"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/conversation"
|
"github.com/openimsdk/protocol/conversation"
|
||||||
"github.com/openimsdk/protocol/jssdk"
|
"github.com/openimsdk/protocol/jssdk"
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
@@ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if len(conversationIDs) == 0 {
|
if len(conversationIDs) == 0 {
|
||||||
return &jssdk.GetActiveConversationsResp{}, nil
|
return &jssdk.GetActiveConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
|
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if len(activeConversation) == 0 {
|
if len(activeConversation) == 0 {
|
||||||
return &jssdk.GetActiveConversationsResp{}, nil
|
return &jssdk.GetActiveConversationsResp{}, nil
|
||||||
}
|
}
|
||||||
|
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
sortConversations := sortActiveConversations{
|
sortConversations := sortActiveConversations{
|
||||||
Conversation: activeConversation,
|
Conversation: activeConversation,
|
||||||
}
|
}
|
||||||
@@ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
|
||||||
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
||||||
return c.ConversationID
|
return c.ConversationID
|
||||||
})
|
})
|
||||||
@@ -156,16 +162,15 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
|||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var lastMsg *sdkws.MsgData
|
|
||||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||||
lastMsg = msgList.Msgs[0]
|
resp = append(resp, &jssdk.ConversationMsg{
|
||||||
|
Conversation: conv,
|
||||||
|
LastMsg: msgList.Msgs[0],
|
||||||
|
MaxSeq: c.MaxSeq,
|
||||||
|
ReadSeq: readSeq[c.ConversationID],
|
||||||
|
})
|
||||||
}
|
}
|
||||||
resp = append(resp, &jssdk.ConversationMsg{
|
|
||||||
Conversation: conv,
|
|
||||||
LastMsg: lastMsg,
|
|
||||||
MaxSeq: c.MaxSeq,
|
|
||||||
ReadSeq: readSeq[c.ConversationID],
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
if err := x.fillConversations(ctx, resp); err != nil {
|
if err := x.fillConversations(ctx, resp); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -219,18 +224,18 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
|
||||||
resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
|
resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
|
||||||
for _, c := range conversations {
|
for _, c := range conversations {
|
||||||
var lastMsg *sdkws.MsgData
|
|
||||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||||
lastMsg = msgList.Msgs[0]
|
resp = append(resp, &jssdk.ConversationMsg{
|
||||||
|
Conversation: c,
|
||||||
|
LastMsg: msgList.Msgs[0],
|
||||||
|
MaxSeq: maxSeqs[c.ConversationID],
|
||||||
|
ReadSeq: readSeqs[c.ConversationID],
|
||||||
|
})
|
||||||
}
|
}
|
||||||
resp = append(resp, &jssdk.ConversationMsg{
|
|
||||||
Conversation: c,
|
|
||||||
LastMsg: lastMsg,
|
|
||||||
MaxSeq: maxSeqs[c.ConversationID],
|
|
||||||
ReadSeq: readSeqs[c.ConversationID],
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
if err := x.fillConversations(ctx, resp); err != nil {
|
if err := x.fillConversations(ctx, resp); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -247,3 +252,38 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
|||||||
UnreadCount: unreadCount,
|
UnreadCount: unreadCount,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This function checks whether the latest MaxSeq message is valid.
|
||||||
|
// If not, it needs to fetch a valid message again.
|
||||||
|
func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) {
|
||||||
|
var conversationIDs []string
|
||||||
|
|
||||||
|
for conversationID, message := range messages {
|
||||||
|
allInValid := true
|
||||||
|
for _, data := range message.Msgs {
|
||||||
|
if data.Status < constant.MsgStatusHasDeleted {
|
||||||
|
allInValid = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// when the conversation has been deleted by the user, the length of message.Msgs is empty
|
||||||
|
if allInValid && len(message.Msgs) > 0 {
|
||||||
|
conversationIDs = append(conversationIDs, conversationID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(conversationIDs) > 0 {
|
||||||
|
resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
|
||||||
|
UserID: userID,
|
||||||
|
ConversationIDs: conversationIDs,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for conversationID, message := range resp.Msgs {
|
||||||
|
messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
@@ -26,10 +26,12 @@ import (
|
|||||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||||
"github.com/openimsdk/tools/discovery"
|
"github.com/openimsdk/tools/discovery"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
|
||||||
|
|
||||||
"github.com/IBM/sarama"
|
"github.com/IBM/sarama"
|
||||||
"github.com/go-redis/redis"
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/kafka"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
|
||||||
@@ -41,7 +43,6 @@ import (
|
|||||||
"github.com/openimsdk/tools/log"
|
"github.com/openimsdk/tools/log"
|
||||||
"github.com/openimsdk/tools/mcontext"
|
"github.com/openimsdk/tools/mcontext"
|
||||||
"github.com/openimsdk/tools/utils/stringutil"
|
"github.com/openimsdk/tools/utils/stringutil"
|
||||||
"google.golang.org/protobuf/proto"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -140,53 +141,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
|
|||||||
|
|
||||||
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
|
||||||
|
|
||||||
var conversationID string
|
// Outer map: conversationID -> (userID -> maxHasReadSeq)
|
||||||
var userSeqMap map[string]int64
|
conversationUserSeq := make(map[string]map[string]int64)
|
||||||
|
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
if msg.message.ContentType != constant.HasReadReceipt {
|
if msg.message.ContentType != constant.HasReadReceipt {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var elem sdkws.NotificationElem
|
var elem sdkws.NotificationElem
|
||||||
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
|
||||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
|
log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
var tips sdkws.MarkAsReadTips
|
var tips sdkws.MarkAsReadTips
|
||||||
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
|
||||||
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
|
log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
//The conversation ID for each batch of messages processed by the batcher is the same.
|
if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
|
||||||
conversationID = tips.ConversationID
|
|
||||||
if len(tips.Seqs) > 0 {
|
|
||||||
for _, seq := range tips.Seqs {
|
|
||||||
if tips.HasReadSeq < seq {
|
|
||||||
tips.HasReadSeq = seq
|
|
||||||
}
|
|
||||||
}
|
|
||||||
clear(tips.Seqs)
|
|
||||||
tips.Seqs = nil
|
|
||||||
}
|
|
||||||
if tips.HasReadSeq < 0 {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if userSeqMap == nil {
|
|
||||||
userSeqMap = make(map[string]int64)
|
|
||||||
}
|
|
||||||
|
|
||||||
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
|
// Calculate the max seq from tips.Seqs
|
||||||
continue
|
for _, seq := range tips.Seqs {
|
||||||
|
if tips.HasReadSeq < seq {
|
||||||
|
tips.HasReadSeq = seq
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
|
||||||
|
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
|
||||||
|
}
|
||||||
|
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
|
||||||
|
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
|
||||||
}
|
}
|
||||||
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
|
|
||||||
}
|
}
|
||||||
if userSeqMap == nil {
|
log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
|
||||||
return
|
|
||||||
}
|
// persist to db
|
||||||
if len(conversationID) == 0 {
|
for convID, userSeqMap := range conversationUserSeq {
|
||||||
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
|
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
|
||||||
}
|
log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
|
||||||
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
|
}
|
||||||
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -194,7 +194,7 @@ func (s *friendServer) ImportFriends(ctx context.Context, req *relation.ImportFr
|
|||||||
FromUserID: req.OwnerUserID,
|
FromUserID: req.OwnerUserID,
|
||||||
ToUserID: userID,
|
ToUserID: userID,
|
||||||
HandleResult: constant.FriendResponseAgree,
|
HandleResult: constant.FriendResponseAgree,
|
||||||
})
|
}, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
|
s.webhookAfterImportFriends(ctx, &s.config.WebhooksConfig.AfterImportFriends, req)
|
||||||
@@ -223,7 +223,7 @@ func (s *friendServer) RespondFriendApply(ctx context.Context, req *relation.Res
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
|
s.webhookAfterAddFriendAgree(ctx, &s.config.WebhooksConfig.AfterAddFriendAgree, req)
|
||||||
s.notificationSender.FriendApplicationAgreedNotification(ctx, req)
|
s.notificationSender.FriendApplicationAgreedNotification(ctx, req, true)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
if req.HandleResult == constant.FriendResponseRefuse {
|
if req.HandleResult == constant.FriendResponseRefuse {
|
||||||
|
|||||||
@@ -171,11 +171,17 @@ func (f *FriendNotificationSender) FriendApplicationAddNotification(ctx context.
|
|||||||
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
|
f.Notification(ctx, req.FromUserID, req.ToUserID, constant.FriendApplicationNotification, &tips)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq) {
|
func (f *FriendNotificationSender) FriendApplicationAgreedNotification(ctx context.Context, req *relation.RespondFriendApplyReq, checkReq bool) {
|
||||||
request, err := f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
|
var (
|
||||||
if err != nil {
|
request *sdkws.FriendRequest
|
||||||
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
|
err error
|
||||||
return
|
)
|
||||||
|
if checkReq {
|
||||||
|
request, err = f.getFriendRequests(ctx, req.FromUserID, req.ToUserID)
|
||||||
|
if err != nil {
|
||||||
|
log.ZError(ctx, "FriendApplicationAgreedNotification get friend request", err, "fromUserID", req.FromUserID, "toUserID", req.ToUserID)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tips := sdkws.FriendApplicationApprovedTips{
|
tips := sdkws.FriendApplicationApprovedTips{
|
||||||
FromToUserID: &sdkws.FromToUserID{
|
FromToUserID: &sdkws.FromToUserID{
|
||||||
|
|||||||
@@ -23,6 +23,8 @@ import (
|
|||||||
"github.com/openimsdk/tools/s3/aws"
|
"github.com/openimsdk/tools/s3/aws"
|
||||||
"github.com/openimsdk/tools/s3/disable"
|
"github.com/openimsdk/tools/s3/disable"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
|
||||||
@@ -37,7 +39,6 @@ import (
|
|||||||
"github.com/openimsdk/tools/s3/kodo"
|
"github.com/openimsdk/tools/s3/kodo"
|
||||||
"github.com/openimsdk/tools/s3/minio"
|
"github.com/openimsdk/tools/s3/minio"
|
||||||
"github.com/openimsdk/tools/s3/oss"
|
"github.com/openimsdk/tools/s3/oss"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type thirdServer struct {
|
type thirdServer struct {
|
||||||
|
|||||||
@@ -358,7 +358,7 @@ type Redis struct {
|
|||||||
Username string `mapstructure:"username"`
|
Username string `mapstructure:"username"`
|
||||||
Password string `mapstructure:"password"`
|
Password string `mapstructure:"password"`
|
||||||
ClusterMode bool `mapstructure:"clusterMode"`
|
ClusterMode bool `mapstructure:"clusterMode"`
|
||||||
DB int `mapstructure:"storage"`
|
DB int `mapstructure:"db"`
|
||||||
MaxRetry int `mapstructure:"maxRetry"`
|
MaxRetry int `mapstructure:"maxRetry"`
|
||||||
PoolSize int `mapstructure:"poolSize"`
|
PoolSize int `mapstructure:"poolSize"`
|
||||||
}
|
}
|
||||||
|
|||||||
+3
-1
@@ -2,9 +2,11 @@ package rpcli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/openimsdk/protocol/msg"
|
"github.com/openimsdk/protocol/msg"
|
||||||
"github.com/openimsdk/protocol/sdkws"
|
"github.com/openimsdk/protocol/sdkws"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {
|
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {
|
||||||
|
|||||||
+1
-1
@@ -1 +1 @@
|
|||||||
v3.8.3
|
v3.8.3-patch.7
|
||||||
Reference in New Issue
Block a user