From 5685814f5771eb62cf59e7d027cbacde61f60bb8 Mon Sep 17 00:00:00 2001 From: luzhang Date: Tue, 15 Oct 2024 16:07:17 +0800 Subject: [PATCH] add json key inverted index in stats to speed up json expr Signed-off-by: luzhang --- internal/core/src/common/Consts.h | 1 + internal/core/src/common/Json.h | 74 +++ internal/core/src/common/jsmn.h | 498 ++++++++++++++++++ internal/core/src/exec/expression/Expr.h | 14 + .../core/src/exec/expression/TermExpr.cpp | 76 +++ internal/core/src/exec/expression/TermExpr.h | 4 + .../core/src/index/InvertedIndexTantivy.cpp | 1 + .../core/src/index/JsonKeyInvertedIndex.cpp | 249 +++++++++ .../core/src/index/JsonKeyInvertedIndex.h | 97 ++++ internal/core/src/indexbuilder/index_c.cpp | 68 +++ internal/core/src/indexbuilder/index_c.h | 5 + internal/core/src/mmap/ChunkedColumn.h | 8 +- internal/core/src/mmap/Column.h | 8 +- .../src/segcore/ChunkedSegmentSealedImpl.h | 16 + internal/core/src/segcore/SegmentInterface.h | 1 + internal/core/src/segcore/SegmentSealed.h | 10 + .../core/src/segcore/SegmentSealedImpl.cpp | 29 + internal/core/src/segcore/SegmentSealedImpl.h | 16 + internal/core/src/segcore/segment_c.cpp | 54 ++ internal/core/src/segcore/segment_c.h | 6 + .../core/src/storage/DiskFileManagerImpl.cpp | 197 +++---- .../core/src/storage/DiskFileManagerImpl.h | 41 +- internal/core/src/storage/Util.cpp | 22 + internal/core/src/storage/Util.h | 13 + .../tantivy-binding/include/tantivy-binding.h | 2 +- .../tantivy/tantivy-binding/src/array.rs | 4 +- .../tantivy-binding/src/docid_collector.rs | 8 +- .../tantivy-binding/src/index_reader.rs | 32 +- .../tantivy-binding/src/index_reader_text.rs | 2 +- .../tantivy-binding/src/vec_collector.rs | 10 +- internal/core/unittest/test_c_api.cpp | 384 ++++++++++++++ internal/datacoord/job_manager.go | 32 ++ internal/datacoord/meta.go | 3 +- internal/datacoord/segment_operator.go | 12 + internal/datacoord/task_stats.go | 7 + internal/indexnode/indexnode_service.go | 23 +- internal/indexnode/task_stats.go | 104 ++++ internal/indexnode/taskinfo_ops.go | 66 ++- internal/proto/data_coord.proto | 12 + internal/proto/index_cgo_msg.proto | 10 + internal/proto/index_coord.proto | 1 + internal/proto/query_coord.proto | 1 + internal/proto/worker.proto | 1 + internal/querycoordv2/utils/types.go | 33 +- internal/querynodev2/segments/segment.go | 34 ++ .../querynodev2/segments/segment_loader.go | 25 +- internal/util/indexcgowrapper/index.go | 37 ++ pkg/util/typeutil/field_schema.go | 7 + 48 files changed, 2160 insertions(+), 198 deletions(-) create mode 100644 internal/core/src/common/jsmn.h create mode 100644 internal/core/src/index/JsonKeyInvertedIndex.cpp create mode 100644 internal/core/src/index/JsonKeyInvertedIndex.h diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index 1f9d51e447680..f8bde3598d6b8 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -47,6 +47,7 @@ const char KMEANS_CLUSTER[] = "KMEANS"; const char VEC_OPT_FIELDS[] = "opt_fields"; const char PAGE_RETAIN_ORDER[] = "page_retain_order"; const char TEXT_LOG_ROOT_PATH[] = "text_log"; +const char JSON_KEY_INDEX_LOG_ROOT_PATH[] = "json_key_index_log"; const char DEFAULT_PLANNODE_ID[] = "0"; const char DEAFULT_QUERY_ID[] = "0"; diff --git a/internal/core/src/common/Json.h b/internal/core/src/common/Json.h index 297dbcbdcca77..60b0764135b24 100644 --- a/internal/core/src/common/Json.h +++ b/internal/core/src/common/Json.h @@ -71,6 +71,35 @@ ExtractSubJson(const std::string& json, const std::vector& keys) { 
    return buffer.GetString();
}

+inline std::pair<std::string, std::string>
+ParseTopLevelKey(const std::string& json_pointer, bool escaped = false) {
+    if (json_pointer.empty()) {
+        return {"", ""};
+    }
+
+    Assert(json_pointer[0] == '/');
+    size_t start = 1;
+    size_t end = json_pointer.find('/', start);
+
+    std::string top_key = (end == std::string::npos)
+                              ? json_pointer.substr(start)
+                              : json_pointer.substr(start, end - start);
+
+    if (escaped) {
+        // JSON Pointer unescaping (RFC 6901): replace every "~1" with "/"
+        // first, then every "~0" with "~".
+        size_t pos = 0;
+        while ((pos = top_key.find("~1", pos)) != std::string::npos) {
+            top_key.replace(pos, 2, "/");
+            pos += 1;
+        }
+        pos = 0;
+        while ((pos = top_key.find("~0", pos)) != std::string::npos) {
+            top_key.replace(pos, 2, "~");
+            pos += 1;
+        }
+    }
+
+    std::string remaining_path =
+        (end == std::string::npos) ? "" : json_pointer.substr(end);
+
+    return {top_key, remaining_path};
+}
+
 using document = simdjson::ondemand::document;
 template <typename T>
 using value_result = simdjson::simdjson_result<T>;
@@ -146,6 +175,25 @@ class Json {
         return doc;
     }

+    value_result<document>
+    doc(uint16_t offset, uint16_t length) const {
+        thread_local simdjson::ondemand::parser parser;
+
+        // it's always safe to add the padding,
+        // as we have allocated the memory with this padding
+        auto doc = parser.iterate(
+            data_.data() + offset, length, length + simdjson::SIMDJSON_PADDING);
+        AssertInfo(doc.error() == simdjson::SUCCESS,
+                   "failed to parse the json {} offset {}, length {}: {}, "
+                   "total_json:{}",
+                   std::string(data_.data() + offset, length),
+                   offset,
+                   length,
+                   simdjson::error_message(doc.error()),
+                   data_);
+        return doc;
+    }
+
     value_result<simdjson::dom::element>
     dom_doc() const {
         thread_local simdjson::dom::parser parser;
@@ -160,6 +208,21 @@ class Json {
         return doc;
     }

+    value_result<simdjson::dom::element>
+    dom_doc(uint16_t offset, uint16_t length) const {
+        thread_local simdjson::dom::parser parser;
+
+        // it's always safe to add the padding,
+        // as we have allocated the memory with this padding
+        auto doc = parser.parse(data_.data() + offset,
+                                length + simdjson::SIMDJSON_PADDING);
+        AssertInfo(doc.error() == simdjson::SUCCESS,
+                   "failed to parse the json {}: {}",
+                   std::string(data_.data() + offset, length),
+                   simdjson::error_message(doc.error()));
+        return doc;
+    }
+
     bool
     exist(std::string_view pointer) const {
         return doc().at_pointer(pointer).error() == simdjson::SUCCESS;
@@ -199,6 +262,17 @@ class Json {
         return doc().at_pointer(pointer).get<T>();
     }

+    template <typename T>
+    value_result<T>
+    at(uint16_t offset, uint16_t length) const {
+        return doc(offset, length).get<T>();
+    }
+
+    value_result<simdjson::dom::array>
+    array_at(uint16_t offset, uint16_t length) const {
+        return dom_doc(offset, length).get_array();
+    }
+
     // get dom array by JSON pointer,
     // call `size()` to get array size,
     // call `at()` to get array element by index,
diff --git a/internal/core/src/common/jsmn.h b/internal/core/src/common/jsmn.h
new file mode 100644
index 0000000000000..f20b56ba48a68
--- /dev/null
+++ b/internal/core/src/common/jsmn.h
@@ -0,0 +1,498 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2010 Serge Zaitsev
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#ifndef JSMN_H
+#define JSMN_H
+
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define JSMN_STATIC
+#ifdef JSMN_STATIC
+#define JSMN_API static
+#else
+#define JSMN_API extern
+#endif
+
+/**
+ * JSON type identifier. Basic types are:
+ *   o Object
+ *   o Array
+ *   o String
+ *   o Other primitive: number, boolean (true/false) or null
+ */
+typedef enum {
+    JSMN_UNDEFINED = 0,
+    JSMN_OBJECT = 1 << 0,
+    JSMN_ARRAY = 1 << 1,
+    JSMN_STRING = 1 << 2,
+    JSMN_PRIMITIVE = 1 << 3
+} jsmntype_t;
+
+enum jsmnerr {
+    /* Not enough tokens were provided */
+    JSMN_ERROR_NOMEM = -1,
+    /* Invalid character inside JSON string */
+    JSMN_ERROR_INVAL = -2,
+    /* The string is not a full JSON packet, more bytes expected */
+    JSMN_ERROR_PART = -3
+};
+
+/**
+ * JSON token description.
+ * type   type (object, array, string etc.)
+ * start  start position in JSON data string
+ * end    end position in JSON data string
+ */
+typedef struct jsmntok {
+    jsmntype_t type;
+    int start;
+    int end;
+    int size;
+#ifdef JSMN_PARENT_LINKS
+    int parent;
+#endif
+} jsmntok_t;
+
+/**
+ * JSON parser. Contains an array of token blocks available. Also stores
+ * the string being parsed now and current position in that string.
+ */
+typedef struct jsmn_parser {
+    unsigned int pos;     /* offset in the JSON string */
+    unsigned int toknext; /* next token to allocate */
+    int toksuper; /* superior token node, e.g. parent object or array */
+} jsmn_parser;
+
+/**
+ * Create JSON parser over an array of tokens
+ */
+JSMN_API void
+jsmn_init(jsmn_parser* parser);
+
+/**
+ * Run JSON parser. It parses a JSON data string into an array of tokens,
+ * each describing a single JSON object.
+ */
+JSMN_API int
+jsmn_parse(jsmn_parser* parser,
+           const char* js,
+           const size_t len,
+           jsmntok_t* tokens,
+           const unsigned int num_tokens);
+
+#ifndef JSMN_HEADER
+/**
+ * Allocates a fresh unused token from the token pool.
+ */
+static jsmntok_t*
+jsmn_alloc_token(jsmn_parser* parser,
+                 jsmntok_t* tokens,
+                 const size_t num_tokens) {
+    jsmntok_t* tok;
+    if (parser->toknext >= num_tokens) {
+        return NULL;
+    }
+    tok = &tokens[parser->toknext++];
+    tok->start = tok->end = -1;
+    tok->size = 0;
+#ifdef JSMN_PARENT_LINKS
+    tok->parent = -1;
+#endif
+    return tok;
+}
+
+/**
+ * Fills token type and boundaries.
+ */
+static void
+jsmn_fill_token(jsmntok_t* token,
+                const jsmntype_t type,
+                const int start,
+                const int end) {
+    token->type = type;
+    token->start = start;
+    token->end = end;
+    token->size = 0;
+}
+
+/**
+ * Fills next available token with JSON primitive.
+ */ +static int +jsmn_parse_primitive(jsmn_parser* parser, + const char* js, + const size_t len, + jsmntok_t* tokens, + const size_t num_tokens) { + jsmntok_t* token; + int start; + + start = parser->pos; + + for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) { + switch (js[parser->pos]) { +#ifndef JSMN_STRICT + /* In strict mode primitive must be followed by "," or "}" or "]" */ + case ':': +#endif + case '\t': + case '\r': + case '\n': + case ' ': + case ',': + case ']': + case '}': + goto found; + default: + /* to quiet a warning from gcc*/ + break; + } + if (js[parser->pos] < 32 || js[parser->pos] >= 127) { + parser->pos = start; + return JSMN_ERROR_INVAL; + } + } +#ifdef JSMN_STRICT + /* In strict mode primitive must be followed by a comma/object/array */ + parser->pos = start; + return JSMN_ERROR_PART; +#endif + +found: + if (tokens == NULL) { + parser->pos--; + return 0; + } + token = jsmn_alloc_token(parser, tokens, num_tokens); + if (token == NULL) { + parser->pos = start; + return JSMN_ERROR_NOMEM; + } + jsmn_fill_token(token, JSMN_PRIMITIVE, start, parser->pos); +#ifdef JSMN_PARENT_LINKS + token->parent = parser->toksuper; +#endif + parser->pos--; + return 0; +} + +/** + * Fills next token with JSON string. + */ +static int +jsmn_parse_string(jsmn_parser* parser, + const char* js, + const size_t len, + jsmntok_t* tokens, + const size_t num_tokens) { + jsmntok_t* token; + + int start = parser->pos; + + /* Skip starting quote */ + parser->pos++; + + for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) { + char c = js[parser->pos]; + + /* Quote: end of string */ + if (c == '\"') { + if (tokens == NULL) { + return 0; + } + token = jsmn_alloc_token(parser, tokens, num_tokens); + if (token == NULL) { + parser->pos = start; + return JSMN_ERROR_NOMEM; + } + jsmn_fill_token(token, JSMN_STRING, start + 1, parser->pos); +#ifdef JSMN_PARENT_LINKS + token->parent = parser->toksuper; +#endif + return 0; + } + + /* Backslash: Quoted symbol expected */ + if (c == '\\' && parser->pos + 1 < len) { + int i; + parser->pos++; + switch (js[parser->pos]) { + /* Allowed escaped symbols */ + case '\"': + case '/': + case '\\': + case 'b': + case 'f': + case 'r': + case 'n': + case 't': + break; + /* Allows escaped symbol \uXXXX */ + case 'u': + parser->pos++; + for (i = 0; + i < 4 && parser->pos < len && js[parser->pos] != '\0'; + i++) { + /* If it isn't a hex character we have an error */ + if (!((js[parser->pos] >= 48 && + js[parser->pos] <= 57) || /* 0-9 */ + (js[parser->pos] >= 65 && + js[parser->pos] <= 70) || /* A-F */ + (js[parser->pos] >= 97 && + js[parser->pos] <= 102))) { /* a-f */ + parser->pos = start; + return JSMN_ERROR_INVAL; + } + parser->pos++; + } + parser->pos--; + break; + /* Unexpected symbol */ + default: + parser->pos = start; + return JSMN_ERROR_INVAL; + } + } + } + parser->pos = start; + return JSMN_ERROR_PART; +} + +/** + * Parse JSON string and fill tokens. 
+ */ +JSMN_API int +jsmn_parse(jsmn_parser* parser, + const char* js, + const size_t len, + jsmntok_t* tokens, + const unsigned int num_tokens) { + int r; + int i; + jsmntok_t* token; + int count = parser->toknext; + + for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) { + char c; + jsmntype_t type; + + c = js[parser->pos]; + switch (c) { + case '{': + case '[': + count++; + if (tokens == NULL) { + break; + } + token = jsmn_alloc_token(parser, tokens, num_tokens); + if (token == NULL) { + return JSMN_ERROR_NOMEM; + } + if (parser->toksuper != -1) { + jsmntok_t* t = &tokens[parser->toksuper]; +#ifdef JSMN_STRICT + /* In strict mode an object or array can't become a key */ + if (t->type == JSMN_OBJECT) { + return JSMN_ERROR_INVAL; + } +#endif + t->size++; +#ifdef JSMN_PARENT_LINKS + token->parent = parser->toksuper; +#endif + } + token->type = (c == '{' ? JSMN_OBJECT : JSMN_ARRAY); + token->start = parser->pos; + parser->toksuper = parser->toknext - 1; + break; + case '}': + case ']': + if (tokens == NULL) { + break; + } + type = (c == '}' ? JSMN_OBJECT : JSMN_ARRAY); +#ifdef JSMN_PARENT_LINKS + if (parser->toknext < 1) { + return JSMN_ERROR_INVAL; + } + token = &tokens[parser->toknext - 1]; + for (;;) { + if (token->start != -1 && token->end == -1) { + if (token->type != type) { + return JSMN_ERROR_INVAL; + } + token->end = parser->pos + 1; + parser->toksuper = token->parent; + break; + } + if (token->parent == -1) { + if (token->type != type || parser->toksuper == -1) { + return JSMN_ERROR_INVAL; + } + break; + } + token = &tokens[token->parent]; + } +#else + for (i = parser->toknext - 1; i >= 0; i--) { + token = &tokens[i]; + if (token->start != -1 && token->end == -1) { + if (token->type != type) { + return JSMN_ERROR_INVAL; + } + parser->toksuper = -1; + token->end = parser->pos + 1; + break; + } + } + /* Error if unmatched closing bracket */ + if (i == -1) { + return JSMN_ERROR_INVAL; + } + for (; i >= 0; i--) { + token = &tokens[i]; + if (token->start != -1 && token->end == -1) { + parser->toksuper = i; + break; + } + } +#endif + break; + case '\"': + r = jsmn_parse_string(parser, js, len, tokens, num_tokens); + if (r < 0) { + return r; + } + count++; + if (parser->toksuper != -1 && tokens != NULL) { + tokens[parser->toksuper].size++; + } + break; + case '\t': + case '\r': + case '\n': + case ' ': + break; + case ':': + parser->toksuper = parser->toknext - 1; + break; + case ',': + if (tokens != NULL && parser->toksuper != -1 && + tokens[parser->toksuper].type != JSMN_ARRAY && + tokens[parser->toksuper].type != JSMN_OBJECT) { +#ifdef JSMN_PARENT_LINKS + parser->toksuper = tokens[parser->toksuper].parent; +#else + for (i = parser->toknext - 1; i >= 0; i--) { + if (tokens[i].type == JSMN_ARRAY || + tokens[i].type == JSMN_OBJECT) { + if (tokens[i].start != -1 && tokens[i].end == -1) { + parser->toksuper = i; + break; + } + } + } +#endif + } + break; +#ifdef JSMN_STRICT + /* In strict mode primitives are: numbers and booleans */ + case '-': + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + case 't': + case 'f': + case 'n': + /* And they must not be keys of the object */ + if (tokens != NULL && parser->toksuper != -1) { + const jsmntok_t* t = &tokens[parser->toksuper]; + if (t->type == JSMN_OBJECT || + (t->type == JSMN_STRING && t->size != 0)) { + return JSMN_ERROR_INVAL; + } + } +#else + /* In non-strict mode every unquoted value is a primitive */ + default: +#endif + r = jsmn_parse_primitive(parser, 
+                                        js, len, tokens, num_tokens);
+                if (r < 0) {
+                    return r;
+                }
+                count++;
+                if (parser->toksuper != -1 && tokens != NULL) {
+                    tokens[parser->toksuper].size++;
+                }
+                break;
+
+#ifdef JSMN_STRICT
+            /* Unexpected char in strict mode */
+            default:
+                return JSMN_ERROR_INVAL;
+#endif
+        }
+    }
+
+    if (tokens != NULL) {
+        for (i = parser->toknext - 1; i >= 0; i--) {
+            /* Unmatched opened object or array */
+            if (tokens[i].start != -1 && tokens[i].end == -1) {
+                return JSMN_ERROR_PART;
+            }
+        }
+    }
+
+    return count;
+}
+
+/**
+ * Creates a new parser based over a given buffer with an array of tokens
+ * available.
+ */
+JSMN_API void
+jsmn_init(jsmn_parser* parser) {
+    parser->pos = 0;
+    parser->toknext = 0;
+    parser->toksuper = -1;
+}
+
+#endif /* JSMN_HEADER */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* JSMN_H */
diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h
index 307792a539ac2..bfeb4814f893f 100644
--- a/internal/core/src/exec/expression/Expr.h
+++ b/internal/core/src/exec/expression/Expr.h
@@ -27,6 +27,7 @@
 #include "exec/QueryContext.h"
 #include "expr/ITypeExpr.h"
 #include "query/PlanProto.h"
+#include "segcore/SegmentSealedImpl.h"

 namespace milvus {
 namespace exec {
@@ -714,6 +715,19 @@ class SegmentExpr : public Expr {
         use_index_ = false;
     }

+    bool
+    CanUseJsonKeyIndex(FieldId field_id) const {
+        if (segment_->type() == SegmentType::Sealed) {
+            auto sealed_seg =
+                dynamic_cast<const segcore::SegmentSealedImpl*>(segment_);
+            Assert(sealed_seg != nullptr);
+            if (sealed_seg->GetJsonKeyIndex(field_id) != nullptr) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 protected:
     const segcore::SegmentInternalInterface* segment_;
     const FieldId field_id_;
diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp
index fcb27a1c747a2..343b7123a853a 100644
--- a/internal/core/src/exec/expression/TermExpr.cpp
+++ b/internal/core/src/exec/expression/TermExpr.cpp
@@ -417,12 +417,75 @@ PhyTermFilterExpr::ExecTermJsonVariableInField() {
     return res_vec;
 }

+template <typename ValueType>
+VectorPtr
+PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
+    using GetType = std::conditional_t<std::is_same_v<ValueType, std::string>,
+                                       std::string_view,
+                                       ValueType>;
+    Assert(segment_->type() == SegmentType::Sealed && num_data_chunk_ == 1);
+    auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
+                               ? active_count_ - current_data_chunk_pos_
+                               : batch_size_;
+
+    auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
+    std::unordered_set<ValueType> term_set;
+    for (const auto& element : expr_->vals_) {
+        term_set.insert(GetValueFromProto<ValueType>(element));
+    }
+
+    if (term_set.empty()) {
+        MoveCursor();
+        return std::make_shared<ColumnVector>(
+            TargetBitmap(real_batch_size, false),
+            TargetBitmap(real_batch_size, true));
+    }
+
+    if (cached_index_chunk_id_ != 0) {
+        const auto* sealed_seg =
+            dynamic_cast<const segcore::SegmentSealedImpl*>(segment_);
+        auto field_id = expr_->column_.field_id_;
+        auto* index = sealed_seg->GetJsonKeyIndex(field_id);
+        Assert(index != nullptr);
+        auto filter_func = [sealed_seg, &term_set, &field_id](uint32_t row_id,
+                                                              uint16_t offset,
+                                                              uint16_t size) {
+            auto json_pair = sealed_seg->GetJsonData(field_id, row_id);
+            if (!json_pair.second) {
+                return false;
+            }
+            auto json =
+                milvus::Json(json_pair.first.data(), json_pair.first.size());
+            auto val = json.at<GetType>(offset, size);
+            if (val.error()) {
+                return false;
+            }
+            return term_set.find(ValueType(val.value())) != term_set.end();
+        };
+        cached_index_chunk_res_ =
+            index->FilterByPath(pointer, filter_func).clone();
+        cached_index_chunk_id_ = 0;
+    }
+
+    TargetBitmap result;
+    result.append(
+        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
+    current_data_chunk_pos_ += real_batch_size;
+    return std::make_shared<ColumnVector>(std::move(result),
+                                          TargetBitmap(real_batch_size, true));
+}
+
 template <typename ValueType>
 VectorPtr
 PhyTermFilterExpr::ExecTermJsonFieldInVariable() {
     using GetType = std::conditional_t<std::is_same_v<ValueType, std::string>,
                                        std::string_view,
                                        ValueType>;
+    FieldId field_id = expr_->column_.field_id_;
+    if (CanUseJsonKeyIndex(field_id)) {
+        return ExecJsonInVariableByKeyIndex<ValueType>();
+    }
+
     auto real_batch_size = GetNextBatchSize();
     if (real_batch_size == 0) {
         return nullptr;
diff --git a/internal/core/src/exec/expression/TermExpr.h b/internal/core/src/exec/expression/TermExpr.h
index a816c6c9c6153..c8688543508e2 100644
--- a/internal/core/src/exec/expression/TermExpr.h
+++ b/internal/core/src/exec/expression/TermExpr.h
@@ -117,6 +117,10 @@ class PhyTermFilterExpr : public SegmentExpr {
     VectorPtr
     ExecTermArrayFieldInVariable();

+    template <typename ValueType>
+    VectorPtr
+    ExecJsonInVariableByKeyIndex();
+
 private:
     std::shared_ptr<const milvus::expr::TermFilterExpr> expr_;
     milvus::Timestamp query_timestamp_;
diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp
index 1db1fa9d01bff..7b48a585743ae 100644
--- a/internal/core/src/index/InvertedIndexTantivy.cpp
+++ b/internal/core/src/index/InvertedIndexTantivy.cpp
@@ -97,6 +97,7 @@ InvertedIndexTantivy<T>::~InvertedIndexTantivy() {
     auto local_chunk_manager =
         storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager();
     auto prefix = path_;
+    LOG_INFO("Inverted index remove path:{}", path_);
     local_chunk_manager->RemoveDir(prefix);
 }
diff --git a/internal/core/src/index/JsonKeyInvertedIndex.cpp b/internal/core/src/index/JsonKeyInvertedIndex.cpp
new file mode 100644
index 0000000000000..eb2d9c4f74afb
--- /dev/null
+++ b/internal/core/src/index/JsonKeyInvertedIndex.cpp
@@ -0,0 +1,249 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+#include <memory>
+#include <vector>
+
+#include "index/JsonKeyInvertedIndex.h"
+#include "index/InvertedIndexUtil.h"
+#include "index/Utils.h"
+
+namespace milvus::index {
+constexpr const char* TMP_JSON_INVERTED_LOG_PREFIX =
+    "/tmp/milvus/json-key-inverted-index-log/";
+
+void
+JsonKeyInvertedIndex::AddInvertedRecord(const std::vector<std::string>& paths,
+                                        uint32_t row_id,
+                                        uint16_t offset,
+                                        uint16_t length) {
+    auto key = std::string("/") + Join(paths, ".");
+    int64_t combine_id = EncodeOffset(row_id, offset, length);
+    wrapper_->add_multi_data(&key, 1, combine_id);
+}
+
+void
+JsonKeyInvertedIndex::TravelJson(const char* json,
+                                 jsmntok* tokens,
+                                 int& index,
+                                 std::vector<std::string>& path,
+                                 int32_t offset) {
+    jsmntok current = tokens[0];
+    Assert(current.type != JSMN_UNDEFINED);
+    if (current.type == JSMN_OBJECT) {
+        int j = 1;
+        for (int i = 0; i < current.size; i++) {
+            Assert(tokens[j].type == JSMN_STRING && tokens[j].size != 0);
+            std::string key(json + tokens[j].start,
+                            tokens[j].end - tokens[j].start);
+            path.push_back(key);
+            j++;
+            int consumed = 0;
+            TravelJson(json, tokens + j, consumed, path, offset);
+            path.pop_back();
+            j += consumed;
+        }
+        index = j;
+    } else if (current.type == JSMN_PRIMITIVE) {
+        AddInvertedRecord(
+            path, offset, current.start, current.end - current.start);
+        index++;
+    } else if (current.type == JSMN_ARRAY) {
+        AddInvertedRecord(
+            path, offset, current.start, current.end - current.start);
+        // skip array parse
+        int count = current.size;
+        int j = 1;
+        while (count > 0) {
+            if (tokens[j].size == 0) {
+                count--;
+            } else {
+                count += tokens[j].size;
+            }
+            j++;
+        }
+        index = j;
+
+    } else if (current.type == JSMN_STRING) {
+        Assert(current.size == 0);
+        AddInvertedRecord(
+            path, offset, current.start, current.end - current.start);
+        index++;
+    }
+}
+
+void
+JsonKeyInvertedIndex::AddJson(const char* json, int64_t offset) {
+    jsmn_parser parser;
+    jsmntok_t* tokens = (jsmntok_t*)malloc(16 * sizeof(jsmntok_t));
+    if (!tokens) {
+        PanicInfo(ErrorCode::UnexpectedError, "malloc failed");
+    }
+    int num_tokens = 0;
+    int token_capacity = 16;
+
+    jsmn_init(&parser);
+
+    while (1) {
+        int r = jsmn_parse(&parser, json, strlen(json), tokens, token_capacity);
+        if (r < 0) {
+            if (r == JSMN_ERROR_NOMEM) {
+                // Reallocate tokens array if not enough space
+                token_capacity *= 2;
+                tokens = (jsmntok_t*)realloc(
+                    tokens, token_capacity * sizeof(jsmntok_t));
+                if (!tokens) {
+                    PanicInfo(ErrorCode::UnexpectedError, "realloc failed");
+                }
+                continue;
+            } else {
+                free(tokens);
+                PanicInfo(ErrorCode::UnexpectedError,
+                          "Failed to parse Json: {}, error: {}",
+                          json,
+                          int(r));
+            }
+        }
+        num_tokens = r;
+        break;
+    }
+
+    int index = 0;
+    std::vector<std::string> paths;
+    TravelJson(json, tokens, index, paths, offset);
+    free(tokens);
+}
+
+JsonKeyInvertedIndex::JsonKeyInvertedIndex(
+    const storage::FileManagerContext& ctx, bool is_load) {
+    schema_ = ctx.fieldDataMeta.field_schema;
+    field_id_ = ctx.fieldDataMeta.field_id;
+    mem_file_manager_ = std::make_shared<MemFileManager>(ctx);
+    disk_file_manager_ = std::make_shared<DiskFileManager>(ctx);
+
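+    // Two construction modes, mirrored in the branches below: when loading,
+    // `path_` points at the local cache prefix that CacheJsonKeyIndexToDisk
+    // fills in later; when building, a fresh temporary directory is created
+    // and a tantivy writer is opened on it, keyed by the field id.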
+    if (is_load) {
+        auto prefix = disk_file_manager_->GetLocalJsonKeyIndexPrefix();
+        path_ = prefix;
+    } else {
+        auto prefix = disk_file_manager_->GetJsonKeyIndexIdentifier();
+        path_ = std::string(TMP_JSON_INVERTED_LOG_PREFIX) + prefix;
+
+        boost::filesystem::create_directories(path_);
+        std::string field_name =
+            std::to_string(disk_file_manager_->GetFieldDataMeta().field_id);
+        d_type_ = TantivyDataType::Keyword;
+        wrapper_ = std::make_shared<TantivyIndexWrapper>(
+            field_name.c_str(), d_type_, path_.c_str());
+    }
+}
+
+BinarySet
+JsonKeyInvertedIndex::Upload(const Config& config) {
+    finish();
+
+    boost::filesystem::path p(path_);
+    boost::filesystem::directory_iterator end_iter;
+
+    for (boost::filesystem::directory_iterator iter(p); iter != end_iter;
+         iter++) {
+        if (boost::filesystem::is_directory(*iter)) {
+            LOG_WARN("{} is a directory", iter->path().string());
+        } else {
+            LOG_INFO("trying to add json key inverted index log: {}",
+                     iter->path().string());
+            AssertInfo(
+                disk_file_manager_->AddJsonKeyIndexLog(iter->path().string()),
+                "failed to add json key inverted index log: {}",
+                iter->path().string());
+            LOG_INFO("json key inverted index log: {} added",
+                     iter->path().string());
+        }
+    }
+
+    BinarySet ret;
+
+    auto remote_paths_to_size = disk_file_manager_->GetRemotePathsToFileSize();
+    for (auto& file : remote_paths_to_size) {
+        ret.Append(file.first, nullptr, file.second);
+    }
+
+    return ret;
+}
+
+void
+JsonKeyInvertedIndex::Load(milvus::tracer::TraceContext ctx,
+                           const Config& config) {
+    auto index_files =
+        GetValueFromConfig<std::vector<std::string>>(config, "index_files");
+    AssertInfo(index_files.has_value(),
+               "index file paths is empty when load json key index");
+
+    disk_file_manager_->CacheJsonKeyIndexToDisk(index_files.value());
+    AssertInfo(
+        tantivy_index_exist(path_.c_str()), "index not exist: {}", path_);
+    wrapper_ = std::make_shared<TantivyIndexWrapper>(path_.c_str());
+    LOG_INFO("load json key index done for field id:{} with dir:{}",
+             field_id_,
+             path_);
+}
+
+void
+JsonKeyInvertedIndex::BuildWithFieldData(
+    const std::vector<FieldDataPtr>& field_datas) {
+    AssertInfo(schema_.data_type() == proto::schema::DataType::JSON,
+               "schema data type is {}",
+               schema_.data_type());
+    if (schema_.nullable()) {
+        int64_t total = 0;
+        for (const auto& data : field_datas) {
+            total += data->get_null_count();
+        }
+        null_offset.reserve(total);
+    }
+    int64_t offset = 0;
+    if (schema_.nullable()) {
+        for (const auto& data : field_datas) {
+            auto n = data->get_num_rows();
+            for (int i = 0; i < n; i++) {
+                if (!data->is_valid(i)) {
+                    // record the global row offset of the null row and skip
+                    // indexing it
+                    null_offset.push_back(offset);
+                    offset++;
+                    continue;
+                }
+                AddJson(static_cast<const milvus::Json*>(data->RawValue(i))
+                            ->data()
+                            .data(),
+                        offset++);
+            }
+        }
+    } else {
+        for (const auto& data : field_datas) {
+            auto n = data->get_num_rows();
+            for (int i = 0; i < n; i++) {
+                AddJson(static_cast<const milvus::Json*>(data->RawValue(i))
+                            ->data()
+                            .data(),
+                        offset++);
+            }
+        }
+    }
+    LOG_INFO("build json key index done for field id:{}", field_id_);
+}
+
+}  // namespace milvus::index
diff --git a/internal/core/src/index/JsonKeyInvertedIndex.h b/internal/core/src/index/JsonKeyInvertedIndex.h
new file mode 100644
index 0000000000000..23b4a163a6697
--- /dev/null
+++ b/internal/core/src/index/JsonKeyInvertedIndex.h
@@ -0,0 +1,97 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "index/InvertedIndexTantivy.h"
+#include "common/jsmn.h"
+
+namespace milvus::index {
+
+class JsonKeyInvertedIndex : public InvertedIndexTantivy<std::string> {
+ public:
+    explicit JsonKeyInvertedIndex(const storage::FileManagerContext& ctx,
+                                  bool is_load);
+
+    ~JsonKeyInvertedIndex() override {}
+
+ public:
+    BinarySet
+    Upload(const Config& config) override;
+
+    void
+    Load(milvus::tracer::TraceContext ctx, const Config& config) override;
+
+    void
+    BuildWithFieldData(const std::vector<FieldDataPtr>& datas) override;
+
+    const TargetBitmap
+    FilterByPath(const std::string& path,
+                 std::function<bool(uint32_t, uint16_t, uint16_t)> filter) {
+        TargetBitmap bitset(Count());
+        auto array = wrapper_->term_query(path);
+        for (size_t j = 0; j < array.array_.len; j++) {
+            auto the_offset = array.array_.array[j];
+            auto tuple = DecodeOffset(the_offset);
+            auto row_id = std::get<0>(tuple);
+            bitset[row_id] = filter(
+                std::get<0>(tuple), std::get<1>(tuple), std::get<2>(tuple));
+        }
+        return bitset;
+    }
+
+ private:
+    void
+    AddJson(const char* json, int64_t offset);
+
+    void
+    TravelJson(const char* json,
+               jsmntok* tokens,
+               int& index,
+               std::vector<std::string>& path,
+               int32_t offset);
+
+    void
+    AddInvertedRecord(const std::vector<std::string>& paths,
+                      uint32_t row_id,
+                      uint16_t offset,
+                      uint16_t length);
+
+    // Pack (row_id, offset, length) into one int64 posting value: bits
+    // 63..32 hold the row id, bits 31..16 the byte offset of the value
+    // inside the row's JSON, and bits 15..0 the value length.
+    int64_t
+    EncodeOffset(uint32_t row_id, uint16_t row_offset, uint16_t size) {
+        return static_cast<int64_t>(row_id) << 32 |
+               static_cast<int64_t>(row_offset) << 16 |
+               static_cast<int64_t>(size);
+    }
+
+    std::tuple<uint32_t, uint16_t, uint16_t>
+    DecodeOffset(int64_t encode_offset) {
+        uint32_t row_id = (encode_offset >> 32) & 0xFFFFFFFF;
+        uint16_t row_offset = (encode_offset >> 16) & 0xFFFF;
+        uint16_t size = encode_offset & 0xFFFF;
+        return std::make_tuple(row_id, row_offset, size);
+    }
+
+ private:
+    int64_t field_id_;
+};
+}  // namespace milvus::index
diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp
index f4f4613c72259..2efcd7ff505c5 100644
--- a/internal/core/src/indexbuilder/index_c.cpp
+++ b/internal/core/src/indexbuilder/index_c.cpp
@@ -34,6 +34,7 @@
 #include "pb/index_cgo_msg.pb.h"
 #include "storage/Util.h"
 #include "index/Meta.h"
+#include "index/JsonKeyInvertedIndex.h"

 using namespace milvus;

 CStatus
@@ -237,6 +238,73 @@ CreateIndex(CIndex* res_index,
     }
 }

+CStatus
+BuildJsonKeyIndex(CBinarySet* c_binary_set,
+                  const uint8_t* serialized_build_index_info,
+                  const uint64_t len) {
+    try {
+        auto build_index_info =
+            std::make_unique<milvus::proto::indexcgo::BuildIndexInfo>();
+        auto res =
+            build_index_info->ParseFromArray(serialized_build_index_info, len);
+        AssertInfo(res, "Unmarshal build index info failed");
+
+        auto field_type =
+            static_cast<DataType>(build_index_info->field_schema().data_type());
+
+        auto storage_config =
+            get_storage_config(build_index_info->storage_config());
+        auto config = get_config(build_index_info);
+
+        // init file manager
+        milvus::storage::FieldDataMeta field_meta{
+            build_index_info->collectionid(),
+            build_index_info->partitionid(),
+            build_index_info->segmentid(),
+            build_index_info->field_schema().fieldid(),
+            build_index_info->field_schema()};
+
+        milvus::storage::IndexMeta index_meta{
+            build_index_info->segmentid(),
+            build_index_info->field_schema().fieldid(),
+            build_index_info->buildid(),
+            build_index_info->index_version(),
+            "",
+            build_index_info->field_schema().name(),
+            field_type,
+            build_index_info->dim(),
+        };
+        auto chunk_manager =
+            milvus::storage::CreateChunkManager(storage_config);
+
+        milvus::storage::FileManagerContext fileManagerContext(
+            field_meta, index_meta, chunk_manager);
+
+        auto index = std::make_unique<index::JsonKeyInvertedIndex>(
+            fileManagerContext, false);
+        index->Build(config);
+        auto binary =
+            std::make_unique<knowhere::BinarySet>(index->Upload(config));
+        *c_binary_set = binary.release();
+        auto status = CStatus();
+        status.error_code = Success;
+        status.error_msg = "";
+        return status;
+    } catch (SegcoreError& e) {
+        auto status = CStatus();
+        status.error_code = e.get_error_code();
+        status.error_msg = strdup(e.what());
+        return status;
+    } catch (std::exception& e) {
+        auto status = CStatus();
+        status.error_code = UnexpectedError;
+        status.error_msg = strdup(e.what());
+        return status;
+    }
+}
+
 CStatus
 BuildTextIndex(CBinarySet* c_binary_set,
                const uint8_t* serialized_build_index_info,
diff --git a/internal/core/src/indexbuilder/index_c.h b/internal/core/src/indexbuilder/index_c.h
index 6d26adc3442d9..c9b3e34d1f20b 100644
--- a/internal/core/src/indexbuilder/index_c.h
+++ b/internal/core/src/indexbuilder/index_c.h
@@ -35,6 +35,11 @@ CreateIndex(CIndex* res_index,
 CStatus
 DeleteIndex(CIndex index);

+CStatus
+BuildJsonKeyIndex(CBinarySet* c_binary_set,
+                  const uint8_t* serialized_build_index_info,
+                  const uint64_t len);
+
 CStatus
 BuildTextIndex(CBinarySet* c_binary_set,
                const uint8_t* serialized_build_index_info,
diff --git a/internal/core/src/mmap/ChunkedColumn.h b/internal/core/src/mmap/ChunkedColumn.h
index 16b45f4557395..0cbdac3cc5199 100644
--- a/internal/core/src/mmap/ChunkedColumn.h
+++ b/internal/core/src/mmap/ChunkedColumn.h
@@ -59,7 +59,7 @@ class ChunkedColumnBase : public ColumnBase {
         }
     }

-    virtual ~ChunkedColumnBase(){};
+    virtual ~ChunkedColumnBase() {};

     virtual void
     AppendBatch(const FieldDataPtr data) override {
@@ -141,6 +141,12 @@ class ChunkedColumnBase : public ColumnBase {
                   "GetBatchBuffer only supported for VariableColumn");
     }

+    virtual std::string_view
+    RawAt(const size_t i) const {
+        PanicInfo(ErrorCode::Unsupported,
+                  "RawAt only supported for VariableColumn");
+    }
+
     virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
     StringViews(int64_t chunk_id) const {
         PanicInfo(ErrorCode::Unsupported,
diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h
index 698097d30677e..8ab484a182d4c 100644
--- a/internal/core/src/mmap/Column.h
+++ b/internal/core/src/mmap/Column.h
@@ -323,6 +323,12 @@ class SingleChunkColumnBase : public ColumnBase {
                   "StringViews only supported for VariableColumn");
     }

+    virtual std::string_view
+    RawAt(const size_t i) const {
+        PanicInfo(ErrorCode::Unsupported,
+                  "RawAt only supported for VariableColumn");
+    }
+
     virtual void
     AppendBatch(const FieldDataPtr data) {
         size_t required_size = data_size_ + data->DataSize();
@@ -748,7 +754,7 @@ class
SingleChunkVariableColumn : public SingleChunkColumnBase {
     }

     std::string_view
-    RawAt(const int i) const {
+    RawAt(const size_t i) const {
         return std::string_view((*this)[i]);
     }
diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h
index fb07c1594b553..b807d79b73e2f 100644
--- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.h
+++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.h
@@ -97,6 +97,22 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
     void
     LoadTextIndex(FieldId field_id,
                   std::unique_ptr<index::TextMatchIndex> index) override;

+    void
+    LoadJsonKeyIndex(
+        FieldId field_id,
+        std::unique_ptr<index::JsonKeyInvertedIndex> index) override {
+        PanicInfo(ErrorCode::NotImplemented, "not supported");
+    }
+
+    index::JsonKeyInvertedIndex*
+    GetJsonKeyIndex(FieldId field_id) const override {
+        PanicInfo(ErrorCode::NotImplemented, "not supported");
+    }
+
+    std::pair<std::string_view, bool>
+    GetJsonData(FieldId field_id, size_t offset) const override {
+        PanicInfo(ErrorCode::NotImplemented, "not supported");
+    }

 public:
     size_t
diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h
index fe09f7c3afb79..33db7cc3c6e23 100644
--- a/internal/core/src/segcore/SegmentInterface.h
+++ b/internal/core/src/segcore/SegmentInterface.h
@@ -38,6 +38,7 @@
 #include "index/SkipIndex.h"
 #include "mmap/Column.h"
 #include "index/TextMatchIndex.h"
+#include "index/JsonKeyInvertedIndex.h"

 namespace milvus::segcore {
diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h
index b84b3b9b94d5c..7978822231522 100644
--- a/internal/core/src/segcore/SegmentSealed.h
+++ b/internal/core/src/segcore/SegmentSealed.h
@@ -52,6 +52,16 @@ class SegmentSealed : public SegmentInternalInterface {
     LoadTextIndex(FieldId field_id,
                   std::unique_ptr<index::TextMatchIndex> index) = 0;

+    virtual void
+    LoadJsonKeyIndex(FieldId field_id,
+                     std::unique_ptr<index::JsonKeyInvertedIndex> index) = 0;
+
+    virtual index::JsonKeyInvertedIndex*
+    GetJsonKeyIndex(FieldId field_id) const = 0;
+
+    virtual std::pair<std::string_view, bool>
+    GetJsonData(FieldId field_id, size_t offset) const = 0;
+
     SegmentType
     type() const override {
         return SegmentType::Sealed;
diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp
index dea574191845d..fc715b45f6401 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -2142,4 +2142,33 @@ SegmentSealedImpl::LoadTextIndex(FieldId field_id,
     text_indexes_[field_id] = std::move(index);
 }

+void
+SegmentSealedImpl::LoadJsonKeyIndex(
+    FieldId field_id, std::unique_ptr<index::JsonKeyInvertedIndex> index) {
+    std::unique_lock lck(mutex_);
+    // operator[] asserts that the field exists in the schema
+    const auto& field_meta = schema_->operator[](field_id);
+    json_key_indexes_[field_id] = std::move(index);
+}
+
+index::JsonKeyInvertedIndex*
+SegmentSealedImpl::GetJsonKeyIndex(FieldId field_id) const {
+    std::shared_lock lck(mutex_);
+    auto iter = json_key_indexes_.find(field_id);
+    if (iter == json_key_indexes_.end()) {
+        return nullptr;
+    }
+    return iter->second.get();
+}
+
+std::pair<std::string_view, bool>
+SegmentSealedImpl::GetJsonData(FieldId field_id, size_t offset) const {
+    std::shared_lock lck(mutex_);
+    bool is_valid = true;
+    auto column = fields_.at(field_id);
+    if (column->IsNullable()) {
+        is_valid = column->IsValid(offset);
+    }
+    return std::make_pair(column->RawAt(offset), is_valid);
+}
+
 }  // namespace milvus::segcore
diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h
index 1c07c1047a7e1..840ae016459b8 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.h
+++ b/internal/core/src/segcore/SegmentSealedImpl.h
@@ -36,6 +36,7 @@
 #include "common/Types.h"
 #include "common/IndexMeta.h"
 #include "index/TextMatchIndex.h"
+#include "index/JsonKeyInvertedIndex.h"

 namespace milvus::segcore {
@@ -100,6 +101,17 @@ class SegmentSealedImpl : public SegmentSealed {
     LoadTextIndex(FieldId field_id,
                   std::unique_ptr<index::TextMatchIndex> index) override;

+    void
+    LoadJsonKeyIndex(
+        FieldId field_id,
+        std::unique_ptr<index::JsonKeyInvertedIndex> index) override;
+
+    index::JsonKeyInvertedIndex*
+    GetJsonKeyIndex(FieldId field_id) const override;
+
+    std::pair<std::string_view, bool>
+    GetJsonData(FieldId field_id, size_t offset) const override;
+
 public:
     size_t
     GetMemoryUsageInBytes() const override {
@@ -386,6 +398,10 @@ class SegmentSealedImpl : public SegmentSealed {

     // whether the segment is sorted by the pk
     bool is_sorted_by_pk_ = false;
+
+    // used for json expr optimization
+    std::unordered_map<FieldId, std::unique_ptr<index::JsonKeyInvertedIndex>>
+        json_key_indexes_;
 };

 inline SegmentSealedUPtr
diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp
index fd7180d1ef184..09ab999810a14 100644
--- a/internal/core/src/segcore/segment_c.cpp
+++ b/internal/core/src/segcore/segment_c.cpp
@@ -479,6 +479,60 @@ LoadTextIndex(CSegmentInterface c_segment,
     }
 }

+CStatus
+LoadJsonKeyIndex(CTraceContext c_trace,
+                 CSegmentInterface c_segment,
+                 const uint8_t* serialized_load_json_key_index_info,
+                 const uint64_t len) {
+    try {
+        auto ctx = milvus::tracer::TraceContext{
+            c_trace.traceID, c_trace.spanID, c_trace.traceFlags};
+        auto segment_interface =
+            reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
+        auto segment =
+            dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
+        AssertInfo(segment != nullptr, "segment conversion failed");
+
+        auto info_proto =
+            std::make_unique<milvus::proto::indexcgo::LoadJsonKeyIndexInfo>();
+        auto res = info_proto->ParseFromArray(
+            serialized_load_json_key_index_info, len);
+        AssertInfo(res, "Unmarshal load json key index info failed");
+
+        milvus::storage::FieldDataMeta field_meta{info_proto->collectionid(),
+                                                  info_proto->partitionid(),
+                                                  segment->get_segment_id(),
+                                                  info_proto->fieldid(),
+                                                  info_proto->schema()};
+        milvus::storage::IndexMeta index_meta{segment->get_segment_id(),
+                                              info_proto->fieldid(),
+                                              info_proto->buildid(),
+                                              info_proto->version()};
+        auto remote_chunk_manager =
+            milvus::storage::RemoteChunkManagerSingleton::GetInstance()
+                .GetRemoteChunkManager();
+
+        milvus::Config config;
+        std::vector<std::string> files;
+        for (const auto& f : info_proto->files()) {
+            files.push_back(f);
+        }
+        config["index_files"] = files;
+
+        milvus::storage::FileManagerContext file_ctx(
+            field_meta, index_meta, remote_chunk_manager);
+
+        auto index = std::make_unique<milvus::index::JsonKeyInvertedIndex>(
+            file_ctx, true);
+        index->Load(ctx, config);
+
+        segment->LoadJsonKeyIndex(milvus::FieldId(info_proto->fieldid()),
+                                  std::move(index));
+
+        return milvus::SuccessCStatus();
+    } catch (std::exception& e) {
+        return milvus::FailureCStatus(&e);
+    }
+}
+
 CStatus
 UpdateFieldRawDataSize(CSegmentInterface c_segment,
                        int64_t field_id,
diff --git a/internal/core/src/segcore/segment_c.h b/internal/core/src/segcore/segment_c.h
index 80bb099886405..007f14f2cf37b 100644
--- a/internal/core/src/segcore/segment_c.h
+++ b/internal/core/src/segcore/segment_c.h
@@ -122,6 +122,12 @@ LoadTextIndex(CSegmentInterface c_segment,
               const uint8_t* serialized_load_text_index_info,
               const uint64_t len);

+CStatus
+LoadJsonKeyIndex(CTraceContext c_trace,
+                 CSegmentInterface c_segment,
+                 const uint8_t* serialized_load_json_key_index_info,
+                 const uint64_t len);
+
 CStatus
 UpdateFieldRawDataSize(CSegmentInterface c_segment,
                        int64_t field_id,
diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp
index e54fa6d748825..3b022ef76947a 100644
--- a/internal/core/src/storage/DiskFileManagerImpl.cpp
+++ b/internal/core/src/storage/DiskFileManagerImpl.cpp
@@ -79,8 +79,18 @@ DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name,
     return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num);
 }

+std::string
+DiskFileManagerImpl::GetRemoteJsonKeyIndexPath(const std::string& file_name,
+                                               int64_t slice_num) {
+    auto remote_prefix = GetRemoteJsonKeyLogPrefix();
+    return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num);
+}
+
 bool
-DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
+DiskFileManagerImpl::AddFileInternal(
+    const std::string& file,
+    const std::function<std::string(const std::string&, int)>&
+        get_remote_path) noexcept {
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();
     FILEMANAGER_TRY
@@ -115,8 +125,7 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
         }
         auto batch_size = std::min(FILE_SLICE_SIZE, int64_t(fileSize) - offset);
-        batch_remote_files.emplace_back(
-            GetRemoteIndexPath(fileName, slice_num));
+        batch_remote_files.emplace_back(get_remote_path(fileName, slice_num));
         remote_file_sizes.emplace_back(batch_size);
         local_file_offsets.emplace_back(offset);
         offset += batch_size;
@@ -132,56 +141,28 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
 }  // namespace knowhere

 bool
-DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept {
-    auto local_chunk_manager =
-        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
-    FILEMANAGER_TRY
-    if (!local_chunk_manager->Exist(file)) {
-        LOG_ERROR("local file {} not exists", file);
-        return false;
-    }
-
-    // record local file path
-    local_paths_.emplace_back(file);
-
-    auto fileName = GetFileName(file);
-    auto fileSize = local_chunk_manager->Size(file);
-
-    std::vector<std::string> batch_remote_files;
-    std::vector<int64_t> remote_file_sizes;
-    std::vector<int64_t> local_file_offsets;
-
-    int slice_num = 0;
-    auto parallel_degree =
-        uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
-    for (int64_t offset = 0; offset < fileSize; slice_num++) {
-        if (batch_remote_files.size() >= parallel_degree) {
-            AddBatchIndexFiles(file,
-                               local_file_offsets,
-                               batch_remote_files,
-
-                               remote_file_sizes);
-            batch_remote_files.clear();
-            remote_file_sizes.clear();
-            local_file_offsets.clear();
-        }
-
-        auto batch_size = std::min(FILE_SLICE_SIZE, int64_t(fileSize) - offset);
-        batch_remote_files.emplace_back(
-            GetRemoteTextLogPath(fileName, slice_num));
-        remote_file_sizes.emplace_back(batch_size);
-        local_file_offsets.emplace_back(offset);
-        offset += batch_size;
-    }
-    if (batch_remote_files.size() > 0) {
-        AddBatchIndexFiles(
-            file, local_file_offsets, batch_remote_files, remote_file_sizes);
-    }
-    FILEMANAGER_CATCH
-    FILEMANAGER_END
+DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
+    return AddFileInternal(file,
+                           [this](const std::string& file_name, int slice_num) {
+                               return GetRemoteIndexPath(file_name, slice_num);
+                           });
+}

-    return true;
-}  // namespace knowhere
+bool
+DiskFileManagerImpl::AddJsonKeyIndexLog(const std::string& file) noexcept {
+    return AddFileInternal(
+        file, [this](const std::string& file_name, int slice_num) {
+            return GetRemoteJsonKeyIndexPath(file_name, slice_num);
+        });
+}

+bool
+DiskFileManagerImpl::AddTextLog(const std::string& file) noexcept {
+    return AddFileInternal(
+        file, [this](const std::string& file_name, int slice_num) {
+            return GetRemoteTextLogPath(file_name, slice_num);
+        });
+}
 void
 DiskFileManagerImpl::AddBatchIndexFiles(
@@ -236,8 +217,9 @@ DiskFileManagerImpl::AddBatchIndexFiles(
 }

 void
-DiskFileManagerImpl::CacheIndexToDisk(
-    const std::vector<std::string>& remote_files) {
+DiskFileManagerImpl::CacheIndexToDiskInternal(
+    const std::vector<std::string>& remote_files,
+    const std::function<std::string()>& get_local_index_prefix) {
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();

@@ -263,7 +245,7 @@ DiskFileManagerImpl::CacheIndexToDisk(
     for (auto& slices : index_slices) {
         auto prefix = slices.first;
         auto local_index_file_name =
-            GetLocalIndexObjectPrefix() +
+            get_local_index_prefix() +
             prefix.substr(prefix.find_last_of('/') + 1);
         local_chunk_manager->CreateFile(local_index_file_name);
         auto file =
@@ -304,57 +286,24 @@ DiskFileManagerImpl::CacheIndexToDisk(
 }

 void
-DiskFileManagerImpl::CacheTextLogToDisk(
+DiskFileManagerImpl::CacheIndexToDisk(
     const std::vector<std::string>& remote_files) {
-    auto local_chunk_manager =
-        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
-
-    std::map<std::string, std::vector<int>> index_slices;
-    for (auto& file_path : remote_files) {
-        auto pos = file_path.find_last_of("_");
-        AssertInfo(pos > 0, "invalided index file path:{}", file_path);
-        try {
-            auto idx = std::stoi(file_path.substr(pos + 1));
-            index_slices[file_path.substr(0, pos)].emplace_back(idx);
-        } catch (const std::logic_error& e) {
-            auto err_message = fmt::format(
-                "invalided text log path:{}, error:{}", file_path, e.what());
-            LOG_ERROR(err_message);
-            throw std::logic_error(err_message);
-        }
-    }
-
-    for (auto& slices : index_slices) {
-        std::sort(slices.second.begin(), slices.second.end());
-    }
-
-    for (auto& slices : index_slices) {
-        auto prefix = slices.first;
-        auto local_index_file_name =
-            GetLocalTextIndexPrefix() + "/" +
-            prefix.substr(prefix.find_last_of('/') + 1);
-        local_chunk_manager->CreateFile(local_index_file_name);
-        auto file =
-            File::Open(local_index_file_name, O_CREAT | O_RDWR | O_TRUNC);
+    return CacheIndexToDiskInternal(
+        remote_files, [this]() { return GetLocalIndexObjectPrefix(); });
+}

-        // Get the remote files
-        std::vector<std::string> batch_remote_files;
-        batch_remote_files.reserve(slices.second.size());
-        for (int& iter : slices.second) {
-            auto origin_file = prefix + "_" + std::to_string(iter);
-            batch_remote_files.push_back(origin_file);
-        }
+void
+DiskFileManagerImpl::CacheTextLogToDisk(
+    const std::vector<std::string>& remote_files) {
+    return CacheIndexToDiskInternal(
+        remote_files, [this]() { return GetLocalTextIndexPrefix(); });
+}

-        auto index_chunks = GetObjectData(rcm_.get(), batch_remote_files);
-        for (auto& chunk : index_chunks) {
-            auto index_data = chunk.get()->GetFieldData();
-            auto index_size = index_data->Size();
-            auto chunk_data = reinterpret_cast<uint8_t*>(
-                const_cast<void*>(index_data->Data()));
-            file.Write(chunk_data, index_size);
-        }
-        local_paths_.emplace_back(local_index_file_name);
-    }
+void
+DiskFileManagerImpl::CacheJsonKeyIndexToDisk(
+    const std::vector<std::string>& remote_files) {
+    return CacheIndexToDiskInternal(
+        remote_files, [this]() { return GetLocalJsonKeyIndexPrefix(); });
 }

 void
@@ -693,6 +642,12 @@ DiskFileManagerImpl::GetFileName(const std::string& localfile) {
     return localPath.filename().string();
 }

+std::string
+DiskFileManagerImpl::GetIndexIdentifier() {
+    return GenIndexPathIdentifier(index_meta_.build_id,
+                                  index_meta_.index_version);
+}
+
 std::string
 DiskFileManagerImpl::GetLocalIndexObjectPrefix() {
     auto local_chunk_manager =
         LocalChunkManagerSingleton::GetInstance().GetChunkManager();
     return GenIndexPathPrefix(
         local_chunk_manager,
        index_meta_.build_id,
        index_meta_.index_version);
 }

+std::string
+DiskFileManagerImpl::GetTextIndexIdentifier() {
+    return std::to_string(index_meta_.build_id) + "/" +
+           std::to_string(index_meta_.index_version) + "/" +
+           std::to_string(field_meta_.segment_id) + "/" +
+           std::to_string(field_meta_.field_id);
+}
+
 std::string
 DiskFileManagerImpl::GetLocalTextIndexPrefix() {
     auto local_chunk_manager =
@@ -713,17 +676,31 @@ DiskFileManagerImpl::GetLocalTextIndexPrefix() {
 }

 std::string
-DiskFileManagerImpl::GetIndexIdentifier() {
-    return GenIndexPathIdentifier(index_meta_.build_id,
-                                  index_meta_.index_version);
+DiskFileManagerImpl::GetJsonKeyIndexIdentifier() {
+    return GenJsonKeyIndexPathIdentifier(index_meta_.build_id,
+                                         index_meta_.index_version,
+                                         field_meta_.segment_id,
+                                         field_meta_.field_id);
 }

 std::string
-DiskFileManagerImpl::GetTextIndexIdentifier() {
-    return std::to_string(index_meta_.build_id) + "/" +
-           std::to_string(index_meta_.index_version) + "/" +
-           std::to_string(field_meta_.segment_id) +
-           std::to_string(field_meta_.field_id);
+DiskFileManagerImpl::GetLocalJsonKeyIndexPrefix() {
+    auto local_chunk_manager =
+        LocalChunkManagerSingleton::GetInstance().GetChunkManager();
+    return GenJsonKeyIndexPathPrefix(local_chunk_manager,
+                                     index_meta_.build_id,
+                                     index_meta_.index_version,
+                                     field_meta_.segment_id,
+                                     field_meta_.field_id);
+}
+
+std::string
+DiskFileManagerImpl::GetRemoteJsonKeyLogPrefix() {
+    return GenJsonKeyIndexPathPrefix(rcm_,
+                                     index_meta_.build_id,
+                                     index_meta_.index_version,
+                                     field_meta_.segment_id,
+                                     field_meta_.field_id);
 }

 std::string
diff --git a/internal/core/src/storage/DiskFileManagerImpl.h b/internal/core/src/storage/DiskFileManagerImpl.h
index b2c87b1ff78db..aee9bca069b9e 100644
--- a/internal/core/src/storage/DiskFileManagerImpl.h
+++ b/internal/core/src/storage/DiskFileManagerImpl.h
@@ -51,27 +51,42 @@ class DiskFileManagerImpl : public FileManagerImpl {
     bool
     AddTextLog(const std::string& filename) noexcept;

+    bool
+    AddJsonKeyIndexLog(const std::string& filename) noexcept;
+
 public:
     std::string
     GetName() const override {
         return "DiskFileManagerImpl";
     }

+    std::string
+    GetIndexIdentifier();
+
     std::string
     GetLocalIndexObjectPrefix();

+    // Different from user index, a text index task may have multiple text fields sharing same build_id/task_id. So
+    // segment_id and field_id are required to identify a unique text index, in case that we support multiple index task
+    // in the same indexnode at the same time later.
+    std::string
+    GetTextIndexIdentifier();
+
+    // Similar to GetTextIndexIdentifier, segment_id and field_id are also required.
     std::string
     GetLocalTextIndexPrefix();

+    // Used when building the index: this identifier names the temporary build directory.
     std::string
-    GetIndexIdentifier();
+    GetJsonKeyIndexIdentifier();

-    // Different from user index, a text index task may have multiple text fields sharing same build_id/task_id. So
-    // segment_id and field_id are required to identify a unique text index, in case that we support multiple index task
-    // in the same indexnode at the same time later.
+    // Used when loading the index: the local prefix directory where the cached index files are stored.
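+    // The local prefix resolves to
+    // <local_root>/json_key_index_log/<build_id>/<index_version>/<segment_id>/<field_id>/
+    // (see GenJsonKeyIndexPathPrefix in storage/Util.cpp).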
     std::string
-    GetTextIndexIdentifier();
+    GetLocalJsonKeyIndexPrefix();
+
+    // Used when uploading the index to remote storage: this prefix is the
+    // remote storage directory.
+    std::string
+    GetRemoteJsonKeyLogPrefix();

     std::string
     GetLocalRawDataObjectPrefix();
@@ -92,6 +107,9 @@ class DiskFileManagerImpl : public FileManagerImpl {
     void
     CacheTextLogToDisk(const std::vector<std::string>& remote_files);

+    void
+    CacheJsonKeyIndexToDisk(const std::vector<std::string>& remote_files);
+
     void
     AddBatchIndexFiles(const std::string& local_file_name,
                        const std::vector<int64_t>& local_file_offsets,
@@ -125,6 +143,19 @@ class DiskFileManagerImpl : public FileManagerImpl {
     std::string
     GetRemoteTextLogPath(const std::string& file_name, int64_t slice_num) const;

+    std::string
+    GetRemoteJsonKeyIndexPath(const std::string& file_name, int64_t slice_num);
+
+    bool
+    AddFileInternal(const std::string& file_name,
+                    const std::function<std::string(const std::string&, int)>&
+                        get_remote_path) noexcept;
+
+    void
+    CacheIndexToDiskInternal(
+        const std::vector<std::string>& remote_files,
+        const std::function<std::string()>& get_local_index_prefix);
+
 private:
     // local file path (abs path)
     std::vector<std::string> local_paths_;
diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp
index 0ccf13b45fe9d..36770a915dc20 100644
--- a/internal/core/src/storage/Util.cpp
+++ b/internal/core/src/storage/Util.cpp
@@ -517,6 +517,28 @@ GenTextIndexPathPrefix(ChunkManagerPtr cm,
                        build_id, index_version, segment_id, field_id);
 }

+std::string
+GenJsonKeyIndexPathIdentifier(int64_t build_id,
+                              int64_t index_version,
+                              int64_t segment_id,
+                              int64_t field_id) {
+    return std::to_string(build_id) + "/" + std::to_string(index_version) +
+           "/" + std::to_string(segment_id) + "/" + std::to_string(field_id) +
+           "/";
+}
+
+std::string
+GenJsonKeyIndexPathPrefix(ChunkManagerPtr cm,
+                          int64_t build_id,
+                          int64_t index_version,
+                          int64_t segment_id,
+                          int64_t field_id) {
+    return cm->GetRootPath() + "/" + std::string(JSON_KEY_INDEX_LOG_ROOT_PATH) +
+           "/" +
+           GenJsonKeyIndexPathIdentifier(
+               build_id, index_version, segment_id, field_id);
+}
+
 std::string
 GetIndexPathPrefixWithBuildID(ChunkManagerPtr cm, int64_t build_id) {
     return cm->GetRootPath() + "/" + std::string(INDEX_ROOT_PATH) + "/" +
diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h
index b3a6a124fbe70..638ad9b08de5b 100644
--- a/internal/core/src/storage/Util.h
+++ b/internal/core/src/storage/Util.h
@@ -92,6 +92,19 @@ GenTextIndexPathPrefix(ChunkManagerPtr cm,
                        int64_t segment_id,
                        int64_t field_id);

+std::string
+GenJsonKeyIndexPathIdentifier(int64_t build_id,
+                              int64_t index_version,
+                              int64_t segment_id,
+                              int64_t field_id);
+
+std::string
+GenJsonKeyIndexPathPrefix(ChunkManagerPtr cm,
+                          int64_t build_id,
+                          int64_t index_version,
+                          int64_t segment_id,
+                          int64_t field_id);
+
 std::string
 GenFieldRawDataPathPrefix(ChunkManagerPtr cm,
                           int64_t segment_id,
diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h
index c443ec7fc7a0e..12767f3082ce8 100644
--- a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h
+++ b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h
@@ -15,7 +15,7 @@ enum class TantivyDataType : uint8_t {
 };

 struct RustArray {
-    uint32_t *array;
+    int64_t *array;
     size_t len;
     size_t cap;
 };
diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/array.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/array.rs
diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/array.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/array.rs
index 9d71ffa315b05..a029c75c6c019 100644
--- a/internal/core/thirdparty/tantivy/tantivy-binding/src/array.rs
+++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/array.rs
@@ -2,13 +2,13 @@ use libc::size_t;
 
 #[repr(C)]
 pub struct RustArray {
-    array: *mut u32,
+    array: *mut i64,
     len: size_t,
     cap: size_t,
 }
 
 impl RustArray {
-    pub fn from_vec(vec: Vec<u32>) -> RustArray {
+    pub fn from_vec(vec: Vec<i64>) -> RustArray {
         let len = vec.len();
         let cap = vec.capacity();
         let v = vec.leak();
diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/docid_collector.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/docid_collector.rs
index 95d585b436d16..f04aa5b34adfd 100644
--- a/internal/core/thirdparty/tantivy/tantivy-binding/src/docid_collector.rs
+++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/docid_collector.rs
@@ -7,7 +7,7 @@ use tantivy::{
 pub(crate) struct DocIdCollector;
 
 impl Collector for DocIdCollector {
-    type Fruit = Vec<u32>;
+    type Fruit = Vec<i64>;
     type Child = DocIdChildCollector;
 
     fn for_segment(
@@ -41,16 +41,16 @@ impl Collector for DocIdCollector {
 }
 
 pub(crate) struct DocIdChildCollector {
-    docs: Vec<u32>,
+    docs: Vec<i64>,
     column: Column<i64>,
 }
 
 impl SegmentCollector for DocIdChildCollector {
-    type Fruit = Vec<u32>;
+    type Fruit = Vec<i64>;
 
     fn collect(&mut self, doc: DocId, _score: Score) {
         self.column.values_for_doc(doc).for_each(|doc_id| {
-            self.docs.push(doc_id as u32);
+            self.docs.push(doc_id);
         })
     }
diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs
index 3ac514759dfa0..9a651d59a7d16 100644
--- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs
+++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs
@@ -65,7 +65,7 @@ impl IndexReaderWrapper {
         sum
     }
 
-    pub(crate) fn search(&self, q: &dyn Query) -> Vec<u32> {
+    pub(crate) fn search(&self, q: &dyn Query) -> Vec<i64> {
         let searcher = self.reader.searcher();
         match self.id_field {
             Some(_) => {
@@ -79,7 +79,7 @@ impl IndexReaderWrapper {
         }
     }
 
-    pub fn term_query_i64(&self, term: i64) -> Vec<u32> {
+    pub fn term_query_i64(&self, term: i64) -> Vec<i64> {
         let q = TermQuery::new(
             Term::from_field_i64(self.field, term),
             IndexRecordOption::Basic,
@@ -87,7 +87,7 @@ impl IndexReaderWrapper {
         self.search(&q)
     }
 
-    pub fn lower_bound_range_query_i64(&self, lower_bound: i64, inclusive: bool) -> Vec<u32> {
+    pub fn lower_bound_range_query_i64(&self, lower_bound: i64, inclusive: bool) -> Vec<i64> {
         let q = RangeQuery::new_i64_bounds(
             self.field_name.to_string(),
             make_bounds(lower_bound, inclusive),
@@ -96,7 +96,7 @@ impl IndexReaderWrapper {
         self.search(&q)
     }
 
-    pub fn upper_bound_range_query_i64(&self, upper_bound: i64, inclusive: bool) -> Vec<u32> {
+    pub fn upper_bound_range_query_i64(&self, upper_bound: i64, inclusive: bool) -> Vec<i64> {
         let q = RangeQuery::new_i64_bounds(
             self.field_name.to_string(),
             Bound::Unbounded,
@@ -111,14 +111,14 @@ impl IndexReaderWrapper {
         upper_bound: i64,
         lb_inclusive: bool,
         ub_inclusive: bool,
-    ) -> Vec<u32> {
+    ) -> Vec<i64> {
         let lb = make_bounds(lower_bound, lb_inclusive);
         let ub = make_bounds(upper_bound, ub_inclusive);
         let q = RangeQuery::new_i64_bounds(self.field_name.to_string(), lb, ub);
         self.search(&q)
     }
 
-    pub fn term_query_f64(&self, term: f64) -> Vec<u32> {
+    pub fn term_query_f64(&self, term: f64) -> Vec<i64> {
         let q = TermQuery::new(
             Term::from_field_f64(self.field, term),
             IndexRecordOption::Basic,
@@ -126,7 +126,7 @@ impl IndexReaderWrapper {
         self.search(&q)
     }
 
-    pub fn lower_bound_range_query_f64(&self, lower_bound: f64, inclusive: bool) -> Vec<u32> {
+    pub fn lower_bound_range_query_f64(&self, lower_bound: f64, inclusive: bool) -> Vec<i64> {
         let q = RangeQuery::new_f64_bounds(
             self.field_name.to_string(),
             make_bounds(lower_bound, inclusive),
@@ -135,7 +135,7 @@ impl IndexReaderWrapper {
         self.search(&q)
     }
 
-    pub fn upper_bound_range_query_f64(&self, upper_bound: f64, inclusive: bool) -> Vec<u32> {
+    pub fn upper_bound_range_query_f64(&self, upper_bound: f64, inclusive: bool) -> Vec<i64> {
         let q = RangeQuery::new_f64_bounds(
             self.field_name.to_string(),
             Bound::Unbounded,
@@ -150,14 +150,14 @@ impl IndexReaderWrapper {
         upper_bound: f64,
         lb_inclusive: bool,
         ub_inclusive: bool,
-    ) -> Vec<u32> {
+    ) -> Vec<i64> {
         let lb = make_bounds(lower_bound, lb_inclusive);
         let ub = make_bounds(upper_bound, ub_inclusive);
         let q = RangeQuery::new_f64_bounds(self.field_name.to_string(), lb, ub);
         self.search(&q)
     }
 
-    pub fn term_query_bool(&self, term: bool) -> Vec<u32> {
+    pub fn term_query_bool(&self, term: bool) -> Vec<i64> {
         let q = TermQuery::new(
             Term::from_field_bool(self.field, term),
             IndexRecordOption::Basic,
@@ -165,7 +165,7 @@ impl IndexReaderWrapper {
         self.search(&q)
     }
 
-    pub fn term_query_keyword(&self, term: &str) -> Vec<u32> {
+    pub fn term_query_keyword(&self, term: &str) -> Vec<i64> {
         let q = TermQuery::new(
             Term::from_field_text(self.field, term),
             IndexRecordOption::Basic,
@@ -173,7 +173,7 @@ impl IndexReaderWrapper {
         self.search(&q)
     }
 
-    pub fn lower_bound_range_query_keyword(&self, lower_bound: &str, inclusive: bool) -> Vec<u32> {
+    pub fn lower_bound_range_query_keyword(&self, lower_bound: &str, inclusive: bool) -> Vec<i64> {
         let q = RangeQuery::new_str_bounds(
             self.field_name.to_string(),
             make_bounds(lower_bound, inclusive),
@@ -182,7 +182,7 @@ impl IndexReaderWrapper {
         self.search(&q)
     }
 
-    pub fn upper_bound_range_query_keyword(&self, upper_bound: &str, inclusive: bool) -> Vec<u32> {
+    pub fn upper_bound_range_query_keyword(&self, upper_bound: &str, inclusive: bool) -> Vec<i64> {
         let q = RangeQuery::new_str_bounds(
             self.field_name.to_string(),
             Bound::Unbounded,
@@ -197,19 +197,19 @@ impl IndexReaderWrapper {
         upper_bound: &str,
         lb_inclusive: bool,
         ub_inclusive: bool,
-    ) -> Vec<u32> {
+    ) -> Vec<i64> {
         let lb = make_bounds(lower_bound, lb_inclusive);
         let ub = make_bounds(upper_bound, ub_inclusive);
         let q = RangeQuery::new_str_bounds(self.field_name.to_string(), lb, ub);
         self.search(&q)
     }
 
-    pub fn prefix_query_keyword(&self, prefix: &str) -> Vec<u32> {
+    pub fn prefix_query_keyword(&self, prefix: &str) -> Vec<i64> {
         let pattern = format!("{}(.|\n)*", prefix);
         self.regex_query(&pattern)
     }
 
-    pub fn regex_query(&self, pattern: &str) -> Vec<u32> {
+    pub fn regex_query(&self, pattern: &str) -> Vec<i64> {
         let q = RegexQuery::from_pattern(&pattern, self.field).unwrap();
         self.search(&q)
     }
diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs
index 654346fc868c4..19d51defc212e 100644
--- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs
+++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_text.rs
@@ -9,7 +9,7 @@ use crate::{index_reader::IndexReaderWrapper, tokenizer::default_tokenizer};
 impl IndexReaderWrapper {
     // split the query string into multiple tokens using index's default tokenizer,
     // and then execute the disconjunction of term query.
-    pub(crate) fn match_query(&self, q: &str) -> Vec<u32> {
+    pub(crate) fn match_query(&self, q: &str) -> Vec<i64> {
         // clone the tokenizer to make `match_query` thread-safe.
         let mut tokenizer = self
             .index
diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/vec_collector.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/vec_collector.rs
index 73299f24779e0..64d1c558205c6 100644
--- a/internal/core/thirdparty/tantivy/tantivy-binding/src/vec_collector.rs
+++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/vec_collector.rs
@@ -7,7 +7,7 @@ use tantivy::{
 pub struct VecCollector;
 
 impl Collector for VecCollector {
-    type Fruit = Vec<u32>;
+    type Fruit = Vec<i64>;
 
     type Child = VecChildCollector;
 
@@ -23,7 +23,7 @@ impl Collector for VecCollector {
         false
     }
 
-    fn merge_fruits(&self, segment_fruits: Vec<Vec<u32>>) -> tantivy::Result<Vec<u32>> {
+    fn merge_fruits(&self, segment_fruits: Vec<Vec<i64>>) -> tantivy::Result<Vec<i64>> {
         if segment_fruits.len() == 1 {
             Ok(segment_fruits.into_iter().next().unwrap())
         } else {
@@ -44,14 +44,14 @@ impl Collector for VecCollector {
 }
 
 pub struct VecChildCollector {
-    docs: Vec<u32>,
+    docs: Vec<i64>,
 }
 
 impl SegmentCollector for VecChildCollector {
-    type Fruit = Vec<u32>;
+    type Fruit = Vec<i64>;
 
     fn collect(&mut self, doc: DocId, _score: tantivy::Score) {
-        self.docs.push(doc);
+        self.docs.push(doc as i64);
     }
 
     fn harvest(self) -> Self::Fruit {
diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp
index 6cd11609038ef..5ad22c8e1f7c9 100644
--- a/internal/core/unittest/test_c_api.cpp
+++ b/internal/core/unittest/test_c_api.cpp
@@ -46,6 +46,7 @@
 #include "segcore/load_index_c.h"
 #include "test_utils/c_api_test_utils.h"
 #include "segcore/vector_index_c.h"
+#include "common/jsmn.h"
 
 namespace chrono = std::chrono;
 
@@ -5280,3 +5281,386 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) {
 TEST(CApiTest, IsLoadWithDisk) {
     ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0));
 }
+
+// generates a JSON object with N keys ("keys0".."keys<N-1>") and mixed value types
+std::string
+GenerateJson(int N) {
+    std::vector<std::string> data(N);
+    std::default_random_engine er(67);
+    std::normal_distribution<> distr(0, 1);
+    std::vector<std::string> keys;
+    for (int i = 0; i < N; i++) {
+        keys.push_back("keys" + std::to_string(i));
+    }
+    std::string json_string;
+    std::vector<std::string> values(N);
+    for (int i = 0; i < N; i++) {
+        if (i % 7 == 0 || i % 7 == 4) {
+            values[i] = std::to_string(er());
+        } else if (i % 7 == 1 || i % 7 == 5) {
+            values[i] = std::to_string(static_cast<double>(er()));
+        } else if (i % 7 == 2 || i % 7 == 6) {
+            values[i] = er() % 2 == 0 ?
"true" : "false"; + } else if (i % 7 == 3) { + values[i] = "\"xxxx" + std::to_string(i) + "\""; + // } else if (i % 7 == 4) { + // std::vector intvec(10); + // for (int j = 0; j < 10; j++) { + // intvec[j] = std::to_string(i + j); + // } + // values[i] = "[" + join(intvec, ",") + "]"; + // } else if (i % 7 == 5) { + // std::vector doublevec(10); + // for (int j = 0; j < 10; j++) { + // doublevec[j] = + // std::to_string(static_cast(i + j + er())); + // } + // values[i] = "[" + join(doublevec, ",") + "]"; + // } else if (i % 7 == 6) { + // std::vector stringvec(10); + // for (int j = 0; j < 10; j++) { + // stringvec[j] = "\"xxx" + std::to_string(j) + "\""; + // } + // values[i] = "[" + join(stringvec, ",") + "]"; + } + } + json_string += "{"; + for (int i = 0; i < N - 1; i++) { + json_string += R"(")" + keys[i] + R"(":)" + values[i] + R"(,)"; + } + json_string += R"(")" + keys[N - 1] + R"(":)" + values[N - 1]; + json_string += "}"; + return json_string; +} + +void +ParseJson(const std::string& json) { + jsmn_parser p; + jsmntok_t t[2002]; + + jsmn_init(&p); + int r = jsmn_parse( + &p, json.c_str(), strlen(json.c_str()), t, sizeof(t) / sizeof(t[0])); + if (r < 0) { + printf("Failed to parse JSON: %d\n", r); + return; + } + if (r < 1 || t[0].type != JSMN_OBJECT) { + printf("Object expected\n"); + return; + } + //std::cout << r << std::endl; +} + +TEST(CApiTest, test_parse_perform) { + for (int i = 0; i < 10000; i++) { + { + int64_t all_cost = 0; + for (int j = 0; j < 10000; j++) { + auto json_string = GenerateJson(1000); + if (j == 0) { + std::cout << json_string.size() << std::endl; + } + //std::cout << json_string << std::endl; + auto start = std::chrono::steady_clock::now(); + ParseJson(json_string); + all_cost += + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count(); + } + std::cout << "cost: " << all_cost << "us" << std::endl; + } + { + int64_t all_cost = 0; + for (int j = 0; j < 10000; j++) { + auto json_string = GenerateJson(100); + if (j == 0) { + std::cout << json_string.size() << std::endl; + } + //std::cout << json_string << std::endl; + auto start = std::chrono::steady_clock::now(); + ParseJson(json_string); + all_cost += + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count(); + } + std::cout << "cost: " << all_cost << "us" << std::endl; + } + { + int64_t all_cost = 0; + for (int j = 0; j < 10000; j++) { + auto json_string = GenerateJson(50); + if (j == 0) { + std::cout << json_string.size() << std::endl; + } + auto start = std::chrono::steady_clock::now(); + ParseJson(json_string); + all_cost += + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count(); + } + std::cout << "cost: " << all_cost << "us" << std::endl; + } + } +} + +void +extract_key_value_pairs(const char* json, size_t len) { + jsmn_parser parser; + jsmntok_t* tokens = + (jsmntok_t*)malloc(16 * sizeof(jsmntok_t)); // Initial allocation + if (!tokens) { + fprintf(stderr, "Memory allocation failed\n"); + return; + } + int num_tokens = 0; + int token_capacity = 16; + + // Initialize the parser + jsmn_init(&parser); + + size_t pos = 0; + while (pos < len) { + size_t chunk_size = + len - pos > 256 ? 
+
+void
+extract_key_value_pairs(const char* json, size_t len) {
+    jsmn_parser parser;
+    jsmntok_t* tokens =
+        (jsmntok_t*)malloc(16 * sizeof(jsmntok_t));  // Initial allocation
+    if (!tokens) {
+        fprintf(stderr, "Memory allocation failed\n");
+        return;
+    }
+    int num_tokens = 0;
+    int token_capacity = 16;
+
+    // Initialize the parser
+    jsmn_init(&parser);
+
+    // NOTE: jsmn keeps its cursor inside `parser` across calls, so re-invoking
+    // it on a shifted pointer (json + pos) does not resume cleanly; this
+    // chunked variant is exploratory.
+    size_t pos = 0;
+    while (pos < len) {
+        size_t chunk_size =
+            len - pos > 256 ? 256 : len - pos;  // Read in chunks of 256 bytes
+        int r =
+            jsmn_parse(&parser, json + pos, chunk_size, tokens, token_capacity);
+        if (r < 0) {
+            if (r == JSMN_ERROR_NOMEM) {
+                // Reallocate tokens array if not enough space
+                token_capacity *= 2;  // Double the capacity
+                tokens = (jsmntok_t*)realloc(
+                    tokens, token_capacity * sizeof(jsmntok_t));
+                if (!tokens) {
+                    fprintf(stderr, "Memory reallocation failed\n");
+                    return;
+                }
+                continue;  // Try parsing again
+            } else {
+                fprintf(stderr, "Failed to parse JSON: %d\n", r);
+                free(tokens);
+                return;
+            }
+        }
+
+        // Update the position
+        pos += chunk_size;
+    }
+
+    // Iterate through the tokens
+    for (int i = 0; i < parser.toknext; i++) {
+        if (tokens[i].type == JSMN_OBJECT) {
+            for (int j = 0; j < tokens[i].size; j++) {
+                // The next token is the key (string)
+                j++;
+                printf("Key: %.*s\n",
+                       tokens[j].end - tokens[j].start,
+                       json + tokens[j].start);
+
+                // The next token is the value
+                j++;
+                printf("Value: %.*s\n",
+                       tokens[j].end - tokens[j].start,
+                       json + tokens[j].start);
+            }
+        }
+    }
+
+    // Clean up
+    free(tokens);
+}
+
+void
+TravelJson(const char* json,
+           jsmntok_t* tokens,
+           int& index,
+           std::vector<std::string>& path) {
+    jsmntok_t current = tokens[0];
+    if (current.type == JSMN_OBJECT) {
+        int j = 1;
+        for (int i = 0; i < current.size; i++) {
+            assert(tokens[j].type == JSMN_STRING && tokens[j].size != 0);
+            std::string key(json + tokens[j].start,
+                            tokens[j].end - tokens[j].start);
+            path.push_back(key);
+            j++;
+            int consumed = 0;
+            TravelJson(json, tokens + j, consumed, path);
+            path.pop_back();
+            j += consumed;
+        }
+        index = j;
+    } else if (current.type == JSMN_PRIMITIVE) {
+        std::cout << "key:" << Join(path, ".") << " values:"
+                  << std::string(json + current.start,
+                                 current.end - current.start)
+                  << std::endl;
+        index++;
+    } else if (current.type == JSMN_ARRAY) {
+        std::cout << "key:" << Join(path, ".") << " values:"
+                  << std::string(json + current.start,
+                                 current.end - current.start)
+                  << std::endl;
+        // skip the array's nested tokens instead of recursing into them
+        int count = current.size;
+        int j = 1;
+        while (count > 0) {
+            if (tokens[j].size == 0) {
+                count--;
+            } else {
+                count += tokens[j].size;
+            }
+            j++;
+        }
+        index = j;
+    } else if (current.type == JSMN_STRING) {
+        if (current.size == 0) {
+            std::cout << "key:" << Join(path, ".") << " values:"
+                      << std::string(json + current.start,
+                                     current.end - current.start)
+                      << std::endl;
+            index++;
+        } else {
+            throw std::runtime_error("should not happen");
+        }
+    } else {
+        throw std::runtime_error("should not happen");
+    }
+}
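The full-buffer helpers in this test grow the token array and retry when jsmn_parse returns JSMN_ERROR_NOMEM; that works because jsmn_parser keeps its cursor (pos, toknext) between calls, so the retry resumes where it ran out rather than starting over. The same pattern condensed with std::vector, as a sketch under that assumption rather than the test's exact code:

#include <cstddef>
#include <vector>

#include "common/jsmn.h"

// Parse a whole buffer, doubling the token array on JSMN_ERROR_NOMEM.
std::vector<jsmntok_t>
ParseAll(const char* json, size_t len) {
    jsmn_parser parser;
    jsmn_init(&parser);
    std::vector<jsmntok_t> tokens(16);
    for (;;) {
        int r = jsmn_parse(&parser, json, len, tokens.data(),
                           static_cast<unsigned int>(tokens.size()));
        if (r == JSMN_ERROR_NOMEM) {
            tokens.resize(tokens.size() * 2);  // grow, then resume parsing
            continue;
        }
        if (r < 0) {  // JSMN_ERROR_INVAL or JSMN_ERROR_PART
            tokens.clear();
            break;
        }
        tokens.resize(r);  // r == number of tokens produced
        break;
    }
    return tokens;
}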
+
+void
+extract_key_value_pairs(const char* json) {
+    jsmn_parser parser;
+    jsmntok_t* tokens =
+        (jsmntok_t*)malloc(16 * sizeof(jsmntok_t));  // Initial allocation
+    if (!tokens) {
+        fprintf(stderr, "Memory allocation failed\n");
+        return;
+    }
+    int num_tokens = 0;
+    int token_capacity = 16;
+
+    // Initialize the parser
+    jsmn_init(&parser);
+
+    // Parse the JSON string
+    while (1) {
+        int r = jsmn_parse(&parser, json, strlen(json), tokens, token_capacity);
+        if (r < 0) {
+            if (r == JSMN_ERROR_NOMEM) {
+                // Reallocate tokens array if not enough space
+                token_capacity *= 2;  // Double the capacity
+                tokens = (jsmntok_t*)realloc(
+                    tokens, token_capacity * sizeof(jsmntok_t));
+                if (!tokens) {
+                    fprintf(stderr, "Memory reallocation failed\n");
+                    return;
+                }
+                continue;  // Try parsing again
+            } else {
+                fprintf(stderr, "Failed to parse JSON: %d\n", r);
+                free(tokens);
+                return;
+            }
+        }
+        num_tokens = r;
+        break;  // Exit the loop if parsing was successful
+    }
+
+    std::cout << "num_tokens:" << num_tokens << std::endl;
+    // Iterate through the tokens
+    for (int i = 0; i < num_tokens; i++) {
+        std::cout << "i:" << i << " type: " << tokens[i].type
+                  << " token size:" << tokens[i].size << std::endl;
+        printf("value: %.*s\n",
+               tokens[i].end - tokens[i].start,
+               json + tokens[i].start);
+    }
+
+    std::cout << "-----------------" << std::endl;
+    int index = 0;
+    std::vector<std::string> path;
+    TravelJson(json, tokens, index, path);
+
+    // Clean up
+    free(tokens);
+}
+
+void
+extract_json(const char* json) {
+    jsmn_parser parser;
+    jsmntok_t* tokens =
+        (jsmntok_t*)malloc(16 * sizeof(jsmntok_t));  // Initial allocation
+    if (!tokens) {
+        fprintf(stderr, "Memory allocation failed\n");
+        return;
+    }
+    int num_tokens = 0;
+    int token_capacity = 16;
+
+    // Initialize the parser
+    jsmn_init(&parser);
+
+    // Parse the JSON string
+    while (1) {
+        int r = jsmn_parse(&parser, json, strlen(json), tokens, token_capacity);
+        if (r < 0) {
+            if (r == JSMN_ERROR_NOMEM) {
+                // Reallocate tokens array if not enough space
+                token_capacity *= 2;  // Double the capacity
+                tokens = (jsmntok_t*)realloc(
+                    tokens, token_capacity * sizeof(jsmntok_t));
+                if (!tokens) {
+                    fprintf(stderr, "Memory reallocation failed\n");
+                    return;
+                }
+                continue;  // Try parsing again
+            } else {
+                fprintf(stderr, "Failed to parse JSON: %d\n", r);
+                free(tokens);
+                return;
+            }
+        }
+        num_tokens = r;
+        break;  // Exit the loop if parsing was successful
+    }
+
+    // assert(tokens[0].type == JSMN_OBJECT);
+
+    // Iterate through the tokens
+    for (int i = 0; i < num_tokens; i++) {
+        std::cout << "i:" << i << " type: " << tokens[i].type
+                  << " token size:" << tokens[i].size << std::endl;
+        printf("value: %.*s\n",
+               tokens[i].end - tokens[i].start,
+               json + tokens[i].start);
+    }
+
+    // Clean up
+    free(tokens);
+}
+
+TEST(CApiTest, test_jsmn_function) {
+    std::string json_string =
+        R"({"keys0": ["value0", 234, "values1"], "keys1": ["value3", 1235]})";
+    std::cout << json_string << std::endl;
+    extract_key_value_pairs(json_string.c_str());
+
+    json_string =
+        R"({"keys0": [{"keys1": 1234, "keys2": "xxx"}, {"keys3": 567, "keys4": "xxxxx"}]})";
+    std::cout << json_string << std::endl;
+    extract_key_value_pairs(json_string.c_str());
+
+    json_string = R"({"keys0": {"keys1": { "keys2": "xxx", "keys3" :1234}}})";
+    std::cout << json_string << std::endl;
+    extract_key_value_pairs(json_string.c_str());
+}
diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go
index 44a7cc5901981..03980f6ab78fc 100644
--- a/internal/datacoord/job_manager.go
+++ b/internal/datacoord/job_manager.go
@@ -8,6 +8,7 @@ import (
    "github.com/cockroachdb/errors"
    "go.uber.org/zap"
 
+   "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/datacoord/allocator"
    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/internal/proto/indexpb"
@@ -82,6 +83,7 @@ func (jm *statsJobManager) triggerStatsTaskLoop() {
            jm.triggerSortStatsTask()
            jm.triggerTextStatsTask()
            jm.triggerBM25StatsTask()
+           jm.triggerJsonKeyIndexStatsTask()
 
        case segID := <-getStatsTaskChSingleton():
            log.Info("receive new segment to trigger stats task", zap.Int64("segmentID", segID))
@@ -139,6 +141,13 @@ func needDoTextIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
    return false
 }
 
+func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
+   if !isFlush(segment) {
+       return false
+   }
+   // a segment only needs the task if the collection actually has JSON fields
+   return len(fieldIDs) > 0
+}
+
 func needDoBM25(segment *SegmentInfo, fieldIDs []UniqueID) bool {
    // TODO: docking bm25 stats task
    return false
@@ -170,6 +179,29 @@ func (jm *statsJobManager) triggerTextStatsTask() {
    }
 }
 
+func (jm *statsJobManager) triggerJsonKeyIndexStatsTask() {
+   collections := jm.mt.GetCollections()
+   for _, collection := range collections {
+       needTriggerFieldIDs := make([]UniqueID, 0)
+       for _, field := range collection.Schema.GetFields() {
+           if field.GetDataType() == schemapb.DataType_JSON {
+               needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
+           }
+       }
+       segments := jm.mt.SelectSegments(WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
+           return needDoJsonKeyIndex(seg, needTriggerFieldIDs)
+       }))
+       for _, segment := range segments {
+           if err := jm.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_JsonKeyIndexJob, true); err != nil {
+               log.Warn("create stats task with json key index for segment failed, wait for retry",
+                   zap.Int64("segmentID", segment.GetID()), zap.Error(err))
+               continue
+           }
+       }
+   }
+}
+
 func (jm *statsJobManager) triggerBM25StatsTask() {
    collections := jm.mt.GetCollections()
    for _, collection := range collections {
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index e1d8155b7eb62..8a3fc7cd352d6 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -1988,6 +1988,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
            Statslogs:           result.GetStatsLogs(),
            Bm25Statslogs:       result.GetBm25Logs(),
            TextStatsLogs:       result.GetTextStatsLogs(),
+           JsonKeyStats:        result.GetJsonKeyStatsLogs(),
            CreatedByCompaction: true,
            CompactionFrom:      []int64{oldSegmentID},
            LastExpireTime:      cloned.GetLastExpireTime(),
@@ -2004,7 +2005,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
        segment.State = commonpb.SegmentState_Dropped
    }
 
-   log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", result.GetNumRows()))
+   log.Info("meta update: prepare for complete stats mutation - complete", zap.Int64("num rows", result.GetNumRows()), zap.Any("segmentInfo", segmentInfo))
 
    if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{cloned.SegmentInfo, segment.SegmentInfo},
        metastore.BinlogsIncrement{Segment: segment.SegmentInfo}); err != nil {
        log.Warn("fail to alter segments and new segment", zap.Error(err))
diff --git a/internal/datacoord/segment_operator.go b/internal/datacoord/segment_operator.go
index 91b4da67ba8ce..2558865d51061 100644
--- a/internal/datacoord/segment_operator.go
+++ b/internal/datacoord/segment_operator.go
@@ -43,6 +43,18 @@ func SetTextIndexLogs(textIndexLogs map[int64]*datapb.TextIndexStats) SegmentOpe
    }
 }
 
+func SetJsonKeyIndexLogs(jsonKeyIndexLogs map[int64]*datapb.JsonKeyStats) SegmentOperator {
+   return func(segment *SegmentInfo) bool {
+       if segment.JsonKeyStats == nil {
+           segment.JsonKeyStats = make(map[int64]*datapb.JsonKeyStats)
+       }
+       for field, logs := range jsonKeyIndexLogs {
+           segment.JsonKeyStats[field] = logs
+       }
+       return true
+   }
+}
+
 type segmentCriterion struct {
    collectionID int64
    channel      string
diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go
index 477b3b2268ef7..ab5ee1b53cb04 100644
--- a/internal/datacoord/task_stats.go
+++ b/internal/datacoord/task_stats.go
@@ -313,6 +313,13 @@ func (st *statsTask) SetJobInfo(meta *meta) error {
                zap.Int64("segmentID", st.segmentID), zap.Error(err))
            return err
        }
+   case indexpb.StatsSubJob_JsonKeyIndexJob:
+       err := meta.UpdateSegment(st.taskInfo.GetSegmentID(), SetJsonKeyIndexLogs(st.taskInfo.GetJsonKeyStatsLogs()))
+       if err != nil {
+           log.Warn("save json key index stats result failed", zap.Int64("taskId", st.taskID),
+               zap.Int64("segmentID", st.segmentID), zap.Error(err))
+           return err
+       }
    case indexpb.StatsSubJob_BM25Job:
        // TODO: support bm25 job
    }
diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go
index f000eba84a3c3..137346e3e73cb 100644
--- a/internal/indexnode/indexnode_service.go
+++ b/internal/indexnode/indexnode_service.go
@@ -504,18 +504,19 @@ func (i *IndexNode) QueryJobsV2(ctx context.Context, req *workerpb.QueryJobsV2Re
            info := i.getStatsTaskInfo(req.GetClusterID(), taskID)
            if info != nil {
                results = append(results, &workerpb.StatsResult{
-                   TaskID:        taskID,
-                   State:         info.state,
-                   FailReason:    info.failReason,
-                   CollectionID:  info.collID,
-                   PartitionID:   info.partID,
-                   SegmentID:     info.segID,
-                   Channel:       info.insertChannel,
-                   InsertLogs:    info.insertLogs,
-                   StatsLogs:     info.statsLogs,
-                   TextStatsLogs: info.textStatsLogs,
+                   TaskID:           taskID,
+                   State:            info.state,
+                   FailReason:       info.failReason,
+                   CollectionID:     info.collID,
+                   PartitionID:      info.partID,
+                   SegmentID:        info.segID,
+                   Channel:          info.insertChannel,
+                   InsertLogs:       info.insertLogs,
+                   StatsLogs:        info.statsLogs,
+                   TextStatsLogs:    info.textStatsLogs,
                    Bm25Logs:         info.bm25Logs,
-                   NumRows:       info.numRows,
+                   NumRows:          info.numRows,
+                   JsonKeyStatsLogs: info.jsonKeyStatsLogs,
                })
            }
        }
diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go
index ac340db78e5bd..282bfdf137658 100644
--- a/internal/indexnode/task_stats.go
+++ b/internal/indexnode/task_stats.go
@@ -336,6 +336,13 @@ func (st *statsTask) Execute(ctx context.Context) error {
            log.Warn("stats wrong, failed to create text index", zap.Error(err))
            return err
        }
+   } else if st.req.GetSubJobType() == indexpb.StatsSubJob_JsonKeyIndexJob {
+       err = st.createJsonKeyIndex(ctx, st.req.GetStorageConfig(), st.req.GetCollectionID(),
+           st.req.GetPartitionID(), st.req.GetTargetSegmentID(), st.req.GetTaskVersion(), st.req.GetTaskID(), insertLogs)
+       if err != nil {
+           log.Warn("stats wrong, failed to create json key index", zap.Error(err))
+           return err
+       }
    }
 
    return nil
@@ -726,3 +733,100 @@ func (st *statsTask) createTextIndex(ctx context.Context,
        zap.Duration("total elapse", totalElapse))
    return nil
 }
+
+func (st *statsTask) createJsonKeyIndex(ctx context.Context,
+   storageConfig *indexpb.StorageConfig,
+   collectionID int64,
+   partitionID int64,
+   segmentID int64,
+   version int64,
+   taskID int64,
+   insertBinlogs []*datapb.FieldBinlog,
+) error {
+   log := log.Ctx(ctx).With(
+       zap.String("clusterID", st.req.GetClusterID()),
+       zap.Int64("taskID", st.req.GetTaskID()),
+       zap.Int64("collectionID", st.req.GetCollectionID()),
+       zap.Int64("partitionID", st.req.GetPartitionID()),
+       zap.Int64("segmentID", st.req.GetSegmentID()),
+       zap.Any("statsJobType", st.req.GetSubJobType()),
+   )
+
+   fieldBinlogs := lo.GroupBy(insertBinlogs, func(binlog *datapb.FieldBinlog) int64 {
+       return binlog.GetFieldID()
+   })
+
+   getInsertFiles := func(fieldID int64) ([]string, error) {
+       binlogs, ok := fieldBinlogs[fieldID]
+       if !ok {
+           return nil, fmt.Errorf("field binlog not found for field %d", fieldID)
+       }
+       result := make([]string, 0, len(binlogs))
+       for _, binlog := range binlogs {
+           for _, file := range binlog.GetBinlogs() {
+               result = append(result, metautil.BuildInsertLogPath(storageConfig.GetRootPath(), collectionID, partitionID, segmentID, fieldID, file.GetLogID()))
+           }
+       }
+       return result, nil
+   }
+
+   newStorageConfig, err := ParseStorageConfig(storageConfig)
+   if err != nil {
+       return err
+   }
+
+   jsonKeyIndexStats := make(map[int64]*datapb.JsonKeyStats)
+   for _, field := range st.req.GetSchema().GetFields() {
+       h := typeutil.CreateFieldSchemaHelper(field)
+       if !h.EnableJsonKeyIndex() {
+           continue
+       }
+       log.Info("field enable json key index, ready to create json key index", zap.Int64("field id", field.GetFieldID()))
+       // create the json key index and upload the json key index files.
+       files, err := getInsertFiles(field.GetFieldID())
+       if err != nil {
+           return err
+       }
+
+       buildIndexParams := &indexcgopb.BuildIndexInfo{
+           BuildID:       taskID,
+           CollectionID:  collectionID,
+           PartitionID:   partitionID,
+           SegmentID:     segmentID,
+           IndexVersion:  version,
+           InsertFiles:   files,
+           FieldSchema:   field,
+           StorageConfig: newStorageConfig,
+       }
+
+       uploaded, err := indexcgowrapper.CreateJsonKeyIndex(ctx, buildIndexParams)
+       if err != nil {
+           return err
+       }
+       jsonKeyIndexStats[field.GetFieldID()] = &datapb.JsonKeyStats{
+           FieldID: field.GetFieldID(),
+           Version: version,
+           BuildID: taskID,
+           Files:   lo.Keys(uploaded),
+       }
+       log.Info("field enable json key index, create json key index done",
+           zap.Int64("field id", field.GetFieldID()),
+           zap.Strings("files", lo.Keys(uploaded)),
+       )
+   }
+
+   totalElapse := st.tr.RecordSpan()
+
+   st.node.storeJsonKeyIndexResult(st.req.GetClusterID(),
+       st.req.GetTaskID(),
+       st.req.GetCollectionID(),
+       st.req.GetPartitionID(),
+       st.req.GetTargetSegmentID(),
+       st.req.GetInsertChannel(),
+       jsonKeyIndexStats)
+
+   log.Info("create json key index done",
+       zap.Int64("target segmentID", st.req.GetTargetSegmentID()),
+       zap.Duration("total elapse", totalElapse))
+   return nil
+}
diff --git a/internal/indexnode/taskinfo_ops.go b/internal/indexnode/taskinfo_ops.go
index 23c71983fe50c..ebaffb449f6a6 100644
--- a/internal/indexnode/taskinfo_ops.go
+++ b/internal/indexnode/taskinfo_ops.go
@@ -312,18 +312,19 @@ func (i *IndexNode) waitTaskFinish() {
 }
 
 type statsTaskInfo struct {
-   cancel        context.CancelFunc
-   state         indexpb.JobState
-   failReason    string
-   collID        UniqueID
-   partID        UniqueID
-   segID         UniqueID
-   insertChannel string
-   numRows       int64
-   insertLogs    []*datapb.FieldBinlog
-   statsLogs     []*datapb.FieldBinlog
-   textStatsLogs map[int64]*datapb.TextIndexStats
+   cancel           context.CancelFunc
+   state            indexpb.JobState
+   failReason       string
+   collID           UniqueID
+   partID           UniqueID
+   segID            UniqueID
+   insertChannel    string
+   numRows          int64
+   insertLogs       []*datapb.FieldBinlog
+   statsLogs        []*datapb.FieldBinlog
+   textStatsLogs    map[int64]*datapb.TextIndexStats
    bm25Logs         []*datapb.FieldBinlog
+   jsonKeyStatsLogs map[int64]*datapb.JsonKeyStats
 }
 
 func (i *IndexNode) loadOrStoreStatsTask(clusterID string, taskID UniqueID, info *statsTaskInfo) *statsTaskInfo {
@@ -410,24 +411,45 @@ func (i *IndexNode) storeStatsTextIndexResult(
    }
 }
 
+func (i *IndexNode) storeJsonKeyIndexResult(
+   clusterID string,
+   taskID UniqueID,
+   collID UniqueID,
+   partID UniqueID,
+   segID UniqueID,
+   channel string,
+   jsonKeyIndexLogs map[int64]*datapb.JsonKeyStats) {
+   key := taskKey{ClusterID: clusterID, TaskID: taskID}
+   i.stateLock.Lock()
+   defer i.stateLock.Unlock()
+   if info, ok := i.statsTasks[key]; ok {
+       info.jsonKeyStatsLogs = jsonKeyIndexLogs
+       info.segID = segID
+       info.collID = collID
+       info.partID = partID
+       info.insertChannel = channel
+   }
+}
+
 func (i *IndexNode) getStatsTaskInfo(clusterID string, taskID UniqueID) *statsTaskInfo {
    i.stateLock.Lock()
    defer i.stateLock.Unlock()
 
    if info, ok := i.statsTasks[taskKey{ClusterID: clusterID, TaskID: taskID}]; ok {
        return &statsTaskInfo{
-           cancel:        info.cancel,
-           state:         info.state,
-           failReason:    info.failReason,
-           collID:        info.collID,
-           partID:        info.partID,
-           segID:         info.segID,
-           insertChannel: info.insertChannel,
-           numRows:       info.numRows,
-           insertLogs:    info.insertLogs,
-           statsLogs:     info.statsLogs,
-           textStatsLogs: info.textStatsLogs,
+           cancel:           info.cancel,
+           state:            info.state,
+           failReason:       info.failReason,
+           collID:           info.collID,
+           partID:           info.partID,
+           segID:            info.segID,
+           insertChannel:    info.insertChannel,
+           numRows:          info.numRows,
+           insertLogs:       info.insertLogs,
+           statsLogs:        info.statsLogs,
+           textStatsLogs:    info.textStatsLogs,
            bm25Logs:         info.bm25Logs,
+           jsonKeyStatsLogs: info.jsonKeyStatsLogs,
        }
    }
    return nil
diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto
index 4b84970de2693..cf11803bab154 100644
--- a/internal/proto/data_coord.proto
+++ b/internal/proto/data_coord.proto
@@ -357,6 +357,9 @@ message SegmentInfo {
   // textStatsLogs is used to record tokenization index for fields.
   map<int64, TextIndexStats> textStatsLogs = 26;
   repeated FieldBinlog bm25statslogs = 27;
+
+  // jsonKeyStats is used to record json key index for fields.
+  map<int64, JsonKeyStats> jsonKeyStats = 28;
 }
 
 message SegmentStartPosition {
@@ -444,6 +447,15 @@ message TextIndexStats {
   int64 buildID = 6;
 }
 
+message JsonKeyStats {
+  int64 fieldID = 1;
+  int64 version = 2;
+  repeated string files = 3;
+  int64 log_size = 4;
+  int64 memory_size = 5;
+  int64 buildID = 6;
+}
+
 message Binlog {
   int64 entries_num = 1;
   uint64 timestamp_from = 2;
diff --git a/internal/proto/index_cgo_msg.proto b/internal/proto/index_cgo_msg.proto
index 92e98100f35ec..4e1be8c6991ae 100644
--- a/internal/proto/index_cgo_msg.proto
+++ b/internal/proto/index_cgo_msg.proto
@@ -92,3 +92,13 @@ message LoadTextIndexInfo {
   int64 collectionID = 6;
   int64 partitionID = 7;
 }
+
+message LoadJsonKeyIndexInfo {
+  int64 FieldID = 1;
+  int64 version = 2;
+  int64 buildID = 3;
+  repeated string files = 4;
+  schema.FieldSchema schema = 5;
+  int64 collectionID = 6;
+  int64 partitionID = 7;
+}
diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto
index 7377954ebaf12..e3bf9e4d23e26 100644
--- a/internal/proto/index_coord.proto
+++ b/internal/proto/index_coord.proto
@@ -321,4 +321,5 @@ enum StatsSubJob {
     Sort = 1;
     TextIndexJob = 2;
     BM25Job=3;
+    JsonKeyIndexJob = 4;
 }
diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index ef5c8d8d1b66b..6809f964024a4 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -368,6 +368,7 @@ message SegmentLoadInfo {
     bool is_sorted = 19;
     map<int64, data.TextIndexStats> textStatsLogs = 20;
     repeated data.FieldBinlog bm25logs = 21;
+    map<int64, data.JsonKeyStats> jsonKeyStatsLogs = 22;
 }
 
 message FieldIndexInfo {
diff --git a/internal/proto/worker.proto b/internal/proto/worker.proto
index 6f8f72a0a441f..ca3d1160fadcb 100644
--- a/internal/proto/worker.proto
+++ b/internal/proto/worker.proto
@@ -199,6 +199,7 @@ message StatsResult {
     map<int64, data.TextIndexStats> text_stats_logs = 10;
     int64 num_rows = 11;
     repeated data.FieldBinlog bm25_logs = 12;
+    map<int64, data.JsonKeyStats> json_key_stats_logs = 13;
 }
 
 message StatsResults {
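LoadJsonKeyIndexInfo mirrors LoadTextIndexInfo: the Go side (segment.go below) marshals it and passes the bytes through the C.LoadJsonKeyIndex entry point. A hedged sketch of the C++ receiving end; the generated header path and namespace are assumptions based on how the other cgo messages are generated:

#include <cstdint>

#include "pb/index_cgo_msg.pb.h"  // assumed location of the generated header

bool
ParseLoadJsonKeyIndexInfo(const uint8_t* blob, uint64_t len) {
    milvus::proto::indexcgo::LoadJsonKeyIndexInfo info;  // assumed namespace
    if (!info.ParseFromArray(blob, static_cast<int>(len))) {
        return false;
    }
    // files() lists the remote index files to fetch before loading; buildid()
    // and version() select the unique index directory produced by the stats task.
    return info.files_size() > 0;
}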
diff --git a/internal/querycoordv2/utils/types.go b/internal/querycoordv2/utils/types.go
index 511081d73763b..8056449ff07c7 100644
--- a/internal/querycoordv2/utils/types.go
+++ b/internal/querycoordv2/utils/types.go
@@ -74,22 +74,23 @@ func PackSegmentLoadInfo(segment *datapb.SegmentInfo, channelCheckpoint *msgpb.M
            zap.Duration("tsLag", tsLag))
    }
    loadInfo := &querypb.SegmentLoadInfo{
-       SegmentID:      segment.ID,
-       PartitionID:    segment.PartitionID,
-       CollectionID:   segment.CollectionID,
-       BinlogPaths:    segment.Binlogs,
-       NumOfRows:      segment.NumOfRows,
-       Statslogs:      segment.Statslogs,
-       Deltalogs:      segment.Deltalogs,
-       Bm25Logs:       segment.Bm25Statslogs,
-       InsertChannel:  segment.InsertChannel,
-       IndexInfos:     indexes,
-       StartPosition:  segment.GetStartPosition(),
-       DeltaPosition:  channelCheckpoint,
-       Level:          segment.GetLevel(),
-       StorageVersion: segment.GetStorageVersion(),
-       IsSorted:       segment.GetIsSorted(),
-       TextStatsLogs:  segment.GetTextStatsLogs(),
+       SegmentID:        segment.ID,
+       PartitionID:      segment.PartitionID,
+       CollectionID:     segment.CollectionID,
+       BinlogPaths:      segment.Binlogs,
+       NumOfRows:        segment.NumOfRows,
+       Statslogs:        segment.Statslogs,
+       Deltalogs:        segment.Deltalogs,
+       Bm25Logs:         segment.Bm25Statslogs,
+       InsertChannel:    segment.InsertChannel,
+       IndexInfos:       indexes,
+       StartPosition:    segment.GetStartPosition(),
+       DeltaPosition:    channelCheckpoint,
+       Level:            segment.GetLevel(),
+       StorageVersion:   segment.GetStorageVersion(),
+       IsSorted:         segment.GetIsSorted(),
+       TextStatsLogs:    segment.GetTextStatsLogs(),
+       JsonKeyStatsLogs: segment.GetJsonKeyStats(),
    }
    return loadInfo
 }
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 4b651e12cc7ab..b3818327bfbf9 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -1274,6 +1274,40 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI
    return HandleCStatus(ctx, &status, "LoadTextIndex failed")
 }
 
+func (s *LocalSegment) LoadJsonKeyIndex(ctx context.Context, jsonKeyStats *datapb.JsonKeyStats, schemaHelper *typeutil.SchemaHelper) error {
+   log.Ctx(ctx).Info("load json key index", zap.Int64("field id", jsonKeyStats.GetFieldID()), zap.Any("json key logs", jsonKeyStats))
+
+   f, err := schemaHelper.GetFieldFromID(jsonKeyStats.GetFieldID())
+   if err != nil {
+       return err
+   }
+
+   cgoProto := &indexcgopb.LoadJsonKeyIndexInfo{
+       FieldID:      jsonKeyStats.GetFieldID(),
+       Version:      jsonKeyStats.GetVersion(),
+       BuildID:      jsonKeyStats.GetBuildID(),
+       Files:        jsonKeyStats.GetFiles(),
+       Schema:       f,
+       CollectionID: s.Collection(),
+       PartitionID:  s.Partition(),
+   }
+
+   marshaled, err := proto.Marshal(cgoProto)
+   if err != nil {
+       return err
+   }
+
+   var status C.CStatus
+   _, _ = GetLoadPool().Submit(func() (any, error) {
+       traceCtx := ParseCTraceContext(ctx)
+       status = C.LoadJsonKeyIndex(traceCtx.ctx, s.ptr, (*C.uint8_t)(unsafe.Pointer(&marshaled[0])), (C.uint64_t)(len(marshaled)))
+       return nil, nil
+   }).Await()
+
+   return HandleCStatus(ctx, &status, "Load JsonKeyStats failed")
+}
+
 func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error {
    log := log.Ctx(ctx).With(
        zap.Int64("collectionID", s.Collection()),
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 4149028deb7bf..14074564972a5 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -679,6 +679,7 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll
    []*datapb.FieldBinlog, // fields info
    map[int64]*datapb.TextIndexStats, // text indexed info
    map[int64]struct{}, // unindexed text fields
+   map[int64]*datapb.JsonKeyStats, // json key stats info
 ) {
    fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
    for _, indexInfo := range loadInfo.IndexInfos {
@@ -715,6 +716,16 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll
        }
    }
 
+   jsonKeyIndexInfo := make(map[int64]*datapb.JsonKeyStats, len(loadInfo.GetJsonKeyStatsLogs()))
+   for _, fieldStatsLog := range loadInfo.GetJsonKeyStatsLogs() {
+       jsonKeyLog, ok := jsonKeyIndexInfo[fieldStatsLog.FieldID]
+       if !ok {
+           jsonKeyIndexInfo[fieldStatsLog.FieldID] = fieldStatsLog
+       } else if fieldStatsLog.GetVersion() > jsonKeyLog.GetVersion() {
+           jsonKeyIndexInfo[fieldStatsLog.FieldID] = fieldStatsLog
+       }
+   }
+
    unindexedTextFields := make(map[int64]struct{})
    for _, field := range schema.GetFields() {
        h := typeutil.CreateFieldSchemaHelper(field)
@@ -724,7 +735,7 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll
        }
    }
 
-   return indexedFieldInfos, fieldBinlogs, textIndexedInfo, unindexedTextFields
+   return indexedFieldInfos, fieldBinlogs, textIndexedInfo, unindexedTextFields, jsonKeyIndexInfo
 }
 
 func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *querypb.SegmentLoadInfo, segment *LocalSegment) (err error) {
@@ -748,7 +759,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
    collection := segment.GetCollection()
    schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema())
 
-   indexedFieldInfos, fieldBinlogs, textIndexes, unindexedTextFields := separateLoadInfoV2(loadInfo, collection.Schema())
+   indexedFieldInfos, fieldBinlogs, textIndexes, unindexedTextFields, jsonKeyStats := separateLoadInfoV2(loadInfo, collection.Schema())
    if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil {
        return err
    }
@@ -759,6 +770,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
        zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)),
        zap.Int64s("indexed text fields", lo.Keys(textIndexes)),
        zap.Int64s("unindexed text fields", lo.Keys(unindexedTextFields)),
+       zap.Int64s("indexed json key fields", lo.Keys(jsonKeyStats)),
    )
    if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil {
        return err
@@ -805,6 +817,13 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
        }
    }
 
+   for _, info := range jsonKeyStats {
+       if err := segment.LoadJsonKeyIndex(ctx, info, schemaHelper); err != nil {
+           return err
+       }
+   }
+
+   loadJsonKeyIndexesSpan := tr.RecordSpan()
+
    // 4. rectify entries number for binlog in very rare cases
    // https://github.com/milvus-io/milvus/23654
    // legacy entry num = 0
@@ -818,6 +837,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu
        zap.Duration("loadRawDataSpan", loadRawDataSpan),
        zap.Duration("patchEntryNumberSpan", patchEntryNumberSpan),
        zap.Duration("loadTextIndexesSpan", loadTextIndexesSpan),
+       zap.Duration("loadJsonKeyIndexSpan", loadJsonKeyIndexesSpan),
    )
    return nil
 }
@@ -836,6 +856,7 @@ func (loader *segmentLoader) LoadSegment(ctx context.Context,
        zap.String("shard", segment.Shard().VirtualName()),
        zap.Int64("segmentID", segment.ID()),
    )
+
    log.Info("start loading segment files",
        zap.Int64("rowNum", loadInfo.GetNumOfRows()),
        zap.String("segmentType", segment.Type().String()))
diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go
index c87a3801feeae..255fc99d43e4a 100644
--- a/internal/util/indexcgowrapper/index.go
+++ b/internal/util/indexcgowrapper/index.go
@@ -163,6 +163,43 @@ func CreateTextIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexI
    return res, nil
 }
 
+func CreateJsonKeyIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (map[string]int64, error) {
+   buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo)
+   if err != nil {
+       log.Ctx(ctx).Warn("marshal buildIndexInfo failed",
+           zap.String("clusterID", buildIndexInfo.GetClusterID()),
+           zap.Int64("buildID", buildIndexInfo.GetBuildID()),
+           zap.Error(err))
+       return nil, err
+   }
+   var cBinarySet C.CBinarySet
+   status := C.BuildJsonKeyIndex(&cBinarySet, (*C.uint8_t)(unsafe.Pointer(&buildIndexInfoBlob[0])), (C.uint64_t)(len(buildIndexInfoBlob)))
+   if err := HandleCStatus(&status, "failed to build json key index"); err != nil {
+       return nil, err
+   }
+
+   defer func() {
+       if cBinarySet != nil {
+           C.DeleteBinarySet(cBinarySet)
+       }
+   }()
+
+   res := make(map[string]int64)
+   indexFilePaths, err := GetBinarySetKeys(cBinarySet)
+   if err != nil {
+       return nil, err
+   }
+   for _, path := range indexFilePaths {
+       size, err := GetBinarySetSize(cBinarySet, path)
+       if err != nil {
+           return nil, err
+       }
+       res[path] = size
+   }
+
+   return res, nil
+}
+
 // TODO: this seems to be used only for test. We should mark the method
 // name with ForTest, or maybe move to test file.
 func (index *CgoIndex) Build(dataset *Dataset) error {
diff --git a/pkg/util/typeutil/field_schema.go b/pkg/util/typeutil/field_schema.go
index bbf3ab446719a..561c82e871878 100644
--- a/pkg/util/typeutil/field_schema.go
+++ b/pkg/util/typeutil/field_schema.go
@@ -53,6 +53,13 @@ func (h *FieldSchemaHelper) EnableMatch() bool {
    return err == nil && enable
 }
 
+func (h *FieldSchemaHelper) EnableJsonKeyIndex() bool {
+   // every JSON field is eligible for the json key inverted index
+   return IsJSONType(h.schema.GetDataType())
+}
+
 func (h *FieldSchemaHelper) EnableTokenizer() bool {
    if !IsStringType(h.schema.GetDataType()) {
        return false