[blog] - Beam YAML protobuf blogpost #32735

Merged · 18 commits · Oct 23, 2024

274 changes: 274 additions & 0 deletions website/www/site/content/en/blog/beam-yaml-proto.md
---
title: "Efficient Streaming Data Processing with Beam YAML and Protobuf"
date: "2024-09-20T11:53:38+02:00"
categories:
- blog
authors:
- ffernandez92
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

As streaming data processing grows, so do its maintenance burden, complexity, and costs.
This post explains how to efficiently scale pipelines by using [Protobuf](https://protobuf.dev/),
ensuring that they are reusable and quick to deploy. Our goal is to keep this process simple
for engineers to implement by using [Beam YAML](https://beam.apache.org/documentation/sdks/yaml/).

<!--more-->

# Simplifying Pipelines with Beam YAML

Creating a Beam pipeline can be somewhat difficult, especially for newcomers with little Beam experience.
Setting up the project, managing dependencies, and so on can be challenging.
Beam YAML eliminates most of the boilerplate code,
allowing you to focus solely on the most important part: data transformation.

Some of the key benefits include:

* **Readability:** A declarative language ([YAML](https://yaml.org/)) makes the pipeline configuration much easier for humans to read.
* **Reusability:** The same components can be reused across different pipelines.
* **Maintainability:** Pipeline maintenance and updates become simpler.

The following template shows an example of reading events from a [Kafka](https://kafka.apache.org/intro) topic and
writing them into [BigQuery](https://cloud.google.com/bigquery?hl=en).

```yaml
pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'TOPIC_NAME'
        format: RAW/AVRO/JSON/PROTO
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'SCHEMA'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: 'PROJECT_ID.DATASET.MOVIE_EVENTS_TABLE'
        useAtLeastOnceSemantics: true

options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]
```
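
If you want to try a template like this before deploying it anywhere, the Beam Python SDK ships a YAML entry point. The following is a minimal sketch, assuming the YAML above is saved as *pipeline.yaml* and that the `apache-beam[yaml,gcp]` extras are installed:

```bash
# Install the Beam Python SDK with the YAML and GCP extras (assumed environment).
pip install 'apache-beam[yaml,gcp]'

# Run the YAML pipeline locally; runner and Dataflow options can be passed as extra flags.
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml
```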

# Bringing It All Together

### Let's create a simple proto event

```protobuf
// events/v1/movie_event.proto

syntax = "proto3";

package event.v1;

import "bq_field.proto";
import "bq_table.proto";
import "buf/validate/validate.proto";
import "google/protobuf/wrappers.proto";

message MovieEvent {
  option (gen_bq_schema.bigquery_opts).table_name = "movie_table";
  google.protobuf.StringValue event_id = 1 [(gen_bq_schema.bigquery).description = "Unique Event ID"];
  google.protobuf.StringValue user_id = 2 [(gen_bq_schema.bigquery).description = "Unique User ID"];
  google.protobuf.StringValue movie_id = 3 [(gen_bq_schema.bigquery).description = "Unique Movie ID"];
  google.protobuf.Int32Value rating = 4 [(buf.validate.field).int32 = {
    // validates the rating is at least 0
    gte: 0,
    // validates the rating is at most 100
    lte: 100
  }, (gen_bq_schema.bigquery).description = "Movie rating"];
  string event_dt = 5 [
    (gen_bq_schema.bigquery).type_override = "DATETIME",
    (gen_bq_schema.bigquery).description = "UTC Datetime representing when we received this event. Format: YYYY-MM-DDTHH:MM:SS",
    (buf.validate.field) = {
      string: {
        pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
      },
      ignore_empty: false,
    }
  ];
}
```

As you can see, there are important points to consider here. Because we plan to write these events to BigQuery,
we import the *[bq_field](https://buf.build/googlecloudplatform/bq-schema-api/file/main:bq_field.proto)*
and *[bq_table](https://buf.build/googlecloudplatform/bq-schema-api/file/main:bq_table.proto)* protos.
These proto files help generate the BigQuery JSON schema.
This example also advocates a shift-left approach, meaning we move testing, quality,
and performance checks as early as possible in the development process. That is why we include the *buf.validate*
elements: they ensure that only valid events are generated from the source.
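
To see that shift-left validation in action outside the pipeline, Buf's *protovalidate* runtime can check an event against these rules before it is ever published. The following is a minimal Python sketch; the `protovalidate` package usage and the generated *movie_event_pb2* module path are assumptions for illustration, not code from the post:

```python
# Minimal validation sketch. Assumes `pip install protovalidate` and that
# movie_event_pb2 was generated by Buf from events/v1/movie_event.proto.
import protovalidate
from google.protobuf import wrappers_pb2

from events.v1 import movie_event_pb2  # hypothetical generated module path

event = movie_event_pb2.MovieEvent(
    event_id=wrappers_pb2.StringValue(value="e-123"),
    user_id=wrappers_pb2.StringValue(value="u-456"),
    movie_id=wrappers_pb2.StringValue(value="m-789"),
    rating=wrappers_pb2.Int32Value(value=87),
    event_dt="2024-09-20T11:53:38",
)

try:
    # Raises if any buf.validate rule (rating range, event_dt pattern) fails.
    protovalidate.validate(event)
except protovalidate.ValidationError as exc:
    # An invalid event never leaves the producer.
    print(f"dropping invalid event: {exc}")
```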

Once we have our *movie_event.proto* in the *events/v1* folder, we can generate
the necessary [file descriptor](https://buf.build/docs/reference/descriptors).
A file descriptor is a compiled representation of the schema that allows various tools and systems
to understand and work with Protobuf data dynamically. To simplify the process, this example uses
[Buf](https://buf.build/), which requires the following configuration files.


**Buf configuration:**

```yaml
# buf.yaml

version: v2
deps:
  - buf.build/googlecloudplatform/bq-schema-api
  - buf.build/bufbuild/protovalidate
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
```

```yaml
# buf.gen.yaml

version: v2
managed:
  enabled: true
plugins:
  # Python Plugins
  - remote: buf.build/protocolbuffers/python
    out: gen/python
  - remote: buf.build/grpc/python
    out: gen/python

  # Java Plugins
  - remote: buf.build/protocolbuffers/java:v25.2
    out: gen/maven/src/main/java
  - remote: buf.build/grpc/java
    out: gen/maven/src/main/java

  # BQ Schemas
  - remote: buf.build/googlecloudplatform/bq-schema:v1.1.0
    out: protoc-gen/bq_schema
```

Running the following commands generates the necessary Java and Python code, the BigQuery schema, and the descriptor file:

```bash
# Generate the buf.lock file.
buf deps update

# Generate the descriptor in descriptor.binp.
buf build . -o descriptor.binp --exclude-imports

# Generate the Java and Python code and the BigQuery schema, as described in buf.gen.yaml.
buf generate --include-imports
```
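
Because the pipeline below reads the descriptor from Cloud Storage, the generated *descriptor.binp* has to be uploaded there first. A minimal sketch, assuming an authenticated `gsutil` CLI and the bucket path referenced in the next section:

```bash
# Upload the generated descriptor so the pipeline can resolve the schema at runtime.
# The bucket name and versioned path are assumptions matching the config below.
gsutil cp descriptor.binp gs://my_proto_bucket/movie/v1.0.0/descriptor.binp
```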

# Let’s Make Our Beam YAML Read Proto

These are the modifications we need to make to the YAML file:

```yaml
# movie_events_pipeline.yml

pipeline:
  transforms:
    - type: ReadFromKafka
      name: ReadProtoMovieEvents
      config:
        topic: 'movie_proto'
        format: PROTO
        bootstrap_servers: '<BOOTSTRAP_SERVERS>'
        file_descriptor_path: 'gs://my_proto_bucket/movie/v1.0.0/descriptor.binp'
        message_name: 'event.v1.MovieEvent'
    - type: WriteToBigQuery
      name: WriteMovieEvents
      input: ReadProtoMovieEvents
      config:
        table: '<PROJECT_ID>.raw.movie_table'
        useAtLeastOnceSemantics: true

options:
  streaming: true
  dataflow_service_options: [streaming_mode_at_least_once]
```

As you can see, we just changed the format to be *PROTO* and added the *file_descriptor_path* and the *message_name*.
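
To test the pipeline end to end, something needs to publish proto-encoded events to the *movie_proto* topic. Here is a minimal producer sketch; the `kafka-python` client and the generated *movie_event_pb2* module are assumptions used for illustration:

```python
# Minimal producer sketch: publish a proto-encoded MovieEvent to Kafka.
# Assumes `pip install kafka-python` and a Buf-generated movie_event_pb2 module.
from kafka import KafkaProducer
from google.protobuf import wrappers_pb2

from events.v1 import movie_event_pb2  # hypothetical generated module path

producer = KafkaProducer(bootstrap_servers="<BOOTSTRAP_SERVERS>")

event = movie_event_pb2.MovieEvent(
    event_id=wrappers_pb2.StringValue(value="e-123"),
    user_id=wrappers_pb2.StringValue(value="u-456"),
    movie_id=wrappers_pb2.StringValue(value="m-789"),
    rating=wrappers_pb2.Int32Value(value=87),
    event_dt="2024-09-20T11:53:38",
)

# Beam's PROTO format expects the raw serialized message bytes.
producer.send("movie_proto", event.SerializeToString())
producer.flush()
```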

### Let’s use Terraform to deploy it

We can use [Terraform](https://www.terraform.io/) to deploy our Beam YAML pipeline
with [Dataflow](https://cloud.google.com/products/dataflow?hl=en) as the runner.
The following Terraform code example demonstrates how to achieve this:

```hcl
// Enable the Dataflow API.
resource "google_project_service" "enable_dataflow_api" {
  project = var.gcp_project_id
  service = "dataflow.googleapis.com"
}

// Dataflow Flex Template job running the Beam YAML pipeline.
resource "google_dataflow_flex_template_job" "data_movie_job" {
  provider                     = google-beta
  project                      = var.gcp_project_id
  name                         = "movie-proto-events"
  container_spec_gcs_path      = "gs://dataflow-templates-${var.gcp_region}/latest/flex/Yaml_Template"
  region                       = var.gcp_region
  on_delete                    = "drain"
  machine_type                 = "n2d-standard-4"
  enable_streaming_engine      = true
  subnetwork                   = var.subnetwork
  skip_wait_on_job_termination = true

  parameters = {
    yaml_pipeline_file = "gs://${var.bucket_name}/yamls/${var.package_version}/movie_events_pipeline.yml"
    max_num_workers    = 40
    worker_zone        = var.gcp_zone
  }

  depends_on = [google_project_service.enable_dataflow_api]
}
```

Assuming we have already created the BigQuery table, which can also be done with Terraform and the Proto-generated
schema as described earlier, the previous code creates a Dataflow job that runs our Beam YAML pipeline, reading
Proto events from Kafka and writing them into BigQuery.
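
As a hedged illustration of that table-creation step, a Terraform resource could feed the Buf-generated JSON schema straight into BigQuery. The dataset name and the generated schema file name below are assumptions based on the paths used earlier, not part of the original post:

```hcl
// Sketch: create the target table from the Buf-generated JSON schema.
// The "raw" dataset and the schema file name are assumptions for illustration.
resource "google_bigquery_table" "movie_table" {
  project    = var.gcp_project_id
  dataset_id = "raw"
  table_id   = "movie_table"
  schema     = file("protoc-gen/bq_schema/movie_table.schema")
}
```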

# Improvements and Conclusions

Some potential improvements that could be made to the previous Beam YAML code, as community contributions, are:

* **Supporting Schema Registries:** Integrate with schema registries such as Buf Registry or Apicurio for
better schema management. The current workflow generates the descriptors by using Buf and stores them in
Google Cloud Storage. The descriptors could be stored in a schema registry instead.


* **Enhanced Monitoring:** Implement advanced monitoring and alerting mechanisms to quickly identify and address
issues in the data pipeline.

Leveraging Beam YAML and Protobuf lets us streamline the creation and maintenance of
data processing pipelines, significantly reducing complexity. This approach ensures that engineers can more
efficiently implement and scale robust, reusable pipelines without needing to write the equivalent Beam code manually.

## Contribute

Developers who want to help build out and add functionalities are welcome to start contributing to the effort in the
[Beam YAML module](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/yaml).

There is also a list of open [bugs](https://github.com/apache/beam/issues?q=is%3Aopen+is%3Aissue+label%3Ayaml) on
the GitHub repo, now marked with the `yaml` tag.

While Beam YAML has been marked stable as of Beam 2.52, it is still under heavy development, with new features
added in each release. Those who want to be part of the design decisions and offer insight into how the framework
is being used are highly encouraged to join the [dev mailing list](https://beam.apache.org/community/contact-us/),
where those discussions are directed.

5 changes: 4 additions & 1 deletion in website/www/site/data/authors.yml

```yaml
jkim:
  name: Jaehyeon Kim
  email: [email protected]
ffernandez92:
  name: Ferran Fernandez
  email: [email protected]
```