Skip to content

Commit

Permalink
Add PL/pgSQL triggers as constraints for checking high_watermarks are…
Browse files Browse the repository at this point in the history
… greater than low_watermarks
  • Loading branch information
siladu committed Sep 1, 2023
1 parent 69b62c6 commit 24f0dc6
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,28 @@ ALTER TABLE metadata
ADD COLUMN high_watermark_epoch NUMERIC(20),
ADD COLUMN high_watermark_slot NUMERIC(20);

CREATE OR REPLACE FUNCTION check_high_watermarks() RETURNS TRIGGER AS $$
DECLARE
max_slot NUMERIC(20);
max_epoch NUMERIC(20);
BEGIN
SELECT MAX(slot) INTO max_slot FROM low_watermarks;
SELECT GREATEST(MAX(target_epoch), MAX(source_epoch)) INTO max_epoch FROM low_watermarks;

IF NEW.high_watermark_slot <= max_slot THEN
RAISE EXCEPTION 'Insert/Update violates constraint: high_watermark_slot must be greater than max slot in low_watermarks table';
END IF;

IF NEW.high_watermark_epoch <= max_epoch THEN
RAISE EXCEPTION 'Insert/Update violates constraint: high_watermark_epoch must be greater than max epoch in low_watermarks table';
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER check_before_insert_or_update
BEFORE INSERT OR UPDATE ON metadata
FOR EACH ROW EXECUTE PROCEDURE check_high_watermarks();

UPDATE database_version SET version = 12 WHERE id = 1;
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public void findsExistingGvrInDb(final Handle handle) {
assertThat(existingGvr).contains(Bytes32.leftPad(Bytes.of(3)));
}

@Test
public void findsExistingGvrAfterHighWatermarkIsSet(final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
updateHighWatermark(handle, 1, 2);

final Optional<Bytes32> existingGvr = metadataDao.findGenesisValidatorsRoot(handle);
assertThat(existingGvr).isNotEmpty();
assertThat(existingGvr).contains(Bytes32.leftPad(Bytes.of(3)));
}

@Test
public void returnsEmptyForNonExistingGvrInDb(final Handle handle) {
assertThat(metadataDao.findGenesisValidatorsRoot(handle)).isEmpty();
Expand Down Expand Up @@ -133,6 +143,40 @@ public void updateHighWatermarkWhenNoGvrHasNoEffect(final Handle handle) {
assertThat(updateCount).isEqualTo(0);
}

@Test
public void updateHighWatermarkFailsWhenNotGreaterThanMaxLowWatermarkSlot(final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
insertLowWatermarks(handle);
HighWatermark highWatermark = createHighWatermark(13, 999);
assertThatThrownBy(() -> metadataDao.updateHighWatermark(handle, highWatermark))
.hasMessageContaining(
"high_watermark_slot must be greater than max slot in low_watermarks table");
}

@Test
public void updateHighWatermarkFailsWhenNotGreaterThanMaxLowWatermarkTargetEpoch(
final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
insertLowWatermarks(handle);
HighWatermark highWatermark = createHighWatermark(999, 12);

assertThatThrownBy(() -> metadataDao.updateHighWatermark(handle, highWatermark))
.hasMessageContaining(
"high_watermark_epoch must be greater than max epoch in low_watermarks table");
}

@Test
public void updateHighWatermarkFailsWhenNotGreaterThanMaxLowWatermarkSourceEpoch(
final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
insertLowWatermarks(handle);
HighWatermark highWatermark = createHighWatermark(999, 11);

assertThatThrownBy(() -> metadataDao.updateHighWatermark(handle, highWatermark))
.hasMessageContaining(
"high_watermark_epoch must be greater than max epoch in low_watermarks table");
}

@Test
public void deletesHighWatermark(final Handle handle) {
insertGvr(handle, Bytes32.leftPad(Bytes.of(3)));
Expand All @@ -151,6 +195,23 @@ private void insertGvr(final Handle handle, final Bytes genesisValidatorsRoot) {
genesisValidatorsRoot);
}

private void insertLowWatermarks(Handle handle) {
handle.execute("INSERT INTO validators (public_key, enabled) VALUES (?, ?)", Bytes.of(1), true);
handle.execute("INSERT INTO validators (public_key, enabled) VALUES (?, ?)", Bytes.of(2), true);
handle.execute(
"INSERT INTO low_watermarks (validator_id, slot, target_epoch, source_epoch) VALUES (?, ?, ?, ?)",
1,
3,
2,
1);
handle.execute(
"INSERT INTO low_watermarks (validator_id, slot, target_epoch, source_epoch) VALUES (?, ?, ?, ?)",
2,
13,
12,
11);
}

private void updateHighWatermark(final Handle handle, final int epoch, final int slot) {
handle
.createUpdate("UPDATE metadata set high_watermark_epoch=:epoch, high_watermark_slot=:slot")
Expand All @@ -159,7 +220,7 @@ private void updateHighWatermark(final Handle handle, final int epoch, final int
.execute();
}

private HighWatermark createHighWatermark(final int epoch, final int slot) {
return new HighWatermark(UInt64.valueOf(epoch), UInt64.valueOf(slot));
private HighWatermark createHighWatermark(final int slot, final int epoch) {
return new HighWatermark(UInt64.valueOf(slot), UInt64.valueOf(epoch));
}
}

0 comments on commit 24f0dc6

Please sign in to comment.