feat: Version 1.2 - comprehensive database migration guidelines using… (#10519)

* feat: Version 1.2 - comprehensive database migration guidelines using the Expand-Contract pattern

* Update src/backend/base/langflow/alembic/DB-MIGRATION-GUIDE.MD

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: Cleanup text and typos

* feat: Implement migration validation workflow and add migration validator scripts

* Update src/backend/base/langflow/alembic/migration_validator.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update src/backend/base/langflow/alembic/migration_validator.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update src/backend/base/langflow/alembic/migration_validator.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: moved the test_migrations directory to under tests

* feat: Added migration validator to pre-commit check.

* fix: improved test performance.

* fix: optimized attribute resolution in migration validator

* feat: add comprehensive tests for migration validator and guidelines

* fix: Lint is complaining about shebang declared but not being used.

* fix: Shebang reinstated.

* Update src/backend/base/langflow/alembic/DB-MIGRATION-GUIDE.MD

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update .github/workflows/migration-validation.yml

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* feat: color improvements

* Update .github/workflows/migration-validation.yml

Removed

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update .github/workflows/migration-validation.yml

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* ci: Created relative paths for CI

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

* fix: Component index json.

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Rico Furtado
2025-11-24 18:09:11 -05:00
committed by GitHub
parent f1e45c6e9c
commit 348b1b86ce
19 changed files with 2217 additions and 2 deletions

View File

@@ -0,0 +1,159 @@
name: Database Migration Validation

on:
  pull_request:
    paths:
      - 'src/backend/base/langflow/alembic/versions/*.py'
      - 'alembic/versions/*.py'

jobs:
  validate-migration:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        # v4: v3 runs on the deprecated Node 16 runtime
        uses: actions/checkout@v4
        with:
          # Full history so the origin/main...HEAD diff below works
          fetch-depth: 0

      - name: Setup Python
        # v5: v4 runs on the deprecated Node 16 runtime
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install sqlalchemy alembic

      - name: Get changed migration files
        id: changed-files
        run: |
          # Get all changed Python files in alembic/versions directories
          # CHANGED_FILES=$(git diff --name-only origin/main...HEAD | grep -E '(alembic|migrations)/versions/.*\.py$' || echo "")
          # Exclude test migrations, as they are not part of the main codebase
          CHANGED_FILES=$(git diff --name-only origin/main...HEAD | grep -E '(alembic|migrations)/versions/.*\.py$' | grep -v 'test_migrations/' || echo "")
          if [ -z "$CHANGED_FILES" ]; then
            echo "No migration files changed"
            echo "files=" >> "$GITHUB_OUTPUT"
          else
            echo "Changed migration files:"
            echo "$CHANGED_FILES"
            # Convert newlines to spaces for passing as arguments
            echo "files=$(echo $CHANGED_FILES | tr '\n' ' ')" >> "$GITHUB_OUTPUT"
          fi

      - name: Validate migrations
        if: steps.changed-files.outputs.files != ''
        run: |
          python src/backend/base/langflow/alembic/migration_validator.py ${{ steps.changed-files.outputs.files }}

      # - name: Check migration phase sequence
      #   if: steps.changed-files.outputs.files != ''
      #   run: |
      #     python scripts/check_phase_sequence.py ${{ steps.changed-files.outputs.files }}

      - name: Generate validation report
        if: always() && steps.changed-files.outputs.files != ''
        run: |
          python src/backend/base/langflow/alembic/migration_validator.py \
            --json ${{ steps.changed-files.outputs.files }} > validation-report.json || true

      - name: Post PR comment with results
        if: always() && steps.changed-files.outputs.files != ''
        # v7: v6 runs on the deprecated Node 16 runtime
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            let message = '';
            let validationPassed = true;
            try {
              const report = JSON.parse(fs.readFileSync('validation-report.json', 'utf8'));
              for (const result of report) {
                if (!result.valid) {
                  validationPassed = false;
                }
              }
              if (validationPassed) {
                message = `✅ **Migration Validation Passed**\n\n`;
                message += `All migrations follow the Expand-Contract pattern correctly.\n\n`;
              } else {
                message = `❌ **Migration Validation Failed**\n\n`;
                message += `Your migrations don't follow the Expand-Contract pattern.\n\n`;
                for (const result of report) {
                  // Guard warnings here too: an entry without a warnings array
                  // would otherwise throw and hide the real report.
                  if (!result.valid || (result.warnings && result.warnings.length > 0)) {
                    message += `### File: \`${result.file.split('/').pop()}\`\n`;
                    message += `**Phase:** ${result.phase}\n\n`;
                    if (result.violations && result.violations.length > 0) {
                      message += `**Violations:**\n`;
                      for (const v of result.violations) {
                        message += `- Line ${v.line}: ${v.message}\n`;
                      }
                      message += `\n`;
                    }
                    if (result.warnings && result.warnings.length > 0) {
                      message += `**Warnings:**\n`;
                      for (const w of result.warnings) {
                        message += `- Line ${w.line}: ${w.message}\n`;
                      }
                      message += `\n`;
                    }
                  }
                }
                message += `### 📚 Resources\n`;
                message += `- Review the [DB Migration Guide](./src/backend/base/langflow/alembic/DB-MIGRATION-GUIDE.MD)\n`;
                message += `- Use \`python scripts/generate_migration.py --help\` to generate compliant migrations\n\n`;
                message += `### Common Issues & Solutions\n`;
                message += `- **New columns must be nullable:** Add \`nullable=True\` or \`server_default\`\n`;
                message += `- **Missing phase marker:** Add \`Phase: EXPAND/MIGRATE/CONTRACT\` to docstring\n`;
                message += `- **Column drops:** Only allowed in CONTRACT phase\n`;
                message += `- **Direct renames:** Use expand-contract pattern instead\n`;
              }
            } catch (error) {
              message = `⚠️ **Migration validation check failed to run properly**\n`;
              message += `Error: ${error.message}\n`;
            }
            // Post or update comment
            const { data: comments } = await github.rest.issues.listComments({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
            });
            const botComment = comments.find(comment =>
              comment.user.type === 'Bot' &&
              comment.body.includes('Migration Validation')
            );
            if (botComment) {
              await github.rest.issues.updateComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                comment_id: botComment.id,
                body: message
              });
            } else {
              await github.rest.issues.createComment({
                owner: context.repo.owner,
                repo: context.repo.repo,
                issue_number: context.issue.number,
                body: message
              });
            }
            // Fail the workflow if validation didn't pass
            if (!validationPassed) {
              core.setFailed('Migration validation failed');
            }

View File

@@ -25,6 +25,21 @@ repos:
language: system
types_or: [python, pyi]
args: [--config, pyproject.toml]
- id: validate-migrations
name: Validate Alembic Migrations (Expand-Contract)
entry: python src/backend/base/langflow/alembic/migration_validator.py
language: python
files: (alembic|migrations)/versions/.*\.py$
additional_dependencies: [sqlalchemy, alembic]
pass_filenames: true
always_run: false
verbose: true
- id: check-migration-phase
name: Check Migration Phase Documentation
entry: python -c "import sys, re; content = open(sys.argv[1]).read(); sys.exit(0 if re.search(r'Phase:\s*(EXPAND|MIGRATE|CONTRACT)', content) else 1)"
language: python
files: (alembic|migrations)/versions/.*\.py$
pass_filenames: true
- repo: https://github.com/Yelp/detect-secrets
rev: v1.5.0
hooks:

10
.vscode/launch.json vendored
View File

@@ -1,6 +1,7 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug Backend",
"type": "debugpy",
@@ -79,6 +80,13 @@
"purpose": ["debug-test"],
"console": "integratedTerminal",
"justMyCode": false
}
},
{
"name": "Python Debugger: Python File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
}
]
}

View File

