Skip to content

Commit

Permalink
enhance: add bitmap offset cache to speed up retrieve raw data (#35498)
Browse files Browse the repository at this point in the history
#35458

Signed-off-by: luzhang <[email protected]>
Co-authored-by: luzhang <[email protected]>
  • Loading branch information
zhagnlu and luzhang authored Aug 23, 2024
1 parent 75da36d commit 42f7800
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 21 deletions.
49 changes: 47 additions & 2 deletions internal/core/src/index/BitmapIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ BitmapIndex<T>::Serialize(const Config& config) {
ret_set.Append(BITMAP_INDEX_META, index_meta.first, index_meta.second);

LOG_INFO("build bitmap index with cardinality = {}, num_rows = {}",
Cardinality(),
data_.size(),
total_num_rows_);

Disassemble(ret_set);
Expand Down Expand Up @@ -345,6 +345,31 @@ BitmapIndex<T>::DeserializeIndexData(const uint8_t* data_ptr,
}
}

template <typename T>
void
BitmapIndex<T>::BuildOffsetCache() {
if (build_mode_ == BitmapIndexBuildMode::ROARING) {
data_offsets_cache_.resize(total_num_rows_);
for (auto it = data_.begin(); it != data_.end(); it++) {
for (const auto& v : it->second) {
data_offsets_cache_[v] = it;
}
}
} else {
for (auto it = bitsets_.begin(); it != bitsets_.end(); it++) {
bitsets_offsets_cache_.resize(total_num_rows_);
const auto& bits = it->second;
for (int i = 0; i < bits.size(); i++) {
if (bits[i]) {
bitsets_offsets_cache_[i] = it;
}
}
}
}
use_offset_cache_ = true;
LOG_INFO("build offset cache for bitmap index");
}

template <>
void
BitmapIndex<std::string>::DeserializeIndexData(const uint8_t* data_ptr,
Expand Down Expand Up @@ -377,6 +402,9 @@ template <typename T>
void
BitmapIndex<T>::LoadWithoutAssemble(const BinarySet& binary_set,
const Config& config) {
auto enable_offset_cache =
GetValueFromConfig<bool>(config, ENABLE_OFFSET_CACHE);

auto index_meta_buffer = binary_set.GetByName(BITMAP_INDEX_META);
auto index_meta = DeserializeIndexMeta(index_meta_buffer->data.get(),
index_meta_buffer->size);
Expand All @@ -387,6 +415,10 @@ BitmapIndex<T>::LoadWithoutAssemble(const BinarySet& binary_set,
auto index_data_buffer = binary_set.GetByName(BITMAP_INDEX_DATA);
DeserializeIndexData(index_data_buffer->data.get(), index_length);

if (enable_offset_cache.has_value() && enable_offset_cache.value()) {
BuildOffsetCache();
}

LOG_INFO("load bitmap index with cardinality = {}, num_rows = {}",
Cardinality(),
total_num_rows_);
Expand Down Expand Up @@ -575,7 +607,6 @@ BitmapIndex<T>::RangeForRoaring(const T value, const OpType op) {
}
auto lb = data_.begin();
auto ub = data_.end();

switch (op) {
case OpType::LessThan: {
ub = std::lower_bound(data_.begin(),
Expand Down Expand Up @@ -758,12 +789,26 @@ BitmapIndex<T>::RangeForRoaring(const T lower_value,
return res;
}

template <typename T>
T
BitmapIndex<T>::Reverse_Lookup_InCache(size_t idx) const {
if (build_mode_ == BitmapIndexBuildMode::ROARING) {
return data_offsets_cache_[idx]->first;
} else {
return bitsets_offsets_cache_[idx]->first;
}
}

template <typename T>
T
BitmapIndex<T>::Reverse_Lookup(size_t idx) const {
AssertInfo(is_built_, "index has not been built");
AssertInfo(idx < total_num_rows_, "out of range of total coun");

if (use_offset_cache_) {
return Reverse_Lookup_InCache(idx);
}

if (build_mode_ == BitmapIndexBuildMode::ROARING) {
for (auto it = data_.begin(); it != data_.end(); it++) {
for (const auto& v : it->second) {
Expand Down
11 changes: 11 additions & 0 deletions internal/core/src/index/BitmapIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ class BitmapIndex : public ScalarIndex<T> {
void
DeserializeIndexData(const uint8_t* data_ptr, size_t index_length);

void
BuildOffsetCache();

T
Reverse_Lookup_InCache(size_t idx) const;

void
ChooseIndexLoadMode(int64_t index_length);

Expand Down Expand Up @@ -210,6 +216,11 @@ class BitmapIndex : public ScalarIndex<T> {
std::map<T, TargetBitmap> bitsets_;
size_t total_num_rows_{0};
proto::schema::FieldSchema schema_;
bool use_offset_cache_{false};
std::vector<typename std::map<T, roaring::Roaring>::iterator>
data_offsets_cache_;
std::vector<typename std::map<T, TargetBitmap>::iterator>
bitsets_offsets_cache_;
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;

// generate valid_bitset to speed up NotIn and IsNull and IsNotNull operate
Expand Down
3 changes: 0 additions & 3 deletions internal/core/src/index/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
#include "common/Tracer.h"
#include "common/Types.h"

const std::string kMmapFilepath = "mmap_filepath";
const std::string kEnableMmap = "enable_mmap";

namespace milvus::index {

class IndexBase {
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/Meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ constexpr const char* INDEX_ENGINE_VERSION = "index_engine_version";
constexpr const char* BITMAP_INDEX_CARDINALITY_LIMIT =
"bitmap_cardinality_limit";

// index config key
constexpr const char* MMAP_FILE_PATH = "mmap_filepath";
constexpr const char* ENABLE_MMAP = "enable_mmap";
constexpr const char* INDEX_FILES = "index_files";
constexpr const char* ENABLE_OFFSET_CACHE = "indexoffsetcache.enabled";

// VecIndex file metas
constexpr const char* DISK_ANN_PREFIX_PATH = "index_prefix";
constexpr const char* DISK_ANN_RAW_DATA_PATH = "data_path";
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/index/StringIndexMarisa.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ StringIndexMarisa::LoadWithoutAssemble(const BinarySet& set,
}

file.Seek(0, SEEK_SET);
if (config.contains(kEnableMmap)) {
if (config.contains(ENABLE_MMAP)) {
trie_.mmap(file_name.c_str());
} else {
trie_.read(file.Descriptor());
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <tuple>
#include <map>
#include <string>
#include <boost/algorithm/string.hpp>

#include "common/Types.h"
#include "common/FieldData.h"
Expand Down Expand Up @@ -79,7 +80,12 @@ void inline CheckParameter(Config& conf,
template <typename T>
inline std::optional<T>
GetValueFromConfig(const Config& cfg, const std::string& key) {
// cfg value are all string type
if (cfg.contains(key)) {
if constexpr (std::is_same_v<T, bool>) {
return boost::algorithm::to_lower_copy(
cfg.at(key).get<std::string>()) == "true";
}
return cfg.at(key).get<T>();
}
return std::nullopt;
Expand Down
6 changes: 3 additions & 3 deletions internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,9 @@ VectorDiskAnnIndex<T>::update_load_json(const Config& config) {
}
}

if (config.contains(kMmapFilepath)) {
load_config.erase(kMmapFilepath);
load_config[kEnableMmap] = true;
if (config.contains(MMAP_FILE_PATH)) {
load_config.erase(MMAP_FILE_PATH);
load_config[ENABLE_MMAP] = true;
}

return load_config;
Expand Down
9 changes: 5 additions & 4 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

#include "index/Index.h"
#include "index/IndexInfo.h"
#include "index/Meta.h"
#include "index/Utils.h"
#include "common/EasyAssert.h"
#include "config/ConfigKnowhere.h"
Expand Down Expand Up @@ -142,7 +143,7 @@ template <typename T>
void
VectorMemIndex<T>::Load(milvus::tracer::TraceContext ctx,
const Config& config) {
if (config.contains(kMmapFilepath)) {
if (config.contains(MMAP_FILE_PATH)) {
return LoadFromFile(config);
}

Expand Down Expand Up @@ -483,7 +484,7 @@ VectorMemIndex<T>::GetSparseVector(const DatasetPtr dataset) const {

template <typename T>
void VectorMemIndex<T>::LoadFromFile(const Config& config) {
auto filepath = GetValueFromConfig<std::string>(config, kMmapFilepath);
auto filepath = GetValueFromConfig<std::string>(config, MMAP_FILE_PATH);
AssertInfo(filepath.has_value(), "mmap filepath is empty when load index");

std::filesystem::create_directories(
Expand Down Expand Up @@ -598,8 +599,8 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {

LOG_INFO("load index into Knowhere...");
auto conf = config;
conf.erase(kMmapFilepath);
conf[kEnableMmap] = true;
conf.erase(MMAP_FILE_PATH);
conf[ENABLE_MMAP] = true;
auto start_deserialize = std::chrono::system_clock::now();
auto stat = index_.DeserializeFromFile(filepath.value(), conf);
auto deserialize_duration =
Expand Down
6 changes: 3 additions & 3 deletions internal/core/src/segcore/load_index_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
static_cast<milvus::segcore::LoadIndexInfo*>(c_load_index_info);
auto& index_params = load_index_info->index_params;
auto field_type = load_index_info->field_type;

auto engine_version = load_index_info->index_engine_version;

milvus::index::CreateIndexInfo index_info;
Expand Down Expand Up @@ -271,7 +270,7 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {

auto config = milvus::index::ParseConfigFromIndexParams(
load_index_info->index_params);
config["index_files"] = load_index_info->index_files;
config[milvus::index::INDEX_FILES] = load_index_info->index_files;

milvus::storage::FileManagerContext fileManagerContext(
field_meta, index_meta, remote_chunk_manager);
Expand All @@ -289,9 +288,10 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) {
std::to_string(load_index_info->field_id) /
std::to_string(load_index_info->index_id);

config[kMmapFilepath] = filepath.string();
config[milvus::index::MMAP_FILE_PATH] = filepath.string();
}

LOG_DEBUG("load index with configs: {}", config.dump());
load_index_info->index->Load(ctx, config);

span->End();
Expand Down
8 changes: 7 additions & 1 deletion internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,19 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
func ValidateIndexParams(index *model.Index) error {
indexType := GetIndexType(index.IndexParams)
indexParams := funcutil.KeyValuePair2Map(index.IndexParams)
userIndexParams := funcutil.KeyValuePair2Map(index.UserIndexParams)
if err := indexparamcheck.ValidateMmapIndexParams(indexType, indexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid mmap index params", err.Error())
}
userIndexParams := funcutil.KeyValuePair2Map(index.UserIndexParams)
if err := indexparamcheck.ValidateMmapIndexParams(indexType, userIndexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid mmap user index params", err.Error())
}
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, indexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
}
if err := indexparamcheck.ValidateOffsetCacheIndexParams(indexType, userIndexParams); err != nil {
return merr.WrapErrParameterInvalidMsg("invalid offset cache index params", err.Error())
}
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,11 @@ const (

// common properties
const (
MmapEnabledKey = "mmap.enabled"
LazyLoadEnableKey = "lazyload.enabled"
PartitionKeyIsolationKey = "partitionkey.isolation"
FieldSkipLoadKey = "field.skipLoad"
MmapEnabledKey = "mmap.enabled"
LazyLoadEnableKey = "lazyload.enabled"
PartitionKeyIsolationKey = "partitionkey.isolation"
FieldSkipLoadKey = "field.skipLoad"
IndexOffsetCacheEnabledKey = "indexoffsetcache.enabled"
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/util/indexparamcheck/bitmap_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ func Test_BitmapIndexChecker(t *testing.T) {
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Double}))
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Float}))
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Array, ElementType: schemapb.DataType_Double}))
assert.Error(t, c.CheckValidDataType(&schemapb.FieldSchema{DataType: schemapb.DataType_Double, IsPrimaryKey: true}))
}
3 changes: 3 additions & 0 deletions pkg/util/indexparamcheck/bitmap_index_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ func (c *BITMAPChecker) CheckTrain(params map[string]string) error {
}

func (c *BITMAPChecker) CheckValidDataType(field *schemapb.FieldSchema) error {
if field.IsPrimaryKey {
return fmt.Errorf("create bitmap index on primary key not supported")
}
mainType := field.GetDataType()
elemType := field.GetElementType()
if !typeutil.IsBoolType(mainType) && !typeutil.IsIntegerType(mainType) &&
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/indexparamcheck/index_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func IsVectorMmapIndex(indexType IndexType) bool {
indexType == IndexSparseWand
}

func IsOffsetCacheSupported(indexType IndexType) bool {
return indexType == IndexBitmap
}

func IsDiskIndex(indexType IndexType) bool {
return indexType == IndexDISKANN
}
Expand All @@ -96,3 +100,18 @@ func ValidateMmapIndexParams(indexType IndexType, indexParams map[string]string)
}
return nil
}

func ValidateOffsetCacheIndexParams(indexType IndexType, indexParams map[string]string) error {
offsetCacheEnable, ok := indexParams[common.IndexOffsetCacheEnabledKey]
if !ok {
return nil
}
enable, err := strconv.ParseBool(offsetCacheEnable)
if err != nil {
return fmt.Errorf("invalid %s value: %s, expected: true, false", common.IndexOffsetCacheEnabledKey, offsetCacheEnable)
}
if enable && IsOffsetCacheSupported(indexType) {
return fmt.Errorf("only bitmap index support %s now", common.IndexOffsetCacheEnabledKey)
}
return nil
}
1 change: 1 addition & 0 deletions pkg/util/indexparams/index_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var configableIndexParams = typeutil.NewSet[string]()

func init() {
configableIndexParams.Insert(common.MmapEnabledKey)
configableIndexParams.Insert(common.IndexOffsetCacheEnabledKey)
}

func IsConfigableIndexParam(key string) bool {
Expand Down

0 comments on commit 42f7800

Please sign in to comment.