Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing support improvements #1819

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ public enum SpanOperationType {
START_CHILD_WORKFLOW("StartChildWorkflow"),
START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"),
START_ACTIVITY("StartActivity"),
RUN_ACTIVITY("RunActivity");
RUN_ACTIVITY("RunActivity"),
SIGNAL_EXTERNAL_WORKFLOW("SignalExternalWorkflow"),
QUERY_WORKFLOW("QueryWorkflow"),
SIGNAL_WORKFLOW("SignalWorkflow"),
UPDATE_WORKFLOW("UpdateWorkflow"),
HANDLE_QUERY("HandleQuery"),
HANDLE_SIGNAL("HandleSignal"),
HANDLE_UPDATE("HandleUpdate");

private final String defaultPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,20 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
case RUN_WORKFLOW:
case START_ACTIVITY:
case RUN_ACTIVITY:
case SIGNAL_EXTERNAL_WORKFLOW:
case SIGNAL_WORKFLOW:
case UPDATE_WORKFLOW:
case QUERY_WORKFLOW:
case HANDLE_SIGNAL:
case HANDLE_UPDATE:
String runId = context.getRunId();
Preconditions.checkNotNull(
runId, "runId is expected to be not null for span operation type %s", operationType);
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());
case HANDLE_QUERY:
return ImmutableMap.of();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our other SDKs, query spans also have workflow ID and run ID.

Copy link
Contributor Author

@siavashkavousi siavashkavousi Jul 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that handleQuery is not executed within a workflow thread, leading to non-deterministic errors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return ImmutableMap.of();
return ImmutableMap.of(
StandardTagNames.WORKFLOW_ID, context.getWorkflowId(),
StandardTagNames.RUN_ID, context.getRunId());

To confirm, are you saying change this to the above errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, you are not setting the workflow ID and run ID on HANDLE_QUERY. You should set those the same way you do for HANDLE_SIGNAL.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that a span without these tags is ok, but I'll let y'all decide

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the options are either no span or a span without those tags. I have no preference.

Regardless of which we go with we will need to open an issue to address this when we fix getInfo in query handlers , but I don't think that should block this PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concur, will let you make the call on tagless span vs no span. I don't have a super-strong opinion here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it would be better to have a tagless span since the span itself is useful for tracing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me

}
throw new IllegalArgumentException("Unknown span operation type provided");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
}
}

@Override
public WorkflowSignalOutput signal(WorkflowSignalInput input) {
Span workflowSignalSpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createWorkflowSignalSpan(
tracer,
input.getSignalName(),
input.getWorkflowExecution().getWorkflowId(),
input.getWorkflowExecution().getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) {
return super.signal(input);
} finally {
workflowSignalSpan.finish();
}
}

@Override
public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInput input) {
WorkflowStartInput workflowStartInput = input.getWorkflowStartInput();
Expand All @@ -76,6 +97,48 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
}
}

@Override
public <R> QueryOutput<R> query(QueryInput<R> input) {
Span workflowQuerySpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createWorkflowQuerySpan(
tracer,
input.getQueryType(),
input.getWorkflowExecution().getWorkflowId(),
input.getWorkflowExecution().getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) {
return super.query(input);
} finally {
workflowQuerySpan.finish();
}
}

@Override
public <R> StartUpdateOutput<R> startUpdate(StartUpdateInput<R> input) {
Span workflowStartUpdateSpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createWorkflowStartUpdateSpan(
tracer,
input.getUpdateName(),
input.getWorkflowExecution().getWorkflowId(),
input.getWorkflowExecution().getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(workflowStartUpdateSpan)) {
return super.startUpdate(input);
} finally {
workflowStartUpdateSpan.finish();
}
}

