diff --git a/service/worker/deletenamespace/deleteexecutions/activities.go b/service/worker/deletenamespace/deleteexecutions/activities.go index 28b42922219..d67e7b03bb8 100644 --- a/service/worker/deletenamespace/deleteexecutions/activities.go +++ b/service/worker/deletenamespace/deleteexecutions/activities.go @@ -26,6 +26,7 @@ package deleteexecutions import ( "context" + "time" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" @@ -126,10 +127,29 @@ func (a *LocalActivities) GetNextPageTokenActivity(ctx context.Context, params G func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params DeleteExecutionsActivityParams) (DeleteExecutionsActivityResult, error) { ctx = headers.SetCallerName(ctx, params.Namespace.String()) - rateLimiter := quotas.NewRateLimiter(float64(params.RPS), params.RPS) + progressCh := make(chan DeleteExecutionsActivityResult, 1) + defer func() { close(progressCh) }() + + go func() { + heartbeatTicker := time.NewTicker(deleteWorkflowExecutionsActivityOptions.HeartbeatTimeout / 2) + defer heartbeatTicker.Stop() + + var lastKnownProgress DeleteExecutionsActivityResult + for { + select { + case progress, chOpen := <-progressCh: + if !chOpen { + // Stop heartbeating when a channel is closed, i.e., activity is completed. + return + } + lastKnownProgress = progress + case <-heartbeatTicker.C: + activity.RecordHeartbeat(ctx, lastKnownProgress) + } + } + }() var result DeleteExecutionsActivityResult - req := &manager.ListWorkflowExecutionsRequestV2{ NamespaceID: params.NamespaceID, Namespace: params.Namespace, @@ -143,6 +163,7 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete a.logger.Error("Unable to list all workflow executions.", tag.WorkflowNamespace(params.Namespace.String()), tag.Error(err)) return result, err } + rateLimiter := quotas.NewRateLimiter(float64(params.RPS), params.RPS) for _, execution := range resp.Executions { err = rateLimiter.Wait(ctx) if err != nil { @@ -175,11 +196,14 @@ func (a *Activities) DeleteExecutionsActivity(ctx context.Context, params Delete metrics.DeleteExecutionFailuresCount.With(a.metricsHandler).Record(1) a.logger.Error("Unable to delete workflow execution.", tag.WorkflowNamespace(params.Namespace.String()), tag.WorkflowID(execution.Execution.GetWorkflowId()), tag.WorkflowRunID(execution.Execution.GetRunId()), tag.Error(err)) } - activity.RecordHeartbeat(ctx, result) select { + case progressCh <- result: + // Send the current result to heartbeat go routine. case <-ctx.Done(): + // Stop deletion on cancellation. return result, ctx.Err() default: + // Don't block deletion if a progress channel is full. } } return result, nil diff --git a/service/worker/deletenamespace/reclaimresources/workflow.go b/service/worker/deletenamespace/reclaimresources/workflow.go index d40b7e658bd..50671d94a5a 100644 --- a/service/worker/deletenamespace/reclaimresources/workflow.go +++ b/service/worker/deletenamespace/reclaimresources/workflow.go @@ -73,8 +73,7 @@ var ( } deleteExecutionsWorkflowOptions = workflow.ChildWorkflowOptions{ - RetryPolicy: retryPolicy, - WorkflowRunTimeout: 60 * time.Minute, + RetryPolicy: retryPolicy, } ensureNoExecutionsActivityRetryPolicy = &temporal.RetryPolicy{