Skip to content

Commit

Permalink
#447 Mongo Exceptions Normalization (#1411)
Browse files Browse the repository at this point in the history
This normalizes mongo exceptions as in #584. The logic for exception
normalization is now moved to `mongo_storage.cpp` as we also support a
`MockMongoClient` which can mock failures. The Permission exceptions are
also normalized. Previously there were two types of permission
exceptions. One was triggered when a storage API returned a permission
failure, the second was triggered if an unpermitted storage operation
was attempted as per the library `OpenMode`. The two permission have now
been normalized to inherit from the same class.

Furthermore, the exception hierarchy in python has also been changed.
`DuplicateKeyException` inherits from `StorageException` in C++, this
should be reflected on python side too. Why? because if someone wanted
they could catch all the storage exceptions as follows instead of having
to catch `PermissionException`, `DuplicateKeyException` and
`StorageException` separately:

```python
try:
  # an operation that calls storage from python
except StorageException:
  # log storage error
```

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->
  • Loading branch information
muhammadhamzasajjad authored Mar 11, 2024
1 parent 039123a commit fe805da
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 85 deletions.
5 changes: 4 additions & 1 deletion cpp/arcticdb/python/python_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ void register_error_code_ecosystem(py::module& m, py::exception<arcticdb::Arctic
}
});

py::register_exception<storage::DuplicateKeyException>(m, "DuplicateKeyException", storage_exception.ptr());
py::register_exception<PermissionException>(m, "PermissionException", storage_exception.ptr());

