178 lines
8.3 KiB
PHP
178 lines
8.3 KiB
PHP
<?php
// OPUS5 — Plan Orchestrator (doctrine 84)
// Plan execution engine: honours depends_on and runs ready steps in parallel via curl_multi.
// Actions handled by this script: execute, status.
// NOTE(review): a "resume" action used to be advertised here, but no handler for it exists in this file.

header('Content-Type: application/json');

// Envelope merged into every successful response.
$R = ['ts' => date('c'), 'source' => 'opus5-plan-orchestrator'];

// Returns the environment variable $name, or $default when it is unset or empty.
$env_or = function ($name, $default) {
    $value = getenv($name);
    return ($value === false || $value === '') ? $default : $value;
};

// Connection settings may be supplied through the environment so that secrets do not
// have to live in the source tree. The literals are kept only as backward-compatible
// fallbacks.
// NOTE(review): the fallback password is committed in clear text — rotate it and drop the fallback.
$PG_DSN = $env_or('OPUS5_PG_DSN', 'pgsql:host=10.1.0.3;port=5432;dbname=adx_system');
$pg_user = $env_or('OPUS5_PG_USER', 'admin');
$pg_pass = $env_or('OPUS5_PG_PASSWORD', 'admin123');

try {
    // Credentials are passed as separate arguments (not embedded in the DSN) so that
    // special characters in the password cannot break DSN parsing.
    $db = new PDO($PG_DSN, $pg_user, $pg_pass, [
        PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
        PDO::ATTR_TIMEOUT => 5,
    ]);
} catch (Throwable $e) {
    // Keep the client-facing payload opaque, but leave a trace for operators.
    error_log('opus5-plan-orchestrator: pg_connect failed: ' . $e->getMessage());
    http_response_code(500);
    echo json_encode(['err' => 'pg_connect']);
    exit;
}

// Request body is optional JSON; anything that does not decode to an array
// (empty body, malformed JSON, bare scalar) is treated as "no parameters".
$raw = file_get_contents('php://input');
$d = json_decode((string)$raw, true);
if (!is_array($d)) {
    $d = [];
}

// The query string wins over the JSON body; "status" is the default action.
$action = $_GET['action'] ?? ($d['action'] ?? 'status');

