Skip to content

Commit

Permalink
#30 Simpler and more reliable SymbolList logic
Browse files Browse the repository at this point in the history
+ Minimum fix for version map swallowing exceptions (#365)
+ Fix off-by-one error in submit_tasks_for_range() exception case
  • Loading branch information
qc00 committed May 19, 2023
1 parent 1e03fbf commit a2e625d
Show file tree
Hide file tree
Showing 11 changed files with 616 additions and 228 deletions.
6 changes: 3 additions & 3 deletions cpp/arcticdb/async/task_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ inline void submit_tasks_for_range(const Inputs& inputs, TaskSubmitter submitter

auto fut_itr = futs.begin();
try {
for(auto input_itr = inputs.cbegin(); input_itr != inputs.cend(); ++input_itr, ++fut_itr) {
auto&& resolved = std::move(*fut_itr).get();
result_handler(*input_itr, std::move(resolved));
for(const auto& input : inputs) {
auto&& resolved = std::move(*(fut_itr++)).get();
result_handler(input, std::move(resolved));
}
} catch(...) {
folly::collectAll(fut_itr, futs.end()).wait();
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/failure_simulation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace arcticdb {
std::shared_ptr<StorageFailureSimulator> StorageFailureSimulator::instance(){
std::call_once(StorageFailureSimulator::init_flag_, &StorageFailureSimulator::init);
std::call_once(StorageFailureSimulator::init_flag_, &StorageFailureSimulator::reset);
return StorageFailureSimulator::instance_;
}

Expand Down
156 changes: 122 additions & 34 deletions cpp/arcticdb/storage/failure_simulation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,143 @@
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/util/random.h>
#include <arcticdb/util/constructors.hpp>
#include <arcticdb/util/preconditions.hpp>
#include <arcticdb/util/variant.hpp>
#include <folly/Function.h>

namespace arcticdb {

#ifdef _WIN32
#undef DELETE
#endif

enum class FailureType : int {
WRITE = 0,
READ
READ,
ITERATE,
DELETE,
};

static const char* failure_names[] = {
"WRITE",
"READ"
"READ",
"ITERATE",
"DELETE",
};

struct FailureCategory {
double prob_;
bool thrown_;
/**
* Function holder with a description.
*
* Pairs a callable taking a FailureType with a description; the description is only
* used for display (see operator<<), while operator() forwards to the callable.
*/
struct FailureAction {
using FunctionWrapper = folly::Function<void(FailureType)>;
// Either a C string held by pointer (not copied) or an owned std::string.
using Description = std::variant<const char*, std::string>;

Description description_;
// NOTE(review): per folly's documentation a SharedProxy is a copyable handle whose copies
// share one underlying folly::Function - confirm against the folly version in use.
FunctionWrapper::SharedProxy proxy_;

// Takes an already-built proxy; both arguments are moved into the members.
FailureAction(Description description, FunctionWrapper::SharedProxy proxy) :
description_(std::move(description)), proxy_(std::move(proxy)) {}

explicit FailureCategory(double prob) : prob_(prob), thrown_(false) {}
// Convenience overload: wraps any callable in a FunctionWrapper, converts it to a
// shared proxy and delegates to the constructor above.
template<typename Func>
FailureAction(Description description, Func func):
FailureAction(std::move(description), FunctionWrapper{func}.asSharedProxy()) {}

// Invokes the wrapped callable with the given failure type. Anything the callable
// throws propagates to the caller.
inline void operator()(FailureType type) const {
proxy_(type);
}
};

/** Writes the action's description to `out` and returns the stream for chaining. */
inline std::ostream& operator<<(std::ostream& out, const FailureAction& action) {
    // Both alternatives (const char* and std::string) are streamed the same way,
    // so a single generic visitor covers them.
    std::visit([&out](const auto& text) { out << text; }, action.description_);
    return out;
}

namespace action_factories { // To allow `using namespace`
/** Action whose callable ignores its argument and does nothing. */
static inline FailureAction no_op("no_op", [](FailureType){});

/**
 * Builds an action that raises `Exception` with the given probability.
 *
 * @tparam Exception  Type thrown; constructed from the formatted failure message.
 * @param probability Chance of throwing on each invocation. Must be >= 0 (checked with
 *                    util::check_arg); any value >= 1.0 yields an action that always throws.
 * @return The shared "raise" action when probability >= 1.0, otherwise a new action
 *         described as "fault(<probability>)".
 */
template<class Exception = StorageException>
static inline FailureAction fault(double probability = 1.0) {
    util::check_arg(probability >= 0, "Bad probability: {}", probability);

    // Unconditional thrower, created once per Exception type and reused by every
    // probabilistic action built below.
    static FailureAction always_raise("raise", [](FailureType failure_type) {
        throw Exception(fmt::format("Simulating {} storage failure", failure_type));
    });

    if (probability >= 1.0) {
        return always_raise;
    }

    auto maybe_raise = [threshold=probability](FailureType failure_type) {
        // init_random() runs once per thread, seeded with the address of this call's
        // argument; presumably it seeds the generator behind random_probability().
        thread_local std::once_flag seeded;
        std::call_once(seeded, [seed = uint64_t(&failure_type)]() { init_random(seed); });
        if (random_probability() < threshold) {
            always_raise(failure_type);
        }
    };
    return {fmt::format("fault({})", probability), std::move(maybe_raise)};
}
}

/**
 * Independent state for each FailureType: a fixed sequence of actions plus a cursor
 * recording how many of them have been handed out. Thread-safe except for the c'tors.
 */
class FailureTypeState {
public:
    using ActionSequence = std::vector<FailureAction>;
    static_assert(std::is_copy_assignable_v<ActionSequence>);

private:
    friend class StorageFailureSimulator;

    const ActionSequence sequence_;
    std::atomic<size_t> cursor_ {0}; // Index into sequence

    /** Returns `actions` unchanged, or a single no_op when it is empty, so sequence_ is never empty. */
    static ActionSequence or_no_op(ActionSequence actions) {
        if (actions.empty()) {
            actions.push_back(action_factories::no_op);
        }
        return actions;
    }

public:
    FailureTypeState(ActionSequence sequence) : sequence_(or_no_op(std::move(sequence))) {}

    /**
     * Returns the next action in the sequence and advances the cursor; once the
     * sequence is exhausted, keeps returning the last action.
     */
    const ActionSequence::value_type& pick_action() {
        const auto count = sequence_.size();
        // Only bump the cursor while it is still in range, so it does not keep growing
        // after exhaustion. A concurrent caller may still push it past the end between
        // the check and the increment, hence the second range check on the claimed index.
        if (cursor_ < count) {
            const auto claimed = cursor_.fetch_add(1);
            if (claimed < count) {
                return sequence_[claimed];
            }
        }
        return sequence_.back();
    }
};

class StorageFailureSimulator {
public:
using ParamActionSequence = FailureTypeState::ActionSequence;
/**
* Easy-to-copy parameters that can be used to configure this class. Useful in parameterized tests.
* Maps each FailureType to the sequence of actions to perform for successive calls of that type.
* After a sequence is exhausted, its last action is used for all subsequent calls.
*/
using Params = std::unordered_map<FailureType, ParamActionSequence>;

static std::shared_ptr<StorageFailureSimulator> instance();
static std::shared_ptr<StorageFailureSimulator> instance_;
static std::once_flag init_flag_;
static void init(){
/* Clears/re-initializes the simulator. All action sequences, custom actions and states will be cleared. */
static void reset() {
instance_ = std::make_shared<StorageFailureSimulator>();
}
static void destroy_instance(){instance_.reset();}

StorageFailureSimulator() :
configured_(false) {
}
StorageFailureSimulator() : configured_(false) {}

void configure(const arcticdb::proto::storage::VersionStoreConfig::StorageFailureSimulator& cfg) {
log::storage().info("Initializing storage failure simulator");
configured_ = true;
categories_.insert(std::make_pair(FailureType::WRITE, FailureCategory{cfg.write_failure_prob()}));
categories_.insert(std::make_pair(FailureType::READ, FailureCategory{cfg.read_failure_prob()}));
configure({{FailureType::WRITE, {action_factories::fault(cfg.write_failure_prob())}},
{FailureType::READ, {action_factories::fault(cfg.read_failure_prob())}}});
}

FailureCategory& find(FailureType failure_type) {
auto item = categories_.find(failure_type);
if(item == categories_.end())
util::raise_rte("Unknown failure type {}", failure_type);

return item->second;
void configure(const Params& params) {
log::storage().info("Initializing storage failure simulator");
for (const auto& [type, sequence]: params) {
// Due to the atomic in FailureTypeState, it cannot be moved, so has to be constructed in-place:
categories_.emplace(std::piecewise_construct,
std::forward_as_tuple(type),
std::forward_as_tuple(sequence));
}
configured_ = true;
}

bool configured() const {
Expand All @@ -66,22 +156,20 @@ class StorageFailureSimulator {
ARCTICDB_NO_MOVE_OR_COPY(StorageFailureSimulator)

void go(FailureType failure_type) {
if (ARCTICDB_LIKELY(!configured_)) return;
util::check(configured_, "Attempted failure simulation in unconfigured class");
thread_local std::once_flag flag;
std::atomic<uint64_t> counter{42};
std::call_once(flag, [&counter]() { init_random(counter++); });
auto& category = find(failure_type);
category.thrown_ = true;
throw std::runtime_error(fmt::format("Simulating storage failure {}", failure_type));
}

bool was_thrown(FailureType failure_type) {
auto category = find(failure_type);
return category.thrown_;
if (auto itr = categories_.find(failure_type); itr != categories_.end()) {
auto& state = itr->second;
auto& action = state.pick_action();
action(failure_type);
}
}

private:
std::unordered_map<FailureType, FailureCategory> categories_;
static std::shared_ptr<StorageFailureSimulator> instance_;
static std::once_flag init_flag_;

std::unordered_map<FailureType, FailureTypeState> categories_;
bool configured_;
};

Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/util/error_code.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
ERROR_CODE(2003, E_INCOMPATIBLE_INDEX) \
ERROR_CODE(2004, E_WRONG_SHAPE) \
ERROR_CODE(3000, E_NO_SUCH_VERSION) \
ERROR_CODE(3010, E_UNREADABLE_SYMBOL_LIST) \
ERROR_CODE(4000, E_DESCRIPTOR_MISMATCH) \
ERROR_CODE(4001, E_COLUMN_DOESNT_EXIST) \
ERROR_CODE(4002, E_UNSUPPORTED_COLUMN_TYPE) \
Expand Down
10 changes: 8 additions & 2 deletions cpp/arcticdb/util/storage_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,14 @@ struct OnExit {
func_(std::move(func)) {}

~OnExit() {
if(!released_)
func_();
if(!released_) {
// Must not throw in destructor to avoid crashes
try {
func_();
} catch (const std::exception& e) {
log::lock().error("Exception in OnExit: {}", e.what());
}
}
}

void release() {
Expand Down
Loading

0 comments on commit a2e625d

Please sign in to comment.