Skip to content

Commit

Permalink
hashing fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed May 22, 2024
1 parent ca10968 commit 490d60c
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ std::vector<folly::Future<bool>> batch_key_exists(
}


folly::Future<SliceAndKey> async_write(
folly::Future<SliceAndKey> async_write(
folly::Future<std::tuple<PartialKey, SegmentInMemory, pipelines::FrameSlice>> &&input_fut,
const std::shared_ptr<DeDupMap> &de_dup_map) override {
using KeyOptSegment = std::pair<VariantKey, std::optional<Segment>>;
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ struct EncodeAtomTask : BaseTask {
storage::KeySegmentPair encode() {
ARCTICDB_DEBUG(log::codec(), "Encoding object with partial key {}", partial_key_);
auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
auto content_hash = hash_segment_header(enc_seg.header());
auto content_hash = get_segment_hash(enc_seg);

AtomKey k = partial_key_.build_key(creation_ts_, content_hash);
return {std::move(k), std::move(enc_seg)};
Expand Down
34 changes: 27 additions & 7 deletions cpp/arcticdb/codec/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,8 @@ SegmentInMemory decode_segment(Segment&& s) {
return res;
}

static void hash_field(const EncodedFieldImpl &field, HashAccum &accum) {
template <typename EncodedFieldType>
void hash_field(const EncodedFieldType &field, HashAccum &accum) {
auto &n = field.ndarray();
for(auto i = 0; i < n.shapes_size(); ++i) {
auto v = n.shapes(i).hash();
Expand All @@ -534,13 +535,32 @@ static void hash_field(const EncodedFieldImpl &field, HashAccum &accum) {
}
}

HashedValue hash_segment_header(const SegmentHeader &hdr) {
HashedValue get_segment_hash(Segment& seg) {
HashAccum accum;
if (hdr.has_metadata_field()) {
hash_field(hdr.metadata_field(), accum);
}
if(hdr.has_string_pool_field()) {
hash_field(hdr.string_pool_field(), accum);
const auto& hdr = seg.header();
if(hdr.encoding_version() == EncodingVersion::V1) {
// The hashes are part of the encoded fields protobuf in the v1 header, which is not
// ideal but needs to be maintained for consistency
const auto& proto = seg.generate_header_proto();
if (proto.has_metadata_field()) {
hash_field(proto.metadata_field(), accum);
}
for (int i = 0; i < proto.fields_size(); ++i) {
hash_field(proto.fields(i), accum);
}
if (hdr.has_string_pool_field()) {
hash_field(proto.string_pool_field(), accum);
}
} else {
const auto& header_fields = hdr.header_fields();
for(auto i = 0UL; i < header_fields.size(); ++i) {
hash_field(header_fields.at(i), accum);
}

const auto& body_fields = hdr.body_fields();
for(auto i = 0UL; i < body_fields.size(); ++i) {
hash_field(body_fields.at(i), accum);
}
}

return accum.digest();
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ std::pair<std::optional<google::protobuf::Any>, StreamDescriptor> decode_metadat
std::optional<TimeseriesDescriptor> decode_timeseries_descriptor(
Segment& segment);

HashedValue hash_segment_header(const SegmentHeader &hdr);
HashedValue get_segment_hash(Segment& seg);

SegmentDescriptorImpl read_segment_descriptor(const uint8_t*& data);

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/protobuf_mappings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void encoded_field_from_proto(const arcticdb::proto::encoding::EncodedField& inp

void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto::encoding::EncodedField& output) {
util::check(input.has_ndarray(), "Only ndarray fields supported for v1 encoding");
ARCTICDB_DEBUG(log::codec(), "Copying field to proto: {}", input);
ARCTICDB_TRACE(log::codec(), "Copying field to proto: {}", input);
const auto& input_ndarray = input.ndarray();
auto* output_ndarray = output.mutable_ndarray();
output_ndarray->set_items_count(input_ndarray.items_count());
Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/codec/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

namespace arcticdb {

arcticdb::proto::encoding::SegmentHeader generate_proto_header(const SegmentHeader& header, const StreamDescriptor& desc) {
arcticdb::proto::encoding::SegmentHeader generate_v1_header(const SegmentHeader& header, const StreamDescriptor& desc) {
arcticdb::proto::encoding::SegmentHeader segment_header;
if(header.has_metadata_field())
copy_encoded_field_to_proto(header.metadata_field(), *segment_header.mutable_metadata_field());
Expand Down Expand Up @@ -298,7 +298,7 @@ std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_v1_head
}

std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_header_v1() {
auto proto_header = generate_proto_header(header_, desc_);
auto proto_header = generate_v1_header(header_, desc_);
const auto hdr_size = proto_header.ByteSizeLong();
auto total_hdr_size = hdr_size + FIXED_HEADER_SIZE;

Expand Down Expand Up @@ -333,7 +333,7 @@ std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_header(

/// Lazily builds and caches the protobuf (v1) representation of this
/// segment's header, converting from the in-memory SegmentHeader/descriptor.
/// The conversion runs at most once; later calls return the cached proto.
const arcticdb::proto::encoding::SegmentHeader& Segment::generate_header_proto() {
    if (proto_)
        return *proto_;

    // First call: materialize the v1 proto header from the native header and descriptor.
    auto generated = generate_v1_header(header_, desc_);
    proto_ = std::make_unique<arcticdb::proto::encoding::SegmentHeader>(std::move(generated));
    return *proto_;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct SegmentHeaderProtoWrapper {

SegmentHeaderProtoWrapper decode_protobuf_header(const uint8_t* data, size_t header_bytes_size);

arcticdb::proto::encoding::SegmentHeader generate_proto_header(const SegmentHeader& header, const StreamDescriptor& desc);
arcticdb::proto::encoding::SegmentHeader generate_v1_header(const SegmentHeader& header, const StreamDescriptor& desc);

static constexpr uint16_t HEADER_VERSION_V1 = 1;
static constexpr uint16_t HEADER_VERSION_V2 = 2;
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/codec/segment_header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ class SegmentHeader {
return body_fields_;
}

// Read-only accessor for the encoded-field collection describing the segment
// header itself (as opposed to body_fields(), the collection for the segment
// body). Returns a reference to internal state; valid only while this
// SegmentHeader is alive and unmodified.
[[nodiscard]] const EncodedFieldCollection& header_fields() const {
return header_fields_;
}

void set_body_fields(EncodedFieldCollection&& body_fields) {
body_fields_ = std::move(body_fields);
body_fields_.regenerate_offsets();
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/test/test_segment_header.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ TEST(SegmentHeader, SerializeUnserializeV1) {

auto desc = stream_descriptor(StreamId{"thing"}, stream::RowCountIndex{}, {scalar_field(DataType::UINT8, "ints")});

auto proto = generate_proto_header(header, desc);
auto proto = generate_v1_header(header, desc);
const auto header_size = proto.ByteSizeLong();
std::vector<uint8_t> vec(header_size);
auto read_header = decode_protobuf_header(vec.data(), header_size);
Expand Down
5 changes: 4 additions & 1 deletion cpp/arcticdb/pipeline/write_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ std::tuple<stream::StreamSink::PartialKey, SegmentInMemory, FrameSlice> WriteToS

auto rows_to_write = slice_.row_range.second - slice_.row_range.first;
if (frame_->desc.index().field_count() > 0) {
util::check(static_cast<bool>(frame_->index_tensor), "Got null index tensor in write_slices");
util::check(static_cast<bool>(frame_->index_tensor), "Got null index tensor in WriteToSegmentTask");
auto opt_error = aggregator_set_data(
frame_->desc.fields(0).type(),
frame_->index_tensor.value(),
Expand Down Expand Up @@ -186,6 +186,9 @@ folly::Future<std::vector<SliceAndKey>> slice_and_write(
if(slices.empty())
return folly::makeFuture(std::vector<SliceAndKey>{});

for(const auto& slice : slices)
log::version().info("{}", slice);

ARCTICDB_SUBSAMPLE_DEFAULT(SliceAndWrite)
return write_slices(frame, std::move(slices), slicing, std::move(key), sink, de_dup_map, sparsify_floats);
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ VersionedItem LocalVersionedEngine::write_versioned_metadata_internal(
stream_id,
VersionQuery{});
if(update_info.previous_index_key_.has_value()) {
ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {}", stream_id);
ARCTICDB_DEBUG(log::version(), "write_versioned_metadata for stream_id: {}", stream_id);
auto index_key = UpdateMetadataTask{store(), update_info, std::move(user_meta)}();
write_version_and_prune_previous(prune_previous_versions, index_key, update_info.previous_index_key_);
return VersionedItem{ std::move(index_key) };
Expand Down Expand Up @@ -734,10 +734,10 @@ VersionedItem LocalVersionedEngine::write_individual_segment(
) {
ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0)

ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_dataframe");
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write individual segment");
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
auto version_id = get_next_version_from_key(maybe_prev);
ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {} , version_id = {}", stream_id, version_id);
ARCTICDB_DEBUG(log::version(), "write individual segment for stream_id: {} , version_id = {}", stream_id, version_id);
auto index = index_type_from_descriptor(segment.descriptor());
auto range = get_range_from_segment(index, segment);

Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/version/version_tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ struct UpdateMetadataTask : async::BaseTask {
}
};

struct AsyncRestoreVersionTask : async::BaseTask {
struct
AsyncRestoreVersionTask : async::BaseTask {
const std::shared_ptr<Store> store_;
std::shared_ptr<VersionMap> version_map_;
const StreamId stream_id_;
Expand Down

0 comments on commit 490d60c

Please sign in to comment.