Files
wevia-brain/modules/pipeline/streaming-handler.php
2026-04-12 23:01:36 +02:00

211 lines
6.5 KiB
PHP
Executable File

<?php
/**
* WEVIA OPUS — Streaming Response Handler
*
* Handles streaming responses from Ollama:
* - SSE (Server-Sent Events) to the frontend
* - Token-by-token processing
* - Graceful interruption
* - Progress callbacks
* - Accumulation + post-processing
*/
class StreamingHandler {
    private string $ollamaUrl;

    /** @var array<string, callable> Registered 'token' / 'complete' / 'error' callbacks. */
    private array $callbacks = [];

    private bool $interrupted = false;
    private string $accumulatedResponse = '';
    private int $tokenCount = 0;
    private float $startTime = 0.0;

    /** Carry-over for an NDJSON line split across two curl write chunks. */
    private string $lineBuffer = '';

    public function __construct(string $ollamaUrl = 'http://localhost:11434') {
        $this->ollamaUrl = $ollamaUrl;
    }

    /**
     * Registers a callback invoked for each streamed token.
     *
     * The callback receives ['token', 'accumulated', 'token_count', 'elapsed_ms']
     * and may return false to interrupt the stream gracefully.
     */
    public function onToken(callable $callback): self {
        $this->callbacks['token'] = $callback;
        return $this;
    }

    /**
     * Registers a callback invoked once generation finishes.
     *
     * The callback receives ['response', 'tokens', 'duration_ms',
     * 'tokens_per_second', 'interrupted'].
     */
    public function onComplete(callable $callback): self {
        $this->callbacks['complete'] = $callback;
        return $this;
    }

    /**
     * Registers a callback invoked on HTTP or transport errors.
     * The callback receives a human-readable error string.
     */
    public function onError(callable $callback): self {
        $this->callbacks['error'] = $callback;
        return $this;
    }