private Tracer.SpanBuilder createWorkflowStartSpanBuilder(
WorkflowStartInput input, SpanOperationType operationType) {
return spanFactory.createWorkflowStartSpan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public WorkflowOutput execute(WorkflowInput input) {
Workflow.getInfo().getRunId(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(workflowRunSpan)) {
try (Scope ignored = tracer.scopeManager().activate(workflowRunSpan)) {
return super.execute(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
Expand All @@ -82,4 +82,83 @@ public WorkflowOutput execute(WorkflowInput input) {
workflowRunSpan.finish();
}
}

@Override
public void handleSignal(SignalInput input) {
Tracer tracer = options.getTracer();
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer);
Span workflowSignalSpan =
spanFactory
.createWorkflowHandleSignalSpan(
tracer,
input.getSignalName(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend, to match other SDKs, that workflow inbound signal, query, and update span names be HandleSignal:<name>, HandleQuery:<name>, and HandleQuery:<name> instead of reusing the same outbound names

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Workflow.getInfo().getWorkflowId(),
Workflow.getInfo().getRunId(),
rootSpanContext)
.start();
try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) {
super.handleSignal(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
spanFactory.logEviction(workflowSignalSpan);
} else {
spanFactory.logFail(workflowSignalSpan, t);
}
throw t;
} finally {
workflowSignalSpan.finish();
}
}

@Override
public QueryOutput handleQuery(QueryInput input) {
Tracer tracer = options.getTracer();
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer);
Span workflowQuerySpan =
spanFactory
.createWorkflowHandleQuerySpan(tracer, input.getQueryName(), rootSpanContext)
.start();
try (Scope ignored = tracer.scopeManager().activate(workflowQuerySpan)) {
return super.handleQuery(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
spanFactory.logEviction(workflowQuerySpan);
} else {
spanFactory.logFail(workflowQuerySpan, t);
}
throw t;
} finally {
workflowQuerySpan.finish();
}
}

@Override
public UpdateOutput executeUpdate(UpdateInput input) {
Tracer tracer = options.getTracer();
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getHeader(), tracer);
Span workflowSignalSpan =
spanFactory
.createWorkflowExecuteUpdateSpan(
tracer,
input.getUpdateName(),
Workflow.getInfo().getWorkflowId(),
Workflow.getInfo().getRunId(),
rootSpanContext)
.start();
try (Scope ignored = tracer.scopeManager().activate(workflowSignalSpan)) {
return super.executeUpdate(input);
} catch (Throwable t) {
if (t instanceof DestroyWorkflowThreadError) {
spanFactory.logEviction(workflowSignalSpan);
} else {
spanFactory.logFail(workflowSignalSpan, t);
}
throw t;
} finally {
workflowSignalSpan.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,32 @@ public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> inp
}
}

@Override
public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
if (!WorkflowUnsafe.isReplaying()) {
WorkflowInfo workflowInfo = Workflow.getInfo();
Span childWorkflowStartSpan =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createExternalWorkflowSignalSpan(
tracer,
input.getSignalName(),
workflowInfo.getWorkflowId(),
workflowInfo.getRunId())
.start(),
input.getHeader(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(childWorkflowStartSpan)) {
return super.signalExternalWorkflow(input);
} finally {
childWorkflowStartSpan.finish();
}
} else {
return super.signalExternalWorkflow(input);
}
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,46 @@ public Tracer.SpanBuilder createChildWorkflowStartSpan(
return createSpan(context, tracer, null, References.CHILD_OF);
}

public Tracer.SpanBuilder createExternalWorkflowSignalSpan(
Tracer tracer, String signalName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.SIGNAL_EXTERNAL_WORKFLOW)
.setActionName(signalName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowSignalSpan(
Tracer tracer, String signalName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.SIGNAL_WORKFLOW)
.setActionName(signalName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowHandleSignalSpan(
Tracer tracer,
String signalName,
String workflowId,
String runId,
SpanContext workflowSignalSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.HANDLE_SIGNAL)
.setActionName(signalName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, workflowSignalSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createContinueAsNewWorkflowStartSpan(
Tracer tracer, String continueAsNewWorkflowType, String workflowId, String parentRunId) {
SpanCreationContext context =
Expand Down Expand Up @@ -133,6 +173,56 @@ public Tracer.SpanBuilder createActivityRunSpan(
return createSpan(context, tracer, activityStartSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowStartUpdateSpan(
Tracer tracer, String updateName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.UPDATE_WORKFLOW)
.setActionName(updateName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowExecuteUpdateSpan(
Tracer tracer,
String updateName,
String workflowId,
String runId,
SpanContext workflowUpdateSpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.HANDLE_UPDATE)
.setActionName(updateName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, workflowUpdateSpanContext, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowQuerySpan(
Tracer tracer, String updateName, String workflowId, String runId) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.QUERY_WORKFLOW)
.setActionName(updateName)
.setWorkflowId(workflowId)
.setRunId(runId)
.build();
return createSpan(context, tracer, null, References.FOLLOWS_FROM);
}

public Tracer.SpanBuilder createWorkflowHandleQuerySpan(
Tracer tracer, String queryName, SpanContext workflowQuerySpanContext) {
SpanCreationContext context =
SpanCreationContext.newBuilder()
.setSpanOperationType(SpanOperationType.HANDLE_QUERY)
.setActionName(queryName)
.build();
return createSpan(context, tracer, workflowQuerySpanContext, References.FOLLOWS_FROM);
}

@SuppressWarnings("deprecation")
public void logFail(Span toSpan, Throwable failReason) {
toSpan.setTag(StandardTagNames.FAILED, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,24 @@ public void signalWithStart() {
assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId());
assertEquals("SignalWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName());

List<MockSpan> workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan);
assertEquals(1, workflowRunSpans.size());

MockSpan workflowRunSpan = workflowRunSpans.get(0);
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
if (SDKTestWorkflowRule.useExternalService) {
List<MockSpan> workflowSpans = spansHelper.getByParentSpan(workflowStartSpan);
assertEquals(2, workflowSpans.size());

MockSpan workflowSignalSpan = workflowSpans.get(0);
assertEquals(workflowStartSpan.context().spanId(), workflowSignalSpan.parentId());
assertEquals("HandleSignal:signal", workflowSignalSpan.operationName());

MockSpan workflowRunSpan = workflowSpans.get(1);
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
} else {
List<MockSpan> workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan);
assertEquals(1, workflowRunSpans.size());

MockSpan workflowRunSpan = workflowRunSpans.get(0);
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
}
}
}
Loading
Loading