Commit c80724e: Merge remote-tracking branch 'upstream/master' into ease-delete-size-mix-compaction

XuanYang-cn committed Oct 31, 2024
2 parents 07c4145 + 2092dc0
Showing 48 changed files with 872 additions and 266 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -174,7 +174,7 @@ Contributions to Milvus are welcome from everyone. See [Guidelines for Contribut
### All contributors

<br><!-- Do not remove start of hero-bot -->
<img src="https://img.shields.io/badge/all--contributors-410-orange"><br>
<img src="https://img.shields.io/badge/all--contributors-411-orange"><br>
<a href="https://github.com/0xflotus"><img src="https://avatars.githubusercontent.com/u/26602940?v=4" width="30px" /></a>
<a href="https://github.com/ABNER-1"><img src="https://avatars.githubusercontent.com/u/24547351?v=4" width="30px" /></a>
<a href="https://github.com/Accagain2014"><img src="https://avatars.githubusercontent.com/u/9635216?v=4" width="30px" /></a>
@@ -185,6 +185,7 @@ Contributions to Milvus are welcome from everyone. See [Guidelines for Contribut
<a href="https://github.com/AnthonyTsu1984"><img src="https://avatars.githubusercontent.com/u/115786031?v=4" width="30px" /></a>
<a href="https://github.com/Aredcap"><img src="https://avatars.githubusercontent.com/u/40494761?v=4" width="30px" /></a>
<a href="https://github.com/ArenaSu"><img src="https://avatars.githubusercontent.com/u/21214629?v=4" width="30px" /></a>
<a href="https://github.com/Armaggheddon"><img src="https://avatars.githubusercontent.com/u/47779194?v=4" width="30px" /></a>
<a href="https://github.com/Arya0812"><img src="https://avatars.githubusercontent.com/u/114047052?v=4" width="30px" /></a>
<a href="https://github.com/BUPTAnderson"><img src="https://avatars.githubusercontent.com/u/13449703?v=4" width="30px" /></a>
<a href="https://github.com/Ben-Aaron-Bio-Rad"><img src="https://avatars.githubusercontent.com/u/54123439?v=4" width="30px" /></a>
3 changes: 2 additions & 1 deletion README_CN.md
@@ -156,7 +156,7 @@ The Milvus [bootcamp](https://github.com/milvus-io/bootcamp) can help you learn about
### All contributors

<br><!-- Do not remove start of hero-bot -->
<img src="https://img.shields.io/badge/all--contributors-410-orange"><br>
<img src="https://img.shields.io/badge/all--contributors-411-orange"><br>
<a href="https://github.com/0xflotus"><img src="https://avatars.githubusercontent.com/u/26602940?v=4" width="30px" /></a>
<a href="https://github.com/ABNER-1"><img src="https://avatars.githubusercontent.com/u/24547351?v=4" width="30px" /></a>
<a href="https://github.com/Accagain2014"><img src="https://avatars.githubusercontent.com/u/9635216?v=4" width="30px" /></a>
@@ -167,6 +167,7 @@ The Milvus [bootcamp](https://github.com/milvus-io/bootcamp) can help you learn about
<a href="https://github.com/AnthonyTsu1984"><img src="https://avatars.githubusercontent.com/u/115786031?v=4" width="30px" /></a>
<a href="https://github.com/Aredcap"><img src="https://avatars.githubusercontent.com/u/40494761?v=4" width="30px" /></a>
<a href="https://github.com/ArenaSu"><img src="https://avatars.githubusercontent.com/u/21214629?v=4" width="30px" /></a>
<a href="https://github.com/Armaggheddon"><img src="https://avatars.githubusercontent.com/u/47779194?v=4" width="30px" /></a>
<a href="https://github.com/Arya0812"><img src="https://avatars.githubusercontent.com/u/114047052?v=4" width="30px" /></a>
<a href="https://github.com/BUPTAnderson"><img src="https://avatars.githubusercontent.com/u/13449703?v=4" width="30px" /></a>
<a href="https://github.com/Ben-Aaron-Bio-Rad"><img src="https://avatars.githubusercontent.com/u/54123439?v=4" width="30px" /></a>
2 changes: 1 addition & 1 deletion ci/jenkins/PR.groovy
@@ -43,7 +43,7 @@ pipeline {
gitBaseRef: gitBaseRef,
pullRequestNumber: "$env.CHANGE_ID",
suppress_suffix_of_image_tag: true,
make_cmd: "make clean && make install use_disk_index=ON",
make_cmd: "make clean && make install USE_ASAN=ON use_disk_index=ON",
images: '["milvus","pytest","helm"]'

milvus_image_tag = tekton.query_result job_name, 'milvus-image-tag'
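The PR build now compiles with AddressSanitizer (USE_ASAN=ON). As a hedged illustration, not Milvus code, here is the class of heap error ASan instrumentation catches at runtime; -fsanitize=address is ASan's standard compiler switch, which the make variable is presumed to thread into the build:

```cpp
// heap_uaf.cpp: minimal heap-use-after-free, the kind of bug ASan reports.
// Build and run: clang++ -fsanitize=address -g heap_uaf.cpp && ./a.out
#include <iostream>

int main() {
    int* data = new int[8];
    data[0] = 42;
    delete[] data;                 // memory handed back to the allocator
    std::cout << data[0] << "\n";  // ASan aborts here: heap-use-after-free
    return 0;
}
```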
1 change: 1 addition & 0 deletions internal/core/src/mmap/ChunkedColumn.h
@@ -211,6 +211,7 @@ class ChunkedColumnBase : public ColumnBase {

class ChunkedColumn : public ChunkedColumnBase {
public:
+ ChunkedColumn() = default;
// memory mode ctor
ChunkedColumn(const FieldMeta& field_meta) : ChunkedColumnBase(field_meta) {
}
2 changes: 2 additions & 0 deletions internal/core/src/mmap/Column.h
@@ -682,6 +682,7 @@ class SingleChunkVariableColumn : public SingleChunkColumnBase {
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews() const override {
std::vector<std::string_view> res;
+ res.reserve(num_rows_);
char* pos = data_;
for (size_t i = 0; i < num_rows_; ++i) {
uint32_t size;
@@ -696,6 +697,7 @@ class SingleChunkVariableColumn : public SingleChunkColumnBase {
[[nodiscard]] std::vector<ViewType>
Views() const {
std::vector<ViewType> res;
+ res.reserve(num_rows_);
char* pos = data_;
for (size_t i = 0; i < num_rows_; ++i) {
uint32_t size;
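Both hunks reserve the output vector before a per-row loop: without reserve, push_back regrows the vector geometrically, reallocating and copying several times, while a known row count allows a single allocation. A minimal sketch of the pattern; CollectViews is a hypothetical stand-in for the StringViews/Views loops above, which walk a [u32 length][bytes] row layout:

```cpp
#include <cstdint>
#include <cstring>
#include <string_view>
#include <vector>

// Rows are stored as a u32 length prefix followed by the bytes; since the
// row count is known up front, one reserve() replaces repeated regrowth.
std::vector<std::string_view> CollectViews(const char* pos, size_t num_rows) {
    std::vector<std::string_view> res;
    res.reserve(num_rows);  // one allocation instead of O(log n) reallocations
    for (size_t i = 0; i < num_rows; ++i) {
        uint32_t size;
        std::memcpy(&size, pos, sizeof(size));  // read the length prefix
        pos += sizeof(size);
        res.emplace_back(pos, size);            // view into the row bytes
        pos += size;
    }
    return res;
}
```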
11 changes: 8 additions & 3 deletions internal/core/src/query/SearchOnSealed.cpp
@@ -130,30 +130,35 @@ SearchOnSealed(const Schema& schema,
chunk_size);
bitset_ptr = reinterpret_cast<const uint8_t*>(bitset_data);
}
- offset += chunk_size;
BitsetView bitset_view(bitset_ptr, chunk_size);

if (search_info.group_by_field_id_.has_value()) {
auto sub_qr = BruteForceSearchIterators(dataset,
vec_data,
- row_count,
+ chunk_size,
search_info,
bitset_view,
data_type);
final_qr.merge(sub_qr);
} else {
auto sub_qr = BruteForceSearch(dataset,
vec_data,
- row_count,
+ chunk_size,
search_info,
bitset_view,
data_type);
+ for (auto& o : sub_qr.mutable_seg_offsets()) {
+     if (o != -1) {
+         o += offset;
+     }
+ }
final_qr.merge(sub_qr);
}

if (!aligned) {
delete[] bitset_ptr;
}
+ offset += chunk_size;
}
if (search_info.group_by_field_id_.has_value()) {
result.AssembleChunkVectorIterators(
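This hunk fixes two related bugs: the brute-force call was handed the whole segment's row_count instead of the chunk's chunk_size, and chunk-local result offsets were merged without being shifted into segment-global coordinates; the running base must also advance only after the chunk is processed. A minimal sketch of the remapping, with hypothetical names but the same arithmetic:

```cpp
#include <cstdint>
#include <vector>

// Shift chunk-local seg offsets into segment-global coordinates.
// -1 marks an empty result slot (fewer than topk hits) and must stay -1.
void RemapOffsets(std::vector<int64_t>& seg_offsets, int64_t chunk_base) {
    for (auto& o : seg_offsets) {
        if (o != -1) {
            o += chunk_base;  // local offset -> global row id
        }
    }
}

int main() {
    // Two chunks of 100 rows each: a hit at local offset 5 in chunk 1
    // is global row 105. The base advances AFTER each chunk, as in the fix.
    std::vector<int64_t> hits = {5, -1, 42};
    int64_t offset = 100;  // rows already covered by chunk 0
    RemapOffsets(hits, offset);
    // hits is now {105, -1, 142}
    return 0;
}
```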
2 changes: 1 addition & 1 deletion internal/core/src/segcore/InsertRecord.h
@@ -179,7 +179,7 @@ class OffsetOrderedArray : public OffsetMap {
[](const std::pair<T, int64_t>& elem,
const T& value) { return elem.first < value; });

- return it != array_.end();
+ return it != array_.end() && it->first == target;
}

std::vector<int64_t>
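The one-line fix is worth spelling out: std::lower_bound returns the first element not less than the target, so a non-end iterator proves only that some element >= target exists, not that the target itself does. A self-contained sketch of the bug and the fix, mirroring the lookup above:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// lower_bound finds the first element >= target; that alone does not prove
// the target is present. The equality check added in the fix does.
bool Contains(const std::vector<std::pair<int64_t, int64_t>>& array,
              int64_t target) {
    auto it = std::lower_bound(
        array.begin(), array.end(), target,
        [](const std::pair<int64_t, int64_t>& elem, int64_t value) {
            return elem.first < value;
        });
    return it != array.end() && it->first == target;
}

int main() {
    std::vector<std::pair<int64_t, int64_t>> array = {{1, 0}, {3, 1}, {5, 2}};
    assert(Contains(array, 3));
    // Before the fix this returned true: lower_bound(2) lands on {3, 1},
    // which is not end(), even though 2 is absent.
    assert(!Contains(array, 2));
    return 0;
}
```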
1 change: 1 addition & 0 deletions internal/core/unittest/CMakeLists.txt
@@ -85,6 +85,7 @@ set(MILVUS_TEST_FILES
test_timestamp_index.cpp
test_tracer.cpp
test_utils.cpp
+ test_chunked_segment.cpp
)

if ( INDEX_ENGINE STREQUAL "cardinal" )
110 changes: 110 additions & 0 deletions internal/core/unittest/test_chunked_segment.cpp
@@ -0,0 +1,110 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include <gtest/gtest.h>
#include <algorithm>
#include <cstdint>
#include "common/BitsetView.h"
#include "common/QueryInfo.h"
#include "common/Schema.h"
#include "knowhere/comp/index_param.h"
#include "mmap/ChunkedColumn.h"
#include "query/SearchOnSealed.h"
#include "test_utils/DataGen.h"
#include <vector>

struct DeferRelease {
using functype = std::function<void()>;
void
AddDefer(const functype& closure) {
closures.push_back(closure);
}

~DeferRelease() {
for (auto& closure : closures) {
closure();
}
}

std::vector<functype> closures;
};

using namespace milvus;
TEST(test_chunk_segment, TestSearchOnSealed) {
DeferRelease defer;

int dim = 16;
int chunk_num = 3;
int chunk_size = 100;
int total_row_count = chunk_num * chunk_size;
int bitset_size = (total_row_count + 7) / 8;
int chunk_bitset_size = (chunk_size + 7) / 8;

auto column = std::make_shared<ChunkedColumn>();
auto schema = std::make_shared<Schema>();
auto fakevec_id = schema->AddDebugField(
"fakevec", DataType::VECTOR_FLOAT, dim, knowhere::metric::COSINE);

for (int i = 0; i < chunk_num; i++) {
auto dataset = segcore::DataGen(schema, chunk_size);
auto data = dataset.get_col<float>(fakevec_id);
// FixedWidthChunk layout: the validity bitset comes first, then the row
// data (4 bytes per float), hence the extra chunk_bitset_size bytes.
auto buf_size = chunk_bitset_size + 4 * data.size();

char* buf = new char[buf_size];
defer.AddDefer([buf]() { delete[] buf; });
memcpy(buf + chunk_bitset_size, data.data(), 4 * data.size());

auto chunk = std::make_shared<FixedWidthChunk>(
chunk_size, dim, buf, buf_size, 4, false);
column->AddChunk(chunk);
}

SearchInfo search_info;
auto search_conf = knowhere::Json{
{knowhere::meta::METRIC_TYPE, knowhere::metric::COSINE},
};
search_info.search_params_ = search_conf;
search_info.field_id_ = fakevec_id;
search_info.metric_type_ = knowhere::metric::COSINE;
// expect to return all rows
search_info.topk_ = total_row_count;

uint8_t* bitset_data = new uint8_t[bitset_size];
defer.AddDefer([bitset_data]() { delete[] bitset_data; });
std::fill(bitset_data, bitset_data + bitset_size, 0);
BitsetView bv(bitset_data, total_row_count);

auto query_ds = segcore::DataGen(schema, 1);
auto col_query_data = query_ds.get_col<float>(fakevec_id);
auto query_data = col_query_data.data();
SearchResult search_result;

query::SearchOnSealed(*schema,
column,
search_info,
query_data,
1,
chunk_size * chunk_num,
bv,
search_result);

std::set<int64_t> offsets;
for (auto& offset : search_result.seg_offsets_) {
if (offset != -1) {
offsets.insert(offset);
}
}
// check all rows are returned
ASSERT_EQ(total_row_count, offsets.size());
for (int i = 0; i < total_row_count; i++) {
ASSERT_TRUE(offsets.find(i) != offsets.end());
}
}
3 changes: 2 additions & 1 deletion internal/datanode/compaction/clustering_compactor_test.go
@@ -277,8 +277,9 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionWithBM25Function() {

// 8 + 8 + 8 + 7 + 8 = 39
// 39*1024 = 39936
+ // plus buffer on null bitsets etc., let's make it 45000
// writer will automatically flush after 1024 rows.
- paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "39935")
+ paramtable.Get().Save(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key, "45000")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.BinLogMaxSize.Key)

compactionResult, err := s.task.Compact()
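Working the comment's numbers: five fields of 8 + 8 + 8 + 7 + 8 = 39 bytes per row, and the writer auto-flushes every 1024 rows, so one flush serializes at least 39 * 1024 = 39936 bytes of raw payload before per-field overhead such as null bitsets. The old threshold of 39935 sat one byte under that raw payload; 45000 keeps the forced flush behavior while absorbing the overhead. A compile-time restatement of the arithmetic (sizes taken from the comment above, not re-derived from the schema):

```cpp
#include <cstdint>
#include <iostream>

int main() {
    constexpr int64_t kRowBytes = 8 + 8 + 8 + 7 + 8;        // per-row field sizes from the comment
    constexpr int64_t kFlushRows = 1024;                     // writer's auto-flush interval
    constexpr int64_t kFlushBytes = kRowBytes * kFlushRows;  // raw payload per flush
    static_assert(kFlushBytes == 39936, "39 * 1024, as the comment says");
    static_assert(kFlushBytes > 39935 && kFlushBytes < 45000,
                  "old limit sat under the raw payload; new limit leaves headroom");
    std::cout << kFlushBytes << "\n";
    return 0;
}
```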
28 changes: 15 additions & 13 deletions internal/datanode/compaction/mix_compactor.go
@@ -154,10 +154,12 @@ func (t *mixCompactionTask) mergeSplit(
return nil, err
}
for _, paths := range binlogPaths {
- err := t.dealBinlogPaths(ctx, delta, mWriter, pkField, paths, &deletedRowCount, &expiredRowCount)
+ del, exp, err := t.writePaths(ctx, delta, mWriter, pkField, paths)
if err != nil {
return nil, err
}
+ deletedRowCount += del
+ expiredRowCount += exp
}
res, err := mWriter.Finish()
if err != nil {
@@ -186,12 +188,14 @@ func isValueDeleted(v *storage.Value, delta map[interface{}]typeutil.Timestamp)
return false
}

- func (t *mixCompactionTask) dealBinlogPaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp, mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string, deletedRowCount, expiredRowCount *int64) error {
+ func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp,
+     mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string,
+ ) (deletedRowCount, expiredRowCount int64, err error) {
log := log.With(zap.Strings("paths", paths))
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
- return err
+ return
}

blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
@@ -201,42 +205,40 @@ func (t *mixCompactionTask) dealBinlogPaths(ctx context.Context, delta map[inter
iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
if err != nil {
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
- return err
+ return
}
defer iter.Close()

for {
- err := iter.Next()
+ err = iter.Next()
if err != nil {
if err == sio.EOF {
+ err = nil
break
} else {
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
- return err
+ return
}
}
v := iter.Value()
if isValueDeleted(v, delta) {
- oldDeletedRowCount := *deletedRowCount
- *deletedRowCount = oldDeletedRowCount + 1
+ deletedRowCount++
continue
}

// Filtering expired entity
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(v.Timestamp)) {
- oldExpiredRowCount := *expiredRowCount
- *expiredRowCount = oldExpiredRowCount + 1
+ expiredRowCount++
continue
}

err = mWriter.Write(v)
if err != nil {
log.Warn("compact wrong, failed to writer row", zap.Error(err))
- return err
+ return
}
}

- return nil
+ return
}

func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
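The signature change swaps two *int64 out-parameters for Go named return values, so counts flow back with the error (del, exp, err := ...) instead of through pointers the callee mutates. A hedged C++ analogue of the same API shape, returning an aggregate rather than writing through out-pointers; all names are illustrative:

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Counts travel with the return value, mirroring the new
// (deletedRowCount, expiredRowCount int64, err error) signature.
struct WriteStats {
    int64_t deleted_rows = 0;
    int64_t expired_rows = 0;
};

// Hypothetical row: the flags stand in for the delete-map and TTL checks.
struct Row {
    bool deleted = false;
    bool expired = false;
};

WriteStats WritePath(const std::vector<Row>& rows) {
    WriteStats stats;
    for (const auto& row : rows) {
        if (row.deleted) { ++stats.deleted_rows; continue; }
        if (row.expired) { ++stats.expired_rows; continue; }
        // ... write the surviving row ...
    }
    return stats;
}

int main() {
    // Caller-side accumulation, as mergeSplit now does with del and exp.
    WriteStats total;
    std::vector<Row> batch = {{true, false}, {false, true}, {false, false}};
    auto stats = WritePath(batch);
    total.deleted_rows += stats.deleted_rows;
    total.expired_rows += stats.expired_rows;
    std::cout << total.deleted_rows << " deleted, " << total.expired_rows << " expired\n";
    return 0;
}
```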