Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ModRyanFu committed Sep 24, 2024
1 parent f7fe51a commit 59ef601
Showing 1 changed file with 60 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.beam.runners.core.metrics.DefaultMetricResults;
import org.apache.beam.runners.core.metrics.DistributionData;
Expand Down Expand Up @@ -56,7 +57,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SamzaMetricsContainer {

private static final Logger LOG = LoggerFactory.getLogger(SamzaMetricsContainer.class);
private static final String BEAM_METRICS_GROUP = "BeamMetrics";
public static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS";
Expand All @@ -65,39 +65,58 @@ public class SamzaMetricsContainer {
public static final String COMMIT_ALL_METRIC_UPDATES =
"beam.samza.metrics.commitAllMetricUpdates";
public static final String DEFER_TO_EXECUTOR_CONFIG = "beam.samza.metrics.deferToExecutor";
public static final String METRIC_UPDATE_INTERVAL_SEC_CONFIG =
"beam.samza.metrics.updateIntervalSec";

public static final String DEFER_TO_EXECUTOR_UPDATE_INTERVAL_SEC_CONFIG =
"beam.samza.metrics.deferToExecutor.updateIntervalSec";
private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
private final MetricsRegistryMap metricsRegistry;
private final boolean useShortMetricNames;
private final boolean commitAllMetricUpdates;
private final boolean deferToExecutor;
private final Set<String> activeStepNames = ConcurrentHashMap.newKeySet();
// technically this doesn't need to be volatile, but needed to pass spotbugs check
private volatile boolean executorServiceStarted = false;
private ScheduledExecutorService executorService;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private final long metricUpdateIntervalSec;
// Static AtomicInteger to ensure thread-safe incrementing of instance IDs
private static final AtomicInteger instanceCounter = new AtomicInteger(0);

public SamzaMetricsContainer(MetricsRegistryMap metricsRegistry, Config config) {
// Assign a unique ID to this instance by incrementing the static counter
// Instance-specific ID
int instanceId = getInstanceId();
this.metricsRegistry = metricsRegistry;
this.useShortMetricNames = config.getBoolean(USE_SHORT_METRIC_NAMES_CONFIG, false);
this.commitAllMetricUpdates = config.getBoolean(COMMIT_ALL_METRIC_UPDATES, false);
this.deferToExecutor = config.getBoolean(DEFER_TO_EXECUTOR_CONFIG, false);
metricUpdateIntervalSec = config.getLong(METRIC_UPDATE_INTERVAL_SEC_CONFIG, 1L);
long metricUpdateIntervalSec = config.getLong(DEFER_TO_EXECUTOR_UPDATE_INTERVAL_SEC_CONFIG, 1L);
this.metricsRegistry.metrics().put(BEAM_METRICS_GROUP, new ConcurrentHashMap<>());

LOG.info(
"Creating Samza metrics container with deferToExecutor={}, metricUpdateIntervalSec={}, useShortMetricNames={}, commitAllMetricUpdates={}",
"Creating Samza metrics container (instanceId={}) with deferToExecutor={}, metricUpdateIntervalSec={}, useShortMetricNames={}, commitAllMetricUpdates={}",
instanceId,
deferToExecutor,
metricUpdateIntervalSec,
useShortMetricNames,
commitAllMetricUpdates);
// Register a shutdown hook to gracefully shutdown the executor service
// Initialize the executor service if needed based on configuration
if (deferToExecutor) {
scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("MetricsUpdater-instance-" + instanceId + "-thread-%d")
.build());
scheduledFuture =
scheduler.scheduleAtFixedRate(
this::commitPeriodicMetricsForAllSteps, 0, metricUpdateIntervalSec, TimeUnit.SECONDS);
LOG.info("Executor service for instance {} has been started.", instanceId);
}
// Register a shutdown hook to gracefully shut down the executor service
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownExecutorService));
}

// Static method to return a unique ID for each instance
private static int getInstanceId() {
return instanceCounter.incrementAndGet();
}

