Duet AI documentation lookup prompts for Beam ML #30351

Merged
@@ -0,0 +1,22 @@
Prompt:
How can I use Apache Beam in my AI/ML pipeline?

Response:
Apache Beam is an open-source, unified programming model for defining batch and streaming data-parallel processing pipelines. Each Apache Beam pipeline is represented as a Directed Acyclic Graph (DAG) of data processing operations constructed using the Apache Beam SDK in one of the supported languages. To execute a pipeline, you need to deploy it to one of the supported Beam runners.

You can use Apache Beam for various tasks within your AI/ML pipeline, including data validation, preprocessing, model validation, and model deployment and inference.

Apache Beam offers a rich set of I/O connectors and `PTransforms` that allow for reading and writing data from and to various data sources and sinks, as well as performing data validation.
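For illustration, here is a minimal sketch of a pipeline that reads records from text files, applies a simple validation step, and writes the valid records back out (the file paths and the `is_valid` check are placeholders):

```python
import apache_beam as beam

def is_valid(record):
    # Placeholder validation logic; replace with schema or range checks as needed.
    return bool(record.strip())

with beam.Pipeline() as p:
    _ = (
        p
        | 'Read' >> beam.io.ReadFromText('<input_path>')
        | 'Validate' >> beam.Filter(is_valid)
        | 'Write' >> beam.io.WriteToText('<output_path>'))
```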

For data preprocessing, Apache Beam provides the `MLTransform` class. This feature allows you to ensure data consistency by applying the same preprocessing steps for both training and inference.
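A minimal sketch of `MLTransform` with one of its built-in transforms (this assumes the `tensorflow_transform` dependency is installed; the column name and artifact location are placeholders):

```python
import tempfile
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

# The artifact location stores the preprocessing artifacts (for example, the vocabulary)
# so the same transformation can be re-applied at inference time.
artifact_location = tempfile.mkdtemp()

with beam.Pipeline() as p:
    transformed = (
        p
        | beam.Create([{'feature': ['apache', 'beam']}])
        | MLTransform(write_artifact_location=artifact_location).with_transform(
            ComputeAndApplyVocabulary(columns=['feature'])))
```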

Additionally, Apache Beam allows integration with pre-trained models from PyTorch, Scikit-learn, and TensorFlow.

To execute machine learning inference tasks, Apache Beam provides the RunInference API. `RunInference` is a `PTransform` that works with any model, including custom ones. For an example of running inference on a custom model loaded with spaCy, refer to the Apache Beam GitHub repository.

The `RunInference` transform efficiently handles models of any size, making it suitable for large language models (LLMs) and other complex architectures. You can find an example of deploying and performing inference on LLMs in the 'RunInference' section of the Apache Beam documentation.

For more information on implementing AI/ML pipelines using Apache Beam, see the 'Get started with AI/ML pipelines' section in the Apache Beam documentation.



@@ -0,0 +1,51 @@
Prompt:
How can I orchestrate Apache Beam pipelines with Kubeflow?

Response:
You can use Apache Beam for various tasks within your machine learning project, including data validation, preprocessing, model validation, and model deployment and inference.

However, a comprehensive ML workflow also involves additional steps like data exploration, feature engineering, and model training. Additionally, ML workflows require metadata and artifact tracking for reproducibility and auditability. Popular tools for building and managing ML workflows include Kubeflow, TFX, and Apache Airflow.

Kubeflow is an open-source ML platform tailored for MLOps, which comprises best practices and guiding principles to streamline the development and maintenance of ML workflows. Kubeflow provides a set of tools for building, deploying, and managing end-to-end ML pipelines in the form of DAGs. The DAGs orchestrate the scheduling and execution of pipeline steps and facilitate the transfer of execution parameters, metadata, and artifacts between steps.

You can integrate an Apache Beam pipeline into a Kubeflow pipeline, where the Apache Beam pipeline DAG serves as a node within the Kubeflow pipeline DAG.

To run an Apache Beam pipeline as a part of a Kubeflow Pipeline (KFP), you need to:
1. Create KFP components by specifying interfaces and containerizing the component implementation.
2. Create the KFP pipeline by connecting components and defining how inputs and outputs are exchanged between them.
3. Compile and run the KFP pipeline by submitting it to a KFP client endpoint.

Here is an example of a KFP pipeline orchestrating an Apache Beam preprocessing pipeline:

```
kfp
├── pipeline.py
├── components
│   └── preprocess
│       ├── Dockerfile
│       ├── component.yaml
│       ├── requirements.txt
│       └── src
│           └── preprocess.py
└── requirements.txt
```

Apache Beam pipelines can be used as KFP components consisting of a YAML specification `component.yaml` and a Python source file `preprocess.py`. The YAML file defines the input and output arguments of the component, while the Python file contains the Apache Beam pipeline code.

Since KFP provides the input and output arguments as command-line arguments, you need to parse them in the component code, for example with Python's `argparse.ArgumentParser` class, as sketched below. You containerize the logic and prerequisites of the pipeline components, and then push the resulting container image to a container registry.
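A hedged sketch of how `preprocess.py` might parse such arguments (the flag names are illustrative, not the ones used in the actual example):

```python
import argparse

def parse_args():
    # KFP passes component inputs and output paths as command-line flags.
    parser = argparse.ArgumentParser()
    parser.add_argument('--ingestion-data-path', type=str, required=True)
    parser.add_argument('--preprocessed-data-path', type=str, required=True)
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    # Build and run the Apache Beam preprocessing pipeline using the parsed arguments.
```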

In the final stage, the KFP `Compiler` transforms the pipeline definition into a JSON file, and the `pipeline.py` script submits it to a KFP client endpoint.

```python
import kfp
from kfp.compiler import Compiler

Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

client = kfp.Client()
experiment = client.create_experiment("KFP orchestration example")
run_result = client.run_pipeline(
    experiment_id=experiment.id,
    job_name="KFP orchestration job",
    pipeline_package_path="pipeline.json",
    params=run_arguments)
```

For a complete example of orchestrating an Apache Beam preprocessing pipeline with Kubeflow, refer to the Apache Beam GitHub repository.
@@ -0,0 +1,20 @@
Prompt:
How can I orchestrate Apache Beam pipelines with TFX?

Response:
You can use Apache Beam for various tasks within your machine learning project, including data validation, preprocessing, model validation, and model deployment and inference.

However, a comprehensive ML workflow also involves additional steps like data exploration, feature engineering, and model training. Additionally, ML workflows require metadata and artifact tracking for reproducibility and auditability. Popular tools for building and managing ML workflows include Kubeflow, TFX, and Apache Airflow.

TFX is an end-to-end platform designed for deploying production ML pipelines. TFX pipelines consist of a sequence of components that together form an ML pipeline. Each TFX pipeline DAG requires its own orchestrator. The natively supported orchestrators include Kubeflow Pipelines, Apache Airflow, and Apache Beam itself.

To integrate an Apache Beam pipeline into a TFX pipeline, you need to follow these steps:
1. Create TFX components by specifying interfaces and containerizing the component implementation.
2. Create the TFX pipeline by connecting these components and specifying how inputs and outputs are exchanged between them.
3. Compile and execute the TFX pipeline in the target environment.

TFX provides a rich set of standard components for constructing ML pipelines. These components can be used as building blocks for creating custom components. TFX heavily relies on Apache Beam libraries for implementing data-parallel pipelines within these components. You must execute components created using these libraries with one of the supported Apache Beam runners.

To launch the pipeline, you must provide two configurations: the orchestrator for the TFX pipeline and the pipeline options necessary for running Apache Beam pipelines.
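For illustration, here is a minimal sketch of both configurations when Apache Beam itself serves as the TFX orchestrator (the pipeline name, paths, and runner flags are placeholders, and `components` is assumed to be a list of TFX components defined elsewhere):

```python
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

tfx_pipeline = pipeline.Pipeline(
    pipeline_name='beam_tfx_example',
    pipeline_root='<pipeline_root>',
    components=components,
    # Pipeline options forwarded to the Apache Beam pipelines inside the components.
    beam_pipeline_args=['--runner=DirectRunner', '--direct_num_workers=1'])

# Orchestrator configuration: here Apache Beam runs the TFX pipeline DAG itself.
BeamDagRunner().run(tfx_pipeline)
```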

You can find the complete TFX example code in the Apache Beam GitHub repository.
@@ -0,0 +1,37 @@
Prompt:
How can I run inference on a trained ML model using Apache Beam?

Response:
Apache Beam enables efficient inference on both local and remote ML models within your pipelines through the RunInference API. This functionality is available in the Python SDK versions 2.40.0 and later. The Java SDK versions 2.41.0 and later also support the API through Apache Beam’s Multi-language Pipelines framework. The `RunInference` transform performs inference on a `PCollection` of examples using an ML model and outputs a `PCollection` containing both the input examples and the corresponding output predictions.

Key features of the RunInference API include:
* Support for both batch and streaming inference.
* Centralized model management for efficient memory and bandwidth usage.
* Compatibility with multiple model frameworks and model hubs.
* Automatic model refreshing to ensure the latest model version is used.
* GPU support for model inference.

The RunInference API supports a variety of frameworks and model hubs, including TensorFlow, PyTorch, Scikit-learn, XGBoost, Hugging Face, TensorFlow Hub, Vertex AI, TensorRT, and ONNX. Additionally, you can easily integrate custom model frameworks.

To import a model into your Apache Beam pipeline, you will need to configure the `ModelHandler` object, which wraps the underlying model and allows you to set necessary environment variables for inference.

Here is an example of importing a PyTorch model handler for use in your pipeline:

```python
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

model_handler = PytorchModelHandlerTensor(
    # Model handler setup, for example the saved weights path and model class
)

with beam.Pipeline() as p:
    predictions = (
        p
        | 'Read' >> beam.ReadFromSource('a_source')  # placeholder source transform
        | 'RunInference' >> RunInference(model_handler))
```

If you need to run inference on a model that isn't explicitly supported, you can create your own `ModelHandler` or `KeyedModelHandler` with custom logic to load and use your model. For an example of running inference on a custom model loaded with spaCy, refer to the examples in the Apache Beam GitHub repository.

For recommended patterns and best practices when leveraging Apache Beam for inference tasks, see the 'RunInference Patterns' section in the official documentation.

For comprehensive end-to-end examples of inference with supported model frameworks and model hubs, including examples of using the RunInference API in the Java SDK, see the Apache Beam GitHub repository. Additionally, for an illustration of a composite Python transform integrating the RunInference API with preprocessing and postprocessing from a Beam Java SDK pipeline, you can refer to the Apache Beam official documentation.
@@ -0,0 +1,38 @@
Prompt:
What metrics are available for monitoring the performance of ML model inference in Apache Beam?

Response:
Apache Beam enables efficient inference on both local and remote ML models within your pipelines through the RunInference API. This functionality is available in the Python SDK versions 2.40.0 and later. The Java SDK versions 2.41.0 and later also support the API through Apache Beam’s Multi-language Pipelines framework.

The RunInference API provides built-in support for monitoring the performance of ML model inference. Here is a list of commonly used metrics for inference benchmarking:
* `model_byte_size`: size of the memory footprint of the model load and initialization.
* `load_model_latency_milli_secs`: time taken to load and initialize the model.
* `num_inferences`: total number of elements passed to `run_inference()`.
* `inference_batch_latency_micro_secs`: time taken to perform inference across all batches of examples.
* `inference_request_batch_byte_size`: size of the batch in bytes.
* `inference_request_batch_size`: number of examples in the batch.
* `failed_batches_counter`: number of failed batches.

You can derive additional metrics from the ones listed above. Example:
Total time taken for inference = `num_inferences` x `inference_batch_latency_micro_secs_MEAN`

Here is a simplified example of how to use the RunInference API to perform inference on a language model (LM):

```python
with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | "Create inputs" >> beam.Create(<INPUTS>)
        | "Tokenize" >> beam.ParDo(Tokenize(<TOKENIZER_NAME>))
        | "Inference" >> RunInference(<MODEL_HANDLER>))
```

After running the pipeline, you can query the metrics using the following code:

```python
metrics = pipeline.result.metrics().query(beam.metrics.MetricsFilter())
```
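The query returns the metrics grouped by type. A minimal sketch of printing the collected counters, which include the `RunInference` counters such as `num_inferences`:

```python
for counter in metrics['counters']:
    # Each MetricResult exposes the metric name via its key and the value via `committed`.
    print(f'{counter.key.metric.name}: {counter.committed}')
```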

Metrics are also available in the Google Cloud Dataflow UI. For the complete source code of the example and instructions to run it in Dataflow, refer to the Apache Beam GitHub repository.

For a sample implementation of a metrics collector that tracks ML-related performance and memory usage, see the base Python inference example in the Apache Beam GitHub repository.
48 changes: 48 additions & 0 deletions learning/prompts/documentation-lookup-nolinks/40_ai_bigquery_ml.md
@@ -0,0 +1,48 @@
Prompt:
How can I integrate BigQuery ML with Apache Beam?

Response:
BigQuery ML allows you to develop and execute ML models using GoogleSQL queries. You can seamlessly integrate these models into your Apache Beam pipeline using TFX Basic Shared Libraries (tfx_bsl) and perform inference on them using the RunInference API.

For an example of training a basic BigQuery ML model, refer to the BigQuery documentation.

Once you have trained your model, you will need to export it. Here is an example BigQuery command to export a model to a Google Cloud Storage bucket:

```
bq extract -m <model name> gs://<cloud storage path>
```

To incorporate your BigQuery ML model into an Apache Beam pipeline using `tfx_bsl`, it must be saved in the TensorFlow SavedModel format. You will need to download the model to your local directory to perform local predictions:

```python
import apache_beam as beam
import tensorflow as tf
from google.protobuf import text_format
from tensorflow.python.framework import tensor_util
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public.proto import model_spec_pb2

inputs = tf.train.Example(...)

model_path = <path to the directory where the model is stored>

def extract_prediction(response):
    # Extract the prediction from the response depending on the signature of the model
    ...

with beam.Pipeline() as p:
    res = (
        p
        | beam.Create([inputs])
        | RunInference(
            model_spec_pb2.InferenceSpecType(
                saved_model_spec=model_spec_pb2.SavedModelSpec(
                    model_path=model_path,
                    signature_name=['serving_default'])))
        | beam.ParDo(extract_prediction)
    )
```

This example uses the `RunInference` transform from the `tfx_bsl` library, directing it to the local directory where the model is stored. The transform takes `tf.train.Example` type elements as inputs and produces `tensorflow_serving.apis.prediction_log_pb2.PredictionLog` type elements as outputs.

Depending on the signature of your model, you can extract values from the output. For the prediction output format of exported models for each model type, refer to the 'Prediction output format' section in the BigQuery documentation.
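As an illustration, here is a hedged sketch of `extract_prediction` for a model whose serving signature returns a single output tensor (the output key `'predicted_label'` is an assumed name and depends on your exported model):

```python
def extract_prediction(response):
    # response is a tensorflow_serving.apis.prediction_log_pb2.PredictionLog element.
    # The output tensor key depends on the model's serving signature.
    output = response.predict_log.response.outputs['predicted_label']
    yield tensor_util.MakeNdarray(output)
```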
@@ -0,0 +1,42 @@
Prompt:
How can I make sure my Apache Beam pipeline uses the latest version of the ML model?

Response:
Production ML workflows often involve updating the ML model with new data. You can ensure that your Apache Beam pipeline uses the latest version of the ML model by leveraging the RunInference API alongside Apache Beam's side inputs feature. Side inputs are additional inputs that can be provided to a `ParDo` transform, in addition to the main input `PCollection`.

The `RunInference` transform accepts an optional input parameter `model_metadata_pcoll`, which is a side input `PCollection` emitting `ModelMetadata`. `ModelMetadata` is a `NamedTuple` object containing `model_id` and `model_name`, used to load the model for inference and identify it in the metrics generated by the `RunInference` transform. The URL or path to the model should be compatible with the respective `ModelHandler` requirements.
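For illustration, constructing a `ModelMetadata` value directly looks like this (the path and name are placeholders):

```python
from apache_beam.ml.inference.base import ModelMetadata

model_metadata = ModelMetadata(
    model_id='<path_to_updated_model>',  # path or URL the ModelHandler loads the model from
    model_name='model_v2')               # short identifier used in RunInference metrics
```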

If the main collection emits inputs before the `model_metadata_pcoll` side input is available, the main `PCollection` will be buffered until the `model_metadata_pcoll` side input is emitted.

For more information on `ModelMetadata`, refer to the Apache Beam Python SDK documentation.

A common approach to model updates in production is to use a `FileWatchPattern` as a side input:

```python
import apache_beam as beam
from apache_beam.ml.inference.utils import WatchFilePattern
from apache_beam.ml.inference.base import RunInference

tf_model_handler = ... # model handler for the model

with beam.Pipeline() as pipeline:

    file_pattern = '<path_to_model_file>'

    side_input_pcoll = (
        pipeline
        | "FilePatternUpdates" >> WatchFilePattern(file_pattern=file_pattern))

    main_input_pcoll = ... # main input PCollection

    inference_pcoll = (
        main_input_pcoll
        | "RunInference" >> RunInference(
            model_handler=tf_model_handler,
            model_metadata_pcoll=side_input_pcoll))
```

In the provided example, the `model_metadata_pcoll` parameter expects a `PCollection` of `ModelMetadata` compatible with the `AsSingleton` marker. Given that the pipeline employs the `WatchFilePattern` class as a side input, it automatically manages windowing and encapsulates the output into `ModelMetadata`.

For more information, refer to the section 'Use `WatchFilePattern` to auto-update ML models in RunInference' in the Apache Beam documentation.

@@ -0,0 +1,50 @@
Prompt:
How can I use custom remote inference calls in my Apache Beam pipeline?

Response:
The optimal approach for conducting inference within an Apache Beam pipeline is by leveraging the RunInference API provided by the Apache Beam Python SDK. This feature allows you to seamlessly incorporate ML models into your pipeline or execute remote inference calls.

When developing custom inference calls, consider the following factors:
* **API quotas**: heavy loads may lead to reaching API quota limits. You can use `PipelineOptions` to cap the number of parallel workers, and therefore parallel API calls: use `direct_num_workers` for the Direct Runner or `max_num_workers` for the Google Cloud Dataflow Runner (see the sketch after this list). Refer to the Beam Capability Matrix for information about other runners.
* **Error handling**: it is essential to handle errors in case of API call failures. Consider implementing exponential backoff for retries or utilizing dead-letter queues for failed API calls.
* **Monitoring**: incorporate monitoring and performance metrics to track the performance of your inference calls and the health of your pipeline.
* **Batching**: batching can be used to send multiple inputs in a single API call for improved efficiency.
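A minimal sketch of capping parallelism through `PipelineOptions` (the flag value is illustrative):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Direct Runner: at most two parallel workers, and therefore at most two parallel API calls.
# On Dataflow, --max_num_workers plays the equivalent role.
options = PipelineOptions(['--direct_num_workers', '2'])
```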

To execute external API calls with the `RunInference` transform, you implement a custom model handler whose `run_inference` method sends each batch of inputs to the remote API:

```python
from apache_beam.ml.inference.base import ModelHandler

class CustomModelHandler(ModelHandler):
    """Model handler that accepts a batch of inputs and sends that batch to the remote API for inference."""

    def initialize_client(self):
        """Initiate the custom remote API client."""
        client = ... # Initialize the client
        return client

    def load_model(self):
        # No local model to load; inference happens on the remote endpoint.
        return None

    def run_inference(self, batch, model, inference_args=None):
        # Initialize the client.
        client = self.initialize_client()

        # Prepare a batch request for all inputs in the batch.
        inputs = ... # Process inputs from the batch
        input_requests = ... # Prepare input requests for the model
        batch_request = ... # Prepare batch request for the model

        # Send the batch request to the remote endpoint.
        responses = client.request(request=batch_request).responses

        return responses
```

Integrate this custom model handler into your pipeline as demonstrated in the following example:

```python
with beam.Pipeline() as pipeline:
    _ = (pipeline
         | "Create inputs" >> beam.Create(<read input data>)
         | "Inference" >> RunInference(model_handler=CustomModelHandler())
         | "Process outputs" >> beam.Map(<write output data>)
         )
```

For a comprehensive example of using the RunInference API for remote inference calls, refer to the Apache Beam GitHub repository.