diff --git a/src/aiida/engine/processes/calcjobs/tasks.py b/src/aiida/engine/processes/calcjobs/tasks.py index 58647f4a70..b4bd0c7cf0 100644 --- a/src/aiida/engine/processes/calcjobs/tasks.py +++ b/src/aiida/engine/processes/calcjobs/tasks.py @@ -54,6 +54,7 @@ class PreSubmitException(Exception): # noqa: N818 """Raise in the `do_upload` coroutine when an exception is raised in `CalcJob.presubmit`.""" + async def task_upload_job(process: 'CalcJob', transport_queue: TransportQueue, cancellable: InterruptableFuture): """Transport task that will attempt to upload the files of a job calculation to the remote. @@ -154,12 +155,12 @@ async def task_submit_job(node: CalcJobNode, transport_queue: TransportQueue, ca # transport = await cancellable.with_interrupt(request) # else: # pass - + async def do_submit(): transport_request = transport_queue._transport_requests.get(authinfo.pk, None) open_transport = transport_queue._open_transports.get(authinfo.pk, None) - if open_transport is not None: # and not transport_queue._last_request_special: + if open_transport is not None: # and not transport_queue._last_request_special: transport = open_transport # transport_queue._last_request_special = True elif transport_request is None: # or transport_queue._last_request_special: diff --git a/src/aiida/engine/transports.py b/src/aiida/engine/transports.py index 4efedd84ff..3cc6c94b89 100644 --- a/src/aiida/engine/transports.py +++ b/src/aiida/engine/transports.py @@ -13,7 +13,6 @@ import contextvars import logging import traceback -from datetime import datetime from typing import TYPE_CHECKING, Awaitable, Dict, Hashable, Iterator, Optional from aiida.common import timezone @@ -32,6 +31,7 @@ def __init__(self): self.future: asyncio.Future = asyncio.Future() self.count = 0 + class TransportCloseRequest: """Information kept about close request for a transport object""" @@ -39,6 +39,7 @@ def __init__(self): self.future: asyncio.Future = asyncio.Future() self.count = 0 + class TransportQueue: """A queue to get transport objects from authinfo. This class allows clients to register their interest in a transport object which will be provided at @@ -142,15 +143,17 @@ def do_open(): else: # If the last one was a special request, wait the difference between safe_open_interval and lost - open_callback_handle = self._loop.call_later(safe_open_interval-open_timedelta, do_open, context=contextvars.Context()) + open_callback_handle = self._loop.call_later( + safe_open_interval - open_timedelta, do_open, context=contextvars.Context() + ) # open_callback_handle = self._loop.call_later(safe_open_interval, do_open, context=contextvars.Context()) - # ? This logic is implemented in `tasks.py` instead. + # ? This logic is implemented in `tasks.py` instead. # else: # transport = authinfo.get_transport() # return transport - # If transport_request is open already + # If transport_request is open already try: transport_request.count += 1 yield transport_request.future @@ -168,7 +171,6 @@ def do_open(): # Check if there are no longer any users that want the transport if transport_request.count == 0: if transport_request.future.done(): - # ? Why is all this logic in the `request_transport` method? # ? Shouldn't the logic to close a transport be outside, such that the transport is being closed # ? once it was actually used??? @@ -178,21 +180,21 @@ def do_open(): # transport_request.future.result().close() # self._last_close_time = timezone.localtime(timezone.now()) - # close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() + # close_timedelta = (timezone.localtime(timezone.now()) - self._last_open_time).total_seconds() # if close_timedelta < safe_open_interval: - # Also here logic when transport should be closed immediately, or when via call_later? - # self._last_close_time = timezone.localtime(timezone.now()) - # self._transport_requests.pop(authinfo.pk, None) - # close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) + # Also here logic when transport should be closed immediately, or when via call_later? + # self._last_close_time = timezone.localtime(timezone.now()) + # self._transport_requests.pop(authinfo.pk, None) + # close_callback_handle = self._loop.call_later(safe_open_interval, do_close, context=contextvars.Context()) # if close_timedelta > safe_open_interval: # close_callback_handle = self._loop.call_soon(do_close, context=contextvars.Context()) # self._last_close_time = timezone.localtime(timezone.now()) # self._transport_requests.pop(authinfo.pk, None) - # self._transport_requests.pop(authinfo.pk, None) - - # transport_request.transport_closer = close_callback_handle + # self._transport_requests.pop(authinfo.pk, None) + + # transport_request.transport_closer = close_callback_handle # This should be replaced with the call_later close_callback_handle invocation # transport_request.future.result().close()