From 73148042f40e4efc7a725e31e485c43447725b72 Mon Sep 17 00:00:00 2001
From: Dan Davison
Date: Wed, 29 May 2024 05:28:24 -0400
Subject: [PATCH] Job runner I2

---
Review notes (this region is not part of the commit message):
* JobRunner.run now stores each handler's return value in `task.output`.
  The result used to be discarded, and nothing else in this patch assigns
  `task.output`, so the `wait_condition` in
  `_enqueue_job_and_wait_for_result` could never become true.
* The loop variables in JobRunner.run are named `job_id` instead of `id`,
  so the builtin is no longer shadowed.
* Only existing `+` lines were edited, so hunk line counts and the diffstat
  are unchanged; the post-image blob id on the `index` line predates these
  edits.
* NOTE(review): the `wait_condition` predicate in JobRunner.run is already
  true whenever `blocked_tasks` is non-empty, so a job that stays blocked
  (unmet `depends_on`, future `after_time`) looks like it would make the
  outer loop spin without anything new to wait for -- TODO confirm.

 update/job_runner_I2.py | 74 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 70 insertions(+), 4 deletions(-)

diff --git a/update/job_runner_I2.py b/update/job_runner_I2.py
index 4d018ae7..8eed503b 100644
--- a/update/job_runner_I2.py
+++ b/update/job_runner_I2.py
@@ -1,8 +1,10 @@
 import asyncio
+from collections import OrderedDict
+from dataclasses import dataclass
 from datetime import datetime, timedelta
 import logging
 from sys import version
-from typing import Optional, TypedDict
+from typing import Awaitable, Callable, Optional, TypedDict
 
 from temporalio import common, workflow, activity
 from temporalio.client import Client, WorkflowHandle
@@ -27,6 +29,13 @@ class JobOutput(TypedDict):
     stderr: str
 
 
+@dataclass
+class Task:
+    input: Job
+    handler: Callable[["JobRunner", Job], Awaitable[JobOutput]]
+    output: Optional[JobOutput] = None
+
+
 @workflow.defn
 class JobRunner:
     """
@@ -34,15 +43,67 @@ class JobRunner:
     not before `job.after_time`.
     """
 
+    def __init__(self) -> None:
+        self._completed = set[JobID]()
+        self.blocked_tasks = OrderedDict[JobID, Task]()
+        self.unblocked_tasks = OrderedDict[JobID, Task]()
+
+    # Note some undesirable things:
+    # 1. The update handler functions have become generic enqueuers; the "real" handler functions
+    #    are some other methods that don't have the @workflow.update decorator.
+    # 2. The update handler functions have to store a reference to the real handler in the queue.
+    # 3. The workflow `run` method is *much* more complicated and bug-prone here, compared to
+    #    I1:WaitUntilReadyToExecuteHandler
+
     @workflow.run
     async def run(self):
-        await workflow.wait_condition(
-            lambda: workflow.info().is_continue_as_new_suggested()
-        )
+        """
+        Process all tasks in the queue serially, in the main workflow coroutine.
+        """
+        while (
+            self.blocked_tasks
+            or self.unblocked_tasks
+            or not workflow.info().is_continue_as_new_suggested()
+        ):
+            await workflow.wait_condition(
+                lambda: bool(self.unblocked_tasks or self.blocked_tasks)
+            )
+            while self.unblocked_tasks:
+                job_id, task = self.unblocked_tasks.popitem(last=False)
+                task.output = await task.handler(self, task.input)
+                # There are many mistakes a user will make while trying to implement this workflow. For
+                # example, one might think that the following line could be moved into
+                # `_enqueue_job_and_wait_for_result`, but that's a deadlock.
+                self._completed.add(job_id)
+            for job_id, task in list(self.blocked_tasks.items()):
+                if self.ready_to_execute(task.input):
+                    self.unblocked_tasks[job_id] = self.blocked_tasks.pop(job_id)
         workflow.continue_as_new()
 
+    def ready_to_execute(self, job: Job) -> bool:
+        if not set(job["depends_on"]) <= self._completed:
+            return False
+        if after_time := job["after_time"]:
+            if float(after_time) > workflow.now().timestamp():
+                return False
+        return True
+
+    async def _enqueue_job_and_wait_for_result(
+        self, job: Job, handler: Callable[["JobRunner", Job], Awaitable[JobOutput]]
+    ) -> JobOutput:
+        task = Task(job, handler)
+        self.blocked_tasks[job["id"]] = task
+        await workflow.wait_condition(lambda: task.output is not None)
+        assert task.output
+        return task.output
+
     @workflow.update
     async def run_shell_script_job(self, job: Job) -> JobOutput:
+        return await self._enqueue_job_and_wait_for_result(
+            job, JobRunner._actually_run_shell_script_job
+        )
+
+    async def _actually_run_shell_script_job(self, job: Job) -> JobOutput:
         if security_errors := await workflow.execute_activity(
             run_shell_script_security_linter,
             args=[job["run"]],
@@ -56,6 +117,11 @@ async def run_shell_script_job(self, job: Job) -> JobOutput:
 
     @workflow.update
     async def run_python_job(self, job: Job) -> JobOutput:
+        return await self._enqueue_job_and_wait_for_result(
+            job, JobRunner._actually_run_python_job
+        )
+
+    async def _actually_run_python_job(self, job: Job) -> JobOutput:
         if not await workflow.execute_activity(
             check_python_interpreter_version,
             args=[job["python_interpreter_version"]],