py::register_exception<SchemaException>(m, "SchemaException", compat_exception.ptr());
py::register_exception<NormalizationException>(m, "NormalizationException", compat_exception.ptr());
py::register_exception<MissingDataException>(m, "MissingDataException", compat_exception.ptr());
Expand Down Expand Up @@ -321,7 +324,7 @@ PYBIND11_MODULE(arcticdb_ext, m) {
auto storage_submodule = m.def_submodule("storage", "Segment storage implementation apis");
auto no_data_found_exception = py::register_exception<arcticdb::storage::NoDataFoundException>(
storage_submodule, "NoDataFoundException", base_exception.ptr());
arcticdb::storage::apy::register_bindings(storage_submodule, base_exception);
arcticdb::storage::apy::register_bindings(storage_submodule);

arcticdb::stream::register_bindings(m);
arcticdb::toolbox::apy::register_bindings(m);
Expand Down
15 changes: 9 additions & 6 deletions cpp/arcticdb/storage/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class Library {

void write(Composite<KeySegmentPair>&& kvs) {
ARCTICDB_SAMPLE(LibraryWrite, 0)
if (open_mode() < OpenMode::WRITE)
throw PermissionException(library_path_, open_mode(), "write");
if (open_mode() < OpenMode::WRITE) {
throw LibraryPermissionException(library_path_, open_mode(), "write");
}

[[maybe_unused]] const size_t total_size = kvs.fold(
[](size_t s, const KeySegmentPair& seg) { return s + seg.segment().total_segment_size(); },
Expand All @@ -86,8 +87,9 @@ class Library {

void update(Composite<KeySegmentPair>&& kvs, storage::UpdateOpts opts) {
ARCTICDB_SAMPLE(LibraryUpdate, 0)
if (open_mode() < OpenMode::WRITE)
throw PermissionException(library_path_, open_mode(), "update");
if (open_mode() < OpenMode::WRITE) {
throw LibraryPermissionException(library_path_, open_mode(), "update");
}

[[maybe_unused]] const size_t total_size = kvs.fold(
[](size_t s, const KeySegmentPair& seg) { return s + seg.segment().total_segment_size(); },
Expand All @@ -104,8 +106,9 @@ class Library {
}

void remove(Composite<VariantKey>&& ks, storage::RemoveOpts opts) {
if (open_mode() < arcticdb::storage::OpenMode::DELETE)
throw PermissionException(library_path_, open_mode(), "delete");
if (open_mode() < arcticdb::storage::OpenMode::DELETE) {
throw LibraryPermissionException(library_path_, open_mode(), "delete");
}

ARCTICDB_SAMPLE(LibraryRemove, 0)
storages_->remove(std::move(ks), opts);
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/storage/mongo/mongo_client_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ enum class MongoError {
UnknownError = 8,
UserNotFound = 11,
UnAuthorized = 13,
AuthenticationFailed = 18,
ExceededTimeLimit = 50,
WriteConflict = 112,
KeyNotFound = 211,
Expand Down
13 changes: 9 additions & 4 deletions cpp/arcticdb/storage/mongo/mongo_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,20 @@ UpdateResult MockMongoClient::update_segment(
const std::string& collection_name,
storage::KeySegmentPair&& kv,
bool upsert) {
auto key_found = has_key(MongoKey(database_name, collection_name, kv.variant_key()));
auto key = MongoKey(database_name, collection_name, kv.variant_key());

auto failure = has_failure_trigger(key, StorageOperation::WRITE);
if (failure.has_value()) {
throw_if_exception(failure.value());
return {std::nullopt};
}

auto key_found = has_key(key);
if (!upsert && !key_found) {
return {0}; // upsert is false, don't update and return 0 as modified_count
}
if (!write_segment(database_name, collection_name, std::move(kv))) {
return {std::nullopt};
}

mongo_contents.insert_or_assign(key, std::move(kv.segment()));
return {key_found ? 1 : 0};
}

Expand Down
138 changes: 108 additions & 30 deletions cpp/arcticdb/storage/mongo/mongo_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,49 @@
#include <arcticdb/entity/performance_tracing.hpp>
#include <arcticdb/storage/storage_options.hpp>
#include <arcticdb/storage/storage.hpp>
#include <mongocxx/exception/operation_exception.hpp>
#include <bsoncxx/json.hpp>

namespace arcticdb::storage::mongo {

std::string MongoStorage::collection_name(KeyType k) {
return (fmt::format("{}{}", prefix_, k));
}

/*
* Mongo error handling notes:
* All the exceptions thrown by mongocxx are derived from mongocxx::exception. https://mongocxx.org/api/mongocxx-3.5.1/classmongocxx_1_1exception.html
* - The exceptions that triggered by read, write, delete operations are derived from mongocxx::operation_exception.
* - mongocxx::operation_exception has an error_code which is returned by the server as documented here: https://www.mongodb.com/docs/manual/reference/error-codes/
* - some relevant error codes returned by the server are defined in MongoError enum.
*/
void raise_mongo_exception(const mongocxx::operation_exception& e) {
auto error_code = e.code().value();

if (error_code == static_cast<int>(MongoError::NoSuchKey) || error_code == static_cast<int>(MongoError::KeyNotFound)) {
throw KeyNotFoundException(fmt::format("Key Not Found Error: MongoError#{}: {}", error_code, e.what()));
}

if (error_code == static_cast<int>(MongoError::UnAuthorized) || error_code == static_cast<int>(MongoError::AuthenticationFailed)) {
raise<ErrorCode::E_PERMISSION>(fmt::format("Permission error: MongoError#{}: {}", error_code, e.what()));
}

raise<ErrorCode::E_UNEXPECTED_MONGO_ERROR>(fmt::format("Unexpected Mongo Error: MongoError#{}: {} {} {}",
error_code, e.code().category().name(), e.code().message(), e.what()));
}

bool is_expected_error_type(int error_code) {
return error_code == static_cast<int>(MongoError::KeyNotFound) || error_code == static_cast<int>(MongoError::NoSuchKey);
}

void raise_if_unexpected_error(const mongocxx::operation_exception& e) {
auto error_code = e.code().value();

if (!is_expected_error_type(error_code)) {
raise_mongo_exception(e);
}
}

void MongoStorage::do_write(Composite<KeySegmentPair>&& kvs) {
namespace fg = folly::gen;
auto fmt_db = [](auto &&kv) { return kv.key_type(); };
Expand All @@ -37,8 +73,12 @@ void MongoStorage::do_write(Composite<KeySegmentPair>&& kvs) {
for (auto &kv : group.values()) {
auto collection = collection_name(kv.key_type());
auto key_view = kv.key_view();
auto success = client_->write_segment(db_, collection, std::move(kv));
util::check(success, "Mongo error while putting key {}", key_view);
try {
auto success = client_->write_segment(db_, collection, std::move(kv));
storage::check<ErrorCode::E_MONGO_BULK_OP_NO_REPLY>(success, "Mongo did not acknowledge write for key {}", key_view);
} catch (const mongocxx::operation_exception& ex) {
raise_mongo_exception(ex);
}
}
});
}
Expand All @@ -53,42 +93,58 @@ void MongoStorage::do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) {
for (auto &kv : group.values()) {
auto collection = collection_name(kv.key_type());
auto key_view = kv.key_view();
auto result = client_->update_segment(db_, collection, std::move(kv), opts.upsert_);
util::check(result.modified_count.has_value(), "Mongo error while updating key {}", key_view);
util::check(opts.upsert_ || result.modified_count.value() > 0, "update called with upsert=false but key does not exist");
try {
auto result = client_->update_segment(db_, collection, std::move(kv), opts.upsert_);
storage::check<ErrorCode::E_MONGO_BULK_OP_NO_REPLY>(result.modified_count.has_value(),
"Mongo did not acknowledge write for key {}",
key_view);
if (!opts.upsert_ && result.modified_count.value() == 0) {
throw storage::KeyNotFoundException(
fmt::format("update called with upsert=false but key does not exist: {}", key_view));
}
} catch (const mongocxx::operation_exception& ex) {
raise_mongo_exception(ex);
}
}
});
}

void MongoStorage::do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts) {
void MongoStorage::do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) {
namespace fg = folly::gen;
auto fmt_db = [](auto &&k) { return variant_key_type(k); };
ARCTICDB_SAMPLE(MongoStorageRead, 0)
std::vector<VariantKey> failed_reads;
std::vector<VariantKey> keys_not_found;

(fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach([&](auto &&group) {
for (auto &k : group.values()) {
auto collection = collection_name(variant_key_type(k));
try {
auto kv = client_->read_segment(db_, collection, k);
// later we should add the key to failed_reads in this case
if(!kv.has_value()) {
throw std::runtime_error(fmt::format("Missing key in mongo: {}", k));
if (!kv.has_value() || !kv.value().has_segment()) {
keys_not_found.push_back(k);
}
if (kv.value().has_segment())
else {
visitor(k, std::move(kv.value().segment()));
else
failed_reads.push_back(k);
}
catch(const std::exception &ex) {
log::storage().info("Segment read error: {}", ex.what());
throw storage::KeyNotFoundException{Composite<VariantKey>{VariantKey{k}}};
}

} catch (const mongocxx::operation_exception& ex) {
raise_if_unexpected_error(ex);

log::storage().log(
opts.dont_warn_about_missing_key ? spdlog::level::debug : spdlog::level::warn,
"Failed to find segment for key '{}' {}: {}",
variant_key_view(k),
ex.code().value(),
ex.what());
keys_not_found.push_back(k);
}
}
});

if(!failed_reads.empty())
throw KeyNotFoundException(Composite<VariantKey>{std::move(failed_reads)});
if (!keys_not_found.empty()) {
throw KeyNotFoundException(Composite<VariantKey>{std::move(keys_not_found)});
}
}

bool MongoStorage::do_fast_delete() {
Expand All @@ -99,29 +155,51 @@ bool MongoStorage::do_fast_delete() {
return true;
}

void MongoStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
void MongoStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) {
namespace fg = folly::gen;
auto fmt_db = [](auto &&k) { return variant_key_type(k); };
ARCTICDB_SAMPLE(MongoStorageRemove, 0)
Composite<VariantKey> keys_not_found;

(fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach([&](auto &&group) {
for (auto &k : group.values()) {
auto collection = collection_name(variant_key_type(k));
auto result = client_->remove_keyvalue(db_, collection, k);
if (result.delete_count.has_value()) {
util::warn(result.delete_count.value() == 1, "Expect to delete a single document with key {}",
k);
} else
throw std::runtime_error(fmt::format("Mongo error deleting data for key {}", k));
try {
auto result = client_->remove_keyvalue(db_, collection, k);
storage::check<ErrorCode::E_MONGO_BULK_OP_NO_REPLY>(result.delete_count.has_value(),
"Mongo did not acknowledge deletion for key {}", k);
util::warn(result.delete_count.value() == 1,
"Expected to delete a single document with key {} deleted {} documents",
k, result.delete_count.value());
if (result.delete_count.value() == 0 && !opts.ignores_missing_key_) {
keys_not_found.push_back(k);
}
} catch (const mongocxx::operation_exception& ex) {
// mongo delete does not throw exception if key not found, it returns 0 as delete count
raise_mongo_exception(ex);
}
}
});
if (!keys_not_found.empty()) {
throw KeyNotFoundException(std::move(keys_not_found));
}
}

void MongoStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) {
auto collection = collection_name(key_type);
auto func = folly::Function<void(entity::VariantKey&&)>(visitor);
ARCTICDB_SAMPLE(MongoStorageItType, 0)
auto keys = client_->list_keys(db_, collection, key_type, prefix);
std::vector<VariantKey> keys;
try {
keys = client_->list_keys(db_, collection, key_type, prefix);
} catch (const mongocxx::operation_exception& ex) {
// We don't raise when key is not found because we want to return an empty list instead of raising.
raise_if_unexpected_error(ex);
log::storage().warn("Failed to iterate key type with key '{}' {}: {}",
key_type,
ex.code().value(),
ex.what());
}
for (auto &key : keys) {
func(std::move(key));
}
Expand All @@ -131,11 +209,11 @@ bool MongoStorage::do_key_exists(const VariantKey& key) {
auto collection = collection_name(variant_key_type(key));
try {
return client_->key_exists(db_, collection, key);
} catch (const mongocxx::operation_exception& ex) {
raise_if_unexpected_error(ex);
}
catch(const std::exception& ex) {
log::storage().error(fmt::format("Key exists error: {}", ex.what()));
throw;
}

return false;
}


Expand Down
5 changes: 1 addition & 4 deletions cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ std::shared_ptr<LibraryIndex> create_library_index(const std::string &environmen
return std::make_shared<LibraryIndex>(EnvironmentName{environment_name}, mem_resolver);
}

void register_bindings(py::module& storage, py::exception<arcticdb::ArcticException>& base_exception) {
void register_bindings(py::module& storage) {
storage.attr("CONFIG_LIBRARY_NAME") = py::str(arcticdb::storage::CONFIG_LIBRARY_NAME);

py::enum_<KeyType>(storage, "KeyType")
Expand Down Expand Up @@ -191,9 +191,6 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
return library_index.get_library(path, open_mode, UserAuth{});
})
;

py::register_exception<DuplicateKeyException>(storage, "DuplicateKeyException", base_exception.ptr());
py::register_exception<PermissionException>(storage, "PermissionException", base_exception.ptr());
}

} // namespace arcticdb::storage::apy
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/python_bindings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ namespace arcticdb::storage::apy {

namespace py = pybind11;

void register_bindings(py::module &m, py::exception<arcticdb::ArcticException>& base_exception);
void register_bindings(py::module &m);

} // namespace arcticdb
18 changes: 8 additions & 10 deletions cpp/arcticdb/storage/storage_exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,23 @@

namespace arcticdb::storage {

class PermissionException : public std::exception {
class LibraryPermissionException : public PermissionException {
public:
PermissionException(const LibraryPath &path, OpenMode mode, std::string_view operation) :
lib_path_(path), mode_(mode), msg_(fmt::format(
"{} not permitted. lib={}, mode={}", operation, lib_path_, mode_)
) {}

const char *what() const noexcept override {
return msg_.data();
}
LibraryPermissionException(const LibraryPath &path, OpenMode mode, std::string_view operation) :
PermissionException(fmt::format("{} not permitted. lib={}, mode={}", operation, path, mode)),
lib_path_(path), mode_(mode) {}

const LibraryPath &library_path() const {
return lib_path_;
}

OpenMode mode() const {
return mode_;
}

private:
LibraryPath lib_path_;
OpenMode mode_;
std::string msg_;
};

}
Loading

0 comments on commit fe805da

Please sign in to comment.