Skip to content

Commit

Permalink
Merge branch 'master' into dev/vasil.pashov/empty-index
Browse files Browse the repository at this point in the history
  • Loading branch information
vasil-pashov authored Mar 29, 2024
2 parents f86640d + ae922bd commit 4e5a0eb
Show file tree
Hide file tree
Showing 76 changed files with 1,795 additions and 861 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# .git-blame-ignore-revs

# Build time improvements
9d2c8ef41589a4e8635b972911d25c0125dc3728

# Moving implementation from column.hpp to column.cpp
# Reordering/removing some inclusions
801cf4b6f0f9ec0a997311fbdc14639537d3bbb6
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/analysis_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ jobs:
- name: Install ArcticDB[Testing]
shell: bash -el {0}
run: |
pip install arcticdb[Testing]
pip install arcticdb[Testing] "protobuf<5"
- name: Publish results to Github Pages
shell: bash -el {0}
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/benchmark_commits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
job_type: start

benchmark_commit:
timeout-minutes: 1200
needs: [start_ec2_runner]
if: |
always() &&
Expand Down Expand Up @@ -92,4 +93,4 @@ jobs:
with:
job_type: stop
label: ${{ needs.start_ec2_runner.outputs.label }}
ec2-instance-id: ${{ needs.start_ec2_runner.outputs.ec2-instance-id }}
ec2-instance-id: ${{ needs.start_ec2_runner.outputs.ec2-instance-id }}
6 changes: 3 additions & 3 deletions .github/workflows/persistent_storage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ jobs:
- name: Install latest release
if: inputs.arcticdb_version == 'latest'
run: |
pip install pytest arcticdb
pip install pytest arcticdb "protobuf<5"
# Currently, the oldest "supported" release is 3.0.0
# We use this version to test forwards/backwards compatibility
# So we expect that libs written by the version that we build to be readable by arcticdb>=3.0.0 and vice-versa
# Change this value, if we need to support a newer one in the future
- name: Install oldest supported release
run: |
pip install pytest arcticdb=="3.0.0"
pip install pytest arcticdb=="3.0.0" "protobuf<5"
- name: Set persistent storage variables
uses: ./.github/actions/set_persistent_storage_env_vars
Expand Down Expand Up @@ -96,7 +96,7 @@ jobs:

- name: Install latest release
run: |
pip install pytest arcticdb
pip install pytest arcticdb "protobuf<5"
- name: Set persistent storage variables
uses: ./.github/actions/set_persistent_storage_env_vars
Expand Down
21 changes: 0 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,3 @@
<table>
<tr>
<td>

![Alt text](static/happybdayarcticdb.png)

</td>
<td>

# 🚀 ArcticDB Hits 1,000 GitHub Stars on Our First Open Source Anniversary! 🚀

Exciting news as we celebrate two milestones: ArcticDB's first year in the open-source community
and reaching 1,000 GitHub stars! ⭐

A huge thank you to all the contributors, supporters, and community members whose involvement has been
pivotal to our success! 🙌

</td>
</tr>
</table>

<p align="center">
<img src="https://github.com/man-group/ArcticDB/raw/master/static/ArcticDBCropped.png" width="40%">
</p>
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/codec/codec-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ std::size_t decode_ndarray(
util::check(type_desc_tag.data_type() == DataType::EMPTYVAL,
"NDArray of type {} should not be of size 0!",
datatype_to_str(type_desc_tag.data_type()));
read_bytes = encoding_sizes::data_compressed_size(field);
return;
}

Expand Down
6 changes: 5 additions & 1 deletion cpp/arcticdb/column_store/chunked_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ class ChunkedBufferImpl {
}

