feat(airflow): drop Airflow < 2.3 support + make plugin v2 the default (#12056)
hsheth2 authored Dec 9, 2024
1 parent 4811de1 commit 2babbe6
Showing 11 changed files with 241 additions and 72 deletions.
16 changes: 4 additions & 12 deletions .github/workflows/airflow-plugin.yml
@@ -34,29 +34,21 @@ jobs:
include:
# Note: this should be kept in sync with tox.ini.
- python-version: "3.8"
extra_pip_requirements: "apache-airflow~=2.1.4"
extra_pip_extras: plugin-v1
- python-version: "3.8"
extra_pip_requirements: "apache-airflow~=2.2.4"
extra_pip_extras: plugin-v1
extra_pip_requirements: "apache-airflow~=2.3.4"
extra_pip_extras: test-airflow23
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.4.3"
extra_pip_extras: plugin-v2,test-airflow24
extra_pip_extras: test-airflow24
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.6.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.10.txt"
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.7.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: "apache-airflow~=2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt"
extra_pip_extras: plugin-v2
- python-version: "3.11"
extra_pip_requirements: "apache-airflow~=2.9.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"
extra_pip_extras: plugin-v2
- python-version: "3.11"
extra_pip_requirements: "apache-airflow~=2.10.2 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.11.txt"
extra_pip_extras: plugin-v2
extra_pip_requirements: "apache-airflow~=2.10.3 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.11.txt"
fail-fast: false
steps:
- name: Set up JDK 17
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -35,6 +35,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
- #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2.
- #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation.

### Breaking Changes

24 changes: 13 additions & 11 deletions docs/lineage/airflow.md
@@ -13,14 +13,14 @@ The DataHub Airflow plugin supports:
- Task run information, including task successes and failures.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators.
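
For instance, the manual annotation mechanism from the last bullet looks roughly like this (a minimal sketch: the DAG and table names are made up, `Dataset` is the documented helper from `datahub_airflow_plugin.entities`, and the `schedule` argument assumes Airflow 2.4+):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_airflow_plugin.entities import Dataset

with DAG(
    dag_id="manual_lineage_example",  # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule=None,
):
    # The plugin picks up these annotations and emits table-level lineage.
    BashOperator(
        task_id="transform",
        bash_command="echo transform",
        inlets=[Dataset(platform="snowflake", name="mydb.schema.source_table")],
        outlets=[Dataset(platform="snowflake", name="mydb.schema.target_table")],
    )
```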

There are two actively supported implementations of the plugin, with different Airflow version support.
There are two implementations of the plugin, with different Airflow version support.

| Approach | Airflow Version | Notes |
| --------- | --------------- | --------------------------------------------------------------------------- |
| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ |
| Plugin v1 | 2.1 - 2.8 | No automatic lineage extraction; may not extract lineage if the task fails. |
| Approach | Airflow Versions | Notes |
| --------- | ---------------- | --------------------------------------------------------------------------------------- |
| Plugin v2 | 2.3.4+ | Recommended. Requires Python 3.8+ |
| Plugin v1 | 2.3 - 2.8 | Deprecated. No automatic lineage extraction; may not extract lineage if the task fails. |

If you're using Airflow older than 2.1, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.
If you're using Airflow older than 2.3, it's possible to use the v1 plugin with older versions of `acryl-datahub-airflow-plugin`. See the [compatibility section](#compatibility) for more details.

<!-- TODO: Update the local Airflow guide and link to it here. -->
<!-- If you are looking to run Airflow and DataHub using docker locally, follow the guide [here](../../docker/airflow/local_airflow.md). -->
@@ -29,7 +29,7 @@ If you're using Airflow older than 2.1, it's possible to use the v1 plugin with

### Installation

The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, use the v1 plugin instead.
The v2 plugin requires Airflow 2.3+ and Python 3.8+. If you don't meet these requirements, see the [compatibility section](#compatibility) for other options.

```shell
pip install 'acryl-datahub-airflow-plugin[plugin-v2]'
```
@@ -84,9 +84,10 @@ enabled = True # default

### Installation

The v1 plugin requires Airflow 2.1 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details.
The v1 plugin requires Airflow 2.3 - 2.8 and Python 3.8+. If you're on older versions, it's still possible to use an older version of the plugin. See the [compatibility section](#compatibility) for more details.

If you're using Airflow 2.3+, we recommend using the v2 plugin instead. If you need to use the v1 plugin with Airflow 2.3+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`.
Note that the v1 plugin has fewer features than the v2 plugin and is no longer actively maintained.
Since DataHub v0.15.0, the v2 plugin has been the default. If you need to use the v1 plugin with `acryl-datahub-airflow-plugin` v0.15.0+, you must also set the environment variable `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN=true`.

```shell
pip install 'acryl-datahub-airflow-plugin[plugin-v1]'
```
@@ -340,11 +341,12 @@ The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `

## Compatibility

We no longer officially support Airflow <2.1. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
Both of these options support Python 3.7+.
We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.
The first two options support Python 3.7+, and the last option supports Python 3.8+.

- For Airflow 1.10.x, use DataHub plugin v1 with `acryl-datahub-airflow-plugin` <= 0.9.1.0.
- For Airflow 2.0.x, use DataHub plugin v1 with `acryl-datahub-airflow-plugin` <= 0.11.0.1.
- For Airflow 2.2.x, use DataHub plugin v2 with `acryl-datahub-airflow-plugin` <= 0.14.1.5.
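
If you script environment setup, the pin rules above can be encoded directly (a sketch; the series-to-version mapping is copied from the list above, and the helper name is our own):

```python
from typing import Optional

# Maximum acryl-datahub-airflow-plugin version per legacy Airflow series,
# mirroring the compatibility list above.
_MAX_PLUGIN_FOR_AIRFLOW = {
    "1.10": "0.9.1.0",
    "2.0": "0.11.0.1",
    "2.2": "0.14.1.5",
}


def max_plugin_version(airflow_version: str) -> Optional[str]:
    """Return the newest supported plugin version for a legacy Airflow release."""
    series = ".".join(airflow_version.split(".")[:2])
    return _MAX_PLUGIN_FOR_AIRFLOW.get(series)


assert max_plugin_version("2.0.2") == "0.11.0.1"
assert max_plugin_version("2.5.0") is None  # 2.3+ should use the current plugin
```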

DataHub also previously supported an Airflow [lineage backend](https://airflow.apache.org/docs/apache-airflow/2.2.0/lineage.html#lineage-backend) implementation. While the implementation is still in our codebase, it is deprecated and will be removed in a future release.
Note that the lineage backend did not support automatic lineage extraction, did not capture task failures, and did not work in AWS MWAA.
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/airflow-plugin/build.gradle
@@ -13,7 +13,7 @@ if (!project.hasProperty("extra_pip_requirements")) {
ext.extra_pip_requirements = ""
}
if (!project.hasProperty("extra_pip_extras")) {
ext.extra_pip_extras = "plugin-v2"
ext.extra_pip_extras = ""
}
// If extra_pip_extras is non-empty, we need to add a comma to the beginning of the string.
if (extra_pip_extras != "") {
14 changes: 10 additions & 4 deletions metadata-ingestion-modules/airflow-plugin/setup.py
@@ -24,8 +24,8 @@ def get_long_description():

base_requirements = {
f"acryl-datahub[datahub-rest]{_self_pin}",
# Actual dependencies.
"apache-airflow >= 2.0.2",
# We require Airflow 2.3.x, since we need the new DAG listener API.
"apache-airflow>=2.3.0",
}
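
Before upgrading the plugin in an existing deployment, it may be worth asserting this new floor explicitly (a sketch; `packaging` is assumed to be installed, which it is in most Airflow environments):

```python
from packaging.version import Version

import airflow

# The plugin now requires Airflow 2.3+, which introduced the DAG listener API.
if Version(airflow.__version__) < Version("2.3.0"):
    raise RuntimeError(
        "acryl-datahub-airflow-plugin now requires apache-airflow>=2.3.0; "
        f"found {airflow.__version__}"
    )
```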

plugins: Dict[str, Set[str]] = {
@@ -44,12 +44,13 @@ def get_long_description():
# We remain restrictive on the versions allowed here to prevent
# us from being broken by backwards-incompatible changes in the
# underlying package.
"openlineage-airflow>=1.2.0,<=1.22.0",
"openlineage-airflow>=1.2.0,<=1.25.0",
},
}

# Include datahub-rest in the base requirements.
# Require some plugins by default.
base_requirements.update(plugins["datahub-rest"])
base_requirements.update(plugins["plugin-v2"])


mypy_stubs = {
@@ -109,6 +110,11 @@ def get_long_description():
"apache-airflow-providers-sqlite",
}
per_version_test_requirements = {
"test-airflow23": {
"pendulum<3.0",
"Flask-Session<0.6.0",
"connexion<3.0",
},
"test-airflow24": {
"pendulum<3.0",
"Flask-Session<0.6.0",
@@ -46,7 +46,7 @@ def get_task_inlets(operator: "Operator") -> List:
return operator._inlets # type: ignore[attr-defined, union-attr]
if hasattr(operator, "get_inlet_defs"):
return operator.get_inlet_defs() # type: ignore[attr-defined]
return operator.inlets
return operator.inlets or []


def get_task_outlets(operator: "Operator") -> List:
@@ -56,7 +56,7 @@ def get_task_outlets(operator: "Operator") -> List:
return operator._outlets # type: ignore[attr-defined, union-attr]
if hasattr(operator, "get_outlet_defs"):
return operator.get_outlet_defs()
return operator.outlets
return operator.outlets or []
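
The `or []` change in both shims guards against operators where `inlets`/`outlets` were never set and default to `None`; a self-contained sketch of the behavior (the stand-in operator class is hypothetical):

```python
from typing import List


class StubOperator:
    # Mimics an operator whose lineage attributes were never populated.
    inlets = None
    outlets = None


def get_task_inlets(operator) -> List:
    # Same normalization as the patched shim: None becomes an empty list.
    return operator.inlets or []


assert get_task_inlets(StubOperator()) == []  # previously returned None
```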


__all__ = [
@@ -74,7 +74,7 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
"1",
)
_RUN_IN_THREAD_TIMEOUT = float(
os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 15)
os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 10)
)
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"
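
The timeout remains overridable via the environment; a quick sketch of how the override resolves (the value here is arbitrary):

```python
import os

# Simulate setting the override before the listener module is imported.
os.environ["DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT"] = "30"

_RUN_IN_THREAD_TIMEOUT = float(
    os.getenv("DATAHUB_AIRFLOW_PLUGIN_RUN_IN_THREAD_TIMEOUT", 10)
)
assert _RUN_IN_THREAD_TIMEOUT == 30.0
```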

@@ -102,6 +102,7 @@ def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
"capture_tags": plugin_config.capture_tags_info,
"capture_ownership": plugin_config.capture_ownership_info,
"enable_extractors": plugin_config.enable_extractors,
"render_templates": plugin_config.render_templates,
"disable_openlineage_plugin": plugin_config.disable_openlineage_plugin,
},
)
@@ -14,7 +14,7 @@
"fileloc": "<fileloc>",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "None",
"tags": "[]",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=basic_iolets",
@@ -83,7 +83,7 @@
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
"trigger_rule": "'all_success'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
@@ -246,6 +246,46 @@
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'run_data_task'",
"execution_timeout": "None",
"sla": "None",
"task_id": "'run_data_task'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=basic_iolets&_flt_3_task_id=run_data_task",
"name": "run_data_task",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)",
@@ -402,16 +442,16 @@
"state": "success",
"operator": "BashOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets",
"log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"orchestrator": "airflow",
"dag_id": "basic_iolets",
"task_id": "run_data_task"
},
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets",
"externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1",
"name": "basic_iolets_run_data_task_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1717180290951,
"time": 1733529136396,
"actor": "urn:li:corpuser:datahub"
}
}
@@ -544,7 +584,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1717180290951,
"timestampMillis": 1733529136396,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
@@ -561,7 +601,7 @@
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1717180291140,
"timestampMillis": 1733529137385,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"