-
Notifications
You must be signed in to change notification settings - Fork 115
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: sallyom <[email protected]>
- Loading branch information
Showing
1 changed file
with
55 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,75 +1,70 @@ | ||
import os | ||
import time | ||
import logging | ||
from opentelemetry import trace | ||
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter | ||
from opentelemetry.sdk.resources import Resource | ||
from opentelemetry.sdk.trace import TracerProvider | ||
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter | ||
from opentelemetry.trace import SpanContext, TraceFlags, TraceState, NonRecordingSpan | ||
|
||
# Module-level setup: configures logging, installs a tracer provider, and
# registers span processors on it.
# NOTE(review): several statements below appear twice in slightly different
# forms (two set_tracer_provider calls, two span_processor assignments); this
# looks like both sides of a diff shown together - confirm which ones apply.
# Set up logging | ||
logging.basicConfig(level=logging.INFO) | ||
logger = logging.getLogger(__name__) | ||
|
||
# Set up OpenTelemetry tracing | ||
# os.getenv("WORKFLOW_NAME") has no fallback here, so "service.name" is None
# when that variable is unset.
trace.set_tracer_provider( | ||
TracerProvider( | ||
resource=Resource.create({"service.name": os.getenv("WORKFLOW_NAME")}) | ||
) | ||
) | ||
# Initialize Tracer | ||
# NOTE(review): this installs a provider with no explicit resource - verify
# how it interacts with the set_tracer_provider call above.
trace.set_tracer_provider(TracerProvider()) | ||
tracer = trace.get_tracer(__name__) | ||
|
||
# Set up OTLP exporter to send to OpenTelemetry Collector | ||
# The endpoint is hard-coded and insecure=True is passed to the exporter.
otlp_exporter = OTLPSpanExporter(endpoint="http://0.0.0.0:4317", insecure=True) | ||
|
||
# Set up span processor | ||
# NOTE(review): span_processor is assigned twice in a row; as written, only
# the second (console) processor is the one registered on the next line.
span_processor = BatchSpanProcessor(otlp_exporter) | ||
span_processor = BatchSpanProcessor(ConsoleSpanExporter()) | ||
trace.get_tracer_provider().add_span_processor(span_processor) | ||
|
||
# Optionally, export to console for debugging | ||
console_exporter = ConsoleSpanExporter() | ||
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(console_exporter)) | ||
def trace_step(step_name: str):
    """Build a decorator that runs the wrapped function inside a span.

    The span is named ``step_name``. The elapsed time of each call is stored
    on the span as the ``duration_s`` attribute and printed to stdout.

    Args:
        step_name: Name of the span and of the step in the printed message.

    Returns:
        A decorator. The wrapper it produces forwards all positional and
        keyword arguments and returns the wrapped function's result.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    def decorator(func):
        # Preserve the wrapped function's __name__, __doc__ and __wrapped__.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(step_name) as span:
                # perf_counter is monotonic, so the measured duration cannot
                # be skewed by system clock adjustments.
                start_time = time.perf_counter()
                try:
                    return func(*args, **kwargs)
                finally:
                    # Record the duration even when the step raises, so that
                    # failed steps still carry their timing.
                    duration = time.perf_counter() - start_time
                    span.set_attribute("duration_s", duration)
                    print(f"Step: {step_name}, Duration: {duration}s")

        return wrapper

    return decorator
|
||
def retry_operation(operation, retries=3, delay=5):
    """Call ``operation`` until it succeeds or the attempts are used up.

    Args:
        operation: Zero-argument callable to invoke.
        retries: Maximum number of attempts; must be at least 1.
        delay: Seconds to sleep between a failed attempt and the next one.

    Returns:
        Whatever ``operation`` returns on its first successful call.

    Raises:
        ValueError: If ``retries`` is less than 1. Previously this case
            returned ``None`` without ever calling ``operation``.
        Exception: The error raised by the final attempt, re-raised unchanged.
    """
    if retries < 1:
        raise ValueError(f"retries must be at least 1, got {retries!r}")

    for attempt in range(1, retries + 1):
        try:
            return operation()
        except Exception as e:  # retry boundary: any failure triggers a retry
            logger.error("Attempt %d failed with error: %s", attempt, e)
            if attempt == retries:
                # Out of attempts: surface the last error to the caller.
                raise
            time.sleep(delay)
def trace_job(job_name: str):
    """Build a decorator that runs the wrapped function inside a job span.

    The span is named ``job_name``. The elapsed time of each call is stored
    on the span as the ``total_duration_s`` attribute and printed to stdout.

    Args:
        job_name: Name of the span and of the job in the printed message.

    Returns:
        A decorator. The wrapper it produces forwards all positional and
        keyword arguments and returns the wrapped function's result.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    def decorator(func):
        # Preserve the wrapped function's __name__, __doc__ and __wrapped__.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with tracer.start_as_current_span(job_name) as span:
                # perf_counter is monotonic, so the measured duration cannot
                # be skewed by system clock adjustments.
                start_time = time.perf_counter()
                try:
                    return func(*args, **kwargs)
                finally:
                    # Record the duration even when the job raises, so that
                    # failed jobs still carry their timing.
                    duration = time.perf_counter() - start_time
                    span.set_attribute("total_duration_s", duration)
                    print(f"Job: {job_name}, Total Duration: {duration}s")

        return wrapper

    return decorator
