Skip to content

Commit

Permalink
fix: add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
phi-friday committed Jul 30, 2024
1 parent f835bb8 commit e2fe90b
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 6 deletions.
23 changes: 23 additions & 0 deletions src/timeout_executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def unique_id(self) -> UUID:
return self._unique_id

def _create_temp_files(self) -> tuple[Path, Path]:
"""create temp files for input and output"""
temp_dir = Path(tempfile.gettempdir()) / "timeout_executor"
temp_dir.mkdir(exist_ok=True)

Expand All @@ -68,13 +69,15 @@ def _create_temp_files(self) -> tuple[Path, Path]:
return input_file, output_file

def _command(self, stacklevel: int = 2) -> list[str]:
"""create subprocess command"""
command = f'{sys.executable} -c "{SUBPROCESS_COMMAND}"'
logger.debug("%r command: %s", self, command, stacklevel=stacklevel)
return shlex.split(command)

def _dump_args(
self, output_file: Path | anyio.Path, *args: P.args, **kwargs: P.kwargs
) -> bytes:
"""dump args and output file path to input file"""
input_args = (self._func, args, kwargs, str(output_file))
logger.debug("%r before dump input args", self)
input_args_as_bytes = cloudpickle.dumps(input_args)
Expand All @@ -86,6 +89,7 @@ def _dump_args(
def _create_process(
self, input_file: Path | anyio.Path, stacklevel: int = 2
) -> subprocess.Popen[str]:
"""create new process"""
command = self._command(stacklevel=stacklevel + 1)
logger.debug("%r before create new process", self, stacklevel=stacklevel)
process = subprocess.Popen( # noqa: S603
Expand All @@ -104,6 +108,7 @@ def _create_executor_args(
output_file: Path | anyio.Path,
terminator: Terminator[P, T],
) -> ExecutorArgs[P, T]:
"""create executor args"""
return ExecutorArgs(
executor=self,
func_name=self._func_name,
Expand All @@ -119,6 +124,22 @@ def _init_process(
output_file: Path | anyio.Path,
stacklevel: int = 2,
) -> AsyncResult[P, T]:
"""init process.
before end process
---
1. create terminator
2. create process
3. create result container
4. setup terminator
5. watch terminator
after end process
---
6. end process
7. return result
8. run terminator
"""
logger.debug("%r before init process", self, stacklevel=stacklevel)
executor_args_builder = partial(
self._create_executor_args, input_file, output_file
Expand All @@ -132,6 +153,7 @@ def _init_process(
return result

def apply(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[P, T]:
"""run function with deadline"""
input_file, output_file = self._create_temp_files()
input_args_as_bytes = self._dump_args(output_file, *args, **kwargs)

Expand All @@ -143,6 +165,7 @@ def apply(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[P, T]:
return self._init_process(input_file, output_file)

async def delay(self, *args: P.args, **kwargs: P.kwargs) -> AsyncResult[P, T]:
"""run function with deadline"""
input_file, output_file = self._create_temp_files()
input_file, output_file = anyio.Path(input_file), anyio.Path(output_file)
input_args_as_bytes = self._dump_args(output_file, *args, **kwargs)
Expand Down
26 changes: 21 additions & 5 deletions src/timeout_executor/terminate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@


class Terminator(Callback[P, T], Generic[P, T]):
"""process terminator.
run callbacks and terminate process.
"""

_process: subprocess.Popen[str] | None
_callback_thread: threading.Thread | None
_terminator_thread: threading.Thread | None
Expand All @@ -49,6 +54,10 @@ def executor_args(self) -> ExecutorArgs[P, T]:

@property
def callback_args(self) -> CallbackArgs[P, T]:
"""callback args.
will be set in executor.
"""
if self._callback_args is None:
raise AttributeError("there is no callback args")
return self._callback_args
Expand All @@ -61,12 +70,20 @@ def callback_args(self, value: CallbackArgs[P, T]) -> None:

@property
def callback_thread(self) -> threading.Thread:
"""callback thread.
will be set in start method.
"""
if self._callback_thread is None:
raise AttributeError("there is no callback thread")
return self._callback_thread

@property
def terminator_thread(self) -> threading.Thread:
"""terminator thread.
will be set in start method.
"""
if self._terminator_thread is None:
raise AttributeError("there is no terminator thread")
return self._terminator_thread
Expand All @@ -77,13 +94,11 @@ def timeout(self) -> float:

@property
def is_active(self) -> bool:
"""process is terminated or not."""
return self._is_active

@is_active.setter
def is_active(self, value: bool) -> None:
self._is_active = value

def start(self) -> None:
"""watch process and run callbacks."""
if self._terminator_thread is not None or self._callback_thread is not None:
raise PermissionError("already started")
self._start_callback_thread()
Expand Down Expand Up @@ -114,6 +129,7 @@ def _start_callback_thread(self) -> None:
logger.debug("%r callback thread: %d", self, self._callback_thread.ident or -1)

def close(self, name: str | None = None) -> None:
"""run callbacks and terminate process."""
logger.debug("%r try to terminate process from %s", self, name or "unknown")
process = self.callback_args.process
if process.returncode is None:
Expand All @@ -128,7 +144,7 @@ def close(self, name: str | None = None) -> None:
process.pid,
)
else:
self.is_active = True
self._is_active = True
else:
logger.warning(
"%r process has no return code but cant find process :: pid: %d",
Expand Down
19 changes: 18 additions & 1 deletion src/timeout_executor/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,32 @@

@dataclass(**_DATACLASS_FROZEN_KWARGS)
class ExecutorArgs(Generic[P, T]):
"""executor args"""
"""executor args.
using in result and terminator.
"""

executor: Executor[P, T]
"""executor self"""
func_name: str
"""target function name"""
terminator: Terminator[P, T]
"""terminator"""
input_file: Path | anyio.Path
"""function args input file"""
output_file: Path | anyio.Path
"""function result output file"""
timeout: float
"""timeout"""


@dataclass(**_DATACLASS_NON_FROZEN_KWARGS)
class State:
"""process state.
using in callback args.
"""

value: Any = field(default=None)


Expand All @@ -60,8 +74,11 @@ class CallbackArgs(Generic[P, T]):
"""callback args"""

process: subprocess.Popen[str]
"""target process"""
result: AsyncResult[P, T]
"""process result"""
state: State = field(init=False, default_factory=State)
"""process state"""


class Callback(ABC, Generic[P, T]):
Expand Down

0 comments on commit e2fe90b

Please sign in to comment.