diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java index d52c2e515077..90dc058ff448 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/SamzaMetricsContainer.java @@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import org.apache.beam.runners.core.metrics.DefaultMetricResults; import org.apache.beam.runners.core.metrics.DistributionData; @@ -56,7 +57,6 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class SamzaMetricsContainer { - private static final Logger LOG = LoggerFactory.getLogger(SamzaMetricsContainer.class); private static final String BEAM_METRICS_GROUP = "BeamMetrics"; public static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS"; @@ -65,39 +65,58 @@ public class SamzaMetricsContainer { public static final String COMMIT_ALL_METRIC_UPDATES = "beam.samza.metrics.commitAllMetricUpdates"; public static final String DEFER_TO_EXECUTOR_CONFIG = "beam.samza.metrics.deferToExecutor"; - public static final String METRIC_UPDATE_INTERVAL_SEC_CONFIG = - "beam.samza.metrics.updateIntervalSec"; - + public static final String DEFER_TO_EXECUTOR_UPDATE_INTERVAL_SEC_CONFIG = + "beam.samza.metrics.deferToExecutor.updateIntervalSec"; private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); private final MetricsRegistryMap metricsRegistry; private final boolean useShortMetricNames; private final boolean commitAllMetricUpdates; private final boolean deferToExecutor; private final Set activeStepNames = ConcurrentHashMap.newKeySet(); - // technically this doesn't need to be volatile, but needed to pass spotbugs check - private volatile boolean executorServiceStarted = false; - private ScheduledExecutorService executorService; + private ScheduledExecutorService scheduler; private ScheduledFuture scheduledFuture; - private final long metricUpdateIntervalSec; + // Static AtomicInteger to ensure thread-safe incrementing of instance IDs + private static final AtomicInteger instanceCounter = new AtomicInteger(0); public SamzaMetricsContainer(MetricsRegistryMap metricsRegistry, Config config) { + // Assign a unique ID to this instance by incrementing the static counter + // Instance-specific ID + int instanceId = getInstanceId(); this.metricsRegistry = metricsRegistry; this.useShortMetricNames = config.getBoolean(USE_SHORT_METRIC_NAMES_CONFIG, false); this.commitAllMetricUpdates = config.getBoolean(COMMIT_ALL_METRIC_UPDATES, false); this.deferToExecutor = config.getBoolean(DEFER_TO_EXECUTOR_CONFIG, false); - metricUpdateIntervalSec = config.getLong(METRIC_UPDATE_INTERVAL_SEC_CONFIG, 1L); + long metricUpdateIntervalSec = config.getLong(DEFER_TO_EXECUTOR_UPDATE_INTERVAL_SEC_CONFIG, 1L); this.metricsRegistry.metrics().put(BEAM_METRICS_GROUP, new ConcurrentHashMap<>()); - LOG.info( - "Creating Samza metrics container with deferToExecutor={}, metricUpdateIntervalSec={}, useShortMetricNames={}, commitAllMetricUpdates={}", + "Creating Samza metrics container (instanceId={}) with deferToExecutor={}, metricUpdateIntervalSec={}, useShortMetricNames={}, commitAllMetricUpdates={}", + instanceId, deferToExecutor, metricUpdateIntervalSec, useShortMetricNames, commitAllMetricUpdates); - // Register a shutdown hook to gracefully shutdown the executor service + // Initialize the executor service if needed based on configuration + if (deferToExecutor) { + scheduler = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("MetricsUpdater-instance-" + instanceId + "-thread-%d") + .build()); + scheduledFuture = + scheduler.scheduleAtFixedRate( + this::commitPeriodicMetricsForAllSteps, 0, metricUpdateIntervalSec, TimeUnit.SECONDS); + LOG.info("Executor service for instance {} has been started.", instanceId); + } + // Register a shutdown hook to gracefully shut down the executor service Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownExecutorService)); } + // Static method to return a unique ID for each instance + private static int getInstanceId() { + return instanceCounter.incrementAndGet(); + } + public MetricsContainer getContainer(String stepName) { return this.metricsContainers.getContainer(stepName); } @@ -114,73 +133,22 @@ public MetricsContainerStepMap getContainers() { */ public void updateMetrics(String stepName) { if (deferToExecutor) { - // Start executor service if it hasn't been started yet - startExecutorServiceIfNeeded(); - - // If using the executor, just schedule the step name for updates. + // Add the step name to the active steps set if (activeStepNames.add(stepName)) { LOG.info("Added step '{}' for deferred metrics update.", stepName); } } else { - // If not deferring to the executor, update metrics immediately. + // Update metrics immediately if not deferring to the executor updateMetricsInternal(stepName); } } - /** - * Starts the executor service if it hasn't already been started, using a double-checked locking - * pattern. - * - *

The double-checked locking pattern optimizes the synchronization process: 1. First, the - * `executorServiceStarted` variable is checked outside the synchronized block. This avoids - * entering a synchronized block unnecessarily if the service has already been started. 2. If the - * service hasn't been started, synchronization is used to ensure that only one thread can start - * the executor service. The variable is checked again inside the synchronized block to avoid race - * conditions in a multi-threaded environment. - * - *

**Visibility Guarantees**: The `synchronized` block ensures that changes to - * `executorServiceStarted` made by one thread are visible to all other threads. This happens - * because the `synchronized` keyword guarantees a "happens-before" relationship, ensuring that - * updates to shared variables are safely published and visible to other threads. - * - *

This method ensures that the executor service, which periodically commits metrics updates, - * is initialized only once, minimizing resource usage. - */ - private void startExecutorServiceIfNeeded() { - if (!executorServiceStarted) { // First check (outside synchronized block) - synchronized (this) { - if (!executorServiceStarted) { // Second check (inside synchronized block) - final ScheduledExecutorService scheduler = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("MetricsUpdater-thread") - .build()); - scheduledFuture = - scheduler.scheduleAtFixedRate( - () -> { - try { - commitMetricsForAllSteps(); - } catch (Exception e) { - LOG.error("Error occurred during periodic metrics update", e); - } - }, - 0, - metricUpdateIntervalSec, - TimeUnit.SECONDS); - executorServiceStarted = true; - LOG.info("Started executor service for periodic metrics updates."); - } - } - } - } - /** * Shutdown the executor service and cancel the scheduled task. Before shutting down, update * metrics one last time to ensure completeness. */ private void shutdownExecutorService() { - if (executorService != null && !executorService.isShutdown()) { + if (scheduler != null && !scheduler.isShutdown()) { LOG.info("Shutting down executor service..."); // Cancel the scheduled task if it's still running. @@ -194,37 +162,51 @@ private void shutdownExecutorService() { // Update metrics one last time, ensuring that we're still committing from a single thread. // This guarantees that all remaining metrics are committed before shutting down the executor // service. - commitMetricsForAllSteps(); + commitPeriodicMetricsForAllSteps(); // Shutdown the executor service gracefully. // Allow any currently executing tasks to finish, then terminate the service. If the shutdown // process takes longer than 5 seconds, force a shutdown to ensure the service is stopped. - executorService.shutdown(); + scheduler.shutdown(); try { - if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { LOG.warn("Forcing shutdown of executor service..."); - executorService.shutdownNow(); + scheduler.shutdownNow(); } } catch (InterruptedException e) { LOG.error("Interrupted during shutdown, forcing shutdown now", e); - executorService.shutdownNow(); + scheduler.shutdownNow(); Thread.currentThread().interrupt(); } } } /** - * This method commits metrics updates for all scheduled steps. It is invoked periodically by the - * scheduled executor service when `deferToExecutor` is set to true and also during shutdown to - * ensure that any remaining metrics are committed before terminating the executor. + * Commits metrics updates for all scheduled steps, invoked periodically. + * + *

This method is called periodically by the scheduled executor service when `deferToExecutor` + * is set to true, ensuring that metrics for all active steps are updated at the defined interval. + * It is also invoked during shutdown to ensure that any remaining metrics are committed before + * terminating the executor. + * + *

Synchronization is applied to ensure thread-safe updates when multiple instances might share + * the same resources. */ - private void commitMetricsForAllSteps() { - activeStepNames.forEach(this::updateMetricsInternal); + private void commitPeriodicMetricsForAllSteps() { + synchronized (SamzaMetricsContainer.class) { + activeStepNames.forEach(this::updateMetricsInternal); + } } /** - * Private method containing the core logic for updating metrics. This is either called - * immediately or by the executor service, depending on the configuration. + * Updates metrics for a given step. + * + *

This method contains the core logic for updating metrics, including counters, gauges, and + * distributions for the specified step. It can be called either immediately or by the scheduled + * executor service, based on the `deferToExecutor` configuration. + * + *

This method assumes that it is called in a thread-safe manner, as it does not implement + * synchronization internally. * * @param stepName the step name for which metrics are being updated */