211 lines
6.5 KiB
PHP
Executable File
211 lines
6.5 KiB
PHP
Executable File
<?php

/**
 * WEVIA OPUS — Streaming Response Handler
 *
 * Handles streaming responses from Ollama:
 * - SSE (Server-Sent Events) to the frontend
 * - Token-by-token processing
 * - Graceful interruption
 * - Progress callbacks
 * - Accumulation + post-processing
 */

class StreamingHandler {

    /** Base URL of the Ollama HTTP API (no trailing slash expected). */
    private string $ollamaUrl;

    /** Registered callbacks, keyed by 'token', 'complete' and 'error'. */
    private array $callbacks = [];

    /** True once a token callback returned false or interrupt() was called. */
    private bool $interrupted = false;

    /** Concatenation of every token received during the current stream. */
    private string $accumulatedResponse = '';

    /** Number of non-empty content fragments received during the current stream. */
    private int $tokenCount = 0;

    /** microtime(true) captured when the current stream started. */
    private float $startTime = 0.0;

    /**
     * Bytes received but not yet terminated by a newline.
     * cURL delivers arbitrary byte chunks, so a JSON line may arrive in pieces.
     */
    private string $lineBuffer = '';

    /**
     * @param string $ollamaUrl Base URL of the Ollama server.
     */
    public function __construct(string $ollamaUrl = 'http://localhost:11434') {
        $this->ollamaUrl = $ollamaUrl;
    }

    /**
     * Registers the callback invoked for every received token.
     *
     * The callback receives an array with the keys 'token', 'accumulated',
     * 'token_count' and 'elapsed_ms'. Returning exactly false stops the stream.
     */
    public function onToken(callable $callback): self {
        $this->callbacks['token'] = $callback;
        return $this;
    }

    /**
     * Registers the callback invoked once when streamChat() finishes.
     *
     * The callback receives an array with the keys 'response', 'tokens',
     * 'duration_ms', 'tokens_per_second' and 'interrupted'.
     */
    public function onComplete(callable $callback): self {
        $this->callbacks['complete'] = $callback;
        return $this;
    }

    /**
     * Registers the callback invoked with an error message string.
     */
    public function onError(callable $callback): self {
        $this->callbacks['error'] = $callback;
        return $this;
    }

    /**
     * Streams a chat completion from Ollama and returns the full response text.
     *
     * Resets the per-stream state, POSTs to /api/chat with 'stream' => true,
     * feeds every received chunk through processStreamChunk(), then fires the
     * 'complete' callback (always, even after an error or an interruption).
     *
     * @param string $model    Model name sent to Ollama.
     * @param array  $messages Chat messages, passed through unchanged.
     * @param array  $options  Overrides merged on top of the default options.
     * @return string The text accumulated from the stream.
     */
    public function streamChat(string $model, array $messages, array $options = []): string {
        $this->startTime = microtime(true);
        $this->accumulatedResponse = '';
        $this->tokenCount = 0;
        $this->interrupted = false;
        $this->lineBuffer = '';

        $payload = json_encode([
            'model' => $model,
            'messages' => $messages,
            'stream' => true,
            'options' => array_merge([
                'temperature' => 0.7,
                'num_predict' => 4096,
                'top_p' => 0.9,
            ], $options),
        ]);

        if ($payload === false) {
            // Sending `false` as the body would only produce an opaque HTTP error.
            $this->reportError('JSON encode error: ' . json_last_error_msg());
        } else {
            $this->performRequest($payload);
        }

        if (isset($this->callbacks['complete'])) {
            ($this->callbacks['complete'])([
                'response' => $this->accumulatedResponse,
                'tokens' => $this->tokenCount,
                'duration_ms' => round((microtime(true) - $this->startTime) * 1000),
                'tokens_per_second' => $this->calculateTPS(),
                'interrupted' => $this->interrupted,
            ]);
        }

        return $this->accumulatedResponse;
    }

