From 97561e8b52f1f54b0d9b229e95545580dc598574 Mon Sep 17 00:00:00 2001 From: siavash Date: Sun, 16 Jul 2023 15:44:31 +0330 Subject: [PATCH 01/17] feat: support tracing on signals --- .../opentracing/SpanOperationType.java | 3 +- .../ActionTypeAndNameSpanBuilderProvider.java | 1 + ...TracingWorkflowClientCallsInterceptor.java | 22 +++++++++++++++ ...racingWorkflowInboundCallsInterceptor.java | 28 +++++++++++++++++++ ...acingWorkflowOutboundCallsInterceptor.java | 26 +++++++++++++++++ .../opentracing/internal/SpanFactory.java | 28 +++++++++++++++++++ .../io/temporal/client/WorkflowStubImpl.java | 2 +- .../WorkflowClientCallsInterceptor.java | 11 +++++++- .../WorkflowInboundCallsInterceptor.java | 8 +++++- .../WorkflowOutboundCallsInterceptor.java | 9 +++++- .../client/RootWorkflowClientInvoker.java | 6 +++- .../internal/replay/ReplayWorkflow.java | 3 +- .../replay/ReplayWorkflowExecutor.java | 3 +- .../internal/sync/ChildWorkflowStubImpl.java | 2 +- .../sync/ExternalWorkflowStubImpl.java | 3 +- .../internal/sync/SignalDispatcher.java | 22 +++++++++++---- .../temporal/internal/sync/SyncWorkflow.java | 7 +++-- .../internal/sync/SyncWorkflowContext.java | 7 +++-- .../sync/WorkflowExecutionHandler.java | 8 ++++-- 19 files changed, 178 insertions(+), 21 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index ec7caeb6e..9685910a8 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -27,7 +27,8 @@ public enum SpanOperationType { START_CHILD_WORKFLOW("StartChildWorkflow"), START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"), START_ACTIVITY("StartActivity"), - RUN_ACTIVITY("RunActivity"); + RUN_ACTIVITY("RunActivity"), + SIGNAL_WORKFLOW("SignalWorkflow"); private final String defaultPrefix; diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 710ac73b4..8e3ada360 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -88,6 +88,7 @@ protected Map getSpanTags(SpanCreationContext context) { case RUN_WORKFLOW: case START_ACTIVITY: case RUN_ACTIVITY: + case SIGNAL_WORKFLOW: String runId = context.getRunId(); Preconditions.checkNotNull( runId, "runId is expected to be not null for span operation type %s", operationType); diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index ef4840054..edf42e229 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -20,6 +20,7 @@ package io.temporal.opentracing.internal; +import io.opentracing.References; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.Tracer; @@ -58,6 +59,27 @@ public WorkflowStartOutput start(WorkflowStartInput input) { } } + @Override + public WorkflowSignalOutput signal(WorkflowSignalInput input) { + Span workflowStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createWorkflowSignalSpan( + tracer, + input.getSignalName(), + input.getWorkflowExecution().getWorkflowId(), + References.FOLLOWS_FROM) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(workflowStartSpan)) { + return super.signal(input); + } finally { + workflowStartSpan.finish(); + } + } + @Override public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) { WorkflowStartInput workflowStartInput = input.getWorkflowStartInput(); diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java index e518a8339..c7141ac66 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java @@ -82,4 +82,32 @@ public WorkflowOutput execute(WorkflowInput input) { workflowRunSpan.finish(); } } + + @Override + public void handleSignal(SignalInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + Span workflowRunSpan = + spanFactory + .createWorkflowSignalSpan( + tracer, + Workflow.getInfo().getWorkflowType(), + Workflow.getInfo().getWorkflowId(), + Workflow.getInfo().getRunId(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(workflowRunSpan)) { + super.handleSignal(input); + } catch (Throwable t) { + if (t instanceof DestroyWorkflowThreadError) { + spanFactory.logEviction(workflowRunSpan); + } else { + spanFactory.logFail(workflowRunSpan, t); + } + throw t; + } finally { + workflowRunSpan.finish(); + } + } } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 93048f454..0ecc2f137 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -100,6 +100,32 @@ public ChildWorkflowOutput executeChildWorkflow(ChildWorkflowInput inp } } + @Override + public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { + if (!WorkflowUnsafe.isReplaying()) { + WorkflowInfo workflowInfo = Workflow.getInfo(); + Span childWorkflowStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createWorkflowSignalSpan( + tracer, + input.getSignalName(), + workflowInfo.getWorkflowId(), + workflowInfo.getRunId()) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) { + return super.signalExternalWorkflow(input); + } finally { + childWorkflowStartSpan.finish(); + } + } else { + return super.signalExternalWorkflow(input); + } + } + @Override public void continueAsNew(ContinueAsNewInput input) { if (!WorkflowUnsafe.isReplaying()) { diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 529be5561..779d7c061 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -77,6 +77,34 @@ public Tracer.SpanBuilder createChildWorkflowStartSpan( return createSpan(context, tracer, null, References.CHILD_OF); } + public Tracer.SpanBuilder createWorkflowSignalSpan( + Tracer tracer, String signalName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW) + .setActionName(signalName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowSignalSpan( + Tracer tracer, + String signalName, + String workflowId, + String runId, + SpanContext workflowSignalSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW) + .setActionName(signalName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + } + public Tracer.SpanBuilder createContinueAsNewWorkflowStartSpan( Tracer tracer, String continueAsNewWorkflowType, String workflowId, String parentRunId) { SpanCreationContext context = diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 958958112..816280e84 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -88,7 +88,7 @@ public void signal(String signalName, Object... args) { try { workflowClientInvoker.signal( new WorkflowClientCallsInterceptor.WorkflowSignalInput( - targetExecution, signalName, args)); + targetExecution, signalName, Header.empty(), args)); } catch (Exception e) { Throwable throwable = throwAsWorkflowFailureException(e, targetExecution); throw new WorkflowServiceException(targetExecution, workflowType.orElse(null), throwable); diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 52d33067b..cebd2dd0e 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -152,12 +152,17 @@ public WorkflowExecution getWorkflowExecution() { final class WorkflowSignalInput { private final WorkflowExecution workflowExecution; private final String signalName; + private final Header header; private final Object[] arguments; public WorkflowSignalInput( - WorkflowExecution workflowExecution, String signalName, Object[] signalArguments) { + WorkflowExecution workflowExecution, + String signalName, + Header header, + Object[] signalArguments) { this.workflowExecution = workflowExecution; this.signalName = signalName; + this.header = header; this.arguments = signalArguments; } @@ -169,6 +174,10 @@ public String getSignalName() { return signalName; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java index 5e2c2369b..d80402865 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java @@ -81,11 +81,13 @@ final class SignalInput { private final String signalName; private final Object[] arguments; private final long EventId; + private final Header header; - public SignalInput(String signalName, Object[] arguments, long eventId) { + public SignalInput(String signalName, Object[] arguments, long eventId, Header header) { this.signalName = signalName; this.arguments = arguments; EventId = eventId; + this.header = header; } public String getSignalName() { @@ -99,6 +101,10 @@ public Object[] getArguments() { public long getEventId() { return EventId; } + + public Header getHeader() { + return header; + } } final class QueryInput { diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 21a22a997..629bd79d7 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -263,11 +263,14 @@ public Promise getWorkflowExecution() { final class SignalExternalInput { private final WorkflowExecution execution; private final String signalName; + private final Header header; private final Object[] args; - public SignalExternalInput(WorkflowExecution execution, String signalName, Object[] args) { + public SignalExternalInput( + WorkflowExecution execution, String signalName, Header header, Object[] args) { this.execution = execution; this.signalName = signalName; + this.header = header; this.args = args; } @@ -279,6 +282,10 @@ public String getSignalName() { return signalName; } + public Header getHeader() { + return header; + } + public Object[] getArgs() { return args; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 113995770..237389839 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -21,6 +21,7 @@ package io.temporal.internal.client; import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; +import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc; import io.grpc.Deadline; import io.grpc.Status; @@ -128,12 +129,15 @@ public WorkflowStartOutput start(WorkflowStartInput input) { @Override public WorkflowSignalOutput signal(WorkflowSignalInput input) { + Header grpcHeader = toHeaderGrpc(input.getHeader(), null); + SignalWorkflowExecutionRequest.Builder request = SignalWorkflowExecutionRequest.newBuilder() .setSignalName(input.getSignalName()) .setWorkflowExecution(input.getWorkflowExecution()) .setIdentity(clientOptions.getIdentity()) - .setNamespace(clientOptions.getNamespace()); + .setNamespace(clientOptions.getNamespace()) + .setHeader(grpcHeader); DataConverter dataConverterWitSignalContext = clientOptions diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java index b7e3f30af..c66a4f929 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java @@ -20,6 +20,7 @@ package io.temporal.internal.replay; +import io.temporal.api.common.v1.Header; import io.temporal.api.common.v1.Payloads; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.query.v1.WorkflowQuery; @@ -35,7 +36,7 @@ public interface ReplayWorkflow { void start(HistoryEvent event, ReplayWorkflowContext context); /** Handle an external signal event. */ - void handleSignal(String signalName, Optional input, long eventId); + void handleSignal(String signalName, Optional input, long eventId, Header header); /** Handle an update workflow execution event */ void handleUpdate( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java index 40b1b61e1..343a826fa 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java @@ -144,7 +144,8 @@ public void handleWorkflowExecutionSignaled(HistoryEvent event) { } Optional input = signalAttributes.hasInput() ? Optional.of(signalAttributes.getInput()) : Optional.empty(); - this.workflow.handleSignal(signalAttributes.getSignalName(), input, event.getEventId()); + this.workflow.handleSignal( + signalAttributes.getSignalName(), input, event.getEventId(), signalAttributes.getHeader()); } public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java index 8e1874e34..82763e783 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ChildWorkflowStubImpl.java @@ -119,7 +119,7 @@ public void signal(String signalName, Object... args) { outboundCallsInterceptor .signalExternalWorkflow( new WorkflowOutboundCallsInterceptor.SignalExternalInput( - execution.get(), signalName, args)) + execution.get(), signalName, Header.empty(), args)) .getResult(); if (AsyncInternal.isAsync()) { AsyncInternal.setAsyncResult(signaled); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java index 88cc5a9ff..9368e81fa 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/ExternalWorkflowStubImpl.java @@ -21,6 +21,7 @@ package io.temporal.internal.sync; import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.workflow.CancelExternalWorkflowException; import io.temporal.workflow.ExternalWorkflowStub; @@ -51,7 +52,7 @@ public void signal(String signalName, Object... args) { outboundCallsInterceptor .signalExternalWorkflow( new WorkflowOutboundCallsInterceptor.SignalExternalInput( - execution, signalName, args)) + execution, signalName, Header.empty(), args)) .getResult(); if (AsyncInternal.isAsync()) { AsyncInternal.setAsyncResult(signaled); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java index cd4d5ee96..952bbe904 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java @@ -24,6 +24,7 @@ import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.DataConverterException; import io.temporal.common.converter.EncodedValues; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.worker.MetricsType; @@ -76,13 +77,14 @@ public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput } } - public void handleSignal(String signalName, Optional input, long eventId) { + public void handleSignal( + String signalName, Optional input, long eventId, Header header) { WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler = signalCallbacks.get(signalName); Object[] args; if (handler == null) { if (dynamicSignalHandler == null) { - signalBuffer.add(new SignalData(signalName, input, eventId)); + signalBuffer.add(new SignalData(signalName, input, eventId, header)); return; } args = new Object[] {new EncodedValues(input, dataConverterWithWorkflowContext)}; @@ -97,7 +99,7 @@ public void handleSignal(String signalName, Optional input, long event } } inboundCallsInterceptor.handleSignal( - new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId)); + new WorkflowInboundCallsInterceptor.SignalInput(signalName, args, eventId, header)); } public void registerSignalHandlers( @@ -110,7 +112,11 @@ public void registerSignalHandlers( signalCallbacks.put(signalType, request); } for (SignalData signalData : signalBuffer) { - handleSignal(signalData.getSignalName(), signalData.getPayload(), signalData.getEventId()); + handleSignal( + signalData.getSignalName(), + signalData.getPayload(), + signalData.getEventId(), + signalData.getHeader()); } } @@ -140,11 +146,13 @@ private static class SignalData { private final String signalName; private final Optional payload; private final long eventId; + private final Header header; - private SignalData(String signalName, Optional payload, long eventId) { + private SignalData(String signalName, Optional payload, long eventId, Header header) { this.signalName = Objects.requireNonNull(signalName); this.payload = Objects.requireNonNull(payload); this.eventId = eventId; + this.header = header; } public String getSignalName() { @@ -158,5 +166,9 @@ public Optional getPayload() { public long getEventId() { return eventId; } + + public Header getHeader() { + return header; + } } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index 250e03201..781f925bd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -20,6 +20,7 @@ package io.temporal.internal.sync; +import io.temporal.api.common.v1.Header; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.common.v1.WorkflowType; @@ -140,9 +141,11 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) { } @Override - public void handleSignal(String signalName, Optional input, long eventId) { + public void handleSignal( + String signalName, Optional input, long eventId, Header header) { runner.executeInWorkflowThread( - "signal " + signalName, () -> workflowProc.handleSignal(signalName, input, eventId)); + "signal " + signalName, + () -> workflowProc.handleSignal(signalName, input, eventId, header)); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 2d16b0442..61d765830 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -55,6 +55,7 @@ import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.failure.*; import io.temporal.internal.common.ActivityOptionUtils; +import io.temporal.internal.common.HeaderUtils; import io.temporal.internal.common.OptionsUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.SearchAttributesUtil; @@ -312,8 +313,9 @@ public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput signalDispatcher.handleInterceptedSignal(input); } - public void handleSignal(String signalName, Optional input, long eventId) { - signalDispatcher.handleSignal(signalName, input, eventId); + public void handleSignal( + String signalName, Optional input, long eventId, Header header) { + signalDispatcher.handleSignal(signalName, input, eventId, header); } public void handleValidateUpdate(String updateName, Optional input, long eventId) { @@ -998,6 +1000,7 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { SignalExternalWorkflowExecutionCommandAttributes.newBuilder(); attributes.setSignalName(input.getSignalName()); attributes.setExecution(childExecution); + attributes.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); Optional payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs()); payloads.ifPresent(attributes::setInput); CompletablePromise result = Workflow.newPromise(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java index 7a5c15cd3..f14c5e532 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java @@ -87,9 +87,13 @@ public Optional getOutput() { public void close() {} - public void handleSignal(String signalName, Optional input, long eventId) { + public void handleSignal( + String signalName, + Optional input, + long eventId, + io.temporal.api.common.v1.Header header) { try { - context.handleSignal(signalName, input, eventId); + context.handleSignal(signalName, input, eventId, new Header(header)); } catch (Throwable e) { applyWorkflowFailurePolicyAndRethrow(e); } From 6e15b18f7c10500959f04312d0265126685229c2 Mon Sep 17 00:00:00 2001 From: siavash Date: Sun, 16 Jul 2023 20:12:25 +0330 Subject: [PATCH 02/17] feat: support tracing on signals --- .../replay/ReplayWorkflowRunTaskHandlerCacheTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java index 76d408bb3..68b591517 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java @@ -32,6 +32,7 @@ import com.uber.m3.tally.Scope; import com.uber.m3.util.Duration; import com.uber.m3.util.ImmutableMap; +import io.temporal.api.common.v1.Header; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.history.v1.HistoryEvent; @@ -291,7 +292,7 @@ private ReplayWorkflowRunTaskHandler createFakeExecutor(PollWorkflowTaskQueueRes public void start(HistoryEvent event, ReplayWorkflowContext context) {} @Override - public void handleSignal(String signalName, Optional input, long eventId) {} + public void handleSignal(String signalName, Optional input, long eventId, Header header) {} @Override public void handleUpdate( From 47d1002174aec1dfc073fc629717378d1b20c165 Mon Sep 17 00:00:00 2001 From: siavash Date: Sun, 16 Jul 2023 20:53:47 +0330 Subject: [PATCH 03/17] feat: support tracing on signals --- .../temporal/internal/client/RootWorkflowClientInvoker.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 237389839..88dcff8af 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -37,6 +37,7 @@ import io.temporal.common.converter.DataConverter; import io.temporal.common.interceptors.WorkflowClientCallsInterceptor; import io.temporal.internal.client.external.GenericWorkflowClient; +import io.temporal.internal.common.HeaderUtils; import io.temporal.payload.context.WorkflowSerializationContext; import io.temporal.worker.WorkflowTaskDispatchHandle; import java.lang.reflect.Type; @@ -129,15 +130,13 @@ public WorkflowStartOutput start(WorkflowStartInput input) { @Override public WorkflowSignalOutput signal(WorkflowSignalInput input) { - Header grpcHeader = toHeaderGrpc(input.getHeader(), null); - SignalWorkflowExecutionRequest.Builder request = SignalWorkflowExecutionRequest.newBuilder() .setSignalName(input.getSignalName()) .setWorkflowExecution(input.getWorkflowExecution()) .setIdentity(clientOptions.getIdentity()) .setNamespace(clientOptions.getNamespace()) - .setHeader(grpcHeader); + .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); DataConverter dataConverterWitSignalContext = clientOptions From acc4296c6476e747acd077bdb3b88d5d82af48e9 Mon Sep 17 00:00:00 2001 From: siavash Date: Sun, 16 Jul 2023 21:09:31 +0330 Subject: [PATCH 04/17] feat: support tracing on updates --- .../opentracing/SpanOperationType.java | 3 +- .../ActionTypeAndNameSpanBuilderProvider.java | 1 + ...TracingWorkflowClientCallsInterceptor.java | 27 +++++++++++-- ...racingWorkflowInboundCallsInterceptor.java | 38 ++++++++++++++++--- .../opentracing/internal/SpanFactory.java | 12 ++++++ .../io/temporal/client/WorkflowStubImpl.java | 1 + .../WorkflowClientCallsInterceptor.java | 7 ++++ .../WorkflowInboundCallsInterceptor.java | 8 +++- .../client/RootWorkflowClientInvoker.java | 6 ++- .../internal/replay/ReplayWorkflow.java | 6 ++- .../replay/ReplayWorkflowExecutor.java | 6 ++- .../temporal/internal/sync/SyncWorkflow.java | 10 +++-- .../internal/sync/SyncWorkflowContext.java | 9 +++-- .../internal/sync/UpdateDispatcher.java | 11 ++++-- .../sync/WorkflowExecutionHandler.java | 15 ++++++-- ...eplayWorkflowRunTaskHandlerCacheTests.java | 4 +- 16 files changed, 134 insertions(+), 30 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 9685910a8..15bc2bd3c 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -28,7 +28,8 @@ public enum SpanOperationType { START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"), START_ACTIVITY("StartActivity"), RUN_ACTIVITY("RunActivity"), - SIGNAL_WORKFLOW("SignalWorkflow"); + SIGNAL_WORKFLOW("SignalWorkflow"), + UPDATE_WORKFLOW("UpdateWorkflow"); private final String defaultPrefix; diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 8e3ada360..3b4d50c78 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -89,6 +89,7 @@ protected Map getSpanTags(SpanCreationContext context) { case START_ACTIVITY: case RUN_ACTIVITY: case SIGNAL_WORKFLOW: + case UPDATE_WORKFLOW: String runId = context.getRunId(); Preconditions.checkNotNull( runId, "runId is expected to be not null for span operation type %s", operationType); diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index edf42e229..919918120 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -61,7 +61,7 @@ public WorkflowStartOutput start(WorkflowStartInput input) { @Override public WorkflowSignalOutput signal(WorkflowSignalInput input) { - Span workflowStartSpan = + Span workflowSignalSpan = contextAccessor.writeSpanContextToHeader( () -> spanFactory @@ -73,10 +73,10 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { .start(), input.getHeader(), tracer); - try (Scope ignored = tracer.scopeManager().activate(workflowStartSpan)) { + try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) { return super.signal(input); } finally { - workflowStartSpan.finish(); + workflowSignalSpan.finish(); } } @@ -98,6 +98,27 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu } } + @Override + public StartUpdateOutput startUpdate(StartUpdateInput input) { + Span workflowStartUpdateSpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createWorkflowStartUpdateSpan( + tracer, + input.getUpdateName(), + input.getWorkflowExecution().getWorkflowId(), + References.FOLLOWS_FROM) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(workflowStartUpdateSpan)) { + return super.startUpdate(input); + } finally { + workflowStartUpdateSpan.finish(); + } + } + private Tracer.SpanBuilder createWorkflowStartSpanBuilder( WorkflowStartInput input, SpanOperationType operationType) { return spanFactory.createWorkflowStartSpan( diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java index c7141ac66..fc2ed8de3 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java @@ -88,7 +88,7 @@ public void handleSignal(SignalInput input) { Tracer tracer = options.getTracer(); SpanContext rootSpanContext = contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); - Span workflowRunSpan = + Span workflowSignalSpan = spanFactory .createWorkflowSignalSpan( tracer, @@ -97,17 +97,45 @@ public void handleSignal(SignalInput input) { Workflow.getInfo().getRunId(), rootSpanContext) .start(); - try (Scope scope = tracer.scopeManager().activate(workflowRunSpan)) { + try (Scope scope = tracer.scopeManager().activate(workflowSignalSpan)) { super.handleSignal(input); } catch (Throwable t) { if (t instanceof DestroyWorkflowThreadError) { - spanFactory.logEviction(workflowRunSpan); + spanFactory.logEviction(workflowSignalSpan); } else { - spanFactory.logFail(workflowRunSpan, t); + spanFactory.logFail(workflowSignalSpan, t); } throw t; } finally { - workflowRunSpan.finish(); + workflowSignalSpan.finish(); + } + } + + @Override + public UpdateOutput executeUpdate(UpdateInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + Span workflowSignalSpan = + spanFactory + .createWorkflowSignalSpan( + tracer, + Workflow.getInfo().getWorkflowType(), + Workflow.getInfo().getWorkflowId(), + Workflow.getInfo().getRunId(), + rootSpanContext) + .start(); + try (Scope scope = tracer.scopeManager().activate(workflowSignalSpan)) { + return super.executeUpdate(input); + } catch (Throwable t) { + if (t instanceof DestroyWorkflowThreadError) { + spanFactory.logEviction(workflowSignalSpan); + } else { + spanFactory.logFail(workflowSignalSpan, t); + } + throw t; + } finally { + workflowSignalSpan.finish(); } } } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 779d7c061..33256d267 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -161,6 +161,18 @@ public Tracer.SpanBuilder createActivityRunSpan( return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createWorkflowStartUpdateSpan( + Tracer tracer, String updateName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW) + .setActionName(updateName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + @SuppressWarnings("deprecation") public void logFail(Span toSpan, Throwable failReason) { toSpan.setTag(StandardTagNames.FAILED, true); diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 816280e84..81eab16e8 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -332,6 +332,7 @@ public UpdateHandle startUpdate(UpdateOptions options, Object... args) new WorkflowClientCallsInterceptor.StartUpdateInput<>( targetExecution, options.getUpdateName(), + Header.empty(), options.getUpdateId(), args, options.getResultClass(), diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index cebd2dd0e..494e08afc 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -377,6 +377,7 @@ public WorkflowExecution getWorkflowExecution() { final class StartUpdateInput { private final WorkflowExecution workflowExecution; private final String updateName; + private final Header header; private final Object[] arguments; private final Class resultClass; private final Type resultType; @@ -387,6 +388,7 @@ final class StartUpdateInput { public StartUpdateInput( WorkflowExecution workflowExecution, String updateName, + Header header, String updateId, Object[] arguments, Class resultClass, @@ -394,6 +396,7 @@ public StartUpdateInput( String firstExecutionRunId, WaitPolicy waitPolicy) { this.workflowExecution = workflowExecution; + this.header = header; this.updateId = updateId; this.updateName = updateName; this.arguments = arguments; @@ -411,6 +414,10 @@ public String getUpdateName() { return updateName; } + public Header getHeader() { + return header; + } + public String getUpdateId() { return updateId; } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java index d80402865..41a57e382 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java @@ -140,10 +140,12 @@ public Object getResult() { @Experimental final class UpdateInput { private final String updateName; + private final Header header; private final Object[] arguments; - public UpdateInput(String updateName, Object[] arguments) { + public UpdateInput(String updateName, Header header, Object[] arguments) { this.updateName = updateName; + this.header = header; this.arguments = arguments; } @@ -151,6 +153,10 @@ public String getUpdateName() { return updateName; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index 88dcff8af..e289b3f23 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -21,7 +21,6 @@ package io.temporal.internal.client; import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; -import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc; import io.grpc.Deadline; import io.grpc.Status; @@ -306,7 +305,10 @@ public StartUpdateOutput startUpdate(StartUpdateInput input) { Optional inputArgs = dataConverterWithWorkflowContext.toPayloads(input.getArguments()); - Input.Builder updateInput = Input.newBuilder().setName(input.getUpdateName()); + Input.Builder updateInput = + Input.newBuilder() + .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)) + .setName(input.getUpdateName()); inputArgs.ifPresent(updateInput::setArgs); Request request = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java index c66a4f929..8065745e5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflow.java @@ -40,7 +40,11 @@ public interface ReplayWorkflow { /** Handle an update workflow execution event */ void handleUpdate( - String updateName, Optional input, long eventId, UpdateProtocolCallback callbacks); + String updateName, + Optional input, + long eventId, + Header header, + UpdateProtocolCallback callbacks); /** * @return true if the execution of the workflow method is finished or an exit was explicitly diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java index 343a826fa..87e359ebd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java @@ -158,7 +158,11 @@ public void handleWorkflowExecutionUpdated(UpdateMessage updateMessage) { Input input = update.getInput(); Optional args = Optional.ofNullable(input.getArgs()); this.workflow.handleUpdate( - input.getName(), args, protocolMessage.getEventId(), updateMessage.getCallbacks()); + input.getName(), + args, + protocolMessage.getEventId(), + input.getHeader(), + updateMessage.getCallbacks()); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Message is not an update."); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index 781f925bd..84fcd622b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -150,7 +150,11 @@ public void handleSignal( @Override public void handleUpdate( - String updateName, Optional input, long eventId, UpdateProtocolCallback callbacks) { + String updateName, + Optional input, + long eventId, + Header header, + UpdateProtocolCallback callbacks) { runner.executeInWorkflowThread( "update " + updateName, () -> { @@ -160,7 +164,7 @@ public void handleUpdate( // TODO(https://github.com/temporalio/sdk-java/issues/1748) handleValidateUpdate // should not just be run // in a workflow thread - workflowProc.handleValidateUpdate(updateName, input, eventId); + workflowProc.handleValidateUpdate(updateName, input, eventId, header); } catch (Exception e) { callbacks.reject(this.dataConverter.exceptionToFailure(e)); return; @@ -169,7 +173,7 @@ public void handleUpdate( callbacks.accept(); try { Optional result = - workflowProc.handleExecuteUpdate(updateName, input, eventId); + workflowProc.handleExecuteUpdate(updateName, input, eventId, header); callbacks.complete(result, null); } catch (WorkflowExecutionException e) { callbacks.complete(Optional.empty(), e.getFailure()); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 61d765830..71de8085b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -318,13 +318,14 @@ public void handleSignal( signalDispatcher.handleSignal(signalName, input, eventId, header); } - public void handleValidateUpdate(String updateName, Optional input, long eventId) { - updateDispatcher.handleValidateUpdate(updateName, input, eventId); + public void handleValidateUpdate( + String updateName, Optional input, long eventId, Header header) { + updateDispatcher.handleValidateUpdate(updateName, input, eventId, header); } public Optional handleExecuteUpdate( - String updateName, Optional input, long eventId) { - return updateDispatcher.handleExecuteUpdate(updateName, input, eventId); + String updateName, Optional input, long eventId, Header header) { + return updateDispatcher.handleExecuteUpdate(updateName, input, eventId, header); } public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java index 19a4d2b3e..aaa96060e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java @@ -23,6 +23,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor.UpdateInput; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor.UpdateOutput; @@ -53,7 +54,8 @@ public void setInboundCallsInterceptor(WorkflowInboundCallsInterceptor inboundCa this.inboundCallsInterceptor = inboundCallsInterceptor; } - public void handleValidateUpdate(String updateName, Optional input, long eventId) { + public void handleValidateUpdate( + String updateName, Optional input, long eventId, Header header) { WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest handler = updateCallbacks.get(updateName); Object[] args; @@ -70,11 +72,11 @@ public void handleValidateUpdate(String updateName, Optional input, lo } inboundCallsInterceptor.validateUpdate( - new WorkflowInboundCallsInterceptor.UpdateInput(updateName, args)); + new WorkflowInboundCallsInterceptor.UpdateInput(updateName, header, args)); } public Optional handleExecuteUpdate( - String updateName, Optional input, long eventId) { + String updateName, Optional input, long eventId, Header header) { WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest handler = updateCallbacks.get(updateName); Object[] args; @@ -91,7 +93,8 @@ public Optional handleExecuteUpdate( } Object result = inboundCallsInterceptor - .executeUpdate(new WorkflowInboundCallsInterceptor.UpdateInput(updateName, args)) + .executeUpdate( + new WorkflowInboundCallsInterceptor.UpdateInput(updateName, header, args)) .getResult(); return dataConverterWithWorkflowContext.toPayloads(result); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java index f14c5e532..7a6ed0a00 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java @@ -103,18 +103,25 @@ public Optional handleQuery(String type, Optional args) { return context.handleQuery(type, args); } - public void handleValidateUpdate(String updateName, Optional input, long eventId) { + public void handleValidateUpdate( + String updateName, + Optional input, + long eventId, + io.temporal.api.common.v1.Header header) { try { - context.handleValidateUpdate(updateName, input, eventId); + context.handleValidateUpdate(updateName, input, eventId, new Header(header)); } catch (Throwable e) { applyWorkflowFailurePolicyAndRethrow(e); } } public Optional handleExecuteUpdate( - String updateName, Optional input, long eventId) { + String updateName, + Optional input, + long eventId, + io.temporal.api.common.v1.Header header) { try { - return context.handleExecuteUpdate(updateName, input, eventId); + return context.handleExecuteUpdate(updateName, input, eventId, new Header(header)); } catch (Throwable e) { applyWorkflowFailurePolicyAndRethrow(e); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java index 68b591517..84bb8eba9 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java @@ -292,13 +292,15 @@ private ReplayWorkflowRunTaskHandler createFakeExecutor(PollWorkflowTaskQueueRes public void start(HistoryEvent event, ReplayWorkflowContext context) {} @Override - public void handleSignal(String signalName, Optional input, long eventId, Header header) {} + public void handleSignal( + String signalName, Optional input, long eventId, Header header) {} @Override public void handleUpdate( String updateName, Optional input, long eventId, + Header header, UpdateProtocolCallback callbacks) {} @Override From ff21f0aa31cca344f2889f5174cc6ef4b40fd06b Mon Sep 17 00:00:00 2001 From: siavash Date: Sun, 16 Jul 2023 23:41:08 +0330 Subject: [PATCH 05/17] feat: support tracing on queries --- .../opentracing/SpanOperationType.java | 1 + .../ActionTypeAndNameSpanBuilderProvider.java | 2 + ...TracingWorkflowClientCallsInterceptor.java | 21 ++++++++++ ...racingWorkflowInboundCallsInterceptor.java | 33 ++++++++++++--- .../opentracing/internal/SpanFactory.java | 40 ++++++++++++++++++- .../io/temporal/client/WorkflowStubImpl.java | 2 +- .../WorkflowClientCallsInterceptor.java | 7 ++++ .../WorkflowInboundCallsInterceptor.java | 10 ++++- .../client/RootWorkflowClientInvoker.java | 5 ++- .../internal/sync/QueryDispatcher.java | 5 ++- .../temporal/internal/sync/SyncWorkflow.java | 2 +- .../internal/sync/SyncWorkflowContext.java | 4 +- .../sync/WorkflowExecutionHandler.java | 5 ++- 13 files changed, 119 insertions(+), 18 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 15bc2bd3c..13da9e418 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -28,6 +28,7 @@ public enum SpanOperationType { START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"), START_ACTIVITY("StartActivity"), RUN_ACTIVITY("RunActivity"), + QUERY_WORKFLOW("QueryWorkflow"), SIGNAL_WORKFLOW("SignalWorkflow"), UPDATE_WORKFLOW("UpdateWorkflow"); diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 3b4d50c78..f76863964 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -96,6 +96,8 @@ protected Map getSpanTags(SpanCreationContext context) { return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case QUERY_WORKFLOW: + return ImmutableMap.of(); } throw new IllegalArgumentException("Unknown span operation type provided"); } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index 919918120..f064bbc85 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -98,6 +98,27 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu } } + @Override + public QueryOutput query(QueryInput input) { + Span workflowQuerySpan = + contextAccessor.writeSpanContextToHeader( + () -> + spanFactory + .createWorkflowQuerySpan( + tracer, + input.getQueryType(), + input.getWorkflowExecution().getWorkflowId(), + References.FOLLOWS_FROM) + .start(), + input.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) { + return super.query(input); + } finally { + workflowQuerySpan.finish(); + } + } + @Override public StartUpdateOutput startUpdate(StartUpdateInput input) { Span workflowStartUpdateSpan = diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java index fc2ed8de3..d4966b147 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java @@ -69,7 +69,7 @@ public WorkflowOutput execute(WorkflowInput input) { Workflow.getInfo().getRunId(), rootSpanContext) .start(); - try (Scope scope = tracer.scopeManager().activate(workflowRunSpan)) { + try (Scope ignored = tracer.scopeManager().activate(workflowRunSpan)) { return super.execute(input); } catch (Throwable t) { if (t instanceof DestroyWorkflowThreadError) { @@ -92,12 +92,12 @@ public void handleSignal(SignalInput input) { spanFactory .createWorkflowSignalSpan( tracer, - Workflow.getInfo().getWorkflowType(), + input.getSignalName(), Workflow.getInfo().getWorkflowId(), Workflow.getInfo().getRunId(), rootSpanContext) .start(); - try (Scope scope = tracer.scopeManager().activate(workflowSignalSpan)) { + try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) { super.handleSignal(input); } catch (Throwable t) { if (t instanceof DestroyWorkflowThreadError) { @@ -111,6 +111,27 @@ public void handleSignal(SignalInput input) { } } + @Override + public QueryOutput handleQuery(QueryInput input) { + Tracer tracer = options.getTracer(); + SpanContext rootSpanContext = + contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); + Span workflowQuerySpan = + spanFactory.createWorkflowQuerySpan(tracer, input.getQueryName(), rootSpanContext).start(); + try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) { + return super.handleQuery(input); + } catch (Throwable t) { + if (t instanceof DestroyWorkflowThreadError) { + spanFactory.logEviction(workflowQuerySpan); + } else { + spanFactory.logFail(workflowQuerySpan, t); + } + throw t; + } finally { + workflowQuerySpan.finish(); + } + } + @Override public UpdateOutput executeUpdate(UpdateInput input) { Tracer tracer = options.getTracer(); @@ -118,14 +139,14 @@ public UpdateOutput executeUpdate(UpdateInput input) { contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); Span workflowSignalSpan = spanFactory - .createWorkflowSignalSpan( + .createWorkflowStartUpdateSpan( tracer, - Workflow.getInfo().getWorkflowType(), + input.getUpdateName(), Workflow.getInfo().getWorkflowId(), Workflow.getInfo().getRunId(), rootSpanContext) .start(); - try (Scope scope = tracer.scopeManager().activate(workflowSignalSpan)) { + try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) { return super.executeUpdate(input); } catch (Throwable t) { if (t instanceof DestroyWorkflowThreadError) { diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 33256d267..42b087f53 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -165,7 +165,7 @@ public Tracer.SpanBuilder createWorkflowStartUpdateSpan( Tracer tracer, String updateName, String workflowId, String runId) { SpanCreationContext context = SpanCreationContext.newBuilder() - .setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW) + .setSpanOperationType(SpanOperationType.UPDATE_WORKFLOW) .setActionName(updateName) .setWorkflowId(workflowId) .setRunId(runId) @@ -173,6 +173,44 @@ public Tracer.SpanBuilder createWorkflowStartUpdateSpan( return createSpan(context, tracer, null, References.FOLLOWS_FROM); } + public Tracer.SpanBuilder createWorkflowStartUpdateSpan( + Tracer tracer, + String updateName, + String workflowId, + String runId, + SpanContext workflowSignalSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.UPDATE_WORKFLOW) + .setActionName(updateName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowQuerySpan( + Tracer tracer, String updateName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.QUERY_WORKFLOW) + .setActionName(updateName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + + public Tracer.SpanBuilder createWorkflowQuerySpan( + Tracer tracer, String queryName, SpanContext workflowSignalSpanContext) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.QUERY_WORKFLOW) + .setActionName(queryName) + .build(); + return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + } + @SuppressWarnings("deprecation") public void logFail(Span toSpan, Throwable failReason) { toSpan.setTag(StandardTagNames.FAILED, true); diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java index 81eab16e8..9cd54b6c9 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowStubImpl.java @@ -275,7 +275,7 @@ public R query(String queryType, Class resultClass, Type resultType, Obje result = workflowClientInvoker.query( new WorkflowClientCallsInterceptor.QueryInput<>( - targetExecution, queryType, args, resultClass, resultType)); + targetExecution, queryType, Header.empty(), args, resultClass, resultType)); } catch (Exception e) { return throwAsWorkflowFailureExceptionForQuery(e, resultClass, targetExecution); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java index 494e08afc..66cd5a6bc 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowClientCallsInterceptor.java @@ -297,6 +297,7 @@ public CompletableFuture getResult() { final class QueryInput { private final WorkflowExecution workflowExecution; private final String queryType; + private final Header header; private final Object[] arguments; private final Class resultClass; private final Type resultType; @@ -304,11 +305,13 @@ final class QueryInput { public QueryInput( WorkflowExecution workflowExecution, String queryType, + Header header, Object[] arguments, Class resultClass, Type resultType) { this.workflowExecution = workflowExecution; this.queryType = queryType; + this.header = header; this.arguments = arguments; this.resultClass = resultClass; this.resultType = resultType; @@ -322,6 +325,10 @@ public String getQueryType() { return queryType; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java index 41a57e382..b9bf960c2 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java @@ -109,10 +109,12 @@ public Header getHeader() { final class QueryInput { private final String queryName; + private final Header header; private final Object[] arguments; - public QueryInput(String signalName, Object[] arguments) { - this.queryName = signalName; + public QueryInput(String queryName, Header header, Object[] arguments) { + this.queryName = queryName; + this.header = header; this.arguments = arguments; } @@ -120,6 +122,10 @@ public String getQueryName() { return queryName; } + public Header getHeader() { + return header; + } + public Object[] getArguments() { return arguments; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java index e289b3f23..ca5310b40 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java @@ -255,7 +255,10 @@ public GetResultAsyncOutput getResultAsync(GetResultInput input) { @Override public QueryOutput query(QueryInput input) { - WorkflowQuery.Builder query = WorkflowQuery.newBuilder().setQueryType(input.getQueryType()); + WorkflowQuery.Builder query = + WorkflowQuery.newBuilder() + .setQueryType(input.getQueryType()) + .setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null)); DataConverter dataConverterWithWorkflowContext = clientOptions .getDataConverter() diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java index 91246d981..891addf8c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java @@ -23,6 +23,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.workflow.DynamicQueryHandler; @@ -69,7 +70,7 @@ public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery( return new WorkflowInboundCallsInterceptor.QueryOutput(result); } - public Optional handleQuery(String queryName, Optional input) { + public Optional handleQuery(String queryName, Header header, Optional input) { WorkflowOutboundCallsInterceptor.RegisterQueryInput handler = queryCallbacks.get(queryName); Object[] args; if (handler == null) { @@ -85,7 +86,7 @@ public Optional handleQuery(String queryName, Optional input } Object result = inboundCallsInterceptor - .handleQuery(new WorkflowInboundCallsInterceptor.QueryInput(queryName, args)) + .handleQuery(new WorkflowInboundCallsInterceptor.QueryInput(queryName, header, args)) .getResult(); return dataConverterWithWorkflowContext.toPayloads(result); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index 84fcd622b..6348fce8c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -219,7 +219,7 @@ public Optional query(WorkflowQuery query) { } Optional args = query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty(); - return workflowProc.handleQuery(query.getQueryType(), args); + return workflowProc.handleQuery(query.getQueryType(), query.getHeader(), args); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 71de8085b..c8fa9c741 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -342,8 +342,8 @@ public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery( return queryDispatcher.handleInterceptedQuery(input); } - public Optional handleQuery(String queryName, Optional input) { - return queryDispatcher.handleQuery(queryName, input); + public Optional handleQuery(String queryName, Header header, Optional input) { + return queryDispatcher.handleQuery(queryName, header, input); } private class ActivityCallback { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java index 7a6ed0a00..1a78df46f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowExecutionHandler.java @@ -99,8 +99,9 @@ public void handleSignal( } } - public Optional handleQuery(String type, Optional args) { - return context.handleQuery(type, args); + public Optional handleQuery( + String type, io.temporal.api.common.v1.Header header, Optional args) { + return context.handleQuery(type, new Header(header), args); } public void handleValidateUpdate( From 16fdb02ed31630813c74bd7bce69e64261cd62bb Mon Sep 17 00:00:00 2001 From: siavash Date: Sun, 16 Jul 2023 23:49:30 +0330 Subject: [PATCH 06/17] fix: tests --- .../java/io/temporal/internal/sync/QueryDispatcherTest.java | 6 ++++-- .../src/test/java/io/temporal/workflow/MetricsTest.java | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java index 4faf0be1d..07fca97ff 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/QueryDispatcherTest.java @@ -27,6 +27,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import java.lang.reflect.Type; @@ -67,7 +68,8 @@ public void testQuerySuccess() { dispatcher.setInboundCallsInterceptor(mockInterceptor); // Invoke functionality under test, expect no exceptions for an existing query. - Optional queryResult = dispatcher.handleQuery("QueryB", Optional.empty()); + Optional queryResult = + dispatcher.handleQuery("QueryB", Header.empty(), Optional.empty()); assertTrue(queryResult.isPresent()); } @@ -79,7 +81,7 @@ public void testQueryDispatcherException() { assertThrows( IllegalArgumentException.class, () -> { - dispatcher.handleQuery("QueryC", null); + dispatcher.handleQuery("QueryC", Header.empty(), null); }); assertEquals("Unknown query type: QueryC, knownTypes=[QueryA, QueryB]", exception.getMessage()); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java index 42683ea42..7c54fa63b 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/MetricsTest.java @@ -596,6 +596,7 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { new SignalExternalInput( input.getExecution(), overrideSignalName.apply(input.getSignalName()), + Header.empty(), overrideArgs.apply(args))); } } From 74028ebe7c2cc35b45466443ae9608f0cac79e00 Mon Sep 17 00:00:00 2001 From: siavash Date: Mon, 17 Jul 2023 17:27:51 +0330 Subject: [PATCH 07/17] feat: support tracing on queries --- .../opentracing/SpanOperationType.java | 6 ++++- ...TracingWorkflowClientCallsInterceptor.java | 6 ++--- ...acingWorkflowOutboundCallsInterceptor.java | 2 +- .../opentracing/internal/SpanFactory.java | 24 ++++++++++++++----- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 13da9e418..513606865 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -28,9 +28,13 @@ public enum SpanOperationType { START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"), START_ACTIVITY("StartActivity"), RUN_ACTIVITY("RunActivity"), + SIGNAL_EXTERNAL_WORKFLOW("SignalExternalWorkflow"), QUERY_WORKFLOW("QueryWorkflow"), SIGNAL_WORKFLOW("SignalWorkflow"), - UPDATE_WORKFLOW("UpdateWorkflow"); + UPDATE_WORKFLOW("UpdateWorkflow"), + HANDLE_QUERY("HandleQuery"), + HANDLE_SIGNAL("HandleSignal"), + HANDLE_UPDATE("HandleUpdate"); private final String defaultPrefix; diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index f064bbc85..1546d359f 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -69,7 +69,7 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) { tracer, input.getSignalName(), input.getWorkflowExecution().getWorkflowId(), - References.FOLLOWS_FROM) + input.getWorkflowExecution().getRunId()) .start(), input.getHeader(), tracer); @@ -108,7 +108,7 @@ public QueryOutput query(QueryInput input) { tracer, input.getQueryType(), input.getWorkflowExecution().getWorkflowId(), - References.FOLLOWS_FROM) + input.getWorkflowExecution().getRunId()) .start(), input.getHeader(), tracer); @@ -129,7 +129,7 @@ public StartUpdateOutput startUpdate(StartUpdateInput input) { tracer, input.getUpdateName(), input.getWorkflowExecution().getWorkflowId(), - References.FOLLOWS_FROM) + input.getWorkflowExecution().getRunId()) .start(), input.getHeader(), tracer); diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java index 0ecc2f137..5d6661d53 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowOutboundCallsInterceptor.java @@ -108,7 +108,7 @@ public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) { contextAccessor.writeSpanContextToHeader( () -> spanFactory - .createWorkflowSignalSpan( + .createExternalWorkflowSignalSpan( tracer, input.getSignalName(), workflowInfo.getWorkflowId(), diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 42b087f53..efec55e94 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -77,6 +77,18 @@ public Tracer.SpanBuilder createChildWorkflowStartSpan( return createSpan(context, tracer, null, References.CHILD_OF); } + public Tracer.SpanBuilder createExternalWorkflowSignalSpan( + Tracer tracer, String signalName, String workflowId, String runId) { + SpanCreationContext context = + SpanCreationContext.newBuilder() + .setSpanOperationType(SpanOperationType.SIGNAL_EXTERNAL_WORKFLOW) + .setActionName(signalName) + .setWorkflowId(workflowId) + .setRunId(runId) + .build(); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); + } + public Tracer.SpanBuilder createWorkflowSignalSpan( Tracer tracer, String signalName, String workflowId, String runId) { SpanCreationContext context = @@ -97,12 +109,12 @@ public Tracer.SpanBuilder createWorkflowSignalSpan( SpanContext workflowSignalSpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() - .setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW) + .setSpanOperationType(SpanOperationType.HANDLE_SIGNAL) .setActionName(signalName) .setWorkflowId(workflowId) .setRunId(runId) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + return createSpan(context, tracer, workflowSignalSpanContext, References.CHILD_OF); } public Tracer.SpanBuilder createContinueAsNewWorkflowStartSpan( @@ -181,12 +193,12 @@ public Tracer.SpanBuilder createWorkflowStartUpdateSpan( SpanContext workflowSignalSpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() - .setSpanOperationType(SpanOperationType.UPDATE_WORKFLOW) + .setSpanOperationType(SpanOperationType.HANDLE_UPDATE) .setActionName(updateName) .setWorkflowId(workflowId) .setRunId(runId) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + return createSpan(context, tracer, workflowSignalSpanContext, References.CHILD_OF); } public Tracer.SpanBuilder createWorkflowQuerySpan( @@ -205,10 +217,10 @@ public Tracer.SpanBuilder createWorkflowQuerySpan( Tracer tracer, String queryName, SpanContext workflowSignalSpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() - .setSpanOperationType(SpanOperationType.QUERY_WORKFLOW) + .setSpanOperationType(SpanOperationType.HANDLE_QUERY) .setActionName(queryName) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + return createSpan(context, tracer, workflowSignalSpanContext, References.CHILD_OF); } @SuppressWarnings("deprecation") From 1a44c2234d7b7abc1cfefdcde291195c94437f66 Mon Sep 17 00:00:00 2001 From: siavash Date: Mon, 17 Jul 2023 17:33:28 +0330 Subject: [PATCH 08/17] feat: support tracing on queries --- .../internal/ActionTypeAndNameSpanBuilderProvider.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index f76863964..ee406b5f2 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -88,8 +88,11 @@ protected Map getSpanTags(SpanCreationContext context) { case RUN_WORKFLOW: case START_ACTIVITY: case RUN_ACTIVITY: + case SIGNAL_EXTERNAL_WORKFLOW: case SIGNAL_WORKFLOW: case UPDATE_WORKFLOW: + case HANDLE_SIGNAL: + case HANDLE_UPDATE: String runId = context.getRunId(); Preconditions.checkNotNull( runId, "runId is expected to be not null for span operation type %s", operationType); @@ -97,6 +100,7 @@ protected Map getSpanTags(SpanCreationContext context) { StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); case QUERY_WORKFLOW: + case HANDLE_QUERY: return ImmutableMap.of(); } throw new IllegalArgumentException("Unknown span operation type provided"); From a69f5be623f974aa6ad1deab850861281bcb9f68 Mon Sep 17 00:00:00 2001 From: siavash Date: Sat, 22 Jul 2023 10:34:52 +0330 Subject: [PATCH 09/17] feat: support tracing on queries --- .../io/temporal/opentracing/internal/SpanFactory.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index efec55e94..ae6f7effe 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -86,7 +86,7 @@ public Tracer.SpanBuilder createExternalWorkflowSignalSpan( .setWorkflowId(workflowId) .setRunId(runId) .build(); - return createSpan(context, tracer, null, References.FOLLOWS_FROM); + return createSpan(context, tracer, null, References.CHILD_OF); } public Tracer.SpanBuilder createWorkflowSignalSpan( @@ -114,7 +114,7 @@ public Tracer.SpanBuilder createWorkflowSignalSpan( .setWorkflowId(workflowId) .setRunId(runId) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.CHILD_OF); + return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); } public Tracer.SpanBuilder createContinueAsNewWorkflowStartSpan( @@ -198,7 +198,7 @@ public Tracer.SpanBuilder createWorkflowStartUpdateSpan( .setWorkflowId(workflowId) .setRunId(runId) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.CHILD_OF); + return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); } public Tracer.SpanBuilder createWorkflowQuerySpan( @@ -220,7 +220,7 @@ public Tracer.SpanBuilder createWorkflowQuerySpan( .setSpanOperationType(SpanOperationType.HANDLE_QUERY) .setActionName(queryName) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.CHILD_OF); + return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); } @SuppressWarnings("deprecation") From 172cf89a0b306f43fdbd72d3a51edc511cad3288 Mon Sep 17 00:00:00 2001 From: siavash Date: Sat, 22 Jul 2023 20:58:02 +0330 Subject: [PATCH 10/17] feat: support tracing on queries --- .../internal/OpenTracingWorkflowClientCallsInterceptor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index 1546d359f..3ddd42d29 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -20,7 +20,6 @@ package io.temporal.opentracing.internal; -import io.opentracing.References; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.Tracer; From aa24cb911d26f9e1d7b78d2ae84095498d48efdf Mon Sep 17 00:00:00 2001 From: siavash Date: Mon, 14 Aug 2023 21:42:44 +0330 Subject: [PATCH 11/17] fix: code review comments --- .../internal/ActionTypeAndNameSpanBuilderProvider.java | 5 ++--- .../OpenTracingWorkflowInboundCallsInterceptor.java | 6 +++--- .../java/io/temporal/opentracing/internal/SpanFactory.java | 6 +++--- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index ee406b5f2..da7d8b973 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -91,17 +91,16 @@ protected Map getSpanTags(SpanCreationContext context) { case SIGNAL_EXTERNAL_WORKFLOW: case SIGNAL_WORKFLOW: case UPDATE_WORKFLOW: + case QUERY_WORKFLOW: case HANDLE_SIGNAL: case HANDLE_UPDATE: + case HANDLE_QUERY: String runId = context.getRunId(); Preconditions.checkNotNull( runId, "runId is expected to be not null for span operation type %s", operationType); return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); - case QUERY_WORKFLOW: - case HANDLE_QUERY: - return ImmutableMap.of(); } throw new IllegalArgumentException("Unknown span operation type provided"); } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java index d4966b147..0ba623675 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java @@ -90,7 +90,7 @@ public void handleSignal(SignalInput input) { contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); Span workflowSignalSpan = spanFactory - .createWorkflowSignalSpan( + .createWorkflowHandleSignalSpan( tracer, input.getSignalName(), Workflow.getInfo().getWorkflowId(), @@ -117,7 +117,7 @@ public QueryOutput handleQuery(QueryInput input) { SpanContext rootSpanContext = contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); Span workflowQuerySpan = - spanFactory.createWorkflowQuerySpan(tracer, input.getQueryName(), rootSpanContext).start(); + spanFactory.createWorkflowHandleQuerySpan(tracer, input.getQueryName(), rootSpanContext).start(); try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) { return super.handleQuery(input); } catch (Throwable t) { @@ -139,7 +139,7 @@ public UpdateOutput executeUpdate(UpdateInput input) { contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); Span workflowSignalSpan = spanFactory - .createWorkflowStartUpdateSpan( + .createWorkflowExecuteUpdateSpan( tracer, input.getUpdateName(), Workflow.getInfo().getWorkflowId(), diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index ae6f7effe..700c4a3d4 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -101,7 +101,7 @@ public Tracer.SpanBuilder createWorkflowSignalSpan( return createSpan(context, tracer, null, References.FOLLOWS_FROM); } - public Tracer.SpanBuilder createWorkflowSignalSpan( + public Tracer.SpanBuilder createWorkflowHandleSignalSpan( Tracer tracer, String signalName, String workflowId, @@ -185,7 +185,7 @@ public Tracer.SpanBuilder createWorkflowStartUpdateSpan( return createSpan(context, tracer, null, References.FOLLOWS_FROM); } - public Tracer.SpanBuilder createWorkflowStartUpdateSpan( + public Tracer.SpanBuilder createWorkflowExecuteUpdateSpan( Tracer tracer, String updateName, String workflowId, @@ -213,7 +213,7 @@ public Tracer.SpanBuilder createWorkflowQuerySpan( return createSpan(context, tracer, null, References.FOLLOWS_FROM); } - public Tracer.SpanBuilder createWorkflowQuerySpan( + public Tracer.SpanBuilder createWorkflowHandleQuerySpan( Tracer tracer, String queryName, SpanContext workflowSignalSpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() From b9744df6359271d35c57a7fb1d48d61ff1cce622 Mon Sep 17 00:00:00 2001 From: siavash Date: Mon, 14 Aug 2023 22:32:51 +0330 Subject: [PATCH 12/17] fix: code review comments --- .../internal/ActionTypeAndNameSpanBuilderProvider.java | 3 ++- .../internal/OpenTracingWorkflowInboundCallsInterceptor.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index da7d8b973..43d226226 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -94,13 +94,14 @@ protected Map getSpanTags(SpanCreationContext context) { case QUERY_WORKFLOW: case HANDLE_SIGNAL: case HANDLE_UPDATE: - case HANDLE_QUERY: String runId = context.getRunId(); Preconditions.checkNotNull( runId, "runId is expected to be not null for span operation type %s", operationType); return ImmutableMap.of( StandardTagNames.WORKFLOW_ID, context.getWorkflowId(), StandardTagNames.RUN_ID, context.getRunId()); + case HANDLE_QUERY: + return ImmutableMap.of(); } throw new IllegalArgumentException("Unknown span operation type provided"); } diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java index 0ba623675..1c83cad9a 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowInboundCallsInterceptor.java @@ -117,7 +117,9 @@ public QueryOutput handleQuery(QueryInput input) { SpanContext rootSpanContext = contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer); Span workflowQuerySpan = - spanFactory.createWorkflowHandleQuerySpan(tracer, input.getQueryName(), rootSpanContext).start(); + spanFactory + .createWorkflowHandleQuerySpan(tracer, input.getQueryName(), rootSpanContext) + .start(); try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) { return super.handleQuery(input); } catch (Throwable t) { From 9f1d16f6296f9a16945541e68b9cfbbac18c3ab2 Mon Sep 17 00:00:00 2001 From: siavash Date: Sat, 2 Sep 2023 00:08:18 +0330 Subject: [PATCH 13/17] refactor: merge latest commits --- .../temporal/opentracing/internal/SpanFactory.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index 700c4a3d4..c6ebe2fc9 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -33,9 +33,10 @@ import io.temporal.opentracing.SpanCreationContext; import io.temporal.opentracing.SpanOperationType; import io.temporal.opentracing.StandardTagNames; + +import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; -import javax.annotation.Nullable; public class SpanFactory { // Inspired by convention used in JAX-RS2 OpenTracing implementation: @@ -86,7 +87,7 @@ public Tracer.SpanBuilder createExternalWorkflowSignalSpan( .setWorkflowId(workflowId) .setRunId(runId) .build(); - return createSpan(context, tracer, null, References.CHILD_OF); + return createSpan(context, tracer, null, References.FOLLOWS_FROM); } public Tracer.SpanBuilder createWorkflowSignalSpan( @@ -190,7 +191,7 @@ public Tracer.SpanBuilder createWorkflowExecuteUpdateSpan( String updateName, String workflowId, String runId, - SpanContext workflowSignalSpanContext) { + SpanContext workflowUpdateSpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() .setSpanOperationType(SpanOperationType.HANDLE_UPDATE) @@ -198,7 +199,7 @@ public Tracer.SpanBuilder createWorkflowExecuteUpdateSpan( .setWorkflowId(workflowId) .setRunId(runId) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + return createSpan(context, tracer, workflowUpdateSpanContext, References.FOLLOWS_FROM); } public Tracer.SpanBuilder createWorkflowQuerySpan( @@ -214,13 +215,13 @@ public Tracer.SpanBuilder createWorkflowQuerySpan( } public Tracer.SpanBuilder createWorkflowHandleQuerySpan( - Tracer tracer, String queryName, SpanContext workflowSignalSpanContext) { + Tracer tracer, String queryName, SpanContext workflowQuerySpanContext) { SpanCreationContext context = SpanCreationContext.newBuilder() .setSpanOperationType(SpanOperationType.HANDLE_QUERY) .setActionName(queryName) .build(); - return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM); + return createSpan(context, tracer, workflowQuerySpanContext, References.FOLLOWS_FROM); } @SuppressWarnings("deprecation") From 2e02ef9dfb970756bc8a29245b9bdf58997e24ef Mon Sep 17 00:00:00 2001 From: siavash Date: Sat, 2 Sep 2023 00:43:31 +0330 Subject: [PATCH 14/17] refactor: merge latest commits --- .../java/io/temporal/opentracing/internal/SpanFactory.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java index c6ebe2fc9..9e0e79ed9 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/SpanFactory.java @@ -33,10 +33,9 @@ import io.temporal.opentracing.SpanCreationContext; import io.temporal.opentracing.SpanOperationType; import io.temporal.opentracing.StandardTagNames; - -import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; public class SpanFactory { // Inspired by convention used in JAX-RS2 OpenTracing implementation: From c6dc5461ca19e191c7caf6121c902aaba346ccf5 Mon Sep 17 00:00:00 2001 From: siavash Date: Sat, 2 Sep 2023 01:16:55 +0330 Subject: [PATCH 15/17] fix: test failure --- .../java/io/temporal/opentracing/SignalWithStartTest.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java index 79ad11ceb..0464b8594 100644 --- a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java @@ -122,9 +122,13 @@ public void signalWithStart() { assertEquals("SignalWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName()); List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); - assertEquals(1, workflowRunSpans.size()); + assertEquals(2, workflowRunSpans.size()); - MockSpan workflowRunSpan = workflowRunSpans.get(0); + MockSpan workflowSignalSpan = workflowRunSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowSignalSpan.parentId()); + assertEquals("HandleSignal:signal", workflowSignalSpan.operationName()); + + MockSpan workflowRunSpan = workflowRunSpans.get(1); assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); } From 355c61369d146f2cb01667fce20ce3a2610b4421 Mon Sep 17 00:00:00 2001 From: siavash Date: Sat, 2 Sep 2023 01:19:07 +0330 Subject: [PATCH 16/17] fix: test failure --- .../java/io/temporal/opentracing/SignalWithStartTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java index 0464b8594..bda67e6cd 100644 --- a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java @@ -121,14 +121,14 @@ public void signalWithStart() { assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); assertEquals("SignalWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName()); - List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); - assertEquals(2, workflowRunSpans.size()); + List workflowSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(2, workflowSpans.size()); - MockSpan workflowSignalSpan = workflowRunSpans.get(0); + MockSpan workflowSignalSpan = workflowSpans.get(0); assertEquals(workflowStartSpan.context().spanId(), workflowSignalSpan.parentId()); assertEquals("HandleSignal:signal", workflowSignalSpan.operationName()); - MockSpan workflowRunSpan = workflowRunSpans.get(1); + MockSpan workflowRunSpan = workflowSpans.get(1); assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); } From 8fb166f4aa7c6062a433741f8146ba65d69c8cb2 Mon Sep 17 00:00:00 2001 From: siavash Date: Sun, 10 Sep 2023 01:24:18 +0330 Subject: [PATCH 17/17] fix: test failure --- .../opentracing/SignalWithStartTest.java | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java index bda67e6cd..13b42e3b4 100644 --- a/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/SignalWithStartTest.java @@ -121,15 +121,24 @@ public void signalWithStart() { assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); assertEquals("SignalWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName()); - List workflowSpans = spansHelper.getByParentSpan(workflowStartSpan); - assertEquals(2, workflowSpans.size()); - - MockSpan workflowSignalSpan = workflowSpans.get(0); - assertEquals(workflowStartSpan.context().spanId(), workflowSignalSpan.parentId()); - assertEquals("HandleSignal:signal", workflowSignalSpan.operationName()); - - MockSpan workflowRunSpan = workflowSpans.get(1); - assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); - assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + if (SDKTestWorkflowRule.useExternalService) { + List workflowSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(2, workflowSpans.size()); + + MockSpan workflowSignalSpan = workflowSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowSignalSpan.parentId()); + assertEquals("HandleSignal:signal", workflowSignalSpan.operationName()); + + MockSpan workflowRunSpan = workflowSpans.get(1); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + } else { + List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(1, workflowRunSpans.size()); + + MockSpan workflowRunSpan = workflowRunSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + } } }