Skip to content

Commit

Permalink
Merge branch 'master' into maint/vasil.pashov/include-what-you-use
Browse files Browse the repository at this point in the history
  • Loading branch information
vasil-pashov authored Apr 15, 2024
2 parents 4bab4f6 + 74f9932 commit 9b70d1f
Show file tree
Hide file tree
Showing 41 changed files with 809 additions and 320 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/build_steps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ jobs:
with:
submodules: recursive # Just in case a dep has its own third-party deps

# See: https://github.com/actions/runner-images/issues/9680#issuecomment-2051917949
- name: HOTFIX Setup CMake 3.29.2
if: matrix.os == 'windows'
uses: jwlawson/actions-setup-cmake@v2
with:
cmake-version: '3.29.2'

- name: Configure sccache
uses: mozilla-actions/[email protected]
with:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ Please see version conversion dates in the below table:
| 4.1 | Business Source License 1.1 | Nov 1, 2025 |
| 4.2 | Business Source License 1.1 | Nov 12, 2025 |
| 4.3 | Business Source License 1.1 | Feb 7, 2026 |
| 4.4 | Business Source License 1.1 | Apr 5, 2026 |
## Code of Conduct

[Code of Conduct](https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md)
Expand Down
Empty file removed __init__.py
Empty file.
29 changes: 16 additions & 13 deletions cpp/arcticdb/column_store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,23 +366,25 @@ void Column::mark_absent_rows(size_t num_rows) {
}

void Column::default_initialize_rows(size_t start_pos, size_t num_rows, bool ensure_alloc) {
type_.visit_tag([that=this, start_pos, num_rows, ensure_alloc](auto tag) {
using T= std::decay_t<decltype(tag)>;
using RawType = typename T::DataTypeTag::raw_type;
const auto bytes = (num_rows * sizeof(RawType));
if (num_rows > 0) {
type_.visit_tag([this, start_pos, num_rows, ensure_alloc](auto tag) {
using T = std::decay_t<decltype(tag)>;
using RawType = typename T::DataTypeTag::raw_type;
const auto bytes = (num_rows * sizeof(RawType));

if(ensure_alloc)
that->data_.ensure<uint8_t>(bytes);
if (ensure_alloc)
data_.ensure<uint8_t>(bytes);

auto type_ptr = that->data_.ptr_cast<RawType>(start_pos, bytes);
util::default_initialize<T>(reinterpret_cast<uint8_t*>(type_ptr), bytes);
auto type_ptr = data_.ptr_cast<RawType>(start_pos, bytes);
util::default_initialize<T>(reinterpret_cast<uint8_t *>(type_ptr), bytes);

if(ensure_alloc)
that->data_.commit();
if (ensure_alloc)
data_.commit();

that->last_logical_row_ += static_cast<ssize_t>(num_rows);
that->last_physical_row_ += static_cast<ssize_t>(num_rows);
});
last_logical_row_ += static_cast<ssize_t>(num_rows);
last_physical_row_ += static_cast<ssize_t>(num_rows);
});
}
}

