Files
wevia-brain/knowledge/deep/python-advanced-patterns.md
2026-04-12 23:01:36 +02:00

5.4 KiB
Executable File

Python Advanced Patterns — Expert Guide

Async Programming (FastAPI / aiohttp)

import asyncio
from fastapi import FastAPI
import httpx

app = FastAPI()

async def fetch_multiple_urls(urls: list[str]) -> list[dict]:
    """Fetch every URL concurrently and return one summary dict per URL.

    Successful responses yield {"url", "status", "size"}; failures yield
    {"url", "error"} instead of raising (gather uses return_exceptions=True).
    """
    async with httpx.AsyncClient() as client:
        pending = [client.get(u) for u in urls]
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
        summaries = []
        for url, outcome in zip(urls, outcomes):
            if isinstance(outcome, Exception):
                summaries.append({"url": url, "error": str(outcome)})
            else:
                summaries.append(
                    {"url": url, "status": outcome.status_code, "size": len(outcome.text)}
                )
        return summaries

@app.get("/api/health-check")
async def health_check():
    """Probe every WEVADS server concurrently and report aggregate health."""
    servers = [
        "http://89.167.40.150:5821/api/health",
        "http://88.198.4.195:11434/api/tags",
        "http://151.80.235.110/track.php?test=1"
    ]
    results = await fetch_multiple_urls(servers)
    # An entry without a "status" key (i.e. an error entry) counts as unhealthy.
    healthy = all(entry.get("status") == 200 for entry in results)
    return {"servers": results, "all_healthy": healthy}

Advanced Decorators

import functools
import time
import logging

def retry(max_attempts=3, delay=1, exceptions=(Exception,)):
    """Decorator factory: retry the wrapped call with exponential backoff.

    Retries only the exception types in *exceptions*; the last failure is
    re-raised unchanged once *max_attempts* is exhausted.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                attempt += 1
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    if attempt >= max_attempts:
                        raise
                    # Backoff doubles each attempt: delay, 2*delay, 4*delay, ...
                    wait = delay * (2 ** (attempt - 1))
                    logging.warning(f"{func.__name__} failed (attempt {attempt}), retrying in {wait}s: {exc}")
                    time.sleep(wait)
        return wrapper
    return decorator

def cache_result(ttl_seconds=300):
    """Decorator factory: memoize results keyed by (args, kwargs) with a TTL.

    Fix over the original: expired entries were never removed, so keys that
    are never requested again accumulated forever (unbounded memory growth).
    Each call now prunes all expired entries before serving/storing.

    The wrapper exposes ``cache_clear()`` to empty the cache manually.
    Arguments must be hashable (they form the cache key).
    """
    def decorator(func):
        cache = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # kwargs are sorted so {'a':1,'b':2} and {'b':2,'a':1} share a key.
            key = (args, tuple(sorted(kwargs.items())))
            now = time.time()
            # Prune every expired entry, not just the one being looked up,
            # so stale results cannot accumulate. O(len(cache)) per call,
            # which is fine for the small caches this is meant for.
            expired = [k for k, entry in cache.items()
                       if now - entry['time'] >= ttl_seconds]
            for k in expired:
                del cache[k]
            if key in cache:
                return cache[key]['value']
            result = func(*args, **kwargs)
            cache[key] = {'value': result, 'time': now}
            return result

        wrapper.cache_clear = cache.clear
        return wrapper
    return decorator

def measure_time(func):
    """Decorator: log the wrapped call's wall-clock duration at INFO level."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution — right tool for timing.
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        logging.info(f"{func.__name__} took {elapsed:.4f}s")
        return outcome
    return wrapper

# Usage: the three decorators compose — caching innermost (so hits skip the
# timing/retry layers' work), timing next, retries outermost.
@retry(max_attempts=3, delay=2, exceptions=(ConnectionError, TimeoutError))
@measure_time
@cache_result(ttl_seconds=60)
def fetch_ollama_models():
    """Return the model list advertised by the Ollama server."""
    import httpx
    resp = httpx.get("http://88.198.4.195:11434/api/tags", timeout=10)
    return resp.json()

Data Processing Patterns

import pandas as pd
from typing import Generator

def process_large_csv(filepath: str, chunk_size: int = 10000) -> Generator:
    """Process large CSV files in chunks without loading everything in memory"""
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # Clean
        chunk = chunk.dropna(subset=['email'])
        chunk['email'] = chunk['email'].str.lower().str.strip()
        chunk['domain'] = chunk['email'].str.split('@').str[1]
        
        # Filter
        valid = chunk[chunk['email'].str.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')]
        
        yield valid

def deduplicate_contacts(df: pd.DataFrame) -> pd.DataFrame:
    """Keep exactly one row per email, preferring the most recent 'updated_at'.

    Returns a new frame with a fresh 0..n-1 index; the input is not modified.
    """
    newest_first = df.sort_values('updated_at', ascending=False)
    unique = newest_first.drop_duplicates(subset=['email'], keep='first')
    return unique.reset_index(drop=True)

def segment_by_isp(df: pd.DataFrame) -> dict[str, pd.DataFrame]:
    """Segment contacts by ISP for targeted sending.

    Expects a 'domain' column; maps known consumer domains to an ISP label
    ('gmail' / 'microsoft' / 'yahoo'), everything else to 'other', and
    returns {isp_label: sub-frame}. Each sub-frame carries an 'isp' column.

    Fix over the original: it wrote the 'isp' column into the caller's
    DataFrame in place (`df['isp'] = ...`), a surprising side effect.
    `assign` builds the column on a copy, leaving the input untouched.
    """
    isp_map = {
        'gmail.com': 'gmail', 'googlemail.com': 'gmail',
        'outlook.com': 'microsoft', 'hotmail.com': 'microsoft', 'live.com': 'microsoft',
        'yahoo.com': 'yahoo', 'yahoo.fr': 'yahoo', 'ymail.com': 'yahoo',
    }
    labeled = df.assign(isp=df['domain'].map(isp_map).fillna('other'))
    return {isp: group for isp, group in labeled.groupby('isp')}

Type Hints & Pydantic

from datetime import datetime, timezone
from typing import Optional

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, validator

class Contact(BaseModel):
    """A mailing-list contact with normalized, validated fields.

    Fixes over the original, which mixed pydantic v1 and v2 APIs:
    - `@validator` (v1, deprecated in v2) -> `@field_validator`
    - inner `class Config` -> `model_config = ConfigDict(...)` (v2 style,
      consistent with the v2-only `Field(pattern=...)` already used here)
    - `datetime.utcnow` (deprecated since 3.12, returns a NAIVE datetime)
      -> timezone-aware `datetime.now(timezone.utc)`
    """

    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "email": "dr.benali@gmail.com",
                "first_name": "Ahmed",
                "last_name": "Benali",
                "specialty": "Cardiologie",
                "country": "MA"
            }
        }
    )

    email: EmailStr
    first_name: str = Field(min_length=1, max_length=100)
    last_name: str = Field(min_length=1, max_length=100)
    specialty: Optional[str] = None
    country: str = Field(pattern=r'^[A-Z]{2}$')  # ISO 3166-1 alpha-2
    status: str = Field(default='active', pattern=r'^(active|inactive|bounced|unsubscribed)$')
    # Aware UTC timestamp assigned at model creation.
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_validator('email')
    @classmethod
    def lowercase_email(cls, v: str) -> str:
        """Normalize the address so lookups/dedup are case-insensitive."""
        return v.lower()