From 3ddc96cf042caabd95b5f3c79264285caa5d349d Mon Sep 17 00:00:00 2001 From: Steven Armstrong Date: Sat, 22 Jul 2023 18:04:48 +0200 Subject: [PATCH 1/2] when closing APIContext close pending responses before closing session Signed-off-by: Steven Armstrong --- kopf/_cogs/clients/auth.py | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/kopf/_cogs/clients/auth.py b/kopf/_cogs/clients/auth.py index b92d101b..3a133b46 100644 --- a/kopf/_cogs/clients/auth.py +++ b/kopf/_cogs/clients/auth.py @@ -4,7 +4,7 @@ import ssl import tempfile from contextvars import ContextVar -from typing import Any, Callable, Dict, Iterator, Mapping, Optional, TypeVar, cast +from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, TypeVar, cast import aiohttp @@ -36,13 +36,20 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: # If a context is explicitly passed, make it a simple call without re-auth. # Exceptions are escalated to a caller, which is probably wrapped itself. if 'context' in kwargs: - return await fn(*args, **kwargs) + context = kwargs['context'] + response = await fn(*args, **kwargs) + # Keep track of responses which are using this context. + context.add_response(response) + return response # Otherwise, attempt the execution with the vault credentials and re-authenticate on 401s. vault: credentials.Vault = vault_var.get() async for key, info, context in vault.extended(APIContext, 'contexts'): try: - return await fn(*args, **kwargs, context=context) + response = await fn(*args, **kwargs, context=context) + # Keep track of responses which are using this context. + context.add_response(response) + return response except errors.APIUnauthorizedError as e: await vault.invalidate(key, exc=e) @@ -74,6 +81,9 @@ class APIContext: server: str default_namespace: Optional[str] + # List of open responses. + responses: List[aiohttp.ClientResponse] + # Temporary caches of the information retrieved for and from the environment. _tempfiles: "_TempFiles" @@ -166,10 +176,32 @@ def __init__( self.server = info.server self.default_namespace = info.default_namespace + self.responses = [] + # For purging on garbage collection. self._tempfiles = tempfiles + def flush_closed_responses(self) -> None: + # There's no point keeping references to already closed responses. + self.responses[:] = [_response for _response in self.responses if not _response.closed] + + def add_response(self, response: aiohttp.ClientResponse) -> None: + # Keep track of responses so they can be closed later when the session + # is closed. + self.flush_closed_responses() + if not response.closed: + self.responses.append(response) + + def close_open_responses(self) -> None: + # Close all responses that are still open and are using this session. + for response in self.responses: + if not response.closed: + response.close() + self.responses.clear() + async def close(self) -> None: + # Close all open responses that use this session before closing the session itself. + self.close_open_responses() # Closing is triggered by `Vault._flush_caches()` -- forward it to the actual session. await self.session.close() From be20ea587fe98fab7e66f778ef6d974d042799a1 Mon Sep 17 00:00:00 2001 From: Steven Armstrong Date: Mon, 24 Jul 2023 17:32:43 +0200 Subject: [PATCH 2/2] only consider aiohttp.ClientResponse instances when collecting responses related to a session Signed-off-by: Steven Armstrong --- kopf/_cogs/clients/auth.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kopf/_cogs/clients/auth.py b/kopf/_cogs/clients/auth.py index 3a133b46..e5c13a44 100644 --- a/kopf/_cogs/clients/auth.py +++ b/kopf/_cogs/clients/auth.py @@ -38,8 +38,9 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: if 'context' in kwargs: context = kwargs['context'] response = await fn(*args, **kwargs) - # Keep track of responses which are using this context. - context.add_response(response) + if isinstance(response, aiohttp.ClientResponse): + # Keep track of responses which are using this context. + context.add_response(response) return response # Otherwise, attempt the execution with the vault credentials and re-authenticate on 401s. @@ -47,8 +48,9 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: async for key, info, context in vault.extended(APIContext, 'contexts'): try: response = await fn(*args, **kwargs, context=context) - # Keep track of responses which are using this context. - context.add_response(response) + if isinstance(response, aiohttp.ClientResponse): + # Keep track of responses which are using this context. + context.add_response(response) return response except errors.APIUnauthorizedError as e: await vault.invalidate(key, exc=e)