diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index ea8cc5d6e7bae..ecee42cc452d9 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -979,6 +979,10 @@
{
"title": "Migrating a BashOperator for dbt",
"path": "/integrations/airlift/operator-migration/bash-operator-dbt"
+ },
+ {
+ "title": "Migrating a KubernetesPodOperator",
+ "path": "/integrations/airlift/operator-migration/kubernetes-pod-operator"
}
]
}
diff --git a/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx b/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx
new file mode 100644
index 0000000000000..54f9e3ac40287
--- /dev/null
+++ b/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx
@@ -0,0 +1,159 @@
+# Operator migration guides: Migrating usage of `KubernetesPodOperator`
+
+In this page, we'll explain migrating an Airflow `KubernetesPodOperator` to Dagster.
+
+### Background
+
+The KubernetesPodOperator in Apache Airflow enables users to execute containerized tasks within Kubernetes pods as part of their data pipelines.
+
+```python file=/integrations/airlift/operator_migration/kubernetes_pod_operator.py
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+
+k8s_hello_world = KubernetesPodOperator(
+ task_id="hello_world_task",
+ name="hello-world-pod",
+ image="bash:latest",
+ cmds=["bash", "-cx"],
+ arguments=['echo "Hello World!"'],
+)
+```
+
+### Dagster equivalent
+
+The Dagster equivalent is to use the to execute a task within a Kubernetes pod.
+
+```python file=/integrations/airlift/operator_migration/using_k8s_pipes.py
+from dagster_k8s import PipesK8sClient
+
+from dagster import AssetExecutionContext, asset
+
+container_cfg = {
+ "name": "hello-world-pod",
+ "image": "bash:latest",
+ "command": ["bash", "-cx"],
+ "args": ['echo "Hello World!"'],
+}
+
+
+@asset
+def execute_hello_world_task(context: AssetExecutionContext):
+ return (
+ PipesK8sClient()
+ .run(
+ context=context.op_execution_context,
+ base_pod_meta={"name": "hello-world-pod"},
+ base_pod_spec={"containers": [container_cfg]},
+ )
+ .get_results()
+ )
+```
+
+### Migrating the operator
+
+Migrating the operator breaks down into a few steps:
+
+1. Ensure that your Dagster deployment has access to the Kubernetes cluster.
+2. Write an that executes the task within a Kubernetes pod using the .
+3. Use `dagster-airlift` to proxy execution of the original task to Dagster.
+
+### Step 1: Ensure access to the Kubernetes cluster
+
+First, you need to ensure that your Dagster deployment has access to the Kubernetes cluster where you want to run your tasks. The accepts `kubeconfig` and `kubecontext`, and `env` arguments to configure the Kubernetes client.
+
+Here's an example of what this might look like when configuring the client to access an EKS cluster:
+
+```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_client endbefore=end_client
+from dagster_k8s import PipesK8sClient
+
+eks_client = PipesK8sClient(
+ # The client will have automatic access to all
+ # environment variables in the execution context.
+ env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},
+ kubeconfig_file="path/to/kubeconfig",
+ kube_context="my-eks-cluster",
+)
+```
+
+### Step 2: Writing an asset that executes the task within a Kubernetes pod
+
+Once you have access to the Kubernetes cluster, you can write an asset that executes the task within a Kubernetes pod using the . In comparison to the KubernetesPodOperator, the PipesK8sClient allows you to define the pod spec directly in your Python code.
+
+In the [parameter comparison](/integrations/airlift/kubernetes-pod-operator#parameter-comparison) section of this doc, you'll find a detailed comparison describing how to map the KubernetesPodOperator parameters to the PipesK8sClient parameters.
+
+```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_asset endbefore=end_asset
+from dagster import AssetExecutionContext, asset
+
+container_cfg = {
+ "name": "hello-world-pod",
+ "image": "bash:latest",
+ "command": ["bash", "-cx"],
+ "args": ['echo "Hello World!"'],
+}
+
+
+@asset
+def execute_hello_world_task(context: AssetExecutionContext):
+ return eks_client.run(
+ context=context.op_execution_context,
+ base_pod_meta={"name": "hello-world-pod"},
+ base_pod_spec={"containers": [container_cfg]},
+ ).get_results()
+```
+
+This is just a snippet of what the PipesK8sClient can do. Take a look at our full guide on the [dagster-k8s PipesK8sClient](/dagster-pipes/kubernetes) for more information.
+
+### Step 3: Using `dagster-airlift` to proxy execution
+
+Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process.
+
+### Parameter comparison
+
+Here's a comparison of the parameters between the KubernetesPodOperator and the PipesK8sClient: Directly supported arguments:
+
+- in_cluster (named load_incluster_config here)
+- cluster_context (named kube_context here)
+- config_file (named kubeconfig_file here)
+
+Many arguments are supported indirectly via the pod_spec argument.
+
+- volumes: Volumes to be used by the Pod (key `volumes`)
+- affinity: Node affinity/anti-affinity rules for the Pod (key `affinity`)
+- node_selector: Node selection constraints for the Pod (key `nodeSelector`)
+- hostnetwork: Enable host networking for the Pod (key `hostNetwork`)
+- dns_config: DNS settings for the Pod (key `dnsConfig`)
+- dnspolicy: DNS policy for the Pod (key `dnsPolicy`)
+- hostname: Hostname of the Pod (key `hostname`)
+- subdomain: Subdomain for the Pod (key `subdomain`)
+- schedulername: Scheduler to be used for the Pod (key `schedulerName`)
+- service_account_name: Service account to be used by the Pod (key `serviceAccountName`)
+- priority_class_name: Priority class for the Pod (key `priorityClassName`)
+- security_context: Security context for the entire Pod (key `securityContext`)
+- tolerations: Tolerations for the Pod (key `tolerations`)
+- image_pull_secrets: Secrets for pulling container images (key `imagePullSecrets`)
+- termination_grace_period: Grace period for Pod termination (key `terminationGracePeriodSeconds`)
+- active_deadline_seconds: Deadline for the Pod's execution (key `activeDeadlineSeconds`)
+- host_aliases: Additional entries for the Pod's /etc/hosts (key `hostAliases`)
+- init_containers: Initialization containers for the Pod (key `initContainers`)
+
+The following arguments are supported under the nested `containers` key of the pod_spec argument:
+
+- image: Docker image for the container (key 'image')
+- cmds: Entrypoint command for the container (key `command`)
+- arguments: Arguments for the entrypoint command (key `args`)
+- ports: List of ports to expose from the container (key `ports`)
+- volume_mounts: List of volume mounts for the container (key `volumeMounts`)
+- env_vars: Environment variables for the container (key `env`)
+- env_from: List of sources to populate environment variables (key `envFrom`)
+- image_pull_policy: Policy for pulling the container image (key `imagePullPolicy`)
+- container_resources: Resource requirements for the container (key `resources`)
+- container_security_context: Security context for the container (key `securityContext`)
+- termination_message_policy: Policy for the termination message (key `terminationMessagePolicy`)
+
+For a full list, see the [kubernetes container spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#container-v1-core). The following arguments are supported under the pod_metadata argument, which configures the metadata of the pod:
+
+- name: `name`
+- namespace: `namespace`
+- labels: `labels`
+- annotations: `annotations`
+
+For a full list, see the [kubernetes objectmeta spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#objectmeta-v1-meta).
diff --git a/docs/content/integrations/airlift/operator-migration/overview.mdx b/docs/content/integrations/airlift/operator-migration/overview.mdx
index 19ed7d5eb6a5d..dc3fde15f8998 100644
--- a/docs/content/integrations/airlift/operator-migration/overview.mdx
+++ b/docs/content/integrations/airlift/operator-migration/overview.mdx
@@ -9,4 +9,12 @@ This page contains a collection of reference materials for migrating usage of co
title="Bash Operator for dbt"
href="/integrations/airlift/operator-migration/bash-operator-dbt"
>
+
+
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py
new file mode 100644
index 0000000000000..45f5c5ffc326a
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py
@@ -0,0 +1,35 @@
+AWS_CREDENTIALS = {}
+
+# start_client
+from dagster_k8s import PipesK8sClient
+
+eks_client = PipesK8sClient(
+ # The client will have automatic access to all
+ # environment variables in the execution context.
+ env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},
+ kubeconfig_file="path/to/kubeconfig",
+ kube_context="my-eks-cluster",
+)
+# end_client
+
+# start_asset
+from dagster import AssetExecutionContext, asset
+
+container_cfg = {
+ "name": "hello-world-pod",
+ "image": "bash:latest",
+ "command": ["bash", "-cx"],
+ "args": ['echo "Hello World!"'],
+}
+
+
+@asset
+def execute_hello_world_task(context: AssetExecutionContext):
+ return eks_client.run(
+ context=context.op_execution_context,
+ base_pod_meta={"name": "hello-world-pod"},
+ base_pod_spec={"containers": [container_cfg]},
+ ).get_results()
+
+
+# end_asset
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py
new file mode 100644
index 0000000000000..bde2a3e5b1a86
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py
@@ -0,0 +1,9 @@
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+
+k8s_hello_world = KubernetesPodOperator(
+ task_id="hello_world_task",
+ name="hello-world-pod",
+ image="bash:latest",
+ cmds=["bash", "-cx"],
+ arguments=['echo "Hello World!"'],
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py
new file mode 100644
index 0000000000000..298851e9ff2e8
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py
@@ -0,0 +1,23 @@
+from dagster_k8s import PipesK8sClient
+
+from dagster import AssetExecutionContext, asset
+
+container_cfg = {
+ "name": "hello-world-pod",
+ "image": "bash:latest",
+ "command": ["bash", "-cx"],
+ "args": ['echo "Hello World!"'],
+}
+
+
+@asset
+def execute_hello_world_task(context: AssetExecutionContext):
+ return (
+ PipesK8sClient()
+ .run(
+ context=context.op_execution_context,
+ base_pod_meta={"name": "hello-world-pod"},
+ base_pod_spec={"containers": [container_cfg]},
+ )
+ .get_results()
+ )