From 490d60ce221fb9e38b52c259230f122b90a51bb2 Mon Sep 17 00:00:00 2001
From: William Dealtry
Date: Wed, 22 May 2024 09:47:28 +0100
Subject: [PATCH] hashing fixes

---
 cpp/arcticdb/async/async_store.hpp       |  2 +-
 cpp/arcticdb/async/tasks.hpp             |  2 +-
 cpp/arcticdb/codec/codec.cpp             | 34 +++++++++++++++----
 cpp/arcticdb/codec/codec.hpp             |  2 +-
 cpp/arcticdb/codec/protobuf_mappings.cpp |  2 +-
 cpp/arcticdb/codec/segment.cpp           |  6 ++--
 cpp/arcticdb/codec/segment.hpp           |  2 +-
 cpp/arcticdb/codec/segment_header.hpp    |  4 +++
 .../codec/test/test_segment_header.cpp   |  2 +-
 cpp/arcticdb/pipeline/write_frame.cpp    |  5 ++-
 .../version/local_versioned_engine.cpp   |  6 ++--
 cpp/arcticdb/version/version_tasks.hpp   |  3 +-
 12 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp
index 6f2d6c94136..272cb60d0f7 100644
--- a/cpp/arcticdb/async/async_store.hpp
+++ b/cpp/arcticdb/async/async_store.hpp
@@ -338,7 +338,7 @@ std::vector> batch_key_exists(
 }
 
- folly::Future async_write(
+folly::Future async_write(
     folly::Future> &&input_fut,
     const std::shared_ptr &de_dup_map) override {
     using KeyOptSegment = std::pair>;
diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp
index 21d07e6e5cf..28be5b34aa5 100644
--- a/cpp/arcticdb/async/tasks.hpp
+++ b/cpp/arcticdb/async/tasks.hpp
@@ -88,7 +88,7 @@ struct EncodeAtomTask : BaseTask {
     storage::KeySegmentPair encode() {
         ARCTICDB_DEBUG(log::codec(), "Encoding object with partial key {}", partial_key_);
         auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
-        auto content_hash = hash_segment_header(enc_seg.header());
+        auto content_hash = get_segment_hash(enc_seg);
 
         AtomKey k = partial_key_.build_key(creation_ts_, content_hash);
         return {std::move(k), std::move(enc_seg)};
diff --git a/cpp/arcticdb/codec/codec.cpp b/cpp/arcticdb/codec/codec.cpp
index 2386a84d681..a25bd8a7aec 100644
--- a/cpp/arcticdb/codec/codec.cpp
+++ b/cpp/arcticdb/codec/codec.cpp
@@ -521,7 +521,8 @@ SegmentInMemory decode_segment(Segment&& s) {
     return res;
 }
 
-static void hash_field(const EncodedFieldImpl &field, HashAccum &accum) {
+template <typename EncodedFieldType>
+void hash_field(const EncodedFieldType &field, HashAccum &accum) {
     auto &n = field.ndarray();
     for(auto i = 0; i < n.shapes_size(); ++i) {
         auto v = n.shapes(i).hash();
@@ -534,13 +535,32 @@ static void hash_field(const EncodedFieldImpl &field, HashAccum &accum) {
     }
 }
 
-HashedValue hash_segment_header(const SegmentHeader &hdr) {
+HashedValue get_segment_hash(Segment& seg) {
     HashAccum accum;
-    if (hdr.has_metadata_field()) {
-        hash_field(hdr.metadata_field(), accum);
-    }
-    if(hdr.has_string_pool_field()) {
-        hash_field(hdr.string_pool_field(), accum);
+    const auto& hdr = seg.header();
+    if(hdr.encoding_version() == EncodingVersion::V1) {
+        // The hashes are part of the encoded fields protobuf in the v1 header, which is not
+        // ideal but needs to be maintained for consistency
+        const auto& proto = seg.generate_header_proto();
+        if (proto.has_metadata_field()) {
+            hash_field(proto.metadata_field(), accum);
+        }
+        for (int i = 0; i < proto.fields_size(); ++i) {
+            hash_field(proto.fields(i), accum);
+        }
+        if (hdr.has_string_pool_field()) {
+            hash_field(proto.string_pool_field(), accum);
+        }
+    } else {
+        const auto& header_fields = hdr.header_fields();
+        for(auto i = 0UL; i < header_fields.size(); ++i) {
+            hash_field(header_fields.at(i), accum);
+        }
+
+        const auto& body_fields = hdr.body_fields();
+        for(auto i = 0UL; i < body_fields.size(); ++i) {
+            hash_field(body_fields.at(i), accum);
+        }
     }
 
     return accum.digest();
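
Note on the two hunks above: the content hash that goes into an AtomKey is now computed from the encoded Segment rather than from the in-memory SegmentHeader alone, and get_segment_hash takes a mutable Segment& because the v1 branch calls generate_header_proto(), which lazily builds and caches the protobuf header. A condensed sketch of the write path, paraphrasing the EncodeAtomTask hunk (variable names are illustrative, with the member-variable underscores dropped):

    // Sketch only, not part of the patch: how an encode task builds its key
    // after this change. encode_dispatch, get_segment_hash and build_key are
    // the calls shown in the hunks above.
    Segment enc_seg = encode_dispatch(std::move(segment), codec_meta, encoding_version);
    HashedValue content_hash = get_segment_hash(enc_seg);  // v1: hashes proto fields, v2: header/body fields
    AtomKey key = partial_key.build_key(creation_ts, content_hash);
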
diff --git a/cpp/arcticdb/codec/codec.hpp b/cpp/arcticdb/codec/codec.hpp
index e157bb99dcf..d8ff0c6db71 100644
--- a/cpp/arcticdb/codec/codec.hpp
+++ b/cpp/arcticdb/codec/codec.hpp
@@ -82,7 +82,7 @@ std::pair, StreamDescriptor> decode_metadat
 std::optional decode_timeseries_descriptor(
     Segment& segment);
 
-HashedValue hash_segment_header(const SegmentHeader &hdr);
+HashedValue get_segment_hash(Segment& seg);
 
 SegmentDescriptorImpl read_segment_descriptor(const uint8_t*& data);
 
diff --git a/cpp/arcticdb/codec/protobuf_mappings.cpp b/cpp/arcticdb/codec/protobuf_mappings.cpp
index 2175fd55876..047def54a01 100644
--- a/cpp/arcticdb/codec/protobuf_mappings.cpp
+++ b/cpp/arcticdb/codec/protobuf_mappings.cpp
@@ -79,7 +79,7 @@ void encoded_field_from_proto(const arcticdb::proto::encoding::EncodedField& inp
 
 void copy_encoded_field_to_proto(const EncodedFieldImpl& input, arcticdb::proto::encoding::EncodedField& output) {
     util::check(input.has_ndarray(), "Only ndarray fields supported for v1 encoding");
-    ARCTICDB_DEBUG(log::codec(), "Copying field to proto: {}", input);
+    ARCTICDB_TRACE(log::codec(), "Copying field to proto: {}", input);
     const auto& input_ndarray = input.ndarray();
     auto* output_ndarray = output.mutable_ndarray();
     output_ndarray->set_items_count(input_ndarray.items_count());
diff --git a/cpp/arcticdb/codec/segment.cpp b/cpp/arcticdb/codec/segment.cpp
index 22c753fd5a8..299b4bc7a32 100644
--- a/cpp/arcticdb/codec/segment.cpp
+++ b/cpp/arcticdb/codec/segment.cpp
@@ -16,7 +16,7 @@
 
 namespace arcticdb {
 
-arcticdb::proto::encoding::SegmentHeader generate_proto_header(const SegmentHeader& header, const StreamDescriptor& desc) {
+arcticdb::proto::encoding::SegmentHeader generate_v1_header(const SegmentHeader& header, const StreamDescriptor& desc) {
     arcticdb::proto::encoding::SegmentHeader segment_header;
     if(header.has_metadata_field())
         copy_encoded_field_to_proto(header.metadata_field(), *segment_header.mutable_metadata_field());
@@ -298,7 +298,7 @@ std::tuple> Segment::serialize_v1_head
 }
 
 std::tuple> Segment::serialize_header_v1() {
-    auto proto_header = generate_proto_header(header_, desc_);
+    auto proto_header = generate_v1_header(header_, desc_);
     const auto hdr_size = proto_header.ByteSizeLong();
     auto total_hdr_size = hdr_size + FIXED_HEADER_SIZE;
 
@@ -333,7 +333,7 @@ std::tuple> Segment::serialize_header(
 
 const arcticdb::proto::encoding::SegmentHeader& Segment::generate_header_proto() {
     if(!proto_)
-        proto_ = std::make_unique<arcticdb::proto::encoding::SegmentHeader>(generate_proto_header(header_, desc_));
+        proto_ = std::make_unique<arcticdb::proto::encoding::SegmentHeader>(generate_v1_header(header_, desc_));
 
     return *proto_;
 }
diff --git a/cpp/arcticdb/codec/segment.hpp b/cpp/arcticdb/codec/segment.hpp
index 79be55a69e8..10174b826cb 100644
--- a/cpp/arcticdb/codec/segment.hpp
+++ b/cpp/arcticdb/codec/segment.hpp
@@ -44,7 +44,7 @@ struct SegmentHeaderProtoWrapper {
 
 SegmentHeaderProtoWrapper decode_protobuf_header(const uint8_t* data, size_t header_bytes_size);
 
-arcticdb::proto::encoding::SegmentHeader generate_proto_header(const SegmentHeader& header, const StreamDescriptor& desc);
+arcticdb::proto::encoding::SegmentHeader generate_v1_header(const SegmentHeader& header, const StreamDescriptor& desc);
 
 static constexpr uint16_t HEADER_VERSION_V1 = 1;
 static constexpr uint16_t HEADER_VERSION_V2 = 2;
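
As read from the segment.cpp hunk above, generate_header_proto() builds the v1 protobuf header at most once and caches it in proto_, and the v1 branch of get_segment_hash() goes through that cache. A small sketch of the consequence, assuming seg is a v1-encoded Segment (none of these lines are added by the patch):

    const auto& proto = seg.generate_header_proto();  // first call builds via generate_v1_header() and caches in proto_
    auto h1 = get_segment_hash(seg);                  // v1 branch reuses the cached proto
    auto h2 = get_segment_hash(seg);                  // no rebuild; the digest is deterministic, so h1 == h2
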
diff --git a/cpp/arcticdb/codec/segment_header.hpp b/cpp/arcticdb/codec/segment_header.hpp
index 2a7caf0294f..49393d03780 100644
--- a/cpp/arcticdb/codec/segment_header.hpp
+++ b/cpp/arcticdb/codec/segment_header.hpp
@@ -232,6 +232,10 @@ class SegmentHeader {
         return body_fields_;
     }
 
+    [[nodiscard]] const EncodedFieldCollection& header_fields() const {
+        return header_fields_;
+    }
+
     void set_body_fields(EncodedFieldCollection&& body_fields) {
         body_fields_ = std::move(body_fields);
         body_fields_.regenerate_offsets();
diff --git a/cpp/arcticdb/codec/test/test_segment_header.cpp b/cpp/arcticdb/codec/test/test_segment_header.cpp
index df0310bd0d6..02c658fb602 100644
--- a/cpp/arcticdb/codec/test/test_segment_header.cpp
+++ b/cpp/arcticdb/codec/test/test_segment_header.cpp
@@ -65,7 +65,7 @@ TEST(SegmentHeader, SerializeUnserializeV1) {
     auto desc = stream_descriptor(StreamId{"thing"}, stream::RowCountIndex{}, {
         scalar_field(DataType::UINT8, "ints")});
 
-    auto proto = generate_proto_header(header, desc);
+    auto proto = generate_v1_header(header, desc);
     const auto header_size = proto.ByteSizeLong();
     std::vector vec(header_size);
     auto read_header = decode_protobuf_header(vec.data(), header_size);
diff --git a/cpp/arcticdb/pipeline/write_frame.cpp b/cpp/arcticdb/pipeline/write_frame.cpp
index e19567af628..7be9aec1afd 100644
--- a/cpp/arcticdb/pipeline/write_frame.cpp
+++ b/cpp/arcticdb/pipeline/write_frame.cpp
@@ -85,7 +85,7 @@ std::tuple WriteToS
     auto rows_to_write = slice_.row_range.second - slice_.row_range.first;
 
     if (frame_->desc.index().field_count() > 0) {
-        util::check(static_cast<bool>(frame_->index_tensor), "Got null index tensor in write_slices");
+        util::check(static_cast<bool>(frame_->index_tensor), "Got null index tensor in WriteToSegmentTask");
         auto opt_error = aggregator_set_data(
             frame_->desc.fields(0).type(),
             frame_->index_tensor.value(),
@@ -186,6 +186,9 @@ folly::Future> slice_and_write(
     if(slices.empty())
         return folly::makeFuture(std::vector{});
 
+    for(const auto& slice : slices)
+        log::version().info("{}", slice);
+
     ARCTICDB_SUBSAMPLE_DEFAULT(SliceAndWrite)
     return write_slices(frame, std::move(slices), slicing, std::move(key), sink, de_dup_map, sparsify_floats);
 }
diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp
index 9b1a47b5918..ec57e5fe85c 100644
--- a/cpp/arcticdb/version/local_versioned_engine.cpp
+++ b/cpp/arcticdb/version/local_versioned_engine.cpp
@@ -607,7 +607,7 @@ VersionedItem LocalVersionedEngine::write_versioned_metadata_internal(
         stream_id,
         VersionQuery{});
     if(update_info.previous_index_key_.has_value()) {
-        ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {}", stream_id);
+        ARCTICDB_DEBUG(log::version(), "write_versioned_metadata for stream_id: {}", stream_id);
         auto index_key = UpdateMetadataTask{store(), update_info, std::move(user_meta)}();
         write_version_and_prune_previous(prune_previous_versions, index_key, update_info.previous_index_key_);
         return VersionedItem{ std::move(index_key) };
@@ -734,10 +734,10 @@ VersionedItem LocalVersionedEngine::write_individual_segment(
     ) {
     ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0)
 
-    ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_dataframe");
+    ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write individual segment");
     auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
     auto version_id = get_next_version_from_key(maybe_prev);
-    ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {} , version_id = {}", stream_id, version_id);
+    ARCTICDB_DEBUG(log::version(), "write individual segment for stream_id: {} , version_id = {}", stream_id, version_id);
     auto index = index_type_from_descriptor(segment.descriptor());
     auto range = get_range_from_segment(index, segment);
diff --git a/cpp/arcticdb/version/version_tasks.hpp b/cpp/arcticdb/version/version_tasks.hpp
index e0b6f303239..346617fb8b9 100644
--- a/cpp/arcticdb/version/version_tasks.hpp
+++ b/cpp/arcticdb/version/version_tasks.hpp
@@ -43,7 +43,8 @@ struct UpdateMetadataTask : async::BaseTask {
     }
 };
 
-struct AsyncRestoreVersionTask : async::BaseTask {
+struct
+AsyncRestoreVersionTask : async::BaseTask {
    const std::shared_ptr store_;
    std::shared_ptr version_map_;
    const StreamId stream_id_;
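
Taken together, the central change is the hash dispatch added in codec.cpp: v1 segments keep hashing the protobuf-encoded fields so that existing v1 content hashes stay consistent, while v2 segments hash the binary header and body field collections directly. A condensed restatement of that hunk for reference (the two helper names are invented here for brevity; the patch inlines the loops):

    HashedValue get_segment_hash(Segment& seg) {
        HashAccum accum;
        const auto& hdr = seg.header();
        if (hdr.encoding_version() == EncodingVersion::V1) {
            // metadata, encoded fields and string pool, read from the regenerated proto header
            hash_v1_proto_fields(seg.generate_header_proto(), hdr, accum);  // hypothetical helper
        } else {
            // binary header and body field collections, hashed directly
            hash_field_collection(hdr.header_fields(), accum);              // hypothetical helper
            hash_field_collection(hdr.body_fields(), accum);
        }
        return accum.digest();
    }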