|
| 1 | +import os |
| 2 | +import json |
| 3 | +import requests |
| 4 | +from typing import Optional, Dict |
| 5 | + |
| 6 | +SONARQUBE_URL = "http://localhost:9000" |
| 7 | +PAGE_SIZE = 500 |
| 8 | + |
| 9 | +class SonarQubeAPI: |
| 10 | + def __init__(self, base_url: str = SONARQUBE_URL, token: Optional[str] = None): |
| 11 | + self.base_url = base_url.rstrip('/') |
| 12 | + self.token = token or os.getenv('SONARQUBE_TOKEN') |
| 13 | + |
| 14 | + if not self.token: |
| 15 | + raise ValueError("SonarQube token is required. Provide it as parameter or set SONARQUBE_TOKEN environment variable.") |
| 16 | + |
| 17 | + self.headers = { |
| 18 | + 'Authorization': f'Bearer {self.token}', |
| 19 | + 'Content-Type': 'application/json' |
| 20 | + } |
| 21 | + |
| 22 | + def _get_issues(self, component_key: str) -> Dict: |
| 23 | + url = f"{self.base_url}/api/issues/search" |
| 24 | + all_issues = [] |
| 25 | + page = 1 |
| 26 | + |
| 27 | + while True: |
| 28 | + params = { |
| 29 | + 'componentKeys': component_key, |
| 30 | + 'ps': PAGE_SIZE, |
| 31 | + 'p': page |
| 32 | + } |
| 33 | + try: |
| 34 | + response = requests.get(url, headers=self.headers, params=params) |
| 35 | + response.raise_for_status() |
| 36 | + data = response.json() |
| 37 | + |
| 38 | + page_issues = data.get('issues', []) |
| 39 | + all_issues.extend(page_issues) |
| 40 | + |
| 41 | + if len(page_issues) == 0: |
| 42 | + break |
| 43 | + page += 1 |
| 44 | + |
| 45 | + except requests.exceptions.RequestException as e: |
| 46 | + print(f"Error calling SonarQube API: {e}") |
| 47 | + if hasattr(e, 'response') and e.response is not None: |
| 48 | + print(f"Response status: {e.response.status_code}") |
| 49 | + print(f"Response text: {e.response.text}") |
| 50 | + raise |
| 51 | + return { |
| 52 | + 'total': len(all_issues), |
| 53 | + 'issues': all_issues |
| 54 | + } |
| 55 | + |
| 56 | + def _get_rule_details(self, rule_key: str) -> Dict: |
| 57 | + url = f"{self.base_url}/api/rules/show" |
| 58 | + params = { |
| 59 | + 'key': rule_key |
| 60 | + } |
| 61 | + try: |
| 62 | + response = requests.get(url, headers=self.headers, params=params) |
| 63 | + response.raise_for_status() |
| 64 | + data = response.json() |
| 65 | + except requests.exceptions.RequestException as e: |
| 66 | + print(f"Error calling SonarQube API: {e}") |
| 67 | + if hasattr(e, 'response') and e.response is not None: |
| 68 | + print(f"Response status: {e.response.status_code}") |
| 69 | + print(f"Response text: {e.response.text}") |
| 70 | + raise |
| 71 | + return data |
| 72 | + |
| 73 | + def is_scan_successful(self, project_key: str) -> bool: |
| 74 | + url = f"{self.base_url}/api/ce/component" |
| 75 | + params = { |
| 76 | + 'component': project_key |
| 77 | + } |
| 78 | + try: |
| 79 | + response = requests.get(url, headers=self.headers, params=params) |
| 80 | + response.raise_for_status() |
| 81 | + data = response.json() |
| 82 | + info = data.get('current', {}) |
| 83 | + result = info.get('status', '') == 'SUCCESS' |
| 84 | + return result |
| 85 | + except Exception as e: |
| 86 | + print(f"Error: {e}") |
| 87 | + return False |
| 88 | + |
| 89 | + def get_issues_for_file(self, project_key: str, file_path: str) -> Dict: |
| 90 | + component_key = f"{project_key}:{file_path}" |
| 91 | + return self._get_issues(component_key) |
| 92 | + |
| 93 | + def get_all_issues(self, project_key: str) -> Dict: |
| 94 | + return self._get_issues(project_key) |
| 95 | + |
| 96 | + def get_rules_and_fix_method(self, rule_key: str) -> Dict: |
| 97 | + data = self._get_rule_details(rule_key) |
| 98 | + result = {} |
| 99 | + rule = data.get('rule', {}) |
| 100 | + if rule: |
| 101 | + result['rule_key'] = rule.get('key', '') |
| 102 | + result['rule_name'] = rule.get('name', '') |
| 103 | + result['severity'] = rule.get('severity', '') |
| 104 | + result['type'] = rule.get('type', '') |
| 105 | + |
| 106 | + description_sections = rule.get('descriptionSections', []) |
| 107 | + for section in description_sections: |
| 108 | + section_key = section.get('key', '') |
| 109 | + section_content = section.get('content', '') |
| 110 | + result[section_key] = section_content |
| 111 | + return result |
| 112 | + |
| 113 | + def print_all_issues(self, project_key: str) -> None: |
| 114 | + try: |
| 115 | + issues_data = self.get_all_issues(project_key) |
| 116 | + issues = issues_data.get('issues', []) |
| 117 | + total = issues_data.get('total', 0) |
| 118 | + |
| 119 | + severity_count = {} |
| 120 | + for issue in issues: |
| 121 | + impacts = issue.get('impacts', []) |
| 122 | + if impacts and len(impacts) > 0: |
| 123 | + severity = impacts[0].get('severity', 'UNKNOWN') |
| 124 | + severity_count[severity] = severity_count.get(severity, 0) + 1 |
| 125 | + print(f"Total issues: {total}") |
| 126 | + for severity, count in sorted(severity_count.items()): |
| 127 | + print(f"{severity}: {count}") |
| 128 | + except Exception as e: |
| 129 | + print(f"Error: {e}") |
| 130 | + |
| 131 | + def print_file_issues(self, project_key: str, file_path: str) -> None: |
| 132 | + try: |
| 133 | + issues_data = self.get_issues_for_file(project_key, file_path) |
| 134 | + issues = issues_data.get('issues', []) |
| 135 | + |
| 136 | + if issues: |
| 137 | + # Show only first 3 issues |
| 138 | + for i, issue in enumerate(issues[:3], 1): |
| 139 | + print(f"{i}. {issue.get('message', 'No message')}") |
| 140 | + print(f" Rule: {issue.get('rule', 'Unknown')}") |
| 141 | + |
| 142 | + impacts = issue.get('impacts', []) |
| 143 | + severity = 'Unknown' |
| 144 | + if impacts and len(impacts) > 0: |
| 145 | + severity = impacts[0].get('severity', 'Unknown') |
| 146 | + |
| 147 | + print(f" Severity: {severity}") |
| 148 | + print(f" Type: {issue.get('type', 'Unknown')}") |
| 149 | + |
| 150 | + if 'textRange' in issue: |
| 151 | + line = issue['textRange'].get('startLine', 'Unknown') |
| 152 | + print(f" Line: {line}") |
| 153 | + print() |
| 154 | + # Show remaining count if there are more than 3 issues |
| 155 | + if len(issues) > 3: |
| 156 | + remaining = len(issues) - 3 |
| 157 | + print(f"... and {remaining} more issues not shown") |
| 158 | + else: |
| 159 | + print("No issues found") |
| 160 | + except Exception as e: |
| 161 | + print(f"Error: {e}") |
| 162 | + |
| 163 | + def save_all_issues(self, project_key: str, file_path: str) -> None: |
| 164 | + try: |
| 165 | + issues_data = self.get_all_issues(project_key) |
| 166 | + with open(file_path, 'w') as f: |
| 167 | + json.dump(issues_data, f, indent=4) |
| 168 | + print(f"All issues saved to {file_path}") |
| 169 | + except Exception as e: |
| 170 | + print(f"Error: {e}") |
| 171 | + |
| 172 | +if __name__ == "__main__": |
| 173 | + api = SonarQubeAPI() |
| 174 | + project_key = "commons-collections" |
| 175 | + file_path = "src/main/java/org/apache/commons/collections4/map/AbstractHashedMap.java" |
| 176 | + rule_key_1 = "java:S2160" |
| 177 | + rule_key_2 = "java:S1117" |
| 178 | + rule_key_3 = "java:S5993" |
| 179 | + print(f"Checking scan success for project {project_key}: {api.is_scan_successful(project_key)}") |
| 180 | + |
| 181 | + print("Project issues summary:") |
| 182 | + api.print_all_issues(project_key) |
| 183 | + |
| 184 | + print(f"\nFile issues details:") |
| 185 | + api.print_file_issues(project_key, file_path) |
| 186 | + |
| 187 | + print(f"\nRule details:") |
| 188 | + print(api.get_rules_and_fix_method(rule_key_3)) |
0 commit comments