diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index 3f5fcb6e6..832bdbc3d 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -28,6 +28,7 @@ import io.temporal.api.enums.v1.WorkflowExecutionStatus; import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.*; +import io.temporal.api.nexus.v1.Link; import io.temporal.api.nexus.v1.StartOperationResponse; import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; import io.temporal.api.workflowservice.v1.*; @@ -116,6 +117,9 @@ void startNexusOperation( void completeNexusOperation(NexusOperationRef ref, Payload result); + void completeAsyncNexusOperation( + NexusOperationRef ref, Payload result, String operationID, Link startLink); + void failNexusOperation(NexusOperationRef ref, Failure failure); boolean validateOperationTaskToken(NexusTaskToken tt); diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 910b987e2..d77cf8313 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -21,6 +21,7 @@ package io.temporal.internal.testservice; import static io.temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage.*; +import static io.temporal.internal.common.LinkConverter.workflowEventToNexusLink; import static io.temporal.internal.testservice.CronUtils.getBackoffInterval; import static io.temporal.internal.testservice.StateMachines.*; import static io.temporal.internal.testservice.StateUtils.mergeMemo; @@ -1677,9 +1678,24 @@ private void processWorkflowCompletionCallbacks(RequestContext ctx) { log.warn("skipping non-nexus completion callback"); continue; } + String serializedRef = cb.getNexus().getHeaderOrThrow("operation-reference"); NexusOperationRef ref = NexusOperationRef.fromBytes(serializedRef.getBytes()); - service.completeNexusOperation(ref, completionEvent.get()); + + io.temporal.api.nexus.v1.Link startLink = + workflowEventToNexusLink( + Link.WorkflowEvent.newBuilder() + .setNamespace(ctx.getNamespace()) + .setWorkflowId(ctx.getExecution().getWorkflowId()) + .setRunId(ctx.getExecution().getRunId()) + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) + .build()) + .build()); + + service.completeNexusOperation( + ref, ctx.getExecution().getWorkflowId(), startLink, completionEvent.get()); } } @@ -2218,6 +2234,32 @@ public void completeNexusOperation(NexusOperationRef ref, Payload result) { }); } + @Override + public void completeAsyncNexusOperation( + NexusOperationRef ref, + Payload result, + String operationID, + io.temporal.api.nexus.v1.Link startLink) { + update( + ctx -> { + StateMachine operation = + getPendingNexusOperation(ref.getScheduledEventId()); + if (operation.getState() == State.INITIATED) { + // Received completion before start, so fabricate started event. + StartOperationResponse.Async start = + StartOperationResponse.Async.newBuilder() + .setOperationId(operationID) + .addLinks(startLink) + .build(); + operation.action(Action.START, ctx, start, 0); + } + operation.action(Action.COMPLETE, ctx, result, 0); + nexusOperations.remove(ref.getScheduledEventId()); + scheduleWorkflowTask(ctx); + ctx.unlockTimer("completeNexusOperation"); + }); + } + @Override public void failNexusOperation(NexusOperationRef ref, Failure failure) { update( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index d455d8bc1..66f71731e 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -50,10 +50,7 @@ import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes; import io.temporal.api.namespace.v1.NamespaceInfo; -import io.temporal.api.nexus.v1.HandlerError; -import io.temporal.api.nexus.v1.Request; -import io.temporal.api.nexus.v1.StartOperationResponse; -import io.temporal.api.nexus.v1.UnsuccessfulOperationError; +import io.temporal.api.nexus.v1.*; import io.temporal.api.testservice.v1.LockTimeSkippingRequest; import io.temporal.api.testservice.v1.SleepRequest; import io.temporal.api.testservice.v1.TestServiceGrpc; @@ -899,7 +896,8 @@ public void respondNexusTaskFailed( } } - public void completeNexusOperation(NexusOperationRef ref, HistoryEvent completionEvent) { + public void completeNexusOperation( + NexusOperationRef ref, String operationID, Link startLink, HistoryEvent completionEvent) { TestWorkflowMutableState target = getMutableState(ref.getExecutionId()); switch (completionEvent.getEventType()) { @@ -912,7 +910,7 @@ public void completeNexusOperation(NexusOperationRef ref, HistoryEvent completio // Nexus does not support it. Payload p = (result.getPayloadsCount() > 0) ? result.getPayloads(0) : Payload.getDefaultInstance(); - target.completeNexusOperation(ref, p); + target.completeAsyncNexusOperation(ref, p, operationID, startLink); break; case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED: Failure f = diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 9b209c02b..1a7273935 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -199,6 +199,66 @@ public void testNexusOperationAsyncCompletion() { } } + @Test + public void testNexusOperationAsyncCompletionBeforeStart() { + WorkflowStub callerStub = newWorkflowStub("TestNexusOperationAsyncCompletionWorkflow"); + WorkflowExecution callerExecution = callerStub.start(); + + // Get first WFT and respond with ScheduleNexusOperation command + PollWorkflowTaskQueueResponse callerTask = pollWorkflowTask(); + completeWorkflowTask(callerTask.getTaskToken(), newScheduleOperationCommand()); + + // Poll for Nexus task with start request but do not complete it + Request startReq; + try { + startReq = pollNexusTask().get().getRequest(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + return; + } + + // Manually start handler WF with callback + TaskQueue handlerWFTaskQueue = TaskQueue.newBuilder().setName("nexus-handler-tq").build(); + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .startWorkflowExecution( + StartWorkflowExecutionRequest.newBuilder() + .setRequestId(UUID.randomUUID().toString()) + .setNamespace(testWorkflowRule.getTestEnvironment().getNamespace()) + .setWorkflowId("TestNexusOperationAsyncHandlerWorkflow") + .setWorkflowType(WorkflowType.newBuilder().setName("EchoNexusHandlerWorkflowImpl")) + .setTaskQueue(handlerWFTaskQueue) + .setInput(Payloads.newBuilder().addPayloads(defaultInput)) + .setIdentity("test") + .addAllLinks( + startReq.getStartOperation().getLinksList().stream() + .map(LinkConverter::nexusLinkToWorkflowEvent) + .collect(Collectors.toList())) + .addCompletionCallbacks( + Callback.newBuilder() + .setNexus( + Callback.Nexus.newBuilder() + .setUrl(startReq.getStartOperation().getCallback()) + .putAllHeader(startReq.getStartOperation().getCallbackHeaderMap()))) + .build()); + + // Complete handler workflow + PollWorkflowTaskQueueResponse handlerTask = pollWorkflowTask(handlerWFTaskQueue); + completeWorkflow( + handlerTask.getTaskToken(), + Payload.newBuilder().setData(ByteString.copyFromUtf8("operation result")).build()); + + // Verify operation start and completion are recorded and triggers caller workflow progress + callerTask = pollWorkflowTask(); + testWorkflowRule.assertHistoryEvent( + callerExecution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); + testWorkflowRule.assertHistoryEvent( + callerExecution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED); + completeWorkflow(callerTask.getTaskToken()); + } + @Test public void testNexusOperationAsyncHandlerCanceled() { String operationId = UUID.randomUUID().toString();