# DataFlow Pipeline

ETL pipeline for ingesting, transforming, and serving analytics data from multiple sources.

## Stack
- **Language**: Python 3.12
- **Framework**: FastAPI 0.115 (API), Celery 5.4 (workers)
- **Database**: PostgreSQL 16 (warehouse), Redis 7 (broker/cache)
- **ORM**: SQLAlchemy 2.0 with Alembic migrations
- **Data**: Pandas 2.2, Polars 1.x, DuckDB (analytics queries)
- **Testing**: pytest, pytest-asyncio, factory-boy, hypothesis
- **Linting**: Ruff (linter + formatter), mypy (strict mode)
- **Package Manager**: uv (lockfile: `uv.lock`)
- **CI/CD**: GitHub Actions, Docker, AWS ECS
- **Docs**: Sphinx with autodoc

## Commands
- `uv sync` - Install dependencies from lockfile
- `uv run fastapi dev` - Start API server (localhost:8000)
- `uv run celery -A dataflow.worker worker --loglevel=info` - Start Celery worker
- `uv run pytest` - Run test suite
- `uv run pytest --cov=dataflow --cov-report=html` - Run tests with coverage
- `uv run mypy dataflow/` - Type check
- `uv run ruff check dataflow/` - Lint
- `uv run ruff format dataflow/` - Format
- `uv run alembic upgrade head` - Apply database migrations
- `uv run alembic revision --autogenerate -m "description"` - Generate migration
- `docker compose up -d` - Start PostgreSQL, Redis, and worker locally

## Project Structure
```
dataflow/
  api/
    routes/        - FastAPI route modules (one per resource)
    deps.py        - Dependency injection (db session, current user, services)
    middleware.py  - CORS, timing, error handling middleware
  core/
    config.py      - Pydantic Settings with environment validation
    security.py    - JWT token handling, password hashing
    exceptions.py  - Custom exception classes with error codes
  models/          - SQLAlchemy ORM models
  schemas/         - Pydantic request/response schemas
  repositories/    - Data access layer (one per model)
  services/        - Business logic (orchestrates repositories)
  workers/
    tasks.py       - Celery task definitions
    pipelines/     - ETL pipeline definitions (extract, transform, load)
  utils/           - Pure utility functions
tests/
  conftest.py      - Shared fixtures (db session, client, factories)
  factories/       - factory-boy model factories
  unit/            - Unit tests for services and utils
  integration/     - Integration tests for repositories and API
alembic/
  versions/        - Migration scripts
  env.py           - Alembic environment configuration
scripts/           - Operational scripts (backfill, cleanup, reports)
```

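The split between `repositories/` (data access) and `services/` (business logic) can be illustrated with a minimal, dependency-free sketch. Everything named here (`Dataset`, `DatasetRepository`, `InMemoryDatasetRepository`, `DatasetService`) is hypothetical and only meant to show the direction of the dependency; the real repositories would presumably wrap a SQLAlchemy session rather than a dict.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Dataset:
    """Hypothetical domain object; real models would live in models/."""

    id: int
    name: str
    row_count: int


class DatasetRepository(Protocol):
    """Data access contract (assumed shape): one repository per model."""

    def get(self, dataset_id: int) -> Dataset | None: ...


class InMemoryDatasetRepository:
    """Stand-in for a database-backed repository, handy in unit tests."""

    def __init__(self, rows: list[Dataset]) -> None:
        self._rows = {row.id: row for row in rows}

    def get(self, dataset_id: int) -> Dataset | None:
        return self._rows.get(dataset_id)


class DatasetService:
    """Business logic; depends on the repository contract, not on storage."""

    def __init__(self, repo: DatasetRepository) -> None:
        self._repo = repo

    def is_empty(self, dataset_id: int) -> bool:
        dataset = self._repo.get(dataset_id)
        if dataset is None:
            # A real service would raise a domain exception here.
            raise LookupError(f"dataset {dataset_id} not found")
        return dataset.row_count == 0


service = DatasetService(InMemoryDatasetRepository([Dataset(1, "events", 0)]))
```

Because the service only sees the `Protocol`, a unit test can pass the in-memory repository while integration tests exercise the database-backed one.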
## Conventions

### Code Style
- Type hints on all function signatures. Use `from __future__ import annotations`.
- Use `Annotated` types with `Depends()` for FastAPI dependency injection.
- Use `async def` for all API endpoints. Use `def` for CPU-bound Celery tasks.
- Prefer `Polars` for new data transformations. Use `Pandas` only for library compatibility.
- Maximum function length: 30 lines. Extract helpers with descriptive names.
- No mutable default arguments. Use `None` with `if arg is None: arg = []`.

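As a small illustration of the type-hint and default-argument rules above, the following sketch uses a hypothetical helper, `append_batch`, which is not part of the codebase. It shows why the `None` sentinel matters: each call without an explicit `batch` gets its own list.

```python
from __future__ import annotations


def append_batch(
    record: dict[str, int],
    batch: list[dict[str, int]] | None = None,
) -> list[dict[str, int]]:
    """Append record to batch, creating a fresh list when none is passed."""
    if batch is None:
        batch = []
    batch.append(record)
    return batch


# Two independent calls must not share state through the default.
first = append_batch({"id": 1})
second = append_batch({"id": 2})
```

With a mutable default (`batch=[]`), the second call would have returned both records because the same list object is reused across calls.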
### Error Handling
- Custom exceptions inherit from `DataFlowError` base class.
- Services raise domain exceptions (`UserNotFoundError`, `PipelineFailedError`).
- API layer catches domain exceptions and maps to HTTP responses.
- Celery tasks use `autoretry_for` with exponential backoff for transient failures.
- Log all exceptions with full context (task ID, user ID, input parameters).

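One way the exception hierarchy and the mapping to HTTP responses might fit together is sketched below. The class names come from the list above; the `error_code` and `status_code` attributes, their values, the `context` keyword arguments, and the `to_http_response` and `get_user` helpers are assumptions made for illustration. In the real API layer this mapping would more likely live in a FastAPI exception handler.

```python
from __future__ import annotations


class DataFlowError(Exception):
    """Base class for domain errors (attribute names are assumed)."""

    error_code = "dataflow_error"
    status_code = 500

    def __init__(self, message: str, **context: object) -> None:
        super().__init__(message)
        self.message = message
        self.context = context  # e.g. task ID, user ID, input parameters


class UserNotFoundError(DataFlowError):
    error_code = "user_not_found"
    status_code = 404  # assumed mapping


class PipelineFailedError(DataFlowError):
    error_code = "pipeline_failed"
    status_code = 502  # assumed mapping


def to_http_response(exc: DataFlowError) -> tuple[int, dict[str, object]]:
    """Translate a domain exception into a (status, JSON body) pair."""
    body = {
        "error": exc.error_code,
        "message": exc.message,
        "context": exc.context,
    }
    return exc.status_code, body


def get_user(user_id: int) -> dict[str, object]:
    """Hypothetical service call that always fails, for demonstration."""
    raise UserNotFoundError("user does not exist", user_id=user_id)


try:
    get_user(42)
except DataFlowError as exc:
    status, body = to_http_response(exc)
```

Catching the base class at the boundary keeps services free of HTTP concerns while still giving each error a stable code.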
### Testing
- 85% minimum coverage. 95% on `services/` and `core/`.
- Use `factory-boy` for test data. No raw model construction in tests.
- Use `hypothesis` for property-based tests on data transformation functions.
- Integration tests run against a real PostgreSQL instance (Docker in CI).
- Async tests use `pytest-asyncio` with `asyncio_mode = "auto"`.

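The idea behind property-based testing of a transformation can be sketched without any third-party dependency. The block below is a stand-in that uses the standard library's `random` module instead of `hypothesis`; in the actual test suite, a `@given` decorator with strategies would presumably replace `random_rows` and add input shrinking. The transformation `dedupe_by_id` and the chosen properties are hypothetical.

```python
from __future__ import annotations

import random


def dedupe_by_id(rows: list[dict[str, int]]) -> list[dict[str, int]]:
    """Keep the first row seen for each id, preserving input order."""
    seen: set[int] = set()
    result: list[dict[str, int]] = []
    for row in rows:
        if row["id"] not in seen:
            seen.add(row["id"])
            result.append(row)
    return result


def random_rows(rng: random.Random) -> list[dict[str, int]]:
    """Generate a small list of rows with deliberately colliding ids."""
    size = rng.randint(0, 20)
    return [
        {"id": rng.randint(0, 5), "value": rng.randint(-100, 100)}
        for _ in range(size)
    ]


def check_dedupe_properties(trials: int = 200, seed: int = 0) -> int:
    """Assert invariants over many random inputs; return the trial count."""
    rng = random.Random(seed)
    for _ in range(trials):
        rows = random_rows(rng)
        out = dedupe_by_id(rows)
        ids = [row["id"] for row in out]
        assert len(ids) == len(set(ids))  # every id appears once
        assert set(ids) == {row["id"] for row in rows}  # no id is lost
        assert dedupe_by_id(out) == out  # applying it twice changes nothing
    return trials


checked = check_dedupe_properties()
```

The properties (uniqueness, completeness, idempotence) hold for any input, which is what distinguishes this style from example-based tests.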
## Environment Variables
- `DATABASE_URL` - PostgreSQL connection string (`postgresql+asyncpg://...`)
- `REDIS_URL` - Redis connection for Celery broker and result backend
- `SECRET_KEY` - JWT signing key (256-bit random)
- `CORS_ORIGINS` - Comma-separated allowed origins
- `S3_BUCKET` - Data lake bucket for raw ingestion files
- `SENTRY_DSN` - Error tracking
- `LOG_LEVEL` - Logging level (default: `INFO`)

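A rough, dependency-free sketch of how these variables could be validated at startup follows. The project's `core/config.py` uses Pydantic Settings, so this dataclass version is only a stand-in. Which variables are treated as required, the `Settings` field names, and the sample values are all assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Mapping

# Assumed to be mandatory; the document does not say which are optional.
REQUIRED = ("DATABASE_URL", "REDIS_URL", "SECRET_KEY", "S3_BUCKET")


@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    secret_key: str
    s3_bucket: str
    cors_origins: tuple[str, ...] = ()
    sentry_dsn: str | None = None
    log_level: str = "INFO"


def load_settings(env: Mapping[str, str]) -> Settings:
    """Build Settings from an environment mapping, failing fast on gaps."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError(f"missing environment variables: {', '.join(missing)}")
    raw_origins = env.get("CORS_ORIGINS", "")
    origins = tuple(o.strip() for o in raw_origins.split(",") if o.strip())
    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
        secret_key=env["SECRET_KEY"],
        s3_bucket=env["S3_BUCKET"],
        cors_origins=origins,
        sentry_dsn=env.get("SENTRY_DSN") or None,
        log_level=env.get("LOG_LEVEL", "INFO"),
    )


# Placeholder values for illustration only.
settings = load_settings(
    {
        "DATABASE_URL": "postgresql+asyncpg://user:pass@localhost/dataflow",
        "REDIS_URL": "redis://localhost:6379/0",
        "SECRET_KEY": "not-a-real-key",
        "S3_BUCKET": "example-bucket",
        "CORS_ORIGINS": "http://localhost:3000, https://example.com",
    }
)
```

In application code the mapping would be `os.environ`; passing it in explicitly keeps the loader easy to test.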
## Key Decisions
| Date | Decision | Rationale |
|------|----------|-----------|
| 2025-06-01 | uv over Poetry | Faster installs, better lockfile resolution |
| 2025-07-15 | Polars over Pandas | 10x faster for column operations, no GIL issues |
| 2025-08-01 | SQLAlchemy 2.0 | Async support, modern mapped_column syntax |
| 2025-09-10 | DuckDB for analytics | In-process OLAP queries, no separate cluster needed |
| 2025-11-01 | Ruff over Black+isort+flake8 | Single tool, faster, consistent configuration |