Skip to content

Commit

Permalink
Further WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Nov 21, 2024
1 parent b0081b6 commit ce315e2
Show file tree
Hide file tree
Showing 30 changed files with 1,123 additions and 1,129 deletions.
29 changes: 14 additions & 15 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ auto read_and_continue(const VariantKey& key, std::shared_ptr<storage::Library>
return async::submit_io_task(ReadCompressedTask{key, library, opts, std::forward<decltype(c)>(c)})
.via(&async::cpu_executor())
.thenValue([](auto &&result) mutable {
auto&& [key_seg, continuation] = std::forward<decltype(result)>(result);
return continuation(std::move(key_seg));
auto&& [key_seg_fut, continuation] = std::forward<decltype(result)>(result);
return std::move(key_seg_fut).thenValue([continuation=std::move(continuation)] (auto&& key_seg) mutable { return continuation(std::move(key_seg)); });
}
);
}
Expand Down Expand Up @@ -160,7 +160,7 @@ folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair ks) override
}

void write_compressed_sync(storage::KeySegmentPair ks) override {
library_->write(Composite<storage::KeySegmentPair>(std::move(ks)));
library_->write(std::move(ks));
}

folly::Future<entity::VariantKey> update(const entity::VariantKey &key,
Expand Down Expand Up @@ -217,9 +217,8 @@ folly::Future<std::pair<entity::VariantKey, SegmentInMemory>> read(
return read_and_continue(key, library_, opts, DecodeSegmentTask{});
}

std::pair<entity::VariantKey, SegmentInMemory> read_sync(const entity::VariantKey &key,
storage::ReadKeyOpts opts) override {
return DecodeSegmentTask{}(read_sync_dispatch(key, library_, opts));
std::pair<entity::VariantKey, SegmentInMemory> read_sync(const entity::VariantKey& key) override {
return DecodeSegmentTask{}(read_sync_dispatch(key, library_));
}

folly::Future<storage::KeySegmentPair> read_compressed(
Expand All @@ -228,11 +227,8 @@ folly::Future<storage::KeySegmentPair> read_compressed(
return read_and_continue(key, library_, opts, PassThroughTask{});
}

storage::KeySegmentPair read_compressed_sync(
const entity::VariantKey& key,
storage::ReadKeyOpts opts
) override {
return read_sync_dispatch( key, library_, opts );
storage::KeySegmentPair read_compressed_sync(const entity::VariantKey& key) override {
return read_sync_dispatch( key, library_);
}

folly::Future<std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>>> read_metadata(const entity::VariantKey &key, storage::ReadKeyOpts opts) override {
Expand All @@ -247,7 +243,7 @@ folly::Future<std::tuple<VariantKey, std::optional<google::protobuf::Any>, Strea

folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descriptor(
const entity::VariantKey &key,
storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) override {
storage::ReadKeyOpts opts) override {
return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorTask{});
}

Expand Down Expand Up @@ -340,7 +336,11 @@ std::vector<folly::Future<pipelines::SegmentAndSlice>> batch_read_uncompressed(
std::move(ranges_and_keys),
[this, columns_to_decode](auto&& ranges_and_key) {
const auto key = ranges_and_key.key_;
return read_and_continue(key, library_, storage::ReadKeyOpts{}, DecodeSliceTask{std::move(ranges_and_key), columns_to_decode});
return read_and_continue(
key,
library_,
storage::ReadKeyOpts{},
DecodeSliceTask{std::forward<decltype(ranges_and_key)>(ranges_and_key), columns_to_decode});
}, async::TaskScheduler::instance()->io_thread_count() * 2);
}

Expand Down Expand Up @@ -377,8 +377,7 @@ folly::Future<SliceAndKey> async_write(
.thenValue([lib=library_](auto &&item) {
auto [key_opt_segment, slice] = std::forward<decltype(item)>(item);
if (key_opt_segment.second)
lib->write(Composite<storage::KeySegmentPair>({VariantKey{key_opt_segment.first},
std::move(*key_opt_segment.second)}));
lib->write({VariantKey{key_opt_segment.first}, std::move(*key_opt_segment.second)});

return SliceAndKey{slice, to_atom(key_opt_segment.first)};
});
Expand Down
19 changes: 0 additions & 19 deletions cpp/arcticdb/async/tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,4 @@ namespace arcticdb::async {
decode_into_memory_segment(seg, hdr, segment_in_memory, desc);
return pipelines::SegmentAndSlice(std::move(ranges_and_key_), std::move(segment_in_memory));
}

pipelines::SliceAndKey DecodeSlicesTask::decode_into_slice(std::pair<Segment, pipelines::SliceAndKey>&& sk_pair) const {
auto [seg, sk] = std::move(sk_pair);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(sk.key()));

auto &hdr = seg.header();
const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, filter_columns_);
sk.slice_.adjust_columns(descriptor.field_count() - descriptor.index().field_count());

ARCTICDB_TRACE(log::codec(), "Creating segment");
SegmentInMemory res(std::move(descriptor));

decode_into_memory_segment(seg, hdr, res, desc);
sk.set_segment(std::move(res));
return sk;
}

} //namespace arcticdb::async
63 changes: 21 additions & 42 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ struct WriteSegmentTask : BaseTask {
VariantKey operator()(storage::KeySegmentPair &&key_seg) const {
ARCTICDB_SAMPLE(WriteSegmentTask, 0)
auto k = key_seg.variant_key();
lib_->write(Composite<storage::KeySegmentPair>(std::move(key_seg)));
lib_->write(std::move(key_seg));
return k;
}
};
Expand All @@ -198,26 +198,26 @@ struct UpdateSegmentTask : BaseTask {
VariantKey operator()(storage::KeySegmentPair &&key_seg) const {
ARCTICDB_SAMPLE(UpdateSegmentTask, 0)
auto k = key_seg.variant_key();
lib_->update(Composite<storage::KeySegmentPair>(std::move(key_seg)), opts_);
lib_->update(std::move(key_seg), opts_);
return k;
}
};

template <typename Callable>
struct KeySegmentContinuation {
storage::KeySegmentPair key_seg_;
folly::Future<storage::KeySegmentPair> key_seg_;
Callable continuation_;
};

inline folly::Future<storage::KeySegmentPair> read_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
return util::variant_match(variant_key, [&lib, &opts](const auto &key) {
inline folly::Future<storage::KeySegmentPair> read_dispatch(entity::VariantKey&& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
return util::variant_match(variant_key, [&lib, &opts](auto&& key) {
return lib->read(key, opts);
});
}

inline storage::KeySegmentPair read_sync_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
return util::variant_match(variant_key, [&lib, &opts](const auto &key) {
return lib->read_sync(key, opts);
inline storage::KeySegmentPair read_sync_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib) {
return util::variant_match(variant_key, [&lib](const auto &key) {
return lib->read_sync(key);
});
}

Expand All @@ -244,7 +244,7 @@ struct ReadCompressedTask : BaseTask {

KeySegmentContinuation<ContinuationType> operator()() {
ARCTICDB_SAMPLE(ReadCompressed, 0)
return KeySegmentContinuation<decltype(continuation_)>{read_dispatch(key_, lib_, opts_), std::move(continuation_)};
return KeySegmentContinuation<decltype(continuation_)>{read_dispatch(std::move(key_), lib_, opts_), std::move(continuation_)};
}
};

Expand Down Expand Up @@ -282,11 +282,11 @@ struct CopyCompressedTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(CopyCompressedTask)

VariantKey copy() {
return std::visit([that = this](const auto &source_key) {
auto key_seg = that->lib_->read(source_key);
auto target_key_seg = stream::make_target_key<ClockType>(that->key_type_, that->stream_id_, that->version_id_, source_key, std::move(key_seg.segment()));
return std::visit([this](const auto &source_key) {
auto key_seg = lib_->read_sync(source_key);
auto target_key_seg = stream::make_target_key<ClockType>(key_type_, stream_id_, version_id_, source_key, std::move(key_seg.segment()));
auto return_key = target_key_seg.variant_key();
that->lib_->write(Composite<storage::KeySegmentPair>{std::move(target_key_seg) });
lib_->write(std::move(target_key_seg));
return return_key;
}, source_key_);
}
Expand Down Expand Up @@ -436,30 +436,6 @@ struct DecodeSliceTask : BaseTask {
pipelines::SegmentAndSlice decode_into_slice(storage::KeySegmentPair&& key_segment_pair);
};

struct DecodeSlicesTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeSlicesTask)

std::shared_ptr<std::unordered_set<std::string>> filter_columns_;

explicit DecodeSlicesTask(
const std::shared_ptr<std::unordered_set<std::string>>& filter_columns) :
filter_columns_(filter_columns) {
}

Composite<pipelines::SliceAndKey> operator()(Composite<std::pair<Segment, pipelines::SliceAndKey>> && skp) const {
ARCTICDB_SAMPLE(DecodeSlicesTask, 0)
auto sk_pairs = std::move(skp);
return sk_pairs.transform([this] (auto&& ssp){
auto seg_slice_pair = std::move(ssp);
ARCTICDB_DEBUG(log::version(), "Decoding slice {}", seg_slice_pair.second.key());
return decode_into_slice(std::move(seg_slice_pair));
});
}

private:
pipelines::SliceAndKey decode_into_slice(std::pair<Segment, pipelines::SliceAndKey>&& sk_pair) const;
};

struct SegmentFunctionTask : BaseTask {
stream::StreamSource::ReadContinuation func_;

Expand Down Expand Up @@ -578,14 +554,15 @@ struct WriteCompressedTask : BaseTask {
std::shared_ptr<storage::Library> lib_;

WriteCompressedTask(storage::KeySegmentPair&& key_seg, std::shared_ptr<storage::Library> lib) :
kv_(std::move(kv)), lib_(std::move(lib)) {
kv_(std::move(key_seg)),
lib_(std::move(lib)) {
ARCTICDB_DEBUG(log::storage(), "Creating write compressed task");
}

ARCTICDB_MOVE_ONLY_DEFAULT(WriteCompressedTask)

folly::Future<folly::Unit> write() {
lib_->write(Composite<storage::KeySegmentPair>(std::move(kv_)));
lib_->write(std::move(kv_));
return folly::makeFuture();
}

Expand All @@ -609,7 +586,9 @@ struct WriteCompressedBatchTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(WriteCompressedBatchTask)

folly::Future<folly::Unit> write() {
lib_->write(Composite<storage::KeySegmentPair>(std::move(kvs_)));
for(auto&& kv : kvs_)
lib_->write(std::move(kv));

return folly::makeFuture();
}

Expand All @@ -634,7 +613,7 @@ struct RemoveTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(RemoveTask)

stream::StreamSink::RemoveKeyResultType operator()() {
lib_->remove(Composite<VariantKey>(std::move(key_)), opts_);
lib_->remove(std::move(key_), opts_);
return {};
}
};
Expand All @@ -658,7 +637,7 @@ struct RemoveBatchTask : BaseTask {


std::vector<stream::StreamSink::RemoveKeyResultType> operator()() {
lib_->remove(Composite<VariantKey>(std::move(keys_)), opts_);
lib_->remove(std::span(keys_), opts_);
return {};
}
};
Expand Down
20 changes: 7 additions & 13 deletions cpp/arcticdb/entity/atom_key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ class AtomKeyImpl {

template<class IndexValueType>
AtomKeyImpl(
StreamId id,
VersionId version_id,
timestamp creation_ts,
ContentHash content_hash,
IndexValueType start_index,
IndexValueType end_index,
KeyType key_type)
:
StreamId id,
VersionId version_id,
timestamp creation_ts,
ContentHash content_hash,
IndexValueType start_index,
IndexValueType end_index,
KeyType key_type) :
id_(std::move(id)),
version_id_(version_id),
creation_ts_(creation_ts),
Expand Down Expand Up @@ -59,11 +58,6 @@ class AtomKeyImpl {
const IndexValue &end_index() const { return index_end_; }
IndexRange index_range() const { IndexRange ir = {index_start_, index_end_}; ir.end_closed_ = false; return ir;}

auto change_type(KeyType new_type) {
key_type_ = new_type;
reset_cached();
}

/**
* Useful for caching/replacing the ID with an existing shared instance.
* @param id Will be moved.
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/log/log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

#include <spdlog/spdlog.h>


#define DEBUG_BUILD
#ifdef DEBUG_BUILD
#define ARCTICDB_DEBUG(logger, ...) logger.debug(__VA_ARGS__)
#define ARCTICDB_TRACE(logger, ...) logger.trace(__VA_ARGS__)
Expand Down
Loading

0 comments on commit ce315e2

Please sign in to comment.