Skip to content

Commit

Permalink
feat: support query aggregtion(#36380)
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Dec 20, 2024
1 parent 3d360c0 commit 942ce20
Show file tree
Hide file tree
Showing 107 changed files with 8,637 additions and 201 deletions.
813 changes: 813 additions & 0 deletions internal/agg/aggregate.go

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions internal/agg/aggregate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package agg

import "testing"

func TestMatchAggregationExpression(t *testing.T) {
tests := []struct {
expression string
expectedIsValid bool
expectedOperator string
expectedParam string
}{
{"count(*)", true, "count", "*"},
{"count(a)", true, "count", "a"},
{"sum(b)", true, "sum", "b"},
{"avg(c)", true, "avg", "c"},
{"min(d)", true, "min", "d"},
{"max(e)", true, "max", "e"},
{"invalidExpression", false, "", ""},
{"sum ( x )", true, "sum", "x"},
{"SUM(Z)", true, "sum", "Z"},
{"AVG( y )", true, "avg", "y"},
}

for _, test := range tests {
isValid, operator, param := MatchAggregationExpression(test.expression)
if isValid != test.expectedIsValid || operator != test.expectedOperator || param != test.expectedParam {
t.Errorf("MatchAggregationExpression(%q) = (%v, %q, %q), want (%v, %q, %q)",
test.expression, isValid, operator, param, test.expectedIsValid, test.expectedOperator, test.expectedParam)
}
}
}
6 changes: 6 additions & 0 deletions internal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/segcore/
FILES_MATCHING PATTERN "*_c.h"
)

# Install exec/operator/
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/exec/operator/
DESTINATION include/exec/operator
FILES_MATCHING PATTERN "*_c.h"
)

# Install exec/expression/function
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/exec/expression/function/
DESTINATION include/exec/expression/function
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ add_subdirectory( clustering )
add_subdirectory( exec )
add_subdirectory( bitset )
add_subdirectory( futures )
add_subdirectory( plan )

milvus_add_pkg_config("milvus_core")

Expand All @@ -66,7 +67,7 @@ add_library(milvus_core SHARED
$<TARGET_OBJECTS:milvus_exec>
$<TARGET_OBJECTS:milvus_bitset>
$<TARGET_OBJECTS:milvus_futures>
)
$<TARGET_OBJECTS:milvus_plan>)

