[Feature Request]: OpenTelemetry Support #33176

Open · Kludex opened this issue Nov 20, 2024 · 11 comments

Kludex commented Nov 20, 2024

What would you like to happen?

This is more a question than a feature request.

I was wondering... why don't I see any OpenTelemetry-related issues on Apache Beam? Is it because the runners already provide observability for their jobs?

I'm more interested in the Python side for now, but if there's no particular reason against it, would it make sense to create an opentelemetry-instrumentation-apache-beam package?

Issue Priority

Priority: 3 (nice-to-have improvement)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
liferoad (Collaborator) commented

This is a good idea. We do have a plan to investigate it. cc @Abacn
Meanwhile, you are welcome to work on this.

Kludex commented Dec 4, 2024

@liferoad @Abacn I was able to do what I wanted by manually creating a wrapper around the PTransforms:

from typing import Any, TypeVar

import apache_beam as beam
import logfire

# Type variables defined locally for this snippet.
InputT = TypeVar("InputT")
OutputT = TypeVar("OutputT")


class AutoSpanDoFn(beam.DoFn):
    def __init__(self, transform: beam.PTransform[InputT, OutputT]):
        super().__init__()
        self.transform = transform

    def process(self, element: Any, *args, **kwargs):
        label = self.transform.label if hasattr(self.transform, "label") else "UnnamedTransform"
        with logfire.span(label):
            yield element


class AutoSpanTransform(beam.PTransform[InputT, OutputT]):
    def __init__(self, transform: beam.PTransform[InputT, OutputT]):
        super().__init__()
        self.transform = transform

    def expand(self, input_or_inputs: InputT) -> OutputT:
        return input_or_inputs | beam.ParDo(AutoSpanDoFn(self.transform)) | self.transform

And then, I can use it like this:

with logfire.span("main"):
    with Pipeline() as pipeline:
        text = [
            "To be, or not to be: that is the question: ",
            "Whether 'tis nobler in the mind to suffer ",
            "The slings and arrows of outrageous fortune, ",
            "Or to take arms against a sea of troubles, ",
        ]

        pipeline = (
            pipeline
            | "Create" >> beam.Create(text)
            | "Split" >> AutoSpanTransform(beam.ParDo(Split()))
            | "Filter" >> AutoSpanTransform(beam.Filter(lambda x: x != "the"))
            | "Print" >> AutoSpanTransform(beam.Map(logfire_print))
        )

You can see the full code here: https://github.com/Kludex/logfire-apache-beam/blob/main/main.py

You can run it with python main.py.

This is what I see in Logfire (the observability platform we are developing):
[Screenshot: Logfire trace view of the pipeline spans]

Now I need to be able to do this automatically. I tried some patching, like this:

# Continues from the snippet above.
from apache_beam import Pipeline

_original_pipeline_apply = Pipeline.apply


def patched_pipeline_apply(self, transform, *args, **kwargs):
    if (
        isinstance(transform, beam.PTransform)
        and not isinstance(transform, AutoSpanTransform)
        and not getattr(transform, "_instrumented", False)
    ):
        transform = AutoSpanTransform(transform)  # AutoSpanTransform sets `_instrumented = True` in this code.
    return _original_pipeline_apply(self, transform, *args, **kwargs)


Pipeline.apply = patched_pipeline_apply

But I keep getting a RecursionError; I'm still debugging it. I would appreciate help with two things:

  1. Would you accept a PR adding some kind of API so I don't need to do this? Maybe a callback on the Pipeline()?
  2. For my solution above, do you have any ideas on how to do it more cleanly, in case I need a faster solution? One possible re-entrancy guard is sketched below.
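
A minimal sketch of one way to break the recursion, assuming the AutoSpanTransform above (untested): the ParDo created inside expand() is itself unwrapped, so the patched apply() wraps it again, which expands into yet another ParDo, and so on. A re-entrancy guard can skip wrapping anything applied while a wrapped transform is expanding:

import threading

_guard = threading.local()


def patched_pipeline_apply(self, transform, *args, **kwargs):
    should_wrap = (
        isinstance(transform, beam.PTransform)
        and not getattr(_guard, "active", False)
        and not getattr(transform, "_instrumented", False)
    )
    if not should_wrap:
        return _original_pipeline_apply(self, transform, *args, **kwargs)
    # Suppress wrapping of the inner transforms that expand() applies
    # while this outer apply() is running.
    _guard.active = True
    try:
        return _original_pipeline_apply(self, AutoSpanTransform(transform), *args, **kwargs)
    finally:
        _guard.active = False

The trade-off is that only top-level applications get a span; transforms applied inside a composite's expand() are left untouched.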

Kludex commented Dec 4, 2024

I got something nice running, although it's not quite there yet...

My understanding of this package is growing a bit more... 😅

import logfire
from apache_beam import Pipeline


def patch_beam_pipeline():
    Pipeline.apply = logfire.instrument("{transform}", extract_args=True)(Pipeline.apply)
    Pipeline.run = logfire.instrument()(Pipeline.run)


patch_beam_pipeline()

With the lines above, I'm able to patch run and apply. I thought the transforms would run inside apply, so that's what I was trying to patch, but it looks like the important bit is run. Still, I can see how nice having both instrumented gets:

[Screenshot: Logfire trace showing the instrumented apply and run spans]

Now I need to find where the PTransforms logic is executed.

EDIT: Okay, it seems it's the PipelineRunner that runs them... which makes sense... 👀

Kludex commented Dec 4, 2024

This is becoming a great exercise to profile apache-beam itself 😅

Now I know how things run...

[Screenshot: auto-traced apache_beam call tree in Logfire]

I've enabled logfire.install_auto_tracing("apache_beam", min_duration=0.00001) to see the above.

I'm now trying to find the best places to add spans to measure execution time. I already know that run_pipeline and run_stages from PipelineRunner are good places.
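
For reference, an equivalent patch using the plain OpenTelemetry API instead of logfire might look like the sketch below; the FnApiRunner import path matches Beam's internal layout at the time of writing and may move:

import functools

from apache_beam.runners.portability.fn_api_runner import FnApiRunner
from opentelemetry import trace

tracer = trace.get_tracer("beam-instrumentation-sketch")


def _traced(name, func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Open a span around each call to the original method.
        with tracer.start_as_current_span(name):
            return func(*args, **kwargs)
    return wrapper


FnApiRunner.run_pipeline = _traced("run_pipeline", FnApiRunner.run_pipeline)
FnApiRunner.run_stages = _traced("run_stages", FnApiRunner.run_stages)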

liferoad commented Dec 4, 2024

This looks interesting. I am wondering how this can work with remote Runners like Dataflow or Flink.

Kludex commented Dec 4, 2024

> This looks interesting. I am wondering how this can work with remote Runners like Dataflow or Flink.

I'm going to run it with Dataflow soon.

I still haven't found what I wanted... I want to get to the place where the Map function runs, for example... but I think that gets pickled or something, and I'm having a hard time finding the right location (I think).

[Screenshot: Logfire trace produced by the patch below]

This is the code I use - it's not as clean as the snippets above:

# Imports added for completeness; these are Beam-internal module paths.
import logfire
from apache_beam import Pipeline
from apache_beam.runners.common import DoFnInvoker
from apache_beam.runners.portability.fn_api_runner import FnApiRunner
from apache_beam.runners.runner import PipelineRunner
from apache_beam.runners.worker.sdk_worker import SdkWorker


def Pipeline__init_subclass__(cls: Pipeline, **kwargs):
    cls.apply = logfire.instrument("{transform}", extract_args=True)(cls.apply)
    cls.run = logfire.instrument("running {cls}")(cls.run)
    cls.visit = logfire.instrument()(cls.visit)


def PipelineRunner__init_subclass__(cls: PipelineRunner, **kwargs):
    cls.run_pipeline = logfire.instrument()(cls.run_pipeline)


def DoFnInvoker__init_subclass__(cls: DoFnInvoker, **kwargs):
    cls.invoke_process = logfire.instrument("{transform}")(cls.invoke_process)


def patch_beam_pipeline():
    Pipeline.__init_subclass__ = classmethod(Pipeline__init_subclass__)
    Pipeline.apply = logfire.instrument("{transform}", extract_args=True)(Pipeline.apply)
    Pipeline.run = logfire.instrument("running {self}")(Pipeline.run)
    Pipeline.visit = logfire.instrument()(Pipeline.visit)

    PipelineRunner.__init_subclass__ = classmethod(PipelineRunner__init_subclass__)
    PipelineRunner.run_pipeline = logfire.instrument()(PipelineRunner.run_pipeline)

    FnApiRunner.run_stages = logfire.instrument("running stages from {self}")(FnApiRunner.run_stages)

    SdkWorker.process_bundle = logfire.instrument("running process_bundle from {self}")(SdkWorker.process_bundle)


patch_beam_pipeline()

liferoad (Collaborator) commented

cc @Abacn PTAL.

Kludex commented Dec 10, 2024

BTW, I checked with Dataflow.

I made it work, but not as smoothly as I wanted; I can share code in the next few days... The main issue is that neither the context nor the configured exporter survives pickling.
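
One pattern that may sidestep the exporter half of this (a sketch with an illustrative DoFn, assuming the logfire token is available on the workers, e.g. via the LOGFIRE_TOKEN environment variable): configure the exporter in DoFn.setup(), which Beam calls on the worker after the DoFn has been unpickled, so the configuration never has to survive pickling.

import apache_beam as beam
import logfire


class TracedPrint(beam.DoFn):  # illustrative, not from the repo above
    def setup(self):
        # setup() runs on the worker after deserialization, so the exporter
        # configured here never has to survive pickling. Assumes the token
        # is available via the worker environment.
        logfire.configure()

    def process(self, element):
        with logfire.span("Print"):
            logfire.info("{element}", element=element)
            yield element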

Kludex commented Dec 12, 2024

Okay... hi there again... 👋

Let me try to explain where I stand now; I'm going to pause my own attempts and ask for advice.

I've been trying to make this work on Dataflow. This is the pipeline I have at hand (for testing purposes):

import traceback

import apache_beam as beam
import logfire
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions


class Split(beam.DoFn):
    def process(self, element: str):
        logfire.info(f"in split {element}")
        return element.split(" ")


def logfire_print(element: str):
    with logfire.span("Print"):
        logfire.info(element)
        logfire.info("{globals}", globals=globals())
        logfire.info("{locals}", locals=locals())
        logfire.info("{traceback}", traceback=traceback.format_stack())


pipeline_options = PipelineOptions()

with Pipeline(options=pipeline_options) as pipeline:
    text = [
        "To be, or not to be: that is the question: ",
        "Whether 'tis nobler in the mind to suffer ",
        "The slings and arrows of outrageous fortune, ",
        "Or to take arms against a sea of troubles, ",
    ]

    pipeline = (
        pipeline
        | "Create" >> beam.Create(text)
        | "Split" >> beam.ParDo(Split())
        | "Filter" >> beam.Filter(lambda x: x != "the")
        | "Print" >> beam.Map(logfire_print)
    )

What we want is a span that starts when we run the pipeline, i.e. on the machine that launches it:

 uv run python main.py \
    --region ... \
    --runner DataflowRunner \
    --project ... \
    --temp_location ... \
    --requirements_file ./requirements.txt \
    --save_main_session

And we want a span created when each step runs, e.g. when logfire_print is called for each element, we want logfire_print wrapped so that any logfire.info() call is contained in that automatically created span.

The problem is... unfortunately, I can't monkeypatch anything on the worker side, because we pickle the function to send it to the worker.

[Screenshot: Dataflow run where the Print span is not a child of the main span]

This is what I got ☝️

I wanted Print to be inside the first big span...
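
One direction that might get the worker spans under the launcher span (a sketch, not verified on Dataflow): serialize the launcher's current W3C trace context into the DoFn as a plain dict, which pickles fine, and re-attach it on the worker before opening spans. The worker still needs its own exporter configured, e.g. in setup() as above.

import apache_beam as beam
from opentelemetry import context, propagate, trace

tracer = trace.get_tracer("beam-context-sketch")


class SpanPerElement(beam.DoFn):  # illustrative name
    def __init__(self):
        carrier = {}
        propagate.inject(carrier)  # captures the launcher's active context
        self._carrier = carrier    # a plain dict of W3C headers; pickles fine

    def process(self, element):
        # Re-attach the launcher's context so spans opened here become
        # children of the launcher's root span.
        token = context.attach(propagate.extract(self._carrier))
        try:
            with tracer.start_as_current_span("Print"):
                yield element
        finally:
            context.detach(token)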

liferoad (Collaborator) commented

Is it possible to redirect regular logs to logfire?

Kludex commented Dec 12, 2024

> Is it possible to redirect regular logs to logfire?

No, only OpenTelemetry data.
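
(For completeness, the generic OpenTelemetry route here would be to bridge stdlib logging into OTel log records first, then export those; a sketch using the OTel Python logs SDK, whose modules were still experimental ("_logs") at the time of writing:)

import logging

from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import ConsoleLogExporter, SimpleLogRecordProcessor

provider = LoggerProvider()
provider.add_log_record_processor(SimpleLogRecordProcessor(ConsoleLogExporter()))
set_logger_provider(provider)

# Any stdlib logger now also emits OpenTelemetry log records.
logging.getLogger().addHandler(LoggingHandler(logger_provider=provider))
logging.getLogger(__name__).info("this record flows through OpenTelemetry")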
