diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 4d8a1c5beb978..a36e45d4241a9 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -602,6 +602,24 @@ paths: - type: integer - type: 'null' title: Source Map Index + - name: timestamp_gte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Timestamp Gte + - name: timestamp_lte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Timestamp Lte responses: '200': description: Successful Response diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index c8cc9fb0f7695..a258f2bd07c5b 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -34,7 +34,9 @@ QueryLimit, QueryOffset, QueryUriPatternSearch, + RangeFilter, SortParam, + datetime_range_filter_factory, filter_param_factory, ) from airflow.api_fastapi.common.router import AirflowRouter @@ -193,12 +195,13 @@ def get_asset_events( source_map_index: Annotated[ FilterParam[int | None], Depends(filter_param_factory(AssetEvent.source_map_index, int | None)) ], + timestamp_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("timestamp", AssetEvent))], session: SessionDep, ) -> AssetEventCollectionResponse: """Get asset events.""" assets_event_select, total_entries = paginated_select( statement=select(AssetEvent), - filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index], + filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index, timestamp_range], order_by=order_by, offset=offset, limit=limit, diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index d747a10893b50..ea18796932159 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -137,6 +137,8 @@ export const UseAssetServiceGetAssetEventsKeyFn = ( sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }: { assetId?: number; limit?: number; @@ -146,6 +148,8 @@ export const UseAssetServiceGetAssetEventsKeyFn = ( sourceMapIndex?: number; sourceRunId?: string; sourceTaskId?: string; + timestampGte?: string; + timestampLte?: string; } = {}, queryKey?: Array, ) => [ @@ -160,6 +164,8 @@ export const UseAssetServiceGetAssetEventsKeyFn = ( sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }, ]), ]; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 8f455a2409c8c..fead08d893d99 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -167,6 +167,8 @@ export const prefetchUseAssetServiceGetAssetAlias = ( * @param data.sourceTaskId * @param data.sourceRunId * @param data.sourceMapIndex + * @param data.timestampGte + * @param data.timestampLte * @returns AssetEventCollectionResponse Successful Response * @throws ApiError */ @@ -181,6 +183,8 @@ export const prefetchUseAssetServiceGetAssetEvents = ( sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }: { assetId?: number; limit?: number; @@ -190,6 +194,8 @@ export const prefetchUseAssetServiceGetAssetEvents = ( sourceMapIndex?: number; sourceRunId?: string; sourceTaskId?: string; + timestampGte?: string; + timestampLte?: string; } = {}, ) => queryClient.prefetchQuery({ @@ -202,6 +208,8 @@ export const prefetchUseAssetServiceGetAssetEvents = ( sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }), queryFn: () => AssetService.getAssetEvents({ @@ -213,6 +221,8 @@ export const prefetchUseAssetServiceGetAssetEvents = ( sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }), }); /** diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 60bcb9de9c390..216c165ae330f 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -218,6 +218,8 @@ export const useAssetServiceGetAssetAlias = < * @param data.sourceTaskId * @param data.sourceRunId * @param data.sourceMapIndex + * @param data.timestampGte + * @param data.timestampLte * @returns AssetEventCollectionResponse Successful Response * @throws ApiError */ @@ -235,6 +237,8 @@ export const useAssetServiceGetAssetEvents = < sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }: { assetId?: number; limit?: number; @@ -244,6 +248,8 @@ export const useAssetServiceGetAssetEvents = < sourceMapIndex?: number; sourceRunId?: string; sourceTaskId?: string; + timestampGte?: string; + timestampLte?: string; } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, @@ -259,6 +265,8 @@ export const useAssetServiceGetAssetEvents = < sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }, queryKey, ), @@ -272,6 +280,8 @@ export const useAssetServiceGetAssetEvents = < sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }) as TData, ...options, }); diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index f29fb594e2990..1f27a15dd0121 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -193,6 +193,8 @@ export const useAssetServiceGetAssetAliasSuspense = < * @param data.sourceTaskId * @param data.sourceRunId * @param data.sourceMapIndex + * @param data.timestampGte + * @param data.timestampLte * @returns AssetEventCollectionResponse Successful Response * @throws ApiError */ @@ -210,6 +212,8 @@ export const useAssetServiceGetAssetEventsSuspense = < sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }: { assetId?: number; limit?: number; @@ -219,6 +223,8 @@ export const useAssetServiceGetAssetEventsSuspense = < sourceMapIndex?: number; sourceRunId?: string; sourceTaskId?: string; + timestampGte?: string; + timestampLte?: string; } = {}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">, @@ -234,6 +240,8 @@ export const useAssetServiceGetAssetEventsSuspense = < sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }, queryKey, ), @@ -247,6 +255,8 @@ export const useAssetServiceGetAssetEventsSuspense = < sourceMapIndex, sourceRunId, sourceTaskId, + timestampGte, + timestampLte, }) as TData, ...options, }); diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index be4dd239cbe3e..f9273c1293fa1 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -315,6 +315,8 @@ export class AssetService { * @param data.sourceTaskId * @param data.sourceRunId * @param data.sourceMapIndex + * @param data.timestampGte + * @param data.timestampLte * @returns AssetEventCollectionResponse Successful Response * @throws ApiError */ @@ -333,6 +335,8 @@ export class AssetService { source_task_id: data.sourceTaskId, source_run_id: data.sourceRunId, source_map_index: data.sourceMapIndex, + timestampGte: data.timestampGte, + timestampLte: data.timestampLte, }, errors: { 401: "Unauthorized", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1e9112db378ef..c178f78608e30 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1357,6 +1357,8 @@ export type GetAssetEventsData = { sourceMapIndex?: number | null; sourceRunId?: string | null; sourceTaskId?: string | null; + timestampGte?: string | null; + timestampLte?: string | null; }; export type GetAssetEventsResponse = AssetEventCollectionResponse; diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index 46c769640a8f3..563fbac961986 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -17,7 +17,7 @@ from __future__ import annotations from collections.abc import Generator -from datetime import datetime +from datetime import datetime, timedelta from unittest import mock import pytest @@ -104,7 +104,7 @@ def _create_provided_asset_alias(session, asset_alias: AssetAliasModel) -> None: session.commit() -def _create_assets_events(session, num: int = 2) -> None: +def _create_assets_events(session, num: int = 2, varying_timestamps=False) -> None: assets_events = [ AssetEvent( id=i, @@ -113,7 +113,7 @@ def _create_assets_events(session, num: int = 2) -> None: source_task_id="source_task_id", source_dag_id="source_dag_id", source_run_id=f"source_run_id_{i}", - timestamp=DEFAULT_DATE, + timestamp=DEFAULT_DATE + timedelta(days=i - 1) if varying_timestamps else DEFAULT_DATE, ) for i in range(1, 1 + num) ] @@ -204,8 +204,8 @@ def create_provided_asset(self, session, asset: AssetModel): _create_provided_asset(session=session, asset=asset) @provide_session - def create_assets_events(self, session, num: int = 2): - _create_assets_events(session=session, num=num) + def create_assets_events(self, session, num: int = 2, varying_timestamps: bool = False): + _create_assets_events(session=session, num=num, varying_timestamps=varying_timestamps) @provide_session def create_assets_events_with_sensitive_extra(self, session, num: int = 2): @@ -607,6 +607,58 @@ def test_filtering(self, test_client, params, total_entries, session): assert response.status_code == 200 assert response.json()["total_entries"] == total_entries + @pytest.mark.parametrize( + "params, expected_ids", + [ + # Test Case 1: Filtering with both timestamp_gte and timestamp_lte set to the same date + ( + { + "timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE), + "timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE), + }, + [1], # expected_ids for events exactly on DEFAULT_DATE + ), + # Test Case 2: Filtering events greater than or equal to a certain timestamp and less than or equal to another + ( + { + "timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE), + "timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=1)), + }, + [1, 2], # expected_ids for events within the date range + ), + # Test Case 3: timestamp_gte later than timestamp_lte with no events in range + ( + { + "timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=1)), + "timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE - timedelta(days=1)), + }, + [], # expected_ids for events outside the range + ), + # Test Case 4: timestamp_gte earlier than timestamp_lte, allowing events within the range + ( + { + "timestamp_gte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=1)), + "timestamp_lte": from_datetime_to_zulu_without_ms(DEFAULT_DATE + timedelta(days=2)), + }, + [2, 3], # expected_ids for events within the date range + ), + ], + ) + def test_filter_by_timestamp_gte_and_lte(self, test_client, params, expected_ids, session): + # Create sample assets and asset events with specified timestamps + self.create_assets() + self.create_assets_events(num=3, varying_timestamps=True) + self.create_dag_run() + self.create_asset_dag_run() + + # Test with both timestamp_gte and timestamp_lte filters + response = test_client.get("/public/assets/events", params=params) + + assert response.status_code == 200 + asset_event_ids = [asset_event["id"] for asset_event in response.json()["asset_events"]] + + assert asset_event_ids == expected_ids + def test_order_by_raises_400_for_invalid_attr(self, test_client, session): response = test_client.get("/public/assets/events?order_by=fake")