Skip to content

Commit

Permalink
Fix Segment use-after-move when replicating to NFS (#1756)
Browse files Browse the repository at this point in the history
## Motivation

(this section copied from previous (closed) attempt -
#1746)

The motivation for the change is to allow `arcticdb-enterprise` to copy
blocks to NFS storages without a use-after-move. I explained this in
man-group/arcticdb-enterprise#139 but to have an
open record:

CopyCompressedInterStoreTask has:

```
                        // Don't bother copying the key segment pair when writing to the final target
                        if (it == std::prev(target_stores_.end())) {
                            (*it)->write_compressed_sync(std::move(key_segment_pair));
                        } else {
                            auto key_segment_pair_copy = key_segment_pair;
                            (*it)->write_compressed_sync(std::move(key_segment_pair_copy));
                        }
```

KeySegmentPair has a shared_ptr to a KeySegmentPair, which we can think
of here as just a `Segment`.

Therefore the old `key_segment_pair_copy` is shallow, the underlying
Segment is the same. But the segment eventually gets passed as an rvalue
reference further down the stack.

In `do_write_impl` we call `put_object` which calls `serialize_header`.

This modifies the segment in place and passes that buffer to the AWS
SDK.

In the `NfsBackedStorage` we have:

```
void NfsBackedStorage::do_write(Composite<KeySegmentPair>&& kvs) {
    auto enc = kvs.transform([] (auto&& key_seg) {
      return KeySegmentPair{encode_object_id(key_seg.variant_key()), std::move(key_seg.segment())};
    });
    s3::detail::do_write_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{});
}
```

where the segment gets moved from.

Subsequent attempts to use the segment (eg copying on to the next store)
then fail.

man-group/arcticdb-enterprise#139 fixed this
issue by cloning the segment, but this approach avoids the (expensive)
clone.

## Logical Change

Copy the `KeySegmentPair`'s pointer to the `Segment` in
`nfs_backed_storage.cpp` rather than moving from the segment.

## Refactor and Testing

### Copy Task

Move the CopyCompressedInterStoreTask down to ArcticDB from
arcticdb-enterprise. Add a test for it on NFS storage. I've verified
that the tests in this commit fail without the refactor in the HEAD~1
commit. The only changes to `CopyCompressedInterstoreTask` from
enterprise are:

- Pass the `KeySegmentPair` by value in to `write_compressed{_sync}`.
The `KeySegmentPair` is cheap to copy (especially considering we are
about to copy an object across storages, likely with a network hop).
- We have adopted the new `set_key` API of `KeySegmentPair`:
```
if (key_to_write_.has_value()) {
                key_segment_pair.set_key(*key_to_write_);
            }
```
- We have namespaced the `ProcessingResult` struct in to the task

### KeySegmentPair

- Replace methods returning mutable lvalue references to keys with a
`set_key` method.
- Remove the `release_segment` method as it dangerously leaves the
`KeySegmentPair` pointing at a `Segment` object that has been moved
from, and it is not actually necessary.

## Follow up work

The non-const `Segment& KeySegmentPair#segment()` API is still dangerous
and error prone. I have a follow up change to remove it, but that API
change affects very many files and will be best raised separately so
that it doesn't block this fix for replication.

A draft PR showing a proposal for that change is here -
#1757 .
  • Loading branch information
poodlewars authored Aug 9, 2024
1 parent ec65b8e commit 498c331
Show file tree
Hide file tree
Showing 22 changed files with 360 additions and 271 deletions.
2 changes: 2 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ set(arcticdb_srcs
# header files
async/async_store.hpp
async/batch_read_args.hpp
async/bit_rate_stats.hpp
async/task_scheduler.hpp
async/tasks.hpp
codec/codec.hpp
Expand Down Expand Up @@ -389,6 +390,7 @@ set(arcticdb_srcs
version/version_utils.hpp
# CPP files
async/async_store.cpp
async/bit_rate_stats.cpp
async/task_scheduler.cpp
async/tasks.cpp
codec/codec.cpp
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ bool is_path_valid(const std::string_view path) const override {
return library_->is_path_valid(path);
}

folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair &&ks) override {
folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair ks) override {
return async::submit_io_task(WriteCompressedTask{std::move(ks), library_});
}

void write_compressed_sync(storage::KeySegmentPair &&ks) override {
void write_compressed_sync(storage::KeySegmentPair ks) override {
library_->write(Composite<storage::KeySegmentPair>(std::move(ks)));
}

Expand Down
50 changes: 50 additions & 0 deletions cpp/arcticdb/async/bit_rate_stats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "bit_rate_stats.hpp"

#include <folly/Likely.h>

#include "log/log.hpp"
#include "util/format_bytes.hpp"
#include "entity/performance_tracing.hpp"

constexpr uint64_t max_bytes{0xFFFFFFFFFF};
constexpr uint64_t max_time_ms{0xFFFFFF};
constexpr arcticdb::entity::timestamp log_frequency_ns{60LL * 1000L * 1000L * 1000L};

namespace arcticdb::async {

BitRateStats::BitRateStats():
last_log_time_ns_(util::SysClock::coarse_nanos_since_epoch())
{}

void BitRateStats::add_stat(std::size_t bytes, double time_ms) {
auto now = util::SysClock::coarse_nanos_since_epoch();
uint64_t stat = data_to_stat(bytes, time_ms);
auto previous_stats = stats_.fetch_add(stat);
auto current_stats = previous_stats + stat;
if (now - last_log_time_ns_ > log_frequency_ns && stats_.compare_exchange_strong(current_stats, 0)) {
last_log_time_ns_ = now;
log_stats(current_stats);
}
}

uint64_t BitRateStats::data_to_stat(std::size_t bytes, double time_ms) const {
if (UNLIKELY(bytes > max_bytes || time_ms > max_time_ms)) {
log::storage().warn("Bit rate stats provided too large to represent, ignoring: {} in {}ms",
format_bytes(bytes),
time_ms);
return 0;
}
uint64_t stat{(bytes << 24) + static_cast<uint64_t>(time_ms)};
return stat;
}

void BitRateStats::log_stats(uint64_t stats) const {
double time_s = static_cast<double>(stats & max_time_ms) / 1000;
double bytes = static_cast<double>(stats >> 24);
double bandwidth = bytes / time_s;
log::storage().info("Byte rate {}/s", format_bytes(bandwidth));
std::string log_msg = "Current BW is " + format_bytes(bandwidth)+"/s";
ARCTICDB_SAMPLE_LOG(log_msg.c_str());
}

} // arcticdb::async
30 changes: 30 additions & 0 deletions cpp/arcticdb/async/bit_rate_stats.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include <atomic>
#include <mutex>

#include "arcticdb/util/clock.hpp"
#include "arcticdb/util/constructors.hpp"

namespace arcticdb::async {

class BitRateStats {
public:
BitRateStats();
void add_stat(std::size_t bytes, double time_ms);

ARCTICDB_NO_MOVE_OR_COPY(BitRateStats)
private:
uint64_t data_to_stat(std::size_t bytes, double time_ms) const;
void log_stats(uint64_t stats) const;

// Use an 8 byte atomic for lock free implementation
// Upper 5 bytes represent the number of bytes of data transferred (giving max representable value of 1TB)
// Lower 3 bytes represent the total time in milliseconds (giving max representable value of 4.5 hours)
std::atomic_uint64_t stats_{0};

entity::timestamp last_log_time_ns_;
};

} // arcticdb::async

4 changes: 2 additions & 2 deletions cpp/arcticdb/async/tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ namespace arcticdb::async {
}

pipelines::SegmentAndSlice DecodeSliceTask::decode_into_slice(storage::KeySegmentPair&& key_segment_pair) {
auto key = std::move(key_segment_pair.atom_key());
auto seg = std::move(key_segment_pair.release_segment());
auto key = key_segment_pair.atom_key();
auto& seg = key_segment_pair.segment();
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.size(),
key);
Expand Down
131 changes: 105 additions & 26 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <arcticdb/entity/atom_key.hpp>
#include <arcticdb/storage/library.hpp>
#include <arcticdb/storage/storage_options.hpp>
#include <arcticdb/storage/store.hpp>
#include <arcticdb/entity/types.hpp>
#include <arcticdb/util/hash.hpp>
#include <arcticdb/stream/stream_utils.hpp>
Expand All @@ -19,6 +20,7 @@
#include <arcticdb/entity/variant_key.hpp>
#include <arcticdb/stream/stream_sink.hpp>
#include <arcticdb/async/base_task.hpp>
#include <arcticdb/async/bit_rate_stats.hpp>
#include <arcticdb/pipeline/frame_slice.hpp>
#include <arcticdb/processing/processing_unit.hpp>
#include <arcticdb/util/constructors.hpp>
Expand Down Expand Up @@ -248,32 +250,6 @@ struct PassThroughTask : BaseTask {
}
};

struct ReadCompressedSlicesTask : BaseTask {
Composite<pipelines::SliceAndKey> slice_and_keys_;
std::shared_ptr<storage::Library> lib_;

ReadCompressedSlicesTask(Composite<pipelines::SliceAndKey>&& sk, std::shared_ptr<storage::Library> lib)
: slice_and_keys_(std::move(sk)),
lib_(std::move(lib)) {
ARCTICDB_DEBUG(log::storage(), "Creating read compressed slices task for slice and key {}",
slice_and_keys_);
}

ARCTICDB_MOVE_ONLY_DEFAULT(ReadCompressedSlicesTask)

Composite<std::pair<Segment, pipelines::SliceAndKey>> read() {
return slice_and_keys_.transform([that=this](const auto &sk){
ARCTICDB_DEBUG(log::version(), "Reading key {}", sk.key());
return std::make_pair(that->lib_->read(sk.key()).release_segment(), sk);
});
}

Composite<std::pair<Segment, pipelines::SliceAndKey>> operator()() {
ARCTICDB_SAMPLE(ReadCompressed, 0)
return read();
}
};

template <typename ClockType>
struct CopyCompressedTask : BaseTask {
entity::VariantKey source_key_;
Expand Down Expand Up @@ -315,6 +291,109 @@ struct CopyCompressedTask : BaseTask {
}
};

// Used in arcticdb-enterprise, do not remove without checking whether it is still used there
struct CopyCompressedInterStoreTask : async::BaseTask {

using AllOk = std::monostate;
using FailedTargets = std::unordered_set<std::string>;
using ProcessingResult = std::variant<AllOk, FailedTargets>;

CopyCompressedInterStoreTask(entity::VariantKey key_to_read,
std::optional<entity::AtomKey> key_to_write,
bool check_key_exists_on_targets,
bool retry_on_failure,
std::shared_ptr<Store> source_store,
std::vector<std::shared_ptr<Store>> target_stores,
std::shared_ptr<BitRateStats> bit_rate_stats=nullptr)
: key_to_read_(std::move(key_to_read)),
key_to_write_(std::move(key_to_write)),
check_key_exists_on_targets_(check_key_exists_on_targets),
retry_on_failure_(retry_on_failure),
source_store_(std::move(source_store)),
target_stores_(std::move(target_stores)),
bit_rate_stats_(std::move(bit_rate_stats)){
ARCTICDB_DEBUG(log::storage(), "Creating copy compressed inter-store task from key {}: {} -> {}: {}",
variant_key_type(key_to_read_),
variant_key_view(key_to_read_),
key_to_write_.has_value() ? variant_key_type(key_to_write_.value()) : variant_key_type(key_to_read_),
key_to_write_.has_value() ? variant_key_view(key_to_write_.value()) : variant_key_view(key_to_read_));
}

ARCTICDB_MOVE_ONLY_DEFAULT(CopyCompressedInterStoreTask)

ProcessingResult operator()() {
auto res = copy();

if (!res.empty() && retry_on_failure_) {
res = copy();
}

if (!res.empty()) {
return res;
}

return AllOk{};
}

private:
entity::VariantKey key_to_read_;
std::optional<entity::AtomKey> key_to_write_;
bool check_key_exists_on_targets_;
bool retry_on_failure_;
std::shared_ptr<Store> source_store_;
std::vector<std::shared_ptr<Store>> target_stores_;
std::shared_ptr<BitRateStats> bit_rate_stats_;

// Returns an empty set if the copy succeeds, otherwise the set contains the names of the target stores that failed
std::unordered_set<std::string> copy() {
ARCTICDB_SAMPLE(copy, 0)
std::size_t bytes{0};
interval timer;
timer.start();
if (check_key_exists_on_targets_) {
target_stores_.erase(std::remove_if(target_stores_.begin(), target_stores_.end(),
[that=this](const std::shared_ptr<Store>& target_store) {
return target_store->key_exists_sync(that->key_to_read_);
}), target_stores_.end());
}
std::unordered_set<std::string> failed_targets;
if (!target_stores_.empty()) {
storage::KeySegmentPair key_segment_pair;
try {
key_segment_pair = source_store_->read_compressed_sync(key_to_read_, storage::ReadKeyOpts{});
} catch (const storage::KeyNotFoundException& e) {
log::storage().debug("Key {} not found on the source: {}", variant_key_view(key_to_read_), e.what());
return failed_targets;
}
bytes = key_segment_pair.segment().size();
if (key_to_write_.has_value()) {
key_segment_pair.set_key(*key_to_write_);
}

for (auto & target_store : target_stores_) {
try {
target_store->write_compressed_sync(key_segment_pair);
} catch (const storage::DuplicateKeyException& e) {
log::storage().debug("Key {} already exists on the target: {}", variant_key_view(key_to_read_), e.what());
} catch (const storage::KeyNotFoundException& e) {
log::storage().debug("Key {} not found on the source: {}", variant_key_view(key_to_read_), e.what());
} catch (const std::exception& e) {
auto name = target_store->name();
log::storage().error("Failed to write key {} to store {}: {}", variant_key_view(key_to_read_), name, e.what());
failed_targets.insert(name);
}
}
}
timer.end();
auto time_ms = timer.get_results_total() * 1000;
if (bit_rate_stats_) {
bit_rate_stats_->add_stat(bytes, time_ms);
}

return failed_targets;
}
};

struct DecodeSegmentTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeSegmentTask)

Expand Down
Loading

0 comments on commit 498c331

Please sign in to comment.