Skip to content

Commit

Permalink
copy bucket, multi-object: fix in-cluster source to remote dest (major)
Browse files Browse the repository at this point in the history
* the (major) part of it is a "no-op" novelty - comments inline
* xxhash is now an officially _trusted_ checksum

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 22, 2024
1 parent a3375d1 commit 5a4203c
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 42 deletions.
42 changes: 37 additions & 5 deletions ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,7 +1406,7 @@ func (coi *coi) do(t *target, dm *bundle.DataMover, lom *core.LOM) (size int64,
}

// DP == nil: use default (no-op transform) if source bucket is remote
if coi.DP == nil && lom.Bck().IsRemote() {
if coi.DP == nil && (lom.Bck().IsRemote() || coi.BckTo.IsRemote()) {
coi.DP = &core.LDP{}
}

Expand All @@ -1423,22 +1423,54 @@ func (coi *coi) do(t *target, dm *bundle.DataMover, lom *core.LOM) (size int64,
// dst is this target
// 2, 3: with transformation and without
dst := core.AllocLOM(coi.ObjnameTo)
defer core.FreeLOM(dst)

if err := dst.InitBck(coi.BckTo.Bucket()); err != nil {
core.FreeLOM(dst)
return 0, err
}
// check for no-op
if coi.isNOP(lom, dst, dm) {
if cmn.Rom.FastV(5, cos.SmoduleAIS) {
nlog.Infoln("copying", lom.String(), "=>", dst.String(), "is a no-op: destination exists and is identical")
}
return
}
// do
if coi.DP != nil {
var ecode int
size, ecode, err = coi._reader(t, dm, lom, dst)
debug.Assert(ecode != http.StatusNotFound || cos.IsNotExist(err, 0), err, ecode)
} else {
// fast path (Copy2FQN)
size, err = coi._regular(t, lom, dst)
}
core.FreeLOM(dst)

return size, err
}

func (coi *coi) isNOP(lom, dst *core.LOM, dm *bundle.DataMover) bool {
if coi.LatestVer || coi.Sync {
return false
}
owt := coi.OWT
if dm != nil {
owt = dm.OWT()
}
if owt != cmn.OwtCopy {
return false
}
if err := lom.Load(true, false); err != nil {
return false
}
if err := dst.Load(true, false); err != nil {
return false
}
if lom.CheckEq(dst) != nil {
return false
}
res := dst.CheckRemoteMD(false /*locked*/, false, nil /*origReq*/)
return res.Eq
}

func (coi *coi) _dryRun(lom *core.LOM, objnameTo string) (size int64, err error) {
if coi.DP == nil {
uname := coi.BckTo.MakeUname(objnameTo)
Expand Down Expand Up @@ -1692,7 +1724,7 @@ func (coi *coi) put(t *target, sargs *sendArgs) error {
}

func (coi *coi) stats(size int64, err error) {
if err == nil && coi.Xact != nil {
if err == nil && coi.Xact != nil && size > 0 {
coi.Xact.ObjsAdd(1, size)
}
}
Expand Down
52 changes: 21 additions & 31 deletions cmn/objattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,34 +248,24 @@ func (oa *ObjAttrs) FromHeader(hdr http.Header) (cksum *cos.Cksum) {
// Note that mismatch in any given checksum type immediately renders inequality and return
// from the function.
//
// TODO -- FIXME: ETag:
// - when comparing ETags, either disregard double quotes OR fix it elsewhere to always contain those
// - when comparing ETag and MD5, don't even try to compare if ETag is multi-part
// TODO: count == 1 with matching checksum being xxhash - must be configurable :NOTE
func (oa *ObjAttrs) CheckEq(rem cos.OAH) error {
var (
ver string
md5 string
etag string
cksumVal string
count int
sameEtag bool
ver string
md5 string
etag string
cksumVal string
count int // number of matches
sameEtag bool
sameCksum bool
)
// size check
if remSize := rem.Lsize(true); oa.Size != 0 && remSize != 0 && oa.Size != remSize {
return fmt.Errorf("size %d != %d remote", oa.Size, remSize)
}

// version check
if remVer, v := rem.Version(true), oa.Version(); remVer != "" && v != "" {
if v != remVer {
return fmt.Errorf("version %s != %s remote", oa.Version(), remVer)
}
ver = v
// NOTE: ais own version is, currently, a nonunique sequence number - not counting
if remSrc, _ := rem.GetCustomKey(SourceObjMD); remSrc != apc.AIS {
count++
}
} else if remMeta, ok := rem.GetCustomKey(VersionObjMD); ok && remMeta != "" {
// Cloud version check (NOTE: ais own version is currently a non-unique sequence number)
if remMeta, ok := rem.GetCustomKey(VersionObjMD); ok && remMeta != "" {
if locMeta, ok := oa.GetCustomKey(VersionObjMD); ok && locMeta != "" {
if remMeta != locMeta {
return fmt.Errorf("version-md %s != %s remote", locMeta, remMeta)
Expand All @@ -291,10 +281,17 @@ func (oa *ObjAttrs) CheckEq(rem cos.OAH) error {
return fmt.Errorf("%s checksum %s != %s remote", a.Ty(), b, a)
}
cksumVal = a.Val()
//
// NOTE: including xxhash in trusted checksums
//
switch a.Ty() {
case cos.ChecksumXXHash, cos.ChecksumSHA256, cos.ChecksumSHA512:
sameCksum = true
}
count++
}

// custom MD: ETag check (NOTE: compare ignoring double quotes)
// custom MD: ETag check (ignoring enclosing quotes)
if remMeta, ok := rem.GetCustomKey(ETag); ok && remMeta != "" {
if locMeta, ok := oa.GetCustomKey(ETag); ok && locMeta != "" {
if !_eqIgnoreQuotes(remMeta, locMeta) {
Expand Down Expand Up @@ -339,16 +336,9 @@ func (oa *ObjAttrs) CheckEq(rem cos.OAH) error {
case count >= 2: // e.g., equal because they have the same (version & md5, where version != md5)
return nil
case count == 0:
default:
// same version or ETag from the same (remote) backend
// (arguably, must be configurable)
if remMeta, ok := rem.GetCustomKey(SourceObjMD); ok && remMeta != "" {
if locMeta, ok := oa.GetCustomKey(SourceObjMD); ok && locMeta != "" {
if (ver != "" || etag != "") && remMeta == locMeta {
return nil
}
}
}
case sameEtag || sameCksum:
// making exception for the same (trusted) checksum or ETag
return nil
}

return fmt.Errorf("local (%v) vs remote (%v)", oa.GetCustomMD(), rem.GetCustomMD())
Expand Down
2 changes: 1 addition & 1 deletion core/ldp.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ remote:
// - [Sync] when Sync option is used (via bucket config and/or `sync` argument) caller MUST take wlock or rlock
// - [MAY] delete remotely-deleted (non-existing) object and increment associated stats counter
//
// Returns NotFound also after having removed local replica (the Sync option)
// also returns `NotFound` after removing local replica - the Sync option
func (lom *LOM) CheckRemoteMD(locked, sync bool, origReq *http.Request) (res CRMD) {
bck := lom.Bck()
if !bck.HasVersioningMD() {
Expand Down
6 changes: 1 addition & 5 deletions xact/xs/wi_lso.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,8 @@ func (wi *walkInfo) ls(lom *core.LOM, status uint16) (e *cmn.LsoEnt) {
return
}

// NOTE: slow path
// NOTE: slow path if lom.Bck is remote
func checkRemoteMD(lom *core.LOM, e *cmn.LsoEnt) {
if !lom.Bucket().HasVersioningMD() {
debug.Assert(false, lom.Cname())
return
}
res := lom.CheckRemoteMD(false /*locked*/, false /*sync*/, nil /*origReq*/)
switch {
case res.Eq:
Expand Down

0 comments on commit 5a4203c

Please sign in to comment.