Skip to content

Commit

Permalink
AIP-84 Migrate patch a connection to FastAPI API (apache#43102)
Browse files Browse the repository at this point in the history
* Migrate Patch Connection endpoint to FastAPI and Include Optional parameter Field default value for backward compatibility

* Make all tests parametrized

* Make UpdateMask Annotated for Patch endpoints, update the serialisation for backward compat and fix for variables serialisation won't update val in setattr()

* Merge duplicate imports

* Revert update_mask to explode true and include significant.rst for update_mask breaking change

* Amend update_mask as array

* generalize handling of list query parameters in newsfragment

* Include password field

* Include password to response and tests, rebase and rerun pre-commit

* Fix copy&paste mistakes for method and naming

* Convert redact_password to agreed approach, rebase and pre-commits

* Include new exception doc generation

* Run pre-commit after rebase
  • Loading branch information
bugraoz93 authored Nov 6, 2024
1 parent 995cd8f commit cd75707
Show file tree
Hide file tree
Showing 13 changed files with 558 additions and 40 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def get_connections(
)


@mark_fastapi_migration_done
@security.requires_access_connection("PUT")
@provide_session
@action_logging(
Expand Down
85 changes: 75 additions & 10 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,72 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
patch:
tags:
- Connection
summary: Patch Connection
description: Update a connection entry.
operationId: patch_connection
parameters:
- name: connection_id
in: path
required: true
schema:
type: string
title: Connection Id
- name: update_mask
in: query
required: false
schema:
anyOf:
- type: array
items:
type: string
- type: 'null'
title: Update Mask
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/connections/:
get:
tags:
Expand Down Expand Up @@ -4349,20 +4415,19 @@ components:
key:
type: string
title: Key
description:
value:
anyOf:
- type: string
- type: 'null'
title: Description
value:
title: Value
description:
anyOf:
- type: string
- type: 'null'
title: Value
title: Description
type: object
required:
- key
- description
- value
title: VariableBody
description: Variable serializer for bodies.
Expand All @@ -4387,21 +4452,21 @@ components:
key:
type: string
title: Key
description:
value:
anyOf:
- type: string
- type: 'null'
title: Description
value:
title: Value
description:
anyOf:
- type: string
- type: 'null'
title: Value
title: Description
type: object
required:
- key
- description
- value
- description
title: VariableResponse
description: Variable serializer for responses.
VersionInfo:
Expand Down
39 changes: 38 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from fastapi import Depends, HTTPException, status
from fastapi import Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated
Expand Down Expand Up @@ -143,3 +143,40 @@ async def post_connection(
session.add(connection)

return ConnectionResponse.model_validate(connection, from_attributes=True)


@connections_router.patch(
"/{connection_id}",
responses=create_openapi_http_exception_doc(
[
status.HTTP_400_BAD_REQUEST,
status.HTTP_401_UNAUTHORIZED,
status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND,
]
),
)
async def patch_connection(
connection_id: str,
patch_body: ConnectionBody,
session: Annotated[Session, Depends(get_session)],
update_mask: list[str] | None = Query(None),
) -> ConnectionResponse:
"""Update a connection entry."""
if patch_body.connection_id != connection_id:
raise HTTPException(400, "The connection_id in the request body does not match the URL parameter")

non_update_fields = {"connection_id", "conn_id"}
connection = session.scalar(select(Connection).filter_by(conn_id=connection_id).limit(1))

if connection is None:
raise HTTPException(404, f"The Connection with connection_id: `{connection_id}` was not found")

if update_mask:
data = patch_body.model_dump(include=set(update_mask) - non_update_fields)
else:
data = patch_body.model_dump(exclude=non_update_fields)

for key, val in data.items():
setattr(connection, key, val)
return ConnectionResponse.model_validate(connection, from_attributes=True)
8 changes: 4 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ async def patch_dag(
status.HTTP_400_BAD_REQUEST, "Only `is_paused` field can be updated through the REST API"
)

data = patch_body.model_dump(include=set(update_mask))
else:
update_mask = ["is_paused"]
data = patch_body.model_dump()

for attr_name in update_mask:
attr_value = getattr(patch_body, attr_name)
setattr(dag, attr_name, attr_value)
for key, val in data.items():
setattr(dag, key, val)

return DAGResponse.model_validate(dag, from_attributes=True)

Expand Down
8 changes: 5 additions & 3 deletions airflow/api_fastapi/core_api/routes/public/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ async def patch_variable(
status.HTTP_404_NOT_FOUND, f"The Variable with key: `{variable_key}` was not found"
)
if update_mask:
data = patch_body.model_dump(include=set(update_mask) - non_update_fields)
data = patch_body.model_dump(
include=set(update_mask) - non_update_fields, by_alias=True, exclude_none=True
)
else:
data = patch_body.model_dump(exclude=non_update_fields)
data = patch_body.model_dump(exclude=non_update_fields, by_alias=True, exclude_none=True)
for key, val in data.items():
setattr(variable, key, val)
return variable
return VariableResponse.model_validate(variable, from_attributes=True)


@variables_router.post(
Expand Down
17 changes: 7 additions & 10 deletions airflow/api_fastapi/core_api/serializers/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,14 @@
from airflow.utils.log.secrets_masker import redact


class VariableBase(BaseModel):
"""Base Variable serializer."""
class VariableResponse(BaseModel):
"""Variable serializer for responses."""

model_config = ConfigDict(populate_by_name=True)

key: str
description: str | None


class VariableResponse(VariableBase):
"""Variable serializer for responses."""

val: str | None = Field(alias="value")
description: str | None

@model_validator(mode="after")
def redact_val(self) -> Self:
Expand All @@ -54,10 +49,12 @@ def redact_val(self) -> Self:
return self


class VariableBody(VariableBase):
class VariableBody(BaseModel):
"""Variable serializer for bodies."""

value: str | None
key: str
value: str | None = Field(serialization_alias="val")
description: str | None = Field(default=None)


class VariableCollectionResponse(BaseModel):
Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ export type DagServicePatchDagsMutationResult = Awaited<
export type DagServicePatchDagMutationResult = Awaited<
ReturnType<typeof DagService.patchDag>
>;
export type ConnectionServicePatchConnectionMutationResult = Awaited<
ReturnType<typeof ConnectionService.patchConnection>
>;
export type DagRunServicePatchDagRunStateMutationResult = Awaited<
ReturnType<typeof DagRunService.patchDagRunState>
>;
Expand Down
47 changes: 47 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,53 @@ export const useDagServicePatchDag = <
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Connection
* Update a connection entry.
* @param data The data for the request.
* @param data.connectionId
* @param data.requestBody
* @param data.updateMask
* @returns ConnectionResponse Successful Response
* @throws ApiError
*/
export const useConnectionServicePatchConnection = <
TData = Common.ConnectionServicePatchConnectionMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
connectionId: string;
requestBody: ConnectionBody;
updateMask?: string[];
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
connectionId: string;
requestBody: ConnectionBody;
updateMask?: string[];
},
TContext
>({
mutationFn: ({ connectionId, requestBody, updateMask }) =>
ConnectionService.patchConnection({
connectionId,
requestBody,
updateMask,
}) as unknown as Promise<TData>,
...options,
});
/**
* Patch Dag Run State
* Modify a DAG Run.
Expand Down
20 changes: 10 additions & 10 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2820,7 +2820,7 @@ export const $VariableBody = {
type: "string",
title: "Key",
},
description: {
value: {
anyOf: [
{
type: "string",
Expand All @@ -2829,9 +2829,9 @@ export const $VariableBody = {
type: "null",
},
],
title: "Description",
title: "Value",
},
value: {
description: {
anyOf: [
{
type: "string",
Expand All @@ -2840,11 +2840,11 @@ export const $VariableBody = {
type: "null",
},
],
title: "Value",
title: "Description",
},
},
type: "object",
required: ["key", "description", "value"],
required: ["key", "value"],
title: "VariableBody",
description: "Variable serializer for bodies.",
} as const;
Expand Down Expand Up @@ -2875,7 +2875,7 @@ export const $VariableResponse = {
type: "string",
title: "Key",
},
description: {
value: {
anyOf: [
{
type: "string",
Expand All @@ -2884,9 +2884,9 @@ export const $VariableResponse = {
type: "null",
},
],
title: "Description",
title: "Value",
},
value: {
description: {
anyOf: [
{
type: "string",
Expand All @@ -2895,11 +2895,11 @@ export const $VariableResponse = {
type: "null",
},
],
title: "Value",
title: "Description",
},
},
type: "object",
required: ["key", "description", "value"],
required: ["key", "value", "description"],
title: "VariableResponse",
description: "Variable serializer for responses.",
} as const;
Expand Down
Loading

0 comments on commit cd75707

Please sign in to comment.