    /**
     * Executes the streaming POST request and reports HTTP/transport errors.
     */
    private function performRequest(string $payload): void {
        $ch = curl_init("{$this->ollamaUrl}/api/chat");
        if ($ch === false) {
            $this->reportError('cURL init failed');
            return;
        }

        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => $payload,
            CURLOPT_HTTPHEADER => ['Content-Type: application/json'],
            CURLOPT_WRITEFUNCTION => fn($handle, string $data): int => $this->processStreamChunk($data),
            CURLOPT_TIMEOUT => 300,
        ]);

        $result = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $curlError = curl_error($ch);
        curl_close($ch);

        // A final line that was not newline-terminated is still sitting in the buffer.
        if (!$this->interrupted && $this->lineBuffer !== '') {
            $this->consumeLine($this->lineBuffer);
        }
        $this->lineBuffer = '';

        if ($httpCode !== 200) {
            $this->reportError("HTTP $httpCode");
        } elseif ($result === false && !$this->interrupted) {
            // Transfer broke after a 200 status (e.g. timeout mid-stream).
            // A deliberate interruption also makes curl_exec() fail, hence the guard.
            $this->reportError("cURL error: $curlError");
        }
    }

    /**
     * Invokes the 'error' callback, if one is registered.
     */
    private function reportError(string $message): void {
        if (isset($this->callbacks['error'])) {
            ($this->callbacks['error'])($message);
        }
    }

    /**
     * cURL write callback: buffers the chunk and processes every complete line.
     *
     * @param string $data Raw bytes handed over by cURL.
     * @return int strlen($data) to continue, or 0 to make cURL abort the transfer.
     */
    private function processStreamChunk(string $data): int {
        if ($this->interrupted) {
            return 0;
        }

        $this->lineBuffer .= $data;

        // Everything before the last "\n" is complete; the tail (possibly '')
        // stays buffered until the next chunk supplies the rest of the line.
        $lines = explode("\n", $this->lineBuffer);
        $this->lineBuffer = array_pop($lines);

        foreach ($lines as $line) {
            if (!$this->consumeLine($line)) {
                return 0;
            }
        }

        return strlen($data);
    }

    /**
     * Decodes one JSON line of the stream and dispatches its token.
     *
     * @return bool False when the stream must stop, true otherwise.
     */
    private function consumeLine(string $line): bool {
        $line = trim($line);
        if ($line === '') {
            return true;
        }

        $json = json_decode($line, true);
        if (!is_array($json)) {
            return true;
        }

        $token = $json['message']['content'] ?? '';
        // Compare against '' explicitly: the token "0" is falsy in PHP.
        if (!is_string($token) || $token === '') {
            return true;
        }

        $this->accumulatedResponse .= $token;
        $this->tokenCount++;

        if (!isset($this->callbacks['token'])) {
            return true;
        }

        $shouldContinue = ($this->callbacks['token'])([
            'token' => $token,
            'accumulated' => $this->accumulatedResponse,
            'token_count' => $this->tokenCount,
            'elapsed_ms' => round((microtime(true) - $this->startTime) * 1000),
        ]);

        // interrupt() may have been called from inside the callback.
        if ($shouldContinue === false || $this->interrupted) {
            $this->interrupted = true;
            return false;
        }

        return true;
    }

    /**
     * Streams a chat completion to the HTTP client as Server-Sent Events.
     *
     * Replaces any previously registered token/complete/error callbacks with
     * ones that emit 'token', 'done' and 'error' events, then runs streamChat().
     * The stream is stopped as soon as the client connection is aborted.
     */
    public function streamSSE(string $model, array $messages, array $options = []): void {
        if (!headers_sent()) {
            header('Content-Type: text/event-stream');
            header('Cache-Control: no-cache');
            header('Connection: keep-alive');
            header('X-Accel-Buffering: no');
        }

        $this->onToken(function ($data) {
            $this->sendEvent([
                'type' => 'token',
                'content' => $data['token'],
                'token_count' => $data['token_count'],
            ]);

            // Returning false stops the stream once the client is gone.
            return !connection_aborted();
        });

        $this->onComplete(function ($data) {
            $this->sendEvent([
                'type' => 'done',
                'response' => $data['response'],
                'tokens' => $data['tokens'],
                'duration_ms' => $data['duration_ms'],
                'tps' => $data['tokens_per_second'],
            ]);
        });

        $this->onError(function ($error) {
            $this->sendEvent(['type' => 'error', 'message' => $error]);
        });

        $this->streamChat($model, $messages, $options);
    }

    /**
     * Writes one SSE "data:" frame and flushes it to the client.
     */
    private function sendEvent(array $event): void {
        echo "data: " . json_encode($event) . "\n\n";

        if (ob_get_level()) {
            ob_flush();
        }
        flush();
    }

    /**
     * Requests that the current stream stop.
     */
    public function interrupt(): void {
        $this->interrupted = true;
    }

    /**
     * Tokens per second since the stream started, rounded to one decimal.
     */
    private function calculateTPS(): float {
        $elapsed = microtime(true) - $this->startTime;
        return $elapsed > 0 ? round($this->tokenCount / $elapsed, 1) : 0.0;
    }

    /** Text accumulated by the most recent stream. */
    public function getAccumulatedResponse(): string {
        return $this->accumulatedResponse;
    }

    /** Number of tokens received by the most recent stream. */
    public function getTokenCount(): int {
        return $this->tokenCount;
    }

    /** Whether the most recent stream was interrupted. */
    public function isInterrupted(): bool {
        return $this->interrupted;
    }
}
|