Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Training operator SDK unit test #1938

Merged
merged 6 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/test-python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
- pull_request

jobs:
test:
python-test:
name: Test
runs-on: ubuntu-latest

Expand All @@ -21,3 +21,9 @@ jobs:
version: 23.9.1
options: --check --exclude '/*kubeflow_org_v1*|__init__.py|api_client.py|configuration.py|exceptions.py|rest.py'
src: sdk/

- name: Install dependencies
run: pip install pytest python-dateutil urllib3 kubernetes

- name: Run unit test for training sdk
run: pytest ./sdk/python/kubeflow/training/api/training_client_test.py
2 changes: 1 addition & 1 deletion hack/python-sdk/gen-sdk.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ echo "Generating swagger file ..."
go run "${repo_root}"/hack/swagger/main.go ${VERSION} >"${SWAGGER_CODEGEN_FILE}"

echo "Removing previously generated files ..."
rm -rf "${SDK_OUTPUT_PATH}"/docs/V1*.md "${SDK_OUTPUT_PATH}"/kubeflow/training/models "${SDK_OUTPUT_PATH}"/kubeflow/training/*.py "${SDK_OUTPUT_PATH}"/test/test_*.py
rm -rf "${SDK_OUTPUT_PATH}"/docs/KubeflowOrgV1*.md "${SDK_OUTPUT_PATH}"/kubeflow/training/models "${SDK_OUTPUT_PATH}"/kubeflow/training/*.py "${SDK_OUTPUT_PATH}"/test/test_*.py
deepanker13 marked this conversation as resolved.
Show resolved Hide resolved
echo "Generating Python SDK for Training Operator ..."
java -jar "${SWAGGER_CODEGEN_JAR}" generate -i "${repo_root}"/hack/python-sdk/swagger.json -g python -o "${SDK_OUTPUT_PATH}" -c "${SWAGGER_CODEGEN_CONF}"

Expand Down
194 changes: 194 additions & 0 deletions sdk/python/kubeflow/training/api/training_client_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import multiprocessing
import pytest
from unittest.mock import patch, Mock

from typing import Optional
from kubeflow.training import TrainingClient
from kubeflow.training import KubeflowOrgV1ReplicaSpec
from kubeflow.training import KubeflowOrgV1PyTorchJob
from kubeflow.training import KubeflowOrgV1PyTorchJobSpec
from kubeflow.training import KubeflowOrgV1RunPolicy
from kubeflow.training import KubeflowOrgV1SchedulingPolicy
from kubeflow.training import constants

from kubernetes.client import V1PodTemplateSpec
from kubernetes.client import V1ObjectMeta
from kubernetes.client import V1PodSpec
from kubernetes.client import V1Container
from kubernetes.client import V1ResourceRequirements


def create_namespaced_custom_object_response(*args, **kwargs):
if args[2] == "timeout":
raise multiprocessing.TimeoutError()
elif args[2] == "runtime":
raise RuntimeError()


def generate_container() -> V1Container:
return V1Container(
name="pytorch",
image="gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0",
args=["--backend", "gloo"],
resources=V1ResourceRequirements(limits={"memory": "1Gi", "cpu": "0.4"}),
)


def generate_pytorchjob(
job_namespace: str,
master: KubeflowOrgV1ReplicaSpec,
worker: KubeflowOrgV1ReplicaSpec,
scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None,
) -> KubeflowOrgV1PyTorchJob:
return KubeflowOrgV1PyTorchJob(
api_version=constants.API_VERSION,
kind=constants.PYTORCHJOB_KIND,
metadata=V1ObjectMeta(name="pytorchjob-mnist-ci-test", namespace=job_namespace),
spec=KubeflowOrgV1PyTorchJobSpec(
run_policy=KubeflowOrgV1RunPolicy(
clean_pod_policy="None",
scheduling_policy=scheduling_policy,
),
pytorch_replica_specs={"Master": master, "Worker": worker},
),
)


def create_job():
job_namespace = "test"
container = generate_container()
master = KubeflowOrgV1ReplicaSpec(
replicas=1,
restart_policy="OnFailure",
template=V1PodTemplateSpec(
metadata=V1ObjectMeta(
annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
),
spec=V1PodSpec(containers=[container]),
),
)

worker = KubeflowOrgV1ReplicaSpec(
replicas=1,
restart_policy="OnFailure",
template=V1PodTemplateSpec(
metadata=V1ObjectMeta(
annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
),
spec=V1PodSpec(containers=[container]),
),
)
pytorchjob = generate_pytorchjob(job_namespace, master, worker)
return pytorchjob


class DummyJobClass:
def __init__(self, kind) -> None:
self.kind = kind


test_data = [
(
"invalid extra parameter",
{"job": create_job(), "namespace": "test", "base_image": "test_image"},
ValueError,
),
("invalid job kind", {"job_kind": "invalid_job_kind"}, ValueError),
(
"job name missing ",
{"train_func": lambda: "test train function"},
ValueError,
),
("job name missing", {"base_image": "test_image"}, ValueError),
(
"uncallable train function",
{"name": "test job", "train_func": "uncallable train function"},
ValueError,
),
(
"invalid TFJob replica",
{
"name": "test job",
"train_func": lambda: "test train function",
"job_kind": constants.TFJOB_KIND,
},
ValueError,
),
(
"invalid PyTorchJob replica",
{
"name": "test job",
"train_func": lambda: "test train function",
"job_kind": constants.PYTORCHJOB_KIND,
},
ValueError,
),
(
"invalid pod template spec parameters",
{
"name": "test job",
"train_func": lambda: "test train function",
"job_kind": constants.MXJOB_KIND,
},
KeyError,
),
(
"paddle job can't be created using function",
{
"name": "test job",
"train_func": lambda: "test train function",
"job_kind": constants.PADDLEJOB_KIND,
},
ValueError,
),
(
"invalid job object",
{"job": DummyJobClass(constants.TFJOB_KIND)},
ValueError,
),
(
"create_namespaced_custom_object timeout error",
{"job": create_job(), "namespace": "timeout"},
TimeoutError,
),
(
"create_namespaced_custom_object runtime error",
{"job": create_job(), "namespace": "runtime"},
RuntimeError,
),
(
"valid flow",
{"job": create_job(), "namespace": "test"},
"success",
),
]


@pytest.fixture
def training_client():
with patch(
"kubernetes.client.CustomObjectsApi",
return_value=Mock(
create_namespaced_custom_object=Mock(
side_effect=create_namespaced_custom_object_response
)
),
), patch("kubernetes.client.CoreV1Api", return_value=Mock()), patch(
"kubernetes.config.load_kube_config", return_value=Mock()
):
client = TrainingClient(job_kind=constants.PYTORCHJOB_KIND)
yield client


@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data)
def test_create_job(training_client, test_name, kwargs, expected_output):
"""
test create_job function of training client
"""
print("Executing test:", test_name)
try:
training_client.create_job(**kwargs)
assert expected_output == "success"
except Exception as e:
assert type(e) == expected_output
print("test execution complete")