Skip to content

Commit

Permalink
Add status to TrainJob
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Velichkevich <[email protected]>
  • Loading branch information
andreyvelich committed Nov 12, 2024
1 parent e9ff7d1 commit 61d4e75
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 31 deletions.
4 changes: 4 additions & 0 deletions hack/python-sdk-v2/gen-sdk.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@ git checkout ${SDK_OUTPUT_PATH}/kubeflow/training/__init__.py

# Manually modify the SDK version in the __init__.py file.
sed -i '' -e "s/__version__.*/__version__ = \"${SDK_VERSION}\"/" ${SDK_OUTPUT_PATH}/kubeflow/training/__init__.py

# Kubeflow models must have Kubernetes models to perform serialization.
printf "\n# Import Kubernetes models for the serialization\n" >>${SDK_OUTPUT_PATH}/kubeflow/training/models/__init__.py
printf "from kubernetes.client import *\n" >>${SDK_OUTPUT_PATH}/kubeflow/training/models/__init__.py
1 change: 0 additions & 1 deletion pkg/initializer_v2/dataset/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class HuggingFace(utils.DatasetProvider):

def load_config(self):
config_dict = utils.get_config_from_env(HuggingFaceDatasetConfig)
logging.info(f"Config for HuggingFace dataset initializer: {config_dict}")
self.config = HuggingFaceDatasetConfig(**config_dict)

def download_dataset(self):
Expand Down
1 change: 0 additions & 1 deletion pkg/initializer_v2/model/huggingface.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ class HuggingFace(utils.ModelProvider):

def load_config(self):
config_dict = utils.get_config_from_env(HuggingFaceModelInputConfig)
logging.info(f"Config for HuggingFace model initializer: {config_dict}")
self.config = HuggingFaceModelInputConfig(**config_dict)

def download_model(self):
Expand Down
90 changes: 64 additions & 26 deletions sdk_v2/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,25 @@ def list_jobs(self) -> List[types.TrainJob]:
response = thread.get(constants.DEFAULT_TIMEOUT)

