From 0ae82c2916c40036cfcf67afe5d4074e266f64a6 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Wed, 27 Nov 2024 13:28:11 +0100 Subject: [PATCH] fix(temporal): fix schedule workflow search attributes --- .../engine/activities/temporal_schedule_create.go | 14 ++++++++++++++ .../connectors/engine/workflow/plugin_workflow.go | 6 ------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/internal/connectors/engine/activities/temporal_schedule_create.go b/internal/connectors/engine/activities/temporal_schedule_create.go index 8325c8ed..c33d5f25 100644 --- a/internal/connectors/engine/activities/temporal_schedule_create.go +++ b/internal/connectors/engine/activities/temporal_schedule_create.go @@ -6,6 +6,7 @@ import ( "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" ) @@ -20,6 +21,19 @@ type ScheduleCreateOptions struct { } func (a Activities) TemporalScheduleCreate(ctx context.Context, options ScheduleCreateOptions) (string, error) { + attributes := make([]temporal.SearchAttributeUpdate, 0, len(options.SearchAttributes)) + for key, value := range options.SearchAttributes { + v, ok := value.(string) + if !ok { + continue + } + + attributes = append(attributes, + temporal.NewSearchAttributeKeyKeyword(key).ValueSet(v), + ) + } + options.Action.TypedSearchAttributes = temporal.NewSearchAttributes(attributes...) + handle, err := a.temporalClient.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: options.ScheduleID, Spec: client.ScheduleSpec{ diff --git a/internal/connectors/engine/workflow/plugin_workflow.go b/internal/connectors/engine/workflow/plugin_workflow.go index cf38f36d..011b921d 100644 --- a/internal/connectors/engine/workflow/plugin_workflow.go +++ b/internal/connectors/engine/workflow/plugin_workflow.go @@ -8,7 +8,6 @@ import ( "github.com/pkg/errors" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/client" - "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" ) @@ -123,11 +122,6 @@ func (w Workflow) run( task.NextTasks, }, TaskQueue: connectorID.String(), - // Search attributes are used to query workflows - TypedSearchAttributes: temporal.NewSearchAttributes( - temporal.NewSearchAttributeKeyKeyword(SearchAttributeScheduleID).ValueSet(scheduleID), - temporal.NewSearchAttributeKeyKeyword(SearchAttributeStack).ValueSet(w.stack), - ), }, Overlap: enums.SCHEDULE_OVERLAP_POLICY_SKIP, TriggerImmediately: true,