Skip to content

Commit

Permalink
Revert "Remove unused aiopykube (#795)"
Browse files Browse the repository at this point in the history
This reverts commit 67bde4b.
  • Loading branch information
jacobtomlinson committed Aug 4, 2023
1 parent 8016cb4 commit bcd3ac5
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
wait_for_scheduler_comm,
)
from dask_kubernetes.common.utils import get_current_namespace
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.dask import (
DaskCluster as AIODaskCluster,
)
from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError
from dask_kubernetes.operator._objects import (
DaskCluster,
Expand Down Expand Up @@ -258,6 +262,7 @@ def __init__(
name = name.format(
user=getpass.getuser(), uuid=str(uuid.uuid4())[:10], **os.environ
)
self.k8s_api = HTTPClient(KubeConfig.from_env())
self._instances.add(self)
self._rich_spinner = Spinner("dots", speed=0.5)
self._startup_component_status = {}
Expand Down Expand Up @@ -479,9 +484,15 @@ async def _get_scheduler_address(self):
async def _wait_for_controller(self):
"""Wait for the operator to set the status.phase."""
start = time.time()
cluster = await DaskCluster.get(self.name, namespace=self.namespace)
while start + self._resource_timeout > time.time():
if await cluster.ready():
cluster = await AIODaskCluster.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name)
if (
"status" in cluster.obj
and "phase" in cluster.obj["status"]
and cluster.obj["status"]["phase"] == "Running"
):
return
await asyncio.sleep(0.25)
raise TimeoutError(
Expand Down

0 comments on commit bcd3ac5

Please sign in to comment.