Skip to content

Commit

Permalink
Add configurable commitAllMetricUpdates
Browse files Browse the repository at this point in the history
  • Loading branch information
ModRyanFu committed Aug 30, 2024
1 parent 10074a9 commit 0a54c22
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,18 @@ public class SamzaMetricsContainer {
public static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS";
public static final String USE_SHORT_METRIC_NAMES_CONFIG =
"beam.samza.metrics.useShortMetricNames";
public static final String COMMIT_ALL_METRIC_UPDATES =
"beam.samza.metrics.commitAllMetricUpdates";

private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
private final MetricsRegistryMap metricsRegistry;
private final boolean useShortMetricNames;
private final boolean commitAllMetricUpdates;

public SamzaMetricsContainer(MetricsRegistryMap metricsRegistry, Config config) {
this.metricsRegistry = metricsRegistry;
this.useShortMetricNames = config.getBoolean(USE_SHORT_METRIC_NAMES_CONFIG, false);
this.commitAllMetricUpdates = config.getBoolean(COMMIT_ALL_METRIC_UPDATES, false);
this.metricsRegistry.metrics().put(BEAM_METRICS_GROUP, new ConcurrentHashMap<>());
LOG.info("Creating Samza metrics container with userShortMetricName = {}", useShortMetricNames);
}
Expand All @@ -81,11 +85,10 @@ public MetricsContainerStepMap getContainers() {
public void updateMetrics(String stepName) {

assert metricsRegistry != null;

final List<String> stepNameList = Arrays.asList(stepName, GLOBAL_CONTAINER_STEP_NAME);
// Since global metrics do not belong to any step, we need to update it in every step.
final MetricResults metricResults =
asAttemptedOnlyMetricResultsForSteps(
metricsContainers, Arrays.asList(stepName, GLOBAL_CONTAINER_STEP_NAME));
asAttemptedOnlyMetricResultsForSteps(metricsContainers, stepNameList);
final MetricQueryResults results = metricResults.allMetrics();

final CounterUpdater updateCounter = new CounterUpdater();
Expand All @@ -96,6 +99,11 @@ public void updateMetrics(String stepName) {

final DistributionUpdater updateDistribution = new DistributionUpdater();
results.getDistributions().forEach(updateDistribution);
if (commitAllMetricUpdates) {
stepNameList.stream()
.map(metricsContainers::getContainer)
.forEach(MetricsContainerImpl::commitUpdates);
}
}

public void updateExecutableStageBundleMetric(String metricName, long time) {
Expand Down

0 comments on commit 0a54c22

Please sign in to comment.