Skip to content

Commit

Permalink
Add high watermark to slashing database (#896)
Browse files Browse the repository at this point in the history
* New metadata columns: high_watermark_epoch and high_watermark_slot
* CRUD operations in MetadataDao
  - find
  - update (assumes GVR metadata is already inserted)
  - delete
* Add PL/pgSQL trigger as constraints for checking high_watermarks are greater than low_watermarks
* Add PL/pgSQL trigger as constraints for checking low_watermarks are less than or equal to high_watermarks
  • Loading branch information
siladu authored Sep 8, 2023
1 parent be75f36 commit 79d8053
Show file tree
Hide file tree
Showing 7 changed files with 434 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import static db.DatabaseUtil.USERNAME;
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.web3signer.slashingprotection.dao.HighWatermark;
import tech.pegasys.web3signer.slashingprotection.dao.MetadataDao;
import tech.pegasys.web3signer.slashingprotection.dao.SignedAttestation;
import tech.pegasys.web3signer.slashingprotection.dao.SignedBlock;
import tech.pegasys.web3signer.slashingprotection.dao.SigningWatermark;
Expand Down Expand Up @@ -118,6 +120,30 @@ void watermarkIsNotMovedLower() {
assertThat(getWatermark(1).getSlot()).isEqualTo(UInt64.valueOf(8));
}

@Test
void lowWatermarkCanMoveToEqualHighWatermark() {
// in the extreme case where we only keep 1 epoch, the low watermark may move to match the high
// watermark
final SlashingProtectionContext slashingProtectionContext =
SlashingProtectionContextFactory.create(
new TestSlashingProtectionParameters(databaseUrl, USERNAME, PASSWORD, 1, 1));
insertValidatorAndCreateSlashingData(
slashingProtectionContext.getRegisteredValidators(), 10, 10, 1);
MetadataDao metadataDao = new MetadataDao();
jdbi.useTransaction(
h -> {
lowWatermarkDao.updateSlotWatermarkFor(h, 1, UInt64.valueOf(8));
lowWatermarkDao.updateEpochWatermarksFor(h, 1, UInt64.valueOf(8), UInt64.valueOf(8));
metadataDao.updateHighWatermark(
h, new HighWatermark(UInt64.valueOf(9), UInt64.valueOf(9)));
});
slashingProtectionContext.getPruner().prune();

assertThat(fetchAttestations(1)).hasSize(1);
assertThat(fetchBlocks(1)).hasSize(1);
assertThat(getWatermark(1).getSlot()).isEqualTo(UInt64.valueOf(9));
}

@Test
void noPruningOccursWhenThereIsNoWatermark() {
final SlashingProtectionContext slashingProtectionContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.jdbi.v3.core.Handle;

public class DatabaseVersionDao {
public static final int EXPECTED_DATABASE_VERSION = 11;
public static final int EXPECTED_DATABASE_VERSION = 12;
public static final int VALIDATOR_ENABLE_FLAG_VERSION = 10;

public Integer findDatabaseVersion(final Handle handle) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.web3signer.slashingprotection.dao;

import java.util.Objects;

import org.apache.tuweni.units.bigints.UInt64;

public class HighWatermark {

private UInt64 slot;
private UInt64 epoch;

// needed for JDBI
public HighWatermark() {}

public HighWatermark(final UInt64 slot, final UInt64 epoch) {
this.slot = slot;
this.epoch = epoch;
}

public UInt64 getSlot() {
return slot;
}

public UInt64 getEpoch() {
return epoch;
}

public void setSlot(final UInt64 slot) {
this.slot = slot;
}

public void setEpoch(final UInt64 epoch) {
this.epoch = epoch;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
HighWatermark that = (HighWatermark) o;
return Objects.equals(slot, that.slot) && Objects.equals(epoch, that.epoch);
}

@Override
public int hashCode() {
return Objects.hash(slot, epoch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,32 @@ public void insertGenesisValidatorsRoot(
.bind(1, genesisValidatorsRoot)
.execute();
}

public Optional<HighWatermark> findHighWatermark(Handle handle) {
return handle
.createQuery(
"SELECT high_watermark_epoch as epoch, high_watermark_slot as slot FROM metadata WHERE id = ?")
.bind(0, METADATA_ROW_ID)
.mapToBean(HighWatermark.class)
.filter(h -> h.getEpoch() != null && h.getSlot() != null)
.findFirst();
}

public int updateHighWatermark(final Handle handle, final HighWatermark highWatermark) {
return handle
.createUpdate(
"UPDATE metadata set high_watermark_epoch=:epoch, high_watermark_slot=:slot WHERE id =:id")
.bind("id", METADATA_ROW_ID)
.bind("epoch", highWatermark.getEpoch())
.bind("slot", highWatermark.getSlot())
.execute();
}

public void deleteHighWatermark(final Handle handle) {
handle
.createUpdate(
"UPDATE metadata set high_watermark_epoch=NULL, high_watermark_slot=NULL WHERE id =:id")
.bind("id", METADATA_ROW_ID)
.execute();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
ALTER TABLE metadata
ADD COLUMN high_watermark_epoch NUMERIC(20),
ADD COLUMN high_watermark_slot NUMERIC(20);

-- inserted high watermark should be above low watermark

CREATE OR REPLACE FUNCTION check_high_watermarks() RETURNS TRIGGER AS $$
DECLARE
max_slot NUMERIC(20);
max_epoch NUMERIC(20);
BEGIN
SELECT MAX(slot) INTO max_slot FROM low_watermarks;
SELECT GREATEST(MAX(target_epoch), MAX(source_epoch)) INTO max_epoch FROM low_watermarks;

IF NEW.high_watermark_slot <= max_slot THEN
RAISE EXCEPTION 'Insert/Update violates constraint: high_watermark_slot must be greater than max slot in low_watermarks table';
END IF;

IF NEW.high_watermark_epoch <= max_epoch THEN
RAISE EXCEPTION 'Insert/Update violates constraint: high_watermark_epoch must be greater than max epoch in low_watermarks table';
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER check_before_insert_or_update_high_watermarks
BEFORE INSERT OR UPDATE ON metadata
FOR EACH ROW EXECUTE PROCEDURE check_high_watermarks();


-- inserted low watermark should be below or the same as high watermark

CREATE OR REPLACE FUNCTION check_low_watermarks() RETURNS TRIGGER AS $$
DECLARE
high_slot NUMERIC(20);
high_epoch NUMERIC(20);
BEGIN
SELECT MIN(high_watermark_slot) INTO high_slot FROM metadata;
SELECT MIN(high_watermark_epoch) INTO high_epoch FROM metadata;

IF NEW.slot > high_slot THEN
RAISE EXCEPTION 'Insert/Update violates constraint: low_watermark slot must be less than or equal to high_watermark_slot in the metadata table';
END IF;

IF NEW.source_epoch > high_epoch THEN
RAISE EXCEPTION 'Insert/Update violates constraint: low_watermark source epoch must be less than or equal to high_watermark_epoch in the metadata table';
END IF;

IF NEW.target_epoch > high_epoch THEN
RAISE EXCEPTION 'Insert/Update violates constraint: low_watermark target epoch must be less than or equal to high_watermark_epoch in the metadata table';
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER check_before_insert_or_update_low_watermarks
BEFORE INSERT OR UPDATE ON low_watermarks
FOR EACH ROW EXECUTE PROCEDURE check_low_watermarks();


UPDATE database_version SET version = 12 WHERE id = 1;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import db.DatabaseSetupExtension;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt64;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
Expand Down Expand Up @@ -200,7 +201,99 @@ public void canUpdateAttestationWatermarkAfterBlockWatermark(final Handle handle
assertThat(watermark.get().getTargetEpoch()).isEqualTo(UInt64.valueOf(5));
}

@Test
public void canCreateLowWatermarkSlotIfLessThanOrEqualToHighWatermarkSlot(final Handle handle) {
insertValidator(handle, Bytes.of(100), 1);
UInt64 slot = UInt64.valueOf(2);
updateHighWatermark(handle, UInt64.MAX_VALUE, slot);

assertThat(lowWatermarkDao.findLowWatermarkForValidator(handle, 1)).isEmpty();

lowWatermarkDao.updateSlotWatermarkFor(handle, 1, slot);

Optional<SigningWatermark> watermark = lowWatermarkDao.findLowWatermarkForValidator(handle, 1);
assertThat(watermark).isNotEmpty();
assertThat(watermark.get().getSlot()).isEqualTo(UInt64.valueOf(2));
}

@Test
public void canCreateLowWatermarkSourceEpochIfLessThanOrEqualToHighWatermarkEpoch(
final Handle handle) {
insertValidator(handle, Bytes.of(100), 1);
UInt64 sourceEpoch = UInt64.valueOf(2);
UInt64 targetEpoch = UInt64.valueOf(2);
updateHighWatermark(handle, sourceEpoch, UInt64.MAX_VALUE);

lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch);

Optional<SigningWatermark> watermark = lowWatermarkDao.findLowWatermarkForValidator(handle, 1);
assertThat(watermark.get().getSourceEpoch()).isEqualTo(sourceEpoch);
}

@Test
public void canCreateLowWatermarkTargetEpochIfLessThanOrEqualToHighWatermarkEpoch(
final Handle handle) {
insertValidator(handle, Bytes.of(100), 1);
UInt64 sourceEpoch = UInt64.valueOf(2);
UInt64 targetEpoch = UInt64.valueOf(3);
updateHighWatermark(handle, targetEpoch, UInt64.MAX_VALUE);

lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch);

Optional<SigningWatermark> watermark = lowWatermarkDao.findLowWatermarkForValidator(handle, 1);
assertThat(watermark.get().getTargetEpoch()).isEqualTo(targetEpoch);
}

@Test
public void cannotCreateLowWatermarkSlotIfGreaterThanHighWatermarkSlot(final Handle handle) {
insertValidator(handle, Bytes.of(100), 1);
UInt64 slot = UInt64.valueOf(3);

updateHighWatermark(handle, UInt64.MAX_VALUE, slot.subtract(1L));

assertThatThrownBy(() -> lowWatermarkDao.updateSlotWatermarkFor(handle, 1, slot))
.hasMessageContaining(
"low_watermark slot must be less than or equal to high_watermark_slot in the metadata table");
}

@Test
public void cannotCreateLowWatermarkSourceEpochIfGreaterThanHighWatermarkEpoch(
final Handle handle) {
insertValidator(handle, Bytes.of(100), 1);
UInt64 sourceEpoch = UInt64.valueOf(3);
UInt64 targetEpoch = UInt64.valueOf(3);
updateHighWatermark(handle, sourceEpoch.subtract(1L), UInt64.MAX_VALUE);

assertThatThrownBy(
() -> lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch))
.hasMessageContaining(
"low_watermark source epoch must be less than or equal to high_watermark_epoch in the metadata table");
}

@Test
public void cannotCreateLowWatermarkTargetEpochIfGreaterThanHighWatermarkEpoch(
final Handle handle) {
insertValidator(handle, Bytes.of(100), 1);
UInt64 sourceEpoch = UInt64.valueOf(2);
UInt64 targetEpoch = UInt64.valueOf(3);
updateHighWatermark(handle, targetEpoch.subtract(1L), UInt64.MAX_VALUE);

assertThatThrownBy(
() -> lowWatermarkDao.updateEpochWatermarksFor(handle, 1, sourceEpoch, targetEpoch))
.hasMessageContaining(
"low_watermark target epoch must be less than or equal to high_watermark_epoch in the metadata table");
}

private void insertValidator(final Handle handle, final Bytes publicKey, final int validatorId) {
handle.execute("INSERT INTO validators (id, public_key) VALUES (?, ?)", validatorId, publicKey);
}

private void updateHighWatermark(final Handle handle, final UInt64 epoch, final UInt64 slot) {
handle.execute(
"INSERT INTO metadata (id, genesis_validators_root, high_watermark_epoch, high_watermark_slot) VALUES (?, ?, ?, ?)",
1,
Bytes32.leftPad(Bytes.of(3)),
epoch,
slot);
}
}
Loading

0 comments on commit 79d8053

Please sign in to comment.