|
||
def start_trace(step_name):
    """Start a span named ``step_name`` on the module tracer and return it.

    The span is returned still open; nothing in this function ends it.
    """
    return tracer.start_span(name=step_name)
@trace_step("Example Step")
def example_step():
    """Stand-in step: block for five seconds, then report completion."""
    simulated_work_seconds = 5
    time.sleep(simulated_work_seconds)
    return "Step Completed: Example Step"
|
||
def end_trace(span):
    """End ``span`` by calling its ``end()`` method; returns nothing."""
    span.end()
@trace_job("Example Job")
def example_job():
    """Run ``example_step`` and embed its result in the job-completion message."""
    return f"Job Completed: Example Job with {example_step()}"
|
||
# Script entry point, driven by the STEP_NAME / WORKFLOW_NAME / TRACE_ACTION /
# TRACE_ID environment variables.
# NOTE(review): two variants of the entry point appear together below (one
# reading `action`, one reading `trace_action`); this looks like both sides of
# a diff shown together - confirm which one applies.
if __name__ == "__main__": | ||
step_name = os.getenv("STEP_NAME", "default_step") | ||
action = os.getenv("TRACE_ACTION", "start") | ||
job_name = os.getenv("WORKFLOW_NAME", "Default Job") | ||
step_name = os.getenv("STEP_NAME", "Default Step") | ||
trace_action = os.getenv("TRACE_ACTION", "start") | ||
|
||
# Variant keyed on `trace_action`: "start" runs the example job, "end" only
# prints a message.
if trace_action == "start": | ||
example_job() | ||
elif trace_action == "end": | ||
print(f"Ending trace for {job_name} - {step_name}") | ||
|
||
# Variant keyed on `action`: "start" writes the trace id to a file under /tmp,
# "end" takes it from TRACE_ID or reads it back from that file.
if action == "start": | ||
span = retry_operation(lambda: start_trace(step_name)) | ||
with open(f"/tmp/trace_{step_name}.txt", "w") as f: | ||
# NOTE(review): str() of the trace id is written here, while the "end" branch
# parses the stored text with int(..., 16) - confirm both sides use the same base.
f.write(str(span.get_span_context().trace_id)) | ||
elif action == "end": | ||
trace_id = os.getenv("TRACE_ID") | ||
if not trace_id: | ||
with open(f"/tmp/trace_{step_name}.txt", "r") as f: | ||
trace_id = f.read().strip() | ||
trace_id = int(trace_id, 16) # Convert trace_id back to int | ||
# NOTE(review): span_id is passed as 0 although the inline comment says it
# will be generated - verify that this context is treated as valid by the SDK.
span_context = SpanContext( | ||
trace_id=trace_id, | ||
span_id=0, # Span ID will be generated | ||
trace_flags=TraceFlags(TraceFlags.SAMPLED), | ||
trace_state=TraceState(), | ||
is_remote=True | ||
) | ||
# A span named step_name is opened as the current span under the rebuilt
# context; a second span with the same name is then started and passed
# straight to end_trace.
with tracer.start_as_current_span(name=step_name, context=trace.set_span_in_context(NonRecordingSpan(span_context))): | ||
span = tracer.start_span(name=step_name) | ||
end_trace(span) |