2021-05-26 19:24:25 +08:00
package logic
import (
2021-10-11 22:12:01 +08:00
"Open_IM/pkg/common/config"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/common/constant"
2021-10-11 22:12:01 +08:00
"Open_IM/pkg/common/kafka"
2022-05-11 18:33:48 +08:00
"Open_IM/pkg/statistics"
2022-04-20 18:53:23 +08:00
"fmt"
2022-05-11 18:33:48 +08:00
"sync"
2021-05-26 19:24:25 +08:00
)
2022-05-11 20:49:47 +08:00
// Values for the package-level onlineTopicStatus variable.
const (
	// OnlineTopicVacancy is the status that Init assigns to onlineTopicStatus.
	OnlineTopicVacancy = 0
	// OnlineTopicBusy is the alternative status accepted by
	// SetOnlineTopicStatus.
	OnlineTopicBusy = 1
)

// Small integer identifiers that continue the numbering after the two
// status values above.
// NOTE(review): these look like command codes for the Cmd2Value values sent
// over cmdCh — confirm against the consumer handlers that use them.
const (
	Msg                 = 2
	ConsumerMsgs        = 3
	AggregationMessages = 4
	MongoMessages       = 5
)

// ChannelNum is a fixed count of 100.
// NOTE(review): presumably the number of worker channels the history
// consumer shards messages across — confirm at the point of use.
const ChannelNum = 100
2022-05-11 18:33:48 +08:00
2021-05-26 19:24:25 +08:00
var (
2022-05-11 18:33:48 +08:00
persistentCH PersistentConsumerHandler
2022-06-16 20:35:27 +08:00
historyCH OnlineHistoryRedisConsumerHandler
historyMongoCH OnlineHistoryMongoConsumerHandler
2022-05-11 18:33:48 +08:00
producer * kafka . Producer
2022-06-16 12:21:49 +08:00
producerToMongo * kafka . Producer
2022-05-11 18:33:48 +08:00
cmdCh chan Cmd2Value
2022-05-11 20:49:47 +08:00
onlineTopicStatus int
2022-05-11 18:33:48 +08:00
w * sync . Mutex
singleMsgSuccessCount uint64
groupMsgCount uint64
singleMsgFailedCount uint64
2022-05-18 08:06:05 +08:00
singleMsgSuccessCountMutex sync . Mutex
2021-05-26 19:24:25 +08:00
)
func Init ( ) {
2022-05-11 18:33:48 +08:00
cmdCh = make ( chan Cmd2Value , 10000 )
2022-05-11 18:38:58 +08:00
w = new ( sync . Mutex )
2022-08-08 11:30:10 +08:00
persistentCH . Init ( ) // 订阅ws2mschat 消费到 mysql
historyCH . Init ( cmdCh ) // 订阅ws2mschat 如果可靠性存储 消费到 incrseq 再存入mongo 再push || 非可靠性 直接incr再push 初始化ws2mschat
2022-06-17 10:37:43 +08:00
historyMongoCH . Init ( )
2022-05-11 20:49:47 +08:00
onlineTopicStatus = OnlineTopicVacancy
2022-05-23 11:57:41 +08:00
//offlineHistoryCH.Init(cmdCh)
2022-05-11 18:33:48 +08:00
statistics . NewStatistics ( & singleMsgSuccessCount , config . Config . ModuleName . MsgTransferName , fmt . Sprintf ( "%d second singleMsgCount insert to mongo" , constant . StatisticsTimeInterval ) , constant . StatisticsTimeInterval )
statistics . NewStatistics ( & groupMsgCount , config . Config . ModuleName . MsgTransferName , fmt . Sprintf ( "%d second groupMsgCount insert to mongo" , constant . StatisticsTimeInterval ) , constant . StatisticsTimeInterval )
2021-05-26 19:24:25 +08:00
producer = kafka . NewKafkaProducer ( config . Config . Kafka . Ms2pschat . Addr , config . Config . Kafka . Ms2pschat . Topic )
2022-06-16 12:21:49 +08:00
producerToMongo = kafka . NewKafkaProducer ( config . Config . Kafka . MsgToMongo . Addr , config . Config . Kafka . MsgToMongo . Topic )
2021-05-26 19:24:25 +08:00
}
func Run ( ) {
//register mysqlConsumerHandler to
2022-04-20 18:53:23 +08:00
if config . Config . ChatPersistenceMysql {
go persistentCH . persistentConsumerGroup . RegisterHandleAndConsumer ( & persistentCH )
} else {
fmt . Println ( "not start mysql consumer" )
}
2021-05-26 19:24:25 +08:00
go historyCH . historyConsumerGroup . RegisterHandleAndConsumer ( & historyCH )
2022-06-17 10:37:43 +08:00
go historyMongoCH . historyConsumerGroup . RegisterHandleAndConsumer ( & historyMongoCH )
2022-05-21 19:17:31 +08:00
//go offlineHistoryCH.historyConsumerGroup.RegisterHandleAndConsumer(&offlineHistoryCH)
2022-05-11 18:33:48 +08:00
}
2022-05-11 20:49:47 +08:00
func SetOnlineTopicStatus ( status int ) {
2022-05-11 18:33:48 +08:00
w . Lock ( )
defer w . Unlock ( )
onlineTopicStatus = status
}
2022-05-11 20:49:47 +08:00
func GetOnlineTopicStatus ( ) int {
2022-05-11 18:33:48 +08:00
w . Lock ( )
defer w . Unlock ( )
return onlineTopicStatus
2021-05-26 19:24:25 +08:00
}