Skip to content

Commit

Permalink
fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushsatyam146 committed Dec 3, 2024
1 parent b00ab4d commit 602c8c4
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 70 deletions.
26 changes: 13 additions & 13 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,19 +177,19 @@ type persistence struct {
}

type WALConfig struct {
LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"`
Enabled bool `config:"enabled" default:"true"`
WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"`
WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"`
BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"`
RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"`
MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"`
SegmentRotationTimeSec time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"`
BufferSyncIntervalMillis time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"`
RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"`
MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"`
SegmentRetentionDurationSec time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"`
RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"`
LogDir string `config:"log_dir" default:"tmp/deicdeb-wal-lt"`
Enabled bool `config:"enabled" default:"true"`
WalMode string `config:"mode" default:"buffered" validate:"oneof=buffered unbuffered"`
WriteMode string `config:"write_mode" default:"default" validate:"oneof=default fsync"`
BufferSizeMB int `config:"buffer_size" default:"1" validate:"min=1"`
RotationMode string `config:"rotation_mode" default:"segemnt-size" validate:"oneof=segment-size time"`
MaxSegmentSizeMB int64 `config:"buffer_size" default:"16" validate:"min=1"`
SegmentRotationTime time.Duration `config:"max_segment_rotation_time" default:"60" validate:"min=1"`
BufferSyncInterval time.Duration `config:"max_segment_rotation_time" default:"200" validate:"min=1"`
RetentionMode string `config:"retention_mode" default:"num-segments" validate:"oneof=num-segments time checkpoint"`
MaxSegmentCount int `config:"max_segment_count" default:"10" validate:"min=1"`
SegmentRetentionDuration time.Duration `config:"max_segment_retention_time" default:"600" validate:"min=1"`
RecoveryMode string `config:"recovery_mode" default:"strict" validate:"oneof=strict truncate ignore"`
}

type logging struct {
Expand Down
8 changes: 6 additions & 2 deletions internal/iothread/iothread.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,17 @@ func (t *BaseIOThread) handleCommand(ctx context.Context, cmdMeta CmdMeta, diceD
}

if err == nil && t.wl != nil {
t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " "))))
if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil {
return err
}
}
case MultiShard, AllShard:
err = t.writeResponse(ctx, cmdMeta.composeResponse(storeOp...))

if err == nil && t.wl != nil {
t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " "))))
if err := t.wl.LogCommand([]byte(fmt.Sprintf("%s %s", diceDBCmd.Cmd, strings.Join(diceDBCmd.Args, " ")))); err != nil {
return err
}
}
default:
slog.Error("Unknown command type",
Expand Down
2 changes: 1 addition & 1 deletion internal/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type AbstractWAL interface {
LogCommand([] byte) error
LogCommand([]byte) error
Close() error
Init(t time.Time) error
ForEachCommand(f func(c cmd.DiceDBCmd) error) error
Expand Down
64 changes: 33 additions & 31 deletions internal/wal/wal_aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ type WALAOF struct {
}

func NewAOFWAL(directory string) (*WALAOF, error) {

ctx, cancel := context.WithCancel(context.Background())

return &WALAOF{
logDir: directory,
walMode: config.DiceConfig.WAL.WalMode,
bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncIntervalMillis * time.Millisecond),
segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTimeSec * time.Second),
segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDurationSec * time.Second),
bufferSyncTicker: time.NewTicker(config.DiceConfig.WAL.BufferSyncInterval * time.Millisecond),
segmentRotationTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRotationTime * time.Second),
segmentRetentionTicker: time.NewTicker(config.DiceConfig.WAL.SegmentRetentionDuration * time.Second),
writeMode: config.DiceConfig.WAL.WriteMode,
maxSegmentSize: config.DiceConfig.WAL.MaxSegmentSizeMB * 1024 * 1024,
maxSegmentCount: config.DiceConfig.WAL.MaxSegmentCount,
Expand All @@ -76,37 +75,37 @@ func NewAOFWAL(directory string) (*WALAOF, error) {
}, nil
}

