diff --git a/scripts/ib_pipelines_check.py b/scripts/ib_pipelines_check.py
index a9264f740c..09fc431063 100644
--- a/scripts/ib_pipelines_check.py
+++ b/scripts/ib_pipelines_check.py
@@ -7,6 +7,7 @@
 import csv
 from datetime import datetime, timedelta, timezone
 import sys
+import re
 import requests
 
 class PipelineChecker:
@@ -23,6 +24,7 @@ def __init__(self):
         self.not_found_pipelines = []
         self.skipped_pipelines = []
         self.inactive_pipelines = []
+        self.partial_coverage_pipelines = []
         self.total_pipelines = 0
         self.passed_pipelines = 0
         self.init_csv_file()
@@ -92,6 +94,27 @@ def get_jobs(self, endpoint, pipeline_id):
         response.raise_for_status()
         return response.json()
 
+    def get_job_trace(self, job_web_url):
+        """
+        Get the log trace of a specific job using the job's web URL.
+
+        Args:
+            job_web_url (str): The web URL of the job.
+
+        Returns:
+            str: The job log trace, or an empty string if it could not be retrieved.
+        """
+        # Assumes the job log is reachable without authentication.
+        trace_url = job_web_url + '/trace'
+        response = requests.get(trace_url, timeout=20)
+        if response.status_code != 200:
+            print(
+                f"Failed to retrieve job trace from {trace_url}. "
+                f"Status code: {response.status_code}"
+            )
+            return ""
+        return response.text
+
     def check_rapidfort_scan(self, jobs):
         """
         Check the status of the rapidfort-scan job.
@@ -100,12 +123,12 @@
             jobs (list): The list of jobs in the pipeline.
 
         Returns:
-            str: The status of the rapidfort-scan job or "not found" if the job does not exist.
+            tuple: (status, job) for the rapidfort-scan job, or ("not found", None) if absent.
         """
         for job in jobs:
             if job['name'] == 'rapidfort-scan':
-                return job['status']
-        return "not found"
+                return job['status'], job
+        return "not found", None
     @staticmethod
     def format_timestamp(iso_timestamp):
         """
@@ -124,13 +147,13 @@
     @staticmethod
     def is_pipeline_inactive(pipeline_created_at):
         """
-        Check if the pipeline was last run more than 7 days ago.
+        Check if the pipeline was last run more than 3 days ago.
 
         Args:
             pipeline_created_at (str): The ISO 8601 timestamp of the pipeline creation.
 
         Returns:
-            bool: True if the pipeline was run more than 7 days ago, False otherwise.
+            bool: True if the pipeline was run more than 3 days ago, False otherwise.
""" dt = datetime.fromisoformat(pipeline_created_at.replace("Z", "+00:00")) return datetime.now(timezone.utc) - dt > timedelta(days=3) @@ -150,20 +172,50 @@ def process_pipeline(self, link): pipeline_web_url = latest_pipeline['web_url'] pipeline_time_created = self.format_timestamp(latest_pipeline['created_at']) if self.is_pipeline_inactive(latest_pipeline['created_at']): - self.inactive_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.inactive_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) jobs = self.get_jobs(endpoint, pipeline_id) - rf_scan_status = self.check_rapidfort_scan(jobs) - self.write_to_csv(pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name) - print(f"Time Created At: {pipeline_time_created}\nPipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\nrapidfort-scan status: {rf_scan_status}") + rf_scan_status, rf_scan_job = self.check_rapidfort_scan(jobs) + print( + f"Time Created At: {pipeline_time_created}\n" + f"Pipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\n" + f"rapidfort-scan status: {rf_scan_status}" + ) print("-" * 50) - if rf_scan_status == 'failed': - self.failed_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + if rf_scan_status == 'success': + job_trace = self.get_job_trace(rf_scan_job['web_url']) + if re.search(r'Partial coverage completed', job_trace, re.IGNORECASE): + self.partial_coverage_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) + rf_scan_status = 'success (partial coverage)' + else: + self.passed_pipelines += 1 + elif rf_scan_status == 'failed': + self.failed_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) elif rf_scan_status == 'not found': - self.not_found_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.not_found_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) elif rf_scan_status == 'skipped': - self.skipped_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.skipped_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) else: - self.passed_pipelines += 1 + print(f"Unknown rapidfort-scan status: {rf_scan_status}") + print("-" * 50) + self.write_to_csv( + pipeline_time_created, pipeline_id, pipeline_web_url, + rf_scan_status, project_name + ) else: print(f"No pipelines found for project endpoint: {endpoint}") print("-" * 50) @@ -174,22 +226,29 @@ def init_csv_file(self): """ with open(self.CSV_FILE_PATH, 'w', newline='', encoding='utf-8') as file: writer = csv.writer(file) - writer.writerow(["Pipeline Time Created", "Pipeline ID", "Pipeline URL", "rapidfort-scan Status", "Project Name"]) - - def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name): + writer.writerow([ + "Pipeline Time Created", "Pipeline ID", "Pipeline URL", + "rapidfort-scan Status", "Project Name" + ]) + # pylint: disable=too-many-arguments + def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url, + rf_scan_status, project_name): """ - Write the pipeline information to the CSV file. + Write pipeline information to the CSV file. 
 
         Args:
-            pipeline_time_created (str): The formatted pipeline creation time.
-            pipeline_id (int): The pipeline ID.
-            pipeline_web_url (str): The pipeline URL.
+            pipeline_time_created (str): The time the pipeline was created.
+            pipeline_id (int): The ID of the pipeline.
+            pipeline_web_url (str): The web URL of the pipeline.
             rf_scan_status (str): The status of the rapidfort-scan job.
             project_name (str): The name of the project.
         """
         with open(self.CSV_FILE_PATH, 'a', newline='', encoding='utf-8') as file:
             writer = csv.writer(file)
-            writer.writerow([pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name])
+            writer.writerow([
+                pipeline_time_created, pipeline_id, pipeline_web_url,
+                rf_scan_status, project_name
+            ])
 
     def print_summary(self):
         """
@@ -198,16 +260,20 @@
         print("Summary of Pipelines:")
         print(f"Total: {self.total_pipelines}")
         print(f"Passed: {self.passed_pipelines}")
-        print(f"Failed: {len(self.failed_pipelines)}")
+        print(
+            f"Failed (including partial coverage): "
+            f"{len(self.failed_pipelines) + len(self.partial_coverage_pipelines)}"
+        )
         print(f"Skipped due to non-related failure: {len(self.skipped_pipelines)}")
         print(f"Not found: {len(self.not_found_pipelines)}")
         print(f"Inactive (not run in last 3 days): {len(self.inactive_pipelines)}")
         print("-" * 50)
-        if self.failed_pipelines:
-            print("Failed Pipelines:")
-            for idx, failed_pipeline in enumerate(self.failed_pipelines, 1):
-                print(f"\n{idx}. {failed_pipeline}")
+        combined_failed_pipelines = self.failed_pipelines + self.partial_coverage_pipelines
+        if combined_failed_pipelines:
+            print("Failed Pipelines (including partial coverage):")
+            for idx, pipeline in enumerate(combined_failed_pipelines, 1):
+                print(f"\n{idx}. {pipeline}")
         else:
             print("No pipelines failed the rapidfort-scan.")
         print("-" * 50)
@@ -255,6 +321,7 @@ def run(self):
         print("::set-output name=workflow-status::passed")
         sys.exit(0)  # Exit with zero status
 
+
 if __name__ == "__main__":
     checker = PipelineChecker()
     checker.run()
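
Reviewer note: the new partial-coverage path (get_job_trace() plus the re.search() in process_pipeline()) can be sanity-checked in isolation with the sketch below. It mirrors the '/trace' URL suffix and the marker string used in the diff; has_partial_coverage() and the job URL are hypothetical placeholders, and the snippet assumes the log is readable without authentication. Illustration only, not part of the change.

    import re
    import requests

    def has_partial_coverage(job_web_url):
        """Return True if the job's log reports partial coverage."""
        # Fetch the log text the same way get_job_trace() does.
        response = requests.get(job_web_url + '/trace', timeout=20)
        if response.status_code != 200:
            # Mirror get_job_trace(): an unreadable trace yields "" and no match.
            return False
        # Look for the marker process_pipeline() searches for.
        return re.search(r'Partial coverage completed', response.text, re.IGNORECASE) is not None

    # Placeholder URL; substitute a real rapidfort-scan job's web URL.
    print(has_partial_coverage("https://gitlab.example.com/group/project/-/jobs/12345"))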