From 707337527a01e0e6e1f0b048168138d854e091fe Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Fri, 6 Dec 2024 12:09:55 -0800 Subject: [PATCH] Add operation Id to callback headers (#2336) Add operation Id to callback headers --- build.gradle | 2 +- .../internal/common/InternalUtils.java | 19 ++++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 2257ce1ca..4fce4e12d 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { // Platforms grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.3.0-alpha' // [0.1.0,) + nexusVersion = '0.3.0-alpha' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index e37d65f58..f9fa4c637 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -21,6 +21,7 @@ package io.temporal.internal.common; import com.google.common.base.Defaults; +import io.nexusrpc.Header; import io.temporal.api.common.v1.Callback; import io.temporal.api.enums.v1.TaskQueueKind; import io.temporal.api.taskqueue.v1.TaskQueue; @@ -28,6 +29,8 @@ import io.temporal.client.WorkflowStub; import io.temporal.internal.client.NexusStartWorkflowRequest; import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +81,20 @@ public static WorkflowStub createNexusBoundStub( throw new IllegalArgumentException( "WorkflowId is expected to be set on WorkflowOptions when used with Nexus"); } + // Add the Nexus operation ID to the headers if it is not already present to support fabricating + // a NexusOperationStarted event if the completion is received before the response to a + // StartOperation request. + Map headers = + request.getCallbackHeaders().entrySet().stream() + .collect( + Collectors.toMap( + (k) -> k.getKey().toLowerCase(), + Map.Entry::getValue, + (a, b) -> a, + () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); + if (!headers.containsKey(Header.OPERATION_ID)) { + headers.put(Header.OPERATION_ID.toLowerCase(), options.getWorkflowId()); + } WorkflowOptions.Builder nexusWorkflowOptions = WorkflowOptions.newBuilder(options) .setRequestId(request.getRequestId()) @@ -87,7 +104,7 @@ public static WorkflowStub createNexusBoundStub( .setNexus( Callback.Nexus.newBuilder() .setUrl(request.getCallbackUrl()) - .putAllHeader(request.getCallbackHeaders()) + .putAllHeader(headers) .build()) .build())); if (options.getTaskQueue() == null) {