Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions scanpipe/pipes/vulnerablecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

import logging
from collections.abc import Callable
from collections.abc import Iterable
from typing import Any

from django.conf import settings

Expand Down Expand Up @@ -50,14 +53,14 @@
session.headers.update({"Authorization": f"Token {VULNERABLECODE_API_KEY}"})


def is_configured():
def is_configured() -> bool:
"""Return True if the required VulnerableCode settings have been set."""
if VULNERABLECODE_API_URL:
return True
return False


def is_available():
def is_available() -> bool:
"""Return True if the configured VulnerableCode server is available."""
if not is_configured():
return False
Expand All @@ -72,7 +75,7 @@ def is_available():
return response.status_code == requests.codes.ok


def chunked(iterable, chunk_size):
def chunked(iterable: list[Any], chunk_size: int) -> Iterable[list[Any]]:
"""
Break an `iterable` into lists of `chunk_size` length.

Expand All @@ -86,19 +89,19 @@ def chunked(iterable, chunk_size):
yield iterable[index:end]


def get_purls(packages):
def get_purls(packages: list[Any]) -> list[str]:
"""Return the PURLs for the given list of `packages`."""
return [package_url for package in packages if (package_url := package.package_url)]


def request_get(
url,
payload=None,
timeout=None,
):
url: str | None,
payload: dict[str, Any] | None = None,
timeout: int | None = None,
) -> dict[str, Any] | None:
"""Wrap the HTTP request calls on the API."""
if not url:
return
return None

params = {"format": "json"}
if payload:
Expand All @@ -111,41 +114,45 @@ def request_get(
return response.json()
except (requests.RequestException, ValueError, TypeError) as exception:
logger.debug(f"{label} [Exception] {exception}")
return None


def request_post(
url,
data,
timeout=None,
):
url: str,
data: dict[str, Any],
timeout: int | None = None,
) -> dict[str, Any] | None:
"""Wrap the HTTP POST request calls on the API."""
try:
response = session.post(url, json=data, timeout=timeout)
response.raise_for_status()
return response.json()
except (requests.RequestException, ValueError, TypeError) as exception:
logger.debug(f"{label} [Exception] {exception}")
return None


def _get_vulnerabilities(
url,
field_name,
field_value,
timeout=None,
):
url: str,
field_name: str,
field_value: str,
timeout: int | None = None,
) -> list[dict[str, Any]] | None:
"""Get the list of vulnerabilities."""
payload = {field_name: field_value}

response = request_get(url=url, payload=payload, timeout=timeout)
if response and response.get("count"):
results = response["results"]
return results
return None


def get_vulnerabilities_by_purl(
purl,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
purl: str,
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> list[dict[str, Any]] | None:
"""Get the list of vulnerabilities providing a package `purl`."""
return _get_vulnerabilities(
url=f"{api_url}packages/",
Expand All @@ -156,10 +163,10 @@ def get_vulnerabilities_by_purl(


def get_vulnerabilities_by_cpe(
cpe,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
cpe: str,
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> list[dict[str, Any]] | None:
"""Get the list of vulnerabilities providing a package or component `cpe`."""
return _get_vulnerabilities(
url=f"{api_url}cpes/",
Expand All @@ -170,10 +177,10 @@ def get_vulnerabilities_by_cpe(


def bulk_search_by_purl(
purls,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
purls: list[str],
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> list[dict[str, Any]] | None:
"""Bulk search of vulnerabilities using the provided list of `purls`."""
url = f"{api_url}packages/bulk_search"

Expand All @@ -183,14 +190,18 @@ def bulk_search_by_purl(
}

logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}")
return request_post(url, data, timeout)
response = request_post(url, data, timeout)
# API returns a list of vulnerability data dicts, not a dict
if isinstance(response, list):
return response
return None


def bulk_search_by_cpes(
cpes,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
cpes: list[str],
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> dict[str, Any] | None:
"""Bulk search of vulnerabilities using the provided list of `cpes`."""
url = f"{api_url}cpes/bulk_search"

Expand All @@ -202,7 +213,9 @@ def bulk_search_by_cpes(
return request_post(url, data, timeout)


def filter_vulnerabilities(vulnerabilities, ignore_set):
def filter_vulnerabilities(
vulnerabilities: list[dict[str, Any]], ignore_set: set[str]
) -> list[dict[str, Any]]:
"""Filter out vulnerabilities based on a list of ignored IDs and aliases."""
return [
vulnerability
Expand All @@ -213,8 +226,11 @@ def filter_vulnerabilities(vulnerabilities, ignore_set):


def fetch_vulnerabilities(
packages, chunk_size=1000, logger=logger.info, ignore_set=None
):
packages: list[Any],
chunk_size: int = 1000,
logger: Callable[[str], None] = logger.info,
ignore_set: set[str] | None = None,
) -> None:
"""
Fetch and store vulnerabilities for each provided ``packages``.
The PURLs are used for the lookups in batch of ``chunk_size`` per request.
Expand All @@ -223,8 +239,9 @@ def fetch_vulnerabilities(

for purls_batch in chunked(get_purls(packages), chunk_size):
response_data = bulk_search_by_purl(purls_batch)
for vulnerability_data in response_data:
vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data
if response_data:
for vulnerability_data in response_data:
vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data

unsaved_objects = []
for package in packages:
Expand Down