From 2babbe6b0f1e48ca8d66e3c082816de2247e6028 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 9 Dec 2024 14:08:49 -0500 Subject: [PATCH] feat(airflow): drop Airflow < 2.3 support + make plugin v2 the default (#12056) --- .github/workflows/airflow-plugin.yml | 16 +- docs/how/updating-datahub.md | 2 + docs/lineage/airflow.md | 24 +-- .../airflow-plugin/build.gradle | 2 +- .../airflow-plugin/setup.py | 14 +- .../datahub_airflow_plugin/_airflow_shims.py | 4 +- .../datahub_listener.py | 3 +- .../integration/goldens/v1_basic_iolets.json | 54 ++++++- .../integration/goldens/v1_simple_dag.json | 142 ++++++++++++++++-- .../tests/integration/test_plugin.py | 24 ++- .../airflow-plugin/tox.ini | 28 ++-- 11 files changed, 241 insertions(+), 72 deletions(-) diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index 66a08dc63aa0d..1fdfc52857b01 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -34,29 +34,21 @@ jobs: include: # Note: this should be kept in sync with tox.ini. - python-version: "3.8" - extra_pip_requirements: "apache-airflow~=2.1.4" - extra_pip_extras: plugin-v1 - - python-version: "3.8" - extra_pip_requirements: "apache-airflow~=2.2.4" - extra_pip_extras: plugin-v1 + extra_pip_requirements: "apache-airflow~=2.3.4" + extra_pip_extras: test-airflow23 - python-version: "3.10" extra_pip_requirements: "apache-airflow~=2.4.3" - extra_pip_extras: plugin-v2,test-airflow24 + extra_pip_extras: test-airflow24 - python-version: "3.10" extra_pip_requirements: "apache-airflow~=2.6.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt" - extra_pip_extras: plugin-v2 - python-version: "3.10" extra_pip_requirements: "apache-airflow~=2.7.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt" - extra_pip_extras: plugin-v2 - python-version: "3.10" extra_pip_requirements: "apache-airflow~=2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt" - extra_pip_extras: plugin-v2 - python-version: "3.11" extra_pip_requirements: "apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt" - extra_pip_extras: plugin-v2 - python-version: "3.11" - extra_pip_requirements: "apache-airflow~=2.10.2 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt" - extra_pip_extras: plugin-v2 + extra_pip_requirements: "apache-airflow~=2.10.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.11.txt" fail-fast: false steps: - name: Set up JDK 17 diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 087e30c2e541a..3f9de1ff2f7f9 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -35,6 +35,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance..database`. - #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information. +- #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2. +- #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation. ### Breaking Changes diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md index 829c048a8f8e2..2bd58334933fb 100644 --- a/docs/lineage/airflow.md +++ b/docs/lineage/airflow.md @@ -13,14 +13,14 @@ The DataHub Airflow plugin supports: - Task run information, including task successes and failures. - Manual lineage annotations using `inlets` and `outlets` on Airflow operators. -There's two actively supported implementations of the plugin, with different Airflow version support. +There's two implementations of the plugin, with different Airflow version support. -| Approach | Airflow Version | Notes | -| --------- | --------------- | --------------------------------------------------------------------------- | -| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ | -| Plugin v1 | 2.1 - 2.8 | No automatic lineage extraction; may not extract lineage if the task fails. | +| Approach | Airflow Versions | Notes | +| --------- | ---------------- | --------------------------------------------------------------------------------------- | +| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ | +| Plugin v1 | 2.3 - 2.8 | Deprecated. No automatic lineage extraction; may not extract lineage if the task fails. | -If you're using Airflow older than 2.1, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details. +If you're using Airflow older than 2.3, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details. @@ -29,7 +29,7 @@ If you're using Airflow older than 2.1, it's possible to use the v1 plugin with ### Installation -The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, use the v1 plugin instead. +The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, see the [compatibility section](#compatibility) for other options. ```shell pip install 'acryl-datahub-airflow-plugin[plugin-v2]' @@ -84,9 +84,10 @@ enabled = True # default ### Installation -The v1 plugin requires Airflow 2.1 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details. +The v1 plugin requires Airflow 2.3 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details. -If you're using Airflow 2.3+, we recommend using the v2 plugin instead. If you need to use the v1 plugin with Airflow 2.3+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`. +Note that the v1 plugin is less featureful than the v2 plugin, and is overall not actively maintained. +Since datahub v0.15.0, the v2 plugin has been the default. If you need to use the v1 plugin with `acryl-datahub-airflow-plugin` v0.15.0+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`. ```shell pip install 'acryl-datahub-airflow-plugin[plugin-v1]' @@ -340,11 +341,12 @@ The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade ` ## Compatibility -We no longer officially support Airflow <2.1. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow. -Both of these options support Python 3.7+. +We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow. +The first two options support Python 3.7+, and the last option supports Python 3.8+. - Airflow 1.10.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.9.1.0. - Airflow 2.0.x, use DataHub plugin v1 with acryl-datahub-airflow-plugin <= 0.11.0.1. +- Airflow 2.2.x, use DataHub plugin v2 with acryl-datahub-airflow-plugin <= 0.14.1.5. DataHub also previously supported an Airflow [lineage backend](https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend) implementation. While the implementation is still in our codebase, it is deprecated and will be removed in a future release. Note that the lineage backend did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA. diff --git a/metadata-ingestion-modules/airflow-plugin/build.gradle b/metadata-ingestion-modules/airflow-plugin/build.gradle index f30858ba6a14e..68a35c0dfc417 100644 --- a/metadata-ingestion-modules/airflow-plugin/build.gradle +++ b/metadata-ingestion-modules/airflow-plugin/build.gradle @@ -13,7 +13,7 @@ if (!project.hasProperty("extra_pip_requirements")) { ext.extra_pip_requirements = "" } if (!project.hasProperty("extra_pip_extras")) { - ext.extra_pip_extras = "plugin-v2" + ext.extra_pip_extras = "" } // If extra_pip_extras is non-empty, we need to add a comma to the beginning of the string. if (extra_pip_extras != "") { diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 02a0bbb6022e0..3209233184d55 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -24,8 +24,8 @@ def get_long_description(): base_requirements = { f"acryl-datahub[datahub-rest]{_self_pin}", - # Actual dependencies. - "apache-airflow >= 2.0.2", + # We require Airflow 2.3.x, since we need the new DAG listener API. + "apache-airflow>=2.3.0", } plugins: Dict[str, Set[str]] = { @@ -44,12 +44,13 @@ def get_long_description(): # We remain restrictive on the versions allowed here to prevent # us from being broken by backwards-incompatible changes in the # underlying package. - "openlineage-airflow>=1.2.0,<=1.22.0", + "openlineage-airflow>=1.2.0,<=1.25.0", }, } -# Include datahub-rest in the base requirements. +# Require some plugins by default. base_requirements.update(plugins["datahub-rest"]) +base_requirements.update(plugins["plugin-v2"]) mypy_stubs = { @@ -109,6 +110,11 @@ def get_long_description(): "apache-airflow-providers-sqlite", } per_version_test_requirements = { + "test-airflow23": { + "pendulum<3.0", + "Flask-Session<0.6.0", + "connexion<3.0", + }, "test-airflow24": { "pendulum<3.0", "Flask-Session<0.6.0", diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py index c1e2dd4cc422d..d86a46e042e8f 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_airflow_shims.py @@ -46,7 +46,7 @@ def get_task_inlets(operator: "Operator") -> List: return operator._inlets # type: ignore[attr-defined, union-attr] if hasattr(operator, "get_inlet_defs"): return operator.get_inlet_defs() # type: ignore[attr-defined] - return operator.inlets + return operator.inlets or [] def get_task_outlets(operator: "Operator") -> List: @@ -56,7 +56,7 @@ def get_task_outlets(operator: "Operator") -> List: return operator._outlets # type: ignore[attr-defined, union-attr] if hasattr(operator, "get_outlet_defs"): return operator.get_outlet_defs() - return operator.outlets + return operator.outlets or [] __all__ = [ diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index e00cf51ea456c..aa7b3108f64f1 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -74,7 +74,7 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811 "1", ) _RUN_IN_THREAD_TIMEOUT = float( - os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 15) + os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 10) ) _DATAHUB_CLEANUP_DAG = "Datahub_Cleanup" @@ -102,6 +102,7 @@ def get_airflow_plugin_listener() -> Optional["DataHubListener"]: "capture_tags": plugin_config.capture_tags_info, "capture_ownership": plugin_config.capture_ownership_info, "enable_extractors": plugin_config.enable_extractors, + "render_templates": plugin_config.render_templates, "disable_openlineage_plugin": plugin_config.disable_openlineage_plugin, }, ) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json index fd10f858d00fb..4c21b7ed4000d 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_basic_iolets.json @@ -14,7 +14,7 @@ "fileloc": "", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", - "tags": "None", + "tags": "[]", "timezone": "Timezone('UTC')" }, "externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets", @@ -83,7 +83,7 @@ "execution_timeout": "None", "sla": "None", "task_id": "'run_data_task'", - "trigger_rule": "'all_success'", + "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "[]", "inlets": "[]", @@ -246,6 +246,46 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'run_data_task'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'run_data_task'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task", + "name": "run_data_task", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", @@ -402,16 +442,16 @@ "state": "success", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", "orchestrator": "airflow", "dag_id": "basic_iolets", "task_id": "run_data_task" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717180290951, + "time": 1733529136396, "actor": "urn:li:corpuser:datahub" } } @@ -544,7 +584,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1717180290951, + "timestampMillis": 1733529136396, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -561,7 +601,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1717180291140, + "timestampMillis": 1733529137385, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json index 5c5be6848fd83..b6ab1ff9120f2 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v1_simple_dag.json @@ -14,7 +14,7 @@ "fileloc": "", "is_paused_upon_creation": "None", "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", - "tags": "None", + "tags": "[]", "timezone": "Timezone('UTC')" }, "externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag", @@ -84,7 +84,7 @@ "execution_timeout": "None", "sla": "None", "task_id": "'task_1'", - "trigger_rule": "'all_success'", + "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "['run_another_data_task']", "inlets": "[]", @@ -205,6 +205,46 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'task_1'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'task_1'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "['run_another_data_task']", + "inlets": "[]", + "outlets": "[]" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=task_1", + "name": "task_1", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", @@ -319,16 +359,16 @@ "state": "success", "operator": "BashOperator", "priority_weight": "2", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "task_1" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717180227827, + "time": 1733528983395, "actor": "urn:li:corpuser:datahub" } } @@ -419,7 +459,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1717180227827, + "timestampMillis": 1733528983395, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -436,7 +476,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1717180228022, + "timestampMillis": 1733528984355, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -449,6 +489,42 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": { + "_access_control": "None", + "catchup": "False", + "description": "'A simple DAG that runs a few fake data tasks.'", + "doc_md": "None", + "fileloc": "", + "is_paused_upon_creation": "None", + "start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))", + "tags": "[]", + "timezone": "Timezone('UTC')" + }, + "externalUrl": "http://airflow.example.com/tree?dag_id=simple_dag", + "name": "simple_dag", + "description": "A simple DAG that runs a few fake data tasks.", + "env": "PROD" + } + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", @@ -498,7 +574,7 @@ "execution_timeout": "None", "sla": "None", "task_id": "'run_another_data_task'", - "trigger_rule": "'all_success'", + "trigger_rule": "", "wait_for_downstream": "False", "downstream_task_ids": "[]", "inlets": "[]", @@ -575,6 +651,46 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'run_another_data_task'", + "execution_timeout": "None", + "sla": "None", + "task_id": "'run_another_data_task'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=simple_dag&_flt_3_task_id=run_another_data_task", + "name": "run_another_data_task", + "type": { + "string": "COMMAND" + }, + "env": "PROD" + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),run_another_data_task)", @@ -645,16 +761,16 @@ "state": "success", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "run_another_data_task" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717180231676, + "time": 1733528992448, "actor": "urn:li:corpuser:datahub" } } @@ -679,7 +795,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1717180231676, + "timestampMillis": 1733528992448, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" @@ -696,7 +812,7 @@ "aspectName": "dataProcessInstanceRunEvent", "aspect": { "json": { - "timestampMillis": 1717180231824, + "timestampMillis": 1733528993380, "partitionSpec": { "partition": "FULL_TABLE_SNAPSHOT", "type": "FULL_TABLE" diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 75bb43af1a43d..3b2c9140e4632 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -12,6 +12,7 @@ import time from typing import Any, Iterator, Sequence +import packaging.version import pytest import requests import tenacity @@ -20,6 +21,7 @@ from datahub.testing.compare_metadata_json import assert_metadata_files_equal from datahub_airflow_plugin._airflow_shims import ( + AIRFLOW_VERSION, HAS_AIRFLOW_DAG_LISTENER_API, HAS_AIRFLOW_LISTENER_API, HAS_AIRFLOW_STANDALONE_CMD, @@ -242,6 +244,7 @@ def _run_airflow( # Note that we could also disable the RUN_IN_THREAD entirely, # but I want to minimize the difference between CI and prod. "DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT": "30", + "DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN": "true" if is_v1 else "false", # Convenience settings. "AIRFLOW__DATAHUB__LOG_LEVEL": "DEBUG", "AIRFLOW__DATAHUB__DEBUG_EMITTER": "True", @@ -361,7 +364,6 @@ class DagTestCase: @pytest.mark.parametrize( ["golden_filename", "test_case", "is_v1"], [ - # On Airflow <= 2.2, test plugin v1. *[ pytest.param( f"v1_{test_case.dag_id}", @@ -369,8 +371,8 @@ class DagTestCase: True, id=f"v1_{test_case.dag_id}", marks=pytest.mark.skipif( - HAS_AIRFLOW_LISTENER_API, - reason="Not testing plugin v1 on newer Airflow versions", + AIRFLOW_VERSION >= packaging.version.parse("2.4.0"), + reason="We only test the v1 plugin on Airflow 2.3", ), ) for test_case in test_cases @@ -391,10 +393,18 @@ class DagTestCase: if HAS_AIRFLOW_DAG_LISTENER_API else f"v2_{test_case.dag_id}_no_dag_listener" ), - marks=pytest.mark.skipif( - not HAS_AIRFLOW_LISTENER_API, - reason="Cannot test plugin v2 without the Airflow plugin listener API", - ), + marks=[ + pytest.mark.skipif( + not HAS_AIRFLOW_LISTENER_API, + reason="Cannot test plugin v2 without the Airflow plugin listener API", + ), + pytest.mark.skipif( + AIRFLOW_VERSION < packaging.version.parse("2.4.0"), + reason="We skip testing the v2 plugin on Airflow 2.3 because it causes flakiness in the custom properties. " + "Ideally we'd just fix these, but given that Airflow 2.3 is EOL and likely going to be deprecated " + "soon anyways, it's not worth the effort.", + ), + ], ) for test_case in test_cases ], diff --git a/metadata-ingestion-modules/airflow-plugin/tox.ini b/metadata-ingestion-modules/airflow-plugin/tox.ini index 28c0b9532bcb8..b310ec84248f1 100644 --- a/metadata-ingestion-modules/airflow-plugin/tox.ini +++ b/metadata-ingestion-modules/airflow-plugin/tox.ini @@ -4,17 +4,24 @@ # and then run "tox" from this directory. [tox] -envlist = py38-airflow21, py38-airflow22, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210 +envlist = py38-airflow23, py310-airflow24, py310-airflow26, py310-airflow27, py310-airflow28, py311-airflow29, py311-airflow210 [testenv] use_develop = true -extras = dev,integration-tests,plugin-v1 +extras = + dev + integration-tests + plugin-v1 + plugin-v2 + # For Airflow 2.3 and 2.4, add a few extra requirements. + airflow23: test-airflow23 + airflow24: test-airflow24 + deps = # This should be kept in sync with the Github Actions matrix. -e ../../metadata-ingestion/ # Airflow version - airflow21: apache-airflow~=2.1.0 - airflow22: apache-airflow~=2.2.0 + airflow23: apache-airflow~=2.3.0 airflow24: apache-airflow~=2.4.0 airflow26: apache-airflow~=2.6.0 airflow27: apache-airflow~=2.7.0 @@ -23,7 +30,8 @@ deps = airflow210: apache-airflow~=2.10.0 # Respect the Airflow constraints files. - # We can't make ourselves work with the constraints of Airflow < 2.3. + # We can't make ourselves work with the constraints of Airflow <= 2.3. + ; py38-airflow23: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.3.4/constraints-3.8.txt # The Airflow 2.4 constraints file requires a version of the sqlite provider whose # hook type is missing the `conn_name_attr` property. ; py310-airflow24: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.4.3/constraints-3.10.txt @@ -31,7 +39,7 @@ deps = py310-airflow27: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt py310-airflow28: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt py311-airflow29: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt - py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt + py311-airflow210: -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.11.txt # Before pinning to the constraint files, we previously left the dependencies # more open. There were a number of packages for which this caused issues. @@ -54,11 +62,3 @@ deps = ; airflow24,airflow26,airflow27,airflow28: Flask-Session<0.6.0 commands = pytest --cov-append {posargs} - -# For Airflow 2.4+, add the plugin-v2 extra. -[testenv:py310-airflow24] -extras = dev,integration-tests,plugin-v2,test-airflow24 - -[testenv:py3{10,11}-airflow{26,27,28,29,210}] -extras = dev,integration-tests,plugin-v2 -