for item in response["items"]:
result.append(
types.TrainJob(
name=item["metadata"]["name"],
runtime_ref=item["spec"]["runtimeRef"]["name"],
creation_timestamp=item["metadata"]["creationTimestamp"],
)

item = self.api_client.deserialize(
utils.FakeResponse(item),
models.KubeflowOrgV2alpha1TrainJob,
)

train_job = types.TrainJob(
name=item.metadata.name, # type: ignore
runtime_ref=item.spec.runtime_ref.name, # type: ignore
creation_timestamp=item.metadata.creation_timestamp, # type: ignore
)
# TODO: Add status
if "status" in item:
pass

# TODO (andreyvelich): This should be changed.
if item.status: # type: ignore
train_job.status = utils.get_trainjob_status(
item.status.conditions # type: ignore
)

result.append(train_job)

except multiprocessing.TimeoutError:
raise TimeoutError(
Expand Down Expand Up @@ -277,7 +286,7 @@ def get_job_pods(self, name: str) -> List[types.Pod]:
result.append(
types.Pod(
name=item.metadata.name,
type=utils.get_pod_type(item.metadata.labels),
component=utils.get_pod_type(item.metadata.labels),
status=item.status.phase if item.status else None,
)
)
Expand All @@ -294,15 +303,26 @@ def get_job_pods(self, name: str) -> List[types.Pod]:
return result

def get_job_logs(
self, name: str, follow: bool = False, stage: str = "trainer"
self,
name: str,
follow: bool = False,
component: str = constants.JOB_TRAINER_NODE,
node_index: int = 0,
) -> Dict[str, str]:
"""Get the Trainer logs from TrainJob"""
"""Get the logs from TrainJob
TODO (andreyvelich): Should we change node_index to node_rank ?
TODO (andreyvelich): For the initializer, we can add the unit argument.
"""

# Trainer node with index 0 must be deployed.
pod = None
# Get Initializer or Trainer Pod.
for p in self.get_job_pods(name):
if p.type == constants.MASTER_NODE and p.type != constants.POD_PENDING:
pod = p
if p.status != constants.POD_PENDING:
if p.component == component and component == constants.JOB_INITIALIZER:
pod = p
elif p.component == component + "-" + str(node_index):
pod = p

if pod is None:
return {}

Expand All @@ -311,7 +331,8 @@ def get_job_logs(

# TODO (andreyvelich): Potentially, refactor this.
# Support logging of multiple Pods.
if follow:
# TODO (andreyvelich): Currently, follow is supported only for Trainer.
if follow and component == constants.JOB_TRAINER_NODE:
log_streams = []
log_streams.append(
watch.Watch().stream(
Expand Down Expand Up @@ -341,24 +362,41 @@ def get_job_logs(
finished[index] = True
break
# Print logs to the StdOut
print(f"[{pod.type}]: {logline}")
print(f"[{pod.component}]: {logline}")
# Add logs to the results dict.
if pod.type not in logs_dict:
logs_dict[pod.type] = logline + "\n"
if pod.component not in logs_dict:
logs_dict[pod.component] = logline + "\n"
else:
logs_dict[pod.type] += logline + "\n"
logs_dict[pod.component] += logline + "\n"
except queue.Empty:
break
if all(finished):
return logs_dict

try:
pod_logs = self.core_api.read_namespaced_pod_log(
name=pod.name,
namespace=self.namespace,
container=constants.CONTAINER_TRAINER,
)
logs_dict[pod.name] = pod_logs
if component == constants.JOB_INITIALIZER:
logs_dict[constants.CONTAINER_DATASET_INITIALIZER] = (
self.core_api.read_namespaced_pod_log(
name=pod.name,
namespace=self.namespace,
container=constants.CONTAINER_DATASET_INITIALIZER,
)
)
logs_dict[constants.CONTAINER_MODEL_INITIALIZER] = (
self.core_api.read_namespaced_pod_log(
name=pod.name,
namespace=self.namespace,
container=constants.CONTAINER_MODEL_INITIALIZER,
)
)
else:
logs_dict[component + "-" + str(node_index)] = (
self.core_api.read_namespaced_pod_log(
name=pod.name,
namespace=self.namespace,
container=constants.CONTAINER_TRAINER,
)
)
except Exception:
raise RuntimeError(
f"Failed to read logs for the pod {self.namespace}/{pod.name}"
Expand Down
9 changes: 8 additions & 1 deletion sdk_v2/kubeflow/training/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os

# How long to wait in seconds for requests to the Kubernetes API Server.
DEFAULT_TIMEOUT = 120
Expand Down Expand Up @@ -49,7 +50,7 @@
TRAINJOB_PLURAL = "trainjobs"

# The default PIP index URL to download Python packages.
DEFAULT_PIP_INDEX_URL = "https://pypi.org/simple"
DEFAULT_PIP_INDEX_URL = os.getenv("DEFAULT_PIP_INDEX_URL", "https://pypi.org/simple")

# The default command for the Trainer.
DEFAULT_COMMAND = ["bash", "-c"]
Expand All @@ -69,6 +70,12 @@
# The Job name for the initializer.
JOB_INITIALIZER = "initializer"

# The container name for the dataset initializer.
CONTAINER_DATASET_INITIALIZER = "dataset-initializer"

# The container name for the model initializer.
CONTAINER_MODEL_INITIALIZER = "model-initializer"

# The Job name for the trainer nodes.
JOB_TRAINER_NODE = "trainer-node"

Expand Down
3 changes: 3 additions & 0 deletions sdk_v2/kubeflow/training/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@
from kubeflow.training.models.kubeflow_org_v2alpha1_training_runtime import KubeflowOrgV2alpha1TrainingRuntime
from kubeflow.training.models.kubeflow_org_v2alpha1_training_runtime_list import KubeflowOrgV2alpha1TrainingRuntimeList
from kubeflow.training.models.kubeflow_org_v2alpha1_training_runtime_spec import KubeflowOrgV2alpha1TrainingRuntimeSpec

# Import Kubernetes models for the serialization
from kubernetes.client import *
4 changes: 2 additions & 2 deletions sdk_v2/kubeflow/training/types/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ class TrainJob:
name: str
runtime_ref: str
creation_timestamp: str
status: Optional[str] = None
status: Optional[str] = "Unknown"


# Representation for the Pod.
@dataclass
class Pod:
name: str
type: str
component: str
status: Optional[str] = None


Expand Down
28 changes: 28 additions & 0 deletions sdk_v2/kubeflow/training/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ def get_default_target_namespace() -> str:
return f.readline()


class FakeResponse:
    """Minimal stand-in for a REST response, used only for deserialization.

    The object exposes a single `data` attribute that holds the JSON text of
    the wrapped object, so an already-decoded dict can be passed to a
    deserializer that expects a response carrying a JSON string in `.data`.

    Ref) https://github.com/kubeflow/katib/pull/1630#discussion_r697877815
    Ref) https://github.com/kubernetes-client/python/issues/977#issuecomment-592030030
    """

    def __init__(self, obj):
        # Store the JSON text rather than the object itself.
        serialized = json.dumps(obj)
        self.data = serialized


# TODO (andreyvelich): Discuss if we want to support V1ResourceRequirements resources as input.
def get_resources_per_node(resources_per_node: dict) -> client.V1ResourceRequirements:
"""
Expand Down Expand Up @@ -197,6 +207,24 @@ def get_model_config(
return m_config


# TODO (andreyvelich): Discuss how we should show TrainJob status to SDK users.
def get_trainjob_status(conditions: List[client.V1Condition]) -> str:
    """
    Convert the TrainJob conditions to the user-friendly status.

    Args:
        conditions: Conditions taken from the TrainJob status. `None` or an
            empty list is treated as "no conditions reported".

    Returns:
        "Failed", "Succeeded" or "Created" for the highest-precedence condition
        whose status is "True", or "Unknown" when none of them is set.
    """
    # A status object may exist without any conditions yet, in which case the
    # deserialized `conditions` attribute can be None rather than a list.
    active = {c.type for c in conditions or [] if c.status == "True"}

    # Terminal conditions win over "Created" regardless of the order in which
    # the conditions are listed, so a finished job is never shown as "Created".
    precedence = (
        ("Failed", "Failed"),
        ("Complete", "Succeeded"),
        ("Created", "Created"),
    )
    for condition_type, status in precedence:
        if condition_type in active:
            return status

    return "Unknown"


def get_pod_type(labels: Dict) -> str:
"""
Identify the Pod type from the given Pod labels.
Expand Down

0 comments on commit 61d4e75

Please sign in to comment.