test: add sparse vector datatype for import test (milvus-io#33166)
Signed-off-by: zhuwenxing <[email protected]>
zhuwenxing authored May 23, 2024
1 parent 3bec2c4 commit 6c18611
Showing 2 changed files with 272 additions and 8 deletions.
38 changes: 30 additions & 8 deletions tests/python_client/common/bulk_insert_data.py
@@ -23,6 +23,7 @@ class DataField:
pk_field = "uid"
vec_field = "vectors"
float_vec_field = "float_vectors"
sparse_vec_field = "sparse_vectors"
image_float_vec_field = "image_float_vec_field"
text_float_vec_field = "text_float_vec_field"
binary_vec_field = "binary_vec_field"
@@ -473,7 +474,22 @@ def gen_vectors(float_vector, rows, dim):
return vectors


def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None):
def gen_sparse_vectors(rows, sparse_format="dok"):
    # default sparse format is "dok" (dictionary of keys);
    # the alternative is "coo" (coordinate list)
    rng = np.random.default_rng()
    vectors = [{
        d: rng.random() for d in random.sample(range(1000), random.randint(20, 30))
    } for _ in range(rows)]
    if sparse_format == "coo":
        vectors = [
            {"indices": list(x.keys()), "values": list(x.values())} for x in vectors
        ]
    return vectors
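For reference, a minimal sketch of the two row shapes this generator produces (indices and values are random, so the exact numbers will differ):

# "dok" (dictionary of keys): one dict per row, mapping dimension index -> weight
rows_dok = gen_sparse_vectors(2)
# e.g. [{12: 0.34, 705: 0.81, ...}, {3: 0.52, ...}]
# "coo" (coordinate list): parallel index/value lists per row
rows_coo = gen_sparse_vectors(2, sparse_format="coo")
# e.g. [{"indices": [12, 705], "values": [0.34, 0.81]}, ...]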


def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128, array_length=None, sparse_format="dok"):
if array_length is None:
array_length = random.randint(0, 10)

@@ -483,6 +499,9 @@ def gen_data_by_data_field(data_field, rows, start=0, float_vector=True, dim=128
if "float" in data_field:
data = gen_vectors(float_vector=True, rows=rows, dim=dim)
data = pd.Series([np.array(x, dtype=np.dtype("float32")) for x in data])
elif "sparse" in data_field:
data = gen_sparse_vectors(rows, sparse_format=sparse_format)
data = pd.Series([json.dumps(x) for x in data], dtype=np.dtype("str"))
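# note: json.dumps stringifies the integer keys, so a dok row such as {12: 0.34}
# lands in the parquet column as the JSON string '{"12": 0.34}'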
elif "fp16" in data_field:
data = gen_fp16_vectors(rows, dim)[1]
data = pd.Series([np.array(x, dtype=np.dtype("uint8")) for x in data])
@@ -596,7 +615,7 @@ def gen_json_files(is_row_based, rows, dim, auto_id, str_pk,
return files


def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False):
def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, dim=128, array_length=None, enable_dynamic_field=False, **kwargs):
data = []
for r in range(rows):
d = {}
@@ -605,6 +624,9 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
if "float" in data_field:
float_vector = True
d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0]
if "sparse" in data_field:
sparse_format = kwargs.get("sparse_format", "dok")
d[data_field] = gen_sparse_vectors(1, sparse_format=sparse_format)[0]
if "binary" in data_field:
float_vector = False
d[data_field] = gen_vectors(float_vector=float_vector, rows=1, dim=dim)[0]
@@ -647,15 +669,15 @@ def gen_dict_data_by_data_field(data_fields, rows, start=0, float_vector=True, d
return data


def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False):
def gen_new_json_files(float_vector, rows, dim, data_fields, file_nums=1, array_length=None, file_size=None, err_type="", enable_dynamic_field=False, **kwargs):
files = []
if file_size is not None:
rows = 5000
start_uid = 0
for i in range(file_nums):
file_name = f"data-fields-{len(data_fields)}-rows-{rows}-dim-{dim}-file-num-{i}-{int(time.time())}.json"
file = f"{data_source}/{file_name}"
data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field)
data = gen_dict_data_by_data_field(data_fields=data_fields, rows=rows, start=start_uid, float_vector=float_vector, dim=dim, array_length=array_length, enable_dynamic_field=enable_dynamic_field, **kwargs)
# log.info(f"data: {data}")
with open(file, "w") as f:
json.dump(data, f)
@@ -762,7 +784,7 @@ def gen_dynamic_field_data_in_parquet_file(rows, start=0):
return data


def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True):
def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_group_size=None, file_nums=1, array_length=None, err_type="", enable_dynamic_field=False, include_meta=True, sparse_format="dok"):
# gen numpy files
if err_type == "":
err_type = "none"
@@ -775,7 +797,7 @@ def gen_parquet_files(float_vector, rows, dim, data_fields, file_size=None, row_
all_field_data = {}
for data_field in data_fields:
data = gen_data_by_data_field(data_field=data_field, rows=rows, start=0,
float_vector=float_vector, dim=dim, array_length=array_length)
float_vector=float_vector, dim=dim, array_length=array_length, sparse_format=sparse_format)
all_field_data[data_field] = data
if enable_dynamic_field and include_meta:
all_field_data["$meta"] = gen_dynamic_field_data_in_parquet_file(rows=rows, start=0)
@@ -948,7 +970,7 @@ def prepare_bulk_insert_numpy_files(minio_endpoint="", bucket_name="milvus-bucke


def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-bucket", rows=100, dim=128, array_length=None, file_size=None, row_group_size=None,
enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True):
enable_dynamic_field=False, data_fields=[DataField.vec_field], float_vector=True, file_nums=1, force=False, include_meta=True, sparse_format="dok"):
"""
Generate column based files based on params in parquet format and copy them to the minio
Note: each field in data_fields would be generated one parquet file.
@@ -980,7 +1002,7 @@ def prepare_bulk_insert_parquet_files(minio_endpoint="", bucket_name="milvus-buc
"""
files = gen_parquet_files(rows=rows, dim=dim, float_vector=float_vector, enable_dynamic_field=enable_dynamic_field,
data_fields=data_fields, array_length=array_length, file_size=file_size, row_group_size=row_group_size,
file_nums=file_nums, include_meta=include_meta)
file_nums=file_nums, include_meta=include_meta, sparse_format=sparse_format)
copy_files_to_minio(host=minio_endpoint, r_source=data_source, files=files, bucket_name=bucket_name, force=force)
return files
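As a usage illustration (a sketch only; the endpoint and bucket values below are placeholders, not taken from this change), preparing parquet import files with a sparse vector column might look like:

files = prepare_bulk_insert_parquet_files(
    minio_endpoint="127.0.0.1:9000",
    bucket_name="milvus-bucket",
    rows=1000,
    dim=128,
    data_fields=[DataField.pk_field, DataField.float_vec_field, DataField.sparse_vec_field],
    sparse_format="coo",  # or "dok", the generator's default
)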

242 changes: 242 additions & 0 deletions tests/python_client/testcases/test_bulk_insert.py
@@ -1224,6 +1224,248 @@ def test_bulk_insert_all_field_with_parquet(self, auto_id, dim, entities, enable
assert len(res) == len(query_data)
if enable_partition_key:
assert len(self.collection_wrap.partitions) > 1

@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("include_meta", [True, False])
@pytest.mark.parametrize("sparse_format", ["doc", "coo"])
def test_bulk_insert_sparse_vector_with_parquet(self, auto_id, dim, entities, enable_dynamic_field, include_meta, sparse_format):
"""
collection schema: [pk, int64, float, string, json, array (int64/float/varchar/bool), float_vector, sparse_vector]
data files: one parquet file generated per field
Steps:
1. create collection
2. import data
3. verify imported entities and search on both the float and sparse vector fields
"""
if enable_dynamic_field is False and include_meta is True:
pytest.skip("include_meta only works with enable_dynamic_field")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_sparse_vec_field(name=df.sparse_vec_field),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_parquet_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
include_meta=include_meta,
sparse_format=sparse_format
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)

# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in sparse_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_sparse_inverted_index
)
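# ct.default_sparse_inverted_index is defined in the suite's constants module;
# for Milvus sparse vectors it is expected to resemble (an assumption, not
# shown in this diff):
# {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP", "params": {"drop_ratio_build": 0.2}}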
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
for field_name in float_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
assert "name" in fields_from_search
assert "address" in fields_from_search
search_data = cf.gen_sparse_vectors(1, dim)
search_params = ct.default_sparse_search_params
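# ct.default_sparse_search_params likewise comes from the constants module; a
# typical sparse search parameter dict is (assumption): {"metric_type": "IP", "params": {"drop_ratio_search": 0.2}}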
for field_name in sparse_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
assert "name" in fields_from_search
assert "address" in fields_from_search


@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True, False])
@pytest.mark.parametrize("dim", [128]) # 128
@pytest.mark.parametrize("entities", [1000]) # 1000
@pytest.mark.parametrize("enable_dynamic_field", [True, False])
@pytest.mark.parametrize("include_meta", [True, False])
@pytest.mark.parametrize("sparse_format", ["doc", "coo"])
def test_bulk_insert_sparse_vector_with_json(self, auto_id, dim, entities, enable_dynamic_field, include_meta, sparse_format):
"""
collection schema: [pk, int64, float, string, json, array (int64/float/varchar/bool), float_vector, sparse_vector]
data file: one json file containing all fields
Steps:
1. create collection
2. import data
3. verify imported entities and search on both the float and sparse vector fields
"""
if enable_dynamic_field is False and include_meta is True:
pytest.skip("include_meta only works with enable_dynamic_field")
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_int64_field(name=df.int_field),
cf.gen_float_field(name=df.float_field),
cf.gen_string_field(name=df.string_field),
cf.gen_json_field(name=df.json_field),
cf.gen_array_field(name=df.array_int_field, element_type=DataType.INT64),
cf.gen_array_field(name=df.array_float_field, element_type=DataType.FLOAT),
cf.gen_array_field(name=df.array_string_field, element_type=DataType.VARCHAR, max_length=100),
cf.gen_array_field(name=df.array_bool_field, element_type=DataType.BOOL),
cf.gen_float_vec_field(name=df.float_vec_field, dim=dim),
cf.gen_sparse_vec_field(name=df.sparse_vec_field),
]
data_fields = [f.name for f in fields if not f.to_dict().get("auto_id", False)]
files = prepare_bulk_insert_new_json_files(
minio_endpoint=self.minio_endpoint,
bucket_name=self.bucket_name,
rows=entities,
dim=dim,
data_fields=data_fields,
enable_dynamic_field=enable_dynamic_field,
force=True,
include_meta=include_meta,
sparse_format=sparse_format
)
self._connect()
c_name = cf.gen_unique_str("bulk_insert")
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=enable_dynamic_field)
self.collection_wrap.init_collection(c_name, schema=schema)

# import data
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=300
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states:{states}")
assert success
num_entities = self.collection_wrap.num_entities
log.info(f" collection entities: {num_entities}")
assert num_entities == entities
# verify imported data is available for search
index_params = ct.default_index
float_vec_fields = [f.name for f in fields if "vec" in f.name and "float" in f.name]
sparse_vec_fields = [f.name for f in fields if "vec" in f.name and "sparse" in f.name]
for f in float_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=index_params
)
for f in sparse_vec_fields:
self.collection_wrap.create_index(
field_name=f, index_params=ct.default_sparse_inverted_index
)
self.collection_wrap.load()
log.info(f"wait for load finished and be ready for search")
time.sleep(2)
# log.info(f"query seg info: {self.utility_wrap.get_query_segment_info(c_name)[0]}")
search_data = cf.gen_vectors(1, dim)
search_params = ct.default_search_params
for field_name in float_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
assert "name" in fields_from_search
assert "address" in fields_from_search
search_data = cf.gen_sparse_vectors(1, dim)
search_params = ct.default_sparse_search_params
for field_name in sparse_vec_fields:
res, _ = self.collection_wrap.search(
search_data,
field_name,
param=search_params,
limit=1,
output_fields=["*"],
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)
for hit in res:
for r in hit:
fields_from_search = r.fields.keys()
for f in fields:
assert f.name in fields_from_search
if enable_dynamic_field and include_meta:
assert "name" in fields_from_search
assert "address" in fields_from_search


@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("auto_id", [True])