if ($action === 'execute') {
    // Runs the steps of a stored plan in dependency order.
    //
    // Request body (JSON):
    //   plan_id      string  required; characters outside [A-Za-z0-9_-] are stripped
    //   dry_run      bool    optional; when truthy nothing is written to the database
    //                        and no HTTP call is made
    //   max_parallel int     optional; steps launched per round, clamped to 1..5 (default 3)
    //
    // Response: the $R envelope plus plan metadata, the executed / failed / skipped step
    // lists, their counts, the number of rounds and the final status.

    $raw_pid = $d['plan_id'] ?? '';
    // Non-scalar input (e.g. a JSON array) is rejected instead of being string-cast.
    $pid = is_scalar($raw_pid) ? (string)preg_replace('/[^a-z0-9_-]/i', '', (string)$raw_pid) : '';
    $dry_run = !empty($d['dry_run']);
    // Lower bound added: 0 or a negative value used to be reported back as-is.
    $max_parallel = max(1, min(5, (int)($d['max_parallel'] ?? 3)));
    $max_rounds = 20; // Safety cap on scheduling rounds.

    if ($pid === '') {
        http_response_code(400);
        echo json_encode(['err' => 'no_plan_id']);
        exit;
    }

    // Fetch plan + steps
    $stmt = $db->prepare("SELECT * FROM admin.wevia_plans WHERE plan_id=?");
    $stmt->execute([$pid]);
    $plan = $stmt->fetch(PDO::FETCH_ASSOC);
    if (!$plan) {
        http_response_code(404);
        echo json_encode(['err' => 'plan_not_found']);
        exit;
    }

    $load_steps = $db->prepare("SELECT * FROM admin.wevia_plan_steps WHERE plan_id=? ORDER BY step_order");
    $load_steps->execute([$pid]);
    $steps = $load_steps->fetchAll(PDO::FETCH_ASSOC);
    if (empty($steps)) {
        http_response_code(400);
        echo json_encode(['err' => 'no_steps']);
        exit;
    }

    $R['plan_id'] = $pid;
    $R['plan_name'] = $plan['name'];
    $R['steps_total'] = count($steps);
    $R['dry_run'] = $dry_run;
    $R['max_parallel'] = $max_parallel;

    // Step-state statements are prepared once and reused for every step.
    $mark_running = $db->prepare("UPDATE admin.wevia_plan_steps SET status='running', started_at=NOW() WHERE step_id=?");
    $mark_done = $db->prepare("UPDATE admin.wevia_plan_steps SET status='done', result=?, finished_at=NOW() WHERE step_id=?");
    $mark_failed = $db->prepare("UPDATE admin.wevia_plan_steps SET status='failed', result=?, finished_at=NOW() WHERE step_id=?");

    // Truncates $text to at most $limit bytes without cutting a multi-byte character in
    // half. A plain substr() could leave a dangling UTF-8 lead byte, which makes
    // json_encode() return false and may be rejected by the database on write
    // (assumes a UTF-8 database encoding — TODO confirm). Falls back to substr() when
    // mbstring is not loaded.
    $clip = function ($text, $limit) {
        $text = (string)$text;
        if (function_exists('mb_strcut')) {
            // UTF-8 -> UTF-8 conversion replaces invalid byte sequences.
            $text = (string)mb_convert_encoding($text, 'UTF-8', 'UTF-8');
            return mb_strcut($text, 0, $limit, 'UTF-8');
        }
        return substr($text, 0, $limit);
    };

    // Mark plan running (COALESCE keeps the original start time on a re-run).
    if (!$dry_run) {
        $db->prepare("UPDATE admin.wevia_plans SET status='running', started_at=COALESCE(started_at, NOW()) WHERE plan_id=?")->execute([$pid]);
    }

    // Execute steps respecting depends_on
    $done_ids = [];  // step_order of every completed step (one entry per step)
    $handled = [];   // step_id => true for steps picked up during this request
    $executed = [];
    $skipped = [];
    $failed = [];

    // Steps completed by an earlier run satisfy dependencies too; without this a
    // re-executed plan could never progress past its first already-done step.
    foreach ($steps as $s) {
        if ($s['status'] === 'done') {
            $done_ids[] = (int)$s['step_order'];
        }
    }

    $rounds = 0;
    while (count($done_ids) + count($failed) < count($steps) && $rounds < $max_rounds) {
        $rounds++;

        // Find ready steps: pending, not yet handled, dependencies all done.
        $ready = [];
        foreach ($steps as $s) {
            if ($s['status'] !== 'pending' || isset($handled[$s['step_id']])) {
                continue;
            }
            // depends_on is read as a brace-wrapped, comma-separated list of step_order
            // values (not step_id). Compared against '' explicitly: a list containing
            // only "0" is falsy in PHP and used to be treated as "no dependencies".
            $deps = trim((string)($s['depends_on'] ?? ''), '{}');
            $dep_orders = $deps !== '' ? array_map('intval', explode(',', $deps)) : [];
            $all_done = true;
            foreach ($dep_orders as $dep_order) {
                if (!in_array($dep_order, $done_ids, true)) {
                    $all_done = false;
                    break;
                }
            }
            if ($all_done) {
                $ready[] = $s;
                if (count($ready) >= $max_parallel) {
                    break;
                }
            }
        }
        if (empty($ready)) {
            break; // No progress possible (blocked by failed or unsatisfiable dependencies).
        }

        if ($dry_run) {
            // Simulate success so that dependants become ready in later rounds.
            foreach ($ready as $s) {
                $executed[] = [
                    'step_id' => $s['step_id'],
                    'step_order' => $s['step_order'],
                    'name' => $s['step_name'],
                    'action' => 'DRY_RUN',
                    'order' => $s['step_order'],
                ];
                $done_ids[] = (int)$s['step_order'];
                $handled[$s['step_id']] = true;
            }
            continue;
        }

        // Parallel exec via curl_multi
        $mh = curl_multi_init();
        $handles = [];
        foreach ($ready as $s) {
            $handled[$s['step_id']] = true;
            $mark_running->execute([$s['step_id']]);

            $url = $s['action_url'];
            if (!$url) {
                $mark_failed->execute(['no_url', $s['step_id']]);
                $failed[] = ['step_id' => $s['step_id'], 'step_order' => $s['step_order'], 'name' => $s['step_name'], 'err' => 'no_url'];
                continue;
            }
            $method = strtoupper((string)($s['action_method'] ?? 'POST'));
            $payload = $s['action_payload'] ?? '{}';

            // URLs without a scheme are sent to this host.
            if (strpos($url, 'http://') !== 0 && strpos($url, 'https://') !== 0) {
                $url = 'http://127.0.0.1' . $url;
            }

            $ch = curl_init($url);
            if ($ch === false) {
                $mark_failed->execute(['curl_init_failed', $s['step_id']]);
                $failed[] = ['step_id' => $s['step_id'], 'step_order' => $s['step_order'], 'name' => $s['step_name'], 'err' => 'curl_init_failed'];
                continue;
            }

            $opts = [
                CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
                CURLOPT_RETURNTRANSFER => true,
                CURLOPT_TIMEOUT => 30,
                CURLOPT_FOLLOWLOCATION => true,
                CURLOPT_MAXREDIRS => 3,
                // NOTE(review): TLS verification is disabled for https targets. Kept
                // as-is because existing endpoints may rely on it; confirm and re-enable.
                CURLOPT_SSL_VERIFYPEER => false,
                CURLOPT_SSL_VERIFYHOST => 0,
            ];
            // NOTE(review): only POST is honoured; every other action_method is sent as GET.
            if ($method === 'POST') {
                $opts[CURLOPT_POST] = true;
                $opts[CURLOPT_POSTFIELDS] = is_array($payload) ? json_encode($payload) : $payload;
            }
            curl_setopt_array($ch, $opts);
            curl_multi_add_handle($mh, $ch);
            $handles[] = ['ch' => $ch, 'step' => $s];
        }

        // Drive all transfers to completion. The loop also stops on a multi-level
        // error, and sleeps briefly when select() reports -1, so it cannot spin.
        $running = 0;
        do {
            $mrc = curl_multi_exec($mh, $running);
            if ($running > 0 && curl_multi_select($mh, 0.1) === -1) {
                usleep(10000);
            }
        } while ($running > 0 && $mrc === CURLM_OK);

        foreach ($handles as $h) {
            $ch = $h['ch'];
            $step = $h['step'];
            $body = (string)curl_multi_getcontent($ch);
            $http = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
            $curl_err = curl_error($ch);
            // Per-transfer duration reported by curl, instead of the wall time of the whole batch.
            $ms = round(((float)curl_getinfo($ch, CURLINFO_TOTAL_TIME)) * 1000);

            if ($http === 200) {
                $mark_done->execute([$clip($body, 2000), $step['step_id']]);
                $executed[] = [
                    'step_id' => $step['step_id'],
                    'step_order' => $step['step_order'],
                    'name' => $step['step_name'],
                    'http' => $http,
                    'ms' => $ms,
                    'preview' => $clip($body, 150),
                ];
                $done_ids[] = (int)$step['step_order'];
            } else {
                // Transport failures (timeout, refused connection) report HTTP 0; keep
                // curl's own message so the stored result is actionable.
                $detail = $curl_err !== '' ? "[curl: $curl_err] " : '';
                $mark_failed->execute([$clip("HTTP $http: " . $detail . $body, 2000), $step['step_id']]);
                $entry = [
                    'step_id' => $step['step_id'],
                    'step_order' => $step['step_order'],
                    'name' => $step['step_name'],
                    'http' => $http,
                    'ms' => $ms,
                ];
                if ($curl_err !== '') {
                    $entry['err'] = $curl_err;
                }
                $failed[] = $entry;
            }
            curl_multi_remove_handle($mh, $ch);
            curl_close($ch);
        }
        curl_multi_close($mh);

        // Refresh steps (status updated)
        $load_steps->execute([$pid]);
        $steps = $load_steps->fetchAll(PDO::FETCH_ASSOC);
    }

    // Steps that are still pending and were never picked up: blocked by a failed or
    // unsatisfiable dependency, or left over after the round cap was reached.
    foreach ($steps as $s) {
        if ($s['status'] === 'pending' && !isset($handled[$s['step_id']])) {
            $skipped[] = ['step_id' => $s['step_id'], 'step_order' => $s['step_order'], 'name' => $s['step_name']];
        }
    }

    // Final plan status: "done" only when every step is done. Previously a plan whose
    // steps never ran was still reported (and stored) as done.
    $all_done = count($done_ids) >= count($steps);
    $final_status = (count($failed) > 0 || !$all_done) ? 'failed' : 'done';
    if (!$dry_run) {
        $db->prepare("UPDATE admin.wevia_plans SET status=?, finished_at=NOW() WHERE plan_id=?")->execute([$final_status, $pid]);
    }

    $R['rounds'] = $rounds;
    $R['executed'] = $executed;
    $R['failed'] = $failed;
    $R['skipped'] = $skipped;
    $R['done_count'] = count($executed); // Steps completed during this request.
    $R['failed_count'] = count($failed);
    $R['skipped_count'] = count($skipped);
    $R['final_status'] = $dry_run ? 'dry_run' : $final_status;
    $R['doctrine'] = '84 — plan orchestrator (parallel curl_multi + depends_on respect)';

    // Substitute invalid UTF-8 instead of letting json_encode() fail and emit nothing.
    $json_flags = JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE;
    if (defined('JSON_INVALID_UTF8_SUBSTITUTE')) {
        $json_flags |= JSON_INVALID_UTF8_SUBSTITUTE;
    }
    echo json_encode($R, $json_flags);
    exit;
}

