From 588e986c661e6662fb2428da61ab6d76ec21f5eb Mon Sep 17 00:00:00 2001 From: hemant-rgb Date: Wed, 24 Dec 2025 11:20:01 +0530 Subject: [PATCH] Improve docstring, add type hints, and fix minor typos in vulnerablecode.py Signed-off-by: hemant-rgb --- scanpipe/pipes/vulnerablecode.py | 97 +++++++++++++++++++------------- 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/scanpipe/pipes/vulnerablecode.py b/scanpipe/pipes/vulnerablecode.py index 6c6073b5d0..1cc4978df6 100644 --- a/scanpipe/pipes/vulnerablecode.py +++ b/scanpipe/pipes/vulnerablecode.py @@ -21,6 +21,9 @@ # Visit https://github.com/aboutcode-org/scancode.io for support and download. import logging +from collections.abc import Callable +from collections.abc import Iterable +from typing import Any from django.conf import settings @@ -50,14 +53,14 @@ session.headers.update({"Authorization": f"Token {VULNERABLECODE_API_KEY}"}) -def is_configured(): +def is_configured() -> bool: """Return True if the required VulnerableCode settings have been set.""" if VULNERABLECODE_API_URL: return True return False -def is_available(): +def is_available() -> bool: """Return True if the configured VulnerableCode server is available.""" if not is_configured(): return False @@ -72,7 +75,7 @@ def is_available(): return response.status_code == requests.codes.ok -def chunked(iterable, chunk_size): +def chunked(iterable: list[Any], chunk_size: int) -> Iterable[list[Any]]: """ Break an `iterable` into lists of `chunk_size` length. @@ -86,19 +89,19 @@ def chunked(iterable, chunk_size): yield iterable[index:end] -def get_purls(packages): +def get_purls(packages: list[Any]) -> list[str]: """Return the PURLs for the given list of `packages`.""" return [package_url for package in packages if (package_url := package.package_url)] def request_get( - url, - payload=None, - timeout=None, -): + url: str | None, + payload: dict[str, Any] | None = None, + timeout: int | None = None, +) -> dict[str, Any] | None: """Wrap the HTTP request calls on the API.""" if not url: - return + return None params = {"format": "json"} if payload: @@ -111,27 +114,30 @@ def request_get( return response.json() except (requests.RequestException, ValueError, TypeError) as exception: logger.debug(f"{label} [Exception] {exception}") + return None def request_post( - url, - data, - timeout=None, -): + url: str, + data: dict[str, Any], + timeout: int | None = None, +) -> dict[str, Any] | None: + """Wrap the HTTP POST request calls on the API.""" try: response = session.post(url, json=data, timeout=timeout) response.raise_for_status() return response.json() except (requests.RequestException, ValueError, TypeError) as exception: logger.debug(f"{label} [Exception] {exception}") + return None def _get_vulnerabilities( - url, - field_name, - field_value, - timeout=None, -): + url: str, + field_name: str, + field_value: str, + timeout: int | None = None, +) -> list[dict[str, Any]] | None: """Get the list of vulnerabilities.""" payload = {field_name: field_value} @@ -139,13 +145,14 @@ def _get_vulnerabilities( if response and response.get("count"): results = response["results"] return results + return None def get_vulnerabilities_by_purl( - purl, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + purl: str, + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> list[dict[str, Any]] | None: """Get the list of vulnerabilities providing a package `purl`.""" return _get_vulnerabilities( url=f"{api_url}packages/", @@ -156,10 +163,10 @@ def get_vulnerabilities_by_purl( def get_vulnerabilities_by_cpe( - cpe, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + cpe: str, + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> list[dict[str, Any]] | None: """Get the list of vulnerabilities providing a package or component `cpe`.""" return _get_vulnerabilities( url=f"{api_url}cpes/", @@ -170,10 +177,10 @@ def get_vulnerabilities_by_cpe( def bulk_search_by_purl( - purls, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + purls: list[str], + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> list[dict[str, Any]] | None: """Bulk search of vulnerabilities using the provided list of `purls`.""" url = f"{api_url}packages/bulk_search" @@ -183,14 +190,18 @@ def bulk_search_by_purl( } logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}") - return request_post(url, data, timeout) + response = request_post(url, data, timeout) + # API returns a list of vulnerability data dicts, not a dict + if isinstance(response, list): + return response + return None def bulk_search_by_cpes( - cpes, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + cpes: list[str], + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> dict[str, Any] | None: """Bulk search of vulnerabilities using the provided list of `cpes`.""" url = f"{api_url}cpes/bulk_search" @@ -202,7 +213,9 @@ def bulk_search_by_cpes( return request_post(url, data, timeout) -def filter_vulnerabilities(vulnerabilities, ignore_set): +def filter_vulnerabilities( + vulnerabilities: list[dict[str, Any]], ignore_set: set[str] +) -> list[dict[str, Any]]: """Filter out vulnerabilities based on a list of ignored IDs and aliases.""" return [ vulnerability @@ -213,8 +226,11 @@ def filter_vulnerabilities(vulnerabilities, ignore_set): def fetch_vulnerabilities( - packages, chunk_size=1000, logger=logger.info, ignore_set=None -): + packages: list[Any], + chunk_size: int = 1000, + logger: Callable[[str], None] = logger.info, + ignore_set: set[str] | None = None, +) -> None: """ Fetch and store vulnerabilities for each provided ``packages``. The PURLs are used for the lookups in batch of ``chunk_size`` per request. @@ -223,8 +239,9 @@ def fetch_vulnerabilities( for purls_batch in chunked(get_purls(packages), chunk_size): response_data = bulk_search_by_purl(purls_batch) - for vulnerability_data in response_data: - vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data + if response_data: + for vulnerability_data in response_data: + vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data unsaved_objects = [] for package in packages: