diff --git a/README.md b/README.md index 6dfafc29..de05e3cf 100644 --- a/README.md +++ b/README.md @@ -430,7 +430,7 @@ If your Prometheus monitors multiple clusters we require the label you defined f For example, if your cluster has the Prometheus label `cluster: "my-cluster-name"`, then run this command: ```sh -krr.py simple --prometheus-label cluster -l my-cluster-name +krr.py simple --prometheus-cluster-key cluster -l my-cluster-name ``` You may also need the `-p` flag to explicitly give Prometheus' URL. diff --git a/examples/custom_strategy.py b/examples/custom_strategy.py index f9e281d1..05735e57 100644 --- a/examples/custom_strategy.py +++ b/examples/custom_strategy.py @@ -3,7 +3,7 @@ import pydantic as pd import robusta_krr -from robusta_krr.api.models import K8sObjectData, MetricsPodData, ResourceRecommendation, ResourceType, RunResult +from robusta_krr.api.models import K8sWorkload, MetricsPodData, ResourceRecommendation, ResourceType, RunResult from robusta_krr.api.strategies import BaseStrategy, StrategySettings from robusta_krr.core.integrations.prometheus.metrics import MaxMemoryLoader, PercentileCPULoader @@ -24,7 +24,7 @@ class CustomStrategy(BaseStrategy[CustomStrategySettings]): rich_console = True # Whether to use rich console for the CLI metrics = [PercentileCPULoader(90), MaxMemoryLoader] # The metrics to use for the strategy - def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult: + def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult: return { ResourceType.CPU: ResourceRecommendation(request=self.settings.param_1, limit=None), ResourceType.Memory: ResourceRecommendation(request=self.settings.param_2, limit=self.settings.param_2), diff --git a/robusta_krr/api/models.py b/robusta_krr/api/models.py index 3a453ce9..e61f01ef 100644 --- a/robusta_krr/api/models.py +++ b/robusta_krr/api/models.py @@ -1,6 +1,6 @@ from robusta_krr.core.abstract.strategies import MetricsPodData, PodsTimeData, ResourceRecommendation, RunResult from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType -from robusta_krr.core.models.objects import K8sObjectData, PodData +from robusta_krr.core.models.objects import K8sWorkload, PodData from robusta_krr.core.models.result import ResourceScan, Result from robusta_krr.core.models.severity import Severity, register_severity_calculator @@ -8,7 +8,7 @@ "ResourceType", "ResourceAllocations", "RecommendationValue", - "K8sObjectData", + "K8sWorkload", "PodData", "Result", "Severity", diff --git a/robusta_krr/core/abstract/cluster_loader.py b/robusta_krr/core/abstract/cluster_loader.py new file mode 100644 index 00000000..86767d1a --- /dev/null +++ b/robusta_krr/core/abstract/cluster_loader.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import abc +import logging +from typing import Optional, TYPE_CHECKING + +from .workload_loader import BaseWorkloadLoader + +if TYPE_CHECKING: + from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector + + +logger = logging.getLogger("krr") + + +class BaseClusterLoader(abc.ABC): + """ + A class that wraps loading data from multiple clusters. + For example, a centralized prometheus server that can query multiple clusters. + Or one kubeconfig can define connections to multiple clusters. 
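+
+    Implementations are expected to provide three things: `list_clusters` to enumerate the
+    clusters to scan, `get_workload_loader` to return a per-cluster `BaseWorkloadLoader`,
+    and `get_prometheus` to return a `PrometheusConnector` for a given cluster.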
+ """ + + @abc.abstractmethod + async def list_clusters(self) -> Optional[list[str]]: + pass + + @abc.abstractmethod + def get_workload_loader(self, cluster: Optional[str]) -> BaseWorkloadLoader: + pass + + def try_get_workload_loader(self, cluster: Optional[str]) -> Optional[BaseWorkloadLoader]: + try: + return self.get_workload_loader(cluster) + except Exception as e: + logger.error(f"Could not connect to cluster {cluster} and will skip it: {e}") + return None + + @abc.abstractmethod + def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector: + """ + Connect to a Prometheus server and return a PrometheusConnector instance. + Cluster = None means that prometheus is the only one: either centralized or in-cluster. + raise prometrix.PrometheusNotFound if Prometheus is not available. + """ + + pass diff --git a/robusta_krr/core/abstract/metrics.py b/robusta_krr/core/abstract/metrics.py index 3b6f19c5..e1151ea6 100644 --- a/robusta_krr/core/abstract/metrics.py +++ b/robusta_krr/core/abstract/metrics.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod from robusta_krr.core.abstract.strategies import PodsTimeData -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload class BaseMetric(ABC): @@ -16,6 +16,6 @@ class BaseMetric(ABC): @abstractmethod async def load_data( - self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta + self, object: K8sWorkload, period: datetime.timedelta, step: datetime.timedelta ) -> PodsTimeData: ... diff --git a/robusta_krr/core/abstract/strategies.py b/robusta_krr/core/abstract/strategies.py index 0005a227..2d1ea9be 100644 --- a/robusta_krr/core/abstract/strategies.py +++ b/robusta_krr/core/abstract/strategies.py @@ -9,15 +9,15 @@ import pydantic as pd from numpy.typing import NDArray -from robusta_krr.core.models.result import K8sObjectData, ResourceType +from robusta_krr.core.models.result import K8sWorkload, ResourceType if TYPE_CHECKING: - from robusta_krr.core.abstract.metrics import BaseMetric # noqa: F401 from robusta_krr.core.integrations.prometheus.metrics import PrometheusMetric SelfRR = TypeVar("SelfRR", bound="ResourceRecommendation") +# TODO: rename so it isn't the same name as ResourceRecommendation in result.py class ResourceRecommendation(pd.BaseModel): """A class to represent resource recommendation with optional request and limit values. @@ -133,7 +133,7 @@ def description(self) -> Optional[str]: # Abstract method that needs to be implemented by subclass. # This method is intended to calculate resource recommendation based on history data and kubernetes object data. @abc.abstractmethod - def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult: + def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult: pass # This method is intended to return a strategy by its name. 
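The renamed `run` contract can also be exercised directly, which is handy when testing a strategy without a live cluster. Below is a minimal sketch built from the pieces above: the workload field values are hypothetical, and it assumes a strategy is constructed from its settings object, as `examples/custom_strategy.py` suggests. In a real run KRR builds `K8sWorkload` objects via a cluster loader and fills `history_data` from Prometheus.

```python
# A hedged sketch: exercising the example strategy outside the CLI.
# Assumes CustomStrategy/CustomStrategySettings from examples/custom_strategy.py;
# all concrete values here (names, params) are illustrative only.
from examples.custom_strategy import CustomStrategy, CustomStrategySettings
from robusta_krr.api.models import K8sWorkload, ResourceAllocations, ResourceType

# A hand-built workload, mirroring how the loaders construct K8sWorkload.
workload = K8sWorkload(
    cluster=None,                  # None = single/current cluster
    namespace="default",
    name="example-app",            # hypothetical workload
    kind="Deployment",
    container="main",
    allocations=ResourceAllocations(
        requests={ResourceType.CPU: 0.1, ResourceType.Memory: 128 * 1024**2},
        limits={ResourceType.CPU: None, ResourceType.Memory: None},
    ),
)

strategy = CustomStrategy(CustomStrategySettings(param_1=0.1, param_2=128.0))
# This example strategy ignores history, so empty metrics data is enough.
result = strategy.run(history_data={}, object_data=workload)
print(result[ResourceType.CPU].request)   # -> 0.1 (param_1)
print(result[ResourceType.Memory].limit)  # -> 128.0 (param_2)
```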
@@ -167,6 +167,6 @@ def get_settings_type(cls) -> type[StrategySettings]: "StrategySettings", "PodsTimeData", "MetricsPodData", - "K8sObjectData", + "K8sWorkload", "ResourceType", ] diff --git a/robusta_krr/core/abstract/workload_loader.py b/robusta_krr/core/abstract/workload_loader.py new file mode 100644 index 00000000..42860eef --- /dev/null +++ b/robusta_krr/core/abstract/workload_loader.py @@ -0,0 +1,23 @@ +import abc +import logging + +from robusta_krr.core.models.objects import K8sWorkload, PodData + + +logger = logging.getLogger("krr") + + +class BaseWorkloadLoader(abc.ABC): + """A base class for single cluster workload loaders.""" + + @abc.abstractmethod + async def list_workloads(self) -> list[K8sWorkload]: + pass + + +class IListPodsFallback(abc.ABC): + """This is an interface that a workload loader can implement to have a fallback method to list pods.""" + + @abc.abstractmethod + async def load_pods(self, object: K8sWorkload) -> list[PodData]: + pass diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py deleted file mode 100644 index a772a5c2..00000000 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ /dev/null @@ -1,543 +0,0 @@ -import asyncio -import logging -from collections import defaultdict -from concurrent.futures import ThreadPoolExecutor -from typing import Any, Awaitable, Callable, Iterable, Optional, Union - -from kubernetes import client, config # type: ignore -from kubernetes.client import ApiException -from kubernetes.client.models import ( - V1Container, - V1DaemonSet, - V1Deployment, - V1Job, - V1Pod, - V1PodList, - V1StatefulSet, - V2HorizontalPodAutoscaler, -) - -from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import HPAData, K8sObjectData, KindLiteral, PodData -from robusta_krr.core.models.result import ResourceAllocations -from robusta_krr.utils.object_like_dict import ObjectLikeDict - -from . import config_patch as _ - -logger = logging.getLogger("krr") - -AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job] -HPAKey = tuple[str, str, str] - - -class ClusterLoader: - def __init__(self, cluster: Optional[str]=None): - self.cluster = cluster - # This executor will be running requests to Kubernetes API - self.executor = ThreadPoolExecutor(settings.max_workers) - self.api_client = settings.get_kube_client(cluster) - self.apps = client.AppsV1Api(api_client=self.api_client) - self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) - self.batch = client.BatchV1Api(api_client=self.api_client) - self.core = client.CoreV1Api(api_client=self.api_client) - self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client) - self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client) - - self.__kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True) - - self.__jobs_for_cronjobs: dict[str, list[V1Job]] = {} - self.__jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock) - - async def list_scannable_objects(self) -> list[K8sObjectData]: - """List all scannable objects. - - Returns: - A list of scannable objects. 
- """ - - logger.info(f"Listing scannable objects in {self.cluster}") - logger.debug(f"Namespaces: {settings.namespaces}") - logger.debug(f"Resources: {settings.resources}") - - self.__hpa_list = await self._try_list_hpa() - workload_object_lists = await asyncio.gather( - self._list_deployments(), - self._list_rollouts(), - self._list_deploymentconfig(), - self._list_all_statefulsets(), - self._list_all_daemon_set(), - self._list_all_jobs(), - self._list_all_cronjobs(), - ) - - return [ - object - for workload_objects in workload_object_lists - for object in workload_objects - # NOTE: By default we will filter out kube-system namespace - if not (settings.namespaces == "*" and object.namespace == "kube-system") - ] - - async def _list_jobs_for_cronjobs(self, namespace: str) -> list[V1Job]: - if namespace not in self.__jobs_for_cronjobs: - loop = asyncio.get_running_loop() - - async with self.__jobs_loading_locks[namespace]: - logging.debug(f"Loading jobs for cronjobs in {namespace}") - ret = await loop.run_in_executor( - self.executor, - lambda: self.batch.list_namespaced_job(namespace=namespace), - ) - self.__jobs_for_cronjobs[namespace] = ret.items - - return self.__jobs_for_cronjobs[namespace] - - async def list_pods(self, object: K8sObjectData) -> list[PodData]: - loop = asyncio.get_running_loop() - - if object.kind == "CronJob": - namespace_jobs = await self._list_jobs_for_cronjobs(object.namespace) - ownered_jobs_uids = [ - job.metadata.uid - for job in namespace_jobs - if any( - owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid - for owner in job.metadata.owner_references or [] - ) - ] - selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})" - - else: - if object.selector is None: - return [] - - selector = self._build_selector_query(object.selector) - if selector is None: - return [] - - ret: V1PodList = await loop.run_in_executor( - self.executor, - lambda: self.core.list_namespaced_pod( - namespace=object._api_resource.metadata.namespace, label_selector=selector - ), - ) - - return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] - - @staticmethod - def _get_match_expression_filter(expression) -> str: - if expression.operator.lower() == "exists": - return expression.key - elif expression.operator.lower() == "doesnotexist": - return f"!{expression.key}" - - values = ",".join(expression.values) - return f"{expression.key} {expression.operator} ({values})" - - @staticmethod - def _build_selector_query(selector: Any) -> Union[str, None]: - label_filters = [] - - if selector.match_labels is not None: - label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] - - if selector.match_expressions is not None: - label_filters += [ - ClusterLoader._get_match_expression_filter(expression) for expression in selector.match_expressions - ] - - if label_filters == []: - # NOTE: This might mean that we have DeploymentConfig, - # which uses ReplicationController and it has a dict like matchLabels - if len(selector) != 0: - label_filters += [f"{label[0]}={label[1]}" for label in selector.items()] - else: - return None - - return ",".join(label_filters) - - def __build_scannable_object( - self, item: AnyKubernetesAPIObject, container: V1Container, kind: Optional[str] = None - ) -> K8sObjectData: - name = item.metadata.name - namespace = item.metadata.namespace - kind = kind or item.__class__.__name__[2:] - - obj = K8sObjectData( - cluster=self.cluster, - namespace=namespace, - name=name, - 
kind=kind, - container=container.name, - allocations=ResourceAllocations.from_container(container), - hpa=self.__hpa_list.get((namespace, kind, name)), - ) - obj._api_resource = item - return obj - - def _should_list_resource(self, resource: str) -> bool: - if settings.resources == "*": - return True - return resource in settings.resources - - async def _list_namespaced_or_global_objects( - self, - kind: KindLiteral, - all_namespaces_request: Callable, - namespaced_request: Callable - ) -> list[Any]: - logger.debug(f"Listing {kind}s in {self.cluster}") - loop = asyncio.get_running_loop() - - if settings.namespaces == "*": - requests = [ - loop.run_in_executor( - self.executor, - lambda: all_namespaces_request( - watch=False, - label_selector=settings.selector, - ), - ) - ] - else: - requests = [ - loop.run_in_executor( - self.executor, - lambda ns=namespace: namespaced_request( - namespace=ns, - watch=False, - label_selector=settings.selector, - ), - ) - for namespace in settings.namespaces - ] - - result = [ - item - for request_result in await asyncio.gather(*requests) - for item in request_result.items - ] - - logger.debug(f"Found {len(result)} {kind} in {self.cluster}") - return result - - async def _list_scannable_objects( - self, - kind: KindLiteral, - all_namespaces_request: Callable, - namespaced_request: Callable, - extract_containers: Callable[[Any], Union[Iterable[V1Container], Awaitable[Iterable[V1Container]]]], - filter_workflows: Optional[Callable[[Any], bool]] = None, - ) -> list[K8sObjectData]: - if not self._should_list_resource(kind): - logger.debug(f"Skipping {kind}s in {self.cluster}") - return - - if not self.__kind_available[kind]: - return - - result = [] - try: - for item in await self._list_namespaced_or_global_objects(kind, all_namespaces_request, namespaced_request): - if filter_workflows is not None and not filter_workflows(item): - continue - - containers = extract_containers(item) - if asyncio.iscoroutine(containers): - containers = await containers - - result.extend(self.__build_scannable_object(item, container, kind) for container in containers) - except ApiException as e: - if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]: - if self.__kind_available[kind]: - logger.debug(f"{kind} API not available in {self.cluster}") - self.__kind_available[kind] = False - else: - logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}") - logger.error("Will skip this object type and continue.") - - return result - - def _list_deployments(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="Deployment", - all_namespaces_request=self.apps.list_deployment_for_all_namespaces, - namespaced_request=self.apps.list_namespaced_deployment, - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_rollouts(self) -> list[K8sObjectData]: - async def _extract_containers(item: Any) -> list[V1Container]: - if item.spec.template is not None: - return item.spec.template.spec.containers - - loop = asyncio.get_running_loop() - - logging.debug( - f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}" - ) - - # Template can be None and object might have workloadRef - workloadRef = item.spec.workloadRef - if workloadRef is not None: - ret = await loop.run_in_executor( - self.executor, - lambda: self.apps.read_namespaced_deployment( - namespace=item.metadata.namespace, name=workloadRef.name - ), - ) - return 
ret.spec.template.spec.containers - - return [] - - # NOTE: Using custom objects API returns dicts, but all other APIs return objects - # We need to handle this difference using a small wrapper - return self._list_scannable_objects( - kind="Rollout", - all_namespaces_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_cluster_custom_object( - group="argoproj.io", - version="v1alpha1", - plural="rollouts", - **kwargs, - ) - ), - namespaced_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_namespaced_custom_object( - group="argoproj.io", - version="v1alpha1", - plural="rollouts", - **kwargs, - ) - ), - extract_containers=_extract_containers, - ) - - def _list_deploymentconfig(self) -> list[K8sObjectData]: - # NOTE: Using custom objects API returns dicts, but all other APIs return objects - # We need to handle this difference using a small wrapper - return self._list_scannable_objects( - kind="DeploymentConfig", - all_namespaces_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_cluster_custom_object( - group="apps.openshift.io", - version="v1", - plural="deploymentconfigs", - **kwargs, - ) - ), - namespaced_request=lambda **kwargs: ObjectLikeDict( - self.custom_objects.list_namespaced_custom_object( - group="apps.openshift.io", - version="v1", - plural="deploymentconfigs", - **kwargs, - ) - ), - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_all_statefulsets(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="StatefulSet", - all_namespaces_request=self.apps.list_stateful_set_for_all_namespaces, - namespaced_request=self.apps.list_namespaced_stateful_set, - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_all_daemon_set(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="DaemonSet", - all_namespaces_request=self.apps.list_daemon_set_for_all_namespaces, - namespaced_request=self.apps.list_namespaced_daemon_set, - extract_containers=lambda item: item.spec.template.spec.containers, - ) - - def _list_all_jobs(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="Job", - all_namespaces_request=self.batch.list_job_for_all_namespaces, - namespaced_request=self.batch.list_namespaced_job, - extract_containers=lambda item: item.spec.template.spec.containers, - # NOTE: If the job has ownerReference and it is a CronJob, then we should skip it - filter_workflows=lambda item: not any( - owner.kind == "CronJob" for owner in item.metadata.owner_references or [] - ), - ) - - def _list_all_cronjobs(self) -> list[K8sObjectData]: - return self._list_scannable_objects( - kind="CronJob", - all_namespaces_request=self.batch.list_cron_job_for_all_namespaces, - namespaced_request=self.batch.list_namespaced_cron_job, - extract_containers=lambda item: item.spec.job_template.spec.template.spec.containers, - ) - - async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: - loop = asyncio.get_running_loop() - res = await loop.run_in_executor( - self.executor, - lambda: self._list_namespaced_or_global_objects( - kind="HPA-v1", - all_namespaces_request=self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces, - namespaced_request=self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler, - ), - ) - return { - ( - hpa.metadata.namespace, - hpa.spec.scale_target_ref.kind, - hpa.spec.scale_target_ref.name, - ): HPAData( - min_replicas=hpa.spec.min_replicas, - max_replicas=hpa.spec.max_replicas, - 
current_replicas=hpa.status.current_replicas, - desired_replicas=hpa.status.desired_replicas, - target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage, - target_memory_utilization_percentage=None, - ) - async for hpa in res - } - - async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]: - res = await self._list_namespaced_or_global_objects( - kind="HPA-v2", - all_namespaces_request=self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces, - namespaced_request=self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler, - ) - def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]: - return next( - ( - metric.resource.target.average_utilization - for metric in hpa.spec.metrics - if metric.type == "Resource" and metric.resource.name == metric_name - ), - None, - ) - return { - ( - hpa.metadata.namespace, - hpa.spec.scale_target_ref.kind, - hpa.spec.scale_target_ref.name, - ): HPAData( - min_replicas=hpa.spec.min_replicas, - max_replicas=hpa.spec.max_replicas, - current_replicas=hpa.status.current_replicas, - desired_replicas=hpa.status.desired_replicas, - target_cpu_utilization_percentage=__get_metric(hpa, "cpu"), - target_memory_utilization_percentage=__get_metric(hpa, "memory"), - ) - for hpa in res - } - - # TODO: What should we do in case of other metrics bound to the HPA? - async def __list_hpa(self) -> dict[HPAKey, HPAData]: - """List all HPA objects in the cluster. - - Returns: - dict[tuple[str, str], HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (kind, name). - """ - - try: - # Try to use V2 API first - return await self.__list_hpa_v2() - except ApiException as e: - if e.status != 404: - # If the error is other than not found, then re-raise it. - raise - - # If V2 API does not exist, fall back to V1 - return await self.__list_hpa_v1() - - async def _try_list_hpa(self) -> dict[HPAKey, HPAData]: - try: - return await self.__list_hpa() - except Exception as e: - logger.exception(f"Error trying to list hpa in cluster {self.cluster}: {e}") - logger.error( - "Will assume that there are no HPA. " - "Be careful as this may lead to inaccurate results if object actually has HPA." - ) - return {} - - -class KubernetesLoader: - def __init__(self) -> None: - self._cluster_loaders: dict[Optional[str], ClusterLoader] = {} - - async def list_clusters(self) -> Optional[list[str]]: - """List all clusters. - - Returns: - A list of clusters. - """ - - if settings.inside_cluster: - logger.debug("Working inside the cluster") - return None - - try: - contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig) - except config.ConfigException: - if settings.clusters is not None and settings.clusters != "*": - logger.warning("Could not load context from kubeconfig.") - logger.warning(f"Falling back to clusters from CLI: {settings.clusters}") - return settings.clusters - else: - logger.error( - "Could not load context from kubeconfig. " - "Please check your kubeconfig file or pass -c flag with the context name." 
- ) - return None - - logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}") - logger.debug(f"Current cluster: {current_context['name']}") - - logger.debug(f"Configured clusters: {settings.clusters}") - - # None, empty means current cluster - if not settings.clusters: - return [current_context["name"]] - - # * means all clusters - if settings.clusters == "*": - return [context["name"] for context in contexts] - - return [context["name"] for context in contexts if context["name"] in settings.clusters] - - def _try_create_cluster_loader(self, cluster: Optional[str]) -> Optional[ClusterLoader]: - try: - return ClusterLoader(cluster=cluster) - except Exception as e: - logger.error(f"Could not load cluster {cluster} and will skip it: {e}") - return None - - async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: - """List all scannable objects. - - Yields: - Each scannable object as it is loaded. - """ - if clusters is None: - _cluster_loaders = [self._try_create_cluster_loader(None)] - else: - _cluster_loaders = [self._try_create_cluster_loader(cluster) for cluster in clusters] - - self.cluster_loaders = {cl.cluster: cl for cl in _cluster_loaders if cl is not None} - if self.cluster_loaders == {}: - logger.error("Could not load any cluster.") - return - - return [ - object - for cluster_loader in self.cluster_loaders.values() - for object in await cluster_loader.list_scannable_objects() - ] - - async def load_pods(self, object: K8sObjectData) -> list[PodData]: - try: - cluster_loader = self.cluster_loaders[object.cluster] - except KeyError: - raise RuntimeError(f"Cluster loader for cluster {object.cluster} not found") from None - - return await cluster_loader.list_pods(object) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py new file mode 100644 index 00000000..1af3de93 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/__init__.py @@ -0,0 +1,344 @@ +from __future__ import annotations + +import asyncio +import logging +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Awaitable, Callable, Optional + +from kubernetes import client, config # type: ignore +from kubernetes.client import ApiException # type: ignore +from kubernetes.client.models import V1Container, V2HorizontalPodAutoscaler # type: ignore +from functools import cache + +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService +from robusta_krr.core.models.config import settings +from robusta_krr.core.models.exceptions import CriticalRunnerException +from robusta_krr.core.models.objects import HPAData, HPAKey, K8sWorkload, KindLiteral, PodData +from robusta_krr.core.models.result import ResourceAllocations + + +from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader, IListPodsFallback +from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader +from .loaders import ( + BaseKindLoader, + CronJobLoader, + DaemonSetLoader, + DeploymentConfigLoader, + DeploymentLoader, + JobLoader, + RolloutLoader, + StatefulSetLoader, +) + +logger = logging.getLogger("krr") + + +class KubeAPIClusterLoader(BaseClusterLoader): + # NOTE: For KubeAPIClusterLoader we have to first connect to read kubeconfig + # 
We do not need to connect to Prometheus from here, as we query all workload data from the Kubernetes API
+    # Also here we might have different Prometheus instances for different clusters
+
+    def __init__(self) -> None:
+        try:
+            settings.load_kubeconfig()
+        except Exception as e:
+            logger.error(f"Could not load kubernetes configuration: {e.__class__.__name__}\n{e}")
+            logger.error("Try to explicitly set --context and/or --kubeconfig flags.")
+            logger.error("Alternatively, try a prometheus-only mode with `--mode prometheus`")
+            raise CriticalRunnerException("Could not load kubernetes configuration") from e
+
+        self._prometheus_connectors: dict[Optional[str], PrometheusConnector] = {}
+
+    async def list_clusters(self) -> Optional[list[str]]:
+        if settings.inside_cluster:
+            logger.debug("Working inside the cluster")
+            return None
+
+        try:
+            contexts, current_context = config.list_kube_config_contexts(settings.kubeconfig)
+        except config.ConfigException:
+            if settings.clusters is not None and settings.clusters != "*":
+                logger.warning("Could not load context from kubeconfig.")
+                logger.warning(f"Falling back to clusters from CLI: {settings.clusters}")
+                return settings.clusters
+            else:
+                logger.error(
+                    "Could not load context from kubeconfig. "
+                    "Please check your kubeconfig file or pass the -c flag with the context name."
+                )
+                return None
+
+        logger.debug(f"Found {len(contexts)} clusters: {', '.join([context['name'] for context in contexts])}")
+        logger.debug(f"Current cluster: {current_context['name']}")
+        logger.debug(f"Configured clusters: {settings.clusters}")
+
+        # None or empty means the current cluster
+        if not settings.clusters:
+            return [current_context["name"]]
+
+        # * means all clusters
+        if settings.clusters == "*":
+            return [context["name"] for context in contexts]
+
+        return [context["name"] for context in contexts if context["name"] in settings.clusters]
+
+    @cache
+    def get_workload_loader(self, cluster: Optional[str]) -> KubeAPIWorkloadLoader:
+        return KubeAPIWorkloadLoader(cluster)
+
+    @cache
+    def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector:
+        connector = PrometheusConnector(cluster=cluster)
+        if settings.prometheus_url is not None:
+            logger.info(f"Connecting to Prometheus using URL: {settings.prometheus_url}")
+            connector.connect(settings.prometheus_url)
+        else:
+            logger.info("Trying to discover PromQL service" + (f" for cluster {cluster}" if cluster else ""))
+            connector.discover(api_client=settings.get_kube_client(cluster))
+
+        return connector
+
+
+class KubeAPIWorkloadLoader(BaseWorkloadLoader, IListPodsFallback):
+    kind_loaders: list[type[BaseKindLoader]] = [
+        DeploymentLoader,
+        RolloutLoader,
+        DeploymentConfigLoader,
+        StatefulSetLoader,
+        DaemonSetLoader,
+        JobLoader,
+        CronJobLoader,
+    ]
+
+    def __init__(self, cluster: Optional[str]) -> None:
+        self.cluster = cluster
+
+        # This executor will be running requests to the Kubernetes API
+        self.executor = ThreadPoolExecutor(settings.max_workers)
+        self.api_client = settings.get_kube_client(cluster)
+
+        self.autoscaling_v1 = client.AutoscalingV1Api(api_client=self.api_client)
+        self.autoscaling_v2 = client.AutoscalingV2Api(api_client=self.api_client)
+
+        self._kind_available: defaultdict[KindLiteral, bool] = defaultdict(lambda: True)
+        self._hpa_list: dict[HPAKey, HPAData] = {}
+        self._workload_loaders: dict[KindLiteral, BaseKindLoader] = {
+            loader.kind: loader(self.api_client, self.executor) for loader in self.kind_loaders
+        }
+
+    async def list_workloads(self) -> list[K8sWorkload]:
+        """List all scannable objects.
+
+        Returns:
+            A list of scannable objects.
+        """
+
+        logger.info(f"Listing scannable objects in {self.cluster}")
+        logger.debug(f"Namespaces: {settings.namespaces}")
+        logger.debug(f"Resources: {settings.resources}")
+
+        self._hpa_list = await self._try_list_hpa()
+        workload_object_lists = await asyncio.gather(
+            *[
+                self._fetch_workload(loader)
+                for loader in self._workload_loaders.values()
+                if self._should_list_resource(loader.kind)
+            ]
+        )
+
+        return [
+            object
+            for workload_objects in workload_object_lists
+            for object in workload_objects
+            # NOTE: By default we will filter out the kube-system namespace
+            if not (settings.namespaces == "*" and object.namespace == "kube-system")
+        ]
+
+    async def load_pods(self, object: K8sWorkload) -> list[PodData]:
+        return await self._workload_loaders[object.kind].list_pods(object)
+
+    def _build_scannable_object(self, item: Any, container: V1Container, kind: Optional[str] = None) -> K8sWorkload:
+        name = item.metadata.name
+        namespace = item.metadata.namespace
+        kind = kind or item.__class__.__name__[2:]
+
+        obj = K8sWorkload(
+            cluster=self.cluster,
+            namespace=namespace,
+            name=name,
+            kind=kind,
+            container=container.name,
+            allocations=ResourceAllocations.from_container(container),
+            hpa=self._hpa_list.get(HPAKey(namespace=namespace, kind=kind, name=name)),
+        )
+        obj._api_resource = item
+        return obj
+
+    def _should_list_resource(self, resource: str) -> bool:
+        if settings.resources == "*":
+            return True
+        return resource in settings.resources
+
+    async def _list_namespaced_or_global_objects(
+        self,
+        kind: KindLiteral,
+        all_namespaces_request: Callable[..., Awaitable[Any]],
+        namespaced_request: Callable[..., Awaitable[Any]],
+    ) -> list[Any]:
+        logger.debug(f"Listing {kind}s in {self.cluster}")
+
+        if settings.namespaces == "*":
+            requests = [
+                all_namespaces_request(
+                    label_selector=settings.selector,
+                )
+            ]
+        else:
+            requests = [
+                namespaced_request(
+                    namespace=namespace,
+                    label_selector=settings.selector,
+                )
+                for namespace in settings.namespaces
+            ]
+
+        result = [item for request_result in await asyncio.gather(*requests) for item in request_result.items]
+
+        logger.debug(f"Found {len(result)} {kind}" + (f" for cluster {self.cluster}" if self.cluster else ""))
+        return result
+
+    async def _fetch_workload(self, loader: BaseKindLoader) -> list[K8sWorkload]:
+        kind = loader.kind
+
+        if not self._should_list_resource(kind):
+            logger.debug(f"Skipping {kind}s" + (f" for cluster {self.cluster}" if self.cluster else ""))
+            return []
+
+        if not self._kind_available[kind]:
+            return []
+
+        result = []
+        try:
+            for item in await self._list_namespaced_or_global_objects(
+                kind, loader.all_namespaces_request_async, loader.namespaced_request_async
+            ):
+                if not loader.filter(item):
+                    continue
+
+                containers = await loader.extract_containers(item)
+                result.extend(self._build_scannable_object(item, container, kind) for container in containers)
+        except ApiException as e:
+            if kind in ("Rollout", "DeploymentConfig") and e.status in [400, 401, 403, 404]:
+                if self._kind_available[kind]:
+                    logger.debug(f"{kind} API not available in {self.cluster}")
+                    self._kind_available[kind] = False
+            else:
+                logger.exception(f"Error {e.status} listing {kind} in cluster {self.cluster}: {e.reason}")
+                logger.error("Will skip this object type and continue.")
+
+        return result
+
+    async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
+        loop = asyncio.get_running_loop()
+        res = await self._list_namespaced_or_global_objects(
+            kind="HPA-v1",
+            all_namespaces_request=lambda **kwargs: loop.run_in_executor(
+                self.executor,
+                lambda: self.autoscaling_v1.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
+            ),
+            namespaced_request=lambda **kwargs: loop.run_in_executor(
+                self.executor,
+                lambda: self.autoscaling_v1.list_namespaced_horizontal_pod_autoscaler(**kwargs),
+            ),
+        )
+
+        return {
+            HPAKey(
+                namespace=hpa.metadata.namespace,
+                kind=hpa.spec.scale_target_ref.kind,
+                name=hpa.spec.scale_target_ref.name,
+            ): HPAData(
+                min_replicas=hpa.spec.min_replicas,
+                max_replicas=hpa.spec.max_replicas,
+                target_cpu_utilization_percentage=hpa.spec.target_cpu_utilization_percentage,
+                target_memory_utilization_percentage=None,
+            )
+            for hpa in res
+        }
+
+    async def __list_hpa_v2(self) -> dict[HPAKey, HPAData]:
+        loop = asyncio.get_running_loop()
+
+        res = await self._list_namespaced_or_global_objects(
+            kind="HPA-v2",
+            all_namespaces_request=lambda **kwargs: loop.run_in_executor(
+                self.executor,
+                lambda: self.autoscaling_v2.list_horizontal_pod_autoscaler_for_all_namespaces(**kwargs),
+            ),
+            namespaced_request=lambda **kwargs: loop.run_in_executor(
+                self.executor,
+                lambda: self.autoscaling_v2.list_namespaced_horizontal_pod_autoscaler(**kwargs),
+            ),
+        )
+
+        def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[float]:
+            return next(
+                (
+                    metric.resource.target.average_utilization
+                    for metric in hpa.spec.metrics
+                    if metric.type == "Resource" and metric.resource.name == metric_name
+                ),
+                None,
+            )
+
+        return {
+            HPAKey(
+                namespace=hpa.metadata.namespace,
+                kind=hpa.spec.scale_target_ref.kind,
+                name=hpa.spec.scale_target_ref.name,
+            ): HPAData(
+                min_replicas=hpa.spec.min_replicas,
+                max_replicas=hpa.spec.max_replicas,
+                target_cpu_utilization_percentage=__get_metric(hpa, "cpu"),
+                target_memory_utilization_percentage=__get_metric(hpa, "memory"),
+            )
+            for hpa in res
+        }
+
+    # TODO: What should we do in case of other metrics bound to the HPA?
+    async def __list_hpa(self) -> dict[HPAKey, HPAData]:
+        """List all HPA objects in the cluster.
+
+        Returns:
+            dict[HPAKey, HPAData]: A dictionary of HPA objects, indexed by scaleTargetRef (namespace, kind, name).
+        """
+
+        try:
+            # Try to use the V2 API first
+            return await self.__list_hpa_v2()
+        except ApiException as e:
+            if e.status != 404:
+                # If the error is other than not found, then re-raise it.
+                raise
+
+            # If the V2 API does not exist, fall back to V1
+            return await self.__list_hpa_v1()
+
+    async def _try_list_hpa(self) -> dict[HPAKey, HPAData]:
+        try:
+            return await self.__list_hpa()
+        except Exception as e:
+            logger.exception(f"Error trying to list HPAs in cluster {self.cluster}: {e}")
+            logger.error(
+                "Will assume that there are no HPAs. "
+                "Be careful as this may lead to inaccurate results if the object actually has an HPA."
+ ) + return {} + + +__all__ = ["KubeAPIWorkloadLoader", "KubeAPIClusterLoader"] diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py new file mode 100644 index 00000000..6ca6efd6 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/__init__.py @@ -0,0 +1,19 @@ +from .base import BaseKindLoader +from .cronjobs import CronJobLoader +from .daemonsets import DaemonSetLoader +from .deploymentconfigs import DeploymentConfigLoader +from .deployments import DeploymentLoader +from .jobs import JobLoader +from .rollouts import RolloutLoader +from .statefulsets import StatefulSetLoader + +__all__ = [ + "BaseKindLoader", + "CronJobLoader", + "DeploymentLoader", + "DaemonSetLoader", + "DeploymentConfigLoader", + "JobLoader", + "RolloutLoader", + "StatefulSetLoader", +] \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py new file mode 100644 index 00000000..6430a6df --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/base.py @@ -0,0 +1,121 @@ +import abc +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Iterable, Optional, Union + +from kubernetes import client # type: ignore +from kubernetes.client.api_client import ApiClient # type: ignore +from kubernetes.client.models import V1Container, V1PodList # type: ignore + +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +logger = logging.getLogger("krr") + +HPAKey = tuple[str, str, str] + + +class BaseKindLoader(abc.ABC): + """ + This class is used to define how to load a specific kind of Kubernetes object. + It does not load the objects itself, but is used by the `KubeAPIWorkloadLoader` to load objects. 
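+
+    Subclasses set `kind` and implement the blocking `all_namespaces_request` /
+    `namespaced_request` calls; the `*_async` variants wrap them in the thread pool,
+    and `extract_containers`, `filter` and `list_pods` have defaults that may be
+    overridden (for example, `CronJobLoader` overrides `list_pods` to resolve pods
+    via the Jobs it owns).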
+ """ + + kind: KindLiteral + + def __init__(self, api_client: Optional[ApiClient], executor: ThreadPoolExecutor) -> None: + self.executor = executor + self.api_client = api_client + self.apps = client.AppsV1Api(api_client=self.api_client) + self.custom_objects = client.CustomObjectsApi(api_client=self.api_client) + self.batch = client.BatchV1Api(api_client=self.api_client) + self.core = client.CoreV1Api(api_client=self.api_client) + + @abc.abstractmethod + def all_namespaces_request(self, label_selector: str) -> Any: + pass + + async def all_namespaces_request_async(self, label_selector: str) -> Any: + """Default async implementation executes the request in a thread pool.""" + + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self.executor, + lambda: self.all_namespaces_request( + label_selector=label_selector, + ), + ) + + @abc.abstractmethod + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + pass + + async def namespaced_request_async(self, namespace: str, label_selector: str) -> Any: + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self.executor, + lambda: self.namespaced_request( + namespace=namespace, + label_selector=label_selector, + ), + ) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + return item.spec.template.spec.containers + + def filter(self, item: Any) -> bool: + return True + + async def list_pods(self, object: K8sWorkload) -> list[PodData]: + loop = asyncio.get_running_loop() + + if object.selector is None: + return [] + + selector = self._build_selector_query(object.selector) + if selector is None: + return [] + + ret: V1PodList = await loop.run_in_executor( + self.executor, + lambda: self.core.list_namespaced_pod( + namespace=object._api_resource.metadata.namespace, label_selector=selector + ), + ) + + return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items] + + @classmethod + def _get_match_expression_filter(cls, expression: Any) -> str: + if expression.operator.lower() == "exists": + return expression.key + elif expression.operator.lower() == "doesnotexist": + return f"!{expression.key}" + + values = ",".join(expression.values) + return f"{expression.key} {expression.operator} ({values})" + + @classmethod + def _build_selector_query(cls, selector: Any) -> Union[str, None]: + label_filters = [] + + if selector.match_labels is not None: + label_filters += [f"{label[0]}={label[1]}" for label in selector.match_labels.items()] + + # normally the kubernetes API client renames matchLabels to match_labels in python + # but for CRDs like ArgoRollouts that renaming doesn't happen + if getattr(selector, "matchLabels", None): + label_filters += [f"{label[0]}={label[1]}" for label in getattr(selector, "matchLabels").items()] + + if selector.match_expressions is not None: + label_filters += [cls._get_match_expression_filter(expression) for expression in selector.match_expressions] + + if label_filters == []: + # NOTE: This might mean that we have DeploymentConfig, + # which uses ReplicationController and it has a dict like matchLabels + if len(selector) != 0: + label_filters += [f"{label[0]}={label[1]}" for label in selector.items()] + else: + return None + + return ",".join(label_filters) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py new file mode 100644 index 00000000..d999c650 --- /dev/null +++ 
b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/cronjobs.py
@@ -0,0 +1,68 @@
+import asyncio
+from collections import defaultdict
+import logging
+from typing import Any, Iterable
+
+from kubernetes.client.models import V1Container, V1Job, V1PodList  # type: ignore
+
+from robusta_krr.core.models.objects import K8sWorkload, PodData
+
+from .base import BaseKindLoader
+
+logger = logging.getLogger("krr")
+
+
+class CronJobLoader(BaseKindLoader):
+    kind = "CronJob"
+
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+
+        self._jobs: dict[str, list[V1Job]] = {}
+        self._jobs_loading_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
+
+    def all_namespaces_request(self, label_selector: str) -> Any:
+        return self.batch.list_cron_job_for_all_namespaces(label_selector=label_selector, watch=False)
+
+    def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+        return self.batch.list_namespaced_cron_job(namespace=namespace, label_selector=label_selector, watch=False)
+
+    async def extract_containers(self, item: Any) -> Iterable[V1Container]:
+        return item.spec.job_template.spec.template.spec.containers
+
+    async def list_pods(self, object: K8sWorkload) -> list[PodData]:
+        loop = asyncio.get_running_loop()
+
+        namespace_jobs = await self._list_jobs(object.namespace)
+        owned_job_uids = [
+            job.metadata.uid
+            for job in namespace_jobs
+            if any(
+                owner.kind == "CronJob" and owner.uid == object._api_resource.metadata.uid
+                for owner in job.metadata.owner_references or []
+            )
+        ]
+        selector = f"batch.kubernetes.io/controller-uid in ({','.join(owned_job_uids)})"
+
+        ret: V1PodList = await loop.run_in_executor(
+            self.executor,
+            lambda: self.core.list_namespaced_pod(
+                namespace=object._api_resource.metadata.namespace, label_selector=selector
+            ),
+        )
+
+        return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
+
+    async def _list_jobs(self, namespace: str) -> list[V1Job]:
+        if namespace not in self._jobs:
+            loop = asyncio.get_running_loop()
+
+            async with self._jobs_loading_locks[namespace]:
+                logger.debug(f"Loading jobs for cronjobs in {namespace}")
+                ret = await loop.run_in_executor(
+                    self.executor,
+                    lambda: self.batch.list_namespaced_job(namespace=namespace),
+                )
+                self._jobs[namespace] = ret.items
+
+        return self._jobs[namespace]
diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py
new file mode 100644
index 00000000..1005bed2
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/daemonsets.py
@@ -0,0 +1,13 @@
+from typing import Any
+
+from .base import BaseKindLoader
+
+
+class DaemonSetLoader(BaseKindLoader):
+    kind = "DaemonSet"
+
+    def all_namespaces_request(self, label_selector: str) -> Any:
+        return self.apps.list_daemon_set_for_all_namespaces(label_selector=label_selector, watch=False)
+
+    def namespaced_request(self, namespace: str, label_selector: str) -> Any:
+        return self.apps.list_namespaced_daemon_set(namespace=namespace, label_selector=label_selector, watch=False)
diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py
new file mode 100644
index 00000000..33b64d95
--- /dev/null
+++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deploymentconfigs.py
@@ -0,0 +1,38 @@
+import logging
+from
typing import Any + +from robusta_krr.utils.object_like_dict import ObjectLikeDict + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class DeploymentConfigLoader(BaseKindLoader): + kind = "DeploymentConfig" + + # NOTE: Using custom objects API returns dicts, but all other APIs return objects + # We need to handle this difference using a small wrapper + + def all_namespaces_request(self, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_cluster_custom_object( + group="apps.openshift.io", + version="v1", + plural="deploymentconfigs", + label_selector=label_selector, + watch=False, + ) + ) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_namespaced_custom_object( + group="apps.openshift.io", + version="v1", + plural="deploymentconfigs", + namespace=namespace, + label_selector=label_selector, + watch=False, + ) + ) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py new file mode 100644 index 00000000..fc202b57 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/deployments.py @@ -0,0 +1,13 @@ +from typing import Any + +from .base import BaseKindLoader + + +class DeploymentLoader(BaseKindLoader): + kind = "Deployment" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_deployment_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_deployment(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py new file mode 100644 index 00000000..87ce5a77 --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/jobs.py @@ -0,0 +1,18 @@ +from typing import Any + +from .base import BaseKindLoader + + +class JobLoader(BaseKindLoader): + kind = "Job" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.batch.list_job_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.batch.list_namespaced_job(namespace=namespace, label_selector=label_selector, watch=False) + + def filter(self, item: Any) -> bool: + # NOTE: If the job has ownerReference and it is a CronJob, + # then we should skip it, as it is a part of the CronJob + return not any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []) diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py new file mode 100644 index 00000000..0148745a --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/rollouts.py @@ -0,0 +1,61 @@ +import asyncio +import logging +from typing import Any, Iterable + +from kubernetes.client.models import V1Container # type: ignore + +from robusta_krr.utils.object_like_dict import ObjectLikeDict + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class RolloutLoader(BaseKindLoader): + kind = "Rollout" + + # NOTE: Using custom objects API returns dicts, but all other APIs return objects + # We need 
to handle this difference using a small wrapper + + def all_namespaces_request(self, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_cluster_custom_object( + group="argoproj.io", + version="v1alpha1", + plural="rollouts", + label_selector=label_selector, + watch=False, + ) + ) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return ObjectLikeDict( + self.custom_objects.list_namespaced_custom_object( + group="argoproj.io", + version="v1alpha1", + plural="rollouts", + namespace=namespace, + label_selector=label_selector, + watch=False, + ) + ) + + async def extract_containers(self, item: Any) -> Iterable[V1Container]: + if item.spec.template is not None: + return item.spec.template.spec.containers + + logging.debug( + f"Rollout has workloadRef, fetching template for {item.metadata.name} in {item.metadata.namespace}" + ) + + # Template can be None and object might have workloadRef + workloadRef = item.spec.workloadRef + if workloadRef is not None: + loop = asyncio.get_running_loop() + ret = await loop.run_in_executor( + self.executor, + lambda: self.apps.read_namespaced_deployment(namespace=item.metadata.namespace, name=workloadRef.name), + ) + return ret.spec.template.spec.containers + + return [] diff --git a/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py new file mode 100644 index 00000000..069f2c6b --- /dev/null +++ b/robusta_krr/core/integrations/kubernetes/cluster_loader/loaders/statefulsets.py @@ -0,0 +1,13 @@ +from typing import Any + +from .base import BaseKindLoader + + +class StatefulSetLoader(BaseKindLoader): + kind = "StatefulSet" + + def all_namespaces_request(self, label_selector: str) -> Any: + return self.apps.list_stateful_set_for_all_namespaces(label_selector=label_selector, watch=False) + + def namespaced_request(self, namespace: str, label_selector: str) -> Any: + return self.apps.list_namespaced_stateful_set(namespace=namespace, label_selector=label_selector, watch=False) diff --git a/robusta_krr/core/integrations/prometheus/__init__.py b/robusta_krr/core/integrations/prometheus/__init__.py index cedf1c0b..1ce9d2ac 100644 --- a/robusta_krr/core/integrations/prometheus/__init__.py +++ b/robusta_krr/core/integrations/prometheus/__init__.py @@ -1,3 +1,3 @@ -from .loader import PrometheusMetricsLoader +from .connector import PrometheusConnector from .metrics_service.prometheus_metrics_service import PrometheusDiscovery, PrometheusNotFound from .prometheus_utils import ClusterNotSpecifiedException diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py new file mode 100644 index 00000000..3cc6b04f --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/__init__.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import asyncio +import itertools +import logging + +from collections import Counter, defaultdict + +from typing import Optional +from functools import cache + +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.integrations.prometheus.metrics.base import PrometheusMetric +from robusta_krr.core.models.config import settings +from robusta_krr.core.models.objects import HPAData, HPAKey, K8sWorkload +from robusta_krr.core.abstract.workload_loader import BaseWorkloadLoader +from robusta_krr.core.abstract.cluster_loader 
import BaseClusterLoader
+from robusta_krr.core.models.exceptions import CriticalRunnerException
+from .loaders import BaseKindLoader, DoubleParentLoader, SimpleParentLoader
+
+
+logger = logging.getLogger("krr")
+
+
+class PrometheusClusterLoader(BaseClusterLoader):
+    # NOTE: For PrometheusClusterLoader we have to first connect to Prometheus, as we query all data from it
+
+    def __init__(self) -> None:
+        self.prometheus = PrometheusConnector()
+        if not settings.prometheus_url:
+            raise CriticalRunnerException(
+                "Prometheus URL is not provided. "
+                "Can not auto-discover Prometheus with `--mode prometheus`. "
+                "Please provide the URL with the `--prometheus-url` flag."
+            )
+
+        self.prometheus.connect(settings.prometheus_url)
+
+    async def list_clusters(self) -> Optional[list[str]]:
+        if settings.prometheus_label is None:
+            logger.info("Assuming that Prometheus contains only one cluster.")
+            logger.info("If you have multiple clusters in Prometheus, please provide the `-l` flag.")
+            return None
+
+        clusters = await self.prometheus.loader.query(
+            f"""
+                avg by({settings.prometheus_label}) (
+                    kube_pod_container_resource_limits
+                )
+            """
+        )
+
+        return [cluster["metric"][settings.prometheus_label] for cluster in clusters]
+
+    @cache
+    def get_workload_loader(self, cluster: Optional[str]) -> PrometheusWorkloadLoader:
+        return PrometheusWorkloadLoader(cluster, self.prometheus)
+
+    def get_prometheus(self, cluster: Optional[str]) -> PrometheusConnector:
+        # NOTE: With the Prometheus workload loader only one Prometheus can be provided in the parameters,
+        # so in case of multiple clusters in one Prometheus (centralized setup)
+        # each cluster will share the same PrometheusConnector (keyed by None)
+        return self.prometheus
+
+
+class PrometheusWorkloadLoader(BaseWorkloadLoader):
+    workloads: list[type[BaseKindLoader]] = [DoubleParentLoader, SimpleParentLoader]
+
+    def __init__(self, cluster: str, prometheus: PrometheusConnector) -> None:
+        self.cluster = cluster
+        self.prometheus = prometheus
+        self.loaders = [loader(cluster, prometheus) for loader in self.workloads]
+
+    async def list_workloads(self) -> list[K8sWorkload]:
+        workloads = list(
+            itertools.chain(
+                *await asyncio.gather(*[loader.list_workloads(settings.namespaces) for loader in self.loaders])
+            )
+        )
+
+        hpas = await self.__list_hpa()
+
+        for workload in workloads:
+            workload.hpa = hpas.get(
+                HPAKey(
+                    namespace=workload.namespace,
+                    kind=workload.kind,
+                    name=workload.name,
+                )
+            )
+
+        kind_counts = Counter([workload.kind for workload in workloads])
+        for kind, count in kind_counts.items():
+            logger.info(f"Found {count} {kind} in {self.cluster}")
+
+        return workloads
+
+    async def __list_hpa(self) -> dict[HPAKey, HPAData]:
+        # NOTE: The selector must be wrapped in braces to form valid PromQL
+        cluster_selector = (
+            f'{{{settings.prometheus_label}="{self.cluster}"}}' if settings.prometheus_label else ""
+        )
+
+        hpa_metrics, max_replicas, min_replicas, target_metrics = await asyncio.gather(
+            self.prometheus.loader.query("kube_horizontalpodautoscaler_info" + cluster_selector),
+            self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_max_replicas" + cluster_selector),
+            self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_min_replicas" + cluster_selector),
+            self.prometheus.loader.query("kube_horizontalpodautoscaler_spec_target_metric" + cluster_selector),
+        )
+
+        max_replicas_dict = {
+            (metric["metric"]["namespace"], metric["metric"]["horizontalpodautoscaler"]): metric["value"][1]
+            for metric in max_replicas
+        }
+        min_replicas_dict = {
+            (metric["metric"]["namespace"], metric["metric"]["horizontalpodautoscaler"]): metric["value"][1]
+            for metric in min_replicas
+        }
+        target_metric_dict = defaultdict(dict)
+        for metric in target_metrics:
+            target_metric_dict[(metric["metric"]["namespace"], metric["metric"]["horizontalpodautoscaler"])] |= {
+                metric["metric"]["metric_name"]: metric["value"][1]
+            }
+
+        if not hpa_metrics:
+            return {}
+
+        hpas = {}
+        for hpa in hpa_metrics:
+            metric = hpa["metric"]
+            hpa_name = metric["horizontalpodautoscaler"]
+            key = HPAKey(
+                namespace=metric["namespace"],
+                kind=metric["scaletargetref_kind"],
+                name=metric["scaletargetref_name"],
+            )
+
+            max_replicas_value = max_replicas_dict[metric["namespace"], hpa_name]
+            min_replicas_value = min_replicas_dict[metric["namespace"], hpa_name]
+            cpu_utilization = target_metric_dict[metric["namespace"], hpa_name].get("cpu")
+            memory_utilization = target_metric_dict[metric["namespace"], hpa_name].get("memory")
+
+            hpas[key] = HPAData(
+                min_replicas=min_replicas_value,
+                max_replicas=max_replicas_value,
+                target_cpu_utilization_percentage=cpu_utilization,
+                target_memory_utilization_percentage=memory_utilization,
+            )
+
+        return hpas
+
+
+__all__ = ["PrometheusClusterLoader", "PrometheusWorkloadLoader"]
diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py
new file mode 100644
index 00000000..18b9a3d3
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/__init__.py
@@ -0,0 +1,9 @@
+from .base import BaseKindLoader
+from .double_parent import DoubleParentLoader
+from .simple_parent import SimpleParentLoader
+
+__all__ = [
+    "BaseKindLoader",
+    "DoubleParentLoader",
+    "SimpleParentLoader",
+]
\ No newline at end of file
diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py
new file mode 100644
index 00000000..cc91a3f8
--- /dev/null
+++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/base.py
@@ -0,0 +1,107 @@
+import abc
+import logging
+
+from typing import Literal, Union
+
+from kubernetes.client.models import (  # type: ignore
+    V1DaemonSet,
+    V1Deployment,
+    V1Job,
+    V1Pod,
+    V1StatefulSet,
+)
+from robusta_krr.core.models.config import settings
+
+from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector
+from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
+from robusta_krr.core.models.objects import K8sWorkload, KindLiteral
+
+logger = logging.getLogger("krr")
+
+AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
+HPAKey = tuple[str, str, str]
+
+
+class BaseKindLoader(abc.ABC):
+    """
+    This class is used to define how to load a specific kind of Kubernetes object.
+    It does not load the objects itself, but is used by the `PrometheusWorkloadLoader` to load objects.
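+
+    Subclasses declare the `kinds` they can discover and implement `list_workloads`;
+    the shared helpers `_parse_allocation` and `_list_containers_in_pods` below
+    reconstruct requests/limits and container names from the
+    `kube_pod_container_resource_*` and `kube_pod_container_info` metrics.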
+ """ + + kinds: list[KindLiteral] = [] + + def __init__(self, cluster: str, prometheus: PrometheusConnector) -> None: + self.cluster = cluster + self.prometheus = prometheus + + @property + def kinds_to_scan(self) -> list[KindLiteral]: + return [kind for kind in self.kinds if kind in settings.resources] if settings.resources != "*" else self.kinds + + @property + def cluster_selector(self) -> str: + if settings.prometheus_label is not None: + return f'{settings.prometheus_cluster_label}="{settings.prometheus_label}",' + + if settings.prometheus_cluster_label is None: + return "" + + return f'{settings.prometheus_cluster_label}="{self.cluster}",' if self.cluster else "" + + @abc.abstractmethod + def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: + pass + + async def _parse_allocation(self, namespace: str, pods: list[str], container_name: str) -> ResourceAllocations: + limits = await self.prometheus.loader.query( + f""" + avg by(resource) ( + kube_pod_container_resource_limits{{ + {self.cluster_selector} + namespace="{namespace}", + pod=~"{'|'.join(pods)}", + container="{container_name}" + }} + ) + """ + ) + requests = await self.prometheus.loader.query( + f""" + avg by(resource) ( + kube_pod_container_resource_requests{{ + {self.cluster_selector} + namespace="{namespace}", + pod=~"{'|'.join(pods)}", + container="{container_name}" + }} + ) + """ + ) + requests_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + limits_values: dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + for limit in limits: + if limit["metric"]["resource"] == ResourceType.CPU: + limits_values[ResourceType.CPU] = float(limit["value"][1]) + elif limit["metric"]["resource"] == ResourceType.Memory: + limits_values[ResourceType.Memory] = float(limit["value"][1]) + + for request in requests: + if request["metric"]["resource"] == ResourceType.CPU: + requests_values[ResourceType.CPU] = float(request["value"][1]) + elif request["metric"]["resource"] == ResourceType.Memory: + requests_values[ResourceType.Memory] = float(request["value"][1]) + return ResourceAllocations(requests=requests_values, limits=limits_values) + + async def _list_containers_in_pods(self, pods: list[str]) -> set[str]: + containers = await self.prometheus.loader.query( + f""" + count by (container) ( + kube_pod_container_info{{ + {self.cluster_selector} + pod=~"{'|'.join(pods)}" + }} + ) + """ + ) + + return {container["metric"]["container"] for container in containers} diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py new file mode 100644 index 00000000..942d3fab --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/double_parent.py @@ -0,0 +1,123 @@ +import logging +from collections import defaultdict +import itertools +import asyncio +from typing import Literal, Union + +from robusta_krr.core.models.objects import K8sWorkload, KindLiteral, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + +SubownerLiteral = Literal["ReplicaSet", "ReplicationController", "Job"] + + +class DoubleParentLoader(BaseKindLoader): + kinds = ["Deployment", "Rollout", "DeploymentConfig", "CronJob"] + + kind_subowner_map: dict[KindLiteral, SubownerLiteral] = { + "Deployment": "ReplicaSet", + "Rollout": "ReplicaSet", + "DeploymentConfig": 
"ReplicationController", + "CronJob": "Job", + } + + async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> list[K8sWorkload]: + return list( + itertools.chain( + *await asyncio.gather( + *[ + self.list_workloads_by_subowner(namespaces, subowner) + for subowner in set(self.kind_subowner_map.values()) + ] + ) + ) + ) + + async def list_workloads_by_subowner( + self, namespaces: Union[list[str], Literal["*"]], subowner_kind: SubownerLiteral + ) -> list[K8sWorkload]: + kinds = [kind for kind in self.kinds_to_scan if self.kind_subowner_map[kind] == subowner_kind] + + if kinds == []: + return [] + + logger.debug(f"Listing {', '.join(kinds)}") + # NOTE: kube-system is excluded if we scan all namespaces + namespace_selector = ( + ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' + ) + + metric_name = f"kube_{subowner_kind.lower()}_owner" + subowner_label = subowner_kind.lower() if subowner_kind != "Job" else "job_name" + + # Replica is for ReplicaSet and/or ReplicationController + subowners = await self.prometheus.loader.query( + f""" + count by (namespace, owner_name, {subowner_label}, owner_kind) ( + {metric_name} {{ + {self.cluster_selector} + {namespace_selector}, + owner_kind=~"{'|'.join(kinds)}" + }} + ) + """ + ) + # groupBy: (namespace, owner_name, owner_kind) => [replicaset,...] + replicas_by_owner = defaultdict(list) + for subowner in subowners: + metric = subowner["metric"] + key = metric["namespace"], metric["owner_name"], metric["owner_kind"] + replicas_by_owner[key].append(metric[subowner_label]) + + return list( + itertools.chain( + *await asyncio.gather( + *[ + self._list_pods_of_subowner( + namespace, + name, + kind, + subowner_kind, + subowners, + ) + for (namespace, name, kind), subowners in replicas_by_owner.items() + ] + ) + ) + ) + + async def _list_pods_of_subowner( + self, namespace: str, name: str, kind: str, subowner_kind: str, subowner_names: list[str] + ) -> list[K8sWorkload]: + pods = await self.prometheus.loader.query( + f""" + count by (namespace, owner_name, owner_kind, pod) ( + kube_pod_owner{{ + {self.cluster_selector} + namespace="{namespace}", + owner_name=~"{'|'.join(subowner_names)}", + owner_kind="{subowner_kind}" + }} + ) + """ + ) + if pods is None or len(pods) == 0: + return [] + + pod_names = [pod["metric"]["pod"] for pod in pods] + containers = await self._list_containers_in_pods(pod_names) + + return [ + K8sWorkload( + cluster=self.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container_name, + allocations=await self._parse_allocation(namespace, pod_names, container_name), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for container_name in containers + ] diff --git a/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py new file mode 100644 index 00000000..00d30236 --- /dev/null +++ b/robusta_krr/core/integrations/prometheus/cluster_loader/loaders/simple_parent.py @@ -0,0 +1,85 @@ +import asyncio +from collections import defaultdict +import logging +from typing import Literal, Union + +from robusta_krr.core.models.objects import K8sWorkload, PodData + +from .base import BaseKindLoader + +logger = logging.getLogger("krr") + + +class SimpleParentLoader(BaseKindLoader): + kinds = ["DaemonSet", "StatefulSet", "Job"] + + async def list_workloads(self, namespaces: Union[list[str], Literal["*"]]) -> 
list[K8sWorkload]: + if self.kinds_to_scan == []: + return [] + + logger.debug(f"Listing {', '.join(self.kinds_to_scan)}") + namespace_selector = ( + ('namespace=~"' + "|".join(namespaces) + '"') if namespaces != "*" else 'namespace!="kube-system"' + ) + + results = await self.prometheus.loader.query( + f""" + count by (namespace, owner_name, owner_kind, pod) ( + kube_pod_owner{{ + {self.cluster_selector} + {namespace_selector}, + owner_kind=~"{'|'.join(self.kinds_to_scan)}" + }} + ) + """ + ) + if results is None or len(results) == 0: + return [] + + # groupBy: (namespace, owner_name, owner_kind) => [pod, ... ] + workloads: defaultdict[tuple[str, str, str], list[str]] = defaultdict(list) + for result in results: + metric = result["metric"] + key = metric["namespace"], metric["owner_name"], metric["owner_kind"] + workloads[key].append(metric["pod"]) + + # NOTE: We do not show jobs that are a part of a cronjob, so we filter them out + job_workloads = [name for (_, name, kind) in workloads if kind == "Job"] + if job_workloads != []: + cronjobs = await self.prometheus.loader.query( + f""" + count by (namespace, job_name) ( + kube_job_owner{{ + {self.cluster_selector} + {namespace_selector}, + owner_kind="CronJob" + }} + ) + """ + ) + for cronjob in cronjobs: + metric = cronjob["metric"] + key = (metric["namespace"], metric["job_name"], "Job") + if key in workloads: + del workloads[key] + + workloads_containers = dict( + zip( + workloads.keys(), + await asyncio.gather(*[self._list_containers_in_pods(pods) for pods in workloads.values()]), + ) + ) + + return [ + K8sWorkload( + cluster=self.cluster, + namespace=namespace, + name=name, + kind=kind, + container=container, + allocations=await self._parse_allocation(namespace, pod_names, container), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for (namespace, name, kind), pod_names in workloads.items() + for container in workloads_containers[namespace, name, kind] + ] diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/connector.py similarity index 52% rename from robusta_krr/core/integrations/prometheus/loader.py rename to robusta_krr/core/integrations/prometheus/connector.py index db493927..8020c2e0 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/connector.py @@ -5,13 +5,12 @@ from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Optional -from kubernetes import config as k8s_config from kubernetes.client.api_client import ApiClient from kubernetes.client.exceptions import ApiException from prometrix import MetricsNotFound, PrometheusNotFound from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData, PodData +from robusta_krr.core.models.objects import K8sWorkload, PodData from .metrics_service.prometheus_metrics_service import PrometheusMetricsService from .metrics_service.thanos_metrics_service import ThanosMetricsService @@ -23,7 +22,8 @@ logger = logging.getLogger("krr") -class PrometheusMetricsLoader: + +class PrometheusConnector: def __init__(self, *, cluster: Optional[str] = None) -> None: """ Initializes the Prometheus Loader. 
@@ -33,56 +33,59 @@ def __init__(self, *, cluster: Optional[str] = None) -> None:
         """
 
         self.executor = ThreadPoolExecutor(settings.max_workers)
-        self.api_client = settings.get_kube_client(context=cluster)
-        loader = self.get_metrics_service(api_client=self.api_client, cluster=cluster)
-        if loader is None:
-            raise PrometheusNotFound(
-                f"Wasn't able to connect to any Prometheus service in {cluster or 'inner'} cluster\n"
-                "Try using port-forwarding and/or setting the url manually (using the -p flag.).\n"
-                "For more information, see 'Giving the Explicit Prometheus URL' at https://github.com/robusta-dev/krr?tab=readme-ov-file#usage"
-            )
-
-        self.loader = loader
+        self.cluster = cluster
 
-        logger.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster")
-
-    def get_metrics_service(
-        self,
-        api_client: Optional[ApiClient] = None,
-        cluster: Optional[str] = None,
-    ) -> Optional[PrometheusMetricsService]:
-        if settings.prometheus_url is not None:
-            logger.info("Prometheus URL is specified, will not auto-detect a metrics service")
-            metrics_to_check = [PrometheusMetricsService]
-        else:
-            logger.info("No Prometheus URL is specified, trying to auto-detect a metrics service")
-            metrics_to_check = [VictoriaMetricsService, ThanosMetricsService, MimirMetricsService, PrometheusMetricsService]
+    def discover(self, api_client: ApiClient) -> None:
+        """Try to automatically discover a Prometheus service."""
+        metrics_to_check: list[type[PrometheusMetricsService]] = [
+            VictoriaMetricsService,
+            ThanosMetricsService,
+            MimirMetricsService,
+            PrometheusMetricsService,
+        ]
 
         for metric_service_class in metrics_to_check:
-            service_name = metric_service_class.name()
+            logger.info(f"Trying to find {metric_service_class.name()}{self._for_cluster_postfix}")
             try:
-                loader = metric_service_class(api_client=api_client, cluster=cluster, executor=self.executor)
-                loader.check_connection()
-            except MetricsNotFound as e:
-                logger.info(f"{service_name} not found: {e}")
-            except ApiException as e:
-                logger.warning(
-                    f"Unable to automatically discover a {service_name} in the cluster ({e}). "
-                    "Try specifying how to connect to Prometheus via cli options"
-                )
+                loader = metric_service_class.discover(api_client=api_client)
+                self._connect(loader)
+            except Exception:
+                logger.info(f"Wasn't able to find {metric_service_class.name()}{self._for_cluster_postfix}")
             else:
-                logger.info(f"{service_name} found")
-                loader.validate_cluster_name()
-                return loader
+                return
 
-        return None
+        raise PrometheusNotFound
+
+    def connect(self, url: Optional[str] = None) -> None:
+        """Connect to a Prometheus service using a URL."""
+        loader = PrometheusMetricsService(url=url)
+        self._connect(loader)
+        logger.info(f"{loader.name()} connected successfully")
+
+    def _connect(self, loader: PrometheusMetricsService) -> None:
+        service_name = loader.name()
+        try:
+            loader.check_connection()
+        except MetricsNotFound as e:
+            logger.info(f"{service_name} not found: {e}")
+            raise PrometheusNotFound(f"Wasn't able to connect to {service_name}" + self._for_cluster_postfix)
+        except ApiException as e:
+            logger.warning(
+                f"Unable to automatically discover a {service_name}{self._for_cluster_postfix} ({e}). 
" + "Try specifying how to connect to Prometheus via cli options" + ) + raise e + else: + logger.info(f"{service_name} found") + loader.validate_cluster_name() + self.loader = loader async def get_history_range( self, history_duration: datetime.timedelta ) -> Optional[tuple[datetime.datetime, datetime.datetime]]: return await self.loader.get_history_range(history_duration) - async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> list[PodData]: + async def load_pods(self, object: K8sWorkload, period: datetime.timedelta) -> list[PodData]: try: return await self.loader.load_pods(object, period) except Exception as e: @@ -91,7 +94,7 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) -> async def gather_data( self, - object: K8sObjectData, + object: K8sWorkload, strategy: BaseStrategy, period: datetime.timedelta, *, @@ -114,3 +117,8 @@ async def gather_data( MetricLoader.__name__: await self.loader.gather_data(object, MetricLoader, period, step) for MetricLoader in strategy.metrics } + + @property + def _for_cluster_postfix(self) -> str: + """The string postfix to be used in logging messages.""" + return f" for {self.cluster} cluster" if self.cluster else "" diff --git a/robusta_krr/core/integrations/prometheus/metrics/base.py b/robusta_krr/core/integrations/prometheus/metrics/base.py index 4169b0f0..67266dd2 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/base.py +++ b/robusta_krr/core/integrations/prometheus/metrics/base.py @@ -15,7 +15,7 @@ from robusta_krr.core.abstract.metrics import BaseMetric from robusta_krr.core.abstract.strategies import PodsTimeData from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload class PrometheusSeries(TypedDict): @@ -75,19 +75,8 @@ def __init__( if self.pods_batch_size is not None and self.pods_batch_size <= 0: raise ValueError("pods_batch_size must be positive") - def get_prometheus_cluster_label(self) -> str: - """ - Generates the cluster label for querying a centralized Prometheus - - Returns: - str: a promql safe label string for querying the cluster. - """ - if settings.prometheus_cluster_label is None: - return "" - return f', {settings.prometheus_label}="{settings.prometheus_cluster_label}"' - @abc.abstractmethod - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: """ This method should be implemented by all subclasses to provide a query string to fetch metrics. @@ -152,7 +141,7 @@ async def query_prometheus(self, data: PrometheusMetricData) -> list[PrometheusS return await loop.run_in_executor(self.executor, lambda: self._query_prometheus_sync(data)) async def load_data( - self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta + self, object: K8sWorkload, period: datetime.timedelta, step: datetime.timedelta ) -> PodsTimeData: """ Asynchronous method that loads metric data for a specific object. 
diff --git a/robusta_krr/core/integrations/prometheus/metrics/cpu.py b/robusta_krr/core/integrations/prometheus/metrics/cpu.py index c7a2c733..14318caf 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/cpu.py +++ b/robusta_krr/core/integrations/prometheus/metrics/cpu.py @@ -1,4 +1,4 @@ -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload from .base import PrometheusMetric, QueryType @@ -10,17 +10,16 @@ class CPULoader(PrometheusMetric): query_type: QueryType = QueryType.QueryRange - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max( rate( container_cpu_usage_seconds_total{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }}[{step}] ) ) by (container, pod, job) @@ -36,19 +35,18 @@ def PercentileCPULoader(percentile: float) -> type[PrometheusMetric]: raise ValueError("percentile must be between 0 and 100") class PercentileCPULoader(PrometheusMetric): - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" quantile_over_time( {round(percentile / 100, 2)}, max( rate( container_cpu_usage_seconds_total{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }}[{step}] ) ) by (container, pod, job) @@ -64,17 +62,16 @@ class CPUAmountLoader(PrometheusMetric): A metric loader for loading CPU points count. 
""" - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" count_over_time( max( container_cpu_usage_seconds_total{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) [{duration}:{step}] diff --git a/robusta_krr/core/integrations/prometheus/metrics/memory.py b/robusta_krr/core/integrations/prometheus/metrics/memory.py index 85dfba6b..d6d5baa3 100644 --- a/robusta_krr/core/integrations/prometheus/metrics/memory.py +++ b/robusta_krr/core/integrations/prometheus/metrics/memory.py @@ -1,4 +1,4 @@ -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload from .base import PrometheusMetric, QueryType @@ -10,16 +10,15 @@ class MemoryLoader(PrometheusMetric): query_type: QueryType = QueryType.QueryRange - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max( container_memory_working_set_bytes{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) """ @@ -30,17 +29,16 @@ class MaxMemoryLoader(PrometheusMetric): A metric loader for loading max memory usage metrics. """ - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max_over_time( max( container_memory_working_set_bytes{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) [{duration}:{step}] @@ -53,23 +51,23 @@ class MemoryAmountLoader(PrometheusMetric): A metric loader for loading memory points count. """ - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" count_over_time( max( container_memory_working_set_bytes{{ + {object.cluster_selector} namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (container, pod, job) [{duration}:{step}] ) """ + # TODO: Need to battle test if this one is correct. 
class MaxOOMKilledMemoryLoader(PrometheusMetric): """ @@ -78,29 +76,28 @@ class MaxOOMKilledMemoryLoader(PrometheusMetric): warning_on_no_data = False - def get_query(self, object: K8sObjectData, duration: str, step: str) -> str: + def get_query(self, object: K8sWorkload, duration: str, step: str) -> str: pods_selector = "|".join(pod.name for pod in object.pods) - cluster_label = self.get_prometheus_cluster_label() return f""" max_over_time( max( max( kube_pod_container_resource_limits{{ + {object.cluster_selector} resource="memory", namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} - }} + }} ) by (pod, container, job) * on(pod, container, job) group_left(reason) max( kube_pod_container_status_last_terminated_reason{{ + {object.cluster_selector} reason="OOMKilled", namespace="{object.namespace}", pod=~"{pods_selector}", container="{object.container}" - {cluster_label} }} ) by (pod, container, job, reason) ) by (container, pod, job) diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index a3b0ee0f..9b0c1630 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -7,44 +7,34 @@ from robusta_krr.core.abstract.strategies import PodsTimeData from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.objects import K8sWorkload from ..metrics import PrometheusMetric class MetricsService(abc.ABC): - def __init__( - self, - api_client: Optional[ApiClient] = None, - cluster: Optional[str] = None, - executor: Optional[ThreadPoolExecutor] = None, - ) -> None: - self.api_client = api_client - self.cluster = cluster or "default" - self.executor = executor - @abc.abstractmethod def check_connection(self): - ... - - @classmethod - def name(cls) -> str: - classname = cls.__name__ - return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname + pass @abc.abstractmethod def get_cluster_names(self) -> Optional[List[str]]: - ... + pass @abc.abstractmethod async def gather_data( self, - object: K8sObjectData, + object: K8sWorkload, LoaderClass: type[PrometheusMetric], period: datetime.timedelta, step: datetime.timedelta = datetime.timedelta(minutes=30), ) -> PodsTimeData: - ... + pass + + @classmethod + def name(cls) -> str: + classname = cls.__name__ + return classname.replace("MetricsService", "") if classname != MetricsService.__name__ else classname def get_prometheus_cluster_label(self) -> str: """ diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py index ea3af57c..05bbc702 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/mimir_metrics_service.py @@ -7,8 +7,9 @@ from .prometheus_metrics_service import PrometheusMetricsService + class MimirMetricsDiscovery(MetricsServiceDiscovery): - def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_metrics_url(self) -> Optional[str]: """ Finds the Mimir Metrics URL using selectors. 
Args: diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index 8331249f..f11a32d6 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -1,8 +1,11 @@ +from __future__ import annotations + import asyncio import logging from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from typing import Iterable, List, Optional +from typing_extensions import Self from kubernetes.client import ApiClient from prometheus_api_client import PrometheusApiClientException @@ -11,7 +14,7 @@ from robusta_krr.core.abstract.strategies import PodsTimeData from robusta_krr.core.integrations import openshift from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData, PodData +from robusta_krr.core.models.objects import K8sWorkload, PodData from robusta_krr.utils.batched import batched from robusta_krr.utils.service_discovery import MetricsServiceDiscovery @@ -23,7 +26,7 @@ class PrometheusDiscovery(MetricsServiceDiscovery): - def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_metrics_url(self) -> Optional[str]: """ Finds the Prometheus URL using selectors. Args: @@ -56,18 +59,21 @@ class PrometheusMetricsService(MetricsService): def __init__( self, - *, + url: str, cluster: Optional[str] = None, - api_client: Optional[ApiClient] = None, executor: Optional[ThreadPoolExecutor] = None, + api_client: Optional[ApiClient] = None, ) -> None: - super().__init__(api_client=api_client, cluster=cluster, executor=executor) - - logger.info(f"Trying to connect to {self.name()} for {self.cluster} cluster") + self.url = url + self.url_postfix + self.cluster = cluster + self.executor = executor or ThreadPoolExecutor(settings.max_workers) + self.api_client = api_client self.auth_header = settings.prometheus_auth_header self.ssl_enabled = settings.prometheus_ssl_enabled + logger.info(f"Using {self.name()} at {self.url}" + self._for_cluster_postfix) + if settings.openshift: logging.info("Openshift flag is set, trying to load token from service account.") openshift_token = openshift.load_token() @@ -78,20 +84,6 @@ def __init__( else: logging.warning("Openshift token is not found, trying to connect without it.") - self.prometheus_discovery = self.service_discovery(api_client=self.api_client) - - self.url = settings.prometheus_url - self.url = self.url or self.prometheus_discovery.find_metrics_url() - - if not self.url: - raise PrometheusNotFound( - f"{self.name()} instance could not be found while scanning in {self.cluster} cluster." 
- ) - - self.url += self.url_postfix - - logger.info(f"Using {self.name()} at {self.url} for cluster {cluster or 'default'}") - headers = settings.prometheus_other_headers headers |= self.additional_headers @@ -99,9 +91,23 @@ def __init__( headers |= {"Authorization": self.auth_header} elif not settings.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) + self.prom_config = generate_prometheus_config(url=self.url, headers=headers, metrics_service=self) self.prometheus = get_custom_prometheus_connect(self.prom_config) + @classmethod + def discover( + cls, + api_client: ApiClient, + cluster: Optional[str] = None, + executor: Optional[ThreadPoolExecutor] = None, + ) -> Self: + url = cls.service_discovery(api_client=api_client).find_metrics_url() + if not url: + raise PrometheusNotFound(f"{cls.name()} instance could not be found while scanning") + + return cls(url, cluster, executor, api_client) + def check_connection(self): """ Checks the connection to Prometheus. @@ -112,10 +118,14 @@ def check_connection(self): async def query(self, query: str) -> dict: loop = asyncio.get_running_loop() - return await loop.run_in_executor( - self.executor, - lambda: self.prometheus.safe_custom_query(query=query)["result"], - ) + try: + return await loop.run_in_executor( + self.executor, + lambda: self.prometheus.safe_custom_query(query=query)["result"], + ) + except PrometheusApiClientException as e: + logger.error(f"Error while querying Prometheus: {query}") + raise e async def query_range(self, query: str, start: datetime, end: datetime, step: timedelta) -> dict: loop = asyncio.get_running_loop() @@ -177,7 +187,7 @@ async def get_history_range(self, history_duration: timedelta) -> tuple[datetime async def gather_data( self, - object: K8sObjectData, + object: K8sWorkload, LoaderClass: type[PrometheusMetric], period: timedelta, step: timedelta = timedelta(minutes=30), @@ -203,7 +213,7 @@ async def gather_data( return data - async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodData]: + async def load_pods(self, object: K8sWorkload, period: timedelta) -> list[PodData]: """ List pods related to the object and add them to the object's pods list. 
Args: @@ -245,7 +255,9 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD }}[{period_literal}] """ ) - pod_owners = {repl_controller["metric"]["replicationcontroller"] for repl_controller in replication_controllers} + pod_owners = { + repl_controller["metric"]["replicationcontroller"] for repl_controller in replication_controllers + } pod_owner_kind = "ReplicationController" del replication_controllers @@ -306,3 +318,8 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD del pods_status_result return list({PodData(name=pod, deleted=pod not in current_pods_set) for pod in related_pods}) + + @property + def _for_cluster_postfix(self) -> str: + """The string postfix to be used in logging messages.""" + return f" for {self.cluster} cluster" if self.cluster else "" diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py index eaf16201..4a3dea11 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/thanos_metrics_service.py @@ -9,7 +9,7 @@ class ThanosMetricsDiscovery(MetricsServiceDiscovery): - def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_metrics_url(self) -> Optional[str]: """ Finds the Thanos URL using selectors. Args: diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py index e8fbcd02..f2c46904 100644 --- a/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/victoria_metrics_service.py @@ -9,7 +9,7 @@ class VictoriaMetricsDiscovery(MetricsServiceDiscovery): - def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_metrics_url(self) -> Optional[str]: """ Finds the Victoria Metrics URL using selectors. 
        Args:
diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py
index ff6142a6..4f8b5066 100644
--- a/robusta_krr/core/models/config.py
+++ b/robusta_krr/core/models/config.py
@@ -1,28 +1,36 @@
 from __future__ import annotations
 
 import logging
+from enum import Enum
 import sys
 from typing import Any, Literal, Optional, Union
 
 import pydantic as pd
-from kubernetes import config
+from kubernetes import config, client
 from kubernetes.config.config_exception import ConfigException
 from rich.console import Console
 from rich.logging import RichHandler
 
 from robusta_krr.core.abstract import formatters
 from robusta_krr.core.abstract.strategies import AnyStrategy, BaseStrategy
+from robusta_krr.core.abstract.cluster_loader import BaseClusterLoader
 from robusta_krr.core.models.objects import KindLiteral
 
 logger = logging.getLogger("krr")
 
 
+class LoadingMode(str, Enum):
+    KUBEAPI = "kubeapi"
+    PROMETHEUS = "prometheus"
+
+
 class Config(pd.BaseSettings):
     quiet: bool = pd.Field(False)
     verbose: bool = pd.Field(False)
     clusters: Union[list[str], Literal["*"], None] = None
     kubeconfig: Optional[str] = None
+    mode: LoadingMode = pd.Field(LoadingMode.KUBEAPI)
     impersonate_user: Optional[str] = None
     impersonate_group: Optional[str] = None
     namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
@@ -136,6 +144,19 @@ def logging_console(self) -> Console:
             self._logging_console = Console(file=sys.stderr if self.log_to_stderr else sys.stdout, width=self.width)
         return self._logging_console
 
+    def create_cluster_loader(self) -> BaseClusterLoader:
+        from robusta_krr.core.integrations.prometheus.cluster_loader import PrometheusClusterLoader
+        from robusta_krr.core.integrations.kubernetes.cluster_loader import KubeAPIClusterLoader
+
+        if settings.mode == LoadingMode.KUBEAPI:
+            logger.info("Connecting using the Kubernetes API, will load the kubeconfig.")
+            return KubeAPIClusterLoader()
+        elif settings.mode == LoadingMode.PROMETHEUS:
+            logger.info("Connecting using Prometheus only, the kubeconfig will not be loaded.")
+            return PrometheusClusterLoader()
+        else:
+            raise NotImplementedError(f"Workload loader {settings.mode} is not implemented")
+
     def load_kubeconfig(self) -> None:
         try:
             config.load_kube_config(config_file=self.kubeconfig, context=self.context)
@@ -144,7 +165,7 @@
             config.load_incluster_config()
             self.inside_cluster = True
 
-    def get_kube_client(self, context: Optional[str] = None):
+    def get_kube_client(self, context: Optional[str] = None) -> client.ApiClient:
         if context is None:
             return None
 
diff --git a/robusta_krr/core/models/exceptions.py b/robusta_krr/core/models/exceptions.py
new file mode 100644
index 00000000..70f3b052
--- /dev/null
+++ b/robusta_krr/core/models/exceptions.py
@@ -0,0 +1,2 @@
+class CriticalRunnerException(Exception):
+    """This exception will be raised when a critical error occurs in the runner and the runner cannot continue."""
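The `HPAKey` model added below is deliberately immutable and hashable, so the Prometheus workload loader can use it as a dictionary key when attaching HPA data to workloads. A self-contained illustration of why that works (pydantic v1 models compare by field values, so structurally equal keys hash and look up consistently):

```python
import pydantic as pd

class HPAKey(pd.BaseModel):
    namespace: str
    kind: str
    name: str

    class Config:
        allow_mutation = False

    def __hash__(self) -> int:
        return hash((self.namespace, self.kind, self.name))

hpas = {HPAKey(namespace="default", kind="Deployment", name="web"): "hpa-settings"}
# A structurally equal key finds the entry, even though it is a different instance:
print(hpas[HPAKey(namespace="default", kind="Deployment", name="web")])  # hpa-settings
```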
diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py
index e4b400d9..21289036 100644
--- a/robusta_krr/core/models/objects.py
+++ b/robusta_krr/core/models/objects.py
@@ -19,11 +19,21 @@ def __hash__(self) -> int:
         return hash(self.name)
 
 
+class HPAKey(pd.BaseModel):
+    namespace: str
+    kind: str
+    name: str
+
+    class Config:
+        allow_mutation = False
+
+    def __hash__(self) -> int:
+        return hash((self.namespace, self.kind, self.name))
+
+
 class HPAData(pd.BaseModel):
     min_replicas: Optional[int]
     max_replicas: int
-    current_replicas: Optional[int]
-    desired_replicas: int
     target_cpu_utilization_percentage: Optional[float]
     target_memory_utilization_percentage: Optional[float]
 
@@ -35,7 +45,7 @@ class HPAData(pd.BaseModel):
     ]
 
 
-class K8sObjectData(pd.BaseModel):
+class K8sWorkload(pd.BaseModel):
     # NOTE: Here None means that we are running inside the cluster
     cluster: Optional[str]
     name: str
@@ -52,12 +62,27 @@
     def __str__(self) -> str:
         return f"{self.kind} {self.namespace}/{self.name}/{self.container}"
 
+    def __repr__(self) -> str:
+        return f"<{self}>"
+
     def __hash__(self) -> int:
         return hash(str(self))
 
     def add_warning(self, warning: PodWarning) -> None:
         self.warnings.add(warning)
 
+    @property
+    def cluster_selector(self) -> str:
+        from robusta_krr.core.models.config import settings
+
+        # NOTE: prometheus_label is the label key, prometheus_cluster_label is an explicit value for it
+        if settings.prometheus_cluster_label is not None:
+            return f'{settings.prometheus_label}="{settings.prometheus_cluster_label}",'
+
+        if settings.prometheus_label is None:
+            return ""
+
+        return f'{settings.prometheus_label}="{self.cluster}",' if self.cluster else ""
+
     @property
     def current_pods_count(self) -> int:
         return len([pod for pod in self.pods if not pod.deleted])
@@ -80,7 +105,7 @@ def selector(self) -> V1LabelSelector:
         else:
             return self._api_resource.spec.selector
 
-    def split_into_batches(self, n: int) -> list[K8sObjectData]:
+    def split_into_batches(self, n: int) -> list[K8sWorkload]:
         """
         Batch this object into n objects, splitting the pods into batches of size n.
         """
@@ -89,7 +114,7 @@
             return [self]
 
         return [
-            K8sObjectData(
+            K8sWorkload(
                 cluster=self.cluster,
                 name=self.name,
                 container=self.container,
diff --git a/robusta_krr/core/models/result.py b/robusta_krr/core/models/result.py
index 2d5ffbc9..1735125d 100644
--- a/robusta_krr/core/models/result.py
+++ b/robusta_krr/core/models/result.py
@@ -6,7 +6,7 @@
 from robusta_krr.core.abstract import formatters
 from robusta_krr.core.models.allocations import RecommendationValue, ResourceAllocations, ResourceType
-from robusta_krr.core.models.objects import K8sObjectData
+from robusta_krr.core.models.objects import K8sWorkload
 from robusta_krr.core.models.severity import Severity
 
 
@@ -22,12 +22,12 @@ class ResourceRecommendation(pd.BaseModel):
 
 
 class ResourceScan(pd.BaseModel):
-    object: K8sObjectData
+    object: K8sWorkload
     recommended: ResourceRecommendation
     severity: Severity
 
     @classmethod
-    def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -> ResourceScan:
+    def calculate(cls, object: K8sWorkload, recommendation: ResourceAllocations) -> ResourceScan:
         recommendation_processed = ResourceRecommendation(requests={}, limits={}, info={})
 
         for resource_type in ResourceType:
diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py
index 8e08521c..68cbeb05 100644
--- a/robusta_krr/core/runner.py
+++ b/robusta_krr/core/runner.py
@@ -1,26 +1,28 @@
 import asyncio
+import itertools
 import logging
 import math
 import os
-import sys
 import warnings
 from concurrent.futures import ThreadPoolExecutor
 from typing import Optional, Union
 from datetime import timedelta
 
-from prometrix import PrometheusNotFound
 from rich.console import Console
 from slack_sdk import WebClient
+from prometrix import PrometheusNotFound
 
 from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
-from robusta_krr.core.integrations.kubernetes import KubernetesLoader
-from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, PrometheusMetricsLoader
+from 
robusta_krr.core.abstract.workload_loader import IListPodsFallback +from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException from robusta_krr.core.models.config import settings -from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.core.models.exceptions import CriticalRunnerException +from robusta_krr.core.models.objects import K8sWorkload from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, StrategyData from robusta_krr.utils.intro import load_intro_message from robusta_krr.utils.progress_bar import ProgressBar from robusta_krr.utils.version import get_version, load_latest_version + logger = logging.getLogger("krr") @@ -33,40 +35,14 @@ def custom_print(*objects, rich: bool = True, force: bool = False) -> None: print_func(*objects) # type: ignore -class CriticalRunnerException(Exception): ... - - class Runner: - EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) - def __init__(self) -> None: - self._k8s_loader = KubernetesLoader() - self._metrics_service_loaders: dict[Optional[str], Union[PrometheusMetricsLoader, Exception]] = {} - self._metrics_service_loaders_error_logged: set[Exception] = set() - self._strategy = settings.create_strategy() + self.strategy = settings.create_strategy() self.errors: list[dict] = [] # This executor will be running calculations for recommendations - self._executor = ThreadPoolExecutor(settings.max_workers) - - def _get_prometheus_loader(self, cluster: Optional[str]) -> Optional[PrometheusMetricsLoader]: - if cluster not in self._metrics_service_loaders: - try: - self._metrics_service_loaders[cluster] = PrometheusMetricsLoader(cluster=cluster) - except Exception as e: - self._metrics_service_loaders[cluster] = e - - result = self._metrics_service_loaders[cluster] - if isinstance(result, self.EXPECTED_EXCEPTIONS): - if result not in self._metrics_service_loaders_error_logged: - self._metrics_service_loaders_error_logged.add(result) - logger.error(str(result)) - return None - elif isinstance(result, Exception): - raise result - - return result + self.executor = ThreadPoolExecutor(settings.max_workers) @staticmethod def __parse_version_string(version: str) -> tuple[int, ...]: @@ -93,7 +69,7 @@ async def _greet(self) -> None: custom_print(intro_message) custom_print(f"\nRunning Robusta's KRR (Kubernetes Resource Recommender) {current_version}") - custom_print(f"Using strategy: {self._strategy}") + custom_print(f"Using strategy: {self.strategy}") custom_print(f"Using formatter: {settings.format}") if latest_version is not None and self.__check_newer_version_available(current_version, latest_version): custom_print(f"[yellow bold]A newer version of KRR is available: {latest_version}[/yellow bold]") @@ -165,16 +141,19 @@ def _format_result(self, result: RunResult) -> RunResult: for resource, recommendation in result.items() } - async def _calculate_object_recommendations(self, object: K8sObjectData) -> Optional[RunResult]: - prometheus_loader = self._get_prometheus_loader(object.cluster) + async def _calculate_object_recommendations(self, object: K8sWorkload) -> Optional[RunResult]: + prometheus = self.connector.get_prometheus(object.cluster) - if prometheus_loader is None: + if prometheus is None: return None - object.pods = await prometheus_loader.load_pods(object, self._strategy.settings.history_timedelta) - if object.pods == []: - # Fallback to Kubernetes API - object.pods = await self._k8s_loader.load_pods(object) + cluster_loader = 
self.connector.get_workload_loader(object.cluster)
+
+        object.pods = await prometheus.load_pods(object, self.strategy.settings.history_timedelta)
+        if object.pods == [] and isinstance(cluster_loader, IListPodsFallback):
+            # Fallback to IListPodsFallback if Prometheus did not return any pods
+            # IListPodsFallback is implemented by the Kubernetes API connector
+            object.pods = await cluster_loader.load_pods(object)
 
             # NOTE: Kubernetes API returned pods, but Prometheus did not
             # This might happen with fast executing jobs
@@ -185,25 +164,34 @@
                 "Loaded pods from Kubernetes API instead."
             )
 
-        metrics = await prometheus_loader.gather_data(
+        metrics = await prometheus.gather_data(
             object,
-            self._strategy,
-            self._strategy.settings.history_timedelta,
-            step=self._strategy.settings.timeframe_timedelta,
+            self.strategy,
+            self.strategy.settings.history_timedelta,
+            step=self.strategy.settings.timeframe_timedelta,
         )
 
         # NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
         # But keep in mind that numpy calculations will not block the GIL
         loop = asyncio.get_running_loop()
-        result = await loop.run_in_executor(self._executor, self._strategy.run, metrics, object)
+        result = await loop.run_in_executor(self.executor, self.strategy.run, metrics, object)
 
         logger.info(f"Calculated recommendations for {object} (using {len(metrics)} metrics)")
 
         return self._format_result(result)
 
-    async def _check_data_availability(self, cluster: Optional[str]) -> None:
-        prometheus_loader = self._get_prometheus_loader(cluster)
-        if prometheus_loader is None:
-            return
+    async def _check_cluster(self, cluster: Optional[str]) -> bool:
+        try:
+            prometheus_loader = self.connector.get_prometheus(cluster)
+        except PrometheusNotFound:
+            logger.error(
+                "Wasn't able to connect to any Prometheus service"
+                + (f" for cluster {cluster}" if cluster is not None else "")
+                + "\nTry using port-forwarding and/or setting the url manually (using the -p flag).\n"
+                "For more information, see 'Giving the Explicit Prometheus URL' at "
+                "https://github.com/robusta-dev/krr?tab=readme-ov-file#usage"
+            )
+            return False
 
         try:
             history_range = await prometheus_loader.get_history_range(timedelta(hours=5))
@@ -216,14 +204,17 @@
                     "name": "HistoryRangeError",
                 }
             )
-            return
+            return True  # We can try to continue without history range
 
-        logger.debug(f"History range for {cluster}: {history_range}")
-        enough_data = self._strategy.settings.history_range_enough(history_range)
+        logger.debug(
+            f"History range{f' for cluster {cluster}' if cluster else ''}: "
+            f"({history_range[0]})-({history_range[1]})"
+        )
+        enough_data = self.strategy.settings.history_range_enough(history_range)
 
         if not enough_data:
             logger.warning(f"Not enough history available for cluster {cluster}.")
-            try_after = history_range[0] + self._strategy.settings.history_timedelta
+            try_after = history_range[0] + self.strategy.settings.history_timedelta
 
             logger.warning(
                 "If the cluster is freshly installed, it might take some time for enough data to be available. 
@@ -239,25 +230,37 @@
                 }
             )
 
-    async def _gather_object_allocations(self, k8s_object: K8sObjectData) -> Optional[ResourceScan]:
-        recommendation = await self._calculate_object_recommendations(k8s_object)
+        return True
 
-        self.__progressbar.progress()
+    async def _gather_object_allocations(self, k8s_object: K8sWorkload) -> Optional[ResourceScan]:
+        try:
+            recommendation = await self._calculate_object_recommendations(k8s_object)
 
-        if recommendation is None:
-            return None
+            self.__progressbar.progress()
 
-        return ResourceScan.calculate(
-            k8s_object,
-            ResourceAllocations(
-                requests={resource: recommendation[resource].request for resource in ResourceType},
-                limits={resource: recommendation[resource].limit for resource in ResourceType},
-                info={resource: recommendation[resource].info for resource in ResourceType},
-            ),
-        )
+            if recommendation is None:
+                return None
 
+            return ResourceScan.calculate(
+                k8s_object,
+                ResourceAllocations(
+                    requests={resource: recommendation[resource].request for resource in ResourceType},
+                    limits={resource: recommendation[resource].limit for resource in ResourceType},
+                    info={resource: recommendation[resource].info for resource in ResourceType},
+                ),
+            )
+        except Exception as e:
+            logger.error(f"Failed to gather allocations for {k8s_object}")
+            logger.exception(e)
+            return None
+
     async def _collect_result(self) -> Result:
-        clusters = await self._k8s_loader.list_clusters()
+        clusters = await self.connector.list_clusters()
+        if clusters is None:
+            logger.info("Cannot list clusters; assuming single-cluster mode.")
+        else:
+            logger.info(f"Clusters available: {', '.join(clusters)}")
+
         if clusters and len(clusters) > 1 and settings.prometheus_url:
             # this can only happen for multi-cluster querying a single centralized prometheus
             # In this scenario we don't yet support determining
@@ -269,14 +272,38 @@
         logger.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')
 
+        # This is for code clarity. All functions take str | None as a cluster parameter
         if clusters is None:
-            await self._check_data_availability(None)
-        else:
-            await asyncio.gather(*[self._check_data_availability(cluster) for cluster in clusters])
+            clusters = [None]
+
+        checks = await asyncio.gather(*[self._check_cluster(cluster) for cluster in clusters])
+        clusters = [cluster for cluster, check in zip(clusters, checks) if check]
+
+        if clusters == []:
+            raise CriticalRunnerException("No clusters available to scan.")
+
+        workload_loaders = {cluster: self.connector.try_get_workload_loader(cluster) for cluster in clusters}
+
+        # NOTE: we filter out None values as they are clusters that we could not connect to
+        workload_loaders = {cluster: loader for cluster, loader in workload_loaders.items() if loader is not None}
+
+        if workload_loaders == {}:
+            raise CriticalRunnerException("Could not connect to any cluster.")
 
         with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
-            workloads = await self._k8s_loader.list_scannable_objects(clusters)
+            # We gather all workloads from all clusters in parallel (asyncio.gather)
+            # Then we chain all workloads together (itertools.chain)
+            workloads = list(
+                itertools.chain(
+                    *await asyncio.gather(*[loader.list_workloads() for loader in workload_loaders.values()])
+                )
+            )
+            # Then we gather all recommendations for all workloads in parallel (asyncio.gather)
             scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads])
+            # NOTE: Previously we were streaming workloads to calculate recommendations
+            # as soon as they were available (not waiting for all workloads to be loaded),
+            # but it gave only minor performance improvements (most of the time is spent calculating recommendations),
+            # so we decided to do those two steps sequentially to simplify the code
 
         successful_scans = [scan for scan in scans if scan is not None]
 
@@ -292,11 +319,11 @@
             raise CriticalRunnerException("No successful scans were made. Check the logs for more information.")
 
         return Result(
-            scans=scans,
-            description=self._strategy.description,
+            scans=successful_scans,
+            description=self.strategy.description,
             strategy=StrategyData(
-                name=str(self._strategy).lower(),
-                settings=self._strategy.settings.dict(),
+                name=str(self.strategy).lower(),
+                settings=self.strategy.settings.dict(),
             ),
         )
 
@@ -304,23 +331,18 @@
     async def run(self) -> int:
         """Run the Runner. The return value is the exit code of the program."""
         await self._greet()
 
-        try:
-            settings.load_kubeconfig()
-        except Exception as e:
-            logger.error(f"Could not load kubernetes configuration: {e}")
-            logger.error("Try to explicitly set --context and/or --kubeconfig flags.")
-            return 1  # Exit with error
-
         try:
             # eks has a lower step limit than other types of prometheus, it will throw an error
-            step_count = self._strategy.settings.history_duration * 60 / self._strategy.settings.timeframe_duration
+            step_count = self.strategy.settings.history_duration * 60 / self.strategy.settings.timeframe_duration
             if settings.eks_managed_prom and step_count > 11000:
-                min_step = self._strategy.settings.history_duration * 60 / 10000
+                min_step = self.strategy.settings.history_duration * 60 / 10000
                 logger.warning(
                     f"The timeframe duration provided is insufficient and will be overridden with {min_step}. "
                     f"Kindly adjust --timeframe_duration to a value equal to or greater than {min_step}."
                 )
-                self._strategy.settings.timeframe_duration = min_step
+                self.strategy.settings.timeframe_duration = min_step
+
+            self.connector = settings.create_cluster_loader()
 
             result = await self._collect_result()
             logger.info("Result collected, displaying...")
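The `_collect_result` rewrite above leans on a fan-out/flatten idiom: one `asyncio.gather` across clusters to load workloads concurrently, then `itertools.chain` to merge the per-cluster lists. A minimal runnable sketch of the same pattern (cluster names and payloads are made up):

```python
import asyncio
import itertools

async def list_workloads(cluster: str) -> list[str]:
    await asyncio.sleep(0)  # stands in for a Prometheus or Kube API round-trip
    return [f"{cluster}/workload-{i}" for i in range(2)]

async def main() -> None:
    per_cluster = await asyncio.gather(*[list_workloads(c) for c in ("prod", "staging")])
    workloads = list(itertools.chain(*per_cluster))  # flatten [[...], [...]] -> [...]
    print(workloads)

asyncio.run(main())  # ['prod/workload-0', 'prod/workload-1', 'staging/workload-0', 'staging/workload-1']
```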
diff --git a/robusta_krr/main.py b/robusta_krr/main.py
index 5c2d01aa..021c9efa 100644
--- a/robusta_krr/main.py
+++ b/robusta_krr/main.py
@@ -15,7 +15,7 @@
 from robusta_krr import formatters as concrete_formatters  # noqa: F401
 from robusta_krr.core.abstract import formatters
 from robusta_krr.core.abstract.strategies import BaseStrategy
-from robusta_krr.core.models.config import Config
+from robusta_krr.core.models.config import Config, LoadingMode
 from robusta_krr.core.runner import Runner
 from robusta_krr.utils.version import get_version
 
@@ -53,19 +53,27 @@
         "--kubeconfig",
         "-k",
         help="Path to kubeconfig file. If not provided, will attempt to find it.",
+        rich_help_panel="KubeAPI Mode Settings",
+    ),
+    mode: LoadingMode = typer.Option(
+        LoadingMode.KUBEAPI,
+        "--mode",
+        "-m",
+        help="Loading mode. KubeAPI mode requires a kubeconfig and supports auto-discovery. Prometheus mode requires passing --prometheus-url.",
+        case_sensitive=False,
         rich_help_panel="Kubernetes Settings",
     ),
     impersonate_user: Optional[str] = typer.Option(
         None,
         "--as",
         help="Impersonate a user, just like `kubectl --as`. For example, system:serviceaccount:default:krr-account.",
-        rich_help_panel="Kubernetes Settings",
+        rich_help_panel="KubeAPI Mode Settings",
     ),
     impersonate_group: Optional[str] = typer.Option(
         None,
         "--as-group",
         help="Impersonate a user inside of a group, just like `kubectl --as-group`. For example, system:authenticated.",
-        rich_help_panel="Kubernetes Settings",
+        rich_help_panel="KubeAPI Mode Settings",
     ),
     clusters: List[str] = typer.Option(
         None,
@@ -73,13 +81,13 @@
         "--cluster",
         "-c",
         help="List of clusters to run on. By default, will run on the current cluster. Use --all-clusters to run on all clusters.",
-        rich_help_panel="Kubernetes Settings",
+        rich_help_panel="KubeAPI Mode Settings",
     ),
     all_clusters: bool = typer.Option(
         False,
         "--all-clusters",
         help="Run on all clusters. Overrides --context.",
-        rich_help_panel="Kubernetes Settings",
+        rich_help_panel="KubeAPI Mode Settings",
     ),
     namespaces: List[str] = typer.Option(
         None,
@@ -100,7 +108,7 @@
         "--selector",
         "-s",
         help="Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -s key1=value1,key2=value2). Matching objects must satisfy all of the specified label constraints.",
-        rich_help_panel="Kubernetes Settings",
+        rich_help_panel="KubeAPI Mode Settings",
     ),
     prometheus_url: Optional[str] = typer.Option(
         None,
@@ -130,15 +138,16 @@
     ),
     prometheus_cluster_label: Optional[str] = typer.Option(
         None,
-        "--prometheus-cluster-label",
+        "--prometheus-cluster-value",
         "-l",
-        help="The label in prometheus for your cluster.(Only relevant for centralized prometheus)",
+        help="The name of the cluster in Prometheus to scan. e.g. 'gke-prod-1'. Use with `--prometheus-cluster-key`. Only relevant for centralized prometheus.",
         rich_help_panel="Prometheus Settings",
     ),
     prometheus_label: str = typer.Option(
         None,
+        "--prometheus-cluster-key",
         "--prometheus-label",
-        help="The label in prometheus used to differentiate clusters. (Only relevant for centralized prometheus)",
+        help="The label in Prometheus used to differentiate clusters (e.g. 'cluster'). 
Only relevant for centralized prometheus.", rich_help_panel="Prometheus Settings", ), eks_managed_prom: bool = typer.Option( @@ -250,6 +259,7 @@ def run_strategy( try: config = Config( kubeconfig=kubeconfig, + mode=mode, impersonate_user=impersonate_user, impersonate_group=impersonate_group, clusters="*" if all_clusters else clusters, diff --git a/robusta_krr/strategies/simple.py b/robusta_krr/strategies/simple.py index 3cecd18e..498401fd 100644 --- a/robusta_krr/strategies/simple.py +++ b/robusta_krr/strategies/simple.py @@ -5,7 +5,7 @@ from robusta_krr.core.abstract.strategies import ( BaseStrategy, - K8sObjectData, + K8sWorkload, MetricsPodData, PodsTimeData, ResourceRecommendation, @@ -103,7 +103,7 @@ def metrics(self) -> list[type[PrometheusMetric]]: return metrics def __calculate_cpu_proposal( - self, history_data: MetricsPodData, object_data: K8sObjectData + self, history_data: MetricsPodData, object_data: K8sWorkload ) -> ResourceRecommendation: data = history_data["PercentileCPULoader"] @@ -130,7 +130,7 @@ def __calculate_cpu_proposal( return ResourceRecommendation(request=cpu_usage, limit=None) def __calculate_memory_proposal( - self, history_data: MetricsPodData, object_data: K8sObjectData + self, history_data: MetricsPodData, object_data: K8sWorkload ) -> ResourceRecommendation: data = history_data["MaxMemoryLoader"] @@ -173,7 +173,7 @@ def __calculate_memory_proposal( request=memory_usage, limit=memory_usage, info="OOMKill detected" if oomkill_detected else None ) - def run(self, history_data: MetricsPodData, object_data: K8sObjectData) -> RunResult: + def run(self, history_data: MetricsPodData, object_data: K8sWorkload) -> RunResult: return { ResourceType.CPU: self.__calculate_cpu_proposal(history_data, object_data), ResourceType.Memory: self.__calculate_memory_proposal(history_data, object_data), diff --git a/robusta_krr/utils/service_discovery.py b/robusta_krr/utils/service_discovery.py index 988a8c0c..056ce1bb 100644 --- a/robusta_krr/utils/service_discovery.py +++ b/robusta_krr/utils/service_discovery.py @@ -81,6 +81,7 @@ def find_url(self, selectors: list[str]) -> Optional[str]: self.find_ingress_host(label_selector) ingress_url = self.find_ingress_host(label_selector) if ingress_url: + logger.debug(f"Found ingress with label selector {label_selector}") return ingress_url return None @@ -88,5 +89,5 @@ def find_url(self, selectors: list[str]) -> Optional[str]: class MetricsServiceDiscovery(ServiceDiscovery, ABC): @abstractmethod - def find_metrics_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]: + def find_metrics_url(self) -> Optional[str]: pass diff --git a/tests/conftest.py b/tests/conftest.py index b1d8d228..c582ac3c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,10 +5,15 @@ import numpy as np import pytest -from robusta_krr.api.models import K8sObjectData, PodData, ResourceAllocations +from robusta_krr.api.models import K8sWorkload, PodData, ResourceAllocations from robusta_krr.strategies.simple import SimpleStrategy, SimpleStrategySettings +from robusta_krr.core.integrations.prometheus.connector import PrometheusConnector +from robusta_krr.core.integrations.prometheus.metrics_service.prometheus_metrics_service import PrometheusMetricsService +from robusta_krr.core.models.config import Config +from robusta_krr.core.integrations.kubernetes.cluster_loader import KubeAPIClusterLoader, KubeAPIWorkloadLoader -TEST_OBJECT = K8sObjectData( + +TEST_OBJECT = K8sWorkload( cluster="mock-cluster", name="mock-object-1", 
container="mock-container-1", @@ -28,8 +33,9 @@ @pytest.fixture(autouse=True, scope="session") def mock_list_clusters(): - with patch( - "robusta_krr.core.integrations.kubernetes.KubernetesLoader.list_clusters", + with patch.object( + KubeAPIClusterLoader, + "list_clusters", new=AsyncMock(return_value=[TEST_OBJECT.cluster]), ): yield @@ -37,8 +43,9 @@ def mock_list_clusters(): @pytest.fixture(autouse=True, scope="session") def mock_list_scannable_objects(): - with patch( - "robusta_krr.core.integrations.kubernetes.KubernetesLoader.list_scannable_objects", + with patch.object( + KubeAPIWorkloadLoader, + "list_workloads", new=AsyncMock(return_value=[TEST_OBJECT]), ): yield @@ -46,8 +53,9 @@ def mock_list_scannable_objects(): @pytest.fixture(autouse=True, scope="session") def mock_load_kubeconfig(): - with patch("robusta_krr.core.models.config.Config.load_kubeconfig", return_value=None): - yield + with patch.object(Config, "load_kubeconfig", return_value=None): + with patch.object(Config, "get_kube_client", return_value=None): + yield @pytest.fixture(autouse=True, scope="session") @@ -60,8 +68,9 @@ def mock_prometheus_loader(): settings = SimpleStrategySettings() strategy = SimpleStrategy(settings) - with patch( - "robusta_krr.core.integrations.prometheus.loader.PrometheusMetricsLoader.gather_data", + with patch.object( + PrometheusConnector, + "gather_data", new=AsyncMock( return_value={ metric.__name__: {pod.name: metric_points_data for pod in TEST_OBJECT.pods} @@ -75,8 +84,9 @@ def mock_prometheus_loader(): @pytest.fixture(autouse=True, scope="session") def mock_prometheus_load_pods(): - with patch( - "robusta_krr.core.integrations.prometheus.loader.PrometheusMetricsLoader.load_pods", + with patch.object( + PrometheusConnector, + "load_pods", new=AsyncMock( return_value=TEST_OBJECT.pods, ), @@ -92,13 +102,15 @@ async def get_history_range(self, history_duration: timedelta) -> tuple[datetime start = now - history_duration return start, now - with patch( - "robusta_krr.core.integrations.prometheus.loader.PrometheusMetricsLoader.get_history_range", get_history_range - ): + with patch.object(PrometheusConnector, "get_history_range", get_history_range): yield @pytest.fixture(autouse=True, scope="session") -def mock_prometheus_init(): - with patch("robusta_krr.core.integrations.prometheus.loader.PrometheusMetricsLoader.__init__", return_value=None): - yield +def mock_prometheus_connector_connect(): + def _connect(self, loader) -> None: + self.loader = loader + + with patch.object(PrometheusConnector, "_connect", _connect): + with patch.object(PrometheusMetricsService, "discover", return_value=None): + yield
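Taken together, these changes let KRR scan workloads without any Kubernetes API access. A hedged example invocation of the new mode (flag spellings as defined in `main.py` above; the URL and cluster name are placeholders):

```sh
krr.py simple --mode prometheus -p http://localhost:9090 \
    --prometheus-cluster-key cluster -l my-cluster-name
```

In the default `kubeapi` mode the existing kubeconfig-based flow is unchanged.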