diff --git a/src/backend/base/langflow/__main__.py b/src/backend/base/langflow/__main__.py index d016e352d..a2beb09e5 100644 --- a/src/backend/base/langflow/__main__.py +++ b/src/backend/base/langflow/__main__.py @@ -34,6 +34,7 @@ from langflow.initial_setup.setup import get_or_create_default_folder from langflow.main import setup_app from langflow.services.auth.utils import get_current_user_from_access_token from langflow.services.database.models.api_key.crud import check_key +from langflow.services.database.service import UnsupportedPostgreSQLVersionError, check_postgresql_version_sync from langflow.services.deps import get_db_service, get_settings_service, is_settings_service_initialized, session_scope from langflow.services.utils import initialize_services from langflow.utils.version import fetch_latest_version, get_version_info @@ -168,6 +169,9 @@ def wait_for_server_ready(host, port, protocol) -> None: status_code = 0 while status_code != httpx.codes.OK: + # If the server process died (e.g. database version check failed), stop waiting. + if process_manager.webapp_process and not process_manager.webapp_process.is_alive(): + sys.exit(process_manager.webapp_process.exitcode or 1) try: status_code = httpx.get( f"{protocol}://{health_check_host}:{port}/health", @@ -339,6 +343,15 @@ def run( # Step 3: Connecting Database (this happens inside setup_app via dependencies) with progress.step(3): + # Pre-flight: fail fast if PostgreSQL version is too old, before + # spawning any server process (avoids messy lifespan / worker errors). + database_url = settings_service.settings.database_url + if database_url: + try: + check_postgresql_version_sync(database_url) + except UnsupportedPostgreSQLVersionError: + sys.exit(1) + # check if port is being used if is_port_in_use(port, host): port = get_free_port(port) diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index 34328f927..5b2ad92f1 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -2,9 +2,10 @@ import asyncio import json import os import re +import sys import tempfile import warnings -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, suppress from http import HTTPStatus from pathlib import Path from typing import TYPE_CHECKING, cast @@ -39,6 +40,7 @@ from langflow.initial_setup.setup import ( ) from langflow.middleware import ContentSizeLimitMiddleware from langflow.plugin_routes import load_plugin_routes +from langflow.services.database.service import UnsupportedPostgreSQLVersionError from langflow.services.deps import ( get_queue_service, get_service, @@ -316,6 +318,18 @@ def get_lifespan(*, fix_migration=False, version=None): yield except asyncio.CancelledError: await logger.adebug("Lifespan received cancellation signal") + except UnsupportedPostgreSQLVersionError: + # Normally caught by the pre-flight check in __main__.py + # before the server starts. If we get here anyway (e.g. + # direct uvicorn invocation via ``make backend``), exit + # immediately and tell the parent (reloader) to stop. + import signal + + sys.stdout.flush() + sys.stderr.flush() + with suppress(ProcessLookupError, PermissionError): + os.kill(os.getppid(), signal.SIGTERM) + os._exit(3) except Exception as exc: if "langflow migration --fix" not in str(exc): logger.exception(exc) diff --git a/src/backend/base/langflow/services/database/constants.py b/src/backend/base/langflow/services/database/constants.py new file mode 100644 index 000000000..51fb546c6 --- /dev/null +++ b/src/backend/base/langflow/services/database/constants.py @@ -0,0 +1,13 @@ +"""Database service constants.""" + +# Minimum PostgreSQL major version required by Langflow. +# The schema uses UNIQUE NULLS DISTINCT, which is only supported in PostgreSQL 15+. +MIN_POSTGRESQL_MAJOR_VERSION = 15 + +# User-facing message when migrations fail due to PostgreSQL < 15 (e.g. UNIQUE NULLS DISTINCT). +POSTGRESQL_VERSION_REQUIRED_MESSAGE = ( + f"Langflow requires PostgreSQL {MIN_POSTGRESQL_MAJOR_VERSION} or higher when using PostgreSQL as the database. " + "The current PostgreSQL version does not support the syntax used by Langflow's schema. " + f"Please upgrade your PostgreSQL instance to version {MIN_POSTGRESQL_MAJOR_VERSION} or higher. " + "See: https://docs.langflow.org/configuration-custom-database" +) diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index 590f0694f..d123f1698 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -29,6 +29,10 @@ from langflow.helpers.windows_postgres_helper import configure_windows_postgres_ from langflow.initial_setup.constants import STARTER_FOLDER_NAME from langflow.services.base import Service from langflow.services.database import models +from langflow.services.database.constants import ( + MIN_POSTGRESQL_MAJOR_VERSION, + POSTGRESQL_VERSION_REQUIRED_MESSAGE, +) from langflow.services.database.models.user.crud import get_user_by_username from langflow.services.database.session import NoopSession from langflow.services.database.utils import Result, TableResults @@ -39,6 +43,50 @@ if TYPE_CHECKING: from lfx.services.settings.service import SettingsService +class UnsupportedPostgreSQLVersionError(Exception): + """Raised when the PostgreSQL version is below the minimum required.""" + + +_PG_VERSION_QUERY = sa.text("SELECT current_setting('server_version_num'), current_setting('server_version')") + + +def _check_version_row(version_num_str: str, version_str: str) -> None: + """Raise ``UnsupportedPostgreSQLVersionError`` when the version is too old.""" + if int(version_num_str) < MIN_POSTGRESQL_MAJOR_VERSION * 10000: + msg = f"Running PostgreSQL {version_str}. {POSTGRESQL_VERSION_REQUIRED_MESSAGE}" + logger.error(msg) + raise UnsupportedPostgreSQLVersionError(msg) + + +def check_postgresql_version_sync(database_url: str) -> None: + """Pre-flight check: verify PostgreSQL >= 15 using a synchronous connection. + + Call this *before* starting uvicorn/gunicorn so a version mismatch + results in a clean ``sys.exit(1)`` rather than a messy lifespan failure. + Silently returns when the URL is not PostgreSQL. + """ + if not database_url.startswith(("postgresql", "postgres")): + return + + from sqlalchemy import create_engine + + # Normalise the async URL to a sync-compatible one. + url = database_url + if url.startswith("postgres://"): + url = "postgresql://" + url.split("://", 1)[1] + # Strip async driver suffixes so create_engine picks the default sync driver. + for async_driver in ("+asyncpg", "+aiosqlite"): + url = url.replace(async_driver, "") + + engine = create_engine(url) + try: + with engine.connect() as conn: + row = conn.execute(_PG_VERSION_QUERY).fetchone() + _check_version_row(*row) + finally: + engine.dispose() + + class DatabaseService(Service): name = "database_service" @@ -226,6 +274,23 @@ class DatabaseService(Service): async with self.async_session_maker() as session: yield session + async def ensure_postgresql_version(self) -> None: + """If the database is PostgreSQL, ensure it is version 15 or higher. + + Langflow's schema uses UNIQUE NULLS DISTINCT, which is only supported in PostgreSQL 15+. + Logs the message and raises UnsupportedPostgreSQLVersionError if the version is too old. + """ + if not self.database_url.startswith(("postgresql", "postgres")): + return + if self.settings_service.settings.use_noop_database: + return + async with session_scope() as session: + result = await session.execute(_PG_VERSION_QUERY) + version_num_str, version_str = result.fetchone() + # Raise AFTER session_scope exits so session_scope doesn't log a + # noisy "An error occurred during the session scope." traceback. + _check_version_row(version_num_str, version_str) + async def assign_orphaned_flows_to_superuser(self) -> None: """Assign orphaned flows to the default superuser when auto login is enabled.""" settings_service = get_settings_service() diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index ddba83edd..5f6956ee5 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -19,6 +19,7 @@ async def initialize_database(*, fix_migration: bool = False) -> None: from langflow.services.deps import get_db_service database_service: DatabaseService = get_db_service() + await database_service.ensure_postgresql_version() try: if database_service.settings_service.settings.database_connection_retry: await database_service.create_db_and_tables_with_retry() @@ -40,23 +41,25 @@ async def initialize_database(*, fix_migration: bool = False) -> None: try: await database_service.run_migrations(fix=fix_migration) except CommandError as exc: - # if "overlaps with other requested revisions" or "Can't locate revision identified by" - # are not in the exception, we can't handle it - if "overlaps with other requested revisions" not in str( - exc - ) and "Can't locate revision identified by" not in str(exc): + error_message = str(exc) + if ( + "overlaps with other requested revisions" in error_message + or "Can't locate revision identified by" in error_message + ): + # Wrong revision in the DB: delete alembic_version and re-run migrations + logger.warning("Wrong revision in DB, deleting alembic_version table and running migrations again") + from langflow.services.deps import session_scope + + async with session_scope() as session: + await session.exec(text("DROP TABLE alembic_version")) + await database_service.run_migrations(fix=fix_migration) + else: raise - # This means there's wrong revision in the DB - # We need to delete the alembic_version table - # and run the migrations again - logger.warning("Wrong revision in DB, deleting alembic_version table and running migrations again") - async with session_getter(database_service) as session: - await session.exec(text("DROP TABLE alembic_version")) - await database_service.run_migrations(fix=fix_migration) except Exception as exc: + error_message = str(exc) # if the exception involves tables already existing # we can ignore it - if "already exists" not in str(exc): + if "already exists" not in error_message: logger.exception(exc) raise await logger.adebug("Migration attempted to create existing table, skipping.")