Skip to content

Commit

Permalink
Change default asset alias group to 'asset' (apache#44778)
Browse files Browse the repository at this point in the history
An empty default group name is really unhelpful. I'm not entirely sure
whether we should dump everything into 'asset' by default, or whether
aliases should be separated into their own default 'alias' group. We might
have a better idea when we have a real UI for this.

Also fixed how the migrations provide a default for the new 'group'
column. Previously, the migrations used 'default', which is a client-side
default and would not work correctly in a migration. A manual three-step
create-fill-alter approach fixes this.
  • Loading branch information
uranusjr authored Dec 9, 2024
1 parent 238b209 commit 771d56b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,22 @@ def upgrade():
with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
batch_op.drop_index("idx_name_unique")
batch_op.create_index("idx_dataset_alias_name_unique", ["name"], unique=True)
# Add 'name' column. Set it to nullable for now.
# Add 'name' and 'group' columns. Set them to nullable for now.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE))
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default="", nullable=False))
# Fill name from uri column.
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE))
# Fill name from uri column, and group to 'asset'.
dataset_table = sa.table("dataset", sa.column("name"), sa.column("uri"), sa.column("group"))
with Session(bind=op.get_bind()) as session:
session.execute(sa.text("update dataset set name=uri"))
session.execute(sa.update(dataset_table).values(name=dataset_table.c.uri, group="asset"))
session.commit()
# Set the name column non-nullable.
# Set the name and group columns non-nullable.
# Now with values in there, we can create the new unique constraint and index.
# Due to MySQL restrictions, we are also reducing the length on uri.
with op.batch_alter_table("dataset", schema=None) as batch_op:
batch_op.alter_column("name", existing_type=_STRING_COLUMN_TYPE, nullable=False)
batch_op.alter_column("uri", type_=_STRING_COLUMN_TYPE, nullable=False)
batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE, default="asset", nullable=False)
batch_op.drop_index("idx_uri_unique")
batch_op.create_index("idx_dataset_name_uri_unique", ["name", "uri"], unique=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy.orm import Session

# Revision identifiers, used by Alembic.
revision = "fb2d4922cd79"
Expand All @@ -59,7 +60,13 @@ def upgrade():
"""Tweak AssetAliasModel to match AssetModel."""
with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
batch_op.alter_column("name", type_=_STRING_COLUMN_TYPE, nullable=False)
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default=str, nullable=False))
batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE))
dataset_alias_table = sa.table("dataset_alias", sa.column("group"))
with Session(bind=op.get_bind()) as session:
session.execute(sa.update(dataset_alias_table).values(group="asset"))
session.commit()
with op.batch_alter_table("dataset_alias", schema=None) as batch_op:
batch_op.alter_column("group", type_=_STRING_COLUMN_TYPE, default="asset", nullable=False)


def downgrade():
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
805d79090c38fabc03b704d77a093094758349a13659a334262d5a7afc2e7e45
95b59583f58fa1dad1ae07cac699b0a72143abae9beaa33f79c3ff0f24e52c7d
2 changes: 1 addition & 1 deletion providers/tests/openlineage/plugins/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def test_serialize_timetable():
{
"__type": DagAttributeTypes.ASSET_ALIAS,
"name": "another",
"group": "",
"group": "asset",
},
{
"__type": DagAttributeTypes.ASSET,
Expand Down
4 changes: 2 additions & 2 deletions task_sdk/src/airflow/sdk/definitions/asset/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ class Model(Asset):

@attrs.define(unsafe_hash=False)
class AssetAlias(BaseAsset):
"""A represeation of asset alias which is used to create asset during the runtime."""
"""A representation of asset alias which is used to create asset during the runtime."""

name: str = attrs.field(validator=_validate_non_empty_identifier)
group: str = attrs.field(kw_only=True, default="", validator=_validate_identifier)
group: str = attrs.field(kw_only=True, default="asset", validator=_validate_identifier)

def _resolve_assets(self) -> list[Asset]:
from airflow.models.asset import expand_alias_to_assets
Expand Down

0 comments on commit 771d56b

Please sign in to comment.