Skip to content

Commit

Permalink
Merge pull request ceph#223 from irq0/pr/copy-object-reflinks
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcel Lauhoff authored Oct 9, 2023
2 parents 93396c6 + 33fe703 commit 26316b8
Showing 1 changed file with 88 additions and 26 deletions.
114 changes: 88 additions & 26 deletions src/rgw/driver/sfs/object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,55 +296,117 @@ int SFSObject::copy_object(
,
const DoutPrefixProvider* dpp, optional_yield /*y*/
) {
lsfs_dout(dpp, 10) << "source(bucket: " << src_bucket->get_name()
<< ", obj: " << get_name()
<< "), dest(bucket: " << dst_bucket->get_name()
<< ", obj: " << dst_object->get_name() << ")" << dendl;
lsfs_dout(dpp, 10) << fmt::format(
"bucket:{} obj:{} version:{} size:{} -> bucket:{} "
"obj:{} version:{}",
src_bucket->get_name(), get_name(), get_instance(),
get_obj_size(), dst_bucket->get_name(),
dst_object->get_name(), dst_object->get_instance()
)
<< dendl;

refresh_meta();
ceph_assert(objref);
ceph_assert(bucketref);
ceph_assert(dst_object);
ceph_assert(dst_bucket);

auto check_conditional = handle_copy_object_conditionals(
const auto check_conditional = handle_copy_object_conditionals(
dpp, mod_ptr, unmod_ptr, if_match, if_nomatch, objref->get_meta().etag,
objref->get_meta().mtime
);
if (check_conditional != 0) {
return check_conditional;
}

sfs::BucketRef dst_bucket_ref = store->get_bucket_ref(dst_bucket->get_name());
const sfs::BucketRef dst_bucket_ref =
store->get_bucket_ref(dst_bucket->get_name());
ceph_assert(dst_bucket_ref);

std::filesystem::path srcpath =
const std::filesystem::path srcpath =
store->get_data_path() / objref->get_storage_path();

sfs::ObjectRef dstref = dst_bucket_ref->create_version(dst_object->get_key());
const int src_fd = ::open(srcpath.c_str(), O_RDONLY | O_BINARY);
if (src_fd < 0) {
lsfs_dout(dpp, -1)
<< fmt::format(
"unable to open src obj {} file {} for reading: {}",
objref->name, srcpath.string(), cpp_strerror(errno)
)
<< dendl;
return -ERR_INTERNAL_ERROR;
}

const sfs::ObjectRef dstref =
dst_bucket_ref->create_version(dst_object->get_key());
if (!dstref) {
::close(src_fd);
return -ERR_INTERNAL_ERROR;
}
std::filesystem::path dstpath =
const std::filesystem::path dstpath =
store->get_data_path() / dstref->get_storage_path();
std::error_code ec;
std::filesystem::create_directories(dstpath.parent_path(), ec);
if (ec) {
lsfs_dout(dpp, -1)
<< fmt::format(
"failed to create directory hierarchy {} for {}: {}",
dstpath.parent_path().string(), dstref->name, ec.message()
)
<< dendl;
::close(src_fd);
return -ERR_INTERNAL_ERROR;
}
// Open O_CREAT+O_EXCL as dstref is always a new version without a
// file yet
const int dst_fd =
::open(dstpath.c_str(), O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0600);
if (dst_fd < 0) {
lsfs_dout(dpp, -1)
<< fmt::format(
"unable to open dst obj {} file {} for writing: {}",
dstref->name, dstpath.string(), cpp_strerror(errno)
)
<< dendl;
::close(src_fd);
return -ERR_INTERNAL_ERROR;
}

if (std::filesystem::exists(dstpath)) {
// this breaks S3 semantics: as far as we understand, a copy to an existing
// destination object is essentially a put on that object -- meaning, the
// object is clobbered.
lsfs_dout(dpp, 10) << "destination file already exists at '" << dstpath
<< "'" << dendl;
return -EEXIST;
}

lsfs_dout(dpp, 10) << "copying file from '" << srcpath << "' to '" << dstpath
<< "'" << dendl;
std::filesystem::create_directories(dstpath.parent_path());
bool res = std::filesystem::copy_file(srcpath, dstpath);
if (!res) {
lsfs_dout(dpp, 0) << "error copying file from '" << srcpath << "' to '"
<< dstpath << "'" << dendl;
return -EIO;
lsfs_dout(dpp, 10) << fmt::format(
"copying {} fd:{} -> {} fd:{}", srcpath.string(),
src_fd, dstpath.string(), dst_fd
)
<< dendl;

int ret = ::copy_file_range(
src_fd, nullptr, dst_fd, nullptr, objref->get_meta().size, 0
);
if (ret < 0) {
lsfs_dout(dpp, -1) << fmt::format(
"failed to copy file from {} to {}: {}",
srcpath.string(), dstpath.string(),
cpp_strerror(errno)
)
<< dendl;
::close(src_fd);
::close(dst_fd);
return -ERR_INTERNAL_ERROR;
}
ret = ::close(src_fd);
if (ret < 0) {
lsfs_dout(dpp, -1) << fmt::format(
"failed closing src fd:{} fn:{}: {}", src_fd,
srcpath.string(), cpp_strerror(ret)
)
<< dendl;
}
ret = ::close(dst_fd);
if (ret < 0) {
lsfs_dout(dpp, -1) << fmt::format(
"failed closing dst fd:{} fn:{}: {}", dst_fd,
dstpath.string(), cpp_strerror(ret)
)
<< dendl;
}

auto dest_meta = objref->get_meta();
Expand Down

0 comments on commit 26316b8

Please sign in to comment.