#!/usr/bin/env python3 """ WEVADS RAG System - Retrieval-Augmented Generation Analyse intelligente des logs et génération de solutions """ import json import re from datetime import datetime class WEVADS_RAG: def __init__(self): self.knowledge_base = "/opt/wevads/rag/knowledge_base.json" self.load_knowledge() def load_knowledge(self): try: with open(self.knowledge_base, 'r') as f: self.data = json.load(f) except: self.data = { "solutions": {}, "patterns": {}, "incidents": [] } def analyze_incident(self, log_entry): """Analyse un incident et trouve des solutions similaires""" patterns = { "high_cpu": r"CPU.*high|load.*high|%CPU.*\d{2,}", "memory_issue": r"memory.*low|swap.*used|OOM", "disk_full": r"disk.*full|no.*space|usage.*\d{2,}%", "service_down": r"service.*down|failed|inactive", "postgres_issue": r"postgresql.*error|connection.*failed" } detected = [] for pattern_name, pattern in patterns.items(): if re.search(pattern, log_entry, re.IGNORECASE): detected.append(pattern_name) return detected def generate_solution(self, problem_type): """Génère une solution basée sur la base de connaissances""" solutions_db = { "high_cpu": { "diagnosis": "Charge CPU élevée détectée", "actions": [ "Identifier les processus: ps aux --sort=-%cpu | head -10", "Analyser les logs système: journalctl --since '1 hour ago'", "Vérifier les services inutiles: systemctl list-units --state=running" ], "prevention": "Configurer des limites CPU avec systemd" }, "postgres_issue": { "diagnosis": "Problème PostgreSQL détecté", "actions": [ "Vérifier le service: systemctl status postgresql", "Analyser les logs: tail -100 /var/log/postgresql/*.log", "Tester la connexion: sudo -u postgres psql -c 'SELECT 1;'" ], "prevention": "Configurer le monitoring PostgreSQL" } } return solutions_db.get(problem_type, { "diagnosis": "Problème générique", "actions": ["Consulter les logs", "Analyser le système"], "prevention": "Mettre en place un monitoring" }) def learn_from_incident(self, incident, solution_applied, success): """Apprend d'un incident pour améliorer les futures réponses""" learning_entry = { "timestamp": datetime.now().isoformat(), "incident": incident[:200], # Limiter la taille "solution": solution_applied, "success": success, "pattern": self.analyze_incident(incident) } self.data["incidents"].append(learning_entry) # Mettre à jour la base de connaissances with open(self.knowledge_base, 'w') as f: json.dump(self.data, f, indent=2) return True # Exemple d'utilisation if __name__ == "__main__": rag = WEVADS_RAG() # Test test_incident = "CPU usage at 95% for PostgreSQL processes" problems = rag.analyze_incident(test_incident) if problems: print(f"Problèmes détectés: {problems}") for problem in problems: solution = rag.generate_solution(problem) print(f"\nSolution pour {problem}:") print(json.dumps(solution, indent=2))