if ($action === 'status') {
    // Reports a plan's status together with per-state step counts.
    //
    // plan_id is taken from the query string first, then from the JSON body.
    // Response: the $R envelope with a "status" object holding the plan row, the
    // total / done / running / failed step counts and progress_pct (done / total, in %).

    $raw_pid = $_GET['plan_id'] ?? ($d['plan_id'] ?? '');
    // Array input (?plan_id[]=x) is rejected: preg_replace() would otherwise return an
    // array that ends up bound to the query as the string "Array".
    $pid = is_scalar($raw_pid) ? (string)preg_replace('/[^a-z0-9_-]/i', '', (string)$raw_pid) : '';
    if ($pid === '') {
        http_response_code(400);
        echo json_encode(['err' => 'no_plan_id']);
        exit;
    }

    $stmt = $db->prepare("SELECT plan_id, name, status, started_at, finished_at,
        (SELECT COUNT(*) FROM admin.wevia_plan_steps WHERE plan_id=p.plan_id) AS total,
        (SELECT COUNT(*) FROM admin.wevia_plan_steps WHERE plan_id=p.plan_id AND status='done') AS done,
        (SELECT COUNT(*) FROM admin.wevia_plan_steps WHERE plan_id=p.plan_id AND status='running') AS running,
        (SELECT COUNT(*) FROM admin.wevia_plan_steps WHERE plan_id=p.plan_id AND status='failed') AS failed
        FROM admin.wevia_plans p WHERE plan_id=?");
    $stmt->execute([$pid]);
    $status = $stmt->fetch(PDO::FETCH_ASSOC);
    if (!$status) {
        http_response_code(404);
        echo json_encode(['err' => 'not_found']);
        exit;
    }

    // Guard against division by zero for a plan without steps.
    $status['progress_pct'] = $status['total'] > 0
        ? round(($status['done'] / $status['total']) * 100, 1)
        : 0;
    $R['status'] = $status;

    // Same flags as the execute action so non-ASCII plan names are emitted verbatim.
    echo json_encode($R, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE);
    exit;
}

// Reached only when no action handler above matched and exited:
// answer 400 and list the actions this endpoint understands.
$unknown_action_reply = [
    'err' => 'unknown_action',
    'available' => ['execute', 'status'],
];
http_response_code(400);
echo json_encode($unknown_action_reply);