Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload Beam Samza Metrics Update to Background Threads to Improve Performance #128

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin<Project> {

// Automatically use the official release version if we are performing a release
// otherwise append '-SNAPSHOT'
project.version = '2.45.27'
project.version = '2.45.28'
if (isLinkedin(project)) {
project.ext.mavenGroupId = 'com.linkedin.beam'
}
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
# buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
# To build a custom Beam version make sure you change it in both places, see
# https://github.com/apache/beam/issues/21302.
version=2.45.27
sdk_version=2.45.27
version=2.45.28
sdk_version=2.45.28

javaVersion=1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.beam.runners.core.metrics.DefaultMetricResults;
import org.apache.beam.runners.core.metrics.DistributionData;
Expand All @@ -34,6 +40,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
Expand All @@ -50,27 +57,64 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class SamzaMetricsContainer {

private static final Logger LOG = LoggerFactory.getLogger(SamzaMetricsContainer.class);
private static final String BEAM_METRICS_GROUP = "BeamMetrics";
// global metrics container is the default container that can be used in user threads
public static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS";
public static final String USE_SHORT_METRIC_NAMES_CONFIG =
"beam.samza.metrics.useShortMetricNames";
public static final String COMMIT_ALL_METRIC_UPDATES =
"beam.samza.metrics.commitAllMetricUpdates";

public static final String DEFER_TO_EXECUTOR_CONFIG = "beam.samza.metrics.deferToExecutor";
public static final String DEFER_TO_EXECUTOR_UPDATE_INTERVAL_SEC_CONFIG =
"beam.samza.metrics.deferToExecutor.updateIntervalSec";
private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
private final MetricsRegistryMap metricsRegistry;
private final boolean useShortMetricNames;
private final boolean commitAllMetricUpdates;
private final boolean deferToExecutor;
private final Set<String> activeStepNames = ConcurrentHashMap.newKeySet();
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
// Static AtomicInteger to ensure thread-safe incrementing of instance IDs
private static final AtomicInteger instanceCounter = new AtomicInteger(0);

public SamzaMetricsContainer(MetricsRegistryMap metricsRegistry, Config config) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this SamzaMetricsContainer is a process-wise singleton? It might end up creating multiple thread pools if it's not. Any race conditions on updating metrics if it's not?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not entirely sure if SamzaMetricsContainer is a process-wide singleton. From what I see, it’s instantiated in SamzaExecutionContext, which should only occur once per container/JVM. However, I also see instances in both BoundedSourceSystem and UnboundedSourceSystem.

Having multiple thread pools might be fine—it could result in one per container plus the number of sources. If necessary, we can optimize this by moving it to a static instantiation. However, I think this can be addressed in a follow-up PR if needed.

As for race conditions, I don’t believe this code introduces any. If multiple instances of SamzaMetricsContainer exist, each with its own threads updating metrics, the original code would have been equally, if not more, susceptible to race conditions due to the frequency of updates. So, the current implementation shouldn’t increase that risk.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update see this comment: https://github.com/linkedin/beam/pull/128/files#r1772470875, doing this should prevent us from introducing additional race conditions.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right it seems like UnboundedSourceSystem will have a separate SamzaMetricsContainer for the input step, while SamzaExecutionContext have a SamzaMetricsContainer for the rest of the DoFns. Theoretically, there should be no overlap on the steps they try to update. There could be concurrent calls on the underlying MetricsRegistry.

// Assign a unique ID to this instance by incrementing the static counter
// Instance-specific ID
int instanceId = getInstanceId();
this.metricsRegistry = metricsRegistry;
this.useShortMetricNames = config.getBoolean(USE_SHORT_METRIC_NAMES_CONFIG, false);
this.commitAllMetricUpdates = config.getBoolean(COMMIT_ALL_METRIC_UPDATES, false);
this.deferToExecutor = config.getBoolean(DEFER_TO_EXECUTOR_CONFIG, false);
long metricUpdateIntervalSec = config.getLong(DEFER_TO_EXECUTOR_UPDATE_INTERVAL_SEC_CONFIG, 1L);
this.metricsRegistry.metrics().put(BEAM_METRICS_GROUP, new ConcurrentHashMap<>());
LOG.info("Creating Samza metrics container with userShortMetricName = {}", useShortMetricNames);
LOG.info(
"Creating Samza metrics container (instanceId={}) with deferToExecutor={}, metricUpdateIntervalSec={}, useShortMetricNames={}, commitAllMetricUpdates={}",
instanceId,
deferToExecutor,
metricUpdateIntervalSec,
useShortMetricNames,
commitAllMetricUpdates);
// Initialize the executor service if needed based on configuration
if (deferToExecutor) {
scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("MetricsUpdater-instance-" + instanceId + "-thread-%d")
.build());
scheduledFuture =
scheduler.scheduleAtFixedRate(
this::commitPeriodicMetricsForAllSteps, 0, metricUpdateIntervalSec, TimeUnit.SECONDS);
LOG.info("Executor service for instance {} has been started.", instanceId);
}
// Register a shutdown hook to gracefully shut down the executor service
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownExecutorService));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen in the case where beam.samza.metrics.deferToExecutor is set to false, but the shutdownExecutorService hook is added?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line should handle it

    if (scheduler != null && !scheduler.isShutdown()) {

scheduler would be null and so shutdownExecutorService would NOOP.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this PR is experimental, we will be releasing a Beam version that will be available to other Flink jobs. Can we ensure that the job’s behavior remains the same as before when beam.samza.metrics.deferToExecutor is set to false? Therefore, can we move Runtime.getRuntime().addShutdownHook inside the if (deferToExecutor) block?

}

