Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Get Task Instance Tries #44301

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ def get_mapped_task_instance_try_details(
)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_task_instance_tries(
Expand Down
73 changes: 73 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3925,6 +3925,63 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries:
get:
tags:
- Task Instance
summary: Get Task Instance Tries
description: Get list of task instances history.
operationId: get_task_instance_tries
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceHistoryCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}:
get:
tags:
Expand Down Expand Up @@ -7301,6 +7358,22 @@ components:
- total_entries
title: TaskInstanceCollectionResponse
description: Task Instance Collection serializer for responses.
TaskInstanceHistoryCollectionResponse:
properties:
task_instances:
items:
$ref: '#/components/schemas/TaskInstanceHistoryResponse'
type: array
title: Task Instances
total_entries:
type: integer
title: Total Entries
type: object
required:
- task_instances
- total_entries
title: TaskInstanceHistoryCollectionResponse
description: TaskInstanceHistory Collection serializer for responses.
TaskInstanceHistoryResponse:
properties:
task_id:
Expand Down
46 changes: 44 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

from __future__ import annotations

from typing import Annotated, Literal
from typing import Annotated, Literal, cast

from fastapi import Depends, HTTPException, Request, status
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select
from sqlalchemy.sql import or_, select
from sqlalchemy.sql.selectable import Select

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
Expand Down Expand Up @@ -51,6 +52,7 @@
ClearTaskInstancesBody,
TaskDependencyCollectionResponse,
TaskInstanceCollectionResponse,
TaskInstanceHistoryCollectionResponse,
TaskInstanceHistoryResponse,
TaskInstanceReferenceCollectionResponse,
TaskInstanceReferenceResponse,
Expand Down Expand Up @@ -234,6 +236,46 @@ def get_task_instance_dependencies(
return TaskDependencyCollectionResponse.model_validate({"dependencies": deps})


@task_instances_router.get(
task_instances_prefix + "/{task_id}/tries",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
)
def get_task_instance_tries(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Annotated[Session, Depends(get_session)],
) -> TaskInstanceHistoryCollectionResponse:
"""Get list of task instances history."""
map_index = -1

def _query(orm_object: Base) -> Select:
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.map_index == map_index,
)
return query

# Exclude TaskInstance with state UP_FOR_RETRY since they have been recorded in TaskInstanceHistory
tis = session.scalars(
_query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY, TI.state.is_(None)))
).all()
task_instances = session.scalars(_query(TIH)).all() + tis

if not task_instances:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found",
)

return TaskInstanceHistoryCollectionResponse(
task_instances=cast(list[TaskInstanceHistoryResponse], task_instances),
total_entries=len(task_instances),
)


@task_instances_router.get(
task_instances_prefix + "/{task_id}/{map_index}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Expand Down
24 changes: 24 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,30 @@ export const UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn = (
useTaskInstanceServiceGetTaskInstanceDependencies1Key,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
export type TaskInstanceServiceGetTaskInstanceTriesDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getTaskInstanceTries>
>;
export type TaskInstanceServiceGetTaskInstanceTriesQueryResult<
TData = TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetTaskInstanceTriesKey =
"TaskInstanceServiceGetTaskInstanceTries";
export const UseTaskInstanceServiceGetTaskInstanceTriesKeyFn = (
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceTriesKey,
...(queryKey ?? [{ dagId, dagRunId, taskId }]),
];
export type TaskInstanceServiceGetMappedTaskInstanceDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getMappedTaskInstance>
>;
Expand Down
31 changes: 31 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,37 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceDependencies1 = (
taskId,
}),
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseTaskInstanceServiceGetTaskInstanceTries = (
queryClient: QueryClient,
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn({
dagId,
dagRunId,
taskId,
}),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({ dagId, dagRunId, taskId }),
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,46 @@ export const useTaskInstanceServiceGetTaskInstanceDependencies1 = <
}) as TData,
...options,
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetTaskInstanceTries = <
TData = Common.TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn(
{ dagId, dagRunId, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
taskId,
}) as TData,
...options,
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1730,6 +1730,46 @@ export const useTaskInstanceServiceGetTaskInstanceDependencies1Suspense = <
}) as TData,
...options,
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetTaskInstanceTriesSuspense = <
TData = Common.TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn(
{ dagId, dagRunId, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
taskId,
}) as TData,
...options,
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3385,6 +3385,26 @@ export const $TaskInstanceCollectionResponse = {
description: "Task Instance Collection serializer for responses.",
} as const;

export const $TaskInstanceHistoryCollectionResponse = {
properties: {
task_instances: {
items: {
$ref: "#/components/schemas/TaskInstanceHistoryResponse",
},
type: "array",
title: "Task Instances",
},
total_entries: {
type: "integer",
title: "Total Entries",
},
},
type: "object",
required: ["task_instances", "total_entries"],
title: "TaskInstanceHistoryCollectionResponse",
description: "TaskInstanceHistory Collection serializer for responses.",
} as const;

export const $TaskInstanceHistoryResponse = {
properties: {
task_id: {
Expand Down
Loading