Skip to content

Commit

Permalink
Print Job info when E2E fails
Browse files Browse the repository at this point in the history
  • Loading branch information
andreyvelich committed Sep 14, 2023
1 parent aaa6b45 commit fea6efb
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 117 deletions.
35 changes: 24 additions & 11 deletions sdk/python/test/e2e/test_e2e_mpijob.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@
from kubeflow.training import KubeflowOrgV1SchedulingPolicy
from kubeflow.training.constants import constants

from test.e2e.utils import (
verify_job_e2e,
verify_unschedulable_job_e2e,
get_pod_spec_scheduler_name,
)
import test.e2e.utils as utils
from test.e2e.constants import TEST_GANG_SCHEDULER_NAME_ENV_KEY
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

Expand Down Expand Up @@ -64,7 +60,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
),
spec=V1PodSpec(
containers=[launcher_container],
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
),
),
)
Expand All @@ -78,7 +74,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
),
spec=V1PodSpec(
containers=[worker_container],
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
),
),
)
Expand All @@ -94,15 +90,26 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
try:
utils.verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"MPIJob E2E fails. Exception: {e}")

TRAINING_CLIENT.update_job(patched_mpijob, JOB_NAME, job_namespace)
logging.info(f"List of updated {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"MPIJob E2E fails. Exception: {e}")

TRAINING_CLIENT.delete_job(JOB_NAME)
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


@pytest.mark.skipif(
Expand Down Expand Up @@ -140,8 +147,14 @@ def test_sdk_e2e(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"MPIJob E2E fails. Exception: {e}")

utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


Expand Down
37 changes: 26 additions & 11 deletions sdk/python/test/e2e/test_e2e_mxjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@
from kubeflow.training import KubeflowOrgV1SchedulingPolicy
from kubeflow.training.constants import constants

from test.e2e.utils import (
verify_job_e2e,
verify_unschedulable_job_e2e,
get_pod_spec_scheduler_name,
)
import test.e2e.utils as utils
from test.e2e.constants import TEST_GANG_SCHEDULER_NAME_ENV_KEY
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

Expand Down Expand Up @@ -65,7 +61,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
),
spec=V1PodSpec(
containers=[worker_container],
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
),
),
)
Expand All @@ -79,7 +75,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
),
spec=V1PodSpec(
containers=[server_container],
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
),
),
)
Expand All @@ -93,7 +89,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
),
spec=V1PodSpec(
containers=[scheduler_container],
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
),
),
)
Expand All @@ -117,13 +113,26 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
try:
utils.verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"MXJob E2E fails. Exception: {e}")

TRAINING_CLIENT.update_job(schedulable_mxjob, JOB_NAME, job_namespace)
logging.info(f"List of updated {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"MXJob E2E fails. Exception: {e}")

utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)

Expand Down Expand Up @@ -174,8 +183,14 @@ def test_sdk_e2e(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"MXJob E2E fails. Exception: {e}")

utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


Expand Down
31 changes: 22 additions & 9 deletions sdk/python/test/e2e/test_e2e_paddlejob.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@
from kubeflow.training import KubeflowOrgV1SchedulingPolicy
from kubeflow.training.constants import constants

from test.e2e.utils import (
verify_job_e2e,
verify_unschedulable_job_e2e,
get_pod_spec_scheduler_name,
)
import test.e2e.utils as utils
from test.e2e.constants import TEST_GANG_SCHEDULER_NAME_ENV_KEY
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

Expand Down Expand Up @@ -63,7 +59,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
),
spec=V1PodSpec(
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
containers=[container],
),
),
Expand All @@ -80,14 +76,25 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
try:
utils.verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"PaddleJob E2E fails. Exception: {e}")

TRAINING_CLIENT.update_job(schedulable_paddlejob, JOB_NAME, job_namespace)
logging.info(f"List of updated {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"PaddleJob E2E fails. Exception: {e}")

utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


Expand Down Expand Up @@ -115,8 +122,14 @@ def test_sdk_e2e(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"PaddleJob E2E fails. Exception: {e}")

utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


Expand Down
65 changes: 23 additions & 42 deletions sdk/python/test/e2e/test_e2e_pytorchjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@
from kubeflow.training import KubeflowOrgV1SchedulingPolicy
from kubeflow.training import constants

from test.e2e.utils import (
verify_job_e2e,
verify_unschedulable_job_e2e,
get_pod_spec_scheduler_name,
)
import test.e2e.utils as utils
from test.e2e.constants import TEST_GANG_SCHEDULER_NAME_ENV_KEY
from test.e2e.constants import GANG_SCHEDULERS, NONE_GANG_SCHEDULERS

Expand Down Expand Up @@ -63,7 +59,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
),
spec=V1PodSpec(
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
containers=[container],
),
),
Expand All @@ -77,7 +73,7 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
annotations={constants.ISTIO_SIDECAR_INJECTION: "false"}
),
spec=V1PodSpec(
scheduler_name=get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
scheduler_name=utils.get_pod_spec_scheduler_name(GANG_SCHEDULER_NAME),
containers=[container],
),
),
Expand All @@ -97,14 +93,25 @@ def test_sdk_e2e_with_gang_scheduling(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
try:
utils.verify_unschedulable_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"PyTorchJob E2E fails. Exception: {e}")

TRAINING_CLIENT.update_job(schedulable_pytorchjob, JOB_NAME, job_namespace)
logging.info(f"List of updated {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"PyTorchJob E2E fails. Exception: {e}")

utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


Expand Down Expand Up @@ -143,40 +150,14 @@ def test_sdk_e2e(job_namespace):
logging.info(f"List of created {TRAINING_CLIENT.job_kind}s")
logging.info(TRAINING_CLIENT.list_jobs(job_namespace))

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)

TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


def test_pytorchjob_from_func(job_namespace):
# Test Training function.
def train_func(parameters):
import pandas as pd
import time

print(f"Package pandas=={pd.__version__} is installed")
print(f"Input function parameters are: {parameters}")

print("Stat Training ....")
for i in range(10):
print(f"Epoch: {i} finished")
time.sleep(1)

print("Training is complete")

TRAINING_CLIENT.create_job(
name=JOB_NAME,
namespace=job_namespace,
parameters={"lr": "0.01"},
train_func=train_func,
num_worker_replicas=1,
packages_to_install=["pandas==1.3.5"],
)

logging.info("Get created PyTorchJob from function")
logging.info(TRAINING_CLIENT.get_job(JOB_NAME, job_namespace))
try:
utils.verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
except Exception as e:
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)
raise Exception(f"PyTorchJob E2E fails. Exception: {e}")

verify_job_e2e(TRAINING_CLIENT, JOB_NAME, job_namespace, wait_timeout=900)
utils.print_job_results(TRAINING_CLIENT, JOB_NAME, job_namespace)
TRAINING_CLIENT.delete_job(JOB_NAME, job_namespace)


Expand Down
Loading

0 comments on commit fea6efb

Please sign in to comment.