Compare commits

...

15 Commits

Author SHA1 Message Date
Monet Lee 840ddc15fe chore: revert tool pkg version. (#2476)
* feat: update go mod pkg to latest.

* revert tool pkg version.
2024-08-02 12:13:13 +00:00
Monet Lee c45967079e feat: update go mod pkg to latest. (#2475) 2024-08-02 18:30:17 +08:00
chao 84049a1f55 fix: the local cache obtained can be modified (#2473)
* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

* group member sort

* sub

* sub

* fix: seq conversion bug

* fix: redis pipe exec

* sort version

* sort version

* sort version

* remove old version online subscription

* remove old version online subscription

* version log index

* version log index

* batch push

* batch push

* seq void filling

* fix: batchGetMaxSeq

* fix: batchGetMaxSeq

* cache db error log

* 111

* fix bug

* fix: ImportFriends

* add online cache

* add some logs

* add some logs

* fix: onlineUserIDs

* add logs

* test

* test

* test

* test

* add log

* feat: solve the problem that modifying the cached data affects other

* feat: solve the problem that modifying the cached data affects other

* feat: search messages to filter out notifications

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
2024-08-02 08:58:04 +00:00
Monet Lee 51e170ade1 feat: implement scheduled destruct msgs feature in cron task. (#2466)
* update protocol in go mod.

* add debug log in writePongMsg.

* update log level.

* add Warn log in writePongMsg.

* add debug log.

* feat: update webhookBeforeMemberJoinGroup to batch method.

* feat: update version field implement.

* update webhook implement contents.

* update method field and contents.

* update callbackCommand field.

* fix: add correct fields.

* update struct tags.

* refactor: rename friend module to relation.

* feat: implement scheduled destruct msgs feature in cron task.

* update log contents.

* update func name and comments.

* update waitgroup to errgroup.

* update errgroup wait.

* remove unnecessary contents.

* update clearMsg logic.
2024-08-01 08:12:41 +00:00
chao 375f63a447 fix: online status renewal (#2468)
* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

* group member sort

* sub

* sub

* fix: seq conversion bug

* fix: redis pipe exec

* sort version

* sort version

* sort version

* remove old version online subscription

* remove old version online subscription

* version log index

* version log index

* batch push

* batch push

* seq void filling

* fix: batchGetMaxSeq

* fix: batchGetMaxSeq

* cache db error log

* 111

* fix bug

* fix: ImportFriends

* add online cache

* add some logs

* add some logs

* fix: onlineUserIDs

* add logs

* test

* test

* test

* test

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
2024-08-01 07:11:17 +00:00
Monet Lee 5dac91569d refactor: rename friend module to relation. (#2463)
* update protocol in go mod.

* add debug log in writePongMsg.

* update log level.

* add Warn log in writePongMsg.

* add debug log.

* feat: update webhookBeforeMemberJoinGroup to batch method.

* feat: update version field implement.

* update webhook implement contents.

* update method field and contents.

* update callbackCommand field.

* fix: add correct fields.

* update struct tags.

* refactor: rename friend module to relation.
2024-07-31 10:12:19 +00:00
chao 9cfbc3aaff fix: return value (#2462)
* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

* group member sort

* sub

* sub

* fix: seq conversion bug

* fix: redis pipe exec

* sort version

* sort version

* sort version

* remove old version online subscription

* remove old version online subscription

* version log index

* version log index

* batch push

* batch push

* seq void filling

* fix: batchGetMaxSeq

* fix: batchGetMaxSeq

* cache db error log

* 111

* fix bug

* fix: ImportFriends

* add online cache

* add some logs

* add some logs

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
2024-07-31 06:21:19 +00:00
chao d2b02dbabe feat: add some logs (#2461)
* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* new mongo

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* friend incr sync

* mage

* optimization version log

* optimization version log

* sync

* sync

* sync

* group sync

* sync option

* sync option

* refactor: replace `friend` package with `realtion`.

* refactor: update lastest commit to relation.

* sync option

* sync option

* sync option

* sync

* sync

* go.mod

* seq

* update: go mod

* refactor: change incremental to full

* feat: get full friend user ids

* feat: api and config

* seq

* group version

* merge

* seq

* seq

* seq

* fix: sort by id avoid unstable sort friends.

* group

* group

* group

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* fix: sort by id avoid unstable sort friends.

* user version

* seq

* seq

* seq user

* user online

* implement minio expire delete.

* user online

* config

* fix

* fix

* implement minio expire delete logic.

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* online cache

* feat: implement scheduled delete outdated object in minio.

* update gomake version

* update gomake version

* implement FindExpires pagination.

* remove unnesseary incr.

* fix uncorrect args call.

* online push

* online push

* online push

* resolving conflicts

* resolving conflicts

* test

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* api prommetrics

* rpc prommetrics

* rpc prommetrics

* online status

* online status

* online status

* online status

* sub

* conversation version incremental

* merge seq

* merge online

* merge online

* merge online

* merge seq

* GetOwnerConversation

* fix: change incremental syncer router name.

* rockscache batch get

* rockscache seq batch get

* fix: GetMsgDocModelByIndex bug

* update go.mod

* update go.mod

* merge

* feat: prometheus

* feat: prometheus

* group member sort

* sub

* sub

* fix: seq conversion bug

* fix: redis pipe exec

* sort version

* sort version

* sort version

* remove old version online subscription

* remove old version online subscription

* version log index

* version log index

* batch push

* batch push

* seq void filling

* fix: batchGetMaxSeq

* fix: batchGetMaxSeq

* cache db error log

* 111

* fix bug

* fix: ImportFriends

* add online cache

* add some logs

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
Co-authored-by: Monet Lee <monet_lee@163.com>
Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com>
Co-authored-by: icey-yu <1186114839@qq.com>
2024-07-31 02:50:55 +00:00
Monet Lee fb689618d8 feat: update webhookBeforeMemberJoinGroup to batch method. (#2459)
* update protocol in go mod.

* add debug log in writePongMsg.

* update log level.

* add Warn log in writePongMsg.

* add debug log.

* feat: update webhookBeforeMemberJoinGroup to batch method.

* feat: update version field implement.

* update webhook implement contents.

* update method field and contents.

* update callbackCommand field.

* fix: add correct fields.

* update struct tags.
2024-07-30 10:09:52 +00:00
chao ed0ab58a9e fix ImportFriends (#2458)
* fix: GroupApplicationAcceptedNotification

* fix: GroupApplicationAcceptedNotification

* fix: NotificationUserInfoUpdate

* cicd: robot automated Change

* fix: component

* fix: getConversationInfo

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* feat: cron task

* fix: minio config url recognition error

* update gomake version

* update gomake version

* fix: seq conversion bug

* fix: redis pipe exec

* fix: ImportFriends

---------

Co-authored-by: withchao <withchao@users.noreply.github.com>
2024-07-29 08:53:26 +00:00
icey-yu 231aac2b8a fix: search log can return platform (#2456)
* fix: search log can return platform

* fix: change platform type to string
2024-07-29 08:37:22 +00:00
printlin b0cc4373a5 fix: #2410 BeforeMemberJoinGroup callback member error (#2423) 2024-07-29 02:08:28 +00:00
Monet Lee ef46abd193 chore: add Warn log in writePongMsg. (#2452)
* update protocol in go mod.

* add debug log in writePongMsg.

* update log level.

* add Warn log in writePongMsg.

* add debug log.
2024-07-28 03:18:23 +00:00
printlin 220a01d7f8 fix: fill notification offlinePush by config (#2422)
* fix: fill notification offlinePush by config

* fix: fill notification OfflinePush by config
2024-07-26 10:24:25 +00:00
blooming ff66e97221 fix: rm e2e in ci (#2449) 2024-07-26 18:20:54 +08:00
53 changed files with 702 additions and 361 deletions
-1
View File
@@ -1,4 +1,3 @@
MONGO_IMAGE=mongo:6.0.2
REDIS_IMAGE=redis:7.0.0
ZOOKEEPER_IMAGE=bitnami/zookeeper:3.8
+20 -20
View File
@@ -121,29 +121,29 @@ jobs:
exit 0
fi
- name: Checkout e2e
if: success()
uses: actions/checkout@v4
with:
repository: "openimsdk/test-e2e"
path: e2e-repo
# - name: Checkout e2e
# if: success()
# uses: actions/checkout@v4
# with:
# repository: "openimsdk/test-e2e"
# path: e2e-repo
- name: Set up Python 3.9
uses: actions/setup-python@v4
with:
python-version: '3.9'
# - name: Set up Python 3.9
# uses: actions/setup-python@v4
# with:
# python-version: '3.9'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y xvfb libxi6 libgconf-2-4
cd ${{ github.workspace }}/e2e-repo
pip install -r requirements.txt
# - name: Install dependencies
# run: |
# sudo apt-get update
# sudo apt-get install -y xvfb libxi6 libgconf-2-4
# cd ${{ github.workspace }}/e2e-repo
# pip install -r requirements.txt
- name: Run tests
run: |
cd ${{ github.workspace }}/e2e-repo
xvfb-run --auto-servernum --server-args='-screen 0 1920x1080x24' pytest -v -s ./script
# - name: Run tests
# run: |
# cd ${{ github.workspace }}/e2e-repo
# xvfb-run --auto-servernum --server-args='-screen 0 1920x1080x24' pytest -v -s ./script
- name: Extract metadata (tags, labels) for Docker
if: success()
+29 -29
View File
@@ -71,17 +71,17 @@ jobs:
run: sudo bash bootstrap.sh
timeout-minutes: 20
- name: Get Internal IP Address
id: get-ip
run: |
IP=$(hostname -I | awk '{print $1}')
echo "The IP Address is: $IP"
echo "::set-output name=ip::$IP"
# - name: Get Internal IP Address
# id: get-ip
# run: |
# IP=$(hostname -I | awk '{print $1}')
# echo "The IP Address is: $IP"
# echo "::set-output name=ip::$IP"
- name: Update .env
run: |
sed -i 's|externalAddress:.*|externalAddress: "http://${{ steps.get-ip.outputs.ip }}:10005"|' config/minio.yml
cat config/minio.yml
# - name: Update .env
# run: |
# sed -i 's|externalAddress:.*|externalAddress: "http://${{ steps.get-ip.outputs.ip }}:10005"|' config/minio.yml
# cat config/minio.yml
- name: Build, Start, Check Services and Print Logs for Linux
run: |
@@ -109,27 +109,27 @@ jobs:
sudo mage start
sudo mage check
- name: Checkout e2e repository
uses: actions/checkout@v4
with:
repository: "openimsdk/test-e2e"
path: e2e-repo
# - name: Checkout e2e repository
# uses: actions/checkout@v4
# with:
# repository: "openimsdk/test-e2e"
# path: e2e-repo
- name: Set up Python 3.9
uses: actions/setup-python@v4
with:
python-version: '3.9'
# - name: Set up Python 3.9
# uses: actions/setup-python@v4
# with:
# python-version: '3.9'
- name: Install dependencies
run: |
sudo apt-get update
sudo apt-get install -y xvfb libxi6 libgconf-2-4
cd ${{ github.workspace }}/e2e-repo
pip install -r requirements.txt
# - name: Install dependencies
# run: |
# sudo apt-get update
# sudo apt-get install -y xvfb libxi6 libgconf-2-4
# cd ${{ github.workspace }}/e2e-repo
# pip install -r requirements.txt
- name: Run tests
run: |
cd ${{ github.workspace }}/e2e-repo
xvfb-run --auto-servernum --server-args='-screen 0 1920x1080x24' pytest -v -s ./script
# - name: Run tests
# run: |
# cd ${{ github.workspace }}/e2e-repo
# xvfb-run --auto-servernum --server-args='-screen 0 1920x1080x24' pytest -v -s ./script
+2 -1
View File
@@ -12,7 +12,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.69-alpha.42
github.com/openimsdk/protocol v0.0.69
github.com/openimsdk/tools v0.0.49-alpha.55
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
@@ -74,6 +74,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chai2010/webp v1.1.1 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
+4 -2
View File
@@ -71,6 +71,8 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/webp v1.1.1 h1:jTRmEccAJ4MGrhFOrPMpNGIJ/eybIgwKpcACsrTEapk=
github.com/chai2010/webp v1.1.1/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
@@ -319,8 +321,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.69-alpha.42 h1:Vwuru2NtyTHuqaM+1JGxcoGvP25QWjS92oI0zGJp+lM=
github.com/openimsdk/protocol v0.0.69-alpha.42/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.69 h1:dVi8meSg8kmUzSH1XQab4MjihqKkkcCAmt1BYXPJuXo=
github.com/openimsdk/protocol v0.0.69/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
+2 -2
View File
@@ -393,16 +393,16 @@ func (c *Client) writePingMsg() error {
func (c *Client) writePongMsg(appData string) error {
log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
if c.closed.Load() {
log.ZWarn(c.ctx, "is closed in server", nil, "appdata", appData, "closed err", c.closedErr)
return nil
}
log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
c.w.Lock()
defer c.w.Unlock()
log.ZDebug(c.ctx, "write Pong Msg in Server", "appData", appData)
err := c.conn.SetWriteDeadline(writeWait)
if err != nil {
log.ZWarn(c.ctx, "SetWriteDeadline in Server have error", errs.Wrap(err), "writeWait", writeWait, "appData", appData)
return errs.Wrap(err)
}
err = c.conn.WriteMessage(PongMessage, []byte(appData))
+7 -2
View File
@@ -4,13 +4,16 @@ import (
"context"
"crypto/md5"
"encoding/binary"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
pbuser "github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"math/rand"
"os"
"strconv"
"sync/atomic"
"time"
)
@@ -78,8 +81,10 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
}
}
opIdCtx := mcontext.SetOperationID(context.Background(), "r"+strconv.FormatUint(rNum, 10))
var count atomic.Int64
operationIDPrefix := fmt.Sprintf("p_%d_", os.Getpid())
doRequest := func(req *pbuser.SetUserOnlineStatusReq) {
opIdCtx := mcontext.SetOperationID(context.Background(), operationIDPrefix+strconv.FormatInt(count.Add(1), 10))
ctx, cancel := context.WithTimeout(opIdCtx, time.Second*5)
defer cancel()
if _, err := ws.userClient.Client.SetUserOnlineStatus(ctx, req); err != nil {
@@ -102,7 +107,7 @@ func (ws *WsServer) ChangeOnlineStatus(concurrent int) {
case now := <-renewalTicker.C:
deadline := now.Add(-cachekey.OnlineExpire / 3)
users := ws.clients.GetAllUserStatus(deadline, now)
log.ZDebug(context.Background(), "renewal ticker", "deadline", deadline, "nowtime", now, "num", len(users))
log.ZDebug(context.Background(), "renewal ticker", "deadline", deadline, "nowtime", now, "num", len(users), "users", users)
pushUserState(users...)
case state := <-ws.clients.UserState():
log.ZDebug(context.Background(), "OnlineCache user online change", "userID", state.UserID, "online", state.Online, "offline", state.Offline)
+3 -3
View File
@@ -162,12 +162,12 @@ func (u *userMap) DeleteClients(userID string, clients []*Client) (isDeleteUser
return true
}
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState {
func (u *userMap) GetAllUserStatus(deadline time.Time, nowtime time.Time) (result []UserState) {
u.lock.RLock()
defer u.lock.RUnlock()
result := make([]UserState, 0, len(u.data))
result = make([]UserState, 0, len(u.data))
for userID, userPlatform := range u.data {
if userPlatform.Time.Before(deadline) {
if deadline.Before(userPlatform.Time) {
continue
}
userPlatform.Time = nowtime
+2 -1
View File
@@ -198,10 +198,11 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
offlineUserIDs = append(offlineUserIDs, userID)
}
}
log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
var result []*msggateway.SingleMsgToUserResults
if len(onlineUserIDs) > 0 {
var err error
result, err = c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, pushToUserIDs)
result, err = c.onlinePusher.GetConnsAndOnlinePush(ctx, msg, onlineUserIDs)
if err != nil {
return nil, err
}
+73 -16
View File
@@ -16,13 +16,16 @@ package conversation
import (
"context"
"sort"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
tablerelation "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/tools/db/redisutil"
"sort"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
@@ -40,10 +43,11 @@ import (
)
type conversationServer struct {
msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase
msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient
conversationDatabase controller.ConversationDatabase
conversationNotificationSender *ConversationNotificationSender
config *Config
}
@@ -204,11 +208,11 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s
}
func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) {
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil {
return nil, err
}
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tablerelation.Conversation{&conversation})
err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*dbModel.Conversation{&conversation})
if err != nil {
return nil, err
}
@@ -232,7 +236,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
}
var unequal int
var conv tablerelation.Conversation
var conv dbModel.Conversation
if len(req.UserIDs) == 1 {
cs, err := c.conversationDatabase.FindConversations(ctx, req.UserIDs[0], []string{req.Conversation.ConversationID})
if err != nil {
@@ -243,7 +247,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
conv = *cs[0]
}
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
conversation.ConversationID = req.Conversation.ConversationID
conversation.ConversationType = req.Conversation.ConversationType
conversation.UserID = req.Conversation.UserID
@@ -292,7 +296,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
}
}
if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType {
var conversations []*tablerelation.Conversation
var conversations []*dbModel.Conversation
for _, ownerUserID := range req.UserIDs {
conversation2 := conversation
conversation2.OwnerUserID = ownerUserID
@@ -340,12 +344,12 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
) (*pbconversation.CreateSingleChatConversationsResp, error) {
switch req.ConversationType {
case constant.SingleChatType:
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.SendID
conversation.UserID = req.RecvID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation)
}
@@ -353,17 +357,17 @@ func (c *conversationServer) CreateSingleChatConversations(ctx context.Context,
conversation2 := conversation
conversation2.OwnerUserID = req.RecvID
conversation2.UserID = req.SendID
err = c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation2})
err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
case constant.NotificationChatType:
var conversation tablerelation.Conversation
var conversation dbModel.Conversation
conversation.ConversationID = req.ConversationID
conversation.ConversationType = req.ConversationType
conversation.OwnerUserID = req.RecvID
conversation.UserID = req.SendID
err := c.conversationDatabase.CreateConversation(ctx, []*tablerelation.Conversation{&conversation})
err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation})
if err != nil {
log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation)
}
@@ -584,6 +588,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv
if req.MaxSeq != nil {
m["max_seq"] = req.MaxSeq.Value
}
if req.LatestMsgDestructTime != nil {
m["latest_msg_destruct_time"] = time.UnixMilli(req.LatestMsgDestructTime.Value)
}
if len(m) > 0 {
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.UserIDs, req.ConversationID, m); err != nil {
return nil, err
@@ -602,3 +609,53 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
Conversations: convert.ConversationsDB2Pb(conversations),
}, nil
}
func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) {
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
return nil, err
}
const batchNum = 100
if num == 0 {
return nil, errs.New("Need Destruct Msg is nil").Wrap()
}
maxPage := (num + batchNum - 1) / batchNum
temp := make([]*model.Conversation, 0, maxPage*batchNum)
for pageNumber := 0; pageNumber < int(maxPage); pageNumber++ {
pagination := &sdkws.RequestPagination{
PageNumber: int32(pageNumber),
ShowNumber: batchNum,
}
conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}
log.ZDebug(ctx, "PageConversationIDs success", "pageNumber", pageNumber, "conversationIDsNum", len(conversationIDs), "conversationIDs", conversationIDs)
if len(conversationIDs) == 0 {
continue
}
conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, conversationIDs)
if err != nil {
log.ZError(ctx, "GetConversationsByConversationID failed", err, "conversationIDs", conversationIDs)
continue
}
for _, conversation := range conversations {
if conversation.IsMsgDestruct && conversation.MsgDestructTime != 0 && ((time.Now().UnixMilli() > (conversation.MsgDestructTime + conversation.LatestMsgDestructTime.UnixMilli() + 8*60*60)) || // 8*60*60 is UTC+8
conversation.LatestMsgDestructTime.IsZero()) {
temp = append(temp, conversation)
}
}
}
return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
}
+40 -18
View File
@@ -16,6 +16,8 @@ package group
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/callbackstruct"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -27,7 +29,6 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"time"
)
// CallbackBeforeCreateGroup callback before create group.
@@ -100,27 +101,45 @@ func (s *groupServer) webhookAfterCreateGroup(ctx context.Context, after *config
s.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupResp{}, after)
}
func (s *groupServer) webhookBeforeMemberJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMember *model.GroupMember, groupEx string) error {
func (s *groupServer) webhookBeforeMembersJoinGroup(ctx context.Context, before *config.BeforeConfig, groupMembers []*model.GroupMember, groupID string, groupEx string) error {
return webhook.WithCondition(ctx, before, func(ctx context.Context) error {
cbReq := &callbackstruct.CallbackBeforeMemberJoinGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeMemberJoinGroupCommand,
GroupID: groupMember.GroupID,
UserID: groupMember.UserID,
Ex: groupMember.Ex,
groupMembersMap := datautil.SliceToMap(groupMembers, func(e *model.GroupMember) string {
return e.UserID
})
var groupMembersCallback []*callbackstruct.CallbackGroupMember
for _, member := range groupMembers {
groupMembersCallback = append(groupMembersCallback, &callbackstruct.CallbackGroupMember{
UserID: member.UserID,
Ex: member.Ex,
})
}
cbReq := &callbackstruct.CallbackBeforeMembersJoinGroupReq{
CallbackCommand: callbackstruct.CallbackBeforeMembersJoinGroupCommand,
GroupID: groupID,
MembersList: groupMembersCallback,
GroupEx: groupEx,
}
resp := &callbackstruct.CallbackBeforeMemberJoinGroupResp{}
resp := &callbackstruct.CallbackBeforeMembersJoinGroupResp{}
if err := s.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil {
return err
}
if resp.MuteEndTime != nil {
groupMember.MuteEndTime = time.UnixMilli(*resp.MuteEndTime)
for _, memberCallbackResp := range resp.MemberCallbackList {
if _, ok := groupMembersMap[(*memberCallbackResp.UserID)]; ok {
if memberCallbackResp.MuteEndTime != nil {
groupMembersMap[(*memberCallbackResp.UserID)].MuteEndTime = time.UnixMilli(*memberCallbackResp.MuteEndTime)
}
datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].FaceURL, memberCallbackResp.FaceURL)
datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Ex, memberCallbackResp.Ex)
datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].Nickname, memberCallbackResp.Nickname)
datautil.NotNilReplace(&groupMembersMap[(*memberCallbackResp.UserID)].RoleLevel, memberCallbackResp.RoleLevel)
}
}
datautil.NotNilReplace(&groupMember.FaceURL, resp.FaceURL)
datautil.NotNilReplace(&groupMember.Ex, resp.Ex)
datautil.NotNilReplace(&groupMember.Nickname, resp.Nickname)
datautil.NotNilReplace(&groupMember.RoleLevel, resp.RoleLevel)
return nil
})
}
@@ -244,10 +263,13 @@ func (s *groupServer) webhookBeforeInviteUserToGroup(ctx context.Context, before
return err
}
if len(resp.RefusedMembersAccount) > 0 {
// Handle the scenario where certain members are refused
// You might want to update the req.Members list or handle it as per your business logic
}
// Handle the scenario where certain members are refused
// You might want to update the req.Members list or handle it as per your business logic
// if len(resp.RefusedMembersAccount) > 0 {
// implement members are refused
// }
return nil
})
}
+23 -23
View File
@@ -246,7 +246,7 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
return nil, err
}
joinGroup := func(userID string, roleLevel int32) error {
joinGroupFunc := func(userID string, roleLevel int32) {
groupMember := &model.GroupMember{
GroupID: group.GroupID,
UserID: userID,
@@ -258,25 +258,23 @@ func (s *groupServer) CreateGroup(ctx context.Context, req *pbgroup.CreateGroupR
MuteEndTime: time.UnixMilli(0),
}
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return err
}
groupMembers = append(groupMembers, groupMember)
return nil
}
if err := joinGroup(req.OwnerUserID, constant.GroupOwner); err != nil {
joinGroupFunc(req.OwnerUserID, constant.GroupOwner)
for _, userID := range req.AdminUserIDs {
joinGroupFunc(userID, constant.GroupAdmin)
}
for _, userID := range req.MemberUserIDs {
joinGroupFunc(userID, constant.GroupOrdinaryUsers)
}
if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
for _, userID := range req.AdminUserIDs {
if err := joinGroup(userID, constant.GroupAdmin); err != nil {
return nil, err
}
}
for _, userID := range req.MemberUserIDs {
if err := joinGroup(userID, constant.GroupOrdinaryUsers); err != nil {
return nil, err
}
}
if err := s.db.CreateGroup(ctx, []*model.Group{group}, groupMembers); err != nil {
return nil, err
}
@@ -442,12 +440,13 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbgroup.Invite
MuteEndTime: time.UnixMilli(0),
}
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
groupMembers = append(groupMembers, member)
}
if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMembers, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
if err := s.db.CreateGroup(ctx, nil, groupMembers); err != nil {
return nil, err
}
@@ -811,9 +810,9 @@ func (s *groupServer) GroupApplicationResponse(ctx context.Context, req *pbgroup
MuteEndTime: time.Unix(0, 0),
InviterUserID: groupRequest.InviterUserID,
OperatorUserID: mcontext.GetOpUserID(ctx),
Ex: groupRequest.Ex,
}
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, member, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{member}, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
}
@@ -882,7 +881,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
MuteEndTime: time.UnixMilli(0),
}
if err := s.webhookBeforeMemberJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, groupMember, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
if err := s.webhookBeforeMembersJoinGroup(ctx, &s.config.WebhooksConfig.BeforeMemberJoinGroup, []*model.GroupMember{groupMember}, group.GroupID, group.Ex); err != nil && err != servererrs.ErrCallbackContinue {
return nil, err
}
@@ -898,6 +897,7 @@ func (s *groupServer) JoinGroup(ctx context.Context, req *pbgroup.JoinGroupReq)
return &pbgroup.JoinGroupResp{}, nil
}
groupRequest := model.GroupRequest{
UserID: req.InviterUserID,
ReqMsg: req.ReqMessage,
+66 -19
View File
@@ -2,16 +2,22 @@ package msg
import (
"context"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"strings"
"time"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
)
// hard delete in Database.
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
@@ -25,18 +31,6 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
start = time.Now()
)
clearMsg := func(ctx context.Context) (bool, error) {
conversationSeqs := make(map[string]struct{})
defer func() {
req := &conversation.UpdateConversationReq{
MsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
}
for conversationID := range conversationSeqs {
req.ConversationID = conversationID
if err := m.Conversation.UpdateConversations(ctx, req); err != nil {
log.ZError(ctx, "update conversation max seq failed", err, "conversationID", conversationID, "msgDestructTime", req.MsgDestructTime)
}
}
}()
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, 100)
if err != nil {
return false, err
@@ -44,6 +38,7 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
if len(msgs) == 0 {
return false, nil
}
for _, msg := range msgs {
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
if err != nil {
@@ -52,15 +47,14 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
if len(index) == 0 {
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
}
docNum++
msgNum += len(index)
conversationID := msg.DocID[:strings.LastIndex(msg.DocID, ":")]
if _, ok := conversationSeqs[conversationID]; !ok {
conversationSeqs[conversationID] = struct{}{}
}
}
return true, nil
}
for {
keep, err := clearMsg(ctx)
if err != nil {
@@ -71,7 +65,60 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
log.ZInfo(ctx, "clear msg success", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
break
}
log.ZInfo(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
}
return &msg.ClearMsgResp{}, nil
}
// soft delete for self
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
temp := convert.ConversationsPb2DB(req.Conversations)
batchNum := 100
errg, _ := errgroup.WithContext(ctx)
errg.SetLimit(100)
for i := 0; i < len(temp); i += batchNum {
batch := temp[i:min(i+batchNum, len(temp))]
errg.Go(func() error {
for _, conversation := range batch {
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(handleCtx, "User MsgsDestruct",
"conversationID", conversation.ConversationID,
"ownerUserID", conversation.OwnerUserID,
"msgDestructTime", conversation.MsgDestructTime,
"lastMsgDestructTime", conversation.LatestMsgDestructTime)
seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
if len(seqs) > 0 {
if err := m.Conversation.UpdateConversation(handleCtx,
&pbconversation.UpdateConversationReq{
UserIDs: []string{conversation.OwnerUserID},
ConversationID: conversation.ConversationID,
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil {
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}
// if you need Notify SDK client userseq is update.
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
return nil
})
}
if err := errg.Wait(); err != nil {
return nil, err
}
return nil, nil
}
+5
View File
@@ -16,6 +16,7 @@ package msg
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
@@ -50,6 +51,7 @@ type (
ConversationLocalCache *rpccache.ConversationLocalCache // Local cache for conversation data.
Handlers MessageInterceptorChain // Chain of handlers for processing messages.
notificationSender *rpcclient.NotificationSender // RPC client for sending notifications.
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
}
@@ -117,7 +119,10 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
}
s.notificationSender = rpcclient.NewNotificationSender(&config.NotificationConfig, rpcclient.WithLocalSendMsg(s.SendMsg))
s.msgNotificationSender = NewMsgNotificationSender(config, rpcclient.WithLocalSendMsg(s.SendMsg))
msg.RegisterMsgServer(server, s)
return nil
}
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package friend
package relation
import (
"context"
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package friend
package relation
import (
"context"
@@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package friend
package relation
import (
"context"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package friend
package relation
import (
"context"
@@ -1,4 +1,4 @@
package friend
package relation
import (
"context"
+1 -2
View File
@@ -26,7 +26,6 @@ import (
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/stringutil"
)
func genLogID() string {
@@ -111,7 +110,7 @@ func dbToPbLogInfos(logs []*relationtb.Log) []*third.LogInfo {
return &third.LogInfo{
Filename: log.FileName,
UserID: log.UserID,
Platform: stringutil.StringToInt32(log.Platform),
Platform: log.Platform,
Url: log.Url,
CreateTime: log.CreateTime.UnixMilli(),
LogID: log.LogID,
+3 -3
View File
@@ -17,7 +17,7 @@ package user
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
@@ -54,7 +54,7 @@ import (
type userServer struct {
online cache.OnlineCache
db controller.UserDatabase
friendNotificationSender *friend.FriendNotificationSender
friendNotificationSender *relation.FriendNotificationSender
userNotificationSender *UserNotificationSender
friendRpcClient *rpcclient.FriendRpcClient
groupRpcClient *rpcclient.GroupRpcClient
@@ -105,7 +105,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: friend.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, friend.WithDBFunc(database.FindWithError)),
friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, relation.WithDBFunc(database.FindWithError)),
userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
+53 -14
View File
@@ -17,16 +17,19 @@ package tools
import (
"context"
"fmt"
"os"
"time"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"os"
"time"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
@@ -50,34 +53,69 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()))
ctx = mcontext.SetOpUserID(ctx, config.Share.IMAdminUserID[0])
conn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg)
if err != nil {
return err
}
cli := msg.NewMsgClient(conn)
thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
if err != nil {
return err
}
conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation)
if err != nil {
return err
}
msgClient := msg.NewMsgClient(msgConn)
conversationClient := pbconversation.NewConversationClient(conversationConn)
thirdClient := third.NewThirdClient(thirdConn)
crontab := cron.New()
clearFunc := func() {
// scheduled hard delete outdated Msgs in specific time.
clearMsgFunc := func() {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZInfo(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
if _, err := cli.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
}
log.ZInfo(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearFunc); err != nil {
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
return errs.Wrap(err)
}
tConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third)
if err != nil {
return err
}
thirdClient := third.NewThirdClient(tConn)
// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature.
msgDestructFunc := func() {
now := time.Now()
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZInfo(ctx, "msg destruct cron start", "now", now)
deleteFunc := func() {
conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
return
} else {
_, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations})
if err != nil {
log.ZError(ctx, "Destruct Msgs failed.", err)
return
}
}
log.ZInfo(ctx, "msg destruct cron task completed", "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil {
return errs.Wrap(err)
}
// scheduled delete outdated file Objects and their datas in specific time.
deleteObjectFunc := func() {
now := time.Now()
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
@@ -88,9 +126,10 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
}
log.ZInfo(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteFunc); err != nil {
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
return errs.Wrap(err)
}
log.ZInfo(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
crontab.Start()
<-ctx.Done()
+1 -1
View File
@@ -56,7 +56,7 @@ const (
CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand"
CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand"
CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand"
CallbackBeforeMemberJoinGroupCommand = "callbackBeforeMemberJoinGroupCommand"
CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand"
CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand"
CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand"
)
+17 -8
View File
@@ -59,16 +59,20 @@ type CallbackAfterCreateGroupResp struct {
CommonCallbackResp
}
type CallbackBeforeMemberJoinGroupReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
UserID string `json:"userID"`
Ex string `json:"ex"`
GroupEx string `json:"groupEx"`
type CallbackGroupMember struct {
UserID string `json:"userID"`
Ex string `json:"ex"`
}
type CallbackBeforeMemberJoinGroupResp struct {
CommonCallbackResp
type CallbackBeforeMembersJoinGroupReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
MembersList []*CallbackGroupMember `json:"memberList"`
GroupEx string `json:"groupEx"`
}
type MemberJoinGroupCallBack struct {
UserID *string `json:"userID"`
Nickname *string `json:"nickname"`
FaceURL *string `json:"faceURL"`
RoleLevel *int32 `json:"roleLevel"`
@@ -76,6 +80,11 @@ type CallbackBeforeMemberJoinGroupResp struct {
Ex *string `json:"ex"`
}
type CallbackBeforeMembersJoinGroupResp struct {
CommonCallbackResp
MemberCallbackList []*MemberJoinGroupCallBack `json:"memberCallbackList"`
}
type CallbackBeforeSetGroupMemberInfoReq struct {
CallbackCommand `json:"callbackCommand"`
GroupID string `json:"groupID"`
+3 -2
View File
@@ -16,8 +16,9 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -38,7 +39,7 @@ func NewApiCmd() *ApiCmd {
DiscoveryConfigFilename: &apiConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+3 -2
View File
@@ -16,9 +16,10 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/auth"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -40,7 +41,7 @@ func NewAuthRpcCmd() *AuthRpcCmd {
DiscoveryConfigFilename: &authConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+3 -2
View File
@@ -16,9 +16,10 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -43,7 +44,7 @@ func NewConversationRpcCmd() *ConversationRpcCmd {
DiscoveryConfigFilename: &conversationConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+3 -2
View File
@@ -16,8 +16,9 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/tools"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -38,7 +39,7 @@ func NewCronTaskCmd() *CronTaskCmd {
DiscoveryConfigFilename: &cronTaskConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+20 -19
View File
@@ -16,35 +16,36 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
type FriendRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]any
friendConfig *friend.Config
ctx context.Context
configMap map[string]any
relationConfig *relation.Config
}
func NewFriendRpcCmd() *FriendRpcCmd {
var friendConfig friend.Config
ret := &FriendRpcCmd{friendConfig: &friendConfig}
var relationConfig relation.Config
ret := &FriendRpcCmd{relationConfig: &relationConfig}
ret.configMap = map[string]any{
OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig,
RedisConfigFileName: &friendConfig.RedisConfig,
MongodbConfigFileName: &friendConfig.MongodbConfig,
ShareFileName: &friendConfig.Share,
NotificationFileName: &friendConfig.NotificationConfig,
WebhooksConfigFileName: &friendConfig.WebhooksConfig,
LocalCacheConfigFileName: &friendConfig.LocalCacheConfig,
DiscoveryConfigFilename: &friendConfig.Discovery,
OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig,
RedisConfigFileName: &relationConfig.RedisConfig,
MongodbConfigFileName: &relationConfig.MongodbConfig,
ShareFileName: &relationConfig.Share,
NotificationFileName: &relationConfig.NotificationConfig,
WebhooksConfigFileName: &relationConfig.WebhooksConfig,
LocalCacheConfigFileName: &relationConfig.LocalCacheConfig,
DiscoveryConfigFilename: &relationConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
@@ -56,7 +57,7 @@ func (a *FriendRpcCmd) Exec() error {
}
func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports,
a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start)
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start)
}
+3 -2
View File
@@ -16,10 +16,11 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/group"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -45,7 +46,7 @@ func NewGroupRpcCmd() *GroupRpcCmd {
DiscoveryConfigFilename: &groupConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+3 -2
View File
@@ -16,9 +16,10 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/msg"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -45,7 +46,7 @@ func NewMsgRpcCmd() *MsgRpcCmd {
DiscoveryConfigFilename: &msgConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+2 -2
View File
@@ -16,9 +16,9 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/internal/msggateway"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
@@ -42,7 +42,7 @@ func NewMsgGatewayCmd() *MsgGatewayCmd {
DiscoveryConfigFilename: &msgGatewayConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+3 -2
View File
@@ -16,8 +16,9 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/msgtransfer"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -42,7 +43,7 @@ func NewMsgTransferCmd() *MsgTransferCmd {
DiscoveryConfigFilename: &msgTransferConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+3 -2
View File
@@ -16,9 +16,10 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/push"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -45,7 +46,7 @@ func NewPushRpcCmd() *PushRpcCmd {
DiscoveryConfigFilename: &pushConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
ret.pushConfig.FcmConfigPath = ret.ConfigPath()
return ret.runE()
+3 -2
View File
@@ -19,6 +19,7 @@ import (
"path/filepath"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/spf13/cobra"
@@ -138,13 +139,13 @@ func (r *RootCmd) initializeLogger(cmdOpts *CmdOpts) error {
r.log.StorageLocation,
r.log.RemainRotationCount,
r.log.RotationTime,
config.Version,
version.Version,
r.log.IsSimplify,
)
if err != nil {
return errs.Wrap(err)
}
return errs.Wrap(log.InitConsoleLogger(r.processName, r.log.RemainLogLevel, r.log.IsJson, config.Version))
return errs.Wrap(log.InitConsoleLogger(r.processName, r.log.RemainLogLevel, r.log.IsJson, version.Version))
}
+3 -2
View File
@@ -16,9 +16,10 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/third"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -44,7 +45,7 @@ func NewThirdRpcCmd() *ThirdRpcCmd {
DiscoveryConfigFilename: &thirdConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
+3 -2
View File
@@ -16,9 +16,10 @@ package cmd
import (
"context"
"github.com/openimsdk/open-im-server/v3/internal/rpc/user"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
"github.com/spf13/cobra"
)
@@ -45,7 +46,7 @@ func NewUserRpcCmd() *UserRpcCmd {
DiscoveryConfigFilename: &userConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", config.Version)
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
ret.Command.RunE = func(cmd *cobra.Command, args []string) error {
return ret.runE()
}
-4
View File
@@ -15,7 +15,6 @@
package config
import (
_ "embed"
"os"
"path/filepath"
@@ -26,9 +25,6 @@ import (
"gopkg.in/yaml.v3"
)
//go:embed version
var Version string
const (
FileName = "config.yaml"
NotificationFileName = "notification.yaml"
-1
View File
@@ -1 +0,0 @@
3.7.0
+2 -2
View File
@@ -22,7 +22,7 @@ import (
func ConversationDB2Pb(conversationDB *model.Conversation) *conversation.Conversation {
conversationPB := &conversation.Conversation{}
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli()
if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
return nil
}
@@ -35,7 +35,7 @@ func ConversationsDB2Pb(conversationsDB []*model.Conversation) (conversationsPB
if err := datautil.CopyStructFields(conversationPB, conversationDB); err != nil {
continue
}
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.Unix()
conversationPB.LatestMsgDestructTime = conversationDB.LatestMsgDestructTime.UnixMilli()
conversationsPB = append(conversationsPB, conversationPB)
}
return conversationsPB
+5 -1
View File
@@ -5,6 +5,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"time"
@@ -82,8 +83,11 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
argv = append(argv, platformID)
}
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
if err := s.rdb.Eval(ctx, script, keys, argv).Err(); err != nil {
status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err
}
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
return nil
}
+39 -26
View File
@@ -152,42 +152,55 @@ func (f *friendDatabase) BecomeFriends(ctx context.Context, ownerUserID string,
return f.tx.Transaction(ctx, func(ctx context.Context) error {
cache := f.cache.CloneFriendCache()
// user find friends
fs1, err := f.friend.FindFriends(ctx, ownerUserID, friendUserIDs)
myFriends, err := f.friend.FindFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
return err
}
addOwners, err := f.friend.FindReversalFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
return err
}
opUserID := mcontext.GetOperationID(ctx)
for _, v := range friendUserIDs {
fs1 = append(fs1, &model.Friend{OwnerUserID: ownerUserID, FriendUserID: v, AddSource: addSource, OperatorUserID: opUserID})
}
fs11 := datautil.DistinctAny(fs1, func(e *model.Friend) string {
return e.FriendUserID
friends := make([]*model.Friend, 0, len(friendUserIDs)*2)
myFriendsSet := datautil.SliceSetAny(myFriends, func(friend *model.Friend) string {
return friend.FriendUserID
})
err = f.friend.Create(ctx, fs11)
if err != nil {
return err
}
fs2, err := f.friend.FindReversalFriends(ctx, ownerUserID, friendUserIDs)
if err != nil {
return err
}
var newFriendIDs []string
for _, v := range friendUserIDs {
fs2 = append(fs2, &model.Friend{OwnerUserID: v, FriendUserID: ownerUserID, AddSource: addSource, OperatorUserID: opUserID})
newFriendIDs = append(newFriendIDs, v)
}
fs22 := datautil.DistinctAny(fs2, func(e *model.Friend) string {
return e.OwnerUserID
addOwnersSet := datautil.SliceSetAny(addOwners, func(friend *model.Friend) string {
return friend.OwnerUserID
})
err = f.friend.Create(ctx, fs22)
newMyFriendIDs := make([]string, 0, len(friendUserIDs))
newMyOwnerIDs := make([]string, 0, len(friendUserIDs))
for _, userID := range friendUserIDs {
if ownerUserID == userID {
continue
}
if _, ok := myFriendsSet[userID]; !ok {
myFriendsSet[userID] = struct{}{}
newMyFriendIDs = append(newMyFriendIDs, userID)
friends = append(friends, &model.Friend{OwnerUserID: ownerUserID, FriendUserID: userID, AddSource: addSource, OperatorUserID: opUserID})
}
if _, ok := addOwnersSet[userID]; !ok {
addOwnersSet[userID] = struct{}{}
newMyOwnerIDs = append(newMyOwnerIDs, userID)
friends = append(friends, &model.Friend{OwnerUserID: userID, FriendUserID: ownerUserID, AddSource: addSource, OperatorUserID: opUserID})
}
}
if len(friends) == 0 {
return nil
}
err = f.friend.Create(ctx, friends)
if err != nil {
return err
}
newFriendIDs = append(newFriendIDs, ownerUserID)
cache = cache.DelFriendIDs(newFriendIDs...).DelMaxFriendVersion(newFriendIDs...)
if len(newMyFriendIDs) > 0 {
cache = cache.DelFriendIDs(newMyFriendIDs...)
cache = cache.DelFriends(ownerUserID, newMyFriendIDs).DelMaxFriendVersion(newMyFriendIDs...)
}
if len(newMyOwnerIDs) > 0 {
cache = cache.DelFriendIDs(newMyOwnerIDs...)
cache = cache.DelOwner(ownerUserID, newMyOwnerIDs).DelMaxFriendVersion(newMyOwnerIDs...)
}
return cache.ChainExecDel(ctx)
})
}
+11
View File
@@ -377,8 +377,19 @@ func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID prim
if !nextID.IsZero() {
pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}})
}
coarseFilter := bson.M{
"$or": bson.A{
bson.M{
"doc_id": primitive.Regex{Pattern: "^sg_"},
},
bson.M{
"doc_id": primitive.Regex{Pattern: "^si_"},
},
},
}
pipeline = append(pipeline,
bson.M{"$sort": bson.M{"_id": 1}},
bson.M{"$match": coarseFilter},
bson.M{"$match": filter},
bson.M{"$limit": limit},
bson.M{
+43
View File
@@ -14,6 +14,11 @@
package rpccache
import (
"github.com/openimsdk/tools/errs"
"google.golang.org/protobuf/proto"
)
func newListMap[V comparable](values []V, err error) (*listMap[V], error) {
if err != nil {
return nil, err
@@ -32,3 +37,41 @@ type listMap[V comparable] struct {
List []V
Map map[V]struct{}
}
func respProtoMarshal(resp proto.Message, err error) ([]byte, error) {
if err != nil {
return nil, err
}
return proto.Marshal(resp)
}
func cacheUnmarshal[V any](resp []byte, err error) (*V, error) {
if err != nil {
return nil, err
}
var val V
if err := proto.Unmarshal(resp, any(&val).(proto.Message)); err != nil {
return nil, errs.WrapMsg(err, "local cache proto.Unmarshal error")
}
return &val, nil
}
type cacheProto[V any] struct{}
func (cacheProto[V]) Marshal(resp *V, err error) ([]byte, error) {
if err != nil {
return nil, err
}
return proto.Marshal(any(resp).(proto.Message))
}
func (cacheProto[V]) Unmarshal(resp []byte, err error) (*V, error) {
if err != nil {
return nil, err
}
var val V
if err := proto.Unmarshal(resp, any(&val).(proto.Message)); err != nil {
return nil, errs.WrapMsg(err, "local cache proto.Unmarshal error")
}
return &val, nil
}
+38 -17
View File
@@ -23,6 +23,7 @@ import (
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
@@ -36,7 +37,7 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach
log.ZDebug(context.Background(), "ConversationLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &ConversationLocalCache{
client: client,
local: localcache.New[any](
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
@@ -52,21 +53,30 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach
type ConversationLocalCache struct {
client rpcclient.ConversationRpcClient
local localcache.Cache[any]
local localcache.Cache[[]byte]
}
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs req", "ownerUserID", ownerUserID)
resp, err := c.getConversationIDs(ctx, ownerUserID)
if err != nil {
return nil, err
}
return resp.ConversationIDs, nil
}
func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUserID string) (val *pbconversation.GetConversationIDsResp, err error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationIDs req", "ownerUserID", ownerUserID)
defer func() {
if err == nil {
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs return", "value", val)
log.ZDebug(ctx, "ConversationLocalCache getConversationIDs return", "ownerUserID", ownerUserID, "value", val)
} else {
log.ZError(ctx, "ConversationLocalCache GetConversationIDs return", err)
log.ZError(ctx, "ConversationLocalCache getConversationIDs return", err, "ownerUserID", ownerUserID)
}
}()
return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs rpc", "ownerUserID", ownerUserID)
return c.client.GetConversationIDs(ctx, ownerUserID)
var cache cacheProto[pbconversation.GetConversationIDsResp]
return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationIDs rpc", "ownerUserID", ownerUserID)
return cache.Marshal(c.client.Client.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID}))
}))
}
@@ -74,14 +84,15 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co
log.ZDebug(ctx, "ConversationLocalCache GetConversation req", "userID", userID, "conversationID", conversationID)
defer func() {
if err == nil {
log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "value", val)
log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val)
} else {
log.ZError(ctx, "ConversationLocalCache GetConversation return", err)
log.ZError(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID)
}
}()
return localcache.AnyValue[*pbconversation.Conversation](c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) (any, error) {
var cache cacheProto[pbconversation.Conversation]
return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "ConversationLocalCache GetConversation rpc", "userID", userID, "conversationID", conversationID)
return c.client.GetConversation(ctx, userID, conversationID)
return cache.Marshal(c.client.GetConversation(ctx, userID, conversationID))
}))
}
@@ -126,9 +137,19 @@ func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUser
return conversations, nil
}
func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (*listMap[string], error) {
return localcache.AnyValue[*listMap[string]](c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) (any, error) {
return newListMap(c.client.GetConversationNotReceiveMessageUserIDs(ctx, conversationID))
func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (val *pbconversation.GetConversationNotReceiveMessageUserIDsResp, err error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs req", "conversationID", conversationID)
defer func() {
if err == nil {
log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs return", "conversationID", conversationID, "value", val)
} else {
log.ZError(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs return", err, "conversationID", conversationID)
}
}()
var cache cacheProto[pbconversation.GetConversationNotReceiveMessageUserIDsResp]
return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs rpc", "conversationID", conversationID)
return cache.Marshal(c.client.Client.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID}))
}))
}
@@ -137,7 +158,7 @@ func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDs(ctx con
if err != nil {
return nil, err
}
return res.List, nil
return res.UserIDs, nil
}
func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx context.Context, conversationID string) (map[string]struct{}, error) {
@@ -145,5 +166,5 @@ func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx c
if err != nil {
return nil, err
}
return res.Map, nil
return datautil.SliceSet(res.UserIDs), nil
}
+36 -16
View File
@@ -16,7 +16,8 @@ package rpccache
import (
"context"
cachekey2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
@@ -30,7 +31,7 @@ func NewFriendLocalCache(client rpcclient.FriendRpcClient, localCache *config.Lo
log.ZDebug(context.Background(), "FriendLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &FriendLocalCache{
client: client,
local: localcache.New[any](
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
@@ -46,36 +47,55 @@ func NewFriendLocalCache(client rpcclient.FriendRpcClient, localCache *config.Lo
type FriendLocalCache struct {
client rpcclient.FriendRpcClient
local localcache.Cache[any]
local localcache.Cache[[]byte]
}
func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) {
log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
res, err := f.isFriend(ctx, possibleFriendUserID, userID)
if err != nil {
return false, err
}
return res.InUser1Friends, nil
}
func (f *FriendLocalCache) isFriend(ctx context.Context, possibleFriendUserID, userID string) (val *relation.IsFriendResp, err error) {
log.ZDebug(ctx, "FriendLocalCache isFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "FriendLocalCache IsFriend return", "value", val)
log.ZDebug(ctx, "FriendLocalCache isFriend return", "possibleFriendUserID", possibleFriendUserID, "userID", userID, "value", val)
} else {
log.ZError(ctx, "FriendLocalCache IsFriend return", err)
log.ZError(ctx, "FriendLocalCache isFriend return", err, "possibleFriendUserID", possibleFriendUserID, "userID", userID)
}
}()
return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey2.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "FriendLocalCache IsFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
return f.client.IsFriend(ctx, possibleFriendUserID, userID)
}, cachekey2.GetFriendIDsKey(possibleFriendUserID)))
var cache cacheProto[relation.IsFriendResp]
return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "FriendLocalCache isFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
return cache.Marshal(f.client.Client.IsFriend(ctx, &relation.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}))
}, cachekey.GetFriendIDsKey(possibleFriendUserID)))
}
// IsBlack possibleBlackUserID selfUserID.
func (f *FriendLocalCache) IsBlack(ctx context.Context, possibleBlackUserID, userID string) (val bool, err error) {
log.ZDebug(ctx, "FriendLocalCache IsBlack req", "possibleBlackUserID", possibleBlackUserID, "userID", userID)
res, err := f.isBlack(ctx, possibleBlackUserID, userID)
if err != nil {
return false, err
}
return res.InUser2Blacks, nil
}
// IsBlack possibleBlackUserID selfUserID.
func (f *FriendLocalCache) isBlack(ctx context.Context, possibleBlackUserID, userID string) (val *relation.IsBlackResp, err error) {
log.ZDebug(ctx, "FriendLocalCache isBlack req", "possibleBlackUserID", possibleBlackUserID, "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "FriendLocalCache IsBlack return", "value", val)
log.ZDebug(ctx, "FriendLocalCache isBlack return", "possibleBlackUserID", possibleBlackUserID, "userID", userID, "value", val)
} else {
log.ZError(ctx, "FriendLocalCache IsBlack return", err)
log.ZError(ctx, "FriendLocalCache isBlack return", err, "possibleBlackUserID", possibleBlackUserID, "userID", userID)
}
}()
return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey2.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) {
var cache cacheProto[relation.IsBlackResp]
return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID)
return f.client.IsBlack(ctx, possibleBlackUserID, userID)
}, cachekey2.GetBlackIDsKey(userID)))
return cache.Marshal(f.client.Client.IsBlack(ctx, &relation.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID}))
}, cachekey.GetBlackIDsKey(userID)))
}
+22 -17
View File
@@ -17,6 +17,8 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
@@ -32,7 +34,7 @@ func NewGroupLocalCache(client rpcclient.GroupRpcClient, localCache *config.Loca
log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &GroupLocalCache{
client: client,
local: localcache.New[any](
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
@@ -48,21 +50,22 @@ func NewGroupLocalCache(client rpcclient.GroupRpcClient, localCache *config.Loca
type GroupLocalCache struct {
client rpcclient.GroupRpcClient
local localcache.Cache[any]
local localcache.Cache[[]byte]
}
func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) {
func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *group.GetGroupMemberUserIDsResp, err error) {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val)
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "groupID", groupID, "value", val)
} else {
log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err)
log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err, "groupID", groupID)
}
}()
return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) {
var cache cacheProto[group.GetGroupMemberUserIDsResp]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID)
return newListMap(g.client.GetGroupMemberIDs(ctx, groupID))
return cache.Marshal(g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID}))
}))
}
@@ -70,14 +73,15 @@ func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID st
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID, "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val)
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "groupID", groupID, "userID", userID, "value", val)
} else {
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err)
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err, "groupID", groupID, "userID", userID)
}
}()
return localcache.AnyValue[*sdkws.GroupMemberFullInfo](g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) (any, error) {
var cache cacheProto[sdkws.GroupMemberFullInfo]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID)
return g.client.GetGroupMemberCache(ctx, groupID, userID)
return cache.Marshal(g.client.GetGroupMemberCache(ctx, groupID, userID))
}))
}
@@ -85,14 +89,15 @@ func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID)
defer func() {
if err == nil {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val)
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "groupID", groupID, "value", val)
} else {
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err)
log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err, "groupID", groupID)
}
}()
return localcache.AnyValue[*sdkws.GroupInfo](g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) (any, error) {
var cache cacheProto[sdkws.GroupInfo]
return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID)
return g.client.GetGroupInfoCache(ctx, groupID)
return cache.Marshal(g.client.GetGroupInfoCache(ctx, groupID))
}))
}
@@ -101,7 +106,7 @@ func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string)
if err != nil {
return nil, err
}
return res.List, nil
return res.UserIDs, nil
}
func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) {
@@ -109,7 +114,7 @@ func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID strin
if err != nil {
return nil, err
}
return res.Map, nil
return datautil.SliceSet(res.UserIDs), nil
}
func (g *GroupLocalCache) GetGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) {
+53 -37
View File
@@ -28,7 +28,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
if err != nil {
log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
continue
}
storageCache := x.setUserOnline(userID, platformIDs)
@@ -47,53 +47,69 @@ type OnlineCache struct {
local lru.LRU[string, []int32]
}
func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
return o.local.Get(userID, func() ([]int32, error) {
func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
return o.user.GetUserOnlinePlatform(ctx, userID)
})
if err != nil {
log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID)
return nil, err
}
log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs)
return platformIDs, nil
}
func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
platformIDs, err := o.getUserOnlinePlatform(ctx, userID)
if err != nil {
return nil, err
}
tmp := make([]int32, len(platformIDs))
copy(tmp, platformIDs)
return platformIDs, nil
}
func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) {
platformIDs, err := o.GetUserOnlinePlatform(ctx, userID)
platformIDs, err := o.getUserOnlinePlatform(ctx, userID)
if err != nil {
return false, err
}
return len(platformIDs) > 0, nil
}
func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
onlineUserIDs := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
}
}
log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
return onlineUserIDs, nil
}
func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
var onlineUserIDs []string
for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
}
}
log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
return onlineUserIDs, nil
}
//func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
// onlineUserIDs := make([]string, 0, len(userIDs))
// for _, userID := range userIDs {
// online, err := o.GetUserOnline(ctx, userID)
// if err != nil {
// return nil, err
// }
// if online {
// onlineUserIDs = append(onlineUserIDs, userID)
// }
// }
// log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
// return onlineUserIDs, nil
//}
//
//func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
// userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return nil, err
// }
// var onlineUserIDs []string
// for _, userID := range userIDs {
// online, err := o.GetUserOnline(ctx, userID)
// if err != nil {
// return nil, err
// }
// if online {
// onlineUserIDs = append(onlineUserIDs, userID)
// }
// }
// log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
// return onlineUserIDs, nil
//}
func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
return o.local.SetHas(userID, platformIDs)
+21 -26
View File
@@ -16,12 +16,12 @@ package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
@@ -32,7 +32,7 @@ func NewUserLocalCache(client rpcclient.UserRpcClient, localCache *config.LocalC
log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable())
x := &UserLocalCache{
client: client,
local: localcache.New[any](
local: localcache.New[[]byte](
localcache.WithLocalSlotNum(lc.SlotNum),
localcache.WithLocalSlotSize(lc.SlotSize),
localcache.WithLinkSlotNum(lc.SlotNum),
@@ -48,7 +48,7 @@ func NewUserLocalCache(client rpcclient.UserRpcClient, localCache *config.LocalC
type UserLocalCache struct {
client rpcclient.UserRpcClient
local localcache.Cache[any]
local localcache.Cache[[]byte]
}
func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *sdkws.UserInfo, err error) {
@@ -60,24 +60,34 @@ func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *s
log.ZError(ctx, "UserLocalCache GetUserInfo return", err)
}
}()
return localcache.AnyValue[*sdkws.UserInfo](u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) (any, error) {
var cache cacheProto[sdkws.UserInfo]
return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "UserLocalCache GetUserInfo rpc", "userID", userID)
return u.client.GetUserInfo(ctx, userID)
return cache.Marshal(u.client.GetUserInfo(ctx, userID))
}))
}
func (u *UserLocalCache) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (val int32, err error) {
log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt req", "userID", userID)
resp, err := u.getUserGlobalMsgRecvOpt(ctx, userID)
if err != nil {
return 0, err
}
return resp.GlobalRecvMsgOpt, nil
}
func (u *UserLocalCache) getUserGlobalMsgRecvOpt(ctx context.Context, userID string) (val *user.GetGlobalRecvMessageOptResp, err error) {
log.ZDebug(ctx, "UserLocalCache getUserGlobalMsgRecvOpt req", "userID", userID)
defer func() {
if err == nil {
log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", "value", val)
log.ZDebug(ctx, "UserLocalCache getUserGlobalMsgRecvOpt return", "value", val)
} else {
log.ZError(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", err)
log.ZError(ctx, "UserLocalCache getUserGlobalMsgRecvOpt return", err)
}
}()
return localcache.AnyValue[int32](u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) (any, error) {
var cache cacheProto[user.GetGlobalRecvMessageOptResp]
return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) ([]byte, error) {
log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID)
return u.client.GetUserGlobalMsgRecvOpt(ctx, userID)
return cache.Marshal(u.client.Client.GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{UserID: userID}))
}))
}
@@ -110,18 +120,3 @@ func (u *UserLocalCache) GetUsersInfoMap(ctx context.Context, userIDs []string)
}
return users, nil
}
//func (u *UserLocalCache) GetUserOnlinePlatform(ctx context.Context, userID string) (val []int32, err error) {
// log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform req", "userID", userID)
// defer func() {
// if err == nil {
// log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform return", "value", val)
// } else {
// log.ZError(ctx, "UserLocalCache GetUserOnlinePlatform return", err)
// }
// }()
// return localcache.AnyValue[[]int32](u.local.Get(ctx, cachekey.GetOnlineKey(userID), func(ctx context.Context) (any, error) {
// log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID)
// return u.client.GetUserGlobalMsgRecvOpt(ctx, userID)
// }))
//}
+9 -1
View File
@@ -82,7 +82,7 @@ func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []
return err
}
func (c *ConversationRpcClient) UpdateConversations(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
func (c *ConversationRpcClient) UpdateConversation(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error {
_, err := c.Client.UpdateConversation(ctx, conversation)
return err
}
@@ -146,3 +146,11 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont
}
return resp.UserIDs, nil
}
func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) {
resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
if err != nil {
return nil, err
}
return resp.Conversations, nil
}
+4
View File
@@ -324,6 +324,10 @@ func (s *NotificationSender) send(ctx context.Context, sendID, recvID string, co
options := config.GetOptionsByNotification(optionsConfig)
s.SetOptionsByContentType(ctx, options, contentType)
msg.Options = options
// fill Notification OfflinePush by config
offlineInfo.Title = optionsConfig.OfflinePush.Title
offlineInfo.Desc = optionsConfig.OfflinePush.Desc
offlineInfo.Ex = optionsConfig.OfflinePush.Ext
msg.OfflinePushInfo = &offlineInfo
req.MsgData = &msg
_, err = s.sendMsg(ctx, &req)
+1
View File
@@ -0,0 +1 @@
3.8.0
+6
View File
@@ -0,0 +1,6 @@
package version
import _ "embed"
//go:embed version
var Version string