// Returns a pointer to the first byte of the buffer's contiguous storage.
// Returns nullptr for an empty buffer; asserts that a non-empty buffer is
// contiguous (exactly one block), since a multi-block buffer has no single
// meaningful start pointer.
[[nodiscard]] const uint8_t* data() const {
    if (blocks_.empty()) {
        return nullptr;
    }
    // NOTE: the empty check must run first — asserting on block count before
    // handling the empty case would reject a perfectly valid empty buffer.
    internal::check<ErrorCode::E_ASSERTION_FAILURE>(blocks_.size() == 1,
        "Taking a pointer to the beginning of a non-contiguous buffer");
    blocks_[0]->magic_.check();
    return blocks_[0]->data();
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/column_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ struct ColumnData {
}

// Used to construct [c]end iterators
explicit ColumnDataIterator(ColumnData* parent, typename TDT::DataTypeTag::raw_type* end_ptr):
explicit ColumnDataIterator(ColumnData* parent, RawType* end_ptr):
parent_(parent) {
data_.ptr_ = end_ptr;
}
Expand Down Expand Up @@ -304,7 +304,7 @@ struct ColumnData {
if(!data_->blocks().empty()) {
auto block = data_->blocks().at(num_blocks() - 1);
auto typed_block_data = next_typed_block<TDT>(block);
end_ptr = typed_block_data.data() + typed_block_data.row_count();
end_ptr = const_cast<RawType*>(typed_block_data.data() + typed_block_data.row_count());
}
return ColumnDataIterator<TDT, iterator_type, iterator_density, false>(this, end_ptr);
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/memory_segment.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,10 @@ class SegmentInMemory {
impl_->set_string_pool(string_pool);
}

SegmentInMemory filter(const util::BitSet& filter_bitset,
SegmentInMemory filter(util::BitSet&& filter_bitset,
bool filter_down_stringpool=false,
bool validate=false) const{
return SegmentInMemory(impl_->filter(filter_bitset, filter_down_stringpool, validate));
return SegmentInMemory(impl_->filter(std::move(filter_bitset), filter_down_stringpool, validate));
}

/// @see SegmentInMemoryImpl::truncate
Expand Down
16 changes: 9 additions & 7 deletions cpp/arcticdb/column_store/memory_segment_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ void SegmentInMemoryImpl::drop_column(std::string_view name) {
column_map_->erase(name);
}

std::shared_ptr<SegmentInMemoryImpl> SegmentInMemoryImpl::filter(const util::BitSet& filter_bitset,
std::shared_ptr<SegmentInMemoryImpl> SegmentInMemoryImpl::filter(util::BitSet&& filter_bitset,
bool filter_down_stringpool,
bool validate) const {
filter_bitset.resize(row_count());
bool is_input_sparse = is_sparse();
auto num_values = filter_bitset.count();
if(num_values == 0)
Expand Down Expand Up @@ -210,18 +211,19 @@ std::shared_ptr<SegmentInMemoryImpl> SegmentInMemoryImpl::filter(const util::Bit
} else {
bitset_including_sparse.resize((*column)->row_count());
}
if (bitset_including_sparse.count() == 0) {
// No values are set in the sparse column, skip it
return;
}
output_col_idx = output->add_column(field(column.index), bitset_including_sparse.count(), true);
final_bitset = &bitset_including_sparse;
} else {
final_bitset = &filter_bitset;
}
auto& output_col = output->column(position_t(output_col_idx));
if (sparse_map)
if (sparse_map) {
output_col.opt_sparse_map() = std::make_optional<util::BitSet>();
if (final_bitset->count() == 0) {
// No values are set in the sparse column, no more work to do
return;
}
}
auto output_ptr = reinterpret_cast<RawType*>(output_col.ptr());
auto input_data = (*column)->data();

Expand Down Expand Up @@ -585,7 +587,7 @@ std::vector<std::shared_ptr<SegmentInMemoryImpl>> SegmentInMemoryImpl::split(siz
util::BitSetSizeType end = std::min(start + rows, total_rows);
// set_range is close interval on [left, right]
bitset.set_range(start, end - 1, true);
output.emplace_back(filter(bitset));
output.emplace_back(filter(std::move(bitset)));
}
return output;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/memory_segment_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ class SegmentInMemoryImpl {

std::shared_ptr<SegmentInMemoryImpl> get_output_segment(size_t num_values, bool pre_allocate=true) const;

std::shared_ptr<SegmentInMemoryImpl> filter(const util::BitSet& filter_bitset,
std::shared_ptr<SegmentInMemoryImpl> filter(util::BitSet&& filter_bitset,
bool filter_down_stringpool=false,
bool validate=false) const;

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/column_store/test/test_memory_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ TEST(MemSegment, Filter) {
filter_bitset.set_bit(retained_row);
}

auto filtered_seg = seg.filter(filter_bitset);
auto filtered_seg = seg.filter(std::move(filter_bitset));

for (auto&& [idx, row]: folly::enumerate(filtered_seg)) {
ASSERT_EQ(static_cast<int64_t>(retained_rows[idx]), row.scalar_at<int64_t>(0));
Expand Down
52 changes: 24 additions & 28 deletions cpp/arcticdb/entity/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,45 +27,39 @@ namespace arcticdb {
std::shared_ptr<PrometheusInstance> PrometheusInstance::instance_;
std::once_flag PrometheusInstance::init_flag_;

std::shared_ptr<PrometheusConfigInstance> PrometheusConfigInstance::instance(){
std::call_once(PrometheusConfigInstance::init_flag_, &PrometheusConfigInstance::init);
return PrometheusConfigInstance::instance_;
PrometheusInstance::PrometheusInstance() : configured_(false) {
arcticdb::log::version().debug("PrometheusInstance created");
}

std::shared_ptr<PrometheusConfigInstance> PrometheusConfigInstance::instance_;
std::once_flag PrometheusConfigInstance::init_flag_;

PrometheusInstance::PrometheusInstance() {

auto cfg = PrometheusConfigInstance::instance()->config;

if (cfg.prometheus_model() == PrometheusConfigInstance::Proto::PUSH) {
// PUSH MODE
if (cfg.instance().empty() || cfg.host().empty() || cfg.port().empty() || cfg.job_name().empty()) {
util::raise_rte( "Invalid Push PrometheusConfig {}", arcticdb::util::format(cfg));
}
void PrometheusInstance::configure(const MetricsConfig& config, const bool reconfigure) {
if (configured_ && !reconfigure) {
arcticdb::log::version().warn("Prometheus already configured");
return;
}

cfg_ = config;

if (cfg_.model_ == MetricsConfig::Model::PUSH) {
// IMP: This is the GROUPING_KEY - every push overwrites the previous grouping key
auto labels = prometheus::Gateway::GetInstanceLabel(getHostName());
mongo_instance_ = cfg.instance();
mongo_instance_ = cfg_.instance;
labels.try_emplace(MONGO_INSTANCE_LABEL, mongo_instance_);
labels.try_emplace(PROMETHEUS_ENV_LABEL, cfg.prometheus_env());
gateway_= std::make_shared<prometheus::Gateway>(cfg.host(), cfg.port(), cfg.job_name(), labels);
labels.try_emplace(PROMETHEUS_ENV_LABEL, cfg_.prometheus_env);
gateway_= std::make_shared<prometheus::Gateway>(cfg_.host, cfg_.port, cfg_.job_name, labels);
registry_ = std::make_shared<prometheus::Registry>();
gateway_->RegisterCollectable(registry_);

arcticdb::log::version().info("Prometheus Push created with settings {}", arcticdb::util::format(cfg));
arcticdb::log::version().info("Prometheus Push created with settings {}", cfg_);

} else if (cfg.prometheus_model() == PrometheusConfigInstance::Proto::WEB) {

// WEB SERVER MODE
if (cfg.port().empty()) {
util::raise_rte( "PrometheusConfig web mode port not set {}", arcticdb::util::format(cfg));
}
} else if (cfg_.model_ == MetricsConfig::Model::PULL) {

// create an http server ie "http://hostname:"+port()+"/metrics"
std::string hostname = getHostName();
std::string endpoint = hostname + ":" + cfg.port();
std::string endpoint = cfg_.host + ":" + cfg_.port;

if (exposer_.use_count() > 0) {
exposer_->RemoveCollectable(registry_, "/metrics");
exposer_.reset();
}

// default to 2 threads
exposer_ = std::make_shared<prometheus::Exposer>(endpoint, 2);
Expand All @@ -79,8 +73,10 @@ namespace arcticdb {
arcticdb::log::version().info("Prometheus endpoint created on {}/metrics", endpoint);
}
else {
arcticdb::log::version().info("Prometheus not configured {}", arcticdb::util::format(cfg));
arcticdb::log::version().info("Prometheus not configured {}", cfg_);
}

configured_ = true;
}

// new mechanism, labels at runtime
Expand Down
68 changes: 55 additions & 13 deletions cpp/arcticdb/entity/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <map>
#include <unordered_map>
#include <memory>
#include <fmt/format.h>

namespace arcticdb {

Expand All @@ -34,6 +35,42 @@ const std::string PROMETHEUS_ENV_LABEL = "env";
const int SUMMARY_MAX_AGE = 30;
const int SUMMARY_AGE_BUCKETS = 5;

// Holds the runtime configuration for Prometheus metrics publication.
// A default-constructed instance is in the NO_INIT state (metrics disabled);
// the validating constructor requires every field to be non-empty.
class MetricsConfig {
public:
    // How metrics are exported: not configured, pushed to a gateway, or
    // pulled from an embedded HTTP endpoint.
    enum class Model {
        NO_INIT,
        PUSH,
        PULL
    };
    MetricsConfig() : model_(Model::NO_INIT) {}

    // Validating constructor: every parameter must be non-empty, otherwise
    // util::check raises. `host`/`port` locate the gateway (PUSH) or the
    // local endpoint (PULL); `job_name`, `instance` and `prometheus_env`
    // become Prometheus labels.
    MetricsConfig(const std::string& host,
                  const std::string& port,
                  const std::string& job_name,
                  const std::string& instance,
                  const std::string& prometheus_env,
                  const Model model)
        : host(host)
        , port(port)
        , job_name(job_name)
        , instance(instance)
        , prometheus_env(prometheus_env)
        , model_(model) {
        util::check(!host.empty(), "MetricsConfig: host is empty");
        util::check(!port.empty(), "MetricsConfig: port is empty");
        util::check(!job_name.empty(), "MetricsConfig: job_name is empty");
        util::check(!instance.empty(), "MetricsConfig: instance is empty");
        // Fixed: previously prometheus_env was checked twice, the first time
        // with the wrong error message ("instance is empty").
        util::check(!prometheus_env.empty(), "MetricsConfig: prometheus_env is empty");
    }

    std::string host;
    std::string port;
    std::string job_name;
    std::string instance;
    std::string prometheus_env;
    Model model_;
};

class PrometheusInstance {
public:
Expand Down Expand Up @@ -67,6 +104,10 @@ class PrometheusInstance {

int push();

void configure(const MetricsConfig& config, const bool reconfigure = false);

MetricsConfig cfg_;

private:

struct HistogramInfo {
Expand All @@ -88,20 +129,8 @@ class PrometheusInstance {
// push gateway
std::string mongo_instance_;
std::shared_ptr<prometheus::Gateway> gateway_;
};
bool configured_;

class PrometheusConfigInstance {
public:
static std::shared_ptr<PrometheusConfigInstance> instance();

using Proto = arcticdb::proto::utils::PrometheusConfig;
Proto config;
static std::shared_ptr<PrometheusConfigInstance> instance_;
static std::once_flag init_flag_;

static void init(){
instance_ = std::make_shared<PrometheusConfigInstance>();
}
};

inline void log_prometheus_gauge(const std::string& metric_name, const std::string& metric_desc, size_t val) {
Expand All @@ -121,3 +150,16 @@ inline void log_prometheus_counter(const std::string& metric_name, const std::st
}

} // Namespace arcticdb

// fmt formatter for MetricsConfig so configs can be logged directly
// (e.g. log::version().info("... {}", cfg_)).
template<>
struct fmt::formatter<arcticdb::MetricsConfig> {

    template<typename ParseContext>
    constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }

    // Take the config by const reference: passing by value copied all five
    // std::string members on every format call.
    template<typename FormatContext>
    auto format(const arcticdb::MetricsConfig& k, FormatContext &ctx) const {
        return fmt::format_to(ctx.out(), "MetricsConfig: host={}, port={}, job_name={}, instance={}, prometheus_env={}, model={}",
            k.host, k.port, k.job_name, k.instance, k.prometheus_env, static_cast<int>(k.model_));
    }
};
Loading

0 comments on commit 4e5a0eb

Please sign in to comment.