Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(samples): Add Dataflow to Pub/Sub snippet #11104

Merged
merged 8 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions dataflow/snippets/noxfile_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Default TEST_CONFIG_OVERRIDE for python repos.

# You can copy this file into your directory, then it will be imported from
# the noxfile.py.

# The source of truth:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py

TEST_CONFIG_OVERRIDE = {
# You can opt out from the test for specific Python versions.
"ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12"],
rsamborski marked this conversation as resolved.
Show resolved Hide resolved
# Old samples are opted out of enforcing Python type hints
# All new samples should feature them
"enforce_type_hints": False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For new samples we recommend using type hints, they make the code more self-documented and easier to understand.

Type hints can sometimes be a little tricky to figure out, so feel free to ask if you need any help with any of them.

I believe you can run the type checks with nox -s lint (more info).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks -- I added type hints. I'm not super familiar with them, so let me know if anything looks wonky!

# An envvar key for determining the project id to use. Change it
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
# build specific Cloud project. You can also use your own string
# to use your own Cloud project.
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
# If you need to use a specific version of pip,
# change pip_version_override to the string representation
# of the version number, for example, "20.2.4"
"pip_version_override": None,
# A dictionary you want to inject into your test. Don't put any
# secrets here. These values will override predefined values.
"envs": {},
}
99 changes: 99 additions & 0 deletions dataflow/snippets/tests/test_write_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# !/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import uuid

from google.cloud import pubsub_v1

import pytest

from ..write_pubsub import write_to_pubsub


topic_id = f'test-topic-{uuid.uuid4()}'
subscription_id = f'{topic_id}-sub'
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

NUM_MESSAGES = 4
TIMEOUT = 60 * 5


@pytest.fixture(scope="function")
def setup_and_teardown():
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

try:
publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)
yield
finally:
subscriber.delete_subscription(
request={"subscription": subscription_path})
publisher.delete_topic(request={"topic": topic_path})


def read_messages():
received_messages = []
ack_ids = []

# Read messages from Pub/Sub. It might be necessary to read multiple
# batches, Use a timeout value to avoid potentially looping forever.
start_time = time.time()
while time.time() - start_time <= TIMEOUT:
# Pull messages from Pub/Sub.
subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
)
received_messages.append(response.received_messages)

for received_message in response.received_messages:
ack_ids.append(received_message.ack_id)

# Acknowledge the received messages so they will not be sent again.
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)

if (len(received_messages) >= NUM_MESSAGES):
break

time.sleep(5)

return received_messages


def test_write_to_pubsub(setup_and_teardown):
sys.argv = [
davidcavazos marked this conversation as resolved.
Show resolved Hide resolved
'',
'--streaming',
f'--project={project_id}',
f'--topic={topic_id}'
]
write_to_pubsub()

# Read from Pub/Sub to verify the pipeline successfully wrote messages.
# Duplicate reads are possible.
messages = read_messages()
assert (len(messages) >= NUM_MESSAGES)
66 changes: 66 additions & 0 deletions dataflow/snippets/write_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataflow_pubsub_write_with_attributes]
import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions


def item_to_message(item):
attributes = {}
davidcavazos marked this conversation as resolved.
Show resolved Hide resolved
attributes['buyer'] = item['name']
attributes['timestamp'] = str(item['ts'])
data = bytes(item['product'], 'utf-8')

return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv=None):
davidcavazos marked this conversation as resolved.
Show resolved Hide resolved

# Parse the pipeline options passed into the application.
class MyOptions(PipelineOptions):
@classmethod
# Define custom pipeline options that specify the project ID and Pub/Sub
# topic.
def _add_argparse_args(cls, parser):
parser.add_argument("--project", required=True)
parser.add_argument("--topic", required=True)

example_data = [
{'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
{'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
{'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
{'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
]
options = MyOptions()

topic_path = f'projects/{options.project}/topics/{options.topic}'
davidcavazos marked this conversation as resolved.
Show resolved Hide resolved

with beam.Pipeline(options=options) as pipeline:
(
pipeline
| "Create elements" >> beam.Create(example_data)
| "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
| WriteToPubSub(topic=topic_path, with_attributes=True)
)

print('Pipeline ran successfully.')
# [END dataflow_pubsub_write_with_attributes]


if __name__ == "__main__":
write_to_pubsub()