From 30f391f870f008a7795d4488e6a73d21d3f471ab Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 6 Dec 2024 11:26:45 -0800 Subject: [PATCH 1/3] Add Nexus Worker interceptor (#2278) Add Nexus Worker interceptor --- build.gradle | 2 +- releases/v1.27.0 | 44 ++++ .../OpenTracingWorkerInterceptor.java | 13 +- .../opentracing/SpanOperationType.java | 5 +- .../ActionTypeAndNameSpanBuilderProvider.java | 6 + .../opentracing/internal/ContextAccessor.java | 18 ++ ...NexusOperationInboundCallsInterceptor.java | 97 +++++++++ ...acingWorkflowOutboundCallsInterceptor.java | 30 +++ .../opentracing/internal/SpanFactory.java | 32 +++ .../opentracing/NexusOperationTest.java | 198 ++++++++++++++++++ .../opentracing/integration/JaegerTest.java | 6 +- ...NexusOperationInboundCallsInterceptor.java | 117 +++++++++++ ...sOperationInboundCallsInterceptorBase.java | 51 +++++ ...exusOperationOutboundCallsInterceptor.java | 44 ++++ ...OperationOutboundCallsInterceptorBase.java | 40 ++++ .../interceptors/WorkerInterceptor.java | 13 ++ .../interceptors/WorkerInterceptorBase.java | 8 + .../nexus/NexusOperationContextImpl.java | 16 +- .../internal/nexus/NexusTaskHandlerImpl.java | 27 ++- ...NexusOperationInboundCallsInterceptor.java | 56 +++++ ...exusOperationOutboundCallsInterceptor.java | 38 ++++ .../nexus/TemporalInterceptorMiddleware.java | 92 ++++++++ .../internal/worker/SyncNexusWorker.java | 7 +- .../nexus/NexusTaskHandlerImplTest.java | 24 ++- .../nexus/CancelAsyncOperationTest.java | 33 +++ .../nexus/SyncClientOperationTest.java | 10 +- .../workflow/nexus/SyncOperationFailTest.java | 7 +- .../internal/TracingWorkerInterceptor.java | 48 ++++- 28 files changed, 1053 insertions(+), 29 deletions(-) create mode 100644 releases/v1.27.0 create mode 100644 temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java create mode 100644 temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java create mode 100644 temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java diff --git a/build.gradle b/build.gradle index d8bb288eb..2257ce1ca 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { // Platforms grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.2.1-alpha' + nexusVersion = '0.3.0-alpha' // [0.1.0,) // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) diff --git a/releases/v1.27.0 b/releases/v1.27.0 new file mode 100644 index 000000000..a93386827 --- /dev/null +++ b/releases/v1.27.0 @@ -0,0 +1,44 @@ +# **💥 BREAKING CHANGES** + +## Update With Start **(Pre-release)** + +### WorkflowClient + +- `WorkflowClient.updateWithStart` has been renamed to `WorkflowClient.startUpdateWithStart`. +- Instead of taking the workflow method, workflow arguments and a `UpdateWithStartWorkflowOperation` , `WorkflowClient.startUpdateWithStart` now takes the update method, update arguments and a `WithStartWorkflowOperation` . `WithStartWorkflowOperation` holds to the workflow method and workflow arguments to be executed together with the update request. + +### WorkflowStub + +- `WorkflowStub.updateWithStart` has been renamed to `WorkflowStub.startUpdateWithStart` +- `WorkflowStub.startUpdateWithStart` now just takes the `UpdateOptions`, update arguments and workflow arguments + +## Update **(Public Preview)** + +- The SDK now preforms more rigorous type validation when registering a Workflow with an `@UpdateValidatorMethod` to make sure the type parameters match the linked `@UpdateMethod` +- The SDK will no longer sometimes throw `WorkflowUpdateException` when calling `WorkflowStub.startUpdate` if the update is rejected. `WorkflowUpdateException` is now consistently throw when getting the result of the update +- `UpdateOptionsBuilder` no longer generates a update ID when built. Now a unique UUID is generated when the options are used. This is similar to how `WorkflowOptions` and workflow ID work. + +## Nexus **(Public Preview)** + +- Workflow started by a Nexus operation now require the Workflow ID to be specified +- The SDK now preforms more rigorous type validation when registering a Nexus Service to make sure it implements the service properly +- All header maps for Nexus operations are now properly case-insensitive. + +# **Highlights** + +## Virtual Threads **(Public Preview)** + +The Java SDK now has experimental support for virtual threads when using a JVM with a version of 21 or higher. Virtual threads can be used inside workflows by enabling `WorkerFactoryOptions.setUsingVirtualWorkflowThreads` . Users can also use virtual threads for task processing in a worker by enabling `WorkerOptions.setUsingVirtualThreads` . + +## Nexus **(Public Preview)** + +`WorkerInterceptor` now has support for intercepting Nexus workers. + +## Update **(Public Preview)** + +`WorkflowClient` now has a set of static methods called `startUpdate` that can be used to start an update, but not immediately wait on the result. This is a type safe analog to `WorkflowStub.startUpdate`. + +## Workflow Metadata **(Public Preview)** + +- The Java SDK now exposes a fixed summary option for local and normal activities. +- The Java SDK now support `__temporal_workflow_metadata` query, this query allows users to get details about a workflow like its’ current description and what signal, update, and query handlers are registered \ No newline at end of file diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java index 095117f96..6da5af3d6 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/OpenTracingWorkerInterceptor.java @@ -20,11 +20,9 @@ package io.temporal.opentracing; +import io.nexusrpc.handler.OperationContext; import io.temporal.common.interceptors.*; -import io.temporal.opentracing.internal.ContextAccessor; -import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor; -import io.temporal.opentracing.internal.OpenTracingWorkflowInboundCallsInterceptor; -import io.temporal.opentracing.internal.SpanFactory; +import io.temporal.opentracing.internal.*; public class OpenTracingWorkerInterceptor implements WorkerInterceptor { private final OpenTracingOptions options; @@ -52,4 +50,11 @@ public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInt return new OpenTracingActivityInboundCallsInterceptor( next, options, spanFactory, contextAccessor); } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next) { + return new OpenTracingNexusOperationInboundCallsInterceptor( + next, options, spanFactory, contextAccessor); + } } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 513606865..c11a19c67 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -34,7 +34,10 @@ public enum SpanOperationType { UPDATE_WORKFLOW("UpdateWorkflow"), HANDLE_QUERY("HandleQuery"), HANDLE_SIGNAL("HandleSignal"), - HANDLE_UPDATE("HandleUpdate"); + HANDLE_UPDATE("HandleUpdate"), + START_NEXUS_OPERATION("StartNexusOperation"), + RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"), + RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler"); private final String defaultPrefix; diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 43d226226..3316d4bf0 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -100,6 +100,12 @@ protected Map getSpanTags(SpanCreationContext context) { return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case START_NEXUS_OPERATION: + return ImmutableMap.of( + StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), + StandardTagNames.RUN_ID, context.getRunId()); + case RUN_START_NEXUS_OPERATION: + case RUN_CANCEL_NEXUS_OPERATION: case HANDLE_QUERY: return ImmutableMap.of(); } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java index 617150856..9cb601c5b 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ContextAccessor.java @@ -24,6 +24,8 @@ import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMapAdapter; import io.temporal.api.common.v1.Payload; import io.temporal.common.converter.DefaultDataConverter; import io.temporal.common.converter.StdConverterBackwardsCompatAdapter; @@ -72,4 +74,20 @@ public SpanContext readSpanContextFromHeader(Header header, Tracer tracer) { payload, HashMap.class, HASH_MAP_STRING_STRING_TYPE); return codec.decode(serializedSpanContext, tracer); } + + public Span writeSpanContextToHeader( + Supplier spanSupplier, Map header, Tracer tracer) { + Span span = spanSupplier.get(); + writeSpanContextToHeader(span.context(), header, tracer); + return span; + } + + public void writeSpanContextToHeader( + SpanContext spanContext, Map header, Tracer tracer) { + tracer.inject(spanContext, Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header)); + } + + public SpanContext readSpanContextFromHeader(Map header, Tracer tracer) { + return tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header)); + } } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java new file mode 100644 index 000000000..f5420c7b7 --- /dev/null +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.opentracing.internal; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptorBase; +import io.temporal.opentracing.OpenTracingOptions; + +public class OpenTracingNexusOperationInboundCallsInterceptor + extends NexusOperationInboundCallsInterceptorBase { + private final OpenTracingOptions options; + private final SpanFactory spanFactory; + private final Tracer tracer; + private final ContextAccessor contextAccessor; + + public OpenTracingNexusOperationInboundCallsInterceptor( + NexusOperationInboundCallsInterceptor next, + OpenTracingOptions options, + SpanFactory spanFactory, + ContextAccessor contextAccessor) { + super(next); + this.options = options; + this.spanFactory = spanFactory; + this.tracer = options.getTracer(); + this.contextAccessor = contextAccessor; + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer); + + Span operationStartSpan = + spanFactory + .createStartNexusOperationSpan( + tracer, + input.getOperationContext().getService(), + input.getOperationContext().getOperation(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(operationStartSpan)) { + return super.startOperation(input); + } catch (Throwable t) { + spanFactory.logFail(operationStartSpan, t); + throw t; + } finally { + operationStartSpan.finish(); + } + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer); + + Span operationCancelSpan = + spanFactory + .createCancelNexusOperationSpan( + tracer, + input.getOperationContext().getService(), + input.getOperationContext().getOperation(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(operationCancelSpan)) { + return super.cancelOperation(input); + } catch (Throwable t) { + spanFactory.logFail(operationCancelSpan, t); + throw t; + } finally { + operationCancelSpan.finish(); + } + } +} diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 7b6add447..5f7f2ccbc 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -100,6 +100,25 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp } } + @Override + public ExecuteNexusOperationOutput executeNexusOperation( + ExecuteNexusOperationInput input) { + if (!WorkflowUnsafe.isReplaying()) { + Span nexusOperationExecuteSpan = + contextAccessor.writeSpanContextToHeader( + () -> createStartNexusOperationSpanBuilder(input).start(), + input.getHeaders(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(nexusOperationExecuteSpan)) { + return super.executeNexusOperation(input); + } finally { + nexusOperationExecuteSpan.finish(); + } + } else { + return super.executeNexusOperation(input); + } + } + @Override public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { if (!WorkflowUnsafe.isReplaying()) { @@ -176,6 +195,17 @@ private Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflow parentWorkflowInfo.getRunId()); } + private Tracer.SpanBuilder createStartNexusOperationSpanBuilder( + ExecuteNexusOperationInput input) { + WorkflowInfo parentWorkflowInfo = Workflow.getInfo(); + return spanFactory.createStartNexusOperationSpan( + tracer, + input.getService(), + input.getOperation(), + parentWorkflowInfo.getWorkflowId(), + parentWorkflowInfo.getRunId()); + } + private Tracer.SpanBuilder createContinueAsNewWorkflowStartSpanBuilder(ContinueAsNewInput input) { WorkflowInfo continuedWorkflowInfo = Workflow.getInfo(); return spanFactory.createContinueAsNewWorkflowStartSpan( diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 9e0e79ed9..7ab168af9 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -59,6 +59,18 @@ public Tracer.SpanBuilder createWorkflowStartSpan( return createSpan(context, tracer, null, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createStartNexusOperationSpan( + Tracer tracer, String serviceName, String operationName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.START_NEXUS_OPERATION) + .setActionName(serviceName + "/" + operationName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.CHILD_OF); + } + public Tracer.SpanBuilder createChildWorkflowStartSpan( Tracer tracer, String childWorkflowType, @@ -173,6 +185,26 @@ public Tracer.SpanBuilder createActivityRunSpan( return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createStartNexusOperationSpan( + Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.RUN_START_NEXUS_OPERATION) + .setActionName(serviceName + "/" + operationName) + .build(); + return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createCancelNexusOperationSpan( + Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.RUN_CANCEL_NEXUS_OPERATION) + .setActionName(serviceName + "/" + operationName) + .build(); + return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM); + } + public Tracer.SpanBuilder createWorkflowStartUpdateSpan( Tracer tracer, String updateName, String workflowId, String runId) { SpanCreationContext context = diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java new file mode 100644 index 000000000..9b34f9f59 --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/NexusOperationTest.java @@ -0,0 +1,198 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; + +import io.nexusrpc.Operation; +import io.nexusrpc.Service; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationImpl; +import io.nexusrpc.handler.ServiceImpl; +import io.opentracing.Scope; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.nexus.WorkflowClientOperationHandlers; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.List; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +public class NexusOperationTest { + + private final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions OT_OPTIONS = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS)) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS)) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class, OtherWorkflowImpl.class) + .setNexusServiceImplementation(new TestNexusServiceImpl()) + .build(); + + @After + public void tearDown() { + mockTracer.reset(); + } + + @Service + public interface TestNexusService { + @Operation + String operation(String input); + } + + @ServiceImpl(service = TestNexusService.class) + public class TestNexusServiceImpl { + @OperationImpl + public OperationHandler operation() { + return WorkflowClientOperationHandlers.fromWorkflowMethod( + (context, details, client, input) -> + client.newWorkflowStub( + TestOtherWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId(details.getRequestId()).build()) + ::workflow); + } + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String workflow(String input); + } + + @WorkflowInterface + public interface TestOtherWorkflow { + @WorkflowMethod + String workflow(String input); + } + + public static class OtherWorkflowImpl implements TestOtherWorkflow { + @Override + public String workflow(String input) { + return "Hello, " + input + "!"; + } + } + + public static class WorkflowImpl implements TestWorkflow { + private final TestNexusService nexusService = + Workflow.newNexusServiceStub( + TestNexusService.class, + NexusServiceOptions.newBuilder() + .setOperationOptions( + NexusOperationOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofSeconds(10)) + .build()) + .build()); + + @Override + public String workflow(String input) { + return nexusService.operation(input); + } + } + + /* + * We are checking that spans structure looks like this: + * ClientFunction + * | + * child + * v + * StartWorkflow:TestWorkflow -follow> RunWorkflow:TestWorkflow + * | + * child + * v + * StartNexusOperation:TestNexusService/operation -follow> RunNexusOperationHandler:TestNexusService/operation + * | + * child + * v + * StartWorkflow:TestOtherWorkflow -follow> RunWorkflow:TestOtherWorkflow + */ + @Test + public void testNexusOperation() { + MockSpan span = mockTracer.buildSpan("ClientFunction").start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + try (Scope scope = mockTracer.scopeManager().activate(span)) { + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + assertEquals("Hello, input!", workflow.workflow("input")); + } finally { + span.finish(); + } + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + + MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction"); + + MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0); + assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); + assertEquals("StartWorkflow:TestWorkflow", workflowStartSpan.operationName()); + + MockSpan workflowRunSpan = spansHelper.getByParentSpan(workflowStartSpan).get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + + MockSpan executeNexusOperationSpan = spansHelper.getByParentSpan(workflowRunSpan).get(0); + assertEquals(workflowRunSpan.context().spanId(), executeNexusOperationSpan.parentId()); + assertEquals( + "StartNexusOperation:TestNexusService/operation", + executeNexusOperationSpan.operationName()); + + List startNexusOperationSpans = + spansHelper.getByParentSpan(executeNexusOperationSpan); + + MockSpan startNexusOperationSpan = startNexusOperationSpans.get(0); + assertEquals(executeNexusOperationSpan.context().spanId(), startNexusOperationSpan.parentId()); + assertEquals( + "RunStartNexusOperationHandler:TestNexusService/operation", + startNexusOperationSpan.operationName()); + + MockSpan startOtherWorkflowSpan = spansHelper.getByParentSpan(startNexusOperationSpan).get(0); + assertEquals(startNexusOperationSpan.context().spanId(), startOtherWorkflowSpan.parentId()); + assertEquals("StartWorkflow:TestOtherWorkflow", startOtherWorkflowSpan.operationName()); + + MockSpan otherWorkflowRunSpan = spansHelper.getByParentSpan(startOtherWorkflowSpan).get(0); + assertEquals(startOtherWorkflowSpan.context().spanId(), otherWorkflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestOtherWorkflow", otherWorkflowRunSpan.operationName()); + } +} diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java index f324cade3..b2ce28ff9 100644 --- a/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/integration/JaegerTest.java @@ -42,9 +42,7 @@ import io.temporal.opentracing.OpenTracingWorkerInterceptor; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.WorkerFactoryOptions; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflow.*; import java.time.Duration; import java.util.List; import org.junit.After; @@ -55,7 +53,7 @@ public class JaegerTest { private final InMemoryReporter reporter = new InMemoryReporter(); private final Sampler sampler = new ConstSampler(true); - ; + private final Tracer tracer = new JaegerTracer.Builder("temporal-test").withReporter(reporter).withSampler(sampler).build(); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java new file mode 100644 index 000000000..cf8af8d95 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.*; +import io.temporal.common.Experimental; + +/** + * Intercepts inbound calls to a Nexus operation on the worker side. + * + *

An instance should be created in {@link + * WorkerInterceptor#interceptNexusOperation(NexusOperationInboundCallsInterceptor)}. + * + *

Prefer extending {@link NexusOperationInboundCallsInterceptorBase} and overriding only the + * methods you need instead of implementing this interface directly. {@link + * NexusOperationInboundCallsInterceptorBase} provides correct default implementations to all the + * methods of this interface. + */ +@Experimental +public interface NexusOperationInboundCallsInterceptor { + final class StartOperationInput { + private final OperationContext operationContext; + private final OperationStartDetails startDetails; + private final Object input; + + public StartOperationInput( + OperationContext operationContext, OperationStartDetails startDetails, Object input) { + this.operationContext = operationContext; + this.startDetails = startDetails; + this.input = input; + } + + public OperationContext getOperationContext() { + return operationContext; + } + + public OperationStartDetails getStartDetails() { + return startDetails; + } + + public Object getInput() { + return input; + } + } + + final class StartOperationOutput { + private final OperationStartResult result; + + public StartOperationOutput(OperationStartResult result) { + this.result = result; + } + + public OperationStartResult getResult() { + return result; + } + } + + final class CancelOperationInput { + private final OperationContext operationContext; + private final OperationCancelDetails cancelDetails; + + public CancelOperationInput( + OperationContext operationContext, OperationCancelDetails cancelDetails) { + this.operationContext = operationContext; + this.cancelDetails = cancelDetails; + } + + public OperationContext getOperationContext() { + return operationContext; + } + + public OperationCancelDetails getCancelDetails() { + return cancelDetails; + } + } + + final class CancelOperationOutput {} + + void init(NexusOperationOutboundCallsInterceptor outboundCalls); + + /** + * Intercepts a call to start a Nexus operation. + * + * @param input input to the operation start. + * @return result of the operation start. + * @throws OperationUnsuccessfulException if the operation start failed. + */ + StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException; + + /** + * Intercepts a call to cancel a Nexus operation. + * + * @param input input to the operation cancel. + * @return result of the operation cancel. + */ + CancelOperationOutput cancelOperation(CancelOperationInput input); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java new file mode 100644 index 000000000..b523a9c6c --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.temporal.common.Experimental; + +/** Convenience base class for {@link NexusOperationInboundCallsInterceptor} implementations. */ +@Experimental +public class NexusOperationInboundCallsInterceptorBase + implements NexusOperationInboundCallsInterceptor { + private final NexusOperationInboundCallsInterceptor next; + + public NexusOperationInboundCallsInterceptorBase(NexusOperationInboundCallsInterceptor next) { + this.next = next; + } + + @Override + public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { + next.init(outboundCalls); + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + return next.startOperation(input); + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + return next.cancelOperation(input); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java new file mode 100644 index 000000000..1f3d80015 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptor.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; + +/** + * Can be used to intercept calls from a Nexus operation into the Temporal APIs. + * + *

Prefer extending {@link NexusOperationOutboundCallsInterceptorBase} and overriding only the + * methods you need instead of implementing this interface directly. {@link + * NexusOperationOutboundCallsInterceptorBase} provides correct default implementations to all the + * methods of this interface. + * + *

An instance may be created in {@link + * NexusOperationInboundCallsInterceptor#init(NexusOperationOutboundCallsInterceptor)} and set by + * passing it into {@code init} method of the {@code next} {@link + * NexusOperationInboundCallsInterceptor} The implementation must forward all the calls to the + * outbound interceptor passed as a {@code outboundCalls} parameter to the {@code init} call. + */ +@Experimental +public interface NexusOperationOutboundCallsInterceptor { + /** Intercepts call to get the metric scope in a Nexus operation. */ + Scope getMetricsScope(); +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java new file mode 100644 index 000000000..26ad01481 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationOutboundCallsInterceptorBase.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.common.interceptors; + +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; + +/** Convenience base class for {@link NexusOperationOutboundCallsInterceptor} implementations. */ +@Experimental +public class NexusOperationOutboundCallsInterceptorBase + implements NexusOperationOutboundCallsInterceptor { + private final NexusOperationOutboundCallsInterceptor next; + + public NexusOperationOutboundCallsInterceptorBase(NexusOperationOutboundCallsInterceptor next) { + this.next = next; + } + + @Override + public Scope getMetricsScope() { + return next.getMetricsScope(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java index e5aabbfe9..2a2897c72 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptor.java @@ -20,6 +20,7 @@ package io.temporal.common.interceptors; +import io.nexusrpc.handler.OperationContext; import io.temporal.common.Experimental; /** @@ -95,4 +96,16 @@ public interface WorkerInterceptor { WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next); ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next); + + /** + * Called when Nexus task is received. May create a {@link NexusOperationInboundCallsInterceptor} + * instance. The instance must forward all the calls to {@code next} {@link + * NexusOperationInboundCallsInterceptor}, but it may change the input parameters. + * + * @param next an existing interceptor instance to be proxied by the interceptor created inside + * this method + * @return an interceptor that passes all the calls to {@code next} + */ + NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java index 2c09d188e..b331c4998 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkerInterceptorBase.java @@ -20,6 +20,8 @@ package io.temporal.common.interceptors; +import io.nexusrpc.handler.OperationContext; + public class WorkerInterceptorBase implements WorkerInterceptor { @Override public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInterceptor next) { @@ -30,4 +32,10 @@ public WorkflowInboundCallsInterceptor interceptWorkflow(WorkflowInboundCallsInt public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInterceptor next) { return next; } + + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next) { + return next; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java index b54c0867d..e159d775e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusOperationContextImpl.java @@ -22,25 +22,29 @@ import com.uber.m3.tally.Scope; import io.temporal.client.WorkflowClient; +import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; import io.temporal.nexus.NexusOperationContext; public class NexusOperationContextImpl implements NexusOperationContext { private final String namespace; private final String taskQueue; private final WorkflowClient client; - private final Scope metricsScope; + NexusOperationOutboundCallsInterceptor outboundCalls; public NexusOperationContextImpl( - String namespace, String taskQueue, WorkflowClient client, Scope metricsScope) { + String namespace, + String taskQueue, + WorkflowClient client, + NexusOperationOutboundCallsInterceptor outboundCalls) { this.namespace = namespace; this.taskQueue = taskQueue; this.client = client; - this.metricsScope = metricsScope; + this.outboundCalls = outboundCalls; } @Override public Scope getMetricsScope() { - return metricsScope; + return outboundCalls.getMetricsScope(); } public WorkflowClient getWorkflowClient() { @@ -54,4 +58,8 @@ public String getTaskQueue() { public String getNamespace() { return namespace; } + + public void setOutboundInterceptor(NexusOperationOutboundCallsInterceptor outboundCalls) { + this.outboundCalls = outboundCalls; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index 98fa829b7..a31a69f9a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -33,6 +33,7 @@ import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowException; import io.temporal.common.converter.DataConverter; +import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.failure.ApplicationFailure; import io.temporal.internal.common.NexusUtil; import io.temporal.internal.worker.NexusTask; @@ -46,6 +47,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +61,21 @@ public class NexusTaskHandlerImpl implements NexusTaskHandler { private final Map serviceImplInstances = Collections.synchronizedMap(new HashMap<>()); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final WorkerInterceptor[] interceptors; + private final TemporalInterceptorMiddleware nexusServiceInterceptor; public NexusTaskHandlerImpl( - WorkflowClient client, String namespace, String taskQueue, DataConverter dataConverter) { - this.client = client; - this.namespace = namespace; - this.taskQueue = taskQueue; - this.dataConverter = dataConverter; + @Nonnull WorkflowClient client, + @Nonnull String namespace, + @Nonnull String taskQueue, + @Nonnull DataConverter dataConverter, + @Nonnull WorkerInterceptor[] interceptors) { + this.client = Objects.requireNonNull(client); + this.namespace = Objects.requireNonNull(namespace); + this.taskQueue = Objects.requireNonNull(taskQueue); + this.dataConverter = Objects.requireNonNull(dataConverter); + this.interceptors = Objects.requireNonNull(interceptors); + this.nexusServiceInterceptor = new TemporalInterceptorMiddleware(interceptors); } @Override @@ -76,6 +86,7 @@ public boolean start() { ServiceHandler.Builder serviceHandlerBuilder = ServiceHandler.newBuilder().setSerializer(new PayloadSerializer(dataConverter)); serviceImplInstances.forEach((name, instance) -> serviceHandlerBuilder.addInstance(instance)); + serviceHandlerBuilder.addOperationMiddleware(nexusServiceInterceptor); serviceHandler = serviceHandlerBuilder.build(); return true; } @@ -121,7 +132,11 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } CurrentNexusOperationContext.set( - new NexusOperationContextImpl(namespace, taskQueue, client, metricsScope)); + new NexusOperationContextImpl( + namespace, + taskQueue, + client, + new RootNexusOperationOutboundCallsInterceptor(metricsScope))); switch (request.getVariantCase()) { case START_OPERATION: diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java new file mode 100644 index 000000000..5ddada3ac --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.OperationHandler; +import io.nexusrpc.handler.OperationStartResult; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; +import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; + +public class RootNexusOperationInboundCallsInterceptor + implements NexusOperationInboundCallsInterceptor { + private final OperationHandler operationInterceptor; + + RootNexusOperationInboundCallsInterceptor(OperationHandler operationInterceptor) { + this.operationInterceptor = operationInterceptor; + } + + @Override + public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { + CurrentNexusOperationContext.get().setOutboundInterceptor(outboundCalls); + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + OperationStartResult result = + operationInterceptor.start( + input.getOperationContext(), input.getStartDetails(), input.getInput()); + return new StartOperationOutput(result); + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + operationInterceptor.cancel(input.getOperationContext(), input.getCancelDetails()); + return new CancelOperationOutput(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java new file mode 100644 index 000000000..1b16f87fe --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationOutboundCallsInterceptor.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import com.uber.m3.tally.Scope; +import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor; + +public class RootNexusOperationOutboundCallsInterceptor + implements NexusOperationOutboundCallsInterceptor { + private final Scope scope; + + RootNexusOperationOutboundCallsInterceptor(Scope scope) { + this.scope = scope; + } + + @Override + public Scope getMetricsScope() { + return scope; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java new file mode 100644 index 000000000..abc7bc919 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.nexus; + +import io.nexusrpc.OperationInfo; +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.*; +import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkerInterceptor; + +public class TemporalInterceptorMiddleware implements OperationMiddleware { + private final WorkerInterceptor[] interceptors; + RootNexusOperationInboundCallsInterceptor rootInboundCallsInterceptor; + + public TemporalInterceptorMiddleware(WorkerInterceptor[] interceptors) { + this.interceptors = interceptors; + } + + @Override + public OperationHandler intercept( + OperationContext context, OperationHandler operationHandler) { + rootInboundCallsInterceptor = new RootNexusOperationInboundCallsInterceptor(operationHandler); + NexusOperationInboundCallsInterceptor inboundCallsInterceptor = rootInboundCallsInterceptor; + for (WorkerInterceptor interceptor : interceptors) { + inboundCallsInterceptor = + interceptor.interceptNexusOperation(context, inboundCallsInterceptor); + } + + inboundCallsInterceptor.init( + new RootNexusOperationOutboundCallsInterceptor( + CurrentNexusOperationContext.get().getMetricsScope())); + return new OperationInterceptorConverter(inboundCallsInterceptor); + } + + static class OperationInterceptorConverter implements OperationHandler { + private final NexusOperationInboundCallsInterceptor next; + + public OperationInterceptorConverter(NexusOperationInboundCallsInterceptor next) { + this.next = next; + } + + @Override + public OperationStartResult start( + OperationContext operationContext, OperationStartDetails operationStartDetails, Object o) + throws OperationUnsuccessfulException { + return next.startOperation( + new NexusOperationInboundCallsInterceptor.StartOperationInput( + operationContext, operationStartDetails, o)) + .getResult(); + } + + @Override + public Object fetchResult( + OperationContext operationContext, OperationFetchResultDetails operationFetchResultDetails) + throws OperationHandlerException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public OperationInfo fetchInfo( + OperationContext operationContext, OperationFetchInfoDetails operationFetchInfoDetails) + throws OperationHandlerException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void cancel( + OperationContext operationContext, OperationCancelDetails operationCancelDetails) { + next.cancelOperation( + new NexusOperationInboundCallsInterceptor.CancelOperationInput( + operationContext, operationCancelDetails)); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java index c213dafc2..d452b7312 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java @@ -49,7 +49,12 @@ public SyncNexusWorker( this.taskQueue = taskQueue; this.taskHandler = - new NexusTaskHandlerImpl(client, namespace, taskQueue, options.getDataConverter()); + new NexusTaskHandlerImpl( + client, + namespace, + taskQueue, + options.getDataConverter(), + options.getWorkerInterceptors()); this.worker = new NexusWorker( client.getWorkflowServiceStubs(), diff --git a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java index 6a1a85eb4..f0f1ab11e 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/nexus/NexusTaskHandlerImplTest.java @@ -20,6 +20,8 @@ package io.temporal.internal.nexus; +import static org.mockito.Mockito.mock; + import com.google.protobuf.ByteString; import com.uber.m3.tally.RootScopeBuilder; import com.uber.m3.tally.Scope; @@ -32,8 +34,10 @@ import io.temporal.api.nexus.v1.Request; import io.temporal.api.nexus.v1.StartOperationRequest; import io.temporal.api.workflowservice.v1.PollNexusTaskQueueResponse; +import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.interceptors.WorkerInterceptor; import io.temporal.common.reporter.TestStatsReporter; import io.temporal.internal.worker.NexusTask; import io.temporal.internal.worker.NexusTaskHandler; @@ -59,16 +63,20 @@ public void setUp() { @Test public void nexusTaskHandlerImplStartNoService() { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); // Verify if no service is registered, start should return false Assert.assertFalse(nexusTaskHandlerImpl.start()); } @Test public void nexusTaskHandlerImplStart() { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImpl()}); // Verify if any services are registered, start should return true @@ -77,8 +85,10 @@ public void nexusTaskHandlerImplStart() { @Test public void startSyncTask() throws TimeoutException { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImpl()}); nexusTaskHandlerImpl.start(); @@ -115,8 +125,10 @@ public void startSyncTask() throws TimeoutException { @Test public void syncTimeoutTask() { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImpl2()}); nexusTaskHandlerImpl.start(); @@ -140,8 +152,10 @@ public void syncTimeoutTask() { @Test public void startAsyncSyncOperation() throws TimeoutException { + WorkflowClient client = mock(WorkflowClient.class); NexusTaskHandlerImpl nexusTaskHandlerImpl = - new NexusTaskHandlerImpl(null, NAMESPACE, TASK_QUEUE, dataConverter); + new NexusTaskHandlerImpl( + client, NAMESPACE, TASK_QUEUE, dataConverter, new WorkerInterceptor[] {}); nexusTaskHandlerImpl.registerNexusServiceImplementations( new Object[] {new TestNexusServiceImplAsync()}); nexusTaskHandlerImpl.start(); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java index eb0bd1e42..bb8859748 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java @@ -20,6 +20,8 @@ package io.temporal.workflow.nexus; +import static org.junit.Assume.assumeFalse; + import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; @@ -29,11 +31,13 @@ import io.temporal.failure.NexusOperationFailure; import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.testing.internal.TracingWorkerInterceptor; import io.temporal.workflow.*; import io.temporal.workflow.shared.TestNexusServices; import io.temporal.workflow.shared.TestWorkflows; import java.time.Duration; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -45,6 +49,13 @@ public class CancelAsyncOperationTest { .setNexusServiceImplementation(new TestNexusServiceImpl()) .build(); + @Before + public void checkRealServer() { + assumeFalse( + "Test flakes on real server because of delays in the Nexus Registry", + SDKTestWorkflowRule.useExternalService); + } + @Test public void asyncOperationImmediatelyCancelled() { TestWorkflows.TestWorkflow1 workflowStub = @@ -58,6 +69,15 @@ public void asyncOperationImmediatelyCancelled() { CanceledFailure canceledFailure = (CanceledFailure) nexusFailure.getCause(); Assert.assertEquals( "operation canceled before it was started", canceledFailure.getOriginalMessage()); + + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "newThread workflow-method", + "executeNexusOperation TestNexusService1 operation", + "startNexusOperation TestNexusService1 operation", + "cancelNexusOperation TestNexusService1 operation"); } @Test @@ -69,6 +89,19 @@ public void asyncOperationCancelled() { Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure); + + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "newThread workflow-method", + "executeNexusOperation TestNexusService1 operation", + "startNexusOperation TestNexusService1 operation", + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "registerSignalHandlers unblock", + "newThread workflow-method", + "await await", + "cancelNexusOperation TestNexusService1 operation"); } public static class TestNexus implements TestWorkflows.TestWorkflow1 { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java index adbb2c055..d774875dd 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java @@ -34,6 +34,7 @@ import io.temporal.nexus.WorkflowClientOperationHandlers; import io.temporal.serviceclient.MetricsTag; import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.testing.internal.TracingWorkerInterceptor; import io.temporal.worker.MetricsType; import io.temporal.worker.WorkerMetricsTag; import io.temporal.workflow.*; @@ -63,7 +64,14 @@ public void syncClientOperationSuccess() { TestUpdatedWorkflow workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestUpdatedWorkflow.class); Assert.assertTrue(workflowStub.execute(false).startsWith("Update ID:")); - + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "registerUpdateHandlers update", + "newThread workflow-method", + "executeNexusOperation TestNexusService1 operation", + "startNexusOperation TestNexusService1 operation"); // Test metrics all tasks should have Map nexusWorkerTags = ImmutableMap.builder() diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java index 24ad56bc6..be79bbb8b 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java @@ -33,6 +33,7 @@ import io.temporal.failure.ApplicationFailure; import io.temporal.failure.NexusOperationFailure; import io.temporal.serviceclient.MetricsTag; +import io.temporal.testUtils.Eventually; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.MetricsType; import io.temporal.worker.WorkerMetricsTag; @@ -77,7 +78,11 @@ public void failSyncOperation() { .put(MetricsTag.NEXUS_OPERATION, "operation") .put(MetricsTag.TASK_FAILURE_TYPE, "operation_failed") .buildKeepingLast(); - reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 4); + Eventually.assertEventually( + Duration.ofSeconds(1), + () -> { + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 4); + }); } public static class TestNexus implements TestWorkflow1 { diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 30c32ddbd..93eb971dd 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import com.uber.m3.tally.Scope; +import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.handler.OperationContext; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.ActivityCompletionException; import io.temporal.common.SearchAttributeUpdate; @@ -129,6 +131,12 @@ public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInt return new TracingActivityInboundCallsInterceptor(trace, next); } + @Override + public NexusOperationInboundCallsInterceptor interceptNexusOperation( + OperationContext context, NexusOperationInboundCallsInterceptor next) { + return new TracingNexusOperationInboundCallsInterceptor(trace, next); + } + public static class FilteredTrace { private final List impl = Collections.synchronizedList(new ArrayList<>()); @@ -189,7 +197,7 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp public ExecuteNexusOperationOutput executeNexusOperation( ExecuteNexusOperationInput input) { if (!WorkflowUnsafe.isReplaying()) { - trace.add("executeNexusOperation " + input.getOperation()); + trace.add("executeNexusOperation " + input.getService() + " " + input.getOperation()); } return next.executeNexusOperation(input); } @@ -468,4 +476,42 @@ public ActivityOutput execute(ActivityInput input) { return next.execute(input); } } + + private static class TracingNexusOperationInboundCallsInterceptor + implements NexusOperationInboundCallsInterceptor { + private final NexusOperationInboundCallsInterceptor next; + private final FilteredTrace trace; + + public TracingNexusOperationInboundCallsInterceptor( + FilteredTrace trace, NexusOperationInboundCallsInterceptor next) { + this.trace = trace; + this.next = next; + } + + @Override + public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { + next.init(outboundCalls); + } + + @Override + public StartOperationOutput startOperation(StartOperationInput input) + throws OperationUnsuccessfulException { + trace.add( + "startNexusOperation " + + input.getOperationContext().getService() + + " " + + input.getOperationContext().getOperation()); + return next.startOperation(input); + } + + @Override + public CancelOperationOutput cancelOperation(CancelOperationInput input) { + trace.add( + "cancelNexusOperation " + + input.getOperationContext().getService() + + " " + + input.getOperationContext().getOperation()); + return next.cancelOperation(input); + } + } } From 707337527a01e0e6e1f0b048168138d854e091fe Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 6 Dec 2024 12:09:55 -0800 Subject: [PATCH 2/3] Add operation Id to callback headers (#2336) Add operation Id to callback headers --- build.gradle | 2 +- .../internal/common/InternalUtils.java | 19 ++++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 2257ce1ca..4fce4e12d 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { // Platforms grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.3.0-alpha' // [0.1.0,) + nexusVersion = '0.3.0-alpha' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index e37d65f58..f9fa4c637 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -21,6 +21,7 @@ package io.temporal.internal.common; import com.google.common.base.Defaults; +import io.nexusrpc.Header; import io.temporal.api.common.v1.Callback; import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.taskqueue.v1.TaskQueue; @@ -28,6 +29,8 @@ import io.temporal.client.WorkflowStub; import io.temporal.internal.client.NexusStartWorkflowRequest; import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +81,20 @@ public static WorkflowStub createNexusBoundStub( throw new IllegalArgumentException( "WorkflowId is expected to be set on WorkflowOptions when used with Nexus"); } + // Add the Nexus operation ID to the headers if it is not already present to support fabricating + // a NexusOperationStarted event if the completion is received before the response to a + // StartOperation request. + Map headers = + request.getCallbackHeaders().entrySet().stream() + .collect( + Collectors.toMap( + (k) -> k.getKey().toLowerCase(), + Map.Entry::getValue, + (a, b) -> a, + () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); + if (!headers.containsKey(Header.OPERATION_ID)) { + headers.put(Header.OPERATION_ID.toLowerCase(), options.getWorkflowId()); + } WorkflowOptions.Builder nexusWorkflowOptions = WorkflowOptions.newBuilder(options) .setRequestId(request.getRequestId()) @@ -87,7 +104,7 @@ public static WorkflowStub createNexusBoundStub( .setNexus( Callback.Nexus.newBuilder() .setUrl(request.getCallbackUrl()) - .putAllHeader(request.getCallbackHeaders()) + .putAllHeader(headers) .build()) .build())); if (options.getTaskQueue() == null) { From a073e646c51eab8524421c633ea459f6a5973c8d Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 6 Dec 2024 12:56:38 -0800 Subject: [PATCH 3/3] Clean up v1.27.0 release notes (#2345) Release v1.27.0 --- releases/v1.27.0 | 72 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 13 deletions(-) diff --git a/releases/v1.27.0 b/releases/v1.27.0 index a93386827..5177a2bd7 100644 --- a/releases/v1.27.0 +++ b/releases/v1.27.0 @@ -4,41 +4,87 @@ ### WorkflowClient -- `WorkflowClient.updateWithStart` has been renamed to `WorkflowClient.startUpdateWithStart`. -- Instead of taking the workflow method, workflow arguments and a `UpdateWithStartWorkflowOperation` , `WorkflowClient.startUpdateWithStart` now takes the update method, update arguments and a `WithStartWorkflowOperation` . `WithStartWorkflowOperation` holds to the workflow method and workflow arguments to be executed together with the update request. +- `WorkflowClient.updateWithStart` has been renamed to `WorkflowClient.startUpdateWithStart`. +- Instead of taking the workflow method, workflow arguments and a `UpdateWithStartWorkflowOperation`, `WorkflowClient.startUpdateWithStart` now takes the update method, update arguments and a `WithStartWorkflowOperation`. `WithStartWorkflowOperation` contains the workflow method and workflow arguments to be executed together with the update request. ### WorkflowStub -- `WorkflowStub.updateWithStart` has been renamed to `WorkflowStub.startUpdateWithStart` -- `WorkflowStub.startUpdateWithStart` now just takes the `UpdateOptions`, update arguments and workflow arguments +- `WorkflowStub.updateWithStart` has been renamed to `WorkflowStub.startUpdateWithStart`. +- `WorkflowStub.startUpdateWithStart` now just takes the `UpdateOptions`, update arguments and workflow arguments. ## Update **(Public Preview)** -- The SDK now preforms more rigorous type validation when registering a Workflow with an `@UpdateValidatorMethod` to make sure the type parameters match the linked `@UpdateMethod` -- The SDK will no longer sometimes throw `WorkflowUpdateException` when calling `WorkflowStub.startUpdate` if the update is rejected. `WorkflowUpdateException` is now consistently throw when getting the result of the update -- `UpdateOptionsBuilder` no longer generates a update ID when built. Now a unique UUID is generated when the options are used. This is similar to how `WorkflowOptions` and workflow ID work. +- The SDK now preforms more rigorous type validation when registering a Workflow with an `@UpdateValidatorMethod` to make sure the type parameters match the linked `@UpdateMethod`. +- The SDK will no longer sometimes throw `WorkflowUpdateException` when calling `WorkflowStub.startUpdate` if the update is rejected. `WorkflowUpdateException` is now consistently throw when getting the result of the update +- `UpdateOptionsBuilder` no longer generates a update ID when built. Now a unique UUID is generated when the options are used. This is similar to how `WorkflowOptions` and workflow ID work. ## Nexus **(Public Preview)** -- Workflow started by a Nexus operation now require the Workflow ID to be specified -- The SDK now preforms more rigorous type validation when registering a Nexus Service to make sure it implements the service properly +- Workflow started by a Nexus operation now require the Workflow ID to be specified in the `WorkflowOptions`. +- The SDK now preforms more rigorous type validation when registering a Nexus Service to make sure it implements the service properly. - All header maps for Nexus operations are now properly case-insensitive. # **Highlights** ## Virtual Threads **(Public Preview)** -The Java SDK now has experimental support for virtual threads when using a JVM with a version of 21 or higher. Virtual threads can be used inside workflows by enabling `WorkerFactoryOptions.setUsingVirtualWorkflowThreads` . Users can also use virtual threads for task processing in a worker by enabling `WorkerOptions.setUsingVirtualThreads` . +The Java SDK now has experimental support for virtual threads when using a JVM with a version of 21 or higher. Virtual threads can be used inside workflows by enabling `WorkerFactoryOptions.setUsingVirtualWorkflowThreads`. Users can also use virtual threads for task processing in a worker by enabling `WorkerOptions.setUsingVirtualThreads`. ## Nexus **(Public Preview)** -`WorkerInterceptor` now has support for intercepting Nexus workers. +`WorkerInterceptor` now has support for intercepting Nexus workers. ## Update **(Public Preview)** -`WorkflowClient` now has a set of static methods called `startUpdate` that can be used to start an update, but not immediately wait on the result. This is a type safe analog to `WorkflowStub.startUpdate`. +`WorkflowClient` now has a set of static methods called `startUpdate` that can be used to start an update, but not immediately wait on the result. This is a type safe analog to `WorkflowStub.startUpdate`. ## Workflow Metadata **(Public Preview)** - The Java SDK now exposes a fixed summary option for local and normal activities. -- The Java SDK now support `__temporal_workflow_metadata` query, this query allows users to get details about a workflow like its’ current description and what signal, update, and query handlers are registered \ No newline at end of file +- The Java SDK now support `__temporal_workflow_metadata` query, this query allows users to get details about a workflow like its’ current description and what signal, update, and query handlers are registered. + +# What's Changed + +2024-10-16 - acfadbfd - Avoid spamming retries in nexusOperationApplicationFailureFailureConversion (#2272) +2024-10-16 - eb64ec3e - Fix code coverage (#2275) +2024-10-17 - 25f55366 - Fix Null pointer exception on passing empty search attribute (#2277) +2024-10-21 - 0ce1d6ec - Bump edge test dependencies (#2279) +2024-10-21 - 301e1290 - Fix test server operation timeout (#2282) +2024-10-21 - ac3526b7 - Avoid warning from un-accessed operation promise (#2280) +2024-10-22 - 34106777 - Fix UpdateWithStart untyped operation (#2288) +2024-10-22 - 7bcade2c - Fix UpdateWithStart workflow args (#2286) +2024-10-24 - 27d998af - Add failure_reason to nexus_task_execution_failed (#2274) +2024-10-24 - 805833c3 - Release v1.26.1 (#2293) +2024-10-24 - 93e30d7f - Avoid SyncOperationTimeoutTest spamming the test server (#2292) +2024-10-24 - b8c4b7bf - Fix proto decoding in a Nexus Operation (#2281) +2024-10-25 - 2ded9853 - Bump Java edge dependency version to 21 (#2296) +2024-10-25 - 6efbde3c - Update gradle 7.6.1 -> 8.10.2 (#2294) +2024-10-29 - 0b192d32 - Fix workflow implementation in springboot failing if no default constructor is present (#2300) +2024-10-30 - c96f8d61 - Add workflow metadata query (#2301) +2024-10-31 - 37081cce - Fix jacoco coverage (#2304) +2024-10-31 - b45e40d4 - Remove feature branch (#2303) +2024-11-03 - f6bf576e - Add support for virtual workflow threads (#2297) +2024-11-04 - c8a27ce9 - Add activity summary (#2306) +2024-11-12 - 24990dbc - Ad support for local activity metadata (#2309) +2024-11-12 - 83f47efb - Make sure workflow options from proxy call are propagated (#2310) +2024-11-12 - b53f304f - Expose fromWorkflowStub (#2311) +2024-11-13 - 02ff5cd3 - Add timeout to canRunWithResourceBasedTuner (#2313) +2024-11-14 - 4cee4e08 - Validate with generic parameter types for workflow init (#2318) +2024-11-14 - 7ab0f6c9 - Refactor workflow init validation (#2316) +2024-11-15 - 2a688839 - Generate update ID at call time if not set (#2319) +2024-11-22 - 16b0bb92 - Disable eager activities if task queue rate limits is set (#2325) +2024-11-22 - 1d86a574 - Support toString on workflow proxy types (#2315) +2024-11-22 - c6f0b58c - Do runtime check to ensure update validator has the same parameters as the update it validates (#2323) +2024-11-23 - 74022f16 - Add getResult to WorkflowUpdateHandle (#2324) +2024-11-25 - a2dd3694 - Add high-level workflow describe (#2326) +2024-11-26 - 89021d0e - Add type safe API to execute an async update workflow request (#2320) +2024-12-01 - fcc03431 - Fix CI after GHA's drop of node16 actions (#2338) +2024-12-02 - 5212a34e - Skip flaky test (#2334) +2024-12-02 - 7245bf8b - Require workflow ID to be set for Nexus (#2330) +2024-12-02 - e3ef9b4e - Standardized update failure exception (#2339) +2024-12-03 - 8782de33 - Make nexus header check case-insensitive (#2335) +2024-12-03 - cbcf26cb - Fix unbalanced locks in test server for Nexus (#2341) +2024-12-04 - c7fcf12f - Make headers in ExecuteNexusOperationInput case insensitive (#2342) +2024-12-06 - 30f391f8 - Add Nexus Worker interceptor (#2278) +2024-12-06 - 70733752 - Add operation Id to callback headers (#2336) +2024-12-06 - 9ac1af3d - New Update-with-Start API (#2337)