diff --git a/robusta_krr/core/integrations/base_workload_loader.py b/robusta_krr/core/integrations/base_workload_loader.py new file mode 100644 index 00000000..e6771a65 --- /dev/null +++ b/robusta_krr/core/integrations/base_workload_loader.py @@ -0,0 +1,15 @@ +import abc +from typing import Optional + +from robusta_krr.core.models.objects import K8sObjectData +from robusta_krr.utils.configurable import Configurable + +class WorkloadLoader(Configurable, abc.ABC): + + @abc.abstractmethod + async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: + ... + + @abc.abstractmethod + async def list_clusters(self) -> Optional[list[str]]: + ... \ No newline at end of file diff --git a/robusta_krr/core/integrations/kubernetes.py b/robusta_krr/core/integrations/kubernetes.py index 7452e663..1a228a79 100644 --- a/robusta_krr/core/integrations/kubernetes.py +++ b/robusta_krr/core/integrations/kubernetes.py @@ -26,6 +26,7 @@ from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData from robusta_krr.core.models.result import ResourceAllocations from robusta_krr.utils.configurable import Configurable +from .base_workload_loader import WorkloadLoader from .rollout import RolloutAppsV1Api @@ -341,7 +342,7 @@ async def __list_hpa(self) -> dict[HPAKey, HPAData]: return await self.__list_hpa_v1() -class KubernetesLoader(Configurable): +class KubernetesLoader(WorkloadLoader): async def list_clusters(self) -> Optional[list[str]]: """List all clusters. 
diff --git a/robusta_krr/core/integrations/metrics.py b/robusta_krr/core/integrations/metrics.py new file mode 100644 index 00000000..a56e3583 --- /dev/null +++ b/robusta_krr/core/integrations/metrics.py @@ -0,0 +1,133 @@ +import asyncio +import itertools +from typing import Optional, List, Dict +from collections import defaultdict + +from robusta_krr.core.models.config import Config +from robusta_krr.core.models.objects import K8sObjectData, PodData +from robusta_krr.core.models.result import ResourceAllocations, ResourceType, RecommendationValue +from .prometheus.loader import MetricsLoader +from .base_workload_loader import WorkloadLoader + +class PrometheusLoader(WorkloadLoader): + def __init__(self, config: Config): + super().__init__(config) + self.metrics_loader = MetricsLoader(config) + + async def list_clusters(self) -> Optional[list[str]]: + self.debug("Working in Prometheus-based workload discovery mode. Only support a single cluster") + return None + + async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]: + """List all scannable objects from Prometheus + In this workload discovery mode, clusters are not supported. + + Returns: + A list of scannable objects. 
+ """ + self.info(f"Listing scannable objects from Prometheus") + self.debug(f"Namespaces: {self.config.namespaces}") + try: + objects_tuple = await asyncio.gather( + self._list_deployments(), + ) + except Exception as e: + self.error(f"Error trying to list pods from Prometheus: {e}") + self.debug_exception() + return [] + + objects = itertools.chain(*objects_tuple) + if self.config.namespaces == "*": + # NOTE: We are not scanning kube-system namespace by default + result = [obj for obj in objects if obj.namespace != "kube-system"] + else: + result = [obj for obj in objects if obj.namespace in self.config.namespaces] + + namespaces = {obj.namespace for obj in result} + self.info(f"Found {len(result)} objects across {len(namespaces)} namespaces from Prometheus({self.config.prometheus_url})") + + return result + + async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations: + limits = await self.metrics_loader.loader.query("avg by(resource) (kube_pod_container_resource_limits{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}", ' + f'container="{container_name}"' + "})") + requests = await self.metrics_loader.loader.query("avg by(resource) (kube_pod_container_resource_requests{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}", ' + f'container="{container_name}"' + "})") + requests_values: Dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + limits_values: Dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None} + for limit in limits: + if limit['metric']['resource'] == ResourceType.CPU: + limits_values[ResourceType.CPU] = float(limit['value'][1]) + elif limit['metric']['resource'] == ResourceType.Memory: + limits_values[ResourceType.Memory] = float(limit['value'][1]) + + for request in requests: + if request['metric']['resource'] == ResourceType.CPU: + requests_values[ResourceType.CPU] = float(request['value'][1]) + 
elif request['metric']['resource'] == ResourceType.Memory: + requests_values[ResourceType.Memory] = float(request['value'][1]) + return ResourceAllocations(requests=requests_values, limits=limits_values) + + + async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str]) -> List[K8sObjectData]: + return [ + K8sObjectData( + cluster=None, + namespace=namespace, + name=app_name, + kind="Deployment", + container=container_name, + allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name), # find + pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names], # list pods + ) + for container_name in containers + ] + + async def _list_containers(self, namespace: str, pod_selector: str) -> List[str]: + containers = await self.metrics_loader.loader.query("count by (container) (kube_pod_container_info{" + f'namespace="{namespace}", ' + f'pod=~"{pod_selector}"' + "})") + return [container['metric']['container'] for container in containers] + + async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str) -> list[K8sObjectData]: + if pod_owner_kind == "ReplicaSet": + # owner_name is ReplicaSet names + pods = await self.metrics_loader.loader.query("count by (owner_name, replicaset, pod) (kube_pod_owner{" + f'namespace="{namespace}", ' + f'owner_name=~"{owner_name}", ' + 'owner_kind="ReplicaSet"})') + if pods is None or len(pods) == 0: + return [] # no container + # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']}, + # {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}] + pod_names = [pod['metric']['pod'] for pod in pods] + container_names = await self._list_containers(namespace, "|".join(pod_names)) + return await self.__build_from_owner(namespace, app_name, 
container_names, pod_names) +        return [] + +    async def _list_deployments(self) -> list[K8sObjectData]: +        self.debug(f"Listing deployments in namespace({self.config.namespaces}) from Prometheus({self.config.prometheus_url})") +        ns = ".*" if self.config.namespaces == "*" else "|".join(self.config.namespaces) +        replicasets = await self.metrics_loader.loader.query("count by (namespace, owner_name, replicaset) (kube_replicaset_owner{" +                                                f'namespace=~"{ns}", ' +                                                'owner_kind="Deployment"})') +        # groupBy: 'ns/owner_name' => [{metadata}...] +        pod_owner_kind = "ReplicaSet" +        replicaset_dict = defaultdict(list) +        for replicaset in replicasets: +            replicaset_dict[replicaset['metric']['namespace'] + "/" + replicaset['metric']['owner_name']].append(replicaset['metric']) +        objects = await asyncio.gather( +            *[ +                self._list_containers_in_pods(replicas[0]['owner_name'], pod_owner_kind, replicas[0]['namespace'], +                                              "|".join(list(map(lambda metric: metric['replicaset'], replicas)))) +                for replicas in replicaset_dict.values() +            ] +        ) +        return list(itertools.chain(*objects)) diff --git a/robusta_krr/core/integrations/prometheus/loader.py b/robusta_krr/core/integrations/prometheus/loader.py index 590bc830..1d11d71f 100644 --- a/robusta_krr/core/integrations/prometheus/loader.py +++ b/robusta_krr/core/integrations/prometheus/loader.py @@ -24,6 +24,8 @@ class MetricsLoader(Configurable): +    loader: MetricsService + def __init__( self, config: Config, @@ -48,11 +50,11 @@ def __init__( else None ) loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster) + if loader is None: raise PrometheusNotFound("No Prometheus or metrics service found") self.loader = loader - self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster") def get_metrics_service( diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py index e2654e8f..acecb273 100644 ---
a/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/base_metric_service.py @@ -36,6 +36,10 @@ def __init__( @abc.abstractmethod def check_connection(self): ... + + @abc.abstractmethod + async def query(self, query: str) -> dict: + ... def name(self) -> str: classname = self.__class__.__name__ diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 36eb1529..012734f6 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -16,6 +16,7 @@ class Config(pd.BaseSettings): clusters: Union[list[str], Literal["*"], None] = None kubeconfig: Optional[str] = None namespaces: Union[list[str], Literal["*"]] = pd.Field("*") + discovery_method: Literal["api-server", "prometheus"] = pd.Field("api-server") selector: Optional[str] = None # Value settings diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 52c4fbd4..90af54a6 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -1,6 +1,7 @@ from typing import Optional import pydantic as pd +from typing import Optional from robusta_krr.core.models.allocations import ResourceAllocations diff --git a/robusta_krr/core/runner.py b/robusta_krr/core/runner.py index 68ceb18d..26233a9c 100644 --- a/robusta_krr/core/runner.py +++ b/robusta_krr/core/runner.py @@ -6,6 +6,7 @@ from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult from robusta_krr.core.integrations.kubernetes import KubernetesLoader from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader, PrometheusNotFound +from robusta_krr.core.integrations.metrics import PrometheusLoader from robusta_krr.core.models.config import Config from robusta_krr.core.models.objects import K8sObjectData from robusta_krr.core.models.result import ( @@ -21,13 +22,15 @@ from robusta_krr.utils.progress_bar 
import ProgressBar from robusta_krr.utils.version import get_version - class Runner(Configurable): EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound) def __init__(self, config: Config) -> None: super().__init__(config) - self._k8s_loader = KubernetesLoader(self.config) + if config.discovery_method == "api-server": + self._workload_loader = KubernetesLoader(self.config) + else: + self._workload_loader = PrometheusLoader(self.config) self._metrics_service_loaders: dict[Optional[str], Union[MetricsLoader, Exception]] = {} self._metrics_service_loaders_error_logged: set[Exception] = set() self._strategy = self.config.create_strategy() @@ -152,8 +155,8 @@ async def _gather_objects_recommendations( ] async def _collect_result(self) -> Result: - clusters = await self._k8s_loader.list_clusters() - if clusters and len(clusters) > 1 and self.config.prometheus_url: + clusters = await self._workload_loader.list_clusters() + if clusters is not None and len(clusters) > 1 and self.config.prometheus_url: # this can only happen for multi-cluster querying a single centeralized prometheus # In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect raise ClusterNotSpecifiedException( @@ -161,7 +164,7 @@ async def _collect_result(self) -> Result: ) self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}') - objects = await self._k8s_loader.list_scannable_objects(clusters) + objects = await self._workload_loader.list_scannable_objects(clusters) if len(objects) == 0: self.warning("Current filters resulted in no objects available to scan.") @@ -194,9 +197,11 @@ async def run(self) -> None: try: self.config.load_kubeconfig() except Exception as e: - self.error(f"Could not load kubernetes configuration: {e}") - self.error("Try to explicitly set --context and/or --kubeconfig flags.") - return + if self.config.prometheus_url is None: + self.error(f"Could not load kubernetes 
configuration: {e}") +                self.error("Try to explicitly set --context and/or --kubeconfig flags.") +                return +            self.warning("Could not load kubernetes configuration, use Prometheus-based workload instead.") try: result = await self._collect_result() diff --git a/robusta_krr/main.py b/robusta_krr/main.py index db974334..fe5961ac 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -72,6 +72,11 @@ def {func_name}( help="List of namespaces to run on. By default, will run on all namespaces.", rich_help_panel="Kubernetes Settings" ), +    discovery_method: Optional[str] = typer.Option( +        "api-server", +        "--discovery-method", +        help="Method to discover workload in the cluster.", +    ), selector: Optional[str] = typer.Option( None, "--selector", @@ -137,6 +141,7 @@ def {func_name}( kubeconfig=kubeconfig, clusters="*" if all_clusters else clusters, namespaces="*" if "*" in namespaces else namespaces, +        discovery_method=discovery_method, selector=selector, prometheus_url=prometheus_url, prometheus_auth_header=prometheus_auth_header,