diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index feb763fb5fed..d45686494576 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -352,7 +352,6 @@ message MonitoringInfoSpecs {
]
}];
- //import com.google.api.services.dataflow.model.PerWorkerMetrics;
API_REQUEST_LATENCIES = 20 [(monitoring_info_spec) = {
urn: "beam:metric:io:api_request_latencies:v1",
type: "beam:metrics:histogram_int64:v1",
@@ -587,9 +586,7 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];
- // Encoding: ...
- // - iter: beam:coder:iterable:v1
- // - valueX: beam:coder:stringutf8:v1
+ // Represents an int64 histogram.
PER_WORKER_HISTOGRAM = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];
diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 4898bcdc401e..407a2fd4c423 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -38,19 +38,19 @@ test {
}
}
-// def google_api_services_dataflow = library.java.google_api_services_dataflow
-
dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
+ provided library.java.google_api_services_dataflow
+
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_60_1
implementation library.java.slf4j_api
implementation library.java.jackson_core
implementation library.java.jackson_databind
- // implementation library.java.proto_google_common_protos
implementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
@@ -62,5 +62,4 @@ dependencies {
provided library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation(library.java.google_api_services_dataflow)
- implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java
index eeda5f31707e..f45dd154eb9e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java
@@ -28,8 +28,6 @@
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link org.apache.beam.sdk.metrics.MetricResults}, which takes static
@@ -40,7 +38,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DefaultMetricResults extends MetricResults {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricResults.class);
private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
@@ -54,7 +51,6 @@ public DefaultMetricResults(
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
- LOG.info("xxx does this get here? DefaultMetricResults ");
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LockFreeHistogram.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LockFreeHistogram.java
deleted file mode 100644
index 884c535c2632..000000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LockFreeHistogram.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.metrics;
-
-import com.google.auto.value.AutoValue;
-import com.google.auto.value.extension.memoized.Memoized;
-import java.io.Serializable;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.metrics.Histogram;
-import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.util.HistogramData;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray;
-
-/**
- * A lock free implementation of {@link org.apache.beam.sdk.metrics.Histogram}. This class supports
- * extracting delta updates with the {@link #getSnapshotAndReset} method.
- */
-@ThreadSafe
-@Internal
-public final class LockFreeHistogram implements Histogram {
- private final HistogramData.BucketType bucketType;
- private final AtomicLongArray buckets;
- private final MetricName name;
- private final AtomicReference<OutlierStatistic> underflowStatistic;
- private final AtomicReference<OutlierStatistic> overflowStatistic;
-
- /**
- * Whether this histogram has updates that have not been extracted by {@code getSnapshotAndReset}.
- * This value should be flipped to true AFTER recording a value, and flipped to false BEFORE
- * extracting a snapshot. This ensures that recorded values will always be seen by a future {@code
- * getSnapshotAndReset} call.
- */
- private final AtomicBoolean dirty;
-
- /** Create a histogram. */
- public LockFreeHistogram(MetricName name, HistogramData.BucketType bucketType) {
- this.name = name;
- this.bucketType = bucketType;
- this.buckets = new AtomicLongArray(bucketType.getNumBuckets());
- this.underflowStatistic =
- new AtomicReference<OutlierStatistic>(OutlierStatistic.EMPTY);
- this.overflowStatistic =
- new AtomicReference<OutlierStatistic>(OutlierStatistic.EMPTY);
- this.dirty = new AtomicBoolean(false);
- }
-
- /**
- * Represents the sum and mean of a collection of numbers. Used to represent the
- * underflow/overflow statistics of a histogram.
- */
- @AutoValue
- public abstract static class OutlierStatistic implements Serializable {
- abstract double sum();
-
- public abstract long count();
-
- public static final OutlierStatistic EMPTY = create(0, 0);
-
- public static OutlierStatistic create(double sum, long count) {
- return new AutoValue_LockFreeHistogram_OutlierStatistic(sum, count);
- }
-
- public OutlierStatistic combine(double value) {
- return create(sum() + value, count() + 1);
- }
-
- public double mean() {
- if (count() == 0) {
- return 0;
- }
- return sum() / count();
- }
- }
-
- /**
- * The snapshot of a histogram. The snapshot contains the overflow/underflow statistic, number of
- * values recorded in each bucket, and the BucketType of the underlying histogram.
- */
- @AutoValue
- public abstract static class Snapshot {
- public abstract OutlierStatistic underflowStatistic();
-
- public abstract OutlierStatistic overflowStatistic();
-
- public abstract ImmutableLongArray buckets();
-
- public abstract HistogramData.BucketType bucketType();
-
- public static Snapshot create(
- OutlierStatistic underflowStatistic,
- OutlierStatistic overflowStatistic,
- ImmutableLongArray buckets,
- HistogramData.BucketType bucketType) {
- return new AutoValue_LockFreeHistogram_Snapshot(
- underflowStatistic, overflowStatistic, buckets, bucketType);
- }
-
- @Memoized
- public long totalCount() {
- long count = 0;
- count += underflowStatistic().count();
- count += overflowStatistic().count();
- count += buckets().stream().sum();
-
- return count;
- }
- }
-
- /**
- * Extract a delta update of this histogram. Update represents values that have been recorded in
- * this histogram since the last time this method was called.
- *
- * <p>If this histogram is being updated concurrently with this method, then the returned snapshot is
- * not guaranteed to contain those updates. However, those updates are not dropped and will be
- * represented in a future call to this method.
- *
- * <p>If this histogram has not been updated since the last call to this method, an empty optional
- * is returned.
- */
- public Optional<Snapshot> getSnapshotAndReset() {
- if (!dirty.getAndSet(false)) {
- return Optional.empty();
- }
-
- ImmutableLongArray.Builder bucketsSnapshotBuilder =
- ImmutableLongArray.builder(buckets.length());
- for (int i = 0; i < buckets.length(); i++) {
- bucketsSnapshotBuilder.add(buckets.getAndSet(i, 0));
- }
- OutlierStatistic overflowSnapshot = overflowStatistic.getAndSet(OutlierStatistic.EMPTY);
- OutlierStatistic underflowSnapshot = underflowStatistic.getAndSet(OutlierStatistic.EMPTY);
-
- return Optional.of(
- Snapshot.create(
- underflowSnapshot, overflowSnapshot, bucketsSnapshotBuilder.build(), bucketType));
- }
-
- @Override
- public MetricName getName() {
- return name;
- }
-
- private void updateInternal(double value) {
- double rangeTo = bucketType.getRangeTo();
- double rangeFrom = bucketType.getRangeFrom();
- if (value >= rangeTo) {
- recordTopRecordsValue(value);
- } else if (value < rangeFrom) {
- recordBottomRecordsValue(value);
- } else {
- recordInBoundsValue(value);
- }
- }
-
- @Override
- public void update(double value) {
- updateInternal(value);
- dirty.set(true);
- }
-
- @Override
- public void update(double... values) {
- for (double value : values) {
- updateInternal(value);
- }
- dirty.set(true);
- }
-
- /** Record an in-bounds value to the appropriate bucket. */
- private void recordInBoundsValue(double value) {
- int index = bucketType.getBucketIndex(value);
- if (index < 0 || index >= bucketType.getNumBuckets()) {
- return;
- }
-
- buckets.getAndIncrement(index);
- }
-
- /**
- * Record a new value in {@code overflowStatistic}. This method should only be called when a
- * Histogram is recording a value greater than the upper bound of its largest bucket.
- *
- * @param value
- */
- private void recordTopRecordsValue(double value) {
- OutlierStatistic original;
- do {
- original = overflowStatistic.get();
- } while (!overflowStatistic.compareAndSet(original, original.combine(value)));
- }
-
- /**
- * Record a new value in {@code underflowStatistic}. This method should only be called when a
- * Histogram is recording a value smaller than the lower bound of its smallest bucket.
- */
- private void recordBottomRecordsValue(double value) {
- OutlierStatistic original;
- do {
- original = underflowStatistic.get();
- } while (!underflowStatistic.compareAndSet(original, original.combine(value)));
- }
-}
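
The value of the deleted class above is the dirty-flag ordering spelled out in its comments: the flag flips to true after a value is recorded and to false before a snapshot reads the buckets, so any update racing with a snapshot is guaranteed to surface in a later snapshot. A minimal sketch of that protocol using only JDK atomics; the class and method names here are illustrative, not Beam APIs:

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongArray;

final class DeltaBuckets {
  private final AtomicLongArray buckets = new AtomicLongArray(8);
  private final AtomicBoolean dirty = new AtomicBoolean(false);

  void record(int bucketIndex) {
    buckets.getAndIncrement(bucketIndex);
    dirty.set(true); // AFTER the write, so a concurrent snapshot cannot lose this update
  }

  Optional<long[]> getSnapshotAndReset() {
    if (!dirty.getAndSet(false)) { // BEFORE the reads, so later writes re-mark the flag
      return Optional.empty();
    }
    long[] snapshot = new long[buckets.length()];
    for (int i = 0; i < buckets.length(); i++) {
      snapshot[i] = buckets.getAndSet(i, 0); // extract the delta and zero the bucket
    }
    return Optional.of(snapshot);
  }
}
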
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 1c15774c36e4..960ccb78fbd7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -92,22 +92,15 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);
- // Should it be a cell Instead?
- // Can this be a regular histogram instead of a cell'? see
- // dirty state acts as being lock free, commits only non dirty metrics.
- // also of type DISTRIBUTION_INT64_TYPE
- // refactor to use Lock free histograms? later?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);
private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);
- // assume the same bucket type?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);
private MetricsContainerImpl(@Nullable String stepName, boolean isProcessWide) {
- LOG.info("xxx create metric container {}: isProcessWide {}", stepName, isProcessWide);
this.stepName = stepName;
this.isProcessWide = isProcessWide;
}
@@ -127,7 +120,6 @@ public MetricsContainerImpl(@Nullable String stepName) {
* collecting processWide metrics for HarnessMonitoringInfoRequest/Response.
*/
public static MetricsContainerImpl createProcessWideContainer() {
- LOG.info("xxx create createProcessWideContainer");
return new MetricsContainerImpl(null, true);
}
@@ -182,16 +174,8 @@ public DistributionCell getDistribution(MetricName metricName) {
@Override
public HistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
- // LOG.info("xxx stepName {}, getPerWorkerHistogram metric {}", stepName, metricName.getName());
- // if not enabled, return a no op container from parent class
- // if (!enablePerWorkerMetrics) {
- // // will be a no op
- // return null;
- // // return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
- // }
- // return no op histogram instead
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
- return val; // no null chceks for the others
+ return val;
}
/**
@@ -897,7 +881,6 @@ public static MetricsContainerImpl deltaContainer(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}
- // treat per worker histograms differently
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
curr.perWorkerHistograms.entries()) {
HistogramData.BucketType bt = cell.getKey().getValue();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
index 62d0f1a39c43..1d5ce08bfa46 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -52,7 +52,6 @@ public class MetricsContainerStepMap implements Serializable {
private Map<String, MetricsContainerImpl> metricsContainers;
private MetricsContainerImpl unboundContainer = new MetricsContainerImpl(null);
- private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerStepMap.class);
public MetricsContainerStepMap() {
this.metricsContainers = new ConcurrentHashMap<>();
@@ -142,13 +141,8 @@ public static MetricResults asMetricResults(
Map<MetricKey, MetricResult<GaugeResult>> gauges = new HashMap<>();
Map<MetricKey, MetricResult<StringSetResult>> sets = new HashMap<>();
Map<MetricKey, MetricResult<HistogramData>> perWorkerHistograms = new HashMap<>();
- // LOG.info("xxx asMetricresults");
attemptedMetricsContainers.forEachMetricContainer(
container -> {
- LOG.info(
- "xxx asMetricResults {} per worker histogram size {}",
- container.stepName,
- container.getPerWorkerHistogram().size());
MetricUpdates cumulative = container.getCumulative();
mergeAttemptedResults(counters, cumulative.counterUpdates(), (l, r) -> l + r);
mergeAttemptedResults(
@@ -160,10 +154,6 @@ public static MetricResults asMetricResults(
});
committedMetricsContainers.forEachMetricContainer(
container -> {
- LOG.info(
- "xxx asMetricResults {} per worker histogram size {}",
- container.stepName,
- container.getPerWorkerHistogram().size());
MetricUpdates cumulative = container.getCumulative();
mergeCommittedResults(counters, cumulative.counterUpdates(), (l, r) -> l + r);
mergeCommittedResults(
@@ -173,7 +163,6 @@ public static MetricResults asMetricResults(
mergeCommittedResults(
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
});
- LOG.info("xxx export results {}", perWorkerHistograms.size());
- perWorkerHistograms
- .values()
- .forEach(hist -> LOG.info("xxx {}", hist.getKey().metricName().getName()));
@@ -199,12 +188,7 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
ArrayList<MonitoringInfo> monitoringInfos = new ArrayList<>();
forEachMetricContainer(
container -> {
- LOG.info(
- "xxx get getMonitoringInfos {} per worker histogram size {}",
- container.stepName,
- container.getPerWorkerHistogram().size());
for (MonitoringInfo mi : container.getMonitoringInfos()) {
- LOG.info("xxx monitoring info {}", mi.toString());
monitoringInfos.add(mi);
}
});
@@ -218,11 +202,6 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
- // it does get here.
forEachMetricContainer(
(container) -> {
- LOG.info(
- "xxx get getMonitoringData {} per worker histogram size {}, distribution size {}",
- container.stepName,
- container.getPerWorkerHistogram().size(),
- container.distributions().size());
- container
- .getPerWorkerHistogram()
- .forEach((histogram, data) -> LOG.info("xxx {}", histogram.getKey().getName()));
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
index ff69e0d6a31e..a17c92492dab 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -17,15 +17,18 @@
*/
package org.apache.beam.runners.core.metrics;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.dataflow.model.BucketOptions;
+import com.google.api.services.dataflow.model.DataflowHistogramValue;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Base2Exponent;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Linear;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -38,8 +41,7 @@
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.api.services.dataflow.model.DataflowHistogramValue;
-
// TODO Refactor out DataflowHistogramValue to be runner agnostic.
@@ -71,46 +73,50 @@ public static ByteString encodeInt64Distribution(DistributionData data) {
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
LOG.info("Xxx: data {}", inputHistogram.getPercentileString("poll latency", "seconds"));
try {
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.Builder outputHistogram =
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.newBuilder();
int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
+
+ // Build the new DataflowHistogramValue representation. TODO: carry the outlier stats as well.
+ DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
// refactor out different bucket types?
if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
HistogramData.LinearBuckets buckets =
(HistogramData.LinearBuckets) inputHistogram.getBucketType();
- Linear.Builder linearOptions =
- Linear.newBuilder()
- .setNumberOfBuckets(numberOfBuckets)
- .setWidth(buckets.getWidth())
- .setStart(buckets.getStart());
- outputHistogram.getBucketOptionsBuilder().setLinear(linearOptions);
+ com.google.api.services.dataflow.model.Linear linear =
+ new com.google.api.services.dataflow.model.Linear();
+ linear.setNumberOfBuckets(numberOfBuckets);
+ linear.setWidth(buckets.getWidth());
+ linear.setStart(buckets.getStart());
+ // Set the bucket options explicitly so the decoding step can tell the bucket types apart.
+ outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
+ LOG.info("xxx exp buckets");
HistogramData.ExponentialBuckets buckets =
(HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
- Base2Exponent.Builder exponentialOptions =
- Base2Exponent.newBuilder().setNumberOfBuckets(numberOfBuckets).setScale(buckets.getScale());
- outputHistogram.getBucketOptionsBuilder().setExponential(exponentialOptions);
- } else { // unsupported type
- // should an error be thrown here?
+ com.google.api.services.dataflow.model.Base2Exponent base2Exp =
+ new com.google.api.services.dataflow.model.Base2Exponent();
+ base2Exp.setNumberOfBuckets(numberOfBuckets);
+ base2Exp.setScale(buckets.getScale());
+ outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+ } else { // unsupported type
+ LOG.warn("Bucket type not recognized for histogram: {}", inputHistogram.toString());
}
- outputHistogram.setCount(inputHistogram.getTotalCount());
- LOG.info("xxx inputHistogram.getBucketType().getNumBuckets() {}", inputHistogram.getBucketType().getNumBuckets());
- for (int i = 0; i < inputHistogram.getBucketType().getNumBuckets(); i++) {
- LOG.info("xxx bucket counts {}, num buckets {}", i, inputHistogram.getBucketType().getNumBuckets());
- // dont count overflow and underflow records
- outputHistogram.addBucketCounts(inputHistogram.getCount(i));
- }
+ outputHistogram2.setCount(inputHistogram.getTotalCount());
- Method[] methods = outputHistogram.getClass().getMethods();
- for (Method method : methods) {
- System.out.println(method.toString());
- }
- LOG.info("Xxx: encoded data {} ", outputHistogram.toString());
+ List<Long> bucketCounts = new ArrayList<>();
+
+ Arrays.stream(inputHistogram.getBucketCount()).forEach(bucketCounts::add);
+
+ outputHistogram2.setBucketCounts(bucketCounts);
- // try with new proto:
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(outputHistogram2);
- return outputHistogram.build().toByteString();
+ return ByteString.copyFromUtf8(jsonString);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -118,13 +124,49 @@ public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static HistogramData decodeInt64Histogram(ByteString payload) {
- // decode to DataflowHistogramValue, then create Histogram Data from it, and pass that along.
try {
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram outputHistogram =
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.parseFrom(payload);
- LOG.info("Xxx: data {}, {} ", outputHistogram.toString(), payload);
- return new HistogramData(outputHistogram);
- } catch (Exception e) {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // walk the tree below
+ LOG.info("xxx josn NOde pretty print {}", jsonNode.toPrettyString());
+ DataflowHistogramValue newHist = new DataflowHistogramValue();
+ newHist.setCount(jsonNode.get("count").asLong());
+
+ List<Long> bucketCounts = new ArrayList<>();
+ Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+ while (itr.hasNext()) {
+ Long item = itr.next().asLong();
+ // collect each bucket count
+ bucketCounts.add(item);
+ }
+ newHist.setBucketCounts(bucketCounts);
+
+ // Only one of the linear or exponential bucket options will be set.
+ LOG.info("xxx bucketOptions {}", jsonNode.get("bucketOptions").toString());
+ if (jsonNode.get("bucketOptions").has("linear")) {
+ com.google.api.services.dataflow.model.Linear linear =
+ new com.google.api.services.dataflow.model.Linear();
+ JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+ linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+ linear.setWidth(linearNode.get("width").asDouble());
+ linear.setStart(linearNode.get("start").asDouble());
+ LOG.info("xxx linear bucket: {}", linear);
+ newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+ } else {
+ // otherwise assume exponential bucket options
+ com.google.api.services.dataflow.model.Base2Exponent base2Exp =
+ new com.google.api.services.dataflow.model.Base2Exponent();
+ JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+
+ base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+ base2Exp.setScale(expNode.get("scale").asInt());
+ newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+ }
+
+ LOG.info("xxx jsonNode to proto {}", newHist.toString());
+ LOG.info("Xxx: data {} ", payload);
+ return new HistogramData(newHist);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
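
The encode/decode pair above now round-trips the histogram through Jackson JSON instead of a proto wire format. A sketch of that round trip, assuming the com.google.api.services.dataflow.model classes serialize with the field names the decode branch expects; the bucket values below are illustrative:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import java.util.Arrays;

public class HistogramJsonRoundTrip {
  public static void main(String[] args) throws Exception {
    DataflowHistogramValue hist = new DataflowHistogramValue();
    hist.setCount(4L);
    hist.setBucketCounts(Arrays.asList(0L, 1L, 1L, 1L, 1L, 0L, 0L));
    Linear linear = new Linear();
    linear.setNumberOfBuckets(7);
    linear.setWidth(5.0);
    linear.setStart(0.0);
    hist.setBucketOptions(new BucketOptions().setLinear(linear));

    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    // Encode: the JSON string that encodeInt64Histogram copies into a UTF-8 ByteString.
    String json = mapper.writeValueAsString(hist);

    // Decode: the same tree walk decodeInt64Histogram performs for the linear branch.
    JsonNode node = mapper.readTree(json);
    JsonNode linearNode = node.get("bucketOptions").get("linear");
    System.out.println(
        node.get("count").asLong() + " values in " + linearNode.get("numberOfBuckets").asInt() + " buckets");
  }
}
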
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
index c4cef7d69c0e..f0054990b697 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
@@ -48,7 +48,6 @@ public class MonitoringInfoEncodingsTest {
public void testInt64DistributionEncoding() {
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
ByteString payload = encodeInt64Distribution(data);
- System.out.println("xxxx " + payload);
assertEquals(data, decodeInt64Distribution(payload));
}
@@ -64,15 +63,24 @@ public void testDoubleDistributionEncoding() {
}
@Test
- public void testHistgramInt64Encoding() {
+ public void testHistogramInt64EncodingLinearHist() {
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);
HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(5, 10, 15, 20);
- // LOG.info("Xxx: inputHistogram {}, {} ", inputHistogram.getBoun, payload);
ByteString payload = encodeInt64Histogram(inputHistogram);
- // HistogramData data = inputHistogram.extractResult();
- // System.out.println("xxx data {}" + data);
+
+ assertEquals(inputHistogram, decodeInt64Histogram(payload));
+ }
+
+ @Test
+ public void testHistogramInt64EncodingExpHist() {
+ HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
+
+ HistogramData inputHistogram = new HistogramData(buckets);
+ inputHistogram.record(2, 4, 8, 16, 32);
+ ByteString payload = encodeInt64Histogram(inputHistogram);
+ System.out.println("xxx payload: " + payload);
assertEquals(inputHistogram, decodeInt64Histogram(payload));
}
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 9502eaabbeae..a74d637439b0 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -72,6 +72,7 @@ test {
dependencies {
antlr library.java.antlr
implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
+ provided library.java.google_api_services_dataflow
// antlr is used to generate code from sdks/java/core/src/main/antlr/
permitUnusedDeclared library.java.antlr
// Required to load constants from the model, e.g. max timestamp for global window
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
index e6c3ac6843e3..5e68adfa9d69 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
@@ -64,14 +64,6 @@ public DelegatingHistogram(
this.bucketType = bucketType;
this.processWideContainer = processWideContainer;
this.perWorkerHistogram = perWorkerHistogram;
- // What is the container here?
- MetricsContainer container =
- processWideContainer
- ? MetricsEnvironment.getProcessWideContainer()
- : MetricsEnvironment.getCurrentContainer();
- if (container == null) {
- } else {
- }
}
private Optional<Histogram> getHistogram() {
@@ -80,14 +72,11 @@ private Optional<Histogram> getHistogram() {
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
if (container == null) {
- // LOG.info("xxx getHistogram container is null {}");
return Optional.empty();
}
if (perWorkerHistogram) {
- // LOG.info("xxx is this null? perWorkerHistogram {}", container.getPerWorkerHistogram(name, bucketType).toString());
return Optional.of(container.getPerWorkerHistogram(name, bucketType));
} else {
- // LOG.info("xxx is this null? histogram {}", container.getHistogram(name, bucketType).toString());
return Optional.of(container.getHistogram(name, bucketType));
}
}
@@ -100,6 +89,7 @@ public void update(double value) {
@Override
public void update(double... values) {
+ // is this needed?
MetricsContainer container =
this.processWideContainer
? MetricsEnvironment.getProcessWideContainer()
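
For context on the class above: every update() resolves the backing metrics container at call time, so a single static histogram can be declared once and shared, and updates silently no-op when no container is installed. A hypothetical usage sketch; the metric name and bucketing are illustrative, not an existing Beam metric:

import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;

public class RequestLatencies {
  private static final DelegatingHistogram LATENCY_MS =
      new DelegatingHistogram(
          MetricName.named("MyIO", "api_request_latencies"),
          HistogramData.LinearBuckets.of(0, 5, 20), // start 0, width 5, 20 buckets
          false, // resolve the current step's container rather than the process-wide one
          true); // report as a per-worker histogram

  public static void recordLatency(double millis) {
    LATENCY_MS.update(millis); // drops the value if no metrics container is active
  }
}
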
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 345d0b775b5c..f6ba6b6d21e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -53,8 +53,6 @@
*/
public class Metrics {
- // private static final Logger LOG = LoggerFactory.getLogger(Metrics.class);
-
private Metrics() {}
/**
@@ -158,7 +156,6 @@ private DelegatingGauge(MetricName name) {
public void set(long value) {
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
if (container != null) {
- // LOG.info("xxx delegating gauge container name {}", container.stepName);
container.getGauge(name).set(value);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index 3ed250cf7089..a4ac7b5a5886 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -63,7 +63,6 @@ default Counter getPerWorkerCounter(MetricName metricName) {
* Return the {@link Histogram} that should be used for implementing the given {@code metricName}
* in this container.
*/
- // Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType);
default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
throw new RuntimeException("Histogram metric is not supported yet.");
}
@@ -73,11 +72,6 @@ default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
*/
Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) ;
- // default Histogram getPerWorkerHistogram(
- // MetricName metricName, HistogramData.BucketType bucketType) {
- // return NoOpHistogram.getInstance();
- // }
-
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
default Iterable<MonitoringInfo> getMonitoringInfos() {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index e606a54d3a3b..3421bb4afc85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
index dc280e7e0a3a..ba3a2db5734e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
@@ -29,15 +29,14 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Base2Exponent;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Linear;
+import com.google.api.services.dataflow.model.DataflowHistogramValue;
/**
* A histogram that supports estimated percentile with linear interpolation.
*
* <p>We may consider using Apache Commons or HdrHistogram library in the future for advanced
* features such as sparsely populated histograms.
*/
public class HistogramData implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(HistogramData.class);
@@ -77,10 +76,10 @@ public HistogramData(BucketType bucketType) {
this.sumOfSquaredDeviations = 0;
}
- public HistogramData(org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram histogramProto) {
- // HistogramData newHist = null;
+ public HistogramData(com.google.api.services.dataflow.model.DataflowHistogramValue histogramProto) {
+
int numBuckets;
- if(histogramProto.getBucketOptions().hasLinear()){
+ if (histogramProto.getBucketOptions().getLinear() != null) {
double start = histogramProto.getBucketOptions().getLinear().getStart();
double width = histogramProto.getBucketOptions().getLinear().getWidth();
numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
@@ -89,10 +88,9 @@ public HistogramData(org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram histogramProto) {
this.buckets = new long[bucketType.getNumBuckets()];
// populate with bucket counts with mean type for now, not used to determine equality
- for (long val: histogramProto.getBucketCountsList()){
- this.buckets[idx] = val; // is this valid?
+ for (long val : histogramProto.getBucketCounts()) {
+ this.buckets[idx] = val;
if (!(idx == 0 || idx == bucketType.getNumBuckets()-1 )){
- LOG.info("xxx {} {}", val, idx);
this.numBoundedBucketRecords+= val;
}
idx++;
@@ -107,18 +105,16 @@ public HistogramData(org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram histogramProto) {
this.bucketType = ExponentialBuckets.of(scale, numBuckets);
this.buckets = new long[bucketType.getNumBuckets()];
// populate with bucket counts with mean type for now, not used to determine equality
- for (long val: histogramProto.getBucketCountsList()){
- this.buckets[idx] = val; // is this valid?
+ for (long val : histogramProto.getBucketCounts()) {
+ this.buckets[idx] = val;
if (!(idx == 0 || idx == bucketType.getNumBuckets()-1 )){
this.numBoundedBucketRecords+= val;
}
idx++;
}
}
- LOG.info("xxx numBoundedBucketRecords when creating from proto {}", numBoundedBucketRecords);
}
-
public BucketType getBucketType() {
return this.bucketType;
}
@@ -224,6 +220,7 @@ public synchronized void incBucketCount(int bucketIndex, long count) {
}
public synchronized void incTopBucketCount(long count) {
+ LOG.info("xxx increment top bucket {}", count);
this.numTopRecords += count;
}
@@ -258,6 +255,7 @@ public synchronized void record(double value) {
double rangeTo = bucketType.getRangeTo();
double rangeFrom = bucketType.getRangeFrom();
if (value >= rangeTo) {
+ // LOG.info("xxx value, rangeTo {}, {}", value, rangeTo);
recordTopRecordsValue(value);
} else if (value < rangeFrom) {
recordBottomRecordsValue(value);
@@ -296,6 +294,7 @@ private synchronized void updateStatistics(double value) {
*
* @param value
*/
+ // Handles values above the histogram's bucket range.
private synchronized void recordTopRecordsValue(double value) {
numTopRecords++;
topRecordsSum += value;
@@ -351,6 +350,10 @@ public synchronized long getCount(int bucketIndex) {
return buckets[bucketIndex];
}
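+ /** Returns the internal bucket count array; callers must treat it as read-only. */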
+ public synchronized long[] getBucketCount() {
+ return buckets;
+ }
+
public synchronized long getTopBucketCount() {
return numTopRecords;
}
@@ -468,6 +471,7 @@ public double getInvLog2GrowthFactor() {
@Memoized
@Override
public double getRangeTo() {
+ // LOG.info("xxx {}, {}, range {}, {}", getBase(), getNumBuckets(), Math.pow(getBase(), getNumBuckets()), getScale());
return Math.pow(getBase(), getNumBuckets());
}
@@ -631,18 +635,12 @@ public double getRangeFrom() {
public double getRangeTo() {
return getStart() + getNumBuckets() * getWidth();
}
-
- // Note: equals() and hashCode() are implemented by the AutoValue.
}
@Override
public synchronized boolean equals(@Nullable Object object) {
if (object instanceof HistogramData) {
HistogramData other = (HistogramData) object;
- LOG.info("xxx {}, {}, {}", numBoundedBucketRecords == other.numBoundedBucketRecords, numBoundedBucketRecords, other.numBoundedBucketRecords);
- LOG.info("xxx {}", numTopRecords == other.numTopRecords);
- LOG.info("xxx {}", numBottomRecords == other.numBottomRecords);
- LOG.info("xxx {}", Arrays.equals(buckets, other.buckets));
synchronized (other) {
return Objects.equals(bucketType, other.bucketType)
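
As a sanity check on the linear bucket geometry used throughout this change: bucket i covers [start + i * width, start + (i + 1) * width), so getRangeTo() is start + numBuckets * width, and anything at or beyond it lands in the top-records statistics rather than a bucket. A small sketch against the public HistogramData API:

import org.apache.beam.sdk.util.HistogramData;

public class BucketGeometryCheck {
  public static void main(String[] args) {
    HistogramData.BucketType linear = HistogramData.LinearBuckets.of(0, 5, 5);
    System.out.println(linear.getRangeTo()); // 25.0 = 0 + 5 * 5

    HistogramData hist = new HistogramData(linear);
    hist.record(5, 10, 15, 20); // in range: counted in the buckets
    hist.record(999); // >= 25: recorded in the top (overflow) statistics instead
    System.out.println(hist.getTopBucketCount()); // 1
  }
}
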
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java
index a3277de7f97b..a76fb59c7d10 100644
--- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java
@@ -1,1164 +1,1164 @@
-// /*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-// package org.apache.beam.sdk.extensions.avro.coders;
-
-// import static org.hamcrest.MatcherAssert.assertThat;
-// import static org.hamcrest.Matchers.containsString;
-// import static org.hamcrest.Matchers.equalTo;
-// import static org.junit.Assert.assertArrayEquals;
-// import static org.junit.Assert.assertEquals;
-// import static org.junit.Assert.assertTrue;
-// import static org.junit.Assert.fail;
-
-// import com.esotericsoftware.kryo.Kryo;
-// import com.esotericsoftware.kryo.io.Input;
-// import com.esotericsoftware.kryo.io.Output;
-// import com.esotericsoftware.kryo.serializers.JavaSerializer;
-// import java.io.ByteArrayInputStream;
-// import java.io.ByteArrayOutputStream;
-// import java.io.ObjectInputStream;
-// import java.io.ObjectOutputStream;
-// import java.nio.ByteBuffer;
-// import java.util.ArrayList;
-// import java.util.Collection;
-// import java.util.HashSet;
-// import java.util.LinkedHashMap;
-// import java.util.List;
-// import java.util.Map;
-// import java.util.Objects;
-// import java.util.SortedMap;
-// import java.util.SortedSet;
-// import java.util.TreeMap;
-// import java.util.TreeSet;
-// import org.apache.avro.AvroRuntimeException;
-// import org.apache.avro.Schema;
-// import org.apache.avro.SchemaBuilder;
-// import org.apache.avro.generic.GenericData;
-// import org.apache.avro.generic.GenericRecord;
-// import org.apache.avro.io.DatumReader;
-// import org.apache.avro.reflect.AvroName;
-// import org.apache.avro.reflect.AvroSchema;
-// import org.apache.avro.reflect.ReflectData;
-// import org.apache.avro.reflect.Stringable;
-// import org.apache.avro.reflect.Union;
-// import org.apache.avro.specific.SpecificData;
-// import org.apache.avro.specific.SpecificDatumReader;
-// import org.apache.avro.specific.SpecificRecord;
-// import org.apache.avro.util.Utf8;
-// import org.apache.beam.sdk.coders.Coder.Context;
-// import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-// import org.apache.beam.sdk.coders.DefaultCoder;
-// import org.apache.beam.sdk.coders.SerializableCoder;
-// import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvro;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroConversion;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroConversionFactory;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroFactory;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroNested;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestEnum;
-// import org.apache.beam.sdk.extensions.avro.schemas.fixed4;
-// import org.apache.beam.sdk.testing.CoderProperties;
-// import org.apache.beam.sdk.testing.InterceptingUrlClassLoader;
-// import org.apache.beam.sdk.testing.NeedsRunner;
-// import org.apache.beam.sdk.testing.PAssert;
-// import org.apache.beam.sdk.testing.TestPipeline;
-// import org.apache.beam.sdk.transforms.Create;
-// import org.apache.beam.sdk.transforms.DoFn;
-// import org.apache.beam.sdk.transforms.ParDo;
-// import org.apache.beam.sdk.util.CoderUtils;
-// import org.apache.beam.sdk.util.InstanceBuilder;
-// import org.apache.beam.sdk.util.SerializableUtils;
-// import org.apache.beam.sdk.values.PCollection;
-// import org.apache.beam.sdk.values.TypeDescriptor;
-// import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-// import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-// import org.checkerframework.checker.nullness.qual.Nullable;
-// import org.hamcrest.Description;
-// import org.hamcrest.Matcher;
-// import org.hamcrest.Matchers;
-// import org.hamcrest.TypeSafeMatcher;
-// import org.joda.time.DateTime;
-// import org.joda.time.DateTimeZone;
-// import org.joda.time.LocalDate;
-// import org.junit.Rule;
-// import org.junit.Test;
-// import org.junit.experimental.categories.Category;
-// import org.junit.runner.RunWith;
-// import org.junit.runners.JUnit4;
-// import org.objenesis.strategy.StdInstantiatorStrategy;
-
-// /** Tests for {@link AvroCoder}. */
-// @RunWith(JUnit4.class)
-// public class AvroCoderTest {
-
-// public static final DateTime DATETIME_A =
-// new DateTime().withDate(1994, 10, 31).withZone(DateTimeZone.UTC);
-// public static final DateTime DATETIME_B =
-// new DateTime().withDate(1997, 4, 25).withZone(DateTimeZone.UTC);
-// private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new TestAvroNested(true, 42);
-// private static final TestAvro AVRO_SPECIFIC_RECORD =
-// TestAvroFactory.newInstance(
-// true,
-// 43,
-// 44L,
-// 44.1f,
-// 44.2d,
-// "mystring",
-// ByteBuffer.wrap(new byte[] {1, 2, 3, 4}),
-// new fixed4(new byte[] {1, 2, 3, 4}),
-// new LocalDate(1979, 3, 14),
-// new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4),
-// TestEnum.abc,
-// AVRO_NESTED_SPECIFIC_RECORD,
-// ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD),
-// ImmutableMap.of("k1", AVRO_NESTED_SPECIFIC_RECORD, "k2", AVRO_NESTED_SPECIFIC_RECORD));
-
-// private static final String VERSION_AVRO = Schema.class.getPackage().getImplementationVersion();
-
-// @DefaultCoder(AvroCoder.class)
-// private static class Pojo {
-// public String text;
-// public int count;
-
-// @AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
-// public DateTime timestamp;
-
-// // Empty constructor required for Avro decoding.
-// @SuppressWarnings("unused")
-// public Pojo() {}
-
-// public Pojo(String text, int count, DateTime timestamp) {
-// this.text = text;
-// this.count = count;
-// this.timestamp = timestamp;
-// }
-
-// @Override
-// public boolean equals(@Nullable Object other) {
-// if (this == other) {
-// return true;
-// }
-// if (other == null || getClass() != other.getClass()) {
-// return false;
-// }
-// Pojo that = (Pojo) other;
-// return this.count == that.count
-// && Objects.equals(this.text, that.text)
-// && Objects.equals(this.timestamp, that.timestamp);
-// }
-
-// @Override
-// public int hashCode() {
-// return Objects.hash(text, count, timestamp);
-// }
-
-// @Override
-// public String toString() {
-// return "Pojo{"
-// + "text='"
-// + text
-// + '\''
-// + ", count="
-// + count
-// + ", timestamp="
-// + timestamp
-// + '}';
-// }
-// }
-
-// private static class GetTextFn extends DoFn<Pojo, String> {
-// @ProcessElement
-// public void processElement(ProcessContext c) {
-// c.output(c.element().text);
-// }
-// }
-
-// @Rule public TestPipeline pipeline = TestPipeline.create();
-
-// @Test
-// public void testAvroCoderEncoding() throws Exception {
-// AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
-// CoderProperties.coderSerializable(coder);
-// AvroCoder<Pojo> copy = SerializableUtils.clone(coder);
-
-// Pojo pojo = new Pojo("foo", 3, DATETIME_A);
-// Pojo equalPojo = new Pojo("foo", 3, DATETIME_A);
-// Pojo otherPojo = new Pojo("bar", -19, DATETIME_B);
-// CoderProperties.coderConsistentWithEquals(coder, pojo, equalPojo);
-// CoderProperties.coderConsistentWithEquals(copy, pojo, equalPojo);
-// CoderProperties.coderConsistentWithEquals(coder, pojo, otherPojo);
-// CoderProperties.coderConsistentWithEquals(copy, pojo, otherPojo);
-// }
-
-// /**
-// * Tests that {@link AvroCoder} works around issues in Avro where cache classes might be from the
-// * wrong ClassLoader, causing confusing "Cannot cast X to X" error messages.
-// */
-// @SuppressWarnings("ReturnValueIgnored")
-// @Test
-// public void testTwoClassLoaders() throws Exception {
-// ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-// ClassLoader loader1 =
-// new InterceptingUrlClassLoader(contextClassLoader, AvroCoderTestPojo.class.getName());
-// ClassLoader loader2 =
-// new InterceptingUrlClassLoader(contextClassLoader, AvroCoderTestPojo.class.getName());
-
-// Class<?> pojoClass1 = loader1.loadClass(AvroCoderTestPojo.class.getName());
-// Class<?> pojoClass2 = loader2.loadClass(AvroCoderTestPojo.class.getName());
-
-// Object pojo1 = InstanceBuilder.ofType(pojoClass1).withArg(String.class, "hello").build();
-// Object pojo2 = InstanceBuilder.ofType(pojoClass2).withArg(String.class, "goodbye").build();
-
-// // Confirm incompatibility
-// try {
-// pojoClass2.cast(pojo1);
-// fail("Expected ClassCastException; without it, this test is vacuous");
-// } catch (ClassCastException e) {
-// // g2g
-// }
-
-// // The first coder is expected to populate the Avro SpecificData cache
-// // The second coder is expected to be corrupted if the caching is done wrong.
-// AvroCoder