Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions robusta_krr/core/integrations/base_workload_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import abc
from typing import Optional

from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.utils.configurable import Configurable

class WorkloadLoader(Configurable, abc.ABC):
    """Interface for workload-discovery backends.

    Implementations (e.g. the API-server based ``KubernetesLoader`` or the
    Prometheus-based ``PrometheusLoader``) enumerate the Kubernetes workloads
    that should be scanned for resource recommendations.
    """

    @abc.abstractmethod
    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
        """Return the workloads to scan.

        Args:
            clusters: Cluster names to scan, or None when the backend only
                supports a single, implicit cluster.

        Returns:
            One ``K8sObjectData`` entry per scannable workload container.
        """
        ...

    @abc.abstractmethod
    async def list_clusters(self) -> Optional[list[str]]:
        """Return the names of available clusters, or None when the backend
        cannot enumerate clusters (single-cluster mode)."""
        ...
3 changes: 2 additions & 1 deletion robusta_krr/core/integrations/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from robusta_krr.core.models.objects import K8sObjectData, PodData, HPAData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable
from .base_workload_loader import WorkloadLoader


AnyKubernetesAPIObject = Union[V1Deployment, V1DaemonSet, V1StatefulSet, V1Pod, V1Job]
Expand Down Expand Up @@ -243,7 +244,7 @@ def __get_metric(hpa: V2HorizontalPodAutoscaler, metric_name: str) -> Optional[f
}


class KubernetesLoader(Configurable):
class KubernetesLoader(WorkloadLoader):
async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.

Expand Down
151 changes: 151 additions & 0 deletions robusta_krr/core/integrations/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import asyncio
import itertools
from typing import Optional, List, Dict
from collections import defaultdict

from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData, PodData
from robusta_krr.core.models.result import ResourceAllocations, ResourceType, RecommendationValue
from .prometheus.loader import MetricsLoader
from .base_workload_loader import WorkloadLoader

