PDO::ERRMODE_EXCEPTION, PDO::ATTR_TIMEOUT => 5 ]);

    // Create the task table and its indexes if they are missing (idempotent DDL).
    $db->exec(
        "CREATE TABLE IF NOT EXISTS admin.wevia_tasks ("
        . " task_id VARCHAR(64) PRIMARY KEY,"
        . " type VARCHAR(32),"
        . " status VARCHAR(16) DEFAULT 'pending',"
        . " input TEXT,"
        . " output TEXT DEFAULT '',"
        . " progress INT DEFAULT 0,"
        . " created_at TIMESTAMP DEFAULT NOW(),"
        . " started_at TIMESTAMP,"
        . " finished_at TIMESTAMP,"
        . " session VARCHAR(64)"
        . " )"
    );
    $db->exec("CREATE INDEX IF NOT EXISTS idx_wevia_tasks_status ON admin.wevia_tasks(status)");
    $db->exec("CREATE INDEX IF NOT EXISTS idx_wevia_tasks_created ON admin.wevia_tasks(created_at)");
} catch (Throwable $e) {
    // Any failure while connecting / initialising the schema aborts the request.
    http_response_code(500);
    echo json_encode(['err'=>'pg_init_failed', 'msg'=>$e->getMessage()]);
    exit;
}

// ---------------------------------------------------------------------------
// Routing: ?path=<action>[/<task_id>]   (default action: list)
// ---------------------------------------------------------------------------
$path = $_GET['path'] ?? '';
if (!is_string($path)) {
    // ?path[]=... would make explode() throw a TypeError on PHP 8.
    $path = '';
}
$parts = array_values(array_filter(explode('/', $path)));
$action = $parts[0] ?? 'list';
$task_id = $parts[1] ?? null;

