Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: ignore writes below new block transaction on recovery #864

Merged
merged 2 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,10 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
"snapshot_tx", snapshotTx,
)
}
if err := wal.Truncate(snapshotTx); err != nil {
// snapshotTx can correspond to a write at that txn that is already
// contained in the snapshot. Truncate at snapshotTx + 1 so that the first
// WAL entry is the subsequent txn and no duplicate writes are replayed.
if err := wal.Truncate(snapshotTx + 1); err != nil {
level.Info(db.logger).Log(
"msg", "failed to truncate WAL after loading snapshot",
"err", err,
Expand All @@ -732,18 +735,18 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {

// persistedTables is a map from a table name to the last transaction
// persisted.
persistedTables := map[string]uint64{}
persistedTables := make(map[string]uint64)
var lastTx uint64

start := time.Now()
if err := wal.Replay(snapshotTx, func(tx uint64, record *walpb.Record) error {
if err := wal.Replay(snapshotTx+1, func(_ uint64, record *walpb.Record) error {
if err := ctx.Err(); err != nil {
return err
}
switch e := record.Entry.EntryType.(type) {
case *walpb.Entry_TableBlockPersisted_:
persistedTables[e.TableBlockPersisted.TableName] = tx
if tx > snapshotTx {
persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
if e.TableBlockPersisted.NextTx > snapshotTx {
// The loaded snapshot has data in a table that has been
// persisted. Delete all data in this table, since it has
// already been persisted.
Expand Down Expand Up @@ -776,7 +779,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
// WAL (i.e. entries that occupy space on disk but are useless).
performSnapshot := false

if err := wal.Replay(snapshotTx, func(tx uint64, record *walpb.Record) error {
if err := wal.Replay(snapshotTx+1, func(tx uint64, record *walpb.Record) error {
if err := ctx.Err(); err != nil {
return err
}
Expand All @@ -799,9 +802,10 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
return err
}

if lastPersistedTx, ok := persistedTables[entry.TableName]; ok && tx < lastPersistedTx {
if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn {
// This block has already been successfully persisted, so we can
// skip it.
// skip it. Note that if this new table block is the active
// block after persistence, then tx == nextNonPersistedTxn.
return nil
}

Expand Down Expand Up @@ -853,7 +857,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
"tx", tx,
)
table.pendingBlocks[table.active] = struct{}{}
go table.writeBlock(table.active, db.columnStore.manualBlockRotation, false)
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)

protoEqual := false
switch schema.(type) {
Expand Down Expand Up @@ -1008,12 +1012,13 @@ func (db *DB) Close(options ...CloseOption) error {
table.close()
if shouldPersist {
// Write the blocks but no snapshots since they are long-running
// jobs.
// jobs. Use db.tx.Load as the block's max txn since the table was
// closed above, so no writes are in flight at this stage.
// TODO(asubiotto): Maybe we should snapshot in any case since it
// should be faster to write to local disk than upload to object
// storage. This would avoid a slow WAL replay on startup if we
// don't manage to persist in time.
table.writeBlock(table.ActiveBlock(), false, false)
table.writeBlock(table.ActiveBlock(), db.tx.Load(), false, false)
}
}
level.Info(db.logger).Log("msg", "closed all tables")
Expand Down Expand Up @@ -1073,7 +1078,11 @@ func (db *DB) reclaimDiskSpace(ctx context.Context, wal WAL) error {
if wal == nil {
wal = db.wal
}
return wal.Truncate(validSnapshotTxn)
// Snapshots are taken with a read txn and are inclusive, so they
// include a potential write at validSnapshotTxn. We don't want this to be
// the first entry in the WAL after truncation, given it is already
// contained in the snapshot, so Truncate at validSnapshotTxn + 1.
return wal.Truncate(validSnapshotTxn + 1)
}

func (db *DB) getMinTXPersisted() uint64 {
Expand Down
115 changes: 115 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/fs"
"math/rand"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -1450,6 +1451,120 @@ func TestDBRecover(t *testing.T) {
"expected at most one block persist entry; the others should have been snapshot and truncated",
)
})

// This test is a regression test to verify that writes completed during
// a block rotation are not lost on recovery.
t.Run("RotationDoesntDropWrites", func(t *testing.T) {
dir := setup(t, false)
c, err := New(
WithLogger(newTestLogger(t)),
WithStoragePath(dir),
WithWAL(),
)
require.NoError(t, err)

db, err := c.DB(ctx, dbAndTableName)
require.NoError(t, err)
table, err := db.GetTable(dbAndTableName)
require.NoError(t, err)

// Simulate starting a write against the active block; this will block
// persistence until the write is finished.
block, finish, err := table.ActiveWriteBlock()
require.NoError(t, err)

// Rotate the block to create a new active block.
require.NoError(t, table.RotateBlock(ctx, block, false))

// Issue writes.
const nWrites = 5
expectedTimestamps := make(map[int64]struct{}, nWrites)
for i := 0; i < nWrites; i++ {
samples := dynparquet.NewTestSamples()
timestamp := rand.Int63()
for j := range samples {
samples[j].Timestamp = timestamp
}
r, err := samples.ToRecord()
require.NoError(t, err)
_, err = table.InsertRecord(ctx, r)
require.NoError(t, err)
expectedTimestamps[timestamp] = struct{}{}
}

// Finalize the block persistence and close the DB.
finish()

require.NoError(t, c.Close())
c, err = New(
WithLogger(newTestLogger(t)),
WithStoragePath(dir),
WithWAL(),
)
require.NoError(t, err)
defer c.Close()

db, err = c.DB(ctx, dbAndTableName)
require.NoError(t, err)

require.NoError(
t,
query.NewEngine(
memory.DefaultAllocator, db.TableProvider(),
).ScanTable(dbAndTableName).Execute(ctx, func(ctx context.Context, r arrow.Record) error {
idxs := r.Schema().FieldIndices("timestamp")
require.Len(t, idxs, 1)
tCol := r.Column(idxs[0]).(*array.Int64)
for i := 0; i < tCol.Len(); i++ {
delete(expectedTimestamps, tCol.Value(i))
}
return nil
}),
)
require.Len(t, expectedTimestamps, 0, "expected to see all timestamps on recovery, but could not find %v", expectedTimestamps)
})

// This is a regression test for a bug found by DST that causes duplicate
// writes on recovery due to an off-by-one error in WAL truncation after
// a snapshot (WAL includes a write that is also in the snapshot).
t.Run("NoDuplicateWrites", func(t *testing.T) {
dir := setup(t, false)
c, err := New(
WithLogger(newTestLogger(t)),
WithStoragePath(dir),
WithWAL(),
)
require.NoError(t, err)
defer c.Close()

db, err := c.DB(ctx, dbAndTableName)
require.NoError(t, err)

// This is deduced based on the fact that `setup` inserts NewTestSamples
// numInserts times.
expectedRowsPerTimestamp := len(dynparquet.NewTestSamples())

timestamps := make(map[int64]int, numInserts)
require.NoError(
t,
query.NewEngine(
memory.DefaultAllocator,
db.TableProvider(),
).ScanTable(dbAndTableName).Execute(ctx, func(ctx context.Context, r arrow.Record) error {
idxs := r.Schema().FieldIndices("timestamp")
require.Len(t, idxs, 1)
tCol := r.Column(idxs[0]).(*array.Int64)
for i := 0; i < tCol.Len(); i++ {
timestamps[tCol.Value(i)]++
}
return nil
}),
)
require.Len(t, timestamps, numInserts, "expected %d timestamps, but got %d", numInserts, len(timestamps))
for ts, occurrences := range timestamps {
require.Equal(t, expectedRowsPerTimestamp, occurrences, "expected %d rows for timestamp %d, but got %d", expectedRowsPerTimestamp, ts, occurrences)
}
})
}

func Test_DB_WalReplayTableConfig(t *testing.T) {
Expand Down
53 changes: 33 additions & 20 deletions gen/proto/go/frostdb/wal/v1alpha1/wal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions gen/proto/go/frostdb/wal/v1alpha1/wal_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions proto/frostdb/wal/v1alpha1/wal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ message Entry {
string table_name = 1;
// Block ID of the new-table-block.
bytes block_id = 2;
// NextTx is the next non-persisted transaction at the time of block
// persistence. If the block has been persisted, any txn id < next_tx is
// considered persisted or not relevant to this table (i.e. it can be a
// non-persisted txn from another table).
uint64 next_tx = 3;
}

// The snapshot entry.
Expand Down
8 changes: 6 additions & 2 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (t *Table) dropPendingBlock(block *TableBlock) {
}
}

func (t *Table) writeBlock(block *TableBlock, skipPersist, snapshotDB bool) {
func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snapshotDB bool) {
level.Debug(t.logger).Log("msg", "syncing block")
block.pendingWritersWg.Wait()

Expand Down Expand Up @@ -532,6 +532,10 @@ func (t *Table) writeBlock(block *TableBlock, skipPersist, snapshotDB bool) {
TableBlockPersisted: &walpb.Entry_TableBlockPersisted{
TableName: t.name,
BlockId: buf,
// NOTE: nextTxn is used here instead of tx, since some
// writes could have happened between block rotation
// and the txn beginning above.
NextTx: nextTxn,
},
},
},
Expand Down Expand Up @@ -640,7 +644,7 @@ func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bo
// We don't check t.db.columnStore.manualBlockRotation here because this is
// the entry point for users to trigger a manual block rotation and they
// will specify through skipPersist if they want the block to be persisted.
go t.writeBlock(block, skipPersist, true)
go t.writeBlock(block, tx, skipPersist, true)

return nil
}
Expand Down
Loading