set(LINK_TARGETS
boost_bitset_ext
Expand Down
204 changes: 204 additions & 0 deletions internal/core/src/common/BitUtil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <cstdint>

namespace milvus {
namespace bits {

template <typename T>
inline bool
isBitSet(const T* bits, uint32_t idx) {
return bits[idx / (sizeof(bits[0]) * 8)] &
(static_cast<T>(1) << (idx & ((sizeof(bits[0]) * 8) - 1)));
}

template <typename T, typename U>
constexpr T
roundUp(T value, U factor) {
return (value + (factor - 1)) / factor * factor;
}

constexpr uint64_t
nBytes(int32_t value) {
return roundUp(value, 8) / 8;
}

constexpr inline uint64_t
lowMask(int32_t bits) {
return (1UL << bits) - 1;
}

inline int32_t
getAndClearLastSetBit(uint16_t& bits) {
int32_t trailingZeros = __builtin_ctz(bits);
bits &= bits - 1;
return trailingZeros;
}

constexpr inline uint64_t
highMask(int32_t bits) {
return lowMask(bits) << (64 - bits);
}

/**
* Invokes a function for each batch of bits (partial or full words)
* in a given range.
*
* @param begin first bit to check (inclusive)
* @param end last bit to check (exclusive)
* @param partialWordFunc function to invoke for a partial word;
* takes index of the word and mask
* @param fullWordFunc function to invoke for a full word;
* takes index of the word
*/
template <typename PartialWordFunc, typename FullWordFunc>
inline void
forEachWord(int32_t begin,
int32_t end,
PartialWordFunc partialWordFunc,
FullWordFunc fullWordFunc) {
if (begin >= end) {
return;
}
int32_t firstWord = roundUp(begin, 64);
int32_t lastWord = end & ~63L;
if (lastWord < firstWord) {
partialWordFunc(lastWord / 64,
lowMask(end - lastWord) & highMask(firstWord - begin));
return;
}
if (begin != firstWord) {
partialWordFunc(begin / 64, highMask(firstWord - begin));
}
for (int32_t i = firstWord; i + 64 <= lastWord; i += 64) {
fullWordFunc(i / 64);
}
if (end != lastWord) {
partialWordFunc(lastWord / 64, lowMask(end - lastWord));
}
}

inline int32_t
countBits(const uint64_t* bits, int32_t begin, int32_t end) {
int32_t count = 0;
forEachWord(
begin,
end,
[&count, bits](int32_t idx, uint64_t mask) {
count += __builtin_popcountll(bits[idx] & mask);
},
[&count, bits](int32_t idx) {
count += __builtin_popcountll(bits[idx]);
});
return count;
}

inline bool
isPowerOfTwo(uint64_t size) {
return bits::countBits(&size, 0, sizeof(uint64_t) * 8) <= 1;
}

template <typename T = uint64_t>
inline int32_t
countLeadingZeros(T word) {
static_assert(std::is_same_v<T, uint64_t> ||
std::is_same_v<T, __uint128_t>);
/// Built-in Function: int __builtin_clz (unsigned int x) returns the number
/// of leading 0-bits in x, starting at the most significant bit position. If
/// x is 0, the result is undefined.
if (word == 0) {
return sizeof(T) * 8;
}
if constexpr (std::is_same_v<T, uint64_t>) {
return __builtin_clzll(word);
} else {
uint64_t hi = word >> 64;
uint64_t lo = static_cast<uint64_t>(word);
return (hi == 0) ? 64 + __builtin_clzll(lo) : __builtin_clzll(hi);
}
}

inline uint64_t
nextPowerOfTwo(uint64_t size) {
if (size == 0) {
return 0;
}
uint32_t bits = 63 - countLeadingZeros(size);
uint64_t lower = 1ULL << bits;
// Size is a power of 2.
if (lower == size) {
return size;
}
return 2 * lower;
}

// This is the Hash128to64 function from Google's cityhash (available
// under the MIT License). We use it to reduce multiple 64 bit hashes
// into a single hash.
#if defined(FOLLY_DISABLE_UNDEFINED_BEHAVIOR_SANITIZER)
FOLLY_DISABLE_UNDEFINED_BEHAVIOR_SANITIZER("unsigned-integer-overflow")
#endif
inline uint64_t
hashMix(const uint64_t upper, const uint64_t lower) noexcept {
// Murmur-inspired hashing.
const uint64_t kMul = 0x9ddfea08eb382d69ULL;
uint64_t a = (lower ^ upper) * kMul;
a ^= (a >> 47);
uint64_t b = (upper ^ a) * kMul;
b ^= (b >> 47);
b *= kMul;
return b;
}

/// Extract bits from integer 'a' at the corresponding bit locations specified
/// by 'mask' to contiguous low bits in return value; the remaining upper bits
/// in return value are set to zero.
template <typename T>
inline T
extractBits(T a, T mask);

#ifdef __BMI2__
template <>
inline uint32_t
extractBits(uint32_t a, uint32_t mask) {
return _pext_u32(a, mask);
}
template <>
inline uint64_t
extractBits(uint64_t a, uint64_t mask) {
return _pext_u64(a, mask);
}
#else
template <typename T>
T
extractBits(T a, T mask) {
constexpr int kBitsCount = 8 * sizeof(T);
T dst = 0;
for (int i = 0, k = 0; i < kBitsCount; ++i) {
if (mask & 1) {
dst |= ((a & 1) << k);
++k;
}
a >>= 1;
mask >>= 1;
}
return dst;
}
#endif
} // namespace bits
} // namespace milvus
49 changes: 49 additions & 0 deletions internal/core/src/common/ComplexVector.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "Vector.h"

namespace milvus {

void
BaseVector::prepareForReuse(milvus::VectorPtr& vector,
milvus::vector_size_t size) {
if (!vector.unique()) {
vector = std::make_shared<BaseVector>(
vector->type(), size, vector->nullCount());
} else {
vector->prepareForReuse();
vector->resize(size);
}
}

void
BaseVector::prepareForReuse() {
null_count_ = std::nullopt;
}

void
RowVector::resize(milvus::vector_size_t new_size, bool setNotNull) {
const auto oldSize = size();
BaseVector::resize(new_size, setNotNull);
for (auto& child : childrens()) {
if (new_size > oldSize) {
child->resize(new_size, setNotNull);
}
}
}

} // namespace milvus
67 changes: 67 additions & 0 deletions internal/core/src/common/FieldData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,71 @@ InitScalarFieldData(const DataType& type, bool nullable, int64_t cap_rows) {
}
}

void
ResizeScalarFieldData(const DataType& type,
int64_t new_num_rows,
FieldDataPtr& field_data) {
switch (type) {
case DataType::BOOL: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<bool>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT8: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<int8_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT16: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<int16_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT32: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<int32_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT64: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<int64_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::FLOAT: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<float>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::DOUBLE: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<double>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::STRING:
case DataType::VARCHAR: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<std::string>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::JSON: {
auto inner_field_data =
std::dynamic_pointer_cast<FieldData<Json>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
default:
PanicInfo(DataTypeInvalid,
"ResizeScalarFieldData not support data type " +
GetDataTypeName(type));
}
}

} // namespace milvus
Loading

0 comments on commit 942ce20

Please sign in to comment.