Skip to content

Commit

Permalink
Merge pull request #28171: Properly handle in-flight deletes followed…
Browse files Browse the repository at this point in the history
… by adds in OrderedListState
  • Loading branch information
reuvenlax authored Sep 11, 2023
2 parents f86d9e2 + f2f5bf6 commit 5e4b9bb
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,9 @@ public Iterable<TimestampedValue<T>> readRange(
// that the ids don't overlap with any in pendingAdds, so begin with pendingAdds.size().
Iterable<TimestampedValueWithId<T>> data =
new Iterable<TimestampedValueWithId<T>>() {
private Iterable<TimestampedValue<T>> iterable = future.get();
// Anything returned from windmill that has been deleted should be ignored.
private Iterable<TimestampedValue<T>> iterable =
Iterables.filter(future.get(), tv -> !pendingDeletes.contains(tv.getTimestamp()));

@Override
public Iterator<TimestampedValueWithId<T>> iterator() {
Expand All @@ -970,9 +972,8 @@ public TimestampedValueWithId<T> next() {
Iterables.mergeSorted(
ImmutableList.of(data, pendingInRange), TimestampedValueWithId.COMPARATOR);
Iterable<TimestampedValue<T>> fullIterable =
Iterables.filter(
Iterables.transform(includingAdds, TimestampedValueWithId::getValue),
tv -> !pendingDeletes.contains(tv.getTimestamp()));
Iterables.transform(includingAdds, TimestampedValueWithId::getValue);

// TODO(reuvenlax): If we have a known bounded amount of data, cache known ranges.
return fullIterable;
} catch (InterruptedException | ExecutionException | IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,71 @@ public void testOrderedListMergePendingAdds() {
assertArrayEquals(expected, read);
}

@Test
public void testOrderedListMergePendingAddsAndDeletes() {
SettableFuture<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = SettableFuture.create();
orderedListFuture.set(null);
SettableFuture<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture =
SettableFuture.create();
deletionsFuture.set(null);
when(mockReader.valueFuture(
systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR),
STATE_FAMILY,
IdTracker.IDS_AVAILABLE_CODER))
.thenReturn(orderedListFuture);
when(mockReader.valueFuture(
systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR),
STATE_FAMILY,
IdTracker.SUBRANGE_DELETIONS_CODER))
.thenReturn(deletionsFuture);

SettableFuture<Iterable<TimestampedValue<String>>> fromStorage = SettableFuture.create();
when(mockReader.orderedListFuture(
FULL_ORDERED_LIST_RANGE,
key(NAMESPACE, "orderedList"),
STATE_FAMILY,
StringUtf8Coder.of()))
.thenReturn(fromStorage);

StateTag<OrderedListState<String>> addr =
StateTags.orderedList("orderedList", StringUtf8Coder.of());
OrderedListState<String> orderedListState = underTest.state(NAMESPACE, addr);

orderedListState.add(TimestampedValue.of("second", Instant.ofEpochMilli(1)));
orderedListState.add(TimestampedValue.of("third", Instant.ofEpochMilli(2)));
orderedListState.add(TimestampedValue.of("fourth", Instant.ofEpochMilli(2)));
orderedListState.add(TimestampedValue.of("eighth", Instant.ofEpochMilli(10)));
orderedListState.add(TimestampedValue.of("ninth", Instant.ofEpochMilli(15)));

orderedListState.clearRange(Instant.ofEpochMilli(2), Instant.ofEpochMilli(5));
orderedListState.add(TimestampedValue.of("fourth", Instant.ofEpochMilli(4)));

fromStorage.set(
ImmutableList.of(
TimestampedValue.of("first", Instant.ofEpochMilli(-1)),
TimestampedValue.of("fifth", Instant.ofEpochMilli(5)),
TimestampedValue.of("sixth", Instant.ofEpochMilli(5)),
TimestampedValue.of("seventh", Instant.ofEpochMilli(5)),
TimestampedValue.of("tenth", Instant.ofEpochMilli(20))));

TimestampedValue[] expected =
Iterables.toArray(
ImmutableList.of(
TimestampedValue.of("first", Instant.ofEpochMilli(-1)),
TimestampedValue.of("second", Instant.ofEpochMilli(1)),
TimestampedValue.of("fourth", Instant.ofEpochMilli(4)),
TimestampedValue.of("fifth", Instant.ofEpochMilli(5)),
TimestampedValue.of("sixth", Instant.ofEpochMilli(5)),
TimestampedValue.of("seventh", Instant.ofEpochMilli(5)),
TimestampedValue.of("eighth", Instant.ofEpochMilli(10)),
TimestampedValue.of("ninth", Instant.ofEpochMilli(15)),
TimestampedValue.of("tenth", Instant.ofEpochMilli(20))),
TimestampedValue.class);

TimestampedValue[] read = Iterables.toArray(orderedListState.read(), TimestampedValue.class);
assertArrayEquals(expected, read);
}

@Test
public void testOrderedListPersistEmpty() throws Exception {
StateTag<OrderedListState<String>> addr =
Expand Down

0 comments on commit 5e4b9bb

Please sign in to comment.