Skip to content
This repository has been archived by the owner on Jan 3, 2024. It is now read-only.

Commit

Permalink
rgw/sfs: honor retry_raced_bucket_write mechanism
Browse files Browse the repository at this point in the history
Updating bucket's metadata concurrently by two or more threads is allowed in radosgw.
There is a retry mechanism: retry_raced_bucket_write(), that expects the bucket references to fetch the latest data from the persistent store.
The rgw/sfs driver didn't implement try_refresh_info() in its bucket class definition; as a result, two references to the same bucket could lead to partial metadata updates.

Fixes: https://github.com/aquarist-labs/s3gw/issues/637
Signed-off-by: Giuseppe Baccini <[email protected]>
  • Loading branch information
Giuseppe Baccini committed Nov 17, 2023
1 parent 0e62a28 commit 06ac4ab
Show file tree
Hide file tree
Showing 7 changed files with 443 additions and 63 deletions.
89 changes: 65 additions & 24 deletions src/rgw/driver/sfs/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ namespace rgw::sal {

SFSBucket::SFSBucket(SFStore* _store, sfs::BucketRef _bucket)
    : StoreBucket(_bucket->get_info()), store(_store), bucket(_bucket) {
  // Populate the cached views (info, attrs, ACL) from the sfs::BucketRef we
  // were just handed, so this wrapper starts out consistent with it.
  update_views();
}

void SFSBucket::update_views() {
get_info() = bucket->get_info();
set_attrs(bucket->get_attrs());

auto it = attrs.find(RGW_ATTR_ACL);
Expand All @@ -56,6 +61,47 @@ SFSBucket::SFSBucket(SFStore* _store, sfs::BucketRef _bucket)
}
}

int SFSBucket::try_metadata_update(
const std::function<int(sfs::sqlite::DBOPBucketInfo& current_state)>&
apply_delta
) {
auto current_state = sfs::sqlite::DBOPBucketInfo(get_info(), get_attrs());
auto db_conn = get_store().db_conn;
int res =
db_conn->transact([&](rgw::sal::sfs::sqlite::StorageRef storage) -> int {
auto db_state = sfs::get_meta_buckets(db_conn)->get_bucket(
bucket->get_bucket_id(), storage
);
if (!db_state) {
// this is an error, the operation should not be retried
return -ERR_NO_SUCH_BUCKET;
}
if (current_state != *db_state) {
// the operation will be retried
return -ECANCELED;
}
// current_state == db_state, we apply the delta and we store the bucket.
int res = apply_delta(current_state);
if (res) {
return res;
}
sfs::get_meta_buckets(db_conn)->store_bucket(current_state, storage);
return 0;
});

if (!res) {
store->_refresh_buckets_safe();
auto bref = store->get_bucket_ref(get_name());
if (!bref) {
// if we go here, the state of this bucket is inconsistent
return -ERR_NO_SUCH_ENTITY;
}
bucket = bref;
update_views();
}
return res;
}

void SFSBucket::write_meta(const DoutPrefixProvider* /*dpp*/) {
  // TODO: not implemented. NOTE(review): bucket metadata appears to be
  // persisted through other paths in this driver (e.g. try_metadata_update)
  // — confirm callers do not rely on this being a real write.
}
Expand Down Expand Up @@ -404,28 +450,12 @@ int SFSBucket::
/**
 * Store `new_attrs` as the bucket's attribute set.
 *
 * Goes through try_metadata_update(): the delta is only applied and stored
 * if the persisted bucket state still matches this instance's view;
 * otherwise -ECANCELED is returned so retry_raced_bucket_write() can retry.
 * NOTE(review): the delta replaces battrs wholesale; callers are expected to
 * pass the already-merged attribute set — confirm against call sites.
 *
 * Returns 0 on success or a negative error from try_metadata_update().
 */
int SFSBucket::merge_and_store_attrs(
    const DoutPrefixProvider* /*dpp*/, Attrs& new_attrs, optional_yield /*y*/
) {
  return try_metadata_update(
      [&](sfs::sqlite::DBOPBucketInfo& current_state) -> int {
        current_state.battrs = new_attrs;
        return 0;
      }
  );
}

