Skip to content

Commit

Permalink
tmp: adds code for pipeline analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
thehapyone committed Dec 15, 2024
1 parent 7576bbf commit ad70fa1
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 7 deletions.
8 changes: 8 additions & 0 deletions examples/agents/mr_processing_crew.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ agents:
args:
result_as_answer: true

- role: Pipeline Retriever
goal: Retrieve all data about the merge request pipeline from the GitLab API for further processing.
backstory: A skilled communicator with GitLab, adept at retrieving and presenting pipeline information efficiently.
tools:
- name: GitlabPipelineTool
args:
result_as_answer: true

tasks:
- description: |
Validate the provided MR input and extract identifiers. User Input: {input}
Expand Down
2 changes: 1 addition & 1 deletion sage/flows/mergebot.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


# Sample Input
bot_input = "MR https://gitlab.its.getingecloud.net/dts/lestrade/aide-recorder/-/merge_requests/40"
bot_input = "MR https://gitlab.its.getingecloud.net/dts/lestrade/aide-recorder/-/merge_requests/53"


def extract_merge_request_id(output_string):
Expand Down
248 changes: 242 additions & 6 deletions sage/tools/gitlab_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,195 @@ class GitLabAPIWrapperExtra(GitLabAPIWrapper):
Extended GitLab API Wrapper with additional merge request functionalities.
"""

def strip_ansi_codes(self, text: str) -> str:
"""
Removes ANSI escape sequences from the text.
Parameters:
text (str): The text containing ANSI codes.
Returns:
str: The cleaned text without ANSI codes.
"""
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
return ansi_escape.sub("", text)

def extract_step_script_section(self, log: str) -> str:
"""
Extracts the 'step_script' section from the job log.
Parameters:
log (str): The full job log.
Returns:
str: The 'step_script' section of the log.
"""
# Patterns to identify section starts and ends
section_start_pattern = r"section_start:\d+:(\w+)"
section_end_pattern = r"section_end:\d+:(\w+)"

lines = log.split("\n")
in_step_script = False
step_script_log = []

for line in lines:
# Check for section start
start_match = re.match(section_start_pattern, line)
if start_match:
section_name = start_match.group(1)
if section_name == "step_script":
in_step_script = True
continue # Skip the section start line

# Check for section end
end_match = re.match(section_end_pattern, line)
if end_match:
section_name = end_match.group(1)
if section_name == "step_script":
in_step_script = False
continue # Skip the section end line

# Collect lines within 'step_script' section
if in_step_script:
step_script_log.append(line)

return "\n".join(step_script_log)

def parse_job_log(self, log: str, job_status: str) -> Dict[str, Any]:
"""
Parses the job log to extract warnings and errors with context,
focusing on the 'step_script' section.
Parameters:
log (str): The job log string.
job_status (str): Status of the job (e.g., 'success', 'failed').
Returns:
Dict[str, Any]: A dictionary containing lists of warnings and errors with context.
"""
# First, extract the 'step_script' section
step_script_lines = log.split("\n")

warnings = []
errors = []

# Remove ANSI codes from lines
lines = [self.strip_ansi_codes(line) for line in step_script_lines]

context_size = 5
# If the job failed, capture the tail of the step_script
if job_status == "failed":
error_context = "\n".join(lines[-30:])
errors.append(error_context)
else:
# For successful jobs, look for warnings and errors with context
i = 0
while i < len(lines):
line = lines[i]
if "WARNING" in line or "WARN" in line:
# Collect context lines before and after
start = max(i - context_size, 0)
end = min(i + context_size, len(lines))
context = "\n".join(lines[start:end])
warnings.append(context)
i = end # Skip lines already processed
else:
i += 1

return {"warnings": warnings, "errors": errors}

def get_pipeline_job(self, job_id: int) -> dict:
"""Gets the job information"""
job = self.gitlab_repo_instance.jobs.get(job_id)

# Get the job logs
job_log = job.trace()

# Parse the log to extract warnings and errors
parsed_log = self.parse_job_log(job_log.decode("utf-8"), job.status)
num_warnings = len(parsed_log["warnings"])
num_errors = len(parsed_log["errors"])

# Prepare job details
job_detail = {
"id": job.id,
"name": job.name,
"status": job.status,
"stage": job.stage,
"created_at": job.created_at,
"started_at": job.started_at,
"finished_at": job.finished_at,
"duration": job.duration,
"web_url": job.web_url,
"warnings_count": num_warnings,
"errors_count": num_errors,
}

# Include sample of warnings and errors if any
if num_warnings > 0:
job_detail["warnings_sample"] = parsed_log["warnings"][:5]
if num_errors > 0:
job_detail["errors_sample"] = parsed_log["errors"][:5]

return job_detail

def get_pipeline_details(self, pipeline_id: int) -> str:
"""
Retrieves detailed information of a pipeline, including job logs and summaries.
Parameters:
pipeline_id (int): The ID of the pipeline.
Returns:
str: A formatted string containing pipeline details and job summaries.
"""
try:
# Fetch the pipeline
pipeline = self.gitlab_repo_instance.pipelines.get(pipeline_id)

# Basic pipeline info
pipeline_info = {
"id": pipeline.id,
"status": pipeline.status,
"ref": pipeline.ref,
"sha": pipeline.sha,
"created_at": pipeline.created_at,
"updated_at": pipeline.updated_at,
"web_url": pipeline.web_url,
}

# Fetch jobs associated with the pipeline
jobs = pipeline.jobs.list(get_all=True)
jobs_details = []
total_warnings = 0
total_errors = 0

for job in jobs:
# Fetch job log
job_detail = self.get_pipeline_job(job.id)

num_warnings = job_detail["warnings_count"]
num_errors = job_detail["errors_count"]
total_warnings += num_warnings
total_errors += num_errors
jobs_details.append(job_detail)

# Summarize pipeline results
summary = {
"Pipeline Info": pipeline_info,
"Total Jobs": len(jobs_details),
"Total Warnings": total_warnings,
"Total Errors": total_errors,
"Jobs Details": jobs_details,
}

# Format the output for better readability
formatted_output = json.dumps(summary, indent=2, default=str)
return formatted_output

except Exception as e:
return f"Failed to retrieve pipeline details for Pipeline ID {pipeline_id}: {str(e)}"

def parse_diff_content(self, diff: str):
"""
Parses the diff content to calculate lines added and removed.
Expand Down Expand Up @@ -329,6 +518,13 @@ def run(self, mode: str, query: str = "", body: dict = {}) -> str:
return self.approve_merge_request(**body)
except ValueError:
return "Invalid input format."
elif mode == "get_pipeline_details":
try:
pipeline_id = int(query.strip())
pipeline_details = self.get_pipeline_details(pipeline_id)
return pipeline_details
except ValueError:
return "Invalid input. Please provide the Pipeline ID as an integer."
elif mode == "post_merge_request_thread_comment":
try:
parts = query.split("\n\n", 2)
Expand Down Expand Up @@ -396,6 +592,42 @@ def run(self, mode: str, query: str = "", body: dict = {}) -> str:
"""


FETCH_PIPELINE_DETAILS_PROMPT = """
This tool will fetch detailed information of a specific pipeline, including job logs and summarized relevant information.
Example input: {"pipeline_id": "12345"}
"""


class GitlabPipelineToolSchema(BaseModel):
"""Input for GitlabPipelineTool."""

pipeline_id: str = Field(..., description="The ID of the pipeline")


class GitlabPipelineTool(BaseTool):
"""Tool for fetching and summarizing pipeline details from GitLab."""

mode: str = "get_pipeline_details"
name: str = "Get Pipeline Details"
description: str = FETCH_PIPELINE_DETAILS_PROMPT
args_schema: Type[BaseModel] = GitlabPipelineToolSchema
api_wrapper: GitLabAPIWrapperExtra = GitLabAPIWrapperExtra(
gitlab_branch="main",
)

def _run(
self,
**kwargs: Any,
) -> str:
"""Use the GitLab API to fetch pipeline details."""
pipeline_id = kwargs.get("pipeline_id")

if not pipeline_id:
return "The Pipeline ID is required."

return self.api_wrapper.run(self.mode, str(pipeline_id))


class GitlabToolSchema(BaseModel):
"""Input for GitlabTools."""

Expand Down Expand Up @@ -491,9 +723,13 @@ def _run(


# # Example usage
# if __name__ == "__main__":
# mr_iid = 51 # Replace with your MR IID
# api_wrapper = GitLabAPIWrapperExtra(
# gitlab_branch="main",
# )
# print(api_wrapper.approve_merge_request(mr_iid))
if __name__ == "__main__":
mr_iid = 51 # Replace with your MR IID
api_wrapper = GitLabAPIWrapperExtra(
gitlab_branch="main",
)
# print(api_wrapper.approve_merge_request(mr_iid))
# Replace '12345' with your actual pipeline ID
pipeline_tool = GitlabPipelineTool()
result = pipeline_tool._run(pipeline_id="110292")
print(result)

0 comments on commit ad70fa1

Please sign in to comment.