Skip to content

Commit

Permalink
Calculate merged checkpoints on write, not read (#51)
Browse files Browse the repository at this point in the history
This makes the read path much cheaper, and fixes an issue (demonstrated in the tests) where a previously seen checkpoint.N could go missing.
One functional change is that asking for checkpoint.N will now return precisely N signatures instead of at least N signatures.

The DB deployment is made a fraction more complicated by the introduction of another table, but it's worthwhile doing this now while we have no production distributors.
  • Loading branch information
mhutchinson authored Nov 2, 2023
1 parent 7e6cd90 commit 4d716ea
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 87 deletions.
167 changes: 97 additions & 70 deletions cmd/internal/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,59 +79,26 @@ func (d *Distributor) GetCheckpointN(ctx context.Context, logID string, n uint32
if n == 0 || n > maxSigs {
return nil, status.Errorf(codes.InvalidArgument, "invalid N %d", n)
}
l, ok := d.ls[logID]
if !ok {
if _, ok := d.ls[logID]; !ok {
return nil, status.Errorf(codes.InvalidArgument, "unknown log ID %q", logID)
}

tx, err := d.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %v", err)
}
rows, err := tx.QueryContext(ctx, "SELECT treeSize, witID, chkpt FROM chkpts WHERE logID = ? ORDER BY treeSize DESC", logID)
if err != nil {
return nil, fmt.Errorf("query failed: %v", err)
row := tx.QueryRowContext(ctx, "SELECT chkpt FROM merged_checkpoints WHERE logID = ? AND sigCount = ?", logID, n)
if row.Err() != nil {
return nil, fmt.Errorf("QueryRowContext(): %v", err)
}
defer func() {
if err := rows.Close(); err != nil {
glog.Errorf("rows.Close(): %v", err)
}
}()
var currentSize uint64
var witsAtSize []note.Verifier
var cpsAtSize [][]byte
var size uint64
var witID string
var cp []byte
// Iterate over each row, building up cpsAtSize and witsAtSize until currentSize changes
for rows.Next() {
if err := rows.Scan(&size, &witID, &cp); err != nil {
return nil, fmt.Errorf("failed to scan rows: %v", err)
}
if size != currentSize {
if len(cpsAtSize) >= int(n) {
// We have found a sufficient checkpoint, so stop looking
break
}
cpsAtSize = make([][]byte, 0)
witsAtSize = make([]note.Verifier, 0)
currentSize = size
}
witsAtSize = append(witsAtSize, d.ws[witID])
cpsAtSize = append(cpsAtSize, cp)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to scan rows: %v", err)
}
if len(cpsAtSize) >= int(n) {
cp, err := checkpoints.Combine(cpsAtSize, l.Verifier, note.VerifierList(witsAtSize...))
if err != nil {
// TODO(mhutchinson): Keep trying to find some checkpoints that can be merged
// but remember to double check we have enough sigs before returning.
return nil, fmt.Errorf("failed to combine sigs: %v", err)
if err := row.Scan(&cp); err != nil {
if err == sql.ErrNoRows {
return nil, status.Errorf(codes.NotFound, "no checkpoint with %d signatures found", n)
}
return cp, nil
return nil, fmt.Errorf("Scan(): %v", err)
}
return nil, status.Errorf(codes.NotFound, "no checkpoint with %d signatures found", n)
return cp, nil
}

// GetCheckpointWitness gets the largest checkpoint for the log that was witnessed by the given witness.
Expand Down Expand Up @@ -172,43 +139,94 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
}
oldBs, err := getLatestCheckpoint(ctx, tx, logID, witID)
if err != nil {
if status.Code(err) == codes.NotFound {
// If this is the first checkpoint for this witness then just save and exit
_, err := tx.ExecContext(ctx, `INSERT INTO chkpts (logID, witID, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, witID, newCP.Size, nextRaw)
if err != nil {
return fmt.Errorf("Exec(): %v", err)
}
if err := tx.Commit(); err != nil {
return err
if status.Code(err) != codes.NotFound {
return fmt.Errorf("failed to query for latest checkpoint: %v", err)
}
}
if oldBs != nil {
// To replace a previous checkpoint from the same witness, check that the new one is fresher
oldCP, _, _, err := log.ParseCheckpoint(oldBs, l.Origin, l.Verifier, wv)
if err != nil {
// This really shouldn't ever happen unless the DB is corrupted or the config
// for the log or verifier has changed.
return fmt.Errorf("failed to parse checkpoint: %v", err)
}
if newCP.Size < oldCP.Size {
return fmt.Errorf("checkpoint for log %q and witness %q is for size %d, cannot update to size %d", logID, witID, oldCP.Size, newCP.Size)
}
if newCP.Size == oldCP.Size {
if !bytes.Equal(newCP.Hash, oldCP.Hash) {
reportInconsistency(oldBs, nextRaw)
return fmt.Errorf("old checkpoint for tree size %d had hash %x but new one has %x", newCP.Size, oldCP.Hash, newCP.Hash)
}
// Nothing to do; checkpoint is equivalent to the old one so avoid DB writes.
return nil
}
return err
}

// We have the previous checkpoint, now check that the new one is fresher
// At this point we know that we have a valid checkpoint that is fresher than any previous version for
// this witness. We should now store this, and then attempt to merge with other checkpoints for the same
// log size to create the checkpoint.N files.

if _, err := tx.ExecContext(ctx, `REPLACE INTO checkpoints_by_witness (logID, witID, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, witID, newCP.Size, nextRaw); err != nil {
return fmt.Errorf("ExecContext(): %v", err)
}

oldCP, _, _, err := log.ParseCheckpoint(oldBs, l.Origin, l.Verifier, wv)
// Calculate new checkpoint.N given this new checkpoint.
rows, err := tx.QueryContext(ctx, "SELECT witID, chkpt FROM checkpoints_by_witness WHERE logID = ? AND treeSize = ? ORDER BY witID ASC", logID, newCP.Size)
if err != nil {
// This really shouldn't ever happen unless the DB is corrupted or the config
// for the log or verifier has changed.
return fmt.Errorf("failed to parse checkpoint: %v", err)
return fmt.Errorf("QueryContext(): %v", err)
}
defer func() {
if err := rows.Close(); err != nil {
glog.Errorf("rows.Close(): %v", err)
}
}()

var witnesses []note.Verifier
var allCheckpoints [][]byte
for rows.Next() {
var witID string
var cp []byte
if err := rows.Scan(&witID, &cp); err != nil {
return fmt.Errorf("failed to scan rows: %v", err)
}
allCheckpoints = append(allCheckpoints, cp)
witnesses = append(witnesses, d.ws[witID])
}
if newCP.Size < oldCP.Size {
return fmt.Errorf("checkpoint for log %q and witness %q is for size %d, cannot update to size %d", logID, witID, oldCP.Size, newCP.Size)

if err := rows.Err(); err != nil {
return fmt.Errorf("rows.Err(): %v", err)
}
if newCP.Size == oldCP.Size {
if !bytes.Equal(newCP.Hash, oldCP.Hash) {
reportInconsistency(oldBs, nextRaw)
return fmt.Errorf("old checkpoint for tree size %d had hash %x but new one has %x", newCP.Size, oldCP.Hash, newCP.Hash)

sigCount := len(witnesses)
row := tx.QueryRowContext(ctx, "SELECT treeSize FROM merged_checkpoints WHERE logID = ? AND sigCount = ?", logID, sigCount)
if row.Err() != nil {
return fmt.Errorf("QueryRowContext(): %v", err)
}
var lastTreeSize uint64
if err := row.Scan(&lastTreeSize); err != nil {
if err != sql.ErrNoRows {
return fmt.Errorf("Scan(): %v", err)
}
// Nothing to do; checkpoint is equivalent to the old one so avoid DB writes.
return nil
// If there are no rows then that's fine, we'll allow lastTreeSize to stay at 0
}
_, err = tx.ExecContext(ctx, `REPLACE INTO chkpts (logID, witID, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, witID, newCP.Size, nextRaw)
if err != nil {
return fmt.Errorf("Exec(): %v", err)
if newCP.Size > lastTreeSize {
// If the new checkpoint is for a tree larger than the current checkpoint.N for this log, then
// we have the option of creating a new checkpoint.N for the larger tree size.
mergedCP, err := checkpoints.Combine(allCheckpoints, l.Verifier, note.VerifierList(witnesses...))
if err != nil {
// This could happen because the log has variable info, such as a timestamp.
// Don't treat this as a critical error or the distributor can't accept the new checkpoint.
glog.Warning("Failed to combine %d checkpoints: %v", sigCount, err)
} else {
_, err = tx.ExecContext(ctx, `REPLACE INTO merged_checkpoints (logID, sigCount, treeSize, chkpt) VALUES (?, ?, ?, ?)`, logID, sigCount, newCP.Size, mergedCP)
if err != nil {
return fmt.Errorf("Failed to update checkpoints.%d: %v", sigCount, err)
}
}
}

if err := tx.Commit(); err != nil {
return err
}
Expand All @@ -219,7 +237,7 @@ func (d *Distributor) Distribute(ctx context.Context, logID, witID string, nextR
// any other method on this object. It is safe to call on subsequent runs of
// the application as it is idempotent.
func (d *Distributor) init() error {
if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS chkpts (
if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS checkpoints_by_witness (
logID VARCHAR(200),
witID VARCHAR(200),
treeSize INTEGER,
Expand All @@ -228,14 +246,23 @@ func (d *Distributor) init() error {
)`); err != nil {
return err
}
if _, err := d.db.Exec(`CREATE TABLE IF NOT EXISTS merged_checkpoints (
logID VARCHAR(200),
sigCount INTEGER,
treeSize INTEGER,
chkpt BLOB,
PRIMARY KEY (logID, sigCount)
)`); err != nil {
return err
}
return nil
}

// getLatestCheckpoint returns the latest checkpoint for the given log and witness pair.
// If no checkpoint is found then an error with status `codes.NotFound` will be returned,
// which allows callers to handle this case separately if needed.
func getLatestCheckpoint(ctx context.Context, tx *sql.Tx, logID, witID string) ([]byte, error) {
row := tx.QueryRowContext(ctx, "SELECT chkpt FROM chkpts WHERE logID = ? AND witID = ?", logID, witID)
row := tx.QueryRowContext(ctx, "SELECT chkpt FROM checkpoints_by_witness WHERE logID = ? AND witID = ?", logID, witID)
if err := row.Err(); err != nil {
return nil, err
}
Expand Down
20 changes: 3 additions & 17 deletions cmd/internal/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,17 +578,6 @@ func TestGetCheckpointN(t *testing.T) {
wantSize: 14,
wantWits: []note.Verifier{witBadger.verifier, witChameleon.verifier},
},
{
desc: "more sigs can be returned than needed",
distWit: witBadger,
distLog: logFoo,
distSize: 16,
reqLog: "FooLog",
reqN: 1,
wantErr: false,
wantSize: 16,
wantWits: []note.Verifier{witBadger.verifier, witAardvark.verifier},
},
{
desc: "error returned if not enough sigs",
distWit: witBadger,
Expand Down Expand Up @@ -725,12 +714,9 @@ func TestGetCheckpointNHistoric(t *testing.T) {
22,
},
},
reqN: 2,
wantErr: true,
wantErrCode: codes.NotFound,
// TODO(mhutchinson): this case should work with the following assertions
// wantErr: false,
// wantSize: 10,
reqN: 2,
wantErr: false,
wantSize: 10,
},
{
desc: "TODO: N=2 can get historic version where both have been seen but not at same time",
Expand Down

0 comments on commit 4d716ea

Please sign in to comment.