Skip to content

Commit

Permalink
[SDK] test: add unit test for list_jobs method of the training_client (
Browse files Browse the repository at this point in the history
…kubeflow#2267)

Signed-off-by: wei-chenglai <[email protected]>
Signed-off-by: sailesh duddupudi <[email protected]>
  • Loading branch information
seanlaii authored and saileshd1402 committed Dec 2, 2024
1 parent dc6a517 commit c0b64e0
Showing 1 changed file with 118 additions and 0 deletions.
118 changes: 118 additions & 0 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ def get_namespaced_custom_object_response(*args, **kwargs):
return mock_thread


def list_namespaced_custom_object_response(*args, **kwargs):
if args[2] == TIMEOUT:
raise multiprocessing.TimeoutError()
elif args[2] == RUNTIME:
raise RuntimeError()
elif args[2] == "empty-namespace":
mock_response = {"items": []}
elif args[2] == "multi-jobs":
mock_response = {
"items": [
serialize_k8s_object(generate_job_with_status(create_job())),
serialize_k8s_object(generate_job_with_status(create_job())),
]
}
else:
mock_response = {
"items": [serialize_k8s_object(generate_job_with_status(create_job()))]
}

mock_thread = Mock()
mock_thread.get.return_value = mock_response
return mock_thread


def list_namespaced_pod_response(*args, **kwargs):
class MockResponse:
def get(self, timeout):
Expand Down Expand Up @@ -482,6 +506,74 @@ def __init__(self, kind) -> None:
),
]

test_data_list_jobs = [
(
"valid flow with default namespace and default timeout",
{},
SUCCESS,
[generate_job_with_status(create_job())],
),
(
"valid flow with all parameters set",
{
"namespace": TEST_NAME,
"job_kind": constants.PYTORCHJOB_KIND,
"timeout": 120,
},
SUCCESS,
[generate_job_with_status(create_job())],
),
(
"valid flow with empty job list",
{
"namespace": "empty-namespace",
},
SUCCESS,
[],
),
(
"valid flow with multiple jobs",
{
"namespace": "multi-jobs",
},
SUCCESS,
[
generate_job_with_status(create_job()),
generate_job_with_status(create_job()),
],
),
(
"invalid flow with default namespace and a Job that doesn't exist",
{"job_kind": constants.TFJOB_KIND},
RuntimeError,
None,
),
(
"invalid flow with incorrect parameter",
{"test": "example"},
TypeError,
None,
),
(
"invalid flow with incorrect job_kind value",
{"job_kind": "FailJob"},
ValueError,
None,
),
(
"runtime error case",
{"namespace": RUNTIME},
RuntimeError,
None,
),
(
"timeout error case",
{"namespace": TIMEOUT},
TimeoutError,
None,
),
]


test_data_delete_job = [
(
Expand Down Expand Up @@ -854,6 +946,9 @@ def training_client():
get_namespaced_custom_object=Mock(
side_effect=get_namespaced_custom_object_response
),
list_namespaced_custom_object=Mock(
side_effect=list_namespaced_custom_object_response
),
),
), patch(
"kubernetes.client.CoreV1Api",
Expand Down Expand Up @@ -1109,3 +1204,26 @@ def test_is_job_succeeded(training_client, test_name, kwargs, expected_output):
assert type(e) is expected_output

print("test execution complete")


@pytest.mark.parametrize(
"test_name,kwargs,expected_status,expected_response", test_data_list_jobs
)
def test_list_jobs(
training_client, test_name, kwargs, expected_status, expected_response
):
"""
test list_jobs function of training client
"""
print("Executing test: ", test_name)
try:
resp = training_client.list_jobs(**kwargs)
assert expected_status == SUCCESS
assert isinstance(resp, list)
assert len(resp) == len(expected_response)
for actual_job, expected_job in zip(resp, expected_response):
assert actual_job.to_dict() == expected_job.to_dict()
except Exception as e:
assert type(e) is expected_status

print("test execution complete")

0 comments on commit c0b64e0

Please sign in to comment.