// try_resolve_mp_from_oid tries to parse an integer id from oid to
Expand Down Expand Up @@ -529,11 +559,22 @@ int SFSBucket::abort_multiparts(
return sfs::SFSMultipartUploadV2::abort_multiparts(dpp, store, this);
}

/**
 * @brief Refresh this bucket object with the state obtained from the store.
 * Indeed it can happen that the state of this bucket is obsolete due to
 * concurrent threads updating metadata using their own SFSBucket instance.
 *
 * @param dpp debug prefix provider for logging.
 * @param pmtime unused — the modification time is not reported by this
 *               driver.
 * @return 0 on success, -ERR_NO_SUCH_BUCKET if the bucket cannot be found
 *         in the store.
 */
int SFSBucket::try_refresh_info(
    const DoutPrefixProvider* dpp, ceph::real_time* /*pmtime*/
) {
  auto bref = store->get_bucket_ref(get_name());
  if (!bref) {
    lsfs_dout(dpp, 0) << fmt::format("no such bucket! {}", get_name()) << dendl;
    return -ERR_NO_SUCH_BUCKET;
  }
  // Swap in the fresh reference and rebuild the cached views from it.
  bucket = bref;
  update_views();
  return 0;
}

int SFSBucket::read_usage(
Expand Down
19 changes: 19 additions & 0 deletions src/rgw/driver/sfs/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ class SFSBucket : public StoreBucket {
SFSBucket(SFStore* _store, sfs::BucketRef _bucket);
SFSBucket& operator=(const SFSBucket&) = delete;

/**
* This method updates the in-memory views of this object fetching
* from this.bucket.
* This method should be called every time this.bucket is updated
* from the backing storage.
*
* Views updated:
*
* - get_info()
* - get_attrs()
* - acls
*/
void update_views();

int try_metadata_update(
const std::function<int(sfs::sqlite::DBOPBucketInfo& current_state)>&
apply_delta
);

  /// Create a copy of this bucket handle via the copy constructor.
  /// NOTE(review): presumably the copy shares the underlying store pointer
  /// and sfs::BucketRef with the original — both handles observe the same
  /// backing bucket; confirm the copy-ctor semantics.
  virtual std::unique_ptr<Bucket> clone() override {
    return std::unique_ptr<Bucket>(new SFSBucket{*this});
  }
Expand Down
10 changes: 10 additions & 0 deletions src/rgw/driver/sfs/sqlite/buckets/bucket_definitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ struct DBOPBucketInfo {

DBOPBucketInfo(const DBOPBucketInfo& other) = default;
DBOPBucketInfo& operator=(const DBOPBucketInfo& other) = default;

bool operator==(const DBOPBucketInfo& other) const {
if (this->deleted != other.deleted) return false;
if (this->battrs != other.battrs) return false;
ceph::bufferlist this_binfo_bl;
this->binfo.encode(this_binfo_bl);
ceph::bufferlist other_binfo_bl;
other.binfo.encode(other_binfo_bl);
return this_binfo_bl == other_binfo_bl;
}
};

using DBDeletedObjectItem =
Expand Down
7 changes: 7 additions & 0 deletions src/rgw/driver/sfs/sqlite/dbconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class DBConn {
std::vector<sqlite3*> sqlite_conns;
const std::thread::id main_thread;
mutable std::shared_mutex storage_pool_mutex;
mutable std::mutex transactional_block_mutex;

public:
CephContext* const cct;
Expand All @@ -289,6 +290,12 @@ class DBConn {
return dbapi::sqlite::database(get_storage()->filename());
}

int transact(const std::function<int(StorageRef)>& block) {
auto storage = get_storage();
std::unique_lock<std::mutex> lock(transactional_block_mutex);
return block(storage);
}

static std::string getDBPath(CephContext* cct) {
auto rgw_sfs_path = cct->_conf.get_val<std::string>("rgw_sfs_data_path");
auto db_path =
Expand Down
91 changes: 65 additions & 26 deletions src/rgw/driver/sfs/sqlite/sqlite_buckets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ std::vector<DBOPBucketInfo> get_rgw_buckets(
}

std::optional<DBOPBucketInfo> SQLiteBuckets::get_bucket(
const std::string& bucket_id
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
auto bucket = storage->get_pointer<DBBucket>(bucket_id);
std::optional<DBOPBucketInfo> ret_value;
if (bucket) {
Expand All @@ -50,9 +52,11 @@ std::optional<DBOPBucketInfo> SQLiteBuckets::get_bucket(
}

std::optional<std::pair<std::string, std::string>> SQLiteBuckets::get_owner(
const std::string& bucket_id
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
const auto rows = storage->select(
columns(&DBUser::user_id, &DBUser::display_name),
inner_join<DBUser>(on(is_equal(&DBBucket::owner_id, &DBUser::user_id))),
Expand All @@ -66,62 +70,92 @@ std::optional<std::pair<std::string, std::string>> SQLiteBuckets::get_owner(
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_bucket_by_name(
const std::string& bucket_name
const std::string& bucket_name, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
return get_rgw_buckets(
storage->get_all<DBBucket>(where(c(&DBBucket::bucket_name) = bucket_name))
);
}

void SQLiteBuckets::store_bucket(const DBOPBucketInfo& bucket) const {
auto storage = conn->get_storage();
void SQLiteBuckets::store_bucket(
const DBOPBucketInfo& bucket, rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
auto db_bucket = get_db_bucket(bucket);
storage->replace(db_bucket);
}

void SQLiteBuckets::remove_bucket(const std::string& bucket_name) const {
auto storage = conn->get_storage();
void SQLiteBuckets::remove_bucket(
const std::string& bucket_name, rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
storage->remove<DBBucket>(bucket_name);
}

std::vector<std::string> SQLiteBuckets::get_bucket_ids() const {
auto storage = conn->get_storage();
std::vector<std::string> SQLiteBuckets::get_bucket_ids(
rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
return storage->select(&DBBucket::bucket_name);
}

std::vector<std::string> SQLiteBuckets::get_bucket_ids(
const std::string& user_id
const std::string& user_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
return storage->select(
&DBBucket::bucket_name, where(c(&DBBucket::owner_id) = user_id)
);
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets() const {
auto storage = conn->get_storage();
std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets(
rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
return get_rgw_buckets(storage->get_all<DBBucket>());
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets(
const std::string& user_id
const std::string& user_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
return get_rgw_buckets(
storage->get_all<DBBucket>(where(c(&DBBucket::owner_id) = user_id))
);
}

std::vector<std::string> SQLiteBuckets::get_deleted_buckets_ids() const {
auto storage = conn->get_storage();
std::vector<std::string> SQLiteBuckets::get_deleted_buckets_ids(
rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
return storage->select(
&DBBucket::bucket_id, where(c(&DBBucket::deleted) = true)
);
}

bool SQLiteBuckets::bucket_empty(const std::string& bucket_id) const {
auto storage = conn->get_storage();
bool SQLiteBuckets::bucket_empty(
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
if (!storage) {
storage = conn->get_storage();
}
auto num_ids = storage->count<DBVersionedObject>(
inner_join<DBObject>(
on(is_equal(&DBObject::uuid, &DBVersionedObject::object_id))
Expand All @@ -136,9 +170,12 @@ bool SQLiteBuckets::bucket_empty(const std::string& bucket_id) const {
}

std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
const std::string& bucket_id, uint max_objects, bool& bucket_deleted
const std::string& bucket_id, uint max_objects, bool& bucket_deleted,
rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
RetrySQLiteBusy<DBDeletedObjectItems> retry([&]() {
bucket_deleted = false;
DBDeletedObjectItems ret_values;
Expand Down Expand Up @@ -186,9 +223,11 @@ std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
}

const std::optional<SQLiteBuckets::Stats> SQLiteBuckets::get_stats(
const std::string& bucket_id
const std::string& bucket_id, rgw::sal::sfs::sqlite::StorageRef storage
) const {
auto storage = conn->get_storage();
if (!storage) {
storage = conn->get_storage();
}
std::optional<SQLiteBuckets::Stats> stats;

auto res = storage->select(
Expand Down
Loading

0 comments on commit 06ac4ab

Please sign in to comment.