diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index ec7caeb6e..513606865 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -27,7 +27,14 @@ public enum SpanOperationType { START_CHILD_WORKFLOW("StartChildWorkflow"), START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"), START_ACTIVITY("StartActivity"), - RUN_ACTIVITY("RunActivity"); + RUN_ACTIVITY("RunActivity"), + SIGNAL_EXTERNAL_WORKFLOW("SignalExternalWorkflow"), + QUERY_WORKFLOW("QueryWorkflow"), + SIGNAL_WORKFLOW("SignalWorkflow"), + UPDATE_WORKFLOW("UpdateWorkflow"), + HANDLE_QUERY("HandleQuery"), + HANDLE_SIGNAL("HandleSignal"), + HANDLE_UPDATE("HandleUpdate"); private final String defaultPrefix; diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 710ac73b4..43d226226 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -88,12 +88,20 @@ protected Map getSpanTags(SpanCreationContext context) { case RUN_WORKFLOW: case START_ACTIVITY: case RUN_ACTIVITY: + case SIGNAL_EXTERNAL_WORKFLOW: + case SIGNAL_WORKFLOW: + case UPDATE_WORKFLOW: + case QUERY_WORKFLOW: + case HANDLE_SIGNAL: + case HANDLE_UPDATE: String runId = context.getRunId(); Preconditions.checkNotNull( runId, "runId is expected to be not null for span operation type %s", operationType); return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case HANDLE_QUERY: + return ImmutableMap.of(); } throw new IllegalArgumentException("Unknown span operation type provided"); } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index ef4840054..3ddd42d29 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -58,6 +58,27 @@ public WorkflowStartOutput start(WorkflowStartInput input) { } } + @Override + public WorkflowSignalOutput signal(WorkflowSignalInput input) { + Span workflowSignalSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createWorkflowSignalSpan( + tracer, + input.getSignalName(), + input.getWorkflowExecution().getWorkflowId(), + input.getWorkflowExecution().getRunId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) { + return super.signal(input); + } finally { + workflowSignalSpan.finish(); + } + } + @Override public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) { WorkflowStartInput workflowStartInput = input.getWorkflowStartInput(); @@ -76,6 +97,48 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu } } + @Override + public QueryOutput query(QueryInput input) { + Span workflowQuerySpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createWorkflowQuerySpan( + tracer, + input.getQueryType(), + input.getWorkflowExecution().getWorkflowId(), + input.getWorkflowExecution().getRunId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) { + return super.query(input); + } finally { + workflowQuerySpan.finish(); + } + } + + @Override + public StartUpdateOutput startUpdate(StartUpdateInput input) { + Span workflowStartUpdateSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createWorkflowStartUpdateSpan( + tracer, + input.getUpdateName(), + input.getWorkflowExecution().getWorkflowId(), + input.getWorkflowExecution().getRunId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(workflowStartUpdateSpan)) { + return super.startUpdate(input); + } finally { + workflowStartUpdateSpan.finish(); + } + } + private Tracer.SpanBuilder createWorkflowStartSpanBuilder( WorkflowStartInput input, SpanOperationType operationType) { return spanFactory.createWorkflowStartSpan( diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java index e518a8339..1c83cad9a 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java @@ -69,7 +69,7 @@ public WorkflowOutput execute(WorkflowInput input) { Workflow.getInfo().getRunId(), rootSpanContext) .start(); - try (Scope scope = tracer.scopeManager().activate(workflowRunSpan)) { + try (Scope ignored = tracer.scopeManager().activate(workflowRunSpan)) { return super.execute(input); } catch (Throwable t) { if (t instanceof DestroyWorkflowThreadError) { @@ -82,4 +82,83 @@ public WorkflowOutput execute(WorkflowInput input) { workflowRunSpan.finish(); } } + + @Override + public void handleSignal(SignalInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + Span workflowSignalSpan = + spanFactory + .createWorkflowHandleSignalSpan( + tracer, + input.getSignalName(), + Workflow.getInfo().getWorkflowId(), + Workflow.getInfo().getRunId(), + rootSpanContext) + .start(); + try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) { + super.handleSignal(input); + } catch (Throwable t) { + if (t instanceof DestroyWorkflowThreadError) { + spanFactory.logEviction(workflowSignalSpan); + } else { + spanFactory.logFail(workflowSignalSpan, t); + } + throw t; + } finally { + workflowSignalSpan.finish(); + } + } + + @Override + public QueryOutput handleQuery(QueryInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + Span workflowQuerySpan = + spanFactory + .createWorkflowHandleQuerySpan(tracer, input.getQueryName(), rootSpanContext) + .start(); + try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) { + return super.handleQuery(input); + } catch (Throwable t) { + if (t instanceof DestroyWorkflowThreadError) { + spanFactory.logEviction(workflowQuerySpan); + } else { + spanFactory.logFail(workflowQuerySpan, t); + } + throw t; + } finally { + workflowQuerySpan.finish(); + } + } + + @Override + public UpdateOutput executeUpdate(UpdateInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + Span workflowSignalSpan = + spanFactory + .createWorkflowExecuteUpdateSpan( + tracer, + input.getUpdateName(), + Workflow.getInfo().getWorkflowId(), + Workflow.getInfo().getRunId(), + rootSpanContext) + .start(); + try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) { + return super.executeUpdate(input); + } catch (Throwable t) { + if (t instanceof DestroyWorkflowThreadError) { + spanFactory.logEviction(workflowSignalSpan); + } else { + spanFactory.logFail(workflowSignalSpan, t); + } + throw t; + } finally { + workflowSignalSpan.finish(); + } + } } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 93048f454..5d6661d53 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -100,6 +100,32 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp } } + @Override + public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { + if (!WorkflowUnsafe.isReplaying()) { + WorkflowInfo workflowInfo = Workflow.getInfo(); + Span childWorkflowStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createExternalWorkflowSignalSpan( + tracer, + input.getSignalName(), + workflowInfo.getWorkflowId(), + workflowInfo.getRunId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) { + return super.signalExternalWorkflow(input); + } finally { + childWorkflowStartSpan.finish(); + } + } else { + return super.signalExternalWorkflow(input); + } + } + @Override public void continueAsNew(ContinueAsNewInput input) { if (!WorkflowUnsafe.isReplaying()) { diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 529be5561..9e0e79ed9 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -77,6 +77,46 @@ public Tracer.SpanBuilder createChildWorkflowStartSpan( return createSpan(context, tracer, null, References.CHILD_OF); } + public Tracer.SpanBuilder createExternalWorkflowSignalSpan( + Tracer tracer, String signalName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.SIGNAL_EXTERNAL_WORKFLOW) + .setActionName(signalName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowSignalSpan( + Tracer tracer, String signalName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW) + .setActionName(signalName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowHandleSignalSpan( + Tracer tracer, + String signalName, + String workflowId, + String runId, + SpanContext workflowSignalSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.HANDLE_SIGNAL) + .setActionName(signalName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + } + public Tracer.SpanBuilder createContinueAsNewWorkflowStartSpan( Tracer tracer, String continueAsNewWorkflowType, String workflowId, String parentRunId) { SpanCreationContext context = @@ -133,6 +173,56 @@ public Tracer.SpanBuilder createActivityRunSpan( return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createWorkflowStartUpdateSpan( + Tracer tracer, String updateName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.UPDATE_WORKFLOW) + .setActionName(updateName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowExecuteUpdateSpan( + Tracer tracer, + String updateName, + String workflowId, + String runId, + SpanContext workflowUpdateSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.HANDLE_UPDATE) + .setActionName(updateName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, workflowUpdateSpanContext, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowQuerySpan( + Tracer tracer, String updateName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.QUERY_WORKFLOW) + .setActionName(updateName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowHandleQuerySpan( + Tracer tracer, String queryName, SpanContext workflowQuerySpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.HANDLE_QUERY) + .setActionName(queryName) + .build(); + return createSpan(context, tracer, workflowQuerySpanContext, References.FOLLOWS_FROM); + } + @SuppressWarnings("deprecation") public void logFail(Span toSpan, Throwable failReason) { toSpan.setTag(StandardTagNames.FAILED, true); diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java index 79ad11ceb..13b42e3b4 100644 --- a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java @@ -121,11 +121,24 @@ public void signalWithStart() { assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); assertEquals("SignalWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName()); - List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); - assertEquals(1, workflowRunSpans.size()); - - MockSpan workflowRunSpan = workflowRunSpans.get(0); - assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); - assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + if (SDKTestWorkflowRule.useExternalService) { + List workflowSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(2, workflowSpans.size()); + + MockSpan workflowSignalSpan = workflowSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowSignalSpan.parentId()); + assertEquals("HandleSignal:signal", workflowSignalSpan.operationName()); + + MockSpan workflowRunSpan = workflowSpans.get(1); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + } else { + List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(1, workflowRunSpans.size()); + + MockSpan workflowRunSpan = workflowRunSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + } } } diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 958958112..9cd54b6c9 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -88,7 +88,7 @@ public void signal(String signalName, Object... args) { try { workflowClientInvoker.signal( new WorkflowClientCallsInterceptor.WorkflowSignalInput( - targetExecution, signalName, args)); + targetExecution, signalName, Header.empty(), args)); } catch (Exception e) { Throwable throwable = throwAsWorkflowFailureException(e, targetExecution); throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable); @@ -275,7 +275,7 @@ public R query(String queryType, Class resultClass, Type resultType, Obje result = workflowClientInvoker.query( new WorkflowClientCallsInterceptor.QueryInput<>( - targetExecution, queryType, args, resultClass, resultType)); + targetExecution, queryType, Header.empty(), args, resultClass, resultType)); } catch (Exception e) { return throwAsWorkflowFailureExceptionForQuery(e, resultClass, targetExecution); } @@ -332,6 +332,7 @@ public UpdateHandle startUpdate(UpdateOptions options, Object... args) new WorkflowClientCallsInterceptor.StartUpdateInput<>( targetExecution, options.getUpdateName(), + Header.empty(), options.getUpdateId(), args, options.getResultClass(), diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 52d33067b..66cd5a6bc 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -152,12 +152,17 @@ public WorkflowExecution getWorkflowExecution() { final class WorkflowSignalInput { private final WorkflowExecution workflowExecution; private final String signalName; + private final Header header; private final Object[] arguments; public WorkflowSignalInput( - WorkflowExecution workflowExecution, String signalName, Object[] signalArguments) { + WorkflowExecution workflowExecution, + String signalName, + Header header, + Object[] signalArguments) { this.workflowExecution = workflowExecution; this.signalName = signalName; + this.header = header; this.arguments = signalArguments; } @@ -169,6 +174,10 @@ public String getSignalName() { return signalName; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } @@ -288,6 +297,7 @@ public CompletableFuture getResult() { final class QueryInput { private final WorkflowExecution workflowExecution; private final String queryType; + private final Header header; private final Object[] arguments; private final Class resultClass; private final Type resultType; @@ -295,11 +305,13 @@ final class QueryInput { public QueryInput( WorkflowExecution workflowExecution, String queryType, + Header header, Object[] arguments, Class resultClass, Type resultType) { this.workflowExecution = workflowExecution; this.queryType = queryType; + this.header = header; this.arguments = arguments; this.resultClass = resultClass; this.resultType = resultType; @@ -313,6 +325,10 @@ public String getQueryType() { return queryType; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } @@ -368,6 +384,7 @@ public WorkflowExecution getWorkflowExecution() { final class StartUpdateInput { private final WorkflowExecution workflowExecution; private final String updateName; + private final Header header; private final Object[] arguments; private final Class resultClass; private final Type resultType; @@ -378,6 +395,7 @@ final class StartUpdateInput { public StartUpdateInput( WorkflowExecution workflowExecution, String updateName, + Header header, String updateId, Object[] arguments, Class resultClass, @@ -385,6 +403,7 @@ public StartUpdateInput( String firstExecutionRunId, WaitPolicy waitPolicy) { this.workflowExecution = workflowExecution; + this.header = header; this.updateId = updateId; this.updateName = updateName; this.arguments = arguments; @@ -402,6 +421,10 @@ public String getUpdateName() { return updateName; } + public Header getHeader() { + return header; + } + public String getUpdateId() { return updateId; } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java index 5e2c2369b..b9bf960c2 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java @@ -81,11 +81,13 @@ final class SignalInput { private final String signalName; private final Object[] arguments; private final long EventId; + private final Header header; - public SignalInput(String signalName, Object[] arguments, long eventId) { + public SignalInput(String signalName, Object[] arguments, long eventId, Header header) { this.signalName = signalName; this.arguments = arguments; EventId = eventId; + this.header = header; } public String getSignalName() { @@ -99,14 +101,20 @@ public Object[] getArguments() { public long getEventId() { return EventId; } + + public Header getHeader() { + return header; + } } final class QueryInput { private final String queryName; + private final Header header; private final Object[] arguments; - public QueryInput(String signalName, Object[] arguments) { - this.queryName = signalName; + public QueryInput(String queryName, Header header, Object[] arguments) { + this.queryName = queryName; + this.header = header; this.arguments = arguments; } @@ -114,6 +122,10 @@ public String getQueryName() { return queryName; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } @@ -134,10 +146,12 @@ public Object getResult() { @Experimental final class UpdateInput { private final String updateName; + private final Header header; private final Object[] arguments; - public UpdateInput(String updateName, Object[] arguments) { + public UpdateInput(String updateName, Header header, Object[] arguments) { this.updateName = updateName; + this.header = header; this.arguments = arguments; } @@ -145,6 +159,10 @@ public String getUpdateName() { return updateName; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 21a22a997..629bd79d7 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -263,11 +263,14 @@ public Promise getWorkflowExecution() { final class SignalExternalInput { private final WorkflowExecution execution; private final String signalName; + private final Header header; private final Object[] args; - public SignalExternalInput(WorkflowExecution execution, String signalName, Object[] args) { + public SignalExternalInput( + WorkflowExecution execution, String signalName, Header header, Object[] args) { this.execution = execution; this.signalName = signalName; + this.header = header; this.args = args; } @@ -279,6 +282,10 @@ public String getSignalName() { return signalName; } + public Header getHeader() { + return header; + } + public Object[] getArgs() { return args; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 113995770..ca5310b40 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -36,6 +36,7 @@ import io.temporal.common.converter.DataConverter; import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.common.HeaderUtils; import io.temporal.payload.context.WorkflowSerializationContext; import io.temporal.worker.WorkflowTaskDispatchHandle; import java.lang.reflect.Type; @@ -133,7 +134,8 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { .setSignalName(input.getSignalName()) .setWorkflowExecution(input.getWorkflowExecution()) .setIdentity(clientOptions.getIdentity()) - .setNamespace(clientOptions.getNamespace()); + .setNamespace(clientOptions.getNamespace()) + .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); DataConverter dataConverterWitSignalContext = clientOptions @@ -253,7 +255,10 @@ public GetResultAsyncOutput getResultAsync(GetResultInput input) { @Override public QueryOutput query(QueryInput input) { - WorkflowQuery.Builder query = WorkflowQuery.newBuilder().setQueryType(input.getQueryType()); + WorkflowQuery.Builder query = + WorkflowQuery.newBuilder() + .setQueryType(input.getQueryType()) + .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); DataConverter dataConverterWithWorkflowContext = clientOptions .getDataConverter() @@ -303,7 +308,10 @@ public StartUpdateOutput startUpdate(StartUpdateInput input) { Optional inputArgs = dataConverterWithWorkflowContext.toPayloads(input.getArguments()); - Input.Builder updateInput = Input.newBuilder().setName(input.getUpdateName()); + Input.Builder updateInput = + Input.newBuilder() + .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)) + .setName(input.getUpdateName()); inputArgs.ifPresent(updateInput::setArgs); Request request = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java index b7e3f30af..8065745e5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java @@ -20,6 +20,7 @@ package io.temporal.internal.replay; +import io.temporal.api.common.v1.Header; import io.temporal.api.common.v1.Payloads; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.query.v1.WorkflowQuery; @@ -35,11 +36,15 @@ public interface ReplayWorkflow { void start(HistoryEvent event, ReplayWorkflowContext context); /** Handle an external signal event. */ - void handleSignal(String signalName, Optional input, long eventId); + void handleSignal(String signalName, Optional input, long eventId, Header header); /** Handle an update workflow execution event */ void handleUpdate( - String updateName, Optional input, long eventId, UpdateProtocolCallback callbacks); + String updateName, + Optional input, + long eventId, + Header header, + UpdateProtocolCallback callbacks); /** * @return true if the execution of the workflow method is finished or an exit was explicitly diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java index 40b1b61e1..87e359ebd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java @@ -144,7 +144,8 @@ public void handleWorkflowExecutionSignaled(HistoryEvent event) { } Optional input = signalAttributes.hasInput() ? Optional.of(signalAttributes.getInput()) : Optional.empty(); - this.workflow.handleSignal(signalAttributes.getSignalName(), input, event.getEventId()); + this.workflow.handleSignal( + signalAttributes.getSignalName(), input, event.getEventId(), signalAttributes.getHeader()); } public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) { @@ -157,7 +158,11 @@ public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) { Input input = update.getInput(); Optional args = Optional.ofNullable(input.getArgs()); this.workflow.handleUpdate( - input.getName(), args, protocolMessage.getEventId(), updateMessage.getCallbacks()); + input.getName(), + args, + protocolMessage.getEventId(), + input.getHeader(), + updateMessage.getCallbacks()); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Message is not an update."); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java index e7392f81b..ee959a13b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java @@ -125,7 +125,7 @@ public void signal(String signalName, Object... args) { outboundCallsInterceptor .signalExternalWorkflow( new WorkflowOutboundCallsInterceptor.SignalExternalInput( - execution.get(), signalName, args)) + execution.get(), signalName, Header.empty(), args)) .getResult(); if (AsyncInternal.isAsync()) { AsyncInternal.setAsyncResult(signaled); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java index 31558e21c..3378c3d96 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java @@ -21,6 +21,7 @@ package io.temporal.internal.sync; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.workflow.*; import java.util.Objects; @@ -53,7 +54,7 @@ public void signal(String signalName, Object... args) { outboundCallsInterceptor .signalExternalWorkflow( new WorkflowOutboundCallsInterceptor.SignalExternalInput( - execution, signalName, args)) + execution, signalName, Header.empty(), args)) .getResult(); if (AsyncInternal.isAsync()) { AsyncInternal.setAsyncResult(signaled); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java index 91246d981..891addf8c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java @@ -23,6 +23,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.workflow.DynamicQueryHandler; @@ -69,7 +70,7 @@ public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery( return new WorkflowInboundCallsInterceptor.QueryOutput(result); } - public Optional handleQuery(String queryName, Optional input) { + public Optional handleQuery(String queryName, Header header, Optional input) { WorkflowOutboundCallsInterceptor.RegisterQueryInput handler = queryCallbacks.get(queryName); Object[] args; if (handler == null) { @@ -85,7 +86,7 @@ public Optional handleQuery(String queryName, Optional input } Object result = inboundCallsInterceptor - .handleQuery(new WorkflowInboundCallsInterceptor.QueryInput(queryName, args)) + .handleQuery(new WorkflowInboundCallsInterceptor.QueryInput(queryName, header, args)) .getResult(); return dataConverterWithWorkflowContext.toPayloads(result); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java index cd4d5ee96..952bbe904 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java @@ -24,6 +24,7 @@ import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.DataConverterException; import io.temporal.common.converter.EncodedValues; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.worker.MetricsType; @@ -76,13 +77,14 @@ public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput } } - public void handleSignal(String signalName, Optional input, long eventId) { + public void handleSignal( + String signalName, Optional input, long eventId, Header header) { WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler = signalCallbacks.get(signalName); Object[] args; if (handler == null) { if (dynamicSignalHandler == null) { - signalBuffer.add(new SignalData(signalName, input, eventId)); + signalBuffer.add(new SignalData(signalName, input, eventId, header)); return; } args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)}; @@ -97,7 +99,7 @@ public void handleSignal(String signalName, Optional input, long event } } inboundCallsInterceptor.handleSignal( - new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId)); + new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId, header)); } public void registerSignalHandlers( @@ -110,7 +112,11 @@ public void registerSignalHandlers( signalCallbacks.put(signalType, request); } for (SignalData signalData : signalBuffer) { - handleSignal(signalData.getSignalName(), signalData.getPayload(), signalData.getEventId()); + handleSignal( + signalData.getSignalName(), + signalData.getPayload(), + signalData.getEventId(), + signalData.getHeader()); } } @@ -140,11 +146,13 @@ private static class SignalData { private final String signalName; private final Optional payload; private final long eventId; + private final Header header; - private SignalData(String signalName, Optional payload, long eventId) { + private SignalData(String signalName, Optional payload, long eventId, Header header) { this.signalName = Objects.requireNonNull(signalName); this.payload = Objects.requireNonNull(payload); this.eventId = eventId; + this.header = header; } public String getSignalName() { @@ -158,5 +166,9 @@ public Optional getPayload() { public long getEventId() { return eventId; } + + public Header getHeader() { + return header; + } } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index 54bf7943a..bfab45e04 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -20,6 +20,7 @@ package io.temporal.internal.sync; +import io.temporal.api.common.v1.Header; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.common.v1.WorkflowType; @@ -140,14 +141,20 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) { } @Override - public void handleSignal(String signalName, Optional input, long eventId) { + public void handleSignal( + String signalName, Optional input, long eventId, Header header) { runner.executeInWorkflowThread( - "signal " + signalName, () -> workflowProc.handleSignal(signalName, input, eventId)); + "signal " + signalName, + () -> workflowProc.handleSignal(signalName, input, eventId, header)); } @Override public void handleUpdate( - String updateName, Optional input, long eventId, UpdateProtocolCallback callbacks) { + String updateName, + Optional input, + long eventId, + Header header, + UpdateProtocolCallback callbacks) { runner.executeInWorkflowThread( "update " + updateName, () -> { @@ -158,7 +165,7 @@ public void handleUpdate( // should not just be run // in a workflow thread workflowContext.setReadOnly(true); - workflowProc.handleValidateUpdate(updateName, input, eventId); + workflowProc.handleValidateUpdate(updateName, input, eventId, header); } catch (Exception e) { callbacks.reject(this.dataConverter.exceptionToFailure(e)); return; @@ -169,7 +176,7 @@ public void handleUpdate( callbacks.accept(); try { Optional result = - workflowProc.handleExecuteUpdate(updateName, input, eventId); + workflowProc.handleExecuteUpdate(updateName, input, eventId, header); callbacks.complete(result, null); } catch (WorkflowExecutionException e) { callbacks.complete(Optional.empty(), e.getFailure()); @@ -215,7 +222,7 @@ public Optional query(WorkflowQuery query) { } Optional args = query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty(); - return workflowProc.handleQuery(query.getQueryType(), args); + return workflowProc.handleQuery(query.getQueryType(), query.getHeader(), args); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 4b2f5f524..02dc8a42b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -56,6 +56,7 @@ import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.failure.*; import io.temporal.internal.common.ActivityOptionUtils; +import io.temporal.internal.common.HeaderUtils; import io.temporal.internal.common.OptionsUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SdkFlag; @@ -315,17 +316,19 @@ public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput signalDispatcher.handleInterceptedSignal(input); } - public void handleSignal(String signalName, Optional input, long eventId) { - signalDispatcher.handleSignal(signalName, input, eventId); + public void handleSignal( + String signalName, Optional input, long eventId, Header header) { + signalDispatcher.handleSignal(signalName, input, eventId, header); } - public void handleValidateUpdate(String updateName, Optional input, long eventId) { - updateDispatcher.handleValidateUpdate(updateName, input, eventId); + public void handleValidateUpdate( + String updateName, Optional input, long eventId, Header header) { + updateDispatcher.handleValidateUpdate(updateName, input, eventId, header); } public Optional handleExecuteUpdate( - String updateName, Optional input, long eventId) { - return updateDispatcher.handleExecuteUpdate(updateName, input, eventId); + String updateName, Optional input, long eventId, Header header) { + return updateDispatcher.handleExecuteUpdate(updateName, input, eventId, header); } public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) { @@ -342,8 +345,8 @@ public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery( return queryDispatcher.handleInterceptedQuery(input); } - public Optional handleQuery(String queryName, Optional input) { - return queryDispatcher.handleQuery(queryName, input); + public Optional handleQuery(String queryName, Header header, Optional input) { + return queryDispatcher.handleQuery(queryName, header, input); } private class ActivityCallback { @@ -1033,6 +1036,7 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { SignalExternalWorkflowExecutionCommandAttributes.newBuilder(); attributes.setSignalName(input.getSignalName()); attributes.setExecution(childExecution); + attributes.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); Optional payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs()); payloads.ifPresent(attributes::setInput); CompletablePromise result = Workflow.newPromise(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java index 19a4d2b3e..aaa96060e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java @@ -23,6 +23,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor.UpdateInput; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor.UpdateOutput; @@ -53,7 +54,8 @@ public void setInboundCallsInterceptor(WorkflowInboundCallsInterceptor inboundCa this.inboundCallsInterceptor = inboundCallsInterceptor; } - public void handleValidateUpdate(String updateName, Optional input, long eventId) { + public void handleValidateUpdate( + String updateName, Optional input, long eventId, Header header) { WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest handler = updateCallbacks.get(updateName); Object[] args; @@ -70,11 +72,11 @@ public void handleValidateUpdate(String updateName, Optional input, lo } inboundCallsInterceptor.validateUpdate( - new WorkflowInboundCallsInterceptor.UpdateInput(updateName, args)); + new WorkflowInboundCallsInterceptor.UpdateInput(updateName, header, args)); } public Optional handleExecuteUpdate( - String updateName, Optional input, long eventId) { + String updateName, Optional input, long eventId, Header header) { WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest handler = updateCallbacks.get(updateName); Object[] args; @@ -91,7 +93,8 @@ public Optional handleExecuteUpdate( } Object result = inboundCallsInterceptor - .executeUpdate(new WorkflowInboundCallsInterceptor.UpdateInput(updateName, args)) + .executeUpdate( + new WorkflowInboundCallsInterceptor.UpdateInput(updateName, header, args)) .getResult(); return dataConverterWithWorkflowContext.toPayloads(result); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java index 7a5c15cd3..1a78df46f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java @@ -87,30 +87,42 @@ public Optional getOutput() { public void close() {} - public void handleSignal(String signalName, Optional input, long eventId) { + public void handleSignal( + String signalName, + Optional input, + long eventId, + io.temporal.api.common.v1.Header header) { try { - context.handleSignal(signalName, input, eventId); + context.handleSignal(signalName, input, eventId, new Header(header)); } catch (Throwable e) { applyWorkflowFailurePolicyAndRethrow(e); } } - public Optional handleQuery(String type, Optional args) { - return context.handleQuery(type, args); + public Optional handleQuery( + String type, io.temporal.api.common.v1.Header header, Optional args) { + return context.handleQuery(type, new Header(header), args); } - public void handleValidateUpdate(String updateName, Optional input, long eventId) { + public void handleValidateUpdate( + String updateName, + Optional input, + long eventId, + io.temporal.api.common.v1.Header header) { try { - context.handleValidateUpdate(updateName, input, eventId); + context.handleValidateUpdate(updateName, input, eventId, new Header(header)); } catch (Throwable e) { applyWorkflowFailurePolicyAndRethrow(e); } } public Optional handleExecuteUpdate( - String updateName, Optional input, long eventId) { + String updateName, + Optional input, + long eventId, + io.temporal.api.common.v1.Header header) { try { - return context.handleExecuteUpdate(updateName, input, eventId); + return context.handleExecuteUpdate(updateName, input, eventId, new Header(header)); } catch (Throwable e) { applyWorkflowFailurePolicyAndRethrow(e); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java index 94138738a..0fe1d11d6 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java @@ -32,6 +32,7 @@ import com.uber.m3.tally.Scope; import com.uber.m3.util.Duration; import com.uber.m3.util.ImmutableMap; +import io.temporal.api.common.v1.Header; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.history.v1.HistoryEvent; @@ -292,13 +293,15 @@ private ReplayWorkflowRunTaskHandler createFakeExecutor(PollWorkflowTaskQueueRes public void start(HistoryEvent event, ReplayWorkflowContext context) {} @Override - public void handleSignal(String signalName, Optional input, long eventId) {} + public void handleSignal( + String signalName, Optional input, long eventId, Header header) {} @Override public void handleUpdate( String updateName, Optional input, long eventId, + Header header, UpdateProtocolCallback callbacks) {} @Override diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java index 4faf0be1d..07fca97ff 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java @@ -27,6 +27,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import java.lang.reflect.Type; @@ -67,7 +68,8 @@ public void testQuerySuccess() { dispatcher.setInboundCallsInterceptor(mockInterceptor); // Invoke functionality under test, expect no exceptions for an existing query. - Optional queryResult = dispatcher.handleQuery("QueryB", Optional.empty()); + Optional queryResult = + dispatcher.handleQuery("QueryB", Header.empty(), Optional.empty()); assertTrue(queryResult.isPresent()); } @@ -79,7 +81,7 @@ public void testQueryDispatcherException() { assertThrows( IllegalArgumentException.class, () -> { - dispatcher.handleQuery("QueryC", null); + dispatcher.handleQuery("QueryC", Header.empty(), null); }); assertEquals("Unknown query type: QueryC, knownTypes=[QueryA, QueryB]", exception.getMessage()); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java index 42683ea42..7c54fa63b 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java @@ -596,6 +596,7 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { new SignalExternalInput( input.getExecution(), overrideSignalName.apply(input.getSignalName()), + Header.empty(), overrideArgs.apply(args))); } }