From 81bffcf3a18c1354d3c82b14b29763b97b0002fc Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Mon, 13 May 2024 13:14:28 +0200 Subject: [PATCH 1/2] db: ignore writes below new block transaction on recovery Previously, the recovery code was ignoring writes below the persist transaction of a table block. However, this could lead to dropped writes since the active block is swapped atomically on rotation *before* the old block is persisted. Writes in between these two events would be written to the new table block, but ignored on recovery given the recovery code assumed they were in the old table block. --- db.go | 20 ++--- db_test.go | 73 +++++++++++++++++++ gen/proto/go/frostdb/wal/v1alpha1/wal.pb.go | 53 +++++++++----- .../go/frostdb/wal/v1alpha1/wal_vtproto.pb.go | 27 +++++++ proto/frostdb/wal/v1alpha1/wal.proto | 5 ++ table.go | 8 +- 6 files changed, 155 insertions(+), 31 deletions(-) diff --git a/db.go b/db.go index bfb9e51ee..9df3e1a09 100644 --- a/db.go +++ b/db.go @@ -732,18 +732,18 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { // persistedTables is a map from a table name to the last transaction // persisted. - persistedTables := map[string]uint64{} + persistedTables := make(map[string]uint64) var lastTx uint64 start := time.Now() - if err := wal.Replay(snapshotTx, func(tx uint64, record *walpb.Record) error { + if err := wal.Replay(snapshotTx, func(_ uint64, record *walpb.Record) error { if err := ctx.Err(); err != nil { return err } switch e := record.Entry.EntryType.(type) { case *walpb.Entry_TableBlockPersisted_: - persistedTables[e.TableBlockPersisted.TableName] = tx - if tx > snapshotTx { + persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx + if e.TableBlockPersisted.NextTx > snapshotTx { // The loaded snapshot has data in a table that has been // persisted. Delete all data in this table, since it has // already been persisted. @@ -799,9 +799,10 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { return err } - if lastPersistedTx, ok := persistedTables[entry.TableName]; ok && tx < lastPersistedTx { + if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn { // This block has already been successfully persisted, so we can - // skip it. + // skip it. Note that if this new table block is the active + // block after persistence tx == nextNonPersistedTxn. return nil } @@ -853,7 +854,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { "tx", tx, ) table.pendingBlocks[table.active] = struct{}{} - go table.writeBlock(table.active, db.columnStore.manualBlockRotation, false) + go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false) protoEqual := false switch schema.(type) { @@ -1008,12 +1009,13 @@ func (db *DB) Close(options ...CloseOption) error { table.close() if shouldPersist { // Write the blocks but no snapshots since they are long-running - // jobs. + // jobs. Use db.tx.Load as the block's max txn since the table was + // closed above, so no writes are in flight at this stage. // TODO(asubiotto): Maybe we should snapshot in any case since it // should be faster to write to local disk than upload to object // storage. This would avoid a slow WAL replay on startup if we // don't manage to persist in time. - table.writeBlock(table.ActiveBlock(), false, false) + table.writeBlock(table.ActiveBlock(), db.tx.Load(), false, false) } } level.Info(db.logger).Log("msg", "closed all tables") diff --git a/db_test.go b/db_test.go index 9c13166ef..18f48f356 100644 --- a/db_test.go +++ b/db_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/fs" + "math/rand" "os" "path/filepath" "strings" @@ -1450,6 +1451,78 @@ func TestDBRecover(t *testing.T) { "expected at most one block persist entry; the others should have been snapshot and truncated", ) }) + + // This test is a regression test to verify that writes completed during + // a block rotation are not lost on recovery. + t.Run("RotationDoesntDropWrites", func(t *testing.T) { + dir := setup(t, false) + c, err := New( + WithLogger(newTestLogger(t)), + WithStoragePath(dir), + WithWAL(), + ) + require.NoError(t, err) + + db, err := c.DB(ctx, dbAndTableName) + require.NoError(t, err) + table, err := db.GetTable(dbAndTableName) + require.NoError(t, err) + + // Simulate starting a write against the active block, this will block + // persistence until this write is finished. + block, finish, err := table.ActiveWriteBlock() + require.NoError(t, err) + + // Rotate the block to create a new active block. + require.NoError(t, table.RotateBlock(ctx, block, false)) + + // Issue writes. + const nWrites = 5 + expectedTimestamps := make(map[int64]struct{}, nWrites) + for i := 0; i < nWrites; i++ { + samples := dynparquet.NewTestSamples() + timestamp := rand.Int63() + for j := range samples { + samples[j].Timestamp = timestamp + } + r, err := samples.ToRecord() + require.NoError(t, err) + _, err = table.InsertRecord(ctx, r) + require.NoError(t, err) + expectedTimestamps[timestamp] = struct{}{} + } + + // Finalize the block persistence and close the DB. + finish() + + require.NoError(t, c.Close()) + c, err = New( + WithLogger(newTestLogger(t)), + WithStoragePath(dir), + WithWAL(), + ) + require.NoError(t, err) + defer c.Close() + + db, err = c.DB(ctx, dbAndTableName) + require.NoError(t, err) + + require.NoError( + t, + query.NewEngine( + memory.DefaultAllocator, db.TableProvider(), + ).ScanTable(dbAndTableName).Execute(ctx, func(ctx context.Context, r arrow.Record) error { + idxs := r.Schema().FieldIndices("timestamp") + require.Len(t, idxs, 1) + tCol := r.Column(idxs[0]).(*array.Int64) + for i := 0; i < tCol.Len(); i++ { + delete(expectedTimestamps, tCol.Value(i)) + } + return nil + }), + ) + require.Len(t, expectedTimestamps, 0, "expected to see all timestamps on recovery, but could not find %v", expectedTimestamps) + }) } func Test_DB_WalReplayTableConfig(t *testing.T) { diff --git a/gen/proto/go/frostdb/wal/v1alpha1/wal.pb.go b/gen/proto/go/frostdb/wal/v1alpha1/wal.pb.go index 2472b2c31..5022cc53f 100644 --- a/gen/proto/go/frostdb/wal/v1alpha1/wal.pb.go +++ b/gen/proto/go/frostdb/wal/v1alpha1/wal.pb.go @@ -332,6 +332,11 @@ type Entry_TableBlockPersisted struct { TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"` // Block ID of the new-table-block. BlockId []byte `protobuf:"bytes,2,opt,name=block_id,json=blockId,proto3" json:"block_id,omitempty"` + // NextTx is the next non-persisted transaction at the time of block + // persistence. If the block has been persisted, any txn id < next_tx is + // considered persisted or not relevant to this table (i.e. it can be a + // non-persisted txn from another table). + NextTx uint64 `protobuf:"varint,3,opt,name=next_tx,json=nextTx,proto3" json:"next_tx,omitempty"` } func (x *Entry_TableBlockPersisted) Reset() { @@ -380,6 +385,13 @@ func (x *Entry_TableBlockPersisted) GetBlockId() []byte { return nil } +func (x *Entry_TableBlockPersisted) GetNextTx() uint64 { + if x != nil { + return x.NextTx + } + return 0 +} + // The snapshot entry. type Entry_Snapshot struct { state protoimpl.MessageState @@ -441,7 +453,7 @@ var file_frostdb_wal_v1alpha1_wal_proto_rawDesc = []byte{ 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x31, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2e, 0x77, 0x61, 0x6c, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x52, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0xa4, + 0x79, 0x52, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x22, 0xbd, 0x05, 0x0a, 0x05, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x39, 0x0a, 0x05, 0x77, 0x72, 0x69, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2e, 0x77, 0x61, 0x6c, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x45, @@ -476,30 +488,31 @@ var file_frostdb_wal_v1alpha1_wal_proto_rawDesc = []byte{ 0x0b, 0x32, 0x23, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4a, 0x04, - 0x08, 0x03, 0x10, 0x04, 0x4a, 0x04, 0x08, 0x04, 0x10, 0x05, 0x1a, 0x4f, 0x0a, 0x13, 0x54, 0x61, + 0x08, 0x03, 0x10, 0x04, 0x4a, 0x04, 0x08, 0x04, 0x10, 0x05, 0x1a, 0x68, 0x0a, 0x13, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x50, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x64, 0x1a, 0x1a, 0x0a, 0x08, 0x53, - 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x78, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x02, 0x74, 0x78, 0x42, 0x0c, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x72, 0x79, - 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0xe5, 0x01, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x66, 0x72, - 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2e, 0x77, 0x61, 0x6c, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, - 0x61, 0x31, 0x42, 0x08, 0x57, 0x61, 0x6c, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x4d, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6c, 0x61, 0x72, - 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x73, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2f, - 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x66, 0x72, 0x6f, - 0x73, 0x74, 0x64, 0x62, 0x2f, 0x77, 0x61, 0x6c, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, - 0x31, 0x3b, 0x77, 0x61, 0x6c, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xa2, 0x02, 0x03, - 0x46, 0x57, 0x58, 0xaa, 0x02, 0x14, 0x46, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2e, 0x57, 0x61, - 0x6c, 0x2e, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xca, 0x02, 0x14, 0x46, 0x72, 0x6f, + 0x28, 0x0c, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x6e, + 0x65, 0x78, 0x74, 0x5f, 0x74, 0x78, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x6e, 0x65, + 0x78, 0x74, 0x54, 0x78, 0x1a, 0x1a, 0x0a, 0x08, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, + 0x12, 0x0e, 0x0a, 0x02, 0x74, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x02, 0x74, 0x78, + 0x42, 0x0c, 0x0a, 0x0a, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x42, 0xe5, + 0x01, 0x0a, 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2e, 0x77, + 0x61, 0x6c, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x42, 0x08, 0x57, 0x61, 0x6c, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x4d, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6c, 0x61, 0x72, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x73, + 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x66, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2f, 0x77, 0x61, + 0x6c, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x77, 0x61, 0x6c, 0x76, 0x31, + 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xa2, 0x02, 0x03, 0x46, 0x57, 0x58, 0xaa, 0x02, 0x14, 0x46, + 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x2e, 0x57, 0x61, 0x6c, 0x2e, 0x56, 0x31, 0x61, 0x6c, 0x70, + 0x68, 0x61, 0x31, 0xca, 0x02, 0x14, 0x46, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x5c, 0x57, 0x61, + 0x6c, 0x5c, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0xe2, 0x02, 0x20, 0x46, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x5c, 0x57, 0x61, 0x6c, 0x5c, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, - 0x31, 0xe2, 0x02, 0x20, 0x46, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x5c, 0x57, 0x61, 0x6c, 0x5c, - 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, - 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x16, 0x46, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x3a, 0x3a, - 0x57, 0x61, 0x6c, 0x3a, 0x3a, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x16, + 0x46, 0x72, 0x6f, 0x73, 0x74, 0x64, 0x62, 0x3a, 0x3a, 0x57, 0x61, 0x6c, 0x3a, 0x3a, 0x56, 0x31, + 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/gen/proto/go/frostdb/wal/v1alpha1/wal_vtproto.pb.go b/gen/proto/go/frostdb/wal/v1alpha1/wal_vtproto.pb.go index 60e93ee2e..0eca2caec 100644 --- a/gen/proto/go/frostdb/wal/v1alpha1/wal_vtproto.pb.go +++ b/gen/proto/go/frostdb/wal/v1alpha1/wal_vtproto.pb.go @@ -206,6 +206,11 @@ func (m *Entry_TableBlockPersisted) MarshalToSizedBufferVT(dAtA []byte) (int, er i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.NextTx != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.NextTx)) + i-- + dAtA[i] = 0x18 + } if len(m.BlockId) > 0 { i -= len(m.BlockId) copy(dAtA[i:], m.BlockId) @@ -450,6 +455,9 @@ func (m *Entry_TableBlockPersisted) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + if m.NextTx != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.NextTx)) + } n += len(m.unknownFields) return n } @@ -1000,6 +1008,25 @@ func (m *Entry_TableBlockPersisted) UnmarshalVT(dAtA []byte) error { m.BlockId = []byte{} } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NextTx", wireType) + } + m.NextTx = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NextTx |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/proto/frostdb/wal/v1alpha1/wal.proto b/proto/frostdb/wal/v1alpha1/wal.proto index c3c7bf863..094122fd9 100644 --- a/proto/frostdb/wal/v1alpha1/wal.proto +++ b/proto/frostdb/wal/v1alpha1/wal.proto @@ -42,6 +42,11 @@ message Entry { string table_name = 1; // Block ID of the new-table-block. bytes block_id = 2; + // NextTx is the next non-persisted transaction at the time of block + // persistence. If the block has been persisted, any txn id < next_tx is + // considered persisted or not relevant to this table (i.e. it can be a + // non-persisted txn from another table). + uint64 next_tx = 3; } // The snapshot entry. diff --git a/table.go b/table.go index a77bdac55..5e40dd648 100644 --- a/table.go +++ b/table.go @@ -496,7 +496,7 @@ func (t *Table) dropPendingBlock(block *TableBlock) { } } -func (t *Table) writeBlock(block *TableBlock, skipPersist, snapshotDB bool) { +func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snapshotDB bool) { level.Debug(t.logger).Log("msg", "syncing block") block.pendingWritersWg.Wait() @@ -532,6 +532,10 @@ func (t *Table) writeBlock(block *TableBlock, skipPersist, snapshotDB bool) { TableBlockPersisted: &walpb.Entry_TableBlockPersisted{ TableName: t.name, BlockId: buf, + // NOTE: nextTxn is used here instead of tx, since some + // writes could have happened between block rotation + // and the txn beginning above. + NextTx: nextTxn, }, }, }, @@ -640,7 +644,7 @@ func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bo // We don't check t.db.columnStore.manualBlockRotation here because this is // the entry point for users to trigger a manual block rotation and they // will specify through skipPersist if they want the block to be persisted. - go t.writeBlock(block, skipPersist, true) + go t.writeBlock(block, tx, skipPersist, true) return nil } From 0a0a0ab16f3a5361688c96d65a11089d7b3ba259 Mon Sep 17 00:00:00 2001 From: Alfonso Subiotto Marques Date: Tue, 14 May 2024 11:07:16 +0200 Subject: [PATCH 2/2] db: fix WAL off-by-one truncation error Previously snapshots were performed using a write txn and the WAL was truncated so that that txn would be the first txn in the truncated WAL. However, snapshots were changed to use a read txn so if a write at txn k was included in the snapshot, a truncated WAL would still contain this write as the first write in the WAL, resulting in duplicate data after recovery. --- db.go | 15 +++++++++++---- db_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/db.go b/db.go index 9df3e1a09..bc4fd16e7 100644 --- a/db.go +++ b/db.go @@ -721,7 +721,10 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { "snapshot_tx", snapshotTx, ) } - if err := wal.Truncate(snapshotTx); err != nil { + // snapshotTx can correspond to a write at that txn that is contained in + // the snapshot. We want the first entry of the WAL to be the subsequent + // txn to not replay duplicate writes. + if err := wal.Truncate(snapshotTx + 1); err != nil { level.Info(db.logger).Log( "msg", "failed to truncate WAL after loading snapshot", "err", err, @@ -736,7 +739,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { var lastTx uint64 start := time.Now() - if err := wal.Replay(snapshotTx, func(_ uint64, record *walpb.Record) error { + if err := wal.Replay(snapshotTx+1, func(_ uint64, record *walpb.Record) error { if err := ctx.Err(); err != nil { return err } @@ -776,7 +779,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error { // WAL (i.e. entries that occupy space on disk but are useless). performSnapshot := false - if err := wal.Replay(snapshotTx, func(tx uint64, record *walpb.Record) error { + if err := wal.Replay(snapshotTx+1, func(tx uint64, record *walpb.Record) error { if err := ctx.Err(); err != nil { return err } @@ -1075,7 +1078,11 @@ func (db *DB) reclaimDiskSpace(ctx context.Context, wal WAL) error { if wal == nil { wal = db.wal } - return wal.Truncate(validSnapshotTxn) + // Snapshots are taken with a read txn and are inclusive, so therefore + // include a potential write at validSnapshotTxn. We don't want this to be + // the first entry in the WAL after truncation, given it is already + // contained in the snapshot, so Truncate at validSnapshotTxn + 1. + return wal.Truncate(validSnapshotTxn + 1) } func (db *DB) getMinTXPersisted() uint64 { diff --git a/db_test.go b/db_test.go index 18f48f356..38b35a909 100644 --- a/db_test.go +++ b/db_test.go @@ -1523,6 +1523,48 @@ func TestDBRecover(t *testing.T) { ) require.Len(t, expectedTimestamps, 0, "expected to see all timestamps on recovery, but could not find %v", expectedTimestamps) }) + + // This is a regression test for a bug found by DST that causes duplicate + // writes on recovery due to an off-by-one error in WAL truncation after + // a snapshot (WAL includes a write that is also in the snapshot). + t.Run("NoDuplicateWrites", func(t *testing.T) { + dir := setup(t, false) + c, err := New( + WithLogger(newTestLogger(t)), + WithStoragePath(dir), + WithWAL(), + ) + require.NoError(t, err) + defer c.Close() + + db, err := c.DB(ctx, dbAndTableName) + require.NoError(t, err) + + // This is deduced based on the fact that `setup` inserts NewTestSamples + // numInserts times. + expectedRowsPerTimestamp := len(dynparquet.NewTestSamples()) + + timestamps := make(map[int64]int, numInserts) + require.NoError( + t, + query.NewEngine( + memory.DefaultAllocator, + db.TableProvider(), + ).ScanTable(dbAndTableName).Execute(ctx, func(ctx context.Context, r arrow.Record) error { + idxs := r.Schema().FieldIndices("timestamp") + require.Len(t, idxs, 1) + tCol := r.Column(idxs[0]).(*array.Int64) + for i := 0; i < tCol.Len(); i++ { + timestamps[tCol.Value(i)]++ + } + return nil + }), + ) + require.Len(t, timestamps, numInserts, "expected %d timestamps, but got %d", numInserts, len(timestamps)) + for ts, occurrences := range timestamps { + require.Equal(t, expectedRowsPerTimestamp, occurrences, "expected %d rows for timestamp %d, but got %d", expectedRowsPerTimestamp, ts, occurrences) + } + }) } func Test_DB_WalReplayTableConfig(t *testing.T) {