class PrometheusLoader(WorkloadLoader):
    """Workload discovery backend that lists Kubernetes workloads from
    kube-state-metrics series in Prometheus, without contacting the API server.

    Only Deployments (discovered via their ReplicaSets) are supported so far,
    and only a single (implicit) cluster.
    """

    def __init__(self, config: Config):
        super().__init__(config)
        # Reuse MetricsLoader so discovery queries go through the same
        # Prometheus connection/auth logic as metric gathering.
        self.metrics_loader = MetricsLoader(config)

    async def list_clusters(self) -> Optional[list[str]]:
        """Prometheus-based discovery cannot enumerate clusters; return None."""
        self.debug("Working in Prometheus-based workload discovery mode. Only support a single cluster")
        return None

    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
        """List all scannable objects from Prometheus.

        In this workload discovery mode, clusters are not supported and the
        ``clusters`` argument is ignored.

        Returns:
            A list of scannable objects (one per workload container).
        """
        self.info("Listing scannable objects from Prometheus")
        self.debug(f"Namespaces: {self.config.namespaces}")
        try:
            objects_tuple = await asyncio.gather(
                self._list_deployments(),
            )
        except Exception as e:
            # Best-effort: log the failure and report nothing rather than crash.
            self.error(f"Error trying to list pods from Prometheus: {e}")
            self.debug_exception()
            return []

        objects = itertools.chain(*objects_tuple)
        if self.config.namespaces == "*":
            # NOTE: We are not scanning kube-system namespace by default
            result = [obj for obj in objects if obj.namespace != "kube-system"]
        else:
            result = [obj for obj in objects if obj.namespace in self.config.namespaces]

        namespaces = {obj.namespace for obj in result}
        self.info(f"Found {len(result)} objects across {len(namespaces)} namespaces from Prometheus({self.config.prometheus_url})")

        return result

    @staticmethod
    def __resource_values(series) -> Dict[ResourceType, RecommendationValue]:
        # Turn an `avg by(resource)` query result into a {CPU, Memory} mapping.
        # query() may return None/empty when nothing matched — values stay None.
        values: Dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
        for sample in series or []:
            resource = sample["metric"]["resource"]
            if resource == ResourceType.CPU:
                values[ResourceType.CPU] = float(sample["value"][1])
            elif resource == ResourceType.Memory:
                values[ResourceType.Memory] = float(sample["value"][1])
        return values

    async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations:
        """Read a container's current requests/limits from kube-state-metrics,
        averaged over the pods matching ``pod_selector``."""
        limits = await self.metrics_loader.loader.query(
            "avg by(resource) (kube_pod_container_resource_limits{"
            f'namespace="{namespace}", '
            f'pod=~"{pod_selector}", '
            f'container="{container_name}"'
            "})"
        )
        requests = await self.metrics_loader.loader.query(
            "avg by(resource) (kube_pod_container_resource_requests{"
            f'namespace="{namespace}", '
            f'pod=~"{pod_selector}", '
            f'container="{container_name}"'
            "})"
        )
        return ResourceAllocations(
            requests=self.__resource_values(requests),
            limits=self.__resource_values(limits),
        )

    async def __build_from_owner(
        self, namespace: str, app_name: str, containers: List[str], pod_names: List[str], labels: Dict[str, str]
    ) -> List[K8sObjectData]:
        """Build one K8sObjectData per container of the given Deployment."""
        pod_selector = "|".join(pod_names)
        return [
            K8sObjectData(
                cluster=None,  # Prometheus-based discovery is single-cluster
                namespace=namespace,
                labels=labels,
                name=app_name,
                kind="Deployment",
                container=container_name,
                allocations=await self.__parse_allocation(namespace, pod_selector, container_name),
                pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names],
            )
            for container_name in containers
        ]

    async def _list_containers(self, namespace: str, pod_selector: str) -> List[str]:
        """Names of all containers running in pods matching ``pod_selector``."""
        containers = await self.metrics_loader.loader.query(
            "count by (container) (kube_pod_container_info{"
            f'namespace="{namespace}", '
            f'pod=~"{pod_selector}"'
            "})"
        )
        # FIX: guard against query() returning None (no matching series).
        return [container["metric"]["container"] for container in containers or []]

    async def _list_containers_in_pods(
        self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str, labels: Dict[str, str]
    ) -> list[K8sObjectData]:
        """Resolve the pods owned by the given owner(s) and build workload objects
        for each of their containers. Only ReplicaSet owners are supported."""
        if pod_owner_kind != "ReplicaSet":
            return []
        # owner_name is a regex alternation of ReplicaSet names.
        pods = await self.metrics_loader.loader.query(
            "count by (owner_name, replicaset, pod) (kube_pod_owner{"
            f'namespace="{namespace}", '
            f'owner_name=~"{owner_name}", '
            'owner_kind="ReplicaSet"})'
        )
        if not pods:
            return []  # no container
        # Example result:
        # [{'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-jqt4x'}, 'value': [1685529217, '1']},
        #  {'metric': {'owner_name': 'wbjs-algorithm-base-565b645489', 'pod': 'wbjs-algorithm-base-565b645489-lj9qg'}, 'value': [1685529217, '1']}]
        pod_names = [pod["metric"]["pod"] for pod in pods]
        container_names = await self._list_containers(namespace, "|".join(pod_names))
        return await self.__build_from_owner(namespace, app_name, container_names, pod_names, labels)

    async def _list_labels(self, owner_kind: str, namespace: str, owner_name: str) -> Dict[str, str]:
        """Fetch a workload's Kubernetes labels from kube-state-metrics."""
        if owner_kind != "Deployment":
            # FIX: previously fell through and implicitly returned None,
            # violating the declared Dict[str, str] return type.
            return {}
        self.debug(f"{owner_kind} in {namespace}: {owner_name}")
        labels_metric = await self.metrics_loader.loader.query(
            "kube_deployment_labels{"
            f'namespace="{namespace}", '
            f'deployment="{owner_name}"'
            "}"
        )
        if not labels_metric:  # FIX: also covers query() returning None
            return {}
        # kube-state-metrics exposes k8s labels as `label_<name>` metric labels.
        return {
            key[len("label_"):]: value
            for key, value in labels_metric[0]["metric"].items()
            if key.startswith("label_")
        }

    async def _list_deployments(self) -> list[K8sObjectData]:
        """Discover Deployments by grouping their ReplicaSets from kube_replicaset_owner."""
        self.debug(f"Listing deployments in namespace({self.config.namespaces}) from Prometheus({self.config.prometheus_url})")
        # FIX: "*" is not a valid PromQL (RE2) regex — `"|".join("*")` produced
        # `namespace=~"*"`, which fails to parse; use ".*" to match everything.
        ns = ".*" if self.config.namespaces == "*" else "|".join(self.config.namespaces)
        replicasets = await self.metrics_loader.loader.query(
            "count by (namespace, owner_name, replicaset) (kube_replicaset_owner{"
            f'namespace=~"{ns}", '
            'owner_kind="Deployment"})'
        )
        # Group ReplicaSets by "namespace/deployment" so each Deployment is scanned once.
        replicaset_dict: Dict[str, list] = defaultdict(list)
        for replicaset in replicasets or []:
            metric = replicaset["metric"]
            replicaset_dict[metric["namespace"] + "/" + metric["owner_name"]].append(metric)
        objects = await asyncio.gather(
            *[
                self._list_containers_in_pods(
                    replicas[0]["owner_name"],
                    "ReplicaSet",
                    replicas[0]["namespace"],
                    "|".join(metric["replicaset"] for metric in replicas),
                    await self._list_labels("Deployment", replicas[0]["namespace"], replicas[0]["owner_name"]),
                )
                for replicas in replicaset_dict.values()
            ]
        )
        return list(itertools.chain(*objects))
