From 40813b3f0158090370472a29d82f54cdb3f53ed5 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Fri, 23 Aug 2024 17:44:34 +0000 Subject: [PATCH] Add idle_shutdown_timeout=5m flag --- .../java/org/apache/beam/runners/prism/PrismExecutor.java | 1 + .../apache/beam/runners/prism/PrismPipelineOptions.java | 7 +++++++ .../java/org/apache/beam/runners/prism/PrismRunner.java | 5 ++++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java index bd45f5834be8..fda5db923a7f 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java @@ -45,6 +45,7 @@ abstract class PrismExecutor { private static final Logger LOG = LoggerFactory.getLogger(PrismExecutor.class); + static final String IDLE_SHUTDOWN_TIMEOUT = "-idle_shutdown_timeout=%s"; static final String JOB_PORT_FLAG_TEMPLATE = "-job_port=%s"; static final String SERVE_HTTP_FLAG_TEMPLATE = "-serve_http=%s"; diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java index f93628917004..6a6ca4e615d0 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineOptions.java @@ -51,4 +51,11 @@ public interface PrismPipelineOptions extends PortablePipelineOptions { Boolean getEnableWebUI(); void setEnableWebUI(Boolean enableWebUI); + + @Description( + "Duration, represented as a String, that prism will wait for a new job before shutting itself down. Negative durations disable auto shutdown. Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\".") + @Default.String("5m") + String getIdleShutdownTimeout(); + + void setIdleShutdownTimeout(String idleShutdownTimeout); } diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java index 926ca9b3dfac..6099db4b63ee 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java @@ -98,13 +98,16 @@ PrismExecutor startPrism() throws IOException { String serveHttpFlag = String.format( PrismExecutor.SERVE_HTTP_FLAG_TEMPLATE, prismPipelineOptions.getEnableWebUI()); + String idleShutdownTimeoutFlag = + String.format( + PrismExecutor.IDLE_SHUTDOWN_TIMEOUT, prismPipelineOptions.getIdleShutdownTimeout()); String endpoint = "localhost:" + port; prismPipelineOptions.setJobEndpoint(endpoint); String command = locator.resolve(); PrismExecutor executor = PrismExecutor.builder() .setCommand(command) - .setArguments(Arrays.asList(portFlag, serveHttpFlag)) + .setArguments(Arrays.asList(portFlag, serveHttpFlag, idleShutdownTimeoutFlag)) .build(); executor.execute(); checkState(executor.isAlive());