diff --git a/airflow/api_fastapi/common/parameters.py b/airflow/api_fastapi/common/parameters.py index c46557508f3e1..cfed2d5e4fc75 100644 --- a/airflow/api_fastapi/common/parameters.py +++ b/airflow/api_fastapi/common/parameters.py @@ -232,6 +232,7 @@ class FilterOptionEnum(Enum): IN = "in" NOT_IN = "not_in" ANY_EQUAL = "any_eq" + IS_NONE = "is_none" class FilterParam(BaseParam[T]): @@ -250,7 +251,7 @@ def __init__( self.filter_option: FilterOptionEnum = filter_option def to_orm(self, select: Select) -> Select: - if isinstance(self.value, list) and not self.value and self.skip_none: + if isinstance(self.value, (list, str)) and not self.value and self.skip_none: return select if self.value is None and self.skip_none: return select @@ -279,6 +280,13 @@ def to_orm(self, select: Select) -> Select: return select.where(self.attribute > self.value) if self.filter_option == FilterOptionEnum.GREATER_THAN_EQUAL: return select.where(self.attribute >= self.value) + if self.filter_option == FilterOptionEnum.IS_NONE: + if self.value is None: + return select + if self.value is False: + return select.where(self.attribute.is_not(None)) + if self.value is True: + return select.where(self.attribute.is_(None)) raise ValueError(f"Invalid filter option {self.filter_option} for value {self.value}") def depends(self, *args: Any, **kwargs: Any) -> Self: diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 0cce64152674a..0bfe7f6fe631b 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -259,6 +259,71 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /ui/backfills: + get: + tags: + - Backfill + summary: List Backfills + operationId: list_backfills + parameters: + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 100 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + - name: order_by + in: query + required: false + schema: + type: string + default: id + title: Order By + - name: dag_id + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Id + - name: active + in: query + required: false + schema: + anyOf: + - type: boolean + - type: 'null' + title: Active + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/BackfillCollectionResponse' + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/assets: get: tags: diff --git a/airflow/api_fastapi/core_api/routes/ui/__init__.py b/airflow/api_fastapi/core_api/routes/ui/__init__.py index 2b22cc541206b..3d9850ba37e70 100644 --- a/airflow/api_fastapi/core_api/routes/ui/__init__.py +++ b/airflow/api_fastapi/core_api/routes/ui/__init__.py @@ -18,6 +18,7 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.routes.ui.assets import assets_router +from airflow.api_fastapi.core_api.routes.ui.backfills import backfills_router from airflow.api_fastapi.core_api.routes.ui.config import config_router from airflow.api_fastapi.core_api.routes.ui.dags import dags_router from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router @@ -30,3 +31,4 @@ ui_router.include_router(dags_router) ui_router.include_router(dashboard_router) ui_router.include_router(structure_router) +ui_router.include_router(backfills_router) diff --git a/airflow/api_fastapi/core_api/routes/ui/backfills.py b/airflow/api_fastapi/core_api/routes/ui/backfills.py new file mode 100644 index 0000000000000..a749cdd6cfcd3 --- /dev/null +++ b/airflow/api_fastapi/core_api/routes/ui/backfills.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Annotated + +from fastapi import Depends, status +from sqlalchemy import select + +from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.parameters import ( + FilterOptionEnum, + FilterParam, + QueryLimit, + QueryOffset, + SortParam, + filter_param_factory, +) +from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.core_api.datamodels.backfills import BackfillCollectionResponse +from airflow.api_fastapi.core_api.openapi.exceptions import ( + create_openapi_http_exception_doc, +) +from airflow.models.backfill import Backfill + +backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") + + +@backfills_router.get( + path="", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), +) +def list_backfills( + limit: QueryLimit, + offset: QueryOffset, + order_by: Annotated[ + SortParam, + Depends(SortParam(["id"], Backfill).dynamic_depends()), + ], + session: SessionDep, + dag_id: Annotated[FilterParam[str | None], Depends(filter_param_factory(Backfill.dag_id, str | None))], + active: Annotated[ + FilterParam[bool | None], + Depends(filter_param_factory(Backfill.completed_at, bool | None, FilterOptionEnum.IS_NONE, "active")), + ], +) -> BackfillCollectionResponse: + select_stmt, total_entries = paginated_select( + statement=select(Backfill), + filters=[dag_id, active], + order_by=order_by, + offset=offset, + limit=limit, + session=session, + ) + backfills = session.scalars(select_stmt) + return BackfillCollectionResponse( + backfills=backfills, + total_entries=total_entries, + ) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index f6c42d0fc14a4..be897ab52a52c 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -411,6 +411,34 @@ export type BackfillServiceListBackfillsQueryResult< export const useBackfillServiceListBackfillsKey = "BackfillServiceListBackfills"; export const UseBackfillServiceListBackfillsKeyFn = ( + { + active, + dagId, + limit, + offset, + orderBy, + }: { + active?: boolean; + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: Array, +) => [ + useBackfillServiceListBackfillsKey, + ...(queryKey ?? [{ active, dagId, limit, offset, orderBy }]), +]; +export type BackfillServiceListBackfills1DefaultResponse = Awaited< + ReturnType +>; +export type BackfillServiceListBackfills1QueryResult< + TData = BackfillServiceListBackfills1DefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useBackfillServiceListBackfills1Key = + "BackfillServiceListBackfills1"; +export const UseBackfillServiceListBackfills1KeyFn = ( { dagId, limit, @@ -424,7 +452,7 @@ export const UseBackfillServiceListBackfillsKeyFn = ( }, queryKey?: Array, ) => [ - useBackfillServiceListBackfillsKey, + useBackfillServiceListBackfills1Key, ...(queryKey ?? [{ dagId, limit, offset, orderBy }]), ]; export type BackfillServiceGetBackfillDefaultResponse = Awaited< diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 44632737a95ce..8b80ae7c08df6 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -524,14 +524,52 @@ export const prefetchUseStructureServiceStructureData = ( /** * List Backfills * @param data The data for the request. - * @param data.dagId * @param data.limit * @param data.offset * @param data.orderBy + * @param data.dagId + * @param data.active * @returns BackfillCollectionResponse Successful Response * @throws ApiError */ export const prefetchUseBackfillServiceListBackfills = ( + queryClient: QueryClient, + { + active, + dagId, + limit, + offset, + orderBy, + }: { + active?: boolean; + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + } = {}, +) => + queryClient.prefetchQuery({ + queryKey: Common.UseBackfillServiceListBackfillsKeyFn({ + active, + dagId, + limit, + offset, + orderBy, + }), + queryFn: () => + BackfillService.listBackfills({ active, dagId, limit, offset, orderBy }), + }); +/** + * List Backfills + * @param data The data for the request. + * @param data.dagId + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns BackfillCollectionResponse Successful Response + * @throws ApiError + */ +export const prefetchUseBackfillServiceListBackfills1 = ( queryClient: QueryClient, { dagId, @@ -546,14 +584,14 @@ export const prefetchUseBackfillServiceListBackfills = ( }, ) => queryClient.prefetchQuery({ - queryKey: Common.UseBackfillServiceListBackfillsKeyFn({ + queryKey: Common.UseBackfillServiceListBackfills1KeyFn({ dagId, limit, offset, orderBy, }), queryFn: () => - BackfillService.listBackfills({ dagId, limit, offset, orderBy }), + BackfillService.listBackfills1({ dagId, limit, offset, orderBy }), }); /** * Get Backfill diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index df298a9d13580..83404e4bb3494 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -652,10 +652,11 @@ export const useStructureServiceStructureData = < /** * List Backfills * @param data The data for the request. - * @param data.dagId * @param data.limit * @param data.offset * @param data.orderBy + * @param data.dagId + * @param data.active * @returns BackfillCollectionResponse Successful Response * @throws ApiError */ @@ -663,6 +664,52 @@ export const useBackfillServiceListBackfills = < TData = Common.BackfillServiceListBackfillsDefaultResponse, TError = unknown, TQueryKey extends Array = unknown[], +>( + { + active, + dagId, + limit, + offset, + orderBy, + }: { + active?: boolean; + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseBackfillServiceListBackfillsKeyFn( + { active, dagId, limit, offset, orderBy }, + queryKey, + ), + queryFn: () => + BackfillService.listBackfills({ + active, + dagId, + limit, + offset, + orderBy, + }) as TData, + ...options, + }); +/** + * List Backfills + * @param data The data for the request. + * @param data.dagId + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns BackfillCollectionResponse Successful Response + * @throws ApiError + */ +export const useBackfillServiceListBackfills1 = < + TData = Common.BackfillServiceListBackfills1DefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], >( { dagId, @@ -679,12 +726,17 @@ export const useBackfillServiceListBackfills = < options?: Omit, "queryKey" | "queryFn">, ) => useQuery({ - queryKey: Common.UseBackfillServiceListBackfillsKeyFn( + queryKey: Common.UseBackfillServiceListBackfills1KeyFn( { dagId, limit, offset, orderBy }, queryKey, ), queryFn: () => - BackfillService.listBackfills({ dagId, limit, offset, orderBy }) as TData, + BackfillService.listBackfills1({ + dagId, + limit, + offset, + orderBy, + }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 9168880d8b5c5..4508a23b5e7b3 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -627,10 +627,11 @@ export const useStructureServiceStructureDataSuspense = < /** * List Backfills * @param data The data for the request. - * @param data.dagId * @param data.limit * @param data.offset * @param data.orderBy + * @param data.dagId + * @param data.active * @returns BackfillCollectionResponse Successful Response * @throws ApiError */ @@ -638,6 +639,52 @@ export const useBackfillServiceListBackfillsSuspense = < TData = Common.BackfillServiceListBackfillsDefaultResponse, TError = unknown, TQueryKey extends Array = unknown[], +>( + { + active, + dagId, + limit, + offset, + orderBy, + }: { + active?: boolean; + dagId?: string; + limit?: number; + offset?: number; + orderBy?: string; + } = {}, + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseBackfillServiceListBackfillsKeyFn( + { active, dagId, limit, offset, orderBy }, + queryKey, + ), + queryFn: () => + BackfillService.listBackfills({ + active, + dagId, + limit, + offset, + orderBy, + }) as TData, + ...options, + }); +/** + * List Backfills + * @param data The data for the request. + * @param data.dagId + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns BackfillCollectionResponse Successful Response + * @throws ApiError + */ +export const useBackfillServiceListBackfills1Suspense = < + TData = Common.BackfillServiceListBackfills1DefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], >( { dagId, @@ -654,12 +701,17 @@ export const useBackfillServiceListBackfillsSuspense = < options?: Omit, "queryKey" | "queryFn">, ) => useSuspenseQuery({ - queryKey: Common.UseBackfillServiceListBackfillsKeyFn( + queryKey: Common.UseBackfillServiceListBackfills1KeyFn( { dagId, limit, offset, orderBy }, queryKey, ), queryFn: () => - BackfillService.listBackfills({ dagId, limit, offset, orderBy }) as TData, + BackfillService.listBackfills1({ + dagId, + limit, + offset, + orderBy, + }) as TData, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index d93e584493d80..98ff5677360f5 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -42,6 +42,8 @@ import type { StructureDataResponse2, ListBackfillsData, ListBackfillsResponse, + ListBackfills1Data, + ListBackfills1Response, CreateBackfillData, CreateBackfillResponse, GetBackfillData, @@ -767,16 +769,47 @@ export class BackfillService { /** * List Backfills * @param data The data for the request. - * @param data.dagId * @param data.limit * @param data.offset * @param data.orderBy + * @param data.dagId + * @param data.active * @returns BackfillCollectionResponse Successful Response * @throws ApiError */ public static listBackfills( - data: ListBackfillsData, + data: ListBackfillsData = {}, ): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/ui/backfills", + query: { + limit: data.limit, + offset: data.offset, + order_by: data.orderBy, + dag_id: data.dagId, + active: data.active, + }, + errors: { + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + + /** + * List Backfills + * @param data The data for the request. + * @param data.dagId + * @param data.limit + * @param data.offset + * @param data.orderBy + * @returns BackfillCollectionResponse Successful Response + * @throws ApiError + */ + public static listBackfills1( + data: ListBackfills1Data, + ): CancelablePromise { return __request(OpenAPI, { method: "GET", url: "/public/backfills", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 1451174a6eb04..42196f1a89202 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1451,7 +1451,8 @@ export type StructureDataData = { export type StructureDataResponse2 = StructureDataResponse; export type ListBackfillsData = { - dagId: string; + active?: boolean | null; + dagId?: string | null; limit?: number; offset?: number; orderBy?: string; @@ -1459,6 +1460,15 @@ export type ListBackfillsData = { export type ListBackfillsResponse = BackfillCollectionResponse; +export type ListBackfills1Data = { + dagId: string; + limit?: number; + offset?: number; + orderBy?: string; +}; + +export type ListBackfills1Response = BackfillCollectionResponse; + export type CreateBackfillData = { requestBody: BackfillPostBody; }; @@ -2548,9 +2558,28 @@ export type $OpenApiTs = { }; }; }; - "/public/backfills": { + "/ui/backfills": { get: { req: ListBackfillsData; + res: { + /** + * Successful Response + */ + 200: BackfillCollectionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; + "/public/backfills": { + get: { + req: ListBackfills1Data; res: { /** * Successful Response diff --git a/tests/api_fastapi/core_api/routes/ui/test_backfills.py b/tests/api_fastapi/core_api/routes/ui/test_backfills.py new file mode 100644 index 0000000000000..bf405b087a21c --- /dev/null +++ b/tests/api_fastapi/core_api/routes/ui/test_backfills.py @@ -0,0 +1,152 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.models import DagModel +from airflow.models.backfill import Backfill +from airflow.utils import timezone +from airflow.utils.session import provide_session + +from tests_common.test_utils.db import ( + clear_db_backfills, + clear_db_dags, + clear_db_runs, + clear_db_serialized_dags, +) +from tests_common.test_utils.format_datetime import from_datetime_to_zulu + +pytestmark = pytest.mark.db_test + +DAG_ID = "test_dag" +TASK_ID = "op1" +DAG2_ID = "test_dag2" +DAG3_ID = "test_dag3" + + +def _clean_db(): + clear_db_backfills() + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + +@pytest.fixture(autouse=True) +def clean_db(): + _clean_db() + yield + _clean_db() + + +class TestBackfillEndpoint: + @provide_session + def _create_dag_models(self, *, count=3, dag_id_prefix="TEST_DAG", is_paused=False, session=None): + dags = [] + for num in range(1, count + 1): + dag_model = DagModel( + dag_id=f"{dag_id_prefix}_{num}", + fileloc=f"/tmp/dag_{num}.py", + is_active=True, + timetable_summary="0 0 * * *", + is_paused=is_paused, + ) + session.add(dag_model) + dags.append(dag_model) + return dags + + +class TestListBackfills(TestBackfillEndpoint): + @pytest.mark.parametrize( + "test_params, response_params, total_entries", + [ + ({}, ["backfill1", "backfill2", "backfill3"], 3), + ({"active": True}, ["backfill2", "backfill3"], 2), + ({"active": False}, ["backfill1"], 1), + ({"dag_id": "", "active": True}, ["backfill2", "backfill3"], 2), + ({"dag_id": "", "active": False}, ["backfill1"], 1), + ({"dag_id": ""}, ["backfill1", "backfill2", "backfill3"], 3), + ({"dag_id": "TEST_DAG_1", "active": True}, [], 0), + ({"dag_id": "TEST_DAG_1", "active": False}, ["backfill1"], 1), + ({"dag_id": "TEST_DAG_1"}, ["backfill1"], 1), + ], + ) + def test_list_backfill(self, test_params, response_params, total_entries, test_client, session): + dags = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + completed_at = timezone.utcnow() + backfill0 = Backfill( + dag_id=dags[0].dag_id, from_date=from_date, to_date=to_date, completed_at=completed_at + ) + backfill1 = Backfill(dag_id=dags[1].dag_id, from_date=from_date, to_date=to_date) + backfill2 = Backfill(dag_id=dags[2].dag_id, from_date=from_date, to_date=to_date, is_paused=True) + backfills = [backfill0, backfill1, backfill2] + session.add_all(backfills) + session.commit() + backfill_responses = { + "backfill1": { + "completed_at": from_datetime_to_zulu(completed_at), + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": {}, + "from_date": from_datetime_to_zulu(from_date), + "id": backfills[0].id, + "is_paused": False, + "reprocess_behavior": "none", + "max_active_runs": 10, + "to_date": from_datetime_to_zulu(to_date), + "updated_at": mock.ANY, + }, + "backfill2": { + "completed_at": None, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_2", + "dag_run_conf": {}, + "from_date": from_datetime_to_zulu(from_date), + "id": backfills[1].id, + "is_paused": False, + "reprocess_behavior": "none", + "max_active_runs": 10, + "to_date": from_datetime_to_zulu(to_date), + "updated_at": mock.ANY, + }, + "backfill3": { + "completed_at": None, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_3", + "dag_run_conf": {}, + "from_date": from_datetime_to_zulu(from_date), + "id": backfills[2].id, + "is_paused": True, + "reprocess_behavior": "none", + "max_active_runs": 10, + "to_date": from_datetime_to_zulu(to_date), + "updated_at": mock.ANY, + }, + } + expected_response = [] + for backfill in response_params: + expected_response.append(backfill_responses[backfill]) + response = test_client.get("/ui/backfills", params=test_params) + assert response.status_code == 200 + assert response.json() == { + "backfills": expected_response, + "total_entries": total_entries, + }