mirror of
https://github.com/openimsdk/open-im-server.git
synced 2026-04-29 23:09:19 +08:00
feat(release-v3.4): Enhanced Build, Deployment, and Configuration Management for Optimized Performance and Automation (#1255)
* docs: add openim docs Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * docs: add openim images test Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: fix openim ci and deployment Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: fix openim ci and deployment Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * feat: add openim flag api configpath env set Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim push logger Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> * fix: fix openim config path Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com> --------- Signed-off-by: Xinwei Xiong(cubxxw) <3293172751nss@gmail.com>
This commit is contained in:
@@ -0,0 +1,204 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/OpenIMSDK/protocol/constant"
|
||||
"github.com/OpenIMSDK/protocol/msg"
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
"github.com/OpenIMSDK/tools/mw"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
|
||||
pbmsg "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/proto/msg"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
var (
|
||||
topic = "ws2ms_chat" // v2版本配置文件kafka.topic.ws2ms_chat
|
||||
kafkaAddr = "127.0.0.1:9092" // v2版本配置文件kafka.topic.addr
|
||||
rpcAddr = "127.0.0.1:10130" // v3版本配置文件rpcPort.openImMessagePort
|
||||
adminUserID = "openIM123456" // v3版本管理员userID
|
||||
concurrency = 1 // 并发数量
|
||||
)
|
||||
|
||||
getRpcConn := func() (*grpc.ClientConn, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
return grpc.DialContext(ctx, rpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), mw.GrpcClient())
|
||||
}
|
||||
conn, err := getRpcConn()
|
||||
if err != nil {
|
||||
log.Println("get rpc conn", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
msgClient := msg.NewMsgClient(conn)
|
||||
|
||||
conf := sarama.NewConfig()
|
||||
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
|
||||
|
||||
consumer, err := sarama.NewConsumer([]string{kafkaAddr}, conf)
|
||||
if err != nil {
|
||||
log.Println("kafka consumer conn", err)
|
||||
return
|
||||
}
|
||||
partitions, err := consumer.Partitions(topic) // Get all partitions according to topic
|
||||
if err != nil {
|
||||
log.Println("kafka partitions", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(partitions) == 0 {
|
||||
log.Println("kafka partitions is empty")
|
||||
return
|
||||
}
|
||||
log.Println("kafka partitions", partitions)
|
||||
|
||||
msgCh := make(chan *pbmsg.MsgDataToMQ, concurrency*2)
|
||||
|
||||
var kfkWg sync.WaitGroup
|
||||
|
||||
distinct := make(map[string]struct{})
|
||||
var lock sync.Mutex
|
||||
|
||||
for _, partition := range partitions {
|
||||
kfkWg.Add(1)
|
||||
go func(partition int32) {
|
||||
defer kfkWg.Done()
|
||||
pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
|
||||
if err != nil {
|
||||
log.Printf("kafka Consume Partition %d failed %s\n", partition, err)
|
||||
return
|
||||
}
|
||||
defer pc.Close()
|
||||
ch := pc.Messages()
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Second * 10): // 10s读取不到就关闭
|
||||
return
|
||||
case message, ok := <-ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
msgFromMQV2 := pbmsg.MsgDataToMQ{}
|
||||
err := proto.Unmarshal(message.Value, &msgFromMQV2)
|
||||
if err != nil {
|
||||
log.Printf("kafka msg partition %d offset %d unmarshal failed %s\n", message.Partition, message.Offset, message.Value)
|
||||
continue
|
||||
}
|
||||
if msgFromMQV2.MsgData == nil || msgFromMQV2.OperationID == "" {
|
||||
continue
|
||||
}
|
||||
if msgFromMQV2.MsgData.ContentType < constant.ContentTypeBegin || msgFromMQV2.MsgData.ContentType > constant.AdvancedText {
|
||||
continue
|
||||
}
|
||||
lock.Lock()
|
||||
_, exist := distinct[msgFromMQV2.MsgData.ClientMsgID]
|
||||
if !exist {
|
||||
distinct[msgFromMQV2.MsgData.ClientMsgID] = struct{}{}
|
||||
}
|
||||
lock.Unlock()
|
||||
if exist {
|
||||
continue
|
||||
}
|
||||
msgCh <- &msgFromMQV2
|
||||
}
|
||||
}
|
||||
}(partition)
|
||||
}
|
||||
|
||||
go func() {
|
||||
kfkWg.Wait()
|
||||
close(msgCh)
|
||||
}()
|
||||
|
||||
var msgWg sync.WaitGroup
|
||||
|
||||
var (
|
||||
success int64
|
||||
failed int64
|
||||
)
|
||||
for i := 0; i < concurrency; i++ {
|
||||
msgWg.Add(1)
|
||||
go func() {
|
||||
defer msgWg.Done()
|
||||
for message := range msgCh {
|
||||
HandlerV2Msg(msgClient, adminUserID, message, &success, &failed)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
msgWg.Wait()
|
||||
log.Printf("total %d success %d failed %d\n", success+failed, success, failed)
|
||||
}
|
||||
|
||||
func HandlerV2Msg(msgClient msg.MsgClient, adminUserID string, msgFromMQV2 *pbmsg.MsgDataToMQ, success *int64, failed *int64) {
|
||||
msgData := &sdkws.MsgData{
|
||||
SendID: msgFromMQV2.MsgData.SendID,
|
||||
RecvID: msgFromMQV2.MsgData.RecvID,
|
||||
GroupID: msgFromMQV2.MsgData.GroupID,
|
||||
ClientMsgID: msgFromMQV2.MsgData.ClientMsgID,
|
||||
ServerMsgID: msgFromMQV2.MsgData.ServerMsgID,
|
||||
SenderPlatformID: msgFromMQV2.MsgData.SenderPlatformID,
|
||||
SenderNickname: msgFromMQV2.MsgData.SenderNickname,
|
||||
SenderFaceURL: msgFromMQV2.MsgData.SenderFaceURL,
|
||||
SessionType: msgFromMQV2.MsgData.SessionType,
|
||||
MsgFrom: msgFromMQV2.MsgData.MsgFrom,
|
||||
ContentType: msgFromMQV2.MsgData.ContentType,
|
||||
SendTime: msgFromMQV2.MsgData.SendTime,
|
||||
CreateTime: msgFromMQV2.MsgData.CreateTime,
|
||||
Status: msgFromMQV2.MsgData.Status,
|
||||
IsRead: false,
|
||||
Options: msgFromMQV2.MsgData.Options,
|
||||
AtUserIDList: msgFromMQV2.MsgData.AtUserIDList,
|
||||
AttachedInfo: msgFromMQV2.MsgData.AttachedInfo,
|
||||
Ex: msgFromMQV2.MsgData.Ex,
|
||||
}
|
||||
|
||||
if msgFromMQV2.MsgData.OfflinePushInfo != nil {
|
||||
msgData.OfflinePushInfo = &sdkws.OfflinePushInfo{
|
||||
Title: msgFromMQV2.MsgData.OfflinePushInfo.Title,
|
||||
Desc: msgFromMQV2.MsgData.OfflinePushInfo.Desc,
|
||||
Ex: msgFromMQV2.MsgData.OfflinePushInfo.Ex,
|
||||
IOSPushSound: msgFromMQV2.MsgData.OfflinePushInfo.IOSPushSound,
|
||||
IOSBadgeCount: msgFromMQV2.MsgData.OfflinePushInfo.IOSBadgeCount,
|
||||
SignalInfo: "",
|
||||
}
|
||||
}
|
||||
switch msgData.ContentType {
|
||||
case constant.Text:
|
||||
data, err := json.Marshal(apistruct.TextElem{
|
||||
Content: string(msgFromMQV2.MsgData.Content),
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
msgData.Content = data
|
||||
default:
|
||||
msgData.Content = msgFromMQV2.MsgData.Content
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
ctx = context.WithValue(context.Background(), constant.OperationID, msgFromMQV2.OperationID)
|
||||
ctx = context.WithValue(ctx, constant.OpUserID, adminUserID)
|
||||
|
||||
resp, err := msgClient.SendMsg(ctx, &msg.SendMsgReq{MsgData: msgData})
|
||||
if err != nil {
|
||||
atomic.AddInt64(failed, 1)
|
||||
log.Printf("send msg %+v failed %s\n", msgData, err)
|
||||
return
|
||||
}
|
||||
atomic.AddInt64(success, 1)
|
||||
log.Printf("send msg success %+v resp %+v\n", msgData, resp)
|
||||
}
|
||||
Reference in New Issue
Block a user