Skip to content

Commit

Permalink
Add Nexus Worker interceptor (#2278)
Browse files Browse the repository at this point in the history
Add Nexus Worker interceptor
  • Loading branch information
Quinn-With-Two-Ns authored Dec 6, 2024
1 parent 9ac1af3 commit 30f391f
Show file tree
Hide file tree
Showing 28 changed files with 1,053 additions and 29 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.2.1-alpha'
nexusVersion = '0.3.0-alpha' // [0.1.0,)
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)

Expand Down
44 changes: 44 additions & 0 deletions releases/v1.27.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# **💥 BREAKING CHANGES**

## Update With Start **(Pre-release)**

### WorkflowClient

- `WorkflowClient.updateWithStart` has been renamed to `WorkflowClient.startUpdateWithStart`.
- Instead of taking the workflow method, workflow arguments and a `UpdateWithStartWorkflowOperation` , `WorkflowClient.startUpdateWithStart` now takes the update method, update arguments and a `WithStartWorkflowOperation` . `WithStartWorkflowOperation` holds to the workflow method and workflow arguments to be executed together with the update request.

### WorkflowStub

- `WorkflowStub.updateWithStart` has been renamed to `WorkflowStub.startUpdateWithStart`
- `WorkflowStub.startUpdateWithStart` now just takes the `UpdateOptions`, update arguments and workflow arguments

## Update **(Public Preview)**

- The SDK now preforms more rigorous type validation when registering a Workflow with an `@UpdateValidatorMethod` to make sure the type parameters match the linked `@UpdateMethod`
- The SDK will no longer sometimes throw `WorkflowUpdateException` when calling `WorkflowStub.startUpdate` if the update is rejected. `WorkflowUpdateException` is now consistently throw when getting the result of the update
- `UpdateOptionsBuilder` no longer generates a update ID when built. Now a unique UUID is generated when the options are used. This is similar to how `WorkflowOptions` and workflow ID work.

## Nexus **(Public Preview)**

- Workflow started by a Nexus operation now require the Workflow ID to be specified
- The SDK now preforms more rigorous type validation when registering a Nexus Service to make sure it implements the service properly
- All header maps for Nexus operations are now properly case-insensitive.

# **Highlights**

## Virtual Threads **(Public Preview)**

The Java SDK now has experimental support for virtual threads when using a JVM with a version of 21 or higher. Virtual threads can be used inside workflows by enabling `WorkerFactoryOptions.setUsingVirtualWorkflowThreads` . Users can also use virtual threads for task processing in a worker by enabling `WorkerOptions.setUsingVirtualThreads` .

## Nexus **(Public Preview)**

`WorkerInterceptor` now has support for intercepting Nexus workers.

## Update **(Public Preview)**

`WorkflowClient` now has a set of static methods called `startUpdate` that can be used to start an update, but not immediately wait on the result. This is a type safe analog to `WorkflowStub.startUpdate`.

## Workflow Metadata **(Public Preview)**

- The Java SDK now exposes a fixed summary option for local and normal activities.
- The Java SDK now support `__temporal_workflow_metadata` query, this query allows users to get details about a workflow like its’ current description and what signal, update, and query handlers are registered
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@

package io.temporal.opentracing;

import io.nexusrpc.handler.OperationContext;
import io.temporal.common.interceptors.*;
import io.temporal.opentracing.internal.ContextAccessor;
import io.temporal.opentracing.internal.OpenTracingActivityInboundCallsInterceptor;
import io.temporal.opentracing.internal.OpenTracingWorkflowInboundCallsInterceptor;
import io.temporal.opentracing.internal.SpanFactory;
import io.temporal.opentracing.internal.*;

public class OpenTracingWorkerInterceptor implements WorkerInterceptor {
private final OpenTracingOptions options;
Expand Down Expand Up @@ -52,4 +50,11 @@ public ActivityInboundCallsInterceptor interceptActivity(ActivityInboundCallsInt
return new OpenTracingActivityInboundCallsInterceptor(
next, options, spanFactory, contextAccessor);
}

@Override
public NexusOperationInboundCallsInterceptor interceptNexusOperation(
OperationContext context, NexusOperationInboundCallsInterceptor next) {
return new OpenTracingNexusOperationInboundCallsInterceptor(
next, options, spanFactory, contextAccessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ public enum SpanOperationType {
UPDATE_WORKFLOW("UpdateWorkflow"),
HANDLE_QUERY("HandleQuery"),
HANDLE_SIGNAL("HandleSignal"),
HANDLE_UPDATE("HandleUpdate");
HANDLE_UPDATE("HandleUpdate"),
START_NEXUS_OPERATION("StartNexusOperation"),
RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"),
RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler");

private final String defaultPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());
case START_NEXUS_OPERATION:
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());
case RUN_START_NEXUS_OPERATION:
case RUN_CANCEL_NEXUS_OPERATION:
case HANDLE_QUERY:
return ImmutableMap.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.StdConverterBackwardsCompatAdapter;
Expand Down Expand Up @@ -72,4 +74,20 @@ public SpanContext readSpanContextFromHeader(Header header, Tracer tracer) {
payload, HashMap.class, HASH_MAP_STRING_STRING_TYPE);
return codec.decode(serializedSpanContext, tracer);
}

public Span writeSpanContextToHeader(
Supplier<Span> spanSupplier, Map<String, String> header, Tracer tracer) {
Span span = spanSupplier.get();
writeSpanContextToHeader(span.context(), header, tracer);
return span;
}

public void writeSpanContextToHeader(
SpanContext spanContext, Map<String, String> header, Tracer tracer) {
tracer.inject(spanContext, Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header));
}

