Skip to content

Commit

Permalink
Check WorkItemCommitRequest is buildable in WindmillStateInternalTest
Browse files Browse the repository at this point in the history
  • Loading branch information
baeminbo committed Oct 22, 2024
1 parent 3767eda commit 7e346ad
Showing 1 changed file with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleEntry;
Expand Down Expand Up @@ -305,6 +307,26 @@ private <K> K userKeyFromProtoKey(ByteString tag, Coder<K> keyCoder) throws IOEx
return keyCoder.decode(keyBytes.newInput(), Context.OUTER);
}

private static void assertBuildable(
Windmill.WorkItemCommitRequest.Builder commitWorkRequestBuilder) {
Windmill.WorkItemCommitRequest.Builder clone = commitWorkRequestBuilder.clone();
if (!clone.hasKey()) {
clone.setKey(ByteString.EMPTY); // key is required to build
}
if (!clone.hasWorkToken()) {
clone.setWorkToken(1357924680L); // workToken is required to build
}

try {
clone.build();
} catch (Exception e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
fail(
"Failed to build commitRequest from: " + commitWorkRequestBuilder + "\n" + sw.toString());
}
}

@Test
public void testMapAddBeforeGet() throws Exception {
StateTag<MapState<String, Integer>> addr =
Expand Down Expand Up @@ -647,6 +669,8 @@ public void testMapAddPersist() throws Exception {
.map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of()))
.collect(Collectors.toList()),
Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, 1), new SimpleEntry<>(tag2, 2)));

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -670,6 +694,8 @@ public void testMapRemovePersist() throws Exception {
.map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of()))
.collect(Collectors.toList()),
Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, null), new SimpleEntry<>(tag2, null)));

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -695,6 +721,8 @@ public void testMapClearPersist() throws Exception {
assertEquals(
protoKeyFromUserKey(null, StringUtf8Coder.of()),
commitBuilder.getTagValuePrefixDeletes(0).getTagPrefix());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -736,6 +764,8 @@ public void testMapComplexPersist() throws Exception {
commitBuilder = Windmill.WorkItemCommitRequest.newBuilder();
assertEquals(0, commitBuilder.getTagValuePrefixDeletesCount());
assertEquals(0, commitBuilder.getValueUpdatesCount());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -953,6 +983,8 @@ public void testMultimapRemovePersistPut() {

multimapState.put(key, 5);
assertThat(multimapState.get(key).read(), Matchers.containsInAnyOrder(4, 5));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1766,6 +1798,8 @@ public void testMultimapPutAndPersist() {
builder,
new MultimapEntryUpdate(key1, Arrays.asList(1, 2), false),
new MultimapEntryUpdate(key2, Collections.singletonList(2), false));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1799,6 +1833,8 @@ public void testMultimapRemovePutAndPersist() {
builder,
new MultimapEntryUpdate(key1, Arrays.asList(1, 2), true),
new MultimapEntryUpdate(key2, Collections.singletonList(4), true));

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -1825,6 +1861,8 @@ public void testMultimapRemoveAndPersist() {
builder,
new MultimapEntryUpdate(key1, Collections.emptyList(), true),
new MultimapEntryUpdate(key2, Collections.emptyList(), true));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1856,6 +1894,8 @@ public void testMultimapPutRemoveClearAndPersist() {
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
assertEquals(0, builder.getUpdatesCount());
assertTrue(builder.getDeleteAll());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1894,6 +1934,8 @@ false, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
assertTagMultimapUpdates(
builder, new MultimapEntryUpdate(key1, Collections.singletonList(4), false));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1938,6 +1980,8 @@ true, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
ByteArrayCoder.of().decode(entryUpdate.getEntryName().newInput(), Context.OUTER);
assertArrayEquals(key1, decodedKey);
assertTrue(entryUpdate.getDeleteAll());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2053,6 +2097,8 @@ true, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2253,6 +2299,8 @@ public void testOrderedListAddPersist() throws Exception {
assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2284,6 +2332,8 @@ public void testOrderedListClearPersist() throws Exception {
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2331,6 +2381,8 @@ public void testOrderedListDeleteRangePersist() {
assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey());
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2539,6 +2591,8 @@ public void testOrderedListPersistEmpty() throws Exception {
assertEquals(1, updates.getDeletesCount());
assertEquals(WindmillOrderedList.MIN_TS_MICROS, updates.getDeletes(0).getRange().getStart());
assertEquals(WindmillOrderedList.MAX_TS_MICROS, updates.getDeletes(0).getRange().getLimit());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2653,6 +2707,8 @@ public void testBagAddPersist() throws Exception {
assertEquals("hello", bagUpdates.getValues(0).toStringUtf8());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -2678,6 +2734,8 @@ public void testBagClearPersist() throws Exception {
assertEquals("world", tagBag.getValues(0).toStringUtf8());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -2693,6 +2751,8 @@ public void testBagPersistEmpty() throws Exception {

// 1 bag update = the clear
assertEquals(1, commitBuilder.getBagUpdatesCount());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2806,6 +2866,8 @@ public void testCombiningAddPersist() throws Exception {
11, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]);

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2835,6 +2897,8 @@ public void testCombiningAddPersistWithCompact() throws Exception {
assertTrue(bagUpdates.getDeleteAll());
assertEquals(
111, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2862,6 +2926,8 @@ public void testCombiningClearPersist() throws Exception {
11, CoderUtils.decodeFromByteArray(accumCoder, tagBag.getValues(0).toByteArray())[0]);

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2990,6 +3056,8 @@ public void testWatermarkPersistEarliest() throws Exception {
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0));

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3016,6 +3084,8 @@ public void testWatermarkPersistLatestEmpty() throws Exception {

Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3042,6 +3112,8 @@ public void testWatermarkPersistLatestWindmillWins() throws Exception {

Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3068,6 +3140,8 @@ public void testWatermarkPersistLatestLocalAdditionsWin() throws Exception {

Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3091,6 +3165,8 @@ public void testWatermarkPersistEndOfWindow() throws Exception {

// Blind adds should not need to read the future.
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3116,6 +3192,8 @@ public void testWatermarkClearPersist() throws Exception {
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), clearAndUpdate.getTimestamps(0));

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3133,6 +3211,8 @@ public void testWatermarkPersistEmpty() throws Exception {

// 1 bag update corresponds to deletion. There shouldn't be a bag update adding items.
assertEquals(1, commitBuilder.getWatermarkHoldsCount());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -3200,6 +3280,8 @@ public void testValueSetPersist() throws Exception {
assertTrue(valueUpdate.isInitialized());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3220,6 +3302,8 @@ public void testValueClearPersist() throws Exception {
assertEquals(0, valueUpdate.getValue().getData().size());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3234,6 +3318,8 @@ public void testValueNoChangePersist() throws Exception {
assertEquals(0, commitBuilder.getValueUpdatesCount());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down

0 comments on commit 7e346ad

Please sign in to comment.