diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core
index 409e74ec..f8593766 160000
--- a/temporalio/bridge/sdk-core
+++ b/temporalio/bridge/sdk-core
@@ -1 +1 @@
-Subproject commit 409e74ec8e80ae4c1f9043e8b413b1371b65f946
+Subproject commit f859376686e46c36607ea527e9fdceec481f549d
diff --git a/temporalio/bridge/src/worker.rs b/temporalio/bridge/src/worker.rs
index f388d8ea..29836301 100644
--- a/temporalio/bridge/src/worker.rs
+++ b/temporalio/bridge/src/worker.rs
@@ -184,6 +184,13 @@ impl WorkerRef {
         Ok(())
     }
 
+    fn replace_client(&self, client: &client::ClientRef) {
+        self.worker
+            .as_ref()
+            .expect("missing worker")
+            .replace_client(client.retry_client.clone().into_inner());
+    }
+
     fn initiate_shutdown(&self) -> PyResult<()> {
         let worker = self.worker.as_ref().unwrap().clone();
         worker.initiate_shutdown();
diff --git a/temporalio/bridge/worker.py b/temporalio/bridge/worker.py
index 842bee3c..fd44d1b1 100644
--- a/temporalio/bridge/worker.py
+++ b/temporalio/bridge/worker.py
@@ -132,6 +132,10 @@ def request_workflow_eviction(self, run_id: str) -> None:
         """Request a workflow be evicted."""
         self._ref.request_workflow_eviction(run_id)
 
+    def replace_client(self, client: temporalio.bridge.client.Client) -> None:
+        """Replace the worker client."""
+        self._ref.replace_client(client._ref)
+
     def initiate_shutdown(self) -> None:
         """Start shutdown of the worker."""
         self._ref.initiate_shutdown()
diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py
index 107229e5..a341c550 100644
--- a/temporalio/worker/_worker.py
+++ b/temporalio/worker/_worker.py
@@ -221,23 +221,8 @@ def __init__(
         )
         interceptors = interceptors_from_client + list(interceptors)
 
-        # Extract the bridge service client. We try the service on the client
-        # first, then we support a worker_service_client on the client's service
-        # to return underlying service client we can use.
-        bridge_client: temporalio.service._BridgeServiceClient
-        if isinstance(client.service_client, temporalio.service._BridgeServiceClient):
-            bridge_client = client.service_client
-        elif hasattr(client.service_client, "worker_service_client"):
-            bridge_client = client.service_client.worker_service_client
-            if not isinstance(bridge_client, temporalio.service._BridgeServiceClient):
-                raise TypeError(
-                    "Client's worker_service_client cannot be used for a worker"
-                )
-        else:
-            raise TypeError(
-                "Client cannot be used for a worker. "
-                + "Use the original client's service or set worker_service_client on the wrapped service with the original service client."
-            )
+        # Extract the bridge service client
+        bridge_client = _extract_bridge_client_for_worker(client)
 
         # Store the config for tracking
         self._config = WorkerConfig(
@@ -283,7 +268,9 @@ def __init__(
 
         # Create activity and workflow worker
         self._activity_worker: Optional[_ActivityWorker] = None
-        runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default()
+        self._runtime = (
+            bridge_client.config.runtime or temporalio.runtime.Runtime.default()
+        )
         if activities:
             # Issue warning here if executor max_workers is lower than max
             # concurrent activities. We do this here instead of in
@@ -304,7 +291,7 @@ def __init__(
                 shared_state_manager=shared_state_manager,
                 data_converter=client_config["data_converter"],
                 interceptors=interceptors,
-                metric_meter=runtime.metric_meter,
+                metric_meter=self._runtime.metric_meter,
             )
         self._workflow_worker: Optional[_WorkflowWorker] = None
         if workflows:
@@ -321,30 +308,20 @@ def __init__(
                 workflow_failure_exception_types=workflow_failure_exception_types,
                 debug_mode=debug_mode,
                 disable_eager_activity_execution=disable_eager_activity_execution,
-                metric_meter=runtime.metric_meter,
+                metric_meter=self._runtime.metric_meter,
                 on_eviction_hook=None,
                 disable_safe_eviction=disable_safe_workflow_eviction,
             )
 
-        # We need an already connected client
-        # TODO(cretz): How to connect to client inside constructor here? In the
-        # meantime, we disallow lazy clients from being used for workers. We
-        # could check whether the connected client is present which means
-        # lazy-but-already-connected clients would work, but that is confusing
-        # to users that the client only works if they already made a call on it.
-        if bridge_client.config.lazy:
-            raise RuntimeError("Lazy clients cannot be used for workers")
-        raw_bridge_client = bridge_client._bridge_client
-        assert raw_bridge_client
-
         # Create bridge worker last. We have empirically observed that if it is
         # created before an error is raised from the activity worker
         # constructor, a deadlock/hang will occur presumably while trying to
         # free it.
         # TODO(cretz): Why does this cause a test hang when an exception is
         # thrown after it?
+        assert bridge_client._bridge_client
         self._bridge_worker = temporalio.bridge.worker.Worker.create(
-            raw_bridge_client,
+            bridge_client._bridge_client,
             temporalio.bridge.worker.WorkerConfig(
                 namespace=client.namespace,
                 task_queue=task_queue,
@@ -403,6 +380,35 @@ def task_queue(self) -> str:
         """Task queue this worker is on."""
         return self._config["task_queue"]
 
+    @property
+    def client(self) -> temporalio.client.Client:
+        """Client currently set on the worker."""
+        return self._config["client"]
+
+    @client.setter
+    def client(self, value: temporalio.client.Client) -> None:
+        """Update the client associated with the worker.
+
+        Changing the client will make sure the worker starts using it for the
+        next calls it makes. However, outstanding client calls will still
+        complete with the existing client. The new client cannot be "lazy" and
+        must be using the same runtime as the current client.
+        """
+        bridge_client = _extract_bridge_client_for_worker(value)
+        # The constructor falls back to the default runtime when the client's
+        # config does not carry one, so apply the same fallback here before
+        # comparing identities.
+        new_runtime = (
+            bridge_client.config.runtime or temporalio.runtime.Runtime.default()
+        )
+        if self._runtime is not new_runtime:
+            raise ValueError(
+                "New client is not on the same runtime as the existing client"
+            )
+        assert bridge_client._bridge_client
+        self._bridge_worker.replace_client(bridge_client._bridge_client)
+        self._config["client"] = value
+
     @property
     def is_running(self) -> bool:
         """Whether the worker is running.
@@ -714,5 +720,43 @@ def _get_module_code(mod_name: str) -> Optional[bytes]:
     return None
 
 
+def _extract_bridge_client_for_worker(
+    client: temporalio.client.Client,
+) -> temporalio.service._BridgeServiceClient:
+    """Return the bridge service client a worker can use for ``client``.
+
+    Raises:
+        TypeError: If no bridge service client can be found on ``client``.
+        RuntimeError: If the resolved service client is configured as lazy.
+    """
+    # Extract the bridge service client. We try the service on the client first,
+    # then we support a worker_service_client on the client's service to return
+    # the underlying service client we can use.
+    bridge_client: temporalio.service._BridgeServiceClient
+    if isinstance(client.service_client, temporalio.service._BridgeServiceClient):
+        bridge_client = client.service_client
+    elif hasattr(client.service_client, "worker_service_client"):
+        bridge_client = client.service_client.worker_service_client
+        if not isinstance(bridge_client, temporalio.service._BridgeServiceClient):
+            raise TypeError(
+                "Client's worker_service_client cannot be used for a worker"
+            )
+    else:
+        raise TypeError(
+            "Client cannot be used for a worker. "
+            + "Use the original client's service or set worker_service_client on the wrapped service with the original service client."
+        )
+
+    # We need an already connected client
+    # TODO(cretz): How to connect to client inside Worker constructor here? In
+    # the meantime, we disallow lazy clients from being used for workers. We
+    # could check whether the connected client is present which means
+    # lazy-but-already-connected clients would work, but that is confusing
+    # to users that the client only works if they already made a call on it.
+    if bridge_client.config.lazy:
+        raise RuntimeError("Lazy clients cannot be used for workers")
+    return bridge_client
+
+
 class _ShutdownRequested(RuntimeError):
     pass
diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py
index f60fc55d..9005af8e 100644
--- a/tests/worker/test_workflow.py
+++ b/tests/worker/test_workflow.py
@@ -4506,8 +4506,68 @@ async def test_workflow_fail_on_bad_input(client: Client):
             await client.execute_workflow(
                 "FailOnBadInputWorkflow",
                 123,
-                id=f"wf-{uuid}",
+                id=f"wf-{uuid.uuid4()}",
                 task_queue=worker.task_queue,
             )
     assert isinstance(err.value.cause, ApplicationError)
     assert "Failed decoding arguments" in err.value.cause.message
+
+
+@workflow.defn
+class TickingWorkflow:
+    @workflow.run
+    async def run(self) -> None:
+        # Just tick every 100ms for 10s
+        for _ in range(100):
+            await asyncio.sleep(0.1)
+
+
+async def test_workflow_replace_worker_client(client: Client, env: WorkflowEnvironment):
+    if env.supports_time_skipping:
+        pytest.skip("Only testing against two real servers")
+    # We are going to start a second ephemeral server and then replace the
+    # client. So we will start a no-cache ticking workflow with the current
+    # client and confirm it has accomplished at least one task. Then we will
+    # start another on the other client, and confirm it gets started too. Then
+    # we will terminate both. We have to use a ticking workflow with only one
+    # poller to force a quick re-poll to recognize our client change quickly (as
+    # opposed to just waiting the minute for poll timeout).
+    async with await WorkflowEnvironment.start_local() as other_env:
+        # Start both workflows on different servers
+        task_queue = f"tq-{uuid.uuid4()}"
+        handle1 = await client.start_workflow(
+            TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
+        )
+        handle2 = await other_env.client.start_workflow(
+            TickingWorkflow.run, id=f"wf-{uuid.uuid4()}", task_queue=task_queue
+        )
+
+        async def any_task_completed(handle: WorkflowHandle) -> bool:
+            async for e in handle.fetch_history_events():
+                if e.HasField("workflow_task_completed_event_attributes"):
+                    return True
+            return False
+
+        # Now start the worker on the first env
+        async with Worker(
+            client,
+            task_queue=task_queue,
+            workflows=[TickingWorkflow],
+            max_cached_workflows=0,
+            max_concurrent_workflow_task_polls=1,
+        ) as worker:
+            # Confirm the first ticking workflow has completed a task but not
+            # the second
+            await assert_eq_eventually(True, lambda: any_task_completed(handle1))
+            assert not await any_task_completed(handle2)
+
+            # Now replace the client, which should be used fairly quickly
+            # because we should have timer-done poll completions every 100ms
+            worker.client = other_env.client
+
+            # Now confirm the other workflow has started
+            await assert_eq_eventually(True, lambda: any_task_completed(handle2))
+
+            # Terminate both
+            await handle1.terminate()
+            await handle2.terminate()