Skip to content

Commit

Permalink
err-move-to-virt-dir: do not run filesystem health check; path errors
Browse files Browse the repository at this point in the history
* add 'err-move-to-virt-dir' - type and helpers
  - do not count it as an io-error; don't run FSHC
* add `IsPathErr` and `CheckMvToVirtDir`
* with minor refactoring

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 23, 2024
1 parent a086eb2 commit ac38c98
Show file tree
Hide file tree
Showing 12 changed files with 91 additions and 23 deletions.
3 changes: 3 additions & 0 deletions ais/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,9 @@ func (t *target) getObject(w http.ResponseWriter, r *http.Request, dpq *dpq, bck
t.statsT.IncErr(stats.ErrGetCount)
if goi.isIOErr {
t.statsT.IncErr(stats.IOErrGetCount)
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
nlog.Warningln("io-error [", err, "]", goi.lom.String())
}
}

// handle right here, return nil
Expand Down
5 changes: 4 additions & 1 deletion ais/tgtfshc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package ais

import (
"fmt"

"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
Expand Down Expand Up @@ -47,7 +49,8 @@ func (t *target) FSHC(err error, mi *fs.Mountpath, fqn string) {
return
}

nlog.Errorf("%s: waking up FSHC to check %s, err: %v", t, mi, err)
warn := fmt.Sprintf("%s: waking up FSHC to check %s, err: %v", t, mi, err)
nlog.ErrorDepth(1, warn)

//
// counting I/O errors on a per mountpath
Expand Down
6 changes: 5 additions & 1 deletion ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ func (t *target) GetCold(ctx context.Context, lom *core.LOM, owt cmn.OWT) (ecode
if owt != cmn.OwtGetPrefetchLock {
lom.Unlock(true)
}
nlog.Infoln(t.String()+":", "failed to GET remote", lom.Cname()+":", err, ecode)
if cmn.IsErrFailedTo(err) {
nlog.Warningln(err)
} else {
nlog.Warningln("failed to GET remote", lom.Cname(), "[", err, ecode, "]")
}
return ecode, err
}

Expand Down
14 changes: 8 additions & 6 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,12 @@ func (poi *putOI) putObject() (ecode int, err error) {
rerr:
if poi.owt == cmn.OwtPut && poi.restful && !poi.t2t {
poi.t.statsT.IncErr(stats.ErrPutCount)
if err != cmn.ErrSkip && !poi.remoteErr && err != io.ErrUnexpectedEOF && !cos.IsRetriableConnErr(err) {
if err != cmn.ErrSkip && !poi.remoteErr && err != io.ErrUnexpectedEOF &&
!cos.IsRetriableConnErr(err) && !cos.IsErrMvToVirtDir(err) {
poi.t.statsT.IncErr(stats.IOErrPutCount)
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
nlog.Warningln("io-error [", err, "]", poi.loghdr())
}
}
}
return ecode, err
Expand Down Expand Up @@ -346,9 +350,7 @@ func (poi *putOI) fini() (ecode int, err error) {
// do nothing: lom is already wlocked
case cmn.OwtGetPrefetchLock:
if !lom.TryLock(true) {
if cmn.Rom.FastV(4, cos.SmoduleAIS) {
nlog.Warningln(poi.loghdr(), "is busy")
}
nlog.Warningln(poi.loghdr(), "is busy")
return 0, cmn.ErrSkip // e.g. prefetch can skip it and keep on going
}
defer lom.Unlock(true)
Expand Down Expand Up @@ -676,7 +678,7 @@ do:
goi.lom.Unlock(true)
goi.unlocked = true
if !cos.IsNotExist(res.Err, res.ErrCode) {
nlog.Infoln(ftcg+"(read)", goi.lom.Cname(), res.Err, res.ErrCode)
nlog.Infoln(ftcg, "(read)", goi.lom.Cname(), res.Err, res.ErrCode)
}
return res.ErrCode, res.Err
}
Expand Down Expand Up @@ -742,7 +744,7 @@ func (goi *getOI) _coldPut(res *core.GetReaderResult) (int, error) {

if err != nil {
lom.Unlock(true)
nlog.Infoln(ftcg+"(put)", lom.Cname(), err)
nlog.Infoln(ftcg, "(put)", lom.Cname(), err)
return code, err
}

Expand Down
45 changes: 45 additions & 0 deletions cmn/cos/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"context"
"errors"
"fmt"
iofs "io/fs"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"sync"
ratomic "sync/atomic"
"syscall"
Expand All @@ -32,6 +34,9 @@ type (
cnt int64
mu sync.Mutex
}
ErrMvToVirtDir struct {
dst string
}
)

var (
Expand Down Expand Up @@ -150,6 +155,13 @@ func IsErrSyscallTimeout(err error) bool {
return ok && syscallErr.Timeout()
}

// IsPathErr reports whether err, or any error in its Unwrap chain,
// is an *fs.PathError.
func IsPathErr(err error) (ok bool) {
	var target *iofs.PathError
	return errors.As(err, &target)
}

// likely out of socket descriptors
func IsErrConnectionNotAvail(err error) (yes bool) {
return errors.Is(err, syscall.EADDRNOTAVAIL)
Expand Down Expand Up @@ -210,3 +222,36 @@ func IsErrClientURLTimeout(err error) bool {
uerr := Err2ClientURLErr(err)
return uerr != nil && uerr.Timeout()
}

//
// ErrMvToVirtDir
// NOTE [design tradeoff]: keeping objects under (e.g.) their respective sha256 would, in part, eliminate this error
//

// CheckMvToVirtDir inspects a failed move (rename) to dst and, when the failure
// is "already exists" and dst turns out to be a directory, replaces the error
// with *ErrMvToVirtDir. In every other case the original err is returned as is:
//   - err is already an *ErrMvToVirtDir
//   - err is not an "exists" error (per os.IsExist)
//   - dst cannot be stat-ed or is not a directory
func CheckMvToVirtDir(err error, dst string) error {
	switch {
	case IsErrMvToVirtDir(err):
		// already classified - nothing to do
		return err
	case !os.IsExist(err):
		return err
	}
	info, statErr := os.Stat(dst)
	if statErr != nil || !info.IsDir() {
		return err
	}
	return &ErrMvToVirtDir{dst}
}

// IsErrMvToVirtDir reports whether err is an *ErrMvToVirtDir or wraps one.
//
// errors.As is used (rather than a plain type assertion) so that the check
// keeps working after the error has been wrapped by a type that exposes
// Unwrap, or via fmt.Errorf with %w. A nil err yields false.
func IsErrMvToVirtDir(err error) bool {
	var target *ErrMvToVirtDir
	return errors.As(err, &target)
}

// Error implements the error interface.
//
// The message shows an abbreviated form of the destination: its last path
// element, preceded by the parent's last element when the destination is more
// than 8 bytes longer than the last element (and that element is longer than
// one byte). When the parent is not included the message reads '../<base>'
// rather than '..//<base>'.
func (e *ErrMvToVirtDir) Error() string {
	base := filepath.Base(e.dst)
	rel := base
	if l, lb := len(e.dst), len(base); lb > 1 && l > lb+8 {
		// e.dst[:l-lb] is the destination minus its last element;
		// Base() drops the trailing separator and yields the parent's name
		rel = filepath.Base(e.dst[:l-lb]) + "/" + base
	}
	return fmt.Sprintf("destination '../%s' exists and is a virtual directory", rel)
}
4 changes: 4 additions & 0 deletions cmn/cos/err_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ var ioErrs = [...]error{
func IsIOError(err error) bool {
debug.Assert(err != nil)

if IsErrMvToVirtDir(err) {
return false
}

// via os.NewSyscallError(), with a prior check !os.IsNotExist()
if e, ok := err.(*os.SyscallError); ok {
nlog.Infoln("by syscall-error", e)
Expand Down
11 changes: 2 additions & 9 deletions cmn/cos/ioutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,11 @@ func Rename(src, dst string) (err error) {
return nil
}
if !os.IsNotExist(err) {
if os.IsExist(err) {
if finfo, errN := os.Stat(dst); errN == nil && finfo.IsDir() {
// [design tradeoff] keeping objects under (e.g.) their respective sha256
// would eliminate this one, in part
return fmt.Errorf("move destination '../%s' already exists (and is a virtual directory)", filepath.Base(dst))
}
}
return err
return CheckMvToVirtDir(err, dst)
}
// create and retry (slow path)
err = CreateDir(filepath.Dir(dst))
if err == nil {
if err == nil || os.IsExist(err) /*race*/ {
err = os.Rename(src, dst)
}
return err
Expand Down
5 changes: 5 additions & 0 deletions cmn/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (e *ErrFailedTo) Error() string {

// Unwrap returns the underlying error, which lets errors.Is and errors.As
// look through an *ErrFailedTo.
func (e *ErrFailedTo) Unwrap() error {
	return e.err
}

// IsErrFailedTo reports whether the dynamic type of err is *ErrFailedTo.
// The check is on err itself; wrapped errors are not unwrapped.
func IsErrFailedTo(err error) bool {
	switch err.(type) {
	case *ErrFailedTo:
		return true
	default:
		return false
	}
}

// ErrStreamTerminated

func NewErrStreamTerminated(stream string, err error, reason, detail string) *ErrStreamTerminated {
Expand Down
5 changes: 4 additions & 1 deletion core/lfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (lom *LOM) _cf(fqn string) (fh *os.File, err error) {
return fh, nil
}
if !os.IsNotExist(err) {
// TODO: cos.CheckMvToVirtDir(err, fqn)
T.FSHC(err, lom.Mountpath(), "")
return nil, err
}
Expand Down Expand Up @@ -145,7 +146,9 @@ func (lom *LOM) RenameFinalize(wfqn string) error {
return &errBdir{cname: lom.Cname(), err: err}
}
if err := lom.RenameToMain(wfqn); err != nil {
T.FSHC(err, lom.Mountpath(), wfqn)
if !cos.IsErrMvToVirtDir(err) {
T.FSHC(err, lom.Mountpath(), wfqn)
}
return cmn.NewErrFailedTo(T, "finalize", lom.Cname(), err)
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions core/lom.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,12 @@ func (lom *LOM) FromFS() error {
size, atimefs, _, err := lom.Fstat(true /*get-atime*/)
if err != nil {
if !os.IsNotExist(err) {
if cos.IsPathErr(err) && strings.Contains(err.Error(), "not a directory") {
// e.g. err "stat .../aaa/111: not a directory" when there's existing ".../aaa" object
err := fmt.Errorf("%w (path error)", err)
nlog.Errorln(err)
return err
}
err = os.NewSyscallError("stat", err)
T.FSHC(err, lom.Mountpath(), lom.FQN)
}
Expand Down
4 changes: 2 additions & 2 deletions xact/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ func (xctn *Base) Finish() {
case err == nil:
nlog.Infoln(xctn.String(), "finished")
case aborted:
nlog.Warningln(xctn.String(), "aborted:", err.Error(), info)
nlog.Warningln(xctn.String(), "aborted:", err, info)
default:
nlog.Infoln("Warning:", xctn.String(), "finished w/err:", err.Error())
nlog.Warningln(xctn.String(), "finished w/err:", err)
}
}

Expand Down
6 changes: 3 additions & 3 deletions xact/xs/prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,11 @@ outer:
nlog.Warningln(xname, "::", xblob.String(), "[", msg.String(), err, "]")
default:
if xblob.Size() >= cos.GiB/2 || cmn.Rom.FastV(4, cos.SmoduleXs) {
var s string
if n := int(pebl.num()); n > 0 {
s = " (num-pending " + strconv.Itoa(n) + ")"
nlog.Infoln(xname, "::", xblob.String(), "( num-pending", strconv.Itoa(n), ")")
} else {
nlog.Infoln(xname, "::", xblob.String())
}
nlog.Infoln(xname, "::", xblob.String(), s)
}
}
}
Expand Down

0 comments on commit ac38c98

Please sign in to comment.