func (w *WALAOF) Init(t time.Time) error {

if err := w.validateConfig(); err != nil {
func (wal *WALAOF) Init(t time.Time) error {
if err := wal.validateConfig(); err != nil {
return err
}

// TODO - Restore existing checkpoints to memory

// Create the directory if it doesn't exist
if err := os.MkdirAll(w.logDir, 0755); err != nil {
if err := os.MkdirAll(wal.logDir, 0755); err != nil {
return nil
}

// Get the list of log segment files in the directory
files, err := filepath.Glob(filepath.Join(w.logDir, segmentPrefix+"*"))
files, err := filepath.Glob(filepath.Join(wal.logDir, segmentPrefix+"*"))
if err != nil {
return nil
}

if len(files) > 0 {
fmt.Println("Found existing log segments:", files)
// TODO - Check if we have newer WAL entries after the last checkpoint and simultaneously replay and checkpoint them
}

var wg sync.WaitGroup
errCh := make(chan error, w.maxSegmentCount)
errCh := make(chan error, wal.maxSegmentCount)

for i := 0; i < w.maxSegmentCount; i++ {
for i := 0; i < wal.maxSegmentCount; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
filePath := filepath.Join(w.logDir, segmentPrefix+fmt.Sprintf("-%d", index))
filePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("-%d", index))
file, err := os.Create(filePath)
if err != nil {
errCh <- fmt.Errorf("error creating segment file %s: %v", filePath, err)
Expand All @@ -119,24 +118,24 @@ func (w *WALAOF) Init(t time.Time) error {
wg.Wait()
close(errCh)

w.lastSequenceNo = 0
w.currentSegmentIndex = 0
w.oldestSegmentIndex = 0
w.byteOffset = 0
w.currentSegmentFile, err = os.OpenFile(filepath.Join(w.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if _, err = w.currentSegmentFile.Seek(0, io.SeekEnd); err != nil {
wal.lastSequenceNo = 0
wal.currentSegmentIndex = 0
wal.oldestSegmentIndex = 0
wal.byteOffset = 0
wal.currentSegmentFile, err = os.OpenFile(filepath.Join(wal.logDir, "seg-0"), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)

Check failure on line 125 in internal/wal/wal_aof.go

View workflow job for this annotation

GitHub Actions / lint

SA4006: this value of `err` is never used (staticcheck)

Check failure on line 125 in internal/wal/wal_aof.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
if _, err := wal.currentSegmentFile.Seek(0, io.SeekEnd); err != nil {
return err
}
w.bufWriter = bufio.NewWriterSize(w.currentSegmentFile, w.bufferSize)
wal.bufWriter = bufio.NewWriterSize(wal.currentSegmentFile, wal.bufferSize)

go w.keepSyncingBuffer()
go wal.keepSyncingBuffer()

if w.rotationMode == "time" {
go w.rotateSegmentPeriodically()
if wal.rotationMode == "time" { //nolint:goconst
go wal.rotateSegmentPeriodically()
}

if w.retentionMode == "time" {
go w.deleteSegmentPeriodically()
if wal.retentionMode == "time" { //nolint:goconst
go wal.deleteSegmentPeriodically()
}

return nil
Expand All @@ -161,17 +160,21 @@ func (wal *WALAOF) writeEntry(data []byte) error {
}

entrySize := getEntrySize(data)
wal.rotateLogIfNeeded(entrySize)
if err := wal.rotateLogIfNeeded(entrySize); err != nil {
return err
}

wal.byteOffset += entrySize

if err := wal.writeEntryToBuffer(entry); err != nil {
return err
}

// if wal-mode unbuffered immediatley sync to disk
if wal.walMode == "unbuffered" {
wal.Sync()
// if wal-mode unbuffered immediately sync to disk
if wal.walMode == "unbuffered" { //nolint:goconst
if err := wal.Sync(); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -232,7 +235,6 @@ func (wal *WALAOF) rotateLog() error {
}

func (wal *WALAOF) deleteOldestSegment() error {

oldestSegmentFilePath := filepath.Join(wal.logDir, segmentPrefix+fmt.Sprintf("%d", wal.oldestSegmentIndex))

// TODO: checkpoint before deleting the file
Expand Down Expand Up @@ -260,7 +262,7 @@ func (wal *WALAOF) Sync() error {
if err := wal.bufWriter.Flush(); err != nil {
return err
}
if wal.writeMode == "fsync" {
if wal.writeMode == "fsync" { //nolint:goconst
if err := wal.currentSegmentFile.Sync(); err != nil {
return err
}
Expand Down Expand Up @@ -323,7 +325,7 @@ func (wal *WALAOF) deleteSegmentPeriodically() {
}
}

func (w *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error {
func (wal *WALAOF) ForEachCommand(f func(c cmd.DiceDBCmd) error) error {
// TODO: implement this method
return nil
}
2 changes: 1 addition & 1 deletion internal/wal/wal_null.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (w *WALNull) Init(t time.Time) error {
}

// LogCommand serializes a WALLogEntry and writes it to the current WAL file.
func (w *WALNull) LogCommand(b []byte) error{
func (w *WALNull) LogCommand(b []byte) error {
return nil
}

Expand Down
22 changes: 0 additions & 22 deletions internal/wal/wal_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,10 @@ package wal

import (
"fmt"
"hash/crc32"

"google.golang.org/protobuf/proto"
)

// unmarshalAndVerifyEntry unmarshals the given data into a WAL entry and
// verifies the CRC of the entry. Only returns an error if the CRC is invalid.
func unmarshalAndVerifyEntry(data []byte) (*WAL_Entry, error) {
var entry WAL_Entry
MustUnmarshal(data, &entry)

if !verifyCRC(&entry) {
return nil, fmt.Errorf("CRC mismatch: data may be corrupted")
}

return &entry, nil
}

// Validates whether the given entry has a valid CRC.
func verifyCRC(entry *WAL_Entry) bool {
// Reset the entry CRC for the verification.
actualCRC := crc32.ChecksumIEEE(append(entry.GetData(), byte(entry.GetLogSequenceNumber())))

return entry.CRC == actualCRC
}

// Marshals
func MustMarshal(entry *WAL_Entry) []byte {
marshaledEntry, err := proto.Marshal(entry)
Expand Down

0 comments on commit 602c8c4

Please sign in to comment.