// Static method to return a unique ID for each instance
private static int getInstanceId() {
return instanceCounter.incrementAndGet();
}

public MetricsContainer getContainer(String stepName) {
Expand All @@ -81,15 +125,96 @@ public MetricsContainerStepMap getContainers() {
return this.metricsContainers;
}

/** Update Beam metrics to Samza metrics for the current step and global step. */
/**
* This is the public method for updating metrics. It either defers the update to the executor
* service or updates the metrics immediately based on the deferToExecutor configuration flag.
*
* @param stepName the step name for which metrics are being updated
*/
public void updateMetrics(String stepName) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method updateMetrics the only source of overhead that is running on the main thread?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, but once we move this off the main thread perhaps we'll uncover more bottlenecks.

if (deferToExecutor) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not start the thread pool when a SamzaMetricsContainer instance in created?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if multiple threads call this method concurrently?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about this more it's better to move this logic into constructor, which will remove the need for volatile and double check locking.

// Add the step name to the active steps set
if (activeStepNames.add(stepName)) {
LOG.info("Added step '{}' for deferred metrics update.", stepName);
}
} else {
// Update metrics immediately if not deferring to the executor
updateMetricsInternal(stepName);
}
}

/**
* Shutdown the executor service and cancel the scheduled task. Before shutting down, update
* metrics one last time to ensure completeness.
*/
private void shutdownExecutorService() {
if (scheduler != null && !scheduler.isShutdown()) {
LOG.info("Shutting down executor service...");

// Cancel the scheduled task if it's still running.
// This ensures that any periodic metrics updates are stopped and no further updates
// are scheduled once we begin the shutdown process.
if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
LOG.info("Cancelling scheduled metrics updates...");
scheduledFuture.cancel(true);
}

// Update metrics one last time, ensuring that we're still committing from a single thread.
// This guarantees that all remaining metrics are committed before shutting down the executor
// service.
commitPeriodicMetricsForAllSteps();

// Shutdown the executor service gracefully.
// Allow any currently executing tasks to finish, then terminate the service. If the shutdown
// process takes longer than 5 seconds, force a shutdown to ensure the service is stopped.
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.warn("Forcing shutdown of executor service...");
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
LOG.error("Interrupted during shutdown, forcing shutdown now", e);
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

/**
* Commits metrics updates for all scheduled steps, invoked periodically.
*
* <p>This method is called periodically by the scheduled executor service when `deferToExecutor`
* is set to true, ensuring that metrics for all active steps are updated at the defined interval.
* It is also invoked during shutdown to ensure that any remaining metrics are committed before
* terminating the executor.
*
* <p>Synchronization is applied to ensure thread-safe updates when multiple instances might share
* the same resources.
*/
private void commitPeriodicMetricsForAllSteps() {
synchronized (SamzaMetricsContainer.class) {
activeStepNames.forEach(this::updateMetricsInternal);
}
}

assert metricsRegistry != null;
final List<String> stepNameList = Arrays.asList(stepName, GLOBAL_CONTAINER_STEP_NAME);
// Since global metrics do not belong to any step, we need to update it in every step.
final MetricResults metricResults =
/**
* Updates metrics for a given step.
*
* <p>This method contains the core logic for updating metrics, including counters, gauges, and
* distributions for the specified step. It can be called either immediately or by the scheduled
* executor service, based on the `deferToExecutor` configuration.
*
* <p>This method assumes that it is called in a thread-safe manner, as it does not implement
* synchronization internally.
*
* @param stepName the step name for which metrics are being updated
*/
private void updateMetricsInternal(String stepName) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method thread-safe? Do we need thread-safety for this method?

Copy link
Author

@FuRyanf FuRyanf Sep 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. It’s possible that previously, only a single thread would enter updateMetricsInternal even with multiple instantiations. However, with the introduction of potential multiple thread pools (each committing metrics every second), we might indeed be introducing a race condition. For now, I’ll wrap the updateMetricsInternal call in a static synchronized block during periodic evaluations to ensure thread safety and avoid potential race conditions.

That said, this is an experimental PR, and we can validate the code further after testing and understanding this codebase better. If we find that synchronization isn’t necessary, it can be removed later.

List<String> stepNameList = Arrays.asList(stepName, GLOBAL_CONTAINER_STEP_NAME);
MetricResults metricResults =
asAttemptedOnlyMetricResultsForSteps(metricsContainers, stepNameList);
final MetricQueryResults results = metricResults.allMetrics();
MetricQueryResults results = metricResults.allMetrics();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the final is removed here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it was removed during ./gradlew :runners:samza:spotlessApply, but it shouldn't matter since I was the one that introduced it originally. I'm actually big proponent of making everything final where possible so I can introduce it back if I ever make anymore changes in this class.


final CounterUpdater updateCounter = new CounterUpdater();
results.getCounters().forEach(updateCounter);
Expand All @@ -99,6 +224,7 @@ public void updateMetrics(String stepName) {

final DistributionUpdater updateDistribution = new DistributionUpdater();
results.getDistributions().forEach(updateDistribution);

if (commitAllMetricUpdates) {
stepNameList.stream()
.map(metricsContainers::getContainer)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One side effect of this feature: since we are batching the update every 1 second, the commitUpdates call may not give us as much benefit as before.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup we could test this out, possibly turning this optimization on and commitAllUpdates one off. But there is still value for committing all updates periodically since it probably relinquishes some memory every time its called.

Expand Down
Loading