Skip to content

Commit

Permalink
Switch to use ConcurrentMap for StringSetData (#33057)
Browse files Browse the repository at this point in the history
* Switch to use ConcurrentMap for StringSetData

* address comments
  • Loading branch information
Abacn authored Nov 12, 2024
1 parent 628348b commit 2604943
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -54,13 +54,13 @@ public static StringSetData create(Set<String> set) {
if (set.isEmpty()) {
return empty();
}
HashSet<String> combined = new HashSet<>();
Set<String> combined = ConcurrentHashMap.newKeySet();
long stringSize = addUntilCapacity(combined, 0L, set);
return new AutoValue_StringSetData(combined, stringSize);
}

/** Returns a {@link StringSetData} which is made from the given set in place. */
private static StringSetData createInPlace(HashSet<String> set, long stringSize) {
private static StringSetData createInPlace(Set<String> set, long stringSize) {
return new AutoValue_StringSetData(set, stringSize);
}

Expand All @@ -76,11 +76,12 @@ public static StringSetData empty() {
* <p>>Should only be used by {@link StringSetCell#add}.
*/
public StringSetData addAll(String... strings) {
HashSet<String> combined;
if (this.stringSet() instanceof HashSet) {
combined = (HashSet<String>) this.stringSet();
Set<String> combined;
if (this.stringSet() instanceof ConcurrentHashMap.KeySetView) {
combined = this.stringSet();
} else {
combined = new HashSet<>(this.stringSet());
combined = ConcurrentHashMap.newKeySet();
combined.addAll(this.stringSet());
}
long stringSize = addUntilCapacity(combined, this.stringSize(), Arrays.asList(strings));
return StringSetData.createInPlace(combined, stringSize);
Expand All @@ -95,7 +96,8 @@ public StringSetData combine(StringSetData other) {
} else if (other.stringSet().isEmpty()) {
return this;
} else {
HashSet<String> combined = new HashSet<>(this.stringSet());
Set<String> combined = ConcurrentHashMap.newKeySet();
combined.addAll(this.stringSet());
long stringSize = addUntilCapacity(combined, this.stringSize(), other.stringSet());
return StringSetData.createInPlace(combined, stringSize);
}
Expand All @@ -105,7 +107,8 @@ public StringSetData combine(StringSetData other) {
* Combines this {@link StringSetData} with others, all original StringSetData are left intact.
*/
public StringSetData combine(Iterable<StringSetData> others) {
HashSet<String> combined = new HashSet<>(this.stringSet());
Set<String> combined = ConcurrentHashMap.newKeySet();
combined.addAll(this.stringSet());
long stringSize = this.stringSize();
for (StringSetData other : others) {
stringSize = addUntilCapacity(combined, stringSize, other.stringSet());
Expand All @@ -120,7 +123,7 @@ public StringSetResult extractResult() {

/** Add strings into set until reach capacity. Return the all string size of added set. */
private static long addUntilCapacity(
HashSet<String> combined, long currentSize, Iterable<String> others) {
Set<String> combined, long currentSize, Iterable<String> others) {
if (currentSize > STRING_SET_SIZE_LIMIT) {
// already at capacity
return currentSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.junit.Assert;
Expand Down Expand Up @@ -94,4 +100,42 @@ public void testReset() {
assertThat(stringSetCell.getCumulative(), equalTo(StringSetData.empty()));
assertThat(stringSetCell.getDirty(), equalTo(new DirtyState()));
}

@Test(timeout = 5000)
public void testStringSetCellConcurrentAddRetrieval() throws InterruptedException {
StringSetCell cell = new StringSetCell(MetricName.named("namespace", "name"));
AtomicBoolean finished = new AtomicBoolean(false);
Thread increment =
new Thread(
() -> {
for (long i = 0; !finished.get(); ++i) {
cell.add(String.valueOf(i));
try {
Thread.sleep(1);
} catch (InterruptedException e) {
break;
}
}
});
increment.start();
Instant start = Instant.now();
try {
while (true) {
Set<String> s = cell.getCumulative().stringSet();
List<String> snapshot = new ArrayList<>(s);
if (Instant.now().isAfter(start.plusSeconds(3)) && snapshot.size() > 0) {
finished.compareAndSet(false, true);
break;
}
}
} finally {
increment.interrupt();
increment.join();
}

Set<String> s = cell.getCumulative().stringSet();
for (long i = 0; i < s.size(); ++i) {
assertTrue(s.contains(String.valueOf(i)));
}
}
}

0 comments on commit 2604943

Please sign in to comment.