diff --git a/scripts/ib_pipelines_check.py b/scripts/ib_pipelines_check.py index 8ea8bc8abc..09fc431063 100644 --- a/scripts/ib_pipelines_check.py +++ b/scripts/ib_pipelines_check.py @@ -7,8 +7,8 @@ import csv from datetime import datetime, timedelta, timezone import sys -import requests import re +import requests class PipelineChecker: """ @@ -107,7 +107,10 @@ def get_job_trace(self, job_web_url): raw_log_url = job_web_url + '/trace' response = requests.get(raw_log_url, timeout=20) if response.status_code != 200: - print(f"Failed to retrieve job trace from {raw_log_url}. Status code: {response.status_code}") + print( + f"Failed to retrieve job trace from {raw_log_url}. " + f"Status code: {response.status_code}" + ) return "" return response.text @@ -119,7 +122,7 @@ def check_rapidfort_scan(self, jobs): jobs (list): The list of jobs in the pipeline. Returns: - tuple: (status, job) of the rapidfort-scan job, or ("not found", None) if the job does not exist. + tuple: (status, job) of the rapidfort-scan job. """ for job in jobs: if job['name'] == 'rapidfort-scan': @@ -169,29 +172,50 @@ def process_pipeline(self, link): pipeline_web_url = latest_pipeline['web_url'] pipeline_time_created = self.format_timestamp(latest_pipeline['created_at']) if self.is_pipeline_inactive(latest_pipeline['created_at']): - self.inactive_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.inactive_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) jobs = self.get_jobs(endpoint, pipeline_id) rf_scan_status, rf_scan_job = self.check_rapidfort_scan(jobs) - print(f"Time Created At: {pipeline_time_created}\nPipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\nrapidfort-scan status: {rf_scan_status}") + print( + f"Time Created At: {pipeline_time_created}\n" + f"Pipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\n" + f"rapidfort-scan status: {rf_scan_status}" + ) print("-" * 50) if rf_scan_status == 'success': job_trace = self.get_job_trace(rf_scan_job['web_url']) if re.search(r'Partial coverage completed', job_trace, re.IGNORECASE): - self.partial_coverage_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.partial_coverage_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) rf_scan_status = 'success (partial coverage)' - else: + else: self.passed_pipelines += 1 elif rf_scan_status == 'failed': - self.failed_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.failed_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) elif rf_scan_status == 'not found': - self.not_found_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.not_found_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) elif rf_scan_status == 'skipped': - self.skipped_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}") + self.skipped_pipelines.append( + f"{project_name}\nPipeline ID: {pipeline_id}\n" + f"Pipeline URL: {pipeline_web_url}" + ) else: print(f"Unknown rapidfort-scan status: {rf_scan_status}") print("-" * 50) - self.write_to_csv(pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name) - + self.write_to_csv( + pipeline_time_created, pipeline_id, pipeline_web_url, + rf_scan_status, project_name + ) else: print(f"No pipelines found for project endpoint: {endpoint}") print("-" * 50) @@ -202,22 +226,29 @@ def init_csv_file(self): """ with open(self.CSV_FILE_PATH, 'w', newline='', encoding='utf-8') as file: writer = csv.writer(file) - writer.writerow(["Pipeline Time Created", "Pipeline ID", "Pipeline URL", "rapidfort-scan Status", "Project Name"]) - - def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name): + writer.writerow([ + "Pipeline Time Created", "Pipeline ID", "Pipeline URL", + "rapidfort-scan Status", "Project Name" + ]) + # pylint: disable=too-many-arguments + def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url, + rf_scan_status, project_name): """ - Write the pipeline information to the CSV file. + Write pipeline information to the CSV file. Args: - pipeline_time_created (str): The formatted pipeline creation time. - pipeline_id (int): The pipeline ID. - pipeline_web_url (str): The pipeline URL. + pipeline_time_created (str): The time the pipeline was created. + pipeline_id (int): The ID of the pipeline. + pipeline_web_url (str): The web URL of the pipeline. rf_scan_status (str): The status of the rapidfort-scan job. project_name (str): The name of the project. """ with open(self.CSV_FILE_PATH, 'a', newline='', encoding='utf-8') as file: writer = csv.writer(file) - writer.writerow([pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name]) + writer.writerow([ + pipeline_time_created, pipeline_id, pipeline_web_url, + rf_scan_status, project_name + ]) def print_summary(self): """ @@ -226,7 +257,10 @@ def print_summary(self): print("Summary of Pipelines:") print(f"Total: {self.total_pipelines}") print(f"Passed: {self.passed_pipelines}") - print(f"Failed (including partial coverage): {len(self.failed_pipelines) + len(self.partial_coverage_pipelines)}") + print( + f"Failed (including partial coverage): " + f"{len(self.failed_pipelines) + len(self.partial_coverage_pipelines)}" + ) print(f"Skipped due to non-related failure: {len(self.skipped_pipelines)}") print(f"Not found: {len(self.not_found_pipelines)}") print(f"Inactive (not run in last 3 days): {len(self.inactive_pipelines)}") @@ -284,6 +318,7 @@ def run(self): print("::set-output name=workflow-status::passed") sys.exit(0) # Exit with zero status + if __name__ == "__main__": checker = PipelineChecker() checker.run()