Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating status based on status of children #1129

Open
mooperd opened this issue Oct 14, 2024 · 0 comments
Open

Updating status based on status of children #1129

mooperd opened this issue Oct 14, 2024 · 0 comments
Labels
question Further information is requested

Comments

@mooperd
Copy link

mooperd commented Oct 14, 2024

Keywords

No response

Problem

Hello,

My operator creates jobs. I'm trying to find a neat way of updating the status of the operator when the job changes. Please could I have some guidance on the best way of implementing this. I'm guessing I have to use a timer..... but then there might be thousands of concurrent image processing jobs 😬.

import kopf
import kubernetes.client
from kubernetes.client.rest import ApiException
import os

# Load Kubernetes configuration
kubernetes.config.load_incluster_config()  # Use this if running inside a cluster
# kubernetes.config.load_kube_config()     # Use this if running locally for testing

@kopf.on.create('imageprocessingjobs')
def on_create(spec, name, namespace, logger, patch, **kwargs):
    """Create a batch Job for a newly created ImageProcessingJob resource.

    Reads the S3 locations and model name from the resource spec, builds a
    Job manifest owned by the custom resource (so the Job is garbage-collected
    with it), submits the Job, and records the initial state on the resource
    status via ``patch``.

    Raises:
        kopf.PermanentError: if required spec fields are missing (retrying
            cannot fix a bad spec).
        kopf.TemporaryError: if the Job creation API call fails (retried
            after a delay).
    """
    # Use the namespace of the triggering resource; do not hard-code it.
    logger.info(f"Processing ImageProcessingJob {name} in namespace {namespace}")

    s3_input_location = spec.get('s3_input_location')
    model_name = spec.get('model_name')
    s3_output_location = spec.get('s3_output_location')

    # Validate input. NOTE: kopf has no `HandledException`; `PermanentError`
    # is the correct way to fail without retries.
    if not s3_input_location or not model_name or not s3_output_location:
        raise kopf.PermanentError("Missing required spec fields.")

    # Define the job name
    job_name = f"ipj-{name}"

    # Define the container image to use (replace with your actual image)
    container_image = "nvcr.io/nvidia/cloud-native/dcgm:3.3.0-1-ubuntu22.04"

    # Define the Kubernetes Job
    job_manifest = {
        'apiVersion': 'batch/v1',
        'kind': 'Job',
        'metadata': {
            'name': job_name,
            'namespace': namespace,
            'ownerReferences': [
                {
                    # Must match the CRD's group/version, which elsewhere in
                    # this operator is 'darwinist.io/v1' (was 'foo.io/v1').
                    'apiVersion': 'darwinist.io/v1',
                    'kind': 'ImageProcessingJob',
                    'name': name,
                    'uid': kwargs['body']['metadata']['uid'],
                    # Mark this resource as the managing controller and block
                    # owner deletion so 'Foreground' cascading works properly.
                    'controller': True,
                    'blockOwnerDeletion': True,
                }
            ],
        },
        'spec': {
            'template': {
                'metadata': {
                    'labels': {
                        'job-name': job_name
                    }
                },
                'spec': {
                    'containers': [
                        {
                            'name': 'image-processor',
                            'image': container_image,
                            'command': ["/usr/bin/dcgmproftester12"],
                            'args': ["--no-dcgm-validation", "-t 1004", "-d 30"],
                            'resources': {'limits': {"nvidia.com/gpu": 1}},
                            'env': [
                                {'name': 'S3_INPUT_LOCATION', 'value': s3_input_location},
                                {'name': 'MODEL_NAME', 'value': model_name},
                                {'name': 'S3_OUTPUT_LOCATION', 'value': s3_output_location},
                            ],
                            # Add any other necessary configurations (e.g., volume mounts)
                        }
                    ],
                    'restartPolicy': 'Never',
                }
            },
            'backoffLimit': 3,
        }
    }

    # Create the Job
    batch_v1 = kubernetes.client.BatchV1Api()
    try:
        batch_v1.create_namespaced_job(namespace=namespace, body=job_manifest)
        logger.info(f"Job {job_name} created successfully.")
        # Update the status to reflect that the job is created
        patch.status['state'] = 'Created'
        patch.status['message'] = f'Job {job_name} created and running.'
    except ApiException as e:
        logger.error(f"Exception when creating Job: {e}")
        raise kopf.TemporaryError(f"Failed to create Job {job_name}", delay=30)

@kopf.on.delete('imageprocessingjobs')
def on_delete(spec, name, namespace, logger, **kwargs):
    """Remove the child Job when its ImageProcessingJob owner is deleted."""
    logger.info(f"Cleaning up resources for ImageProcessingJob {name}")
    job_name = f"ipj-{name}"

    # Foreground propagation deletes the Job's pods before the Job itself.
    delete_options = kubernetes.client.V1DeleteOptions(
        propagation_policy='Foreground'
    )
    batch_api = kubernetes.client.BatchV1Api()
    try:
        batch_api.delete_namespaced_job(
            name=job_name,
            namespace=namespace,
            body=delete_options,
        )
    except ApiException as e:
        # Best-effort cleanup: log and continue so the finalizer can complete.
        logger.error(f"Exception when deleting Job: {e}")
    else:
        logger.info(f"Job {job_name} deleted.")
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # CRD names MUST be "<plural>.<group>": the group below must therefore be
  # "darwinist.io" (was "foo.io", which the API server would reject).
  name: imageprocessingjobs.darwinist.io