@@ -0,0 +1,267 @@
"""Generate Expand-Contract pattern compliant Alembic migrations."""
import hashlib # noqa: F401
import random # noqa: F401
import re # noqa: F401
import subprocess # noqa: F401
from datetime import datetime # noqa: F401
from pathlib import Path # noqa: F401
from typing import Optional # noqa: F401
import click # noqa: F401
# Migration-file templates keyed by Expand-Contract phase.
#
# Each value is a str.format() template: single-brace fields ({revision},
# {table_name}, ...) are substituted at generation time; doubled braces
# ({{...}}) survive formatting and become runtime f-string / dict braces in
# the generated migration.
#
# Fixes vs. the previous revision:
#   * removed a stray lone `}` in the "expand" template that made
#     str.format() raise "Single '}' encountered in format string";
#   * the "expand" template's generated downgrade() calls datetime.now(),
#     so `from datetime import datetime` is now emitted with its imports.
TEMPLATES = {
    "expand": '''"""
{description}

Phase: EXPAND
Safe to rollback: YES
Services compatible: All versions
Next phase: MIGRATE after all services deployed

Revision ID: {revision}
Revises: {down_revision}
Create Date: {create_date}
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text, inspect
from datetime import datetime

# revision identifiers, used by Alembic
revision = '{revision}'
down_revision = {down_revision}
branch_labels = None
depends_on = None


def upgrade() -> None:
    """
    EXPAND PHASE: Add new schema elements (backward compatible)

    - All new columns must be nullable or have defaults
    - No breaking changes to existing schema
    - Services using old schema continue to work
    """
    bind = op.get_bind()
    inspector = inspect(bind)

    # Get existing columns for idempotency
    columns = [col['name'] for col in inspector.get_columns('{table_name}')]

    # Add new nullable column (always check existence first)
    if '{column_name}' not in columns:
        op.add_column('{table_name}',
            sa.Column('{column_name}', sa.{column_type}(), nullable=True{default_value})
        )
        print(f"✅ Added column '{column_name}' to table '{table_name}'")
        # Optional: Add index for performance
        # op.create_index('ix_{table_name}_{column_name}', '{table_name}', ['{column_name}'])
    else:
        print(f"⏭️ Column '{column_name}' already exists in table '{table_name}'")

    # Verify the change
    result = bind.execute(text(
        "SELECT COUNT(*) as cnt FROM {table_name}"
    )).first()
    print(f"📊 EXPAND phase complete for {{result.cnt}} rows in {table_name}")


def downgrade() -> None:
    """
    Rollback EXPAND phase

    - Safe to rollback as it only removes additions
    - Check for data loss before dropping
    """
    bind = op.get_bind()
    inspector = inspect(bind)
    columns = [col['name'] for col in inspector.get_columns('{table_name}')]

    if '{column_name}' in columns:
        # Check if column has data
        result = bind.execute(text("""
            SELECT COUNT(*) as cnt FROM {table_name}
            WHERE {column_name} IS NOT NULL
        """)).first()

        if result and result.cnt > 0:
            print(f"⚠️ Warning: Dropping column '{column_name}' with {{result.cnt}} non-null values")
            # Optional: Create backup table
            backup_table = '_{table_name}_{column_name}_backup_' + datetime.now().strftime('%Y%m%d_%H%M%S')
            bind.execute(text(f"""
                CREATE TABLE {{backup_table}} AS
                SELECT id, {column_name}, NOW() as backed_up_at
                FROM {table_name}
                WHERE {column_name} IS NOT NULL
            """))
            print(f"💾 Created backup table: {{backup_table}}")

        op.drop_column('{table_name}', '{column_name}')
        print(f"✅ Dropped column '{column_name}' from table '{table_name}'")
    else:
        print(f"⏭️ Column '{column_name}' doesn't exist in table '{table_name}'")
''',
    "migrate": '''"""
{description}

Phase: MIGRATE
Safe to rollback: PARTIAL (data migration may be lost)
Services compatible: Both old and new versions
Next phase: CONTRACT after 30+ days and full adoption

Revision ID: {revision}
Revises: {down_revision}
Create Date: {create_date}
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text
from datetime import datetime

# revision identifiers, used by Alembic
revision = '{revision}'
down_revision = {down_revision}
branch_labels = None
depends_on = None


def upgrade() -> None:
    """
    MIGRATE PHASE: Transition data to new schema

    - Backfill data from old columns to new
    - Both old and new columns coexist
    - Services can use either column
    """
    bind = op.get_bind()
    print("🔄 Starting data migration...")

    # Backfill data from old column to new (if applicable)
    {migration_logic}

    # Report migration progress
    result = bind.execute(text("""
        SELECT
            COUNT(*) FILTER (WHERE {new_column} IS NOT NULL) as migrated,
            COUNT(*) FILTER (WHERE {new_column} IS NULL) as not_migrated,
            COUNT(*) as total
        FROM {table_name}
    """)).first()

    print(f"📊 Migration Statistics:")
    print(f"   - Total rows: {{result.total}}")
    print(f"   - Migrated: {{result.migrated}} ({{result.migrated * 100 / result.total if result.total > 0 else 0:.1f}}%)")
    print(f"   - Not migrated: {{result.not_migrated}}")

    if result.not_migrated > 0:
        print(f"⚠️ WARNING: {{result.not_migrated}} rows not yet migrated")
        print(f"   Consider running a background job to complete migration")
    else:
        print(f"✅ All rows successfully migrated")

    # Log migration completion
    bind.execute(text("""
        INSERT INTO alembic_version_history (version_num, phase, completed_at)
        VALUES (:version, 'MIGRATE', :timestamp)
        ON CONFLICT (version_num) DO UPDATE
        SET phase = 'MIGRATE', completed_at = :timestamp
    """), {{"version": revision, "timestamp": datetime.now()}})


def downgrade() -> None:
    """
    Rollback MIGRATE phase

    - Usually no action needed
    - Data remains in both old and new columns
    """
    print("⚠️ MIGRATE phase rollback - data remains in both columns")
    print("   Services can continue using either old or new schema")

    # Optional: Log rollback
    bind = op.get_bind()
    bind.execute(text("""
        UPDATE alembic_version_history
        SET phase = 'MIGRATE_ROLLED_BACK', completed_at = NOW()
        WHERE version_num = :version
    """), {{"version": revision}})
''',  # noqa: E501
    "contract": '''"""
{description}

Phase: CONTRACT
Safe to rollback: NO (old schema removed)
Services compatible: New versions only
Prerequisites: All services using new schema for 30+ days

Revision ID: {revision}
Revises: {down_revision}
Create Date: {create_date}
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy import text, inspect
from datetime import datetime, timedelta

# revision identifiers, used by Alembic
revision = '{revision}'
down_revision = {down_revision}
branch_labels = None
depends_on = None

# Configuration
MIN_MIGRATION_DAYS = 30  # Minimum days before contracting


def upgrade() -> None:
    """
    CONTRACT PHASE: Remove old schema elements

    - Verify all services have migrated
    - Ensure data migration is complete
    - Remove deprecated columns/tables
    - Make new columns non-nullable if needed
    """
    bind = op.get_bind()
    inspector = inspect(bind)

    print("🔍 Verifying migration readiness...")

    # Check 1: Verify migration completion
    {verification_checks}

    # Check 2: Verify no recent usage of old column (if monitoring is set up)
    try:
        result = bind.execute(text("""
            SELECT MAX(last_accessed) as last_use
            FROM column_usage_stats
            WHERE table_name = '{table_name}'
            AND column_name = '{old_column}'
        """)).first()

        if result and result.last_use:
            days_since_use = (datetime.now() - result.last_use).days
            if days_since_use < MIN_MIGRATION_DAYS:
                raise Exception(
                    f"❌ Cannot contract: old column used {{days_since_use}} days ago "
                    f"(minimum: {{MIN_MIGRATION_DAYS}} days)"
                )
            print(f"✅ Old column last used {{days_since_use}} days ago")
    except Exception as e:
        if "column_usage_stats" not in str(e):
            raise
        print("⏭️ No usage tracking table found, skipping usage check")

    # Check 3: Create final backup before removing
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_table = 'backup_{table_name}_{old_column}_' + timestamp
    print(f"💾 Creating final backup: {{backup_table}}")
    bind.execute(text(f"""
        CREATE TABLE {{backup_table}} AS
        SELECT * FROM {table_name}
        WHERE {old_column} IS NOT NULL
        LIMIT 10000 -- Limit backup size
    """))

    # Remove old column
    columns = [col['name'] for col in inspector.get_columns('{table_name}')]
''',
}

223
scripts/test_validator.py Normal file
View File

@@ -0,0 +1,223 @@
"""Test script for migration validator."""
import os
import sys
import tempfile
from pathlib import Path
# Add parent directory to path
sys.path.append(str(Path(__file__).parent.parent))
from src.backend.base.langflow.alembic.migration_validator import MigrationValidator
def create_test_migration(content: str, filename: str) -> Path:
    """Write *content* to a temp file whose name ends with *filename*.

    Returns the path to the created file. The file is created with
    delete=False, so the caller is responsible for removing it.
    """
    handle = tempfile.NamedTemporaryFile(mode="w", suffix=filename, delete=False)
    with handle:
        handle.write(content)
    return Path(handle.name)
