From 2bd5ed6519a31e6018f67e1423b4c2b8e630b20d Mon Sep 17 00:00:00 2001
From: Jaehyeon Kim
Date: Tue, 25 Jun 2024 08:28:39 +1000
Subject: [PATCH] change python code block

---
 .../blog/deploy-python-pipeline-on-flink-runner.md | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/website/www/site/content/en/blog/deploy-python-pipeline-on-flink-runner.md b/website/www/site/content/en/blog/deploy-python-pipeline-on-flink-runner.md
index 392581cc22c1..36deaaacce94 100644
--- a/website/www/site/content/en/blog/deploy-python-pipeline-on-flink-runner.md
+++ b/website/www/site/content/en/blog/deploy-python-pipeline-on-flink-runner.md
@@ -270,7 +270,7 @@ The application first reads text messages from an input Kafka topic. Next, it ex
 
 We create a custom *Java IO Expansion Service* (`get_expansion_service`) and add it to the `ReadFromKafka` and `WriteToKafka` transforms of the Kafka Connector I/O. Although the Kafka I/O provides a function to create that service, it did not work for me (or I do not understand how to make use of it yet). Instead, I created a custom service, as illustrated in [Building Big Data Pipelines with Apache Beam by Jan Lukavský](https://www.packtpub.com/product/building-big-data-pipelines-with-apache-beam/9781800564930). The expansion service Jar file (`beam-sdks-java-io-expansion-service.jar`) must exist in the Kubernetes [*job*](https://kubernetes.io/docs/concepts/workloads/controllers/job/) that executes the pipeline, while the Java SDK (`/opt/apache/beam/boot`) must exist in the runner worker.
 
-{{< highlight py >}}
+```python
 # beam/word_len/word_len.py
 import json
 import argparse
@@ -500,16 +500,16 @@ def run(argv=None, save_main_session=True):
 
 if __name__ == "__main__":
     run()
-{{< /highlight >}}
+```
 
 The pipeline script is added to a Python package under a folder named `word_len`. A simple module named `run` is created, because it is executed as a module, for example, `python -m ...`. When I ran the pipeline as a script, I encountered an error. This packaging method is for demonstration only. For a recommended way of packaging a pipeline, see [Managing Python Pipeline Dependencies](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/).
 
-{{< highlight py >}}
+```python
 # beam/word_len/run.py
 from . import *
 
 run()
-{{< /highlight >}}
+```
 
 Overall, the pipeline package uses the following structure.
 
@@ -759,7 +759,7 @@ alt="Kafka Input Topic">
 
 A simple Python Kafka producer is created to check the output of the application. By default, the producer app sends random text from the [Faker](https://faker.readthedocs.io/en/master/) package to the input Kafka topic every one second.
 
-{{< highlight py >}}
+```python
 # kafka/client/producer.py
 import os
 import time
@@ -812,7 +812,7 @@ if __name__ == "__main__":
         if num_events % 5 == 0:
             print(f"<<<<<{num_events} text sent... current>>>>\n{text}")
         time.sleep(int(os.getenv("DELAY_SECONDS", "1")))
-{{< /highlight >}}
+```
 
 Expose the Kafka bootstrap server on port 29092 using the `kubectl port-forward` command. Execute the Python script to start the producer app.
 
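
For reference (not part of the patch itself), the custom `get_expansion_service` helper mentioned in the first hunk can be written along these lines. This is a minimal sketch assuming Beam's `JavaJarExpansionService`; the jar path, the `/opt/apache/beam/boot` entrypoint, and the pipeline options shown are illustrative assumptions based on the paths named in the post, not lines taken from the patch:

```python
# Hypothetical sketch of the custom Java IO expansion service described in the
# post. The jar location and environment options are assumptions for
# illustration; adjust them to match the actual deployment.
from apache_beam.transforms.external import JavaJarExpansionService


def get_expansion_service(
    jar="/opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar",
    args=None,
):
    if args is None:
        args = [
            # Run the SDK harness as a local process on the runner worker,
            # which is why /opt/apache/beam/boot must exist there.
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
            "--experiments=use_deprecated_read",
        ]
    return JavaJarExpansionService(jar, extra_args=args)


# The service is then passed to the cross-language Kafka transforms, e.g.
#   kafka.ReadFromKafka(
#       consumer_config={"bootstrap.servers": "localhost:29092"},
#       topics=["input-topic"],
#       expansion_service=get_expansion_service(),
#   )
```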