// ---------------------------------------------------------------------------
// CREATE task  (POST, JSON body: type, input, cmd, session)
// ---------------------------------------------------------------------------
if ($action === 'create' && $_SERVER['REQUEST_METHOD'] === 'POST') {
    $raw = file_get_contents('php://input');
    $d = json_decode((string)$raw, true);
    if (!is_array($d)) {
        // Invalid JSON or a bare scalar: treat as an empty request body.
        $d = [];
    }

    // Read a scalar field from the JSON body; non-scalars (arrays/objects) fall back to the default.
    $field = static function (string $key, string $default) use ($d): string {
        return (isset($d[$key]) && is_scalar($d[$key])) ? (string)$d[$key] : $default;
    };

    $type = substr($field('type', 'generic'), 0, 32);
    $input = $field('input', '');
    $cmd = $field('cmd', '');
    $session = substr($field('session', 'anon'), 0, 64);
    $tid = 'task_' . date('YmdHis') . '_' . bin2hex(random_bytes(4));

    $stmt = $db->prepare("INSERT INTO admin.wevia_tasks (task_id, type, status, input, session, started_at) VALUES (?, ?, 'running', ?, ?, NOW())");
    $stmt->execute([$tid, $type, $input, $session]);

    $logf = "$LOG_DIR/$tid.log";

    if ($cmd !== '') {
        $allowed_binaries = ['ls', 'find', 'grep', 'cat', 'head', 'tail', 'wc', 'curl', 'php8.4', 'psql', 'redis-cli', 'sqlite3'];
        $allowed_prefix = '/opt/weval-ops/top-ia/';

        /*
         * Split a shell command line into the simple commands separated by
         * ; | & && ||, honouring quotes and backslash escapes.
         * Returns false when the line uses constructs that could run a hidden
         * command: command substitution (`...`, $(...)), process substitution
         * (<(...), >(...)), NUL / newline characters, or unbalanced quotes.
         */
        $split_segments = static function (string $line) {
            if (preg_match('/[\x00\r\n]/', $line)) {
                return false;
            }
            $segments = [];
            $current = '';
            $quote = '';
            $len = strlen($line);
            for ($i = 0; $i < $len; $i++) {
                $ch = $line[$i];
                $next = ($i + 1 < $len) ? $line[$i + 1] : '';

                if ($quote === "'") {
                    // Everything is literal inside single quotes.
                    if ($ch === "'") {
                        $quote = '';
                    }
                    $current .= $ch;
                    continue;
                }
                // Substitutions are active both unquoted and inside double quotes.
                if ($ch === '`' || ($ch === '$' && $next === '(')) {
                    return false;
                }
                if ($ch === '\\') {
                    // Escaped character: keep the pair verbatim, never treat it as syntax.
                    $current .= $ch . $next;
                    $i++;
                    continue;
                }
                if ($quote === '"') {
                    if ($ch === '"') {
                        $quote = '';
                    }
                    $current .= $ch;
                    continue;
                }
                if ($ch === "'" || $ch === '"') {
                    $quote = $ch;
                    $current .= $ch;
                    continue;
                }
                if (($ch === '<' || $ch === '>') && $next === '(') {
                    return false;
                }
                if ($ch === ';' || $ch === '|' || $ch === '&') {
                    $prev = ($i > 0) ? $line[$i - 1] : '';
                    if ($ch === '&' && ($prev === '>' || $prev === '<' || $next === '>')) {
                        // ">&", "<&" and "&>" are redirections, not separators.
                        $current .= $ch;
                        continue;
                    }
                    $segments[] = $current;
                    $current = '';
                    if ($ch !== ';' && $next === $ch) {
                        $i++; // consume the second char of "&&" / "||"
                    }
                    continue;
                }
                $current .= $ch;
            }
            if ($quote !== '') {
                return false;
            }
            $segments[] = $current;
            return $segments;
        };

        /*
         * A command is accepted only when EVERY simple command in it starts
         * with a whitelisted binary name or a script under $allowed_prefix.
         * (The previous substring test accepted e.g. "rm -rf / #ls".)
         * NOTE(review): several whitelisted tools can themselves run arbitrary
         * code (find -exec, php8.4 -r, psql \!), so this is not a sandbox;
         * the endpoint must stay behind authentication.
         */
        $is_cmd_allowed = static function (string $line) use ($split_segments, $allowed_binaries, $allowed_prefix): bool {
            $segments = $split_segments($line);
            if ($segments === false) {
                return false;
            }
            foreach ($segments as $segment) {
                $segment = trim($segment);
                if ($segment === '') {
                    return false;
                }
                $word = preg_split('/\s+/', $segment, 2)[0];
                $is_binary = in_array($word, $allowed_binaries, true);
                $is_script = strncmp($word, $allowed_prefix, strlen($allowed_prefix)) === 0
                    && strpos($word, '..') === false;
                if (!$is_binary && !$is_script) {
                    return false;
                }
            }
            return true;
        };

        if (!$is_cmd_allowed($cmd)) {
            $db->prepare("UPDATE admin.wevia_tasks SET status='rejected', output='cmd_not_whitelisted', finished_at=NOW() WHERE task_id=?")->execute([$tid]);
            echo json_encode(['task_id'=>$tid, 'status'=>'rejected', 'err'=>'cmd_not_whitelisted']);
            exit;
        }

        // Fire async + mark the task done once the command has finished.
        // $tid is generated above from [a-z0-9_] only, so it is safe inside the SQL literal.
        $done_sql = "UPDATE admin.wevia_tasks SET status='done', finished_at=NOW() WHERE task_id='$tid'";
        $quoted_log = escapeshellarg($logf);

        // The command sits on its own line so that a trailing "#" cannot swallow the ")".
        // NOTE(review): database credentials are hardcoded here; move them to configuration.
        $script = "echo === START === >> $quoted_log 2>&1; (\n"
            . $cmd
            . "\n) >> $quoted_log 2>&1; echo === END === >> $quoted_log 2>&1; "
            . 'PGPASSWORD=admin123 psql -h 10.1.0.3 -p 5432 -U admin -d adx_system -c '
            . escapeshellarg($done_sql)
            . ' > /dev/null 2>&1';

        // The whole script is passed as ONE quoted argument, so quotes inside
        // $cmd can no longer terminate the bash -c wrapper.
        $bg = 'nohup bash -c ' . escapeshellarg($script) . ' > /dev/null 2>&1 &';
        @shell_exec($bg);
    }

    echo json_encode([
        'task_id' => $tid,
        'status' => 'running',
        'stream_url' => "/api/opus5-task-stream.php?path=stream/$tid",
        'status_url' => "/api/opus5-task-stream.php?path=status/$tid",
        'log_file' => $logf
    ]);
    exit;
}

