improvement: graceful error when PostgreSQL version is less than 15 (#11811)
* check-postgres-version-and-make-nulls-failure-more-graceful * fix-async-error * use-session-scope-and-remove-unnecessary-function * [autofix.ci] apply automated fixes * use-systemexit-instead-of-runtime-error * only log defined error, fix signal handling * [autofix.ci] apply automated fixes * use-min-version-fstring * ruff-rule-sim105 --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Jordan Frazier <jordan.frazier@datastax.com>
This commit is contained in:
@@ -34,6 +34,7 @@ from langflow.initial_setup.setup import get_or_create_default_folder
|
||||
from langflow.main import setup_app
|
||||
from langflow.services.auth.utils import get_current_user_from_access_token
|
||||
from langflow.services.database.models.api_key.crud import check_key
|
||||
from langflow.services.database.service import UnsupportedPostgreSQLVersionError, check_postgresql_version_sync
|
||||
from langflow.services.deps import get_db_service, get_settings_service, is_settings_service_initialized, session_scope
|
||||
from langflow.services.utils import initialize_services
|
||||
from langflow.utils.version import fetch_latest_version, get_version_info
|
||||
@@ -168,6 +169,9 @@ def wait_for_server_ready(host, port, protocol) -> None:
|
||||
|
||||
status_code = 0
|
||||
while status_code != httpx.codes.OK:
|
||||
# If the server process died (e.g. database version check failed), stop waiting.
|
||||
if process_manager.webapp_process and not process_manager.webapp_process.is_alive():
|
||||
sys.exit(process_manager.webapp_process.exitcode or 1)
|
||||
try:
|
||||
status_code = httpx.get(
|
||||
f"{protocol}://{health_check_host}:{port}/health",
|
||||
@@ -339,6 +343,15 @@ def run(
|
||||
|
||||
# Step 3: Connecting Database (this happens inside setup_app via dependencies)
|
||||
with progress.step(3):
|
||||
# Pre-flight: fail fast if PostgreSQL version is too old, before
|
||||
# spawning any server process (avoids messy lifespan / worker errors).
|
||||
database_url = settings_service.settings.database_url
|
||||
if database_url:
|
||||
try:
|
||||
check_postgresql_version_sync(database_url)
|
||||
except UnsupportedPostgreSQLVersionError:
|
||||
sys.exit(1)
|
||||
|
||||
# check if port is being used
|
||||
if is_port_in_use(port, host):
|
||||
port = get_free_port(port)
|
||||
|
||||
@@ -2,9 +2,10 @@ import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import tempfile
|
||||
import warnings
|
||||
from contextlib import asynccontextmanager
|
||||
from contextlib import asynccontextmanager, suppress
|
||||
from http import HTTPStatus
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast
|
||||
@@ -39,6 +40,7 @@ from langflow.initial_setup.setup import (
|
||||
)
|
||||
from langflow.middleware import ContentSizeLimitMiddleware
|
||||
from langflow.plugin_routes import load_plugin_routes
|
||||
from langflow.services.database.service import UnsupportedPostgreSQLVersionError
|
||||
from langflow.services.deps import (
|
||||
get_queue_service,
|
||||
get_service,
|
||||
@@ -316,6 +318,18 @@ def get_lifespan(*, fix_migration=False, version=None):
|
||||
yield
|
||||
except asyncio.CancelledError:
|
||||
await logger.adebug("Lifespan received cancellation signal")
|
||||
except UnsupportedPostgreSQLVersionError:
|
||||
# Normally caught by the pre-flight check in __main__.py
|
||||
# before the server starts. If we get here anyway (e.g.
|
||||
# direct uvicorn invocation via ``make backend``), exit
|
||||
# immediately and tell the parent (reloader) to stop.
|
||||
import signal
|
||||
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
with suppress(ProcessLookupError, PermissionError):
|
||||
os.kill(os.getppid(), signal.SIGTERM)
|
||||
os._exit(3)
|
||||
except Exception as exc:
|
||||
if "langflow migration --fix" not in str(exc):
|
||||
logger.exception(exc)
|
||||
|
||||
13
src/backend/base/langflow/services/database/constants.py
Normal file
13
src/backend/base/langflow/services/database/constants.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""Database service constants."""
|
||||
|
||||
# Minimum PostgreSQL major version required by Langflow.
|
||||
# The schema uses UNIQUE NULLS DISTINCT, which is only supported in PostgreSQL 15+.
|
||||
MIN_POSTGRESQL_MAJOR_VERSION = 15
|
||||
|
||||
# User-facing message when migrations fail due to PostgreSQL < 15 (e.g. UNIQUE NULLS DISTINCT).
|
||||
POSTGRESQL_VERSION_REQUIRED_MESSAGE = (
|
||||
f"Langflow requires PostgreSQL {MIN_POSTGRESQL_MAJOR_VERSION} or higher when using PostgreSQL as the database. "
|
||||
"The current PostgreSQL version does not support the syntax used by Langflow's schema. "
|
||||
f"Please upgrade your PostgreSQL instance to version {MIN_POSTGRESQL_MAJOR_VERSION} or higher. "
|
||||
"See: https://docs.langflow.org/configuration-custom-database"
|
||||
)
|
||||
@@ -29,6 +29,10 @@ from langflow.helpers.windows_postgres_helper import configure_windows_postgres_
|
||||
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
|
||||
from langflow.services.base import Service
|
||||
from langflow.services.database import models
|
||||
from langflow.services.database.constants import (
|
||||
MIN_POSTGRESQL_MAJOR_VERSION,
|
||||
POSTGRESQL_VERSION_REQUIRED_MESSAGE,
|
||||
)
|
||||
from langflow.services.database.models.user.crud import get_user_by_username
|
||||
from langflow.services.database.session import NoopSession
|
||||
from langflow.services.database.utils import Result, TableResults
|
||||
@@ -39,6 +43,50 @@ if TYPE_CHECKING:
|
||||
from lfx.services.settings.service import SettingsService
|
||||
|
||||
|
||||
class UnsupportedPostgreSQLVersionError(Exception):
|
||||
"""Raised when the PostgreSQL version is below the minimum required."""
|
||||
|
||||
|
||||
_PG_VERSION_QUERY = sa.text("SELECT current_setting('server_version_num'), current_setting('server_version')")
|
||||
|
||||
|
||||
def _check_version_row(version_num_str: str, version_str: str) -> None:
|
||||
"""Raise ``UnsupportedPostgreSQLVersionError`` when the version is too old."""
|
||||
if int(version_num_str) < MIN_POSTGRESQL_MAJOR_VERSION * 10000:
|
||||
msg = f"Running PostgreSQL {version_str}. {POSTGRESQL_VERSION_REQUIRED_MESSAGE}"
|
||||
logger.error(msg)
|
||||
raise UnsupportedPostgreSQLVersionError(msg)
|
||||
|
||||
|
||||
def check_postgresql_version_sync(database_url: str) -> None:
|
||||
"""Pre-flight check: verify PostgreSQL >= 15 using a synchronous connection.
|
||||
|
||||
Call this *before* starting uvicorn/gunicorn so a version mismatch
|
||||
results in a clean ``sys.exit(1)`` rather than a messy lifespan failure.
|
||||
Silently returns when the URL is not PostgreSQL.
|
||||
"""
|
||||
if not database_url.startswith(("postgresql", "postgres")):
|
||||
return
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
|
||||
# Normalise the async URL to a sync-compatible one.
|
||||
url = database_url
|
||||
if url.startswith("postgres://"):
|
||||
url = "postgresql://" + url.split("://", 1)[1]
|
||||
# Strip async driver suffixes so create_engine picks the default sync driver.
|
||||
for async_driver in ("+asyncpg", "+aiosqlite"):
|
||||
url = url.replace(async_driver, "")
|
||||
|
||||
engine = create_engine(url)
|
||||
try:
|
||||
with engine.connect() as conn:
|
||||
row = conn.execute(_PG_VERSION_QUERY).fetchone()
|
||||
_check_version_row(*row)
|
||||
finally:
|
||||
engine.dispose()
|
||||
|
||||
|
||||
class DatabaseService(Service):
|
||||
name = "database_service"
|
||||
|
||||
@@ -226,6 +274,23 @@ class DatabaseService(Service):
|
||||
async with self.async_session_maker() as session:
|
||||
yield session
|
||||
|
||||
async def ensure_postgresql_version(self) -> None:
|
||||
"""If the database is PostgreSQL, ensure it is version 15 or higher.
|
||||
|
||||
Langflow's schema uses UNIQUE NULLS DISTINCT, which is only supported in PostgreSQL 15+.
|
||||
Logs the message and raises UnsupportedPostgreSQLVersionError if the version is too old.
|
||||
"""
|
||||
if not self.database_url.startswith(("postgresql", "postgres")):
|
||||
return
|
||||
if self.settings_service.settings.use_noop_database:
|
||||
return
|
||||
async with session_scope() as session:
|
||||
result = await session.execute(_PG_VERSION_QUERY)
|
||||
version_num_str, version_str = result.fetchone()
|
||||
# Raise AFTER session_scope exits so session_scope doesn't log a
|
||||
# noisy "An error occurred during the session scope." traceback.
|
||||
_check_version_row(version_num_str, version_str)
|
||||
|
||||
async def assign_orphaned_flows_to_superuser(self) -> None:
|
||||
"""Assign orphaned flows to the default superuser when auto login is enabled."""
|
||||
settings_service = get_settings_service()
|
||||
|
||||
@@ -19,6 +19,7 @@ async def initialize_database(*, fix_migration: bool = False) -> None:
|
||||
from langflow.services.deps import get_db_service
|
||||
|
||||
database_service: DatabaseService = get_db_service()
|
||||
await database_service.ensure_postgresql_version()
|
||||
try:
|
||||
if database_service.settings_service.settings.database_connection_retry:
|
||||
await database_service.create_db_and_tables_with_retry()
|
||||
@@ -40,23 +41,25 @@ async def initialize_database(*, fix_migration: bool = False) -> None:
|
||||
try:
|
||||
await database_service.run_migrations(fix=fix_migration)
|
||||
except CommandError as exc:
|
||||
# if "overlaps with other requested revisions" or "Can't locate revision identified by"
|
||||
# are not in the exception, we can't handle it
|
||||
if "overlaps with other requested revisions" not in str(
|
||||
exc
|
||||
) and "Can't locate revision identified by" not in str(exc):
|
||||
error_message = str(exc)
|
||||
if (
|
||||
"overlaps with other requested revisions" in error_message
|
||||
or "Can't locate revision identified by" in error_message
|
||||
):
|
||||
# Wrong revision in the DB: delete alembic_version and re-run migrations
|
||||
logger.warning("Wrong revision in DB, deleting alembic_version table and running migrations again")
|
||||
from langflow.services.deps import session_scope
|
||||
|
||||
async with session_scope() as session:
|
||||
await session.exec(text("DROP TABLE alembic_version"))
|
||||
await database_service.run_migrations(fix=fix_migration)
|
||||
else:
|
||||
raise
|
||||
# This means there's wrong revision in the DB
|
||||
# We need to delete the alembic_version table
|
||||
# and run the migrations again
|
||||
logger.warning("Wrong revision in DB, deleting alembic_version table and running migrations again")
|
||||
async with session_getter(database_service) as session:
|
||||
await session.exec(text("DROP TABLE alembic_version"))
|
||||
await database_service.run_migrations(fix=fix_migration)
|
||||
except Exception as exc:
|
||||
error_message = str(exc)
|
||||
# if the exception involves tables already existing
|
||||
# we can ignore it
|
||||
if "already exists" not in str(exc):
|
||||
if "already exists" not in error_message:
|
||||
logger.exception(exc)
|
||||
raise
|
||||
await logger.adebug("Migration attempted to create existing table, skipping.")
|
||||
|
||||
Reference in New Issue
Block a user