diff --git a/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb b/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb index 7cb5e8fba9..002bd8a3fd 100644 --- a/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb +++ b/examples/pytorch/image-classification/Train-CNN-with-FashionMNIST.ipynb @@ -33,8 +33,8 @@ "metadata": {}, "outputs": [], "source": [ - "!pip install torch==1.12.1\n", - "!pip install torchvision==0.13.1\n", + "!pip install torch==2.1.2\n", + "!pip install torchvision==0.19.1\n", "\n", "# TODO (andreyvelich): Change to release version when SDK with the new APIs is published.\n", "!pip install git+https://github.com/kubeflow/training-operator.git#subdirectory=sdk/python" @@ -52,16 +52,9 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 1, "id": "69f21f33-5c64-452c-90c4-977fc0dadb3b", "metadata": { - "execution": { - "iopub.execute_input": "2024-03-05T21:44:44.851155Z", - "iopub.status.busy": "2024-03-05T21:44:44.850918Z", - "iopub.status.idle": "2024-03-05T21:44:44.862195Z", - "shell.execute_reply": "2024-03-05T21:44:44.860949Z", - "shell.execute_reply.started": "2024-03-05T21:44:44.851138Z" - }, "tags": [] }, "outputs": [], @@ -102,11 +95,6 @@ " x = self.fc2(x)\n", " return F.log_softmax(x, dim=1)\n", "\n", - " # Get dist parameters.\n", - " # Kubeflow Training Operator automatically set appropriate RANK and WORLD_SIZE based on the configuration.\n", - " RANK = int(os.environ[\"RANK\"])\n", - " WORLD_SIZE = int(os.environ[\"WORLD_SIZE\"])\n", - "\n", " # IF GPU is available, nccl dist backend is used. Otherwise, gloo dist backend is used.\n", " if torch.cuda.is_available():\n", " device = \"cuda\"\n", @@ -114,17 +102,26 @@ " else:\n", " device = \"cpu\"\n", " backend = \"gloo\"\n", - "\n", + " \n", " logging.info(f\"Using Device: {device}, Backend: {backend}\")\n", "\n", - " model = Net()\n", - " # Attach model to the device.\n", - " model = model.to(device)\n", + " # Setup PyTorch DDP. Distributed environment will be set automatically by Training Operator.\n", + " dist.init_process_group(backend=backend)\n", + " Distributor = torch.nn.parallel.DistributedDataParallel\n", + " local_rank = int(os.getenv(\"LOCAL_RANK\", 0))\n", + " logging.info(\n", + " \"Distributed Training for WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}\".format(\n", + " dist.get_world_size(),\n", + " dist.get_rank(),\n", + " local_rank,\n", + " )\n", + " )\n", "\n", - " # Attach model to DistributedDataParallel strategy.\n", - " dist.init_process_group(backend=\"gloo\", rank=RANK, world_size=WORLD_SIZE)\n", - " Distributor = nn.parallel.DistributedDataParallel\n", + " # Attach model to the correct device.\n", + " device = torch.device(f\"{device}:{local_rank}\")\n", + " model = Net().to(device)\n", " model = Distributor(model)\n", + " optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)\n", "\n", " # Get Fashion MNIST Dataset.\n", " dataset = datasets.FashionMNIST(\n", @@ -142,11 +139,10 @@ " )\n", "\n", " # Start Training.\n", - " logging.info(f\"Start training for RANK: {RANK}. WORLD_SIZE: {WORLD_SIZE}\")\n", + " logging.info(f\"Start training for RANK: {dist.get_rank()}. 
WORLD_SIZE: {dist.get_world_size()}\")\n", "\n", " for epoch in range(int(parameters[\"NUM_EPOCHS\"])):\n", " model.train()\n", - " optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)\n", "\n", " for batch_idx, (data, target) in enumerate(train_loader):\n", " # Attach tensors to the device.\n", @@ -158,7 +154,7 @@ " loss = F.nll_loss(output, target)\n", " loss.backward()\n", " optimizer.step()\n", - " if batch_idx % 10 == 0:\n", + " if batch_idx % 10 == 0 and dist.get_rank() == 0:\n", " logging.info(\n", " \"Train Epoch: {} [{}/{} ({:.0f}%)]\\tloss={:.4f}\".format(\n", " epoch,\n", @@ -168,8 +164,8 @@ " loss.item(),\n", " )\n", " )\n", - "\n", - " logging.info(\"Training is finished\")" + " if dist.get_rank() == 0:\n", + " logging.info(\"Training is finished\")" ] }, { @@ -195,13 +191,6 @@ "execution_count": 3, "id": "9e2c6fd8-d0ba-4bc6-ac90-d4cf09751ace", "metadata": { - "execution": { - "iopub.execute_input": "2024-03-05T21:44:47.071420Z", - "iopub.status.busy": "2024-03-05T21:44:47.071188Z", - "iopub.status.idle": "2024-03-05T21:46:56.033826Z", - "shell.execute_reply": "2024-03-05T21:46:56.032986Z", - "shell.execute_reply.started": "2024-03-05T21:44:47.071404Z" - }, "tags": [] }, "outputs": [ @@ -209,9 +198,8 @@ "name": "stderr", "output_type": "stream", "text": [ - "2024-03-05T21:44:47Z INFO Using Device: cpu, Backend: gloo\n", - "2024-03-05T21:44:47Z INFO Added key: store_based_barrier_key:1 to store for rank: 0\n", - "2024-03-05T21:44:47Z INFO Rank 0: Completed store-based barrier for key:store_based_barrier_key:1 with 1 nodes.\n" + "2024-10-08T13:58:29Z INFO Using Device: cpu, Backend: gloo\n", + "2024-10-08T13:58:29Z INFO Distributed Training for WORLD_SIZE: 1, RANK: 0, LOCAL_RANK: 0\n" ] }, { @@ -223,18 +211,11 @@ ] }, { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "f84c269459b842199b83caaee8bee276", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - " 0%| | 0/26421880 [00:00 1 or num_procs_per_worker is not None + ): + entrypoint = constants.ENTRYPOINT_TORCH + else: + entrypoint = constants.ENTRYPOINT_PYTHON + + command, args = utils.get_command_using_train_func( + train_func=train_func, + entrypoint=entrypoint, + train_func_parameters=parameters, + packages_to_install=packages_to_install, + pip_index_url=pip_index_url, + ) + # Get Training Container template. container_spec = utils.get_container_spec( name=constants.JOB_PARAMETERS[job_kind]["container"], base_image=base_image, - train_func=train_func, - train_func_parameters=parameters, - packages_to_install=packages_to_install, - pip_index_url=pip_index_url, + command=command, + args=args, resources=resources_per_worker, ) @@ -443,6 +473,10 @@ def create_job( # Configure template for different Jobs. # TODO (andreyvelich): Add support for other kinds (e.g. MPIJob). 
if job_kind == constants.TFJOB_KIND: + if num_procs_per_worker is not None: + raise ValueError( + f"num_procs_per_worker can't be set for {constants.TFJOB_KIND}" + ) job = utils.get_tfjob_template( name=name, namespace=namespace, @@ -451,12 +485,18 @@ def create_job( num_chief_replicas=num_chief_replicas, num_ps_replicas=num_ps_replicas, ) - elif job_kind == constants.PYTORCHJOB_KIND and num_workers: + elif job_kind == constants.PYTORCHJOB_KIND: + if num_chief_replicas is not None or num_ps_replicas is not None: + raise ValueError( + "num_chief_replicas and num_ps_replicas can't be set for " + f"{constants.PYTORCHJOB_KIND}" + ) job = utils.get_pytorchjob_template( name=name, namespace=namespace, worker_pod_template_spec=pod_template_spec, num_workers=num_workers, + num_procs_per_worker=num_procs_per_worker, ) else: raise ValueError( diff --git a/sdk/python/kubeflow/training/api/training_client_test.py b/sdk/python/kubeflow/training/api/training_client_test.py index ea8c495032..a82d284e6f 100644 --- a/sdk/python/kubeflow/training/api/training_client_test.py +++ b/sdk/python/kubeflow/training/api/training_client_test.py @@ -1,5 +1,4 @@ import multiprocessing -from typing import Optional from unittest.mock import Mock, patch import pytest @@ -10,7 +9,6 @@ KubeflowOrgV1PyTorchJobSpec, KubeflowOrgV1ReplicaSpec, KubeflowOrgV1RunPolicy, - KubeflowOrgV1SchedulingPolicy, TrainingClient, constants, ) @@ -21,10 +19,11 @@ V1ObjectMeta, V1PodSpec, V1PodTemplateSpec, - V1ResourceRequirements, ) TEST_NAME = "test" +TEST_IMAGE = "docker.io/test-training" + TIMEOUT = "timeout" RUNTIME = "runtime" MOCK_POD_OBJ = "mock_pod_obj" @@ -126,41 +125,20 @@ def get(self, timeout): return MockResponse() -def generate_container() -> V1Container: - return V1Container( - name="pytorch", - image="gcr.io/kubeflow-ci/pytorch-dist-mnist-test:v1.0", - args=["--backend", "gloo"], - resources=V1ResourceRequirements(limits={"memory": "1Gi", "cpu": "0.4"}), - ) - - -def generate_pytorchjob( - job_namespace: str, - master: KubeflowOrgV1ReplicaSpec, - worker: KubeflowOrgV1ReplicaSpec, - scheduling_policy: Optional[KubeflowOrgV1SchedulingPolicy] = None, -) -> KubeflowOrgV1PyTorchJob: - return KubeflowOrgV1PyTorchJob( - api_version=constants.API_VERSION, - kind=constants.PYTORCHJOB_KIND, - metadata=V1ObjectMeta(name="pytorchjob-mnist-ci-test", namespace=job_namespace), - spec=KubeflowOrgV1PyTorchJobSpec( - run_policy=KubeflowOrgV1RunPolicy( - clean_pod_policy="None", - scheduling_policy=scheduling_policy, - ), - pytorch_replica_specs={"Master": master, "Worker": worker}, - ), +def create_job( + command=None, + args=None, + num_workers=2, +): + container = V1Container( + name=constants.PYTORCHJOB_CONTAINER, + image=TEST_IMAGE, + command=command, + args=args, ) - -def create_job(): - job_namespace = TEST_NAME - container = generate_container() master = KubeflowOrgV1ReplicaSpec( replicas=1, - restart_policy="OnFailure", template=V1PodTemplateSpec( metadata=V1ObjectMeta( annotations={constants.ISTIO_SIDECAR_INJECTION: "false"} @@ -169,20 +147,58 @@ def create_job(): ), ) - worker = KubeflowOrgV1ReplicaSpec( - replicas=1, - restart_policy="OnFailure", - template=V1PodTemplateSpec( - metadata=V1ObjectMeta( - annotations={constants.ISTIO_SIDECAR_INJECTION: "false"} + pytorch_replica_specs = {"Master": master} + + # PyTorchJob always has 1 master and N-1 worker replicas. 
+ if num_workers > 1: + pytorch_replica_specs["Worker"] = KubeflowOrgV1ReplicaSpec( + replicas=num_workers - 1, + template=V1PodTemplateSpec( + metadata=V1ObjectMeta( + annotations={constants.ISTIO_SIDECAR_INJECTION: "false"} + ), + spec=V1PodSpec(containers=[container]), ), - spec=V1PodSpec(containers=[container]), + ) + + pytorchjob = KubeflowOrgV1PyTorchJob( + api_version=constants.API_VERSION, + kind=constants.PYTORCHJOB_KIND, + metadata=V1ObjectMeta(name=TEST_NAME, namespace=TEST_NAME), + spec=KubeflowOrgV1PyTorchJobSpec( + run_policy=KubeflowOrgV1RunPolicy(clean_pod_policy=None), + pytorch_replica_specs=pytorch_replica_specs, ), ) - pytorchjob = generate_pytorchjob(job_namespace, master, worker) + return pytorchjob +# Check if actual string contains all elements from the expected list. +class AnyStringWithElementsFromList: + def __init__(self, expected): + self.expected = expected + + def __eq__(self, actual): + return all(e in str(actual) for e in self.expected) + + +def create_job_from_func(num_workers, packages_to_install=None, pip_index_url=None): + + command = constants.DEFAULT_COMMAND + if num_workers > 1: + args = [f'{constants.ENTRYPOINT_TORCH} "$program_path/ephemeral_script.py"'] + else: + args = [f'{constants.ENTRYPOINT_PYTHON} "$program_path/ephemeral_script.py"'] + + if pip_index_url and packages_to_install: + args += [f"--index-url {pip_index_url} {packages_to_install[0]}"] + + job = create_job(command, AnyStringWithElementsFromList(args), num_workers) + + return job + + def generate_job_with_status( job: constants.JOB_MODELS_TYPE, condition_type: str = constants.JOB_CONDITION_SUCCEEDED, @@ -204,92 +220,148 @@ def __init__(self, kind) -> None: test_data_create_job = [ + ( + "valid flow", + {"job": create_job(), "namespace": TEST_NAME}, + SUCCESS, + create_job(), + ), + ( + "valid flow to create multi-node job with torchrun", + { + "name": TEST_NAME, + "namespace": TEST_NAME, + "train_func": lambda: print("Test Training Function"), + "base_image": TEST_IMAGE, + "num_workers": 3, + "packages_to_install": ["boto3==1.34.14"], + "pip_index_url": "https://pypi.custom.com/simple", + }, + SUCCESS, + create_job_from_func( + num_workers=3, + packages_to_install=["boto3==1.34.1"], + pip_index_url="https://pypi.custom.com/simple", + ), + ), + ( + "valid flow to create job with 1 worker", + { + "name": TEST_NAME, + "namespace": TEST_NAME, + "train_func": lambda: print("Test Training Function"), + "base_image": TEST_IMAGE, + "num_workers": 1, + }, + SUCCESS, + create_job_from_func(num_workers=1), + ), + ( + "valid flow to create job using image", + { + "name": TEST_NAME, + "namespace": TEST_NAME, + "base_image": TEST_IMAGE, + "num_workers": 2, + }, + SUCCESS, + create_job(num_workers=2), + ), ( "invalid extra parameter", - {"job": create_job(), "namespace": TEST_NAME, "base_image": "test_image"}, + { + "job": create_job(), + "namespace": TEST_NAME, + "base_image": "test_image", + }, ValueError, + None, + ), + ( + "invalid job kind", + {"job_kind": "invalid_job_kind"}, + ValueError, + None, ), - ("invalid job kind", {"job_kind": "invalid_job_kind"}, ValueError), ( - "job name missing ", + "job name missing with train function", {"train_func": lambda: "test train function"}, ValueError, + None, + ), + ( + "job name missing with base image", + {"base_image": "test_image"}, + ValueError, + None, ), - ("job name missing", {"base_image": "test_image"}, ValueError), ( "uncallable train function", - {"name": "test job", "train_func": "uncallable train function"}, + { + "name": TEST_NAME, + 
"train_func": "uncallable train function", + }, ValueError, + None, ), ( - "invalid TFJob replica", + "invalid number of workers", { - "name": "test job", - "train_func": lambda: "test train function", + "name": TEST_NAME, + "num_workers": 0, + }, + ValueError, + None, + ), + ( + "num_procs_per_worker is set for TFJob", + { + "name": TEST_NAME, "job_kind": constants.TFJOB_KIND, + "num_procs_per_worker": 5, + "base_image": "test_image", }, ValueError, + None, ), ( - "invalid PyTorchJob replica", + "num_chief_replicas and num_ps_replicas is set for PyTorchJov", { - "name": "test job", - "train_func": lambda: "test train function", - "job_kind": constants.PYTORCHJOB_KIND, + "name": TEST_NAME, + "num_chief_replicas": 1, + "num_ps_replicas": 1, + "base_image": "test_image", }, ValueError, + None, ), ( "paddle job can't be created using function", { - "name": "test job", + "name": TEST_NAME, "train_func": lambda: "test train function", "job_kind": constants.PADDLEJOB_KIND, }, ValueError, + None, ), ( "invalid job object", {"job": DummyJobClass(constants.TFJOB_KIND)}, ValueError, + None, ), ( "create_namespaced_custom_object timeout error", {"job": create_job(), "namespace": TIMEOUT}, TimeoutError, + None, ), ( "create_namespaced_custom_object runtime error", {"job": create_job(), "namespace": RUNTIME}, RuntimeError, - ), - ( - "valid flow", - {"job": create_job(), "namespace": TEST_NAME}, - SUCCESS, - ), - ( - "valid flow to create job from func", - { - "name": "test-job", - "namespace": TEST_NAME, - "train_func": lambda: print("Test Training Function"), - "base_image": "docker.io/test-training", - "num_workers": 3, - "packages_to_install": ["boto3==1.34.14"], - "pip_index_url": "https://pypi.custom.com/simple", - }, - SUCCESS, - ), - ( - "valid flow to create job using image", - { - "name": "test-job", - "namespace": TEST_NAME, - "base_image": "docker.io/test-training", - "num_workers": 2, - }, - SUCCESS, + None, ), ] @@ -962,15 +1034,26 @@ def training_client(): yield client -@pytest.mark.parametrize("test_name,kwargs,expected_output", test_data_create_job) -def test_create_job(training_client, test_name, kwargs, expected_output): +@pytest.mark.parametrize( + "test_name,kwargs,expected_output,expected_job", test_data_create_job +) +def test_create_job(training_client, test_name, kwargs, expected_output, expected_job): """ test create_job function of training client """ print("Executing test:", test_name) try: training_client.create_job(**kwargs) + assert expected_output == SUCCESS + + training_client.custom_api.create_namespaced_custom_object.assert_called_with( + constants.GROUP, + constants.VERSION, + kwargs["namespace"], + constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["plural"], + expected_job, + ) except Exception as e: assert type(e) is expected_output print("test execution complete") diff --git a/sdk/python/kubeflow/training/constants/constants.py b/sdk/python/kubeflow/training/constants/constants.py index dbbd885baa..dba4d49681 100644 --- a/sdk/python/kubeflow/training/constants/constants.py +++ b/sdk/python/kubeflow/training/constants/constants.py @@ -118,6 +118,10 @@ PYTORCHJOB_REPLICA_TYPES = (REPLICA_TYPE_MASTER.lower(), REPLICA_TYPE_WORKER.lower()) PYTORCHJOB_BASE_IMAGE = "docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime" +ENTRYPOINT_TORCH = "torchrun" +ENTRYPOINT_PYTHON = "python -u" +DEFAULT_COMMAND = ["bash", "-c"] + # XGBoostJob constants XGBOOSTJOB_KIND = "XGBoostJob" XGBOOSTJOB_MODEL = "KubeflowOrgV1XGBoostJob" diff --git 
a/sdk/python/kubeflow/training/utils/utils.py b/sdk/python/kubeflow/training/utils/utils.py index 56011e94aa..4f2d9c97d7 100644 --- a/sdk/python/kubeflow/training/utils/utils.py +++ b/sdk/python/kubeflow/training/utils/utils.py @@ -120,10 +120,10 @@ def get_script_for_python_packages( script_for_python_packages = textwrap.dedent( f""" if ! [ -x "$(command -v pip)" ]; then - python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip + python -m ensurepip || python -m ensurepip --user || apt-get install python-pip fi - PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet \ + PIP_DISABLE_PIP_VERSION_CHECK=1 python -m pip install --quiet \ --no-warn-script-location --index-url {pip_index_url} {packages_str} """ ) @@ -132,7 +132,8 @@ def get_script_for_python_packages( def get_command_using_train_func( - train_func: Optional[Callable], + train_func: Callable, + entrypoint: str, train_func_parameters: Optional[Dict[str, Any]] = None, packages_to_install: Optional[List[str]] = None, pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL, @@ -170,11 +171,11 @@ def get_command_using_train_func( {func_code} EOM printf "%s" \"$SCRIPT\" > \"$program_path/ephemeral_script.py\" - python3 -u \"$program_path/ephemeral_script.py\"""" + {entrypoint} \"$program_path/ephemeral_script.py\"""" ) # Add function code to the execute script. - exec_script = exec_script.format(func_code=func_code) + exec_script = exec_script.format(func_code=func_code, entrypoint=entrypoint) # Install Python packages if that is required. if packages_to_install is not None: @@ -184,16 +185,13 @@ def get_command_using_train_func( ) # Return container command and args to execute training function. - return ["bash", "-c"], [exec_script] + return constants.DEFAULT_COMMAND, [exec_script] def get_container_spec( name: str, base_image: str, - train_func: Optional[Callable] = None, - train_func_parameters: Optional[Dict[str, Any]] = None, - packages_to_install: Optional[List[str]] = None, - pip_index_url: str = constants.DEFAULT_PIP_INDEX_URL, + command: Optional[List[str]] = None, args: Optional[List[str]] = None, resources: Union[dict, models.V1ResourceRequirements, None] = None, volume_mounts: Optional[List[models.V1VolumeMount]] = None, @@ -207,18 +205,13 @@ def get_container_spec( # Create initial container spec. container_spec = models.V1Container( - name=name, image=base_image, args=args, volume_mounts=volume_mounts + name=name, + image=base_image, + command=command, + args=args, + volume_mounts=volume_mounts, ) - # If training function is set, override container command and args to execute the function. - if train_func is not None: - container_spec.command, container_spec.args = get_command_using_train_func( - train_func=train_func, - train_func_parameters=train_func_parameters, - packages_to_install=packages_to_install, - pip_index_url=pip_index_url, - ) - # Convert dict to the Kubernetes container resources if that is required. if isinstance(resources, dict): # Convert all keys in resources to lowercase. @@ -265,15 +258,10 @@ def get_tfjob_template( name: str, namespace: str, pod_template_spec: models.V1PodTemplateSpec, - num_workers: Optional[int] = None, + num_workers: int, num_chief_replicas: Optional[int] = None, num_ps_replicas: Optional[int] = None, ): - # Check if at least one replica is set. - # TODO (andreyvelich): Remove this check once we have CEL validation. 
- # Ref: https://github.com/kubeflow/training-operator/issues/1708 - if num_workers is None and num_chief_replicas is None and num_ps_replicas is None: - raise ValueError("At least one replica for TFJob must be set") # Create TFJob template. tfjob = models.KubeflowOrgV1TFJob( @@ -320,14 +308,8 @@ def get_pytorchjob_template( num_workers: int, worker_pod_template_spec: Optional[models.V1PodTemplateSpec], master_pod_template_spec: Optional[models.V1PodTemplateSpec] = None, - num_procs_per_worker: Optional[int] = None, - elastic_policy: Optional[models.KubeflowOrgV1ElasticPolicy] = None, + num_procs_per_worker: Optional[Union[int, str]] = None, ): - # Check if at least one Worker is set. - # TODO (andreyvelich): Remove this check once we have CEL validation. - # Ref: https://github.com/kubeflow/training-operator/issues/1708 - if num_workers is None or num_workers < 1: - raise ValueError("At least one Worker for PyTorchJob must be set") # Create PyTorchJob template. pytorchjob = models.KubeflowOrgV1PyTorchJob( @@ -337,11 +319,9 @@ def get_pytorchjob_template( spec=models.KubeflowOrgV1PyTorchJobSpec( run_policy=models.KubeflowOrgV1RunPolicy(clean_pod_policy=None), pytorch_replica_specs={}, - elastic_policy=elastic_policy, ), ) - # TODO (andreyvelich): Should we make spec.nproc_per_node int ? if num_procs_per_worker: pytorchjob.spec.nproc_per_node = str(num_procs_per_worker)
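
Usage sketch for the SDK changes above. This is a minimal, hedged example and not part of the patch: it assumes this branch of the Training SDK is installed, that the client's default namespace is acceptable, and the job name and training function are purely illustrative.

# Minimal sketch (not part of this diff): creating a PyTorchJob from a function
# with the new num_procs_per_worker parameter. Names below are illustrative.
from kubeflow.training import TrainingClient, constants


def train_func():
    # The function must be self-contained: imports happen inside it because the
    # SDK serializes its source into an ephemeral script launched by torchrun.
    import os

    import torch.distributed as dist

    # RANK, WORLD_SIZE, and LOCAL_RANK are provided by the Training Operator / torchrun.
    dist.init_process_group(backend="gloo")
    print(
        f"RANK={dist.get_rank()}, WORLD_SIZE={dist.get_world_size()}, "
        f"LOCAL_RANK={os.getenv('LOCAL_RANK', 0)}"
    )
    dist.destroy_process_group()


client = TrainingClient()
client.create_job(
    name="pytorch-ddp-sketch",                  # hypothetical job name
    train_func=train_func,
    base_image=constants.PYTORCHJOB_BASE_IMAGE,
    num_workers=3,                              # >1 worker selects the torchrun entrypoint
    num_procs_per_worker=2,                     # stored as spec nprocPerNode ("2")
)

With num_workers=3 the SDK picks the torchrun entrypoint, so each of the three replicas runs num_procs_per_worker training processes, and RANK, WORLD_SIZE, and LOCAL_RANK are set for every process as described in the notebook changes above.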
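
A second sketch restates the helper-level flow from utils.py, assuming the module path kubeflow.training.utils.utils and the constants added in this diff; the assertions only mirror behavior visible in the hunks above.

# Sketch of the new command/args flow: the caller picks the entrypoint, and the
# container spec simply receives command + args (no train_func handling inside).
from kubeflow.training import constants
from kubeflow.training.utils import utils


def train_func():
    print("hello from each torchrun worker process")


command, args = utils.get_command_using_train_func(
    train_func=train_func,
    entrypoint=constants.ENTRYPOINT_TORCH,  # "torchrun"; ENTRYPOINT_PYTHON for a single worker
)
assert command == constants.DEFAULT_COMMAND  # ["bash", "-c"]
assert "torchrun" in args[0]                 # ephemeral script is launched via torchrun

container = utils.get_container_spec(
    name=constants.JOB_PARAMETERS[constants.PYTORCHJOB_KIND]["container"],
    base_image=constants.PYTORCHJOB_BASE_IMAGE,
    command=command,
    args=args,
)

Splitting entrypoint selection out of get_container_spec keeps the container helper generic: create_job decides between torchrun and python -u, while get_container_spec only forwards command and args.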