// ---------------------------------------------------------------------------
// STREAM via SSE: emits "log" (new log bytes), "status" and a final "end" event
// ---------------------------------------------------------------------------
if ($action === 'stream' && $task_id) {
    header('Content-Type: text/event-stream');
    header('Cache-Control: no-cache');
    header('X-Accel-Buffering: no');
    @ini_set('output_buffering', '0');
    @ini_set('zlib.output_compression', 0);

    // Every event id is the byte offset already delivered from the log file,
    // so the Last-Event-ID sent by a reconnecting client resumes at the right place.
    $last_pos = max(0, (int)($_SERVER['HTTP_LAST_EVENT_ID'] ?? 0));
    $logf = "$LOG_DIR/$task_id.log";
    $max_duration = 60;
    $start = time();
    $json_flags = JSON_INVALID_UTF8_SUBSTITUTE; // a chunk may end in the middle of a multibyte character

    $stmt = $db->prepare("SELECT * FROM admin.wevia_tasks WHERE task_id=?");

    while (time() - $start < $max_duration) {
        $stmt->execute([$task_id]);
        $task = $stmt->fetch(PDO::FETCH_ASSOC);
        if (!$task) {
            echo "event: error\ndata: " . json_encode(['err'=>'not_found']) . "\n\n";
            @ob_flush(); @flush();
            break;
        }

        // filesize() results are cached per request; drop the cache to see growth.
        clearstatcache(true, $logf);
        if (is_file($logf)) {
            $size = (int)filesize($logf);
            if ($size > $last_pos) {
                $fh = @fopen($logf, 'rb');
                if ($fh !== false) {
                    fseek($fh, $last_pos);
                    // Drain everything written so far, 8 KiB per event.
                    while ($last_pos < $size) {
                        $chunk = fread($fh, min($size - $last_pos, 8192));
                        if ($chunk === false || $chunk === '') {
                            break;
                        }
                        // Advance by what was actually read, never past unread data.
                        $last_pos += strlen($chunk);
                        echo "id: $last_pos\nevent: log\ndata: " . json_encode(['chunk'=>$chunk, 'offset'=>$last_pos], $json_flags) . "\n\n";
                        @ob_flush(); @flush();
                    }
                    fclose($fh);
                }
            }
        }

        echo "id: $last_pos\nevent: status\ndata: " . json_encode(['status'=>$task['status'], 'progress'=>(int)$task['progress']]) . "\n\n";
        @ob_flush(); @flush();

        if (in_array($task['status'], ['done', 'failed', 'rejected'], true)) {
            echo "id: $last_pos\nevent: end\ndata: " . json_encode(['task_id'=>$task_id, 'final'=>$task['status']]) . "\n\n";
            @ob_flush(); @flush();
            break;
        }
        sleep(1);
    }
    exit;
}

// ---------------------------------------------------------------------------
// STATUS: full task row as JSON, 404 when the id is unknown
// ---------------------------------------------------------------------------
if ($action === 'status' && $task_id) {
    $stmt = $db->prepare("SELECT * FROM admin.wevia_tasks WHERE task_id=?");
    $stmt->execute([$task_id]);
    $task = $stmt->fetch(PDO::FETCH_ASSOC);
    if (!$task) {
        http_response_code(404);
        echo json_encode(['err'=>'not_found']);
        exit;
    }
    echo json_encode($task);
    exit;
}

// ---------------------------------------------------------------------------
// LIST: most recent tasks, optional ?status=<name> filter, ?limit=<0..50>
// ---------------------------------------------------------------------------
if ($action === 'list') {
    // Clamp to 0..50: a negative LIMIT is a SQL error in PostgreSQL.
    $limit = max(0, min(50, (int)($_GET['limit'] ?? 20)));

    $raw_status = $_GET['status'] ?? '';
    $status_filter = is_string($raw_status) ? (string)preg_replace('/[^a-z_]/', '', $raw_status) : '';

    $sql = "SELECT task_id, type, status, progress, created_at, finished_at, session FROM admin.wevia_tasks";
    $params = [];
    if ($status_filter !== '') {
        // Bound as a parameter instead of being interpolated into the SQL text.
        $sql .= " WHERE status = ?";
        $params[] = $status_filter;
    }
    $sql .= " ORDER BY created_at DESC LIMIT $limit";

    $stmt = $db->prepare($sql);
    $stmt->execute($params);
    $rows = $stmt->fetchAll(PDO::FETCH_ASSOC);

    echo json_encode(['count'=>count($rows), 'tasks'=>$rows, 'doctrine'=>'69 v2 — task stream PG admin.wevia_tasks + SSE']);
    exit;
}

// Anything else is an unknown action.
http_response_code(400);
echo json_encode(['err'=>'unknown_action', 'available'=>['create','stream/{id}','status/{id}','list']]);