@@ -0,0 +1,872 @@
< ? php
namespace app\command ;
use Symfony\Component\Console\Command\Command ;
use Symfony\Component\Console\Input\InputInterface ;
use Symfony\Component\Console\Input\InputOption ;
use Symfony\Component\Console\Output\OutputInterface ;
// 引入内置进度条类
use Symfony\Component\Console\Helper\ProgressBar ;
use support\think\Db ;
class MigrateMessages extends Command
{
protected static $defaultName = 'migrate:messages' ;
protected static $defaultDescription = '从老数据库迁移数据到新OpenIM数据库' ;
private $sdk = null ;
private $oldManager = null ;
private $newManager = null ;
private $retry = 3 ;
private $delay = 2 ;
private $backupDir = '/vol3/1000/code/im/admin/backup' ;
private $currentBackup = null ;
private $skipUsers = [];
private $skipGroups = [];
private $stats = [
'users' => [ 'total' => 0 , 'success' => 0 , 'failed' => 0 ],
'groups' => [ 'total' => 0 , 'success' => 0 , 'failed' => 0 ],
'members' => [ 'total' => 0 , 'success' => 0 , 'failed' => 0 ],
'messages' => [ 'total' => 0 , 'success' => 0 , 'failed' => 0 , 'skipped' => 0 ],
];
protected function configure () : void
{
$this -> addOption ( 'step' , 's' , InputOption :: VALUE_OPTIONAL , '执行步骤: users/groups/members/messages/all' , 'all' );
$this -> addOption ( 'skip-users' , null , InputOption :: VALUE_OPTIONAL , '跳过的用户ID(逗号分隔)' );
$this -> addOption ( 'skip-groups' , null , InputOption :: VALUE_OPTIONAL , '跳过的群ID(逗号分隔)' );
$this -> addOption ( 'clean' , null , InputOption :: VALUE_NONE , '清空现有数据后再迁移' );
$this -> addOption ( 'retry' , 'r' , InputOption :: VALUE_OPTIONAL , '失败重试次数' , 3 );
}
protected function execute ( InputInterface $input , OutputInterface $output ) : int
{
$step = $input -> getOption ( 'step' );
$skipUsers = $input -> getOption ( 'skip-users' ) ? explode ( ',' , $input -> getOption ( 'skip-users' )) : [];
$skipGroups = $input -> getOption ( 'skip-groups' ) ? explode ( ',' , $input -> getOption ( 'skip-groups' )) : [];
$clean = $input -> getOption ( 'clean' );
$retry = ( int ) $input -> getOption ( 'retry' );
// 自动忽略特殊用户
$defaultSkipUsers = [ 'group_bot' , 'official_team' , 'system' , 'imAdmin' ];
$skipUsers = array_merge ( $skipUsers , $defaultSkipUsers );
$this -> skipUsers = array_unique ( $skipUsers );
$this -> skipGroups = array_unique ( $skipGroups );
$this -> retry = $retry ;
$this -> log ( $output , " ╔════════════════════════════════════════════════════════════╗ " );
$this -> log ( $output , " ║ OpenIM 数据迁移工具 v2.0 ║ " );
$this -> log ( $output , " ╚════════════════════════════════════════════════════════════╝ " );
$this -> log ( $output , " " );
if ( $clean ) {
$this -> log ( $output , " 🗑️ 清理模式:会清空现有数据 " );
}
$this -> log ( $output , " 📍 执行步骤: { $step } " );
$this -> log ( $output , " " );
if ( $step == 'restore' ){
$this -> restoreMongoDB ( $output , '/vol3/1000/code/im/admin/backup/openim_v3_groups_20260413141105.json' );
return 0 ;
}
$this -> cleanExistingData ( $output ,[]);
try {
$this -> initConnections ( $output );
//return 0 ;
if ( $clean ) {
$this -> cleanExistingData ( $output ,[
'conversation' , 'conversation_version' , // 会话相关集合
'data_version' , // 数据版本集合
'friend' , 'friend_request' , 'friend_version' , // 好友关系相关集合
'group' , 'group_join_version' , 'group_member' , 'group_member_version' , 'group_request' , // 群组相关集合
'msg' , 'seq' , 'seq_user' // 消息和序列号相关集合
]);
return 0 ;
}
cache ( 'admin_token_imAdmin' , null );
$steps = $step === 'all' ? [
'users' ,
'friends' ,
'groups' ,
//'members',
'messages'
] : [ $step ];
foreach ( $steps as $s ) {
// 备份数据
$backupFile = $this -> backupMongoDB ( $output , $s );
try {
switch ( $s ) {
case 'users' :
$this -> migrateUsers ( $output );
break ;
case 'friends' :
$this -> migrateFriends ( $output );
break ;
case 'groups' :
$this -> migrateGroups ( $output );
break ;
case 'members' :
$this -> migrateGroupMembers ( $output );
break ;
case 'messages' :
$this -> migrateMessages ( $output );
break ;
}
} catch ( \Exception $e ) {
// 遇到错误,回滚数据
if ( ! empty ( $backupFile )) {
$this -> restoreMongoDB ( $output , $backupFile );
}
throw $e ;
}
}
$this -> printStats ( $output );
return self :: SUCCESS ;
} catch ( \Exception $e ) {
$this -> log ( $output , " ❌ 错误: " . $e -> getMessage ());
//$this->log($output, $e->getTraceAsString());
return self :: FAILURE ;
}
}
private function migrateUsers ( OutputInterface $output ) : void
{
//之前残留了一部分数据,是单向好友,这里没做删除,所以数据大小和之前的不一样,用户重新删除一次就好了
$this -> log ( $output , " " );
$this -> log ( $output , " ═════════════════ 步骤1: 迁移用户 ═════════════════ " );
$this -> log ( $output , " 清理旧的数据 " );
$this -> cleanExistingData ( $output ,[
'user'
]);
$user_list = ( new \app\model\Openim\User ()) -> setOption ( 'connection' , 'tettt' )
-> whereNotIn ( 'user_id' , $this -> skipUsers )
-> field ( 'user_id,nickname,face_url' )
-> select ();
$user_list = $user_list -> toArray ();
// 1. 创建进度条(内置核心方法)
$progressBar = new ProgressBar ( $output , count ( $user_list ));
// 可选:设置进度条样式(字符、长度等)
$progressBar -> setBarCharacter ( '█' );
$progressBar -> setEmptyBarCharacter ( '░' );
$progressBar -> setProgressCharacter ( '▶' );
$progressBar -> setBarWidth ( 400 );
// 2. 开始显示
$progressBar -> start ();
echo sprintf ( " \r " );
while ( count ( $user_list ) > 0 ){
$step = 100 ;
$user = array_slice ( $user_list , 0 , $step );
$user_list = array_slice ( $user_list , $step );
$this -> sdk -> user -> userRegister ( $user );
$progressBar -> advance ( $step );
}
// 4. 结束进度条
$progressBar -> finish ();
}
private function migrateFriends ( OutputInterface $output ) : void
{
//之前残留了一部分数据,是单向好友,这里没做删除,所以数据大小和之前的不一样,用户重新删除一次就好了
$this -> log ( $output , " " );
$this -> log ( $output , " ═════════════════ 步骤3: 迁移好友 ═════════════════ " );
$this -> log ( $output , " 清理旧的数据 " );
// $this->cleanExistingData($output,[
// 'conversation', 'conversation_version', // 会话相关集合
// 'data_version', // 数据版本集合
// 'friend', 'friend_request', 'friend_version', // 好友关系相关集合
// 'group', 'group_join_version','group_member','group_member_version','group_request', // 群组相关集合
// 'msg','seq','seq_user' // 消息和序列号相关集合
// ]);
$user_list = ( new \app\model\Openim\User ()) -> setOption ( 'connection' , 'tettt' )
-> whereNotNull ( 'user_id' )
-> column ( 'user_id' );
// 1. 创建进度条(内置核心方法)
$progressBar = new ProgressBar ( $output , count ( $user_list ));
// 可选:设置进度条样式(字符、长度等)
$progressBar -> setBarCharacter ( '█' );
$progressBar -> setEmptyBarCharacter ( '░' );
$progressBar -> setProgressCharacter ( '▶' );
$progressBar -> setBarWidth ( 400 );
// 2. 开始显示
$progressBar -> start ();
foreach ( $user_list as $userID ){
$friend_list = ( new \app\model\Openim\Friend ()) -> setOption ( 'connection' , 'tettt' )
-> where ( 'owner_user_id' , $userID )
-> column ( 'friend_user_id' );
if ( count ( $friend_list )){
while ( count ( $friend_list )){
$_friend_list = array_slice ( $friend_list , 0 , 500 );
$friend_list = array_slice ( $friend_list , 500 );
$this -> sdk -> friend -> importFriend ( $userID , $_friend_list );
}
}
$progressBar -> advance ();
}
// 4. 结束进度条
$progressBar -> finish ();
}
private function migrateGroups ( OutputInterface $output ) : void
{
$this -> log ( $output , " " );
$this -> log ( $output , " ═════════════════ 步骤2: 迁移群组 ═════════════════ " );
$this -> log ( $output , " " );
$options = [];
$groups = $this -> queryOldDb ( 'group' , [], $options );
$this -> stats [ 'groups' ][ 'total' ] = count ( $groups );
$this -> log ( $output , " 📊 找到 { $this -> stats [ 'groups' ][ 'total' ] } 个群组 " );
$processed = 0 ;
// 1. 创建进度条(内置核心方法)
$progressBar = new ProgressBar ( $output , count ( $groups ));
// 可选:设置进度条样式(字符、长度等)
$progressBar -> setBarCharacter ( '█' );
$progressBar -> setEmptyBarCharacter ( '░' );
$progressBar -> setProgressCharacter ( '▶' );
$progressBar -> setBarWidth ( 400 );
// 2. 开始显示
$progressBar -> start ();
foreach ( $groups as $group ) {
$processed ++ ;
$groupID = ( string )( $group [ 'group_id' ] ? ? $group [ 'groupID' ] ? ? '' );
if ( empty ( $groupID ) || in_array ( $groupID , $this -> skipGroups )) {
$this -> stats [ 'groups' ][ 'failed' ] ++ ;
continue ;
}
$ownerUserID = ( string )( $group [ 'owner_user_id' ] ? ? $group [ 'ownerUserID' ] ? ? $group [ 'creator_user_id' ] ? ? $group [ 'creatorUserID' ] ? ? '' );
if ( empty ( $ownerUserID )) {
$this -> stats [ 'groups' ][ 'failed' ] ++ ;
continue ;
}
$groupName = ( string )( $group [ 'group_name' ] ? ? $group [ 'groupName' ] ? ? '' );
$faceURL = ( string )( $group [ 'face_url' ] ? ? $group [ 'faceURL' ] ? ? '' );
$introduction = ( string )( $group [ 'introduction' ] ? ? '' );
$notification = ( string )( $group [ 'notification' ] ? ? '' );
$ex = ( string )( $group [ 'ex' ] ? ? '' );
// 群组设置字段
$groupType = ( int )( $group [ 'group_type' ] ? ? $group [ 'groupType' ] ? ? 2 );
$needVerification = ( int )( $group [ 'need_verification' ] ? ? $group [ 'needVerification' ] ? ? 0 );
$lookMemberInfo = ( int )( $group [ 'look_member_info' ] ? ? $group [ 'lookMemberInfo' ] ? ? 0 );
$applyMemberFriend = ( int )( $group [ 'apply_member_friend' ] ? ? $group [ 'applyMemberFriend' ] ? ? 0 );
$progress = sprintf ( " [群组 %d/%d] " , $processed , $this -> stats [ 'groups' ][ 'total' ]);
if ( $processed % 20 == 0 || $processed == 1 ) {
$this -> log ( $output , " { $progress } 处理中... " );
}
$this -> log ( $output , " { $progress } 尝试创建群组: { $groupID } , 群主: { $ownerUserID } " );
// 管理员信息
$adminUserIDs = ( new \app\model\Openim\GroupMember ()) -> setOption ( 'connection' , 'tettt' )
-> where ( 'group_id' , $groupID )
-> where ( 'role_level' , 60 )
-> column ( 'user_id' );
//cp($adminUserIDs );
// 成员信息
$memberUserIDs = ( new \app\model\Openim\GroupMember ()) -> setOption ( 'connection' , 'tettt' )
-> where ( 'group_id' , $groupID )
-> where ( 'role_level' , 20 )
-> column ( 'user_id' );
//cp($memberUserIDs );
$memberUserIDs = array_unique ( $memberUserIDs );
$_memberUserIDs = array_slice ( $memberUserIDs , 0 , 10 );
$memberUserIDs = array_slice ( $memberUserIDs , 10 );
try {
$this -> sdk -> group -> createGroup (
$ownerUserID ,
$_memberUserIDs ,
$adminUserIDs ,
$groupName ,
$groupID ,
$faceURL ,
$introduction ,
$notification ,
$ex ,
$groupType ,
$needVerification ,
$lookMemberInfo ,
$applyMemberFriend
);
while ( count ( $memberUserIDs )){
$_memberUserIDs = array_slice ( $memberUserIDs , 0 , 10 );
$memberUserIDs = array_slice ( $memberUserIDs , 10 );
try {
$this -> sdk -> group -> inviteUserToGroup ( $groupID , $_memberUserIDs );
} catch ( \Exception $e ) {
$this -> log ( $output , " { $progress } ❌ 邀请成员失败: " . $e -> getMessage ());
}
}
$this -> stats [ 'groups' ][ 'success' ] ++ ;
//$this->log($output, "{$progress} ✅ 创建成功");
} catch ( \Exception $e ) {
$this -> stats [ 'groups' ][ 'failed' ] ++ ;
if ( $e -> getCode () == 1202 || strpos ( $e -> getMessage (), 'GroupIDExisted' ) !== false ) {
$this -> log ( $output , " { $progress } ℹ ️ 群组已存在,跳过创建 " );
$this -> stats [ 'groups' ][ 'success' ] ++ ;
continue ;
} else {
$this -> log ( $output , " { $progress } ❌ 创建失败: " . $e -> getMessage ());
}
}
$progressBar -> advance ();
}
// 4. 结束进度条
$progressBar -> finish ();
}
private function migrateGroupMembers ( OutputInterface $output ) : void
{
$this -> log ( $output , " " );
$this -> log ( $output , " ═════════════════ 步骤3: 迁移群成员 ═════════════════ " );
$this -> log ( $output , " " );
$groups = $this -> queryOldDb ( 'group' , [], [ 'projection' => [ 'group_id' => 1 , 'groupID' => 1 ]]);
$groupIDs = [];
foreach ( $groups as $g ) {
$gid = ( string )( $g [ 'group_id' ] ? ? $g [ 'groupID' ] ? ? '' );
if ( ! empty ( $gid ) && ! in_array ( $gid , $this -> skipGroups )) {
$groupIDs [] = $gid ;
}
}
$totalMembers = 0 ;
foreach ( $groupIDs as $groupID ) {
$members = $this -> queryOldDb ( 'group_member' , [ 'group_id' => $groupID ]);
$ownerUserID = null ;
$adminUserIDs = [];
$memberUserIDs = [];
foreach ( $members as $member ) {
$userID = ( string )( $member [ 'user_id' ] ? ? $member [ 'userID' ] ? ? '' );
if ( empty ( $userID )) continue ;
$roleLevel = ( int )( $member [ 'role_level' ] ? ? $member [ 'roleLevel' ] ? ? 0 );
if ( $roleLevel == 100 ) {
$ownerUserID = $userID ;
} elseif ( $roleLevel == 60 ) {
$adminUserIDs [] = $userID ;
} else {
$memberUserIDs [] = $userID ;
}
}
if ( empty ( $memberUserIDs ) && empty ( $adminUserIDs )) {
continue ;
}
$totalMembers += count ( $memberUserIDs );
$this -> stats [ 'members' ][ 'total' ] += count ( $memberUserIDs );
$progress = sprintf ( " [群 %s 成员 %d] " , $groupID , count ( $memberUserIDs ));
$this -> log ( $output , " { $progress } 处理中... " );
// 分批邀请,每批最多50人
$batches = array_chunk ( $memberUserIDs , 50 );
foreach ( $batches as $batch ) {
if ( empty ( $batch )) {
continue ;
}
$attempts = 0 ;
while ( $attempts < $this -> retry ) {
try {
$this -> log ( $output , " { $progress } 邀请成员: " . implode ( ', ' , array_slice ( $batch , 0 , 5 )) . ( count ( $batch ) > 5 ? '...' : '' ));
$result = $this -> sdk -> group -> inviteUserToGroup ( $groupID , $ownerUserID ? ? 'admin' , $batch );
$this -> log ( $output , " { $progress } API返回: " . json_encode ( $result , JSON_UNESCAPED_UNICODE ));
if ( isset ( $result [ 'errCode' ]) && $result [ 'errCode' ] != 0 ) {
// 检查是否是重复键错误
if ( strpos ( $result [ 'errMsg' ] ? ? '' , 'duplicate key' ) !== false || strpos ( $result [ 'errMsg' ] ? ? '' , 'DuplicateKey' ) !== false ) {
$this -> log ( $output , " { $progress } ℹ ️ 部分成员已存在,跳过 " );
$this -> stats [ 'members' ][ 'success' ] += count ( $batch );
} else {
$this -> stats [ 'members' ][ 'failed' ] += count ( $batch );
$this -> log ( $output , " { $progress } ❌ 邀请失败: " . ( $result [ 'errMsg' ] ? ? '未知错误' ));
}
} else {
$this -> stats [ 'members' ][ 'success' ] += count ( $batch );
$this -> log ( $output , " { $progress } ✅ 邀请成功 " );
}
break ;
} catch ( \Exception $e ) {
$attempts ++ ;
// 检查是否是重复键错误
if ( strpos ( $e -> getMessage (), 'duplicate key' ) !== false || strpos ( $e -> getMessage (), 'DuplicateKey' ) !== false ) {
$this -> log ( $output , " { $progress } ℹ ️ 部分成员已存在,跳过 " );
$this -> stats [ 'members' ][ 'success' ] += count ( $batch );
break ;
} elseif ( $attempts >= $this -> retry ) {
$this -> stats [ 'members' ][ 'failed' ] += count ( $batch );
$this -> log ( $output , " { $progress } ❌ 邀请异常: " . $e -> getMessage ());
} else {
$this -> log ( $output , " { $progress } ⚠️ 邀请失败,第 { $attempts } / { $this -> retry } 次重试... " );
usleep ( 100000 );
}
}
}
usleep ( 10000 );
}
}
$this -> log ( $output , " 📊 共处理 { $totalMembers } 个群成员 " );
}
private function migrateMessages ( OutputInterface $output ) : void
{
$this -> log ( $output , " " );
$this -> log ( $output , " ═════════════════ 步骤4: 迁移消息 ═════════════════ " );
$this -> log ( $output , " " );
$pipeline = [
[ '$unwind' => '$msgs' ],
[ '$match' => [ 'msgs.msg' => [ '$ne' => null ]]],
[ '$sort' => [ 'msgs.msg.send_time' => 1 ]],
];
$pipeline [] = [ '$project' => [ 'doc_id' => 1 , 'msg' => '$msgs.msg' ]];
$command = new \MongoDB\Driver\Command ([
'aggregate' => 'msg' ,
'pipeline' => $pipeline ,
'cursor' => new \stdClass
]);
$cursor = $this -> oldManager -> executeCommand ( 'tettt' , $command );
$messages = [];
foreach ( $cursor as $doc ) {
$messages [] = $this -> bsonToArray ( $doc );
}
$this -> stats [ 'messages' ][ 'total' ] = count ( $messages );
$this -> log ( $output , " 📊 找到 { $this -> stats [ 'messages' ][ 'total' ] } 条消息 " );
$processed = 0 ;
foreach ( $messages as $doc ) {
$processed ++ ;
$msg = $doc [ 'msg' ] ? ? [];
if ( empty ( $msg )) {
$this -> stats [ 'messages' ][ 'skipped' ] ++ ;
continue ;
}
$sendID = ( string )( $msg [ 'send_id' ] ? ? $msg [ 'sendID' ] ? ? '' );
$recvID = ( string )( $msg [ 'recv_id' ] ? ? $msg [ 'recvID' ] ? ? '' );
$groupID = ( string )( $msg [ 'group_id' ] ? ? $msg [ 'groupID' ] ? ? '' );
$contentType = ( int )( $msg [ 'content_type' ] ? ? $msg [ 'contentType' ] ? ? 101 );
$sessionType = ( int )( $msg [ 'session_type' ] ? ? $msg [ 'sessionType' ] ? ? 1 );
if ( in_array ( $sendID , $this -> skipUsers )) {
$this -> stats [ 'messages' ][ 'skipped' ] ++ ;
continue ;
}
if ( $sessionType == 3 && in_array ( $groupID , $this -> skipGroups )) {
$this -> stats [ 'messages' ][ 'skipped' ] ++ ;
continue ;
}
// 跳过特殊消息类型(如系统通知等)
if ( in_array ( $contentType , [ 200 , 201 , 202 , 203 , 204 , 205 ])) {
$this -> stats [ 'messages' ][ 'skipped' ] ++ ;
continue ;
}
$progress = sprintf ( " [消息 %d/%d] " , $processed , $this -> stats [ 'messages' ][ 'total' ]);
if ( $processed % 100 == 0 || $processed == 1 ) {
$this -> log ( $output , " { $progress } 处理中... " );
}
try {
$this -> log ( $output , " { $progress } 发送消息: sendID= { $sendID } , recvID= { $recvID } , groupID= { $groupID } , contentType= { $contentType } , sessionType= { $sessionType } " );
$result = $this -> sendMessage ( $msg );
$this -> log ( $output , " { $progress } API返回: " . json_encode ( $result , JSON_UNESCAPED_UNICODE ));
if ( $result [ 'success' ] ? ? false ) {
$this -> stats [ 'messages' ][ 'success' ] ++ ;
if ( $processed % 100 == 0 ) {
$this -> log ( $output , " { $progress } ✅ 发送成功 " );
}
} else {
$this -> stats [ 'messages' ][ 'failed' ] ++ ;
$this -> log ( $output , " { $progress } ❌ 发送失败: " . ( $result [ 'errMsg' ] ? ? '未知错误' ));
// 遇到NotInGroupYetError时跳过,继续迁移其他消息
if ( strpos (( $result [ 'errMsg' ] ? ? '' ), 'NotInGroupYetError' ) === false ) {
// 遇到其他错误时退出
throw new \Exception ( " 消息发送失败: " . ( $result [ 'errMsg' ] ? ? '未知错误' ));
} else {
$this -> log ( $output , " { $progress } ℹ ️ 跳过NotInGroupYetError错误,继续迁移 " );
}
}
} catch ( \Exception $e ) {
$this -> stats [ 'messages' ][ 'failed' ] ++ ;
$this -> log ( $output , " { $progress } ❌ 发送异常: " . $e -> getMessage ());
// 遇到NotInGroupYetError异常时跳过,继续迁移其他消息
if ( strpos ( $e -> getMessage (), 'NotInGroupYetError' ) === false ) {
// 遇到其他异常时退出
throw $e ;
} else {
$this -> log ( $output , " { $progress } ℹ ️ 跳过NotInGroupYetError异常,继续迁移 " );
}
}
if ( $this -> delay > 0 ) {
usleep ( $this -> delay * 1000 );
}
}
}
private function sendMessage ( array $msg ) : array
{
$sendID = ( string )( $msg [ 'send_id' ] ? ? $msg [ 'sendID' ] ? ? '' );
$recvID = ( string )( $msg [ 'recv_id' ] ? ? $msg [ 'recvID' ] ? ? '' );
$groupID = ( string )( $msg [ 'group_id' ] ? ? $msg [ 'groupID' ] ? ? '' );
$contentType = ( int )( $msg [ 'content_type' ] ? ? $msg [ 'contentType' ] ? ? 101 );
$sessionType = ( int )( $msg [ 'session_type' ] ? ? $msg [ 'sessionType' ] ? ? 1 );
$sendTime = ( int )( $msg [ 'send_time' ] ? ? $msg [ 'sendTime' ] ? ? 0 );
$content = $msg [ 'content' ] ? ? '' ;
$ex = ( string )( $msg [ 'ex' ] ? ? '' );
if ( empty ( $sendID )) {
return [ 'success' => false , 'errMsg' => 'sendID为空' ];
}
$contentData = $this -> parseContent ( $content , $contentType );
// 构建消息数据
$messageData = [
'content' => $contentData ,
'contentType' => $contentType ,
'sendTime' => $sendTime ,
'ex' => $ex ,
'isOnlineOnly' => false ,
'notOfflinePush' => true
];
// 根据会话类型调用不同的发送方法
if ( $sessionType == 1 && ! empty ( $recvID )) {
// 单聊
$result = $this -> sdk -> message -> sendSingleMessage ( $sendID , $recvID , $messageData );
} elseif ( ! empty ( $groupID )) {
// 群聊
$result = $this -> sdk -> message -> sendGroupMessage ( $sendID , $groupID , $messageData );
} else {
return [ 'success' => false , 'errMsg' => '缺少必要的参数' ];
}
return [
'success' => ! ( $result [ 'errCode' ] ? ? 0 ),
'errMsg' => $result [ 'errMsg' ] ? ? ''
];
}
private function parseContent ( $content , int $contentType ) : array
{
if ( is_string ( $content )) {
$decoded = json_decode ( $content , true );
if ( json_last_error () === JSON_ERROR_NONE && is_array ( $decoded )) {
return $decoded ;
}
return [ 'content' => $content , 'text' => $content ];
}
if ( is_array ( $content )) {
return $content ;
}
return [ 'content' => '' , 'text' => '' ];
}
private function bsonToArray ( $data ) : array
{
if ( $data instanceof \MongoDB\Model\BSONArray ) {
return $data -> getArrayCopy ();
}
if ( $data instanceof \MongoDB\Model\BSONDocument ) {
return $data -> getArrayCopy ();
}
if ( is_object ( $data )) {
return json_decode ( json_encode ( $data ), true );
}
return is_array ( $data ) ? $data : [];
}
private function printStats ( OutputInterface $output ) : void
{
$this -> log ( $output , " " );
$this -> log ( $output , " ╔════════════════════════════════════════════════════════════╗ " );
$this -> log ( $output , " ║ 迁移统计报告 ║ " );
$this -> log ( $output , " ╠════════════════════════════════════════════════════════════╣ " );
$this -> log ( $output , " ║ 用户: 总数 { $this -> stats [ 'users' ][ 'total' ] } , 成功 { $this -> stats [ 'users' ][ 'success' ] } , 失败 { $this -> stats [ 'users' ][ 'failed' ] } " );
$this -> log ( $output , " ║ 群组: 总数 { $this -> stats [ 'groups' ][ 'total' ] } , 成功 { $this -> stats [ 'groups' ][ 'success' ] } , 失败 { $this -> stats [ 'groups' ][ 'failed' ] } " );
$this -> log ( $output , " ║ 成员: 总数 { $this -> stats [ 'members' ][ 'total' ] } , 成功 { $this -> stats [ 'members' ][ 'success' ] } , 失败 { $this -> stats [ 'members' ][ 'failed' ] } " );
$this -> log ( $output , " ║ 消息: 总数 { $this -> stats [ 'messages' ][ 'total' ] } , 成功 { $this -> stats [ 'messages' ][ 'success' ] } , 失败 { $this -> stats [ 'messages' ][ 'failed' ] } , 跳过 { $this -> stats [ 'messages' ][ 'skipped' ] } " );
$this -> log ( $output , " ╚════════════════════════════════════════════════════════════╝ " );
}
private function log ( OutputInterface $output , string $message ) : void
{
$output -> writeln ( $message );
}
/**
* 获取OpenIM SDK实例
*
* @return object
*/
function getSdk ()
{
if ( $this -> sdk ) {
return $this -> sdk ;
}
$this -> sdk = new \support\OpenImSdk\Client ([
'host' => config ( 'openim.server' ),
'secret' => config ( 'openim.secret' ),
]);
return $this -> sdk ;
}
private function initConnections ( OutputInterface $output ) : void
{
$this -> log ( $output , " 正在初始化连接... " );
$this -> getSdk ();
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/tettt?authSource=admin' ;
$this -> oldManager = new \MongoDB\Driver\Manager ( $uri );
$this -> log ( $output , " ✅ 连接成功 " );
}
private function cleanExistingData ( OutputInterface $output , $collections = []) : void
{
// 记录开始清理数据的日志信息
$this -> log ( $output , " \n ═════════════════ 清理现有数据 ═════════════════ " );
$this -> log ( $output , " " );
// 构建新数据库(OpenIM v3)的MongoDB连接URI
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/openim_v3?authSource=admin' ;
// 创建MongoDB驱动管理器实例,用于操作新数据库
$this -> newManager = new \MongoDB\Driver\Manager ( $uri );
try {
// 记录开始清理数据的状态
$this -> log ( $output , " 正在清理mongodb数据... " );
// 定义需要清空的数据集合列表(保留user集合,清空其他所有业务数据)
// 遍历所有需要清理的集合,逐个执行清空操作
foreach ( $collections as $collection ) {
try {
// 创建批量写入操作对象
$bulk = new \MongoDB\Driver\BulkWrite ;
// 添加删除所有文档的操作(空条件表示删除全部)
$bulk -> delete ([]);
// 执行批量删除操作,指定数据库和集合名称
$this -> newManager -> executeBulkWrite ( 'openim_v3.' . $collection , $bulk );
// 记录该集合清空成功的日志
$this -> log ( $output , " 已清空集合: { $collection } " );
} catch ( \Exception $e ) {
// 单个集合清空失败时记录警告信息,不影响其他集合的清理
$this -> log ( $output , " ⚠️ 清空集合 { $collection } 失败: " . $e -> getMessage ());
}
}
$this -> log ( $output , " 正在清理redis数据... " );
$redis = new \Redis ();
$host = '127.0.0.1' ;
$port = 16379 ;
$password = 'n1e5a6s6m7' ;
$output -> writeln ( " 连接 Redis: { $host } : { $port } " );
if ( $redis -> connect ( $host , $port )) {
if ( ! empty ( $password )) {
$redis -> auth ( $password );
}
$result = $redis -> flushAll ();
if ( $result ) {
$output -> writeln ( " ✅ Redis 清空成功 " );
} else {
$output -> writeln ( " ❌ Redis 清空失败 " );
}
} else {
$output -> writeln ( " ❌ 无法连接到 Redis " );
}
// 记录所有数据清理完成的日志
$this -> log ( $output , " ✅ 数据清理完成 " );
} catch ( \Exception $e ) {
// 捕获整体清理过程中的异常,记录错误但不抛出,确保程序继续执行
$this -> log ( $output , " ❌ 清理数据失败: " . $e -> getMessage ());
// 不抛出异常,继续执行
}
}
private function queryOldDb ( string $collection , array $filter = [], array $options = []) : array
{
$query = new \MongoDB\Driver\Query ( $filter , $options );
$cursor = $this -> oldManager -> executeQuery ( 'tettt.' . $collection , $query );
$result = [];
foreach ( $cursor as $doc ) {
$result [] = $this -> bsonToArray ( $doc );
}
return $result ;
}
/**
* 备份MongoDB数据
* @param OutputInterface $output
* @param string $step
* @return string
*/
private function backupMongoDB ( OutputInterface $output , string $step ) : string
{
$this -> log ( $output , " ═════════════════ 备份MongoDB数据 ═════════════════ " );
// 确保备份目录存在
if ( ! is_dir ( $this -> backupDir )) {
mkdir ( $this -> backupDir , 0755 , true );
}
// 生成备份文件名
$timestamp = date ( 'YmdHis' );
$backupFile = " { $this -> backupDir } /openim_v3_ { $step } _ { $timestamp } .json " ;
try {
// 使用现有的新数据库连接
if ( ! $this -> newManager ) {
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/openim_v3?authSource=admin' ;
$this -> newManager = new \MongoDB\Driver\Manager ( $uri );
}
// 获取所有集合
$command = new \MongoDB\Driver\Command ([ 'listCollections' => 1 ]);
$cursor = $this -> newManager -> executeCommand ( 'openim_v3' , $command );
$collections = $cursor -> toArray ();
//$this->log($output, "找到 " . count($collections) . " 个集合");
$backupData = [];
// 备份每个集合
foreach ( $collections as $collection ) {
$collectionName = $collection -> name ;
if ( in_array ( $collectionName , [ 'system.indexes' , 'system.profile' ])) {
continue ;
}
//$this->log($output, "备份集合: {$collectionName}");
$query = new \MongoDB\Driver\Query ([]);
$cursor = $this -> newManager -> executeQuery ( 'openim_v3.' . $collectionName , $query );
$documents = [];
foreach ( $cursor as $doc ) {
$document = $this -> bsonToArray ( $doc );
// 处理ObjectId
if ( isset ( $document [ '_id' ]) && is_array ( $document [ '_id' ]) && isset ( $document [ '_id' ][ '$oid' ])) {
$document [ '_id' ] = $document [ '_id' ][ '$oid' ];
}
$documents [] = $document ;
}
if ( ! empty ( $documents )) {
$backupData [ $collectionName ] = $documents ;
//$this->log($output, " - 备份了 " . count($documents) . " 条记录");
}
}
// 保存备份文件
file_put_contents ( $backupFile , json_encode ( $backupData , JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT ));
$this -> log ( $output , " ✅ 备份成功: { $backupFile } ( " . filesize ( $backupFile ) . " 字节) " );
$this -> currentBackup = $backupFile ;
return $backupFile ;
} catch ( \Exception $e ) {
$this -> log ( $output , " ❌ 备份失败: " . $e -> getMessage ());
return '' ;
}
}
/**
* 回滚MongoDB数据
* @param OutputInterface $output
* @param string $backupFile
* @return bool
*/
private function restoreMongoDB ( OutputInterface $output , string $backupFile ) : bool
{
$this -> log ( $output , " ═════════════════ 回滚MongoDB数据 ═════════════════ " );
if ( ! file_exists ( $backupFile )) {
$this -> log ( $output , " ❌ 备份文件不存在: { $backupFile } " );
return false ;
}
try {
// 读取备份文件
$backupData = json_decode ( file_get_contents ( $backupFile ), true );
if ( empty ( $backupData )) {
$this -> log ( $output , " ❌ 备份文件为空 " );
return false ;
}
// 连接到新数据库
$uri = 'mongodb://commie:n1e5a6s6m7@127.0.0.1:37017/openim_v3?authSource=admin' ;
$manager = new \MongoDB\Driver\Manager ( $uri );
// 清空所有集合
$this -> log ( $output , " 清空现有数据... " );
foreach ( array_keys ( $backupData ) as $collectionName ) {
$bulk = new \MongoDB\Driver\BulkWrite ;
$bulk -> delete ([]);
$manager -> executeBulkWrite ( 'openim_v3.' . $collectionName , $bulk );
}
// 恢复数据
foreach ( $backupData as $collectionName => $documents ) {
$documentCount = count ( $documents );
$this -> log ( $output , " 恢复集合: { $collectionName } ( { $documentCount } 条记录) " );
$bulk = new \MongoDB\Driver\BulkWrite ;
foreach ( $documents as $document ) {
// 处理_id字段
if ( isset ( $document [ '_id' ]) && is_string ( $document [ '_id' ])) {
// 尝试创建ObjectId
try {
$document [ '_id' ] = new \MongoDB\BSON\ObjectId ( $document [ '_id' ]);
} catch ( \Exception $e ) {
// 如果不是有效的ObjectId格式,保持原样
}
}
$bulk -> insert ( $document );
}
$result = $manager -> executeBulkWrite ( 'openim_v3.' . $collectionName , $bulk );
$this -> log ( $output , " 恢复成功: { $result -> getInsertedCount () } 条记录 " );
}
$this -> log ( $output , " ✅ 回滚成功 " );
return true ;
} catch ( \Exception $e ) {
$this -> log ( $output , " ❌ 回滚失败: " . $e -> getMessage ());
return false ;
}
}
}