Skip to content

Commit

Permalink
Add ability to list active backfills both across cluster and per-dag (a…
Browse files Browse the repository at this point in the history
…pache#44624)

* Add ability to list active backfills both across cluster and per-dag (UI-only API)

* Add ability to list active backfills both across cluster and per-dag (UI-only API)

* Resolving PR comments

* add alias

* use filter_param_factory, change parameter name

* use filter_param_factory, change parameter name

---------

Co-authored-by: Sneha Prabhu <[email protected]>
  • Loading branch information
prabhusneha and Sneha Prabhu authored Dec 11, 2024
1 parent f0ec10e commit 1eb683b
Show file tree
Hide file tree
Showing 11 changed files with 547 additions and 15 deletions.
10 changes: 9 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ class FilterOptionEnum(Enum):
IN = "in"
NOT_IN = "not_in"
ANY_EQUAL = "any_eq"
IS_NONE = "is_none"


class FilterParam(BaseParam[T]):
Expand All @@ -250,7 +251,7 @@ def __init__(
self.filter_option: FilterOptionEnum = filter_option

def to_orm(self, select: Select) -> Select:
if isinstance(self.value, list) and not self.value and self.skip_none:
if isinstance(self.value, (list, str)) and not self.value and self.skip_none:
return select
if self.value is None and self.skip_none:
return select
Expand Down Expand Up @@ -279,6 +280,13 @@ def to_orm(self, select: Select) -> Select:
return select.where(self.attribute > self.value)
if self.filter_option == FilterOptionEnum.GREATER_THAN_EQUAL:
return select.where(self.attribute >= self.value)
if self.filter_option == FilterOptionEnum.IS_NONE:
if self.value is None:
return select
if self.value is False:
return select.where(self.attribute.is_not(None))
if self.value is True:
return select.where(self.attribute.is_(None))
raise ValueError(f"Invalid filter option {self.filter_option} for value {self.value}")

def depends(self, *args: Any, **kwargs: Any) -> Self:
Expand Down
65 changes: 65 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,71 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/ui/backfills:
get:
tags:
- Backfill
summary: List Backfills
operationId: list_backfills
parameters:
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: id
title: Order By
- name: dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: active
in: query
required: false
schema:
anyOf:
- type: boolean
- type: 'null'
title: Active
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/BackfillCollectionResponse'
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets:
get:
tags:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.routes.ui.assets import assets_router
from airflow.api_fastapi.core_api.routes.ui.backfills import backfills_router
from airflow.api_fastapi.core_api.routes.ui.config import config_router
from airflow.api_fastapi.core_api.routes.ui.dags import dags_router
from airflow.api_fastapi.core_api.routes.ui.dashboard import dashboard_router
Expand All @@ -30,3 +31,4 @@
ui_router.include_router(dags_router)
ui_router.include_router(dashboard_router)
ui_router.include_router(structure_router)
ui_router.include_router(backfills_router)
73 changes: 73 additions & 0 deletions airflow/api_fastapi/core_api/routes/ui/backfills.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Annotated

from fastapi import Depends, status
from sqlalchemy import select

from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
FilterParam,
QueryLimit,
QueryOffset,
SortParam,
filter_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.backfills import BackfillCollectionResponse
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.models.backfill import Backfill

backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")


@backfills_router.get(
path="",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
)
def list_backfills(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(SortParam(["id"], Backfill).dynamic_depends()),
],
session: SessionDep,
dag_id: Annotated[FilterParam[str | None], Depends(filter_param_factory(Backfill.dag_id, str | None))],
active: Annotated[
FilterParam[bool | None],
Depends(filter_param_factory(Backfill.completed_at, bool | None, FilterOptionEnum.IS_NONE, "active")),
],
) -> BackfillCollectionResponse:
select_stmt, total_entries = paginated_select(
statement=select(Backfill),
filters=[dag_id, active],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)
backfills = session.scalars(select_stmt)
return BackfillCollectionResponse(
backfills=backfills,
total_entries=total_entries,
)
30 changes: 29 additions & 1 deletion airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,34 @@ export type BackfillServiceListBackfillsQueryResult<
export const useBackfillServiceListBackfillsKey =
"BackfillServiceListBackfills";
export const UseBackfillServiceListBackfillsKeyFn = (
{
active,
dagId,
limit,
offset,
orderBy,
}: {
active?: boolean;
dagId?: string;
limit?: number;
offset?: number;
orderBy?: string;
} = {},
queryKey?: Array<unknown>,
) => [
useBackfillServiceListBackfillsKey,
...(queryKey ?? [{ active, dagId, limit, offset, orderBy }]),
];
export type BackfillServiceListBackfills1DefaultResponse = Awaited<
ReturnType<typeof BackfillService.listBackfills1>
>;
export type BackfillServiceListBackfills1QueryResult<
TData = BackfillServiceListBackfills1DefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useBackfillServiceListBackfills1Key =
"BackfillServiceListBackfills1";
export const UseBackfillServiceListBackfills1KeyFn = (
{
dagId,
limit,
Expand All @@ -424,7 +452,7 @@ export const UseBackfillServiceListBackfillsKeyFn = (
},
queryKey?: Array<unknown>,
) => [
useBackfillServiceListBackfillsKey,
useBackfillServiceListBackfills1Key,
...(queryKey ?? [{ dagId, limit, offset, orderBy }]),
];
export type BackfillServiceGetBackfillDefaultResponse = Awaited<
Expand Down
44 changes: 41 additions & 3 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,14 +524,52 @@ export const prefetchUseStructureServiceStructureData = (
/**
* List Backfills
* @param data The data for the request.
* @param data.dagId
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.dagId
* @param data.active
* @returns BackfillCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseBackfillServiceListBackfills = (
queryClient: QueryClient,
{
active,
dagId,
limit,
offset,
orderBy,
}: {
active?: boolean;
dagId?: string;
limit?: number;
offset?: number;
orderBy?: string;
} = {},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseBackfillServiceListBackfillsKeyFn({
active,
dagId,
limit,
offset,
orderBy,
}),
queryFn: () =>
BackfillService.listBackfills({ active, dagId, limit, offset, orderBy }),
});
/**
* List Backfills
* @param data The data for the request.
* @param data.dagId
* @param data.limit
* @param data.offset
* @param data.orderBy
* @returns BackfillCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseBackfillServiceListBackfills1 = (
queryClient: QueryClient,
{
dagId,
Expand All @@ -546,14 +584,14 @@ export const prefetchUseBackfillServiceListBackfills = (
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseBackfillServiceListBackfillsKeyFn({
queryKey: Common.UseBackfillServiceListBackfills1KeyFn({
dagId,
limit,
offset,
orderBy,
}),
queryFn: () =>
BackfillService.listBackfills({ dagId, limit, offset, orderBy }),
BackfillService.listBackfills1({ dagId, limit, offset, orderBy }),
});
/**
* Get Backfill
Expand Down
58 changes: 55 additions & 3 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -652,17 +652,64 @@ export const useStructureServiceStructureData = <
/**
* List Backfills
* @param data The data for the request.
* @param data.dagId
* @param data.limit
* @param data.offset
* @param data.orderBy
* @param data.dagId
* @param data.active
* @returns BackfillCollectionResponse Successful Response
* @throws ApiError
*/
export const useBackfillServiceListBackfills = <
TData = Common.BackfillServiceListBackfillsDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
active,
dagId,
limit,
offset,
orderBy,
}: {
active?: boolean;
dagId?: string;
limit?: number;
offset?: number;
orderBy?: string;
} = {},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseBackfillServiceListBackfillsKeyFn(
{ active, dagId, limit, offset, orderBy },
queryKey,
),
queryFn: () =>
BackfillService.listBackfills({
active,
dagId,
limit,
offset,
orderBy,
}) as TData,
...options,
});
/**
* List Backfills
* @param data The data for the request.
* @param data.dagId
* @param data.limit
* @param data.offset
* @param data.orderBy
* @returns BackfillCollectionResponse Successful Response
* @throws ApiError
*/
export const useBackfillServiceListBackfills1 = <
TData = Common.BackfillServiceListBackfills1DefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
Expand All @@ -679,12 +726,17 @@ export const useBackfillServiceListBackfills = <
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseBackfillServiceListBackfillsKeyFn(
queryKey: Common.UseBackfillServiceListBackfills1KeyFn(
{ dagId, limit, offset, orderBy },
queryKey,
),
queryFn: () =>
BackfillService.listBackfills({ dagId, limit, offset, orderBy }) as TData,
BackfillService.listBackfills1({
dagId,
limit,
offset,
orderBy,
}) as TData,
...options,
});
/**
Expand Down
Loading

0 comments on commit 1eb683b

Please sign in to comment.