From 92714da5785709726f85c4c6ec92451f5c23ad04 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 11 Sep 2023 04:14:42 -0400 Subject: [PATCH] Add type hints to KubeCluster constructor (#813) --- .../operator/kubecluster/kubecluster.py | 85 +++++++++---------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index 3c76a070f..900387560 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -2,48 +2,43 @@ import asyncio import atexit -from contextlib import suppress -from enum import Enum import getpass import logging import os import time -from typing import ClassVar +import uuid import warnings import weakref -import uuid +from contextlib import suppress +from enum import Enum +from typing import ClassVar, Dict, List, Optional -from rich import box -from rich.live import Live -from rich.table import Table -from rich.console import Group -from rich.panel import Panel -from rich.spinner import Spinner +import dask.config import httpx import kr8s -from kr8s.asyncio.objects import Pod, Service import yaml - -import dask.config from distributed.core import Status, rpc from distributed.deploy import Cluster -from distributed.utils import ( - Log, - Logs, - TimeoutError, - format_dashboard_link, -) +from distributed.utils import Log, Logs, TimeoutError, format_dashboard_link +from kr8s.asyncio.objects import Pod, Service +from rich import box +from rich.console import Group +from rich.live import Live +from rich.panel import Panel +from rich.spinner import Spinner +from rich.table import Table +from tornado.ioloop import IOLoop -from dask_kubernetes.operator.networking import ( - get_scheduler_address, - wait_for_scheduler, - wait_for_scheduler_comm, -) from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError from dask_kubernetes.operator._objects import ( + DaskAutoscaler, DaskCluster, DaskWorkerGroup, - DaskAutoscaler, +) +from dask_kubernetes.operator.networking import ( + get_scheduler_address, + wait_for_scheduler, + wait_for_scheduler_comm, ) logger = logging.getLogger(__name__) @@ -90,7 +85,7 @@ class KubeCluster(Cluster): be the port you are forwarding to ````. create_mode: CreateMode (optional) How to handle cluster creation if the cluster resource already exists. - Default behaviour is to create a new clustser if one with that name + Default behavior is to create a new cluster if one with that name doesn't exist, or connect to an existing one if it does. You can also set ``CreateMode.CREATE_ONLY`` to raise an exception if a cluster with that name already exists. Or ``CreateMode.CONNECT_ONLY`` to raise an exception @@ -160,24 +155,24 @@ class KubeCluster(Cluster): def __init__( self, *, - name=None, - namespace=None, - image=None, - n_workers=None, - resources=None, - env=None, - worker_command=None, - port_forward_cluster_ip=None, - create_mode=None, - shutdown_on_close=None, - idle_timeout=None, - resource_timeout=None, - scheduler_service_type=None, - custom_cluster_spec=None, - scheduler_forward_port=None, - jupyter=False, - loop=None, - asynchronous=False, + name: Optional[str] = None, + namespace: Optional[str] = None, + image: Optional[str] = None, + n_workers: Optional[int] = None, + resources: Optional[Dict[str, str]] = None, + env: Optional[List[dict] | Dict[str, str]] = None, + worker_command: Optional[List[str]] = None, + port_forward_cluster_ip: Optional[bool] = None, + create_mode: Optional[CreateMode] = None, + shutdown_on_close: Optional[bool] = None, + idle_timeout: Optional[int] = None, + resource_timeout: Optional[int] = None, + scheduler_service_type: Optional[str] = None, + custom_cluster_spec: Optional[str | dict] = None, + scheduler_forward_port: Optional[int] = None, + jupyter: bool = False, + loop: Optional[IOLoop] = None, + asynchronous: bool = False, **kwargs, ): name = dask.config.get("kubernetes.name", override_with=name) @@ -258,7 +253,7 @@ def __init__( ) self._instances.add(self) self._rich_spinner = Spinner("dots", speed=0.5) - self._startup_component_status = {} + self._startup_component_status: dict = {} super().__init__(name=name, loop=loop, asynchronous=asynchronous, **kwargs)