Skip to content

Commit

Permalink
AIP-84 Get Task Instance Tries (#44301)
Browse files Browse the repository at this point in the history
* adding get_task_instance_tries code

* renaming variables

* dummy change

* changing test case

* remove print statement

* checking response in test cases

* rename variables

---------

Co-authored-by: kandharvishnuu <[email protected]>
  • Loading branch information
kandharvishnu and kandharvishnuu authored Nov 25, 2024
1 parent 1351b08 commit 19e97da
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 2 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ def get_mapped_task_instance_try_details(
)


@mark_fastapi_migration_done
@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@provide_session
def get_task_instance_tries(
Expand Down
73 changes: 73 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3983,6 +3983,63 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries:
get:
tags:
- Task Instance
summary: Get Task Instance Tries
description: Get list of task instances history.
operationId: get_task_instance_tries
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
- name: task_id
in: path
required: true
schema:
type: string
title: Task Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/TaskInstanceHistoryCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}:
get:
tags:
Expand Down Expand Up @@ -7367,6 +7424,22 @@ components:
- total_entries
title: TaskInstanceCollectionResponse
description: Task Instance Collection serializer for responses.
TaskInstanceHistoryCollectionResponse:
properties:
task_instances:
items:
$ref: '#/components/schemas/TaskInstanceHistoryResponse'
type: array
title: Task Instances
total_entries:
type: integer
title: Total Entries
type: object
required:
- task_instances
- total_entries
title: TaskInstanceHistoryCollectionResponse
description: TaskInstanceHistory Collection serializer for responses.
TaskInstanceHistoryResponse:
properties:
task_id:
Expand Down
46 changes: 44 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/task_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

from __future__ import annotations

from typing import Annotated, Literal
from typing import Annotated, Literal, cast

from fastapi import Depends, HTTPException, Request, status
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql import select
from sqlalchemy.sql import or_, select
from sqlalchemy.sql.selectable import Select

from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
Expand Down Expand Up @@ -51,6 +52,7 @@
ClearTaskInstancesBody,
TaskDependencyCollectionResponse,
TaskInstanceCollectionResponse,
TaskInstanceHistoryCollectionResponse,
TaskInstanceHistoryResponse,
TaskInstanceReferenceCollectionResponse,
TaskInstanceReferenceResponse,
Expand Down Expand Up @@ -234,6 +236,46 @@ def get_task_instance_dependencies(
return TaskDependencyCollectionResponse.model_validate({"dependencies": deps})


@task_instances_router.get(
task_instances_prefix + "/{task_id}/tries",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
)
def get_task_instance_tries(
dag_id: str,
dag_run_id: str,
task_id: str,
session: Annotated[Session, Depends(get_session)],
) -> TaskInstanceHistoryCollectionResponse:
"""Get list of task instances history."""
map_index = -1

def _query(orm_object: Base) -> Select:
query = select(orm_object).where(
orm_object.dag_id == dag_id,
orm_object.run_id == dag_run_id,
orm_object.task_id == task_id,
orm_object.map_index == map_index,
)
return query

# Exclude TaskInstance with state UP_FOR_RETRY since they have been recorded in TaskInstanceHistory
tis = session.scalars(
_query(TI).where(or_(TI.state != TaskInstanceState.UP_FOR_RETRY, TI.state.is_(None)))
).all()
task_instances = session.scalars(_query(TIH)).all() + tis

if not task_instances:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found",
)

return TaskInstanceHistoryCollectionResponse(
task_instances=cast(list[TaskInstanceHistoryResponse], task_instances),
total_entries=len(task_instances),
)


@task_instances_router.get(
task_instances_prefix + "/{task_id}/{map_index}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
Expand Down
24 changes: 24 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,30 @@ export const UseTaskInstanceServiceGetTaskInstanceDependencies1KeyFn = (
useTaskInstanceServiceGetTaskInstanceDependencies1Key,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }]),
];
export type TaskInstanceServiceGetTaskInstanceTriesDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getTaskInstanceTries>
>;
export type TaskInstanceServiceGetTaskInstanceTriesQueryResult<
TData = TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetTaskInstanceTriesKey =
"TaskInstanceServiceGetTaskInstanceTries";
export const UseTaskInstanceServiceGetTaskInstanceTriesKeyFn = (
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
queryKey?: Array<unknown>,
) => [
useTaskInstanceServiceGetTaskInstanceTriesKey,
...(queryKey ?? [{ dagId, dagRunId, taskId }]),
];
export type TaskInstanceServiceGetMappedTaskInstanceDefaultResponse = Awaited<
ReturnType<typeof TaskInstanceService.getMappedTaskInstance>
>;
Expand Down
31 changes: 31 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,37 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceDependencies1 = (
taskId,
}),
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseTaskInstanceServiceGetTaskInstanceTries = (
queryClient: QueryClient,
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn({
dagId,
dagRunId,
taskId,
}),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({ dagId, dagRunId, taskId }),
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,46 @@ export const useTaskInstanceServiceGetTaskInstanceDependencies1 = <
}) as TData,
...options,
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetTaskInstanceTries = <
TData = Common.TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn(
{ dagId, dagRunId, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
taskId,
}) as TData,
...options,
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1524,6 +1524,46 @@ export const useTaskInstanceServiceGetTaskInstanceDependencies1Suspense = <
}) as TData,
...options,
});
/**
* Get Task Instance Tries
* Get list of task instances history.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
* @param data.taskId
* @returns TaskInstanceHistoryCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServiceGetTaskInstanceTriesSuspense = <
TData = Common.TaskInstanceServiceGetTaskInstanceTriesDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
dagRunId,
taskId,
}: {
dagId: string;
dagRunId: string;
taskId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseTaskInstanceServiceGetTaskInstanceTriesKeyFn(
{ dagId, dagRunId, taskId },
queryKey,
),
queryFn: () =>
TaskInstanceService.getTaskInstanceTries({
dagId,
dagRunId,
taskId,
}) as TData,
...options,
});
/**
* Get Mapped Task Instance
* Get task instance.
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3401,6 +3401,26 @@ export const $TaskInstanceCollectionResponse = {
description: "Task Instance Collection serializer for responses.",
} as const;

export const $TaskInstanceHistoryCollectionResponse = {
properties: {
task_instances: {
items: {
$ref: "#/components/schemas/TaskInstanceHistoryResponse",
},
type: "array",
title: "Task Instances",
},
total_entries: {
type: "integer",
title: "Total Entries",
},
},
type: "object",
required: ["task_instances", "total_entries"],
title: "TaskInstanceHistoryCollectionResponse",
description: "TaskInstanceHistory Collection serializer for responses.",
} as const;

export const $TaskInstanceHistoryResponse = {
properties: {
task_id: {
Expand Down
Loading

0 comments on commit 19e97da

Please sign in to comment.