From e2fe90b39b39abecb0787e28234d978cb2661a18 Mon Sep 17 00:00:00 2001 From: phi Date: Wed, 31 Jul 2024 00:49:14 +0900 Subject: [PATCH] fix: add docs --- src/timeout_executor/executor.py | 23 +++++++++++++++++++++++ src/timeout_executor/terminate.py | 26 +++++++++++++++++++++----- src/timeout_executor/types.py | 19 ++++++++++++++++++- 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/src/timeout_executor/executor.py b/src/timeout_executor/executor.py index e38fb02..02e8cd8 100644 --- a/src/timeout_executor/executor.py +++ b/src/timeout_executor/executor.py @@ -56,6 +56,7 @@ def unique_id(self) -> UUID: return self._unique_id def _create_temp_files(self) -> tuple[Path, Path]: + """create temp files for input and output""" temp_dir = Path(tempfile.gettempdir()) / "timeout_executor" temp_dir.mkdir(exist_ok=True) @@ -68,6 +69,7 @@ def _create_temp_files(self) -> tuple[Path, Path]: return input_file, output_file def _command(self, stacklevel: int = 2) -> list[str]: + """create subprocess command""" command = f'{sys.executable} -c "{SUBPROCESS_COMMAND}"' logger.debug("%r command: %s", self, command, stacklevel=stacklevel) return shlex.split(command) @@ -75,6 +77,7 @@ def _command(self, stacklevel: int = 2) -> list[str]: def _dump_args( self, output_file: Path | anyio.Path, *args: P.args, **kwargs: P.kwargs ) -> bytes: + """dump args and output file path to input file""" input_args = (self._func, args, kwargs, str(output_file)) logger.debug("%r before dump input args", self) input_args_as_bytes = cloudpickle.dumps(input_args) @@ -86,6 +89,7 @@ def _dump_args( def _create_process( self, input_file: Path | anyio.Path, stacklevel: int = 2 ) -> subprocess.Popen[str]: + """create new process""" command = self._command(stacklevel=stacklevel + 1) logger.debug("%r before create new process", self, stacklevel=stacklevel) process = subprocess.Popen( # noqa: S603 @@ -104,6 +108,7 @@ def _create_executor_args( output_file: Path | anyio.Path, terminator: Terminator[P, T], ) -> ExecutorArgs[P, T]: + """create executor args""" return ExecutorArgs( executor=self, func_name=self._func_name, @@ -119,6 +124,22 @@ def _init_process( output_file: Path | anyio.Path, stacklevel: int = 2, ) -> AsyncResult[P, T]: + """init process. + + before end process + --- + 1. create terminator + 2. create process + 3. create result container + 4. setup terminator + 5. watch terminator + + after end process + --- + 6. end process + 7. return result + 8. run terminator + """ logger.debug("%r before init process", self, stacklevel=stacklevel) executor_args_builder = partial( self._create_executor_args, input_file, output_file @@ -132,6 +153,7 @@ def _init_process( return result def apply(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[P, T]: + """run function with deadline""" input_file, output_file = self._create_temp_files() input_args_as_bytes = self._dump_args(output_file, *args, **kwargs) @@ -143,6 +165,7 @@ def apply(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[P, T]: return self._init_process(input_file, output_file) async def delay(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[P, T]: + """run function with deadline""" input_file, output_file = self._create_temp_files() input_file, output_file = anyio.Path(input_file), anyio.Path(output_file) input_args_as_bytes = self._dump_args(output_file, *args, **kwargs) diff --git a/src/timeout_executor/terminate.py b/src/timeout_executor/terminate.py index 89c9062..5a1db98 100644 --- a/src/timeout_executor/terminate.py +++ b/src/timeout_executor/terminate.py @@ -24,6 +24,11 @@ class Terminator(Callback[P, T], Generic[P, T]): + """process terminator. + + run callbacks and terminate process. + """ + _process: subprocess.Popen[str] | None _callback_thread: threading.Thread | None _terminator_thread: threading.Thread | None @@ -49,6 +54,10 @@ def executor_args(self) -> ExecutorArgs[P, T]: @property def callback_args(self) -> CallbackArgs[P, T]: + """callback args. + + will be set in executor. + """ if self._callback_args is None: raise AttributeError("there is no callback args") return self._callback_args @@ -61,12 +70,20 @@ def callback_args(self, value: CallbackArgs[P, T]) -> None: @property def callback_thread(self) -> threading.Thread: + """callback thread. + + will be set in start method. + """ if self._callback_thread is None: raise AttributeError("there is no callback thread") return self._callback_thread @property def terminator_thread(self) -> threading.Thread: + """terminator thread. + + will be set in start method. + """ if self._terminator_thread is None: raise AttributeError("there is no terminator thread") return self._terminator_thread @@ -77,13 +94,11 @@ def timeout(self) -> float: @property def is_active(self) -> bool: + """process is terminated or not.""" return self._is_active - @is_active.setter - def is_active(self, value: bool) -> None: - self._is_active = value - def start(self) -> None: + """watch process and run callbacks.""" if self._terminator_thread is not None or self._callback_thread is not None: raise PermissionError("already started") self._start_callback_thread() @@ -114,6 +129,7 @@ def _start_callback_thread(self) -> None: logger.debug("%r callback thread: %d", self, self._callback_thread.ident or -1) def close(self, name: str | None = None) -> None: + """run callbacks and terminate process.""" logger.debug("%r try to terminate process from %s", self, name or "unknown") process = self.callback_args.process if process.returncode is None: @@ -128,7 +144,7 @@ def close(self, name: str | None = None) -> None: process.pid, ) else: - self.is_active = True + self._is_active = True else: logger.warning( "%r process has no return code but cant find process :: pid: %d", diff --git a/src/timeout_executor/types.py b/src/timeout_executor/types.py index d513f2f..f6f807e 100644 --- a/src/timeout_executor/types.py +++ b/src/timeout_executor/types.py @@ -40,18 +40,32 @@ @dataclass(**_DATACLASS_FROZEN_KWARGS) class ExecutorArgs(Generic[P, T]): - """executor args""" + """executor args. + + using in result and terminator. + """ executor: Executor[P, T] + """executor self""" func_name: str + """target function name""" terminator: Terminator[P, T] + """terminator""" input_file: Path | anyio.Path + """function args input file""" output_file: Path | anyio.Path + """function result output file""" timeout: float + """timeout""" @dataclass(**_DATACLASS_NON_FROZEN_KWARGS) class State: + """process state. + + using in callback args. + """ + value: Any = field(default=None) @@ -60,8 +74,11 @@ class CallbackArgs(Generic[P, T]): """callback args""" process: subprocess.Popen[str] + """target process""" result: AsyncResult[P, T] + """process result""" state: State = field(init=False, default_factory=State) + """process state""" class Callback(ABC, Generic[P, T]):