change python code block
jaehyeon-kim authored and acrites committed Jul 17, 2024
1 parent e1467f0 commit 2bd5ed6
Showing 1 changed file with 6 additions and 6 deletions.
@@ -270,7 +270,7 @@ The application first reads text messages from an input Kafka topic. Next, it ex

We create a custom *Java IO Expansion Service* (`get_expansion_service`) and add it to the `ReadFromKafka` and `WriteToKafka` transforms of the Kafka Connector I/O. Although the Kafka I/O provides a function to create that service, it did not work for me (or I do not understand how to make use of it yet). Instead, I created a custom service, as illustrated in [Building Big Data Pipelines with Apache Beam by Jan Lukavský](https://www.packtpub.com/product/building-big-data-pipelines-with-apache-beam/9781800564930). The expansion service Jar file (`beam-sdks-java-io-expansion-service.jar`) must exist in the Kubernetes [*job*](https://kubernetes.io/docs/concepts/workloads/controllers/job/) that executes the pipeline, while the Java SDK (`/opt/apache/beam/boot`) must exist in the runner worker.
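The diff elides the body of `get_expansion_service`. As a rough sketch of the approach described above, such a service can be built from Beam's `JavaJarExpansionService`; the jar path, pipeline options, and argument layout below are assumptions for illustration, not the post's exact code.

```python
# Hypothetical sketch: the jar path, environment options, and the
# "{{PORT}}" placeholder are assumptions, not the post's actual code.
def get_expansion_service(
    jar="/opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar",
    args=None,
):
    if args is None:
        args = [
            # Run the cross-language transform in a subprocess on the
            # runner worker, where the Java SDK harness binary lives.
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
        ]
    # Imported lazily so the sketch only requires Beam when it is called.
    from apache_beam.transforms.external import JavaJarExpansionService

    # "{{PORT}}" is replaced with a free port when Beam launches the jar.
    return JavaJarExpansionService(jar, extra_args=["{{PORT}}"] + args)
```

The returned service would then be passed to the connector transforms, e.g. `ReadFromKafka(..., expansion_service=get_expansion_service())`, and likewise to `WriteToKafka`.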

-{{< highlight py >}}
+```python
# beam/word_len/word_len.py
import json
import argparse
@@ -500,16 +500,16 @@ def run(argv=None, save_main_session=True):
if __name__ == "__main__":
run()
-{{< /highlight >}}
+```

The pipeline script is added to a Python package under a folder named `word_len`. A simple module named `run` is created because the pipeline is executed as a module (e.g. `python -m ...`); when I ran it directly as a script, I encountered an error. This packaging method is for demonstration only. For a recommended way of packaging a pipeline, see [Managing Python Pipeline Dependencies](https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/).

-{{< highlight py >}}
+```python
# beam/word_len/run.py
from . import *
run()
-{{< /highlight >}}
+```

Overall, the pipeline package uses the following structure.

@@ -759,7 +759,7 @@ alt="Kafka Input Topic">

A simple Python Kafka producer is created to check the output of the application. By default, the producer app sends random text from the [Faker](https://faker.readthedocs.io/en/master/) package to the input Kafka topic once per second.

-{{< highlight py >}}
+```python
# kafka/client/producer.py
import os
import time
@@ -812,7 +812,7 @@ if __name__ == "__main__":
if num_events % 5 == 0:
print(f"<<<<<{num_events} text sent... current>>>>\n{text}")
time.sleep(int(os.getenv("DELAY_SECONDS", "1")))
-{{< /highlight >}}
+```

Expose the Kafka bootstrap server on port 29092 using the `kubectl port-forward` command. Execute the Python script to start the producer app.
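As a sketch, the two steps might look as follows; the Kafka service name and the `BOOTSTRAP_SERVERS` variable name are assumptions that depend on how the cluster was deployed, and must be adjusted to match it.

```shell
# Service name is illustrative; adjust to your Kafka deployment.
kubectl port-forward svc/demo-cluster-kafka-external-bootstrap 29092:29092 &

# Point the producer at the forwarded port and start sending messages.
BOOTSTRAP_SERVERS=localhost:29092 python kafka/client/producer.py
```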