def test_expand_phase():
    """Test EXPAND phase validations."""
    print("\n🧪 Testing EXPAND Phase Validations...")

    # Compliant EXPAND migration: nullable column plus an existence check.
    good_expand = '''"""
Description: Add email_verified column
Phase: EXPAND
Safe to rollback: YES
Revision ID: test_expand_good
"""
from alembic import op
import sqlalchemy as sa

def upgrade():
    bind = op.get_bind()
    inspector = sa.inspect(bind)
    columns = [col['name'] for col in inspector.get_columns('users')]
    if 'email_verified' not in columns:
        op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=True))

def downgrade():
    op.drop_column('users', 'email_verified')
'''

    # Non-compliant EXPAND migration: non-nullable add plus a column drop.
    bad_expand = '''"""
Description: Add required column
Phase: EXPAND
Revision ID: test_expand_bad
"""
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Missing existence check and non-nullable
    op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=False))
    # Dropping column in EXPAND phase
    op.drop_column('users', 'old_column')

def downgrade():
    pass
'''

    checker = MigrationValidator()

    # The compliant migration must validate.
    migration_path = create_test_migration(good_expand, "good_expand.py")
    report = checker.validate_migration_file(migration_path)
    print(f" ✅ Good EXPAND: Valid={report['valid']} (expected: True)")
    assert report["valid"], "Good EXPAND should pass"  # noqa: S101
    migration_path.unlink()

    # The violating migration must be rejected, with violations reported.
    migration_path = create_test_migration(bad_expand, "bad_expand.py")
    report = checker.validate_migration_file(migration_path)
    print(f" ✅ Bad EXPAND: Valid={report['valid']} (expected: False)")
    print(f" Violations: {len(report['violations'])}")
    for violation in report["violations"]:
        print(f" - {violation['type']}: {violation['message']}")
    assert not report["valid"], "Bad EXPAND should fail"  # noqa: S101
    migration_path.unlink()
def test_contract_phase():
    """Test CONTRACT phase validations."""
    print("\n🧪 Testing CONTRACT Phase Validations...")

    # Compliant CONTRACT migration: verifies the backfill finished before
    # dropping the old column, and refuses to roll back.
    good_contract = '''"""
Description: Remove old column
Phase: CONTRACT
Revision ID: test_contract_good
"""
from alembic import op
import sqlalchemy as sa

def upgrade():
    bind = op.get_bind()
    # Check data migration is complete
    result = bind.execute(sa.text("""
        SELECT COUNT(*) as cnt FROM users
        WHERE old_email IS NOT NULL AND new_email IS NULL
    """)).first()
    if result.cnt > 0:
        raise Exception(f"Cannot contract: {result.cnt} rows not migrated")
    op.drop_column('users', 'old_email')

def downgrade():
    raise NotImplementedError("Cannot rollback CONTRACT phase")
'''

    checker = MigrationValidator()
    migration_path = create_test_migration(good_contract, "good_contract.py")
    report = checker.validate_migration_file(migration_path)
    print(f" ✅ Good CONTRACT: Valid={report['valid']} (expected: True)")
    migration_path.unlink()
def test_phase_detection():
    """Test phase detection from different formats."""
    print("\n🧪 Testing Phase Detection...")

    # (marker placed in the docstring, phase the validator should report)
    test_cases = [
        ("Phase: EXPAND", "EXPAND"),
        ("phase: migrate", "MIGRATE"),
        ("PHASE: CONTRACT", "CONTRACT"),
        ("No phase marker", "UNKNOWN"),
    ]

    checker = MigrationValidator()
    for marker, expected in test_cases:
        source = f'''"""
Migration description
{marker}
"""

def upgrade(): pass

def downgrade(): pass
'''
        migration_path = create_test_migration(source, "phase_test.py")
        detected = checker.validate_migration_file(migration_path)["phase"]
        print(f"'{marker}'{detected} (expected: {expected})")
        assert detected == expected, f"Phase detection failed for {marker}"  # noqa: S101
        migration_path.unlink()
def test_common_mistakes():
    """Test detection of common migration mistakes."""
    print("\n🧪 Testing Common Mistake Detection...")

    # Each snippet violates the Expand-Contract rules in a different way;
    # every one must be flagged as invalid.
    mistakes = {
        "Direct rename": """
def upgrade():
    op.rename_column('users', 'email', 'email_address')
""",
        "Direct type change": """
def upgrade():
    op.alter_column('users', 'age', type_=sa.Integer())
""",
        "Non-nullable without default": """
def upgrade():
    op.add_column('users', sa.Column('required_field', sa.String(), nullable=False))
""",
    }

    checker = MigrationValidator()
    for name, snippet in mistakes.items():
        source = f'''"""
Test: {name}
Phase: EXPAND
"""
from alembic import op
import sqlalchemy as sa
{snippet}
def downgrade(): pass
'''
        migration_path = create_test_migration(source, f"{name}.py")
        report = checker.validate_migration_file(migration_path)
        print(f"{name}: Detected={not report['valid']}")
        assert not report["valid"], f"Should detect {name}"  # noqa: S101
        migration_path.unlink()
