diff --git a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
index 2676176692a81..c3e8edfc3b928 100644
--- a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
+++ b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py
@@ -58,20 +58,22 @@ def upgrade():
     with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
         batch_op.drop_index("idx_name_unique")
         batch_op.create_index("idx_dataset_alias_name_unique", ["name"], unique=True)
 
-    # Add 'name' column. Set it to nullable for now.
+    # Add 'name' and 'group' columns. Set them to nullable for now.
     with op.batch_alter_table("dataset", schema=None) as batch_op:
         batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
-        batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default="", nullable=False))
-    # Fill name from uri column.
+        batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE))
+    # Fill name from uri column, and group to 'asset'.
+    dataset_table = sa.table("dataset", sa.column("name"), sa.column("uri"), sa.column("group"))
     with Session(bind=op.get_bind()) as session:
-        session.execute(sa.text("update dataset set name=uri"))
+        session.execute(sa.update(dataset_table).values(name=dataset_table.c.uri, group="asset"))
         session.commit()
-    # Set the name column non-nullable.
+    # Set the name and group columns non-nullable.
     # Now with values in there, we can create the new unique constraint and index.
     # Due to MySQL restrictions, we are also reducing the length on uri.
     with op.batch_alter_table("dataset", schema=None) as batch_op:
         batch_op.alter_column("name", existing_type=_STRING_COLUMN_TYPE, nullable=False)
         batch_op.alter_column("uri", type_=_STRING_COLUMN_TYPE, nullable=False)
+        batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE, default="asset", nullable=False)
         batch_op.drop_index("idx_uri_unique")
         batch_op.create_index("idx_dataset_name_uri_unique", ["name", "uri"], unique=True)
diff --git a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py
index f1b57974f053c..25b4f1871938d 100644
--- a/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py
+++ b/airflow/migrations/versions/0039_3_0_0_tweak_assetaliasmodel_to_match_asset.py
@@ -41,6 +41,7 @@
 
 import sqlalchemy as sa
 from alembic import op
+from sqlalchemy.orm import Session
 
 # Revision identifiers, used by Alembic.
 revision = "fb2d4922cd79"
@@ -59,7 +60,13 @@ def upgrade():
     """Tweak AssetAliasModel to match AssetModel."""
     with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
         batch_op.alter_column("name", type_=_STRING_COLUMN_TYPE, nullable=False)
-        batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default=str, nullable=False))
+        batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE))
+    dataset_alias_table = sa.table("dataset_alias", sa.column("group"))
+    with Session(bind=op.get_bind()) as session:
+        session.execute(sa.update(dataset_alias_table).values(group="asset"))
+        session.commit()
+    with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
+        batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE, default="asset", nullable=False)
 
 
 def downgrade():
diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256
index 4d679e27175fc..e1bb8bb19319a 100644
--- a/docs/apache-airflow/img/airflow_erd.sha256
+++ b/docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
-805d79090c38fabc03b704d77a093094758349a13659a334262d5a7afc2e7e45
\ No newline at end of file
+95b59583f58fa1dad1ae07cac699b0a72143abae9beaa33f79c3ff0f24e52c7d
\ No newline at end of file
diff --git a/providers/tests/openlineage/plugins/test_utils.py b/providers/tests/openlineage/plugins/test_utils.py
index 475624fef8fbd..8f786cb4c4b51 100644
--- a/providers/tests/openlineage/plugins/test_utils.py
+++ b/providers/tests/openlineage/plugins/test_utils.py
@@ -370,7 +370,7 @@ def test_serialize_timetable():
             {
                 "__type": DagAttributeTypes.ASSET_ALIAS,
                 "name": "another",
-                "group": "",
+                "group": "asset",
             },
             {
                 "__type": DagAttributeTypes.ASSET,
diff --git a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
index 7ea61e905f509..9f5b85ccb161b 100644
--- a/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
+++ b/task_sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -391,10 +391,10 @@ class Model(Asset):
 
 @attrs.define(unsafe_hash=False)
 class AssetAlias(BaseAsset):
-    """A represeation of asset alias which is used to create asset during the runtime."""
+    """A representation of asset alias which is used to create asset during the runtime."""
 
     name: str = attrs.field(validator=_validate_non_empty_identifier)
-    group: str = attrs.field(kw_only=True, default="", validator=_validate_identifier)
+    group: str = attrs.field(kw_only=True, default="asset", validator=_validate_identifier)
 
     def _resolve_assets(self) -> list[Asset]:
         from airflow.models.asset import expand_alias_to_assets