From 393045d59343bd46df3120b8a33d912723c03d89 Mon Sep 17 00:00:00 2001 From: pdoerner <122412190+pdoerner@users.noreply.github.com> Date: Thu, 10 Oct 2024 09:35:04 -0700 Subject: [PATCH] Test server support for bidi links (#2258) * Test server support for bidi links * typo * license * feedback * link validation * describe fields * link validation --- .../internal/testservice/LinkConverter.java | 129 ++++++++ .../internal/testservice/StateMachines.java | 58 +++- .../TestWorkflowMutableStateImpl.java | 105 ++++++- .../testservice/LinkConverterTest.java | 278 ++++++++++++++++++ .../functional/NexusWorkflowTest.java | 18 ++ 5 files changed, 573 insertions(+), 15 deletions(-) create mode 100644 temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java create mode 100644 temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java new file mode 100644 index 000000000..fac2fe174 --- /dev/null +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.testservice; + +import io.temporal.api.common.v1.Link; +import io.temporal.api.enums.v1.EventType; +import java.net.URI; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.StringTokenizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LinkConverter { + + private static final Logger log = LoggerFactory.getLogger(StateMachines.class); + + private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history"; + + public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) { + try { + String url = + String.format( + linkPathFormat, + URLEncoder.encode(we.getNamespace(), StandardCharsets.UTF_8.toString()), + URLEncoder.encode(we.getWorkflowId(), StandardCharsets.UTF_8.toString()), + URLEncoder.encode(we.getRunId(), StandardCharsets.UTF_8.toString())); + + if (we.hasEventRef()) { + url += "?"; + if (we.getEventRef().getEventId() > 0) { + url += "eventID=" + we.getEventRef().getEventId() + "&"; + } + url += + "eventType=" + + URLEncoder.encode( + we.getEventRef().getEventType().name(), StandardCharsets.UTF_8.toString()) + + "&"; + url += "referenceType=EventReference"; + } + + return io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl(url) + .setType(we.getDescriptorForType().getFullName()) + .build(); + } catch (Exception e) { + log.error("Failed to encode Nexus link URL", e); + } + return null; + } + + public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusLink) { + Link.Builder link = Link.newBuilder(); + try { + URI uri = new URI(nexusLink.getUrl()); + + if (!uri.getScheme().equals("temporal")) { + log.error("Failed to parse Nexus link URL: invalid scheme: {}", uri.getScheme()); + return null; + } + + StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/"); + if (!st.nextToken().equals("namespaces")) { + log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.nextToken().equals("workflows")) { + log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + String workflowID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + String runID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString()); + if (!st.hasMoreTokens() || !st.nextToken().equals("history")) { + log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath()); + return null; + } + + Link.WorkflowEvent.Builder we = + Link.WorkflowEvent.newBuilder() + .setNamespace(namespace) + .setWorkflowId(workflowID) + .setRunId(runID); + + if (uri.getQuery() != null) { + Link.WorkflowEvent.EventReference.Builder eventRef = + Link.WorkflowEvent.EventReference.newBuilder(); + String query = URLDecoder.decode(uri.getQuery(), StandardCharsets.UTF_8.toString()); + st = new StringTokenizer(query, "&"); + while (st.hasMoreTokens()) { + String[] param = st.nextToken().split("="); + switch (param[0]) { + case "eventID": + eventRef.setEventId(Long.parseLong(param[1])); + continue; + case "eventType": + eventRef.setEventType(EventType.valueOf(param[1])); + } + } + we.setEventRef(eventRef); + link.setWorkflowEvent(we); + } + } catch (Exception e) { + // Swallow un-parsable links since they are not critical to processing + log.error("Failed to parse Nexus link URL", e); + return null; + } + return link.build(); + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index 480c99ceb..69541eae6 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -20,6 +20,7 @@ package io.temporal.internal.testservice; +import static io.temporal.internal.testservice.LinkConverter.*; import static io.temporal.internal.testservice.StateMachines.Action.CANCEL; import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE; import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW; @@ -60,6 +61,7 @@ import io.temporal.api.failure.v1.TimeoutFailureInfo; import io.temporal.api.history.v1.*; import io.temporal.api.nexus.v1.*; +import io.temporal.api.nexus.v1.Link; import io.temporal.api.protocol.v1.Message; import io.temporal.api.query.v1.WorkflowQueryResult; import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; @@ -347,11 +349,13 @@ static final class NexusOperationData { RetryPolicy retryPolicy = defaultNexusRetryPolicy(); long scheduledEventId = NO_EVENT_ID; + Timestamp cancelRequestedTime; TestServiceRetryState retryState; - long lastAttemptCompleteTime; + boolean isBackingOff = false; Duration nextBackoffInterval; - long nextAttemptScheduleTime; + Timestamp lastAttemptCompleteTime; + Timestamp nextAttemptScheduleTime; String identity; public NexusOperationData(Endpoint endpoint) { @@ -685,6 +689,18 @@ private static void scheduleNexusOperation( NexusOperationRef ref = new NexusOperationRef(ctx.getExecutionId(), scheduledEventId); NexusTaskToken taskToken = new NexusTaskToken(ref, data.getAttempt(), false); + Link link = + workflowEventToNexusLink( + io.temporal.api.common.v1.Link.WorkflowEvent.newBuilder() + .setNamespace(ctx.getNamespace()) + .setWorkflowId(ctx.getExecution().getWorkflowId()) + .setRunId(ctx.getExecution().getRunId()) + .setEventRef( + io.temporal.api.common.v1.Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(scheduledEventId) + .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED)) + .build()); + PollNexusTaskQueueResponse.Builder pollResponse = PollNexusTaskQueueResponse.newBuilder() .setTaskToken(taskToken.toBytes()) @@ -697,6 +713,7 @@ private static void scheduleNexusOperation( .setService(attr.getService()) .setOperation(attr.getOperation()) .setPayload(attr.getInput()) + .addLinks(link) .setCallback("http://test-env/operations") // The test server uses this to lookup the operation .putCallbackHeader( @@ -725,15 +742,24 @@ private static void startNexusOperation( NexusOperationData data, StartOperationResponse.Async resp, long notUsed) { - ctx.addEvent( + HistoryEvent.Builder event = HistoryEvent.newBuilder() .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED) .setNexusOperationStartedEventAttributes( NexusOperationStartedEventAttributes.newBuilder() .setOperationId(resp.getOperationId()) .setScheduledEventId(data.scheduledEventId) - .setRequestId(data.scheduledEvent.getRequestId())) - .build()); + .setRequestId(data.scheduledEvent.getRequestId())); + + for (Link l : resp.getLinksList()) { + if (!l.getType() + .equals(io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor().getFullName())) { + continue; + } + event.addLinks(nexusLinkToWorkflowEvent(l)); + } + + ctx.addEvent(event.build()); ctx.onCommit(historySize -> data.operationId = resp.getOperationId()); } @@ -846,7 +872,10 @@ private static RetryState attemptNexusOperationRetry( ctx.onCommit( (historySize) -> { data.retryState = nextAttempt; - data.nextAttemptScheduleTime = ctx.currentTime().getSeconds(); + data.isBackingOff = true; + data.lastAttemptCompleteTime = ctx.currentTime(); + data.nextAttemptScheduleTime = + Timestamps.add(ProtobufTimeUtils.getCurrentProtoTime(), data.nextBackoffInterval); task.setTaskToken( new NexusTaskToken( ctx.getExecutionId(), @@ -899,7 +928,12 @@ private static void requestCancelNexusOperation( // Test server only supports worker targets, so just push directly to Nexus task queue without // invoking Nexus client. ctx.addNexusTask(cancelTask); - ctx.onCommit(historySize -> data.nexusTask = cancelTask); + ctx.onCommit( + historySize -> { + data.nexusTask = cancelTask; + data.cancelRequestedTime = ctx.currentTime(); + data.isBackingOff = false; + }); } private static void reportNexusOperationCancellation( @@ -1238,12 +1272,14 @@ private static void startWorkflow( a.setParentWorkflowNamespace(parentExecutionId.getNamespace()); a.setParentWorkflowExecution(parentExecutionId.getExecution()); } - HistoryEvent event = + HistoryEvent.Builder event = HistoryEvent.newBuilder() .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED) - .setWorkflowExecutionStartedEventAttributes(a) - .build(); - ctx.addEvent(event); + .setWorkflowExecutionStartedEventAttributes(a); + if (request.getLinksCount() > 0) { + event.addAllLinks(request.getLinksList()); + } + ctx.addEvent(event.build()); } private static void completeWorkflow( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 2e122054e..1ff929294 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -250,9 +250,65 @@ private StartWorkflowExecutionRequest.Builder validateStartWorkflowExecutionRequ if (request.hasRetryPolicy()) { request.setRetryPolicy(validateAndOverrideRetryPolicy(request.getRetryPolicy())); } + + validateLinks(request.getLinksList()); + return request; } + private void validateLinks(List links) { + if (links == null || links.isEmpty()) { + return; + } + + if (links.size() > 10) { + throw Status.INVALID_ARGUMENT + .withDescription( + String.format( + "cannot attach more than %d links per request, got %d", 10, links.size())) + .asRuntimeException(); + } + + for (Link l : links) { + if (l.getSerializedSize() > 4000) { + throw Status.INVALID_ARGUMENT + .withDescription( + String.format( + "link exceeds allowed size of %d, got %d", 4000, l.getSerializedSize())) + .asRuntimeException(); + } + + if (l.getVariantCase() == Link.VariantCase.WORKFLOW_EVENT) { + if (l.getWorkflowEvent().getNamespace().isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription("workflow event link must not have an empty namespace field") + .asRuntimeException(); + } + if (l.getWorkflowEvent().getWorkflowId().isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription("workflow event link must not have an empty workflow ID field") + .asRuntimeException(); + } + if (l.getWorkflowEvent().getRunId().isEmpty()) { + throw Status.INVALID_ARGUMENT + .withDescription("workflow event link must not have an empty run ID field") + .asRuntimeException(); + } + if (l.getWorkflowEvent().getEventRef().getEventType() == EventType.EVENT_TYPE_UNSPECIFIED + && l.getWorkflowEvent().getEventRef().getEventId() != 0) { + throw Status.INVALID_ARGUMENT + .withDescription( + "workflow event link ref cannot have an unspecified event type and a non-zero event ID") + .asRuntimeException(); + } + } else { + throw Status.INVALID_ARGUMENT + .withDescription("unsupported link variant") + .asRuntimeException(); + } + } + } + private void update(UpdateProcedure updater) { StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); update(false, updater, stackTraceElements[2].getMethodName()); @@ -2262,6 +2318,7 @@ private void retryNexusTask(RequestContext ctx, StateMachine timerService.lockTimeSkipping( "nexusOperationRetryTimer " + operation.getData().operationId); boolean unlockTimer = false; + data.isBackingOff = false; try { data.nexusTask.setDeadline(Timestamps.add(ctx.currentTime(), data.requestTimeout)); @@ -2974,6 +3031,11 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() .setParentNamespaceId(p.getExecutionId().getNamespace()) .setParentExecution(p.getExecutionId().getExecution())); + List callbacks = + this.startRequest.getCompletionCallbacksList().stream() + .map(TestWorkflowMutableStateImpl::constructCallbackInfo) + .collect(Collectors.toList()); + List pendingActivities = this.activities.values().stream() .filter(sm -> !isTerminalState(sm.getState())) @@ -2998,6 +3060,7 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock() .addAllPendingActivities(pendingActivities) .addAllPendingNexusOperations(pendingNexusOperations) .addAllPendingChildren(pendingChildren) + .addAllCallbacks(callbacks) .build(); } @@ -3117,12 +3180,22 @@ private static PendingNexusOperationInfo constructPendingNexusOperationInfo( .setScheduleToCloseTimeout(data.scheduledEvent.getScheduleToCloseTimeout()) .setState(convertNexusOperationState(sm.getState(), data)) .setAttempt(data.getAttempt()) - .setLastAttemptCompleteTime(Timestamps.fromMillis(data.lastAttemptCompleteTime)) - .setNextAttemptScheduleTime(Timestamps.fromMillis(data.nextAttemptScheduleTime)); + .setLastAttemptCompleteTime(data.lastAttemptCompleteTime) + .setNextAttemptScheduleTime(data.nextAttemptScheduleTime); data.retryState.getPreviousRunFailure().ifPresent(builder::setLastAttemptFailure); - // TODO(pj): support cancellation info + if (data.nexusTask.getTask().getRequest().hasCancelOperation()) { + NexusOperationCancellationInfo.Builder cancelInfo = + NexusOperationCancellationInfo.newBuilder() + .setRequestedTime(data.cancelRequestedTime) + .setState(convertNexusOperationCancellationState(sm.getState(), data)) + .setAttempt(data.getAttempt()) + .setLastAttemptCompleteTime(data.lastAttemptCompleteTime) + .setNextAttemptScheduleTime(data.nextAttemptScheduleTime); + data.retryState.getPreviousRunFailure().ifPresent(cancelInfo::setLastAttemptFailure); + builder.setCancellationInfo(cancelInfo); + } return builder.build(); } @@ -3130,7 +3203,7 @@ private static PendingNexusOperationInfo constructPendingNexusOperationInfo( private static PendingNexusOperationState convertNexusOperationState( State state, NexusOperationData data) { // Terminal states have already been filtered out, so only handle pending states. - if (data.getAttempt() > 1) { + if (data.isBackingOff) { return PendingNexusOperationState.PENDING_NEXUS_OPERATION_STATE_BACKING_OFF; } switch (state) { @@ -3143,6 +3216,30 @@ private static PendingNexusOperationState convertNexusOperationState( } } + private static NexusOperationCancellationState convertNexusOperationCancellationState( + State state, NexusOperationData data) { + // Terminal states have already been filtered out, so only handle pending states. + if (data.isBackingOff) { + return NexusOperationCancellationState.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF; + } + if (state == State.INITIATED) { + return NexusOperationCancellationState.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED; + } + return NexusOperationCancellationState.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED; + } + + private static CallbackInfo constructCallbackInfo(Callback completionCallback) { + // Currently we only support completion callbacks and the test server implementation assumes + // that callbacks are always delivered successfully upon workflow completion. So we are not + // currently setting state or attempt related fields. + return CallbackInfo.newBuilder() + .setCallback(completionCallback) + .setTrigger( + CallbackInfo.Trigger.newBuilder() + .setWorkflowClosed(CallbackInfo.WorkflowClosed.getDefaultInstance())) + .build(); + } + private static void populateWorkflowExecutionInfoFromHistory( WorkflowExecutionInfo.Builder executionInfo, List fullHistory) { getStartEvent(fullHistory) diff --git a/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java b/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java new file mode 100644 index 000000000..597b34ee9 --- /dev/null +++ b/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java @@ -0,0 +1,278 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.testservice; + +import static io.temporal.internal.testservice.LinkConverter.nexusLinkToWorkflowEvent; +import static io.temporal.internal.testservice.LinkConverter.workflowEventToNexusLink; +import static org.junit.Assert.*; + +import io.temporal.api.common.v1.Link; +import io.temporal.api.enums.v1.EventType; +import org.junit.Test; + +public class LinkConverterTest { + + @Test + public void testConvertWorkflowEventToNexus_Valid() { + Link.WorkflowEvent input = + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + + io.temporal.api.nexus.v1.Link expected = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertWorkflowEventToNexus_ValidAngle() { + Link.WorkflowEvent input = + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id>") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + + io.temporal.api.nexus.v1.Link expected = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id%3E/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertWorkflowEventToNexus_ValidSlash() { + Link.WorkflowEvent input = + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id/") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + + io.temporal.api.nexus.v1.Link expected = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id%2F/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertWorkflowEventToNexus_ValidEventIDMissing() { + Link.WorkflowEvent input = + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)) + .build(); + + io.temporal.api.nexus.v1.Link expected = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertNexusToWorkflowEvent_Valid() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + Link expected = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))) + .build(); + + Link actual = nexusLinkToWorkflowEvent(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertNexusToWorkflowEvent_ValidAngle() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id%3E/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + Link expected = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id>") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))) + .build(); + + Link actual = nexusLinkToWorkflowEvent(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertNexusToWorkflowEvent_ValidSlash() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id%2F/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + Link expected = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id/") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventId(1) + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))) + .build(); + + Link actual = nexusLinkToWorkflowEvent(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertNexusToWorkflowEvent_ValidEventIDMissing() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + Link expected = + Link.newBuilder() + .setWorkflowEvent( + Link.WorkflowEvent.newBuilder() + .setNamespace("ns") + .setWorkflowId("wf-id") + .setRunId("run-id") + .setEventRef( + Link.WorkflowEvent.EventReference.newBuilder() + .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))) + .build(); + + Link actual = nexusLinkToWorkflowEvent(input); + assertEquals(expected, actual); + } + + @Test + public void testConvertNexusToWorkflowEvent_InvalidScheme() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "test:///namespaces/ns/workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + assertNull(nexusLinkToWorkflowEvent(input)); + } + + @Test + public void testConvertNexusToWorkflowEvent_InvalidPathMissingHistory() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id/run-id/?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + assertNull(nexusLinkToWorkflowEvent(input)); + } + + @Test + public void testConvertNexusToWorkflowEvent_InvalidPathMissingNamespace() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces//workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + assertNull(nexusLinkToWorkflowEvent(input)); + } + + @Test + public void testConvertNexusToWorkflowEvent_InvalidEventType() { + io.temporal.api.nexus.v1.Link input = + io.temporal.api.nexus.v1.Link.newBuilder() + .setUrl( + "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventType=WorkflowExecution&referenceType=EventReference") + .setType("temporal.api.common.v1.Link.WorkflowEvent") + .build(); + + assertNull(nexusLinkToWorkflowEvent(input)); + } +} diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index eaafaed60..a98c8c76e 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -38,6 +38,7 @@ import io.temporal.api.workflowservice.v1.*; import io.temporal.client.WorkflowOptions; import io.temporal.client.WorkflowStub; +import io.temporal.internal.testservice.LinkConverter; import io.temporal.internal.testservice.NexusTaskToken; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.testserver.functional.common.TestWorkflows; @@ -45,6 +46,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -156,6 +158,10 @@ public void testNexusOperationAsyncCompletion() { .setTaskQueue(handlerWFTaskQueue) .setInput(Payloads.newBuilder().addPayloads(defaultInput)) .setIdentity("test") + .addAllLinks( + startReq.getStartOperation().getLinksList().stream() + .map(LinkConverter::nexusLinkToWorkflowEvent) + .collect(Collectors.toList())) .addCompletionCallbacks( Callback.newBuilder() .setNexus( @@ -233,6 +239,10 @@ public void testNexusOperationAsyncHandlerCanceled() { .setTaskQueue(handlerWFTaskQueue) .setInput(Payloads.newBuilder().addPayloads(defaultInput)) .setIdentity("test") + .addAllLinks( + startReq.getStartOperation().getLinksList().stream() + .map(LinkConverter::nexusLinkToWorkflowEvent) + .collect(Collectors.toList())) .addCompletionCallbacks( Callback.newBuilder() .setNexus( @@ -335,6 +345,10 @@ public void testNexusOperationAsyncHandlerTerminated() { .setTaskQueue(handlerWFTaskQueue) .setInput(Payloads.newBuilder().addPayloads(defaultInput)) .setIdentity("test") + .addAllLinks( + startReq.getStartOperation().getLinksList().stream() + .map(LinkConverter::nexusLinkToWorkflowEvent) + .collect(Collectors.toList())) .addCompletionCallbacks( Callback.newBuilder() .setNexus( @@ -428,6 +442,10 @@ public void testNexusOperationAsyncHandlerTimeout() { .setInput(Payloads.newBuilder().addPayloads(defaultInput)) .setWorkflowRunTimeout(Durations.fromSeconds(1)) .setIdentity("test") + .addAllLinks( + startReq.getStartOperation().getLinksList().stream() + .map(LinkConverter::nexusLinkToWorkflowEvent) + .collect(Collectors.toList())) .addCompletionCallbacks( Callback.newBuilder() .setNexus(