feat: add secret key rotation script and documentation (#10978)

* feat: add script for migrating encrypted data to a new secret key

* test: add unit tests for secret key migration script

* docs: update SECURITY.md to include secret key rotation process and migration instructions

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

* update component index

* update component index

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

* refactor: use atomic transaction for secret key migration

* fix: print new key for both generated and provided cases

Ensure users always see the key being used regardless of whether
it was auto-generated or provided via --new-key flag.

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* fix: improve error handling and test coverage for secret key migration

- Update docstring to accurately reflect migrated fields (remove KB reference)
- Change migrate_auth_settings to return (result, failed_fields) tuple
- Skip folders with failed field migrations instead of silently preserving old values
- Add tests for transaction atomicity and rollback behavior
- Add tests for error handling: invalid data, null values, malformed JSON
- Add test for file permissions (0o600 on Unix)
- Add test for dry-run mode database preservation

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

* docs: improve docstrings for secret key migration script

- Add detailed description for ensure_valid_key explaining short vs long key handling
- Add Returns section to migrate_value documenting None return on failure
- Add full Args documentation and transaction behavior to migrate function

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

* fix: update .gitignore to include new data files and user-specific cache

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

* fix: update usage instructions to include 'uv run' for secret key migration script

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

* feat: add migration verification functionality and corresponding tests

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* [autofix.ci] apply automated fixes (attempt 3/3)

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Author: Gabriel Luiz Freitas Almeida
Date: 2026-03-03 10:06:58 -03:00
Committed by: GitHub
parent 797c430270
commit 91219be56c
5 changed files with 1416 additions and 2 deletions

.gitignore vendored

@@ -281,7 +281,6 @@ src/frontend/temp
news-aggregated.json
.claude
member_servers.json
# Component index cache (user-specific)
**/.cache/lfx/

SECURITY.md

@@ -74,6 +74,7 @@ When creating a flow through the [`/api/v1/flows/`](https://docs.langflow.org/ap

Langflow versions `1.6.0` through `1.6.3` have a critical bug where environment variables from `.env` files are not read. This affects all deployments that use environment variables for configuration, including security settings.

**Potential security impact:**

- Environment variables from `.env` files are not read.
- Security configurations like `AUTO_LOGIN=false` may not be applied, potentially allowing users to log in as the default superuser.
- Database credentials, API keys, and other sensitive configuration may not be loaded.
@@ -128,10 +129,12 @@ The `langflow superuser` CLI command can present a privilege escalation risk if

#### Security Measures

1. **Authentication Required in Production**
   - When `LANGFLOW_AUTO_LOGIN=false`, superuser creation requires authentication
   - Use the `--auth-token` parameter with a valid superuser API key or JWT token

2. **Disable CLI Superuser Creation**
   - Set `LANGFLOW_ENABLE_SUPERUSER_CLI=false` to disable the command entirely
   - Strongly recommended for production environments
@@ -149,4 +152,75 @@ export LANGFLOW_SUPERUSER="<your-superuser-username>"
export LANGFLOW_SUPERUSER_PASSWORD="<your-superuser-password>"
export LANGFLOW_DATABASE_URL="<your-production-database-url>" # e.g. "postgresql+psycopg://langflow:secure_pass@db.internal:5432/langflow"
export LANGFLOW_SECRET_KEY="your-strong-random-secret-key"
```
## Secret Key Rotation

The `LANGFLOW_SECRET_KEY` is used for:

- **JWT signing**: Access tokens, refresh tokens
- **Fernet encryption**: Stored credentials, encrypted variables, MCP auth settings

User passwords use bcrypt and are **not affected** by key rotation.
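The split matters for rotation: Fernet ciphertext can be decrypted with the old key and re-encrypted under the new one, while bcrypt output is a one-way hash that is verified rather than decrypted, so it never touches the secret key. A minimal sketch of the re-encryption step (standalone, using freshly generated Fernet keys rather than Langflow's derived ones):

```python
from cryptography.fernet import Fernet

old_key, new_key = Fernet.generate_key(), Fernet.generate_key()

# Encrypted-at-rest data must be rotated: decrypt under the old key,
# then re-encrypt under the new key.
ciphertext = Fernet(old_key).encrypt(b"stored-credential")
rotated = Fernet(new_key).encrypt(Fernet(old_key).decrypt(ciphertext))

assert Fernet(new_key).decrypt(rotated) == b"stored-credential"
```

After rotation the old key can no longer decrypt the stored value, which is why the script backs it up before switching over.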
### Running the Migration

```bash
# Stop Langflow first, then:

# Preview what will be migrated
uv run python scripts/migrate_secret_key.py --dry-run

# Run the migration
uv run python scripts/migrate_secret_key.py

# Start Langflow
```

The script will:

1. Read your current secret key from the config directory
2. Generate a new secret key
3. Re-encrypt all sensitive data in the database (atomic transaction)
4. Back up the old key to `<config-dir>/secret_key.backup.<timestamp>`
5. Save the new key to `<config-dir>/secret_key`

If the database migration fails, no changes are made: the transaction rolls back and the key files remain untouched.
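The all-or-nothing behavior comes from running every `UPDATE` inside one transaction. With SQLAlchemy, `engine.begin()` commits on clean exit and rolls back when an exception escapes the block; a minimal sketch against a throwaway SQLite table (hypothetical schema, not Langflow's):

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

with engine.begin() as conn:  # commits on clean exit
    conn.execute(text("CREATE TABLE secrets (id INTEGER, value TEXT)"))
    conn.execute(text("INSERT INTO secrets VALUES (1, 'old-ciphertext')"))

try:
    with engine.begin() as conn:  # rolls back: an exception escapes the block
        conn.execute(text("UPDATE secrets SET value = 'new-ciphertext'"))
        raise RuntimeError("simulated failure mid-migration")
except RuntimeError:
    pass

with engine.connect() as conn:
    # The failed UPDATE left no trace.
    assert conn.execute(text("SELECT value FROM secrets")).scalar() == "old-ciphertext"
```

Because the key files are only rewritten after the transaction commits, a mid-migration failure leaves both the database and the old key in place.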
### Config Directory Location

The default config directory varies by platform:

- **macOS**: `~/Library/Caches/langflow`
- **Linux**: `~/.cache/langflow`
- **Windows**: `C:\Users\<user>\AppData\Local\langflow\langflow\Cache`

Override it with the `LANGFLOW_CONFIG_DIR` environment variable or the `--config-dir` flag.
### CLI Options

```
uv run python scripts/migrate_secret_key.py --help

Options:
  --dry-run           Preview changes without modifying anything
  --config-dir PATH   Langflow config directory (default: platform-specific)
  --database-url URL  Database connection URL (default: sqlite in config dir)
  --old-key KEY       Current secret key (default: read from config dir)
  --new-key KEY       New secret key (default: auto-generated)
```
### What Gets Migrated

| Location | Data | Notes |
| ---------------------- | -------------------------------- | ---------------- |
| `user.store_api_key` | Langflow Store API key | Fernet encrypted |
| `variable.value` | All variable values | Fernet encrypted |
| `folder.auth_settings` | MCP oauth_client_secret, api_key | Fernet encrypted |

### What Gets Invalidated

Even with migration, these cannot be preserved:

- **Active sessions**: Users must log in again (JWT tokens are invalidated)

scripts/migrate_secret_key.py

@@ -0,0 +1,489 @@
"""Migrate encrypted data from old secret key to new secret key.
This script handles the full key rotation lifecycle:
1. Reads the current secret key from config directory
2. Generates a new secret key (or uses one provided)
3. Re-encrypts all sensitive data in the database (atomic transaction)
4. Backs up the old key
5. Saves the new key
Migrated database fields:
- user.store_api_key: Langflow Store API keys
- variable.value: All encrypted variable values
- folder.auth_settings: MCP oauth_client_secret and api_key fields
Usage:
uv run python scripts/migrate_secret_key.py --help
uv run python scripts/migrate_secret_key.py --dry-run
uv run python scripts/migrate_secret_key.py --database-url postgresql://...
"""
import argparse
import base64
import json
import os
import platform
import random
import secrets
import sys
from datetime import datetime, timezone
from pathlib import Path
from cryptography.fernet import Fernet, InvalidToken
from platformdirs import user_cache_dir
from sqlalchemy import create_engine, text
MINIMUM_KEY_LENGTH = 32
SENSITIVE_AUTH_FIELDS = ["oauth_client_secret", "api_key"]
# Must match langflow.services.variable.constants.CREDENTIAL_TYPE
CREDENTIAL_TYPE = "Credential"
def get_default_config_dir() -> Path:
"""Get the default Langflow config directory using platformdirs."""
return Path(user_cache_dir("langflow", "langflow"))
def get_config_dir() -> Path:
"""Get the Langflow config directory from environment or default."""
config_dir = os.environ.get("LANGFLOW_CONFIG_DIR")
if config_dir:
return Path(config_dir)
return get_default_config_dir()
def set_secure_permissions(file_path: Path) -> None:
"""Set restrictive permissions on a file (600 on Unix)."""
if platform.system() in {"Linux", "Darwin"}:
file_path.chmod(0o600)
elif platform.system() == "Windows":
try:
import win32api
import win32con
import win32security
user, _, _ = win32security.LookupAccountName("", win32api.GetUserName())
sd = win32security.GetFileSecurity(str(file_path), win32security.DACL_SECURITY_INFORMATION)
dacl = win32security.ACL()
dacl.AddAccessAllowedAce(
win32security.ACL_REVISION,
win32con.GENERIC_READ | win32con.GENERIC_WRITE,
user,
)
sd.SetSecurityDescriptorDacl(1, dacl, 0)
win32security.SetFileSecurity(str(file_path), win32security.DACL_SECURITY_INFORMATION, sd)
except ImportError:
print("Warning: Could not set secure permissions on Windows (pywin32 not installed)")
def read_secret_key_from_file(config_dir: Path) -> str | None:
"""Read the secret key from the config directory."""
secret_file = config_dir / "secret_key"
if secret_file.exists():
return secret_file.read_text(encoding="utf-8").strip()
return None
def write_secret_key_to_file(config_dir: Path, key: str, filename: str = "secret_key") -> None:
"""Write a secret key to file with secure permissions."""
config_dir.mkdir(parents=True, exist_ok=True)
secret_file = config_dir / filename
secret_file.write_text(key, encoding="utf-8")
set_secure_permissions(secret_file)
def ensure_valid_key(s: str) -> bytes:
"""Convert a secret key string to valid Fernet key bytes.
For keys shorter than MINIMUM_KEY_LENGTH (32), generates a deterministic
key by seeding random with the input string. For longer keys, pads with
'=' to ensure valid base64 encoding.
NOTE: This function is duplicated from langflow.services.auth.utils.ensure_valid_key
to keep the migration script self-contained (can run without full Langflow installation).
Keep in sync if encryption logic changes.
"""
if len(s) < MINIMUM_KEY_LENGTH:
random.seed(s)
key = bytes(random.getrandbits(8) for _ in range(32))
return base64.urlsafe_b64encode(key)
padding_needed = 4 - len(s) % 4
return (s + "=" * padding_needed).encode()
def decrypt_with_key(encrypted: str, key: str) -> str:
"""Decrypt data with the given key."""
fernet = Fernet(ensure_valid_key(key))
return fernet.decrypt(encrypted.encode()).decode()
def encrypt_with_key(plaintext: str, key: str) -> str:
"""Encrypt data with the given key."""
fernet = Fernet(ensure_valid_key(key))
return fernet.encrypt(plaintext.encode()).decode()
def migrate_value(encrypted: str, old_key: str, new_key: str) -> str | None:
"""Decrypt with old key and re-encrypt with new key.
Returns:
The re-encrypted value, or None if decryption fails (invalid key or corrupted data).
"""
try:
plaintext = decrypt_with_key(encrypted, old_key)
return encrypt_with_key(plaintext, new_key)
except InvalidToken:
return None
def migrate_auth_settings(auth_settings: dict, old_key: str, new_key: str) -> tuple[dict, list[str]]:
"""Re-encrypt sensitive fields in auth_settings dict.
Returns:
Tuple of (migrated_settings, failed_fields) where failed_fields contains
names of fields that could not be decrypted with the old key.
"""
result = auth_settings.copy()
failed_fields = []
for field in SENSITIVE_AUTH_FIELDS:
if result.get(field):
new_value = migrate_value(result[field], old_key, new_key)
if new_value:
result[field] = new_value
else:
failed_fields.append(field)
return result, failed_fields
def verify_migration(conn, new_key: str) -> tuple[int, int]:
"""Verify migrated data can be decrypted with the new key.
Samples records from each table and attempts decryption.
Returns:
Tuple of (verified_count, failed_count).
"""
verified, failed = 0, 0
# Verify user.store_api_key (sample up to 3)
users = conn.execute(
text('SELECT id, store_api_key FROM "user" WHERE store_api_key IS NOT NULL LIMIT 3')
).fetchall()
for _, encrypted_key in users:
try:
decrypt_with_key(encrypted_key, new_key)
verified += 1
except InvalidToken:
failed += 1
# Verify variable.value (sample up to 3)
variables = conn.execute(
text("SELECT id, value FROM variable WHERE type = :type AND value IS NOT NULL LIMIT 3"),
{"type": CREDENTIAL_TYPE},
).fetchall()
for _, encrypted_value in variables:
try:
decrypt_with_key(encrypted_value, new_key)
verified += 1
except InvalidToken:
failed += 1
# Verify folder.auth_settings (sample up to 3)
folders = conn.execute(
text("SELECT id, auth_settings FROM folder WHERE auth_settings IS NOT NULL LIMIT 3")
).fetchall()
for _, auth_settings in folders:
if not auth_settings:
continue
try:
settings_dict = auth_settings if isinstance(auth_settings, dict) else json.loads(auth_settings)
for field in SENSITIVE_AUTH_FIELDS:
if settings_dict.get(field):
decrypt_with_key(settings_dict[field], new_key)
verified += 1
except (InvalidToken, json.JSONDecodeError):
failed += 1
return verified, failed
def get_default_database_url(config_dir: Path) -> str | None:
"""Get database URL from default SQLite location."""
default_db = config_dir / "langflow.db"
if default_db.exists():
return f"sqlite:///{default_db}"
return None
DATABASE_URL_DISPLAY_LENGTH = 50
def migrate(
config_dir: Path,
database_url: str,
old_key: str | None = None,
new_key: str | None = None,
*,
dry_run: bool = False,
):
"""Run the secret key migration.
Args:
config_dir: Path to Langflow config directory containing secret_key file.
database_url: SQLAlchemy database connection URL.
old_key: Current secret key. If None, reads from config_dir/secret_key.
new_key: New secret key. If None, generates a secure random key.
dry_run: If True, simulates migration without making changes.
The migration runs as an atomic transaction - either all database changes
succeed or none are applied. Key files are only modified after successful
database migration.
"""
# Determine old key
if not old_key:
old_key = read_secret_key_from_file(config_dir)
if not old_key:
print("Error: Could not find current secret key.")
print(f" Checked: {config_dir}/secret_key")
print(" Use --old-key to provide it explicitly")
sys.exit(1)
# Determine new key
if not new_key:
new_key = secrets.token_urlsafe(32)
print(f"Generated new secret key: {new_key}")
else:
print(f"Using provided new key: {new_key}")
print(" (Save this key - you'll need it if the migration fails after database commit)")
if old_key == new_key:
print("Error: Old and new secret keys are the same")
sys.exit(1)
print("\nConfiguration:")
print(f" Config dir: {config_dir}")
db_display = (
f"{database_url[:DATABASE_URL_DISPLAY_LENGTH]}..."
if len(database_url) > DATABASE_URL_DISPLAY_LENGTH
else database_url
)
print(f" Database: {db_display}")
print(f" Dry run: {dry_run}")
if dry_run:
print("\n[DRY RUN] No changes will be made.\n")
engine = create_engine(database_url)
total_migrated = 0
total_failed = 0
# Use begin() for atomic transaction - all changes commit together or rollback on failure
with engine.begin() as conn:
# Migrate user.store_api_key
print("\n1. Migrating user.store_api_key...")
users = conn.execute(text('SELECT id, store_api_key FROM "user" WHERE store_api_key IS NOT NULL')).fetchall()
migrated, failed = 0, 0
for user_id, encrypted_key in users:
new_encrypted = migrate_value(encrypted_key, old_key, new_key)
if new_encrypted:
if not dry_run:
conn.execute(
text('UPDATE "user" SET store_api_key = :val WHERE id = :id'),
{"val": new_encrypted, "id": user_id},
)
migrated += 1
else:
failed += 1
print(f" Warning: Could not decrypt for user {user_id}")
print(f" {'Would migrate' if dry_run else 'Migrated'}: {migrated}, Failed: {failed}")
total_migrated += migrated
total_failed += failed
# Migrate variable.value (only Credential type variables are encrypted)
print("\n2. Migrating credential variable values...")
variables = conn.execute(
text("SELECT id, name, value FROM variable WHERE type = :type"),
{"type": CREDENTIAL_TYPE},
).fetchall()
migrated, failed, skipped = 0, 0, 0
for var_id, var_name, encrypted_value in variables:
if not encrypted_value:
skipped += 1
continue
new_encrypted = migrate_value(encrypted_value, old_key, new_key)
if new_encrypted:
if not dry_run:
conn.execute(
text("UPDATE variable SET value = :val WHERE id = :id"),
{"val": new_encrypted, "id": var_id},
)
migrated += 1
else:
failed += 1
print(f" Warning: Could not decrypt variable '{var_name}' ({var_id})")
print(f" {'Would migrate' if dry_run else 'Migrated'}: {migrated}, Failed: {failed}, Skipped: {skipped}")
total_migrated += migrated
total_failed += failed
# Migrate folder.auth_settings
print("\n3. Migrating folder.auth_settings (MCP)...")
folders = conn.execute(
text("SELECT id, name, auth_settings FROM folder WHERE auth_settings IS NOT NULL")
).fetchall()
migrated, failed = 0, 0
for folder_id, folder_name, auth_settings in folders:
if not auth_settings:
continue
try:
settings_dict = auth_settings if isinstance(auth_settings, dict) else json.loads(auth_settings)
new_settings, failed_fields = migrate_auth_settings(settings_dict, old_key, new_key)
if failed_fields:
failed += 1
print(f" Warning: Could not migrate folder '{folder_name}' fields: {', '.join(failed_fields)}")
continue
if not dry_run:
conn.execute(
text("UPDATE folder SET auth_settings = :val WHERE id = :id"),
{"val": json.dumps(new_settings), "id": folder_id},
)
migrated += 1
except (json.JSONDecodeError, InvalidToken, TypeError, KeyError) as e:
failed += 1
print(f" Warning: Could not migrate folder '{folder_name}': {e}")
print(f" {'Would migrate' if dry_run else 'Migrated'}: {migrated}, Failed: {failed}")
total_migrated += migrated
total_failed += failed
# Verify migrated data can be decrypted with new key
if total_migrated > 0:
print("\n4. Verifying migration...")
verified, verify_failed = verify_migration(conn, new_key)
if verify_failed > 0:
print(f" ERROR: {verify_failed} records failed verification!")
print(" Rolling back transaction...")
conn.rollback()
sys.exit(1)
if verified > 0:
print(f" Verified {verified} sample records can be decrypted with new key")
else:
print(" No records to verify (all tables empty)")
# Rollback if dry run (transaction will auto-commit on exit otherwise)
if dry_run:
conn.rollback()
# Save new key only after successful database migration
if not dry_run:
backup_file = config_dir / f"secret_key.backup.{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"
write_secret_key_to_file(config_dir, old_key, backup_file.name)
print(f"\n5. Backed up old key to: {backup_file}")
write_secret_key_to_file(config_dir, new_key)
print(f"6. Saved new secret key to: {config_dir / 'secret_key'}")
else:
print("\n5. [DRY RUN] Would backup old key")
print(f"6. [DRY RUN] Would save new key to: {config_dir / 'secret_key'}")
# Summary
print("\n" + "=" * 50)
if dry_run:
print("DRY RUN COMPLETE")
print(f"\nWould migrate {total_migrated} items, {total_failed} failures")
print("\nRun without --dry-run to apply changes.")
else:
print("MIGRATION COMPLETE")
print(f"\nMigrated {total_migrated} items, {total_failed} failures")
print(f"\nBackup key location: {config_dir}/secret_key.backup.*")
print("\nNext steps:")
print("1. Start Langflow and verify everything works")
print("2. Users must log in again (JWT sessions invalidated)")
print("3. Once verified, you may delete the backup key file")
if total_failed > 0:
print(f"\nWarning: {total_failed} items could not be migrated.")
print("These may have been encrypted with a different key or are corrupted.")
sys.exit(1 if not dry_run else 0)
def main():
default_config = get_config_dir()
parser = argparse.ArgumentParser(
description="Migrate Langflow encrypted data to a new secret key",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Preview what will be migrated
%(prog)s --dry-run
# Run migration with defaults
%(prog)s
# Custom database and config
%(prog)s --database-url postgresql://user:pass@host/db --config-dir /etc/langflow # pragma: allowlist secret
# Provide keys explicitly
%(prog)s --old-key "current-key" --new-key "replacement-key"
""",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Preview changes without modifying anything",
)
parser.add_argument(
"--config-dir",
type=Path,
default=default_config,
metavar="PATH",
help=f"Langflow config directory (default: {default_config})",
)
parser.add_argument(
"--database-url",
type=str,
default=None,
metavar="URL",
help="Database connection URL (default: sqlite in config dir)",
)
parser.add_argument(
"--old-key",
type=str,
default=None,
metavar="KEY",
help="Current secret key (default: read from config dir)",
)
parser.add_argument(
"--new-key",
type=str,
default=None,
metavar="KEY",
help="New secret key (default: auto-generated)",
)
args = parser.parse_args()
# Resolve database URL
database_url = args.database_url or get_default_database_url(args.config_dir)
if not database_url:
print("Error: Could not determine database URL.")
print(f" No database found at {args.config_dir}/langflow.db")
print(" Use --database-url to specify the database location")
sys.exit(1)
migrate(
config_dir=args.config_dir,
database_url=database_url,
old_key=args.old_key,
new_key=args.new_key,
dry_run=args.dry_run,
)
if __name__ == "__main__":
main()

src/backend/tests/unit/scripts/test_migrate_secret_key.py

@@ -0,0 +1,852 @@
"""Tests for the secret key migration script."""
import importlib.util
import json
import secrets
import sys
import tempfile
from pathlib import Path
from uuid import uuid4
import pytest
from cryptography.fernet import Fernet
from httpx import AsyncClient
from langflow.services.deps import get_settings_service
from langflow.services.variable.constants import CREDENTIAL_TYPE
from sqlalchemy import create_engine, text
@pytest.fixture(scope="module")
def migrate_module():
"""Load the migrate_secret_key module from scripts directory."""
# Test file is at: src/backend/tests/unit/scripts/test_migrate_secret_key.py
# Script is at: scripts/migrate_secret_key.py
# Need to go up 5 levels to repo root, then into scripts/
test_file = Path(__file__).resolve()
repo_root = test_file.parents[5] # Goes to langflow repo root
script_path = repo_root / "scripts" / "migrate_secret_key.py"
if not script_path.exists():
pytest.skip(f"migrate_secret_key.py script not found at {script_path}")
spec = importlib.util.spec_from_file_location("migrate_secret_key", script_path)
module = importlib.util.module_from_spec(spec)
sys.modules["migrate_secret_key"] = module
spec.loader.exec_module(module)
return module
@pytest.fixture
def old_key():
"""Generate a valid old secret key."""
return secrets.token_urlsafe(32)
@pytest.fixture
def new_key():
"""Generate a valid new secret key."""
return secrets.token_urlsafe(32)
@pytest.fixture
def short_old_key():
"""A short key that triggers the seed-based generation."""
return "short-key"
@pytest.fixture
def short_new_key():
"""A different short key."""
return "other-short"
@pytest.fixture
def sqlite_db():
"""Create an in-memory SQLite database with the required tables."""
engine = create_engine("sqlite:///:memory:")
with engine.connect() as conn:
conn.execute(
text("""
CREATE TABLE "user" (
id TEXT PRIMARY KEY,
store_api_key TEXT
)
""")
)
conn.execute(
text("""
CREATE TABLE variable (
id TEXT PRIMARY KEY,
name TEXT,
value TEXT,
type TEXT
)
""")
)
conn.execute(
text("""
CREATE TABLE folder (
id TEXT PRIMARY KEY,
name TEXT,
auth_settings TEXT
)
""")
)
conn.commit()
return engine
class TestEnsureValidKey:
"""Tests for ensure_valid_key function."""
def test_long_key_padded(self, migrate_module, old_key):
"""Long keys should be padded to valid base64."""
result = migrate_module.ensure_valid_key(old_key)
assert isinstance(result, bytes)
Fernet(result)
def test_short_key_generates_valid_key(self, migrate_module, short_old_key):
"""Short keys should generate a valid Fernet key via seeding."""
result = migrate_module.ensure_valid_key(short_old_key)
assert isinstance(result, bytes)
Fernet(result)
def test_same_short_key_produces_same_result(self, migrate_module, short_old_key):
"""Same short key should always produce the same Fernet key."""
result1 = migrate_module.ensure_valid_key(short_old_key)
result2 = migrate_module.ensure_valid_key(short_old_key)
assert result1 == result2
def test_different_short_keys_produce_different_results(self, migrate_module, short_old_key, short_new_key):
"""Different short keys should produce different Fernet keys."""
result1 = migrate_module.ensure_valid_key(short_old_key)
result2 = migrate_module.ensure_valid_key(short_new_key)
assert result1 != result2
class TestEncryptDecrypt:
"""Tests for encrypt_with_key and decrypt_with_key functions."""
def test_encrypt_decrypt_roundtrip(self, migrate_module, old_key):
"""Encrypting then decrypting should return original value."""
plaintext = "my-secret-api-key-12345"
encrypted = migrate_module.encrypt_with_key(plaintext, old_key)
decrypted = migrate_module.decrypt_with_key(encrypted, old_key)
assert decrypted == plaintext
def test_encrypt_produces_different_output(self, migrate_module, old_key):
"""Encryption should produce ciphertext different from plaintext."""
plaintext = "my-secret-api-key-12345"
encrypted = migrate_module.encrypt_with_key(plaintext, old_key)
assert encrypted != plaintext
assert encrypted.startswith("gAAAAAB")
def test_decrypt_with_wrong_key_fails(self, migrate_module, old_key, new_key):
"""Decrypting with wrong key should raise an error."""
from cryptography.fernet import InvalidToken
plaintext = "my-secret-api-key-12345"
encrypted = migrate_module.encrypt_with_key(plaintext, old_key)
with pytest.raises(InvalidToken):
migrate_module.decrypt_with_key(encrypted, new_key)
def test_encrypt_decrypt_with_short_keys(self, migrate_module, short_old_key):
"""Short keys should work for encryption/decryption."""
plaintext = "secret-value"
encrypted = migrate_module.encrypt_with_key(plaintext, short_old_key)
decrypted = migrate_module.decrypt_with_key(encrypted, short_old_key)
assert decrypted == plaintext
class TestMigrateValue:
"""Tests for migrate_value function."""
def test_migrate_value_success(self, migrate_module, old_key, new_key):
"""Successfully migrate a value from old key to new key."""
plaintext = "original-secret"
old_encrypted = migrate_module.encrypt_with_key(plaintext, old_key)
new_encrypted = migrate_module.migrate_value(old_encrypted, old_key, new_key)
assert new_encrypted is not None
assert new_encrypted != old_encrypted
decrypted = migrate_module.decrypt_with_key(new_encrypted, new_key)
assert decrypted == plaintext
def test_migrate_value_wrong_old_key(self, migrate_module, old_key, new_key):
"""Migration should return None if old key doesn't decrypt."""
plaintext = "original-secret"
encrypted = migrate_module.encrypt_with_key(plaintext, old_key)
wrong_key = secrets.token_urlsafe(32)
result = migrate_module.migrate_value(encrypted, wrong_key, new_key)
assert result is None
def test_migrate_value_invalid_ciphertext(self, migrate_module, old_key, new_key):
"""Migration should return None for invalid ciphertext."""
result = migrate_module.migrate_value("not-valid-ciphertext", old_key, new_key)
assert result is None
class TestMigrateAuthSettings:
"""Tests for migrate_auth_settings function."""
def test_migrate_oauth_client_secret(self, migrate_module, old_key, new_key):
"""oauth_client_secret should be re-encrypted."""
secret = "my-oauth-secret" # noqa: S105 # pragma: allowlist secret
auth_settings = {
"auth_type": "oauth",
"oauth_client_id": "client-123",
"oauth_client_secret": migrate_module.encrypt_with_key(secret, old_key),
}
migrated, failed_fields = migrate_module.migrate_auth_settings(auth_settings, old_key, new_key)
assert failed_fields == []
assert migrated["auth_type"] == "oauth"
assert migrated["oauth_client_id"] == "client-123"
assert migrated["oauth_client_secret"] != auth_settings["oauth_client_secret"]
decrypted = migrate_module.decrypt_with_key(migrated["oauth_client_secret"], new_key)
assert decrypted == secret
def test_migrate_api_key_field(self, migrate_module, old_key, new_key):
"""api_key field should be re-encrypted."""
api_key = "sk-test-key" # pragma: allowlist secret
auth_settings = {
"auth_type": "api",
"api_key": migrate_module.encrypt_with_key(api_key, old_key),
}
migrated, failed_fields = migrate_module.migrate_auth_settings(auth_settings, old_key, new_key)
assert failed_fields == []
decrypted = migrate_module.decrypt_with_key(migrated["api_key"], new_key)
assert decrypted == api_key
def test_migrate_preserves_non_sensitive_fields(self, migrate_module, old_key, new_key):
"""Non-sensitive fields should be preserved unchanged."""
auth_settings = {
"auth_type": "oauth",
"oauth_host": "localhost",
"oauth_port": 3000,
"oauth_client_id": "my-client",
"oauth_client_secret": migrate_module.encrypt_with_key("secret", old_key),
}
migrated, failed_fields = migrate_module.migrate_auth_settings(auth_settings, old_key, new_key)
assert failed_fields == []
assert migrated["auth_type"] == auth_settings["auth_type"]
assert migrated["oauth_host"] == auth_settings["oauth_host"]
assert migrated["oauth_port"] == auth_settings["oauth_port"]
assert migrated["oauth_client_id"] == auth_settings["oauth_client_id"]
def test_migrate_empty_sensitive_fields(self, migrate_module, old_key, new_key):
"""Empty/None sensitive fields should be handled gracefully."""
auth_settings = {
"auth_type": "api",
"api_key": None,
"oauth_client_secret": "",
}
migrated, failed_fields = migrate_module.migrate_auth_settings(auth_settings, old_key, new_key)
assert failed_fields == []
assert migrated["api_key"] is None
assert migrated["oauth_client_secret"] == ""
def test_migrate_returns_failed_fields_for_invalid_encryption(self, migrate_module, old_key, new_key):
"""Invalid encrypted fields should be reported in failed_fields."""
auth_settings = {
"auth_type": "api",
"api_key": "not-valid-encrypted-data", # Invalid ciphertext # pragma: allowlist secret
"oauth_client_secret": migrate_module.encrypt_with_key("valid-secret", old_key),
}
migrated, failed_fields = migrate_module.migrate_auth_settings(auth_settings, old_key, new_key)
assert "api_key" in failed_fields
assert "oauth_client_secret" not in failed_fields
# oauth_client_secret should still be migrated
decrypted = migrate_module.decrypt_with_key(migrated["oauth_client_secret"], new_key)
assert decrypted == "valid-secret"
class TestDatabaseMigrationUnit:
"""Unit tests for database migration with in-memory SQLite."""
def test_migrate_user_store_api_key(self, migrate_module, sqlite_db, old_key, new_key):
"""Test migrating user.store_api_key column."""
user_id = str(uuid4())
original_value = "langflow-store-api-key"
encrypted_value = migrate_module.encrypt_with_key(original_value, old_key)
with sqlite_db.connect() as conn:
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": user_id, "key": encrypted_value},
)
conn.commit()
users = conn.execute(
text('SELECT id, store_api_key FROM "user" WHERE store_api_key IS NOT NULL')
).fetchall()
for uid, encrypted_key in users:
new_encrypted = migrate_module.migrate_value(encrypted_key, old_key, new_key)
assert new_encrypted is not None
conn.execute(
text('UPDATE "user" SET store_api_key = :val WHERE id = :id'),
{"val": new_encrypted, "id": uid},
)
conn.commit()
result = conn.execute(text('SELECT store_api_key FROM "user" WHERE id = :id'), {"id": user_id}).fetchone()
decrypted = migrate_module.decrypt_with_key(result[0], new_key)
assert decrypted == original_value
def test_migrate_variable_values(self, migrate_module, sqlite_db, old_key, new_key):
"""Test migrating variable.value column."""
var_id = str(uuid4())
original_value = "my-openai-api-key"
encrypted_value = migrate_module.encrypt_with_key(original_value, old_key)
with sqlite_db.connect() as conn:
conn.execute(
text("INSERT INTO variable (id, name, value, type) VALUES (:id, :name, :value, :type)"),
{"id": var_id, "name": "OPENAI_API_KEY", "value": encrypted_value, "type": "Credential"},
)
conn.commit()
variables = conn.execute(text("SELECT id, name, value FROM variable")).fetchall()
for vid, _, encrypted_val in variables:
if encrypted_val:
new_encrypted = migrate_module.migrate_value(encrypted_val, old_key, new_key)
assert new_encrypted is not None
conn.execute(
text("UPDATE variable SET value = :val WHERE id = :id"),
{"val": new_encrypted, "id": vid},
)
conn.commit()
result = conn.execute(text("SELECT value FROM variable WHERE id = :id"), {"id": var_id}).fetchone()
decrypted = migrate_module.decrypt_with_key(result[0], new_key)
assert decrypted == original_value

def test_migrate_folder_auth_settings(self, migrate_module, sqlite_db, old_key, new_key):
"""Test migrating folder.auth_settings JSON column."""
folder_id = str(uuid4())
oauth_secret = "my-oauth-secret" # noqa: S105 # pragma: allowlist secret
auth_settings = {
"auth_type": "oauth",
"oauth_client_id": "client-123",
"oauth_client_secret": migrate_module.encrypt_with_key(oauth_secret, old_key),
}
with sqlite_db.connect() as conn:
conn.execute(
text("INSERT INTO folder (id, name, auth_settings) VALUES (:id, :name, :settings)"),
{"id": folder_id, "name": "My Project", "settings": json.dumps(auth_settings)},
)
conn.commit()
folders = conn.execute(
text("SELECT id, name, auth_settings FROM folder WHERE auth_settings IS NOT NULL")
).fetchall()
for fid, _, settings_json in folders:
settings_dict = json.loads(settings_json)
new_settings, failed_fields = migrate_module.migrate_auth_settings(settings_dict, old_key, new_key)
assert failed_fields == []
conn.execute(
text("UPDATE folder SET auth_settings = :val WHERE id = :id"),
{"val": json.dumps(new_settings), "id": fid},
)
conn.commit()
result = conn.execute(text("SELECT auth_settings FROM folder WHERE id = :id"), {"id": folder_id}).fetchone()
migrated_settings = json.loads(result[0])
decrypted_secret = migrate_module.decrypt_with_key(migrated_settings["oauth_client_secret"], new_key)
assert decrypted_secret == oauth_secret
assert migrated_settings["oauth_client_id"] == "client-123"


class TestKeyFileManagement:
"""Tests for secret key file read/write operations."""

def test_read_secret_key_from_file(self, migrate_module):
"""Test reading secret key from config directory."""
with tempfile.TemporaryDirectory() as tmpdir:
config_dir = Path(tmpdir)
secret_file = config_dir / "secret_key"
test_key = "test-secret-key-12345"
secret_file.write_text(test_key)
result = migrate_module.read_secret_key_from_file(config_dir)
assert result == test_key

def test_read_secret_key_strips_whitespace(self, migrate_module):
"""Test that reading strips whitespace from key."""
with tempfile.TemporaryDirectory() as tmpdir:
config_dir = Path(tmpdir)
secret_file = config_dir / "secret_key"
secret_file.write_text(" test-key-with-spaces \n")
result = migrate_module.read_secret_key_from_file(config_dir)
assert result == "test-key-with-spaces"

def test_read_secret_key_returns_none_if_missing(self, migrate_module):
"""Test that reading returns None if file doesn't exist."""
with tempfile.TemporaryDirectory() as tmpdir:
config_dir = Path(tmpdir)
result = migrate_module.read_secret_key_from_file(config_dir)
assert result is None

def test_write_secret_key_creates_file(self, migrate_module):
"""Test writing secret key creates file with correct content."""
with tempfile.TemporaryDirectory() as tmpdir:
config_dir = Path(tmpdir)
test_key = "new-secret-key-67890"
migrate_module.write_secret_key_to_file(config_dir, test_key)
secret_file = config_dir / "secret_key"
assert secret_file.exists()
assert secret_file.read_text() == test_key

def test_write_secret_key_creates_parent_dirs(self, migrate_module):
"""Test writing creates parent directories if needed."""
with tempfile.TemporaryDirectory() as tmpdir:
config_dir = Path(tmpdir) / "nested" / "config"
test_key = "nested-key"
migrate_module.write_secret_key_to_file(config_dir, test_key)
secret_file = config_dir / "secret_key"
assert secret_file.exists()

def test_write_secret_key_custom_filename(self, migrate_module):
"""Test writing with custom filename for backups."""
with tempfile.TemporaryDirectory() as tmpdir:
config_dir = Path(tmpdir)
test_key = "backup-key"
migrate_module.write_secret_key_to_file(config_dir, test_key, "secret_key.backup")
backup_file = config_dir / "secret_key.backup"
assert backup_file.exists()
assert backup_file.read_text() == test_key

def test_get_config_dir_default(self, migrate_module, monkeypatch):
"""Test default config directory uses platformdirs."""
from platformdirs import user_cache_dir
monkeypatch.delenv("LANGFLOW_CONFIG_DIR", raising=False)
result = migrate_module.get_config_dir()
expected = Path(user_cache_dir("langflow", "langflow"))
assert result == expected

def test_get_config_dir_from_env(self, migrate_module, monkeypatch):
"""Test config directory from environment variable."""
monkeypatch.setenv("LANGFLOW_CONFIG_DIR", "/custom/config")
result = migrate_module.get_config_dir()
assert result == Path("/custom/config")


@pytest.mark.usefixtures("client")
class TestMigrationWithRealDatabase:
"""Integration tests using real Langflow database fixtures."""

async def test_credential_variable_stored_encrypted(
self,
migrate_module, # noqa: ARG002
client: AsyncClient,
active_user, # noqa: ARG002
logged_in_headers,
):
"""Test that credential variables are stored encrypted in the database.
The API returns None for credential values (for security), so we verify
that the value is different from the original - which means it's encrypted.
The migration script handles these encrypted values and re-encrypts them
with a new key - this is tested in the unit tests.
"""
client.follow_redirects = True
# Create a credential variable via API
var_name = f"TEST_API_KEY_{uuid4().hex[:8]}"
credential_variable = {
"name": var_name,
"value": "sk-test-secret-value-12345",
"type": CREDENTIAL_TYPE,
"default_fields": [],
}
response = await client.post("api/v1/variables/", json=credential_variable, headers=logged_in_headers)
assert response.status_code == 201
created_var = response.json()
# Read the variable back
response = await client.get("api/v1/variables/", headers=logged_in_headers)
assert response.status_code == 200
all_vars = response.json()
# Find our variable
our_var = next((v for v in all_vars if v["name"] == var_name), None)
assert our_var is not None
# For credentials, the API returns None for security (value is encrypted in DB)
# This is expected behavior - the migration script works on the raw DB values
assert our_var["value"] is None or our_var["value"] != credential_variable["value"]
# Cleanup
await client.delete(f"api/v1/variables/{created_var['id']}", headers=logged_in_headers)

async def test_create_folder_via_api(
self,
migrate_module, # noqa: ARG002
client: AsyncClient,
active_user, # noqa: ARG002
logged_in_headers,
):
"""Test that folders can be created via API."""
client.follow_redirects = True
project_data = {
"name": f"Test Project {uuid4().hex[:8]}",
"description": "Test project for migration",
}
response = await client.post("api/v1/folders/", json=project_data, headers=logged_in_headers)
assert response.status_code == 201
created_folder = response.json()
# Cleanup
await client.delete(f"api/v1/folders/{created_folder['id']}", headers=logged_in_headers)


@pytest.mark.usefixtures("client")
class TestMigrationCompatibility:
"""Test that migration script is compatible with Langflow's encryption."""

def test_script_encryption_matches_langflow(self, migrate_module):
"""Verify migration script produces same results as Langflow's auth utils."""
from langflow.services.auth import utils as auth_utils
settings_service = get_settings_service()
secret_key = settings_service.auth_settings.SECRET_KEY.get_secret_value()
plaintext = "test-api-key-compatibility"
# Encrypt with Langflow
langflow_encrypted = auth_utils.encrypt_api_key(plaintext, settings_service)
# Decrypt with migration script
script_decrypted = migrate_module.decrypt_with_key(langflow_encrypted, secret_key)
assert script_decrypted == plaintext
# Encrypt with migration script
script_encrypted = migrate_module.encrypt_with_key(plaintext, secret_key)
# Decrypt with Langflow
langflow_decrypted = auth_utils.decrypt_api_key(script_encrypted, settings_service)
assert langflow_decrypted == plaintext


class TestTransactionAtomicity:
"""Tests for atomic transaction behavior."""

def test_transaction_rollback_on_error(self, migrate_module, sqlite_db, old_key, new_key):
"""Test that database changes are rolled back if an error occurs mid-migration."""
user_id = str(uuid4())
original_value = "user-api-key"
encrypted_value = migrate_module.encrypt_with_key(original_value, old_key)
# Insert test data
with sqlite_db.connect() as conn:
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": user_id, "key": encrypted_value},
)
conn.commit()
# Simulate a failed migration using begin() - any exception causes rollback
try:
with sqlite_db.begin() as conn:
# Update the user's key
new_encrypted = migrate_module.migrate_value(encrypted_value, old_key, new_key)
conn.execute(
text('UPDATE "user" SET store_api_key = :val WHERE id = :id'),
{"val": new_encrypted, "id": user_id},
)
# Simulate an error before commit
msg = "Simulated failure"
raise RuntimeError(msg)
except RuntimeError:
pass
# Verify the original value was preserved (transaction was rolled back)
with sqlite_db.connect() as conn:
result = conn.execute(text('SELECT store_api_key FROM "user" WHERE id = :id'), {"id": user_id}).fetchone()
assert result[0] == encrypted_value # Original value preserved

def test_partial_migration_does_not_persist(self, migrate_module, sqlite_db, old_key, new_key):
"""Test that partial migrations don't leave database in inconsistent state."""
user_id = str(uuid4())
var_id = str(uuid4())
user_value = "user-secret"
var_value = "var-secret"
# Insert test data
with sqlite_db.connect() as conn:
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": user_id, "key": migrate_module.encrypt_with_key(user_value, old_key)},
)
encrypted_var = migrate_module.encrypt_with_key(var_value, old_key)
conn.execute(
text("INSERT INTO variable (id, name, value, type) VALUES (:id, :name, :value, :type)"),
{"id": var_id, "name": "TEST_VAR", "value": encrypted_var, "type": "Credential"},
)
conn.commit()
original_user_key = None
original_var_value = None
with sqlite_db.connect() as conn:
original_user_key = conn.execute(
text('SELECT store_api_key FROM "user" WHERE id = :id'), {"id": user_id}
).fetchone()[0]
original_var_value = conn.execute(
text("SELECT value FROM variable WHERE id = :id"), {"id": var_id}
).fetchone()[0]
# Attempt migration with failure after first table
try:
with sqlite_db.begin() as conn:
# Migrate user table successfully
new_encrypted = migrate_module.migrate_value(original_user_key, old_key, new_key)
conn.execute(
text('UPDATE "user" SET store_api_key = :val WHERE id = :id'),
{"val": new_encrypted, "id": user_id},
)
# Fail before variable table
msg = "Simulated failure after partial migration"
raise RuntimeError(msg)
except RuntimeError:
pass
# Both tables should be unchanged
with sqlite_db.connect() as conn:
user_result = conn.execute(
text('SELECT store_api_key FROM "user" WHERE id = :id'), {"id": user_id}
).fetchone()
var_result = conn.execute(text("SELECT value FROM variable WHERE id = :id"), {"id": var_id}).fetchone()
assert user_result[0] == original_user_key
assert var_result[0] == original_var_value


class TestErrorHandling:
"""Tests for error handling scenarios."""

def test_migration_handles_invalid_encrypted_data(self, migrate_module, sqlite_db, old_key, new_key):
"""Test that migration continues when encountering invalid encrypted data."""
valid_id = str(uuid4())
invalid_id = str(uuid4())
valid_value = "valid-secret"
with sqlite_db.connect() as conn:
# Insert valid encrypted data
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": valid_id, "key": migrate_module.encrypt_with_key(valid_value, old_key)},
)
# Insert invalid/corrupted encrypted data
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": invalid_id, "key": "not-valid-encrypted-data"},
)
conn.commit()
# migrate_value returns None for invalid data, allowing migration to continue
with sqlite_db.connect() as conn:
users = conn.execute(text('SELECT id, store_api_key FROM "user"')).fetchall()
migrated_count = 0
failed_count = 0
for _uid, encrypted_key in users:
new_encrypted = migrate_module.migrate_value(encrypted_key, old_key, new_key)
if new_encrypted:
migrated_count += 1
else:
failed_count += 1
assert migrated_count == 1 # Valid entry was migrated
assert failed_count == 1 # Invalid entry failed gracefully

def test_migration_handles_null_values(
self,
migrate_module, # noqa: ARG002
sqlite_db,
old_key, # noqa: ARG002
new_key, # noqa: ARG002
):
"""Test that migration handles NULL values correctly."""
user_id = str(uuid4())
with sqlite_db.connect() as conn:
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, NULL)'),
{"id": user_id},
)
conn.commit()
# NULL values should not cause errors
with sqlite_db.connect() as conn:
result = conn.execute(
text('SELECT id, store_api_key FROM "user" WHERE id = :id'), {"id": user_id}
).fetchone()
assert result[1] is None

def test_migration_handles_empty_auth_settings(self, migrate_module, old_key, new_key):
"""Test that migration handles empty auth_settings dict."""
empty_settings = {}
result, failed_fields = migrate_module.migrate_auth_settings(empty_settings, old_key, new_key)
assert result == {}
assert failed_fields == []

def test_migration_handles_malformed_json_gracefully(
self,
migrate_module, # noqa: ARG002
sqlite_db,
old_key, # noqa: ARG002
new_key, # noqa: ARG002
):
"""Test that malformed JSON in auth_settings is handled gracefully."""
folder_id = str(uuid4())
with sqlite_db.connect() as conn:
conn.execute(
text("INSERT INTO folder (id, name, auth_settings) VALUES (:id, :name, :settings)"),
{"id": folder_id, "name": "Bad Folder", "settings": "not-valid-json{"},
)
conn.commit()
# Attempting to parse and migrate should raise JSONDecodeError
with sqlite_db.connect() as conn:
result = conn.execute(text("SELECT auth_settings FROM folder WHERE id = :id"), {"id": folder_id}).fetchone()
with pytest.raises(json.JSONDecodeError):
json.loads(result[0])

def test_key_file_permissions_set_correctly(self, migrate_module):
"""Test that key file has restrictive permissions on Unix systems."""
import platform
import stat
if platform.system() not in {"Linux", "Darwin"}:
pytest.skip("Permission test only runs on Unix systems")
with tempfile.TemporaryDirectory() as tmpdir:
config_dir = Path(tmpdir)
test_key = "secure-key-12345"
migrate_module.write_secret_key_to_file(config_dir, test_key)
secret_file = config_dir / "secret_key"
file_mode = secret_file.stat().st_mode
# Check that only owner has read/write (0o600)
assert stat.S_IMODE(file_mode) == 0o600
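

# A standalone sketch of creating a key file with owner-only permissions,
# mirroring what the permission test above expects. Assumption: the real
# write_secret_key_to_file may instead use Path.write_text followed by
# chmod; os.open applies the 0o600 mode atomically at file creation.
def test_owner_only_key_file_sketch():
    """Standalone illustration; does not exercise the migration script."""
    import os
    import stat
    import tempfile
    from pathlib import Path

    with tempfile.TemporaryDirectory() as tmpdir:
        path = Path(tmpdir) / "secret_key"
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, "w") as fh:
            fh.write("example-key")
        if os.name == "posix":
            # 0o600 carries no group/other bits, so the umask cannot widen it.
            assert stat.S_IMODE(path.stat().st_mode) == 0o600
        assert path.read_text() == "example-key"

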
class TestDryRunMode:
"""Tests for dry-run mode behavior."""

def test_dry_run_does_not_modify_database(self, migrate_module, sqlite_db, old_key, new_key):
"""Test that dry run mode doesn't modify the database."""
user_id = str(uuid4())
original_value = "original-secret"
encrypted_value = migrate_module.encrypt_with_key(original_value, old_key)
with sqlite_db.connect() as conn:
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": user_id, "key": encrypted_value},
)
conn.commit()
# Simulate dry-run behavior: use begin() then rollback
with sqlite_db.begin() as conn:
# Migrate the value
new_encrypted = migrate_module.migrate_value(encrypted_value, old_key, new_key)
conn.execute(
text('UPDATE "user" SET store_api_key = :val WHERE id = :id'),
{"val": new_encrypted, "id": user_id},
)
# Explicitly rollback to simulate dry-run
conn.rollback()
# Verify original value is preserved
with sqlite_db.connect() as conn:
result = conn.execute(text('SELECT store_api_key FROM "user" WHERE id = :id'), {"id": user_id}).fetchone()
assert result[0] == encrypted_value
# Can still decrypt with old key
decrypted = migrate_module.decrypt_with_key(result[0], old_key)
assert decrypted == original_value


class TestVerifyMigration:
"""Tests for post-migration verification."""

def test_verify_migration_success(self, migrate_module, sqlite_db, new_key):
"""Test verification passes when data is correctly migrated."""
user_id = str(uuid4())
var_id = str(uuid4())
original_value = "test-secret-value"
# Create data encrypted with new key (simulating successful migration)
encrypted_value = migrate_module.encrypt_with_key(original_value, new_key)
with sqlite_db.connect() as conn:
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": user_id, "key": encrypted_value},
)
conn.execute(
text("INSERT INTO variable (id, name, value, type) VALUES (:id, :name, :value, :type)"),
{"id": var_id, "name": "test_var", "value": encrypted_value, "type": CREDENTIAL_TYPE},
)
conn.commit()
# Verify migration
with sqlite_db.connect() as conn:
verified, failed = migrate_module.verify_migration(conn, new_key)
assert verified == 2 # 1 user + 1 variable
assert failed == 0

def test_verify_migration_failure(self, migrate_module, sqlite_db, old_key, new_key):
"""Test verification fails when data is encrypted with wrong key."""
user_id = str(uuid4())
original_value = "test-secret-value"
# Create data encrypted with old key (simulating failed migration)
encrypted_value = migrate_module.encrypt_with_key(original_value, old_key)
with sqlite_db.connect() as conn:
conn.execute(
text('INSERT INTO "user" (id, store_api_key) VALUES (:id, :key)'),
{"id": user_id, "key": encrypted_value},
)
conn.commit()
# Verify migration with new key should fail
with sqlite_db.connect() as conn:
verified, failed = migrate_module.verify_migration(conn, new_key)
assert verified == 0
assert failed == 1

def test_verify_migration_empty_tables(self, migrate_module, sqlite_db, new_key):
"""Test verification handles empty tables gracefully."""
with sqlite_db.connect() as conn:
verified, failed = migrate_module.verify_migration(conn, new_key)
assert verified == 0
assert failed == 0
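

# The re-encryption round trip exercised throughout these tests can be
# sketched standalone with cryptography.fernet. Assumption: Langflow's
# auth utils are Fernet-based, but the real migrate_value/ensure_valid_key
# pair may normalize raw keys differently before building Fernet instances.
def test_fernet_reencryption_roundtrip_sketch():
    """Standalone sketch: decrypt with the old key, re-encrypt with the new."""
    from cryptography.fernet import Fernet, MultiFernet

    old_key = Fernet.generate_key()
    new_key = Fernet.generate_key()
    token = Fernet(old_key).encrypt(b"secret")
    # Manual rotation, as the migration script does per column value.
    migrated = Fernet(new_key).encrypt(Fernet(old_key).decrypt(token))
    assert Fernet(new_key).decrypt(migrated) == b"secret"
    # cryptography also ships a rotation primitive: MultiFernet decrypts
    # with any listed key and rotate() re-encrypts with the first one.
    rotated = MultiFernet([Fernet(new_key), Fernet(old_key)]).rotate(token)
    assert Fernet(new_key).decrypt(rotated) == b"secret"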