From b8c4b7bf76e782db6c459158c38f600fa0f6b57c Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 24 Oct 2024 07:56:11 -0700 Subject: [PATCH] Fix proto decoding in a Nexus Operation (#2281) Fix proto decoding in a Nexus Operation --- build.gradle | 2 +- .../internal/nexus/PayloadSerializer.java | 10 +- .../temporal/nexus/WorkflowRunOperation.java | 2 +- .../internal/nexus/PayloadSerializerTest.java | 40 +++++++- .../nexus/GenericListOperationTest.java | 90 ++++++++++++++++++ .../workflow/nexus/ProtoOperationTest.java | 94 +++++++++++++++++++ .../workflow/nexus/VoidOperationTest.java | 88 +++++++++++++++++ .../TestWorkflowMutableStateImpl.java | 10 +- 8 files changed, 329 insertions(+), 7 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java diff --git a/build.gradle b/build.gradle index fe11e6190..cf3659d77 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ ext { // Platforms grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.2.0-alpha' + nexusVersion = '0.2.1-alpha' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/PayloadSerializer.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/PayloadSerializer.java index edb123bb7..ef99a1dd5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/PayloadSerializer.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/PayloadSerializer.java @@ -24,6 +24,7 @@ import io.nexusrpc.Serializer; import io.temporal.api.common.v1.Payload; import io.temporal.common.converter.DataConverter; +import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.Optional; import javax.annotation.Nullable; @@ -52,7 +53,14 @@ public Content serialize(@Nullable Object o) { public @Nullable Object deserialize(Content content, Type type) { try { Payload payload = Payload.parseFrom(content.getData()); - return dataConverter.fromPayload(payload, type.getClass(), type); + if ((type instanceof Class)) { + return dataConverter.fromPayload(payload, (Class) type, type); + } else if (type instanceof ParameterizedType) { + return dataConverter.fromPayload( + payload, (Class) ((ParameterizedType) type).getRawType(), type); + } else { + throw new IllegalArgumentException("Unsupported type: " + type); + } } catch (InvalidProtocolBufferException e) { throw new RuntimeException(e); } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java index c6db1597d..96791ccfe 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperation.java @@ -73,7 +73,7 @@ public OperationStartResult start( io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink); try { OperationStartResult.Builder result = - OperationStartResult.newBuilder().setAsyncOperationId(workflowExec.getWorkflowId()); + OperationStartResult.newAsyncBuilder(workflowExec.getWorkflowId()); if (nexusLink != null) { result.addLink(nexusProtoLinkToLink(nexusLink)); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/PayloadSerializerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/PayloadSerializerTest.java index 8ab02c87c..a3c965529 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/PayloadSerializerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/PayloadSerializerTest.java @@ -20,8 +20,13 @@ package io.temporal.internal.nexus; +import com.google.common.reflect.TypeToken; +import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.converter.EncodedValuesTest; +import java.util.Collections; +import java.util.Map; import org.junit.Assert; import org.junit.Test; @@ -39,7 +44,40 @@ public void testPayload() { @Test public void testNull() { PayloadSerializer.Content content = payloadSerializer.serialize(null); - payloadSerializer.deserialize(content, String.class); Assert.assertEquals(null, payloadSerializer.deserialize(content, String.class)); } + + @Test + public void testInteger() { + PayloadSerializer.Content content = payloadSerializer.serialize(1); + Assert.assertEquals(1, payloadSerializer.deserialize(content, Integer.class)); + } + + @Test + public void testArray() { + String[] cars = {"test", "nexus", "serialization"}; + PayloadSerializer.Content content = payloadSerializer.serialize(cars); + Assert.assertArrayEquals( + cars, (String[]) payloadSerializer.deserialize(content, String[].class)); + } + + @Test + public void testHashMap() { + Map map = + Collections.singletonMap("key", new EncodedValuesTest.Pair(1, "hello")); + PayloadSerializer.Content content = payloadSerializer.serialize(map); + Map newMap = + (Map) + payloadSerializer.deserialize( + content, (new TypeToken>() {}).getType()); + Assert.assertTrue(newMap.get("key") instanceof EncodedValuesTest.Pair); + } + + @Test + public void testProto() { + WorkflowExecution exec = + WorkflowExecution.newBuilder().setWorkflowId("id").setRunId("runId").build(); + PayloadSerializer.Content content = payloadSerializer.serialize(exec); + Assert.assertEquals(exec, payloadSerializer.deserialize(content, WorkflowExecution.class)); + } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java new file mode 100644 index 000000000..1277111b5 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/GenericListOperationTest.java @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.common.converter.EncodedValuesTest; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.Collections; +import java.util.List; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +// Test an operation that takes and returns a List type with a non-primitive element type +public class GenericListOperationTest { + @ClassRule + public static SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void testOperation() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("hello", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName()) + .build(); + // Try to call with the typed stub + TestNexusService serviceStub = + Workflow.newNexusServiceStub(TestNexusService.class, serviceOptions); + List arg = + Collections.singletonList(new EncodedValuesTest.Pair(1, "hello")); + return serviceStub.operation(arg).get(0).getS(); + } + } + + @Service + public interface TestNexusService { + @Operation + List operation(List input); + } + + @ServiceImpl(service = TestNexusService.class) + public static class TestNexusServiceImpl { + @OperationImpl + public OperationHandler, List> + operation() { + return WorkflowClientOperationHandlers.sync( + (context, details, client, input) -> { + return input; + }); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java new file mode 100644 index 000000000..dc386d63d --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/ProtoOperationTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestWorkflows; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +// Test an operation that takes and returns a protobuf message +public class ProtoOperationTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void testDescribeWorkflowOperation() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals(testWorkflowRule.getTaskQueue(), result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + TestNexusService serviceStub = Workflow.newNexusServiceStub(TestNexusService.class); + + WorkflowExecution exec = + WorkflowExecution.newBuilder() + .setWorkflowId(Workflow.getInfo().getWorkflowId()) + .setRunId(Workflow.getInfo().getRunId()) + .build(); + return serviceStub + .describeWorkflow( + DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace(Workflow.getInfo().getNamespace()) + .setExecution(exec) + .build()) + .getExecutionConfig() + .getTaskQueue() + .getName(); + } + } + + @Service + public interface TestNexusService { + @Operation + DescribeWorkflowExecutionResponse describeWorkflow(DescribeWorkflowExecutionRequest input); + } + + @ServiceImpl(service = TestNexusService.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler + describeWorkflow() { + return WorkflowClientOperationHandlers.sync( + (context, details, client, input) -> + client.getWorkflowServiceStubs().blockingStub().describeWorkflowExecution(input)); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java new file mode 100644 index 000000000..85e273c4b --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/VoidOperationTest.java @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.nexus; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.NexusServiceOptions; +import io.temporal.workflow.NexusServiceStub; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestWorkflows; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; + +// Test an operation that takes and returns a void type +public class VoidOperationTest { + @ClassRule + public static SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestNexus.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @Test + public void testVoidOperation() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + Assert.assertEquals("success", result); + } + + public static class TestNexus implements TestWorkflows.TestWorkflow1 { + @Override + public String execute(String input) { + NexusServiceOptions serviceOptions = + NexusServiceOptions.newBuilder() + .setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName()) + .build(); + // Try to call with the typed stub + TestNexusService serviceStub = + Workflow.newNexusServiceStub(TestNexusService.class, serviceOptions); + serviceStub.noop(); + // Try to call with an untyped stub + NexusServiceStub untypedServiceStub = + Workflow.newUntypedNexusServiceStub("TestNexusService", serviceOptions); + untypedServiceStub.execute("noop", Void.class, null); + untypedServiceStub.execute("noop", Void.class, Void.class, null); + return "success"; + } + } + + @Service + public interface TestNexusService { + @Operation + void noop(); + } + + @ServiceImpl(service = TestNexusService.class) + public static class TestNexusServiceImpl { + @OperationImpl + public OperationHandler noop() { + return WorkflowClientOperationHandlers.sync((context, details, client, input) -> null); + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index f47c5bd53..165998ff5 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -3180,9 +3180,13 @@ private static PendingNexusOperationInfo constructPendingNexusOperationInfo( .setScheduledEventId(data.scheduledEventId) .setScheduleToCloseTimeout(data.scheduledEvent.getScheduleToCloseTimeout()) .setState(convertNexusOperationState(sm.getState(), data)) - .setAttempt(data.getAttempt()) - .setLastAttemptCompleteTime(data.lastAttemptCompleteTime) - .setNextAttemptScheduleTime(data.nextAttemptScheduleTime); + .setAttempt(data.getAttempt()); + if (data.lastAttemptCompleteTime != null) { + builder.setLastAttemptCompleteTime(data.lastAttemptCompleteTime); + } + if (data.nextAttemptScheduleTime != null) { + builder.setNextAttemptScheduleTime(data.nextAttemptScheduleTime); + } data.retryState.getPreviousRunFailure().ifPresent(builder::setLastAttemptFailure);