Files
wevads-platform/scripts/rag-processor.py
2026-02-26 04:53:11 +01:00

105 lines
3.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
WEVADS RAG System - Retrieval-Augmented Generation
Analyse intelligente des logs et génération de solutions
"""
import json
import os
import re
from datetime import datetime
class WEVADS_RAG:
    """Rule-based incident classifier backed by a JSON knowledge base.

    The instance keeps the knowledge base in ``self.data`` (a dict with the
    keys ``"solutions"``, ``"patterns"`` and ``"incidents"``) and persists it
    to ``self.knowledge_base`` every time an incident is recorded.
    """

    # Location used when the constructor is called without arguments.
    DEFAULT_KNOWLEDGE_BASE = "/opt/wevads/rag/knowledge_base.json"

    # Detection rules, compiled once and matched case-insensitively.
    # Results are reported in this declaration order.
    # NOTE(review): these are loose heuristics; e.g. the "usage.*NN%" branch
    # of disk_full also fires on CPU usage lines. Confirm before tightening.
    _PATTERNS = {
        # The last alternative catches "CPU ... 95%" style messages; the
        # "%CPU.*NN" alternative only matches a leading "%CPU" column header.
        "high_cpu": re.compile(
            r"CPU.*high|load.*high|%CPU.*\d{2,}|CPU.*\d{2,}\s*%",
            re.IGNORECASE,
        ),
        "memory_issue": re.compile(r"memory.*low|swap.*used|OOM", re.IGNORECASE),
        "disk_full": re.compile(
            r"disk.*full|no.*space|usage.*\d{2,}%", re.IGNORECASE
        ),
        "service_down": re.compile(
            r"service.*down|failed|inactive", re.IGNORECASE
        ),
        "postgres_issue": re.compile(
            r"postgresql.*error|connection.*failed", re.IGNORECASE
        ),
    }

    # Built-in remediation playbooks, keyed by problem type.
    _SOLUTIONS = {
        "high_cpu": {
            "diagnosis": "Charge CPU élevée détectée",
            "actions": [
                "Identifier les processus: ps aux --sort=-%cpu | head -10",
                "Analyser les logs système: journalctl --since '1 hour ago'",
                "Vérifier les services inutiles: systemctl list-units --state=running",
            ],
            "prevention": "Configurer des limites CPU avec systemd",
        },
        "postgres_issue": {
            "diagnosis": "Problème PostgreSQL détecté",
            "actions": [
                "Vérifier le service: systemctl status postgresql",
                "Analyser les logs: tail -100 /var/log/postgresql/*.log",
                "Tester la connexion: sudo -u postgres psql -c 'SELECT 1;'",
            ],
            "prevention": "Configurer le monitoring PostgreSQL",
        },
    }

    # Returned for any problem type without a dedicated playbook.
    _GENERIC_SOLUTION = {
        "diagnosis": "Problème générique",
        "actions": ["Consulter les logs", "Analyser le système"],
        "prevention": "Mettre en place un monitoring",
    }

    def __init__(self, knowledge_base: str = DEFAULT_KNOWLEDGE_BASE):
        """Load the knowledge base stored at *knowledge_base*.

        Args:
            knowledge_base: Path of the JSON file to read and write.
                Defaults to ``DEFAULT_KNOWLEDGE_BASE``.
        """
        self.knowledge_base = knowledge_base
        self.load_knowledge()

    def load_knowledge(self) -> None:
        """Populate ``self.data`` from the knowledge-base file.

        A missing, unreadable or malformed file is treated as an empty
        knowledge base (best effort, no exception is raised). Any of the
        three expected top-level keys that is absent is added with an empty
        value so later code can rely on them.
        """
        try:
            with open(self.knowledge_base, "r", encoding="utf-8") as handle:
                loaded = json.load(handle)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            loaded = None

        if not isinstance(loaded, dict):
            loaded = {}
        loaded.setdefault("solutions", {})
        loaded.setdefault("patterns", {})
        loaded.setdefault("incidents", [])
        self.data = loaded

    def analyze_incident(self, log_entry: str) -> list:
        """Return the names of every detection rule matching *log_entry*.

        Args:
            log_entry: Raw log text describing the incident.

        Returns:
            A list of rule names (possibly empty) in rule-declaration order.
            An empty or ``None`` entry yields an empty list.
        """
        if not log_entry:
            return []
        return [
            name
            for name, regex in self._PATTERNS.items()
            if regex.search(log_entry)
        ]

    def generate_solution(self, problem_type: str) -> dict:
        """Return the remediation playbook for *problem_type*.

        Args:
            problem_type: A rule name as returned by ``analyze_incident``.

        Returns:
            A fresh dict with the keys ``"diagnosis"``, ``"actions"`` and
            ``"prevention"``. Unknown problem types get a generic playbook.
            The result is a copy, so callers may mutate it freely.
        """
        playbook = self._SOLUTIONS.get(problem_type, self._GENERIC_SOLUTION)
        return {
            "diagnosis": playbook["diagnosis"],
            "actions": list(playbook["actions"]),
            "prevention": playbook["prevention"],
        }

    def learn_from_incident(
        self, incident: str, solution_applied, success: bool
    ) -> bool:
        """Record an incident outcome and persist the knowledge base.

        Args:
            incident: Raw incident text; only the first 200 characters are
                stored, but rule detection runs on the full text.
            solution_applied: JSON-serialisable description of what was done.
            success: Whether the applied solution resolved the incident.

        Returns:
            ``True`` once the knowledge base has been written.

        Raises:
            OSError: If the directory or file cannot be created or written.
            TypeError: If the entry cannot be serialised to JSON.
        """
        learning_entry = {
            "timestamp": datetime.now().isoformat(),
            "incident": incident[:200],  # cap the stored size
            "solution": solution_applied,
            "success": success,
            "pattern": self.analyze_incident(incident),
        }
        self.data.setdefault("incidents", []).append(learning_entry)

        # The load path tolerates a missing directory, so the save path must
        # be able to create it.
        directory = os.path.dirname(self.knowledge_base)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # Write to a sibling file and swap it in, so an interrupted write
        # never leaves a truncated knowledge base behind.
        tmp_path = f"{self.knowledge_base}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as handle:
                json.dump(self.data, handle, indent=2)
            os.replace(tmp_path, self.knowledge_base)
        except BaseException:
            # Clean up the partial temp file, then let the error propagate.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        return True
# Usage example: classify one sample log line and print a playbook for
# every problem type that was detected.
if __name__ == "__main__":
    engine = WEVADS_RAG()

    sample_log = "CPU usage at 95% for PostgreSQL processes"
    detected_problems = engine.analyze_incident(sample_log)

    # Nothing is printed at all when no rule matched.
    if detected_problems:
        print(f"Problèmes détectés: {detected_problems}")
    for problem in detected_problems:
        proposal = engine.generate_solution(problem)
        print(f"\nSolution pour {problem}:")
        print(json.dumps(proposal, indent=2))