diff --git a/learning/prompts/documentation-lookup-nolinks/33_ai_ml_pipelines.md b/learning/prompts/documentation-lookup-nolinks/33_ai_ml_pipelines.md new file mode 100644 index 000000000000..30d62c0faac0 --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/33_ai_ml_pipelines.md @@ -0,0 +1,22 @@ +Prompt: +How can I use Apache Beam in my AI/ML pipeline? + +Response: +Apache Beam is an open-source, unified programming model for defining batch and streaming data-parallel processing pipelines. Each Apache Beam pipeline is represented as a Directed Acyclic Graph (DAG) of data processing operations constructed using the Apache Beam SDK in one of the supported languages. To execute a pipeline, you need to deploy it to one of the supported Beam runners. + +You can use Apache Beam for various tasks within your AI/ML pipeline, including data validation, preprocessing, model validation, and model deployment and inference. + +Apache Beam offers a rich set of I/O connectors and `PTransforms` that allow for reading and writing data from and to various data sources and sinks, as well as performing data validation. + +For data preprocessing, Apache Beam provides the `MLTransform` class. This feature allows you to ensure data consistency by applying the same preprocessing steps for both training and inference. + +Additionally, Apache Beam allows integration with pre-trained models from PyTorch, Scikit-learn, and TensorFlow. + +To execute machine learning inference tasks, Apache Beam provides the RunInference API. `RunInference` is a `PTransform` that supports the usage of any custom models. For an example of running inference on a custom model loaded with spaCy, refer to the Apache Beam GitHub repository. + +The `RunInference` transform efficiently handles models of any size, making it suitable for large language models (LLMs) and other complex architectures. You can find an example of deploying and performing inference on large language models (LLMs) in the 'RunInference' section of the Apache Beam documentation. + +For more information on implementing AI/ML pipelines using Apache Beam, see the 'Get started with AI/ML pipelines' section in the Apache Beam documentation. + + + diff --git a/learning/prompts/documentation-lookup-nolinks/36_ai_kfp_orchestration.md b/learning/prompts/documentation-lookup-nolinks/36_ai_kfp_orchestration.md new file mode 100644 index 000000000000..df0419a67342 --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/36_ai_kfp_orchestration.md @@ -0,0 +1,51 @@ +Prompt: +How can I orchestrate Apache Beam pipelines with Kubeflow? + +Response: +You can use Apache Beam for various tasks within your machine learning project, including data validation, preprocessing, model validation, and model deployment and inference. + +However, a comprehensive ML workflow also involves additional steps like data exploration, feature engineering, and model training. Additionally, ML workflows require metadata and artifact tracking for reproducibility and auditability. Popular tools for building and managing ML workflows include Kubeflow, TFX, and Apache Airflow. + +Kubeflow is an open-source ML platform tailored for MLOps, which comprises best practices and guiding principles to streamline the development and maintenance of ML workflows. Kubeflow provides a set of tools for building, deploying, and managing end-to-end ML pipelines in the form of DAGs. The DAGs orchestrate the scheduling and execution of pipeline steps and facilitate the transfer of execution parameters, metadata, and artifacts between steps. + +You can integrate an Apache Beam pipeline into a Kubeflow pipeline, where the Apache Beam pipeline DAG serves as a node within the Kubeflow pipeline DAG. + +To run an Apache Beam pipeline as a part of a Kubeflow Pipeline (KFP), you need to: +1. Create KFP components by specifying interfaces and containerizing the component implementation. +2. Create the KFP pipeline by connecting components and defining how inputs and outputs are exchanged between them. +3. Compile and run the KFP pipeline by submitting it to a KFP client endpoint. + +Here is an example of a KFP pipeline orchestrating an Apache Beam preprocessing pipeline: + +``` + kfp + ├── pipeline.py + ├── components + │ └── preprocess + │ ├── Dockerfile + │ ├── component.yaml + │ ├── requirements.txt + │ └── src + │ └── preprocess.py + └── requirements.txt +``` + +Apache Beam pipelines can be used as KFP components consisting of a YAML specification `component.yaml` and a Python source file `preprocess.py`. The YAML file defines the input and output arguments of the component, while the Python file contains the Apache Beam pipeline code. + +Since KFP provides the input and output arguments as command-line arguments, you need to use the `ArgumentParser` class. You containerize the logic and prerequisites of the pipeline components, and then push the resulting container image to a container registry. + +In the final stage, the `compiler` method transforms the KFP pipeline into a JSON file, and the `pipeline.py` script submits it to a KFP client endpoint. + +```python +Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json") + +client = kfp.Client() +experiment = client.create_experiment("KFP orchestration example") +run_result = client.run_pipeline( + experiment_id=experiment.id, + job_name="KFP orchestration job", + pipeline_package_path="pipeline.json", + params=run_arguments) +``` + +For a complete example of orchestrating an Apache Beam preprocessing pipeline with Kubeflow, refer to the Apache Beam GitHub repository. diff --git a/learning/prompts/documentation-lookup-nolinks/37_ai_tfx_orchestration.md b/learning/prompts/documentation-lookup-nolinks/37_ai_tfx_orchestration.md new file mode 100644 index 000000000000..861f011a3cd9 --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/37_ai_tfx_orchestration.md @@ -0,0 +1,20 @@ +Prompt: +How can I orchestrate Apache Beam pipelines with TFX? + +Response: +You can use Apache Beam for various tasks within your machine learning project, including data validation, preprocessing, model validation, and model deployment and inference. + +However, a comprehensive ML workflow also involves additional steps like data exploration, feature engineering, and model training. Additionally, ML workflows require metadata and artifact tracking for reproducibility and auditability. Popular tools for building and managing ML workflows include Kubeflow, TFX, and Apache Airflow. + +TFX is an end-to-end platform designed for deploying production ML pipelines. TFX pipelines consist of a sequence of components that together form an ML pipeline. Each TFX pipeline DAG requires its own orchestrator. The natively supported orchestrators include Kubeflow Pipelines, Apache Airflow, and Apache Beam itself. + +To integrate an Apache Beam pipeline into a TFX pipeline, you need to follow these steps: +1. Create TFX components by specifying interfaces and containerizing the component implementation. +2. Create the TFX pipeline by connecting these components and specifying how inputs and outputs are exchanged between them. +3. Compile and execute the TFX pipeline in the target environment. + +TFX provides a rich set of standard components for constructing ML pipelines. These components can be used as building blocks for creating custom components. TFX heavily relies on Apache Beam libraries for implementing data-parallel pipelines within these components. You must execute components created using these libraries with one of the supported Apache Beam runners. + +To launch the pipeline, you must provide two configurations: the orchestrator for the TFX pipeline and the pipeline options necessary for running Apache Beam pipelines. + +You can find the complete TFX example code in the Apache Beam GitHub repository. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup-nolinks/38_ai_runinference.md b/learning/prompts/documentation-lookup-nolinks/38_ai_runinference.md new file mode 100644 index 000000000000..992410da43ec --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/38_ai_runinference.md @@ -0,0 +1,37 @@ +Prompt: +How can I run inference on a trained ML model using Apache Beam? + +Response: +Apache Beam enables efficient inference on both local and remote ML models within your pipelines through the RunInference API. This functionality is available in the Python SDK versions 2.40.0 and later. The Java SDK versions 2.41.0 and later also support the API through Apache Beam’s Multi-language Pipelines framework. The `RunInference` transform performs inference on a `PCollection` of examples using an ML model and outputs a `PCollection` containing both the input examples and the corresponding output predictions. + +Key features of the RunInference API include: +* Support for both batch and streaming inference. +* Centralized model management for efficient memory and bandwidth usage. +* Compatibility with multiple model frameworks and model hubs. +* Automatic model refreshing to ensure the latest model version is used. +* GPU support for model inference. + +The RunInference API supports a variety of frameworks and model hubs, including TensorFlow, PyTorch, Scikit-learn, XGBoost, Hugging Face, TensorFlow Hub, Vertex AI, TensorRT, and ONNX. Additionally, you can easily integrate custom model frameworks. + +To import a model into your Apache Beam pipeline, you will need to configure the `ModelHandler` object, which wraps the underlying model and allows you to set necessary environment variables for inference. + +Here is an example of importing a PyTorch model handler for use in your pipeline: + +```python +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.base import RunInference + + model_handler = PytorchModelHandlerTensor( + # Model handler setup + ) + +with pipeline as p: + predictions = p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(model_handler) +``` + +If you need to run inference on a model that isn't explicitly supported, you can create your own `ModelHandler` or `KeyedModelHandler` with custom logic to load and use your model. For an example of running inference on a custom model loaded with spaCy, refer to the examples in the Apache Beam GitHub repository. + +For recommended patterns and best practices when leveraging Apache Beam for inference tasks, see the 'RunInference Patterns' section in the official documentation. + +For comprehensive end-to-end examples of inference with supported model frameworks and model hubs, including examples of using the RunInference API in the Java SDK, see the Apache Beam GitHub repository. Additionally, for an illustration of a composite Python transform integrating the RunInference API with preprocessing and postprocessing from a Beam Java SDK pipeline, you can refer to the Apache Beam official documentation. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup-nolinks/39_ai_runinference_metrics.md b/learning/prompts/documentation-lookup-nolinks/39_ai_runinference_metrics.md new file mode 100644 index 000000000000..003f921b79d7 --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/39_ai_runinference_metrics.md @@ -0,0 +1,38 @@ +Prompt: +What metrics are available for monitoring the performance of ML model inference in Apache Beam? + +Response: +Apache Beam enables efficient inference on both local and remote ML models within your pipelines through the RunInference API. This functionality is available in the Python SDK versions 2.40.0 and later. The Java SDK versions 2.41.0 and later also support the API through Apache Beam’s Multi-language Pipelines framework. + +The RunInference API provides built-in support for monitoring the performance of ML model inference. Here is a list of commonly used metrics for inference benchmarking: +* `model_byte_size`: size of the memory footprint of the model load and initialization. +* `load_model_latency_milli_secs`: time taken to load and initialize the model. +* `num_inferences`: total number of elements passed to `run_inference()`. +* `inference_batch_latency_micro_secs`: time taken to perform inference across all batches of examples. +* `inference_request_batch_byte_size`: size of the batch in bytes. +* `inference_request_batch_size`: number of examples in the batch. +* `failed_batches_counter`: number of failed batches. + +You can derive additional metrics from the ones listed above. Example: +Total time taken for inference = `num_inferences` x `inference_batch_latency_micro_secs_MEAN` + +Here is a simplified example of how to use the RunInference API to perform inference on a language model (LM): + +```python + with beam.Pipeline(options=pipeline_options) as pipeline: + _ = ( + pipeline + | "Create inputs" >> beam.Create(>) + | "Tokenize" >> beam.ParDo(Tokenize()) + | "Inference" >> RunInference()) +``` + +After running the pipeline, you can query the metrics using the following code: + +```python +metrics = pipeline.result.metrics().query(beam.metrics.MetricsFilter()) +``` + +Metrics are also available in the Google Cloud Dataflow UI. For the complete source code of the example and instructions to run it in Dataflow, refer to the Apache Beam GitHub repository. + +For a sample implementation of a metrics collector that tracks ML-related performance and memory usage, see the base Python inference example in the Apache Beam GitHub repository. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup-nolinks/40_ai_bigquery_ml.md b/learning/prompts/documentation-lookup-nolinks/40_ai_bigquery_ml.md new file mode 100644 index 000000000000..e7c6d019ba3a --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/40_ai_bigquery_ml.md @@ -0,0 +1,48 @@ +Prompt: +How can I integrate BigQuery ML with Apache Beam? + +Response: +BigQuery ML allows you to develop and execute ML models using GoogleSQL queries. You can seamlessly integrate these models into your Apache Beam pipeline using TFX Basic Shared Libraries (tfx_bsl) and perform inference on them using the RunInference API. + +For an example of training a basic BigQuery ML model, refer to the BigQuery documentation. + +Once you have trained your model, you will need to export it. Here is an example BigQuery command to export a model to a Google Cloud Storage bucket: + +``` +bq extract -m gs:// +``` + +To incorporate your BigQuery ML model into an Apache Beam pipeline using `tfx_bsl`, it must be saved in the TensorFlow SavedModel format. You will need to download the model to your local directory to perform local predictions: + +```python +import apache_beam +import tensorflow as tf +from google.protobuf import text_format +from tensorflow.python.framework import tensor_util +from tfx_bsl.beam import run_inference +from tfx_bsl.public.beam import RunInference +from tfx_bsl.public.proto import model_spec_pb2 + +inputs = tf.train.Example(...) + +model_path = + +def extract_prediction(response): + # Extract the prediction from the response depending on the signature of the model + +with beam.Pipeline() as p: + res = ( + p + | beam.Create([inputs]) + | RunInference( + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path, + signature_name=['serving_default']))) + | beam.ParDo(extract_prediction) + ) +``` + +This example uses the `RunInference` transform from the `tfx_bsl` library, directing it to the local directory where the model is stored. The transform takes `tf.train.Example` type elements as inputs and produces `tensorflow_serving.apis.prediction_log_pb2.PredictionLog` type elements as outputs. + +Depending on the signature of your model, you can extract values from the output. For the prediction output format of exported models for each model type, refer to the 'Prediction output format' section in the BigQuery documentation. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup-nolinks/41_ai_model_refresh.md b/learning/prompts/documentation-lookup-nolinks/41_ai_model_refresh.md new file mode 100644 index 000000000000..7f7890fa91e8 --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/41_ai_model_refresh.md @@ -0,0 +1,42 @@ +Prompt: +How can I make sure my Apache Beam pipeline uses the latest version of the ML model? + +Response: +Production ML workflows often involve updating the ML model with new data. You can ensure that your Apache Beam pipeline uses the latest version of the ML model by leveraging the RunInference API alongside Apache Beam's side inputs feature. Side inputs are additional inputs that can be provided to a `ParDo` transform, in addition to the main input `PCollection`. + +The `RunInference` transform accepts an optional input parameter `model_metadata_pcoll`, which is a side input `PCollection` emitting `ModelMetadata`. `ModelMetadata` is a `NamedTuple` object containing `model_id` and `model_name`, used to load the model for inference and identify it in the metrics generated by the `RunInference` transform. The URL or path to the model should be compatible with the respective `ModelHandler` requirements. + +If the main collection emits inputs before the `model_metadata_pcoll` side input is available, the main `PCollection` will be buffered until the `model_metadata_pcoll` side input is emitted. + +For more information on `ModelMetadata`, refer to the Apache Beam Python SDK documentation. + +A common approach to model updates in production is to use a `FileWatchPattern` as a side input: + +```python +import apache_beam as beam +from apache_beam.ml.inference.utils import WatchFilePattern +from apache_beam.ml.inference.base import RunInference + +tf_model_handler = ... # model handler for the model + +with beam.Pipeline() as pipeline: + + file_pattern = '' + + side_input_pcoll = ( + pipeline + | "FilePatternUpdates" >> WatchFilePattern(file_pattern=file_pattern)) + + main_input_pcoll = ... # main input PCollection + + inference_pcoll = ( + main_input_pcoll + | "RunInference" >> RunInference( + model_handler=model_handler, + model_metadata_pcoll=side_input_pcoll)) +``` + +In the provided example, the `model_metadata_pcoll` parameter expects a `PCollection` of `ModelMetadata` compatible with the `AsSingleton` marker. Given that the pipeline employs the `WatchFilePattern` class as a side input, it automatically manages windowing and encapsulates the output into `ModelMetadata`. + +For more information, refer to the section 'Use `WatchFilePattern` to auto-update ML models in RunInference' in the Apache Beam documentation. + diff --git a/learning/prompts/documentation-lookup-nolinks/42_ai_custom_inference.md b/learning/prompts/documentation-lookup-nolinks/42_ai_custom_inference.md new file mode 100644 index 000000000000..5a622b3465f1 --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/42_ai_custom_inference.md @@ -0,0 +1,50 @@ +Prompt: +How can I use custom remote inference calls in my Apache Beam pipeline? + +Response: +The optimal approach for conducting inference within an Apache Beam pipeline is by leveraging the RunInference API provided by the Apache Beam Python SDK. This feature allows you to seamlessly incorporate ML models into your pipeline or execute remote inference calls. + +When developing custom inference calls, consider the following factors: +* **API quotas**: heavy loads may lead to reaching API quota limits. You can use `PipelineOptions` to specify the maximum number of parallel API calls. Use `direct_num_workers` for the Direct Runner or `max_num_workers` for the Google Cloud Dataflow Runner. Refer to the Beam Capability Matrix for information about other runners. +* **Error handling**: it is essential to handle errors in case of API call failures. Consider implementing exponential backoff for retries or utilizing dead-letter queues for failed API calls. +* **Monitoring**: incorporate monitoring and performance metrics to track the performance of your inference calls and the health of your pipeline. +* **Batching**: batching can be used to send multiple inputs in a single API call for improved efficiency. + +To execute external API calls with the `RunInference` transform, you will need to create a `beam.DoFn` in the form of a custom model handler: + +```python +class CustomModelHandler(ModelHandler): + """DoFn that accepts a batch of inputs and sends that batch to the remote API for inference""" + + def initialize_client(self): + """Initiate the Custom remote API client.""" + client = ... # Initialize the client + return client + + def run_inference(self, batch, model, inference): + + # Initialize the client. + client = self.initialize_client() + + # Prepare a batch request for all inputs in the batch. + inputs = ... # Process inputs from the batch + input_requests = ... # Prepare input requests for the model + batch_request = ... # Prepare batch request for the model + + # Send the batch request to the remote endpoint. + responses = client.request(request=batch_request).responses + + return responses +``` + +Integrate this custom model handler into your pipeline as demonstrated in the following example: + +```python +with beam.Pipeline() as pipeline: + _ = (pipeline | "Create inputs" >> beam.Create() + | "Inference" >> RunInference(model_handler=CustomModelHandler()) + | "Process outputs" >> beam.Map() + ) +``` + +For a comprehensive example of using the RunInference API for remote inference calls, refer to the Apache Beam GitHub repository. diff --git a/learning/prompts/documentation-lookup-nolinks/43_ai_llm_inference.md b/learning/prompts/documentation-lookup-nolinks/43_ai_llm_inference.md new file mode 100644 index 000000000000..1a46bd44f709 --- /dev/null +++ b/learning/prompts/documentation-lookup-nolinks/43_ai_llm_inference.md @@ -0,0 +1,60 @@ +Prompt: +How can I use Apache Beam to run inference on Large Language Models (LLMs)? + +Response: +The optimal approach for conducting inference within an Apache Beam pipeline is by leveraging the RunInference API provided by the Apache Beam Python SDK. This feature allows you to seamlessly incorporate ML models into your pipeline or execute remote inference calls. + +You can use the `RunInference` transform with large models as long as they fit on your hardware. In addition, RunInference has several mechanisms for reducing memory usage. For example, `RunInference` by default will load at most a single copy of each model per process (rather than one per thread). + +Many Beam runners run multiple Beam processes per machine at once. Loading large models like LLMs multiple times might cause problems because of the negative impact on memory footprint. For memory-intensive models, `RunInference` provides a mechanism for more intelligently sharing memory across multiple processes to reduce the overall memory footprint. To enable this mode, set the parameter `large_model` to True in model configuration and Beam will take care of the memory management. + +The typical workflow for conducting inference on Large Language Models (LLMs) within an Apache Beam pipeline involves the following steps: +1. Read the input text data from a source such as a file or a Pub/Sub topic. +2. Encode the text into tokens understandable by the LLM model, typically using a tokenizer. +3. Use the `RunInference` transform to obtain predictions from the model. +4. Decode the predictions into human-readable text. + +Here is an example demonstrating how to leverage the RunInference API with LLMs in an Apache Beam pipeline: + +```python +import apache_beam as beam +from apache_beam.ml.inference.base import RunInference + +with beam.Pipeline(options=pipeline_options) as pipeline: + _ = ( + pipeline + | "CreateInputs" >> beam.Create(task_sentences) + | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer)) + | "RunInference" >> RunInference(model_handler=model_handler) + | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer)) + ) +``` + +In this example, `Preprocess` and `Postprocess` are `DoFn` classes responsible for encoding and decoding the input and output data using a tokenizer. The `model_handler` is a `ModelHandler` object that wraps the underlying model and allows you to configure environment variables required to load the model. + +Here is an example of how to configure a PyTorch model handler for use in your pipeline: + +```python +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +model_handler = PytorchModelHandlerTensor( + state_dict_path=known_args.model_state_dict_path, + model_class=T5ForConditionalGeneration, + model_params={ + "config": AutoConfig.from_pretrained(known_args.model_name) + }, + device="cpu", + inference_fn=gen_fn, + large_model=True) +``` + +Each specific model handler has its own configuration parameters. For example, the `PytorchModelHandlerTensor` requires the following parameters: +* `state_dict_path`: the path to the saved model state dictionary. +* `model_class`: the class of the PyTorch model that defines model structure. +* `model_params`: the dictionary of model parameters. +* `device`: the device to run the model on (e.g. "cpu" or "gpu"). +* `inference_fn`: the function to run the inference during RunInference. +* `large_model`: whether to use the memory minimization mode for large models. + +For information on supported popular frameworks and models, refer to the reference documentation for the `apache_beam.ml.inference` package. + +You can find the comprehensive example of using the RunInference API with LLMs in the Apache Beam GitHub repository. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup/33_ai_ml_pipelines.md b/learning/prompts/documentation-lookup/33_ai_ml_pipelines.md new file mode 100644 index 000000000000..086b3eea65e5 --- /dev/null +++ b/learning/prompts/documentation-lookup/33_ai_ml_pipelines.md @@ -0,0 +1,15 @@ +Prompt: +How can I use Apache Beam in my AI/ML pipeline? + +Response: +Apache Beam is an open-source, unified programming model for defining batch and streaming data-parallel processing pipelines. Each Apache Beam pipeline is represented as a Directed Acyclic Graph (DAG) of data processing operations constructed using the Apache Beam SDK in one of the [supported languages](https://beam.apache.org/documentation/sdks/java/). To execute a pipeline, you need to deploy it to one of the supported [Beam runners](https://beam.apache.org/documentation/runners/capability-matrix/). + +You can use Apache Beam for various tasks within your AI/ML pipeline, including data validation, preprocessing, model validation, and model deployment and inference. + +Apache Beam offers a rich set of [I/O connectors](https://beam.apache.org/documentation/io/connectors/) and [transforms](https://beam.apache.org/documentation/transforms/python/) that allow for reading and writing data from and to various data sources and sinks, as well as performing data validation. + +For data preprocessing, Apache Beam provides the [MLTransform](https://beam.apache.org/documentation/ml/preprocess-data/) class. This feature allows you to ensure data consistency by applying the same preprocessing steps for both training and inference. Additionally, Apache Beam allows integration with pre-trained models from [PyTorch](https://pytorch.org/), [Scikit-learn](https://scikit-learn.org/stable/), and [TensorFlow](https://www.tensorflow.org/). + +To execute machine learning inference tasks, Apache Beam provides the RunInference API. [`RunInference`](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) is a [`PTransform`](https://beam.apache.org/documentation/basics/#ptransform) that supports the usage of any custom models. For an example of running inference on a custom model loaded with [spaCy](https://spacy.io/), refer to the [Bring your own ML model to Beam RunInference](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_custom_inference.ipynb) example in the Apache Beam GitHub repository. + +The `RunInference` transform efficiently handles models of any size, making it suitable for large language models (LLMs) and other complex architectures. You can find an example of deploying and performing inference on large language models (LLMs) in the [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) section of the Apache Beam documentation. For more information on implementing AI/ML pipelines using Apache Beam, see the [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/) section in the Apache Beam documentation. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup/36_ai_kfp_orchestration.md b/learning/prompts/documentation-lookup/36_ai_kfp_orchestration.md new file mode 100644 index 000000000000..9284b99c4179 --- /dev/null +++ b/learning/prompts/documentation-lookup/36_ai_kfp_orchestration.md @@ -0,0 +1,51 @@ +Prompt: +How can I orchestrate Apache Beam pipelines with Kubeflow? + +Response: +You can use Apache Beam for various tasks within your machine learning project, including data validation, preprocessing, model validation, and model deployment and inference. + +However, a comprehensive ML workflow also involves additional steps like data exploration, feature engineering, and model training. Additionally, ML workflows require metadata and artifact tracking for reproducibility and auditability. Popular tools for building and managing ML workflows include [Kubeflow](https://www.kubeflow.org/), [TFX](https://www.tensorflow.org/tfx), and [Apache Airflow](https://airflow.apache.org/). + +Kubeflow is an open-source ML platform tailored for MLOps, which comprises best practices and guiding principles to streamline the development and maintenance of ML workflows. Kubeflow provides a set of tools for building, deploying, and managing end-to-end ML pipelines in the form of DAGs. The DAGs orchestrate the scheduling and execution of pipeline steps and facilitate the transfer of execution parameters, metadata, and artifacts between steps. + +You can integrate an Apache Beam pipeline into a Kubeflow pipeline, where the Apache Beam pipeline DAG serves as a node within the Kubeflow pipeline DAG. + +To run an Apache Beam pipeline as a part of a Kubeflow Pipeline (KFP), you need to: +1. Create KFP components by specifying interfaces and containerizing the component implementation. +2. Create the KFP pipeline by connecting components and defining how inputs and outputs are exchanged between them. +3. Compile and run the KFP pipeline by submitting it to a KFP client endpoint. + +Here is an example of a KFP pipeline orchestrating an Apache Beam preprocessing pipeline: + +``` + kfp + ├── pipeline.py + ├── components + │ └── preprocess + │ ├── Dockerfile + │ ├── component.yaml + │ ├── requirements.txt + │ └── src + │ └── preprocess.py + └── requirements.txt +``` + +Apache Beam pipelines can be used as KFP components consisting of a YAML specification `component.yaml` and a Python source file `preprocess.py`. The YAML file defines the input and output arguments of the component, while the Python file contains the Apache Beam pipeline code. + +Since KFP provides the input and output arguments as command-line arguments, you need to use the `ArgumentParser` class. You containerize the logic and prerequisites of the pipeline components, and then push the resulting container image to a container registry. + +In the final stage, the `compiler` method transforms the KFP pipeline into a JSON file, and the `pipeline.py` script submits it to a KFP client endpoint. + +```python +Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json") + +client = kfp.Client() +experiment = client.create_experiment("KFP orchestration example") +run_result = client.run_pipeline( + experiment_id=experiment.id, + job_name="KFP orchestration job", + pipeline_package_path="pipeline.json", + params=run_arguments) +``` + +For a complete example of orchestrating an Apache Beam preprocessing pipeline with Kubeflow, refer to the [Apache Beam GitHub repository](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/ml-orchestration/kfp). diff --git a/learning/prompts/documentation-lookup/37_ai_tfx_orchestration.md b/learning/prompts/documentation-lookup/37_ai_tfx_orchestration.md new file mode 100644 index 000000000000..5b3b9ce183ef --- /dev/null +++ b/learning/prompts/documentation-lookup/37_ai_tfx_orchestration.md @@ -0,0 +1,20 @@ +Prompt: +How can I orchestrate Apache Beam pipelines with TFX? + +Response: +You can use Apache Beam for various tasks within your machine learning project, including data validation, preprocessing, model validation, and model deployment and inference. + +However, a comprehensive ML workflow also involves additional steps like data exploration, feature engineering, and model training. Additionally, ML workflows require metadata and artifact tracking for reproducibility and auditability. Popular tools for building and managing ML workflows include [Kubeflow](https://www.kubeflow.org/), [TFX](https://www.tensorflow.org/tfx), and [Apache Airflow](https://airflow.apache.org/). + +TFX is an end-to-end platform designed for deploying production ML pipelines. TFX pipelines consist of a sequence of components that together form an ML pipeline. Each TFX pipeline DAG requires its own orchestrator. The natively supported orchestrators include [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/), [Apache Airflow](https://airflow.apache.org/), and Apache Beam itself. + +To integrate an Apache Beam pipeline into a TFX pipeline, you need to follow these steps: +1. Create TFX components by specifying interfaces and containerizing the component implementation. +2. Create the TFX pipeline by connecting these components and specifying how inputs and outputs are exchanged between them. +3. Compile and execute the TFX pipeline in the target environment. + +TFX provides a rich set of [standard components](https://www.tensorflow.org/tfx/guide#tfx_standard_components) for constructing ML pipelines. These components can be used as building blocks for creating custom components. TFX heavily relies on Apache Beam libraries for implementing data-parallel pipelines within these components. You must execute components created using these libraries with one of the supported Apache Beam runners. + +To launch the pipeline, you must provide two configurations: the orchestrator for the TFX pipeline and the pipeline options necessary for running Apache Beam pipelines. + +You can find the complete TFX example code in the [Apache Beam GitHub repository](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/ml-orchestration/tfx). \ No newline at end of file diff --git a/learning/prompts/documentation-lookup/38_ai_runinference.md b/learning/prompts/documentation-lookup/38_ai_runinference.md new file mode 100644 index 000000000000..a494f74941c5 --- /dev/null +++ b/learning/prompts/documentation-lookup/38_ai_runinference.md @@ -0,0 +1,39 @@ +Prompt: +How can I run inference on a trained ML model using Apache Beam? + +Response: +Apache Beam enables efficient inference on both local and remote ML models within your pipelines through the RunInference API. This functionality is available in the Python SDK versions 2.40.0 and later. The Java SDK versions 2.41.0 and later also support the API through Apache Beam’s [Multi-language Pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) framework. The `RunInference` transform performs inference on a [`PCollection`](https://beam.apache.org/documentation/programming-guide/#pcollections) of examples using an ML model and outputs a `PCollection` containing both the input examples and the corresponding output predictions. + +Key features of the RunInference API include: +* Support for both batch and streaming inference. +* Centralized model management for efficient memory and bandwidth usage. +* Compatibility with multiple model frameworks and model hubs. +* Automatic model refreshing to ensure the latest model version is used. +* GPU support for model inference. + +The RunInference API supports a variety of frameworks and model hubs, including [TensorFlow](https://www.tensorflow.org/), [PyTorch](https://pytorch.org/), [Scikit-learn](https://scikit-learn.org/), [XGBoost](https://xgboost.ai/), [Hugging Face](https://huggingface.co/), [TensorFlow Hub](https://www.tensorflow.org/hub), [Vertex AI](https://cloud.google.com/vertex-ai), [TensorRT](https://developer.nvidia.com/tensorrt), and [ONNX](https://onnx.ai/). Additionally, you can easily integrate custom model frameworks. + +To import a model into your Apache Beam pipeline, you will need to configure the [`ModelHandler`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelHandler) object, which wraps the underlying model and allows you to set necessary environment variables for inference. + +Here is an example of importing a PyTorch model handler for use in your pipeline: + +```python +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.base import RunInference + + model_handler = PytorchModelHandlerTensor( + # Model handler setup + ) + +with pipeline as p: + predictions = p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(model_handler) +``` + +For comprehensive end-to-end examples of inference with supported model frameworks and model hubs, refer to the [Apache Beam GitHub repository](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference). + +If you need to run inference on a model that isn't explicitly supported, you can [create your own `ModelHandler` or `KeyedModelHandler`](https://beam.apache.org/documentation/ml/about-ml/#use-custom-models with custom logic] to load and use your model. For an example of running inference on a custom model loaded with [spaCy](https://spacy.io/), refer to the [Bring your own ML model to Beam RunInference](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/run_custom_inference.ipynb) example in the Apache Beam GitHub repository. + +For recommended patterns and best practices when leveraging Apache Beam for inference tasks, see the [RunInference Patterns](https://beam.apache.org/documentation/ml/about-ml/#runinference-patterns) section in the official documentation. + +For an example of using the RunInference API in the Java SDK, see the [example multi-language pipelines](https://github.com/apache/beam/tree/master/examples/multi-language) in the Apache Beam GitHub repository. Additionally, for an illustration of a composite Python transform integrating the RunInference API with preprocessing and postprocessing from a Beam Java SDK pipeline, you can refer to the [Using RunInference from Java SDK](https://beam.apache.org/documentation/ml/multi-language-inference/) section in the official documentation. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup/39_ai_runinference_metrics.md b/learning/prompts/documentation-lookup/39_ai_runinference_metrics.md new file mode 100644 index 000000000000..2c19af86f3e4 --- /dev/null +++ b/learning/prompts/documentation-lookup/39_ai_runinference_metrics.md @@ -0,0 +1,39 @@ +Prompt: +What metrics are available for monitoring the performance of ML model inference in Apache Beam? + +Response: +Apache Beam enables efficient inference on both local and remote ML models within your pipelines through the RunInference API. This functionality is available in the Python SDK versions 2.40.0 and later. The Java SDK versions 2.41.0 and later also support the API through Apache Beam’s [Multi-language Pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines) framework. + +The RunInference API provides built-in support for monitoring the performance of ML model inference. Here is a list of commonly used metrics for inference benchmarking: + +* `model_byte_size`: size of the memory footprint of the model load and initialization. +* `load_model_latency_milli_secs`: time taken to load and initialize the model. +* `num_inferences`: total number of elements passed to `run_inference()`. +* `inference_batch_latency_micro_secs`: time taken to perform inference across all batches of examples. +* `inference_request_batch_byte_size`: size of the batch in bytes. +* `inference_request_batch_size`: number of examples in the batch. +* `failed_batches_counter`: number of failed batches. + +You can derive additional metrics from the ones listed above. Example: +Total time taken for inference = `num_inferences` x `inference_batch_latency_micro_secs_MEAN` + +Here is a simplified example of how to use the RunInference API to perform inference on a language model (LM): + +```python + with beam.Pipeline(options=pipeline_options) as pipeline: + _ = ( + pipeline + | "Create inputs" >> beam.Create(>) + | "Tokenize" >> beam.ParDo(Tokenize()) + | "Inference" >> RunInference()) +``` + +After running the pipeline, you can query the metrics using the following code: + +```python +metrics = pipeline.result.metrics().query(beam.metrics.MetricsFilter()) +``` + +Metrics are also available in the [Google Cloud Dataflow](https://cloud.google.com/dataflow) UI. For the complete source code of the example and instructions to run it in Dataflow, refer to the [Apache Beam GitHub repository](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference/runinference_metrics/). + +For a sample implementation of a metrics collector that tracks ML-related performance and memory usage, see the [base Python inference example](https://github.com/akvelon/beam/blob/371576a3b17b940380192378848dd00c55d0cc19/sdks/python/apache_beam/ml/inference/base.py#L1228) in the Apache Beam GitHub repository. \ No newline at end of file diff --git a/learning/prompts/documentation-lookup/40_ai_bigquery_ml.md b/learning/prompts/documentation-lookup/40_ai_bigquery_ml.md new file mode 100644 index 000000000000..3d7ba82de9a9 --- /dev/null +++ b/learning/prompts/documentation-lookup/40_ai_bigquery_ml.md @@ -0,0 +1,52 @@ +Prompt: +How can I integrate BigQuery ML with Apache Beam? + +Response: +[BigQuery ML](https://cloud.google.com/bigquery/docs/bqml-introduction) allows you to develop and execute ML models using [GoogleSQL](https://cloud.google.com/bigquery/docs/introduction-sql) queries. You can seamlessly integrate these models into your Apache Beam pipeline using [TFX Basic Shared Libraries (tfx_bsl)](https://github.com/tensorflow/tfx-bsl) and perform inference on them using the RunInference API. + +For an example of training a basic BigQuery ML model, refer to the [BigQuery documentation](https://cloud.google.com/bigquery/docs/bqml-introduction). + +Once you have trained your model, you will need to export it. Here is an example BigQuery command to export a model to a [Google Cloud Storage](https://cloud.google.com/storage/docs/creating-buckets) bucket: + +``` +bq extract -m gs:// +``` + +To incorporate your BigQuery ML model into an Apache Beam pipeline using `tfx_bsl`, it must be saved in the [TensorFlow SavedModel](https://www.tensorflow.org/guide/saved_model) format. You will need to download the model to your local directory to perform local predictions: + +```python +import apache_beam +import tensorflow as tf +from google.protobuf import text_format +from tensorflow.python.framework import tensor_util +from tfx_bsl.beam import run_inference +from tfx_bsl.public.beam import RunInference +from tfx_bsl.public.proto import model_spec_pb2 + +inputs = tf.train.Example(...) + +model_path = + +def extract_prediction(response): + # Extract the prediction from the response depending on the signature of the model + +with beam.Pipeline() as p: + res = ( + p + | beam.Create([inputs]) + | RunInference( + model_spec_pb2.InferenceSpecType( + saved_model_spec=model_spec_pb2.SavedModelSpec( + model_path=model_path, + signature_name=['serving_default']))) + | beam.ParDo(extract_prediction) + ) +``` + +This example uses the [`RunInference`](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) transform from the `tfx_bsl` library, directing it to the local directory where the model is stored. The transform takes `tf.train.Example` type elements as inputs and produces `tensorflow_serving.apis.prediction_log_pb2.PredictionLog` type elements as outputs. + +Depending on the signature of your model, you can extract values from the output. For the prediction output format of exported models for each model type, refer to the [Prediction output format](https://cloud.google.com/bigquery/docs/exporting-models#prediction_output_format) section in the BigQuery documentation. + + + + diff --git a/learning/prompts/documentation-lookup/41_ai_model_refresh.md b/learning/prompts/documentation-lookup/41_ai_model_refresh.md new file mode 100644 index 000000000000..f23c30af600e --- /dev/null +++ b/learning/prompts/documentation-lookup/41_ai_model_refresh.md @@ -0,0 +1,42 @@ +Prompt: +How can I make sure my Apache Beam pipeline uses the latest version of the ML model? + +Response: +Production ML workflows often involve updating the ML model with new data. You can ensure that your Apache Beam pipeline uses the latest version of the ML model by leveraging the RunInference API alongside Apache Beam's [side inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) feature. Side inputs are additional inputs that can be provided to a [`ParDo`](https://beam.apache.org/documentation/programming-guide/#pardo) transform, in addition to the main input [`PCollection`](https://beam.apache.org/documentation/basics/#pcollection). + +The `RunInference` transform accepts an optional input parameter `model_metadata_pcoll`, which is a side input `PCollection` emitting `ModelMetadata`. `ModelMetadata` is a `NamedTuple` object containing `model_id` and `model_name`, used to load the model for inference and identify it in the metrics generated by the `RunInference` transform. The URL or path to the model should be compatible with the respective [`ModelHandler` requirements](https://beam.apache.org/documentation/ml/about-ml/#modify-a-python-pipeline-to-use-an-ml-model). + +If the main collection emits inputs before the `model_metadata_pcoll` side input is available, the main `PCollection` will be buffered until the `model_metadata_pcoll` side input is emitted. + +For more information on `ModelMetadata`, refer to the [Apache Beam Python SDK documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata). + +A common approach to model updates in production is to use a `FileWatchPattern` as a side input: + +```python +import apache_beam as beam +from apache_beam.ml.inference.utils import WatchFilePattern +from apache_beam.ml.inference.base import RunInference + +tf_model_handler = ... # model handler for the model + +with beam.Pipeline() as pipeline: + + file_pattern = '' + + side_input_pcoll = ( + pipeline + | "FilePatternUpdates" >> WatchFilePattern(file_pattern=file_pattern)) + + main_input_pcoll = ... # main input PCollection + + inference_pcoll = ( + main_input_pcoll + | "RunInference" >> RunInference( + model_handler=model_handler, + model_metadata_pcoll=side_input_pcoll)) +``` + +In the provided example, the `model_metadata_pcoll` parameter expects a `PCollection` of `ModelMetadata` compatible with the `AsSingleton` marker. Given that the pipeline employs the `WatchFilePattern` class as a side input, it automatically manages windowing and encapsulates the output into `ModelMetadata`. + +For more information, refer to the [Use `WatchFilePattern` to auto-update ML models in RunInference](https://beam.apache.org/documentation/ml/side-input-updates/) section in the Apache Beam documentation. + diff --git a/learning/prompts/documentation-lookup/42_ai_custom_inference.md b/learning/prompts/documentation-lookup/42_ai_custom_inference.md new file mode 100644 index 000000000000..d4cea37cce0b --- /dev/null +++ b/learning/prompts/documentation-lookup/42_ai_custom_inference.md @@ -0,0 +1,50 @@ +Prompt: +How can I use custom remote inference calls in my Apache Beam pipeline? + +Response: +The optimal approach for conducting inference within an Apache Beam pipeline is by leveraging the [RunInference API](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.html#apache_beam.ml.inference.RunInference) provided by the Apache Beam Python SDK. This feature allows you to seamlessly incorporate ML models into your pipeline or execute remote inference calls. + +When developing custom inference calls, consider the following factors: +* **API quotas**: heavy loads may lead to reaching API quota limits. You can use [`PipelineOptions`](https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options) to specify the maximum number of parallel API calls. Use `direct_num_workers` for the [Direct Runner](https://beam.apache.org/documentation/runners/direct/) or `max_num_workers` for the [Google Cloud Dataflow Runner](https://beam.apache.org/documentation/runners/dataflow/). Refer to the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/) for information about other runners. +* **Error handling**: it is essential to handle errors in case of API call failures. Consider implementing exponential backoff for retries or utilizing dead-letter queues for failed API calls. +* **Monitoring**: incorporate monitoring and performance metrics to track the performance of your inference calls and the health of your pipeline. +* **Batching**: batching can be used to send multiple inputs in a single API call for improved efficiency. + +To execute external API calls with the `RunInference` transform, you will need to create a `beam.DoFn` in the form of a custom model handler: + +```python +class CustomModelHandler(ModelHandler): + """DoFn that accepts a batch of inputs and sends that batch to the remote API for inference""" + + def initialize_client(self): + """Initiate the Custom remote API client.""" + client = ... # Initialize the client + return client + + def run_inference(self, batch, model, inference): + + # Initialize the client. + client = self.initialize_client() + + # Prepare a batch request for all inputs in the batch. + inputs = ... # Process inputs from the batch + input_requests = ... # Prepare input requests for the model + batch_request = ... # Prepare batch request for the model + + # Send the batch request to the remote endpoint. + responses = client.request(request=batch_request).responses + + return responses +``` + +Integrate this custom model handler into your pipeline as demonstrated in the following example: + +```python +with beam.Pipeline() as pipeline: + _ = (pipeline | "Create inputs" >> beam.Create() + | "Inference" >> RunInference(model_handler=CustomModelHandler()) + | "Process outputs" >> beam.Map() + ) +``` + +For a comprehensive example of using the RunInference API for remote inference calls, refer to the [Apache Beam GitHub repository](https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/custom_remote_inference.ipynb). diff --git a/learning/prompts/documentation-lookup/43_ai_llm_inference.md b/learning/prompts/documentation-lookup/43_ai_llm_inference.md new file mode 100644 index 000000000000..da21cbc827a8 --- /dev/null +++ b/learning/prompts/documentation-lookup/43_ai_llm_inference.md @@ -0,0 +1,60 @@ +Prompt: +How can I use Apache Beam to run inference on Large Language Models (LLMs)? + +Response: +The optimal approach for conducting inference within an Apache Beam pipeline is by leveraging the [RunInference API](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.html#apache_beam.ml.inference.RunInference) provided by the Apache Beam Python SDK. This feature allows you to seamlessly incorporate ML models into your pipeline or execute remote inference calls. + +You can use the `RunInference` transform with large models as long as they fit on your hardware. In addition, RunInference has several mechanisms for reducing memory usage. For example, `RunInference` by default will load at most a single copy of each model per process (rather than one per thread). + +Many Beam runners run multiple Beam processes per machine at once. Loading large models like LLMs multiple times might cause problems because of the negative impact on memory footprint. For memory-intensive models, `RunInference` provides a mechanism for more intelligently sharing memory across multiple processes to reduce the overall memory footprint. To enable this mode, set the parameter `large_model` to True in model configuration and Beam will take care of the memory management. + +The typical workflow for conducting inference on Large Language Models (LLMs) within an Apache Beam pipeline involves the following steps: +1. Read the input text data from a source such as a file or a Pub/Sub topic. +2. Encode the text into tokens understandable by the LLM model, typically using a tokenizer. +3. Use the `RunInference` transform to obtain predictions from the model. +4. Decode the predictions into human-readable text. + +Here is an example demonstrating how to leverage the RunInference API with LLMs in an Apache Beam pipeline: + +```python +import apache_beam as beam +from apache_beam.ml.inference.base import RunInference + +with beam.Pipeline(options=pipeline_options) as pipeline: + _ = ( + pipeline + | "CreateInputs" >> beam.Create(task_sentences) + | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer)) + | "RunInference" >> RunInference(model_handler=model_handler) + | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer)) + ) +``` + +In this example, `Preprocess` and `Postprocess` are `DoFn` classes responsible for encoding and decoding the input and output data using a tokenizer. The `model_handler` is a `ModelHandler` object that wraps the underlying model and allows you to configure environment variables required to load the model. + +Here is an example of how to configure a PyTorch model handler for use in your pipeline: + +```python +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +model_handler = PytorchModelHandlerTensor( + state_dict_path=known_args.model_state_dict_path, + model_class=T5ForConditionalGeneration, + model_params={ + "config": AutoConfig.from_pretrained(known_args.model_name) + }, + device="cpu", + inference_fn=gen_fn, + large_model=True) +``` + +Each specific model handler has its own configuration parameters. For example, the `PytorchModelHandlerTensor` requires the following parameters: +* `state_dict_path`: the path to the saved model state dictionary. +* `model_class`: the class of the PyTorch model that defines model structure. +* `model_params`: the dictionary of model parameters. +* `device`: the device to run the model on (e.g. "cpu" or "gpu"). +* `inference_fn`: the function to run the inference during RunInference. +* `large_model`: whether to use the memory minimization mode for large models. + +For information on supported popular frameworks and models, refer to the reference documentation for the [`apache_beam.ml.inference` package](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.html). + +You can find the comprehensive example of using the RunInference API with LLMs in the [Apache Beam GitHub repository](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/large_language_modeling/main.py). \ No newline at end of file