Skip to content

Commit

Permalink
Homogenise first and last with others over has_value_ behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
alexowens90 committed Apr 18, 2024
1 parent 7683b07 commit 3e3af1a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 19 deletions.
48 changes: 29 additions & 19 deletions cpp/arcticdb/processing/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,73 +393,83 @@ class FirstBucketAggregator {
public:
void push(T value) {
if constexpr (std::is_same_v<T, std::optional<std::string_view>>) {
if (ARCTICDB_UNLIKELY(!first_.has_value() || !first_.value().has_value())) {
if (ARCTICDB_UNLIKELY(!has_value_ || !first_.has_value())) {
first_ = value;
}
} else if (std::is_floating_point_v<T>) {
if (ARCTICDB_UNLIKELY(!first_.has_value() || std::isnan(*first_))) {
if (ARCTICDB_UNLIKELY(!has_value_ || std::isnan(first_))) {
first_ = value;
}
} else {
if (ARCTICDB_UNLIKELY(!first_.has_value())) {
if (ARCTICDB_UNLIKELY(!has_value_)) {
first_ = value;
}
}
has_value_ = true;
}

T finalize() {
T res;
if constexpr (std::is_floating_point_v<T>) {
res = first_.value_or(std::numeric_limits<T>::quiet_NaN());
if (ARCTICDB_LIKELY(has_value_)) {
res = first_;
has_value_ = false;
} else {
debug::check<ErrorCode::E_ASSERTION_FAILURE>(first_.has_value(), "finalize called on non-float FirstBucketAggregator with no values");
res = *first_;
if constexpr (std::is_floating_point_v<T>) {
res = std::numeric_limits<T>::quiet_NaN();
} else {
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("FirstBucketAggregator::finalize called with no values pushed");
}
}
first_.reset();
return res;
}

[[nodiscard]] bool has_value() const {
return first_.has_value();
return has_value_;
}
private:
std::optional<T> first_{std::nullopt};
bool has_value_{false};
T first_;
};

template<typename T>
class LastBucketAggregator {
public:
void push(T value) {
if constexpr (std::is_same_v<T, std::optional<std::string_view>>) {
if (ARCTICDB_LIKELY(!last_.has_value() || value.has_value())) {
if (ARCTICDB_LIKELY(!has_value_ || value.has_value())) {
last_ = value;
}
} else if (std::is_floating_point_v<T>) {
if (ARCTICDB_LIKELY(!last_.has_value() || !std::isnan(value))) {
if (ARCTICDB_LIKELY(!has_value_ || !std::isnan(value))) {
last_ = value;
}
} else {
last_ = value;
}
has_value_ = true;
}

T finalize() {
T res;
if constexpr (std::is_floating_point_v<T>) {
res = last_.value_or(std::numeric_limits<T>::quiet_NaN());
if (ARCTICDB_LIKELY(has_value_)) {
res = last_;
has_value_ = false;
} else {
debug::check<ErrorCode::E_ASSERTION_FAILURE>(last_.has_value(), "finalize called on non-float LastBucketAggregator with no values");
res = *last_;
if constexpr (std::is_floating_point_v<T>) {
res = std::numeric_limits<T>::quiet_NaN();
} else {
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("LastBucketAggregator::finalize called with no values pushed");
}
}
last_.reset();
return res;
}

[[nodiscard]] bool has_value() const {
return last_.has_value();
return has_value_;
}
private:
std::optional<T> last_{std::nullopt};
bool has_value_{false};
T last_;
};

class CountBucketAggregator {
Expand Down
41 changes: 41 additions & 0 deletions python/tests/unit/arcticdb/version_store/test_resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,47 @@ def test_resampling_string_columns_supported_aggregations(lmdb_version_store_tin
assert_frame_equal(expected, received)


# def test_resampling_bool_columns(lmdb_version_store_tiny_segment):
# lib = lmdb_version_store_tiny_segment
# sym = "test_resampling_bool_columns"
#
# idx = [0, 1, 1000, 1001, 2000, 2001, 3000, 3001]
# idx = np.array(idx, dtype="datetime64[ns]")
#
# col = [True, True, True, False, False, True, False, False]
#
# df = pd.DataFrame({"col": col}, index=idx)
# lib.write(sym, df)
#
# expected = df.resample("us").agg(
# sum=pd.NamedAgg("col", "sum"),
# mean=pd.NamedAgg("col", "mean"),
# min=pd.NamedAgg("col", "min"),
# max=pd.NamedAgg("col", "max"),
# first=pd.NamedAgg("col", "first"),
# last=pd.NamedAgg("col", "last"),
# count=pd.NamedAgg("col", "count"),
# )
# expected = expected.reindex(columns=sorted(expected.columns))
# expected["count"] = expected["count"].astype(np.uint64)
#
# q = QueryBuilder()
# q = q.resample("us").agg(
# {
# "sum": ("col", "sum"),
# "mean": ("col", "mean"),
# "min": ("col", "min"),
# "max": ("col", "max"),
# "first": ("col", "first"),
# "last": ("col", "last"),
# "count": ("col", "count"),
# }
# )
# received = lib.read(sym, query_builder=q).data
# received = received.reindex(columns=sorted(received.columns))
# assert_frame_equal(expected, received)


def test_resampling_named_agg(lmdb_version_store_v1):
lib = lmdb_version_store_v1
sym = "test_resampling_named_agg"
Expand Down

0 comments on commit 3e3af1a

Please sign in to comment.