Compare commits

...

12 Commits

Author SHA1 Message Date
dsx137 d16a617ba8 feat: gomake upgrade (#3702) 2026-03-19 04:09:02 +00:00
icey-yu 942d155d2d feat: update protocol support botPlatform (#3696) 2026-03-11 05:44:36 +00:00
chao b7200c163c feat: add error code for handled friend requests and improve error handling in friend operations (#3670)
* fix: performance issues with Kafka caused by encapsulating the MQ interface

* fix: admin token in standalone mode

* fix: full id version

* fix: resolve deadlock in cache eviction and improve GetBatch implementation

* refactor: replace LongConn with ClientConn interface and simplify message handling

* refactor: replace LongConn with ClientConn interface and simplify message handling

* fix: seq use $setOnInsert for min_seq in conversation update

* feat: add error code for handled friend requests and improve error handling in friend operations
2026-01-22 08:34:00 +00:00
神奇bug在哪里 579db3bd48 bugfix(conversation):removed unexpectedly called functions and itself to avoid out of index query. (#3668)
# Conflicts:
#	internal/rpc/conversation/conversation.go
#	pkg/common/storage/database/mgo/conversation.go
2026-01-21 10:10:29 +00:00
chao a0e6d9aa69 fix: Mongo Malloc upsert overwrites min_seq initialization (#3657)
* fix: performance issues with Kafka caused by encapsulating the MQ interface

* fix: admin token in standalone mode

* fix: full id version

* fix: resolve deadlock in cache eviction and improve GetBatch implementation

* refactor: replace LongConn with ClientConn interface and simplify message handling

* refactor: replace LongConn with ClientConn interface and simplify message handling

* fix: seq use $setOnInsert for min_seq in conversation update
2026-01-15 06:24:17 +00:00
dsx137 fbca49d431 fix(group): set max_seq to 0 when join group (#3649) 2025-12-31 10:02:38 +00:00
dsx137 0a93fb1b6d fix(group): move member count retrieval after member deletion for accurate updates (#3651) 2025-12-31 10:01:51 +00:00
chao 78b255396f feat: replace LongConn with ClientConn interface and simplify message handling (#3643)
* fix: performance issues with Kafka caused by encapsulating the MQ interface

* fix: admin token in standalone mode

* fix: full id version

* fix: resolve deadlock in cache eviction and improve GetBatch implementation

* refactor: replace LongConn with ClientConn interface and simplify message handling

* refactor: replace LongConn with ClientConn interface and simplify message handling
2025-12-25 08:27:16 +00:00
ribin2333 6f33c0a515 fix: reset user conversation seq when rejoining group to resolve message recall issue (#3640)
* fix: reset user conversation seq when rejoining group to resolve message recall issue

* fix: refactor setMemberJoinSeq based on review feedback

* group: 入群个人上限重置为不受限值;退出个人上限固化;通知控制入群 minSeq
2025-12-23 06:37:32 +00:00
Gagan Singh c97d63754b Simplify iOS background push gating (#3611) (#3612) 2025-12-15 09:16:29 +00:00
chao 1b8a3b0b75 fix: resolve deadlock in cache eviction and improve GetBatch implementation and full id version (#3591)
* fix: performance issues with Kafka caused by encapsulating the MQ interface

* fix: admin token in standalone mode

* fix: full id version

* fix: resolve deadlock in cache eviction and improve GetBatch implementation
2025-12-12 08:24:39 +00:00
icey-yu b8c4b459fa merge: pre-release-v3.8.4 (#3623)
* merge: pre-release-v3.8.4

* merge: v3.8.4
2025-11-25 08:23:22 +00:00
41 changed files with 1921 additions and 659 deletions
@@ -96,13 +96,13 @@ jobs:
repo,
per_page: 100
});
release = releases.data.find(r => r.draft && r.tag_name === tagName);
if (!release) {
throw new Error(`No release found with tag ${tagName}`);
}
}
await github.rest.repos.updateRelease({
owner,
repo,
@@ -110,10 +110,10 @@ jobs:
draft: false,
prerelease: release.prerelease
});
const status = release.draft ? "was draft" : "was already published";
core.info(`Release ${tagName} ensured to be published (${status}).`);
} catch (error) {
core.warning(`Could not find or update release for tag ${tagName}: ${error.message}`);
}
+1 -1
View File
@@ -1,6 +1,6 @@
rpc:
# The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP
registerIP:
registerIP:
# IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP
listenIP: 0.0.0.0
# autoSetPorts indicates whether to automatically set the ports
+16 -13
View File
@@ -1,6 +1,6 @@
module github.com/openimsdk/open-im-server/v3
go 1.22.7
go 1.25.0
require (
firebase.google.com/go/v4 v4.14.1
@@ -12,11 +12,11 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.73-alpha.17
github.com/openimsdk/tools v0.0.50-alpha.105
github.com/openimsdk/protocol v0.0.73-alpha.19
github.com/openimsdk/tools v0.0.50-alpha.113
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
go.mongodb.org/mongo-driver v1.14.0
google.golang.org/api v0.170.0
google.golang.org/grpc v1.71.0
@@ -27,6 +27,7 @@ require (
require github.com/google/uuid v1.6.0
require (
github.com/IBM/sarama v1.43.0
github.com/fatih/color v1.14.1
github.com/gin-contrib/gzip v1.0.1
github.com/go-redis/redis v6.15.9+incompatible
@@ -34,7 +35,7 @@ require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/kelindar/bitmap v1.5.2
github.com/likexian/gokit v0.25.13
github.com/openimsdk/gomake v0.0.15-alpha.11
github.com/openimsdk/gomake v0.0.17
github.com/redis/go-redis/v9 v9.4.0
github.com/robfig/cron/v3 v3.0.1
github.com/shirou/gopsutil v3.21.11+incompatible
@@ -54,7 +55,6 @@ require (
cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go/longrunning v0.5.5 // indirect
cloud.google.com/go/storage v1.40.0 // indirect
github.com/IBM/sarama v1.43.0 // indirect
github.com/MicahParks/keyfunc v1.9.0 // indirect
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect
github.com/aws/aws-sdk-go-v2 v1.32.5 // indirect
@@ -76,6 +76,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.33.1 // indirect
github.com/aws/smithy-go v1.22.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.10.0 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
@@ -90,6 +91,7 @@ require (
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/ebitengine/purego v0.10.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
@@ -98,7 +100,7 @@ require (
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
@@ -108,7 +110,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
@@ -135,7 +137,7 @@ require (
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lestrrat-go/strftime v1.0.6 // indirect
github.com/lithammer/shortuuid v3.0.0+incompatible // indirect
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de // indirect
github.com/lufia/plan9stats v0.0.0-20260216142805-b3301c5f2a88 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
@@ -152,7 +154,7 @@ require (
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
@@ -163,6 +165,7 @@ require (
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sercand/kuberesolver/v6 v6.0.1 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shirou/gopsutil/v4 v4.26.2 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
@@ -171,8 +174,8 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tencentyun/cos-go-sdk-v5 v0.7.47 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/tklauser/go-sysconf v0.3.16 // indirect
github.com/tklauser/numcpus v0.11.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
@@ -196,7 +199,7 @@ require (
golang.org/x/image v0.15.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/oauth2 v0.25.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.5.0 // indirect
+28 -21
View File
@@ -61,6 +61,8 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmatcuk/doublestar/v4 v4.10.0 h1:zU9WiOla1YA122oLM6i4EXvGW62DvKZVxIe6TYWexEs=
github.com/bmatcuk/doublestar/v4 v4.10.0/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
@@ -103,6 +105,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/ebitengine/purego v0.10.0 h1:QIw4xfpWT6GWTzaW5XEKy3HXoqrJGx1ijYHzTF0/ISU=
github.com/ebitengine/purego v0.10.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g=
github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@@ -134,8 +138,9 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE=
github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=
github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE=
@@ -202,8 +207,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
@@ -303,8 +308,8 @@ github.com/likexian/gokit v0.25.13 h1:p2Uw3+6fGG53CwdU2Dz0T6bOycdb2+bAFAa3ymwWVk
github.com/likexian/gokit v0.25.13/go.mod h1:qQhEWFBEfqLCO3/vOEo2EDKd+EycekVtUK4tex+l2H4=
github.com/lithammer/shortuuid v3.0.0+incompatible h1:NcD0xWW/MZYXEHa6ITy6kaXN5nwm/V115vj2YXfhS0w=
github.com/lithammer/shortuuid v3.0.0+incompatible/go.mod h1:FR74pbAuElzOUuenUHTK2Tciko1/vKuIKS9dSkDrA4w=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de h1:V53FWzU6KAZVi1tPp5UIsMoUWJ2/PNwYIDXnu7QuBCE=
github.com/lufia/plan9stats v0.0.0-20230110061619-bbe2e5e100de/go.mod h1:JKx41uQRwqlTZabZc+kILPrO/3jlKnQ2Z8b7YiVw5cE=
github.com/lufia/plan9stats v0.0.0-20260216142805-b3301c5f2a88 h1:PTw+yKnXcOFCR6+8hHTyWBeQ/P4Nb7dd4/0ohEcWQuM=
github.com/lufia/plan9stats v0.0.0-20260216142805-b3301c5f2a88/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
@@ -347,12 +352,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.11 h1:PQudYDRESYeYlUYrrLLJhYIlUPO5x7FAx+o5El9U/Bw=
github.com/openimsdk/gomake v0.0.15-alpha.11/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.73-alpha.17 h1:ddo0QMns1GVwAmrPIPlAQ7uKmThAYLnOt+CIOgLsJyE=
github.com/openimsdk/protocol v0.0.73-alpha.17/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.105 h1:axuCvKXhxY2RGLhpMMFNgBtE0B65T2Sr1JDW3UD9nBs=
github.com/openimsdk/tools v0.0.50-alpha.105/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4=
github.com/openimsdk/gomake v0.0.17 h1:q8haP48VOH45WhJRiLj1YSBJyUFJqD8CTedH65i1YH8=
github.com/openimsdk/gomake v0.0.17/go.mod h1:nnjS8yCtrPJAt1knMbyPiUwCH2gpyBzj/EZAONfUOXg=
github.com/openimsdk/protocol v0.0.73-alpha.19 h1:CvXoDF2U73UcMhLnrtMFks2Aw+bXiDgH8AITEt783/s=
github.com/openimsdk/protocol v0.0.73-alpha.19/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw=
github.com/openimsdk/tools v0.0.50-alpha.113 h1:rhLWaSJuhjgJFNVzmpChLCG7dPXS0+bte+CPI0008Us=
github.com/openimsdk/tools v0.0.50-alpha.113/go.mod h1:x9i/e+WJFW4tocy6RNJQ9NofQiP3KJ1Y576/06TqOG4=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
@@ -363,8 +368,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig=
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
@@ -403,6 +408,8 @@ github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKl
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI=
github.com/shirou/gopsutil/v3 v3.24.5/go.mod h1:bsoOS1aStSs9ErQ1WWfxllSeS1K5D+U30r2NfcubMVk=
github.com/shirou/gopsutil/v4 v4.26.2 h1:X8i6sicvUFih4BmYIGT1m2wwgw2VG9YgrDTi7cIRGUI=
github.com/shirou/gopsutil/v4 v4.26.2/go.mod h1:LZ6ewCSkBqUpvSOf+LsTGnRinC6iaNUNMGBtDkJBaLQ=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
@@ -433,18 +440,18 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.563/go.mod h1:7sCQWVkxcsR38nffDW057DRGk8mUjK1Ing/EFOK8s8Y=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.563/go.mod h1:uom4Nvi9W+Qkom0exYiJ9VWJjXwyxtPYTkKkaLMlfE0=
github.com/tencentyun/cos-go-sdk-v5 v0.7.47 h1:uoS4Sob16qEYoapkqJq1D1Vnsy9ira9BfNUMtoFYTI4=
github.com/tencentyun/cos-go-sdk-v5 v0.7.47/go.mod h1:DH9US8nB+AJXqwu/AMOrCFN1COv3dpytXuJWHgdg7kE=
github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4=
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/tklauser/go-sysconf v0.3.16 h1:frioLaCQSsF5Cy1jgRBrzr6t502KIIwQ0MArYICU0nA=
github.com/tklauser/go-sysconf v0.3.16/go.mod h1:/qNL9xxDhc7tx3HSRsLWNnuzbVfh3e7gh/BmM179nYI=
github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9RXw=
github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
@@ -567,8 +574,8 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+1
View File
@@ -290,6 +290,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cf
conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs)
conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs)
conversationGroup.POST("/delete_conversations", c.DeleteConversations)
conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser)
}
{
+10 -151
View File
@@ -16,7 +16,6 @@ package msggateway
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
@@ -31,7 +30,6 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/stringutil"
)
var (
@@ -64,7 +62,7 @@ type PingPongHandler func(string) error
type Client struct {
w *sync.Mutex
conn LongConn
conn ClientConn
PlatformID int `json:"platformID"`
IsCompress bool `json:"isCompress"`
UserID string `json:"userID"`
@@ -84,10 +82,10 @@ type Client struct {
}
// ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer) {
func (c *Client) ResetClient(ctx *UserConnContext, conn ClientConn, longConnServer LongConnServer) {
c.w = new(sync.Mutex)
c.conn = conn
c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID())
c.PlatformID = ctx.GetPlatformID()
c.IsCompress = ctx.GetCompression()
c.IsBackground = ctx.GetBackground()
c.UserID = ctx.GetUserID()
@@ -112,22 +110,6 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.subUserIDs = make(map[string]struct{})
}
func (c *Client) pingHandler(appData string) error {
if err := c.conn.SetReadDeadline(pongWait); err != nil {
return err
}
log.ZDebug(c.ctx, "ping Handler Success.", "appData", appData)
return c.writePongMsg(appData)
}
func (c *Client) pongHandler(_ string) error {
if err := c.conn.SetReadDeadline(pongWait); err != nil {
return err
}
return nil
}
// readMessage continuously reads messages from the connection.
func (c *Client) readMessage() {
defer func() {
@@ -138,52 +120,25 @@ func (c *Client) readMessage() {
c.close()
}()
c.conn.SetReadLimit(maxMessageSize)
_ = c.conn.SetReadDeadline(pongWait)
c.conn.SetPongHandler(c.pongHandler)
c.conn.SetPingHandler(c.pingHandler)
c.activeHeartbeat(c.hbCtx)
for {
log.ZDebug(c.ctx, "readMessage")
messageType, message, returnErr := c.conn.ReadMessage()
message, returnErr := c.conn.ReadMessage()
if returnErr != nil {
log.ZWarn(c.ctx, "readMessage", returnErr, "messageType", messageType)
log.ZWarn(c.ctx, "readMessage", returnErr)
c.closedErr = returnErr
return
}
log.ZDebug(c.ctx, "readMessage", "messageType", messageType)
if c.closed.Load() {
// The scenario where the connection has just been closed, but the coroutine has not exited
c.closedErr = ErrConnClosed
return
}
switch messageType {
case MessageBinary:
_ = c.conn.SetReadDeadline(pongWait)
parseDataErr := c.handleMessage(message)
if parseDataErr != nil {
c.closedErr = parseDataErr
return
}
case MessageText:
_ = c.conn.SetReadDeadline(pongWait)
parseDataErr := c.handlerTextMessage(message)
if parseDataErr != nil {
c.closedErr = parseDataErr
return
}
case PingMessage:
err := c.writePongMsg("")
log.ZError(c.ctx, "writePongMsg", err)
case CloseMessage:
c.closedErr = ErrClientClosed
parseDataErr := c.handleMessage(message)
if parseDataErr != nil {
c.closedErr = parseDataErr
return
default:
}
}
}
@@ -358,109 +313,13 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
c.w.Lock()
defer c.w.Unlock()
err = c.conn.SetWriteDeadline(writeWait)
if err != nil {
return err
}
if c.IsCompress {
resultBuf, compressErr := c.longConnServer.CompressWithPool(encodedBuf)
if compressErr != nil {
return compressErr
}
return c.conn.WriteMessage(MessageBinary, resultBuf)
return c.conn.WriteMessage(resultBuf)
}
return c.conn.WriteMessage(MessageBinary, encodedBuf)
}
// Actively initiate Heartbeat when platform in Web.
func (c *Client) activeHeartbeat(ctx context.Context) {
if c.PlatformID == constant.WebPlatformID {
go func() {
defer func() {
if r := recover(); r != nil {
log.ZPanic(ctx, "activeHeartbeat Panic", errs.ErrPanic(r))
}
}()
log.ZDebug(ctx, "server initiative send heartbeat start.")
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.writePingMsg(); err != nil {
log.ZWarn(c.ctx, "send Ping Message error.", err)
return
}
case <-c.hbCtx.Done():
return
}
}
}()
}
}
func (c *Client) writePingMsg() error {
if c.closed.Load() {
return nil
}
c.w.Lock()
defer c.w.Unlock()
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
return err
}
return c.conn.WriteMessage(PingMessage, nil)
}
func (c *Client) writePongMsg(appData string) error {
log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
if c.closed.Load() {
log.ZWarn(c.ctx, "is closed in server", nil, "appdata", appData, "closed err", c.closedErr)
return nil
}
c.w.Lock()
defer c.w.Unlock()
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
log.ZWarn(c.ctx, "SetWriteDeadline in Server have error", errs.Wrap(err), "writeWait", writeWait, "appData", appData)
return errs.Wrap(err)
}
err = c.conn.WriteMessage(PongMessage, []byte(appData))
if err != nil {
log.ZWarn(c.ctx, "Write Message have error", errs.Wrap(err), "Pong msg", PongMessage)
}
return errs.Wrap(err)
}
func (c *Client) handlerTextMessage(b []byte) error {
var msg TextMessage
if err := json.Unmarshal(b, &msg); err != nil {
return err
}
switch msg.Type {
case TextPong:
return nil
case TextPing:
msg.Type = TextPong
msgData, err := json.Marshal(msg)
if err != nil {
return err
}
c.w.Lock()
defer c.w.Unlock()
if err := c.conn.SetWriteDeadline(writeWait); err != nil {
return err
}
return c.conn.WriteMessage(MessageText, msgData)
default:
return fmt.Errorf("not support message type %s", msg.Type)
}
return c.conn.WriteMessage(encodedBuf)
}
+229
View File
@@ -0,0 +1,229 @@
package msggateway
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/openimsdk/tools/log"
)
var ErrWriteFull = fmt.Errorf("websocket write buffer full,close connection")
type ClientConn interface {
ReadMessage() ([]byte, error)
WriteMessage(message []byte) error
Close() error
}
type websocketMessage struct {
MessageType int
Data []byte
}
func NewWebSocketClientConn(conn *websocket.Conn, readLimit int64, readTimeout time.Duration, pingInterval time.Duration) ClientConn {
c := &websocketClientConn{
readTimeout: readTimeout,
conn: conn,
writer: make(chan *websocketMessage, 256),
done: make(chan struct{}),
}
if readLimit > 0 {
c.conn.SetReadLimit(readLimit)
}
c.conn.SetPingHandler(c.pingHandler)
c.conn.SetPongHandler(c.pongHandler)
go c.loopSend()
if pingInterval > 0 {
go c.doPing(pingInterval)
}
return c
}
type websocketClientConn struct {
readTimeout time.Duration
conn *websocket.Conn
writer chan *websocketMessage
done chan struct{}
err atomic.Pointer[error]
}
func (c *websocketClientConn) ReadMessage() ([]byte, error) {
buf, err := c.readMessage()
if err != nil {
return nil, c.closeBy(fmt.Errorf("read message %w", err))
}
return buf, nil
}
func (c *websocketClientConn) WriteMessage(message []byte) error {
return c.writeMessage(websocket.BinaryMessage, message)
}
func (c *websocketClientConn) Close() error {
return c.closeBy(fmt.Errorf("websocket connection closed"))
}
func (c *websocketClientConn) closeBy(err error) error {
if !c.err.CompareAndSwap(nil, &err) {
return *c.err.Load()
}
close(c.done)
log.ZWarn(context.Background(), "websocket connection closed", err, "remoteAddr", c.conn.RemoteAddr(),
"chan length", len(c.writer))
return err
}
func (c *websocketClientConn) writeMessage(messageType int, data []byte) error {
if errPtr := c.err.Load(); errPtr != nil {
return *errPtr
}
select {
case c.writer <- &websocketMessage{MessageType: messageType, Data: data}:
return nil
default:
return c.closeBy(ErrWriteFull)
}
}
func (c *websocketClientConn) loopSend() {
defer func() {
_ = c.conn.Close()
}()
var err error
for {
select {
case <-c.done:
for {
select {
case msg := <-c.writer:
switch msg.MessageType {
case websocket.TextMessage, websocket.BinaryMessage:
err = c.conn.WriteMessage(msg.MessageType, msg.Data)
default:
err = c.conn.WriteControl(msg.MessageType, msg.Data, time.Time{})
}
if err != nil {
_ = c.closeBy(err)
return
}
default:
return
}
}
case msg := <-c.writer:
switch msg.MessageType {
case websocket.TextMessage, websocket.BinaryMessage:
err = c.conn.WriteMessage(msg.MessageType, msg.Data)
default:
err = c.conn.WriteControl(msg.MessageType, msg.Data, time.Time{})
}
if err != nil {
_ = c.closeBy(err)
return
}
}
}
}
func (c *websocketClientConn) setReadDeadline() error {
deadline := time.Now().Add(c.readTimeout)
return c.conn.SetReadDeadline(deadline)
}
func (c *websocketClientConn) readMessage() ([]byte, error) {
for {
if err := c.setReadDeadline(); err != nil {
return nil, err
}
messageType, buf, err := c.conn.ReadMessage()
if err != nil {
return nil, err
}
switch messageType {
case websocket.BinaryMessage:
return buf, nil
case websocket.TextMessage:
if err := c.onReadTextMessage(buf); err != nil {
return nil, err
}
case websocket.PingMessage:
if err := c.pingHandler(string(buf)); err != nil {
return nil, err
}
case websocket.PongMessage:
if err := c.pongHandler(string(buf)); err != nil {
return nil, err
}
case websocket.CloseMessage:
if len(buf) == 0 {
return nil, errors.New("websocket connection closed by peer")
}
return nil, fmt.Errorf("websocket connection closed by peer, data %s", string(buf))
default:
return nil, fmt.Errorf("unknown websocket message type %d", messageType)
}
}
}
func (c *websocketClientConn) onReadTextMessage(buf []byte) error {
var msg struct {
Type string `json:"type"`
Body json.RawMessage `json:"body"`
}
if err := json.Unmarshal(buf, &msg); err != nil {
return err
}
switch msg.Type {
case TextPong:
return nil
case TextPing:
msg.Type = TextPong
msgData, err := json.Marshal(msg)
if err != nil {
return err
}
return c.writeMessage(websocket.TextMessage, msgData)
default:
return fmt.Errorf("not support text message type %s", msg.Type)
}
}
func (c *websocketClientConn) pingHandler(appData string) error {
log.ZDebug(context.Background(), "ping handler recv ping", "remoteAddr", c.conn.RemoteAddr(), "appData", appData)
if err := c.setReadDeadline(); err != nil {
return err
}
err := c.conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(time.Second*1))
if err != nil {
log.ZWarn(context.Background(), "ping handler write pong error", err, "remoteAddr", c.conn.RemoteAddr(), "appData", appData)
}
log.ZDebug(context.Background(), "ping handler write pong success", "remoteAddr", c.conn.RemoteAddr(), "appData", appData)
return nil
}
func (c *websocketClientConn) pongHandler(string) error {
return nil
}
func (c *websocketClientConn) doPing(d time.Duration) {
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-c.done:
return
case <-ticker.C:
if err := c.writeMessage(websocket.PingMessage, nil); err != nil {
_ = c.closeBy(fmt.Errorf("send ping %w", err))
return
}
}
}
}
+142 -85
View File
@@ -15,6 +15,8 @@
package msggateway
import (
"encoding/base64"
"encoding/json"
"net/http"
"net/url"
"strconv"
@@ -24,10 +26,21 @@ import (
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/utils/encrypt"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/openimsdk/tools/utils/timeutil"
)
type UserConnContextInfo struct {
Token string `json:"token"`
UserID string `json:"userID"`
PlatformID int `json:"platformID"`
OperationID string `json:"operationID"`
Compression string `json:"compression"`
SDKType string `json:"sdkType"`
SendResponse bool `json:"sendResponse"`
Background bool `json:"background"`
SDKVersion string `json:"sdkVersion"`
}
type UserConnContext struct {
RespWriter http.ResponseWriter
Req *http.Request
@@ -35,6 +48,7 @@ type UserConnContext struct {
Method string
RemoteAddr string
ConnID string
info *UserConnContextInfo
}
func (c *UserConnContext) Deadline() (deadline time.Time, ok bool) {
@@ -58,9 +72,11 @@ func (c *UserConnContext) Value(key any) any {
case constant.ConnID:
return c.GetConnID()
case constant.OpUserPlatform:
return constant.PlatformIDToName(stringutil.StringToInt(c.GetPlatformID()))
return c.GetPlatformID()
case constant.RemoteAddr:
return c.RemoteAddr
case SDKVersion:
return c.info.SDKVersion
default:
return ""
}
@@ -83,30 +99,92 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
func newTempContext() *UserConnContext {
return &UserConnContext{
Req: &http.Request{URL: &url.URL{}},
Req: &http.Request{URL: &url.URL{}},
info: &UserConnContextInfo{},
}
}
func (c *UserConnContext) ParseEssentialArgs() error {
query := c.Req.URL.Query()
if data := query.Get("v"); data != "" {
return c.parseByJson(data)
} else {
return c.parseByQuery(query, c.Req.Header)
}
}
func (c *UserConnContext) parseByQuery(query url.Values, header http.Header) error {
info := UserConnContextInfo{
Token: query.Get(Token),
UserID: query.Get(WsUserID),
OperationID: query.Get(OperationID),
Compression: query.Get(Compression),
SDKType: query.Get(SDKType),
SDKVersion: query.Get(SDKVersion),
}
platformID, err := strconv.Atoi(query.Get(PlatformID))
if err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int")
}
info.PlatformID = platformID
if val := query.Get(SendResponse); val != "" {
ok, err := strconv.ParseBool(val)
if err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("isMsgResp is not bool")
}
info.SendResponse = ok
}
if info.Compression == "" {
info.Compression = header.Get(Compression)
}
background, err := strconv.ParseBool(query.Get(BackgroundStatus))
if err != nil {
return err
}
info.Background = background
return c.checkInfo(&info)
}
func (c *UserConnContext) parseByJson(data string) error {
reqInfo, err := base64.RawURLEncoding.DecodeString(data)
if err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("data is not base64")
}
var info UserConnContextInfo
if err := json.Unmarshal(reqInfo, &info); err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("data is not json", "info", err.Error())
}
return c.checkInfo(&info)
}
func (c *UserConnContext) checkInfo(info *UserConnContextInfo) error {
if info.OperationID == "" {
return servererrs.ErrConnArgsErr.WrapMsg("operationID is empty")
}
if info.Token == "" {
return servererrs.ErrConnArgsErr.WrapMsg("token is empty")
}
if info.UserID == "" {
return servererrs.ErrConnArgsErr.WrapMsg("sendID is empty")
}
if _, ok := constant.PlatformID2Name[info.PlatformID]; !ok {
return servererrs.ErrConnArgsErr.WrapMsg("platformID is invalid")
}
switch info.SDKType {
case "":
info.SDKType = GoSDK
case GoSDK, JsSDK:
default:
return servererrs.ErrConnArgsErr.WrapMsg("sdkType is invalid")
}
c.info = info
return nil
}
func (c *UserConnContext) GetRemoteAddr() string {
return c.RemoteAddr
}
func (c *UserConnContext) Query(key string) (string, bool) {
var value string
if value = c.Req.URL.Query().Get(key); value == "" {
return value, false
}
return value, true
}
func (c *UserConnContext) GetHeader(key string) (string, bool) {
var value string
if value = c.Req.Header.Get(key); value == "" {
return value, false
}
return value, true
}
func (c *UserConnContext) SetHeader(key, value string) {
c.RespWriter.Header().Set(key, value)
}
@@ -120,97 +198,76 @@ func (c *UserConnContext) GetConnID() string {
}
func (c *UserConnContext) GetUserID() string {
return c.Req.URL.Query().Get(WsUserID)
if c == nil || c.info == nil {
return ""
}
return c.info.UserID
}
func (c *UserConnContext) GetPlatformID() string {
return c.Req.URL.Query().Get(PlatformID)
func (c *UserConnContext) GetPlatformID() int {
if c == nil || c.info == nil {
return 0
}
return c.info.PlatformID
}
func (c *UserConnContext) GetOperationID() string {
return c.Req.URL.Query().Get(OperationID)
if c == nil || c.info == nil {
return ""
}
return c.info.OperationID
}
func (c *UserConnContext) SetOperationID(operationID string) {
values := c.Req.URL.Query()
values.Set(OperationID, operationID)
c.Req.URL.RawQuery = values.Encode()
if c.info == nil {
c.info = &UserConnContextInfo{}
}
c.info.OperationID = operationID
}
func (c *UserConnContext) GetToken() string {
return c.Req.URL.Query().Get(Token)
}
func (c *UserConnContext) GetSDKVersion() string {
return c.Req.URL.Query().Get(SDKVersion)
if c == nil || c.info == nil {
return ""
}
return c.info.Token
}
func (c *UserConnContext) GetCompression() bool {
compression, exists := c.Query(Compression)
if exists && compression == GzipCompressionProtocol {
return true
} else {
compression, exists := c.GetHeader(Compression)
if exists && compression == GzipCompressionProtocol {
return true
}
}
return false
return c != nil && c.info != nil && c.info.Compression == GzipCompressionProtocol
}
func (c *UserConnContext) GetSDKType() string {
sdkType := c.Req.URL.Query().Get(SDKType)
if sdkType == "" {
sdkType = GoSDK
if c == nil || c.info == nil {
return GoSDK
}
return sdkType
switch c.info.SDKType {
case "", GoSDK:
return GoSDK
case JsSDK:
return JsSDK
default:
return ""
}
}
func (c *UserConnContext) GetSDKVersion() string {
if c == nil || c.info == nil {
return ""
}
return c.info.SDKVersion
}
func (c *UserConnContext) ShouldSendResp() bool {
errResp, exists := c.Query(SendResponse)
if exists {
b, err := strconv.ParseBool(errResp)
if err != nil {
return false
} else {
return b
}
}
return false
return c != nil && c.info != nil && c.info.SendResponse
}
func (c *UserConnContext) SetToken(token string) {
c.Req.URL.RawQuery = Token + "=" + token
if c.info == nil {
c.info = &UserConnContextInfo{}
}
c.info.Token = token
}
func (c *UserConnContext) GetBackground() bool {
b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus))
if err != nil {
return false
}
return b
}
func (c *UserConnContext) ParseEssentialArgs() error {
_, exists := c.Query(Token)
if !exists {
return servererrs.ErrConnArgsErr.WrapMsg("token is empty")
}
_, exists = c.Query(WsUserID)
if !exists {
return servererrs.ErrConnArgsErr.WrapMsg("sendID is empty")
}
platformIDStr, exists := c.Query(PlatformID)
if !exists {
return servererrs.ErrConnArgsErr.WrapMsg("platformID is empty")
}
_, err := strconv.Atoi(platformIDStr)
if err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int")
}
switch sdkType, _ := c.Query(SDKType); sdkType {
case "", GoSDK, JsSDK:
default:
return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js")
}
return nil
return c != nil && c.info != nil && c.info.Background
}
+9 -12
View File
@@ -152,19 +152,16 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M
userPlatform := &msggateway.SingleMsgToUserPlatform{
RecvPlatFormID: int32(client.PlatformID),
}
if !client.IsBackground ||
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, msgData)
if err != nil {
log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID)
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
} else {
if _, ok := s.pushTerminal[client.PlatformID]; ok {
result.OnlinePush = true
}
}
} else {
if client.IsBackground && client.PlatformID == constant.IOSPlatformID {
userPlatform.ResultCode = int64(servererrs.ErrIOSBackgroundPushErr.Code())
result.Resp = append(result.Resp, userPlatform)
continue
}
if err := client.PushMessage(ctx, msgData); err != nil {
log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID)
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
} else if _, ok := s.pushTerminal[client.PlatformID]; ok {
result.OnlinePush = true
}
result.Resp = append(result.Resp, userPlatform)
}
-179
View File
@@ -1,179 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msggateway
import (
"encoding/json"
"net/http"
"time"
"github.com/openimsdk/tools/apiresp"
"github.com/gorilla/websocket"
"github.com/openimsdk/tools/errs"
)
type LongConn interface {
// Close this connection
Close() error
// WriteMessage Write message to connection,messageType means data type,can be set binary(2) and text(1).
WriteMessage(messageType int, message []byte) error
// ReadMessage Read message from connection.
ReadMessage() (int, []byte, error)
// SetReadDeadline sets the read deadline on the underlying network connection,
// after a read has timed out, will return an error.
SetReadDeadline(timeout time.Duration) error
// SetWriteDeadline sets to write deadline when send message,when read has timed out,will return error.
SetWriteDeadline(timeout time.Duration) error
// Dial Try to dial a connection,url must set auth args,header can control compress data
Dial(urlStr string, requestHeader http.Header) (*http.Response, error)
// IsNil Whether the connection of the current long connection is nil
IsNil() bool
// SetConnNil Set the connection of the current long connection to nil
SetConnNil()
// SetReadLimit sets the maximum size for a message read from the peer.bytes
SetReadLimit(limit int64)
SetPongHandler(handler PingPongHandler)
SetPingHandler(handler PingPongHandler)
// GenerateLongConn Check the connection of the current and when it was sent are the same
GenerateLongConn(w http.ResponseWriter, r *http.Request) error
}
type GWebSocket struct {
protocolType int
conn *websocket.Conn
handshakeTimeout time.Duration
writeBufferSize int
}
func newGWebSocket(protocolType int, handshakeTimeout time.Duration, wbs int) *GWebSocket {
return &GWebSocket{protocolType: protocolType, handshakeTimeout: handshakeTimeout, writeBufferSize: wbs}
}
func (d *GWebSocket) Close() error {
return d.conn.Close()
}
func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) error {
upgrader := &websocket.Upgrader{
HandshakeTimeout: d.handshakeTimeout,
CheckOrigin: func(r *http.Request) bool { return true },
}
if d.writeBufferSize > 0 { // default is 4kb.
upgrader.WriteBufferSize = d.writeBufferSize
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
// The upgrader.Upgrade method usually returns enough error messages to diagnose problems that may occur during the upgrade
return errs.WrapMsg(err, "GenerateLongConn: WebSocket upgrade failed")
}
d.conn = conn
return nil
}
func (d *GWebSocket) WriteMessage(messageType int, message []byte) error {
// d.setSendConn(d.conn)
return d.conn.WriteMessage(messageType, message)
}
// func (d *GWebSocket) setSendConn(sendConn *websocket.Conn) {
// d.sendConn = sendConn
//}
func (d *GWebSocket) ReadMessage() (int, []byte, error) {
return d.conn.ReadMessage()
}
func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error {
return d.conn.SetReadDeadline(time.Now().Add(timeout))
}
func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error {
if timeout <= 0 {
return errs.New("timeout must be greater than 0")
}
// TODO SetWriteDeadline Future add error handling
if err := d.conn.SetWriteDeadline(time.Now().Add(timeout)); err != nil {
return errs.WrapMsg(err, "GWebSocket.SetWriteDeadline failed")
}
return nil
}
func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error) {
conn, httpResp, err := websocket.DefaultDialer.Dial(urlStr, requestHeader)
if err != nil {
return httpResp, errs.WrapMsg(err, "GWebSocket.Dial failed", "url", urlStr)
}
d.conn = conn
return httpResp, nil
}
func (d *GWebSocket) IsNil() bool {
return d.conn == nil
//
// if d.conn != nil {
// return false
// }
// return true
}
func (d *GWebSocket) SetConnNil() {
d.conn = nil
}
func (d *GWebSocket) SetReadLimit(limit int64) {
d.conn.SetReadLimit(limit)
}
func (d *GWebSocket) SetPongHandler(handler PingPongHandler) {
d.conn.SetPongHandler(handler)
}
func (d *GWebSocket) SetPingHandler(handler PingPongHandler) {
d.conn.SetPingHandler(handler)
}
func (d *GWebSocket) RespondWithError(err error, w http.ResponseWriter, r *http.Request) error {
if err := d.GenerateLongConn(w, r); err != nil {
return err
}
data, err := json.Marshal(apiresp.ParseError(err))
if err != nil {
_ = d.Close()
return errs.WrapMsg(err, "json marshal failed")
}
if err := d.WriteMessage(MessageText, data); err != nil {
_ = d.Close()
return errs.WrapMsg(err, "WriteMessage failed")
}
_ = d.Close()
return nil
}
func (d *GWebSocket) RespondWithSuccess() error {
data, err := json.Marshal(apiresp.ParseError(nil))
if err != nil {
_ = d.Close()
return errs.WrapMsg(err, "json marshal failed")
}
if err := d.WriteMessage(MessageText, data); err != nil {
_ = d.Close()
return errs.WrapMsg(err, "WriteMessage failed")
}
return nil
}
+57 -51
View File
@@ -2,18 +2,20 @@ package msggateway
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
pbAuth "github.com/openimsdk/protocol/auth"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mcontext"
"github.com/go-playground/validator/v10"
@@ -23,10 +25,11 @@ import (
"github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
)
var wsSuccessResponse, _ = json.Marshal(&apiresp.ApiResponse{})
type LongConnServer interface {
Run(ctx context.Context) error
wsHandler(w http.ResponseWriter, r *http.Request)
@@ -43,6 +46,7 @@ type LongConnServer interface {
}
type WsServer struct {
websocket *websocket.Upgrader
msgGatewayConfig *Config
port int
wsMaxConnNum int64
@@ -136,9 +140,13 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
o(&config)
}
//userRpcClient := rpcclient.NewUserRpcClient(client, config.Discovery.RpcService.User, config.Share.IMAdminUser)
upgrader := &websocket.Upgrader{
HandshakeTimeout: config.handshakeTimeout,
CheckOrigin: func(r *http.Request) bool { return true },
}
v := validator.New()
return &WsServer{
websocket: upgrader,
msgGatewayConfig: msgGatewayConfig,
port: config.port,
wsMaxConnNum: config.maxConnNum,
@@ -260,8 +268,7 @@ func (ws *WsServer) registerClient(client *Client) {
)
oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID)
log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID,
"sdkVersion", client.SDKVersion)
log.ZInfo(client.ctx, "registerClient", "userID", client.UserID, "platformID", client.PlatformID)
if !userOK {
ws.clients.Set(client.UserID, client)
@@ -448,7 +455,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
// validateRespWithRequest checks if the response matches the expected userID and platformID.
func (ws *WsServer) validateRespWithRequest(ctx *UserConnContext, resp *pbAuth.ParseTokenResp) error {
userID := ctx.GetUserID()
platformID := stringutil.StringToInt32(ctx.GetPlatformID())
platformID := int32(ctx.GetPlatformID())
if resp.UserID != userID {
return servererrs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token uid %s != userID %s", resp.UserID, userID))
}
@@ -458,19 +465,37 @@ func (ws *WsServer) validateRespWithRequest(ctx *UserConnContext, resp *pbAuth.P
return nil
}
func (ws *WsServer) handlerError(ctx *UserConnContext, w http.ResponseWriter, r *http.Request, err error) {
if !ctx.ShouldSendResp() {
httpError(ctx, err)
return
}
// the browser cannot get the response of upgrade failure
data, err := json.Marshal(apiresp.ParseError(err))
if err != nil {
log.ZError(ctx, "json marshal failed", err)
return
}
conn, upgradeErr := ws.websocket.Upgrade(w, r, nil)
if upgradeErr != nil {
log.ZWarn(ctx, "websocket upgrade failed", upgradeErr, "respErr", err, "resp", string(data))
return
}
defer conn.Close()
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
log.ZWarn(ctx, "WriteMessage failed", err, "respErr", err, "resp", string(data))
return
}
}
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
// Create a new connection context
connContext := newContext(w, r)
if !ws.ready.Load() {
httpError(connContext, errs.New("ws server not ready"))
return
}
// Check if the current number of online user connections exceeds the maximum limit
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
// If it exceeds the maximum connection number, return an error via HTTP and stop processing
httpError(connContext, servererrs.ErrConnOverMaxNumLimit.WrapMsg("over max conn num limit"))
ws.handlerError(connContext, w, r, servererrs.ErrConnOverMaxNumLimit.WrapMsg("over max conn num limit"))
return
}
@@ -478,31 +503,14 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
err := connContext.ParseEssentialArgs()
if err != nil {
// If there's an error during parsing, return an error via HTTP and stop processing
httpError(connContext, err)
return
}
if ws.authClient == nil {
httpError(connContext, errs.New("auth client is not initialized"))
ws.handlerError(connContext, w, r, err)
return
}
// Call the authentication client to parse the Token obtained from the context
resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken())
if err != nil {
// If there's an error parsing the Token, decide whether to send the error message via WebSocket based on the context flag
shouldSendError := connContext.ShouldSendResp()
if shouldSendError {
// Create a WebSocket connection object and attempt to send the error message via WebSocket
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
if err := wsLongConn.RespondWithError(err, w, r); err == nil {
// If the error message is successfully sent via WebSocket, stop processing
return
}
}
// If sending via WebSocket is not required or fails, return the error via HTTP and stop processing
httpError(connContext, err)
ws.handlerError(connContext, w, r, err)
return
}
@@ -510,32 +518,30 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
err = ws.validateRespWithRequest(connContext, resp)
if err != nil {
// If validation fails, return an error via HTTP and stop processing
httpError(connContext, err)
ws.handlerError(connContext, w, r, err)
return
}
log.ZDebug(connContext, "new conn", "token", connContext.GetToken())
// Create a WebSocket long connection object
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
if err := wsLongConn.GenerateLongConn(w, r); err != nil {
//If the creation of the long connection fails, the error is handled internally during the handshake process.
log.ZWarn(connContext, "long connection fails", err)
conn, err := ws.websocket.Upgrade(w, r, nil)
if err != nil {
log.ZWarn(connContext, "websocket upgrade failed", err)
return
} else {
// Check if a normal response should be sent via WebSocket
shouldSendSuccessResp := connContext.ShouldSendResp()
if shouldSendSuccessResp {
// Attempt to send a success message through WebSocket
if err := wsLongConn.RespondWithSuccess(); err != nil {
// If the success message is successfully sent, end further processing
return
}
}
if connContext.ShouldSendResp() {
if err := conn.WriteMessage(websocket.TextMessage, wsSuccessResponse); err != nil {
log.ZWarn(connContext, "WriteMessage first response", err)
return
}
}
// Retrieve a client object from the client pool, reset its state, and associate it with the current WebSocket long connection
client := ws.clientPool.Get().(*Client)
client.ResetClient(connContext, wsLongConn, ws)
log.ZDebug(connContext, "new conn", "token", connContext.GetToken())
var pingInterval time.Duration
if connContext.GetPlatformID() == constant.WebPlatformID {
pingInterval = pingPeriod
}
client := new(Client)
client.ResetClient(connContext, NewWebSocketClientConn(conn, maxMessageSize, pongWait, pingInterval), ws)
// Register the client with the server and start message processing
ws.registerChan <- client
-58
View File
@@ -513,14 +513,6 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req
return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil
}
func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) {
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs)
if err != nil {
return nil, err
}
return &pbconversation.GetConversationsByConversationIDResp{Conversations: convert.ConversationsDB2Pb(conversations)}, nil
}
func (c *conversationServer) GetConversationOfflinePushUserIDs(ctx context.Context, req *pbconversation.GetConversationOfflinePushUserIDsReq) (*pbconversation.GetConversationOfflinePushUserIDsResp, error) {
if req.ConversationID == "" {
return nil, errs.ErrArgs.WrapMsg("conversationID is empty")
@@ -717,56 +709,6 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
}, nil
}
func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) {
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return nil, err
}
const batchNum = 100
if num == 0 {
return nil, errs.New("Need Destruct Msg is nil").Wrap()
}
maxPage := (num + batchNum - 1) / batchNum
temp := make([]*dbModel.Conversation, 0, maxPage*batchNum)
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
// log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}
for _, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
conversation.LatestMsgDestructTime.IsZero()) {
temp = append(temp, conversation)
}
}
}
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
}
func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
if err := authverify.CheckAccess(ctx, req.UserID); err != nil {
return nil, err
+1 -1
View File
@@ -27,7 +27,7 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re
conversationIDs = nil
}
return &conversation.GetFullOwnerConversationIDsResp{
Version: idHash,
Version: uint64(vl.Version),
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
ConversationIDs: conversationIDs,
+19 -5
View File
@@ -472,6 +472,9 @@ func (g *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
g.notification.GroupApplicationAgreeMemberEnterNotification(ctx, req.GroupID, req.SendMessage, opUserID, userIDs...)
}
}
if err := g.setMemberJoinSeq(ctx, req.GroupID, req.InvitedUserIDs); err != nil {
return nil, err
}
return &pbgroup.InviteUserToGroupResp{}, nil
}
@@ -602,10 +605,6 @@ func (g *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
}
}
}
num, err := g.db.FindGroupMemberNum(ctx, req.GroupID)
if err != nil {
return nil, err
}
ownerUserIDs, err := g.db.GetGroupRoleLevelMemberIDs(ctx, req.GroupID, constant.GroupOwner)
if err != nil {
return nil, err
@@ -617,6 +616,10 @@ func (g *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
if err := g.db.DeleteGroupMember(ctx, group.GroupID, req.KickedUserIDs); err != nil {
return nil, err
}
num, err := g.db.FindGroupMemberNum(ctx, req.GroupID)
if err != nil {
return nil, err
}
tips := &sdkws.MemberKickedTips{
Group: &sdkws.GroupInfo{
GroupID: group.GroupID,
@@ -626,7 +629,7 @@ func (g *groupServer) KickGroupMember(ctx context.Context, req *pbgroup.KickGrou
FaceURL: group.FaceURL,
OwnerUserID: ownerUserID,
CreateTime: group.CreateTime.UnixMilli(),
MemberCount: num - uint32(len(req.KickedUserIDs)),
MemberCount: num,
Ex: group.Ex,
Status: group.Status,
CreatorUserID: group.CreatorUserID,
@@ -905,6 +908,9 @@ func (g *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
return nil, err
}
}
if err := g.setMemberJoinSeq(ctx, req.GroupID, []string{req.FromUserID}); err != nil {
return nil, err
}
}
case constant.GroupResponseRefuse:
g.notification.GroupApplicationRejectedNotification(ctx, req)
@@ -967,6 +973,9 @@ func (g *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
if err = g.notification.MemberEnterNotification(ctx, req.GroupID, req.InviterUserID); err != nil {
return nil, err
}
if err := g.setMemberJoinSeq(ctx, req.GroupID, []string{req.InviterUserID}); err != nil {
return nil, err
}
g.webhookAfterJoinGroup(ctx, &g.config.WebhooksConfig.AfterJoinGroup, req)
return &pbgroup.JoinGroupResp{}, nil
@@ -1028,6 +1037,11 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro
return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, maxSeq)
}
func (g *groupServer) setMemberJoinSeq(ctx context.Context, groupID string, userIDs []string) error {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, 0)
}
func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInfoReq) (*pbgroup.SetGroupInfoResp, error) {
var opMember *model.GroupMember
if !authverify.IsAdmin(ctx) {
+2 -2
View File
@@ -34,7 +34,7 @@ func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgrou
userIDs = nil
}
return &pbgroup.GetFullGroupMemberUserIDsResp{
Version: idHash,
Version: uint64(vl.Version),
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
UserIDs: userIDs,
@@ -58,7 +58,7 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF
groupIDs = nil
}
return &pbgroup.GetFullJoinGroupIDsResp{
Version: idHash,
Version: uint64(vl.Version),
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
GroupIDs: groupIDs,
+3 -6
View File
@@ -19,7 +19,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/log"
@@ -74,7 +73,7 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms
if err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs); err != nil {
return nil, err
}
conv, err := m.conversationClient.GetConversationsByConversationID(ctx, req.ConversationID)
conv, err := m.conversationClient.GetConversation(ctx, req.ConversationID, req.UserID)
if err != nil {
return nil, err
}
@@ -116,14 +115,12 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
}
func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []string, userID string, deleteSyncOpt *msg.DeleteSyncOpt) error {
conversations, err := m.conversationClient.GetConversationsByConversationIDs(ctx, conversationIDs)
conversations, err := m.conversationClient.GetConversations(ctx, conversationIDs, userID)
if err != nil {
return err
}
var existConversations []*conversation.Conversation
var existConversationIDs []string
for _, conversation := range conversations {
existConversations = append(existConversations, conversation)
existConversationIDs = append(existConversationIDs, conversation.ConversationID)
}
log.ZDebug(ctx, "ClearConversationsMsg", "existConversationIDs", existConversationIDs)
@@ -152,7 +149,7 @@ func (m *msgServer) clearConversation(ctx context.Context, conversationIDs []str
if err := m.MsgDatabase.SetMinSeqs(ctx, m.getMinSeqs(maxSeqs)); err != nil {
return err
}
for _, conversation := range existConversations {
for _, conversation := range conversations {
tips := &sdkws.ClearConversationTips{UserID: userID, ConversationIDs: []string{conversation.ConversationID}}
m.notificationSender.NotificationWithSessionType(ctx, userID, m.conversationAndGetRecvID(conversation, userID), constant.ClearConversationNotification, conversation.ConversationType, tips)
}
+1 -1
View File
@@ -56,7 +56,7 @@ func (s *friendServer) GetFullFriendUserIDs(ctx context.Context, req *relation.G
userIDs = nil
}
return &relation.GetFullFriendUserIDsResp{
Version: idHash,
Version: uint64(vl.Version),
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
UserIDs: userIDs,
+42 -8
View File
@@ -5,9 +5,12 @@ package main
import (
"flag"
"fmt"
"os"
"github.com/openimsdk/gomake/mageutil"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/utils/datautil"
)
var Default = Build
@@ -25,7 +28,6 @@ var (
customToolsDir = "tools" // tools source code directory, default is "tools"
)
// Build support specifical binary build.
//
// Example: `mage build openim-api openim-rpc-user seq`
@@ -35,8 +37,7 @@ func Build() {
if len(bin) != 0 {
bin = bin[1:]
}
mageutil.Build(bin, nil)
mageutil.WithSpinner("Building binaries...", func() { mageutil.Build(bin, nil, nil) })
}
func BuildWithCustomConfig() {
@@ -53,7 +54,9 @@ func BuildWithCustomConfig() {
ToolsDir: &customToolsDir,
}
mageutil.Build(bin, config)
mageutil.WithSpinner("Building binaries with custom config...", func() {
mageutil.Build(bin, config, nil)
})
}
func Start() {
@@ -70,7 +73,9 @@ func Start() {
bin = bin[1:]
}
mageutil.StartToolsAndServices(bin, nil)
mageutil.WithSpinner("Starting...", func() {
mageutil.StartToolsAndServices(bin, nil)
})
}
func StartWithCustomConfig() {
@@ -93,13 +98,42 @@ func StartWithCustomConfig() {
ConfigDir: &customConfigDir,
}
mageutil.StartToolsAndServices(bin, config)
mageutil.WithSpinner("Starting with custom config...", func() {
mageutil.StartToolsAndServices(bin, config)
})
}
func Stop() {
mageutil.StopAndCheckBinaries()
mageutil.WithSpinner("Stopping...", mageutil.StopAndCheckBinaries)
}
func Check() {
mageutil.CheckAndReportBinariesStatus()
mageutil.WithSpinner("Checking binaries...", mageutil.CheckAndReportBinariesStatus)
}
func Export() {
mappingPaths, err := mageutil.GetDefaultExportMappingPaths([]string{
"cmd",
"internal",
"pkg",
"test",
"tools",
"**/*.go",
"go.mod",
"go.work",
})
if err != nil {
mageutil.PrintRed("GetDefaultExportMappingPaths failed " + err.Error())
os.Exit(1)
}
mageutil.WithSpinner("Exporting...", func() {
mageutil.ExportMageLauncherArchived(mappingPaths, &mageutil.ExportOptions{
ProjectName: datautil.ToPtr(fmt.Sprintf("open-im-server_%s", version.Version)),
BuildOpt: &mageutil.BuildOptions{
Release: datautil.ToPtr(true),
Compress: datautil.ToPtr(true),
},
})
})
}
+1
View File
@@ -70,6 +70,7 @@ const (
BlockedByPeer = 1302 // Blocked by the peer
NotPeersFriend = 1303 // Not the peer's friend
RelationshipAlreadyError = 1304 // Already in a friend relationship
FriendRequestHandled = 1305 // Friend request has already been handled
// Message error codes.
MessageHasReadDisable = 1401
+5 -4
View File
@@ -51,10 +51,11 @@ var (
ErrMessageHasReadDisable = errs.NewCodeError(MessageHasReadDisable, "MessageHasReadDisable")
ErrCanNotAddYourself = errs.NewCodeError(CanNotAddYourselfError, "CanNotAddYourselfError")
ErrBlockedByPeer = errs.NewCodeError(BlockedByPeer, "BlockedByPeer")
ErrNotPeersFriend = errs.NewCodeError(NotPeersFriend, "NotPeersFriend")
ErrRelationshipAlready = errs.NewCodeError(RelationshipAlreadyError, "RelationshipAlreadyError")
ErrCanNotAddYourself = errs.NewCodeError(CanNotAddYourselfError, "CanNotAddYourselfError")
ErrBlockedByPeer = errs.NewCodeError(BlockedByPeer, "BlockedByPeer")
ErrNotPeersFriend = errs.NewCodeError(NotPeersFriend, "NotPeersFriend")
ErrRelationshipAlready = errs.NewCodeError(RelationshipAlreadyError, "RelationshipAlreadyError")
ErrFriendRequestHandled = errs.NewCodeError(FriendRequestHandled, "FriendRequestHandled")
ErrMutedInGroup = errs.NewCodeError(MutedInGroup, "MutedInGroup")
ErrMutedGroup = errs.NewCodeError(MutedGroup, "MutedGroup")
-1
View File
@@ -220,7 +220,6 @@ func (c *tokenCache) DeleteAndSetTemporary(ctx context.Context, userID string, p
if err := c.rdb.HDel(ctx, key, fields...).Err(); err != nil {
return errs.Wrap(err)
}
if c.localCache != nil {
c.removeLocalTokenCache(ctx, key)
}
@@ -61,8 +61,6 @@ type ConversationDatabase interface {
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
// PageConversationIDs paginates through conversation IDs based on the specified pagination settings.
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
// GetConversationsByConversationID retrieves conversations by their IDs.
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error)
// GetConversationIDsNeedDestruct fetches conversations that need to be destructed based on specific criteria.
GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error)
// GetConversationNotReceiveMessageUserIDs gets user IDs for users in a conversation who have not received messages.
@@ -375,10 +373,6 @@ func (c *conversationDatabase) PageConversationIDs(ctx context.Context, paginati
return c.conversationDB.PageConversationIDs(ctx, pagination)
}
func (c *conversationDatabase) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*relationtb.Conversation, error) {
return c.conversationDB.GetConversationsByConversationID(ctx, conversationIDs)
}
func (c *conversationDatabase) GetConversationIDsNeedDestruct(ctx context.Context) ([]*relationtb.Conversation, error) {
return c.conversationDB.GetConversationIDsNeedDestruct(ctx)
}
+8 -10
View File
@@ -16,9 +16,9 @@ package controller
import (
"context"
"fmt"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
@@ -109,15 +109,13 @@ func (f *friendDatabase) CheckIn(ctx context.Context, userID1, userID2 string) (
// Retrieve friend IDs of userID1 from the cache
userID1FriendIDs, err := f.cache.GetFriendIDs(ctx, userID1)
if err != nil {
err = fmt.Errorf("error retrieving friend IDs for user %s: %w", userID1, err)
return
return false, false, err
}
// Retrieve friend IDs of userID2 from the cache
userID2FriendIDs, err := f.cache.GetFriendIDs(ctx, userID2)
if err != nil {
err = fmt.Errorf("error retrieving friend IDs for user %s: %w", userID2, err)
return
return false, false, err
}
// Check if userID2 is in userID1's friend list and vice versa
@@ -214,12 +212,12 @@ func (f *friendDatabase) RefuseFriendRequest(ctx context.Context, friendRequest
// Attempt to retrieve the friend request from the database.
fr, err := f.friendRequest.Take(ctx, friendRequest.FromUserID, friendRequest.ToUserID)
if err != nil {
return fmt.Errorf("failed to retrieve friend request from %s to %s: %w", friendRequest.FromUserID, friendRequest.ToUserID, err)
return err
}
// Check if the friend request has already been handled.
if fr.HandleResult != 0 {
return fmt.Errorf("friend request from %s to %s has already been processed", friendRequest.FromUserID, friendRequest.ToUserID)
return servererrs.ErrFriendRequestHandled.WrapMsg("friend request has already been processed", "from", friendRequest.FromUserID, "to", friendRequest.ToUserID)
}
// Log the action of refusing the friend request for debugging and auditing purposes.
@@ -232,7 +230,7 @@ func (f *friendDatabase) RefuseFriendRequest(ctx context.Context, friendRequest
friendRequest.HandleResult = constant.FriendResponseRefuse
friendRequest.HandleTime = time.Now()
if err := f.friendRequest.Update(ctx, friendRequest); err != nil {
return fmt.Errorf("failed to update friend request from %s to %s as refused: %w", friendRequest.FromUserID, friendRequest.ToUserID, err)
return err
}
return nil
@@ -350,9 +348,9 @@ func (f *friendDatabase) PageFriendRequestToMe(ctx context.Context, userID strin
func (f *friendDatabase) FindFriendsWithError(ctx context.Context, ownerUserID string, friendUserIDs []string) (friends []*model.Friend, err error) {
friends, err = f.friend.FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
return
return nil, err
}
return
return friends, nil
}
func (f *friendDatabase) FindFriendUserIDs(ctx context.Context, ownerUserID string) (friendUserIDs []string, err error) {
@@ -39,7 +39,6 @@ type Conversation interface {
GetAllConversationIDs(ctx context.Context) ([]string, error)
GetAllConversationIDsNumber(ctx context.Context) (int64, error)
PageConversationIDs(ctx context.Context, pagination pagination.Pagination) (conversationIDs []string, err error)
GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error)
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
@@ -47,6 +47,12 @@ func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) {
},
Options: options.Index(),
},
{
Keys: bson.D{
{Key: "conversation_id", Value: 1},
},
Options: options.Index().SetUnique(true),
},
})
if err != nil {
return nil, errs.Wrap(err)
@@ -232,10 +238,6 @@ func (c *ConversationMgo) PageConversationIDs(ctx context.Context, pagination pa
return mongoutil.FindPageOnly[string](ctx, c.coll, bson.M{}, pagination, options.Find().SetProjection(bson.M{"conversation_id": 1}))
}
func (c *ConversationMgo) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*model.Conversation, error) {
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{"conversation_id": bson.M{"$in": conversationIDs}})
}
func (c *ConversationMgo) GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) {
// "is_msg_destruct = 1 && msg_destruct_time != 0 && (UNIX_TIMESTAMP(NOW()) > (msg_destruct_time + UNIX_TIMESTAMP(latest_msg_destruct_time)) || latest_msg_destruct_time is NULL)"
return mongoutil.Find[*model.Conversation](ctx, c.coll, bson.M{
@@ -57,8 +57,8 @@ func (s *seqConversationMongo) Malloc(ctx context.Context, conversationID string
}
filter := map[string]any{"conversation_id": conversationID}
update := map[string]any{
"$inc": map[string]any{"max_seq": size},
"$set": map[string]any{"min_seq": int64(0)},
"$inc": map[string]any{"max_seq": size},
"$setOnInsert": map[string]any{"min_seq": int64(0)},
}
opt := options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After).SetProjection(map[string]any{"_id": 0, "max_seq": 1})
lastSeq, err := mongoutil.FindOneAndUpdate[int64](ctx, s.coll, filter, update, opt)
+33
View File
@@ -0,0 +1,33 @@
// Copyright © 2024 OpenIM open source community. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
type TLSConfig struct {
EnableTLS bool `yaml:"enableTLS"`
CACrt string `yaml:"caCrt"`
ClientCrt string `yaml:"clientCrt"`
ClientKey string `yaml:"clientKey"`
ClientKeyPwd string `yaml:"clientKeyPwd"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify"`
}
type Config struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
ProducerAck string `yaml:"producerAck"`
CompressType string `yaml:"compressType"`
Addr []string `yaml:"addr"`
TLS TLSConfig `yaml:"tls"`
}
@@ -0,0 +1,68 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/log"
)
type MConsumerGroup struct {
sarama.ConsumerGroup
groupID string
topics []string
}
func NewMConsumerGroup(conf *Config, groupID string, topics []string, autoCommitEnable bool) (*MConsumerGroup, error) {
config, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, autoCommitEnable)
if err != nil {
return nil, err
}
group, err := NewConsumerGroup(config, conf.Addr, groupID)
if err != nil {
return nil, err
}
return &MConsumerGroup{
ConsumerGroup: group,
groupID: groupID,
topics: topics,
}, nil
}
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
return GetContextWithMQHeader(cMsg.Headers)
}
func (mc *MConsumerGroup) RegisterHandleAndConsumer(ctx context.Context, handler sarama.ConsumerGroupHandler) {
for {
err := mc.ConsumerGroup.Consume(ctx, mc.topics, handler)
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
if errors.Is(err, context.Canceled) {
return
}
if err != nil {
log.ZWarn(ctx, "consume err", err, "topic", mc.topics, "groupID", mc.groupID)
}
}
}
func (mc *MConsumerGroup) Close() error {
return mc.ConsumerGroup.Close()
}
+82
View File
@@ -0,0 +1,82 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
"google.golang.org/protobuf/proto"
)
// Producer represents a Kafka producer.
type Producer struct {
addr []string
topic string
config *sarama.Config
producer sarama.SyncProducer
}
func NewKafkaProducer(config *sarama.Config, addr []string, topic string) (*Producer, error) {
producer, err := NewProducer(config, addr)
if err != nil {
return nil, err
}
return &Producer{
addr: addr,
topic: topic,
config: config,
producer: producer,
}, nil
}
// SendMessage sends a message to the Kafka topic configured in the Producer.
func (p *Producer) SendMessage(ctx context.Context, key string, msg proto.Message) (int32, int64, error) {
// Marshal the protobuf message
bMsg, err := proto.Marshal(msg)
if err != nil {
return 0, 0, errs.WrapMsg(err, "kafka proto Marshal err")
}
if len(bMsg) == 0 {
return 0, 0, errs.WrapMsg(errEmptyMsg, "kafka proto Marshal err")
}
// Prepare Kafka message
kMsg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.StringEncoder(key),
Value: sarama.ByteEncoder(bMsg),
}
// Validate message key and value
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
return 0, 0, errs.Wrap(errEmptyMsg)
}
// Attach context metadata as headers
header, err := GetMQHeaderWithContext(ctx)
if err != nil {
return 0, 0, err
}
kMsg.Headers = header
// Send the message
partition, offset, err := p.producer.SendMessage(kMsg)
if err != nil {
return 0, 0, errs.WrapMsg(err, "p.producer.SendMessage error")
}
return partition, offset, nil
}
+85
View File
@@ -0,0 +1,85 @@
package kafka
import (
"bytes"
"strings"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
)
func BuildConsumerGroupConfig(conf *Config, initial int64, autoCommitEnable bool) (*sarama.Config, error) {
kfk := sarama.NewConfig()
kfk.Version = sarama.V2_0_0_0
kfk.Consumer.Offsets.Initial = initial
kfk.Consumer.Offsets.AutoCommit.Enable = autoCommitEnable
kfk.Consumer.Return.Errors = false
if conf.Username != "" || conf.Password != "" {
kfk.Net.SASL.Enable = true
kfk.Net.SASL.User = conf.Username
kfk.Net.SASL.Password = conf.Password
}
if conf.TLS.EnableTLS {
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
if err != nil {
return nil, err
}
kfk.Net.TLS.Config = tls
kfk.Net.TLS.Enable = true
}
return kfk, nil
}
func NewConsumerGroup(conf *sarama.Config, addr []string, groupID string) (sarama.ConsumerGroup, error) {
cg, err := sarama.NewConsumerGroup(addr, groupID, conf)
if err != nil {
return nil, errs.WrapMsg(err, "NewConsumerGroup failed", "addr", addr, "groupID", groupID, "conf", *conf)
}
return cg, nil
}
func BuildProducerConfig(conf Config) (*sarama.Config, error) {
kfk := sarama.NewConfig()
kfk.Producer.Return.Successes = true
kfk.Producer.Return.Errors = true
kfk.Producer.Partitioner = sarama.NewHashPartitioner
if conf.Username != "" || conf.Password != "" {
kfk.Net.SASL.Enable = true
kfk.Net.SASL.User = conf.Username
kfk.Net.SASL.Password = conf.Password
}
switch strings.ToLower(conf.ProducerAck) {
case "no_response":
kfk.Producer.RequiredAcks = sarama.NoResponse
case "wait_for_local":
kfk.Producer.RequiredAcks = sarama.WaitForLocal
case "wait_for_all":
kfk.Producer.RequiredAcks = sarama.WaitForAll
default:
kfk.Producer.RequiredAcks = sarama.WaitForAll
}
if conf.CompressType == "" {
kfk.Producer.Compression = sarama.CompressionNone
} else {
if err := kfk.Producer.Compression.UnmarshalText(bytes.ToLower([]byte(conf.CompressType))); err != nil {
return nil, errs.WrapMsg(err, "UnmarshalText failed", "compressType", conf.CompressType)
}
}
if conf.TLS.EnableTLS {
tls, err := newTLSConfig(conf.TLS.ClientCrt, conf.TLS.ClientKey, conf.TLS.CACrt, []byte(conf.TLS.ClientKeyPwd), conf.TLS.InsecureSkipVerify)
if err != nil {
return nil, err
}
kfk.Net.TLS.Config = tls
kfk.Net.TLS.Enable = true
}
return kfk, nil
}
func NewProducer(conf *sarama.Config, addr []string) (sarama.SyncProducer, error) {
producer, err := sarama.NewSyncProducer(addr, conf)
if err != nil {
return nil, errs.WrapMsg(err, "NewSyncProducer failed", "addr", addr, "conf", *conf)
}
return producer, nil
}
+83
View File
@@ -0,0 +1,83 @@
// Copyright © 2024 OpenIM open source community. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"os"
"github.com/openimsdk/tools/errs"
)
// decryptPEM decrypts a PEM block using a password.
func decryptPEM(data []byte, passphrase []byte) ([]byte, error) {
if len(passphrase) == 0 {
return data, nil
}
b, _ := pem.Decode(data)
d, err := x509.DecryptPEMBlock(b, passphrase)
if err != nil {
return nil, errs.WrapMsg(err, "DecryptPEMBlock failed")
}
return pem.EncodeToMemory(&pem.Block{
Type: b.Type,
Bytes: d,
}), nil
}
func readEncryptablePEMBlock(path string, pwd []byte) ([]byte, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, errs.WrapMsg(err, "ReadFile failed", "path", path)
}
return decryptPEM(data, pwd)
}
// newTLSConfig setup the TLS config from general config file.
func newTLSConfig(clientCertFile, clientKeyFile, caCertFile string, keyPwd []byte, insecureSkipVerify bool) (*tls.Config, error) {
var tlsConfig tls.Config
if clientCertFile != "" && clientKeyFile != "" {
certPEMBlock, err := os.ReadFile(clientCertFile)
if err != nil {
return nil, errs.WrapMsg(err, "ReadFile failed", "clientCertFile", clientCertFile)
}
keyPEMBlock, err := readEncryptablePEMBlock(clientKeyFile, keyPwd)
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock)
if err != nil {
return nil, errs.WrapMsg(err, "X509KeyPair failed")
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if caCertFile != "" {
caCert, err := os.ReadFile(caCertFile)
if err != nil {
return nil, errs.WrapMsg(err, "ReadFile failed", "caCertFile", caCertFile)
}
caCertPool := x509.NewCertPool()
if ok := caCertPool.AppendCertsFromPEM(caCert); !ok {
return nil, errs.New("AppendCertsFromPEM failed")
}
tlsConfig.RootCAs = caCertPool
}
tlsConfig.InsecureSkipVerify = insecureSkipVerify
return &tlsConfig, nil
}
+34
View File
@@ -0,0 +1,34 @@
package kafka
import (
"context"
"errors"
"github.com/IBM/sarama"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/mcontext"
)
var errEmptyMsg = errors.New("kafka binary msg is empty")
// GetMQHeaderWithContext extracts message queue headers from the context.
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
operationID, opUserID, platform, connID, err := mcontext.GetCtxInfos(ctx)
if err != nil {
return nil, err
}
return []sarama.RecordHeader{
{Key: []byte(constant.OperationID), Value: []byte(operationID)},
{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
{Key: []byte(constant.OpUserPlatform), Value: []byte(platform)},
{Key: []byte(constant.ConnID), Value: []byte(connID)},
}, nil
}
// GetContextWithMQHeader creates a context from message queue headers.
func GetContextWithMQHeader(header []*sarama.RecordHeader) context.Context {
var values []string
for _, recordHeader := range header {
values = append(values, string(recordHeader.Value))
}
return mcontext.WithMustInfoCtx(values) // Attach extracted values to context
}
+79
View File
@@ -0,0 +1,79 @@
// Copyright © 2024 OpenIM open source community. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kafka
import (
"context"
"fmt"
"github.com/IBM/sarama"
"github.com/openimsdk/tools/errs"
)
func CheckTopics(ctx context.Context, conf *Config, topics []string) error {
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
if err != nil {
return err
}
cli, err := sarama.NewClient(conf.Addr, kfk)
if err != nil {
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
}
defer cli.Close()
existingTopics, err := cli.Topics()
if err != nil {
return errs.WrapMsg(err, "Failed to list topics")
}
existingTopicsMap := make(map[string]bool)
for _, t := range existingTopics {
existingTopicsMap[t] = true
}
for _, topic := range topics {
if !existingTopicsMap[topic] {
return errs.New("topic not exist", "topic", topic).Wrap()
}
}
return nil
}
func CheckHealth(ctx context.Context, conf *Config) error {
kfk, err := BuildConsumerGroupConfig(conf, sarama.OffsetNewest, false)
if err != nil {
return err
}
cli, err := sarama.NewClient(conf.Addr, kfk)
if err != nil {
return errs.WrapMsg(err, "NewClient failed", "config: ", fmt.Sprintf("%+v", conf))
}
defer cli.Close()
// Get broker list
brokers := cli.Brokers()
if len(brokers) == 0 {
return errs.New("no brokers found").Wrap()
}
// Check if all brokers are reachable
for _, broker := range brokers {
if err := broker.Open(kfk); err != nil {
return errs.WrapMsg(err, "failed to open broker", "broker", broker.Addr())
}
}
return nil
}
+16 -11
View File
@@ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] {
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
createSimpleLRU := func() lru.LRU[string, V] {
if opt.expirationEvict {
return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
} else {
return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
}
}
if opt.localSlotNum == 1 {
c.local = createSimpleLRU()
} else {
c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU)
c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU)
}
if opt.linkSlotNum > 0 {
c.link = link.New(opt.linkSlotNum)
@@ -71,14 +71,19 @@ type cache[V any] struct {
}
func (c *cache[V]) onEvict(key string, value V) {
_ = value
if c.link != nil {
lks := c.link.Del(key)
for k := range lks {
if key != k { // prevent deadlock
c.local.Del(k)
}
// Do not delete other keys while the underlying LRU still holds its lock;
// defer linked deletions to avoid re-entering the same slot and deadlocking.
if lks := c.link.Del(key); len(lks) > 0 {
go c.delLinked(key, lks)
}
}
}
func (c *cache[V]) delLinked(src string, keys map[string]struct{}) {
for k := range keys {
if src != k {
c.local.Del(k)
}
}
}
@@ -105,7 +110,7 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C
func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) {
if c.local != nil {
return c.local.Get(key, func() (V, error) {
if len(link) > 0 {
if len(link) > 0 && c.link != nil {
c.link.Link(key, link...)
}
return fetch(ctx)
+67
View File
@@ -22,6 +22,8 @@ import (
"sync/atomic"
"testing"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
)
func TestName(t *testing.T) {
@@ -91,3 +93,68 @@ func TestName(t *testing.T) {
t.Log("del", del.Load())
// 137.35s
}
// Test deadlock scenario when eviction callback deletes a linked key that hashes to the same slot.
func TestCacheEvictDeadlock(t *testing.T) {
ctx := context.Background()
c := New[string](WithLocalSlotNum(1), WithLocalSlotSize(1), WithLazy())
if _, err := c.GetLink(ctx, "k1", func(ctx context.Context) (string, error) {
return "v1", nil
}, "k2"); err != nil {
t.Fatalf("seed cache failed: %v", err)
}
done := make(chan struct{})
go func() {
defer close(done)
_, _ = c.GetLink(ctx, "k2", func(ctx context.Context) (string, error) {
return "v2", nil
}, "k1")
}()
select {
case <-done:
// expected to finish quickly; current implementation deadlocks here.
case <-time.After(time.Second):
t.Fatal("GetLink deadlocked during eviction of linked key")
}
}
func TestExpirationLRUGetBatch(t *testing.T) {
l := lru.NewExpirationLRU[string, string](2, time.Minute, time.Second*5, EmptyTarget{}, nil)
keys := []string{"a", "b"}
values, err := l.GetBatch(keys, func(keys []string) (map[string]string, error) {
res := make(map[string]string)
for _, k := range keys {
res[k] = k + "_v"
}
return res, nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(values) != len(keys) {
t.Fatalf("expected %d values, got %d", len(keys), len(values))
}
for _, k := range keys {
if v, ok := values[k]; !ok || v != k+"_v" {
t.Fatalf("unexpected value for %s: %q, ok=%v", k, v, ok)
}
}
// second batch should hit cache
values, err = l.GetBatch(keys, func(keys []string) (map[string]string, error) {
t.Fatalf("should not fetch on cache hit")
return nil, nil
})
if err != nil {
t.Fatalf("unexpected error on cache hit: %v", err)
}
for _, k := range keys {
if v, ok := values[k]; !ok || v != k+"_v" {
t.Fatalf("unexpected cached value for %s: %q, ok=%v", k, v, ok)
}
}
}
-4
View File
@@ -33,10 +33,6 @@ func InitLocalCache(localCache *config.LocalCache) {
Local config.CacheConfig
Keys []string
}{
{
Local: localCache.Auth,
Keys: []string{cachekey.UidPidToken},
},
{
Local: localCache.User,
Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey},
+47 -2
View File
@@ -52,8 +52,53 @@ type ExpirationLRU[K comparable, V any] struct {
}
func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
//TODO implement me
panic("implement me")
var (
err error
results = make(map[K]V)
misses = make([]K, 0, len(keys))
)
for _, key := range keys {
x.lock.Lock()
v, ok := x.core.Get(key)
x.lock.Unlock()
if ok {
x.target.IncrGetHit()
v.lock.RLock()
results[key] = v.value
if v.err != nil && err == nil {
err = v.err
}
v.lock.RUnlock()
continue
}
misses = append(misses, key)
}
if len(misses) == 0 {
return results, err
}
fetchValues, fetchErr := fetch(misses)
if fetchErr != nil && err == nil {
err = fetchErr
}
for key, val := range fetchValues {
results[key] = val
if fetchErr != nil {
x.target.IncrGetFailed()
continue
}
x.target.IncrGetSuccess()
item := &expirationLruItem[V]{value: val}
x.lock.Lock()
x.core.Add(key, item)
x.lock.Unlock()
}
// any keys not returned from fetch remain absent (no cache write)
return results, err
}
func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
+3 -3
View File
@@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct {
func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
var (
slotKeys = make(map[uint64][]K)
kVs = make(map[K]V)
vs = make(map[K]V)
)
for _, k := range keys {
@@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
return nil, err
}
for key, value := range batches {
kVs[key] = value
vs[key] = value
}
}
return kVs, nil
return vs, nil
}
func (x *slotLRU[K, V]) getIndex(k K) uint64 {
+1 -12
View File
@@ -2,6 +2,7 @@ package rpcli
import (
"context"
"github.com/openimsdk/protocol/conversation"
"google.golang.org/grpc"
)
@@ -30,18 +31,6 @@ func (x *ConversationClient) SetConversations(ctx context.Context, ownerUserIDs
return ignoreResp(x.ConversationClient.SetConversations(ctx, req))
}
func (x *ConversationClient) GetConversationsByConversationIDs(ctx context.Context, conversationIDs []string) ([]*conversation.Conversation, error) {
if len(conversationIDs) == 0 {
return nil, nil
}
req := &conversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs}
return extractField(ctx, x.ConversationClient.GetConversationsByConversationID, req, (*conversation.GetConversationsByConversationIDResp).GetConversations)
}
func (x *ConversationClient) GetConversationsByConversationID(ctx context.Context, conversationID string) (*conversation.Conversation, error) {
return firstValue(x.GetConversationsByConversationIDs(ctx, []string{conversationID}))
}
func (x *ConversationClient) SetConversationMinSeq(ctx context.Context, conversationID string, ownerUserIDs []string, minSeq int64) error {
if len(ownerUserIDs) == 0 {
return nil
+1 -1
View File
@@ -72,7 +72,7 @@ func Main(conf string, del time.Duration) error {
if err != nil {
return err
}
mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName)
if err != nil {
return err
+735
View File
@@ -0,0 +1,735 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/protocol/auth"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/system/program"
)
// 1. Create 100K New Users
// 2. Create 100 100K Groups
// 3. Create 1000 999 Groups
// 4. Send message to 100K Groups every second
// 5. Send message to 999 Groups every minute
var (
// Use default userIDs List for testing, need to be created.
TestTargetUserList = []string{
// "<need-update-it>",
}
// DefaultGroupID = "<need-update-it>" // Use default group ID for testing, need to be created.
)
var (
ApiAddress string
// API method
GetAdminToken = "/auth/get_admin_token"
UserCheck = "/user/account_check"
CreateUser = "/user/user_register"
ImportFriend = "/friend/import_friend"
InviteToGroup = "/group/invite_user_to_group"
GetGroupMemberInfo = "/group/get_group_members_info"
SendMsg = "/msg/send_msg"
CreateGroup = "/group/create_group"
GetUserToken = "/auth/user_token"
)
const (
MaxUser = 100000
Max100KGroup = 100
Max999Group = 1000
MaxInviteUserLimit = 999
CreateUserTicker = 1 * time.Second
CreateGroupTicker = 1 * time.Second
Create100KGroupTicker = 1 * time.Second
Create999GroupTicker = 1 * time.Second
SendMsgTo100KGroupTicker = 1 * time.Second
SendMsgTo999GroupTicker = 1 * time.Minute
)
type BaseResp struct {
ErrCode int `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data json.RawMessage `json:"data"`
}
type StressTest struct {
Conf *conf
AdminUserID string
AdminToken string
DefaultGroupID string
DefaultUserID string
UserCounter int
CreateUserCounter int
Create100kGroupCounter int
Create999GroupCounter int
MsgCounter int
CreatedUsers []string
CreatedGroups []string
Mutex sync.Mutex
Ctx context.Context
Cancel context.CancelFunc
HttpClient *http.Client
Wg sync.WaitGroup
Once sync.Once
}
type conf struct {
Share config.Share
Api config.API
}
func initConfig(configDir string) (*config.Share, *config.API, error) {
var (
share = &config.Share{}
apiConfig = &config.API{}
)
err := config.Load(configDir, config.ShareFileName, config.EnvPrefixMap[config.ShareFileName], share)
if err != nil {
return nil, nil, err
}
err = config.Load(configDir, config.OpenIMAPICfgFileName, config.EnvPrefixMap[config.OpenIMAPICfgFileName], apiConfig)
if err != nil {
return nil, nil, err
}
return share, apiConfig, nil
}
// Post Request
func (st *StressTest) PostRequest(ctx context.Context, url string, reqbody any) ([]byte, error) {
// Marshal body
jsonBody, err := json.Marshal(reqbody)
if err != nil {
log.ZError(ctx, "Failed to marshal request body", err, "url", url, "reqbody", reqbody)
return nil, err
}
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(jsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("operationID", st.AdminUserID)
if st.AdminToken != "" {
req.Header.Set("token", st.AdminToken)
}
// log.ZInfo(ctx, "Header info is ", "Content-Type", "application/json", "operationID", st.AdminUserID, "token", st.AdminToken)
resp, err := st.HttpClient.Do(req)
if err != nil {
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody)
return nil, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
log.ZError(ctx, "Failed to read response body", err, "url", url)
return nil, err
}
var baseResp BaseResp
if err := json.Unmarshal(respBody, &baseResp); err != nil {
log.ZError(ctx, "Failed to unmarshal response body", err, "url", url, "respBody", string(respBody))
return nil, err
}
if baseResp.ErrCode != 0 {
err = fmt.Errorf(baseResp.ErrMsg)
log.ZError(ctx, "Failed to send request", err, "url", url, "reqbody", reqbody, "resp", baseResp)
return nil, err
}
return baseResp.Data, nil
}
func (st *StressTest) GetAdminToken(ctx context.Context) (string, error) {
req := auth.GetAdminTokenReq{
Secret: st.Conf.Share.Secret,
UserID: st.AdminUserID,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetAdminToken, &req)
if err != nil {
return "", err
}
data := &auth.GetAdminTokenResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return "", err
}
return data.Token, nil
}
func (st *StressTest) CheckUser(ctx context.Context, userIDs []string) ([]string, error) {
req := pbuser.AccountCheckReq{
CheckUserIDs: userIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+UserCheck, &req)
if err != nil {
return nil, err
}
data := &pbuser.AccountCheckResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return nil, err
}
unRegisteredUserIDs := make([]string, 0)
for _, res := range data.Results {
if res.AccountStatus == constant.UnRegistered {
unRegisteredUserIDs = append(unRegisteredUserIDs, res.UserID)
}
}
return unRegisteredUserIDs, nil
}
func (st *StressTest) CreateUser(ctx context.Context, userID string) (string, error) {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
req := pbuser.UserRegisterReq{
Users: []*sdkws.UserInfo{user},
}
_, err := st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return "", err
}
st.UserCounter++
return userID, nil
}
func (st *StressTest) CreateUserBatch(ctx context.Context, userIDs []string) error {
// The method can import a large number of users at once.
var userList []*sdkws.UserInfo
defer st.Once.Do(
func() {
st.DefaultUserID = userIDs[0]
fmt.Println("Default Send User Created ID:", st.DefaultUserID)
})
needUserIDs, err := st.CheckUser(ctx, userIDs)
if err != nil {
return err
}
for _, userID := range needUserIDs {
user := &sdkws.UserInfo{
UserID: userID,
Nickname: userID,
}
userList = append(userList, user)
}
req := pbuser.UserRegisterReq{
Users: userList,
}
_, err = st.PostRequest(ctx, ApiAddress+CreateUser, &req)
if err != nil {
return err
}
st.UserCounter += len(userList)
return nil
}
func (st *StressTest) GetGroupMembersInfo(ctx context.Context, groupID string, userIDs []string) ([]string, error) {
needInviteUserIDs := make([]string, 0)
const maxBatchSize = 500
if len(userIDs) > maxBatchSize {
for i := 0; i < len(userIDs); i += maxBatchSize {
end := min(i+maxBatchSize, len(userIDs))
batchUserIDs := userIDs[i:end]
// log.ZInfo(ctx, "Processing group members batch", "groupID", groupID, "batch", i/maxBatchSize+1,
// "batchUserCount", len(batchUserIDs))
// Process a single batch
batchReq := group.GetGroupMembersInfoReq{
GroupID: groupID,
UserIDs: batchUserIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &batchReq)
if err != nil {
log.ZError(ctx, "Batch query failed", err, "batch", i/maxBatchSize+1)
continue
}
data := &group.GetGroupMembersInfoResp{}
if err := json.Unmarshal(resp, &data); err != nil {
log.ZError(ctx, "Failed to parse batch response", err, "batch", i/maxBatchSize+1)
continue
}
// Process the batch results
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range batchUserIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
}
return needInviteUserIDs, nil
}
req := group.GetGroupMembersInfoReq{
GroupID: groupID,
UserIDs: userIDs,
}
resp, err := st.PostRequest(ctx, ApiAddress+GetGroupMemberInfo, &req)
if err != nil {
return nil, err
}
data := &group.GetGroupMembersInfoResp{}
if err := json.Unmarshal(resp, &data); err != nil {
return nil, err
}
existingMembers := make(map[string]bool)
for _, member := range data.Members {
existingMembers[member.UserID] = true
}
for _, userID := range userIDs {
if !existingMembers[userID] {
needInviteUserIDs = append(needInviteUserIDs, userID)
}
}
return needInviteUserIDs, nil
}
func (st *StressTest) InviteToGroup(ctx context.Context, groupID string, userIDs []string) error {
req := group.InviteUserToGroupReq{
GroupID: groupID,
InvitedUserIDs: userIDs,
}
_, err := st.PostRequest(ctx, ApiAddress+InviteToGroup, &req)
if err != nil {
return err
}
return nil
}
func (st *StressTest) SendMsg(ctx context.Context, userID string, groupID string) error {
contentObj := map[string]any{
// "content": fmt.Sprintf("index %d. The current time is %s", st.MsgCounter, time.Now().Format("2006-01-02 15:04:05.000")),
"content": fmt.Sprintf("The current time is %s", time.Now().Format("2006-01-02 15:04:05.000")),
}
req := &apistruct.SendMsgReq{
SendMsg: apistruct.SendMsg{
SendID: userID,
SenderNickname: userID,
GroupID: groupID,
ContentType: constant.Text,
SessionType: constant.ReadGroupChatType,
Content: contentObj,
},
}
_, err := st.PostRequest(ctx, ApiAddress+SendMsg, &req)
if err != nil {
log.ZError(ctx, "Failed to send message", err, "userID", userID, "req", &req)
return err
}
st.MsgCounter++
return nil
}
// Max userIDs number is 1000
func (st *StressTest) CreateGroup(ctx context.Context, groupID string, userID string, userIDsList []string) (string, error) {
groupInfo := &sdkws.GroupInfo{
GroupID: groupID,
GroupName: groupID,
GroupType: constant.WorkingGroup,
}
req := group.CreateGroupReq{
OwnerUserID: userID,
MemberUserIDs: userIDsList,
GroupInfo: groupInfo,
}
resp := group.CreateGroupResp{}
response, err := st.PostRequest(ctx, ApiAddress+CreateGroup, &req)
if err != nil {
return "", err
}
if err := json.Unmarshal(response, &resp); err != nil {
return "", err
}
// st.GroupCounter++
return resp.GroupInfo.GroupID, nil
}
func main() {
var configPath string
// defaultConfigDir := filepath.Join("..", "..", "..", "..", "..", "config")
// flag.StringVar(&configPath, "c", defaultConfigDir, "config path")
flag.StringVar(&configPath, "c", "", "config path")
flag.Parse()
if configPath == "" {
_, _ = fmt.Fprintln(os.Stderr, "config path is empty")
os.Exit(1)
return
}
fmt.Printf(" Config Path: %s\n", configPath)
share, apiConfig, err := initConfig(configPath)
if err != nil {
program.ExitWithError(err)
return
}
ApiAddress = fmt.Sprintf("http://%s:%s", "127.0.0.1", fmt.Sprint(apiConfig.Api.Ports[0]))
ctx, cancel := context.WithCancel(context.Background())
// ch := make(chan struct{})
st := &StressTest{
Conf: &conf{
Share: *share,
Api: *apiConfig,
},
AdminUserID: share.IMAdminUser.UserIDs[0],
Ctx: ctx,
Cancel: cancel,
HttpClient: &http.Client{
Timeout: 50 * time.Second,
},
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Println("\nReceived stop signal, stopping...")
go func() {
// time.Sleep(5 * time.Second)
fmt.Println("Force exit")
os.Exit(0)
}()
st.Cancel()
}()
token, err := st.GetAdminToken(st.Ctx)
if err != nil {
log.ZError(ctx, "Get Admin Token failed.", err, "AdminUserID", st.AdminUserID)
}
st.AdminToken = token
fmt.Println("Admin Token:", st.AdminToken)
fmt.Println("ApiAddress:", ApiAddress)
for i := 0; i < MaxUser; i++ {
userID := fmt.Sprintf("v2_StressTest_User_%d", i)
st.CreatedUsers = append(st.CreatedUsers, userID)
st.CreateUserCounter++
}
// err = st.CreateUserBatch(st.Ctx, st.CreatedUsers)
// if err != nil {
// log.ZError(ctx, "Create user failed.", err)
// }
const batchSize = 1000
totalUsers := len(st.CreatedUsers)
successCount := 0
if st.DefaultUserID == "" && len(st.CreatedUsers) > 0 {
st.DefaultUserID = st.CreatedUsers[0]
}
for i := 0; i < totalUsers; i += batchSize {
end := min(i+batchSize, totalUsers)
userBatch := st.CreatedUsers[i:end]
log.ZInfo(st.Ctx, "Creating user batch", "batch", i/batchSize+1, "count", len(userBatch))
err = st.CreateUserBatch(st.Ctx, userBatch)
if err != nil {
log.ZError(st.Ctx, "Batch user creation failed", err, "batch", i/batchSize+1)
} else {
successCount += len(userBatch)
log.ZInfo(st.Ctx, "Batch user creation succeeded", "batch", i/batchSize+1,
"progress", fmt.Sprintf("%d/%d", successCount, totalUsers))
}
}
// Execute create 100k group
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
create100kGroupTicker := time.NewTicker(Create100KGroupTicker)
defer create100kGroupTicker.Stop()
for i := 0; i < Max100KGroup; i++ {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 100K Group")
return
case <-create100kGroupTicker.C:
// Create 100K groups
st.Wg.Add(1)
go func(idx int) {
defer st.Wg.Done()
defer func() {
st.Create100kGroupCounter++
}()
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", idx)
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
if err != nil {
log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
continue
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
}
}(i)
}
}
}()
// create 999 groups
st.Wg.Add(1)
go func() {
defer st.Wg.Done()
create999GroupTicker := time.NewTicker(Create999GroupTicker)
defer create999GroupTicker.Stop()
for i := 0; i < Max999Group; i++ {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Create 999 Group")
return
case <-create999GroupTicker.C:
// Create 999 groups
st.Wg.Add(1)
go func(idx int) {
defer st.Wg.Done()
defer func() {
st.Create999GroupCounter++
}()
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", idx)
if _, err = st.CreateGroup(st.Ctx, groupID, st.DefaultUserID, TestTargetUserList); err != nil {
log.ZError(st.Ctx, "Create group failed.", err)
// continue
}
for i := 0; i < MaxUser/MaxInviteUserLimit; i++ {
InviteUserIDs := make([]string, 0)
// ensure TargetUserList is in group
InviteUserIDs = append(InviteUserIDs, TestTargetUserList...)
startIdx := max(i*MaxInviteUserLimit, 1)
endIdx := min((i+1)*MaxInviteUserLimit, MaxUser)
for j := startIdx; j < endIdx; j++ {
userCreatedID := fmt.Sprintf("v2_StressTest_User_%d", j)
InviteUserIDs = append(InviteUserIDs, userCreatedID)
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
InviteUserIDs, err := st.GetGroupMembersInfo(ctx, groupID, InviteUserIDs)
if err != nil {
log.ZError(st.Ctx, "GetGroupMembersInfo failed.", err, "groupID", groupID)
continue
}
if len(InviteUserIDs) == 0 {
log.ZWarn(st.Ctx, "InviteUserIDs is empty", nil, "groupID", groupID)
continue
}
// Invite To Group
if err = st.InviteToGroup(st.Ctx, groupID, InviteUserIDs); err != nil {
log.ZError(st.Ctx, "Invite To Group failed.", err, "UserID", InviteUserIDs)
continue
// os.Exit(1)
// return
}
}
}(i)
}
}
}()
// Send message to 100K groups
st.Wg.Wait()
fmt.Println("All groups created successfully, starting to send messages...")
log.ZInfo(ctx, "All groups created successfully, starting to send messages...")
var groups100K []string
var groups999 []string
for i := 0; i < Max100KGroup; i++ {
groupID := fmt.Sprintf("v2_StressTest_Group_100K_%d", i)
groups100K = append(groups100K, groupID)
}
for i := 0; i < Max999Group; i++ {
groupID := fmt.Sprintf("v2_StressTest_Group_1K_%d", i)
groups999 = append(groups999, groupID)
}
send100kGroupLimiter := make(chan struct{}, 20)
send999GroupLimiter := make(chan struct{}, 100)
// execute Send message to 100K groups
go func() {
ticker := time.NewTicker(SendMsgTo100KGroupTicker)
defer ticker.Stop()
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 100K Group")
return
case <-ticker.C:
// Send message to 100K groups
for _, groupID := range groups100K {
send100kGroupLimiter <- struct{}{}
go func(groupID string) {
defer func() { <-send100kGroupLimiter }()
if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 100K group failed.", err)
}
}(groupID)
}
// log.ZInfo(st.Ctx, "Send message to 100K groups successfully.")
}
}
}()
// execute Send message to 999 groups
go func() {
ticker := time.NewTicker(SendMsgTo999GroupTicker)
defer ticker.Stop()
for {
select {
case <-st.Ctx.Done():
log.ZInfo(st.Ctx, "Stop Send Message to 999 Group")
return
case <-ticker.C:
// Send message to 999 groups
for _, groupID := range groups999 {
send999GroupLimiter <- struct{}{}
go func(groupID string) {
defer func() { <-send999GroupLimiter }()
if err := st.SendMsg(st.Ctx, st.DefaultUserID, groupID); err != nil {
log.ZError(st.Ctx, "Send message to 999 group failed.", err)
}
}(groupID)
}
// log.ZInfo(st.Ctx, "Send message to 999 groups successfully.")
}
}
}()
<-st.Ctx.Done()
fmt.Println("Received signal to exit, shutting down...")
}