Skip to content

Commit

Permalink
Handle null epoch OR null slot (both null already handled)
Browse files Browse the repository at this point in the history
  • Loading branch information
siladu committed Sep 11, 2023
1 parent f98bae7 commit 309a037
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Optional<HighWatermark> findHighWatermark(Handle handle) {
"SELECT high_watermark_epoch as epoch, high_watermark_slot as slot FROM metadata WHERE id = ?")
.bind(0, METADATA_ROW_ID)
.mapToBean(HighWatermark.class)
.filter(h -> h.getEpoch() != null && h.getSlot() != null)
.filter(h -> h.getEpoch() != null || h.getSlot() != null)
.findFirst();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,26 @@ public void findsExistingHighWatermark(final Handle handle) {
.contains(new HighWatermark(UInt64.valueOf(2), UInt64.valueOf(1)));
}

@Test
public void findsExistingHighWatermarkWithOnlyEpoch(final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
updateHighWatermark(handle, 1, null);

final Optional<HighWatermark> existingHighWatermark = metadataDao.findHighWatermark(handle);

assertThat(existingHighWatermark).contains(new HighWatermark(null, UInt64.valueOf(1)));
}

@Test
public void findsExistingHighWatermarkWithOnlySlot(final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
updateHighWatermark(handle, null, 2);

final Optional<HighWatermark> existingHighWatermark = metadataDao.findHighWatermark(handle);

assertThat(existingHighWatermark).contains(new HighWatermark(UInt64.valueOf(2), null));
}

@Test
public void returnsEmptyForNonExistingHighWatermark(final Handle handle) {
assertThat(metadataDao.findHighWatermark(handle)).isEmpty();
Expand All @@ -110,15 +130,27 @@ public void insertsHighWatermark(final Handle handle) {

int updateCount = metadataDao.updateHighWatermark(handle, highWatermark);

assertThat(updateCount).isEqualTo(1);
final List<HighWatermark> highWatermarks =
handle
.createQuery(
"SELECT high_watermark_epoch as epoch, high_watermark_slot as slot FROM metadata")
.mapToBean(HighWatermark.class)
.list();
assertThat(highWatermarks.size()).isEqualTo(1);
assertThat(highWatermarks.get(0)).isEqualTo(highWatermark);
assertHighWatermarkUpdatedSuccessfully(handle, updateCount, highWatermark);
}

@Test
public void insertsOnlyEpochHighWatermark(final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
HighWatermark highWatermark = new HighWatermark(null, UInt64.valueOf(1));

int updateCount = metadataDao.updateHighWatermark(handle, highWatermark);

assertHighWatermarkUpdatedSuccessfully(handle, updateCount, highWatermark);
}

@Test
public void insertsOnlySlotHighWatermark(final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
HighWatermark highWatermark = new HighWatermark(UInt64.valueOf(1), null);

int updateCount = metadataDao.updateHighWatermark(handle, highWatermark);

assertHighWatermarkUpdatedSuccessfully(handle, updateCount, highWatermark);
}

@Test
Expand All @@ -129,6 +161,11 @@ public void updatesHighWatermark(final Handle handle) {

int updateCount = metadataDao.updateHighWatermark(handle, highWatermark);

assertHighWatermarkUpdatedSuccessfully(handle, updateCount, highWatermark);
}

private void assertHighWatermarkUpdatedSuccessfully(
Handle handle, int updateCount, HighWatermark highWatermark) {
assertThat(updateCount).isEqualTo(1);
final List<HighWatermark> highWatermarks =
handle
Expand Down Expand Up @@ -220,7 +257,7 @@ private void insertLowWatermarks(Handle handle) {
MAX_LOW_WATERMARK_SOURCE_EPOCH);
}

private void updateHighWatermark(final Handle handle, final int epoch, final int slot) {
private void updateHighWatermark(final Handle handle, final Integer epoch, final Integer slot) {
handle
.createUpdate("UPDATE metadata set high_watermark_epoch=:epoch, high_watermark_slot=:slot")
.bind("epoch", epoch)
Expand Down

0 comments on commit 309a037

Please sign in to comment.