diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py index 84528cbeb9163..03a58aee22b2f 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/package_spec.py @@ -125,6 +125,7 @@ class PackageSpec: queue: Optional[BuildkiteQueue] = None run_pytest: bool = True always_run_if: Optional[Callable[[], bool]] = None + skip_if: Optional[Callable[[], Optional[str]]] = None def __post_init__(self): if not self.name: @@ -161,9 +162,13 @@ def build_steps(self) -> List[BuildkiteTopLevelStep]: if v not in unsupported_python_versions ] - pytest_python_versions = sorted( - list(set(default_python_versions) - set(unsupported_python_versions)) - ) + pytest_python_versions = [ + AvailablePythonVersion(v) + for v in sorted( + set(e.value for e in default_python_versions) + - set(e.value for e in unsupported_python_versions) + ) + ] # Use highest supported python version if no defaults_match if len(pytest_python_versions) == 0: pytest_python_versions = [supported_python_versions[-1]] @@ -243,22 +248,39 @@ def requirements(self): @property def skip_reason(self) -> Optional[str]: - # Memoize so we don't log twice - if self._should_skip is False: - return None - - if self.always_run_if and self.always_run_if(): - self._should_skip = False - return None - - if self._skip_reason: + """Provides a message if this package's steps should be skipped on this run, and no message if the package's steps should be run. + We actually use this to determine whether or not to run the package. + + Because we use an archaic version of python to build our images, we can't use `cached_property`, and so we reinvent the wheel here with + self._should_skip and self._skip_reason. When we determine definitively that a package should or shouldn't be skipped, we cache the result on self._should_skip + as a boolean (it starts out as None), and cache the skip reason (or lack thereof) on self._skip_reason. + """ + # If self._should_skip is not None, then the result is cached on self._skip_reason and we can return it. + if self._should_skip is not None: + if self._should_skip is True: + assert ( + self._skip_reason is not None + ), "Expected skip reason to be set if self._should_skip is True." return self._skip_reason + # If the result is not cached, check for NO_SKIP signifier first, so that it always + # takes precedent. if message_contains("NO_SKIP"): logging.info(f"Building {self.name} because NO_SKIP set") self._should_skip = False return None + if self.always_run_if and self.always_run_if(): + self._should_skip = False + self._skip_reason = None + return None + if self.skip_if and self.skip_if(): + self._skip_reason = self.skip_if() + self._should_skip = True + return self._skip_reason + # Take account of feature_branch changes _after_ skip_if so that skip_if + # takes precedent. This way, integration tests can run on branch but won't be + # forced to run on every master commit. 
if not is_feature_branch(os.getenv("BUILDKITE_BRANCH", "")): logging.info(f"Building {self.name} we're not on a feature branch") self._should_skip = False diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_main.py b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_main.py index fc8dc0426c6a2..a79cfcde575d1 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_main.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_main.py @@ -52,6 +52,7 @@ def build_dagster_oss_main_steps() -> List[BuildkiteStep]: ), "DAGSTER_CHECKOUT_DEPTH": _get_setting("DAGSTER_CHECKOUT_DEPTH") or "100", "OSS_COMPAT_SLIM": "1" if oss_compat_slim else "", + "DAGSTER_FROM_OSS": "1" if pipeline_name == "internal" else "0", }, ), ) diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py index 23dc34b5f87ca..d0e307db08400 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py @@ -42,6 +42,28 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]: ], pytest_extra_cmds=k8s_extra_cmds, ), + PackageSpec( + "examples/experimental/dagster-dlift/kitchen-sink", + name="dbt-cloud-live-tests", + env_vars=[ + "KS_DBT_CLOUD_ACCOUNT_ID", + "KS_DBT_CLOUD_PROJECT_ID", + "KS_DBT_CLOUD_TOKEN", + "KS_DBT_CLOUD_ACCESS_URL", + "KS_DBT_CLOUD_DISCOVERY_API_URL", + ], + ), + PackageSpec( + "examples/experimental/dagster-airlift/examples/dbt-example", + name="airlift-demo-live-tests", + env_vars=[ + "KS_DBT_CLOUD_ACCOUNT_ID", + "KS_DBT_CLOUD_PROJECT_ID", + "KS_DBT_CLOUD_TOKEN", + "KS_DBT_CLOUD_ACCESS_URL", + "KS_DBT_CLOUD_DISCOVERY_API_URL", + ], + ), ] ) diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/prerelease_package.py b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/prerelease_package.py index 7bda9c847b57d..f5611f2048fa4 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/prerelease_package.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/prerelease_package.py @@ -1,5 +1,3 @@ -import re -from pathlib import Path from typing import List from dagster_buildkite.python_version import AvailablePythonVersion @@ -17,14 +15,6 @@ def build_prerelease_package_steps() -> List[BuildkiteStep]: + _get_uncustomized_pkg_roots("examples/experimental", []) ) - # Get only packages that have a fixed version in setup.py - filtered_packages = [] - for package in packages: - setup_file = Path(package) / "setup.py" - contents = setup_file.read_text() - if re.findall(r"version=\"[\d\.]+\"", contents): - filtered_packages.append(package) - input_step: BlockStep = { "block": ":question: Choose package", "prompt": None, @@ -39,7 +29,7 @@ def build_prerelease_package_steps() -> List[BuildkiteStep]: else package, "value": package, } - for package in filtered_packages + for package in packages ], "hint": None, "default": None, diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/python_version.py b/.buildkite/dagster-buildkite/dagster_buildkite/python_version.py index 529b59c32661c..68b27a661340e 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/python_version.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/python_version.py @@ -5,9 +5,8 @@ from dagster_buildkite.utils import is_release_branch, 
safe_getenv -class AvailablePythonVersion(str, Enum): +class AvailablePythonVersion(Enum): # Ordering is important here, because some steps will take the highest/lowest available version. - V3_8 = "3.8" V3_9 = "3.9" V3_10 = "3.10" V3_11 = "3.11" @@ -21,6 +20,12 @@ def get_all(cls) -> List["AvailablePythonVersion"]: def get_default(cls) -> "AvailablePythonVersion": return cls["V3_11"] + # Useful for providing to `PackageSpec.unsupported_python_versions` when you only want to test + # the default version. + @classmethod + def get_all_except_default(cls) -> List["AvailablePythonVersion"]: + return [v for v in cls.get_all() if v != cls.get_default()] + @classmethod def get_pytest_defaults(cls) -> List["AvailablePythonVersion"]: branch_name = safe_getenv("BUILDKITE_BRANCH") @@ -55,6 +60,6 @@ def from_major_minor(cls, major: int, minor: int) -> "AvailablePythonVersion": @classmethod def to_tox_factor(cls, version: "AvailablePythonVersion") -> str: - ver_parts = version.split(".") + ver_parts = version.value.split(".") major, minor = ver_parts[0], ver_parts[1] return f"py{major}{minor}" diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/step_builder.py b/.buildkite/dagster-buildkite/dagster_buildkite/step_builder.py index f8ab1d790620e..0ef854abf775b 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/step_builder.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/step_builder.py @@ -109,7 +109,7 @@ def on_test_image( raise Exception(f"Unsupported python version for test image: {ver}.") return self.on_python_image( - image=f"buildkite-test:py{ver}-{BUILDKITE_TEST_IMAGE_VERSION}", + image=f"buildkite-test:py{ver.value}-{BUILDKITE_TEST_IMAGE_VERSION}", env=env, ) diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/helm.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/helm.py index a21df88a5b717..c90ee12e306dd 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/helm.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/helm.py @@ -20,7 +20,6 @@ def build_helm_steps() -> List[BuildkiteStep]: os.path.join("helm", "dagster", "schema"), unsupported_python_versions=[ # run helm schema tests only once, on the latest python version - AvailablePythonVersion.V3_8, AvailablePythonVersion.V3_9, AvailablePythonVersion.V3_10, AvailablePythonVersion.V3_11, diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py index 74c27cd8e0497..98ea3ec22004e 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py @@ -6,7 +6,11 @@ GCP_CREDS_LOCAL_FILE, LATEST_DAGSTER_RELEASE, ) -from dagster_buildkite.package_spec import PackageSpec, UnsupportedVersionsFunction +from dagster_buildkite.package_spec import ( + PackageSpec, + PytestExtraCommandsFunction, + UnsupportedVersionsFunction, +) from dagster_buildkite.python_version import AvailablePythonVersion from dagster_buildkite.step_builder import BuildkiteQueue from dagster_buildkite.steps.test_project import test_project_depends_fn @@ -60,12 +64,12 @@ def build_backcompat_suite_steps() -> List[BuildkiteTopLevelStep]: ) -def backcompat_extra_cmds(_, factor: str) -> List[str]: +def backcompat_extra_cmds(_, factor: Optional[str]) -> List[str]: tox_factor_map = { "user-code-latest-release": LATEST_DAGSTER_RELEASE, "user-code-earliest-release": EARLIEST_TESTED_RELEASE, } - + assert factor webserver_version = 
DAGSTER_CURRENT_BRANCH webserver_library_version = _get_library_version(webserver_version) user_code_version = tox_factor_map[factor] @@ -163,7 +167,7 @@ def build_auto_materialize_perf_suite_steps(): def daemon_pytest_extra_cmds(version: AvailablePythonVersion, _): return [ - "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version, + "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value, 'export DAGSTER_DOCKER_REPOSITORY="$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com"', "pushd integration_tests/test_suites/daemon-test-suite/monitoring_daemon_tests/", "docker-compose up -d --remove-orphans", @@ -182,7 +186,7 @@ def daemon_pytest_extra_cmds(version: AvailablePythonVersion, _): # ######################## -def build_k8s_suite_steps(): +def build_k8s_suite_steps() -> List[BuildkiteTopLevelStep]: pytest_tox_factors = ["-default", "-subchart"] directory = os.path.join("integration_tests", "test_suites", "k8s-test-suite") return build_integration_suite_steps( @@ -201,7 +205,7 @@ def build_k8s_suite_steps(): def build_integration_suite_steps( directory: str, pytest_tox_factors: Optional[List[str]], - pytest_extra_cmds: Optional[Callable] = None, + pytest_extra_cmds: Optional[PytestExtraCommandsFunction] = None, queue=None, always_run_if: Optional[Callable[[], bool]] = None, unsupported_python_versions: Optional[ @@ -229,19 +233,19 @@ def build_integration_suite_steps( ).build_steps() -def k8s_integration_suite_pytest_extra_cmds(version: str, _) -> List[str]: +def k8s_integration_suite_pytest_extra_cmds(version: AvailablePythonVersion, _) -> List[str]: return [ - "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version, + "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value, 'export DAGSTER_DOCKER_REPOSITORY="$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com"', "aws ecr get-login --no-include-email --region us-west-2 | sh", ] -def celery_k8s_integration_suite_pytest_extra_cmds(version: str, _) -> List[str]: +def celery_k8s_integration_suite_pytest_extra_cmds(version: AvailablePythonVersion, _) -> List[str]: cmds = [ 'export AIRFLOW_HOME="/airflow"', "mkdir -p $${AIRFLOW_HOME}", - "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version, + "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value, 'export DAGSTER_DOCKER_REPOSITORY="$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com"', "aws ecr get-login --no-include-email --region us-west-2 | sh", ] diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py index 91b271775f46c..ddc6291ff301f 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py @@ -6,6 +6,7 @@ from dagster_buildkite.defines import GCP_CREDS_FILENAME, GCP_CREDS_LOCAL_FILE, GIT_REPO_ROOT from dagster_buildkite.package_spec import PackageSpec from dagster_buildkite.python_version import AvailablePythonVersion +from dagster_buildkite.step_builder import BuildkiteQueue from dagster_buildkite.steps.test_project import test_project_depends_fn from dagster_buildkite.utils import ( BuildkiteStep, @@ -13,6 +14,8 @@ has_dagster_airlift_changes, has_storage_test_fixture_changes, network_buildkite_container, + skip_if_not_airlift_or_dlift_commit, + skip_if_not_dlift_commit, ) @@ -89,7 +92,7 @@ def _get_uncustomized_pkg_roots(root: str, custom_pkg_roots: List[str]) -> List[ # ######################## -def 
airflow_extra_cmds(version: str, _) -> List[str]: +def airflow_extra_cmds(version: AvailablePythonVersion, _) -> List[str]: return [ 'export AIRFLOW_HOME="/airflow"', "mkdir -p $${AIRFLOW_HOME}", @@ -157,9 +160,9 @@ def dagster_graphql_extra_cmds(_, tox_factor: Optional[str]) -> List[str]: ] -def celery_extra_cmds(version: str, _) -> List[str]: +def celery_extra_cmds(version: AvailablePythonVersion, _) -> List[str]: return [ - "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version, + "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value, 'export DAGSTER_DOCKER_REPOSITORY="$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com"', "pushd python_modules/libraries/dagster-celery", # Run the rabbitmq db. We are in docker running docker @@ -175,7 +178,7 @@ def celery_extra_cmds(version: str, _) -> List[str]: ] -def celery_docker_extra_cmds(version: str, _) -> List[str]: +def celery_docker_extra_cmds(version: AvailablePythonVersion, _) -> List[str]: return celery_extra_cmds(version, _) + [ "pushd python_modules/libraries/dagster-celery-docker/dagster_celery_docker_tests/", "docker-compose up -d --remove-orphans", @@ -189,9 +192,9 @@ def celery_docker_extra_cmds(version: str, _) -> List[str]: ] -def docker_extra_cmds(version: str, _) -> List[str]: +def docker_extra_cmds(version: AvailablePythonVersion, _) -> List[str]: return [ - "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version, + "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value, 'export DAGSTER_DOCKER_REPOSITORY="$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com"', "pushd python_modules/libraries/dagster-docker/dagster_docker_tests/", "docker-compose up -d --remove-orphans", @@ -227,9 +230,9 @@ def docker_extra_cmds(version: str, _) -> List[str]: ] -def k8s_extra_cmds(version: str, _) -> List[str]: +def k8s_extra_cmds(version: AvailablePythonVersion, _) -> List[str]: return [ - "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version, + "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value, 'export DAGSTER_DOCKER_REPOSITORY="$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com"', ] @@ -272,7 +275,6 @@ def k8s_extra_cmds(version: str, _) -> List[str]: PackageSpec( "examples/with_airflow", unsupported_python_versions=[ - AvailablePythonVersion.V3_9, AvailablePythonVersion.V3_10, AvailablePythonVersion.V3_11, AvailablePythonVersion.V3_12, @@ -288,15 +290,19 @@ def k8s_extra_cmds(version: str, _) -> List[str]: PackageSpec( "examples/docs_snippets", pytest_extra_cmds=docs_snippets_extra_cmds, - unsupported_python_versions=[ - # dependency on 3.9-incompatible extension libs - AvailablePythonVersion.V3_9, - # dagster-airflow dep - AvailablePythonVersion.V3_12, - ], + # The docs_snippets test suite also installs a ton of packages in the same environment, + # which is liable to cause dependency collisions. It's not necessary to test all these + # snippets in all python versions since we are testing the core code exercised by the + # snippets against all supported python versions. + unsupported_python_versions=AvailablePythonVersion.get_all_except_default(), ), PackageSpec( "examples/docs_beta_snippets", + # The docs_snippets test suite also installs a ton of packages in the same environment, + # which is liable to cause dependency collisions. It's not necessary to test all these + # snippets in all python versions since we are testing the core code exercised by the + # snippets against all supported python versions. 
+ unsupported_python_versions=AvailablePythonVersion.get_all_except_default(), pytest_tox_factors=["all", "integrations"], ), PackageSpec( @@ -307,6 +313,9 @@ def k8s_extra_cmds(version: str, _) -> List[str]: ), PackageSpec( "examples/with_great_expectations", + unsupported_python_versions=[ + AvailablePythonVersion.V3_9, + ], ), PackageSpec( "examples/with_pyspark", @@ -361,9 +370,20 @@ def k8s_extra_cmds(version: str, _) -> List[str]: PackageSpec( "examples/experimental/dagster-airlift", ), + # Runs against live dbt cloud instance, we only want to run on commits and on the + # nightly build PackageSpec( "examples/experimental/dagster-airlift/examples/dbt-example", - always_run_if=has_dagster_airlift_changes, + skip_if=skip_if_not_airlift_or_dlift_commit, + env_vars=[ + "KS_DBT_CLOUD_ACCOUNT_ID", + "KS_DBT_CLOUD_PROJECT_ID", + "KS_DBT_CLOUD_TOKEN", + "KS_DBT_CLOUD_ACCESS_URL", + "KS_DBT_CLOUD_DISCOVERY_API_URL", + ], + timeout_in_minutes=30, + queue=BuildkiteQueue.DOCKER, ), PackageSpec( "examples/experimental/dagster-airlift/examples/perf-harness", @@ -377,6 +397,24 @@ def k8s_extra_cmds(version: str, _) -> List[str]: "examples/experimental/dagster-airlift/examples/kitchen-sink", always_run_if=has_dagster_airlift_changes, ), + PackageSpec( + "examples/experimental/dagster-dlift", + name="dlift", + ), + # Runs against live dbt cloud instance, we only want to run on commits and on the + # nightly build + PackageSpec( + "examples/experimental/dagster-dlift/kitchen-sink", + skip_if=skip_if_not_dlift_commit, + name="dlift-live", + env_vars=[ + "KS_DBT_CLOUD_ACCOUNT_ID", + "KS_DBT_CLOUD_PROJECT_ID", + "KS_DBT_CLOUD_TOKEN", + "KS_DBT_CLOUD_ACCESS_URL", + "KS_DBT_CLOUD_DISCOVERY_API_URL", + ], + ), ] @@ -418,7 +456,10 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]: LIBRARY_PACKAGES_WITH_CUSTOM_CONFIG: List[PackageSpec] = [ PackageSpec( "python_modules/automation", - unsupported_python_versions=[AvailablePythonVersion.V3_12], + # automation is internal code that doesn't need to be tested in every python version. The + # test suite also installs a ton of packages in the same environment, which is liable to + # cause dependency collisions. 
+ unsupported_python_versions=AvailablePythonVersion.get_all_except_default(), ), PackageSpec("python_modules/dagster-webserver", pytest_extra_cmds=ui_extra_cmds), PackageSpec( @@ -428,8 +469,7 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]: "api_tests", "asset_defs_tests", "cli_tests", - "core_tests_pydantic1", - "core_tests_pydantic2", + "core_tests", "daemon_sensor_tests", "daemon_tests", "definitions_tests", @@ -437,8 +477,7 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]: "general_tests_old_protobuf", "launcher_tests", "logging_tests", - "model_tests_pydantic1", - "model_tests_pydantic2", + "model_tests", "scheduler_tests", "storage_tests", "storage_tests_sqlalchemy_1_3", @@ -496,16 +535,12 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]: "python_modules/libraries/dagster-dbt", pytest_tox_factors=[ f"{deps_factor}-{command_factor}" - for deps_factor in ["dbt17", "dbt18", "pydantic1"] + for deps_factor in ["dbt17", "dbt18"] for command_factor in ["cloud", "core-main", "core-derived-metadata"] ], ), PackageSpec( "python_modules/libraries/dagster-snowflake", - pytest_tox_factors=[ - "pydantic1", - "pydantic2", - ], env_vars=["SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD"], ), PackageSpec( @@ -560,10 +595,6 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]: ), PackageSpec( "python_modules/libraries/dagster-databricks", - pytest_tox_factors=[ - "pydantic1", - "pydantic2", - ], ), PackageSpec( "python_modules/libraries/dagster-docker", @@ -640,6 +671,9 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]: ), PackageSpec( "python_modules/libraries/dagster-ge", + unsupported_python_versions=[ + AvailablePythonVersion.V3_9, + ], ), PackageSpec( "python_modules/libraries/dagster-k8s", diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/test_project.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/test_project.py index dc73185d23cf2..bbdcd26bf43a5 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/test_project.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/test_project.py @@ -49,16 +49,16 @@ def build_test_project_steps() -> List[GroupStep]: " $${GOOGLE_APPLICATION_CREDENTIALS}", "export" " BASE_IMAGE=$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com/test-project-base:py" - + version + + version.value + "-" + TEST_PROJECT_BASE_IMAGE_VERSION, # build and tag test image "export" " TEST_PROJECT_IMAGE=$${AWS_ACCOUNT_ID}.dkr.ecr.us-west-2.amazonaws.com/test-project:$${BUILDKITE_BUILD_ID}-" - + version, + + version.value, "git config --global --add safe.directory /workdir", "./python_modules/dagster-test/dagster_test/test_project/build.sh " - + version + + version.value + " $${TEST_PROJECT_IMAGE}", # # push the built image @@ -67,7 +67,7 @@ def build_test_project_steps() -> List[GroupStep]: ) .on_python_image( # py version can be bumped when rebuilt - f"buildkite-build-test-project-image:py{AvailablePythonVersion.V3_8}-{BUILDKITE_BUILD_TEST_PROJECT_IMAGE_IMAGE_VERSION}", + f"buildkite-build-test-project-image:py{AvailablePythonVersion.V3_11.value}-{BUILDKITE_BUILD_TEST_PROJECT_IMAGE_IMAGE_VERSION}", [ "AIRFLOW_HOME", "AWS_ACCOUNT_ID", diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/utils.py b/.buildkite/dagster-buildkite/dagster_buildkite/utils.py index 9b1bd174d89b6..d83b5d67a7785 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/utils.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/utils.py @@ -100,7 +100,7 @@ class 
GroupStep(TypedDict): BuildkiteLeafStep = Union[CommandStep, TriggerStep, WaitStep] BuildkiteTopLevelStep = Union[CommandStep, GroupStep] -UV_PIN = "uv==0.4.8" +UV_PIN = "uv==0.4.30" def is_command_step(step: BuildkiteStep) -> TypeGuard[CommandStep]: @@ -307,6 +307,19 @@ def has_dagster_airlift_changes(): return any("dagster-airlift" in str(path) for path in ChangedFiles.all) +@functools.lru_cache(maxsize=None) +def skip_if_not_airlift_or_dlift_commit() -> Optional[str]: + """If no dlift or airlift files are touched, then do NOT run. Even if on master.""" + return ( + None + if ( + any("dagster-dlift" in str(path) for path in ChangedFiles.all) + or any("dagster-airlift" in str(path) for path in ChangedFiles.all) + ) + else "Not an airlift or dlift commit" + ) + + @functools.lru_cache(maxsize=None) def has_storage_test_fixture_changes(): # Attempt to ensure that changes to TestRunStorage and TestEventLogStorage suites trigger integration @@ -316,6 +329,15 @@ def has_storage_test_fixture_changes(): ) +def skip_if_not_dlift_commit() -> Optional[str]: + """If no dlift files are touched, then do NOT run. Even if on master.""" + return ( + None + if any("dagster-dlift" in str(path) for path in ChangedFiles.all) + else "Not a dlift commit" + ) + + def skip_if_no_helm_changes(): if message_contains("NO_SKIP"): return None diff --git a/.buildkite/dagster-buildkite/setup.py b/.buildkite/dagster-buildkite/setup.py index e4cef3a49d6b3..bcbc3c4b08c2e 100644 --- a/.buildkite/dagster-buildkite/setup.py +++ b/.buildkite/dagster-buildkite/setup.py @@ -9,7 +9,6 @@ description="Tools for buildkite automation", url="https://github.com/dagster-io/dagster/tree/master/.buildkite/dagster-buildkite", classifiers=[ - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "License :: OSI Approved :: Apache Software License", diff --git a/CHANGES.md b/CHANGES.md index 5f29c24acda2f..b5622a756c1d9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,182 @@ # Changelog +## 1.9.1 (core) / 0.25.1 (libraries) + +### New + +- `dagster project scaffold` now has an option to create dagster projects from templates with excluded files/filepaths. +- [ui] Filters in the asset catalog now persist when navigating subdirectories. +- [ui] The Run page now displays the partition(s) a run was for. +- [ui] Filtering on owners/groups/tags is now case-insensitive. +- [dagster-tableau] the helper function `parse_tableau_external_and_materializable_asset_specs` is now available to parse a list of Tableau asset specs into a list of external asset specs and materializable asset specs. +- [dagster-looker] Looker assets now by default have owner and URL metadata. +- [dagster-k8s] Added a per_step_k8s_config configuration option to the k8s_job_executor, allowing the k8s configuration of individual steps to be configured at run launch time (thanks @Kuhlwein!) +- [dagster-fivetran] Introduced `DagsterFivetranTranslator` to customize assets loaded from Fivetran. +- [dagster-snowflake] `dagster_snowflake.fetch_last_updated_timestamps` now supports ignoring tables not found in Snowflake instead of raising an error. + +### Bugfixes + +- Fixed issue which would cause a `default_automation_condition_sensor` to be constructed for user code servers running on dagster version < 1.9.0 even if the legacy `auto_materialize: use_sensors` configuration setting was set to `False`. 
+- Fixed an issue where running `dagster instance migrate` on Dagster version 1.9.0 constructed a SQL query that exceeded the maximum allowed depth. +- Fixed an issue where wiping a dynamically partitioned asset causes an error. +- [dagster-polars] `ImportError`s are no longer raised when bigquery libraries are not installed [#25708] + +### Documentation + +- [dagster-dbt] A guide on how to use dbt defer with Dagster branch deployments has been added to the dbt reference. + +# 1.9.0 (core) / 0.25.0 (libraries) + +## Major changes since 1.8.0 (core) / 0.24.0 (libraries) + +### Automation + +- Declarative Automation, the system which enables setting per-asset `AutomationConditions`, is no longer experimental. We now recommend using this system in all cases where asset-centric orchestration is desired. A suite of built-in static constructors have been added for common usecases, such as `AutomationCondition.on_missing()` (which can fill in missing partitions of assets as soon as upstream data is available), and `AutomationCondition.all_deps_blocking_checks_passed()` (which can prevent materialization of assets until all upstream blocking checks have passed). +- You can now assign `AutomationConditions` to asset checks, via the `automation_condition` parameter on `@asset_check` or `AssetCheckSpec`. +- You can now assign `AutomationConditions` to observable source assets, via the `automation_condition` parameter on `@observable_source_asset`. +- [experimental] You can now define custom subclasses of `AutomationCondition` to execute arbitrary Python code in the context of a broader expression. This allows you to compose built-in conditions with custom business logic. +- The `target` arguments on schedules and sensors are now marked stable, allowing a stable way for schedules and sensors to target asset selections without needing to define a job. + +### Integrations + +- Introduced a slate of integrations with business intelligence (BI) tools, enabling dashboards, views, and reports to be represented in the Dagster asset graph. + - [Looker](https://docs.dagster.io/integrations/looker) + - [Power BI](https://docs.dagster.io/integrations/powerbi) + - [Sigma](https://docs.dagster.io/integrations/sigma) + - [Tableau](https://docs.dagster.io/integrations/tableau) +- A rich set of metadata is now automatically collected by our suite of ELT integrations. + - The `dagster/table_name` metadata tag, containing the fully-qualified name of the destination model, has been added for Airbyte, dlt, Fivetran and Sling assets. + - The `dagster/row_count` metadata tag, containing the number of records loaded in the corresponding run, has been added for dlt and Sling assets. + - The `dagster/column_schema` metadata tag, containing column schema information of the destination tables, has been added for Fivetran assets. + - Column lineage information is now collected for Sling assets. +- [dagster-pipes](https://docs.dagster.io/concepts/dagster-pipes) are replacing the now deprecated Step Launchers as the new recommended approach for executing remote Spark jobs. 
Three new [Pipes clients](https://docs.dagster.io/_apidocs/libraries/dagster-aws#clients) for running Spark applications on Amazon Web Services have been added: + - `dagster_aws.pipes.PipesGlueClient` + - `dagster_aws.pipes.PipesEMRServerlessClient` + - `dagster_aws.pipes.PipesEMRClient` + +### UI + +- Several changes have been made to the information architecture to make it easier to find what you’re looking for: + - Backfills have been moved from their own tab underneath the Overview page to entries within the table on the Runs page. This reflects the fact that backfills and runs are similar entities that share most properties. You can continue to use the legacy Runs page with the “Revert to legacy Runs page” user setting. ([GitHub Discussion](https://github.com/dagster-io/dagster/discussions/24898)) + - “Jobs” is now a page reachable from the top-level navigation pane. It replaces the Jobs tab within the Overview page. + - “Automations” is now a page reachable from the top-level navigation pane. It replaces the schedule and sensor tabs within the Overview page. +- `@asset` and `AssetSpec` now have a `kinds` attribute that enables specifying labels that show up on asset nodes in the asset graph in the UI. This supersedes the `compute_kind` attribute. + +## Changes since 1.8.13 (core) / 0.24.13 (libraries) + +### New + +- The `tags` parameter to `@asset` and `AssetSpec` is no longer marked as experimental. +- The `@observable_source_asset` decorator now supports an `automation_condition` argument. +- `AutomationCondition` and associated APIs are no longer marked as experimental. +- Added a new `use_user_code_server` parameter to `AutomationConditionSensorDefinition`. If set, the sensor will be evaluated in the user code server (as traditional sensors are), allowing custom `AutomationCondition` subclasses to be evaluated. +- Added a new column to the BulkActions table, a new column to the Runs table, and a new BackfillTags table to improve the performance of the Runs page. To take advantage of these performance improvements, run `dagster instance migrate`. This migration involves a schema migration to add the new columns and table, and a data migration to populate the new columns for historical backfills and runs. +- Performance improvements when loading definitions with multi-assets with many asset keys. +- [ui] The previously-experimental changes to the top nav are now enabled for all users. +- [ui] Added new code location pages which provide information regarding library versions, metadata, and definitions. +- [ui] The new version of the Runs page is now enabled by default. To use the legacy version of the Runs page, toggle the "Revert to legacy Runs page" user setting. +- [ui] Clicking an asset with failed partitions on the asset health overview now takes you to a list of the failed partitions. +- [ui] The Materialize button runs pre-flight checks more efficiently, resulting in faster run launch times. +- [dagster-pipes] Added support for multi-container log streaming (thanks, [@MattyKuzyk](https://github.com/MattyKuzyk)!) +- [dagster-docker] `container_kwargs.stop_timeout` can now be set when using the `DockerRunLauncher` or `docker_executor` to configure the amount of time that Docker will wait when terminating a run for it to clean up before forcibly stopping it with a SIGKILL signal. +- [dagster-dbt] Performance improvements when loading definitions using `build_dbt_asset_selection`. 
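The entries above mention several pieces of the now-stable Declarative Automation surface together (`automation_condition` on assets and asset checks, `AutomationConditionSensorDefinition` with `target` and `use_user_code_server`, and the new `kinds` attribute). A minimal, hypothetical sketch of how these fit together is shown below; it is not part of this diff, and the asset and check names are invented.

```python
# Hypothetical sketch of the Declarative Automation APIs referenced above.
# Asset/check names are invented; this is not part of the diff.
import dagster as dg


@dg.asset(
    kinds={"python"},  # supersedes compute_kind for UI labels
    automation_condition=dg.AutomationCondition.eager(),
)
def daily_metrics() -> None: ...


@dg.asset_check(
    asset=daily_metrics,
    automation_condition=dg.AutomationCondition.on_cron("0 6 * * *"),
)
def daily_metrics_not_empty() -> dg.AssetCheckResult:
    return dg.AssetCheckResult(passed=True)


defs = dg.Definitions(
    assets=[daily_metrics],
    asset_checks=[daily_metrics_not_empty],
    sensors=[
        dg.AutomationConditionSensorDefinition(
            "automation_condition_sensor",
            target=dg.AssetSelection.all(),
            # evaluate in the user code server so custom condition subclasses can run
            use_user_code_server=True,
        )
    ],
)
```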
+ +### Bugfixes + +- [ui] Fixed redirect behavior on full pageloads of the legacy auto-materialize overview page. +- [ui] Plots for assets that emit materialization and observation events at different rates no longer display a time period missing the more frequent event type. +- [ui] Fixed issue causing scrolling to misbehave on the concurrency settings page. +- [helm] The blockOpConcurrencyLimitedRuns section of queuedRunCoordinator now correctly templates the appropriate config. +- [dagster-pipes] Fixed issue where k8s ops would fail after 4 hours (thanks, [@MattyKuzyk](https://github.com/MattyKuzyk)!) + +### Documentation + +- [dagster-dbt] Added guide for using dbt defer with Dagster branch deployments. +- [docs] Step Launchers documentation has been removed and replaced with references to Dagster Pipes. +- [docs] Fixed code example in Dagster Essentials (thanks, [@aleexharris](https://github.com/aleexharris)!) + +### Breaking Changes + +- `dagster` no longer supports Python 3.8, which hit EOL on 2024-10-07. +- `dagster` now requires `pydantic>=2`. +- By default, `AutomationConditionSensorDefinitions` will now emit backfills to handle cases where more than one partition of an asset is requested on a given tick. This allows that asset's `BackfillPolicy` to be respected. This feature can be disabled by setting `allow_backfills` to `False`. +- Passing a custom `PartitionsDefinition` subclass into a `Definitions` object now issues an error instead of a deprecation warning. +- `AssetExecutionContext` is no longer a subclass of `OpExecutionContext`. At this release, `AssetExecutionContext` and `OpExecutionContext` implement the same methods, but in the future, the methods implemented by each class may diverge. If you have written helper functions with `OpExecutionContext` type annotations, they may need to be updated to include `AssetExecutionContext` depending on your usage. Explicit calls to `isinstance(context, OpExecutionContext)` will now fail if `context` is an `AssetExecutionContext`. +- The `asset_selection` parameter on `AutomationConditionSensorDefinition` has been renamed to `target`, to align with existing sensor APIs. +- The experimental `freshness_policy_sensor` has been removed, as it relies on the long-deprecated `FreshnessPolicy` API. +- The deprecated `external_assets_from_specs` and `external_asset_from_spec` methods have been removed. Users should use `AssetsDefinition(specs=[...])`, or pass specs directly into the `Definitions` object instead. +- `AssetKey` objects can no longer be iterated over or indexed in to. This behavior was never an intended access pattern and in all observed cases was a mistake. +- The `dagster/relation_identifier` metadata key has been renamed to `dagster/table_name`. +- [dagster-ge] `dagster-ge` now only supports `great_expectations>=0.17.15`. The `ge_validation_op_factory` API has been replaced with the API previously called `ge_validation_op_factory_v3`. +- [dagster-aws] Removed deprecated parameters from `dagster_aws.pipes.PipesGlueClient.run`. +- [dagster-embedded-elt] Removed deprecated parameter `dlt_dagster_translator` from `@dlt_assets`. The `dagster_dlt_translator` parameter should be used instead. +- [dagster-polars] Dropped support for saving storage-level arbitrary metadata via IOManagers. 
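One breaking change above notes that helper functions annotated with `OpExecutionContext` may need updating now that `AssetExecutionContext` is no longer a subclass. A minimal, hypothetical sketch of the suggested widening (not taken from this diff):

```python
from typing import Union

from dagster import AssetExecutionContext, OpExecutionContext


# Hypothetical helper: before 1.9, an OpExecutionContext annotation also accepted
# AssetExecutionContext via subclassing; after 1.9 both types must be named.
def log_run_info(context: Union[OpExecutionContext, AssetExecutionContext]) -> None:
    context.log.info(f"run_id: {context.run_id}")
```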
+ +### Deprecations + +- The `DataBricksPysparkStepLauncher`, `EmrPySparkStepLauncher`, and any custom subclass of `StepLauncher` have been marked as deprecated, but will not be removed from the codebase until Dagster 2.0 is released, meaning they will continue to function as they currently do for the foreseeable future. Their functionality has been superseded by the interfaces provided by `dagster-pipes`, and so future development work will be focused there. +- The experimental `multi_asset_sensor` has been marked as deprecated, as its main use cases have been superseded by the `AutomationCondition` APIs. However, it will not be removed until version 2.0.0. + +## 1.8.13 (core) / 0.24.13 (libraries) + +### New + +- Performance improvements when loading code locations using multi-assets with many asset keys. +- `AutomationCondition.in_progress()` now will be true if an asset partition is part of an in-progress backfill that has not yet executed it. The prior behavior, which only considered runs, is encapsulated in `AutomationCondition.execution_in_progress()`. +- [ui] Added tag filter to the jobs page. +- [ui] Preserve user login state for a longer period of time. +- [dagster-dbt] Performance improvements when loading definitions using `build_dbt_asset_selection`. +- [dagster-docker] `container_kwargs.stop_timeout` can now be set when using the `DockerRunLauncher` or `docker_executor` to configure the amount of time that Docker will wait when terminating a run for it to clean up before forcibly stopping it with a SIGKILL signal. +- [dagster-sigma] The Sigma integration now fetches initial API responses in parallel, speeding up initial load. +- [dagster-looker] Attempt to naively render liquid templates for derived table sql. +- [dagster-looker] Added support for views and explores that rely on refinements or extends. +- [dagster-looker] When fetching explores and dashboards from the Looker API, retrieve in parallel. + +### Bugfixes + +- Fixed an issue with `AutomationCondition.eager()` that could cause it to attempt to launch a second attempt of an asset in cases where it was skipped or failed during a run where one of its parents successfully materialized. +- Fixed an issue which would cause `AutomationConditionSensorDefinitions` to not be evaluated if the `use_user_code_server` value was toggled after the initial evaluation. +- Fixed an issue where configuration values for aliased pydantic fields would be dropped. +- [ui] Fix an issue in the code locations page where invalid query parameters could crash the page. +- [ui] Fix navigation between deployments when query parameters are present in the URL. +- [helm] the blockOpConcurrencyLimitedRuns section of queuedRunCoordinator now correctly templates the appropriate config. +- [dagster-sigma] Fixed pulling incomplete data for very large workspaces. + +## 1.8.12 (core) / 0.24.12 (libraries) + +### New + +- The `AutomationCondition.eager()`, `AutomationCondition.missing()`, and `AutomationCondition.on_cron` conditions are now compatible with asset checks. +- Added `AssetSelection.materializable()`, which returns only assets that are materializable in an existing selection. +- Added a new `AutomationCondition.all_deps_blocking_checks_passed` condition, which can be used to prevent materialization when any upstream blocking checks have failed. +- Added a `code_version` parameter to the `@graph_asset` decorator. 
+- If a `LaunchPartitionBackfill` mutation is submitted to GQL with invalid partition keys, it will now return an early `PartitionKeysNotFoundError`. +- `AssetSelection.checks_for_assets` now accepts `AssetKey`s and string asset keys, in addition to `AssetsDefinition`s. +- [ui] Added a search bar to partitions tab on the asset details page. +- [ui] Restored docked left nav behavior for wide viewports. +- [dagster-aws] `get_objects` now has a `since_last_modified` that enables only fetching objects modified after a given timestamp. +- [dagster-aws] New AWS EMR Dagster Pipes client (`dagster_aws.pipes.PipesEMRCLient` ) for running and monitoring AWS EMR jobs from Dagster. +- [dagster-looker] Pinned the looker-sdk dependency below 24.18.0 to avoid this issue: https://github.com/looker-open-source/sdk-codegen/issues/1518. + +### Bugfixes + +- Fixed an issue which could cause incorrect evaluation results when using self-dependent partition mappings with `AutomationConditions` that operate over dependencies. +- [ui] Fixed an issue where the breadcumb on asset pages would flicker nonstop. +- [dagster-embedded-elt] Fixed extraction of metadata for dlt assets whose source and destination identifiers differ. +- [dagster-databricks] Fixed a permissioning gap that existed with the `DatabricksPySparkStepLauncher`, so that permissions are now set correctly for non-admin users. +- [dagster-dbt] Fixed an issue where column metadata generated with `fetch_column_metadata` did not work properly for models imported through dbt dependencies. + +### Documentation + +- [dagster-k8s] `DagsterK8sPipesClient.run` now shows up in API docs. + +### Dagster Plus + +- [ui] Fixed a bug in the catalog UI where owners filters were not applied correctly. +- [ui] Fixed width of the column lineage dropdown selector on the asset page. +- [ui] Column lineage now correctly renders when set on asset definition metadata +- [ui] Fixed Settings link on the list of deployments, for users in the legacy navigation flag. + ## 1.8.11 (core) / 0.24.11 (libraries) ### New @@ -5770,7 +5947,7 @@ runLauncher: - [dagit] A “Copy config” button has been added to the run configuration dialog on Run pages. - [dagit] An “Open in Launchpad” button has been added to the run details page. - [dagit] The Run page now surfaces more information about start time and elapsed time in the header. -- [dagster-dbt] The dbt_cloud_resource has a new `get_runs()` function to get a list of runs matching certain paramters from the dbt Cloud API (thanks @[kstennettlull](https://github.com/kstennettlull)!) +- [dagster-dbt] The dbt_cloud_resource has a new `get_runs()` function to get a list of runs matching certain parameters from the dbt Cloud API (thanks @[kstennettlull](https://github.com/kstennettlull)!) - [dagster-snowflake] Added an `authenticator` field to the connection arguments for the `snowflake_resource` (thanks @swotai!). - [celery-docker] The celery docker executor has a new configuration entry `container_kwargs` that allows you to specify additional arguments to pass to your docker containers when they are run. diff --git a/MIGRATION.md b/MIGRATION.md index 519920c737cdd..080c365363019 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -2,6 +2,37 @@ When new releases include breaking changes or deprecations, this document describes how to migrate. +## Migrating to 1.9.0 + +## Database migration + +- This release includes database schema and data migrations to improve the performance of the Runs page. 
We highly recommend running these migrations to avoid slow page loads of the new Runs page. The migration will add a new column to the `runs` table, a new column to the `bulk_actions` table and a new `backfill_tags` table. A data migration will populate the new columns and table. Run `dagster instance migrate` to run the schema and data migration. + +## Notable behavior changes + +- Backfills have been moved from their own tab underneath the Overview page to entries within the table on the Runs page. This reflects the fact that backfills and runs are similar entities that share most properties. You can continue to use the legacy Runs page with the “Revert to legacy Runs page” user setting. ([GitHub Discussion](https://github.com/dagster-io/dagster/discussions/24898)) +- By default, `AutomationConditionSensorDefinitions` will now emit backfills to handle cases where more than one partition of an asset is requested on a given tick. This allows that asset's `BackfillPolicy` to be respected. This feature can be disabled by setting `allow_backfills` to `False` on the sensor definition. + +## Deprecations + +- The `DataBricksPysparkStepLauncher`, `EmrPySparkStepLauncher`, and any custom subclass of `StepLauncher` have been marked as deprecated, but will not be removed from the codebase until Dagster 2.0 is released, meaning they will continue to function as they currently do for the foreseeable future. Their functionality has been superseded by the interfaces provided by `dagster-pipes`, and so future development work will be focused there. +- The experimental `@multi_asset_sensor` has been marked as deprecated, but will not be removed from the codebase until Dagster 2.0 is released, meaning it will continue to function as it currently does for the foreseeable future. Its functionality has been largely superseded by the `AutomationCondition` system. + +## Breaking changes + +- `dagster` no longer supports Python 3.8, which hit EOL on 2024-10-07. +- `dagster` now requires `pydantic>=2` . +- Passing a custom `PartitionsDefinition` subclass into a `Definitions` object now issues an error instead of a deprecation warning. +- `AssetExecutionContext` is no longer a subclass of `OpExecutionContext`. At this release, `AssetExecutionContext` and `OpExecutionContext` implement the same methods, but in the future, the methods implemented by each class may diverge. If you have written helper functions with `OpExecutionContext` type annotations, they may need to be updated to include `AssetExecutionContext` depending on your usage. Explicit calls to `isinstance(context, OpExecutionContext)` will now fail if `context` is an `AssetExecutionContext`. +- The `dagster/relation_identifier` metadata key has been renamed to `dagster/table_name`. +- The `asset_selection` parameter on `AutomationConditionSensorDefinition` has been renamed to `target`, to align with existing sensor APIs. +- The experimental `freshness_policy_sensor` has been removed, as it relies on the long-deprecated `FreshnessPolicy` API. +- The deprecated `external_assets_from_specs` and `external_asset_from_spec` methods have been removed. Users should use `AssetsDefinition(specs=[...])`, or pass specs directly into the `Definitions` object instead. +- `AssetKey` objects can no longer be iterated over or indexed in to. This behavior was never an intended access pattern and in all observed cases was a mistake. +- [dagster-ge] `dagster-ge` now only supports `great_expectations>=0.17.15`. 
The `ge_validation_op_factory` API has been replaced with the API previously called `ge_validation_op_factory_v3`. +- [dagster-aws] Removed deprecated parameters from `dagster_aws.pipes.PipesGlueClient.run`. +- [dagster-embedded-elt] Removed deprecated parameter `dlt_dagster_translator` from `@dlt_assets`. The `dagster_dlt_translator` parameter should be used instead. + ## Migrating to 1.8.0 ### Notable behavior changes diff --git a/Makefile b/Makefile index 31adeb0c25445..1be48b8703620 100644 --- a/Makefile +++ b/Makefile @@ -51,13 +51,13 @@ prettier: ':!:README.md'` --write install_dev_python_modules: - python scripts/install_dev_python_modules.py -qqq + python scripts/install_dev_python_modules.py -q install_dev_python_modules_verbose: python scripts/install_dev_python_modules.py install_dev_python_modules_verbose_m1: - python scripts/install_dev_python_modules.py -qqq --include-prebuilt-grpcio-wheel + python scripts/install_dev_python_modules.py -q --include-prebuilt-grpcio-wheel graphql: cd js_modules/dagster-ui/; make generate-graphql; make generate-perms diff --git a/azure-pipelines.yml b/azure-pipelines.yml index e8acffd0f9c5c..25a540a540bf1 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -14,7 +14,6 @@ parameters: default: - api_tests - cli_tests - - core_tests_pydantic1 - general_tests - launcher_tests - daemon_tests diff --git a/docs/content/_apidocs.mdx b/docs/content/_apidocs.mdx index b89795bba5629..1436b4398b6e5 100644 --- a/docs/content/_apidocs.mdx +++ b/docs/content/_apidocs.mdx @@ -610,6 +610,16 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi Includes implementations of run and event log storage built on Postgres. +
[Docs diff fragments: `docs/content/_apidocs.mdx` adds API docs entries for `dagster-powerbi`, `dagster-sigma`, and `dagster-tableau`; a reference table defines `|` (pipe) as OR (e.g. `A | B`) and `&` (ampersand) as AND (e.g. `A & B`); docs replace the `dagster/relation_identifier` metadata key with `dagster/table_name`; a note describing an example as a starting point for customizing behavior beyond the pre-built `AutoMaterializePolicy` sensor is removed.]
- -![Peered asset in Dagster UI](./images/peer.svg) - -
- -If we kick off a run of the `rebuild_customers_list` DAG in Airflow, we should see the corresponding asset materialize in Dagster. - -- -![Materialized peer asset in Dagster UI](./images/peer_materialize.svg) - -
- -_Note: When the code location loads, Dagster will query the Airflow REST API in order to build a representation of your DAGs. In order for Dagster to reflect changes to your DAGs, you will need to reload your code location._ - -- -![Observed asset graph in Dagster](./images/observe.svg) - -
- -Kicking off a run of the DAG in Airflow, you should see the newly created assets materialize in Dagster as each task completes. - -_Note: There will be some delay between task completion and assets materializing in Dagster, managed by the sensor. This sensor runs every 30 seconds by default (you can reduce down to one second via the `minimum_interval_seconds` argument to `sensor`), so there will be some delay._ - -## Migrating Assets - -Once you have created corresponding definitions in Dagster to your Airflow tasks, you can proxy execution to Dagster on a per-task basis while Airflow is still controlling scheduling and orchestration. -Once a task has been proxied, Airflow will kick off materializations of corresponding Dagster assets in place of executing the business logic of that task. - -To begin proxying tasks in a DAG, first you will need a file to track proxying state. In your Airflow DAG directory, create a `proxied_state` folder, and in it create a yaml file with the same name as your DAG. The included example at [`airflow_dags/proxied_state`](./tutorial_example/airflow_dags/proxied_state) is used by `make airflow_run`, and can be used as a template for your own proxied state files. - -Given our example DAG `rebuild_customers_list` with three tasks, `load_raw_customers`, `run_dbt_model`, and `export_customers`, [`proxied_state/rebuild_customers_list.yaml`](./tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml) should look like the following: - -```yaml -# tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml -tasks: - - id: load_raw_customers - proxied: False - - id: build_dbt_models - proxied: False - - id: export_customers - proxied: False -``` - -Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG: - -```python -# tutorial_example/snippets/dags_truncated.py -# Dags file can be found at tutorial_example/airflow_dags/dags.py -from pathlib import Path - -from airflow import DAG -from dagster_airlift.in_airflow import proxying_to_dagster -from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml - -dag = DAG("rebuild_customers_list", ...) - -... - -# Set this to True to begin the proxying process -PROXYING = False - -if PROXYING: - proxying_to_dagster( - global_vars=globals(), - proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), - ) -``` - -Set `PROXYING` to `True` or eliminate the `if` statement. - -The DAG will now display its proxied state in the Airflow UI. (There is some latency as Airflow evaluates the Python file periodically.) - -- -![Migration state rendering in Airflow UI](./images/state_in_airflow.png) - -
- -### Migrating individual tasks - -In order to proxy a task, you must do two things: - -1. First, ensure all associated assets are executable in Dagster by providing asset definitions in place of bare asset specs. -2. The `proxied: False` status in the `proxied_state` YAML folder must be adjusted to `proxied: True`. - -Any task marked as proxied will use the `DefaultProxyToDagsterOperator` when executed as part of the DAG. This operator will use the Dagster GraphQL API to initiate a Dagster run of the assets corresponding to the task. - -The proxied file acts as the source of truth for proxied state. The information is attached to the DAG and then accessed by Dagster via the REST API. - -A task which has been proxied can be easily toggled back to run in Airflow (for example, if a bug in implementation was encountered) simply by editing the file to `proxied: False`. - -#### Supporting custom authorization - -If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to your backend. -`proxying_to_dagster` can take a parameter `dagster_operator_klass`, which allows you to define a custom `BaseProxyToDagsterOperator` class. This allows you to -override how a session is created. Let's say for example, your Dagster installation requires an access key to be set whenever a request is made, and that access key is set in an Airflow `Variable` called `my_api_key`. -We can create a custom `BaseProxyToDagsterOperator` subclass which will retrieve that variable value and set it on the session, so that any requests to Dagster's graphql API -will be made using that api key. - -```python -# tutorial_example/snippets/custom_operator_examples/custom_proxy.py -from pathlib import Path - -import requests -from airflow import DAG -from airflow.utils.context import Context -from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster -from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml - - -class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator): - def get_dagster_session(self, context: Context) -> requests.Session: - if "var" not in context: - raise ValueError("No variables found in context") - api_key = context["var"]["value"].get("my_api_key") - session = requests.Session() - session.headers.update({"Authorization": f"Bearer {api_key}"}) - return session - - def get_dagster_url(self, context: Context) -> str: - return "https://dagster.example.com/" - - -dag = DAG( - dag_id="custom_proxy_example", -) - -# At the end of your dag file -proxying_to_dagster( - global_vars=globals(), - proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), - build_from_task_fn=CustomProxyToDagsterOperator.build_from_task, -) -``` - -#### Dagster Plus Authorization - -You can use a customer proxy operator to establish a connection to a Dagster plus deployment. The below example proxies to Dagster Plus using organization name, deployment name, and user token set as -Airflow Variables. To set a Dagster+ user token, follow this guide: https://docs.dagster.io/dagster-plus/account/managing-user-agent-tokens#managing-user-tokens. 
- -```python -# tutorial_example/snippets/custom_operator_examples/plus_proxy_operator.py -import requests -from airflow.utils.context import Context -from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator - - -class DagsterCloudProxyOperator(BaseProxyTaskToDagsterOperator): - def get_variable(self, context: Context, var_name: str) -> str: - if "var" not in context: - raise ValueError("No variables found in context") - return context["var"]["value"][var_name] - - def get_dagster_session(self, context: Context) -> requests.Session: - dagster_cloud_user_token = self.get_variable(context, "dagster_cloud_user_token") - session = requests.Session() - session.headers.update({"Dagster-Cloud-Api-Token": dagster_cloud_user_token}) - return session - - def get_dagster_url(self, context: Context) -> str: - org_name = self.get_variable(context, "dagster_plus_organization_name") - deployment_name = self.get_variable(context, "dagster_plus_deployment_name") - return f"https://{org_name}.dagster.plus/{deployment_name}" -``` - -#### Migrating common operators - -For some common operator patterns, like our dbt operator, Dagster supplies factories to build software defined assets for our tasks. In fact, the `@dbt_assets` decorator used earlier already backs its assets with definitions, so we can toggle the proxied state of the `build_dbt_models` task to `proxied: True` in the proxied state file: - -```yaml -# tutorial_example/snippets/dbt_proxied.yaml -tasks: - - id: load_raw_customers - proxied: False - - id: build_dbt_models - proxied: True - - id: export_customers - proxied: False -``` - -**Important**: It may take up to 30 seconds for the proxied state in the Airflow UI to reflect this change. You must subsequently reload the definitions in Dagster via the UI or by restarting `dagster dev`. - -You can now run the `rebuild_customers_list` DAG in Airflow, and the `build_dbt_models` task will be executed in a Dagster run: - -- -![dbt build executing in Dagster](./images/proxied_dag.png) - -
- -You'll note that we proxied a task in the _middle_ of the Airflow DAG. The Airflow DAG structure and execution history is stable in the Airflow UI, but execution of `build_dbt_models` has moved to Dagster. - -#### Migrating the remaining custom operators - -For all other operator types, we will need to build our own asset definitions. We recommend creating a factory function whose arguments match the inputs to your Airflow operator. Then, you can use this factory to build definitions for each Airflow task. - -For example, our `load_raw_customers` task uses a custom `LoadCSVToDuckDB` operator. We'll define a function `load_csv_to_duckdb_defs` factory to build corresponding software-defined assets. Similarly for `export_customers` we'll define a function `export_duckdb_to_csv_defs` to build SDAs: - -```python -# tutorial_example/dagster_defs/stages/migrate.py -import os -from pathlib import Path - -from dagster import ( - AssetExecutionContext, - AssetsDefinition, - AssetSpec, - Definitions, - materialize, - multi_asset, -) -from dagster_airlift.core import ( - AirflowInstance, - BasicAuthBackend, - assets_with_task_mappings, - build_defs_from_airflow_instance, -) -from dagster_dbt import DbtCliResource, DbtProject, dbt_assets - -# Code also invoked from Airflow -from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv -from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb - - -def dbt_project_path() -> Path: - env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") - assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" - return Path(env_val) - - -def airflow_dags_path() -> Path: - return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags" - - -def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition: - @multi_asset(name=f"load_{args.table_name}", specs=[spec]) - def _multi_asset() -> None: - load_csv_to_duckdb(args) - - return _multi_asset - - -def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition: - @multi_asset(name=f"export_{args.table_name}", specs=[spec]) - def _multi_asset() -> None: - export_duckdb_to_csv(args) - - return _multi_asset - - -@dbt_assets( - manifest=dbt_project_path() / "target" / "manifest.json", - project=DbtProject(dbt_project_path()), -) -def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): - yield from dbt.cli(["build"], context=context).stream() - - -mapped_assets = assets_with_task_mappings( - dag_id="rebuild_customers_list", - task_mappings={ - "load_raw_customers": [ - load_csv_to_duckdb_asset( - AssetSpec(key=["raw_data", "raw_customers"]), - LoadCsvToDuckDbArgs( - table_name="raw_customers", - csv_path=airflow_dags_path() / "raw_customers.csv", - duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", - names=["id", "first_name", "last_name"], - duckdb_schema="raw_data", - duckdb_database_name="jaffle_shop", - ), - ) - ], - "build_dbt_models": - # load rich set of assets from dbt project - [dbt_project_assets], - "export_customers": [ - export_duckdb_to_csv_defs( - AssetSpec(key="customers_csv", deps=["customers"]), - ExportDuckDbToCsvArgs( - table_name="customers", - csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", - duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", - duckdb_database_name="jaffle_shop", - ), - ) - ], - }, -) - - -defs = build_defs_from_airflow_instance( - 
airflow_instance=AirflowInstance( - auth_backend=BasicAuthBackend( - webserver_url="http://localhost:8080", - username="admin", - password="admin", - ), - name="airflow_instance_one", - ), - defs=Definitions( - assets=mapped_assets, - resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, - ), -) -``` - -We can then toggle the proxied state of the remaining tasks in the `proxied_state` file: - -```yaml -# tutorial_example/snippets/all_proxied.yaml -tasks: - - id: load_raw_customers - proxied: True - - id: build_dbt_models - proxied: True - - id: export_customers - proxied: True -``` - -## Decomissioning an Airflow DAG - -Once we are confident in our migrated versions of the tasks, we can decommission the Airflow DAG. First, we can remove the DAG from our Airflow DAG directory. - -Next, we can strip the task associations from our Dagster definitions. This can be done by removing the `assets_with_task_mappings` call. We can use this opportunity to attach our assets to a `ScheduleDefinition` so that Dagster's scheduler can manage their execution: - -```python -# tutorial_example/dagster_defs/stages/standalone.py -import os -from pathlib import Path - -from dagster import ( - AssetCheckResult, - AssetCheckSeverity, - AssetExecutionContext, - AssetKey, - AssetsDefinition, - AssetSelection, - AssetSpec, - Definitions, - ScheduleDefinition, - asset_check, - multi_asset, -) -from dagster_dbt import DbtCliResource, DbtProject, dbt_assets - -# Code also invoked from Airflow -from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv -from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb - - -def dbt_project_path() -> Path: - env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") - assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" - return Path(env_val) - - -def airflow_dags_path() -> Path: - return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags" - - -def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition: - @multi_asset(name=f"load_{args.table_name}", specs=[spec]) - def _multi_asset() -> None: - load_csv_to_duckdb(args) - - return _multi_asset - - -def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition: - @multi_asset(name=f"export_{args.table_name}", specs=[spec]) - def _multi_asset() -> None: - export_duckdb_to_csv(args) - - return _multi_asset - - -@dbt_assets( - manifest=dbt_project_path() / "target" / "manifest.json", - project=DbtProject(dbt_project_path()), -) -def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): - yield from dbt.cli(["build"], context=context).stream() - - -assets = [ - load_csv_to_duckdb_asset( - AssetSpec(key=["raw_data", "raw_customers"]), - LoadCsvToDuckDbArgs( - table_name="raw_customers", - csv_path=airflow_dags_path() / "raw_customers.csv", - duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", - names=["id", "first_name", "last_name"], - duckdb_schema="raw_data", - duckdb_database_name="jaffle_shop", - ), - ), - dbt_project_assets, - export_duckdb_to_csv_defs( - AssetSpec(key="customers_csv", deps=["customers"]), - ExportDuckDbToCsvArgs( - table_name="customers", - csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", - duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", - duckdb_database_name="jaffle_shop", - ), - ), -] - - -@asset_check(asset=AssetKey(["customers_csv"])) -def 
validate_exported_csv() -> AssetCheckResult: - csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv" - - if not csv_path.exists(): - return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist") - - rows = len(csv_path.read_text().split("\n")) - if rows < 2: - return AssetCheckResult( - passed=False, - description=f"Export CSV {csv_path} is empty", - severity=AssetCheckSeverity.WARN, - ) - - return AssetCheckResult( - passed=True, - description=f"Export CSV {csv_path} exists", - metadata={"rows": rows}, - ) - - -rebuild_customer_list_schedule = rebuild_customers_list_schedule = ScheduleDefinition( - name="rebuild_customers_list_schedule", - target=AssetSelection.assets(*assets), - cron_schedule="0 0 * * *", -) - - -defs = Definitions( - assets=assets, - schedules=[rebuild_customer_list_schedule], - asset_checks=[validate_exported_csv], - resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, -) -``` - -## Addendum: Adding asset checks - -Once you have peered your Airflow DAGs in Dagster, regardless of migration progress, you can begin to add asset checks to your Dagster code. Asset checks can be used to validate the quality of your data assets, and can provide additional observability and value on top of your Airflow DAG even before migration starts. - -For example, given a peered version of our DAG, we can add an asset check to ensure that the final `customers` CSV output exists and has a non-zero number of rows: - -```python -# tutorial_example/dagster_defs/stages/peer_with_check.py -import os -from pathlib import Path - -from dagster import AssetCheckResult, AssetCheckSeverity, AssetKey, Definitions, asset_check -from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance - - -# Attach a check to the DAG representation asset, which will be executed by Dagster -# any time the DAG is run in Airflow -@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"])) -def validate_exported_csv() -> AssetCheckResult: - csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv" - - if not csv_path.exists(): - return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist") - - rows = len(csv_path.read_text().split("\n")) - if rows < 2: - return AssetCheckResult( - passed=False, - description=f"Export CSV {csv_path} is empty", - severity=AssetCheckSeverity.WARN, - ) - - return AssetCheckResult( - passed=True, - description=f"Export CSV {csv_path} exists", - metadata={"rows": rows}, - ) - - -defs = Definitions.merge( - build_defs_from_airflow_instance( - airflow_instance=AirflowInstance( - # other backends available (e.g. MwaaSessionAuthBackend) - auth_backend=BasicAuthBackend( - webserver_url="http://localhost:8080", - username="admin", - password="admin", - ), - name="airflow_instance_one", - ) - ), - Definitions(asset_checks=[validate_exported_csv]), -) -``` - -Once we have introduced representations of the assets produced by our Airflow tasks, we can directly attach asset checks to these assets. These checks will run once the corresponding task completes, regardless of whether the task is executed in Airflow or Dagster. - -- -![Before DAG proxying](./images/before_dag_override.svg) -![After DAG proxying](./images/after_dag_override.svg) - -
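-To make the asset-check idea above concrete: once the `customers_csv` asset has a representation in Dagster, the same validation can be attached directly to that asset key rather than to the DAG asset. This mirrors the `observe_check_on_asset.py` stage added elsewhere in this change; treat it as a sketch of the pattern rather than a complete definitions file:
-
-```python
-import os
-from pathlib import Path
-
-from dagster import AssetCheckResult, AssetCheckSeverity, AssetKey, asset_check
-
-
-# Attached to the customers_csv asset itself, so it runs whenever that asset is
-# materialized, whether the export task executed in Airflow or in Dagster.
-@asset_check(asset=AssetKey(["customers_csv"]))
-def validate_exported_csv() -> AssetCheckResult:
-    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
-
-    if not csv_path.exists():
-        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")
-
-    rows = len(csv_path.read_text().split("\n"))
-    if rows < 2:
-        return AssetCheckResult(
-            passed=False,
-            description=f"Export CSV {csv_path} is empty",
-            severity=AssetCheckSeverity.WARN,
-        )
-
-    return AssetCheckResult(
-        passed=True,
-        description=f"Export CSV {csv_path} exists",
-        metadata={"rows": rows},
-    )
-```
-
-The check is then passed to the `Definitions` object via `asset_checks=[validate_exported_csv]`, alongside the mapped assets.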
-
-When performing DAG-level mapping, we don't preserve task structure in the Airflow DAG. Instead, a single proxied task will materialize all mapped Dagster assets rather than executing the original Airflow task business logic (a minimal mapping sketch appears at the end of this section).
-
-As with task-level proxying, we can set `proxied` back to `False`, and the original task structure and business logic will be restored unchanged.
-
-### Customizing the DAG proxying operator
-
-Similar to how we can customize the operator we construct on a per-task basis, we can customize the operator we construct on a per-DAG basis. We can use the `build_from_dag_fn` argument of `proxying_to_dagster` to provide a custom operator in place of the default.
-
-For example, let's take a look at the following custom operator which expects an API key to be provided as a variable:
-
-```python
-# tutorial_example/snippets/custom_operator_examples/custom_dag_level_proxy.py
-from pathlib import Path
-
-import requests
-from airflow import DAG
-from airflow.utils.context import Context
-from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator, proxying_to_dagster
-from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
-
-
-class CustomProxyToDagsterOperator(BaseProxyDAGToDagsterOperator):
-    def get_dagster_session(self, context: Context) -> requests.Session:
-        if "var" not in context:
-            raise ValueError("No variables found in context")
-        api_key = context["var"]["value"].get("my_api_key")
-        session = requests.Session()
-        session.headers.update({"Authorization": f"Bearer {api_key}"})
-        return session
-
-    def get_dagster_url(self, context: Context) -> str:
-        return "https://dagster.example.com/"
-
-    # This method controls how the operator is built from the dag.
-    @classmethod
-    def build_from_dag(cls, dag: DAG):
-        return CustomProxyToDagsterOperator(dag=dag, task_id="OVERRIDDEN")
-
-
-dag = DAG(
-    dag_id="custom_dag_level_proxy_example",
-)
-
-# At the end of your dag file
-proxying_to_dagster(
-    global_vars=globals(),
-    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
-    build_from_dag_fn=CustomProxyToDagsterOperator.build_from_dag,
-)
-```
-
-`BaseProxyDAGToDagsterOperator` has three abstract methods which must be implemented:
-
-- `get_dagster_session`, which controls the creation of a valid session to access the Dagster GraphQL API.
-- `get_dagster_url`, which retrieves the domain at which the Dagster webserver lives.
-- `build_from_dag`, which controls how the proxying task is constructed from the provided DAG.
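-
-For reference, here is roughly what DAG-level mapping looks like on the Dagster side. It uses `assets_with_dag_mappings` (imported by the `observe_dag_level.py` and `migrate_dag_level.py` stages in this change) in place of `assets_with_task_mappings`. This sketch assumes the helper accepts a mapping from DAG id to asset definitions, analogous to `task_mappings`, and it omits the dbt assets and resources for brevity:
-
-```python
-from dagster import AssetSpec, Definitions
-from dagster_airlift.core import (
-    AirflowBasicAuthBackend,
-    AirflowInstance,
-    assets_with_dag_mappings,
-    build_defs_from_airflow_instance,
-)
-
-# Assets are mapped to the DAG as a whole rather than to individual tasks.
-# When the DAG is proxied, a single task materializes everything listed here.
-mapped_assets = assets_with_dag_mappings(
-    dag_mappings={
-        "rebuild_customers_list": [
-            AssetSpec(key=["raw_data", "raw_customers"]),
-            AssetSpec(key="customers_csv", deps=["customers"]),
-        ],
-    },
-)
-
-defs = build_defs_from_airflow_instance(
-    airflow_instance=AirflowInstance(
-        auth_backend=AirflowBasicAuthBackend(
-            webserver_url="http://localhost:8080",
-            username="admin",
-            password="admin",
-        ),
-        name="airflow_instance_one",
-    ),
-    defs=Definitions(assets=mapped_assets),
-)
-```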
diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/images/partitioned_mat.png b/examples/experimental/dagster-airlift/examples/tutorial-example/images/partitioned_mat.png new file mode 100644 index 0000000000000..2a50ca8eec0e3 Binary files /dev/null and b/examples/experimental/dagster-airlift/examples/tutorial-example/images/partitioned_mat.png differ diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/images/proxied_dag.png b/examples/experimental/dagster-airlift/examples/tutorial-example/images/proxied_dag.png index dbc7fbbc898e6..b50af91c37b34 100644 Binary files a/examples/experimental/dagster-airlift/examples/tutorial-example/images/proxied_dag.png and b/examples/experimental/dagster-airlift/examples/tutorial-example/images/proxied_dag.png differ diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/images/state_in_airflow.png b/examples/experimental/dagster-airlift/examples/tutorial-example/images/state_in_airflow.png index 1fa81a964c77d..d90301fc59788 100644 Binary files a/examples/experimental/dagster-airlift/examples/tutorial-example/images/state_in_airflow.png and b/examples/experimental/dagster-airlift/examples/tutorial-example/images/state_in_airflow.png differ diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/airflow_dags/dags.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/airflow_dags/dags.py index 975d21dec26a7..049bc5cc30c3a 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/airflow_dags/dags.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/airflow_dags/dags.py @@ -1,12 +1,13 @@ # Define the default arguments for the DAG import os -from datetime import datetime, timedelta +from datetime import timedelta from pathlib import Path from typing import List, Optional from airflow import DAG from airflow.models.operator import BaseOperator from airflow.operators.bash import BashOperator +from dagster._time import get_current_datetime_midnight from dagster_airlift.in_airflow import proxying_to_dagster from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv @@ -78,7 +79,7 @@ def execute(self, context) -> None: default_args = { "owner": "airflow", "depends_on_past": False, - "start_date": datetime(2024, 7, 18), + "start_date": get_current_datetime_midnight(), "retries": 1, "retry_delay": timedelta(minutes=5), } @@ -87,7 +88,7 @@ def execute(self, context) -> None: dag = DAG( "rebuild_customers_list", default_args=default_args, - schedule_interval=None, + schedule="@daily", is_paused_upon_creation=False, ) diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate.py index 61e3f7c8f9f5a..4e77cbcac686d 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate.py @@ -5,13 +5,15 @@ AssetExecutionContext, AssetsDefinition, AssetSpec, + DailyPartitionsDefinition, Definitions, materialize, multi_asset, ) +from dagster._time import get_current_datetime_midnight from 
dagster_airlift.core import ( + AirflowBasicAuthBackend, AirflowInstance, - BasicAuthBackend, assets_with_task_mappings, build_defs_from_airflow_instance, ) @@ -21,6 +23,8 @@ from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb +PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight()) + def dbt_project_path() -> Path: env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") @@ -51,6 +55,7 @@ def _multi_asset() -> None: @dbt_assets( manifest=dbt_project_path() / "target" / "manifest.json", project=DbtProject(dbt_project_path()), + partitions_def=PARTITIONS_DEF, ) def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() @@ -61,7 +66,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): task_mappings={ "load_raw_customers": [ load_csv_to_duckdb_asset( - AssetSpec(key=["raw_data", "raw_customers"]), + AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF), LoadCsvToDuckDbArgs( table_name="raw_customers", csv_path=airflow_dags_path() / "raw_customers.csv", @@ -77,7 +82,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): [dbt_project_assets], "export_customers": [ export_duckdb_to_csv_defs( - AssetSpec(key="customers_csv", deps=["customers"]), + AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF), ExportDuckDbToCsvArgs( table_name="customers", csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", @@ -92,7 +97,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): defs = build_defs_from_airflow_instance( airflow_instance=AirflowInstance( - auth_backend=BasicAuthBackend( + auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8080", username="admin", password="admin", diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_dag_level.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_dag_level.py index 9c49f4624cb96..cfebc83ad54eb 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_dag_level.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_dag_level.py @@ -10,8 +10,8 @@ multi_asset, ) from dagster_airlift.core import ( + AirflowBasicAuthBackend, AirflowInstance, - BasicAuthBackend, assets_with_dag_mappings, build_defs_from_airflow_instance, ) @@ -87,7 +87,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): defs = build_defs_from_airflow_instance( airflow_instance=AirflowInstance( - auth_backend=BasicAuthBackend( + auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8080", username="admin", password="admin", diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_with_check.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_with_check.py index dbd7647307397..314327c7f11d7 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_with_check.py +++ 
b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/migrate_with_check.py @@ -8,14 +8,16 @@ AssetKey, AssetsDefinition, AssetSpec, + DailyPartitionsDefinition, Definitions, asset_check, materialize, multi_asset, ) +from dagster._time import get_current_datetime_midnight from dagster_airlift.core import ( + AirflowBasicAuthBackend, AirflowInstance, - BasicAuthBackend, assets_with_task_mappings, build_defs_from_airflow_instance, ) @@ -25,6 +27,8 @@ from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb +PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight()) + def dbt_project_path() -> Path: env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") @@ -55,6 +59,7 @@ def _multi_asset() -> None: @dbt_assets( manifest=dbt_project_path() / "target" / "manifest.json", project=DbtProject(dbt_project_path()), + partitions_def=PARTITIONS_DEF, ) def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() @@ -65,7 +70,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): task_mappings={ "load_raw_customers": [ load_csv_to_duckdb_asset( - AssetSpec(key=["raw_data", "raw_customers"]), + AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF), LoadCsvToDuckDbArgs( table_name="raw_customers", csv_path=airflow_dags_path() / "raw_customers.csv", @@ -81,7 +86,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): [dbt_project_assets], "export_customers": [ export_duckdb_to_csv_defs( - AssetSpec(key="customers_csv", deps=["customers"]), + AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF), ExportDuckDbToCsvArgs( table_name="customers", csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", @@ -118,7 +123,7 @@ def validate_exported_csv() -> AssetCheckResult: defs = build_defs_from_airflow_instance( airflow_instance=AirflowInstance( - auth_backend=BasicAuthBackend( + auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8080", username="admin", password="admin", diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py index 0dd594aeed3e2..73af301732ee8 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe.py @@ -1,16 +1,46 @@ import os from pathlib import Path -from dagster import AssetExecutionContext, AssetSpec, Definitions +from dagster import ( + AssetCheckResult, + AssetCheckSeverity, + AssetExecutionContext, + AssetKey, + AssetSpec, + Definitions, + asset_check, +) from dagster_airlift.core import ( + AirflowBasicAuthBackend, AirflowInstance, - BasicAuthBackend, assets_with_task_mappings, build_defs_from_airflow_instance, ) from dagster_dbt import DbtCliResource, DbtProject, dbt_assets +@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"])) +def validate_exported_csv() -> AssetCheckResult: + csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv" + + if not csv_path.exists(): + return 
AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist") + + rows = len(csv_path.read_text().split("\n")) + if rows < 2: + return AssetCheckResult( + passed=False, + description=f"Export CSV {csv_path} is empty", + severity=AssetCheckSeverity.WARN, + ) + + return AssetCheckResult( + passed=True, + description=f"Export CSV {csv_path} exists", + metadata={"rows": rows}, + ) + + def dbt_project_path() -> Path: env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" @@ -37,7 +67,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): defs = build_defs_from_airflow_instance( airflow_instance=AirflowInstance( - auth_backend=BasicAuthBackend( + auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8080", username="admin", password="admin", @@ -47,5 +77,6 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): defs=Definitions( assets=mapped_assets, resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, + asset_checks=[validate_exported_csv], ), ) diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py new file mode 100644 index 0000000000000..360950591cf82 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_check_on_asset.py @@ -0,0 +1,82 @@ +import os +from pathlib import Path + +from dagster import ( + AssetCheckResult, + AssetCheckSeverity, + AssetExecutionContext, + AssetKey, + AssetSpec, + Definitions, + asset_check, +) +from dagster_airlift.core import ( + AirflowBasicAuthBackend, + AirflowInstance, + assets_with_task_mappings, + build_defs_from_airflow_instance, +) +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + + +@asset_check(asset=AssetKey(["customers_csv"])) +def validate_exported_csv() -> AssetCheckResult: + csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv" + + if not csv_path.exists(): + return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist") + + rows = len(csv_path.read_text().split("\n")) + if rows < 2: + return AssetCheckResult( + passed=False, + description=f"Export CSV {csv_path} is empty", + severity=AssetCheckSeverity.WARN, + ) + + return AssetCheckResult( + passed=True, + description=f"Export CSV {csv_path} exists", + metadata={"rows": rows}, + ) + + +def dbt_project_path() -> Path: + env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") + assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" + return Path(env_val) + + +@dbt_assets( + manifest=dbt_project_path() / "target" / "manifest.json", + project=DbtProject(dbt_project_path()), +) +def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() + + +mapped_assets = assets_with_task_mappings( + dag_id="rebuild_customers_list", + task_mappings={ + "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])], + "build_dbt_models": [dbt_project_assets], + "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])], + }, +) + + +defs = build_defs_from_airflow_instance( + airflow_instance=AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8080", + username="admin", + password="admin", + ), + 
name="airflow_instance_one", + ), + defs=Definitions( + assets=mapped_assets, + resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, + asset_checks=[validate_exported_csv], + ), +) diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_dag_level.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_dag_level.py index 9e9f077137454..72d2516ab0c61 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_dag_level.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_dag_level.py @@ -3,8 +3,8 @@ from dagster import AssetExecutionContext, AssetSpec, Definitions from dagster_airlift.core import ( + AirflowBasicAuthBackend, AirflowInstance, - BasicAuthBackend, assets_with_dag_mappings, build_defs_from_airflow_instance, ) @@ -39,7 +39,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): defs = build_defs_from_airflow_instance( airflow_instance=AirflowInstance( - auth_backend=BasicAuthBackend( + auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8080", username="admin", password="admin", diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_with_partitions.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_with_partitions.py new file mode 100644 index 0000000000000..a9d88160b09a1 --- /dev/null +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/observe_with_partitions.py @@ -0,0 +1,91 @@ +import os +from pathlib import Path + +from dagster import ( + AssetCheckResult, + AssetCheckSeverity, + AssetExecutionContext, + AssetKey, + AssetSpec, + DailyPartitionsDefinition, + Definitions, + asset_check, +) +from dagster._time import get_current_datetime_midnight +from dagster_airlift.core import ( + AirflowBasicAuthBackend, + AirflowInstance, + assets_with_task_mappings, + build_defs_from_airflow_instance, +) +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + +PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight()) + + +@asset_check(asset=AssetKey(["customers_csv"])) +def validate_exported_csv() -> AssetCheckResult: + csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv" + + if not csv_path.exists(): + return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist") + + rows = len(csv_path.read_text().split("\n")) + if rows < 2: + return AssetCheckResult( + passed=False, + description=f"Export CSV {csv_path} is empty", + severity=AssetCheckSeverity.WARN, + ) + + return AssetCheckResult( + passed=True, + description=f"Export CSV {csv_path} exists", + metadata={"rows": rows}, + ) + + +def dbt_project_path() -> Path: + env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") + assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" + return Path(env_val) + + +@dbt_assets( + manifest=dbt_project_path() / "target" / "manifest.json", + project=DbtProject(dbt_project_path()), + partitions_def=PARTITIONS_DEF, +) +def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() + + +mapped_assets = assets_with_task_mappings( + dag_id="rebuild_customers_list", + 
task_mappings={ + "load_raw_customers": [ + AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF) + ], + "build_dbt_models": [dbt_project_assets], + "export_customers": [ + AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF) + ], + }, +) + + +defs = build_defs_from_airflow_instance( + airflow_instance=AirflowInstance( + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8080", + username="admin", + password="admin", + ), + name="airflow_instance_one", + ), + defs=Definitions( + assets=mapped_assets, + resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, + asset_checks=[validate_exported_csv], + ), +) diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer.py index 83e080db5a66f..ffd908784ce80 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer.py @@ -1,9 +1,13 @@ -from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance +from dagster_airlift.core import ( + AirflowBasicAuthBackend, + AirflowInstance, + build_defs_from_airflow_instance, +) defs = build_defs_from_airflow_instance( airflow_instance=AirflowInstance( # other backends available (e.g. MwaaSessionAuthBackend) - auth_backend=BasicAuthBackend( + auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8080", username="admin", password="admin", diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py index 65f45e57bb642..6abcef989641d 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/peer_with_check.py @@ -2,7 +2,11 @@ from pathlib import Path from dagster import AssetCheckResult, AssetCheckSeverity, AssetKey, Definitions, asset_check -from dagster_airlift.core import AirflowInstance, BasicAuthBackend, build_defs_from_airflow_instance +from dagster_airlift.core import ( + AirflowBasicAuthBackend, + AirflowInstance, + build_defs_from_airflow_instance, +) # Attach a check to the DAG representation asset, which will be executed by Dagster @@ -29,17 +33,15 @@ def validate_exported_csv() -> AssetCheckResult: ) -defs = Definitions.merge( - build_defs_from_airflow_instance( - airflow_instance=AirflowInstance( - # other backends available (e.g. MwaaSessionAuthBackend) - auth_backend=BasicAuthBackend( - webserver_url="http://localhost:8080", - username="admin", - password="admin", - ), - name="airflow_instance_one", - ) +defs = build_defs_from_airflow_instance( + airflow_instance=AirflowInstance( + # other backends available (e.g. 
MwaaSessionAuthBackend) + auth_backend=AirflowBasicAuthBackend( + webserver_url="http://localhost:8080", + username="admin", + password="admin", + ), + name="airflow_instance_one", ), - Definitions(asset_checks=[validate_exported_csv]), + defs=Definitions(asset_checks=[validate_exported_csv]), ) diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/standalone.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/standalone.py index 99340a7040e62..2eedee5788956 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/standalone.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example/dagster_defs/stages/standalone.py @@ -9,17 +9,21 @@ AssetsDefinition, AssetSelection, AssetSpec, + DailyPartitionsDefinition, Definitions, ScheduleDefinition, asset_check, multi_asset, ) +from dagster._time import get_current_datetime_midnight from dagster_dbt import DbtCliResource, DbtProject, dbt_assets # Code also invoked from Airflow from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb +PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight()) + def dbt_project_path() -> Path: env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") @@ -50,6 +54,7 @@ def _multi_asset() -> None: @dbt_assets( manifest=dbt_project_path() / "target" / "manifest.json", project=DbtProject(dbt_project_path()), + partitions_def=PARTITIONS_DEF, ) def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["build"], context=context).stream() @@ -57,7 +62,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): assets = [ load_csv_to_duckdb_asset( - AssetSpec(key=["raw_data", "raw_customers"]), + AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF), LoadCsvToDuckDbArgs( table_name="raw_customers", csv_path=airflow_dags_path() / "raw_customers.csv", @@ -69,7 +74,7 @@ def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): ), dbt_project_assets, export_duckdb_to_csv_defs( - AssetSpec(key="customers_csv", deps=["customers"]), + AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF), ExportDuckDbToCsvArgs( table_name="customers", csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py index 81eb94a9064dd..9105f494529a2 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/integration_tests/test_migrating_e2e.py @@ -5,7 +5,7 @@ import pytest from dagster import DagsterInstance from dagster._core.storage.dagster_run import DagsterRunStatus -from dagster_airlift.core import AirflowInstance, BasicAuthBackend +from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance from .utils import ( poll_for_materialization, @@ -23,7 +23,7 @@ def test_migration_status( both the Airflow DAGs and the 
Dagster asset definitions. """ instance = AirflowInstance( - auth_backend=BasicAuthBackend( + auth_backend=AirflowBasicAuthBackend( webserver_url="http://localhost:8080", username="admin", password="admin", diff --git a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/unit_tests/test_proxy_operators.py b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/unit_tests/test_proxy_operators.py index dcee6accbe53b..2900309914e39 100644 --- a/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/unit_tests/test_proxy_operators.py +++ b/examples/experimental/dagster-airlift/examples/tutorial-example/tutorial_example_tests/unit_tests/test_proxy_operators.py @@ -1,4 +1,4 @@ -def test_dagster_cloud_proxy_operator(mock_airflow_variable: None) -> None: +def test_dagster_cloud_proxy_operator() -> None: from tutorial_example.snippets.custom_operator_examples.plus_proxy_operator import ( DagsterCloudProxyOperator, ) @@ -26,7 +26,7 @@ def test_dagster_cloud_proxy_operator(mock_airflow_variable: None) -> None: ) -def test_custom_proxy_operator(mock_airflow_variable: None) -> None: +def test_custom_proxy_operator() -> None: from tutorial_example.snippets.custom_operator_examples.custom_proxy import ( CustomProxyToDagsterOperator, ) @@ -40,7 +40,7 @@ def test_custom_proxy_operator(mock_airflow_variable: None) -> None: assert session.headers["Authorization"] == "Bearer test_key" -def test_dag_override_operator(mock_airflow_variable: None) -> None: +def test_dag_override_operator() -> None: from tutorial_example.snippets.custom_operator_examples.custom_dag_level_proxy import ( CustomProxyToDagsterOperator, ) diff --git a/examples/experimental/dagster-airlift/scripts/airflow_setup.sh b/examples/experimental/dagster-airlift/scripts/airflow_setup.sh index 8d6bc4422898c..bb61e59c82bf9 100755 --- a/examples/experimental/dagster-airlift/scripts/airflow_setup.sh +++ b/examples/experimental/dagster-airlift/scripts/airflow_setup.sh @@ -1,37 +1,41 @@ #!/bin/bash -# Check if the path argument is provided -if [ -z "$1" ]; then - echo "Usage: $0{asset.partitionDefinition.description}
-