Skip to content

Commit

Permalink
Job runner I2
Browse the repository at this point in the history
  • Loading branch information
dandavison committed May 29, 2024
1 parent 1c264c0 commit 7314804
Showing 1 changed file with 70 additions and 4 deletions.
74 changes: 70 additions & 4 deletions update/job_runner_I2.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
from sys import version
from typing import Optional, TypedDict
from typing import Awaitable, Callable, Optional, TypedDict

from temporalio import common, workflow, activity
from temporalio.client import Client, WorkflowHandle
Expand All @@ -27,22 +29,81 @@ class JobOutput(TypedDict):
stderr: str


@dataclass
class Task:
    """A queued job together with the handler that will execute it.

    Instances are created by the update handlers (via
    `_enqueue_job_and_wait_for_result`) and consumed by the main workflow
    coroutine.
    """

    # The job to be executed.
    input: Job
    # The "real" handler method (undecorated) that performs the work; stored here
    # because the @workflow.update methods act only as generic enqueuers.
    handler: Callable[["JobRunner", Job], Awaitable[JobOutput]]
    # Result of running the handler; None until the main coroutine has run it.
    output: Optional[JobOutput] = None


@workflow.defn
class JobRunner:
"""
Jobs must be executed in order dictated by job dependency graph (see `job.depends_on`) and
not before `job.after_time`.
"""

def __init__(self) -> None:
self._completed = set[JobID]()
self.blocked_tasks = OrderedDict[JobID, Task]()
self.unblocked_tasks = OrderedDict[JobID, Task]()

# Note some undesirable things:
# 1. The update handler functions have become generic enqueuers; the "real" handler functions
# are some other methods that don't have the @workflow.update decorator.
# 2. The update handler functions have to store a reference to the real handler in the queue.
# 3. The workflow `run` method is *much* more complicated and bug-prone here, compared to
# I1:WaitUntilReadyToExecuteHandler

@workflow.run
async def run(self):
await workflow.wait_condition(
lambda: workflow.info().is_continue_as_new_suggested()
)
"""
Process all tasks in the queue serially, in the main workflow coroutine.
"""
while (
self.blocked_tasks
or self.unblocked_tasks
or not workflow.info().is_continue_as_new_suggested()
):
await workflow.wait_condition(
lambda: bool(self.unblocked_tasks or self.blocked_tasks)
)
while self.unblocked_tasks:
id, task = self.unblocked_tasks.popitem(last=False)
await task.handler(self, task.input)
# There are many mistakes a user will make while trying to implement this workflow. For
# example, one might think that the following line could be moved into
# `_enqueue_job_and_wait_for_result`, but that's a deadlock.
self._completed.add(id)
for id, task in list(self.blocked_tasks.items()):
if self.ready_to_execute(task.input):
self.unblocked_tasks[id] = self.blocked_tasks.pop(id)
workflow.continue_as_new()

def ready_to_execute(self, job: Job) -> bool:
if not set(job["depends_on"]) <= self._completed:
return False
if after_time := job["after_time"]:
if float(after_time) > workflow.now().timestamp():
return False
return True

async def _enqueue_job_and_wait_for_result(
self, job: Job, handler: Callable[["JobRunner", Job], Awaitable[JobOutput]]
) -> JobOutput:
task = Task(job, handler)
self.blocked_tasks[job["id"]] = task
await workflow.wait_condition(lambda: task.output is not None)
assert task.output
return task.output

@workflow.update
async def run_shell_script_job(self, job: Job) -> JobOutput:
return await self._enqueue_job_and_wait_for_result(
job, JobRunner._actually_run_shell_script_job
)

async def _actually_run_shell_script_job(self, job: Job) -> JobOutput:
if security_errors := await workflow.execute_activity(
run_shell_script_security_linter,
args=[job["run"]],
Expand All @@ -56,6 +117,11 @@ async def run_shell_script_job(self, job: Job) -> JobOutput:

@workflow.update
async def run_python_job(self, job: Job) -> JobOutput:
return await self._enqueue_job_and_wait_for_result(
job, JobRunner._actually_run_python_job
)

async def _actually_run_python_job(self, job: Job) -> JobOutput:
if not await workflow.execute_activity(
check_python_interpreter_version,
args=[job["python_interpreter_version"]],
Expand Down

0 comments on commit 7314804

Please sign in to comment.