Skip to content

Commit

Permalink
Introduce dg.map_asset_specs
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Nov 25, 2024
1 parent 0de7987 commit 40db08b
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 1 deletion.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Refer to the `Asset definitions <https://docs.dagster.io/concepts/assets/softwar

.. autoclass:: AssetKey

.. autofunction:: map_asset_specs

Graph-backed asset definitions
------------------------------

Expand Down
5 changes: 4 additions & 1 deletion python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@
from dagster._core.definitions.asset_sensor_definition import (
AssetSensorDefinition as AssetSensorDefinition,
)
from dagster._core.definitions.asset_spec import AssetSpec as AssetSpec
from dagster._core.definitions.asset_spec import (
AssetSpec as AssetSpec,
map_asset_specs as map_asset_specs,
)
from dagster._core.definitions.assets import AssetsDefinition as AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import (
AutoMaterializePolicy as AutoMaterializePolicy,
Expand Down
57 changes: 57 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Iterable,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Union,
overload,
)

import dagster._check as check
Expand Down Expand Up @@ -42,6 +45,7 @@

if TYPE_CHECKING:
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
from dagster._core.definitions.assets import AssetsDefinition

# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset
# (which currently ends up on the Output associated with the asset key)
Expand Down Expand Up @@ -382,3 +386,56 @@ def merge_attributes(
auto_materialize_policy=self.auto_materialize_policy,
partitions_def=self.partitions_def,
)


@overload
def map_asset_specs(
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[AssetSpec]
) -> Sequence[AssetSpec]: ...


@overload
def map_asset_specs(
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable["AssetsDefinition"]
) -> Sequence["AssetsDefinition"]: ...


@overload
def map_asset_specs(
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[Union["AssetsDefinition", AssetSpec]]
) -> Sequence[Union["AssetsDefinition", AssetSpec]]: ...


def map_asset_specs(
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[Union["AssetsDefinition", AssetSpec]]
) -> Sequence[Union["AssetsDefinition", AssetSpec]]:
"""Map a function over a sequence of AssetSpecs or AssetsDefinitions, replacing specs in the sequence
or specs in an AssetsDefinitions with the result of the function.
Args:
func (Callable[[AssetSpec], AssetSpec]): The function to apply to each AssetSpec.
iterable (Iterable[Union[AssetsDefinition, AssetSpec]]): The sequence of AssetSpecs or AssetsDefinitions.
Returns:
Sequence[Union[AssetsDefinition, AssetSpec]]: A sequence of AssetSpecs or AssetsDefinitions with the function applied
to each spec.
Examples:
.. code-block:: python
from dagster import AssetSpec, map_asset_specs
asset_specs = [
AssetSpec(key="my_asset"),
AssetSpec(key="my_asset_2"),
]
mapped_specs = map_asset_specs(lambda spec: spec.replace_attributes(owners=["[email protected]"]), asset_specs)
"""
from dagster._core.definitions.assets import AssetsDefinition

return [
obj.map_asset_specs(func) if isinstance(obj, AssetsDefinition) else func(obj)
for obj in iterable
]
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from typing import cast

import dagster as dg
import pytest
from dagster import AssetSpec, AutoMaterializePolicy, AutomationCondition
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError


Expand Down Expand Up @@ -142,3 +146,87 @@ def test_merge_attributes_deps() -> None:

spec_new_dep = new_spec.merge_attributes(deps={AssetKey("baz")})
assert spec_new_dep.deps == [AssetDep(AssetKey("bar")), AssetDep(AssetKey("baz"))]


def test_map_asset_specs_basic_specs() -> None:
specs = [
AssetSpec(key="foo"),
AssetSpec(key="bar"),
]

mapped_specs = dg.map_asset_specs(
lambda spec: spec.replace_attributes(owners=["[email protected]"]), specs
)

assert all(spec.owners == ["[email protected]"] for spec in mapped_specs)


def test_map_asset_specs_basic_defs() -> None:
@dg.asset
def my_asset():
pass

@dg.asset
def my_other_asset():
pass

assets = [my_asset, my_other_asset]

mapped_assets = dg.map_asset_specs(
lambda spec: spec.replace_attributes(owners=["[email protected]"]), assets
)

assert all(
spec.owners == ["[email protected]"] for asset in mapped_assets for spec in asset.specs
)


def test_map_asset_specs_mixed_specs_defs() -> None:
@dg.asset
def my_asset():
pass

spec_and_defs = [
my_asset,
AssetSpec(key="bar"),
]

mapped_specs_and_defs = dg.map_asset_specs(
lambda spec: spec.replace_attributes(owners=["[email protected]"]), spec_and_defs
)

assert all(
spec.owners == ["[email protected]"]
for spec in cast(AssetsDefinition, mapped_specs_and_defs[0]).specs
)
assert cast(AssetSpec, mapped_specs_and_defs[1]).owners == ["[email protected]"]


def test_map_asset_specs_multi_asset() -> None:
@dg.multi_asset(
specs=[
AssetSpec(key="foo"),
AssetSpec(key="bar"),
]
)
def my_multi_asset():
pass

@dg.multi_asset(
specs=[
AssetSpec(key="baz"),
AssetSpec(key="qux"),
]
)
def my_other_multi_asset():
pass

assets = [my_multi_asset, my_other_multi_asset]

mapped_assets = dg.map_asset_specs(
lambda spec: spec.replace_attributes(owners=["[email protected]"]), assets
)

assert all(
spec.owners == ["[email protected]"] for asset in mapped_assets for spec in asset.specs
)

0 comments on commit 40db08b

Please sign in to comment.