Skip to content

Commit

Permalink
Add per worker histogram to portable runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Nov 12, 2024
1 parent b387721 commit 377f753
Show file tree
Hide file tree
Showing 23 changed files with 331 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,17 @@ message MonitoringInfoSpecs {
}
]
}];


USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];
}
}

Expand Down Expand Up @@ -576,6 +587,10 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

PER_WORKER_HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];


// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -42,16 +43,19 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.perWorkerHistograms = perWorkerHistograms;
}

@Override
Expand All @@ -62,6 +66,9 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
perWorkerHistograms,
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public void update(HistogramCell other) {
dirty.afterModification();
}

@Override
public void update(HistogramData data) {
this.value.update(data);
dirty.afterModification();
}

// TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing
// the infinite buckets as well.
// and remove the incTopBucketCount and incBotBucketCount methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/** Representation of multiple metric updates. */
Expand All @@ -34,6 +35,7 @@ public abstract class MetricUpdates {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

/**
Expand Down Expand Up @@ -66,21 +68,30 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
/** All the sets updates. */
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();

/** All the histogram updates. */
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates) {
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates);
counterUpdates,
distributionUpdates,
gaugeUpdates,
stringSetUpdates,
perWorkerHistogramsUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
public boolean isEmpty() {
return Iterables.isEmpty(counterUpdates())
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates());
&& Iterables.isEmpty(stringSetUpdates())
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
}
}
Loading

0 comments on commit 377f753

Please sign in to comment.