void Column::set_row_data(size_t row_id) {
Expand Down Expand Up @@ -538,6 +540,7 @@ void Column::change_type(DataType target_type) {
});
});
}
buf.commit();
type_ = TypeDescriptor{target_type, type_.dimension()};
std::swap(data_, buf);
}
Expand Down
8 changes: 6 additions & 2 deletions cpp/arcticdb/column_store/memory_segment_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void SegmentInMemoryImpl::append(const SegmentInMemoryImpl& other) {

if (this_type != *opt_common_type) {
column_unchecked(col).change_type(opt_common_type->data_type_);
descriptor_->mutable_field(col).mutable_type().data_type_ = opt_common_type->data_type_;
}
if (other_type != *opt_common_type) {
auto type_promoted_col = other.column_unchecked(*other_col_index).clone();
Expand Down Expand Up @@ -543,8 +544,10 @@ void SegmentInMemoryImpl::change_schema(StreamDescriptor descriptor) {
const auto& other_type = descriptor.field(col).type();
if(col_index) {
auto this_type = column_unchecked(*col_index).type();
util::check(this_type == other_type, "Could not convert type {} to type {} for column {}, this index {}, other index {}",
other_type, this_type, col_name, *col_index, col);
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
this_type == other_type,
"Could not convert type {} to type {} for column {}, this index {}, other index {}",
other_type, this_type, col_name, *col_index, col);
new_columns[col] = std::move(columns_[*col_index]);
} else {
auto new_column = std::make_shared<Column>(other_type, row_count(), false, allow_sparse_);
Expand All @@ -564,6 +567,7 @@ std::optional<std::string_view> SegmentInMemoryImpl::string_at(position_t row, p
const auto& col_ref = column(col);

if(is_fixed_string_type(td.data_type()) && col_ref.is_inflated()) {

auto string_size = col_ref.bytes() / row_count();
auto ptr = col_ref.data().buffer().ptr_cast<char>(row * string_size, string_size);
return std::string_view(ptr, string_size);
Expand Down
46 changes: 24 additions & 22 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,34 +328,36 @@ Composite<EntityIds> ProjectClause::process(Composite<EntityIds>&& entity_ids) c
}

AggregationClause::AggregationClause(const std::string& grouping_column,
const std::unordered_map<std::string,
std::string>& aggregations):
grouping_column_(grouping_column),
aggregation_map_(aggregations) {
const std::vector<NamedAggregator>& named_aggregators):
grouping_column_(grouping_column) {
clause_info_.can_combine_with_column_selection_ = false;
clause_info_.new_index_ = grouping_column_;
clause_info_.input_columns_ = std::make_optional<std::unordered_set<std::string>>({grouping_column_});
clause_info_.modifies_output_descriptor_ = true;
for (const auto& [column_name, aggregation_operator]: aggregations) {
auto [_, inserted] = clause_info_.input_columns_->insert(column_name);
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(inserted,
"Cannot perform two aggregations over the same column: {}",
column_name);
auto typed_column_name = ColumnName(column_name);
if (aggregation_operator == "sum") {
aggregators_.emplace_back(SumAggregator(typed_column_name, typed_column_name));
} else if (aggregation_operator == "mean") {
aggregators_.emplace_back(MeanAggregator(typed_column_name, typed_column_name));
} else if (aggregation_operator == "max") {
aggregators_.emplace_back(MaxAggregator(typed_column_name, typed_column_name));
} else if (aggregation_operator == "min") {
aggregators_.emplace_back(MinAggregator(typed_column_name, typed_column_name));
} else if (aggregation_operator == "count") {
aggregators_.emplace_back(CountAggregator(typed_column_name, typed_column_name));
str_ = "AGGREGATE {";
for (const auto& named_aggregator: named_aggregators) {
str_.append(fmt::format("{}: ({}, {}), ",
named_aggregator.output_column_name_,
named_aggregator.input_column_name_,
named_aggregator.aggregation_operator_));
clause_info_.input_columns_->insert(named_aggregator.input_column_name_);
auto typed_input_column_name = ColumnName(named_aggregator.input_column_name_);
auto typed_output_column_name = ColumnName(named_aggregator.output_column_name_);
if (named_aggregator.aggregation_operator_ == "sum") {
aggregators_.emplace_back(SumAggregator(typed_input_column_name, typed_output_column_name));
} else if (named_aggregator.aggregation_operator_ == "mean") {
aggregators_.emplace_back(MeanAggregator(typed_input_column_name, typed_output_column_name));
} else if (named_aggregator.aggregation_operator_ == "max") {
aggregators_.emplace_back(MaxAggregator(typed_input_column_name, typed_output_column_name));
} else if (named_aggregator.aggregation_operator_ == "min") {
aggregators_.emplace_back(MinAggregator(typed_input_column_name, typed_output_column_name));
} else if (named_aggregator.aggregation_operator_ == "count") {
aggregators_.emplace_back(CountAggregator(typed_input_column_name, typed_output_column_name));
} else {
user_input::raise<ErrorCode::E_INVALID_USER_ARGUMENT>("Unknown aggregation operator provided: {}", aggregation_operator);
user_input::raise<ErrorCode::E_INVALID_USER_ARGUMENT>("Unknown aggregation operator provided: {}", named_aggregator.aggregation_operator_);
}
}
str_.append("}");
}

Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_ids) const {
Expand Down Expand Up @@ -530,7 +532,7 @@ Composite<EntityIds> AggregationClause::process(Composite<EntityIds>&& entity_id
}

[[nodiscard]] std::string AggregationClause::to_string() const {
return fmt::format("AGGREGATE {}", aggregation_map_);
return str_;
}

[[nodiscard]] Composite<EntityIds> RemoveColumnPartitioningClause::process(Composite<EntityIds>&& entity_ids) const {
Expand Down
11 changes: 8 additions & 3 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,21 +339,26 @@ inline StreamDescriptor empty_descriptor(arcticdb::proto::descriptors::IndexDesc
return StreamDescriptor{StreamId{id}, IndexDescriptor{field_count, type}, std::make_shared<FieldCollection>()};
}

struct NamedAggregator {
std::string aggregation_operator_;
std::string input_column_name_;
std::string output_column_name_;
};

struct AggregationClause {
ClauseInfo clause_info_;
std::shared_ptr<ComponentManager> component_manager_;
ProcessingConfig processing_config_;
std::string grouping_column_;
std::unordered_map<std::string, std::string> aggregation_map_;
std::vector<GroupingAggregator> aggregators_;
std::string str_;

AggregationClause() = delete;

ARCTICDB_MOVE_COPY_DEFAULT(AggregationClause)

AggregationClause(const std::string& grouping_column,
const std::unordered_map<std::string,
std::string>& aggregations);
const std::vector<NamedAggregator>& aggregations);

[[noreturn]] std::vector<std::vector<size_t>> structure_for_processing(
ARCTICDB_UNUSED const std::vector<RangesAndKey>&,
Expand Down
28 changes: 24 additions & 4 deletions cpp/arcticdb/processing/test/test_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ TEST(Clause, AggregationEmptyColumn) {
using namespace arcticdb;
auto component_manager = std::make_shared<ComponentManager>();

AggregationClause aggregation("int_repeated_values", {{"empty_sum", "sum"}, {"empty_min", "min"}, {"empty_max", "max"}, {"empty_mean", "mean"}, {"empty_count", "count"}});
AggregationClause aggregation("int_repeated_values",
{{"sum", "empty_sum", "empty_sum"},
{"min", "empty_min", "empty_min"},
{"max", "empty_max", "empty_max"},
{"mean", "empty_mean", "empty_mean"},
{"count", "empty_count", "empty_count"}});
aggregation.set_component_manager(component_manager);

size_t num_rows{100};
Expand Down Expand Up @@ -132,7 +137,12 @@ TEST(Clause, AggregationColumn)
using namespace arcticdb;
auto component_manager = std::make_shared<ComponentManager>();

AggregationClause aggregation("int_repeated_values", {{"sum_int", "sum"}, {"min_int", "min"}, {"max_int", "max"}, {"mean_int", "mean"}, {"count_int", "count"}});
AggregationClause aggregation("int_repeated_values",
{{"sum", "sum_int", "sum_int"},
{"min", "min_int", "min_int"},
{"max", "max_int", "max_int"},
{"mean", "mean_int", "mean_int"},
{"count", "count_int", "count_int"}});
aggregation.set_component_manager(component_manager);

size_t num_rows{100};
Expand All @@ -159,7 +169,12 @@ TEST(Clause, AggregationSparseColumn)
using namespace arcticdb;
auto component_manager = std::make_shared<ComponentManager>();

AggregationClause aggregation("int_repeated_values", {{"sum_int", "sum"}, {"min_int", "min"}, {"max_int", "max"}, {"mean_int", "mean"}, {"count_int", "count"}});
AggregationClause aggregation("int_repeated_values",
{{"sum", "sum_int", "sum_int"},
{"min", "min_int", "min_int"},
{"max", "max_int", "max_int"},
{"mean", "mean_int", "mean_int"},
{"count", "count_int", "count_int"}});
aggregation.set_component_manager(component_manager);

size_t num_rows{100};
Expand Down Expand Up @@ -217,7 +232,12 @@ TEST(Clause, AggregationSparseGroupby) {
using namespace arcticdb;
auto component_manager = std::make_shared<ComponentManager>();

AggregationClause aggregation("int_sparse_repeated_values", {{"sum_int", "sum"}, {"min_int", "min"}, {"max_int", "max"}, {"mean_int", "mean"}, {"count_int", "count"}});
AggregationClause aggregation("int_sparse_repeated_values",
{{"sum", "sum_int", "sum_int"},
{"min", "min_int", "min_int"},
{"max", "max_int", "max_int"},
{"mean", "mean_int", "mean_int"},
{"count", "count_int", "count_int"}});
aggregation.set_component_manager(component_manager);

size_t num_rows{100};
Expand Down
11 changes: 5 additions & 6 deletions cpp/arcticdb/storage/azure/azure_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,30 +191,29 @@ void do_remove_impl(Composite<VariantKey>&& ks,
static const size_t delete_object_limit =
std::min(BATCH_SUBREQUEST_LIMIT, static_cast<size_t>(ConfigsMap::instance()->get_int("AzureStorage.DeleteBatchSize", BATCH_SUBREQUEST_LIMIT)));

unsigned int batch_counter = 0u;
auto submit_batch = [&azure_client, &request_timeout, &batch_counter](auto &to_delete) {
auto submit_batch = [&azure_client, &request_timeout](auto &to_delete) {
try {
azure_client.delete_blobs(to_delete, request_timeout);
}
catch (const Azure::Core::RequestFailedException& e) {
raise_azure_exception(e);
}
batch_counter = 0u;
to_delete.clear();
};

(fg::from(ks.as_range()) | fg::move | fg::groupBy(fmt_db)).foreach(
[&root_folder, b=std::move(bucketizer), delete_object_limit=delete_object_limit, &batch_counter, &to_delete, &submit_batch] (auto&& group) {//bypass incorrect 'set but no used" error for delete_object_limit
[&root_folder, b=std::move(bucketizer), delete_object_limit=delete_object_limit, &to_delete, &submit_batch] (auto&& group) {//bypass incorrect 'set but no used" error for delete_object_limit
auto key_type_dir = key_type_folder(root_folder, group.key());
for (auto k : folly::enumerate(group.values())) {
auto blob_name = object_path(b.bucketize(key_type_dir, *k), *k);
to_delete.emplace_back(std::move(blob_name));
if (++batch_counter == delete_object_limit) {
if (to_delete.size() == delete_object_limit) {
submit_batch(to_delete);
}
}
}
);
if (batch_counter) {
if (!to_delete.empty()) {
submit_batch(to_delete);
}
}
Expand Down
6 changes: 5 additions & 1 deletion cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ void register_bindings(py::module& storage) {
.def_property("bucket_name", &S3Override::bucket_name, &S3Override::set_bucket_name)
.def_property("region", &S3Override::region, &S3Override::set_region)
.def_property(
"use_virtual_addressing", &S3Override::use_virtual_addressing, &S3Override::set_use_virtual_addressing);
"use_virtual_addressing", &S3Override::use_virtual_addressing, &S3Override::set_use_virtual_addressing)
.def_property("ca_cert_path", &S3Override::ca_cert_path, &S3Override::set_ca_cert_path)
.def_property("ca_cert_dir", &S3Override::ca_cert_dir, &S3Override::set_ca_cert_dir)
.def_property("https", &S3Override::https, &S3Override::set_https)
.def_property("ssl", &S3Override::ssl, &S3Override::set_ssl);

py::class_<AzureOverride>(storage, "AzureOverride")
.def(py::init<>())
Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ auto get_s3_config(const ConfigType& conf) {
const bool verify_ssl = ConfigsMap::instance()->get_int("S3Storage.VerifySSL", conf.ssl());
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Verify ssl: {}", verify_ssl);
client_configuration.verifySSL = verify_ssl;
if (client_configuration.verifySSL && (!conf.ca_cert_path().empty() || !conf.ca_cert_dir().empty())) {
client_configuration.caFile = conf.ca_cert_path();
client_configuration.caPath = conf.ca_cert_dir();
}
client_configuration.maxConnections = conf.max_connections() == 0 ?
ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", 16) :
conf.max_connections();
Expand Down
41 changes: 41 additions & 0 deletions cpp/arcticdb/storage/storage_override.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ class S3Override {
std::string endpoint_;
std::string bucket_name_;
std::string region_;
std::string ca_cert_path_;
std::string ca_cert_dir_;
bool use_virtual_addressing_ = false;
bool https_;
bool ssl_;

public:
std::string credential_name() const {
Expand Down Expand Up @@ -65,6 +69,39 @@ class S3Override {
use_virtual_addressing_ = use_virtual_addressing;
}

std::string ca_cert_path() const {
return ca_cert_path_;
}

void set_ca_cert_path(std::string_view ca_cert_path){
ca_cert_path_ = ca_cert_path;
}


std::string ca_cert_dir() const {
return ca_cert_dir_;
}

void set_ca_cert_dir(std::string_view ca_cert_dir){
ca_cert_dir_ = ca_cert_dir;
}

bool https() const {
return https_;
}

void set_https(bool https){
https_ = https;
}

bool ssl() const {
return ssl_;
}

void set_ssl(bool ssl){
ssl_ = ssl;
}

void modify_storage_config(arcticdb::proto::storage::VariantStorage& storage) const {
if(storage.config().Is<arcticdb::proto::s3_storage::Config>()) {
arcticdb::proto::s3_storage::Config s3_storage;
Expand All @@ -76,6 +113,10 @@ class S3Override {
s3_storage.set_endpoint(endpoint_);
s3_storage.set_region(region_);
s3_storage.set_use_virtual_addressing(use_virtual_addressing_);
s3_storage.set_ca_cert_path(ca_cert_path_);
s3_storage.set_ca_cert_dir(ca_cert_dir_);
s3_storage.set_https(https_);
s3_storage.set_ssl(ssl_);

util::pack_to_any(s3_storage, *storage.mutable_config());
}
Expand Down
Loading

0 comments on commit 9b70d1f

Please sign in to comment.