def main():
    """Run the full validator test suite, exiting non-zero on failure."""
    banner = "=" * 60
    print(banner)
    print("🚀 Migration Validator Test Suite")
    print(banner)
    try:
        test_expand_phase()
        test_contract_phase()
        test_phase_detection()
        test_common_mistakes()
    except AssertionError as e:
        print(f"\n❌ Test failed: {e}")
        sys.exit(1)
    except (OSError, ImportError) as e:
        print(f"\n❌ Unexpected error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)
    else:
        print("\n" + banner)
        print("✅ All tests passed!")
        print(banner)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,48 @@
"""Test migrations with actual database."""
import sqlite3
import tempfile
from alembic import command
from alembic.config import Config
def test_real_migration():
    """Test migration with an actual SQLite database.

    Creates a throwaway SQLite file, seeds a ``users`` table, runs the
    alembic upgrade against it, and prints the resulting schema so a human
    can verify the migration took effect.
    """
    # A TemporaryDirectory avoids keeping the db file open while sqlite
    # uses the same path (NamedTemporaryFile breaks on Windows) and still
    # guarantees cleanup even if the migration raises.
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = f"{tmp_dir}/test.db"

        conn = sqlite3.connect(db_path)
        try:
            # Seed a minimal schema for the migrations to operate on.
            conn.execute("""
                CREATE TABLE users (
                    id INTEGER PRIMARY KEY,
                    name TEXT,
                    old_email TEXT
                )
            """)
            conn.commit()

            # Point alembic at the project's migration scripts and our
            # temporary database (paths relative to the repo root).
            alembic_cfg = Config()
            alembic_cfg.set_main_option("script_location", "src/backend/base/langflow/alembic")
            alembic_cfg.set_main_option("sqlalchemy.url", f"sqlite:///{db_path}")

            # Run migration. Alembic raises CommandError (and other
            # exception types), not RuntimeError, so catch broadly here —
            # this is a diagnostic script, not production code.
            try:
                command.upgrade(alembic_cfg, "head")
                print("✅ Migration executed successfully")
            except Exception as e:  # noqa: BLE001 - report any migration failure
                print(f"❌ Migration failed: {e}")

            # Verify schema
            cursor = conn.execute("PRAGMA table_info(users)")
            columns = [row[1] for row in cursor.fetchall()]
            print(f"Columns after migration: {columns}")
        finally:
            conn.close()


if __name__ == "__main__":
    test_real_migration()

View File

@@ -0,0 +1,513 @@
# Database Migration Guidelines: Expand-Contract Pattern
## Overview
This guide outlines our approach to database migrations in a multi-service architecture where multiple services with different versions share the same database. We follow the **Expand-Contract Pattern** to ensure zero-downtime deployments and maintain N-1 version compatibility.
## Table of Contents
1. [Core Principles](#core-principles)
2. [The Expand-Contract Pattern](#the-expand-contract-pattern)
3. [Migration Phases](#migration-phases)
4. [Implementation Guidelines](#implementation-guidelines)
5. [Safety Checks](#safety-checks)
6. [Rollback Procedures](#rollback-procedures)
7. [Best Practices](#best-practices)
8. [Anti-Patterns](#anti-patterns)
9. [Examples](#examples)
10. [Monitoring](#monitoring)
## Core Principles
### N-1 Version Support
- The database schema must support at least two consecutive versions of each service
- Never introduce breaking changes that would prevent older service versions from functioning
- All schema changes must be backward compatible
### Zero-Downtime Deployments
- Database migrations should not require service downtime
- Services should continue operating during and after migration
- Rollback should be possible without data loss
## The Expand-Contract Pattern
The Expand-Contract pattern consists of three phases:
```mermaid
flowchart LR
A[Current Schema]
B["**EXPAND: Add New Schema**"]
C["**MIGRATE: Transition Data**"]
D["**CONTRACT: Remove Old Schema**"]
A --> B
B --> C
C --> D
%% basic neutral colors
style B fill:#f2f2f2,stroke:#333,stroke-width:1.5px,color:#000
style C fill:#d9eaff,stroke:#333,stroke-width:1.5px,color:#000
style D fill:#d5f5d5,stroke:#333,stroke-width:1.5px,color:#000
```
**Diagram Description:**
The Expand-Contract migration pattern consists of three phases:
- **Expand:** Add new schema elements (shown in neutral grey).
- **Migrate:** Transition data and service usage (shown in light blue).
- **Contract:** Remove old schema elements after full adoption (shown in light green).
Each phase flows sequentially: Current Schema → Expand → Migrate → Contract.
### Phase Timeline
Assuming a contract cycle of N days:
| Phase | Duration | Description |
|-------|----------|-------------|
| **Expand** | Day 1 | Add new schema elements (backward compatible) |
| **Migrate** | Days 2-N | Update services, backfill data, monitor adoption |
| **Contract** | Day N+1 | Remove deprecated schema (only after full adoption) |
## Migration Phases
### Phase 1: EXPAND (Non-Breaking Addition)
**Goal**: Add new schema elements without breaking existing services
```python
def upgrade() -> None:
"""
EXPAND PHASE: Add new schema elements as nullable/optional
"""
bind = op.get_bind()
inspector = sa.inspect(bind)
columns = [col['name'] for col in inspector.get_columns('table_name')]
# Always check existence to ensure idempotency
if 'new_column' not in columns:
# CRITICAL: Use nullable=True for backward compatibility
op.add_column('table_name', sa.Column('new_column', sa.String(), nullable=True))
# Optional: Add index for performance
op.create_index('ix_table_new_column', 'table_name', ['new_column'])
```
**Service Compatibility**:
- ✅ Old services: Continue working (ignore new column)
- ✅ New services: Can start using new column immediately
### Phase 2: MIGRATE (Data Transition)
**Goal**: Gradually transition all services and data to use new schema
**Database Operations**:
```sql
-- Backfill existing data if needed
UPDATE table_name
SET new_column = old_column
WHERE new_column IS NULL AND old_column IS NOT NULL;
-- Monitor adoption
SELECT
COUNT(*) FILTER (WHERE new_column IS NOT NULL) as using_new,
COUNT(*) FILTER (WHERE new_column IS NULL) as not_using_new
FROM table_name
WHERE created_at > NOW() - INTERVAL '1 day';
```
**Service Code Pattern**:
```python
class ServiceAdapter:
def read_data(self, row):
"""Handle both old and new schema"""
return {
'id': row.id,
# Gracefully handle missing columns
'new_field': getattr(row, 'new_column', None),
# Fallback to old column if new doesn't exist
'data': row.new_column or row.old_column
}
def write_data(self, data):
"""Write to both old and new schema during transition"""
return Model(
old_column=data, # Keep writing to old column
new_column=data # Also write to new column
)
```
### Phase 3: CONTRACT (Cleanup)
**Goal**: Remove deprecated schema elements after all services have migrated
**Prerequisites Checklist**:
- [ ] All services deployed with new schema support
- [ ] No queries using old columns in past 30 days
- [ ] Data migration completed and verified
- [ ] Backup of deprecated data created
```python
def upgrade_contract_phase() -> None:
"""
CONTRACT PHASE: Remove old schema (only after full migration)
"""
# Verify no services are using old column
bind = op.get_bind()
# Safety check: Log usage before removal
result = bind.execute(sa.text("""
SELECT COUNT(*) as cnt FROM table_name
WHERE old_column IS NOT NULL AND new_column IS NULL
""")).first()
if result and result.cnt > 0:
raise Exception(f"Cannot contract: {result.cnt} rows still depend on old_column")
# Safe to remove
op.drop_column('table_name', 'old_column')
```
## Implementation Guidelines
### Adding Columns
✅ **DO**:
```python
# Always nullable for new columns
op.add_column('message', sa.Column('context_id', sa.String(), nullable=True))
# Add default value if needed (database-level)
op.add_column('message', sa.Column('status', sa.String(),
nullable=True, server_default='pending'))
```
❌ **DON'T**:
```python
# Never add required columns directly
op.add_column('message', sa.Column('context_id', sa.String(), nullable=False))
```
### Removing Columns
✅ **DO**:
1. Stop writing to the column (application code)
2. Stop reading from the column (application code)
3. Wait for all services to update (monitor for 30+ days)
4. Remove column in separate migration
❌ **DON'T**:
- Remove columns immediately after adding replacements
- Drop columns that might be in use by any service version
### Renaming Columns
✅ **DO**:
1. Add new column with desired name
2. Copy data from old to new column
3. Update services to use new column
4. Remove old column (after migration period)
❌ **DON'T**:
```python
# Never rename directly - breaks old services
op.alter_column('table_name', 'old_name', new_column_name='new_name')
```
### Changing Column Types
✅ **DO**:
1. Add new column with desired type
2. Migrate data with type conversion
3. Switch services to new column
4. Drop old column after transition
## Safety Checks
### Pre-Migration Checks
```python
def pre_migration_checks():
"""Run before applying migration"""
bind = op.get_bind()
# Check table size for performance impact
result = bind.execute(sa.text(
"SELECT COUNT(*) as cnt FROM table_name"
)).first()
if result.cnt > 1000000:
print(f"WARNING: Large table ({result.cnt} rows) - migration may be slow")
# Check for running transactions
result = bind.execute(sa.text("""
SELECT COUNT(*) FROM pg_stat_activity
WHERE state = 'active' AND query_start < NOW() - INTERVAL '5 minutes'
""")).first()
if result[0] > 0:
print(f"WARNING: {result[0]} long-running transactions detected")
```
### Post-Migration Validation
```python
def post_migration_validation():
"""Verify migration succeeded"""
bind = op.get_bind()
inspector = sa.inspect(bind)
# Verify column exists
columns = [col['name'] for col in inspector.get_columns('table_name')]
assert 'new_column' in columns, "Migration failed: column not added"
# Verify data integrity
result = bind.execute(sa.text("""
SELECT COUNT(*) FROM table_name
WHERE new_column IS NOT NULL
""")).first()
print(f"Migration complete: {result[0]} rows have new_column populated")
```
## Rollback Procedures
### Safe Rollback Checklist
Before rolling back, verify:
- [ ] No services depend solely on new schema
- [ ] No critical data exists only in new columns
- [ ] Rollback migration has been tested in staging
### Rollback Implementation
```python
def downgrade() -> None:
"""
Safe rollback with data preservation
"""
bind = op.get_bind()
inspector = sa.inspect(bind)
# Check for data that would be lost
result = bind.execute(sa.text("""
SELECT COUNT(*) as cnt FROM table_name
WHERE new_column IS NOT NULL
""")).first()
if result and result.cnt > 0:
# Backup data before dropping
bind.execute(sa.text("""
CREATE TABLE IF NOT EXISTS table_name_backup AS
SELECT id, new_column, NOW() as backed_up_at
FROM table_name WHERE new_column IS NOT NULL
"""))
print(f"Backed up {result.cnt} rows to table_name_backup")
# Safe to drop column
columns = [col['name'] for col in inspector.get_columns('table_name')]
if 'new_column' in columns:
op.drop_column('table_name', 'new_column')
```
## Best Practices
### 1. Always Use Idempotent Migrations
```python
# Check existence before adding/dropping
if 'column_name' not in columns:
op.add_column(...)
if 'column_name' in columns:
op.drop_column(...)
```
### 2. Document Migration Phases
```python
"""
Migration: Add context_id to message table
Phase: EXPAND
Safe to rollback: YES
Services compatible: All versions
Next phase: MIGRATE after all services deployed
Revision ID: 182e5471b900
"""
```
### 3. Use Feature Flags for Service Transitions
```python
class MessageService:
def process_message(self, message):
if feature_flags.is_enabled('use_context_id'):
# New logic using context_id
return self._process_with_context(message)
else:
# Old logic without context_id
return self._process_legacy(message)
```
### 4. Monitor Migration Progress
```sql
-- Create monitoring view
CREATE VIEW migration_progress AS
SELECT
'context_id_adoption' as migration,
COUNT(*) FILTER (WHERE context_id IS NOT NULL) * 100.0 / COUNT(*) as percentage_complete,
COUNT(*) as total_records,
MAX(updated_at) as last_update
FROM message
WHERE created_at > NOW() - INTERVAL '7 days';
```
## Anti-Patterns
### ❌ Breaking Changes Without Migration Path
```python
# DON'T: This breaks existing services
op.alter_column('message', 'content', nullable=False)
```
### ❌ Immediate Schema Contraction
```python
# DON'T: Remove old schema in same migration as adding new
def upgrade():
op.add_column('table', sa.Column('new_col', ...))
op.drop_column('table', 'old_col') # Services still using this!
```
### ❌ Data Type Changes Without Migration
```python
# DON'T: Direct type change can fail or corrupt data
op.alter_column('table', 'amount', type_=sa.Integer()) # Was String
```
### ❌ Assuming Service Deployment Order
```python
# DON'T: Assume all services update simultaneously
if datetime.now() > deployment_date:
op.drop_column('table', 'old_column') # Some services might be delayed!
```
## Examples
### Example 1: Adding a New Required Field
```python
# Migration 1: EXPAND (Day 1)
def upgrade_expand():
op.add_column('user', sa.Column('email_verified', sa.Boolean(),
nullable=True, server_default='false'))
# Migration 2: MIGRATE (Day 30, after all services updated)
def upgrade_migrate():
# Backfill any NULL values
op.execute("UPDATE user SET email_verified = false WHERE email_verified IS NULL")
# Migration 3: CONTRACT (Day 60, after verification)
def upgrade_contract():
op.alter_column('user', 'email_verified', nullable=False)
```
### Example 2: Replacing a Column
```python
# Migration 1: Add new column
def upgrade_phase1():
op.add_column('order', sa.Column('status_code', sa.Integer(), nullable=True))
# Copy data from old column
op.execute("""
UPDATE order SET status_code =
CASE status_text
WHEN 'pending' THEN 1
WHEN 'processing' THEN 2
WHEN 'complete' THEN 3
ELSE 0
END
""")
# Migration 2: Remove old column (after transition period)
def upgrade_phase2():
op.drop_column('order', 'status_text')
```
## Monitoring
### Service Version Tracking
```sql
-- Track which services are using new schema
CREATE TABLE service_schema_usage (
service_name VARCHAR(100),
schema_version VARCHAR(50),
last_seen TIMESTAMP DEFAULT NOW(),
uses_new_schema BOOLEAN DEFAULT FALSE
);
-- Update from application
INSERT INTO service_schema_usage (service_name, schema_version, uses_new_schema)
VALUES ('user-service', 'v2.1.0', true)
ON CONFLICT (service_name)
DO UPDATE SET
schema_version = EXCLUDED.schema_version,
last_seen = NOW(),
uses_new_schema = EXCLUDED.uses_new_schema;
```
### Migration Health Dashboard Queries
```sql
-- Check migration adoption rate
SELECT
migration_name,
adopted_services,
total_services,
(adopted_services * 100.0 / total_services) as adoption_percentage,
days_since_deployment
FROM migration_tracking
WHERE is_active = true
ORDER BY days_since_deployment DESC;
```

```sql
-- Identify services not yet migrated
SELECT
s.service_name,
s.version,
s.last_deployment,
m.migration_name
FROM services s
CROSS JOIN active_migrations m
WHERE NOT EXISTS (
SELECT 1 FROM service_migrations sm
WHERE sm.service_name = s.service_name
AND sm.migration_name = m.migration_name
);
```
## Conclusion
Following the Expand-Contract pattern ensures:
- ✅ Zero-downtime deployments
- ✅ Safe rollback capabilities
- ✅ N-1 version compatibility
- ✅ Gradual, monitored transitions
- ✅ No data loss during migrations
Remember: **When in doubt, expand first, migrate slowly, and contract only when certain.**
## References
- [Evolutionary Database Design - Martin Fowler](https://martinfowler.com/articles/evodb.html)
- [Zero-Downtime Database Migrations](https://www.brunton-spall.co.uk/post/2014/05/06/database-migrations-done-right/)
- [Alembic Documentation](https://alembic.sqlalchemy.org/)

View File

@@ -0,0 +1,376 @@
"""Migration Validator - Enforces Expand-Contract Pattern for Alembic migrations."""
import ast
import json
import re
import sys
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any
class MigrationPhase(Enum):
EXPAND = "EXPAND"
MIGRATE = "MIGRATE"
CONTRACT = "CONTRACT"
UNKNOWN = "UNKNOWN"
@dataclass
class Violation:
type: str
message: str
line: int
severity: str = "error" # error or warning
class MigrationValidator:
"""Validates Alembic migrations follow Expand-Contract pattern."""
VIOLATIONS = {
"BREAKING_ADD_COLUMN": "Adding non-nullable column without default",
"DIRECT_RENAME": "Direct column rename detected",
"DIRECT_TYPE_CHANGE": "Direct type alteration detected",
"IMMEDIATE_DROP": "Dropping column without migration phase",
"MISSING_IDEMPOTENCY": "Migration not idempotent",
"NO_PHASE_MARKER": "Migration missing phase documentation",
"UNSAFE_ROLLBACK": "Downgrade may cause data loss",
"MISSING_DOWNGRADE": "Downgrade function not implemented",
"INVALID_PHASE_OPERATION": "Operation not allowed in this phase",
"NO_EXISTENCE_CHECK": "Operation should check existence first",
"MISSING_DATA_CHECK": "CONTRACT phase should verify data migration",
}
def __init__(self, *, strict_mode: bool = True):
self.strict_mode = strict_mode
### Main validation method - it's a template method Go4 style.###
def validate_migration_file(self, filepath: Path) -> dict[str, Any]:
"""Validate a single migration file."""
if not filepath.exists():
return {
"file": str(filepath),
"valid": False,
"violations": [Violation("FILE_NOT_FOUND", f"File not found: {filepath}", 0)],
"warnings": [],
}
content = filepath.read_text()
try:
tree = ast.parse(content)
except SyntaxError as e:
return {
"file": str(filepath),
"valid": False,
"violations": [Violation("SYNTAX_ERROR", str(e), e.lineno or 0)],
"warnings": [],
}
violations = []
warnings = []
# Check for phase documentation
phase = self._extract_phase(content)
if phase == MigrationPhase.UNKNOWN:
violations.append(
Violation("NO_PHASE_MARKER", "Migration must specify phase: EXPAND, MIGRATE, or CONTRACT", 1)
)
# Check upgrade function
upgrade_node = self._find_function(tree, "upgrade")
if upgrade_node:
phase_violations = self._check_upgrade_operations(upgrade_node, phase)
violations.extend(phase_violations)
else:
violations.append(Violation("MISSING_UPGRADE", "Migration must have an upgrade() function", 1))
# Check downgrade function
downgrade_node = self._find_function(tree, "downgrade")
if downgrade_node:
downgrade_issues = self._check_downgrade_safety(downgrade_node, phase)
warnings.extend(downgrade_issues)
elif phase != MigrationPhase.CONTRACT: # CONTRACT phase may not support rollback
violations.append(Violation("MISSING_DOWNGRADE", "Migration must have a downgrade() function", 1))
# Additional phase-specific checks
if phase == MigrationPhase.CONTRACT:
contract_issues = self._check_contract_phase_requirements(content)
violations.extend(contract_issues)
return {
"file": str(filepath),
"valid": len(violations) == 0,
"violations": [v.__dict__ for v in violations],
"warnings": [w.__dict__ for w in warnings],
"phase": phase.value,
}
# Method to check DB operations constraints imposed by phases -
# New constraint requirements should be added here
def _check_upgrade_operations(self, node: ast.FunctionDef, phase: MigrationPhase) -> list[Violation]:
"""Check upgrade operations for violations."""
violations = []
for child in ast.walk(node):
if isinstance(child, ast.Call):
if self._is_op_call(child, "add_column"):
violations.extend(self._check_add_column(child, phase, node))
elif self._is_op_call(child, "alter_column"):
violations.extend(self._check_alter_column(child, phase))
elif self._is_op_call(child, "drop_column"):
violations.extend(self._check_drop_column(child, phase))
elif self._is_op_call(child, "rename_table") or self._is_op_call(child, "rename_column"):
violations.append(
Violation("DIRECT_RENAME", "Use expand-contract pattern instead of direct rename", child.lineno)
)
return violations
def _check_add_column(self, call: ast.Call, phase: MigrationPhase, func_node: ast.FunctionDef) -> list[Violation]:
"""Check add_column operations."""
violations = []
# Check if column is nullable or has default
if not self._has_nullable_true(call) and not self._has_server_default(call):
violations.append(
Violation(
"BREAKING_ADD_COLUMN", "New columns must be nullable=True or have server_default", call.lineno
)
)
# Check for idempotency
if not self._has_existence_check_nearby(func_node, call):
violations.append(
Violation(
"NO_EXISTENCE_CHECK", "add_column should check if column exists first (idempotency)", call.lineno
)
)
# Phase-specific checks
if phase == MigrationPhase.CONTRACT:
violations.append(Violation("INVALID_PHASE_OPERATION", "Cannot add columns in CONTRACT phase", call.lineno))
return violations
def _check_alter_column(self, call: ast.Call, phase: MigrationPhase) -> list[Violation]:
"""Check alter_column operations."""
violations = []
# Check for type changes
if self._has_type_change(call) and phase != MigrationPhase.CONTRACT:
violations.append(
Violation("DIRECT_TYPE_CHANGE", "Type changes should use expand-contract pattern", call.lineno)
)
# Check for nullable changes
if self._changes_nullable_to_false(call) and phase != MigrationPhase.CONTRACT:
violations.append(
Violation(
"BREAKING_ADD_COLUMN", "Making column non-nullable only allowed in CONTRACT phase", call.lineno
)
)
return violations
def _check_drop_column(self, call: ast.Call, phase: MigrationPhase) -> list[Violation]:
"""Check drop_column operations."""
violations = []
if phase != MigrationPhase.CONTRACT:
violations.append(
Violation(
"IMMEDIATE_DROP",
f"Column drops only allowed in CONTRACT phase (current: {phase.value})",
call.lineno,
)
)
return violations
def _check_contract_phase_requirements(self, content: str) -> list[Violation]:
"""Check CONTRACT phase specific requirements."""
# Check for data migration before dropping columns
if not ("SELECT" in content and "COUNT" in content):
return [
Violation(
"MISSING_DATA_CHECK",
"CONTRACT phase should verify data migration before dropping columns",
1,
severity="warning",
)
]
return []
def _check_downgrade_safety(self, node: ast.FunctionDef, phase: MigrationPhase) -> list[Violation]:
"""Check downgrade function for safety issues."""
warnings = []
# Check if downgrade might lose data
for child in ast.walk(node):
if isinstance(child, ast.Call) and self._is_op_call(child, "alter_column"):
# Check if there's a backup mechanism
func_content = ast.unparse(node)
if "backup" not in func_content.lower() and "SELECT" not in func_content:
warnings.append(
Violation(
"UNSAFE_ROLLBACK",
"Downgrade drops column without checking/backing up data",
child.lineno,
severity="warning",
)
)
# CONTRACT phase special handling
if phase == MigrationPhase.CONTRACT:
func_content = ast.unparse(node)
if "NotImplementedError" not in func_content and "raise" not in func_content:
warnings.append(
Violation(
"UNSAFE_ROLLBACK",
"CONTRACT phase downgrade should raise NotImplementedError or handle carefully",
node.lineno,
severity="warning",
)
)
return warnings
def _is_op_call(self, call: ast.Call, method: str) -> bool:
"""Check if call is op.method()."""
func = call.func
# Avoid multiple attribute resolutions and isinstance checks
if type(func) is ast.Attribute:
val = func.value
if type(val) is ast.Name:
return val.id == "op" and func.attr == method
return False
def _has_nullable_true(self, call: ast.Call) -> bool:
"""Check if call has nullable=True."""
for keyword in call.keywords:
if keyword.arg == "nullable" and isinstance(keyword.value, ast.Constant):
return keyword.value.value is True
for call_arg in call.args:
if isinstance(call_arg, ast.Call):
return self._has_nullable_true(call_arg)
return False
def _has_server_default(self, call: ast.Call) -> bool:
"""Check if call has server_default."""
return any(kw.arg == "server_default" for kw in call.keywords)
def _has_type_change(self, call: ast.Call) -> bool:
"""Check if alter_column changes type."""
return any(kw.arg in ["type_", "type"] for kw in call.keywords)
def _changes_nullable_to_false(self, call: ast.Call) -> bool:
"""Check if alter_column sets nullable=False."""
for keyword in call.keywords:
if keyword.arg == "nullable" and isinstance(keyword.value, ast.Constant):
return keyword.value.value is False
return False
### Helper method to check for existence checks around operations.
# It looks for if statements that might be checking column existence
# TODO: Evaluate if more sophisticated analysis is needed for existence checks
def _has_existence_check_nearby(self, func_node: ast.FunctionDef, target_call: ast.Call) -> bool:
"""Check if operation is wrapped in existence check."""
# Look for if statements that might be checking column existence
for node in ast.walk(func_node):
if isinstance(node, ast.If):
# Check if this if statement contains our target call
for child in ast.walk(node):
if child == target_call:
# Check if the condition mentions columns or inspector
condition = ast.unparse(node.test)
if any(keyword in condition.lower() for keyword in ["column", "inspector", "not in", "if not"]):
return True
return False
### Helper methods ###
def _extract_phase(self, content: str) -> MigrationPhase:
"""Extract migration phase from documentation."""
# TODO: Support phase detection from inline comments and function
# annotations, not just docstrings or top-level comments.
# Look in docstring or comments
phase_pattern = r"Phase:\s*(EXPAND|MIGRATE|CONTRACT)"
match = re.search(phase_pattern, content, re.IGNORECASE)
if match:
phase_str = match.group(1).upper()
return MigrationPhase[phase_str]
return MigrationPhase.UNKNOWN
def _find_function(self, tree: ast.Module, name: str) -> ast.FunctionDef | None:
"""Find a function by name in the AST."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name == name:
return node
return None
def main():
    """CLI entry point.

    Validates each migration file given on the command line and exits with
    status 0 when all files are valid (and, under --strict, warning-free),
    1 otherwise.
    """
    import argparse
    import logging

    parser = argparse.ArgumentParser(description="Validate Alembic migrations")
    parser.add_argument("files", nargs="+", help="Migration files to validate")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--strict", action="store_true", help="Treat warnings as errors")
    args = parser.parse_args()
    validator = MigrationValidator(strict_mode=args.strict)
    all_valid = True
    results = []
    for file_path in args.files:
        result = validator.validate_migration_file(Path(file_path))
        results.append(result)
        if not result["valid"]:
            all_valid = False
        if args.strict and result["warnings"]:
            all_valid = False
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("migration_validator")
    if args.json:
        # Fix: default=vars keeps the dump serializable even when a result
        # carries raw Violation objects (e.g. file-not-found results).
        logger.info(json.dumps(results, indent=2, default=vars))
    else:
        for result in results:
            logger.info("\n%s", "=" * 60)
            logger.info("File: %s", result["file"])
            # Fix: missing/unparsable files may produce results without a
            # "phase" key; fall back instead of raising KeyError.
            logger.info("Phase: %s", result.get("phase", "UNKNOWN"))
            # Fix: both branches previously logged the same empty string,
            # making the valid/invalid marker uninformative.
            logger.info("Valid: %s", "✅" if result["valid"] else "❌")
            if result["violations"]:
                logger.error("\n❌ Violations:")
                for v in result["violations"]:
                    # Fix: tolerate raw Violation objects as well as dicts.
                    item = v if isinstance(v, dict) else vars(v)
                    logger.error(" Line %s: %s - %s", item["line"], item["type"], item["message"])
            if result["warnings"]:
                logger.warning("\n⚠️ Warnings:")
                for w in result["warnings"]:
                    item = w if isinstance(w, dict) else vars(w)
                    logger.warning(" Line %s: %s - %s", item["line"], item["type"], item["message"])
    sys.exit(0 if all_valid else 1)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,24 @@
"""Test migration - BAD EXPAND phase.
This should fail validation.
Revision ID: test001
""" # noqa: N999
import sqlalchemy as sa
from alembic import op
revision = "test001"
down_revision = None
def upgrade():
    """Negative fixture: deliberately violates the EXPAND-phase rules."""
    # ❌ Bad: non-nullable column without default
    op.add_column("users", sa.Column("email_verified", sa.Boolean(), nullable=False))
    # ❌ Bad: dropping column in EXPAND phase
    op.drop_column("users", "old_field")
def downgrade():
    # Intentionally empty: the negative fixture provides no rollback logic.
    pass

View File

@@ -0,0 +1,33 @@
"""Test migration - GOOD EXPAND phase.
Phase: EXPAND
Safe to rollback: YES
Revision ID: test002
""" # noqa: N999
import sqlalchemy as sa
from alembic import op
revision = "test002"
down_revision = None
def upgrade():
    """EXPAND PHASE: Add new columns."""
    bind = op.get_bind()
    inspector = sa.inspect(bind)
    # Read the live schema so the migration stays idempotent on re-run.
    columns = [col["name"] for col in inspector.get_columns("users")]
    # ✅ Good: nullable column with existence check
    if "email_verified" not in columns:
        op.add_column("users", sa.Column("email_verified", sa.Boolean(), nullable=True, server_default="false"))
def downgrade():
    """Drop the column added by upgrade(), only if it is actually present."""
    bind = op.get_bind()
    inspector = sa.inspect(bind)
    columns = [col["name"] for col in inspector.get_columns("users")]
    if "email_verified" in columns:
        op.drop_column("users", "email_verified")

View File

@@ -0,0 +1,24 @@
"""Test migration - BAD CONTRACT phase.
Phase: CONTRACT
Revision ID: test003
""" # noqa: N999
import sqlalchemy as sa
from alembic import op
revision = "test003"
down_revision = None
def upgrade():
    """Negative fixture: deliberately breaks the CONTRACT-phase rules."""
    # ❌ Bad: No verification before dropping
    op.drop_column("users", "old_email")
    # ❌ Bad: Adding new column in CONTRACT phase
    op.add_column("users", sa.Column("new_field", sa.String()))
def downgrade():
    # Intentionally empty: no rollback is defined for this negative fixture.
    pass

View File

@@ -0,0 +1,68 @@
from pathlib import Path
import pytest
from langflow.alembic.migration_validator import MigrationValidator
class TestExistingMigrations:
    """Validate all existing migration files against the guidelines."""

    def test_validation_of_test_migrations(self):
        """Verify specific test migrations (001, 002, 003) are identified correctly.

        They should be identified as valid or invalid by the validator.
        """
        root = Path(__file__).resolve().parents[5]
        versions = root / "src/backend/base/langflow/alembic/versions"
        if not versions.exists():
            pytest.fail(f"Migrations directory not found at {versions}")
        checker = MigrationValidator(strict_mode=False)

        # 1. The well-formed EXPAND migration must validate cleanly.
        candidate = versions / "002_good_expand0.py"
        if candidate.exists():
            outcome = checker.validate_migration_file(candidate)
            assert outcome["valid"] is True, f"002_good_expand0.py should be valid but got: {outcome['violations']}"

        # 2. The broken EXPAND migration must be rejected with the expected codes.
        candidate = versions / "001_bad_expand0.py"
        if candidate.exists():
            outcome = checker.validate_migration_file(candidate)
            assert outcome["valid"] is False, "001_bad_expand0.py should be invalid"
            codes = {entry["type"] for entry in outcome["violations"]}
            assert "BREAKING_ADD_COLUMN" in codes
            assert "IMMEDIATE_DROP" in codes

        # 3. The broken CONTRACT migration must be rejected as well.
        candidate = versions / "003_bad_contract0.py"
        if candidate.exists():
            outcome = checker.validate_migration_file(candidate)
            assert outcome["valid"] is False, "003_bad_contract0.py should be invalid"
            codes = {entry["type"] for entry in outcome["violations"]}
            assert "INVALID_PHASE_OPERATION" in codes
            # The validator currently surfaces MISSING_DATA_CHECK in the
            # violations list (not only as a warning).
            assert "MISSING_DATA_CHECK" in codes

    def test_legacy_migrations_flagged(self):
        """Ensure legacy migrations are flagged for missing phase markers.

        This confirms the validator catches them.
        """
        root = Path(__file__).resolve().parents[5]
        versions = root / "src/backend/base/langflow/alembic/versions"
        checker = MigrationValidator(strict_mode=False)
        # Pick any legacy migration (anything not named like a test fixture).
        legacy = next(
            (f for f in versions.glob("*.py") if not f.name.startswith("00") and f.name != "__init__.py"), None
        )
        if legacy is not None:
            outcome = checker.validate_migration_file(legacy)
            assert outcome["valid"] is False
            assert "NO_PHASE_MARKER" in [entry["type"] for entry in outcome["violations"]]

View File

@@ -0,0 +1,55 @@
import os
import sqlite3
import tempfile
from pathlib import Path
import pytest
from alembic import command
from alembic.config import Config
def test_real_migration_execution():
    """Test migration with actual SQLite database."""
    # Create temporary database (file is removed when the context exits).
    with tempfile.NamedTemporaryFile(suffix=".db") as tmp:
        db_path = tmp.name
        # Create test table
        conn = sqlite3.connect(db_path)
        conn.execute("""
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT,
                old_email TEXT
            )
        """)
        conn.commit()
        # Create alembic.ini
        alembic_cfg = Config()
        # Ensure path is correct relative to where tests run
        workspace_root = Path(__file__).resolve().parents[5]
        script_location = workspace_root / "src/backend/base/langflow/alembic"
        if not script_location.exists():
            pytest.fail(f"Alembic script location not found at {script_location}")
        alembic_cfg.set_main_option("script_location", str(script_location))
        # NOTE(review): this uses the async "sqlite+aiosqlite" driver URL with
        # the synchronous alembic command API — assumes the project's env.py
        # handles async URLs; confirm.
        alembic_cfg.set_main_option("sqlalchemy.url", f"sqlite+aiosqlite:///{db_path}")
        # Run migration
        try:
            # Use specific head to avoid conflict with test migrations
            migration_revision = os.environ.get("ALEMBIC_TEST_REVISION", "head")
            command.upgrade(alembic_cfg, migration_revision)  # pragma: allowlist secret
        except Exception as e:
            pytest.fail(f"Migration failed: {e}")
        # Verify schema (the fetch is a smoke check that the table still reads).
        cursor = conn.execute("PRAGMA table_info(users)")
        cursor.fetchall()
        conn.close()
        # Just ensure we reached this point
        assert True

View File

@@ -0,0 +1,233 @@
import pytest
import sqlalchemy as sa
from langflow.alembic.migration_validator import MigrationValidator
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, text
# Fixture to create temporary migration files
@pytest.fixture
def create_migration_file(tmp_path):
    """Factory fixture: write migration source text to a temporary .py file."""

    def _write(content):
        target = tmp_path / "test_migration.py"
        target.write_text(content)
        return target

    return _write
class TestMigrationValidator:
    """Tests for the MigrationValidator static analysis tool."""

    def test_valid_expand_migration(self, create_migration_file):
        """Test that a properly formatted EXPAND migration passes validation."""
        # Embedded migration source is kept flush-left so ast.parse accepts it.
        content = """
\"\"\"
Phase: EXPAND
\"\"\"
from alembic import op
import sqlalchemy as sa
def upgrade():
    # Check existence for idempotency
    bind = op.get_bind()
    inspector = sa.inspect(bind)
    columns = [col['name'] for col in inspector.get_columns('users')]
    if 'new_col' not in columns:
        # Nullable=True is required for EXPAND
        op.add_column('users', sa.Column('new_col', sa.String(), nullable=True))
def downgrade():
    bind = op.get_bind()
    inspector = sa.inspect(bind)
    columns = [col['name'] for col in inspector.get_columns('users')]
    if 'new_col' in columns:
        # Check for data loss (warning in validation)
        op.execute("SELECT COUNT(*) FROM users WHERE new_col IS NOT NULL")
        op.drop_column('users', 'new_col')
"""
        path = create_migration_file(content)
        validator = MigrationValidator()
        result = validator.validate_migration_file(path)
        assert result["valid"] is True
        assert result["phase"] == "EXPAND"
        assert len(result["violations"]) == 0

    def test_invalid_expand_migration_breaking_change(self, create_migration_file):
        """Test that adding a non-nullable column is caught."""
        content = """
\"\"\"
Phase: EXPAND
\"\"\"
from alembic import op
import sqlalchemy as sa
def upgrade():
    # VIOLATION: nullable=False without default
    op.add_column('users', sa.Column('new_col', sa.String(), nullable=False))
def downgrade():
    op.drop_column('users', 'new_col')
"""
        path = create_migration_file(content)
        validator = MigrationValidator()
        result = validator.validate_migration_file(path)
        assert result["valid"] is False
        violations = [v["type"] for v in result["violations"]]
        assert "BREAKING_ADD_COLUMN" in violations
        # Also likely catches missing existence check
        assert "NO_EXISTENCE_CHECK" in violations

    def test_invalid_direct_rename_explicit(self, create_migration_file):
        """Test that explicit rename_column is caught."""
        content = """
\"\"\"
Phase: EXPAND
\"\"\"
from alembic import op
def upgrade():
    op.rename_column('users', 'old', 'new')
def downgrade():
    pass
"""
        path = create_migration_file(content)
        validator = MigrationValidator()
        result = validator.validate_migration_file(path)
        assert result["valid"] is False
        assert any(v["type"] == "DIRECT_RENAME" for v in result["violations"])

    def test_contract_phase_validation(self, create_migration_file):
        """Test CONTRACT phase requirements."""
        # Valid CONTRACT migration: verifies data before dropping, and its
        # downgrade explicitly raises (irreversible contraction).
        content = """
\"\"\"
Phase: CONTRACT
\"\"\"
from alembic import op
import sqlalchemy as sa
def upgrade():
    bind = op.get_bind()
    # DATA CHECK (Required)
    bind.execute("SELECT COUNT(*) FROM users WHERE old_col IS NOT NULL")
    op.drop_column('users', 'old_col')
def downgrade():
    # Downgrade in contract phase is hard/impossible without backup
    raise NotImplementedError("Cannot reverse CONTRACT phase")
"""
        path = create_migration_file(content)
        validator = MigrationValidator()
        result = validator.validate_migration_file(path)
        assert result["valid"] is True
        assert result["phase"] == "CONTRACT"

    def test_contract_phase_missing_data_check(self, create_migration_file):
        """Test CONTRACT phase catches missing data check."""
        content = """
\"\"\"
Phase: CONTRACT
\"\"\"
from alembic import op
def upgrade():
    # Missing data verification check
    op.drop_column('users', 'old_col')
def downgrade():
    raise NotImplementedError
"""
        path = create_migration_file(content)
        validator = MigrationValidator()
        result = validator.validate_migration_file(path)
        # NOTE: The validator currently treats this as a violation (error) despite the
        # Violation object having severity="warning" internally, because it adds it
        # to the violations list.
        violations = [v["type"] for v in result["violations"]]
        assert "MISSING_DATA_CHECK" in violations
        assert result["valid"] is False
class TestMigrationRuntimeGuidelines:
    """Tests proving that following the guidelines results in correct behavior.

    1. N-1 Compatibility (Old code works with new schema).
    2. Safe Rollback.
    """

    def test_expand_phase_compatibility_and_rollback(self):
        """Simulate an EXPAND phase migration and verify N-1 compatibility and rollback."""
        # 1. Setup Initial State (Version N-1)
        engine = create_engine("sqlite:///:memory:")
        metadata = MetaData()
        # Initial Schema
        users = Table(
            "users", metadata, Column("id", Integer, primary_key=True), Column("username", String, nullable=False)
        )
        metadata.create_all(engine)
        # Populate with some data using "Old Service"
        with engine.connect() as conn:
            conn.execute(users.insert().values(username="user_v1"))
            conn.commit()
        # 2. Apply EXPAND Migration (Version N)
        # Guideline: Add new column as nullable
        with engine.connect() as conn:
            # Verify idempotency check logic works (simulated)
            inspector = sa.inspect(conn)
            if "email" not in [c["name"] for c in inspector.get_columns("users")]:
                conn.execute(text("ALTER TABLE users ADD COLUMN email VARCHAR NULL"))
                conn.commit()
        # 3. Verify N-1 Compatibility
        with engine.connect() as conn:
            # Can "Old Service" still read?
            # (Select * might get extra column, but mapped ORM usually ignores unknown unless strict)
            # Raw SQL insert from old service (doesn't know about email)
            try:
                conn.execute(text("INSERT INTO users (username) VALUES ('user_v1_after_migration')"))
                conn.commit()
            except Exception as e:
                pytest.fail(f"Old service broke after migration: {e}")
            # Can "New Service" use new features?
            conn.execute(text("INSERT INTO users (username, email) VALUES ('user_v2', 'test@example.com')"))
            conn.commit()
        # 4. Verify Rollback Safety
        # Guideline: Check for data in new column before dropping
        with engine.connect() as conn:
            # Check for data
            count = conn.execute(text("SELECT COUNT(*) FROM users WHERE email IS NOT NULL")).scalar()
            assert count is not None, "Count should not be None"
            assert count > 0, "Should have data in new column"
            # In a real scenario, we would backup here if count > 0
            # For this test, we proceed to drop, simulating the downgrade() op
            # SQLite support for DROP COLUMN
            # NOTE(review): DROP COLUMN requires SQLite >= 3.35 — confirm the
            # CI environment ships a recent enough SQLite.
            conn.execute(text("ALTER TABLE users DROP COLUMN email"))
            conn.commit()
        # 5. Verify Post-Rollback State
        with engine.connect() as conn:
            inspector = sa.inspect(conn)
            columns = [c["name"] for c in inspector.get_columns("users")]
            assert "email" not in columns
            assert "username" in columns
            # Verify data integrity of original columns
            rows = conn.execute(text("SELECT username FROM users")).fetchall()
            usernames = [r[0] for r in rows]
            assert "user_v1" in usernames
            assert "user_v1_after_migration" in usernames
            assert "user_v2" in usernames  # This user should still exist, just lost their email

View File

@@ -0,0 +1,169 @@
from pathlib import Path
import pytest
from langflow.alembic.migration_validator import MigrationValidator
@pytest.fixture
def create_test_migration(tmp_path):
    """Factory fixture: persist migration source under a caller-chosen filename."""

    def _factory(content: str, filename: str) -> Path:
        destination = tmp_path / filename
        destination.write_text(content)
        return destination

    return _factory
class TestMigrationValidatorScripts:
    """End-to-end checks of the validator against embedded migration scripts."""

    def test_expand_phase(self, create_test_migration):
        """Test EXPAND phase validations."""
        # Test: Good EXPAND migration (embedded source kept flush-left so the
        # validator's ast.parse accepts it).
        good_expand = '''"""
Description: Add email_verified column
Phase: EXPAND
Safe to rollback: YES
Revision ID: test_expand_good
"""
from alembic import op
import sqlalchemy as sa
def upgrade():
    bind = op.get_bind()
    inspector = sa.inspect(bind)
    columns = [col['name'] for col in inspector.get_columns('users')]
    if 'email_verified' not in columns:
        op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=True))
def downgrade():
    op.drop_column('users', 'email_verified')
'''
        # Test: Bad EXPAND migration
        bad_expand = '''"""
Description: Add required column
Phase: EXPAND
Revision ID: test_expand_bad
"""
from alembic import op
import sqlalchemy as sa
def upgrade():
    # Missing existence check and non-nullable
    op.add_column('users', sa.Column('email_verified', sa.Boolean(), nullable=False))
    # Dropping column in EXPAND phase
    op.drop_column('users', 'old_column')
def downgrade():
    pass
'''
        validator = MigrationValidator()
        # Test good migration
        good_file = create_test_migration(good_expand, "good_expand.py")
        result = validator.validate_migration_file(good_file)
        assert result["valid"], "Good EXPAND should pass"
        # Test bad migration
        bad_file = create_test_migration(bad_expand, "bad_expand.py")
        result = validator.validate_migration_file(bad_file)
        assert not result["valid"], "Bad EXPAND should fail"
        violations = [v["type"] for v in result["violations"]]
        assert len(violations) > 0

    def test_contract_phase(self, create_test_migration):
        """Test CONTRACT phase validations."""
        # Valid CONTRACT migration: verifies migration completeness first and
        # declares its downgrade irreversible.
        good_contract = '''"""
Description: Remove old column
Phase: CONTRACT
Revision ID: test_contract_good
"""
from alembic import op
import sqlalchemy as sa
def upgrade():
    bind = op.get_bind()
    # Check data migration is complete
    result = bind.execute(sa.text("""
        SELECT COUNT(*) as cnt FROM users
        WHERE old_email IS NOT NULL AND new_email IS NULL
    """)).first()
    if result.cnt > 0:
        raise Exception(f"Cannot contract: {result.cnt} rows not migrated")
    op.drop_column('users', 'old_email')
def downgrade():
    raise NotImplementedError("Cannot rollback CONTRACT phase")
'''
        validator = MigrationValidator()
        good_file = create_test_migration(good_contract, "good_contract.py")
        result = validator.validate_migration_file(good_file)
        assert result["valid"], "Good CONTRACT should pass"

    def test_phase_detection(self, create_test_migration):
        """Test phase detection from different formats."""
        # (marker text, expected detected phase) — matching is case-insensitive.
        test_cases = [
            ("Phase: EXPAND", "EXPAND"),
            ("phase: migrate", "MIGRATE"),
            ("PHASE: CONTRACT", "CONTRACT"),
            ("No phase marker", "UNKNOWN"),
        ]
        validator = MigrationValidator()
        for content_marker, expected_phase in test_cases:
            content = f'''"""
Migration description
{content_marker}
"""
def upgrade(): pass
def downgrade(): pass
'''
            file = create_test_migration(content, "phase_test.py")
            result = validator.validate_migration_file(file)
            detected_phase = result["phase"]
            assert detected_phase == expected_phase, f"Phase detection failed for {content_marker}"

    def test_common_mistakes(self, create_test_migration):
        """Test detection of common migration mistakes."""
        # Each snippet is injected verbatim into an EXPAND-phase migration.
        mistakes = {
            "Direct rename": """
def upgrade():
    op.rename_column('users', 'email', 'email_address')
""",
            "Direct type change": """
def upgrade():
    op.alter_column('users', 'age', type_=sa.Integer())
""",
            "Non-nullable without default": """
def upgrade():
    op.add_column('users', sa.Column('required_field', sa.String(), nullable=False))
""",
        }
        validator = MigrationValidator()
        for mistake_name, code in mistakes.items():
            content = f'''"""
Test: {mistake_name}
Phase: EXPAND
"""
from alembic import op
import sqlalchemy as sa
{code}
def downgrade(): pass
'''
            file = create_test_migration(content, f"{mistake_name}.py")
            result = validator.validate_migration_file(file)
            assert not result["valid"], f"Should detect {mistake_name}"

File diff suppressed because one or more lines are too long