Skip to content

Commit

Permalink
Merge branch 'add_hist_to_runner_implementation' into add_kafka_sdf_p…
Browse files Browse the repository at this point in the history
…oll_metrics
  • Loading branch information
Naireen committed Nov 12, 2024
2 parents 9432961 + 24ce41c commit 79ee70f
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,25 +378,6 @@ message MonitoringInfoSpecs {
value: "URN utilized to report user metric."
}]
}];

// Represents per worker metrics
PER_WORKER_LATENCY_METRIC= 23 [(monitoring_info_spec) = {
urn: "beam:metrics:per_worker_metric:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: [
"PTRANSFORM"
],
annotations: [
{
key: "description",
value: "Histogram counts for request latencies made to IO service APIs to batch read or write elements."
},
{
key: "units",
value: "Milliseconds"
}
]
}];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public MetricUpdates getUpdates() {
return metricToMonitoringMetadata(
metricKey,
MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE,
MonitoringInfoConstants.Urns.PER_WORKER_LATENCY_METRIC);
MonitoringInfoConstants.Urns.USER_PER_WORKER_HISTOGRAM);
}

/**
Expand Down Expand Up @@ -656,7 +656,7 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
break;

case PER_WORKER_HISTOGRAM_TYPE:
updateForPerWorkerHistogramInt64(monitoringInfo); // use type, and not urn info
updateForPerWorkerHistogramInt64(monitoringInfo);
break;
default:
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
Expand Down Expand Up @@ -830,6 +830,7 @@ public static MetricsContainerImpl deltaContainer(
deltaValueCell.incTopBucketCount(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}
// Do the same for perWorkerHistograms
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
curr.perWorkerHistograms.entries()) {
HistogramData.BucketType bt = cell.getKey().getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;

// TODO(naireenhussain): Refactor out DataflowHistogramValue to be runner agnostic, and rename to
// TODO(#33093): Refactor out DataflowHistogramValue to be runner agnostic, and rename to
// remove Dataflow reference.

/** A set of functions used to encode and decode common monitoring info types. */
Expand Down

0 comments on commit 79ee70f

Please sign in to comment.