Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/1336: on s3 and azure add delimiter in iterate_type when prefix is empty to avoid KeyType::LOG_COMPACTED when KeyType::LOG is queried #1511

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions cpp/arcticdb/storage/azure/azure_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,32 +229,33 @@ void do_remove_impl(Composite<VariantKey>&& ks,
}
}

auto default_prefix_handler() {
return [] (const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor& key_descriptor, KeyType) {
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
};
std::string prefix_handler(const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor& key_descriptor, KeyType) {
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
}

template<class KeyBucketizer, class PrefixHandler>
template<class KeyBucketizer>
void do_iterate_type_impl(KeyType key_type,
const IterateTypeVisitor& visitor,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
PrefixHandler&& prefix_handler = default_prefix_handler(),
muhammadhamzasajjad marked this conversation as resolved.
Show resolved Hide resolved
const std::string& prefix = std::string{}) {
ARCTICDB_SAMPLE(AzureStorageIterateType, 0)
auto key_type_dir = key_type_folder(root_folder, key_type);
const auto path_to_key_size = key_type_dir.size() + 1 + bucketizer.bucketize_length(key_type);
// if prefix is empty, add / to avoid matching both log and logc when key_type_dir is {root_folder}/log
if (prefix.empty()) {
key_type_dir += "/";
}

KeyDescriptor key_descriptor(prefix,
is_ref_key_class(key_type) ? IndexDescriptor::UNKNOWN : IndexDescriptor::TIMESTAMP, FormatType::TOKENIZED);
auto key_prefix = prefix_handler(prefix, key_type_dir, key_descriptor, key_type);
const auto root_folder_size = key_type_dir.size() + 1 + bucketizer.bucketize_length(key_type);

try {
for (auto page = azure_client.list_blobs(key_prefix); page.HasPage(); page.MoveToNextPage()) {
for (const auto& blob : page.Blobs) {
auto key = blob.Name.substr(root_folder_size);
auto key = blob.Name.substr(path_to_key_size);
ARCTICDB_TRACE(log::version(), "Got object_list: {}, key: {}", blob.Name, key);
auto k = variant_key_from_bytes(
reinterpret_cast<uint8_t*>(key.data()),
Expand Down Expand Up @@ -316,11 +317,7 @@ void AzureStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
}

void AzureStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) {
auto prefix_handler = [] (const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor key_descriptor, KeyType) {
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
};

detail::do_iterate_type_impl(key_type, visitor, root_folder_, *azure_client_, FlatBucketizer{}, std::move(prefix_handler), prefix);
detail::do_iterate_type_impl(key_type, visitor, root_folder_, *azure_client_, FlatBucketizer{}, prefix);
}

bool AzureStorage::do_key_exists(const VariantKey& key) {
Expand Down
8 changes: 6 additions & 2 deletions cpp/arcticdb/storage/s3/detail-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ namespace s3 {
) {
ARCTICDB_SAMPLE(S3StorageIterateType, 0)
auto key_type_dir = key_type_folder(root_folder, key_type);
const auto path_to_key_size = key_type_dir.size() + 1 + bucketizer.bucketize_length(key_type);
// if prefix is empty, add / to avoid matching both log and logc when key_type_dir is {root_folder}/log
if (prefix.empty()) {
key_type_dir += "/";
}

// Generally we get the key descriptor from the AtomKey, but in the case of iterating version journals
// where we want to have a narrower prefix, we can use the info that it's a version journal and derive
Expand All @@ -304,11 +309,10 @@ namespace s3 {
if (list_objects_result.is_success()) {
auto& output = list_objects_result.get_output();

const auto root_folder_size = key_type_dir.size() + 1 + bucketizer.bucketize_length(key_type);
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Received object list");

for (auto& s3_object_name: output.s3_object_names) {
auto key = s3_object_name.substr(root_folder_size);
auto key = s3_object_name.substr(path_to_key_size);
ARCTICDB_TRACE(log::version(), "Got object_list: {}, key: {}", s3_object_name, key);
auto k = variant_key_from_bytes(
reinterpret_cast<uint8_t *>(key.data()),
Expand Down
39 changes: 25 additions & 14 deletions cpp/arcticdb/storage/test/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

namespace arcticdb::storage {

inline VariantKey get_test_key(std::string name) {
inline VariantKey get_test_key(std::string name, entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto builder = arcticdb::atom_key_builder();
return builder.build<arcticdb::entity::KeyType::TABLE_DATA>(name);
return builder.build(name, key_type);
}

inline Segment get_test_segment() {
Expand All @@ -25,44 +25,55 @@ inline Segment get_test_segment() {
return encode_dispatch(std::move(segment_in_memory), codec_opts, arcticdb::EncodingVersion::V2);
}

inline void write_in_store(Storage &store, std::string symbol) {
auto variant_key = get_test_key(symbol);
inline void write_in_store(Storage &store, std::string symbol, entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto variant_key = get_test_key(symbol, key_type);
store.write(KeySegmentPair(std::move(variant_key), get_test_segment()));
}

inline void update_in_store(Storage &store, std::string symbol) {
auto variant_key = get_test_key(symbol);
inline void update_in_store(Storage &store, std::string symbol, entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto variant_key = get_test_key(symbol, key_type);
store.update(KeySegmentPair(std::move(variant_key), get_test_segment()), arcticdb::storage::UpdateOpts{});
}

inline bool exists_in_store(Storage &store, std::string symbol) {
auto variant_key = get_test_key(symbol);
inline bool exists_in_store(Storage &store, std::string symbol, entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto variant_key = get_test_key(symbol, key_type);
return store.key_exists(variant_key);
}

inline std::string read_in_store(Storage &store, std::string symbol) {
auto variant_key = get_test_key(symbol);
inline std::string read_in_store(Storage &store, std::string symbol, entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto variant_key = get_test_key(symbol, key_type);
auto opts = ReadKeyOpts{};
auto result = store.read(std::move(variant_key), opts);
return std::get<StringId>(result.atom_key().id());
}

inline void remove_in_store(Storage &store, std::vector<std::string> symbols) {
inline void remove_in_store(Storage &store, std::vector<std::string> symbols, entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto to_remove = std::vector<VariantKey>();
for (auto &symbol: symbols) {
to_remove.emplace_back(get_test_key(symbol));
to_remove.emplace_back(get_test_key(symbol, key_type));
}
auto opts = RemoveOpts();
store.remove(Composite(std::move(to_remove)), opts);
}

inline std::set<std::string> list_in_store(Storage &store) {
inline std::set<std::string> list_in_store(Storage &store, entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto keys = std::set<std::string>();
store.iterate_type(KeyType::TABLE_DATA, [&keys](VariantKey &&key) {
store.iterate_type(key_type, [&keys](VariantKey &&key) {
auto atom_key = std::get<AtomKey>(key);
keys.emplace(std::get<StringId>(atom_key.id()));
});
return keys;
}

inline std::set<std::string> populate_store(Storage &store, std::string_view symbol_prefix, int start, int end,
entity::KeyType key_type = entity::KeyType::TABLE_DATA) {
auto symbols = std::set<std::string>();
for (int i = start; i < end; ++i) {
auto symbol = fmt::format("{}_{}", symbol_prefix, i);
write_in_store(store, symbol, key_type);
symbols.emplace(symbol);
}
return symbols;
}

}
10 changes: 10 additions & 0 deletions cpp/arcticdb/storage/test/test_azure_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,13 @@ TEST_F(AzureMockStorageFixture, test_list) {

ASSERT_EQ(list_in_store(store), symbols);
}

TEST_F(AzureMockStorageFixture, test_matching_key_type_prefix_list) {
auto log_symbols = populate_store(store, "symbol_log", 0, 5, entity::KeyType::LOG);
ASSERT_EQ(list_in_store(store, entity::KeyType::LOG), log_symbols);

auto log_compacted_symbols = populate_store(store, "symbol_logc", 0, 5, entity::KeyType::LOG_COMPACTED);
ASSERT_EQ(list_in_store(store, entity::KeyType::LOG_COMPACTED), log_compacted_symbols);

ASSERT_EQ(list_in_store(store, entity::KeyType::LOG), log_symbols);
}
10 changes: 10 additions & 0 deletions cpp/arcticdb/storage/test/test_s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,13 @@ TEST_F(S3StorageFixture, test_list) {

ASSERT_THROW(list_in_store(store), UnexpectedS3ErrorException);
}

TEST_F(S3StorageFixture, test_matching_key_type_prefix_list) {
auto log_symbols = populate_store(store, "symbol_log", 0, 5, entity::KeyType::LOG);
ASSERT_EQ(list_in_store(store, entity::KeyType::LOG), log_symbols);

auto log_compacted_symbols = populate_store(store, "symbol_logc", 0, 5, entity::KeyType::LOG_COMPACTED);
ASSERT_EQ(list_in_store(store, entity::KeyType::LOG_COMPACTED), log_compacted_symbols);

ASSERT_EQ(list_in_store(store, entity::KeyType::LOG), log_symbols);
}
Loading