diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java
new file mode 100644
index 000000000..fac2fe174
--- /dev/null
+++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/LinkConverter.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
+ *
+ * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this material except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.temporal.internal.testservice;
+
+import io.temporal.api.common.v1.Link;
+import io.temporal.api.enums.v1.EventType;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.StringTokenizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LinkConverter {
+
+ private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
+
+ private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history";
+
+ public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) {
+ try {
+ String url =
+ String.format(
+ linkPathFormat,
+ URLEncoder.encode(we.getNamespace(), StandardCharsets.UTF_8.toString()),
+ URLEncoder.encode(we.getWorkflowId(), StandardCharsets.UTF_8.toString()),
+ URLEncoder.encode(we.getRunId(), StandardCharsets.UTF_8.toString()));
+
+ if (we.hasEventRef()) {
+ url += "?";
+ if (we.getEventRef().getEventId() > 0) {
+ url += "eventID=" + we.getEventRef().getEventId() + "&";
+ }
+ url +=
+ "eventType="
+ + URLEncoder.encode(
+ we.getEventRef().getEventType().name(), StandardCharsets.UTF_8.toString())
+ + "&";
+ url += "referenceType=EventReference";
+ }
+
+ return io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(url)
+ .setType(we.getDescriptorForType().getFullName())
+ .build();
+ } catch (Exception e) {
+ log.error("Failed to encode Nexus link URL", e);
+ }
+ return null;
+ }
+
+ public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusLink) {
+ Link.Builder link = Link.newBuilder();
+ try {
+ URI uri = new URI(nexusLink.getUrl());
+
+ if (!uri.getScheme().equals("temporal")) {
+ log.error("Failed to parse Nexus link URL: invalid scheme: {}", uri.getScheme());
+ return null;
+ }
+
+ StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/");
+ if (!st.nextToken().equals("namespaces")) {
+ log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
+ return null;
+ }
+ String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
+ if (!st.nextToken().equals("workflows")) {
+ log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
+ return null;
+ }
+ String workflowID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
+ String runID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
+ if (!st.hasMoreTokens() || !st.nextToken().equals("history")) {
+ log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
+ return null;
+ }
+
+ Link.WorkflowEvent.Builder we =
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace(namespace)
+ .setWorkflowId(workflowID)
+ .setRunId(runID);
+
+ if (uri.getQuery() != null) {
+ Link.WorkflowEvent.EventReference.Builder eventRef =
+ Link.WorkflowEvent.EventReference.newBuilder();
+ String query = URLDecoder.decode(uri.getQuery(), StandardCharsets.UTF_8.toString());
+ st = new StringTokenizer(query, "&");
+ while (st.hasMoreTokens()) {
+ String[] param = st.nextToken().split("=");
+ switch (param[0]) {
+ case "eventID":
+ eventRef.setEventId(Long.parseLong(param[1]));
+ continue;
+ case "eventType":
+ eventRef.setEventType(EventType.valueOf(param[1]));
+ }
+ }
+ we.setEventRef(eventRef);
+ link.setWorkflowEvent(we);
+ }
+ } catch (Exception e) {
+ // Swallow un-parsable links since they are not critical to processing
+ log.error("Failed to parse Nexus link URL", e);
+ return null;
+ }
+ return link.build();
+ }
+}
diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
index 480c99ceb..69541eae6 100644
--- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
+++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
@@ -20,6 +20,7 @@
package io.temporal.internal.testservice;
+import static io.temporal.internal.testservice.LinkConverter.*;
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
@@ -60,6 +61,7 @@
import io.temporal.api.failure.v1.TimeoutFailureInfo;
import io.temporal.api.history.v1.*;
import io.temporal.api.nexus.v1.*;
+import io.temporal.api.nexus.v1.Link;
import io.temporal.api.protocol.v1.Message;
import io.temporal.api.query.v1.WorkflowQueryResult;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
@@ -347,11 +349,13 @@ static final class NexusOperationData {
RetryPolicy retryPolicy = defaultNexusRetryPolicy();
long scheduledEventId = NO_EVENT_ID;
+ Timestamp cancelRequestedTime;
TestServiceRetryState retryState;
- long lastAttemptCompleteTime;
+ boolean isBackingOff = false;
Duration nextBackoffInterval;
- long nextAttemptScheduleTime;
+ Timestamp lastAttemptCompleteTime;
+ Timestamp nextAttemptScheduleTime;
String identity;
public NexusOperationData(Endpoint endpoint) {
@@ -685,6 +689,18 @@ private static void scheduleNexusOperation(
NexusOperationRef ref = new NexusOperationRef(ctx.getExecutionId(), scheduledEventId);
NexusTaskToken taskToken = new NexusTaskToken(ref, data.getAttempt(), false);
+ Link link =
+ workflowEventToNexusLink(
+ io.temporal.api.common.v1.Link.WorkflowEvent.newBuilder()
+ .setNamespace(ctx.getNamespace())
+ .setWorkflowId(ctx.getExecution().getWorkflowId())
+ .setRunId(ctx.getExecution().getRunId())
+ .setEventRef(
+ io.temporal.api.common.v1.Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventId(scheduledEventId)
+ .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED))
+ .build());
+
PollNexusTaskQueueResponse.Builder pollResponse =
PollNexusTaskQueueResponse.newBuilder()
.setTaskToken(taskToken.toBytes())
@@ -697,6 +713,7 @@ private static void scheduleNexusOperation(
.setService(attr.getService())
.setOperation(attr.getOperation())
.setPayload(attr.getInput())
+ .addLinks(link)
.setCallback("http://test-env/operations")
// The test server uses this to lookup the operation
.putCallbackHeader(
@@ -725,15 +742,24 @@ private static void startNexusOperation(
NexusOperationData data,
StartOperationResponse.Async resp,
long notUsed) {
- ctx.addEvent(
+ HistoryEvent.Builder event =
HistoryEvent.newBuilder()
.setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED)
.setNexusOperationStartedEventAttributes(
NexusOperationStartedEventAttributes.newBuilder()
.setOperationId(resp.getOperationId())
.setScheduledEventId(data.scheduledEventId)
- .setRequestId(data.scheduledEvent.getRequestId()))
- .build());
+ .setRequestId(data.scheduledEvent.getRequestId()));
+
+ for (Link l : resp.getLinksList()) {
+ if (!l.getType()
+ .equals(io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor().getFullName())) {
+ continue;
+ }
+ event.addLinks(nexusLinkToWorkflowEvent(l));
+ }
+
+ ctx.addEvent(event.build());
ctx.onCommit(historySize -> data.operationId = resp.getOperationId());
}
@@ -846,7 +872,10 @@ private static RetryState attemptNexusOperationRetry(
ctx.onCommit(
(historySize) -> {
data.retryState = nextAttempt;
- data.nextAttemptScheduleTime = ctx.currentTime().getSeconds();
+ data.isBackingOff = true;
+ data.lastAttemptCompleteTime = ctx.currentTime();
+ data.nextAttemptScheduleTime =
+ Timestamps.add(ProtobufTimeUtils.getCurrentProtoTime(), data.nextBackoffInterval);
task.setTaskToken(
new NexusTaskToken(
ctx.getExecutionId(),
@@ -899,7 +928,12 @@ private static void requestCancelNexusOperation(
// Test server only supports worker targets, so just push directly to Nexus task queue without
// invoking Nexus client.
ctx.addNexusTask(cancelTask);
- ctx.onCommit(historySize -> data.nexusTask = cancelTask);
+ ctx.onCommit(
+ historySize -> {
+ data.nexusTask = cancelTask;
+ data.cancelRequestedTime = ctx.currentTime();
+ data.isBackingOff = false;
+ });
}
private static void reportNexusOperationCancellation(
@@ -1238,12 +1272,14 @@ private static void startWorkflow(
a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
a.setParentWorkflowExecution(parentExecutionId.getExecution());
}
- HistoryEvent event =
+ HistoryEvent.Builder event =
HistoryEvent.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
- .setWorkflowExecutionStartedEventAttributes(a)
- .build();
- ctx.addEvent(event);
+ .setWorkflowExecutionStartedEventAttributes(a);
+ if (request.getLinksCount() > 0) {
+ event.addAllLinks(request.getLinksList());
+ }
+ ctx.addEvent(event.build());
}
private static void completeWorkflow(
diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java
index 2e122054e..1ff929294 100644
--- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java
+++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java
@@ -250,9 +250,65 @@ private StartWorkflowExecutionRequest.Builder validateStartWorkflowExecutionRequ
if (request.hasRetryPolicy()) {
request.setRetryPolicy(validateAndOverrideRetryPolicy(request.getRetryPolicy()));
}
+
+ validateLinks(request.getLinksList());
+
return request;
}
+ private void validateLinks(List links) {
+ if (links == null || links.isEmpty()) {
+ return;
+ }
+
+ if (links.size() > 10) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription(
+ String.format(
+ "cannot attach more than %d links per request, got %d", 10, links.size()))
+ .asRuntimeException();
+ }
+
+ for (Link l : links) {
+ if (l.getSerializedSize() > 4000) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription(
+ String.format(
+ "link exceeds allowed size of %d, got %d", 4000, l.getSerializedSize()))
+ .asRuntimeException();
+ }
+
+ if (l.getVariantCase() == Link.VariantCase.WORKFLOW_EVENT) {
+ if (l.getWorkflowEvent().getNamespace().isEmpty()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("workflow event link must not have an empty namespace field")
+ .asRuntimeException();
+ }
+ if (l.getWorkflowEvent().getWorkflowId().isEmpty()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("workflow event link must not have an empty workflow ID field")
+ .asRuntimeException();
+ }
+ if (l.getWorkflowEvent().getRunId().isEmpty()) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("workflow event link must not have an empty run ID field")
+ .asRuntimeException();
+ }
+ if (l.getWorkflowEvent().getEventRef().getEventType() == EventType.EVENT_TYPE_UNSPECIFIED
+ && l.getWorkflowEvent().getEventRef().getEventId() != 0) {
+ throw Status.INVALID_ARGUMENT
+ .withDescription(
+ "workflow event link ref cannot have an unspecified event type and a non-zero event ID")
+ .asRuntimeException();
+ }
+ } else {
+ throw Status.INVALID_ARGUMENT
+ .withDescription("unsupported link variant")
+ .asRuntimeException();
+ }
+ }
+ }
+
private void update(UpdateProcedure updater) {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
update(false, updater, stackTraceElements[2].getMethodName());
@@ -2262,6 +2318,7 @@ private void retryNexusTask(RequestContext ctx, StateMachine
timerService.lockTimeSkipping(
"nexusOperationRetryTimer " + operation.getData().operationId);
boolean unlockTimer = false;
+ data.isBackingOff = false;
try {
data.nexusTask.setDeadline(Timestamps.add(ctx.currentTime(), data.requestTimeout));
@@ -2974,6 +3031,11 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock()
.setParentNamespaceId(p.getExecutionId().getNamespace())
.setParentExecution(p.getExecutionId().getExecution()));
+ List callbacks =
+ this.startRequest.getCompletionCallbacksList().stream()
+ .map(TestWorkflowMutableStateImpl::constructCallbackInfo)
+ .collect(Collectors.toList());
+
List pendingActivities =
this.activities.values().stream()
.filter(sm -> !isTerminalState(sm.getState()))
@@ -2998,6 +3060,7 @@ private DescribeWorkflowExecutionResponse describeWorkflowExecutionInsideLock()
.addAllPendingActivities(pendingActivities)
.addAllPendingNexusOperations(pendingNexusOperations)
.addAllPendingChildren(pendingChildren)
+ .addAllCallbacks(callbacks)
.build();
}
@@ -3117,12 +3180,22 @@ private static PendingNexusOperationInfo constructPendingNexusOperationInfo(
.setScheduleToCloseTimeout(data.scheduledEvent.getScheduleToCloseTimeout())
.setState(convertNexusOperationState(sm.getState(), data))
.setAttempt(data.getAttempt())
- .setLastAttemptCompleteTime(Timestamps.fromMillis(data.lastAttemptCompleteTime))
- .setNextAttemptScheduleTime(Timestamps.fromMillis(data.nextAttemptScheduleTime));
+ .setLastAttemptCompleteTime(data.lastAttemptCompleteTime)
+ .setNextAttemptScheduleTime(data.nextAttemptScheduleTime);
data.retryState.getPreviousRunFailure().ifPresent(builder::setLastAttemptFailure);
- // TODO(pj): support cancellation info
+ if (data.nexusTask.getTask().getRequest().hasCancelOperation()) {
+ NexusOperationCancellationInfo.Builder cancelInfo =
+ NexusOperationCancellationInfo.newBuilder()
+ .setRequestedTime(data.cancelRequestedTime)
+ .setState(convertNexusOperationCancellationState(sm.getState(), data))
+ .setAttempt(data.getAttempt())
+ .setLastAttemptCompleteTime(data.lastAttemptCompleteTime)
+ .setNextAttemptScheduleTime(data.nextAttemptScheduleTime);
+ data.retryState.getPreviousRunFailure().ifPresent(cancelInfo::setLastAttemptFailure);
+ builder.setCancellationInfo(cancelInfo);
+ }
return builder.build();
}
@@ -3130,7 +3203,7 @@ private static PendingNexusOperationInfo constructPendingNexusOperationInfo(
private static PendingNexusOperationState convertNexusOperationState(
State state, NexusOperationData data) {
// Terminal states have already been filtered out, so only handle pending states.
- if (data.getAttempt() > 1) {
+ if (data.isBackingOff) {
return PendingNexusOperationState.PENDING_NEXUS_OPERATION_STATE_BACKING_OFF;
}
switch (state) {
@@ -3143,6 +3216,30 @@ private static PendingNexusOperationState convertNexusOperationState(
}
}
+ private static NexusOperationCancellationState convertNexusOperationCancellationState(
+ State state, NexusOperationData data) {
+ // Terminal states have already been filtered out, so only handle pending states.
+ if (data.isBackingOff) {
+ return NexusOperationCancellationState.NEXUS_OPERATION_CANCELLATION_STATE_BACKING_OFF;
+ }
+ if (state == State.INITIATED) {
+ return NexusOperationCancellationState.NEXUS_OPERATION_CANCELLATION_STATE_SCHEDULED;
+ }
+ return NexusOperationCancellationState.NEXUS_OPERATION_CANCELLATION_STATE_UNSPECIFIED;
+ }
+
+ private static CallbackInfo constructCallbackInfo(Callback completionCallback) {
+ // Currently we only support completion callbacks and the test server implementation assumes
+ // that callbacks are always delivered successfully upon workflow completion. So we are not
+ // currently setting state or attempt related fields.
+ return CallbackInfo.newBuilder()
+ .setCallback(completionCallback)
+ .setTrigger(
+ CallbackInfo.Trigger.newBuilder()
+ .setWorkflowClosed(CallbackInfo.WorkflowClosed.getDefaultInstance()))
+ .build();
+ }
+
private static void populateWorkflowExecutionInfoFromHistory(
WorkflowExecutionInfo.Builder executionInfo, List fullHistory) {
getStartEvent(fullHistory)
diff --git a/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java b/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java
new file mode 100644
index 000000000..597b34ee9
--- /dev/null
+++ b/temporal-test-server/src/test/java/io/temporal/internal/testservice/LinkConverterTest.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
+ *
+ * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Modifications copyright (C) 2017 Uber Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this material except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.temporal.internal.testservice;
+
+import static io.temporal.internal.testservice.LinkConverter.nexusLinkToWorkflowEvent;
+import static io.temporal.internal.testservice.LinkConverter.workflowEventToNexusLink;
+import static org.junit.Assert.*;
+
+import io.temporal.api.common.v1.Link;
+import io.temporal.api.enums.v1.EventType;
+import org.junit.Test;
+
+public class LinkConverterTest {
+
+ @Test
+ public void testConvertWorkflowEventToNexus_Valid() {
+ Link.WorkflowEvent input =
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventId(1)
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
+ .build();
+
+ io.temporal.api.nexus.v1.Link expected =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertWorkflowEventToNexus_ValidAngle() {
+ Link.WorkflowEvent input =
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id>")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventId(1)
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
+ .build();
+
+ io.temporal.api.nexus.v1.Link expected =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id%3E/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertWorkflowEventToNexus_ValidSlash() {
+ Link.WorkflowEvent input =
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id/")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventId(1)
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
+ .build();
+
+ io.temporal.api.nexus.v1.Link expected =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id%2F/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertWorkflowEventToNexus_ValidEventIDMissing() {
+ Link.WorkflowEvent input =
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED))
+ .build();
+
+ io.temporal.api.nexus.v1.Link expected =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ io.temporal.api.nexus.v1.Link actual = workflowEventToNexusLink(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_Valid() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ Link expected =
+ Link.newBuilder()
+ .setWorkflowEvent(
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventId(1)
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)))
+ .build();
+
+ Link actual = nexusLinkToWorkflowEvent(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_ValidAngle() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id%3E/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ Link expected =
+ Link.newBuilder()
+ .setWorkflowEvent(
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id>")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventId(1)
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)))
+ .build();
+
+ Link actual = nexusLinkToWorkflowEvent(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_ValidSlash() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id%2F/run-id/history?eventID=1&eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ Link expected =
+ Link.newBuilder()
+ .setWorkflowEvent(
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id/")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventId(1)
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)))
+ .build();
+
+ Link actual = nexusLinkToWorkflowEvent(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_ValidEventIDMissing() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ Link expected =
+ Link.newBuilder()
+ .setWorkflowEvent(
+ Link.WorkflowEvent.newBuilder()
+ .setNamespace("ns")
+ .setWorkflowId("wf-id")
+ .setRunId("run-id")
+ .setEventRef(
+ Link.WorkflowEvent.EventReference.newBuilder()
+ .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)))
+ .build();
+
+ Link actual = nexusLinkToWorkflowEvent(input);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_InvalidScheme() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "test:///namespaces/ns/workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ assertNull(nexusLinkToWorkflowEvent(input));
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_InvalidPathMissingHistory() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id/run-id/?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ assertNull(nexusLinkToWorkflowEvent(input));
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_InvalidPathMissingNamespace() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces//workflows/wf-id/run-id/history?eventType=EVENT_TYPE_WORKFLOW_EXECUTION_STARTED&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ assertNull(nexusLinkToWorkflowEvent(input));
+ }
+
+ @Test
+ public void testConvertNexusToWorkflowEvent_InvalidEventType() {
+ io.temporal.api.nexus.v1.Link input =
+ io.temporal.api.nexus.v1.Link.newBuilder()
+ .setUrl(
+ "temporal:///namespaces/ns/workflows/wf-id/run-id/history?eventType=WorkflowExecution&referenceType=EventReference")
+ .setType("temporal.api.common.v1.Link.WorkflowEvent")
+ .build();
+
+ assertNull(nexusLinkToWorkflowEvent(input));
+ }
+}
diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java
index eaafaed60..a98c8c76e 100644
--- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java
+++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java
@@ -38,6 +38,7 @@
import io.temporal.api.workflowservice.v1.*;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
+import io.temporal.internal.testservice.LinkConverter;
import io.temporal.internal.testservice.NexusTaskToken;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testserver.functional.common.TestWorkflows;
@@ -45,6 +46,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -156,6 +158,10 @@ public void testNexusOperationAsyncCompletion() {
.setTaskQueue(handlerWFTaskQueue)
.setInput(Payloads.newBuilder().addPayloads(defaultInput))
.setIdentity("test")
+ .addAllLinks(
+ startReq.getStartOperation().getLinksList().stream()
+ .map(LinkConverter::nexusLinkToWorkflowEvent)
+ .collect(Collectors.toList()))
.addCompletionCallbacks(
Callback.newBuilder()
.setNexus(
@@ -233,6 +239,10 @@ public void testNexusOperationAsyncHandlerCanceled() {
.setTaskQueue(handlerWFTaskQueue)
.setInput(Payloads.newBuilder().addPayloads(defaultInput))
.setIdentity("test")
+ .addAllLinks(
+ startReq.getStartOperation().getLinksList().stream()
+ .map(LinkConverter::nexusLinkToWorkflowEvent)
+ .collect(Collectors.toList()))
.addCompletionCallbacks(
Callback.newBuilder()
.setNexus(
@@ -335,6 +345,10 @@ public void testNexusOperationAsyncHandlerTerminated() {
.setTaskQueue(handlerWFTaskQueue)
.setInput(Payloads.newBuilder().addPayloads(defaultInput))
.setIdentity("test")
+ .addAllLinks(
+ startReq.getStartOperation().getLinksList().stream()
+ .map(LinkConverter::nexusLinkToWorkflowEvent)
+ .collect(Collectors.toList()))
.addCompletionCallbacks(
Callback.newBuilder()
.setNexus(
@@ -428,6 +442,10 @@ public void testNexusOperationAsyncHandlerTimeout() {
.setInput(Payloads.newBuilder().addPayloads(defaultInput))
.setWorkflowRunTimeout(Durations.fromSeconds(1))
.setIdentity("test")
+ .addAllLinks(
+ startReq.getStartOperation().getLinksList().stream()
+ .map(LinkConverter::nexusLinkToWorkflowEvent)
+ .collect(Collectors.toList()))
.addCompletionCallbacks(
Callback.newBuilder()
.setNexus(