    /**
     * Streams a chat completion from Ollama and returns the accumulated text.
     *
     * @param string $model    Ollama model name
     * @param array  $messages Chat history ([['role' => ..., 'content' => ...], ...])
     * @param array  $options  Extra Ollama options merged over the defaults
     */
    public function streamChat(string $model, array $messages, array $options = []): string {
        $this->startTime = microtime(true);
        $this->accumulatedResponse = '';
        $this->tokenCount = 0;
        $this->interrupted = false;
        $this->lineBuffer = '';

        $payload = json_encode([
            'model' => $model,
            'messages' => $messages,
            'stream' => true,
            'options' => array_merge([
                'temperature' => 0.7,
                'num_predict' => 4096,
                'top_p' => 0.9
            ], $options)
        ]);

        $ch = curl_init("{$this->ollamaUrl}/api/chat");
        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $payload,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_WRITEFUNCTION => fn($ch, string $data): int => $this->processStreamChunk($data),
            CURLOPT_TIMEOUT => 300
        ]);
        curl_exec($ch);
        $curlErrno = curl_errno($ch);
        $curlError = curl_error($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        // Flush a trailing NDJSON line that arrived without a final newline.
        if ($this->lineBuffer !== '' && !$this->interrupted) {
            $this->processLine($this->lineBuffer);
            $this->lineBuffer = '';
        }

        if (isset($this->callbacks['error'])) {
            if ($httpCode !== 200) {
                ($this->callbacks['error'])("HTTP $httpCode");
            } elseif ($curlErrno !== 0 && !$this->interrupted) {
                // A deliberate interrupt aborts the transfer (write callback
                // returns 0 => CURLE_WRITE_ERROR); only report transport
                // failures the caller did not ask for.
                ($this->callbacks['error'])("cURL error $curlErrno: $curlError");
            }
        }

        // Completion callback fires even on interruption, with stats.
        if (isset($this->callbacks['complete'])) {
            ($this->callbacks['complete'])([
                'response' => $this->accumulatedResponse,
                'tokens' => $this->tokenCount,
                'duration_ms' => round((microtime(true) - $this->startTime) * 1000),
                'tokens_per_second' => $this->calculateTPS(),
                'interrupted' => $this->interrupted
            ]);
        }
        return $this->accumulatedResponse;
    }

    /**
     * Handles one curl write chunk of the NDJSON stream.
     *
     * Ollama frames its stream as one JSON object per "\n"-terminated line,
     * but curl chunks do not respect line boundaries: a JSON line may be
     * split across chunks. Incomplete tail lines are buffered and completed
     * by the next chunk instead of being silently dropped.
     *
     * @return int Bytes consumed; 0 tells curl to abort the transfer.
     */
    private function processStreamChunk(string $data): int {
        if ($this->interrupted) return 0; // Stop the stream

        $this->lineBuffer .= $data;
        $lines = explode("\n", $this->lineBuffer);
        // The last element is '' when the chunk ended exactly on "\n",
        // otherwise it is an incomplete line — keep it for the next chunk.
        $this->lineBuffer = array_pop($lines);

        foreach ($lines as $line) {
            if (!$this->processLine($line)) {
                return 0; // Token callback requested interruption
            }
        }
        return strlen($data);
    }

    /**
     * Decodes one NDJSON line and dispatches its token.
     *
     * @return bool false when the token callback asked to interrupt.
     */
    private function processLine(string $line): bool {
        $line = trim($line);
        if ($line === '') return true;

        $json = json_decode($line, true);
        if (!is_array($json)) return true; // Not a decodable object — skip

        $token = $json['message']['content'] ?? '';
        // Strict comparison: a literal "0" token is falsy in PHP but is
        // still a valid token and must not be dropped.
        if ($token !== '') {
            $this->accumulatedResponse .= $token;
            $this->tokenCount++;
            if (isset($this->callbacks['token'])) {
                $shouldContinue = ($this->callbacks['token'])([
                    'token' => $token,
                    'accumulated' => $this->accumulatedResponse,
                    'token_count' => $this->tokenCount,
                    'elapsed_ms' => round((microtime(true) - $this->startTime) * 1000)
                ]);
                if ($shouldContinue === false) {
                    $this->interrupted = true;
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Streams to the browser as SSE (Server-Sent Events).
     *
     * Emits 'token' events per token, a final 'done' event with stats,
     * and 'error' events on failure. Stops when the client disconnects.
     */
    public function streamSSE(string $model, array $messages, array $options = []): void {
        header('Content-Type: text/event-stream');
        header('Cache-Control: no-cache');
        header('Connection: keep-alive');
        header('X-Accel-Buffering: no'); // Disable nginx proxy buffering

        $this->onToken(function($data) {
            echo "data: " . json_encode([
                'type' => 'token',
                'content' => $data['token'],
                'token_count' => $data['token_count']
            ]) . "\n\n";
            if (ob_get_level()) ob_flush();
            flush();
            // Returning false interrupts the Ollama stream when the
            // client has gone away.
            if (connection_aborted()) return false;
            return true;
        });

        $this->onComplete(function($data) {
            echo "data: " . json_encode([
                'type' => 'done',
                'response' => $data['response'],
                'tokens' => $data['tokens'],
                'duration_ms' => $data['duration_ms'],
                'tps' => $data['tokens_per_second']
            ]) . "\n\n";
            if (ob_get_level()) ob_flush();
            flush();
        });

        $this->onError(function($error) {
            echo "data: " . json_encode(['type' => 'error', 'message' => $error]) . "\n\n";
            if (ob_get_level()) ob_flush();
            flush();
        });

        $this->streamChat($model, $messages, $options);
    }

    /**
     * Interrupts the in-flight stream gracefully.
     */
    public function interrupt(): void {
        $this->interrupted = true;
    }

    /**
     * Computes tokens per second for the current/last generation.
     */
    private function calculateTPS(): float {
        $elapsed = microtime(true) - $this->startTime;
        return $elapsed > 0 ? round($this->tokenCount / $elapsed, 1) : 0.0;
    }

    /**
     * Getters
     */
    public function getAccumulatedResponse(): string { return $this->accumulatedResponse; }
    public function getTokenCount(): int { return $this->tokenCount; }
    public function isInterrupted(): bool { return $this->interrupted; }
}