155 lines
6.2 KiB
PHP
155 lines
6.2 KiB
PHP
<?php
|
|
// OPUS5 — Task Stream v2 — PostgreSQL instead of SQLite (the FPM build lacks pdo_sqlite)
|
|
// Doctrine 69 v2 — PG S95 table admin.wevia_tasks
|
|
// Every endpoint answers JSON by default; the SSE route replaces this header itself.
header('Content-Type: application/json');

// NOTE(review): database credentials are hard-coded in source — consider moving
// them to configuration outside the code base.
$PG_DSN = 'pgsql:host=10.1.0.3;port=5432;dbname=adx_system;user=admin;password=admin123';

// Directory holding one "<task_id>.log" file per task.
$LOG_DIR = '/var/log/weval/tasks';

// Best-effort creation of the log directory; a failure is deliberately tolerated here.
if (!is_dir($LOG_DIR)) {
    @mkdir($LOG_DIR, 0775, true);
}

try {
    $db = new PDO($PG_DSN, null, null, [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_TIMEOUT => 5,
    ]);

    // Idempotent schema bootstrap: runs on every request, creates nothing if already present.
    $db->exec("CREATE TABLE IF NOT EXISTS admin.wevia_tasks (
        task_id VARCHAR(64) PRIMARY KEY,
        type VARCHAR(32),
        status VARCHAR(16) DEFAULT 'pending',
        input TEXT,
        output TEXT DEFAULT '',
        progress INT DEFAULT 0,
        created_at TIMESTAMP DEFAULT NOW(),
        started_at TIMESTAMP,
        finished_at TIMESTAMP,
        session VARCHAR(64)
    )");
    $db->exec("CREATE INDEX IF NOT EXISTS idx_wevia_tasks_status ON admin.wevia_tasks(status)");
    $db->exec("CREATE INDEX IF NOT EXISTS idx_wevia_tasks_created ON admin.wevia_tasks(created_at)");
} catch (Throwable $e) {
    // Driver messages can contain the server address and user name, so keep the
    // detail in the server log and send only a generic message to the client.
    error_log('opus5-task-stream pg_init_failed: ' . $e->getMessage());
    http_response_code(500);
    echo json_encode(['err' => 'pg_init_failed', 'msg' => 'database unavailable']);
    exit;
}
|
|
|
|
// The route is passed as ?path=<action>[/<task_id>]; a missing path means "list".
$path = $_GET['path'] ?? '';
if (!is_string($path)) {
    // "?path[]=x" arrives as an array and explode() would throw a TypeError on PHP 8.
    $path = '';
}

