diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 3164325468fe0..598cda58e841d 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -414,7 +414,7 @@ def __init__(self, collection_name=None, partition_name=None, shards_num=2, dim= self.insert_data(nb=constants.ENTITIES_FOR_SEARCH, partition_name=self.p_name) log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s") - self.initial_entities = self.c_wrap.num_entities # do as a flush + self.initial_entities = self.c_wrap.collection.num_entities self.scale = 100000 # timestamp scale to make time.time() as int64 def insert_data(self, nb=constants.DELTA_PER_INS, partition_name=None): @@ -759,8 +759,7 @@ class InsertFlushChecker(Checker): def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None): super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self._flush = flush - self.initial_entities = self.c_wrap.num_entities - + self.initial_entities = self.c_wrap.collection.num_entities def keep_running(self): while True: t0 = time.time() @@ -803,17 +802,12 @@ def __init__(self, collection_name=None, shards_num=2, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("FlushChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) - self.initial_entities = self.c_wrap.num_entities + self.initial_entities = self.c_wrap.collection.num_entities @trace() def flush(self): - num_entities = self.c_wrap.num_entities - if num_entities >= (self.initial_entities + constants.DELTA_PER_INS): - result = True - self.initial_entities += constants.DELTA_PER_INS - else: - result = False - return num_entities, result + res, result = self.c_wrap.flush() + return res, result @exception_handler() def run_task(self): @@ -839,7 +833,7 @@ def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None) collection_name = cf.gen_unique_str("InsertChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self._flush = flush - self.initial_entities = self.c_wrap.num_entities + self.initial_entities = self.c_wrap.collection.num_entities self.inserted_data = [] self.scale = 1 * 10 ** 6 self.start_time_stamp = int(time.time() * self.scale) # us @@ -917,7 +911,7 @@ def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None) collection_name = cf.gen_unique_str("InsertChecker_") super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self._flush = flush - self.initial_entities = self.c_wrap.num_entities + self.initial_entities = self.c_wrap.collection.num_entities self.inserted_data = [] self.scale = 1 * 10 ** 6 self.start_time_stamp = int(time.time() * self.scale) # us diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py index 56b50a4dbdfbc..b47d6301085f2 100644 --- a/tests/python_client/common/common_func.py +++ b/tests/python_client/common/common_func.py @@ -80,6 +80,72 @@ def prepare_param_info(self, host, port, handler, replica_num, user, password, s param_info = ParamInfo() +en_vocabularies_distribution = { + "hello": 0.01, + "milvus": 0.01, + "vector": 0.01, + "database": 0.01 +} + +zh_vocabularies_distribution = { + "你好": 0.01, + "向量": 0.01, + "数据": 0.01, + "库": 0.01 +} + +def patch_faker_text(fake_instance, vocabularies_distribution): + """ + Monkey patch the text() method of a Faker instance to include custom vocabulary. + Each word in vocabularies_distribution has an independent chance to be inserted. + + Args: + fake_instance: Faker instance to patch + vocabularies_distribution: Dictionary where: + - key: word to insert + - value: probability (0-1) of inserting this word into each sentence + + Example: + vocabularies_distribution = { + "hello": 0.1, # 10% chance to insert "hello" in each sentence + "milvus": 0.1, # 10% chance to insert "milvus" in each sentence + } + """ + original_text = fake_instance.text + + def new_text(nb_sentences=100, *args, **kwargs): + sentences = [] + # Split original text into sentences + original_sentences = original_text(nb_sentences).split('.') + original_sentences = [s.strip() for s in original_sentences if s.strip()] + + for base_sentence in original_sentences: + words = base_sentence.split() + + # Independently decide whether to insert each word + for word, probability in vocabularies_distribution.items(): + if random.random() < probability: + # Choose random position to insert the word + insert_pos = random.randint(0, len(words)) + words.insert(insert_pos, word) + + # Reconstruct the sentence + base_sentence = ' '.join(words) + + # Ensure proper capitalization + base_sentence = base_sentence[0].upper() + base_sentence[1:] + sentences.append(base_sentence) + + return '. '.join(sentences) + '.' + + + + # Replace the original text method with our custom one + fake_instance.text = new_text + + + + def get_bm25_ground_truth(corpus, queries, top_k=100, language="en"): """ @@ -147,6 +213,14 @@ def blank_space_split(text): ) return tokenizer +def manual_check_text_match(df, word, col): + id_list = [] + for i in range(len(df)): + row = df.iloc[i] + # log.info(f"word :{word}, row: {row[col]}") + if word in row[col]: + id_list.append(row["id"]) + return id_list def analyze_documents(texts, language="en"): @@ -188,8 +262,8 @@ def check_token_overlap(text_a, text_b, language="en"): def split_dataframes(df, fields, language="en"): df_copy = df.copy() - tokenizer = custom_tokenizer(language) for col in fields: + tokenizer = custom_tokenizer(language) texts = df[col].to_list() tokenized = tokenizer.tokenize(texts, return_as="tuple") new_texts = [] diff --git a/tests/python_client/testcases/test_full_text_search.py b/tests/python_client/testcases/test_full_text_search.py index 608d547690d90..c54933c7e7373 100644 --- a/tests/python_client/testcases/test_full_text_search.py +++ b/tests/python_client/testcases/test_full_text_search.py @@ -15,6 +15,11 @@ Faker.seed(19530) fake_en = Faker("en_US") fake_zh = Faker("zh_CN") + +# patch faker to generate text with specific distribution +cf.patch_faker_text(fake_en, cf.en_vocabularies_distribution) +cf.patch_faker_text(fake_zh, cf.zh_vocabularies_distribution) + pd.set_option("expand_frame_repr", False) prefix = "full_text_search_collection" @@ -2214,6 +2219,7 @@ def test_full_text_search_default( if i + batch_size < len(df) else data[i: len(df)] ) + collection_w.flush() collection_w.create_index( "emb", {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 16, "efConstruction": 500}}, @@ -2429,9 +2435,10 @@ def test_full_text_search_with_jieba_tokenizer( collection_w.create_index("text", {"index_type": "INVERTED"}) collection_w.load() limit = 100 - search_data = [fake.text().lower() + " " + random.choice(tokens) for _ in range(nq)] + token = random.choice(tokens) + search_data = [fake.text().lower() + " " + token for _ in range(nq)] if expr == "text_match": - filter = f"text_match(text, '{tokens[0]}')" + filter = f"text_match(text, '{token}')" res, _ = collection_w.query( expr=filter, ) @@ -2488,7 +2495,7 @@ def test_full_text_search_with_jieba_tokenizer( result_text = r.text # verify search result satisfies the filter if expr == "text_match": - assert tokens[0] in result_text + assert token in result_text if expr == "id_range": assert _id < data_size // 2 # verify search result has overlap with search text @@ -2497,7 +2504,6 @@ def test_full_text_search_with_jieba_tokenizer( assert len( overlap) > 0, f"query text: {search_text}, \ntext: {result_text} \n overlap: {overlap} \n word freq a: {word_freq_a} \n word freq b: {word_freq_b}\n result: {r}" - @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("nq", [2]) @pytest.mark.parametrize("empty_percent", [0]) diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py index cb7257efe9c37..dc1e6622c2e62 100644 --- a/tests/python_client/testcases/test_query.py +++ b/tests/python_client/testcases/test_query.py @@ -29,6 +29,11 @@ fake_en = Faker("en_US") fake_zh = Faker("zh_CN") fake_de = Faker("de_DE") + +# patch faker to generate text with specific distribution +cf.patch_faker_text(fake_en, cf.en_vocabularies_distribution) +cf.patch_faker_text(fake_zh, cf.zh_vocabularies_distribution) + pd.set_option("expand_frame_repr", False) @@ -4436,8 +4441,8 @@ class TestQueryTextMatch(TestcaseBase): @pytest.mark.tags(CaseLabel.L0) @pytest.mark.parametrize("enable_partition_key", [True, False]) @pytest.mark.parametrize("enable_inverted_index", [True, False]) - @pytest.mark.parametrize("tokenizer", ["jieba", "default"]) - def test_query_text_match_normal( + @pytest.mark.parametrize("tokenizer", ["default"]) + def test_query_text_match_en_normal( self, tokenizer, enable_inverted_index, enable_partition_key ): """ @@ -4569,6 +4574,145 @@ def test_query_text_match_normal( for r in res: assert any([token in r[field] for token in top_10_tokens]) + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("enable_partition_key", [True, False]) + @pytest.mark.parametrize("enable_inverted_index", [True, False]) + @pytest.mark.parametrize("tokenizer", ["jieba"]) + @pytest.mark.xfail(reason="unstable") + def test_query_text_match_zh_normal( + self, tokenizer, enable_inverted_index, enable_partition_key + ): + """ + target: test text match normal + method: 1. enable text match and insert data with varchar + 2. get the most common words and query with text match + 3. verify the result + expected: text match successfully and result is correct + """ + tokenizer_params = { + "tokenizer": tokenizer, + } + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + is_partition_key=enable_partition_key, + tokenizer_params=tokenizer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + tokenizer_params=tokenizer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + tokenizer_params=tokenizer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + tokenizer_params=tokenizer_params, + ), + FieldSchema(name="emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 3000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + fake = fake_en + if tokenizer == "jieba": + language = "zh" + fake = fake_zh + else: + language = "en" + + data = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "emb": [random.random() for _ in range(dim)], + } + for i in range(data_size) + ] + df = pd.DataFrame(data) + log.info(f"dataframe\n{df}") + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i: i + batch_size] + if i + batch_size < len(df) + else data[i: len(df)] + ) + # only if the collection is flushed, the inverted index ca be applied. + # growing segment may be not applied, although in strong consistency. + collection_w.flush() + collection_w.create_index( + "emb", + {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}, + ) + if enable_inverted_index: + collection_w.create_index("word", {"index_type": "INVERTED"}) + collection_w.load() + # analyze the croup + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + # query single field for one token + for field in text_fields: + token = wf_map[field].most_common()[0][0] + expr = f"text_match({field}, '{token}')" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + assert len(res) > 0 + log.info(f"res len {len(res)}") + for r in res: + assert token in r[field] + + # verify inverted index + if enable_inverted_index: + if field == "word": + expr = f"{field} == '{token}'" + log.info(f"expr: {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + for r in res: + assert r[field] == token + # query single field for multi-word + for field in text_fields: + # match top 10 most common words + top_10_tokens = [] + for word, count in wf_map[field].most_common(10): + top_10_tokens.append(word) + string_of_top_10_words = " ".join(top_10_tokens) + expr = f"text_match({field}, '{string_of_top_10_words}')" + log.info(f"expr {expr}") + res, _ = collection_w.query(expr=expr, output_fields=["id", field]) + log.info(f"res len {len(res)}") + for r in res: + assert any([token in r[field] for token in top_10_tokens]) + + + @pytest.mark.skip("unimplemented") @pytest.mark.tags(CaseLabel.L0) def test_query_text_match_custom_analyzer(self): @@ -4787,6 +4931,7 @@ def test_query_text_match_with_combined_expression_for_single_field(self): wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) df_new = cf.split_dataframes(df, fields=text_fields) + log.info(f"df \n{df}") log.info(f"new df \n{df_new}") for field in text_fields: expr_list = [] @@ -4796,16 +4941,15 @@ def test_query_text_match_with_combined_expression_for_single_field(self): tmp = f"text_match({field}, '{word}')" log.info(f"tmp expr {tmp}") expr_list.append(tmp) - manual_result = df_new[ - df_new.apply(lambda row: word in row[field], axis=1) - ] - tmp_res = set(manual_result["id"].tolist()) - log.info(f"manual check result for {tmp} {len(manual_result)}") + tmp_res = cf.manual_check_text_match(df_new, word, field) + log.info(f"manual check result for {tmp} {len(tmp_res)}") pd_tmp_res_list.append(tmp_res) + log.info(f"manual res {len(pd_tmp_res_list)}, {pd_tmp_res_list}") final_res = set(pd_tmp_res_list[0]) for i in range(1, len(pd_tmp_res_list)): final_res = final_res.intersection(set(pd_tmp_res_list[i])) log.info(f"intersection res {len(final_res)}") + log.info(f"final res {final_res}") and_expr = " and ".join(expr_list) log.info(f"expr: {and_expr}") res, _ = collection_w.query(expr=and_expr, output_fields=text_fields) diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py index 634a83f16774d..ee21195a10a8c 100644 --- a/tests/python_client/testcases/test_search.py +++ b/tests/python_client/testcases/test_search.py @@ -29,6 +29,11 @@ Faker.seed(19530) fake_en = Faker("en_US") fake_zh = Faker("zh_CN") + +# patch faker to generate text with specific distribution +cf.patch_faker_text(fake_en, cf.en_vocabularies_distribution) +cf.patch_faker_text(fake_zh, cf.zh_vocabularies_distribution) + pd.set_option("expand_frame_repr", False) @@ -13285,8 +13290,8 @@ class TestSearchWithTextMatchFilter(TestcaseBase): @pytest.mark.tags(CaseLabel.L0) @pytest.mark.parametrize("enable_partition_key", [True, False]) @pytest.mark.parametrize("enable_inverted_index", [True, False]) - @pytest.mark.parametrize("tokenizer", ["jieba", "default"]) - def test_search_with_text_match_filter_normal( + @pytest.mark.parametrize("tokenizer", ["default"]) + def test_search_with_text_match_filter_normal_en( self, tokenizer, enable_inverted_index, enable_partition_key ): """ @@ -13442,3 +13447,163 @@ def test_search_with_text_match_filter_normal( r = r.to_dict() assert any([token in r["entity"][field] for token in top_10_tokens]) + @pytest.mark.tags(CaseLabel.L0) + @pytest.mark.parametrize("enable_partition_key", [True, False]) + @pytest.mark.parametrize("enable_inverted_index", [True, False]) + @pytest.mark.parametrize("tokenizer", ["jieba"]) + @pytest.mark.xfail(reason="unstable case") + def test_search_with_text_match_filter_normal_zh( + self, tokenizer, enable_inverted_index, enable_partition_key + ): + """ + target: test text match normal + method: 1. enable text match and insert data with varchar + 2. get the most common words and query with text match + 3. verify the result + expected: text match successfully and result is correct + """ + tokenizer_params = { + "tokenizer": tokenizer, + } + dim = 128 + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), + FieldSchema( + name="word", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + is_partition_key=enable_partition_key, + tokenizer_params=tokenizer_params, + ), + FieldSchema( + name="sentence", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + tokenizer_params=tokenizer_params, + ), + FieldSchema( + name="paragraph", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + tokenizer_params=tokenizer_params, + ), + FieldSchema( + name="text", + dtype=DataType.VARCHAR, + max_length=65535, + enable_tokenizer=True, + enable_match=True, + tokenizer_params=tokenizer_params, + ), + FieldSchema(name="float32_emb", dtype=DataType.FLOAT_VECTOR, dim=dim), + FieldSchema(name="sparse_emb", dtype=DataType.SPARSE_FLOAT_VECTOR), + ] + schema = CollectionSchema(fields=fields, description="test collection") + data_size = 5000 + collection_w = self.init_collection_wrap( + name=cf.gen_unique_str(prefix), schema=schema + ) + log.info(f"collection {collection_w.describe()}") + fake = fake_en + if tokenizer == "jieba": + language = "zh" + fake = fake_zh + else: + language = "en" + + data = [ + { + "id": i, + "word": fake.word().lower(), + "sentence": fake.sentence().lower(), + "paragraph": fake.paragraph().lower(), + "text": fake.text().lower(), + "float32_emb": [random.random() for _ in range(dim)], + "sparse_emb": cf.gen_sparse_vectors(1, dim=10000)[0], + } + for i in range(data_size) + ] + df = pd.DataFrame(data) + log.info(f"dataframe\n{df}") + batch_size = 5000 + for i in range(0, len(df), batch_size): + collection_w.insert( + data[i : i + batch_size] + if i + batch_size < len(df) + else data[i : len(df)] + ) + collection_w.flush() + collection_w.create_index( + "float32_emb", + {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 16, "efConstruction": 500}}, + ) + collection_w.create_index( + "sparse_emb", + {"index_type": "SPARSE_INVERTED_INDEX", "metric_type": "IP"}, + ) + if enable_inverted_index: + collection_w.create_index("word", {"index_type": "INVERTED"}) + collection_w.load() + # analyze the croup + text_fields = ["word", "sentence", "paragraph", "text"] + wf_map = {} + for field in text_fields: + wf_map[field] = cf.analyze_documents(df[field].tolist(), language=language) + # search with filter single field for one token + df_split = cf.split_dataframes(df, text_fields, language=language) + log.info(f"df_split\n{df_split}") + for ann_field in ["float32_emb", "sparse_emb"]: + log.info(f"ann_field {ann_field}") + if ann_field == "float32_emb": + search_data = [[random.random() for _ in range(dim)]] + elif ann_field == "sparse_emb": + search_data = cf.gen_sparse_vectors(1,dim=10000) + else: + search_data = [[random.random() for _ in range(dim)]] + for field in text_fields: + token = wf_map[field].most_common()[0][0] + expr = f"text_match({field}, '{token}')" + manual_result = df_split[ + df_split.apply(lambda row: token in row[field], axis=1) + ] + log.info(f"expr: {expr}, manual_check_result: {len(manual_result)}") + res_list, _ = collection_w.search( + data=search_data, + anns_field=ann_field, + param={}, + limit=100, + expr=expr, output_fields=["id", field]) + for res in res_list: + log.info(f"res len {len(res)} res {res}") + assert len(res) > 0 + for r in res: + r = r.to_dict() + assert token in r["entity"][field] + + # search with filter single field for multi-token + for field in text_fields: + # match top 10 most common words + top_10_tokens = [] + for word, count in wf_map[field].most_common(10): + top_10_tokens.append(word) + string_of_top_10_words = " ".join(top_10_tokens) + expr = f"text_match({field}, '{string_of_top_10_words}')" + log.info(f"expr {expr}") + res_list, _ = collection_w.search( + data=search_data, + anns_field=ann_field, + param={}, + limit=100, + expr=expr, output_fields=["id", field]) + for res in res_list: + log.info(f"res len {len(res)} res {res}") + assert len(res) > 0 + for r in res: + r = r.to_dict() + assert any([token in r["entity"][field] for token in top_10_tokens]) \ No newline at end of file diff --git a/tests/restful_client_v2/base/testbase.py b/tests/restful_client_v2/base/testbase.py index d4556aefeb5bb..85762fb338aad 100644 --- a/tests/restful_client_v2/base/testbase.py +++ b/tests/restful_client_v2/base/testbase.py @@ -3,7 +3,7 @@ import pytest import time import uuid -from pymilvus import connections, db +from pymilvus import connections, db, MilvusClient from utils.util_log import test_log as logger from api.milvus import (VectorClient, CollectionClient, PartitionClient, IndexClient, AliasClient, UserClient, RoleClient, ImportJobClient, StorageClient, Requests) @@ -33,6 +33,7 @@ class Base: role_client = None import_job_client = None storage_client = None + milvus_client = None class TestBase(Base): @@ -171,5 +172,11 @@ def update_database(self, db_name="default"): self.vector_client.db_name = db_name self.import_job_client.db_name = db_name - - + def wait_load_completed(self, collection_name, db_name="default", timeout=60): + t0 = time.time() + while True and time.time() - t0 < timeout: + rsp = self.collection_client.collection_describe(collection_name, db_name=db_name) + if "data" in rsp and "load" in rsp["data"] and rsp["data"]["load"] == "LoadStateLoaded": + break + else: + time.sleep(5) diff --git a/tests/restful_client_v2/testcases/test_jobs_operation.py b/tests/restful_client_v2/testcases/test_jobs_operation.py index 9cbe9b96887f2..f51f07ab26c6a 100644 --- a/tests/restful_client_v2/testcases/test_jobs_operation.py +++ b/tests/restful_client_v2/testcases/test_jobs_operation.py @@ -6,7 +6,7 @@ from pathlib import Path import pandas as pd import numpy as np -from pymilvus import Collection +from pymilvus import Collection, utility from utils.utils import gen_collection_name from utils.util_log import test_log as logger import pytest @@ -50,8 +50,8 @@ def test_job_e2e(self, insert_num, import_task_num, auto_id, is_partition_key, e }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) - + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage data = [] for i in range(insert_num): @@ -100,7 +100,7 @@ def test_job_e2e(self, insert_num, import_task_num, auto_id, is_partition_key, e if time.time() - t0 > IMPORT_TIMEOUT: assert False, "import job timeout" c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) res = c.query( expr="", output_fields=["count(*)"], @@ -140,8 +140,8 @@ def test_import_job_with_db(self, insert_num, import_task_num, auto_id, is_parti }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) - + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage data = [] for i in range(insert_num): @@ -190,7 +190,7 @@ def test_import_job_with_db(self, insert_num, import_task_num, auto_id, is_parti if time.time() - t0 > IMPORT_TIMEOUT: assert False, "import job timeout" c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) res = c.query( expr="", output_fields=["count(*)"], @@ -229,7 +229,8 @@ def test_import_job_with_partition(self, insert_num, import_task_num, auto_id, i }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage data = [] @@ -282,7 +283,7 @@ def test_import_job_with_partition(self, insert_num, import_task_num, auto_id, i if time.time() - t0 > IMPORT_TIMEOUT: assert False, "import job timeout" c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) res = c.query( expr="", output_fields=["count(*)"], @@ -321,7 +322,8 @@ def test_job_import_multi_json_file(self): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage file_nums = 2 @@ -373,7 +375,7 @@ def test_job_import_multi_json_file(self): time.sleep(10) # assert data count c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) assert c.num_entities == 2000 # assert import data can be queried payload = { @@ -402,7 +404,8 @@ def test_job_import_multi_parquet_file(self): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage file_nums = 2 @@ -454,7 +457,7 @@ def test_job_import_multi_parquet_file(self): time.sleep(10) # assert data count c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) assert c.num_entities == 2000 # assert import data can be queried payload = { @@ -483,7 +486,8 @@ def test_job_import_multi_numpy_file(self): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage file_nums = 2 @@ -540,7 +544,7 @@ def test_job_import_multi_numpy_file(self): time.sleep(10) # assert data count c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) assert c.num_entities == 2000 # assert import data can be queried payload = { @@ -569,7 +573,8 @@ def test_job_import_multi_file_type(self): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage file_nums = 2 @@ -665,7 +670,7 @@ def test_job_import_multi_file_type(self): time.sleep(10) # assert data count c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) assert c.num_entities == 6000 # assert import data can be queried payload = { @@ -722,8 +727,8 @@ def test_job_import_binlog_file_type(self, nb, dim, insert_round, auto_id, {"fieldName": "image_emb", "indexName": "image_emb", "metricType": "L2"} ] } - rsp = self.collection_client.collection_create(payload) - assert rsp['code'] == 0 + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # create restore collection restore_collection_name = f"{name}_restore" payload["collectionName"] = restore_collection_name @@ -848,7 +853,8 @@ def test_job_import_recovery_after_chaos(self, release_name): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage file_nums = 10 @@ -916,7 +922,7 @@ def test_job_import_recovery_after_chaos(self, release_name): rsp = self.import_job_client.list_import_jobs(payload) # assert data count c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) assert c.num_entities == file_nums * batch_size # assert import data can be queried payload = { @@ -948,7 +954,8 @@ def test_job_import_with_multi_task_and_datanode(self, release_name): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage task_num = 48 @@ -1009,7 +1016,7 @@ def test_job_import_with_multi_task_and_datanode(self, release_name): rsp = self.import_job_client.list_import_jobs(payload) # assert data count c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) assert c.num_entities == file_nums * batch_size * task_num # assert import data can be queried payload = { @@ -1038,7 +1045,8 @@ def test_job_import_with_extremely_large_task_num(self, release_name): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage task_num = 1000 @@ -1099,7 +1107,7 @@ def test_job_import_with_extremely_large_task_num(self, release_name): rsp = self.import_job_client.list_import_jobs(payload) # assert data count c = Collection(name) - c.load(_refresh=True) + c.load(_refresh=True, timeou=120) assert c.num_entities == file_nums * batch_size * task_num # assert import data can be queried payload = { @@ -1140,7 +1148,8 @@ def test_create_import_job_with_json_dup_dynamic_key(self, insert_num, import_ta }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # upload file to storage data = [] @@ -1211,7 +1220,8 @@ def test_import_job_with_empty_files(self): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # create import job payload = { @@ -1265,7 +1275,8 @@ def test_import_job_with_non_exist_binlog_files(self): }, "indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}] } - rsp = self.collection_client.collection_create(payload) + self.collection_client.collection_create(payload) + self.wait_load_completed(name) # create import job payload = { @@ -1520,8 +1531,8 @@ def test_get_job_progress_with_mismatch_db_name(self, insert_num, import_task_nu if time.time() - t0 > IMPORT_TIMEOUT: assert False, "import job timeout" c = Collection(name) - c.load(_refresh=True) time.sleep(10) + c.load(_refresh=True, timeou=120) res = c.query( expr="", output_fields=["count(*)"],