Skip to content

Commit

Permalink
Add per worker histogram to portable runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Nov 8, 2024
1 parent 4f8b923 commit 7db8de1
Show file tree
Hide file tree
Showing 22 changed files with 350 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,36 @@ message MonitoringInfoSpecs {
}
]
}];


USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// Represents per worker metrics
PER_WORKER_LATENCY_METRIC= 23 [(monitoring_info_spec) = {
urn: "beam:metrics:per_worker_metric:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: [
"PTRANSFORM"
],
annotations: [
{
key: "description",
value: "Histogram counts for request latencies made to IO service APIs to batch read or write elements."
},
{
key: "units",
value: "Milliseconds"
}
]
}];
}
}

Expand Down Expand Up @@ -576,6 +606,10 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

PER_WORKER_HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];


// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -42,16 +43,19 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.perWorkerHistograms = perWorkerHistograms;
}

@Override
Expand All @@ -62,6 +66,9 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
perWorkerHistograms,
perWorkerHistogram -> MetricFiltering.matches(filter, perWorkerHistogram.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public void update(HistogramCell other) {
dirty.afterModification();
}

@Override
public void update(HistogramData data) {
this.value.update(data);
dirty.afterModification();
}

// TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing
// the infinite buckets as well.
// and remove the incTopBucketCount and incBotBucketCount methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/** Representation of multiple metric updates. */
Expand All @@ -34,6 +35,7 @@ public abstract class MetricUpdates {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

/**
Expand Down Expand Up @@ -66,21 +68,30 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
/** All the sets updates. */
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();

/** All the histogram updates. */
public abstract Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates) {
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates);
counterUpdates,
distributionUpdates,
gaugeUpdates,
stringSetUpdates,
perWorkerHistogramsUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
public boolean isEmpty() {
return Iterables.isEmpty(counterUpdates())
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates());
&& Iterables.isEmpty(stringSetUpdates())
&& Iterables.isEmpty(perWorkerHistogramsUpdates());
}
}
Loading

0 comments on commit 7db8de1

Please sign in to comment.