diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json index ea8cc5d6e7bae..ca09d226b7b0b 100644 --- a/docs/content/_navigation.json +++ b/docs/content/_navigation.json @@ -979,6 +979,14 @@ { "title": "Migrating a BashOperator for dbt", "path": "/integrations/airlift/operator-migration/bash-operator-dbt" + }, + { + "title": "Migrating a KubernetesPodOperator", + "path": "/integrations/airlift/operator-migration/kubernetes-pod-operator" + }, + { + "title": "Migrating a PythonOperator", + "path": "/integrations/airlift/operator-migration/python-operator" } ] } diff --git a/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx b/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx new file mode 100644 index 0000000000000..54f9e3ac40287 --- /dev/null +++ b/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx @@ -0,0 +1,159 @@ +# Operator migration guides: Migrating usage of `KubernetesPodOperator` + +In this page, we'll explain migrating an Airflow `KubernetesPodOperator` to Dagster. + +### Background + +The KubernetesPodOperator in Apache Airflow enables users to execute containerized tasks within Kubernetes pods as part of their data pipelines. + +```python file=/integrations/airlift/operator_migration/kubernetes_pod_operator.py +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator + +k8s_hello_world = KubernetesPodOperator( + task_id="hello_world_task", + name="hello-world-pod", + image="bash:latest", + cmds=["bash", "-cx"], + arguments=['echo "Hello World!"'], +) +``` + +### Dagster equivalent + +The Dagster equivalent is to use the to execute a task within a Kubernetes pod. + +```python file=/integrations/airlift/operator_migration/using_k8s_pipes.py +from dagster_k8s import PipesK8sClient + +from dagster import AssetExecutionContext, asset + +container_cfg = { + "name": "hello-world-pod", + "image": "bash:latest", + "command": ["bash", "-cx"], + "args": ['echo "Hello World!"'], +} + + +@asset +def execute_hello_world_task(context: AssetExecutionContext): + return ( + PipesK8sClient() + .run( + context=context.op_execution_context, + base_pod_meta={"name": "hello-world-pod"}, + base_pod_spec={"containers": [container_cfg]}, + ) + .get_results() + ) +``` + +### Migrating the operator + +Migrating the operator breaks down into a few steps: + +1. Ensure that your Dagster deployment has access to the Kubernetes cluster. +2. Write an that executes the task within a Kubernetes pod using the . +3. Use `dagster-airlift` to proxy execution of the original task to Dagster. + +### Step 1: Ensure access to the Kubernetes cluster + +First, you need to ensure that your Dagster deployment has access to the Kubernetes cluster where you want to run your tasks. The accepts `kubeconfig` and `kubecontext`, and `env` arguments to configure the Kubernetes client. + +Here's an example of what this might look like when configuring the client to access an EKS cluster: + +```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_client endbefore=end_client +from dagster_k8s import PipesK8sClient + +eks_client = PipesK8sClient( + # The client will have automatic access to all + # environment variables in the execution context. + env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"}, + kubeconfig_file="path/to/kubeconfig", + kube_context="my-eks-cluster", +) +``` + +### Step 2: Writing an asset that executes the task within a Kubernetes pod + +Once you have access to the Kubernetes cluster, you can write an asset that executes the task within a Kubernetes pod using the . In comparison to the KubernetesPodOperator, the PipesK8sClient allows you to define the pod spec directly in your Python code. + +In the [parameter comparison](/integrations/airlift/kubernetes-pod-operator#parameter-comparison) section of this doc, you'll find a detailed comparison describing how to map the KubernetesPodOperator parameters to the PipesK8sClient parameters. + +```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_asset endbefore=end_asset +from dagster import AssetExecutionContext, asset + +container_cfg = { + "name": "hello-world-pod", + "image": "bash:latest", + "command": ["bash", "-cx"], + "args": ['echo "Hello World!"'], +} + + +@asset +def execute_hello_world_task(context: AssetExecutionContext): + return eks_client.run( + context=context.op_execution_context, + base_pod_meta={"name": "hello-world-pod"}, + base_pod_spec={"containers": [container_cfg]}, + ).get_results() +``` + +This is just a snippet of what the PipesK8sClient can do. Take a look at our full guide on the [dagster-k8s PipesK8sClient](/dagster-pipes/kubernetes) for more information. + +### Step 3: Using `dagster-airlift` to proxy execution + +Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process. + +### Parameter comparison + +Here's a comparison of the parameters between the KubernetesPodOperator and the PipesK8sClient: Directly supported arguments: + +- in_cluster (named load_incluster_config here) +- cluster_context (named kube_context here) +- config_file (named kubeconfig_file here) + +Many arguments are supported indirectly via the pod_spec argument. + +- volumes: Volumes to be used by the Pod (key `volumes`) +- affinity: Node affinity/anti-affinity rules for the Pod (key `affinity`) +- node_selector: Node selection constraints for the Pod (key `nodeSelector`) +- hostnetwork: Enable host networking for the Pod (key `hostNetwork`) +- dns_config: DNS settings for the Pod (key `dnsConfig`) +- dnspolicy: DNS policy for the Pod (key `dnsPolicy`) +- hostname: Hostname of the Pod (key `hostname`) +- subdomain: Subdomain for the Pod (key `subdomain`) +- schedulername: Scheduler to be used for the Pod (key `schedulerName`) +- service_account_name: Service account to be used by the Pod (key `serviceAccountName`) +- priority_class_name: Priority class for the Pod (key `priorityClassName`) +- security_context: Security context for the entire Pod (key `securityContext`) +- tolerations: Tolerations for the Pod (key `tolerations`) +- image_pull_secrets: Secrets for pulling container images (key `imagePullSecrets`) +- termination_grace_period: Grace period for Pod termination (key `terminationGracePeriodSeconds`) +- active_deadline_seconds: Deadline for the Pod's execution (key `activeDeadlineSeconds`) +- host_aliases: Additional entries for the Pod's /etc/hosts (key `hostAliases`) +- init_containers: Initialization containers for the Pod (key `initContainers`) + +The following arguments are supported under the nested `containers` key of the pod_spec argument: + +- image: Docker image for the container (key 'image') +- cmds: Entrypoint command for the container (key `command`) +- arguments: Arguments for the entrypoint command (key `args`) +- ports: List of ports to expose from the container (key `ports`) +- volume_mounts: List of volume mounts for the container (key `volumeMounts`) +- env_vars: Environment variables for the container (key `env`) +- env_from: List of sources to populate environment variables (key `envFrom`) +- image_pull_policy: Policy for pulling the container image (key `imagePullPolicy`) +- container_resources: Resource requirements for the container (key `resources`) +- container_security_context: Security context for the container (key `securityContext`) +- termination_message_policy: Policy for the termination message (key `terminationMessagePolicy`) + +For a full list, see the [kubernetes container spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#container-v1-core). The following arguments are supported under the pod_metadata argument, which configures the metadata of the pod: + +- name: `name` +- namespace: `namespace` +- labels: `labels` +- annotations: `annotations` + +For a full list, see the [kubernetes objectmeta spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#objectmeta-v1-meta). diff --git a/docs/content/integrations/airlift/operator-migration/overview.mdx b/docs/content/integrations/airlift/operator-migration/overview.mdx index 19ed7d5eb6a5d..dc3fde15f8998 100644 --- a/docs/content/integrations/airlift/operator-migration/overview.mdx +++ b/docs/content/integrations/airlift/operator-migration/overview.mdx @@ -9,4 +9,12 @@ This page contains a collection of reference materials for migrating usage of co title="Bash Operator for dbt" href="/integrations/airlift/operator-migration/bash-operator-dbt" > + + diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py new file mode 100644 index 0000000000000..45f5c5ffc326a --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py @@ -0,0 +1,35 @@ +AWS_CREDENTIALS = {} + +# start_client +from dagster_k8s import PipesK8sClient + +eks_client = PipesK8sClient( + # The client will have automatic access to all + # environment variables in the execution context. + env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"}, + kubeconfig_file="path/to/kubeconfig", + kube_context="my-eks-cluster", +) +# end_client + +# start_asset +from dagster import AssetExecutionContext, asset + +container_cfg = { + "name": "hello-world-pod", + "image": "bash:latest", + "command": ["bash", "-cx"], + "args": ['echo "Hello World!"'], +} + + +@asset +def execute_hello_world_task(context: AssetExecutionContext): + return eks_client.run( + context=context.op_execution_context, + base_pod_meta={"name": "hello-world-pod"}, + base_pod_spec={"containers": [container_cfg]}, + ).get_results() + + +# end_asset diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py new file mode 100644 index 0000000000000..bde2a3e5b1a86 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py @@ -0,0 +1,9 @@ +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator + +k8s_hello_world = KubernetesPodOperator( + task_id="hello_world_task", + name="hello-world-pod", + image="bash:latest", + cmds=["bash", "-cx"], + arguments=['echo "Hello World!"'], +) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py new file mode 100644 index 0000000000000..298851e9ff2e8 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py @@ -0,0 +1,23 @@ +from dagster_k8s import PipesK8sClient + +from dagster import AssetExecutionContext, asset + +container_cfg = { + "name": "hello-world-pod", + "image": "bash:latest", + "command": ["bash", "-cx"], + "args": ['echo "Hello World!"'], +} + + +@asset +def execute_hello_world_task(context: AssetExecutionContext): + return ( + PipesK8sClient() + .run( + context=context.op_execution_context, + base_pod_meta={"name": "hello-world-pod"}, + base_pod_spec={"containers": [container_cfg]}, + ) + .get_results() + )