-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Offload Beam Samza Metrics Update to Background Threads to Improve Performance #128
Offload Beam Samza Metrics Update to Background Threads to Improve Performance #128
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me. Thanks for the PR.
public static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS"; | ||
public static final String USE_SHORT_METRIC_NAMES_CONFIG = | ||
"beam.samza.metrics.useShortMetricNames"; | ||
public static final String COMMIT_ALL_METRIC_UPDATES = | ||
"beam.samza.metrics.commitAllMetricUpdates"; | ||
public static final String DEFER_TO_EXECUTOR_CONFIG = "beam.samza.metrics.deferToExecutor"; | ||
public static final String METRIC_UPDATE_INTERVAL_SEC_CONFIG = | ||
"beam.samza.metrics.updateIntervalSec"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe beam.samza.metrics.deferToExecutor.updateIntervalSec
to be more precise?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure I'll update it to that.
@@ -99,6 +242,7 @@ public void updateMetrics(String stepName) { | |||
|
|||
final DistributionUpdater updateDistribution = new DistributionUpdater(); | |||
results.getDistributions().forEach(updateDistribution); | |||
|
|||
if (commitAllMetricUpdates) { | |||
stepNameList.stream() | |||
.map(metricsContainers::getContainer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One side effect of this feature: since we are batching the update every 1 second, the commitUpdates call may not give us as much benefit as before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup we could test this out, possibly turning this optimization on and commitAllUpdates one off. But there is still value for committing all updates periodically since it probably relinquishes some memory every time its called.
// Update metrics one last time, ensuring that we're still committing from a single thread. | ||
// This guarantees that all remaining metrics are committed before shutting down the executor | ||
// service. | ||
commitMetricsForAllSteps(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not 100% sure if this is useful since the AmfReporter might have been shutdown already. But there should be no harm of dong this.
useShortMetricNames, | ||
commitAllMetricUpdates); | ||
// Register a shutdown hook to gracefully shutdown the executor service | ||
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownExecutorService)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen in the case where beam.samza.metrics.deferToExecutor
is set to false, but the shutdownExecutorService hook is added?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line should handle it
if (scheduler != null && !scheduler.isShutdown()) {
scheduler would be null and so shutdownExecutorService would NOOP.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although this PR is experimental, we will be releasing a Beam version that will be available to other Flink jobs. Can we ensure that the job’s behavior remains the same as before when beam.samza.metrics.deferToExecutor
is set to false? Therefore, can we move Runtime.getRuntime().addShutdownHook
inside the if (deferToExecutor)
block?
private volatile boolean executorServiceStarted = false; | ||
private ScheduledExecutorService executorService; | ||
private ScheduledFuture<?> scheduledFuture; | ||
private final long metricUpdateIntervalSec; | ||
|
||
public SamzaMetricsContainer(MetricsRegistryMap metricsRegistry, Config config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure this SamzaMetricsContainer is a process-wise singleton? It might end up creating multiple thread pools if it's not. Any race conditions on updating metrics if it's not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’m not entirely sure if SamzaMetricsContainer is a process-wide singleton. From what I see, it’s instantiated in SamzaExecutionContext, which should only occur once per container/JVM. However, I also see instances in both BoundedSourceSystem and UnboundedSourceSystem.
Having multiple thread pools might be fine—it could result in one per container plus the number of sources. If necessary, we can optimize this by moving it to a static instantiation. However, I think this can be addressed in a follow-up PR if needed.
As for race conditions, I don’t believe this code introduces any. If multiple instances of SamzaMetricsContainer exist, each with its own threads updating metrics, the original code would have been equally, if not more, susceptible to race conditions due to the frequency of updates. So, the current implementation shouldn’t increase that risk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update see this comment: https://github.com/linkedin/beam/pull/128/files#r1772470875, doing this should prevent us from introducing additional race conditions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right it seems like UnboundedSourceSystem will have a separate SamzaMetricsContainer for the input step, while SamzaExecutionContext have a SamzaMetricsContainer for the rest of the DoFns. Theoretically, there should be no overlap on the steps they try to update. There could be concurrent calls on the underlying MetricsRegistry.
public void updateMetrics(String stepName) { | ||
if (deferToExecutor) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not start the thread pool when a SamzaMetricsContainer instance in created?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if multiple threads call this method concurrently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking about this more it's better to move this logic into constructor, which will remove the need for volatile and double check locking.
* service or updates the metrics immediately based on the deferToExecutor configuration flag. | ||
* | ||
* @param stepName the step name for which metrics are being updated | ||
*/ | ||
public void updateMetrics(String stepName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method updateMetrics
the only source of overhead that is running on the main
thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, but once we move this off the main thread perhaps we'll uncover more bottlenecks.
* | ||
* @param stepName the step name for which metrics are being updated | ||
*/ | ||
private void updateMetricsInternal(String stepName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method thread-safe? Do we need thread-safety for this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. It’s possible that previously, only a single thread would enter updateMetricsInternal even with multiple instantiations. However, with the introduction of potential multiple thread pools (each committing metrics every second), we might indeed be introducing a race condition. For now, I’ll wrap the updateMetricsInternal call in a static synchronized block during periodic evaluations to ensure thread safety and avoid potential race conditions.
That said, this is an experimental PR, and we can validate the code further after testing and understanding this codebase better. If we find that synchronization isn’t necessary, it can be removed later.
ef149ec
to
59ef601
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
asAttemptedOnlyMetricResultsForSteps(metricsContainers, stepNameList); | ||
final MetricQueryResults results = metricResults.allMetrics(); | ||
MetricQueryResults results = metricResults.allMetrics(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the final
is removed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was removed during ./gradlew :runners:samza:spotlessApply
, but it shouldn't matter since I was the one that introduced it originally. I'm actually big proponent of making everything final where possible so I can introduce it back if I ever make anymore changes in this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is an experimental PR that won’t affect the behavior of other jobs, shipped
PR Description:
This PR introduces a mechanism to offload the metrics update process from the main thread to background threads using a scheduled executor service. Key changes include:
Deferred Metrics Update:
Metrics updates are now committed periodically using a scheduled executor service. By reducing updates to once per second (rather than per message), we significantly lower the overhead on the main thread. The metrics backend (inGraphs) has a 1-minute granularity, so updating more frequently would not improve display accuracy.
Configurable Executor Service:
The update behavior is controlled by a configuration flag (
beam.samza.metrics.deferToExecutor
) and the interval is configurable, defaulting to 1 second.Synchronization Improvements:
Proper synchronization ensures that concurrent updates from multiple threads do not lead to inconsistencies in the metrics container.
Performance Gains:
By offloading metrics updates to background threads and batching them, we reduce the CPU consumption and avoid the overhead of frequent updates on the main thread. Since committing metrics and updating them is idempotent, updating at a lower frequency (e.g., once per second) won’t affect the display in inGraphs.
Expected Outcome:
This change is expected to reduce the load on the main thread, eliminating performance bottlenecks and improving throughput, especially in high-throughput environments like the CTC repartitioner.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.