public MetricsContainer getContainer(String stepName) {
return this.metricsContainers.getContainer(stepName);
}
Expand All @@ -114,73 +133,22 @@ public MetricsContainerStepMap getContainers() {
*/
public void updateMetrics(String stepName) {
if (deferToExecutor) {
// Start executor service if it hasn't been started yet
startExecutorServiceIfNeeded();

// If using the executor, just schedule the step name for updates.
// Add the step name to the active steps set
if (activeStepNames.add(stepName)) {
LOG.info("Added step '{}' for deferred metrics update.", stepName);
}
} else {
// If not deferring to the executor, update metrics immediately.
// Update metrics immediately if not deferring to the executor
updateMetricsInternal(stepName);
}
}

/**
* Starts the executor service if it hasn't already been started, using a double-checked locking
* pattern.
*
* <p>The double-checked locking pattern optimizes the synchronization process: 1. First, the
* `executorServiceStarted` variable is checked outside the synchronized block. This avoids
* entering a synchronized block unnecessarily if the service has already been started. 2. If the
* service hasn't been started, synchronization is used to ensure that only one thread can start
* the executor service. The variable is checked again inside the synchronized block to avoid race
* conditions in a multi-threaded environment.
*
* <p>**Visibility Guarantees**: The `synchronized` block ensures that changes to
* `executorServiceStarted` made by one thread are visible to all other threads. This happens
* because the `synchronized` keyword guarantees a "happens-before" relationship, ensuring that
* updates to shared variables are safely published and visible to other threads.
*
* <p>This method ensures that the executor service, which periodically commits metrics updates,
* is initialized only once, minimizing resource usage.
*/
private void startExecutorServiceIfNeeded() {
if (!executorServiceStarted) { // First check (outside synchronized block)
synchronized (this) {
if (!executorServiceStarted) { // Second check (inside synchronized block)
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("MetricsUpdater-thread")
.build());
scheduledFuture =
scheduler.scheduleAtFixedRate(
() -> {
try {
commitMetricsForAllSteps();
} catch (Exception e) {
LOG.error("Error occurred during periodic metrics update", e);
}
},
0,
metricUpdateIntervalSec,
TimeUnit.SECONDS);
executorServiceStarted = true;
LOG.info("Started executor service for periodic metrics updates.");
}
}
}
}

/**
* Shutdown the executor service and cancel the scheduled task. Before shutting down, update
* metrics one last time to ensure completeness.
*/
private void shutdownExecutorService() {
if (executorService != null && !executorService.isShutdown()) {
if (scheduler != null && !scheduler.isShutdown()) {
LOG.info("Shutting down executor service...");

// Cancel the scheduled task if it's still running.
Expand All @@ -194,37 +162,51 @@ private void shutdownExecutorService() {
// Update metrics one last time, ensuring that we're still committing from a single thread.
// This guarantees that all remaining metrics are committed before shutting down the executor
// service.
commitMetricsForAllSteps();
commitPeriodicMetricsForAllSteps();

// Shutdown the executor service gracefully.
// Allow any currently executing tasks to finish, then terminate the service. If the shutdown
// process takes longer than 5 seconds, force a shutdown to ensure the service is stopped.
executorService.shutdown();
scheduler.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("Forcing shutdown of executor service...");
executorService.shutdownNow();
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Interrupted during shutdown, forcing shutdown now", e);
executorService.shutdownNow();
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

/**
* This method commits metrics updates for all scheduled steps. It is invoked periodically by the
* scheduled executor service when `deferToExecutor` is set to true and also during shutdown to
* ensure that any remaining metrics are committed before terminating the executor.
* Commits metrics updates for all scheduled steps, invoked periodically.
*
* <p>This method is called periodically by the scheduled executor service when `deferToExecutor`
* is set to true, ensuring that metrics for all active steps are updated at the defined interval.
* It is also invoked during shutdown to ensure that any remaining metrics are committed before
* terminating the executor.
*
* <p>Synchronization is applied to ensure thread-safe updates when multiple instances might share
* the same resources.
*/
private void commitMetricsForAllSteps() {
activeStepNames.forEach(this::updateMetricsInternal);
private void commitPeriodicMetricsForAllSteps() {
synchronized (SamzaMetricsContainer.class) {
activeStepNames.forEach(this::updateMetricsInternal);
}
}

/**
* Private method containing the core logic for updating metrics. This is either called
* immediately or by the executor service, depending on the configuration.
* Updates metrics for a given step.
*
* <p>This method contains the core logic for updating metrics, including counters, gauges, and
* distributions for the specified step. It can be called either immediately or by the scheduled
* executor service, based on the `deferToExecutor` configuration.
*
* <p>This method assumes that it is called in a thread-safe manner, as it does not implement
* synchronization internally.
*
* @param stepName the step name for which metrics are being updated
*/
Expand Down

0 comments on commit 59ef601

Please sign in to comment.