69 lines
2.3 KiB
Python
69 lines
2.3 KiB
Python
#!/usr/bin/env python3
|
|
"""WEVIA Redis LLM Worker — Sovereign Cascade Async
|
|
Runs as a systemd service: blocks on BRPOP for tasks in Redis, calls Sovereign on port 4000, and stores each result back in Redis with a 60-second TTL.
|
|
"""
|
|
import redis, json, requests, time, sys, os
|
|
|
|
# --- Worker configuration ---------------------------------------------------

# Redis list the worker pops tasks from.
QUEUE = "wevia:llm:queue"

# Chat-completions endpoint of the local Sovereign service.
SOVEREIGN_URL = "http://127.0.0.1:4000/v1/chat/completions"

# Timeout handed to the HTTP request made to Sovereign.
TIMEOUT = 20

# Address of the Redis server holding the queue and the result keys.
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379

# Shared Redis client; decode_responses=True asks redis-py to return str
# values instead of bytes.
r = redis.Redis(
    decode_responses=True,
    port=REDIS_PORT,
    host=REDIS_HOST,
)
|
|
|
|
def process_task(task_data):
    """Run one queued task against Sovereign and store its result in Redis.

    Args:
        task_data: JSON text popped from the queue. The decoded object is
            read for ``id`` (default ``"unknown"``), ``messages`` (default
            ``[]``) and ``max_tokens`` (default ``1000``).

    Returns:
        dict: The result that was written to Redis. It always carries
        ``status``, ``response`` and ``provider``; a successful call also
        carries ``model`` and ``usage``.

    Any failure while decoding the task or calling Sovereign is converted
    into an ``"error"`` result rather than raised. The final Redis write is
    outside the ``try`` block, so errors from it propagate to the caller.
    """
    # Bind the id before the try block: if the payload is not valid JSON (or
    # is not a JSON object) the lookup below never runs, and building the
    # result key at the end would otherwise fail on an unbound local instead
    # of storing the error result.
    task_id = "unknown"

    try:
        task = json.loads(task_data)
        task_id = task.get("id", "unknown")
        messages = task.get("messages", [])
        max_tokens = task.get("max_tokens", 1000)

        # Call Sovereign
        resp = requests.post(
            SOVEREIGN_URL,
            json={
                "model": "auto",
                "messages": messages,
                "max_tokens": max_tokens,
            },
            timeout=TIMEOUT,
        )

        if resp.status_code == 200:
            d = resp.json()
            result = {
                "status": "ok",
                # Text of the first choice; missing keys fall back to "".
                "response": d.get("choices", [{}])[0].get("message", {}).get("content", ""),
                "provider": d.get("provider", "sovereign"),
                "model": d.get("model", "auto"),
                "usage": d.get("usage", {}),
            }
        else:
            result = {"status": "error", "response": f"Sovereign HTTP {resp.status_code}", "provider": "error"}
    except requests.Timeout:
        result = {"status": "error", "response": "Sovereign timeout", "provider": "timeout"}
    except Exception as e:
        # Catch-all so a bad task still yields a stored error result; the
        # message is truncated to 200 characters.
        result = {"status": "error", "response": str(e)[:200], "provider": "error"}

    # Store result with 60s TTL
    r.setex(f"wevia:llm:result:{task_id}", 60, json.dumps(result))
    return result
|
|
|
|
# Announce startup. Every log line below is flushed explicitly: stdout is
# block-buffered when it is not attached to a terminal, so unflushed error
# messages would otherwise be held back until the buffer fills.
print(f"[WEVIA LLM Worker] Started. Listening on {QUEUE}...", flush=True)

# Main loop: wait on the queue, process one task at a time, and keep running
# through Redis outages and unexpected errors.
while True:
    try:
        # Blocking pop with a 30s timeout; `item` is falsy when nothing
        # arrived in that window, in which case we simply poll again.
        item = r.brpop(QUEUE, timeout=30)
        if item:
            _, task_data = item
            result = process_task(task_data)
            print(f"[Task] {result.get('provider','?')} -> {result.get('status','?')}", flush=True)
    except redis.ConnectionError:
        # Back off before reconnecting so a down Redis is not hammered.
        print("[Redis] Connection lost, retrying in 5s...", flush=True)
        time.sleep(5)
    except KeyboardInterrupt:
        print("[Worker] Shutting down.", flush=True)
        break
    except Exception as e:
        # Last-resort guard: log, pause briefly, and continue serving.
        print(f"[Error] {e}", flush=True)
        time.sleep(1)
|