8 changes: 5 additions & 3 deletions robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@


class MetricsLoader(Configurable):
loader: MetricsService

def __init__(
self,
config: Config,
Expand All @@ -49,10 +51,10 @@ def __init__(
if cluster is not None
else None
)
self.loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster)
if not self.loader:
loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster)
if not loader:
raise PrometheusNotFound("No Prometheus or metrics service found")

self.loader = loader
self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster")

def get_metrics_service(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def __init__(
    @abc.abstractmethod
    def check_connection(self):
        """Verify that the metrics backend is reachable.

        NOTE(review): contract inferred from the abstract declaration only —
        see concrete implementations for what is raised on failure.
        """
        ...

    @abc.abstractmethod
    async def query(self, query: str) -> dict:
        """Execute a raw PromQL query against the backend and return the
        decoded result.

        NOTE(review): callers (e.g. the Prometheus workload loader) iterate
        the result as a list of samples and also handle None — the declared
        ``dict`` return type looks inaccurate; confirm against implementations.
        """
        ...

def name(self) -> str:
classname = self.__class__.__name__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(

if self.auth_header:
headers = {"Authorization": self.auth_header}
elif not self.config.inside_cluster:
elif not self.config.inside_cluster and self.api_client is not None:
self.api_client.update_params_for_auth(headers, {}, ["BearerToken"])

self.prometheus = CustomPrometheusConnect(url=self.url, disable_ssl=not self.ssl_enabled, headers=headers)
Expand Down
1 change: 1 addition & 0 deletions robusta_krr/core/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Config(pd.BaseSettings):
clusters: Union[list[str], Literal["*"], None] = None
kubeconfig: Optional[str] = None
namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
discovery_method: Union[Literal["api-server"], Literal["prometheus"]] = pd.Field("api-server")

# Value settings
cpu_min_value: int = pd.Field(5, ge=0) # in millicores
Expand Down
2 changes: 2 additions & 0 deletions robusta_krr/core/models/objects.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

import pydantic as pd
from typing import Optional, Dict

from robusta_krr.core.models.allocations import ResourceAllocations

Expand Down Expand Up @@ -31,6 +32,7 @@ class K8sObjectData(pd.BaseModel):
hpa: Optional[HPAData]
namespace: str
kind: str
labels: Dict[str, str]
allocations: ResourceAllocations

def __str__(self) -> str:
Expand Down
30 changes: 22 additions & 8 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
from robusta_krr.core.integrations.kubernetes import KubernetesLoader
from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader, PrometheusNotFound
from robusta_krr.core.integrations.metrics import PrometheusLoader
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.core.models.result import (
Expand All @@ -22,13 +23,23 @@
from robusta_krr.utils.progress_bar import ProgressBar
from robusta_krr.utils.version import get_version

async def gather_with_concurrency(n: int, *coros):
    """Await *coros* concurrently while letting at most ``n`` run at a time.

    Results are returned in the same order as the supplied coroutines,
    mirroring :func:`asyncio.gather`.
    """
    gate = asyncio.Semaphore(n)

    async def run_gated(coro):
        # Each coroutine waits its turn on the shared semaphore before starting.
        async with gate:
            return await coro

    return await asyncio.gather(*map(run_gated, coros))

class Runner(Configurable):
EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)

def __init__(self, config: Config) -> None:
super().__init__(config)
self._k8s_loader = KubernetesLoader(self.config)
if config.discovery_method == "api-server":
self._workload_loader = KubernetesLoader(self.config)
else:
self._workload_loader = PrometheusLoader(self.config)
self._metrics_service_loaders: dict[Optional[str], Union[MetricsLoader, Exception]] = {}
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = self.config.create_strategy()
Expand Down Expand Up @@ -136,7 +147,8 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> tupl
async def _gather_objects_recommendations(
self, objects: list[K8sObjectData]
) -> list[tuple[ResourceAllocations, MetricsData]]:
recommendations: list[tuple[RunResult, MetricsData]] = await asyncio.gather(
recommendations: list[tuple[RunResult, MetricsData]] = await gather_with_concurrency(
self.config.max_workers,
*[self._calculate_object_recommendations(object) for object in objects]
)

Expand All @@ -153,16 +165,16 @@ async def _gather_objects_recommendations(
]

async def _collect_result(self) -> Result:
clusters = await self._k8s_loader.list_clusters()
if len(clusters) > 1 and self.config.prometheus_url:
clusters = await self._workload_loader.list_clusters()
if clusters is not None and len(clusters) > 1 and self.config.prometheus_url:
# this can only happen for multi-cluster querying a single centeralized prometheus
# In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect
raise ClusterNotSpecifiedException(
f"Cannot scan multiple clusters for this prometheus, Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
)

self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')
objects = await self._k8s_loader.list_scannable_objects(clusters)
objects = await self._workload_loader.list_scannable_objects(clusters)

if len(objects) == 0:
self.warning("Current filters resulted in no objects available to scan.")
Expand Down Expand Up @@ -195,9 +207,11 @@ async def run(self) -> None:
try:
self.config.load_kubeconfig()
except Exception as e:
self.error(f"Could not load kubernetes configuration: {e}")
self.error("Try to explicitly set --context and/or --kubeconfig flags.")
return
if self.config.prometheus_url is None:
self.error(f"Could not load kubernetes configuration: {e}")
self.error("Try to explicitly set --context and/or --kubeconfig flags.")
return
self.warning("Could not load kubernetes configuration, use Prometheus-based worload instead.")

try:
result = await self._collect_result()
Expand Down
7 changes: 7 additions & 0 deletions robusta_krr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ def {func_name}(
help="List of namespaces to run on. By default, will run on all namespaces.",
rich_help_panel="Kubernetes Settings"
),
discovery_method: Optional[str] = typer.Option(
"api-server",
"--discovery-method",
help="Method to discover workload in the cluster.",
rich_help_panel="Kubernetes Settings"
),
prometheus_url: Optional[str] = typer.Option(
None,
"--prometheus-url",
Expand Down Expand Up @@ -124,6 +130,7 @@ def {func_name}(
kubeconfig=kubeconfig,
clusters="*" if all_clusters else clusters,
namespaces="*" if "*" in namespaces else namespaces,
discovery_method=discovery_method,
prometheus_url=prometheus_url,
prometheus_auth_header=prometheus_auth_header,
prometheus_ssl_enabled=prometheus_ssl_enabled,
Expand Down