Skip to content

Commit

Permalink
Update core / additional failure path tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Oct 17, 2023
1 parent 713c847 commit abe36c3
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 52 deletions.
77 changes: 43 additions & 34 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,8 @@ async def run_update(
try:
if defn.validator is not None:
# Run the validator
await self._inbound.handle_update_validator(handler_input)
with self._as_read_only():
await self._inbound.handle_update_validator(handler_input)

# Accept the update
command.update_response.accepted.SetInParent()
Expand All @@ -474,7 +475,6 @@ async def run_update(
job.protocol_instance_id
)
command.update_response.completed.CopyFrom(result_payloads[0])
# TODO: Dedupe exception handling if it makes sense
except (Exception, asyncio.CancelledError) as err:
logger.debug(
f"Update raised failure with run ID {self._info.run_id}",
Expand All @@ -485,21 +485,21 @@ async def run_update(
err = temporalio.exceptions.CancelledError(
f"Cancellation raised within update {err}"
)
if isinstance(err, temporalio.exceptions.FailureError):
# All other failure errors fail the update
if command is None:
command = self._add_command()
command.update_response.protocol_instance_id = (
job.protocol_instance_id
)
self._failure_converter.to_failure(
err,
self._payload_converter,
command.update_response.rejected.cause,
)
else:
# All other exceptions fail the task
# Read-only issues during validation should fail the task
if isinstance(err, temporalio.workflow.ReadOnlyContextError):
self._current_activation_error = err
return
# All other errors fail the update
if command is None:
command = self._add_command()
command.update_response.protocol_instance_id = (
job.protocol_instance_id
)
self._failure_converter.to_failure(
err,
self._payload_converter,
command.update_response.rejected.cause,
)
except BaseException as err:
# During tear down, generator exit and no-runtime exceptions can appear
if not self._deleting:
Expand Down
2 changes: 1 addition & 1 deletion temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ def _apply_to_class(
raise ValueError("Class already contains workflow definition")
issues: List[str] = []

# Collect run fn and all signal/query fns
# Collect run fn and all signal/query/update fns
members = inspect.getmembers(cls)
run_fn: Optional[Callable[..., Awaitable[Any]]] = None
seen_run_attr = False
Expand Down
57 changes: 57 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3506,6 +3506,9 @@ async def test_workflow_buffered_metrics(client: Client):
)


bad_validator_fail_ct = 0


@workflow.defn
class UpdateHandlersWorkflow:
def __init__(self) -> None:
Expand Down Expand Up @@ -3548,6 +3551,20 @@ async def runs_activity(self, name: str) -> str:
await act
return "done"

@workflow.update
async def bad_validator(self) -> str:
return "done"

@bad_validator.validator
def bad_validator_validator(self) -> None:
global bad_validator_fail_ct
# Run a command which should not be allowed the first few tries, then "fix" it as if new code was deployed
if bad_validator_fail_ct < 2:
bad_validator_fail_ct += 1
workflow.start_activity(
say_hello, "boo", schedule_to_close_timeout=timedelta(seconds=5)
)

# @workflow.signal
# def set_signal_handler(self, signal_name: str) -> None:
# def new_handler(arg: str) -> None:
Expand Down Expand Up @@ -3644,3 +3661,43 @@ async def test_workflow_update_handlers_unhappy(client: Client):
with pytest.raises(WorkflowUpdateFailedError) as err:
await handle.update(UpdateHandlersWorkflow.runs_activity, "foo")
assert isinstance(err.value.cause, CancelledError)

# Incorrect args for handler
with pytest.raises(WorkflowUpdateFailedError) as err:
await handle.update("last_event", args=[121, "badarg"])
assert isinstance(err.value.cause, ApplicationError)
assert (
"UpdateHandlersWorkflow.last_event_validator() takes 2 positional arguments but 3 were given"
== err.value.cause.message
)

# Un-deserializeable nonsense
with pytest.raises(WorkflowUpdateFailedError) as err:
await handle.update(
"last_event",
arg=RawValue(
payload=Payload(
metadata={"encoding": b"u-dont-know-me"}, data=b"enchi-cat"
)
),
)
assert isinstance(err.value.cause, ApplicationError)
assert "Failed decoding arguments" == err.value.cause.message


async def test_workflow_update_command_in_validator(client: Client):
# Need to not sandbox so behavior of validator can change based on global
async with new_worker(
client, UpdateHandlersWorkflow, workflow_runner=UnsandboxedWorkflowRunner()
) as worker:
handle = await client.start_workflow(
UpdateHandlersWorkflow.run,
id=f"update-handlers-command-in-validator-{uuid.uuid4()}",
task_queue=worker.task_queue,
task_timeout=timedelta(seconds=1),
)

# This will produce a WFT failure which will eventually resolve and then this
# update will return
res = await handle.update(UpdateHandlersWorkflow.bad_validator)
assert res == "done"

0 comments on commit abe36c3

Please sign in to comment.