addOption('execute', 'e', InputOption::VALUE_NONE, '实际执行修复(默认只预览)'); $this->addOption('backup-dir', 'b', InputOption::VALUE_OPTIONAL, '备份目录', '/tmp/openim_seq_backup'); $this->addOption('step', 's', InputOption::VALUE_OPTIONAL, '执行步骤: analyze, backup, fix, all', 'all'); } protected function execute(InputInterface $input, OutputInterface $output): int { $this->dryRun = !$input->getOption('execute'); $this->backupDir = $input->getOption('backup-dir'); $step = $input->getOption('step'); if ($this->dryRun) { $this->log("╔════════════════════════════════════════════════════════════╗"); $this->log("║ 预览模式(不会修改数据) ║"); $this->log("║ 使用 --execute 参数实际执行修复 ║"); $this->log("╚════════════════════════════════════════════════════════════╝"); } else { $this->log("╔════════════════════════════════════════════════════════════╗"); $this->log("║ 执行模式 - 开始修复 ║"); $this->log("╚════════════════════════════════════════════════════════════╝"); } $this->log(""); $this->log("【OpenIM seq 设计原理】"); $this->log(" - seq 表: 存储每个会话的全局 max_seq/min_seq"); $this->log(" - seq_user 表: 存储每个用户在每个会话中的 max_seq/min_seq/read_seq"); $this->log(" - conversation 表: 会话信息,冗余存储 max_seq/min_seq"); $this->log(" - read_seq 不能大于 max_seq(否则未读数变负)"); $this->log(""); switch ($step) { case 'analyze': $this->analyzeAll(); break; case 'backup': $this->backupAll(); break; case 'fix': if (empty($this->conversationSeqMap)) { $this->log("需要先分析消息数据..."); $this->analyzeMsgSeq(); } $this->fixAll(); break; default: $this->analyzeAll(); if (!$this->dryRun) { $this->backupAll(); $this->fixAll(); } } return 0; } private function getConversationId(array $msg): ?string { $sessionType = $msg['session_type'] ?? null; $sendId = $msg['send_id'] ?? ''; $recvId = $msg['recv_id'] ?? ''; $groupId = $msg['group_id'] ?? ''; if ($sessionType === 3 || !empty($groupId)) { return 'sg_' . $groupId; } if ($sessionType === 1 || ($sendId && $recvId)) { $ids = [(int)$sendId, (int)$recvId]; sort($ids); return 'si_' . $ids[0] . '_' . $ids[1]; } return null; } private function analyzeAll(): void { $this->log("═══════════════════════════════════════════════════════════"); $this->log(" 第一步:分析数据 "); $this->log("═══════════════════════════════════════════════════════════"); $this->log(""); $this->log("【1. seq 表分析】"); $seqCount = \app\model\Openim\Seq::count(); $this->log(" 总记录数: {$seqCount}"); $seqSample = \app\model\Openim\Seq::limit(5)->select()->toArray(); foreach ($seqSample as $i => $row) { $this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, max_seq={$row['max_seq']}, min_seq={$row['min_seq']}"); } $this->log(""); $this->log("【2. seq_user 表分析】"); $seqUserCount = \app\model\Openim\SeqUser::count(); $this->log(" 总记录数: {$seqUserCount}"); $seqUserSample = \app\model\Openim\SeqUser::limit(5)->select()->toArray(); foreach ($seqUserSample as $i => $row) { $this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, user_id={$row['user_id']}, max_seq={$row['max_seq']}, min_seq={$row['min_seq']}, read_seq={$row['read_seq']}"); } $this->log(""); $this->log("【3. conversation 表分析】"); $conversationCount = \app\model\Openim\Conversation::count(); $this->log(" 总记录数: {$conversationCount}"); $conversationSample = \app\model\Openim\Conversation::limit(5)->select()->toArray(); foreach ($conversationSample as $i => $row) { $this->log(" 样本[{$i}]: conversation_id={$row['conversation_id']}, owner_user_id={$row['owner_user_id']}, max_seq=" . ($row['max_seq'] ?? 'null') . ", min_seq=" . ($row['min_seq'] ?? 'null')); } $this->log(""); $this->log("【4. msg 表分析 - 统计每个会话的 seq 范围】"); $this->analyzeMsgSeq(); $this->log(""); } private function analyzeMsgSeq(): void { $msgCount = \app\model\Openim\Msg::count(); $this->log(" 消息文档数: {$msgCount}"); $this->log(" 正在分析消息中的 seq 范围..."); $sampleDoc = \app\model\Openim\Msg::limit(1)->select()->first(); if ($sampleDoc) { $this->log(" 样本文档 doc_id: " . ($sampleDoc['doc_id'] ?? 'null')); $msgsArray = $sampleDoc['msgs']; if ($msgsArray instanceof \think\model\Collection) { $msgsArray = $msgsArray->toArray(); } $this->log(" 样本文档 msgs 数量: " . count($msgsArray)); if (count($msgsArray) > 0) { $firstMsg = $msgsArray[0]; if (isset($firstMsg['msg'])) { $msg = $firstMsg['msg']; $this->log(" 第一条消息 session_type: " . ($msg['session_type'] ?? 'null')); $this->log(" 第一条消息 send_id: " . ($msg['send_id'] ?? 'null')); $this->log(" 第一条消息 recv_id: " . ($msg['recv_id'] ?? 'null')); $this->log(" 第一条消息 group_id: " . ($msg['group_id'] ?? 'null')); $this->log(" 第一条消息 seq: " . ($msg['seq'] ?? 'null')); $conversationId = $this->getConversationId($msg); $this->log(" 计算得到的 conversationID: {$conversationId}"); } } } $this->log(""); $processedDocs = 0; $totalMsgs = 0; $msgs = \app\model\Openim\Msg::select(); foreach ($msgs as $doc) { $processedDocs++; $msgsArray = $doc['msgs']; if ($msgsArray instanceof \think\model\Collection) { $msgsArray = $msgsArray->toArray(); } if (!empty($msgsArray) && is_array($msgsArray)) { foreach ($msgsArray as $msgItem) { if (isset($msgItem['msg'])) { $msg = $msgItem['msg']; $conversationId = $this->getConversationId($msg); $seq = $msg['seq'] ?? null; if ($conversationId && $seq !== null) { $totalMsgs++; if (!isset($this->conversationSeqMap[$conversationId])) { $this->conversationSeqMap[$conversationId] = [ 'min_seq' => $seq, 'max_seq' => $seq, 'count' => 0 ]; } $this->conversationSeqMap[$conversationId]['min_seq'] = min($this->conversationSeqMap[$conversationId]['min_seq'], $seq); $this->conversationSeqMap[$conversationId]['max_seq'] = max($this->conversationSeqMap[$conversationId]['max_seq'], $seq); $this->conversationSeqMap[$conversationId]['count']++; } } } } if ($processedDocs % 100 == 0) { $this->log(" 已处理 {$processedDocs}/{$msgCount} 个文档..."); } } $this->log(" 处理完成!共处理 {$processedDocs} 个文档,{$totalMsgs} 条消息"); $this->log(" 发现 " . count($this->conversationSeqMap) . " 个会话"); $this->log(""); $this->log("【5. 会话 seq 范围统计(前10个)】"); $i = 0; foreach ($this->conversationSeqMap as $conversationId => $seqInfo) { if ($i++ >= 10) break; $this->log(" {$conversationId}: min_seq={$seqInfo['min_seq']}, max_seq={$seqInfo['max_seq']}, msg_count={$seqInfo['count']}"); } $this->log(""); $this->log("【6. 检测问题数据】"); $this->detectProblematicData(); } private function detectProblematicData(): void { $seqProblems = 0; $seqUserProblems = 0; $conversationProblems = 0; $this->log(" 正在检测 seq 表..."); $total = \app\model\Openim\Seq::count(); $batchSize = 1000; for ($offset = 0; $offset < $total; $offset += $batchSize) { $seqRecords = \app\model\Openim\Seq::field('conversation_id,max_seq,min_seq')->limit($offset, $batchSize)->select()->toArray(); foreach ($seqRecords as $seq) { $conversationId = $seq['conversation_id']; if (isset($this->conversationSeqMap[$conversationId])) { $expected = $this->conversationSeqMap[$conversationId]; if ($seq['max_seq'] != $expected['max_seq'] || $seq['min_seq'] != $expected['min_seq']) { $seqProblems++; } } } $seqRecords = null; gc_collect_cycles(); } $this->log(" seq 表问题记录: {$seqProblems} 条(max_seq/min_seq 与实际消息不符)"); $this->log(" 正在检测 seq_user 表..."); $total = \app\model\Openim\SeqUser::count(); for ($offset = 0; $offset < $total; $offset += $batchSize) { $seqUserRecords = \app\model\Openim\SeqUser::field('conversation_id,user_id,max_seq,min_seq,read_seq')->limit($offset, $batchSize)->select()->toArray(); foreach ($seqUserRecords as $seqUser) { $conversationId = $seqUser['conversation_id']; if (isset($this->conversationSeqMap[$conversationId])) { $expected = $this->conversationSeqMap[$conversationId]; if ($seqUser['max_seq'] != $expected['max_seq'] || $seqUser['min_seq'] != $expected['min_seq'] || $seqUser['read_seq'] > $expected['max_seq']) { $seqUserProblems++; } } } $seqUserRecords = null; gc_collect_cycles(); } $this->log(" seq_user 表问题记录: {$seqUserProblems} 条(seq 不符或 read_seq > max_seq)"); $this->log(" 正在检测 conversation 表..."); $total = \app\model\Openim\Conversation::count(); for ($offset = 0; $offset < $total; $offset += $batchSize) { $conversationRecords = \app\model\Openim\Conversation::field('conversation_id,max_seq,min_seq')->limit($offset, $batchSize)->select()->toArray(); foreach ($conversationRecords as $conversation) { $conversationId = $conversation['conversation_id']; if (isset($this->conversationSeqMap[$conversationId])) { $expected = $this->conversationSeqMap[$conversationId]; $currentMax = $conversation['max_seq'] ?? 0; $currentMin = $conversation['min_seq'] ?? 0; if ($currentMax != $expected['max_seq'] || $currentMin != $expected['min_seq']) { $conversationProblems++; } } } $conversationRecords = null; gc_collect_cycles(); } $this->log(" conversation 表问题记录: {$conversationProblems} 条(max_seq/min_seq 与实际消息不符)"); $this->log(""); } private function backupAll(): void { $this->log("═══════════════════════════════════════════════════════════"); $this->log(" 第二步:备份数据 "); $this->log("═══════════════════════════════════════════════════════════"); $this->log(""); if (!is_dir($this->backupDir)) { mkdir($this->backupDir, 0755, true); } $timestamp = date('Ymd_His'); $backupPath = $this->backupDir . '/backup_' . $timestamp; mkdir($backupPath, 0755, true); $this->log("备份目录: {$backupPath}"); $this->log(""); $this->log("【1. 备份 seq 表】"); $total = \app\model\Openim\Seq::count(); $batchSize = 1000; $seqData = []; for ($offset = 0; $offset < $total; $offset += $batchSize) { $batch = \app\model\Openim\Seq::limit($offset, $batchSize)->select()->toArray(); $seqData = array_merge($seqData, $batch); $batch = null; gc_collect_cycles(); } file_put_contents($backupPath . '/seq.json', json_encode($seqData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); $this->log(" 已备份 " . count($seqData) . " 条记录 -> seq.json"); $seqData = null; $this->log("【2. 备份 seq_user 表】"); $total = \app\model\Openim\SeqUser::count(); $seqUserData = []; for ($offset = 0; $offset < $total; $offset += $batchSize) { $batch = \app\model\Openim\SeqUser::limit($offset, $batchSize)->select()->toArray(); $seqUserData = array_merge($seqUserData, $batch); $batch = null; gc_collect_cycles(); } file_put_contents($backupPath . '/seq_user.json', json_encode($seqUserData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); $this->log(" 已备份 " . count($seqUserData) . " 条记录 -> seq_user.json"); $seqUserData = null; $this->log("【3. 备份 conversation 表】"); $total = \app\model\Openim\Conversation::count(); $conversationData = []; for ($offset = 0; $offset < $total; $offset += $batchSize) { $batch = \app\model\Openim\Conversation::limit($offset, $batchSize)->select()->toArray(); $conversationData = array_merge($conversationData, $batch); $batch = null; gc_collect_cycles(); } file_put_contents($backupPath . '/conversation.json', json_encode($conversationData, JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT)); $this->log(" 已备份 " . count($conversationData) . " 条记录 -> conversation.json"); $conversationData = null; $this->log(""); $this->log("✓ 备份完成!备份文件保存在: {$backupPath}"); $this->log(""); } private function fixAll(): void { $this->log("═══════════════════════════════════════════════════════════"); $this->log(" 第三步:修复数据 "); $this->log("═══════════════════════════════════════════════════════════"); $this->log(""); if (empty($this->conversationSeqMap)) { $this->log("错误:缺少会话 seq 数据,请先运行分析步骤"); return; } $this->log("【1. 修复 seq 表】"); $this->log(" 原理:根据消息表中的实际 seq 更新 max_seq 和 min_seq"); $seqFixed = 0; $seqCreated = 0; foreach ($this->conversationSeqMap as $conversationId => $seqInfo) { $existing = \app\model\Openim\Seq::where('conversation_id', $conversationId)->find(); if ($existing) { $oldMax = $existing['max_seq']; $oldMin = $existing['min_seq']; if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) { $existing->max_seq = $seqInfo['max_seq']; $existing->min_seq = $seqInfo['min_seq']; $existing->save(); $this->log(" 更新 {$conversationId}: max_seq {$oldMax} -> {$seqInfo['max_seq']}, min_seq {$oldMin} -> {$seqInfo['min_seq']}"); $seqFixed++; } } else { \app\model\Openim\Seq::create([ 'conversation_id' => $conversationId, 'max_seq' => $seqInfo['max_seq'], 'min_seq' => $seqInfo['min_seq'], ]); $this->log(" 创建 {$conversationId}: max_seq={$seqInfo['max_seq']}, min_seq={$seqInfo['min_seq']}"); $seqCreated++; } } $this->log(" 完成:更新 {$seqFixed} 条,新建 {$seqCreated} 条"); $this->log(""); $this->log("【2. 修复 seq_user 表】"); $this->log(" 原理:更新 max_seq/min_seq 为会话值,确保 read_seq <= max_seq"); $seqUserFixed = 0; $seqUserReadFixed = 0; $total = \app\model\Openim\SeqUser::count(); $batchSize = 500; for ($offset = 0; $offset < $total; $offset += $batchSize) { $seqUsers = \app\model\Openim\SeqUser::limit($offset, $batchSize)->select(); foreach ($seqUsers as $seqUser) { $conversationId = $seqUser['conversation_id']; if (isset($this->conversationSeqMap[$conversationId])) { $seqInfo = $this->conversationSeqMap[$conversationId]; $oldMax = $seqUser['max_seq']; $oldMin = $seqUser['min_seq']; $oldRead = $seqUser['read_seq']; $needSave = false; if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) { $seqUser->max_seq = $seqInfo['max_seq']; $seqUser->min_seq = $seqInfo['min_seq']; $needSave = true; } if ($oldRead > $seqInfo['max_seq']) { $seqUser->read_seq = $seqInfo['max_seq']; $this->log(" 修正 read_seq {$conversationId}/{$seqUser['user_id']}: {$oldRead} -> {$seqInfo['max_seq']}"); $seqUserReadFixed++; $needSave = true; } if ($needSave) { $seqUser->save(); $seqUserFixed++; } } } $seqUsers = null; gc_collect_cycles(); } $this->log(" 完成:更新 {$seqUserFixed} 条,其中修正 read_seq {$seqUserReadFixed} 条"); $this->log(""); $this->log("【3. 修复 conversation 表】"); $this->log(" 原理:根据消息表中的实际 seq 更新 max_seq 和 min_seq"); $conversationFixed = 0; $conversationNotInMap = 0; $total = \app\model\Openim\Conversation::count(); $debugLogged = false; for ($offset = 0; $offset < $total; $offset += $batchSize) { $conversations = \app\model\Openim\Conversation::limit($offset, $batchSize)->select(); foreach ($conversations as $conversation) { $conversationId = $conversation['conversation_id']; if (isset($this->conversationSeqMap[$conversationId])) { $seqInfo = $this->conversationSeqMap[$conversationId]; $oldMax = $conversation['max_seq'] ?? 0; $oldMin = $conversation['min_seq'] ?? 0; if ($oldMax != $seqInfo['max_seq'] || $oldMin != $seqInfo['min_seq']) { $data = [ 'max_seq' => (int)$seqInfo['max_seq'], 'min_seq' => (int)$seqInfo['min_seq'], ]; $result = \app\model\Openim\Conversation::where('conversation_id', $conversationId) ->where('owner_user_id', $conversation['owner_user_id']) ->update($data); if ($result) { $conversationFixed++; } else { $this->log(" 警告: 更新失败 {$conversationId} / {$conversation['owner_user_id']}"); } } } else { $conversationNotInMap++; } } $conversations = null; gc_collect_cycles(); } $this->log(" 完成:更新 {$conversationFixed} 条,不在消息映射中 {$conversationNotInMap} 条"); $this->log(""); $this->log("═══════════════════════════════════════════════════════════"); $this->log(" 修复完成! "); $this->log("═══════════════════════════════════════════════════════════"); $this->log(""); $this->log("修复统计:"); $this->log(" - seq 表: 更新 {$seqFixed} 条,新建 {$seqCreated} 条"); $this->log(" - seq_user 表: 更新 {$seqUserFixed} 条,修正 read_seq {$seqUserReadFixed} 条"); $this->log(" - conversation 表: 更新 {$conversationFixed} 条"); } private function log(string $message): void { echo $message . "\n"; } }