test: increase timeout for refresh load #37409

Closed · wants to merge 2 commits
13 changes: 10 additions & 3 deletions tests/restful_client_v2/base/testbase.py
@@ -3,7 +3,7 @@
import pytest
import time
import uuid
from pymilvus import connections, db
from pymilvus import connections, db, MilvusClient
from utils.util_log import test_log as logger
from api.milvus import (VectorClient, CollectionClient, PartitionClient, IndexClient, AliasClient,
UserClient, RoleClient, ImportJobClient, StorageClient, Requests)
@@ -33,6 +33,7 @@ class Base:
role_client = None
import_job_client = None
storage_client = None
milvus_client = None


class TestBase(Base):
@@ -171,5 +172,11 @@ def update_database(self, db_name="default"):
self.vector_client.db_name = db_name
self.import_job_client.db_name = db_name



def wait_load_completed(self, collection_name, db_name="default", timeout=60):
    # Poll collection_describe until the collection reports LoadStateLoaded,
    # giving up once `timeout` seconds have elapsed.
    t0 = time.time()
    while time.time() - t0 < timeout:
        rsp = self.collection_client.collection_describe(collection_name, db_name=db_name)
        if "data" in rsp and rsp["data"].get("load") == "LoadStateLoaded":
            break
        time.sleep(5)
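For context, the test cases below call this helper right after `collection_create`, so they stop racing a collection that is still loading. A minimal usage sketch; the quick-create payload with only `collectionName` and `dimension` is an illustrative assumption, not a schema from this PR:

```python
from utils.utils import gen_collection_name
from base.testbase import TestBase


class TestWaitForLoad(TestBase):
    # Hypothetical smoke test showing the intended call order.
    def test_create_then_wait(self):
        name = gen_collection_name()
        payload = {"collectionName": name, "dimension": 128}  # assumed quick-create payload
        self.collection_client.collection_create(payload)
        # Poll describe() until the load state is LoadStateLoaded,
        # waiting up to 120 s instead of the 60 s default.
        self.wait_load_completed(name, timeout=120)
```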
69 changes: 40 additions & 29 deletions tests/restful_client_v2/testcases/test_jobs_operation.py
@@ -6,7 +6,7 @@
from pathlib import Path
import pandas as pd
import numpy as np
from pymilvus import Collection
from pymilvus import Collection, utility
from utils.utils import gen_collection_name
from utils.util_log import test_log as logger
import pytest
@@ -50,8 +50,8 @@ def test_job_e2e(self, insert_num, import_task_num, auto_id, is_partition_key, e
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)

self.collection_client.collection_create(payload)
self.wait_load_completed(name)
# upload file to storage
data = []
for i in range(insert_num):
@@ -100,7 +100,7 @@ def test_job_e2e(self, insert_num, import_task_num, auto_id, is_partition_key, e
if time.time() - t0 > IMPORT_TIMEOUT:
assert False, "import job timeout"
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
res = c.query(
expr="",
output_fields=["count(*)"],
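For context on the repeated one-line change in this file: in pymilvus, `Collection.load(_refresh=True)` asks query nodes to pull in segments added since the previous load, and `load()` accepts a `timeout` keyword, so 120 s gives a slow refresh load more room than the client default. A standalone sketch, assuming a local Milvus at the default port and a pre-existing collection named `book`:

```python
from pymilvus import Collection, connections

# Assumed local deployment; adjust host/port for your environment.
connections.connect(host="127.0.0.1", port="19530")

c = Collection("book")  # assumed pre-existing, indexed collection
# Refresh load: re-pull newly added segments, waiting up to 120 s
# rather than the default client timeout.
c.load(_refresh=True, timeout=120)
print(c.query(expr="", output_fields=["count(*)"]))
```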
@@ -140,8 +140,8 @@ def test_import_job_with_db(self, insert_num, import_task_num, auto_id, is_parti
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)

self.collection_client.collection_create(payload)
self.wait_load_completed(name)
# upload file to storage
data = []
for i in range(insert_num):
@@ -190,7 +190,7 @@ def test_import_job_with_db(self, insert_num, import_task_num, auto_id, is_parti
if time.time() - t0 > IMPORT_TIMEOUT:
assert False, "import job timeout"
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
res = c.query(
expr="",
output_fields=["count(*)"],
@@ -229,7 +229,8 @@ def test_import_job_with_partition(self, insert_num, import_task_num, auto_id, i
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
data = []
@@ -282,7 +283,7 @@ def test_import_job_with_partition(self, insert_num, import_task_num, auto_id, i
if time.time() - t0 > IMPORT_TIMEOUT:
assert False, "import job timeout"
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
res = c.query(
expr="",
output_fields=["count(*)"],
@@ -321,7 +322,8 @@ def test_job_import_multi_json_file(self):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
file_nums = 2
@@ -373,7 +375,7 @@ def test_job_import_multi_json_file(self):
time.sleep(10)
# assert data count
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
assert c.num_entities == 2000
# assert import data can be queried
payload = {
@@ -402,7 +404,8 @@ def test_job_import_multi_parquet_file(self):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
file_nums = 2
Expand Down Expand Up @@ -454,7 +457,7 @@ def test_job_import_multi_parquet_file(self):
time.sleep(10)
# assert data count
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
assert c.num_entities == 2000
# assert import data can be queried
payload = {
@@ -483,7 +486,8 @@ def test_job_import_multi_numpy_file(self):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
file_nums = 2
@@ -540,7 +544,7 @@ def test_job_import_multi_numpy_file(self):
time.sleep(10)
# assert data count
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
assert c.num_entities == 2000
# assert import data can be queried
payload = {
@@ -569,7 +573,8 @@ def test_job_import_multi_file_type(self):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
file_nums = 2
@@ -665,7 +670,7 @@ def test_job_import_multi_file_type(self):
time.sleep(10)
# assert data count
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
assert c.num_entities == 6000
# assert import data can be queried
payload = {
@@ -722,8 +727,8 @@ def test_job_import_binlog_file_type(self, nb, dim, insert_round, auto_id,
{"fieldName": "image_emb", "indexName": "image_emb", "metricType": "L2"}
]
}
rsp = self.collection_client.collection_create(payload)
assert rsp['code'] == 0
self.collection_client.collection_create(payload)
self.wait_load_completed(name)
# create restore collection
restore_collection_name = f"{name}_restore"
payload["collectionName"] = restore_collection_name
@@ -848,7 +853,8 @@ def test_job_import_recovery_after_chaos(self, release_name):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
file_nums = 10
@@ -916,7 +922,7 @@ def test_job_import_recovery_after_chaos(self, release_name):
rsp = self.import_job_client.list_import_jobs(payload)
# assert data count
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
assert c.num_entities == file_nums * batch_size
# assert import data can be queried
payload = {
@@ -948,7 +954,8 @@ def test_job_import_with_multi_task_and_datanode(self, release_name):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
task_num = 48
@@ -1009,7 +1016,7 @@ def test_job_import_with_multi_task_and_datanode(self, release_name):
rsp = self.import_job_client.list_import_jobs(payload)
# assert data count
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
assert c.num_entities == file_nums * batch_size * task_num
# assert import data can be queried
payload = {
@@ -1038,7 +1045,8 @@ def test_job_import_with_extremely_large_task_num(self, release_name):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
task_num = 1000
@@ -1099,7 +1107,7 @@ def test_job_import_with_extremely_large_task_num(self, release_name):
rsp = self.import_job_client.list_import_jobs(payload)
# assert data count
c = Collection(name)
c.load(_refresh=True)
c.load(_refresh=True, timeout=120)
assert c.num_entities == file_nums * batch_size * task_num
# assert import data can be queried
payload = {
@@ -1140,7 +1148,8 @@ def test_create_import_job_with_json_dup_dynamic_key(self, insert_num, import_ta
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# upload file to storage
data = []
@@ -1211,7 +1220,8 @@ def test_import_job_with_empty_files(self):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# create import job
payload = {
@@ -1265,7 +1275,8 @@ def test_import_job_with_non_exist_binlog_files(self):
},
"indexParams": [{"fieldName": "book_intro", "indexName": "book_intro_vector", "metricType": "L2"}]
}
rsp = self.collection_client.collection_create(payload)
self.collection_client.collection_create(payload)
self.wait_load_completed(name)

# create import job
payload = {
@@ -1520,8 +1531,8 @@ def test_get_job_progress_with_mismatch_db_name(self, insert_num, import_task_nu
if time.time() - t0 > IMPORT_TIMEOUT:
assert False, "import job timeout"
c = Collection(name)
c.load(_refresh=True)
time.sleep(10)
c.load(_refresh=True, timeout=120)
res = c.query(
expr="",
output_fields=["count(*)"],