diff --git a/scripts/cve-reports/README.md b/scripts/cve-reports/README.md index c2bc3de..947ab72 100644 --- a/scripts/cve-reports/README.md +++ b/scripts/cve-reports/README.md @@ -14,6 +14,8 @@ send-scan.py --report-path --jira-url - `--report-path` - Specifies location of report(s). If it is single file, it will be parsed on its own. If it is a directory all files in the directory will be parsed one by one. Only JSON and SARIF formats are supported. - `--jira-url` - Specifies URL of Jira automation to which reports should be sent. +- `--add-github-meta` - If set, GitHub metadata is added to the sent request. This metadata will be generated automatically from the runner's environment variables: GITHUB_SERVER_URL, GITHUB_RUN_ID, GITHUB_SHA, GITHUB_REPOSITORY. The default behaviour is to not send the metadata. +- `--verbose` - If set, the body and return code of request will be printed. Defaults to non-verbose. ## Testing diff --git a/scripts/cve-reports/send-scan.py b/scripts/cve-reports/send-scan.py index f1f260b..fbc3f74 100755 --- a/scripts/cve-reports/send-scan.py +++ b/scripts/cve-reports/send-scan.py @@ -1,5 +1,5 @@ #!/usr/bin/python3 -# Copyright 2023 Canonical Ltd. +# Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. # @@ -7,11 +7,18 @@ import argparse import json +import logging import os +import sys from pathlib import Path +from typing import Dict, List import requests +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO, stream=sys.stdout, format="%(message)s") + + severity_to_priority_map = { "CRITICAL": "Highest", "HIGH": "High", @@ -19,8 +26,25 @@ } -def parse_json(filename): - """Parse JSON file""" +def get_github_meta() -> Dict[str, str]: + """Return a dictionary with GitHub metadata.""" + return { + "github_server_url": os.getenv("GITHUB_SERVER_URL"), + "github_run_id": os.getenv("GITHUB_RUN_ID"), + "github_sha": os.getenv("GITHUB_SHA"), + "github_repository": os.getenv("GITHUB_REPOSITORY"), + } + + +def parse_json(filename: Path) -> List[Dict[str, str]]: + """Parse JSON file. + + Args: + filename(Path): path of file to parse. + Returns: + List[Dict[str, str]]: List of CVE dictionary. + """ + record_list = [] with open(filename, "r") as json_file: data = json.load(json_file) @@ -58,18 +82,23 @@ def parse_json(filename): "priority": severity_to_priority_map.get(vuln["Severity"], "Lowest"), } ) - return record_list -def parse_sarif(filename): - """Parse SARIF file""" +def parse_sarif(filename: Path) -> List[Dict[str, str]]: + """Parse SARIF file. + + Args: + filename(Path): path of file to parse. + Returns: + List[Dict[str, str]]: List of CVE dictionary. + """ record_list = [] with open(filename, "r") as json_file: data = json.load(json_file) if "runs" not in data and "tool" not in data["runs"][0]: # no scan results found, skip this report - print(f"No results in report {filename}") + logger.warning(f"No results in report {filename}") return [] rules = data["runs"][0]["tool"]["driver"]["rules"] @@ -100,13 +129,45 @@ def parse_sarif(filename): "priority": severity_to_priority_map.get(severity, "Lowest"), } ) - return record_list -def main(report_path, jira_url): - input_path = Path(report_path) +def send_request_with_records( + records: List[Dict[str, str]], + jira_url: str, + gh_metadata: Dict[str, str] = {}, + verbose: bool = False, +) -> None: + """Send the request with records to Jira. + + Args: + records(List[Dict[str, str]]): a list of records of CVE's. + jira_url(str): the url to send the request to. + gh_metadata(Dict[str, str]): a dictionary containing the GitHub metadata + to attach to the request, defaults to {}. + verbose(bool): if True body of request and response code will be printed. + """ + + for record in records: + record = {**record, **gh_metadata} + res = requests.post(jira_url, json=record) + if verbose: + logger.info(record) + logger.info(res) + + +def main(report_path: str, jira_url: str, gh_meta: bool, verbose: bool) -> None: + """Main function for processing CVE's files. + + Args: + report_path(str): path where report is stored + jira_url(str): the url to send the request to. + gh_meta(bool): if True GitHub metadata will be attached to the request. + verbose(bool): if True GitHub metadata will be attached to the request. + """ + input_path = Path(report_path) + gh_metadata = get_github_meta() if gh_meta else {} file_list = [] if input_path.is_dir(): # directory is supplied, retrieve list of files @@ -114,31 +175,30 @@ def main(report_path, jira_url): elif input_path.is_file(): file_list.append(input_path) else: - print(f"Invalid input {report_path} supplied") + logger.error(f"Invalid input {report_path} supplied") return if not file_list: - print(f"Failed to retrieve list of files from {report_path}") + logger.error(f"Failed to retrieve list of files from {report_path}") return for file in file_list: - print(f"Processing report in: {file}") + logger.info(f"Processing report in: {file}") if file.suffix == ".json": records = parse_json(file) elif file.suffix == ".sarif": records = parse_sarif(file) else: - print(f"Unsupported file type: {file}. Skip it.") + logger.warning(f"Unsupported file type: {file}. Skip it.") continue - - # send records - for record in records: - requests.post(jira_url, json=record) + send_request_with_records(records, jira_url, gh_metadata, verbose) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--report-path") parser.add_argument("--jira-url") + parser.add_argument("--add-github-meta", action="store_true") + parser.add_argument("--verbose", action="store_true") args = parser.parse_args() - main(args.report_path, args.jira_url) + main(args.report_path, args.jira_url, args.add_github_meta, args.verbose)