Skip to content

Commit

Permalink
fix: schema->size() check logic with system field (#28802)
Browse files Browse the repository at this point in the history
Now that segcore loads system field info as well, the growing segment
assertion no longer passes with the "+ 2" value.
This caused all growing segments to fail to load.
Fix #28801
Related to #28478
See also #28524

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Nov 29, 2023
1 parent f5f4f08 commit 1dc0864
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ SegmentGrowingImpl::Insert(int64_t reserved_offset,
void
SegmentGrowingImpl::LoadFieldData(const LoadFieldDataInfo& infos) {
// schema don't include system field
AssertInfo(infos.field_infos.size() == schema_->size() + 2,
AssertInfo(infos.field_infos.size() == schema_->size(),
"lost some field data when load for growing segment");
AssertInfo(infos.field_infos.find(TimestampFieldID.get()) !=
infos.field_infos.end(),
Expand Down
2 changes: 2 additions & 0 deletions internal/core/unittest/test_c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3823,6 +3823,8 @@ TEST(CApiTest, SealedSegment_Update_Field_Size) {

TEST(CApiTest, GrowingSegment_Load_Field_Data) {
auto schema = std::make_shared<Schema>();
schema->AddField(FieldName("RowID"), FieldId(0), DataType::INT64);
schema->AddField(FieldName("Timestamp"), FieldId(1), DataType::INT64);
auto str_fid = schema->AddDebugField("string", DataType::VARCHAR);
auto vec_fid = schema->AddDebugField(
"vector_float", DataType::VECTOR_FLOAT, DIM, "L2");
Expand Down
32 changes: 13 additions & 19 deletions internal/querynodev2/segments/mock_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ func genVectorFieldSchema(param vecFieldParam) *schemapb.FieldSchema {
}

func GenTestCollectionSchema(collectionName string, pkType schemapb.DataType) *schemapb.CollectionSchema {
fieldRowID := genConstantFieldSchema(rowIDField)
fieldTimestamp := genConstantFieldSchema(timestampField)
fieldBool := genConstantFieldSchema(simpleBoolField)
fieldInt8 := genConstantFieldSchema(simpleInt8Field)
fieldInt16 := genConstantFieldSchema(simpleInt16Field)
Expand Down Expand Up @@ -285,6 +287,7 @@ func GenTestCollectionSchema(collectionName string, pkType schemapb.DataType) *s
for i, field := range schema.GetFields() {
field.FieldID = 100 + int64(i)
}
schema.Fields = append(schema.Fields, fieldRowID, fieldTimestamp)
return &schema
}

Expand Down Expand Up @@ -656,6 +659,7 @@ func SaveBinLog(ctx context.Context,
msgLength,
schema)
if err != nil {
log.Warn("getStorageBlob return error", zap.Error(err))
return nil, nil, err
}

Expand All @@ -672,7 +676,6 @@ func SaveBinLog(ctx context.Context,
}

k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
// key := path.Join(defaultLocalStorage, "insert-log", k)
key := path.Join(chunkManager.RootPath(), "insert-log", k)
kvs[key] = blob.Value
fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
Expand All @@ -695,7 +698,6 @@ func SaveBinLog(ctx context.Context,
}

k := JoinIDPath(collectionID, partitionID, segmentID, fieldID)
// key := path.Join(defaultLocalStorage, "stats-log", k)
key := path.Join(chunkManager.RootPath(), "stats-log", k)
kvs[key] = blob.Value[:]
statsBinlog = append(statsBinlog, &datapb.FieldBinlog{
Expand All @@ -715,13 +717,7 @@ func genStorageBlob(collectionID int64,
msgLength int,
schema *schemapb.CollectionSchema,
) ([]*storage.Blob, []*storage.Blob, error) {
tmpSchema := &schemapb.CollectionSchema{
Name: schema.Name,
AutoID: schema.AutoID,
Fields: []*schemapb.FieldSchema{genConstantFieldSchema(rowIDField), genConstantFieldSchema(timestampField)},
}
tmpSchema.Fields = append(tmpSchema.Fields, schema.Fields...)
collMeta := genCollectionMeta(collectionID, partitionID, tmpSchema)
collMeta := genCollectionMeta(collectionID, partitionID, schema)
inCodec := storage.NewInsertCodecWithSchema(collMeta)
insertData, err := genInsertData(msgLength, schema)
if err != nil {
Expand Down Expand Up @@ -755,15 +751,6 @@ func genInsertData(msgLength int, schema *schemapb.CollectionSchema) (*storage.I
Data: make(map[int64]storage.FieldData),
}

// set data for rowID field
insertData.Data[rowIDFieldID] = &storage.Int64FieldData{
Data: generateInt64Array(msgLength),
}
// set data for ts field
insertData.Data[timestampFieldID] = &storage.Int64FieldData{
Data: genTimestampFieldData(msgLength),
}

for _, f := range schema.Fields {
switch f.DataType {
case schemapb.DataType_Bool:
Expand Down Expand Up @@ -829,7 +816,14 @@ func genInsertData(msgLength int, schema *schemapb.CollectionSchema) (*storage.I
return nil, err
}
}

// set data for rowID field
insertData.Data[rowIDFieldID] = &storage.Int64FieldData{
Data: generateInt64Array(msgLength),
}
// set data for ts field
insertData.Data[timestampFieldID] = &storage.Int64FieldData{
Data: genTimestampFieldData(msgLength),
}
return insertData, nil
}

Expand Down

0 comments on commit 1dc0864

Please sign in to comment.