spec:
  group: darwinist.io
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              # All three fields are required by the operator's on_create handler.
              required:
                - s3_input_location
                - model_name
                - s3_output_location
              properties:
                s3_input_location:
                  type: string
                model_name:
                  type: string
                s3_output_location:
                  type: string
            status:
              type: object
              properties:
                state:
                  type: string
                message:
                  type: string
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      type:
                        type: string
                      status:
                        type: string
                      reason:
                        type: string
                      message:
                        type: string
                      lastUpdateTime:
                        type: string
      subresources:
        status: {}  # Enables the status subresource

The 2nd log entry doesn't mention a resource - does this also refer to namespaces?

[2024-10-14 22:22:27,598] kopf._core.reactor.o [WARNING ] Not enough permissions to list namespaces. Falling back to a list of namespaces which are assumed to exist: {'foo'}
[2024-10-14 22:22:27,600] kopf._core.reactor.o [WARNING ] Not enough permissions to watch for resources: changes (creation/deletion/updates) will not be noticed; the resources are only refreshed on operator restarts.
[2024-10-14 22:22:27,600] kopf._core.reactor.o [WARNING ] Not enough permissions to watch for namespaces: changes (deletion/creation) will not be noticed; the namespaces are only refreshed on operator restarts.

The ClusterRole should provide access to namespaces.

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: operator-cluster-role
rules:
  - apiGroups: ["darwinist.io"]
    resources: ["imageprocessingjobs", "imageprocessingjobs/status"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["pods", "pods/log", "services", "endpoints", "persistentvolumeclaims", "events"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  # apiGroups entries are group NAMES only, never group/version strings:
  # "batch/v1" and "extensions/v1beta1" match nothing. Jobs live in "batch".
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]

I tried this but it does not seem to be firing

@kopf.on.update('batch/v1', 'jobs')  # Track updates to all jobs
def on_job_update(namespace, name, status, labels, logger, **kwargs):
    """Propagate a child Job's terminal state back to its ImageProcessingJob.

    Watches every batch/v1 Job; when one of this operator's jobs (label
    ``job-name`` starting with ``ipj-``) reports a Complete or Failed
    condition, patches the owning ImageProcessingJob's status accordingly.
    """
    logger.info(f"Job {name} updated in namespace {namespace}. Checking for completion or failure.")
    # Check if the job is one of the image processing jobs
    if 'job-name' in labels and labels['job-name'].startswith("ipj-"):
        job_name = labels['job-name']
        logger.info(f"Job {job_name} in namespace {namespace} has been updated.")

        # Extract job conditions and update the ImageProcessingJob status.
        # Use .get() throughout: condition entries may omit keys, and status
        # can be empty early in the Job's lifecycle.
        job_conditions = (status or {}).get('conditions', [])
        for condition in job_conditions:
            cond_type = condition.get('type')
            cond_status = condition.get('status')
            if cond_type == 'Complete' and cond_status == 'True':
                logger.info(f"Job {job_name} completed successfully.")
                # Patch the associated ImageProcessingJob's status
                update_imageprocessingjob_status(job_name, namespace, 'Succeeded', logger)
            elif cond_type == 'Failed' and cond_status == 'True':
                logger.error(f"Job {job_name} failed.")
                # Patch the associated ImageProcessingJob's status
                update_imageprocessingjob_status(job_name, namespace, 'Failed', logger)

def update_imageprocessingjob_status(job_name, namespace, state, logger):
    """Patch the status subresource of the ImageProcessingJob owning *job_name*.

    Args:
        job_name: name of the child Job (``ipj-<resource name>``).
        namespace: namespace of the ImageProcessingJob (previously clobbered
            by a hard-coded value; now the caller's namespace is honored).
        state: new value for ``status.state`` (e.g. 'Succeeded', 'Failed').
        logger: handler logger for progress/error reporting.
    """
    # Strip only the leading 'ipj-' prefix; str.replace() would remove the
    # substring anywhere in the name.
    if job_name.startswith('ipj-'):
        imageprocessingjob_name = job_name[len('ipj-'):]
    else:
        imageprocessingjob_name = job_name

    # Get the CustomObjectsApi to patch the ImageProcessingJob status
    custom_objects_api = kubernetes.client.CustomObjectsApi()

    try:
        # Patch the status of the ImageProcessingJob (requires the CRD's
        # status subresource to be enabled).
        custom_objects_api.patch_namespaced_custom_object_status(
            group="darwinist.io",
            version="v1",
            namespace=namespace,
            plural="imageprocessingjobs",
            name=imageprocessingjob_name,
            body={"status": {"state": state, "message": f"Job {job_name} {state.lower()}."}}
        )
        logger.info(f"Updated ImageProcessingJob {imageprocessingjob_name} with state: {state}")
    except ApiException as e:
        logger.error(f"Failed to update ImageProcessingJob {imageprocessingjob_name} status: {e}")
@mooperd mooperd added the question Further information is requested label Oct 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

1 participant