Skip to content

Commit

Permalink
Address comments, perform deep copies and support synchronization for…
Browse files Browse the repository at this point in the history
… mutable BoundedTrieData
  • Loading branch information
rohitsinha54 committed Dec 19, 2024
1 parent 38ae8e9 commit 2697eb8
Show file tree
Hide file tree
Showing 9 changed files with 431 additions and 316 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.metrics.BoundedTrie;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand All @@ -33,44 +31,37 @@
* In that case retrieving the underlying cell and reporting directly to it avoids a step of
* indirection.
*/
// TODO: Write multi-threaded test in MetricContainerImp for this Cell class too.
public class BoundedTrieCell implements BoundedTrie, MetricCell<BoundedTrieData> {

private final DirtyState dirty = new DirtyState();
private final AtomicReference<BoundedTrieData> setValue =
new AtomicReference<>(BoundedTrieData.empty());
private final BoundedTrieData value;
private final MetricName name;

/**
* Generally, runners should construct instances using the methods in {@link
* MetricsContainerImpl}, unless they need to define their own version of {@link
* MetricsContainer}. These constructors are *only* public so runners can instantiate.
*/
public BoundedTrieCell(MetricName name) {
this.name = name;
this.value = new BoundedTrieData();
}

public void update(BoundedTrieCell other) {
this.value.combine(other.value);
dirty.afterModification();
}

@Override
public void reset() {
setValue.set(BoundedTrieData.empty());
value.clear();
dirty.reset();
}

void update(BoundedTrieData data) {
BoundedTrieData original;
do {
original = setValue.get();
} while (!setValue.compareAndSet(original, original.combine(data)));
dirty.afterModification();
}

@Override
public DirtyState getDirty() {
return dirty;
}

@Override
public BoundedTrieData getCumulative() {
return setValue.get();
return value.getCumulative();
}

@Override
Expand All @@ -83,23 +74,20 @@ public boolean equals(@Nullable Object object) {
if (object instanceof BoundedTrieCell) {
BoundedTrieCell boundedTrieCell = (BoundedTrieCell) object;
return Objects.equals(dirty, boundedTrieCell.dirty)
&& Objects.equals(setValue.get(), boundedTrieCell.setValue.get())
&& Objects.equals(value, boundedTrieCell.value)
&& Objects.equals(name, boundedTrieCell.name);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(dirty, setValue.get(), name);
return Objects.hash(dirty, value, name);
}

@Override
public void add(Iterable<String> values) {
BoundedTrieData original;
do {
original = setValue.get();
} while (!setValue.compareAndSet(original, original.add(values)));
this.value.add(values);
dirty.afterModification();
}

Expand Down
Loading

0 comments on commit 2697eb8

Please sign in to comment.