public SpanContext readSpanContextFromHeader(Map<String, String> header, Tracer tracer) {
return tracer.extract(Format.Builtin.HTTP_HEADERS, new TextMapAdapter(header));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.opentracing.internal;

import io.nexusrpc.OperationUnsuccessfulException;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor;
import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptorBase;
import io.temporal.opentracing.OpenTracingOptions;

public class OpenTracingNexusOperationInboundCallsInterceptor
extends NexusOperationInboundCallsInterceptorBase {
private final OpenTracingOptions options;
private final SpanFactory spanFactory;
private final Tracer tracer;
private final ContextAccessor contextAccessor;

public OpenTracingNexusOperationInboundCallsInterceptor(
NexusOperationInboundCallsInterceptor next,
OpenTracingOptions options,
SpanFactory spanFactory,
ContextAccessor contextAccessor) {
super(next);
this.options = options;
this.spanFactory = spanFactory;
this.tracer = options.getTracer();
this.contextAccessor = contextAccessor;
}

@Override
public StartOperationOutput startOperation(StartOperationInput input)
throws OperationUnsuccessfulException {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Span operationStartSpan =
spanFactory
.createStartNexusOperationSpan(
tracer,
input.getOperationContext().getService(),
input.getOperationContext().getOperation(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(operationStartSpan)) {
return super.startOperation(input);
} catch (Throwable t) {
spanFactory.logFail(operationStartSpan, t);
throw t;
} finally {
operationStartSpan.finish();
}
}

@Override
public CancelOperationOutput cancelOperation(CancelOperationInput input) {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Span operationCancelSpan =
spanFactory
.createCancelNexusOperationSpan(
tracer,
input.getOperationContext().getService(),
input.getOperationContext().getOperation(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(operationCancelSpan)) {
return super.cancelOperation(input);
} catch (Throwable t) {
spanFactory.logFail(operationCancelSpan, t);
throw t;
} finally {
operationCancelSpan.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
}
}

@Override
public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
ExecuteNexusOperationInput<R> input) {
if (!WorkflowUnsafe.isReplaying()) {
Span nexusOperationExecuteSpan =
contextAccessor.writeSpanContextToHeader(
() -> createStartNexusOperationSpanBuilder(input).start(),
input.getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(nexusOperationExecuteSpan)) {
return super.executeNexusOperation(input);
} finally {
nexusOperationExecuteSpan.finish();
}
} else {
return super.executeNexusOperation(input);
}
}

@Override
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down Expand Up @@ -176,6 +195,17 @@ private <R> Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflow
parentWorkflowInfo.getRunId());
}

private <R> Tracer.SpanBuilder createStartNexusOperationSpanBuilder(
ExecuteNexusOperationInput<R> input) {
WorkflowInfo parentWorkflowInfo = Workflow.getInfo();
return spanFactory.createStartNexusOperationSpan(
tracer,
input.getService(),
input.getOperation(),
parentWorkflowInfo.getWorkflowId(),
parentWorkflowInfo.getRunId());
}

private Tracer.SpanBuilder createContinueAsNewWorkflowStartSpanBuilder(ContinueAsNewInput input) {
WorkflowInfo continuedWorkflowInfo = Workflow.getInfo();
return spanFactory.createContinueAsNewWorkflowStartSpan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ public Tracer.SpanBuilder createWorkflowStartSpan(
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createStartNexusOperationSpan(
Tracer tracer, String serviceName, String operationName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.START_NEXUS_OPERATION)
.setActionName(serviceName + "/" + operationName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.CHILD_OF);
}

public Tracer.SpanBuilder createChildWorkflowStartSpan(
Tracer tracer,
String childWorkflowType,
Expand Down Expand Up @@ -173,6 +185,26 @@ public Tracer.SpanBuilder createActivityRunSpan(
return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createStartNexusOperationSpan(
Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.RUN_START_NEXUS_OPERATION)
.setActionName(serviceName + "/" + operationName)
.build();
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createCancelNexusOperationSpan(
Tracer tracer, String serviceName, String operationName, SpanContext nexusStartSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.RUN_CANCEL_NEXUS_OPERATION)
.setActionName(serviceName + "/" + operationName)
.build();
return createSpan(context, tracer, nexusStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowStartUpdateSpan(
Tracer tracer, String updateName, String workflowId, String runId) {
SpanCreationContext context =
Expand Down
Loading

0 comments on commit 30f391f

Please sign in to comment.