Commit

Fix pylint issues
Signed-off-by: Uday Beswal <[email protected]>
UdBe committed Oct 8, 2024
1 parent f263b89 commit 0199735
Showing 1 changed file with 56 additions and 21 deletions.
scripts/ib_pipelines_check.py: 56 additions & 21 deletions
@@ -7,8 +7,8 @@
 import csv
 from datetime import datetime, timedelta, timezone
 import sys
-import requests
 import re
+import requests
 
 class PipelineChecker:
     """
@@ -107,7 +107,10 @@ def get_job_trace(self, job_web_url):
         raw_log_url = job_web_url + '/trace'
         response = requests.get(raw_log_url, timeout=20)
         if response.status_code != 200:
-            print(f"Failed to retrieve job trace from {raw_log_url}. Status code: {response.status_code}")
+            print(
+                f"Failed to retrieve job trace from {raw_log_url}. "
+                f"Status code: {response.status_code}"
+            )
             return ""
         return response.text
 
@@ -119,7 +122,7 @@ def check_rapidfort_scan(self, jobs):
             jobs (list): The list of jobs in the pipeline.
         Returns:
-            tuple: (status, job) of the rapidfort-scan job, or ("not found", None) if the job does not exist.
+            tuple: (status, job) of the rapidfort-scan job.
         """
         for job in jobs:
             if job['name'] == 'rapidfort-scan':
@@ -169,29 +172,50 @@ def process_pipeline(self, link):
             pipeline_web_url = latest_pipeline['web_url']
             pipeline_time_created = self.format_timestamp(latest_pipeline['created_at'])
             if self.is_pipeline_inactive(latest_pipeline['created_at']):
-                self.inactive_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
+                self.inactive_pipelines.append(
+                    f"{project_name}\nPipeline ID: {pipeline_id}\n"
+                    f"Pipeline URL: {pipeline_web_url}"
+                )
             jobs = self.get_jobs(endpoint, pipeline_id)
             rf_scan_status, rf_scan_job = self.check_rapidfort_scan(jobs)
-            print(f"Time Created At: {pipeline_time_created}\nPipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\nrapidfort-scan status: {rf_scan_status}")
+            print(
+                f"Time Created At: {pipeline_time_created}\n"
+                f"Pipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\n"
+                f"rapidfort-scan status: {rf_scan_status}"
+            )
             print("-" * 50)
             if rf_scan_status == 'success':
                 job_trace = self.get_job_trace(rf_scan_job['web_url'])
                 if re.search(r'Partial coverage completed', job_trace, re.IGNORECASE):
-                    self.partial_coverage_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
+                    self.partial_coverage_pipelines.append(
+                        f"{project_name}\nPipeline ID: {pipeline_id}\n"
+                        f"Pipeline URL: {pipeline_web_url}"
+                    )
                     rf_scan_status = 'success (partial coverage)'
-                 else:
+                else:
                     self.passed_pipelines += 1
             elif rf_scan_status == 'failed':
-                self.failed_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
+                self.failed_pipelines.append(
+                    f"{project_name}\nPipeline ID: {pipeline_id}\n"
+                    f"Pipeline URL: {pipeline_web_url}"
+                )
             elif rf_scan_status == 'not found':
-                self.not_found_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
+                self.not_found_pipelines.append(
+                    f"{project_name}\nPipeline ID: {pipeline_id}\n"
+                    f"Pipeline URL: {pipeline_web_url}"
+                )
             elif rf_scan_status == 'skipped':
-                self.skipped_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
+                self.skipped_pipelines.append(
+                    f"{project_name}\nPipeline ID: {pipeline_id}\n"
+                    f"Pipeline URL: {pipeline_web_url}"
+                )
             else:
                 print(f"Unknown rapidfort-scan status: {rf_scan_status}")
                 print("-" * 50)
-            self.write_to_csv(pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name)
-
+            self.write_to_csv(
+                pipeline_time_created, pipeline_id, pipeline_web_url,
+                rf_scan_status, project_name
+            )
         else:
             print(f"No pipelines found for project endpoint: {endpoint}")
             print("-" * 50)
@@ -202,22 +226,29 @@ def init_csv_file(self):
         """
        with open(self.CSV_FILE_PATH, 'w', newline='', encoding='utf-8') as file:
             writer = csv.writer(file)
-            writer.writerow(["Pipeline Time Created", "Pipeline ID", "Pipeline URL", "rapidfort-scan Status", "Project Name"])
-
-    def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name):
+            writer.writerow([
+                "Pipeline Time Created", "Pipeline ID", "Pipeline URL",
+                "rapidfort-scan Status", "Project Name"
+            ])
+    # pylint: disable=too-many-arguments
+    def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url,
+                     rf_scan_status, project_name):
         """
-        Write the pipeline information to the CSV file.
+        Write pipeline information to the CSV file.
         Args:
-            pipeline_time_created (str): The formatted pipeline creation time.
-            pipeline_id (int): The pipeline ID.
-            pipeline_web_url (str): The pipeline URL.
+            pipeline_time_created (str): The time the pipeline was created.
+            pipeline_id (int): The ID of the pipeline.
+            pipeline_web_url (str): The web URL of the pipeline.
             rf_scan_status (str): The status of the rapidfort-scan job.
             project_name (str): The name of the project.
         """
         with open(self.CSV_FILE_PATH, 'a', newline='', encoding='utf-8') as file:
             writer = csv.writer(file)
-            writer.writerow([pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name])
+            writer.writerow([
+                pipeline_time_created, pipeline_id, pipeline_web_url,
+                rf_scan_status, project_name
+            ])
 
     def print_summary(self):
         """
@@ -226,7 +257,10 @@ def print_summary(self):
         print("Summary of Pipelines:")
         print(f"Total: {self.total_pipelines}")
         print(f"Passed: {self.passed_pipelines}")
-        print(f"Failed (including partial coverage): {len(self.failed_pipelines) + len(self.partial_coverage_pipelines)}")
+        print(
+            f"Failed (including partial coverage): "
+            f"{len(self.failed_pipelines) + len(self.partial_coverage_pipelines)}"
+        )
         print(f"Skipped due to non-related failure: {len(self.skipped_pipelines)}")
         print(f"Not found: {len(self.not_found_pipelines)}")
         print(f"Inactive (not run in last 3 days): {len(self.inactive_pipelines)}")
@@ -284,6 +318,7 @@ def run(self):
         print("::set-output name=workflow-status::passed")
         sys.exit(0) # Exit with zero status
 
+
 if __name__ == "__main__":
     checker = PipelineChecker()
     checker.run()