Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
JkLondon committed Nov 4, 2024
1 parent d9e0f8f commit 2466c93
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 335 deletions.
39 changes: 35 additions & 4 deletions erigon-lib/common/customfs/customfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,28 @@ import (
"errors"
"github.com/spf13/afero"
"io"
"math/rand"
"io/fs"
"os"
"slices"
"strconv"
"strings"
"sync/atomic"
"syscall"
)

var CFS = CustomFileSystem{afero.NewOsFs()}
var CFS = CustomFileSystem{afero.NewOsFs(), atomic.Int32{}}

type CustomFileSystem struct {
afero.Fs
TmpCounter atomic.Int32
}

type IoWrapper struct {
CustomFileSystem
}

func (fs *IoWrapper) Open(name string) (fs.File, error) {
return fs.Fs.Open(name)
}

type CustomFile struct {
Expand Down Expand Up @@ -119,7 +129,8 @@ func (fs *CustomFileSystem) MkdirTemp(dir, pattern string) (string, error) {

try := 0
for {
name := prefix + strconv.Itoa(rand.Int()) + suffix
tmpCounter := fs.TmpCounter.Add(1) - 1
name := prefix + strconv.Itoa(int(tmpCounter)) + suffix
err := fs.Mkdir(name, 0700)
if err == nil {
return name, nil
Expand Down Expand Up @@ -156,7 +167,8 @@ func (fs *CustomFileSystem) CreateTemp(dir, pattern string) (*CustomFile, error)

try := 0
for {
name := prefix + strconv.Itoa(rand.Int()) + suffix
tmpCounter := fs.TmpCounter.Add(1) - 1
name := prefix + strconv.Itoa(int(tmpCounter)) + suffix
f, err := fs.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600)
if fs.IsExist(err) {
if try++; try < 10000 {
Expand Down Expand Up @@ -254,3 +266,22 @@ func (fs *CustomFileSystem) ReadFile(name string) ([]byte, error) {
}
}
}

func (fs *CustomFileSystem) WriteFile(name string, data []byte, perm os.FileMode) error {
f, err := fs.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
_, err = f.Write(data)
if err1 := f.Close(); err1 != nil && err == nil {
err = err1
}
return err
}

func (fs *CustomFileSystem) DirFS(dir string) fs.FS {
if _, ok := fs.Fs.(*afero.OsFs); ok {
return os.DirFS(dir)
}
return &IoWrapper{CustomFileSystem{afero.NewBasePathFs(fs.Fs, dir), atomic.Int32{}}}
}
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/shirou/gopsutil/v4 v4.24.8
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/afero v1.11.0
github.com/stretchr/testify v1.9.0
github.com/tidwall/btree v1.6.0
go.uber.org/mock v0.5.0
Expand All @@ -61,7 +62,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pion/udp v0.1.4 // indirect
github.com/spf13/afero v1.11.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/tools v0.26.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
Expand Down
5 changes: 2 additions & 3 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/erigontech/erigon-lib/common/customfs"
"maps"
"os"
"path/filepath"
Expand Down Expand Up @@ -128,11 +127,11 @@ func (opts MdbxOpts) WriteMap(flag bool) MdbxOpts {

func (opts MdbxOpts) InMem(tmpDir string) MdbxOpts {
if tmpDir != "" {
if err := customfs.CFS.MkdirAll(tmpDir, 0755); err != nil {
if err := os.MkdirAll(tmpDir, 0755); err != nil {
panic(err)
}
}
path, err := customfs.CFS.MkdirTemp(tmpDir, "erigon-memdb-")
path, err := os.MkdirTemp(tmpDir, "erigon-memdb-")
if err != nil {
panic(err)
}
Expand Down
5 changes: 4 additions & 1 deletion erigon-lib/state/aggregator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"path"
"path/filepath"
"sync/atomic"
"testing"
"time"

Expand All @@ -44,7 +45,9 @@ import (

func testDbAndAggregatorBench(b *testing.B, aggStep uint64) (kv.RwDB, *Aggregator) {
b.Helper()
customfs.CFS = customfs.CustomFileSystem{Fs: afero.NewMemMapFs()}
if _, ok := customfs.CFS.Fs.(*afero.OsFs); ok {
customfs.CFS = customfs.CustomFileSystem{Fs: afero.NewMemMapFs(), TmpCounter: atomic.Int32{}}
}
logger := log.New()
dirs := datadir.New("tmp")
db := mdbx.NewMDBX(logger).InMem(dirs.Chaindata).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
Expand Down
14 changes: 11 additions & 3 deletions erigon-lib/state/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/spf13/afero"
"math"
"math/rand"
"os"
"path"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -1093,11 +1094,18 @@ func generateKV(tb testing.TB, tmp string, keySize, valueSize, keyCount int, log
func testDbAndAggregatorv3(t *testing.T, aggStep uint64) (kv.RwDB, *Aggregator) {
t.Helper()
require := require.New(t)
customfs.CFS = customfs.CustomFileSystem{Fs: afero.NewMemMapFs()}
tmpName := "tmp" + strconv.Itoa(rand.Int())
if _, ok := customfs.CFS.Fs.(*afero.OsFs); ok {
customfs.CFS = customfs.CustomFileSystem{Fs: afero.NewMemMapFs(), TmpCounter: atomic.Int32{}}
}
tmpCounter := customfs.CFS.TmpCounter.Add(1) - 1
tmpName := "tmp" + strconv.Itoa(int(tmpCounter))
println("tmpName", tmpName)
dirs := datadir.New(tmpName)
t.Cleanup(func() {
customfs.CFS.RemoveAll(tmpName)
println("removing")
err := customfs.CFS.RemoveAll(tmpName)
require.NoError(err)
err = os.RemoveAll(tmpName)
})
logger := log.New()
db := mdbx.NewMDBX(logger).InMem(dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg {
Expand Down
10 changes: 6 additions & 4 deletions erigon-lib/state/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"github.com/erigontech/erigon-lib/common/customfs"
"io/fs"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"strconv"
Expand Down Expand Up @@ -116,9 +116,9 @@ func TestDomain_OpenFolder(t *testing.T) {
fn := ff.src.decompressor.FilePath()
d.Close()

err := os.Remove(fn)
err := customfs.CFS.Remove(fn)
require.NoError(t, err)
err = os.WriteFile(fn, make([]byte, 33), 0644)
err = customfs.CFS.WriteFile(fn, make([]byte, 33), 0644)
require.NoError(t, err)

err = d.openFolder()
Expand Down Expand Up @@ -916,6 +916,7 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) {

dom.History._visibleFiles[i].src.closeFilesAndRemove()
}
println("removedHist", len(removedHist))
dom.Close()

err = dom.openFolder()
Expand All @@ -925,6 +926,7 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) {

// domain files for same range should not be available so lengths should match
require.Len(t, dom._visible.files, len(run1Doms)-len(removedHist))

require.Len(t, dom.History._visibleFiles, len(dom._visible.files))
require.Len(t, dom.History._visibleFiles, len(run1Hist)-len(removedHist))

Expand Down Expand Up @@ -954,7 +956,7 @@ func TestDomain_OpenFilesWithDeletions(t *testing.T) {

// check files persist on the disk
persistingDomains := make(map[string]bool, 0)
err = fs.WalkDir(os.DirFS(dom.dirs.SnapDomain), ".", func(path string, d fs.DirEntry, err error) error {
err = fs.WalkDir(customfs.CFS.DirFS(dom.dirs.SnapDomain), ".", func(path string, d fs.DirEntry, err error) error {
persistingDomains[filepath.Base(path)] = false
return nil
})
Expand Down
22 changes: 11 additions & 11 deletions erigon-lib/state/files_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package state

import (
"os"
"github.com/erigontech/erigon-lib/common/customfs"
"sync/atomic"

btree2 "github.com/tidwall/btree"
Expand Down Expand Up @@ -107,10 +107,10 @@ func (i *filesItem) closeFilesAndRemove() {
i.decompressor.Close()
// paranoic-mode on: don't delete frozen files
if !i.frozen {
if err := os.Remove(i.decompressor.FilePath()); err != nil {
if err := customfs.CFS.Remove(i.decompressor.FilePath()); err != nil {
log.Trace("remove after close", "err", err, "file", i.decompressor.FileName())
}
if err := os.Remove(i.decompressor.FilePath() + ".torrent"); err != nil {
if err := customfs.CFS.Remove(i.decompressor.FilePath() + ".torrent"); err != nil {
log.Trace("remove after close", "err", err, "file", i.decompressor.FileName()+".torrent")
}
}
Expand All @@ -120,41 +120,41 @@ func (i *filesItem) closeFilesAndRemove() {
i.index.Close()
// paranoic-mode on: don't delete frozen files
if !i.frozen {
if err := os.Remove(i.index.FilePath()); err != nil {
if err := customfs.CFS.Remove(i.index.FilePath()); err != nil {
log.Trace("remove after close", "err", err, "file", i.index.FileName())
}
if err := os.Remove(i.index.FilePath() + ".torrent"); err != nil {
if err := customfs.CFS.Remove(i.index.FilePath() + ".torrent"); err != nil {
log.Trace("remove after close", "err", err, "file", i.index.FileName())
}
}
i.index = nil
}
if i.bindex != nil {
i.bindex.Close()
if err := os.Remove(i.bindex.FilePath()); err != nil {
if err := customfs.CFS.Remove(i.bindex.FilePath()); err != nil {
log.Trace("remove after close", "err", err, "file", i.bindex.FileName())
}
if err := os.Remove(i.bindex.FilePath() + ".torrent"); err != nil {
if err := customfs.CFS.Remove(i.bindex.FilePath() + ".torrent"); err != nil {
log.Trace("remove after close", "err", err, "file", i.bindex.FileName())
}
i.bindex = nil
}
if i.bm != nil {
i.bm.Close()
if err := os.Remove(i.bm.FilePath()); err != nil {
if err := customfs.CFS.Remove(i.bm.FilePath()); err != nil {
log.Trace("remove after close", "err", err, "file", i.bm.FileName())
}
if err := os.Remove(i.bm.FilePath() + ".torrent"); err != nil {
if err := customfs.CFS.Remove(i.bm.FilePath() + ".torrent"); err != nil {
log.Trace("remove after close", "err", err, "file", i.bm.FileName())
}
i.bm = nil
}
if i.existence != nil {
i.existence.Close()
if err := os.Remove(i.existence.FilePath); err != nil {
if err := customfs.CFS.Remove(i.existence.FilePath); err != nil {
log.Trace("remove after close", "err", err, "file", i.existence.FileName)
}
if err := os.Remove(i.existence.FilePath + ".torrent"); err != nil {
if err := customfs.CFS.Remove(i.existence.FilePath + ".torrent"); err != nil {
log.Trace("remove after close", "err", err, "file", i.existence.FilePath)
}
i.existence = nil
Expand Down
17 changes: 13 additions & 4 deletions erigon-lib/state/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
"encoding/binary"
"fmt"
"github.com/erigontech/erigon-lib/common/customfs"
"github.com/spf13/afero"
"math"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -53,8 +54,16 @@ import (

func testDbAndHistory(tb testing.TB, largeValues bool, logger log.Logger) (kv.RwDB, *History) {
tb.Helper()
tmpName := "tmp" + strconv.Itoa(rand.Int())
if _, ok := customfs.CFS.Fs.(*afero.OsFs); ok {
customfs.CFS = customfs.CustomFileSystem{Fs: afero.NewMemMapFs(), TmpCounter: atomic.Int32{}}
}
tmpCounter := customfs.CFS.TmpCounter.Add(1) - 1
tmpName := "tmp" + strconv.Itoa(int(tmpCounter))
dirs := datadir.New(tmpName)
tb.Cleanup(func() {
customfs.CFS.RemoveAll(tmpName)
os.RemoveAll(tmpName)
})
keysTable := "AccountKeys"
indexTable := "AccountIndex"
valsTable := "AccountVals"
Expand Down Expand Up @@ -1549,9 +1558,9 @@ func TestHistory_OpenFolder(t *testing.T) {
fn := ff.src.decompressor.FilePath()
h.Close()

err := os.Remove(fn)
err := customfs.CFS.Remove(fn)
require.NoError(t, err)
err = os.WriteFile(fn, make([]byte, 33), 0644)
err = customfs.CFS.WriteFile(fn, make([]byte, 33), 0644)
require.NoError(t, err)

err = h.openFolder()
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/state/inverted_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"context"
"encoding/binary"
"fmt"
"github.com/erigontech/erigon-lib/common/customfs"
"math"
"os"
"testing"
"time"

Expand Down Expand Up @@ -737,9 +737,9 @@ func TestInvIndex_OpenFolder(t *testing.T) {
fn := ff.src.decompressor.FilePath()
ii.Close()

err := os.Remove(fn)
err := customfs.CFS.Remove(fn)
require.NoError(t, err)
err = os.WriteFile(fn, make([]byte, 33), 0644)
err = customfs.CFS.WriteFile(fn, make([]byte, 33), 0644)
require.NoError(t, err)

err = ii.openFolder()
Expand Down
Loading

0 comments on commit 2466c93

Please sign in to comment.