// array_filter() without a callback drops empty segments (and the literal "0").
$parts = array_values(array_filter(explode('/', $path)));
$action = $parts[0] ?? 'list';
$task_id = $parts[1] ?? null;
|
|
|
|
// CREATE task: POST ?path=create with a JSON body {type, input, cmd, session}.
// Inserts a row with status 'running'; when "cmd" is given and passes the
// whitelist it is started in the background with its output appended to
// "$LOG_DIR/<task_id>.log", and the row is set to 'done' once it has finished.
// Responds with the task id plus the stream/status URLs.
//
// NOTE(review): this route runs caller-supplied shell commands and no
// authentication is visible in this file. The whitelist below is not a sandbox
// (curl, php8.4, psql, find ... can still do a lot) — confirm access is
// restricted upstream.
if ($action === 'create' && $_SERVER['REQUEST_METHOD'] === 'POST') {
    $raw = file_get_contents('php://input');
    $d = json_decode((string)$raw, true);
    if (!is_array($d)) {
        $d = [];
    }

    // Reads a body field as a string; non-scalar values (arrays/objects) fall
    // back to the default instead of making substr() throw a TypeError.
    $field = static function (string $key, string $default) use ($d): string {
        $value = $d[$key] ?? $default;
        return is_scalar($value) ? (string)$value : $default;
    };

    $type = substr($field('type', 'generic'), 0, 32);
    $input = $field('input', '');
    $cmd = $field('cmd', '');
    $session = substr($field('session', 'anon'), 0, 64);

    $tid = 'task_' . date('YmdHis') . '_' . bin2hex(random_bytes(4));
    $stmt = $db->prepare("INSERT INTO admin.wevia_tasks (task_id, type, status, input, session, started_at) VALUES (?, ?, 'running', ?, ?, NOW())");
    $stmt->execute([$tid, $type, $input, $session]);

    $logf = "$LOG_DIR/$tid.log";

    if ($cmd !== '') {
        $allowed_bins = ['ls', 'find', 'grep', 'cat', 'head', 'tail', 'wc', 'curl', 'php8.4', 'psql', 'redis-cli', 'sqlite3'];
        $allowed_prefix = '/opt/weval-ops/top-ia/';

        // Command/process substitution would hide arbitrary programs from the
        // per-segment check below, so refuse it outright.
        $ok = !preg_match('/`|\$\(|[<>]\(/', $cmd);

        if ($ok) {
            // Split on ; | & and newlines (but not on the "&" of "2>&1" / "&>"),
            // then require the program of EVERY segment to be whitelisted. The
            // previous substring test accepted e.g. "rm -rf /var/tools" because
            // it contains "ls".
            $segments = preg_split('/(?<![<>])&(?!>)|[;|\r\n]/', $cmd, -1, PREG_SPLIT_NO_EMPTY);
            $checked = 0;
            foreach ($segments as $segment) {
                $program = preg_split('/\s+/', trim($segment), 2)[0];
                if ($program === '') {
                    continue;
                }
                $checked++;
                $under_prefix = strncmp($program, $allowed_prefix, strlen($allowed_prefix)) === 0
                    && strpos($program, '..') === false;
                if (!in_array($program, $allowed_bins, true) && !$under_prefix) {
                    $ok = false;
                    break;
                }
            }
            $ok = $ok && $checked > 0;
        }

        if (!$ok) {
            $db->prepare("UPDATE admin.wevia_tasks SET status='rejected', output='cmd_not_whitelisted', finished_at=NOW() WHERE task_id=?")->execute([$tid]);
            echo json_encode(['task_id'=>$tid, 'status'=>'rejected', 'err'=>'cmd_not_whitelisted']);
            exit;
        }

        // Build the background script first and quote it ONCE as a whole for
        // `bash -c`. Previously the command and SQL were spliced into a
        // hand-written single-quoted string: a quote in the command broke out
        // of it, and the completion statement reached PostgreSQL as
        // status="done" ... task_id=task_x (identifiers, not string literals),
        // so it always failed and the task stayed 'running'.
        $log_arg = escapeshellarg($logf);
        // $tid is generated above from digits, hex and underscores only.
        $done_sql = "UPDATE admin.wevia_tasks SET status='done', finished_at=NOW() WHERE task_id='$tid'";
        // The newlines around $cmd keep a trailing "#" comment from swallowing ")".
        $script = "echo '=== START ===' >> $log_arg 2>&1; (\n$cmd\n) >> $log_arg 2>&1; "
            . "echo '=== END ===' >> $log_arg 2>&1; "
            . 'PGPASSWORD=admin123 psql -h 10.1.0.3 -p 5432 -U admin -d adx_system -c '
            . escapeshellarg($done_sql) . ' > /dev/null 2>&1';

        // Detach so the HTTP request returns immediately.
        $bg = 'nohup bash -c ' . escapeshellarg($script) . ' > /dev/null 2>&1 &';
        @shell_exec($bg);
    }

    echo json_encode([
        'task_id' => $tid,
        'status' => 'running',
        'stream_url' => "/api/opus5-task-stream.php?path=stream/$tid",
        'status_url' => "/api/opus5-task-stream.php?path=status/$tid",
        'log_file' => $logf
    ]);
    exit;
}
|
|
|
|
// STREAM via SSE: GET ?path=stream/<task_id>.
// Polls the task row and its log file once per second for at most 60 seconds.
// Emits "log" events (new log bytes), a "status" event per poll, an "end" event
// when the task reaches done/failed/rejected, or an "error" event when the task
// id is unknown.
// Every event id is the byte offset of the log already delivered, so a client
// reconnecting with Last-Event-ID resumes exactly where it stopped.
if ($action === 'stream' && $task_id) {
    header('Content-Type: text/event-stream');
    header('Cache-Control: no-cache');
    header('X-Accel-Buffering: no');
    @ini_set('output_buffering', '0');
    @ini_set('zlib.output_compression', 0);

    $last_pos = max(0, (int)($_SERVER['HTTP_LAST_EVENT_ID'] ?? 0));
    $logf = "$LOG_DIR/$task_id.log";
    $max_duration = 60;
    $start = time();

    // Log output is arbitrary bytes; without this flag json_encode() returns
    // false for a chunk containing invalid UTF-8 and the event would be empty.
    $json_flags = JSON_INVALID_UTF8_SUBSTITUTE;

    // Prepared once and re-executed on every poll.
    $stmt = $db->prepare("SELECT * FROM admin.wevia_tasks WHERE task_id=?");

    while (time() - $start < $max_duration) {
        $stmt->execute([$task_id]);
        $task = $stmt->fetch(PDO::FETCH_ASSOC);
        $stmt->closeCursor();
        if (!$task) {
            echo "event: error\ndata: " . json_encode(['err'=>'not_found']) . "\n\n"; @ob_flush(); @flush();
            break;
        }

        // PHP caches stat() results per path; without this filesize() keeps
        // returning the size seen on the first poll and growth is never noticed.
        clearstatcache(true, $logf);
        if (is_file($logf)) {
            $size = filesize($logf);
            if ($size !== false && $size > $last_pos) {
                $fh = @fopen($logf, 'r');
                if ($fh !== false) {
                    fseek($fh, $last_pos);
                    // Drain everything available in 8 KiB events. The offset
                    // advances by the bytes actually read, so nothing is skipped
                    // and the tail is fully delivered before an "end" event.
                    while ($last_pos < $size) {
                        $chunk = fread($fh, min($size - $last_pos, 8192));
                        if ($chunk === false || $chunk === '') {
                            break;
                        }
                        $last_pos += strlen($chunk);
                        echo "id: $last_pos\nevent: log\ndata: " . json_encode(['chunk'=>$chunk, 'offset'=>$last_pos], $json_flags) . "\n\n";
                        @ob_flush(); @flush();
                    }
                    fclose($fh);
                }
            }
        }

        echo "id: $last_pos\nevent: status\ndata: " . json_encode(['status'=>$task['status'], 'progress'=>(int)$task['progress']]) . "\n\n";
        @ob_flush(); @flush();

        if (in_array($task['status'], ['done', 'failed', 'rejected'], true)) {
            echo "id: $last_pos\nevent: end\ndata: " . json_encode(['task_id'=>$task_id, 'final'=>$task['status']]) . "\n\n";
            @ob_flush(); @flush();
            break;
        }
        sleep(1);
    }
    exit;
}
|
|
|
|
// STATUS: GET ?path=status/<task_id> — one-shot JSON dump of the task row,
// or HTTP 404 with {"err":"not_found"} when no row has that id.
if ($action === 'status' && $task_id) {
    $lookup = $db->prepare("SELECT * FROM admin.wevia_tasks WHERE task_id=?");
    $lookup->execute([$task_id]);
    $row = $lookup->fetch(PDO::FETCH_ASSOC);

    if ($row) {
        echo json_encode($row);
        exit;
    }

    http_response_code(404);
    echo json_encode(['err'=>'not_found']);
    exit;
}
|
|
|
|
// LIST: GET ?path=list[&limit=N][&status=S] — most recent tasks first.
// "limit" is clamped to 0..50 (default 20); "status" is reduced to [a-z_] and,
// when non-empty, restricts the result to rows with exactly that status.
if ($action === 'list') {
    // A negative value used to reach PostgreSQL as "LIMIT -n" and raise an
    // uncaught exception, hence the lower bound.
    $limit = max(0, min(50, (int)($_GET['limit'] ?? 20)));

    // "?status[]=x" arrives as an array; treat anything but a string as "no filter".
    $raw_status = $_GET['status'] ?? '';
    $status_filter = is_string($raw_status)
        ? (string)preg_replace('/[^a-z_]/', '', $raw_status)
        : '';

    $sql = "SELECT task_id, type, status, progress, created_at, finished_at, session FROM admin.wevia_tasks";
    $params = [];
    if ($status_filter !== '') {
        $sql .= " WHERE status = ?";
        $params[] = $status_filter;
    }
    // $limit is an integer clamped above, so embedding it is safe.
    $sql .= " ORDER BY created_at DESC LIMIT $limit";

    $stmt = $db->prepare($sql);
    $stmt->execute($params);
    $rows = $stmt->fetchAll(PDO::FETCH_ASSOC);

    echo json_encode(['count'=>count($rows), 'tasks'=>$rows, 'doctrine'=>'69 v2 — task stream PG admin.wevia_tasks + SSE']);
    exit;
}
|
|
|
|
// No route matched: answer HTTP 400 and advertise the supported actions.
$unknown_action_reply = [
    'err' => 'unknown_action',
    'available' => ['create', 'stream/{id}', 'status/{id}', 'list'],
];
http_response_code(400);
echo json_encode($unknown_action_reply);
|