From f2f5bf6a293cb5d0b8e34e9ff9e19fe5e559a92a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Fri, 25 Aug 2023 23:14:37 -0700 Subject: [PATCH] Don't improperly filter newly-added elements that overlap with a delete. --- .../worker/WindmillStateInternals.java | 9 +-- .../worker/WindmillStateInternalsTest.java | 65 +++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java index 8b11786e406e..d4edc0afc0b1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java @@ -945,7 +945,9 @@ public Iterable> readRange( // that the ids don't overlap with any in pendingAdds, so begin with pendingAdds.size(). Iterable> data = new Iterable>() { - private Iterable> iterable = future.get(); + // Anything returned from windmill that has been deleted should be ignored. + private Iterable> iterable = + Iterables.filter(future.get(), tv -> !pendingDeletes.contains(tv.getTimestamp())); @Override public Iterator> iterator() { @@ -970,9 +972,8 @@ public TimestampedValueWithId next() { Iterables.mergeSorted( ImmutableList.of(data, pendingInRange), TimestampedValueWithId.COMPARATOR); Iterable> fullIterable = - Iterables.filter( - Iterables.transform(includingAdds, TimestampedValueWithId::getValue), - tv -> !pendingDeletes.contains(tv.getTimestamp())); + Iterables.transform(includingAdds, TimestampedValueWithId::getValue); + // TODO(reuvenlax): If we have a known bounded amount of data, cache known ranges. return fullIterable; } catch (InterruptedException | ExecutionException | IOException e) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java index 4c15da319b12..9f2d5eee8f87 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java @@ -2169,6 +2169,71 @@ public void testOrderedListMergePendingAdds() { assertArrayEquals(expected, read); } + @Test + public void testOrderedListMergePendingAddsAndDeletes() { + SettableFuture, RangeSet>> orderedListFuture = SettableFuture.create(); + orderedListFuture.set(null); + SettableFuture, RangeSet>> deletionsFuture = + SettableFuture.create(); + deletionsFuture.set(null); + when(mockReader.valueFuture( + systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR), + STATE_FAMILY, + IdTracker.IDS_AVAILABLE_CODER)) + .thenReturn(orderedListFuture); + when(mockReader.valueFuture( + systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR), + STATE_FAMILY, + IdTracker.SUBRANGE_DELETIONS_CODER)) + .thenReturn(deletionsFuture); + + SettableFuture>> fromStorage = SettableFuture.create(); + when(mockReader.orderedListFuture( + FULL_ORDERED_LIST_RANGE, + key(NAMESPACE, "orderedList"), + STATE_FAMILY, + StringUtf8Coder.of())) + .thenReturn(fromStorage); + + StateTag> addr = + StateTags.orderedList("orderedList", StringUtf8Coder.of()); + OrderedListState orderedListState = underTest.state(NAMESPACE, addr); + + orderedListState.add(TimestampedValue.of("second", Instant.ofEpochMilli(1))); + orderedListState.add(TimestampedValue.of("third", Instant.ofEpochMilli(2))); + orderedListState.add(TimestampedValue.of("fourth", Instant.ofEpochMilli(2))); + orderedListState.add(TimestampedValue.of("eighth", Instant.ofEpochMilli(10))); + orderedListState.add(TimestampedValue.of("ninth", Instant.ofEpochMilli(15))); + + orderedListState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(5)); + orderedListState.add(TimestampedValue.of("fourth", Instant.ofEpochMilli(4))); + + fromStorage.set( + ImmutableList.of( + TimestampedValue.of("first", Instant.ofEpochMilli(-1)), + TimestampedValue.of("fifth", Instant.ofEpochMilli(5)), + TimestampedValue.of("sixth", Instant.ofEpochMilli(5)), + TimestampedValue.of("seventh", Instant.ofEpochMilli(5)), + TimestampedValue.of("tenth", Instant.ofEpochMilli(20)))); + + TimestampedValue[] expected = + Iterables.toArray( + ImmutableList.of( + TimestampedValue.of("first", Instant.ofEpochMilli(-1)), + TimestampedValue.of("second", Instant.ofEpochMilli(1)), + TimestampedValue.of("fourth", Instant.ofEpochMilli(4)), + TimestampedValue.of("fifth", Instant.ofEpochMilli(5)), + TimestampedValue.of("sixth", Instant.ofEpochMilli(5)), + TimestampedValue.of("seventh", Instant.ofEpochMilli(5)), + TimestampedValue.of("eighth", Instant.ofEpochMilli(10)), + TimestampedValue.of("ninth", Instant.ofEpochMilli(15)), + TimestampedValue.of("tenth", Instant.ofEpochMilli(20))), + TimestampedValue.class); + + TimestampedValue[] read = Iterables.toArray(orderedListState.read(), TimestampedValue.class); + assertArrayEquals(expected, read); + } + @Test public void testOrderedListPersistEmpty() throws Exception { StateTag> addr =