Skip to content

Commit

Permalink
Add test for BoundedTrieResult and some more plumbing
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitsinha54 committed Dec 25, 2024
1 parent 7a2509c commit 83b3d0d
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.metrics.BoundedTrieResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
Expand All @@ -27,6 +28,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;

Expand Down Expand Up @@ -82,4 +84,13 @@ public Iterable<MetricResult<StringSetResult>> getStringSets() {
StringSetResult.create(ImmutableSet.of("ab")),
StringSetResult.create(ImmutableSet.of("cd")));
}

@Override
public Iterable<MetricResult<BoundedTrieResult>> getBoundedTries() {
return makeResults(
"s3",
"n3",
BoundedTrieResult.create(ImmutableSet.of(ImmutableList.of("ab", String.valueOf(false)))),
BoundedTrieResult.create(ImmutableSet.of(ImmutableList.of("cd", String.valueOf(false)))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ public void testWriteMetricsWithCommittedSupported() throws Exception {
metricsHttpSink.writeMetrics(metricQueryResults);
countDownLatch.await();
String expected =
"{\"counters\":[{\"attempted\":20,\"committed\":10,\"name\":{\"name\":\"n1\","
"{\"boundedTries\":[{\"attempted\":{\"result\":[[\"cd\",\"false\"]]},"
+ "\"committed\":{\"result\":[[\"ab\",\"false\"]]},\"name\":{\"name\":\"n3\","
+ "\"namespace\":\"ns1\"},\"step\":\"s3\"}],"
+ "\"counters\":[{\"attempted\":20,\"committed\":10,\"name\":{\"name\":\"n1\","
+ "\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":"
+ "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"committed\":"
+ "{\"count\":2,\"max\":8,\"mean\":5.0,\"min\":5,\"sum\":10},\"name\":{\"name\":\"n2\","
Expand All @@ -111,7 +114,9 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception {
metricsHttpSink.writeMetrics(metricQueryResults);
countDownLatch.await();
String expected =
"{\"counters\":[{\"attempted\":20,\"name\":{\"name\":\"n1\","
"{\"boundedTries\":[{\"attempted\":{\"result\":[[\"cd\",\"false\"]]},"
+ "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}],"
+ "\"counters\":[{\"attempted\":20,\"name\":{\"name\":\"n1\","
+ "\"namespace\":\"ns1\"},\"step\":\"s1\"}],\"distributions\":[{\"attempted\":"
+ "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\""
+ ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@

/**
* <b>Internal:</b> For internal use only and not for public consumption. This API is subject to
* incompatible changes, or even removal, in a future release. A metric that represents a bounded
* trie data structure. This interface extends the {@link Metric} interface and provides methods for
* adding string sequences (paths) to the trie.
* incompatible changes, or even removal, in a future release.
*
* <p>A metric that represents a bounded trie data structure. This interface extends the {@link
* Metric} interface and provides methods for adding string sequences (paths) to the trie.
*
* <p>The trie is bounded in size (max=100), meaning it has a maximum capacity for storing paths.
* When the trie reaches its capacity, it truncates paths. This is useful for tracking and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

/**
* The result of a {@link BoundedTrie} metric. The {@link BoundedTrieResult} hold an immutable copy
* of the set from which it was initially created representing that a result cannot be modified once
* created.
* <b>Internal:</b> For internal use only and not for public consumption. This API is subject to
* incompatible changes, or even removal, in a future release.
*
* <p>The result of a {@link BoundedTrie} metric. The {@link BoundedTrieResult} hold an immutable
* copy of the set from which it was initially created representing that a result cannot be modified
* once created.
*/
@Internal
@AutoValue
public abstract class BoundedTrieResult {

public abstract Set<List<String>> result();
public abstract Set<List<String>> getResult();

/**
* Creates a {@link BoundedTrieResult} from the given {@link Set} by making an immutable copy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,24 @@ public static StringSet stringSet(String namespace, String name) {
}

/** Create a metric that accumulates and reports set of unique string values. */
public static StringSet stringSet(Class<?> namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}

/**
* Create a metric that accumulates and reports set of unique string values bounded to a max limit
*/
public static BoundedTrie boundedTrie(Class<?> namespace, String name) {
return new DelegatingBoundedTrie(MetricName.named(namespace, name));
}

/** Create a metric that accumulates and reports set of unique string values. */
/**
* Create a metric that accumulates and reports set of unique string values bounded to a max limit
*/
public static BoundedTrie boundedTrie(String namespace, String name) {
return new DelegatingBoundedTrie(MetricName.named(namespace, name));
}

/** Create a metric that accumulates and reports set of unique string values. */
public static StringSet stringSet(Class<?> namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}

/*
* A dedicated namespace for client throttling time. User DoFn can increment this metrics and then
* runner will put back pressure on scaling decision, if supported.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;

import java.util.List;
import java.util.Set;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;

public class BoundedTrieResultTest {

@Test
public void testCreate() {
Set<List<String>> inputSet =
ImmutableSet.of(
ImmutableList.of("a", "b"), ImmutableList.of("c", "d"), ImmutableList.of("e"));
BoundedTrieResult result = BoundedTrieResult.create(inputSet);
Assert.assertEquals(inputSet, result.getResult());
}

@Test
public void testCreate_empty() {
Set<List<String>> inputSet = ImmutableSet.of();
BoundedTrieResult result = BoundedTrieResult.create(inputSet);
Assert.assertEquals(inputSet, result.getResult());
}

@Test
public void testEmpty() {
BoundedTrieResult result = BoundedTrieResult.empty();
Assert.assertTrue(result.getResult().isEmpty());
}

@Test
public void testImmutability() {
Set<List<String>> inputSet =
ImmutableSet.of(
ImmutableList.of("a", "b"), ImmutableList.of("c", "d"), ImmutableList.of("e"));
BoundedTrieResult result = BoundedTrieResult.create(inputSet);

// Try to modify the set returned by getResult()
try {
result.getResult().add(ImmutableList.of("f"));
Assert.fail("UnsupportedOperationException expected");
} catch (UnsupportedOperationException expected) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker;
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTrackerStatus;
import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.metrics.BoundedTrie;
import org.apache.beam.sdk.metrics.BoundedTrieResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Distribution;
Expand All @@ -47,6 +49,7 @@
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Duration;
Expand All @@ -69,6 +72,8 @@ public class ExecutionStateSamplerTest {
private static final Gauge TEST_USER_GAUGE = Metrics.gauge("foo", "gauge");

private static final StringSet TEST_USER_STRING_SET = Metrics.stringSet("foo", "stringset");
private static final BoundedTrie TEST_USER_BOUNDED_TRIE =
Metrics.boundedTrie("foo", "boundedtrie");
private static final Histogram TEST_USER_HISTOGRAM =
new DelegatingHistogram(
MetricName.named("foo", "histogram"), HistogramData.LinearBuckets.of(0, 100, 1), false);
Expand Down Expand Up @@ -380,13 +385,15 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep
TEST_USER_DISTRIBUTION.update(2);
TEST_USER_GAUGE.set(3);
TEST_USER_STRING_SET.add("ab");
TEST_USER_BOUNDED_TRIE.add("bt_ab");
TEST_USER_HISTOGRAM.update(4);
state.deactivate();

TEST_USER_COUNTER.inc(11);
TEST_USER_DISTRIBUTION.update(12);
TEST_USER_GAUGE.set(13);
TEST_USER_STRING_SET.add("cd");
TEST_USER_BOUNDED_TRIE.add("bt_cd");
TEST_USER_HISTOGRAM.update(14);
TEST_USER_HISTOGRAM.update(14);

Expand Down Expand Up @@ -425,6 +432,14 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep
.getStringSet(TEST_USER_STRING_SET.getName())
.getCumulative()
.stringSet());
assertEquals(
BoundedTrieResult.create(ImmutableSet.of(ImmutableList.of("bt_ab", String.valueOf(false)))),
tracker
.getMetricsContainerRegistry()
.getContainer("ptransformId")
.getBoundedTrie(TEST_USER_BOUNDED_TRIE.getName())
.getCumulative()
.extractResult());
assertEquals(
1L,
(long)
Expand Down Expand Up @@ -471,6 +486,14 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep
.getStringSet(TEST_USER_STRING_SET.getName())
.getCumulative()
.stringSet());
assertEquals(
BoundedTrieResult.create(ImmutableSet.of(ImmutableList.of("bt_cd", String.valueOf(false)))),
tracker
.getMetricsContainerRegistry()
.getUnboundContainer()
.getBoundedTrie(TEST_USER_BOUNDED_TRIE.getName())
.getCumulative()
.extractResult());
assertEquals(
2L,
(long)
Expand Down

0 comments on commit 83b3d0d

Please sign in to comment.