From e6f4a7430f094693572675b7aad4d12ea2ea849e Mon Sep 17 00:00:00 2001 From: sallyom Date: Mon, 24 Jun 2024 23:41:59 -0400 Subject: [PATCH] update tracegen script Signed-off-by: sallyom --- ci/trace-steps.py | 115 ++++++++++++++++++++++------------------------ 1 file changed, 55 insertions(+), 60 deletions(-) diff --git a/ci/trace-steps.py b/ci/trace-steps.py index a7c77e99..d71c66ad 100644 --- a/ci/trace-steps.py +++ b/ci/trace-steps.py @@ -1,75 +1,70 @@ import os import time -import logging from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter -from opentelemetry.trace import SpanContext, TraceFlags, TraceState, NonRecordingSpan -# Set up logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Set up OpenTelemetry tracing -trace.set_tracer_provider( - TracerProvider( - resource=Resource.create({"service.name": os.getenv("WORKFLOW_NAME")}) - ) -) +# Initialize Tracer +trace.set_tracer_provider(TracerProvider()) tracer = trace.get_tracer(__name__) - -# Set up OTLP exporter to send to OpenTelemetry Collector -otlp_exporter = OTLPSpanExporter(endpoint="http://0.0.0.0:4317", insecure=True) - -# Set up span processor -span_processor = BatchSpanProcessor(otlp_exporter) +span_processor = BatchSpanProcessor(ConsoleSpanExporter()) trace.get_tracer_provider().add_span_processor(span_processor) -# Optionally, export to console for debugging -console_exporter = ConsoleSpanExporter() -trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(console_exporter)) +def trace_step(step_name: str): + """ + Decorated function that traces execution time. + Args: + step_name (str): Name of the step to trace. + """ + def decorator(func): + def wrapper(*args, **kwargs): + with tracer.start_as_current_span(step_name) as span: + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + span.set_attribute("duration_s", duration) + print(f"Step: {step_name}, Duration: {duration}s") + return result + return wrapper + return decorator -def retry_operation(operation, retries=3, delay=5): - for attempt in range(retries): - try: - return operation() - except Exception as e: - logger.error(f"Attempt {attempt + 1} failed with error: {e}") - if attempt < retries - 1: - time.sleep(delay) - else: - raise +def trace_job(job_name: str): + """ + Decorated function that traces execution time. + Args: + job_name (str): Name of the job to trace. + """ + def decorator(func): + def wrapper(*args, **kwargs): + with tracer.start_as_current_span(job_name) as span: + start_time = time.time() + result = func(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + span.set_attribute("total_duration_s", duration) + print(f"Job: {job_name}, Total Duration: {duration}s") + return result + return wrapper + return decorator -def start_trace(step_name): - span = tracer.start_span(name=step_name) - return span +@trace_step("Example Step") +def example_step(): + time.sleep(5) # Simulate work + return "Step Completed: Example Step" -def end_trace(span): - span.end() +@trace_job("Example Job") +def example_job(): + step_result = example_step() + return f"Job Completed: Example Job with {step_result}" if __name__ == "__main__": - step_name = os.getenv("STEP_NAME", "default_step") - action = os.getenv("TRACE_ACTION", "start") + job_name = os.getenv("WORKFLOW_NAME", "Default Job") + step_name = os.getenv("STEP_NAME", "Default Step") + trace_action = os.getenv("TRACE_ACTION", "start") + + if trace_action == "start": + example_job() + elif trace_action == "end": + print(f"Ending trace for {job_name} - {step_name}") - if action == "start": - span = retry_operation(lambda: start_trace(step_name)) - with open(f"/tmp/trace_{step_name}.txt", "w") as f: - f.write(str(span.get_span_context().trace_id)) - elif action == "end": - trace_id = os.getenv("TRACE_ID") - if not trace_id: - with open(f"/tmp/trace_{step_name}.txt", "r") as f: - trace_id = f.read().strip() - trace_id = int(trace_id, 16) # Convert trace_id back to int - span_context = SpanContext( - trace_id=trace_id, - span_id=0, # Span ID will be generated - trace_flags=TraceFlags(TraceFlags.SAMPLED), - trace_state=TraceState(), - is_remote=True - ) - with tracer.start_as_current_span(name=step_name, context=trace.set_span_in_context(NonRecordingSpan(span_context))): - span = tracer.start_span(name=step_name) - end_trace(span)