Skip to content

Commit

Permalink
feat: Terminator
Browse files Browse the repository at this point in the history
  • Loading branch information
phi-friday committed Feb 29, 2024
1 parent e88ddea commit a06a06c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
9 changes: 7 additions & 2 deletions src/timeout_executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from timeout_executor.const import SUBPROCESS_COMMAND, TIMEOUT_EXECUTOR_INPUT_FILE
from timeout_executor.result import AsyncResult
from timeout_executor.terminate import Terminator

__all__ = ["apply_func", "delay_func"]

Expand Down Expand Up @@ -55,14 +56,16 @@ def apply(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[T]:
file.write(input_args_as_bytes)

command = self._command()
terminator = Terminator(self._timeout)
process = subprocess.Popen(
command, # noqa: S603
env={TIMEOUT_EXECUTOR_INPUT_FILE: input_file.as_posix()},
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return AsyncResult(process, input_file, output_file, self._timeout)
terminator.process = process
return AsyncResult(process, terminator, input_file, output_file, self._timeout)

async def delay(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[T]:
input_file, output_file = self._create_temp_files()
Expand All @@ -74,14 +77,16 @@ async def delay(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[T]:
await file.write(input_args_as_bytes)

command = self._command()
terminator = Terminator(self._timeout)
process = subprocess.Popen( # noqa: ASYNC101
command, # noqa: S603
env={TIMEOUT_EXECUTOR_INPUT_FILE: input_file.as_posix()},
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return AsyncResult(process, input_file, output_file, self._timeout)
terminator.process = process
return AsyncResult(process, terminator, input_file, output_file, self._timeout)


@overload
Expand Down
9 changes: 8 additions & 1 deletion src/timeout_executor/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
if TYPE_CHECKING:
from pathlib import Path

from timeout_executor.terminate import Terminator

__all__ = ["AsyncResult"]

T = TypeVar("T", infer_variance=True)
Expand All @@ -26,14 +28,16 @@ class AsyncResult(Generic[T]):

_result: Any

def __init__(
def __init__( # noqa: PLR0913
self,
process: subprocess.Popen,
terminator: Terminator,
input_file: Path | anyio.Path,
output_file: Path | anyio.Path,
timeout: float,
) -> None:
self._process = process
self._terminator = terminator
self._timeout = timeout
self._result = SENTINEL

Expand Down Expand Up @@ -87,6 +91,9 @@ async def _load_output(self) -> T:
if self._process.returncode is None:
raise RuntimeError("process is running")

if self._terminator.is_active:
raise TimeoutError(self._timeout)

if not await self._output.exists():
raise FileNotFoundError(self._output)

Expand Down
56 changes: 56 additions & 0 deletions src/timeout_executor/terminate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from __future__ import annotations

import subprocess
import threading
from contextlib import suppress

__all__ = []


class Terminator:
_process: subprocess.Popen | None

def __init__(self, timeout: float) -> None:
self._process = None
self._timeout = timeout
self._is_active = False

@property
def process(self) -> subprocess.Popen:
if self._process is None:
raise AttributeError("there is no process")
return self._process

@process.setter
def process(self, process: subprocess.Popen) -> None:
if self._process is not None:
raise AttributeError("already has process")
self._process = process
self._start()

@property
def is_active(self) -> bool:
return self._is_active

@is_active.setter
def is_active(self, value: bool) -> None:
self._is_active = value

def _start(self) -> None:
self.thread = threading.Thread(
target=terminate, args=(self._timeout, self.process, self)
)
self.thread.daemon = True
self.thread.start()


def terminate(
timeout: float, process: subprocess.Popen, terminator: Terminator
) -> None:
try:
with suppress(TimeoutError, subprocess.TimeoutExpired):
process.wait(timeout)
finally:
if process.returncode is None:
process.terminate()
terminator.is_active = True

0 comments on commit a06a06c

Please sign in to comment.