diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 8710e4f6e1ea4..2c556d4b54b6e 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -45,11 +45,11 @@ etcd: dir: default.etcd # Embedded Etcd only. please adjust in embedded Milvus: /tmp/milvus/etcdData/ auth: enabled: false # Whether to enable authentication - userName: # username for etcd authentication - password: # password for etcd authentication + userName: # username for etcd authentication + password: # password for etcd authentication metastore: - type: etcd # Default value: etcd, Valid values: [etcd, tikv] + type: etcd # Default value: etcd, Valid values: [etcd, tikv] # Related configuration of tikv, used to store Milvus metadata. # Notice that when TiKV is enabled for metastore, you still need to have etcd for service discovery. @@ -63,9 +63,9 @@ tikv: snapshotScanSize: 256 # batch size of tikv snapshot scan ssl: enabled: false # Whether to support TiKV secure connection mode - tlsCert: # path to your cert file - tlsKey: # path to your key file - tlsCACert: # path to your CACert file + tlsCert: # path to your cert file + tlsKey: # path to your key file + tlsCACert: # path to your CACert file localStorage: path: /var/lib/milvus/data/ # please adjust in embedded Milvus: /tmp/milvus/data/ @@ -97,12 +97,12 @@ minio: cloudProvider: aws # Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws". # Leave it empty if you want to use AWS default endpoint - iamEndpoint: + iamEndpoint: logLevel: fatal # Log level for aws sdk log. 
Supported level: off, fatal, error, warn, info, debug, trace - region: # Specify minio storage system location region + region: # Specify minio storage system location region useVirtualHost: false # Whether use virtual host mode for bucket requestTimeoutMs: 10000 # minio timeout for request time in milliseconds - # The maximum number of objects requested per batch in minio ListObjects rpc, + # The maximum number of objects requested per batch in minio ListObjects rpc, # 0 means using oss client by default, decrease these configration if ListObjects timeout listObjectsMaxKeys: 0 @@ -137,11 +137,11 @@ pulsar: # If you want to enable kafka, needs to comment the pulsar configs # kafka: -# brokerList: -# saslUsername: -# saslPassword: -# saslMechanisms: -# securityProtocol: +# brokerList: +# saslUsername: +# saslPassword: +# saslMechanisms: +# securityProtocol: # ssl: # enabled: false # whether to enable ssl mode # tlsCert: # path to client's public key (PEM) used for authentication @@ -179,8 +179,8 @@ natsmq: logSizeLimit: 536870912 # Size in bytes after the log file rolls over to a new one retention: maxAge: 4320 # Maximum age of any message in the P-channel - maxBytes: # How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size - maxMsgs: # How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit + maxBytes: # How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size + maxMsgs: # How many message the single P-channel may contain. 
Removing oldest messages if the P-channel exceeds this limit # Related configuration of rootCoord, used to handle data definition language (DDL) and data control language (DCL) requests rootCoord: @@ -191,7 +191,7 @@ rootCoord: maxDatabaseNum: 64 # Maximum number of database maxGeneralCapacity: 65536 # upper limit for the sum of of product of partitionNumber and shardNumber gracefulStopTimeout: 5 # seconds. force stop node without graceful stop - ip: # if not specified, use the first unicastable address + ip: # if not specified, use the first unicastable address port: 53100 grpc: serverMaxSendSize: 536870912 @@ -224,7 +224,7 @@ proxy: enable: false # if use access log minioEnable: false # if upload sealed access log file to minio localPath: /tmp/milvus_access - filename: # Log filename, leave empty to use stdout. + filename: # Log filename, leave empty to use stdout. maxSize: 64 # Max size for a single file, in MB. cacheSize: 0 # Size of log write cache, in B cacheFlushInterval: 3 # time interval of auto flush write cache, in Seconds. 
(Close auto flush if interval was 0) @@ -245,10 +245,10 @@ proxy: http: enabled: true # Whether to enable the http server debug_mode: false # Whether to enable http server debug mode - port: # high-level restful api + port: # high-level restful api acceptTypeAllowInt64: true # high-level restful api, whether http client can deal with int64 enablePprof: true # Whether to enable pprof middleware on the metrics port - ip: # if not specified, use the first unicastable address + ip: # if not specified, use the first unicastable address port: 19530 internalPort: 19529 grpc: @@ -306,7 +306,7 @@ queryCoord: enableStoppingBalance: true # whether enable stopping balance channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds - ip: # if not specified, use the first unicastable address + ip: # if not specified, use the first unicastable address port: 19531 grpc: serverMaxSendSize: 536870912 @@ -335,9 +335,9 @@ queryNode: enabled: true memoryLimit: 2147483648 # 2 GB, 2 * 1024 *1024 *1024 readAheadPolicy: willneed # The read ahead policy of chunk cache, options: `normal, random, sequential, willneed, dontneed` - # options: async, sync, disable. - # Specifies the necessity for warming up the chunk cache. - # 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the + # options: async, sync, disable. + # Specifies the necessity for warming up the chunk cache. + # 1. If set to "sync" or "async" the original vector data will be synchronously/asynchronously loaded into the # chunk cache during the load process. This approach has the potential to substantially reduce query/search latency # for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage; # 2. 
If set to "disable" original vector data will only be loaded into the chunk cache during search/query. @@ -387,7 +387,7 @@ queryNode: maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph enableSegmentPrune: false # use partition prune function on shard delegator queryStreamBatchSize: 4194304 # return batch size of stream query - ip: # if not specified, use the first unicastable address + ip: # if not specified, use the first unicastable address port: 21123 grpc: serverMaxSendSize: 536870912 @@ -409,7 +409,7 @@ indexNode: buildParallel: 1 enableDisk: true # enable index node build disk vector index maxDiskUsagePercentage: 95 - ip: # if not specified, use the first unicastable address + ip: # if not specified, use the first unicastable address port: 21121 grpc: serverMaxSendSize: 536870912 @@ -446,7 +446,7 @@ dataCoord: compactableProportion: 0.85 # over (compactableProportion * segment max # of rows) rows. # MUST BE GREATER THAN OR EQUAL TO !!! - # During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%. + # During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%. expansionRate: 1.25 segmentFlushInterval: 2 # the minimal interval duration(unit: Seconds) between flusing operation on same segment autoUpgradeSegmentIndex: false # whether auto upgrade segment index to index engine's version @@ -472,7 +472,7 @@ dataCoord: # clustering compaction will try best to distribute data into segments with size range in [preferSegmentSize, maxSegmentSize]. 
# data will be clustered by preferSegmentSize, if a cluster is larger than maxSegmentSize, will spilt it into multi segment # buffer between (preferSegmentSize, maxSegmentSize) is left for new data in the same cluster(range), to avoid globally redistribute too often - preferSegmentSize: 512m + preferSegmentSize: 512m maxSegmentSize: 1024m maxTrainSizeRatio: 0.8 # max data size ratio in analyze, if data is larger than it, will down sampling to meet this limit maxCentroidsNum: 10240 @@ -508,7 +508,7 @@ dataCoord: maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request. waitForIndex: true # Indicates whether the import operation waits for the completion of index building. gracefulStopTimeout: 5 # seconds. force stop node without graceful stop - ip: # if not specified, use the first unicastable address + ip: # if not specified, use the first unicastable address port: 13333 grpc: serverMaxSendSize: 536870912 @@ -556,7 +556,7 @@ dataNode: levelZeroBatchMemoryRatio: 0.05 # The minimal memory ratio of free memory for level zero compaction executing in batch mode levelZeroMaxBatchSize: -1 # Max batch size refers to the max number of L1/L2 segments in a batch when executing L0 compaction. Default to -1, any value that is less than 1 means no limit. Valid range: >= 1. gracefulStopTimeout: 1800 # seconds. force stop node without graceful stop - ip: # if not specified, use the first unicastable address + ip: # if not specified, use the first unicastable address port: 21124 grpc: serverMaxSendSize: 536870912 @@ -573,7 +573,7 @@ dataNode: log: level: info # Only supports debug, info, warn, error, panic, or fatal. Default 'info'. file: - rootPath: # root dir path to put logs, default "" means no log file will print. please adjust in embedded Milvus: /tmp/milvus/logs + rootPath: # root dir path to put logs, default "" means no log file will print. 
please adjust in embedded Milvus: /tmp/milvus/logs maxSize: 300 # MB maxAge: 10 # Maximum time for log retention in day. maxBackups: 20 @@ -631,7 +631,7 @@ common: authorizationEnabled: false # The superusers will ignore some system check processes, # like the old password verification when updating the credential - superUsers: + superUsers: tlsMode: 0 session: ttl: 30 # ttl value when session granting a lease to register service @@ -832,9 +832,9 @@ trace: # Fractions >= 1 will always sample. Fractions < 0 are treated as zero. sampleFraction: 0 jaeger: - url: # when exporter is jaeger should set the jaeger's URL + url: # when exporter is jaeger should set the jaeger's URL otlp: - endpoint: # example: "127.0.0.1:4318" + endpoint: # example: "127.0.0.1:4318" secure: true #when using GPU indexing, Milvus will utilize a memory pool to avoid frequent memory allocation and deallocation. @@ -844,5 +844,5 @@ trace: #milvus will automatically initialize half of the available GPU memory, #maxMemSize will the whole available GPU memory. 
gpu: - initMemSize: # Gpu Memory Pool init size - maxMemSize: # Gpu Memory Pool Max size + initMemSize: # Gpu Memory Pool init size + maxMemSize: # Gpu Memory Pool Max size diff --git a/internal/core/src/common/FieldData.cpp b/internal/core/src/common/FieldData.cpp index 792d1b209cfbe..bd913d6541567 100644 --- a/internal/core/src/common/FieldData.cpp +++ b/internal/core/src/common/FieldData.cpp @@ -43,7 +43,7 @@ FieldDataImpl::FillFieldData(const void* source, } std::copy_n(static_cast(source), element_count * dim_, - field_data_.data() + length_ * dim_); + data_.data() + length_ * dim_); length_ += element_count; } @@ -64,15 +64,15 @@ FieldDataImpl::FillFieldData( } std::copy_n(static_cast(field_data), element_count * dim_, - field_data_.data() + length_ * dim_); + data_.data() + length_ * dim_); ssize_t byte_count = (element_count + 7) / 8; // Note: if 'nullable == true` and valid_data is nullptr // means null_count == 0, will fill it with 0xFF if (valid_data == nullptr) { - std::fill_n(valid_data_.get(), byte_count, 0xFF); + valid_data_.resize(byte_count, 0xFF); } else { - std::copy_n(valid_data, byte_count, valid_data_.get()); + std::copy_n(valid_data, byte_count, valid_data_.data()); } length_ += element_count; diff --git a/internal/core/src/common/FieldDataInterface.h b/internal/core/src/common/FieldDataInterface.h index 6205ffc3321cc..3bfd65615b8b1 100644 --- a/internal/core/src/common/FieldDataInterface.h +++ b/internal/core/src/common/FieldDataInterface.h @@ -64,8 +64,8 @@ class FieldDataBase { virtual void* Data() = 0; - virtual const uint8_t* - ValidData() const = 0; + virtual uint8_t* + ValidData() = 0; // For all FieldDataImpl subclasses, this method returns a Type* that points // at the offset-th row of this field data. 
@@ -117,7 +117,7 @@ class FieldDataBase { get_null_count() const = 0; virtual bool - is_null(ssize_t offset) const = 0; + is_valid(ssize_t offset) const = 0; protected: const DataType data_type_; @@ -143,25 +143,38 @@ class FieldDataImpl : public FieldDataBase { : FieldDataBase(data_type, nullable), num_rows_(buffered_num_rows), dim_(is_type_entire_row ? 1 : dim) { - field_data_.resize(num_rows_ * dim_); + data_.resize(num_rows_ * dim_); if (nullable) { - if (IsVectorDataType(data_type)){ - PanicInfo(NotImplemented, - "vector type not support null"); + if (IsVectorDataType(data_type)) { + PanicInfo(NotImplemented, "vector type not support null"); } - valid_data_ = - std::shared_ptr(new uint8_t[(num_rows_ + 7) / 8]); + valid_data_.resize((num_rows_ + 7) / 8); } } explicit FieldDataImpl(size_t dim, DataType type, bool nullable, - FixedVector&& field_data) + FixedVector&& data) : FieldDataBase(type, nullable), dim_(is_type_entire_row ? 1 : dim) { - field_data_ = std::move(field_data); - Assert(field_data.size() % dim == 0); - num_rows_ = field_data.size() / dim; + AssertInfo(!nullable, "need to fill valid_data when nullable is true"); + data_ = std::move(data); + Assert(data.size() % dim == 0); + num_rows_ = data.size() / dim; + } + + explicit FieldDataImpl(size_t dim, + DataType type, + bool nullable, + FixedVector&& data, + FixedVector&& valid_data) + : FieldDataBase(type, nullable), dim_(is_type_entire_row ? 
1 : dim) { + AssertInfo(nullable, + "no need to fill valid_data when nullable is false"); + data_ = std::move(data); + valid_data_ = std::move(valid_data); + Assert(data.size() % dim == 0); + num_rows_ = data.size() / dim; } void @@ -196,12 +209,12 @@ class FieldDataImpl : public FieldDataBase { void* Data() override { - return field_data_.data(); + return data_.data(); } - const uint8_t* - ValidData() const override { - return valid_data_.get(); + uint8_t* + ValidData() override { + return valid_data_.data(); } const void* @@ -210,23 +223,23 @@ class FieldDataImpl : public FieldDataBase { "field data subscript out of range"); AssertInfo(offset < length(), "subscript position don't has valid value"); - return &field_data_[offset]; - } - - std::optional - Value(ssize_t offset) { - if (!is_type_entire_row) { - return RawValue(offset); - } - AssertInfo(offset < get_num_rows(), - "field data subscript out of range"); - AssertInfo(offset < length(), - "subscript position don't has valid value"); - if (nullable_ && !valid_data_[offset]) { - return std::nullopt; - } - return &field_data_[offset]; - } + return &data_[offset]; + } + + // std::optional + // Value(ssize_t offset) { + // if (!is_type_entire_row) { + // return RawValue(offset); + // } + // AssertInfo(offset < get_num_rows(), + // "field data subscript out of range"); + // AssertInfo(offset < length(), + // "subscript position don't has valid value"); + // if (nullable_ && !valid_data_[offset]) { + // return std::nullopt; + // } + // return &field_data_[offset]; + // } int64_t Size() const override { @@ -250,8 +263,7 @@ class FieldDataImpl : public FieldDataBase { int64_t ValidDataSize() const override { if (nullable_) { - int byteSize = (length() + 7) / 8; - return sizeof(uint8_t) * byteSize; + return sizeof(uint8_t) * (length() + 7) / 8; } return 0; } @@ -278,10 +290,10 @@ class FieldDataImpl : public FieldDataBase { std::lock_guard lck(num_rows_mutex_); if (cap > num_rows_) { num_rows_ = cap; - 
field_data_.resize(num_rows_ * dim_); + data_.resize(num_rows_ * dim_); } if (nullable_) { - valid_data_ = std::shared_ptr(new uint8_t[num_rows_]); + valid_data_.reserve((num_rows_ + 7) / 8); } } @@ -297,11 +309,9 @@ class FieldDataImpl : public FieldDataBase { std::lock_guard lck(num_rows_mutex_); if (num_rows > num_rows_) { num_rows_ = num_rows; - field_data_.resize(num_rows_ * dim_); + data_.resize(num_rows_ * dim_); if (nullable_) { - ssize_t byte_count = (num_rows + 7) / 8; - valid_data_ = - std::shared_ptr(new uint8_t[byte_count]); + valid_data_.resize((num_rows + 7) / 8); } } } @@ -324,23 +334,23 @@ class FieldDataImpl : public FieldDataBase { } bool - is_null(ssize_t offset) const override { + is_valid(ssize_t offset) const override { std::shared_lock lck(tell_mutex_); if (!nullable_) { - return false; + return true; } auto bit = (valid_data_[offset >> 3] >> ((offset & 0x07))) & 1; - return !bit; + return bit; } protected: - FixedVector field_data_; - std::shared_ptr valid_data_; - // number of elements field_data_ can hold + FixedVector data_; + FixedVector valid_data_; + // number of elements data_ can hold int64_t num_rows_; mutable std::shared_mutex num_rows_mutex_; - int64_t null_count; - // number of actual elements in field_data_ + int64_t null_count{0}; + // number of actual elements in data_ size_t length_{}; mutable std::shared_mutex tell_mutex_; @@ -361,7 +371,7 @@ class FieldDataStringImpl : public FieldDataImpl { DataSize() const override { int64_t data_size = 0; for (size_t offset = 0; offset < length(); ++offset) { - data_size += field_data_[offset].size(); + data_size += data_[offset].size(); } return data_size; @@ -373,7 +383,7 @@ class FieldDataStringImpl : public FieldDataImpl { "field data subscript out of range"); AssertInfo(offset < length(), "subscript position don't has valid value"); - return field_data_[offset].size(); + return data_[offset].size(); } void @@ -390,9 +400,17 @@ class FieldDataStringImpl : public FieldDataImpl { auto 
i = 0; for (const auto& str : *array) { - field_data_[length_ + i] = str.value(); + data_[length_ + i] = str.value(); i++; } + if (IsNullable()) { + auto valid_data = array->null_bitmap_data(); + if (valid_data == nullptr) { + valid_data_.push_back(0xFF); + } else { + std::copy_n(valid_data, (n + 7) / 8, valid_data_.data()); + } + } length_ += n; } }; @@ -409,7 +427,7 @@ class FieldDataJsonImpl : public FieldDataImpl { DataSize() const override { int64_t data_size = 0; for (size_t offset = 0; offset < length(); ++offset) { - data_size += field_data_[offset].data().size(); + data_size += data_[offset].data().size(); } return data_size; @@ -421,7 +439,7 @@ class FieldDataJsonImpl : public FieldDataImpl { "field data subscript out of range"); AssertInfo(offset < length(), "subscript position don't has valid value"); - return field_data_[offset].data().size(); + return data_[offset].data().size(); } void @@ -448,10 +466,17 @@ class FieldDataJsonImpl : public FieldDataImpl { auto i = 0; for (const auto& json : *array) { - field_data_[length_ + i] = - Json(simdjson::padded_string(json.value())); + data_[length_ + i] = Json(simdjson::padded_string(json.value())); i++; } + if (IsNullable()) { + auto valid_data = array->null_bitmap_data(); + if (valid_data == nullptr) { + valid_data_.push_back(0xFF); + } else { + std::copy_n(valid_data, (n + 7) / 8, valid_data_.data()); + } + } length_ += n; } }; @@ -472,7 +497,7 @@ class FieldDataSparseVectorImpl DataSize() const override { int64_t data_size = 0; for (size_t i = 0; i < length(); ++i) { - data_size += field_data_[i].data_byte_size(); + data_size += data_[i].data_byte_size(); } return data_size; } @@ -483,7 +508,7 @@ class FieldDataSparseVectorImpl "field data subscript out of range"); AssertInfo(offset < length(), "subscript position don't has valid value"); - return field_data_[offset].data_byte_size(); + return data_[offset].data_byte_size(); } // source is a pointer to element_count of @@ -504,7 +529,7 @@ class 
FieldDataSparseVectorImpl auto& row = ptr[i]; vec_dim_ = std::max(vec_dim_, row.dim()); } - std::copy_n(ptr, element_count, field_data_.data() + length_); + std::copy_n(ptr, element_count, data_.data() + length_); length_ += element_count; } @@ -523,7 +548,7 @@ class FieldDataSparseVectorImpl for (int64_t i = 0; i < array->length(); ++i) { auto view = array->GetView(i); - auto& row = field_data_[length_ + i]; + auto& row = data_[length_ + i]; row = CopyAndWrapSparseRow(view.data(), view.size()); vec_dim_ = std::max(vec_dim_, row.dim()); } @@ -548,10 +573,10 @@ class FieldDataArrayImpl : public FieldDataImpl { } int64_t - DataSize() const override { + DataSize() const override { int64_t data_size = 0; for (size_t offset = 0; offset < length(); ++offset) { - data_size += field_data_[offset].byte_size(); + data_size += data_[offset].byte_size(); } return data_size; @@ -563,7 +588,7 @@ class FieldDataArrayImpl : public FieldDataImpl { "field data subscript out of range"); AssertInfo(offset < length(), "subscript position don't has valid value"); - return field_data_[offset].byte_size(); + return data_[offset].byte_size(); } }; diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index e9a4e12176687..d4142b9eecf44 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -71,13 +72,8 @@ class ColumnBase { return; } - if (!field_meta.is_vector()) { - is_scalar = true; - } else { - AssertInfo(!field_meta.is_nullable(), - "only support null in scalar"); - } type_size_ = field_meta.get_sizeof(); + data_cap_size_ = field_meta.get_sizeof() * reserve; // use anon mapping so we are able to free these memory with munmap only @@ -94,18 +90,8 @@ class ColumnBase { mapped_size); if (field_meta.is_nullable()) { - nullable = true; - valid_data_cap_size_ = (reserve + 7) / 8; - mapped_size += valid_data_cap_size_; - valid_data_ = 
static_cast(mmap(nullptr, - valid_data_cap_size_, - PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANON, - -1, - 0)); - AssertInfo(valid_data_ != MAP_FAILED, - "failed to create anon map, err: {}", - strerror(errno)); + nullable_ = true; + valid_data_.reserve(reserve); } UpdateMetricWhenMmap(mapped_size); } @@ -124,7 +110,7 @@ class ColumnBase { data_size_(0), data_cap_size_(reserve), mapping_type_(MAP_WITH_MANAGER), - nullable(nullable) { + nullable_(nullable) { AssertInfo((mcm != nullptr) && descriptor != nullptr, "use wrong mmap chunk manager and mmap chunk descriptor to " "create column."); @@ -135,15 +121,8 @@ class ColumnBase { AssertInfo(data_ != nullptr, "fail to create with mmap manager: map_size = {}", mapped_size); - if (nullable) { - nullable = true; - valid_data_cap_size_ = (reserve + 7) / 8; - mapped_size += valid_data_cap_size_; - valid_data_ = (uint8_t*)mcm_->Allocate( - mmap_descriptor_, (uint64_t)valid_data_cap_size_); - AssertInfo(valid_data_ != MAP_FAILED, - "failed to create anon map, err: {}", - strerror(errno)); + if (nullable_) { + valid_data_.reserve(reserve); } } @@ -170,24 +149,10 @@ class ColumnBase { strerror(errno)); madvise(data_, mapped_size, MADV_WILLNEED); - if (!field_meta.is_vector()) { - is_scalar = true; - if (field_meta.is_nullable()) { - nullable = true; - valid_data_cap_size_ = (num_rows_ + 7) / 8; - valid_data_size_ = (num_rows_ + 7) / 8; - mapped_size += valid_data_size_; - valid_data_ = static_cast(mmap(nullptr, - valid_data_cap_size_, - PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANON, - file.Descriptor(), - 0)); - AssertInfo(valid_data_ != MAP_FAILED, - "failed to create file-backed map, err: {}", - strerror(errno)); - madvise(valid_data_, valid_data_cap_size_, MADV_WILLNEED); - } + // valid_data store in memory + if (field_meta.is_nullable()) { + nullable_ = true; + valid_data_.reserve(num_rows_); } UpdateMetricWhenMmap(mapped_size); @@ -200,20 +165,15 @@ class ColumnBase { int dim, const DataType& data_type, bool 
nullable) - : type_size_(IsSparseFloatVectorDataType(data_type) - ? 1 - : GetDataTypeSize(data_type, dim)), - num_rows_( - IsSparseFloatVectorDataType(data_type) ? 1 : (size / type_size_)), - data_size_(size), + : data_size_(size), data_cap_size_(size), - nullable(nullable), + nullable_(nullable), mapping_type_(MappingType::MAP_WITH_FILE) { SetPaddingSize(data_type); // use exact same size of file, padding shall be written in file already // see also https://github.com/milvus-io/milvus/issues/34442 - size_t mapped_size = cap_size_; + size_t mapped_size = data_cap_size_; if (!IsVariableDataType(data_type)) { type_size_ = GetDataTypeSize(data_type, dim); num_rows_ = size / type_size_; @@ -223,23 +183,11 @@ class ColumnBase { AssertInfo(data_ != MAP_FAILED, "failed to create file-backed map, err: {}", strerror(errno)); - if (dim == 1) { - is_scalar = true; - if (nullable) { - valid_data_cap_size_ = (num_rows_ + 7) / 8; - valid_data_size_ = (num_rows_ + 7) / 8; - mapped_size += valid_data_size_; - valid_data_ = static_cast(mmap(nullptr, - valid_data_cap_size_, - PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANON, - file.Descriptor(), - 0)); - AssertInfo(valid_data_ != MAP_FAILED, - "failed to create file-backed map, err: {}", - strerror(errno)); - } + + if (nullable) { + valid_data_.reserve(num_rows_); } + UpdateMetricWhenMmap(mapped_size); } @@ -255,36 +203,25 @@ class ColumnBase { } UpdateMetricWhenMunmap(data_cap_size_ + padding_); } - if (valid_data_ != nullptr) { - if (munmap(valid_data_, valid_data_cap_size_)) { - AssertInfo(true, - "failed to unmap variable field, err={}", - strerror(errno)); - } - UpdateMetricWhenMunmap(valid_data_cap_size_); + if (nullable_) { + valid_data_.clear(); } } ColumnBase(ColumnBase&& column) noexcept : data_(column.data_), - nullable(column.nullable), + nullable_(column.nullable_), valid_data_(column.valid_data_), - valid_data_cap_size_(column.valid_data_cap_size_), - data_cap_size_(column.data_cap_size_), padding_(column.padding_), 
type_size_(column.type_size_), num_rows_(column.num_rows_), - data_size_(column.data_size_), - valid_data_size_(column.valid_data_size_) { + data_size_(column.data_size_) { column.data_ = nullptr; column.data_cap_size_ = 0; column.padding_ = 0; column.num_rows_ = 0; column.data_size_ = 0; - column.nullable = false; - column.valid_data_ = nullptr; - column.valid_data_cap_size_ = 0; - column.valid_data_size_ = 0; + column.nullable_ = false; } // Data() points at an addr that contains the elements @@ -299,14 +236,14 @@ class ColumnBase { return data_; } - const uint8_t* - ValidData() const { - return valid_data_; + bool + IsValid(size_t offset) const { + return valid_data_[offset]; } bool IsNullable() const { - return nullable; + return nullable_; } size_t @@ -314,11 +251,6 @@ class ColumnBase { return data_size_; } - size_t - ValidDataSize() const { - return valid_data_size_; - } - size_t NumRows() const { return num_rows_; @@ -326,7 +258,8 @@ class ColumnBase { virtual size_t ByteSize() const { - return data_cap_size_ + padding_ + valid_data_cap_size_; + // folly::fbvector implemented with bit compression. 
+ return data_cap_size_ + padding_ + (valid_data_.size() + 7) / 8; } // The capacity of the column, @@ -363,13 +296,24 @@ class ColumnBase { data->DataSize(), data_ + data_size_); data_size_ = required_size; + if (nullable_) { + size_t required_rows = num_rows_ + data->get_num_rows(); + if (required_rows > valid_data_.size()) { + valid_data_.reserve(required_rows * 2); + } + + for (size_t i = 0; i < data->get_num_rows(); i++) { + valid_data_.push_back(data->is_valid(i)); + } + } num_rows_ += data->Length(); - AppendValidData(data->ValidData(), data->ValidDataSize()); } // Append one row virtual void Append(const char* data, size_t size) { + AssertInfo(!nullable_, + "no need to pass valid_data when nullable is false"); size_t required_size = data_size_ + size; if (required_size > data_cap_size_) { ExpandData(required_size * 2); @@ -380,16 +324,19 @@ class ColumnBase { num_rows_++; } - // append valid_data don't need to change num_rows - void - AppendValidData(const uint8_t* valid_data, size_t size) { - if (nullable == true) { - size_t required_size = valid_data_size_ + size; - if (required_size > valid_data_cap_size_) { - ExpandValidData(required_size * 2); - } - std::copy(valid_data, valid_data + size, valid_data_); + // Append one row + virtual void + Append(const char* data, const bool valid_data, size_t size) { + AssertInfo(nullable_, "need to pass valid_data_ when nullable is true"); + size_t required_size = data_size_ + size; + if (required_size > data_cap_size_) { + ExpandData(required_size * 2); } + + std::copy_n(data, size, data_ + data_size_); + valid_data_.push_back(valid_data); + data_size_ = required_size; + num_rows_++; } void @@ -412,6 +359,11 @@ class ColumnBase { } } + void + SetValidData(FixedVector&& valid_data) { + valid_data_ = std::move(valid_data); + } + protected: // only for memory mode and mmap manager mode, not mmap void @@ -472,82 +424,22 @@ class ColumnBase { } } - // only for memory mode and mmap manager mode, not mmap - void - 
ExpandValidData(size_t new_size) { - if (new_size == 0) { - return; - } - AssertInfo( - mapping_type_ == MappingType::MAP_WITH_ANONYMOUS || - mapping_type_ == MappingType::MAP_WITH_MANAGER, - "expand function only use in anonymous or with mmap manager"); - if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) { - size_t new_mapped_size = new_size + padding_; - auto valid_data = static_cast(mmap(nullptr, - new_mapped_size, - PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANON, - -1, - 0)); - UpdateMetricWhenMmap(true, new_mapped_size); - - AssertInfo(valid_data != MAP_FAILED, - "failed to expand map: {}, new_map_size={}", - strerror(errno), - new_size + padding_); - - if (valid_data_ != nullptr) { - std::memcpy(valid_data, valid_data_, valid_data_size_); - if (munmap(valid_data_, valid_data_cap_size_ + padding_)) { - auto err = errno; - size_t mapped_size = new_size + padding_; - munmap(valid_data, mapped_size); - UpdateMetricWhenMunmap(mapped_size); - - AssertInfo( - false, - "failed to unmap while expanding: {}, old_map_size={}", - strerror(err), - data_cap_size_ + padding_); - } - UpdateMetricWhenMunmap(data_cap_size_ + padding_); - } - - valid_data_ = valid_data; - valid_data_cap_size_ = new_size; - mapping_type_ = MappingType::MAP_WITH_ANONYMOUS; - } else if (mapping_type_ == MappingType::MAP_WITH_MANAGER) { - size_t new_mapped_size = new_size + padding_; - auto valid_data = mcm_->Allocate(mmap_descriptor_, new_mapped_size); - AssertInfo(valid_data != nullptr, - "fail to create with mmap manager: map_size = {}", - new_mapped_size); - std::memcpy(valid_data, valid_data_, valid_data_cap_size_); - // allocate space only append in one growing segment, so no need to munmap() - valid_data_ = (uint8_t*)valid_data; - valid_data_cap_size_ = new_size; - mapping_type_ = MappingType::MAP_WITH_MANAGER; - } - } - char* data_{nullptr}; - bool nullable{false}; - uint8_t* valid_data_{nullptr}; - size_t valid_data_cap_size_{0}; - // std::shared_ptr valid_data_{nullptr}; - bool 
is_scalar{false}; + bool nullable_{false}; + // When merging multiple valid_data, the bit operation logic is very complex + // for the reason that, FixedVector use bit granularity for storage and access + // so FixedVector is also used to store valid_data on the sealed segment. + FixedVector valid_data_; // capacity in bytes size_t data_cap_size_{0}; size_t padding_{0}; // type_size_ is not used for sparse float vector column. - const size_t type_size_{1}; + size_t type_size_{1}; size_t num_rows_{0}; // length in bytes storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; size_t data_size_{0}; - size_t valid_data_size_{0}; private: void @@ -724,7 +616,7 @@ class SparseFloatColumn : public ColumnBase { num_rows_ = indices.size(); // so that indices[num_rows_] - indices[num_rows_ - 1] is the size of // the last row. - indices.push_back(data_size_ + valid_data_size_); + indices.push_back(data_size_); for (size_t i = 0; i < num_rows_; i++) { auto vec_size = indices[i + 1] - indices[i]; AssertInfo( @@ -823,7 +715,7 @@ class VariableColumn : public ColumnBase { pos += sizeof(uint32_t) + size; } - return BufferView{pos, data_size_ + valid_data_size_ - (pos - data_)}; + return BufferView{pos, data_size_ - (pos - data_)}; } ViewType @@ -849,7 +741,7 @@ class VariableColumn : public ColumnBase { std::string_view RawAt(const int i) const { - std::string_view((*this)[i]); + return std::string_view((*this)[i]); } void @@ -858,6 +750,9 @@ class VariableColumn : public ColumnBase { indices_.emplace_back(data_size_); auto data = static_cast(chunk->RawValue(i)); data_size_ += sizeof(uint32_t) + data->size(); + if (nullable_) { + valid_data_.push_back(chunk->is_valid(i)); + } } load_buf_.emplace(std::move(chunk)); } @@ -876,10 +771,6 @@ class VariableColumn : public ColumnBase { data_size_ = 0; ExpandData(total_data_size); - size_t total_valid_data_size = valid_data_size_; - valid_data_size_ = 0; - ExpandValidData(total_valid_data_size); - while (!load_buf_.empty()) { auto 
chunk = std::move(load_buf_.front()); load_buf_.pop(); @@ -895,12 +786,11 @@ class VariableColumn : public ColumnBase { data_ + data_size_, data->c_str(), data->size()); data_size_ += data->size(); } - if (nullable == true) { - std::copy(chunk->ValidData(), - chunk->ValidDataSize() + chunk->ValidData(), - valid_data_); + if (nullable_) { + for (size_t i = 0; i < chunk->get_num_rows(); i++) { + valid_data_.push_back(chunk->is_valid(i)); + } } - valid_data_size_ += chunk->ValidDataSize(); } } @@ -982,9 +872,14 @@ class ArrayColumn : public ColumnBase { } void - Append(const Array& array) { + Append(const Array& array, bool valid_data = false) { indices_.emplace_back(data_size_); element_indices_.emplace_back(array.get_offsets()); + if (nullable_) { + return ColumnBase::Append(static_cast(array.data()), + array.byte_size(), + valid_data); + } ColumnBase::Append(static_cast(array.data()), array.byte_size()); } diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index 410e5204d62c8..2824a690d06c3 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -87,10 +87,8 @@ WriteFieldData(File& file, const FieldDataPtr& data, uint64_t& total_written, std::vector& indices, - std::vector>& element_indices) { - if (data->IsNullable()) { - total_written += file.Write(data->ValidData(), data->ValidDataSize()); - } + std::vector>& element_indices, + FixedVector& valid_data) { if (IsVariableDataType(data_type)) { switch (data_type) { case DataType::VARCHAR: @@ -180,5 +178,14 @@ WriteFieldData(File& file, total_written += data->DataSize(i); } } + if (data->IsNullable()) { + size_t required_rows = valid_data.size() + data->get_num_rows(); + if (required_rows > valid_data.size()) { + valid_data.reserve(required_rows * 2); + } + for (size_t i = 0; i < data->get_num_rows(); i++) { + valid_data.push_back(data->is_valid(i)); + } + } } } // namespace milvus diff --git a/internal/core/src/query/SearchOnGrowing.cpp 
b/internal/core/src/query/SearchOnGrowing.cpp index f228529b1e64b..0b722c2fa8485 100644 --- a/internal/core/src/query/SearchOnGrowing.cpp +++ b/internal/core/src/query/SearchOnGrowing.cpp @@ -105,7 +105,7 @@ SearchOnGrowing(const segcore::SegmentGrowingImpl& segment, segment.get_chunk_mutex()); int32_t current_chunk_id = 0; // step 3: brute force search where small indexing is unavailable - auto vec_ptr = record.get_field_data_base(vecfield_id); + auto vec_ptr = record.get_data_base(vecfield_id); auto vec_size_per_chunk = vec_ptr->get_size_per_chunk(); auto max_chunk = upper_div(active_count, vec_size_per_chunk); diff --git a/internal/core/src/query/groupby/SearchGroupByOperator.h b/internal/core/src/query/groupby/SearchGroupByOperator.h index 41e3d2299dc3c..ac1abbee5000d 100644 --- a/internal/core/src/query/groupby/SearchGroupByOperator.h +++ b/internal/core/src/query/groupby/SearchGroupByOperator.h @@ -41,8 +41,7 @@ class GrowingDataGetter : public DataGetter { const segcore::ConcurrentVector* growing_raw_data_; GrowingDataGetter(const segcore::SegmentGrowingImpl& segment, FieldId fieldId) { - growing_raw_data_ = - segment.get_insert_record().get_field_data(fieldId); + growing_raw_data_ = segment.get_insert_record().get_data(fieldId); } GrowingDataGetter(const GrowingDataGetter& other) diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index eb0abdfdd9d87..37167f232a3b4 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -141,9 +141,7 @@ class VectorBase { const int64_t size_per_chunk_; }; -template +template class ConcurrentVectorImpl : public VectorBase { public: ConcurrentVectorImpl(ConcurrentVectorImpl&&) = delete; @@ -237,42 +235,6 @@ class ConcurrentVectorImpl : public VectorBase { element_offset, static_cast(source), element_count); } - void - set_data(ssize_t element_offset, - const Type* source, - ssize_t element_count) { - auto chunk_id = 
element_offset / size_per_chunk_; - auto chunk_offset = element_offset % size_per_chunk_; - ssize_t source_offset = 0; - // first partition: - if (chunk_offset + element_count <= size_per_chunk_) { - // only first - fill_chunk( - chunk_id, chunk_offset, element_count, source, source_offset); - return; - } - - auto first_size = size_per_chunk_ - chunk_offset; - fill_chunk(chunk_id, chunk_offset, first_size, source, source_offset); - - source_offset += size_per_chunk_ - chunk_offset; - element_count -= first_size; - ++chunk_id; - - // the middle - while (element_count >= size_per_chunk_) { - fill_chunk(chunk_id, 0, size_per_chunk_, source, source_offset); - source_offset += size_per_chunk_; - element_count -= size_per_chunk_; - ++chunk_id; - } - - // the final - if (element_count > 0) { - fill_chunk(chunk_id, 0, element_count, source, source_offset); - } - } - const void* get_chunk_data(ssize_t chunk_index) const override { return (const void*)chunks_ptr_->get_chunk_data(chunk_index); @@ -329,6 +291,41 @@ class ConcurrentVectorImpl : public VectorBase { } private: + void + set_data(ssize_t element_offset, + const Type* source, + ssize_t element_count) { + auto chunk_id = element_offset / size_per_chunk_; + auto chunk_offset = element_offset % size_per_chunk_; + ssize_t source_offset = 0; + // first partition: + if (chunk_offset + element_count <= size_per_chunk_) { + // only first + fill_chunk( + chunk_id, chunk_offset, element_count, source, source_offset); + return; + } + + auto first_size = size_per_chunk_ - chunk_offset; + fill_chunk(chunk_id, chunk_offset, first_size, source, source_offset); + + source_offset += size_per_chunk_ - chunk_offset; + element_count -= first_size; + ++chunk_id; + + // the middle + while (element_count >= size_per_chunk_) { + fill_chunk(chunk_id, 0, size_per_chunk_, source, source_offset); + source_offset += size_per_chunk_; + element_count -= size_per_chunk_; + ++chunk_id; + } + + // the final + if (element_count > 0) { + 
fill_chunk(chunk_id, 0, element_count, source, source_offset); + } + } void fill_chunk(ssize_t chunk_id, ssize_t chunk_offset, @@ -461,63 +458,6 @@ class ConcurrentVector int64_t dim_; }; -class ConcurrentValidDataVector : public ConcurrentVectorImpl { - public: - static_assert(IsScalar); - explicit ConcurrentValidDataVector(int64_t size_per_chunk) - : ConcurrentVectorImpl::ConcurrentVectorImpl( - 1, size_per_chunk) { - } - void - set_data_raw(ssize_t element_offset, - const std::vector& datas) override { - for (auto& field_data : datas) { - auto num_rows = field_data->get_num_rows(); - auto valid_data = std::make_unique(num_rows); - for (size_t i = 0; i < num_rows; ++i) { - auto bit = - (field_data->ValidData()[i >> 3] >> ((i & 0x07))) & 1; - valid_data[i] = bit; - } - set_data_raw(element_offset, valid_data.get(), num_rows); - element_offset += num_rows; - } - } - void - set_data_raw(ssize_t element_offset, - ssize_t element_count, - const DataArray* data, - const FieldMeta& field_meta) override { - if (field_meta.is_nullable()) { - return set_data_raw( - element_offset, data->valid_data().data(), element_count); - } - } - - void - set_data_raw(ssize_t element_offset, - const void* source, - ssize_t element_count) override { - throw SegcoreError( - NotImplemented, - "source type is specified in ConcurrentValidDataVector"); - } - - void - set_data_raw(ssize_t element_offset, - const bool* source, - ssize_t element_count) { - if (element_count == 0) { - return; - } - chunks_ptr_->emplace_to_at_least( - upper_div(element_offset + element_count, size_per_chunk_), - elements_per_row_ * size_per_chunk_); - set_data( - element_offset, static_cast(source), element_count); - } -}; - template <> class ConcurrentVector : public ConcurrentVectorImpl { diff --git a/internal/core/src/segcore/FieldIndexing.h b/internal/core/src/segcore/FieldIndexing.h index 19de3974747e1..2585e156f2c61 100644 --- a/internal/core/src/segcore/FieldIndexing.h +++ 
b/internal/core/src/segcore/FieldIndexing.h @@ -312,7 +312,7 @@ class IndexingRecord { } auto& indexing = field_indexings_.at(fieldId); auto type = indexing->get_field_meta().get_data_type(); - auto field_raw_data = record.get_field_data_base(fieldId); + auto field_raw_data = record.get_data_base(fieldId); if (type == DataType::VECTOR_FLOAT && reserved_offset + size >= indexing->get_build_threshold()) { indexing->AppendSegmentIndexDense( @@ -349,11 +349,11 @@ class IndexingRecord { if (type == DataType::VECTOR_FLOAT && reserved_offset + size >= indexing->get_build_threshold()) { - auto vec_base = record.get_field_data_base(fieldId); + auto vec_base = record.get_data_base(fieldId); indexing->AppendSegmentIndexDense( reserved_offset, size, vec_base, data->Data()); } else if (type == DataType::VECTOR_SPARSE_FLOAT) { - auto vec_base = record.get_field_data_base(fieldId); + auto vec_base = record.get_data_base(fieldId); indexing->AppendSegmentIndexSparse( reserved_offset, size, diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 4eb253487bc5d..f250e0449044c 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include #include @@ -293,6 +294,65 @@ class OffsetOrderedArray : public OffsetMap { std::vector> array_; }; +class ThreadSafeValidData { + public: + explicit ThreadSafeValidData() = default; + explicit ThreadSafeValidData(FixedVector data) + : data_(std::move(data)) { + } + + void + set_data_raw(const std::vector& datas) { + std::unique_lock lck(mutex_); + size_t total = 0; + for (auto& field_data : datas) { + total += field_data->get_num_rows(); + } + if (length_ + total > data_.size()) { + data_.reserve(length_ + total); + } + length_ += total; + for (auto& field_data : datas) { + auto num_row = field_data->get_num_rows(); + for (size_t i = 0; i < num_row; i++) { + data_.push_back(field_data->is_valid(i)); 
} + } + } + + void + set_data_raw(size_t num_rows, + const DataArray* data, + const FieldMeta& field_meta) { + std::unique_lock lck(mutex_); + if (field_meta.is_nullable()) { + if (length_ + num_rows > data_.size()) { + data_.reserve(length_ + num_rows); + } + + auto src = data->valid_data().data(); + for (size_t i = 0; i < num_rows; ++i) { + data_.push_back(src[i]); + // data_[length_ + i] = src[i]; + } + length_ += num_rows; + } + } + + bool + is_valid(size_t offset) { + std::shared_lock lck(mutex_); + Assert(offset < length_); + return data_[offset]; + } + + private: + mutable std::shared_mutex mutex_{}; + FixedVector data_; + // number of actual elements + size_t length_{0}; +}; + template struct InsertRecord { InsertRecord( @@ -306,7 +366,7 @@ struct InsertRecord { auto field_id = field.first; auto& field_meta = field.second; if (field_meta.is_nullable()) { - this->append_valid_data(field_id, size_per_chunk); + this->append_valid_data(field_id); } if (pk2offset_ == nullptr && pk_field_id.has_value() && pk_field_id.value() == field_id) { @@ -340,28 +400,28 @@ struct InsertRecord { } if (field_meta.is_vector()) { if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) { - this->append_field_data( + this->append_data( field_id, field_meta.get_dim(), size_per_chunk); continue; } else if (field_meta.get_data_type() == DataType::VECTOR_BINARY) { - this->append_field_data( + this->append_data( field_id, field_meta.get_dim(), size_per_chunk); continue; } else if (field_meta.get_data_type() == DataType::VECTOR_FLOAT16) { - this->append_field_data( + this->append_data( field_id, field_meta.get_dim(), size_per_chunk); continue; } else if (field_meta.get_data_type() == DataType::VECTOR_BFLOAT16) { - this->append_field_data( + this->append_data( field_id, field_meta.get_dim(), size_per_chunk); continue; } else if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { - this->append_field_data(field_id, - size_per_chunk); + this->append_data(field_id, + 
size_per_chunk); continue; } else { PanicInfo(DataTypeInvalid, @@ -371,44 +431,43 @@ struct InsertRecord { } switch (field_meta.get_data_type()) { case DataType::BOOL: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::INT8: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::INT16: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::INT32: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::INT64: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::FLOAT: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::DOUBLE: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::VARCHAR: { - this->append_field_data(field_id, - size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::JSON: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } case DataType::ARRAY: { - this->append_field_data(field_id, size_per_chunk); + this->append_data(field_id, size_per_chunk); break; } default: { @@ -535,23 +594,22 @@ struct InsertRecord { pk2offset_->seal(); } - // get field data without knowing the type + // get data without knowing the type VectorBase* - get_field_data_base(FieldId field_id) const { - AssertInfo(fields_data_.find(field_id) != fields_data_.end(), + get_data_base(FieldId field_id) const { + AssertInfo(data_.find(field_id) != data_.end(), "Cannot find field_data with field_id: " + std::to_string(field_id.get())); - AssertInfo( - fields_data_.at(field_id) != nullptr, - 
"fields_data_ at i is null" + std::to_string(field_id.get())); - return fields_data_.at(field_id).get(); + AssertInfo(data_.at(field_id) != nullptr, + "data_ at i is null" + std::to_string(field_id.get())); + return data_.at(field_id).get(); } // get field data in given type, const version template const ConcurrentVector* - get_field_data(FieldId field_id) const { - auto base_ptr = get_field_data_base(field_id); + get_data(FieldId field_id) const { + auto base_ptr = get_data_base(field_id); auto ptr = dynamic_cast*>(base_ptr); Assert(ptr); return ptr; @@ -560,21 +618,21 @@ struct InsertRecord { // get field data in given type, non-const version template ConcurrentVector* - get_field_data(FieldId field_id) { - auto base_ptr = get_field_data_base(field_id); + get_data(FieldId field_id) { + auto base_ptr = get_data_base(field_id); auto ptr = dynamic_cast*>(base_ptr); Assert(ptr); return ptr; } - ConcurrentValidDataVector* + ThreadSafeValidData* get_valid_data(FieldId field_id) const { AssertInfo(valid_data_.find(field_id) != valid_data_.end(), "Cannot find valid_data with field_id: " + std::to_string(field_id.get())); - auto ptr = valid_data_.at(field_id).get(); - Assert(ptr); - return ptr; + AssertInfo(valid_data_.at(field_id) != nullptr, + "valid_data_ at i is null" + std::to_string(field_id.get())); + return valid_data_.at(field_id).get(); } bool @@ -585,34 +643,32 @@ struct InsertRecord { // append a column of scalar or sparse float vector type template void - append_field_data(FieldId field_id, int64_t size_per_chunk) { + append_data(FieldId field_id, int64_t size_per_chunk) { static_assert(IsScalar || IsSparse); - fields_data_.emplace(field_id, - std::make_unique>( - size_per_chunk, mmap_descriptor_)); + data_.emplace(field_id, + std::make_unique>( + size_per_chunk, mmap_descriptor_)); } // append a column of scalar type void - append_valid_data(FieldId field_id, int64_t size_per_chunk) { - valid_data_.emplace( - field_id, - std::make_unique(size_per_chunk)); + 
append_valid_data(FieldId field_id) { + valid_data_.emplace(field_id, std::make_unique()); } // append a column of vector type template void - append_field_data(FieldId field_id, int64_t dim, int64_t size_per_chunk) { + append_data(FieldId field_id, int64_t dim, int64_t size_per_chunk) { static_assert(std::is_base_of_v); - fields_data_.emplace(field_id, - std::make_unique>( - dim, size_per_chunk, mmap_descriptor_)); + data_.emplace(field_id, + std::make_unique>( + dim, size_per_chunk, mmap_descriptor_)); } void - drop_data(FieldId field_id) { - fields_data_.erase(field_id); + drop_field_data(FieldId field_id) { + data_.erase(field_id); valid_data_.erase(field_id); } @@ -633,7 +689,7 @@ struct InsertRecord { ack_responder_.clear(); timestamp_index_ = TimestampIndex(); pk2offset_->clear(); - fields_data_.clear(); + data_.clear(); } bool @@ -655,8 +711,8 @@ struct InsertRecord { std::unique_ptr pk2offset_; private: - std::unordered_map> fields_data_{}; - std::unordered_map> + std::unordered_map> data_{}; + std::unordered_map> valid_data_{}; mutable std::shared_mutex shared_mutex_{}; storage::MmapChunkDescriptorPtr mmap_descriptor_; diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 011c8e9402719..06aabaec418a9 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -73,11 +73,11 @@ SegmentGrowingImpl::try_remove_chunks(FieldId fieldId) { if (indexing_record_.SyncDataWithIndex(fieldId)) { VectorBase* vec_data_base = dynamic_cast*>( - insert_record_.get_field_data_base(fieldId)); + insert_record_.get_data_base(fieldId)); if (!vec_data_base) { vec_data_base = dynamic_cast*>( - insert_record_.get_field_data_base(fieldId)); + insert_record_.get_data_base(fieldId)); } if (vec_data_base && vec_data_base->num_chunk() > 0 && chunk_mutex_.try_lock()) { @@ -121,14 +121,13 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset, fmt::format("can't find 
field {}", field_id.get())); auto data_offset = field_id_to_offset[field_id]; if (!indexing_record_.SyncDataWithIndex(field_id)) { - insert_record_.get_field_data_base(field_id)->set_data_raw( + insert_record_.get_data_base(field_id)->set_data_raw( reserved_offset, num_rows, &insert_record_proto->fields_data(data_offset), field_meta); - if (insert_record_.is_valid_data_exist(field_id)) { + if (field_meta.is_nullable()) { insert_record_.get_valid_data(field_id)->set_data_raw( - reserved_offset, num_rows, &insert_record_proto->fields_data(data_offset), field_meta); @@ -234,11 +233,11 @@ SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) { } if (!indexing_record_.SyncDataWithIndex(field_id)) { - insert_record_.get_field_data_base(field_id)->set_data_raw( + insert_record_.get_data_base(field_id)->set_data_raw( reserved_offset, field_data); if (insert_record_.is_valid_data_exist(field_id)) { insert_record_.get_valid_data(field_id)->set_data_raw( - reserved_offset, field_data); + field_data); } } if (segcore_config_.get_enable_interim_segment_index()) { @@ -326,7 +325,7 @@ SegmentGrowingImpl::LoadFieldDataV2(const LoadFieldDataInfo& infos) { } if (!indexing_record_.SyncDataWithIndex(field_id)) { - insert_record_.get_field_data_base(field_id)->set_data_raw( + insert_record_.get_data_base(field_id)->set_data_raw( reserved_offset, field_data); } if (segcore_config_.get_enable_interim_segment_index()) { @@ -428,7 +427,7 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) { SpanBase SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { - auto vec = get_insert_record().get_field_data_base(field_id); + auto vec = get_insert_record().get_data_base(field_id); return vec->get_span_base(chunk_id); } @@ -465,7 +464,7 @@ std::unique_ptr SegmentGrowingImpl::bulk_subscript(FieldId field_id, const int64_t* seg_offsets, int64_t count) const { - auto vec_ptr = insert_record_.get_field_data_base(field_id); + auto vec_ptr = 
insert_record_.get_data_base(field_id); auto& field_meta = schema_->operator[](field_id); if (field_meta.is_vector()) { auto result = CreateVectorDataArray(count, field_meta); @@ -525,10 +524,9 @@ SegmentGrowingImpl::bulk_subscript(FieldId field_id, if (field_meta.is_nullable()) { auto valid_data_ptr = insert_record_.get_valid_data(field_id); auto res = result->mutable_valid_data()->mutable_data(); - auto& valid_data = *valid_data_ptr; for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; - res[i] = valid_data[offset]; + res[i] = valid_data_ptr->is_valid(offset); } } switch (field_meta.get_data_type()) { diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 455b01b67d6e4..6c6dfdebeb680 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -426,7 +426,12 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { auto rawValue = field_data->RawValue(i); auto array = static_cast(rawValue); - var_column->Append(*array); + if (field_data->IsNullable()) { + var_column->Append(*array, + field_data->is_valid(i)); + } else { + var_column->Append(*array); + } // we stores the offset for each array element, so there is a additional uint64_t for each array element field_data_size = @@ -532,18 +537,19 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { uint64_t total_written = 0; std::vector indices{}; std::vector> element_indices{}; + FixedVector valid_data{}; while (data.channel->pop(field_data)) { WriteFieldData(file, data_type, field_data, total_written, indices, - element_indices); + element_indices, + valid_data); } WriteFieldPadding(file, data_type, total_written); - - auto num_rows = data.row_count; std::shared_ptr column{}; + auto num_rows = data.row_count; if (IsVariableDataType(data_type)) { switch (data_type) { case milvus::DataType::STRING: @@ -586,6 +592,8 @@ 
SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { column = std::make_shared(file, total_written, field_meta); } + column->SetValidData(std::move(valid_data)); + { std::unique_lock lck(mutex_); fields_.emplace(field_id, column); @@ -694,7 +702,7 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { auto& field_data = it->second; return field_data->Span(); } - auto field_data = insert_record_.get_field_data_base(field_id); + auto field_data = insert_record_.get_data_base(field_id); AssertInfo(field_data->num_chunk() == 1, "num chunk not equal to 1 for sealed segment"); return field_data->get_span_base(0); @@ -1236,12 +1244,9 @@ SegmentSealedImpl::get_raw_data(FieldId field_id, auto ret = fill_with_empty(field_id, count); if (column->IsNullable()) { auto dst = ret->mutable_valid_data()->mutable_data(); - // auto valid_data = std::make_unique(count); - for (size_t i = 0; i < count; ++i) { + for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; - auto bit = - (column->ValidData()[offset >> 3] >> ((offset & 0x07))) & 1; - dst[i] = bit; + dst[i] = column->IsValid(offset); } } switch (field_meta.get_data_type()) { diff --git a/internal/core/src/storage/Event.cpp b/internal/core/src/storage/Event.cpp index 863d8a67a1370..49d79529845ad 100644 --- a/internal/core/src/storage/Event.cpp +++ b/internal/core/src/storage/Event.cpp @@ -244,7 +244,7 @@ BaseEventData::Serialize() { ++offset) { auto str = static_cast( field_data->RawValue(offset)); - auto size = field_data->is_null(offset) ? -1 : str->size(); + auto size = field_data->is_valid(offset) ? str->size() : -1; payload_writer->add_one_string_payload(str->c_str(), size); } break; @@ -256,7 +256,7 @@ BaseEventData::Serialize() { static_cast(field_data->RawValue(offset)); auto array_string = array->output_data().SerializeAsString(); auto size = - field_data->is_null(offset) ? -1 : array_string.size(); + field_data->is_valid(offset) ? 
array_string.size() : -1; payload_writer->add_one_binary_payload( reinterpret_cast(array_string.c_str()), diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 86e244a513788..badfa00719610 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -79,8 +79,9 @@ std::map ReadAheadPolicy_Map = { std::vector genValidIter(const uint8_t* valid_data, int length) { std::vector valid_data_; + valid_data_.reserve(length); for (size_t i = 0; i < length; ++i) { - auto bit = (valid_data[i >> 3] >> ((i & 0x07))) & 1; + auto bit = (valid_data[i >> 3] >> (i & 0x07)) & 1; valid_data_.push_back(bit); } return valid_data_; diff --git a/internal/core/unittest/test_growing_index.cpp b/internal/core/unittest/test_growing_index.cpp index 7d619182b650d..eb8edcfde683d 100644 --- a/internal/core/unittest/test_growing_index.cpp +++ b/internal/core/unittest/test_growing_index.cpp @@ -150,10 +150,10 @@ TEST_P(GrowingIndexTest, Correctness) { const VectorBase* field_data = nullptr; if (is_sparse) { field_data = segmentImplPtr->get_insert_record() - .get_field_data(vec); + .get_data(vec); } else { field_data = segmentImplPtr->get_insert_record() - .get_field_data(vec); + .get_data(vec); } auto inserted = (i + 1) * per_batch; diff --git a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index f8d3cc59e8777..b97e8c89ec1f6 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -76,7 +76,7 @@ TEST(Util, GetDeleteBitmap) { } auto insert_offset = insert_record.reserved.fetch_add(N); insert_record.timestamps_.set_data_raw(insert_offset, tss.data(), N); - auto field_data = insert_record.get_field_data_base(i64_fid); + auto field_data = insert_record.get_data_base(i64_fid); field_data->set_data_raw(insert_offset, age_data.data(), N); insert_record.ack_responder_.AddSegment(insert_offset, insert_offset + N); diff --git a/internal/core/unittest/test_utils/DataGen.h 
b/internal/core/unittest/test_utils/DataGen.h index 201dfc8bcbb6f..6e4faa28b1394 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -333,9 +333,8 @@ inline GeneratedData DataGen(SchemaPtr schema, auto insert_data = std::make_unique(); auto insert_cols = [&insert_data]( auto& data, int64_t count, auto& field_meta) { - auto nullable = field_meta.is_nullable(); FixedVector valid_data(count); - if (nullable) { + if (field_meta.is_nullable()) { for (int i = 0; i < count; ++i) { valid_data[i] = i % 2 == 0 ? true : false; } @@ -985,19 +984,19 @@ CreateFieldDataFromDataArray(ssize_t raw_count, int64_t dim) { field_data = storage::CreateFieldData(data_type, true, dim); int byteSize = (raw_count + 7) / 8; - auto valid_data = std::make_unique(byteSize); - auto valid_data_ptr = valid_data.get(); + uint8_t* valid_data = new uint8_t[byteSize](); for (int i = 0; i < raw_count; i++) { bool value = raw_valid_data[i]; int byteIndex = i / 8; int bitIndex = i % 8; if (value) { - valid_data_ptr[byteIndex] |= (1 << bitIndex); + valid_data[byteIndex] |= (1 << bitIndex); } else { - valid_data_ptr[byteIndex] &= ~(1 << bitIndex); + valid_data[byteIndex] &= ~(1 << bitIndex); } } - field_data->FillFieldData(raw_data, valid_data.get(), raw_count); + field_data->FillFieldData(raw_data, valid_data, raw_count); + delete[] valid_data; }; if (field_meta.is_vector()) {