Message-processing design patterns #114

Draft: wants to merge 41 commits into main from message-processing-design-patterns

Conversation

@dandavison (Contributor) commented May 22, 2024

This PR contains proposals for how the Python SDK could help users defer update processing, control the interleaving of handler coroutines, and ensure that message processing is complete before the workflow completes.

update/job_runner_I1.py and update/job_runner_I2.py show how users can do this themselves, with minimal SDK changes.

update/job_runner_I1_native.py and update/job_runner_I2_native.py show how the SDK could be modified to make this easier and less error-prone for users.

@dandavison force-pushed the message-processing-design-patterns branch 4 times, most recently from 2885070 to d8f4468 on May 22, 2024 20:22

# contain multiple yield points.


class Queue(asyncio.Queue[tuple[Arg, asyncio.Future[Result]]]):

Member commented:

I probably wouldn't make a whole separate class for this, nor would I extend asyncio.Queue. A simple deque or list would be fine, I think (it may need some wait_conditions, but I think it's a bit simpler than this).
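
For concreteness, a minimal sketch of that suggestion (untested; the DequeBasedProcessor/submit names and the bare "some_activity" call are illustrative, and continue-as-new handling is omitted). The fuller alternatives later in this thread develop the same idea:

from collections import deque
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class DequeBasedProcessor:
    def __init__(self) -> None:
        # A plain deque instead of an asyncio.Queue subclass
        self.pending: deque[str] = deque()

    @workflow.run
    async def run(self) -> None:
        while True:
            # workflow.wait_condition plays the role of Queue.get()
            await workflow.wait_condition(lambda: len(self.pending) > 0)
            arg = self.pending.popleft()
            await workflow.execute_activity(
                "some_activity", arg, start_to_close_timeout=timedelta(seconds=10)
            )

    @workflow.update
    async def submit(self, arg: str) -> None:
        self.pending.append(arg)

Here the update returns as soon as it has enqueued its argument; the comments below discuss making the update wait for its result instead.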

Comment on lines 52 to 55
arg, fut = await self.queue.get()
fut.set_result(await self.process_task(arg))

Member commented:

I think an await self.process_task(*self.queue.get()) that doesn't return until the update has returned is best. This will get rid of the footgun. Yes, this makes it two-phase: you have to tell the update its result, and you have to wait for the update to return its result.
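
A distilled sketch of that two-phase handshake (untested; names are illustrative, and the "Update must complete with result" alternative below is the fully worked version). The run loop first sets the update's result, then waits for the handler to confirm it has returned that result to the caller:

from collections import deque
from datetime import timedelta
from typing import Optional

from temporalio import workflow


class Task:
    def __init__(self, arg: str) -> None:
        self.arg = arg
        self.result: Optional[str] = None
        self.returned = False


@workflow.defn
class TwoPhaseProcessor:
    def __init__(self) -> None:
        self.queue: deque[Task] = deque()

    @workflow.run
    async def run(self) -> None:
        while True:
            await workflow.wait_condition(lambda: len(self.queue) > 0)
            task = self.queue.popleft()
            # Phase 1: tell the update its result
            task.result = await workflow.execute_activity(
                "some_activity", task.arg, start_to_close_timeout=timedelta(seconds=10)
            )
            # Phase 2: wait for the update handler to return that result
            await workflow.wait_condition(lambda: task.returned)

    @workflow.update
    async def do_task(self, arg: str) -> str:
        task = Task(arg)
        self.queue.append(task)
        try:
            await workflow.wait_condition(lambda: task.result is not None)
            assert task.result is not None
            return task.result
        finally:
            task.returned = True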

await workflow.wait_condition(lambda: hasattr(self, "queue"))
fut = asyncio.Future[Result]()
self.queue.put_nowait((arg, fut)) # Note: update validation gates enqueue
return await fut

Member commented:

This use case is a bit flawed. Do you care about updates failing on continue-as-new? If you do, then you wouldn't carry over queue items; if you don't, then you wouldn't need the asyncio.sleep(0). As it is right now, this fails updates that are still processing, but if your update's activity just so happened to finish in the same workflow task in which continue-as-new was suggested, then it doesn't fail the update.

@cretz (Member) commented May 22, 2024:

Here are three alternatives. They're untested (I just typed them up quickly), but the ideas are there.

Alternative 1: Same as original design (CAN w/ update failures)

from collections import deque
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional

from temporalio import workflow

# !!!
# This is a copy of the original design, but it is flawed because it fails
# updates to the caller on CAN
# !!!


@dataclass
class MessageProcessorInput:
    pending_tasks: list[str] = field(default_factory=list)


class UpdateTask:
    def __init__(self, arg: str) -> None:
        self.arg = arg
        self.result: Optional[str] = None
        self.returned = False


@workflow.defn
class MessageProcessor:
    def __init__(self) -> None:
        self.queue: deque[UpdateTask] = deque()

    @workflow.run
    async def run(self, input: MessageProcessorInput) -> None:
        # Startup: re-enqueue tasks carried over from the previous run
        # (extend preserves their original order)
        self.queue.extend(UpdateTask(arg) for arg in input.pending_tasks)

        # Process until CAN is needed
        while not workflow.info().is_continue_as_new_suggested():
            await workflow.wait_condition(lambda: len(self.queue) > 0)
            await self.process_task(self.queue.popleft())

        # CAN knowing that pending updates will fail
        workflow.continue_as_new(
            MessageProcessorInput(pending_tasks=[task.arg for task in self.queue])
        )

    @workflow.update
    async def do_task(self, arg: str) -> str:
        # Add task and wait on result
        task = UpdateTask(arg)
        try:
            self.queue.append(task)
            await workflow.wait_condition(lambda: task.result is not None)
            assert task.result is not None
            return task.result
        finally:
            task.returned = True

    async def process_task(self, task: UpdateTask) -> None:
        task.result = await workflow.execute_activity(
            "some_activity", task.arg, start_to_close_timeout=timedelta(seconds=10)
        )
        await workflow.wait_condition(lambda: task.returned)

Alternative 2: Update doesn't wait

from collections import deque
from datetime import timedelta

from temporalio import workflow

# !!!
# This version does not make update wait on completion
# !!!


@workflow.defn
class MessageProcessor:
    def __init__(self) -> None:
        self.queue: deque[str] = deque()

    @workflow.run
    async def run(self, queue: list[str]) -> None:
        # Take a plain list as workflow input so it round-trips cleanly through
        # the data converter; extend preserves the original order
        self.queue.extend(queue)
        # Process until CAN is needed
        while not workflow.info().is_continue_as_new_suggested():
            await workflow.wait_condition(lambda: len(self.queue) > 0)
            await self.process_task(self.queue.popleft())

        workflow.continue_as_new(list(self.queue))

    @workflow.update
    async def do_task(self, arg: str) -> None:
        # Put on queue and complete update
        self.queue.append(arg)

    async def process_task(self, arg: str) -> None:
        await workflow.execute_activity(
            "some_activity", arg, start_to_close_timeout=timedelta(seconds=10)
        )

Alternative 3: Update must complete with result

from collections import deque
from datetime import timedelta
from typing import Optional

from temporalio import workflow

# !!!
# This version requires update to complete with result and won't CAN until after
# everything is done
# !!!


class UpdateTask:
    def __init__(self, arg: str) -> None:
        self.arg = arg
        self.result: Optional[str] = None
        self.returned = False


@workflow.defn
class MessageProcessor:
    def __init__(self) -> None:
        self.queue: deque[UpdateTask] = deque()

    @workflow.run
    async def run(self) -> None:
        # Process until CAN is needed and the queue is empty
        while not workflow.info().is_continue_as_new_suggested() or len(self.queue) > 0:
            await workflow.wait_condition(lambda: len(self.queue) > 0)
            await self.process_task(self.queue.popleft())

        # CAN knowing the queue is empty
        workflow.continue_as_new()

    @workflow.update
    async def do_task(self, arg: str) -> str:
        # Add task and wait on result
        task = UpdateTask(arg)
        try:
            self.queue.append(task)
            await workflow.wait_condition(lambda: task.result is not None)
            assert task.result is not None
            return task.result
        finally:
            task.returned = True

    async def process_task(self, task: UpdateTask) -> None:
        task.result = await workflow.execute_activity(
            "some_activity", task.arg, start_to_close_timeout=timedelta(seconds=10)
        )
        await workflow.wait_condition(lambda: task.returned)

@antlai-temporal commented May 22, 2024:

In the third case, "Update must complete with result", how do we guarantee that CAN eventually happens if updates keep coming...? We probably need to reject new requests before enqueueing them (with a validator?) while we are trying to drain. Otherwise history exceeds the threshold, the workflow fails, the pending updates fail, and we are back to the problem of the first case...

Member commented:

> In the third case, "Update must complete with result", how do we guarantee that CAN eventually happens if updates keep coming...?

You can't. You'd only choose this use case if you were guaranteed a period of idleness between tasks. You can definitely have a version that rejects updates once is_continue_as_new_suggested() returns true, if you wanted.
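
Here is a sketch of that rejecting variant (untested; it just adds a validator on top of the third alternative, with illustrative naming). The validator raises once continue-as-new has been suggested, so no new work is enqueued while the queue drains:

from collections import deque
from datetime import timedelta
from typing import Optional

from temporalio import workflow
from temporalio.exceptions import ApplicationError


class UpdateTask:
    def __init__(self, arg: str) -> None:
        self.arg = arg
        self.result: Optional[str] = None
        self.returned = False


@workflow.defn
class DrainingMessageProcessor:
    def __init__(self) -> None:
        self.queue: deque[UpdateTask] = deque()

    @workflow.run
    async def run(self) -> None:
        # Process until CAN is suggested and the queue has drained
        while not workflow.info().is_continue_as_new_suggested() or len(self.queue) > 0:
            await workflow.wait_condition(lambda: len(self.queue) > 0)
            await self.process_task(self.queue.popleft())

        # CAN knowing the queue is empty and no new updates will be accepted
        workflow.continue_as_new()

    @workflow.update
    async def do_task(self, arg: str) -> str:
        # Add task and wait on result
        task = UpdateTask(arg)
        try:
            self.queue.append(task)
            await workflow.wait_condition(lambda: task.result is not None)
            assert task.result is not None
            return task.result
        finally:
            task.returned = True

    @do_task.validator
    def reject_while_draining(self, arg: str) -> None:
        # A raising validator rejects the update before it is accepted, so it
        # never reaches the queue (or the workflow history)
        if workflow.info().is_continue_as_new_suggested():
            raise ApplicationError(
                "Draining for continue-as-new; retry against the next run"
            )

    async def process_task(self, task: UpdateTask) -> None:
        task.result = await workflow.execute_activity(
            "some_activity", task.arg, start_to_close_timeout=timedelta(seconds=10)
        )
        await workflow.wait_condition(lambda: task.returned)

Callers whose updates are rejected get the error immediately and can retry against the post-CAN run.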

@dandavison changed the title from "Message processing design patterns" to "Message-processing design patterns" on May 23, 2024
@dandavison force-pushed the message-processing-design-patterns branch 7 times, most recently from f0fd183 to fb92159 on May 29, 2024 16:46
@dandavison force-pushed the message-processing-design-patterns branch from 8d50d9c to b107e4d on May 29, 2024 21:36
@dandavison force-pushed the message-processing-design-patterns branch from b107e4d to be50780 on May 30, 2024 13:40
@dandavison force-pushed the message-processing-design-patterns branch from be50780 to 0f335b4 on May 30, 2024 15:29