From 5393d83b58f00efc228a8a11f2566078d441be97 Mon Sep 17 00:00:00 2001 From: Michael Woolnough Date: Tue, 17 Dec 2024 15:31:57 +0000 Subject: [PATCH] Rewrite walk to use less RAM --- go.mod | 1 - go.sum | 2 - walk/dirent.go | 134 ++++++------------- walk/dirent_test.go | 7 - walk/file.go | 15 ++- walk/walk.go | 305 +++++++++++++++++++++++++++++--------------- walk/walk_test.go | 2 +- 7 files changed, 252 insertions(+), 214 deletions(-) diff --git a/go.mod b/go.mod index 89091984..f2db33c3 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/smartystreets/goconvey v1.7.2 github.com/spf13/cobra v1.8.1 github.com/termie/go-shutil v0.0.0-20140729215957-bcacb06fecae - github.com/wtsi-hgi/godirwalk v1.18.1 github.com/wtsi-ssg/wr v0.5.9 ) diff --git a/go.sum b/go.sum index 9bc6b957..064a3a97 100644 --- a/go.sum +++ b/go.sum @@ -284,8 +284,6 @@ github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= -github.com/wtsi-hgi/godirwalk v1.18.1 h1:t7eaGXYBfTtfIEGLizPCC9fzASTvZtdhKEEri8TyyJs= -github.com/wtsi-hgi/godirwalk v1.18.1/go.mod h1:rLa4FlI9kdT7o67jwFos8qgaX3K2sMC6XI4FXJ1iVyk= github.com/wtsi-ssg/wr v0.5.9 h1:lJWNuJfVvhTpXQqxRN5RbffhvK3HMog0fFpUFznvoz8= github.com/wtsi-ssg/wr v0.5.9/go.mod h1:njSdCX+xv1xzzw3Oy3Smid6s/IyIQEvLsKbRwaq4fC8= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/walk/dirent.go b/walk/dirent.go index 4a51a898..8ef168bd 100644 --- a/walk/dirent.go +++ b/walk/dirent.go @@ -29,114 +29,64 @@ package walk import ( "io/fs" "os" - "sync" - "unsafe" - - "github.com/wtsi-hgi/godirwalk" -) - -var ( - filePathPool64 = sync.Pool{New: func() any { x := make(FilePath, 0, 64); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll - filePathPool128 = sync.Pool{New: func() any { x := make(FilePath, 0, 128); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll - filePathPool256 = sync.Pool{New: func() any { x := make(FilePath, 0, 256); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll - filePathPool512 = sync.Pool{New: func() any { x := make(FilePath, 0, 512); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll - filePathPool1024 = sync.Pool{New: func() any { x := make(FilePath, 0, 1024); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll - filePathPool2048 = sync.Pool{New: func() any { x := make(FilePath, 0, 2048); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll - filePathPool4096 = sync.Pool{New: func() any { x := make(FilePath, 0, 4096); return &x }} //nolint:gochecknoglobals,mnd,nlreturn,lll + "strings" ) -// FilePath is a byte-slice of a path, utilising object pools to reduce memory -// allocations. -// -// It is the clients responsibility to call the Done method once it is no longer -// needed. -type FilePath []byte - -func newFilePathSize(size int) *FilePath { - switch { - case size <= 64: //nolint:mnd - return filePathPool64.Get().(*FilePath) //nolint:forcetypeassert - case size <= 128: //nolint:mnd - return filePathPool128.Get().(*FilePath) //nolint:forcetypeassert - case size <= 256: //nolint:mnd - return filePathPool256.Get().(*FilePath) //nolint:forcetypeassert - case size <= 512: //nolint:mnd - return filePathPool512.Get().(*FilePath) //nolint:forcetypeassert - case size <= 1024: //nolint:mnd - return filePathPool1024.Get().(*FilePath) //nolint:forcetypeassert - case size <= 2048: //nolint:mnd - return filePathPool2048.Get().(*FilePath) //nolint:forcetypeassert - } - - return filePathPool4096.Get().(*FilePath) //nolint:forcetypeassert +// FilePath is a byte-slice of a path. +type FilePath struct { + parent *FilePath + name string + depth uint16 } // NewFilePath creates a new FilePath, setting the value to the given string. -func NewFilePath(path string) *FilePath { - c := newFilePathSize(len(path)) - c.writeString(path) - - return c +func NewFilePath(path string) FilePath { + return FilePath{name: path} } -func (f *FilePath) writeString(str string) { - *f = append(*f, str...) +func (f *FilePath) appendTo(p []byte) []byte { + if f.parent != nil { + p = f.parent.appendTo(p) + } + + return append(p, f.name...) } -func (f *FilePath) writeBytes(p []byte) { - *f = append(*f, p...) +// Bytes returns the FilePath as a literal byte-slice. +func (f *FilePath) Bytes() []byte { + return f.appendTo(nil) } -// Done deallocates the underlying byte-slice; any uses of the Bytes method are -// now invalid and may change. -func (f *FilePath) Done() { //nolint:gocyclo - *f = (*f)[:0] - - switch cap(*f) { - case 64: //nolint:mnd - filePathPool64.Put(f) - case 128: //nolint:mnd - filePathPool128.Put(f) - case 256: //nolint:mnd - filePathPool256.Put(f) - case 512: //nolint:mnd - filePathPool512.Put(f) - case 1024: //nolint:mnd - filePathPool1024.Put(f) - case 2048: //nolint:mnd - filePathPool2048.Put(f) - case 4096: //nolint:mnd - filePathPool4096.Put(f) +func (f *FilePath) compare(g *FilePath) int { + if f.depth < g.depth { + return f.compareTo(g.getDepth(f.depth)) + } else if f.depth > g.depth { + return f.getDepth(g.depth).compareTo(g) } -} -func (f *FilePath) sub(d *godirwalk.Dirent) *FilePath { - name := d.Name() - size := len(*f) + len(name) + return f.compareTo(g) +} - if d.IsDir() { - size++ +func (f *FilePath) getDepth(n uint16) *FilePath { + for f.depth != n { + f = f.parent } - c := newFilePathSize(size) - - c.writeBytes(*f) - c.writeString(name) + return f +} - if d.IsDir() { - c.writeString("/") +func (f *FilePath) compareTo(g *FilePath) int { + if f == g { + return 0 } - return c -} + cmp := f.parent.compareTo(g.parent) -// Bytes returns the FilePath as a literal byte-slice. -func (f *FilePath) Bytes() []byte { - return *f -} + if cmp == 0 { + return strings.Compare(f.name, g.name) + } -func (f *FilePath) string() string { - return unsafe.String(&(*f)[0], len(*f)) + return cmp } // Dirent represents a file system directory entry (a file or a directory), @@ -144,21 +94,15 @@ func (f *FilePath) string() string { type Dirent struct { // Path is the complete path to the directory entry (including both // directory and basename) - Path *FilePath + Path FilePath // Type is the type bits of the file mode of this entry. - Type os.FileMode + Type fs.FileMode // Inode is the file system inode number for this entry. Inode uint64 } -// newDirentForDirectoryPath returns a Dirent for the given directory, with -// a Type for directories and no Inode. -func newDirentForDirectoryPath(dir string) Dirent { - return Dirent{Path: NewFilePath(dir), Type: fs.ModeDir} -} - // IsDir returns true if we are a directory. func (d *Dirent) IsDir() bool { return d.Type.IsDir() diff --git a/walk/dirent_test.go b/walk/dirent_test.go index 7bfa648c..2775222b 100644 --- a/walk/dirent_test.go +++ b/walk/dirent_test.go @@ -71,11 +71,4 @@ func TestDirent(t *testing.T) { So(d.IsRegular(), ShouldBeFalse) So(d.IsSymlink(), ShouldBeTrue) }) - - Convey("You can make a fake Direct for directories", t, func() { - d := newDirentForDirectoryPath("/a/dir") - So(d.IsDir(), ShouldBeTrue) - So(d.IsRegular(), ShouldBeFalse) - So(d.IsSymlink(), ShouldBeFalse) - }) } diff --git a/walk/file.go b/walk/file.go index 21b0b03a..3e1f4ad5 100644 --- a/walk/file.go +++ b/walk/file.go @@ -33,10 +33,13 @@ import ( "path/filepath" "strconv" "sync" + "unsafe" ) const userOnlyPerm = 0700 +const maxPathLength = 4096 + // non-ascii bytes could become \xXX (4x the length at worst), the two // speech-marks are +2 and a newline is +1. const maxQuotedPathLength = 4096*4 + 2 + 1 @@ -162,12 +165,16 @@ func NewFiles(outDir string, n int) (*Files, error) { // // It will terminate the walk if writes to our output files fail. func (f *Files) WritePaths() PathCallback { - var quoted [maxQuotedPathLength]byte + var ( + quoted [maxQuotedPathLength]byte + tmpPath [maxPathLength]byte + ) return func(entry *Dirent) error { - defer entry.Path.Done() - - return f.writePath(append(strconv.AppendQuote(quoted[:0], entry.Path.string()), '\n')) + return f.writePath(append( + strconv.AppendQuote( + quoted[:0], unsafe.String(&tmpPath[0], len(entry.Path.appendTo(tmpPath[:0]))), + ), '\n')) } } diff --git a/walk/walk.go b/walk/walk.go index a42f3d73..0ae54490 100644 --- a/walk/walk.go +++ b/walk/walk.go @@ -30,15 +30,16 @@ package walk import ( + "bytes" "context" + "errors" + "io/fs" "os" "path/filepath" "slices" - "sort" - "strings" "sync" - - "github.com/wtsi-hgi/godirwalk" + "syscall" + "unsafe" ) const walkers = 16 @@ -78,16 +79,9 @@ func New(cb PathCallback, includDirs, ignoreSymlinks bool) *Walker { type ErrorCallback func(path string, err error) type pathRequest struct { - path *FilePath - response chan []Dirent -} - -var pathRequestPool = sync.Pool{ //nolint:gochecknoglobals - New: func() any { - return &pathRequest{ - response: make(chan []Dirent), - } - }, + Dirent + next *pathRequest + ready sync.Mutex } // Walk will discover all the paths nested under the given dir, and send them to @@ -102,45 +96,43 @@ func (w *Walker) Walk(dir string, errCB ErrorCallback) error { dir = filepath.Clean(dir) + "/" requestCh := make(chan *pathRequest) sortedRequestCh := make(chan *pathRequest) - direntCh := make(chan Dirent, dirsChSize) - flowControl := newController() ctx, stop := context.WithCancel(context.Background()) for range walkers { - go w.handleDirReads(ctx, sortedRequestCh, errCB) + go w.handleDirReads(ctx, sortedRequestCh, requestCh, errCB, w.ignoreSymlinks) } - go func() { - walkDirectory(ctx, newDirentForDirectoryPath(dir), - flowControl, createPathRequestor(requestCh), w.sendDirs) - close(direntCh) - }() - go sortPathRequests(ctx, requestCh, sortedRequestCh) - go flowControl.PassControl(direntCh) - defer stop() - - return w.sendDirentsToPathCallback(direntCh) -} + r := &pathRequest{ + Dirent: Dirent{ + Path: NewFilePath(dir), + Type: fs.ModeDir, + Inode: 0, + }, + } -func createPathRequestor(requestCh chan *pathRequest) func(*FilePath) []Dirent { - return func(path *FilePath) []Dirent { - pr := pathRequestPool.Get().(*pathRequest) //nolint:errcheck,forcetypeassert - defer pathRequestPool.Put(pr) + r.ready.Lock() - pr.path = path + sortedRequestCh <- r - requestCh <- pr + defer stop() - return <-pr.response - } + return w.sendDirentsToPathCallback(r) } -func (w *Walker) sendDirentsToPathCallback(direntCh <-chan Dirent) error { - for dirent := range direntCh { - if err := w.pathCB(&dirent); err != nil { - return err +func (w *Walker) sendDirentsToPathCallback(r *pathRequest) error { + for ; r != nil; r = r.next { + isDir := r.IsDir() + + if w.sendDirs || !isDir { + if err := w.pathCB(&r.Dirent); err != nil { + return err + } + } + + if isDir { + r.ready.Lock() } } @@ -150,7 +142,7 @@ func (w *Walker) sendDirentsToPathCallback(direntCh <-chan Dirent) error { type heap []*pathRequest func pathCompare(a, b *pathRequest) int { - return strings.Compare(b.path.string(), a.path.string()) + return b.Path.compare(&a.Path) } func (h *heap) Insert(req *pathRequest) { @@ -195,113 +187,218 @@ func sortPathRequests(ctx context.Context, requestCh <-chan *pathRequest, //noli } } -func (w *Walker) handleDirReads(ctx context.Context, requests chan *pathRequest, errCB ErrorCallback) { +func (w *Walker) handleDirReads(ctx context.Context, sortedRequests, requestCh chan *pathRequest, + errCB ErrorCallback, ignoreSymlinks bool) { buffer := make([]byte, os.Getpagesize()) + var pathBuffer [maxPathLength + 1]byte + Loop: for { select { case <-ctx.Done(): break Loop - case request := <-requests: - children, err := godirwalk.ReadDirents(request.path.string(), buffer) - if err != nil { - errCB(string(request.path.Bytes()), err) + case request := <-sortedRequests: + l := len(request.Path.appendTo(pathBuffer[:0])) + pathBuffer[l] = 0 + + if err := scan(buffer, &pathBuffer[0], request, ignoreSymlinks); err != nil { + errCB(string(pathBuffer[:l]), err) } - request.response <- w.childrenToDirents(children, request.path) + go scanChildDirs(requestCh, request) } } } -func (w *Walker) childrenToDirents(children godirwalk.Dirents, parent *FilePath) []Dirent { - dirents := make([]Dirent, 0, len(children)) +func scanChildDirs(requestCh chan *pathRequest, request *pathRequest) { + for p, r := &request.Path, request.next; r != nil && r.Path.parent == p; { + next := r.next - for _, child := range children { - dirent := Dirent{ - Path: parent.sub(child), - Type: child.ModeType(), - Inode: child.Inode(), + if r.IsDir() { + requestCh <- r } - if w.ignoreSymlinks && dirent.IsSymlink() { - continue + r = next + } +} + +type scanner struct { + buffer, read []byte + fh int + syscall.Dirent + err error +} + +func (s *scanner) Next() bool { + for len(s.read) == 0 { + n, err := syscall.ReadDirent(s.fh, s.buffer) + if err != nil { + if errors.Is(err, syscall.EINTR) { + continue + } + + s.err = err + + return false + } + + if n <= 0 { + return false } - dirents = append(dirents, dirent) + s.read = s.buffer[:n] } - sort.Slice(dirents, func(i, j int) bool { - return dirents[i].Path.string() < dirents[j].Path.string() - }) + copy((*[unsafe.Sizeof(syscall.Dirent{})]byte)(unsafe.Pointer(&s.Dirent))[:], s.read) + s.read = s.read[s.Reclen:] - return dirents + return true } -type flowController struct { - controller chan chan<- Dirent -} +func (s *scanner) Get() (string, fs.FileMode, uint64) { + mode := s.getMode() -func newController() *flowController { - return controllerPool.Get().(*flowController) //nolint:forcetypeassert + return s.getName(mode.IsDir()), mode, s.Ino } -func (f *flowController) GetControl() chan<- Dirent { - return <-f.controller -} +func (s *scanner) getName(isDir bool) string { //nolint:gocyclo + n := s.Dirent.Name[:] + name := *(*[]byte)(unsafe.Pointer(&n)) + + l := bytes.IndexByte(name, 0) + if l < 0 || l == 1 && s.Dirent.Name[0] == '.' || l == 2 && s.Dirent.Name[0] == '.' && s.Dirent.Name[1] == '.' { + return "" + } -func (f *flowController) PassControl(control chan<- Dirent) { - f.controller <- control - <-f.controller + if isDir { + s.Dirent.Name[l] = '/' + l++ + } + + return string(name[:l]) } -func (f *flowController) EndControl() { - f.controller <- nil - controllerPool.Put(f) +func (s *scanner) getMode() fs.FileMode { + switch s.Type { + case syscall.DT_DIR: + return fs.ModeDir + case syscall.DT_LNK: + return fs.ModeSymlink + case syscall.DT_CHR: + return fs.ModeDevice | fs.ModeCharDevice + case syscall.DT_BLK: + return fs.ModeDevice + case syscall.DT_FIFO: + return fs.ModeNamedPipe + case syscall.DT_SOCK: + return fs.ModeSocket + } + + return 0 } -var controllerPool = sync.Pool{ //nolint:gochecknoglobals - New: func() any { - return &flowController{ - controller: make(chan chan<- Dirent), +func scan(buffer []byte, path *byte, request *pathRequest, ignoreSymlinks bool) error { + defer request.ready.Unlock() + + fh, err := open(path) + if err != nil { + return err + } + + defer syscall.Close(fh) + + s := scanner{ + buffer: buffer, + fh: fh, + } + + var last *pathRequest + + for s.Next() { + name, mode, inode := s.Get() + if inode == 0 || name == "" || ignoreSymlinks && mode&fs.ModeSymlink != 0 { + continue } - }, + + last = addDirent(request, last, name, mode, inode) + } + + return nil } -func walkDirectory(ctx context.Context, dirent Dirent, - flowControl *flowController, request func(*FilePath) []Dirent, sendDirs bool) { - children := request(dirent.Path) - childControllers := make([]*flowController, len(children)) +func open(path *byte) (int, error) { + const atFDCWD = -0x64 - for n, child := range children { - if child.IsDir() { - childControllers[n] = newController() + dfd := atFDCWD - go walkDirectory(ctx, child, childControllers[n], request, sendDirs) - } + ifh, _, err := syscall.Syscall6( + syscall.SYS_OPENAT, + uintptr(dfd), + uintptr(unsafe.Pointer(path)), + uintptr(syscall.O_RDONLY), + uintptr(0), 0, 0) + if err != 0 { + return 0, err } - control := flowControl.GetControl() + return int(ifh), nil +} - if sendDirs { - sendEntry(ctx, dirent, control) +func addDirent(request, last *pathRequest, name string, + mode fs.FileMode, inode uint64) *pathRequest { + d := &pathRequest{ + Dirent: Dirent{ + Path: FilePath{ + parent: &request.Path, + name: name, + depth: request.Path.depth + 1, + }, + Type: mode, + Inode: inode, + }, } - for n, childController := range childControllers { - if childController == nil { - sendEntry(ctx, children[n], control) - } else { - childController.PassControl(control) - } + if mode.IsDir() { + d.ready.Lock() + } + + return insertDirent(request, last, d) +} + +func insertDirent(request, last, d *pathRequest) *pathRequest { + if last == nil { + return addFirst(request, d) + } else if last.Path.name < d.Path.name { + return insertAtEnd(last, d) } - flowControl.EndControl() + insertIntoList(request, last, d) + + return last +} + +func addFirst(request, d *pathRequest) *pathRequest { + d.next = request.next + request.next = d + + return d +} + +func insertAtEnd(last, d *pathRequest) *pathRequest { + d.next = last.next + last.next = d + + return d } -func sendEntry(ctx context.Context, dirent Dirent, direntCh chan<- Dirent) { - select { - case <-ctx.Done(): - return - case direntCh <- dirent: +func insertIntoList(request, last, d *pathRequest) { + for curr := &request.next; curr != &last.next; curr = &(*curr).next { + if d.Path.name < (*curr).Path.name { + d.next = *curr + *curr = d + + return + } } } diff --git a/walk/walk_test.go b/walk/walk_test.go index e1645d45..c9e55809 100644 --- a/walk/walk_test.go +++ b/walk/walk_test.go @@ -60,7 +60,7 @@ func TestWalk(t *testing.T) { walkErrors = append(walkErrors, err) } - Convey("You can output the paths to a file", func() { + FocusConvey("You can output the paths to a file", func() { ok := testOutputToFiles(true, false, walkDir, outDir, cb, expectedPaths) So(ok, ShouldBeTrue) So(len(walkErrors), ShouldEqual, 0)