From 58342a07cac5b2d638554a171e83e329a525af4b Mon Sep 17 00:00:00 2001 From: Abraham 'Abram' Israel Date: Fri, 11 Oct 2024 08:42:27 +0100 Subject: [PATCH 1/4] refactor (backend): improve migration handling by checking only the latest migration head - Refactored the migration check to compare the current database migration head against the latest migration script head. - Enhanced `get_current_migration_head_from_db` to ensure only one migration head is tracked. - Updated logic in `get_pending_migration_head` to reduce unnecessary checks and handle first-time setup more efficiently. - Improved error handling and clarified logging messages for better debugging. --- .../migrations/postgres/utils.py | 82 +++++++------------ 1 file changed, 29 insertions(+), 53 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/postgres/utils.py b/agenta-backend/agenta_backend/migrations/postgres/utils.py index a327e32f55..5914d780f1 100644 --- a/agenta-backend/agenta_backend/migrations/postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/postgres/utils.py @@ -5,14 +5,13 @@ import click import asyncpg - -from sqlalchemy import inspect, text, Engine from sqlalchemy.exc import ProgrammingError -from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from alembic import command from alembic.config import Config +from sqlalchemy import inspect, text from alembic.script import ScriptDirectory +from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from agenta_backend.utils.common import isCloudEE, isCloudDev @@ -56,15 +55,15 @@ def is_initial_setup(engine) -> bool: return not all_tables_exist -async def get_applied_migrations(engine: AsyncEngine): +async def get_current_migration_head_from_db(engine: AsyncEngine): """ - Checks the alembic_version table to get all the migrations that has been applied. + Checks the alembic_version table to get the current migration head that has been applied. 
Args: engine (Engine): The engine that connects to an sqlalchemy pool Returns: - a list of strings + the current migration head (where 'head' is the revision stored in the migration script) """ async with engine.connect() as connection: @@ -76,32 +75,37 @@ async def get_applied_migrations(engine: AsyncEngine): # to make Alembic start tracking the migration changes. # -------------------------------------------------------------------------------------- # This effect (the exception raising) happens for both users (first-time and returning) - return ["alembic_version"] + return "alembic_version" - applied_migrations = [row[0] for row in result.fetchall()] - return applied_migrations + migration_heads = [row[0] for row in result.fetchall()] + assert ( + len(migration_heads) == 1 + ), "There can only be one migration head stored in the database." + return migration_heads[0] -async def get_pending_migrations(): +async def get_pending_migration_head(): """ - Gets the migrations that have not been applied. + Gets the migration head that has not been applied. 
Returns: - the number of pending migrations + the pending migration head """ engine = create_async_engine(url=os.environ["POSTGRES_URI"]) try: - applied_migrations = await get_applied_migrations(engine=engine) - migration_files = [script.revision for script in script.walk_revisions()] - pending_migrations = [m for m in migration_files if m not in applied_migrations] - - if "alembic_version" in applied_migrations: - pending_migrations.append("alembic_version") + current_migration_script_head = script.get_current_head() + migration_head_from_db = await get_current_migration_head_from_db(engine=engine) + + pending_migration_head = [] + if current_migration_script_head != migration_head_from_db: + pending_migration_head.append(current_migration_script_head) + if "alembic_version" == migration_head_from_db: + pending_migration_head.append("alembic_version") finally: await engine.dispose() - return pending_migrations + return pending_migration_head def run_alembic_migration(): @@ -110,9 +114,9 @@ def run_alembic_migration(): """ try: - pending_migrations = asyncio.run(get_pending_migrations()) + pending_migration_head = asyncio.run(get_pending_migration_head()) APPLY_AUTO_MIGRATIONS = os.environ.get("AGENTA_AUTO_MIGRATIONS") - FIRST_TIME_USER = True if "alembic_version" in pending_migrations else False + FIRST_TIME_USER = True if "alembic_version" in pending_migration_head else False if FIRST_TIME_USER or APPLY_AUTO_MIGRATIONS == "true": command.upgrade(alembic_cfg, "head") @@ -134,7 +138,7 @@ def run_alembic_migration(): except Exception as e: click.echo( click.style( - f"\nAn ERROR occured while applying migration: {traceback.format_exc()}\nThe container will now exit.", + f"\nAn ERROR occurred while applying migration: {traceback.format_exc()}\nThe container will now exit.", fg="red", ), color=True, @@ -147,11 +151,11 @@ async def check_for_new_migrations(): Checks for new migrations and notify the user. 
""" - pending_migrations = await get_pending_migrations() - if len(pending_migrations) >= 1: + pending_migration_head = await get_pending_migration_head() + if len(pending_migration_head) >= 1 and isinstance(pending_migration_head[0], str): click.echo( click.style( - f"\nWe have detected that there are pending database migrations {pending_migrations} that need to be applied to keep the application up to date. To ensure the application functions correctly with the latest updates, please follow the guide here => https://docs.agenta.ai/self-host/migration/applying-schema-migration\n", + f"\nWe have detected that there are pending database migrations {pending_migration_head} that need to be applied to keep the application up to date. To ensure the application functions correctly with the latest updates, please follow the guide here => https://docs.agenta.ai/self-host/migration/applying-schema-migration\n", fg="yellow", ), color=True, @@ -174,31 +178,3 @@ async def check_if_templates_table_exist(): await engine.dispose() return True - - -def unique_constraint_exists( - engine: Engine, table_name: str, constraint_name: str -) -> bool: - """ - The function checks if a unique constraint with a specific name exists on a table in a PostgreSQL - database. - - Args: - - engine (Engine): instance of a database engine that represents a connection to a database. - - table_name (str): name of the table to check the existence of the unique constraint. - - constraint_name (str): name of the unique constraint to check for existence. - - Returns: - - returns a boolean value indicating whether a unique constraint with the specified `constraint_name` exists in the table. 
- """ - - with engine.connect() as conn: - result = conn.execute( - text( - f""" - SELECT conname FROM pg_constraint - WHERE conname = '{constraint_name}' AND conrelid = '{table_name}'::regclass; - """ - ) - ) - return result.fetchone() is not None From bf16cafac5a315b4da621ddecfbb833648a44caf Mon Sep 17 00:00:00 2001 From: Abraham 'Abram' Israel Date: Tue, 22 Oct 2024 00:40:37 +0100 Subject: [PATCH 2/4] refactor (build): improve postgres_url to include database in backend and alembic migration service in test yml file --- docker-compose.test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index a4008ea40a..256ae31d6f 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -15,7 +15,7 @@ services: build: ./agenta-backend container_name: agenta-backend-test environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/postgres - REDIS_URL=redis://redis:6379/0 - ENVIRONMENT=${ENVIRONMENT} - BARE_DOMAIN_NAME=localhost @@ -71,7 +71,7 @@ services: command: sh -c "python -c 'from agenta_backend.migrations.postgres.utils import run_alembic_migration; run_alembic_migration()'" environment: - FEATURE_FLAG=oss - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/postgres - ALEMBIC_CFG_PATH=/app/agenta_backend/migrations/postgres/alembic.oss.ini - AGENTA_AUTO_MIGRATIONS=true volumes: @@ -164,7 +164,7 @@ services: volumes: - postgresdb-data:/var/lib/postgresql/data/ healthcheck: - test: ["CMD-SHELL", "pg_isready -U username -d agenta_oss"] + test: ["CMD-SHELL", "pg_isready -U username -d postgres"] interval: 10s timeout: 5s retries: 5 From 846cfefbd195292e3b010679beee0d7201ac7f36 Mon Sep 17 00:00:00 2001 From: Abraham 'Abram' Israel Date: Tue, 22 Oct 2024 00:41:10 +0100 Subject: [PATCH 3/4] feat (migrations): 
add utility function to check for unique constraint and remove redundant imports --- .../migrations/postgres/utils.py | 34 ++++++++++++++++--- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/agenta-backend/agenta_backend/migrations/postgres/utils.py b/agenta-backend/agenta_backend/migrations/postgres/utils.py index 5914d780f1..4395845347 100644 --- a/agenta-backend/agenta_backend/migrations/postgres/utils.py +++ b/agenta-backend/agenta_backend/migrations/postgres/utils.py @@ -5,16 +5,14 @@ import click import asyncpg -from sqlalchemy.exc import ProgrammingError - from alembic import command +from sqlalchemy import Engine from alembic.config import Config from sqlalchemy import inspect, text from alembic.script import ScriptDirectory +from sqlalchemy.exc import ProgrammingError from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine -from agenta_backend.utils.common import isCloudEE, isCloudDev - # Initializer logger logger = logging.getLogger("alembic.env") @@ -178,3 +176,31 @@ async def check_if_templates_table_exist(): await engine.dispose() return True + + +def unique_constraint_exists( + engine: Engine, table_name: str, constraint_name: str +) -> bool: + """ + The function checks if a unique constraint with a specific name exists on a table in a PostgreSQL + database. + + Args: + - engine (Engine): instance of a database engine that represents a connection to a database. + - table_name (str): name of the table to check the existence of the unique constraint. + - constraint_name (str): name of the unique constraint to check for existence. + + Returns: + - returns a boolean value indicating whether a unique constraint with the specified `constraint_name` exists in the table. 
+ """ + + with engine.connect() as conn: + result = conn.execute( + text( + f""" + SELECT conname FROM pg_constraint + WHERE conname = '{constraint_name}' AND conrelid = '{table_name}'::regclass; + """ + ) + ) + return result.fetchone() is not None From d6d968bbce62aace5eb1c8c802d1edb205c39a2b Mon Sep 17 00:00:00 2001 From: Abraham 'Abram' Israel Date: Fri, 25 Oct 2024 13:20:25 +0100 Subject: [PATCH 4/4] chore (backend): include database name in postgres_url environment for celery_worker service in test compose --- docker-compose.test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 256ae31d6f..73b24025ed 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -131,7 +131,7 @@ services: command: > watchmedo auto-restart --directory=./agenta_backend --pattern=*.py --recursive -- celery -A agenta_backend.main.celery_app worker --concurrency=1 --loglevel=INFO environment: - - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432 + - POSTGRES_URI=postgresql+asyncpg://username:password@postgres:5432/postgres - REDIS_URL=redis://redis:6379/0 - ALEMBIC_CFG_PATH=/app/agenta_backend/migrations/postgres/alembic.oss.ini - ENVIRONMENT=${ENVIRONMENT}