From d385726c0048249a1b47763d30249d47fd25a1a9 Mon Sep 17 00:00:00 2001
From: Jacob Tomlinson
Date: Wed, 9 Aug 2023 09:44:00 +0100
Subject: [PATCH] Remove aiopykube (#800)

* Replace aiopykube and kubernetes-asyncio with kr8s when waiting for the scheduler
* Delete aiopykube
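
The scheduler wait now reduces to a kr8s lookup plus a readiness check. A
minimal sketch of the pattern (illustrative only: `wait_for_scheduler_pod`
and its arguments are placeholder names; the kr8s calls are the ones this
patch introduces):

```python
import asyncio

import kr8s
from kr8s.asyncio.objects import Pod


async def wait_for_scheduler_pod(selector: str, namespace: str) -> Pod:
    """Poll until a pod matching ``selector`` exists and reports Ready."""
    while True:
        try:
            pod = await Pod.get(label_selector=selector, namespace=namespace)
        except kr8s.NotFoundError:
            # The scheduler pod hasn't been created yet, check again shortly
            await asyncio.sleep(0.25)
            continue
        if await pod.ready():
            return pod
```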
---
 .github/workflows/operator.yaml                |   2 -
 dask_kubernetes/aiopykube/__init__.py          |   2 -
 dask_kubernetes/aiopykube/dask.py              |  25 --
 dask_kubernetes/aiopykube/mixins.py            |  25 --
 dask_kubernetes/aiopykube/objects.py           | 248 ------------
 dask_kubernetes/aiopykube/query.py             |  73 ----
 .../aiopykube/tests/test_pykube_objects.py     |  39 --
 dask_kubernetes/aiopykube/tests/test_query.py  |  19 -
 dask_kubernetes/common/networking.py           |  43 +-
 doc/source/aiopykube.rst                       | 379 ------------------
 doc/source/index.rst                           |   1 -
 11 files changed, 16 insertions(+), 840 deletions(-)
 delete mode 100644 dask_kubernetes/aiopykube/__init__.py
 delete mode 100644 dask_kubernetes/aiopykube/dask.py
 delete mode 100644 dask_kubernetes/aiopykube/mixins.py
 delete mode 100644 dask_kubernetes/aiopykube/objects.py
 delete mode 100644 dask_kubernetes/aiopykube/query.py
 delete mode 100644 dask_kubernetes/aiopykube/tests/test_pykube_objects.py
 delete mode 100644 dask_kubernetes/aiopykube/tests/test_query.py
 delete mode 100644 doc/source/aiopykube.rst

diff --git a/.github/workflows/operator.yaml b/.github/workflows/operator.yaml
index e87f42db9..5a8bb230f 100644
--- a/.github/workflows/operator.yaml
+++ b/.github/workflows/operator.yaml
@@ -7,7 +7,6 @@ on:
       - "ci/**"
       - "dask_kubernetes/operator/**"
       - "dask_kubernetes/common/**"
-      - "dask_kubernetes/aiopykube/**"
       - "dask_kubernetes/*"
   push:
     paths:
@@ -16,7 +15,6 @@ on:
       - "ci/**"
       - "dask_kubernetes/operator/**"
       - "dask_kubernetes/common/**"
-      - "dask_kubernetes/aiopykube/**"
       - "dask_kubernetes/*"
 
 concurrency:
diff --git a/dask_kubernetes/aiopykube/__init__.py b/dask_kubernetes/aiopykube/__init__.py
deleted file mode 100644
index acc62b983..000000000
--- a/dask_kubernetes/aiopykube/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-"""An asyncio shim for pykube-ng."""
-from pykube import HTTPClient, KubeConfig
diff --git a/dask_kubernetes/aiopykube/dask.py b/dask_kubernetes/aiopykube/dask.py
deleted file mode 100644
index 80aec5455..000000000
--- a/dask_kubernetes/aiopykube/dask.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from dask_kubernetes.aiopykube.objects import NamespacedAPIObject
-
-
-class DaskCluster(NamespacedAPIObject):
-    version = "kubernetes.dask.org/v1"
-    endpoint = "daskclusters"
-    kind = "DaskCluster"
-
-
-class DaskWorkerGroup(NamespacedAPIObject):
-    version = "kubernetes.dask.org/v1"
-    endpoint = "daskworkergroups"
-    kind = "DaskWorkerGroup"
-
-
-class DaskAutoscaler(NamespacedAPIObject):
-    version = "kubernetes.dask.org/v1"
-    endpoint = "daskautoscalers"
-    kind = "DaskAutoscaler"
-
-
-class DaskJob(NamespacedAPIObject):
-    version = "kubernetes.dask.org/v1"
-    endpoint = "daskjobs"
-    kind = "DaskJob"
diff --git a/dask_kubernetes/aiopykube/mixins.py b/dask_kubernetes/aiopykube/mixins.py
deleted file mode 100644
index e29ba0b5f..000000000
--- a/dask_kubernetes/aiopykube/mixins.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import asyncio
-import functools
-
-from pykube.mixins import ScalableMixin as _ScalableMixin
-
-
-class AsyncMixin:
-    async def _sync(self, func, *args, **kwargs):
-        return await asyncio.get_event_loop().run_in_executor(
-            None, functools.partial(func, *args, **kwargs)
-        )
-
-
-class AsyncScalableMixin(_ScalableMixin):
-    async def scale(self, replicas=None):
-        count = self.scalable if replicas is None else replicas
-        await self.exists(ensure=True)
-        if self.scalable != count:
-            self.scalable = count
-            await self.update()
-        while True:
-            await self.reload()
-            if self.scalable == count:
-                break
-            await asyncio.sleep(1)
diff --git a/dask_kubernetes/aiopykube/objects.py b/dask_kubernetes/aiopykube/objects.py
deleted file mode 100644
index 00d65f121..000000000
--- a/dask_kubernetes/aiopykube/objects.py
+++ /dev/null
@@ -1,248 +0,0 @@
-from typing import Optional
-
-from requests import Response
-
-
-from pykube.http import HTTPClient
-from pykube.objects import (
-    ObjectManager,
-    APIObject as _APIObject,
-    NamespacedAPIObject as _NamespacedAPIObject,
-    ConfigMap as _ConfigMap,
-    CronJob as _CronJob,
-    DaemonSet as _DaemonSet,
-    Deployment as _Deployment,
-    Endpoint as _Endpoint,
-    Event as _Event,
-    LimitRange as _LimitRange,
-    ResourceQuota as _ResourceQuota,
-    ServiceAccount as _ServiceAccount,
-    Ingress as _Ingress,
-    Job as _Job,
-    Namespace as _Namespace,
-    Node as _Node,
-    Pod as _Pod,
-    ReplicationController as _ReplicationController,
-    ReplicaSet as _ReplicaSet,
-    Secret as _Secret,
-    Service as _Service,
-    PersistentVolume as _PersistentVolume,
-    PersistentVolumeClaim as _PersistentVolumeClaim,
-    HorizontalPodAutoscaler as _HorizontalPodAutoscaler,
-    StatefulSet as _StatefulSet,
-    Role as _Role,
-    RoleBinding as _RoleBinding,
-    ClusterRole as _ClusterRole,
-    ClusterRoleBinding as _ClusterRoleBinding,
-    PodSecurityPolicy as _PodSecurityPolicy,
-    PodDisruptionBudget as _PodDisruptionBudget,
-    CustomResourceDefinition as _CustomResourceDefinition,
-)
-from dask_kubernetes.aiopykube.query import Query
-from dask_kubernetes.aiopykube.mixins import AsyncScalableMixin, AsyncMixin
-
-
-class AsyncObjectManager(ObjectManager):
-    def __call__(self, api: HTTPClient, namespace: str = None):
-        query = super().__call__(api=api, namespace=namespace)
-        return query._clone(Query)
-
-
-class AsyncObjectMixin(AsyncMixin):
-    objects = AsyncObjectManager()
-
-    async def exists(self, ensure=False):
-        return await self._sync(super().exists, ensure=ensure)
-
-    async def create(self):
-        return await self._sync(super().create)
-
-    async def reload(self):
-        return await self._sync(super().reload)
-
-    def watch(self):
-        return super().watch()
-
-    async def patch(self, strategic_merge_patch, *, subresource=None):
-        return await self._sync(
-            super().patch, strategic_merge_patch, subresource=subresource
-        )
-
-    async def update(self, is_strategic=True, *, subresource=None):
-        return await self._sync(super().update, is_strategic, subresource=subresource)
-
-    async def delete(self, propagation_policy: str = None):
-        return await self._sync(super().delete, propagation_policy=propagation_policy)
-
-    exists.__doc__ = _APIObject.exists.__doc__
-    create.__doc__ = _APIObject.create.__doc__
-    reload.__doc__ = _APIObject.reload.__doc__
-    watch.__doc__ = _APIObject.watch.__doc__
-    patch.__doc__ = _APIObject.patch.__doc__
-    update.__doc__ = _APIObject.update.__doc__
-    delete.__doc__ = _APIObject.delete.__doc__
-
-
-class APIObject(AsyncObjectMixin, _APIObject):
-    """APIObject."""
-
-
-class NamespacedAPIObject(AsyncObjectMixin, _NamespacedAPIObject):
-    """NamespacedAPIObject."""
-
-
-class ConfigMap(AsyncObjectMixin, _ConfigMap):
-    """ConfigMap."""
-
-
-class CronJob(AsyncObjectMixin, _CronJob):
-    """CronJob."""
-
-
-class DaemonSet(AsyncObjectMixin, _DaemonSet):
-    """DaemonSet."""
-
-
-class Deployment(AsyncScalableMixin, AsyncObjectMixin, _Deployment):
-    """Deployment."""
-
-    async def rollout_undo(self, target_revision=None):
-        return await self._sync(super().rollout_undo, target_revision)
-
-    rollout_undo.__doc__ = _Deployment.rollout_undo.__doc__
-
-
-class Endpoint(AsyncObjectMixin, _Endpoint):
-    """Endpoint."""
-
-
-class Event(AsyncObjectMixin, _Event):
-    """Event."""
-
-
-class LimitRange(AsyncObjectMixin, _LimitRange):
-    """LimitRange."""
-
-
-class ResourceQuota(AsyncObjectMixin, _ResourceQuota):
-    """ResourceQuota."""
-
-
-class ServiceAccount(AsyncObjectMixin, _ServiceAccount):
-    """ServiceAccount."""
-
-
-class Ingress(AsyncObjectMixin, _Ingress):
-    """Ingress."""
-
-
-class Job(AsyncScalableMixin, AsyncObjectMixin, _Job):
-    """Job."""
-
-
-class Namespace(AsyncObjectMixin, _Namespace):
-    """Namespace."""
-
-
-class Node(AsyncObjectMixin, _Node):
-    """Node."""
-
-
-class Pod(AsyncObjectMixin, _Pod):
-    """Pod."""
-
-    async def logs(self, *args, **kwargs):
-        return await self._sync(super().logs, *args, **kwargs)
-
-    logs.__doc__ = _Pod.logs.__doc__
-
-
-class ReplicationController(
-    AsyncScalableMixin, AsyncObjectMixin, _ReplicationController
-):
-    """ReplicationController."""
-
-
-class ReplicaSet(AsyncScalableMixin, AsyncObjectMixin, _ReplicaSet):
-    """ReplicaSet."""
-
-
-class Secret(AsyncObjectMixin, _Secret):
-    """Secret."""
-
-
-class Service(AsyncObjectMixin, _Service):
-    """Service."""
-
-    async def proxy_http_request(self, *args, **kwargs):
-        return await self._sync(super().proxy_http_request, *args, **kwargs)
-
-    async def proxy_http_get(
-        self, path: str, port: Optional[int] = None, **kwargs
-    ) -> Response:
-        return await self.proxy_http_request("GET", path, port, **kwargs)
-
-    async def proxy_http_post(
-        self, path: str, port: Optional[int] = None, **kwargs
-    ) -> Response:
-        return await self.proxy_http_request("POST", path, port, **kwargs)
-
-    async def proxy_http_put(
-        self, path: str, port: Optional[int] = None, **kwargs
-    ) -> Response:
-        return await self.proxy_http_request("PUT", path, port, **kwargs)
-
-    async def proxy_http_delete(
-        self, path: str, port: Optional[int] = None, **kwargs
-    ) -> Response:
-        return await self.proxy_http_request("DELETE", path, port, **kwargs)
-
-    proxy_http_request.__doc__ = _Service.proxy_http_request.__doc__
-    proxy_http_get.__doc__ = _Service.proxy_http_get.__doc__
-    proxy_http_post.__doc__ = _Service.proxy_http_post.__doc__
-    proxy_http_put.__doc__ = _Service.proxy_http_put.__doc__
-    proxy_http_delete.__doc__ = _Service.proxy_http_delete.__doc__
-
-
-class PersistentVolume(AsyncObjectMixin, _PersistentVolume):
-    """PersistentVolume."""
-
-
-class PersistentVolumeClaim(AsyncObjectMixin, _PersistentVolumeClaim):
-    """PersistentVolumeClaim."""
-
-
-class HorizontalPodAutoscaler(AsyncObjectMixin, _HorizontalPodAutoscaler):
-    """HorizontalPodAutoscaler."""
-
-
-class StatefulSet(AsyncScalableMixin, AsyncObjectMixin, _StatefulSet):
-    """StatefulSet."""
-
-
-class Role(AsyncObjectMixin, _Role):
-    """Role."""
-
-
-class RoleBinding(AsyncObjectMixin, _RoleBinding):
-    """RoleBinding."""
-
-
-class ClusterRole(AsyncObjectMixin, _ClusterRole):
-    """ClusterRole."""
-
-
-class ClusterRoleBinding(AsyncObjectMixin, _ClusterRoleBinding):
-    """ClusterRoleBinding."""
-
-
-class PodSecurityPolicy(AsyncObjectMixin, _PodSecurityPolicy):
-    """PodSecurityPolicy."""
-
-
-class PodDisruptionBudget(AsyncObjectMixin, _PodDisruptionBudget):
-    """PodDisruptionBudget."""
-
-
-class CustomResourceDefinition(AsyncObjectMixin, _CustomResourceDefinition):
-    """CustomResourceDefinition."""
diff --git a/dask_kubernetes/aiopykube/query.py b/dask_kubernetes/aiopykube/query.py
deleted file mode 100644
index c435fbee6..000000000
--- a/dask_kubernetes/aiopykube/query.py
+++ /dev/null
@@ -1,73 +0,0 @@
-"""An asyncio shim for pykube-ng."""
-from pykube.query import now, Table, Query as _Query, WatchQuery as _WatchQuery
-from dask_kubernetes.aiopykube.mixins import AsyncMixin
-
-
-class Query(_Query, AsyncMixin):
-    async def execute(self, **kwargs):
-        return await self._sync(super().execute, **kwargs)
-
-    async def iterator(self):
-        response = await self.execute()
-        for obj in response.json().get("items") or []:
-            yield self.api_obj_class(self.api, obj)
-
-    def __aiter__(self):
-        return self.iterator()
-
-    async def get_by_name(self, name: str):
-        return await self._sync(super().get_by_name, name=name)
-
-    async def get(self, *args, **kwargs):
-        return await self._sync(super().get, *args, **kwargs)
-
-    async def get_or_none(self, *args, **kwargs):
-        return await self._sync(super().get_or_none, *args, **kwargs)
-
-    async def as_table(self) -> Table:
-        response = await self.execute(
-            headers={"Accept": "application/json;as=Table;v=v1beta1;g=meta.k8s.io"}
-        )
-        return Table(self.api_obj_class, response.json())
-
-    def watch(self, since=None, *, params=None):
-        query = self._clone(WatchQuery)
-        query.params = params
-        if since is now:
-            raise ValueError("now is not a supported since value in async version")
-        elif since is not None:
-            query.resource_version = since
-        return query
-
-    @property
-    def query_cache(self):
-        raise NotImplementedError(
-            "Properties cannot make async HTTP requests to populate the cache. "
-            "Also the shim currently does not implement a cache at all."
-        )
-
-    @property
-    def response(self):
-        raise NotImplementedError(
-            "Properties cannot make HTTP requests. Use ``response = (await Query.execute()).json()`` instead."
-        )
-
-    def __len__(self):
-        raise TypeError(
-            "Cannot call len directly on async objects. "
-            "Instead you can use ``len([_ async for _ in Query(...)])``."
-        )
-
-
-class WatchQuery(_WatchQuery, AsyncMixin):
-    async def object_stream(self):
-        f = super().object_stream
-        object_stream = await self._sync(lambda: iter(f()))
-        while True:
-            try:
-                yield await self._sync(next, object_stream)
-            except StopIteration:
-                break
-
-    def __aiter__(self):
-        return self.object_stream()
diff --git a/dask_kubernetes/aiopykube/tests/test_pykube_objects.py b/dask_kubernetes/aiopykube/tests/test_pykube_objects.py
deleted file mode 100644
index e1295c9c8..000000000
--- a/dask_kubernetes/aiopykube/tests/test_pykube_objects.py
+++ /dev/null
@@ -1,39 +0,0 @@
-import pytest
-
-import asyncio
-import uuid
-
-from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
-from dask_kubernetes.aiopykube.objects import Pod
-
-
-@pytest.mark.anyio
-async def test_pod_create_and_delete(docker_image, k8s_cluster):
-    api = HTTPClient(KubeConfig.from_env())
-    name = "test-" + uuid.uuid4().hex[:10]
-    pod = Pod(
-        api,
-        {
-            "apiVersion": "v1",
-            "kind": "Pod",
-            "metadata": {"name": name},
-            "spec": {
-                "containers": [
-                    # Cannot use `gcr.io/google_containers/pause` as it's not available
-                    # for arm64
-                    {
-                        "name": "pause",
-                        "image": docker_image,
-                        "command": ["sleep", "1000"],
-                    },
-                ]
-            },
-        },
-    )
-
-    await pod.create()
-    assert await pod.exists()
-    await pod.delete()
-    while await pod.exists():
-        await asyncio.sleep(0.1)
-    assert not await pod.exists()
diff --git a/dask_kubernetes/aiopykube/tests/test_query.py b/dask_kubernetes/aiopykube/tests/test_query.py
deleted file mode 100644
index cb428ebd5..000000000
--- a/dask_kubernetes/aiopykube/tests/test_query.py
+++ /dev/null
@@ -1,19 +0,0 @@
-import pytest
-
-from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
-from dask_kubernetes.aiopykube.query import Query
-from dask_kubernetes.aiopykube.objects import Pod
-
-
-@pytest.mark.anyio
-async def test_pod_query(k8s_cluster):
-    api = HTTPClient(KubeConfig.from_env())
-    async for pod in Query(api, Pod, namespace="kube-system"):
-        assert isinstance(pod, Pod)
-
-
-@pytest.mark.anyio
-async def test_pod_objects(k8s_cluster):
-    api = HTTPClient(KubeConfig.from_env())
-    async for pod in Pod.objects(api).filter(namespace="kube-system"):
-        assert isinstance(pod, Pod)
diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py
index d29411214..c26d9ccd2 100644
--- a/dask_kubernetes/common/networking.py
+++ b/dask_kubernetes/common/networking.py
@@ -8,11 +8,11 @@
 import kubernetes_asyncio as kubernetes
 from tornado.iostream import StreamClosedError
 
+import kr8s
+from kr8s.asyncio.objects import Pod
 from distributed.core import rpc
 
 from dask_kubernetes.common.utils import check_dependency
-from dask_kubernetes.aiopykube.objects import Pod
-from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
 from dask_kubernetes.exceptions import CrashLoopBackOffError
 
 
@@ -196,36 +196,25 @@ async def get_scheduler_address(
 
 async def wait_for_scheduler(cluster_name, namespace, timeout=None):
     pod_start_time = None
-    api = HTTPClient(KubeConfig.from_env())
     while True:
-        async with kubernetes.client.api_client.ApiClient() as api_client:
-            k8s_api = kubernetes.client.CoreV1Api(api_client)
-            try:
-                [pod] = (
-                    await k8s_api.list_namespaced_pod(
-                        namespace=namespace,
-                        label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={cluster_name}",
-                    )
-                ).items
-            except ValueError:
-                await asyncio.sleep(0.25)
-                continue
-        pod = await Pod.objects(api, namespace=namespace).get_by_name(pod.metadata.name)
-        phase = pod.obj["status"]["phase"]
-        if phase == "Running":
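+        # kr8s fetches the scheduler pod directly by label selector and
+        # raises NotFoundError while no matching pod exists yet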
"Running": + try: + pod = await Pod.get( + label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={cluster_name}", + namespace=namespace, + ) + except kr8s.NotFoundError: + await asyncio.sleep(0.25) + continue + if pod.status.phase == "Running": if not pod_start_time: pod_start_time = time.time() - conditions = { - c["type"]: c["status"] for c in pod.obj["status"]["conditions"] - } - if "Ready" in conditions and conditions["Ready"] == "True": + if await pod.ready(): return - if "containerStatuses" in pod.obj["status"]: - for container in pod.obj["status"]["containerStatuses"]: + if "containerStatuses" in pod.status: + for container in pod.status.containerStatuses: if ( - "waiting" in container["state"] - and container["state"]["waiting"]["reason"] - == "CrashLoopBackOff" + "waiting" in container.state + and container.state.waiting.reason == "CrashLoopBackOff" and timeout and pod_start_time + timeout < time.time() ): diff --git a/doc/source/aiopykube.rst b/doc/source/aiopykube.rst deleted file mode 100644 index c84cbcd02..000000000 --- a/doc/source/aiopykube.rst +++ /dev/null @@ -1,379 +0,0 @@ -aiopykube -========= - -Throughout the lifetime of this library we have used almost every Python Kubernetes library available. -They all seem to have their own drawbacks but my favourite for clean code and readibility is `pykube-ng `_. - -However, ``pykube`` is built on ``requests`` and doesn't natively support ``asyncio`` which we heavily use in Dask. To make it a little easier to work with we have a -``dask_kubernetes.aiopykube`` submodule which provides an async shim that has the same API as ``pykube`` and runs all IO calls in a threadpool executor. - -The shim also includes a ``dask_kubernetes.aiopykube.dask`` submodule with custom objects to represent :doc:`Dask custom resources `. - -Examples --------- - -Iterate over Pods -^^^^^^^^^^^^^^^^^ - -.. code-block:: python - - from dask_kubernetes.aiopykube import HTTPClient, KubeConfig - from dask_kubernetes.aiopykube.objects import Pod - - api = HTTPClient(KubeConfig.from_env()) - - async for pod in Pod.objects(namespace="default"): - # Do something - -Create a new Pod -^^^^^^^^^^^^^^^^ - -.. code-block:: python - - from dask_kubernetes.aiopykube import HTTPClient, KubeConfig - from dask_kubernetes.aiopykube.objects import Pod - - api = HTTPClient(KubeConfig.from_env()) - - pod = Pod( - api, - { - "apiVersion": "v1", - "kind": "Pod", - "metadata": {"name": name}, - "spec": { - "containers": [ - {"name": "pause", "image": "gcr.io/google_containers/pause"} - ] - }, - }, - ) - - await pod.create() - -Scale an existing deployment -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. code-block:: python - - from dask_kubernetes.aiopykube import HTTPClient, KubeConfig - from dask_kubernetes.aiopykube.objects import Deployment - - api = HTTPClient(KubeConfig.from_env()) - - deployment = await Deployment.objects(api).get_by_name("mydeployment") - await deployment.scale(5) - -Delete a DaskCluster custom resource -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. code-block:: python - - from dask_kubernetes.aiopykube import HTTPClient, KubeConfig - from dask_kubernetes.aiopykube.dask import DaskCluster - - api = HTTPClient(KubeConfig.from_env()) - cluster = await DaskCluster.objects(api, namespace=namespace).get_by_name("mycluster") - await cluster.delete() - -API ---- - -dask_kubernetes.aiopykube.query -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. autosummary:: - dask_kubernetes.aiopykube.query.Query - dask_kubernetes.aiopykube.query.WatchQuery - -Query -***** - -.. 
+            if await pod.ready():
                 return
-            if "containerStatuses" in pod.obj["status"]:
-                for container in pod.obj["status"]["containerStatuses"]:
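+            # Surface a crash-looping scheduler container as an error once
+            # the timeout has elapsed rather than waiting forever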
+            if "containerStatuses" in pod.status:
+                for container in pod.status.containerStatuses:
                     if (
-                        "waiting" in container["state"]
-                        and container["state"]["waiting"]["reason"]
-                        == "CrashLoopBackOff"
+                        "waiting" in container.state
+                        and container.state.waiting.reason == "CrashLoopBackOff"
                         and timeout
                         and pod_start_time + timeout < time.time()
                     ):
diff --git a/doc/source/aiopykube.rst b/doc/source/aiopykube.rst
deleted file mode 100644
index c84cbcd02..000000000
--- a/doc/source/aiopykube.rst
+++ /dev/null
@@ -1,379 +0,0 @@
-aiopykube
-=========
-
-Throughout the lifetime of this library we have used almost every Python Kubernetes library available.
-They all seem to have their own drawbacks, but our favourite for clean code and readability is `pykube-ng `_.
-
-However, ``pykube`` is built on ``requests`` and doesn't natively support ``asyncio``, which we use heavily in Dask. To make it a little easier to work with we have a
-``dask_kubernetes.aiopykube`` submodule which provides an async shim with the same API as ``pykube`` that runs all IO calls in a threadpool executor.
-
-The shim also includes a ``dask_kubernetes.aiopykube.dask`` submodule with custom objects to represent :doc:`Dask custom resources `.
-
-Examples
---------
-
-Iterate over Pods
-^^^^^^^^^^^^^^^^^
-
-.. code-block:: python
-
-    from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
-    from dask_kubernetes.aiopykube.objects import Pod
-
-    api = HTTPClient(KubeConfig.from_env())
-
-    async for pod in Pod.objects(api, namespace="default"):
-        print(pod.name)  # do something with each pod
-
-Create a new Pod
-^^^^^^^^^^^^^^^^
-
-.. code-block:: python
-
-    from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
-    from dask_kubernetes.aiopykube.objects import Pod
-
-    api = HTTPClient(KubeConfig.from_env())
-
-    pod = Pod(
-        api,
-        {
-            "apiVersion": "v1",
-            "kind": "Pod",
-            "metadata": {"name": "example-pod"},
-            "spec": {
-                "containers": [
-                    {"name": "pause", "image": "gcr.io/google_containers/pause"}
-                ]
-            },
-        },
-    )
-
-    await pod.create()
-
-Scale an existing deployment
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-.. code-block:: python
-
-    from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
-    from dask_kubernetes.aiopykube.objects import Deployment
-
-    api = HTTPClient(KubeConfig.from_env())
-
-    deployment = await Deployment.objects(api).get_by_name("mydeployment")
-    await deployment.scale(5)
-
-Delete a DaskCluster custom resource
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-.. code-block:: python
-
-    from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
-    from dask_kubernetes.aiopykube.dask import DaskCluster
-
-    api = HTTPClient(KubeConfig.from_env())
-    cluster = await DaskCluster.objects(api, namespace="default").get_by_name("mycluster")
-    await cluster.delete()
-
-API
----
-
-dask_kubernetes.aiopykube.query
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-.. autosummary::
-    dask_kubernetes.aiopykube.query.Query
-    dask_kubernetes.aiopykube.query.WatchQuery
-
-Query
-*****
-
-.. autoclass:: dask_kubernetes.aiopykube.query.Query
-    :members:
-
-WatchQuery
-**********
-
-.. autoclass:: dask_kubernetes.aiopykube.query.WatchQuery
-    :members:
-
-dask_kubernetes.aiopykube.dask
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-.. autosummary::
-    dask_kubernetes.aiopykube.dask.DaskCluster
-    dask_kubernetes.aiopykube.dask.DaskWorkerGroup
-    dask_kubernetes.aiopykube.dask.DaskJob
-    dask_kubernetes.aiopykube.dask.DaskAutoscaler
-
-DaskCluster
-***********
-
-.. autoclass:: dask_kubernetes.aiopykube.dask.DaskCluster
-    :members:
-
-DaskWorkerGroup
-***************
-
-.. autoclass:: dask_kubernetes.aiopykube.dask.DaskWorkerGroup
-    :members:
-
-DaskJob
-*******
-
-.. autoclass:: dask_kubernetes.aiopykube.dask.DaskJob
-    :members:
-
-DaskAutoscaler
-**************
-
-.. autoclass:: dask_kubernetes.aiopykube.dask.DaskAutoscaler
-    :members:
-
-dask_kubernetes.aiopykube.objects
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-.. autosummary::
-    dask_kubernetes.aiopykube.objects.ConfigMap
-    dask_kubernetes.aiopykube.objects.CronJob
-    dask_kubernetes.aiopykube.objects.DaemonSet
-    dask_kubernetes.aiopykube.objects.Deployment
-    dask_kubernetes.aiopykube.objects.Endpoint
-    dask_kubernetes.aiopykube.objects.Event
-    dask_kubernetes.aiopykube.objects.LimitRange
-    dask_kubernetes.aiopykube.objects.ResourceQuota
-    dask_kubernetes.aiopykube.objects.ServiceAccount
-    dask_kubernetes.aiopykube.objects.Ingress
-    dask_kubernetes.aiopykube.objects.Job
-    dask_kubernetes.aiopykube.objects.Namespace
-    dask_kubernetes.aiopykube.objects.Node
-    dask_kubernetes.aiopykube.objects.Pod
-    dask_kubernetes.aiopykube.objects.ReplicationController
-    dask_kubernetes.aiopykube.objects.ReplicaSet
-    dask_kubernetes.aiopykube.objects.Secret
-    dask_kubernetes.aiopykube.objects.Service
-    dask_kubernetes.aiopykube.objects.PersistentVolume
-    dask_kubernetes.aiopykube.objects.PersistentVolumeClaim
-    dask_kubernetes.aiopykube.objects.HorizontalPodAutoscaler
-    dask_kubernetes.aiopykube.objects.StatefulSet
-    dask_kubernetes.aiopykube.objects.Role
-    dask_kubernetes.aiopykube.objects.RoleBinding
-    dask_kubernetes.aiopykube.objects.ClusterRole
-    dask_kubernetes.aiopykube.objects.ClusterRoleBinding
-    dask_kubernetes.aiopykube.objects.PodSecurityPolicy
-    dask_kubernetes.aiopykube.objects.PodDisruptionBudget
-    dask_kubernetes.aiopykube.objects.CustomResourceDefinition
-
-ConfigMap
-*********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.ConfigMap
-    :members:
-
-CronJob
-*******
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.CronJob
-    :members:
-
-DaemonSet
-*********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.DaemonSet
-    :members:
-
-Deployment
-**********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Deployment
-    :members:
-
-Endpoint
-********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Endpoint
-    :members:
-
-Event
-*****
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Event
-    :members:
-
-LimitRange
-**********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.LimitRange
-    :members:
-
-ResourceQuota
-*************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.ResourceQuota
-    :members:
-
-ServiceAccount
-**************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.ServiceAccount
-    :members:
-
-Ingress
-*******
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Ingress
-    :members:
-
-Job
-***
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Job
-    :members:
-
-Namespace
-*********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Namespace
-    :members:
-
-Node
-****
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Node
-    :members:
-
-Pod
-***
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Pod
-    :members:
-
-ReplicationController
-*********************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.ReplicationController
-    :members:
-
-ReplicaSet
-**********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.ReplicaSet
-    :members:
-
-Secret
-******
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Secret
-    :members:
-
-Service
-*******
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Service
-    :members:
-
-PersistentVolume
-****************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.PersistentVolume
-    :members:
-
-PersistentVolumeClaim
-*********************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.PersistentVolumeClaim
-    :members:
-
-HorizontalPodAutoscaler
-***********************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.HorizontalPodAutoscaler
-    :members:
-
-StatefulSet
-***********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.StatefulSet
-    :members:
-
-Role
-****
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.Role
-    :members:
-
-RoleBinding
-***********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.RoleBinding
-    :members:
-
-ClusterRole
-***********
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.ClusterRole
-    :members:
-
-ClusterRoleBinding
-******************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.ClusterRoleBinding
-    :members:
-
-PodSecurityPolicy
-*****************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.PodSecurityPolicy
-    :members:
-
-PodDisruptionBudget
-*******************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.PodDisruptionBudget
-    :members:
-
-CustomResourceDefinition
-************************
-
-.. autoclass:: dask_kubernetes.aiopykube.objects.CustomResourceDefinition
-    :members:
-
-FAQ
----
-
-Why roll our own wrapper?
-^^^^^^^^^^^^^^^^^^^^^^^^^
-
-There does appear to be an ``aiopykube`` package on PyPI, but it hasn't been updated for a long time and doesn't link to any source on GitHub or any other source repository.
-It probably fills the same role, but we're apprehensive about depending on it in this state.
-
-Why not release this as a separate package?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-In theory we could pull the implementation out of ``dask-kubernetes`` into another dependency. If there is demand from the community to do this we could definitely consider it.
-
-Why not use ``kubernetes``, ``kubernetes-asyncio``, ``aiokubernetes``, etc?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The most popular Kubernetes libraries for Python are Swagger-generated SDKs based on the Kubernetes API spec.
-This results in libraries that do not feel Pythonic and only have complex reference documentation without any explanation, how-to or tutorial content.
-
-It's true that ``pykube`` is also a little lacking in documentation, but the code is simple enough and written by a human, so code-spelunking isn't too unpleasant.
-
-Why use a threadpool executor instead of ``aiohttp``?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The way ``pykube`` leverages ``requests`` includes a lot of status checking and error handling (which is great), so switching out the underlying IO library would mean rewriting far more of the code in our shim.
-To keep the shim as lightweight as possible we've simply wrapped every method that makes a blocking IO call in ``run_in_executor``.
-
-If we were to ever spin this shim out into a separate package, we would probably advocate for a deeper rewrite using ``aiohttp`` instead of ``requests``.
-
-Why are some methods not implemented?
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-A couple of design decisions in ``pykube`` limit how much of it can be converted to ``asyncio``.
-There are a few properties, such as ``pykube.query.Query.query_cache``, that make HTTP calls. Using any kind of IO in a property or a dunder like ``__len__`` is generally frowned upon,
-and there is no way to make a property awaitable without changing the API.
-
-Instead of changing the API we've made these raise ``NotImplementedError`` with helpful messages that suggest an alternative way to achieve the same thing.
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 789b6298d..7ea56d129 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -101,5 +101,4 @@ and have the cluster running. You can then use it to manage scaling and retrieve
    testing
    releasing
-   aiopykube
    history