Skip to content

Commit

Permalink
add time filter on asset events (apache#44795)
Browse files Browse the repository at this point in the history
* add time filter on asset events

* add asset timestamp filters

* add tests

* fix test nit

* revert legacy api changes

* fmt

* fmt
  • Loading branch information
nishant-gupta-sh authored Dec 13, 2024
1 parent b69441d commit 464e9ee
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 6 deletions.
18 changes: 18 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,24 @@ paths:
- type: integer
- type: 'null'
title: Source Map Index
- name: timestamp_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Timestamp Gte
- name: timestamp_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Timestamp Lte
responses:
'200':
description: Successful Response
Expand Down
5 changes: 4 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
QueryLimit,
QueryOffset,
QueryUriPatternSearch,
RangeFilter,
SortParam,
datetime_range_filter_factory,
filter_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
Expand Down Expand Up @@ -193,12 +195,13 @@ def get_asset_events(
source_map_index: Annotated[
FilterParam[int | None], Depends(filter_param_factory(AssetEvent.source_map_index, int | None))
],
timestamp_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("timestamp", AssetEvent))],
session: SessionDep,
) -> AssetEventCollectionResponse:
"""Get asset events."""
assets_event_select, total_entries = paginated_select(
statement=select(AssetEvent),
filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index],
filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index, timestamp_range],
order_by=order_by,
offset=offset,
limit=limit,
Expand Down
6 changes: 6 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ export const UseAssetServiceGetAssetEventsKeyFn = (
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}: {
assetId?: number;
limit?: number;
Expand All @@ -146,6 +148,8 @@ export const UseAssetServiceGetAssetEventsKeyFn = (
sourceMapIndex?: number;
sourceRunId?: string;
sourceTaskId?: string;
timestampGte?: string;
timestampLte?: string;
} = {},
queryKey?: Array<unknown>,
) => [
Expand All @@ -160,6 +164,8 @@ export const UseAssetServiceGetAssetEventsKeyFn = (
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
},
]),
];
Expand Down
10 changes: 10 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ export const prefetchUseAssetServiceGetAssetAlias = (
* @param data.sourceTaskId
* @param data.sourceRunId
* @param data.sourceMapIndex
* @param data.timestampGte
* @param data.timestampLte
* @returns AssetEventCollectionResponse Successful Response
* @throws ApiError
*/
Expand All @@ -181,6 +183,8 @@ export const prefetchUseAssetServiceGetAssetEvents = (
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}: {
assetId?: number;
limit?: number;
Expand All @@ -190,6 +194,8 @@ export const prefetchUseAssetServiceGetAssetEvents = (
sourceMapIndex?: number;
sourceRunId?: string;
sourceTaskId?: string;
timestampGte?: string;
timestampLte?: string;
} = {},
) =>
queryClient.prefetchQuery({
Expand All @@ -202,6 +208,8 @@ export const prefetchUseAssetServiceGetAssetEvents = (
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}),
queryFn: () =>
AssetService.getAssetEvents({
Expand All @@ -213,6 +221,8 @@ export const prefetchUseAssetServiceGetAssetEvents = (
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}),
});
/**
Expand Down
10 changes: 10 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ export const useAssetServiceGetAssetAlias = <
* @param data.sourceTaskId
* @param data.sourceRunId
* @param data.sourceMapIndex
* @param data.timestampGte
* @param data.timestampLte
* @returns AssetEventCollectionResponse Successful Response
* @throws ApiError
*/
Expand All @@ -235,6 +237,8 @@ export const useAssetServiceGetAssetEvents = <
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}: {
assetId?: number;
limit?: number;
Expand All @@ -244,6 +248,8 @@ export const useAssetServiceGetAssetEvents = <
sourceMapIndex?: number;
sourceRunId?: string;
sourceTaskId?: string;
timestampGte?: string;
timestampLte?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
Expand All @@ -259,6 +265,8 @@ export const useAssetServiceGetAssetEvents = <
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
},
queryKey,
),
Expand All @@ -272,6 +280,8 @@ export const useAssetServiceGetAssetEvents = <
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}) as TData,
...options,
});
Expand Down
10 changes: 10 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ export const useAssetServiceGetAssetAliasSuspense = <
* @param data.sourceTaskId
* @param data.sourceRunId
* @param data.sourceMapIndex
* @param data.timestampGte
* @param data.timestampLte
* @returns AssetEventCollectionResponse Successful Response
* @throws ApiError
*/
Expand All @@ -210,6 +212,8 @@ export const useAssetServiceGetAssetEventsSuspense = <
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}: {
assetId?: number;
limit?: number;
Expand All @@ -219,6 +223,8 @@ export const useAssetServiceGetAssetEventsSuspense = <
sourceMapIndex?: number;
sourceRunId?: string;
sourceTaskId?: string;
timestampGte?: string;
timestampLte?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
Expand All @@ -234,6 +240,8 @@ export const useAssetServiceGetAssetEventsSuspense = <
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
},
queryKey,
),
Expand All @@ -247,6 +255,8 @@ export const useAssetServiceGetAssetEventsSuspense = <
sourceMapIndex,
sourceRunId,
sourceTaskId,
timestampGte,
timestampLte,
}) as TData,
...options,
});
Expand Down
4 changes: 4 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ export class AssetService {
* @param data.sourceTaskId
* @param data.sourceRunId
* @param data.sourceMapIndex
* @param data.timestampGte
* @param data.timestampLte
* @returns AssetEventCollectionResponse Successful Response
* @throws ApiError
*/
Expand All @@ -333,6 +335,8 @@ export class AssetService {
source_task_id: data.sourceTaskId,
source_run_id: data.sourceRunId,
source_map_index: data.sourceMapIndex,
timestamp_gte: data.timestampGte,
timestamp_lte: data.timestampLte,
},
errors: {
401: "Unauthorized",
Expand Down
2 changes: 2 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,8 @@ export type GetAssetEventsData = {
sourceMapIndex?: number | null;
sourceRunId?: string | null;
sourceTaskId?: string | null;
timestampGte?: string | null;
timestampLte?: string | null;
};

export type GetAssetEventsResponse = AssetEventCollectionResponse;
Expand Down
62 changes: 57 additions & 5 deletions tests/api_fastapi/core_api/routes/public/test_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from __future__ import annotations

from collections.abc import Generator
from datetime import datetime
from datetime import datetime, timedelta
from unittest import mock

import pytest
Expand Down Expand Up @@ -104,7 +104,7 @@ def _create_provided_asset_alias(session, asset_alias: AssetAliasModel) -> None:
session.commit()


def _create_assets_events(session, num: int = 2) -> None:
def _create_assets_events(session, num: int = 2, varying_timestamps=False) -> None:
assets_events = [
AssetEvent(
id=i,
Expand All @@ -113,7 +113,7 @@ def _create_assets_events(session, num: int = 2) -> None:
source_task_id="source_task_id",
source_dag_id="source_dag_id",
source_run_id=f"source_run_id_{i}",
timestamp=DEFAULT_DATE,
timestamp=DEFAULT_DATE + timedelta(days=i - 1) if varying_timestamps else DEFAULT_DATE,
)
for i in range(1, 1 + num)
]
Expand Down Expand Up @@ -204,8 +204,8 @@ def create_provided_asset(self, session, asset: AssetModel):
_create_provided_asset(session=session, asset=asset)

@provide_session
def create_assets_events(self, session, num: int = 2):
_create_assets_events(session=session, num=num)
def create_assets_events(self, session, num: int = 2, varying_timestamps: bool = False):
_create_assets_events(session=session, num=num, varying_timestamps=varying_timestamps)

@provide_session
def create_assets_events_with_sensitive_extra(self, session, num: int = 2):
Expand Down Expand Up @@ -607,6 +607,58 @@ def test_filtering(self, test_client, params, total_entries, session):
assert response.status_code == 200
assert response.json()["total_entries"] == total_entries

@pytest.mark.parametrize(
"params, expected_ids",
[
# Test Case 1: timestamp_gte and timestamp_lte set to the same instant (DEFAULT_DATE)
(
{
"timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
},
[1], # only the event stamped exactly DEFAULT_DATE
),
# Test Case 2: range from DEFAULT_DATE to DEFAULT_DATE + 1 day, both bounds included
(
{
"timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE),
"timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=1)),
},
[1, 2], # events stamped on either bound are expected
),
# Test Case 3: inverted range (timestamp_gte later than timestamp_lte)
(
{
"timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=1)),
"timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE - timedelta(days=1)),
},
[], # no event is expected for an inverted range
),
# Test Case 4: range from DEFAULT_DATE + 1 day to DEFAULT_DATE + 2 days
(
{
"timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=1)),
"timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=2)),
},
[2, 3], # the two later events
),
],
)
def test_filter_by_timestamp_gte_and_lte(self, test_client, params, expected_ids, session):
"""
Check the ``timestamp_gte`` / ``timestamp_lte`` query filters of ``GET /public/assets/events``.

Each case sends ``params`` as the query string and expects exactly the event ids in
``expected_ids`` to come back, in that order.
"""
# Three events are created with varying_timestamps=True; per _create_assets_events,
# event id i is stamped DEFAULT_DATE + (i - 1) days, i.e. ids 1..3 at +0, +1, +2 days.
self.create_assets()
self.create_assets_events(num=3, varying_timestamps=True)
self.create_dag_run()
self.create_asset_dag_run()

# Apply the parametrized timestamp_gte / timestamp_lte filters
response = test_client.get("/public/assets/events", params=params)

assert response.status_code == 200
asset_event_ids = [asset_event["id"] for asset_event in response.json()["asset_events"]]

# List equality: the ids must match and must also be returned in this order.
assert asset_event_ids == expected_ids

def test_order_by_raises_400_for_invalid_attr(self, test_client, session):
response = test_client.get("/public/assets/events?order_by=fake")

Expand Down

0 comments on commit 464e9ee

Please sign in to comment.