Commit 65f4ad9 (1 parent: 5ae3751)

* prompts added
* fix whitespace
* Apply suggestions from code review

Co-authored-by: Daria Bezkorovaina <[email protected]>

Showing 6 changed files with 775 additions and 0 deletions.

Prompt:
What does this code do?

```java
package kafka;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadKafkaTopic {

    private static final Logger LOG = LoggerFactory.getLogger(ReadKafkaTopic.class);

    public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
        @Description("Kafka Bootstrap Server")
        String getBootstrapServer();

        void setBootstrapServer(String value);

        @Description("Kafka Topic Name")
        String getTopicName();

        void setTopicName(String value);
    }

    public static void main(String[] args) {
        ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);

        Pipeline p = Pipeline.create(options);

        p
            .apply(KafkaIO.<Integer, String>read()
                .withBootstrapServers(options.getBootstrapServer())
                .withTopic(options.getTopicName())
                .withKeyDeserializer(IntegerDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
            )
            .apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<Integer, String> kv = c.element().getKV();
                    String message = String.format("Message Key: %s, Message Value: %s",
                        kv.getKey(), kv.getValue());
                    LOG.info(message);
                    c.output(message);
                }
            })
        );

        p.run();
    }
}
```

Response:
This code snippet uses the Apache Beam KafkaIO connector to read messages from a Kafka topic.

The `ReadKafkaTopicOptions` interface specifies the Kafka bootstrap server and topic name. The `@Description` annotation provides a description of each option. When running the pipeline, use the command-line arguments `--bootstrapServer` and `--topicName` to set these parameters.

```java
public interface ReadKafkaTopicOptions extends DataflowPipelineOptions {
    @Description("Kafka Bootstrap Server")
    String getBootstrapServer();

    void setBootstrapServer(String value);

    @Description("Kafka Topic Name")
    String getTopicName();

    void setTopicName(String value);
}
```
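
For example, the same options can also be populated programmatically, which is convenient in tests. This is a minimal sketch, not part of the original file; the broker address and topic name are placeholder values, and Beam derives the flag names from the getter names in the interface.

```java
// Hypothetical values for a local test; Beam maps --bootstrapServer and
// --topicName to the interface's getBootstrapServer()/getTopicName().
String[] testArgs = {
    "--bootstrapServer=localhost:9092",
    "--topicName=my-topic"
};
ReadKafkaTopicOptions testOptions =
    PipelineOptionsFactory.fromArgs(testArgs)
        .withValidation()
        .as(ReadKafkaTopicOptions.class);
```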

In the `main` method, the `PipelineOptionsFactory` class creates a `ReadKafkaTopicOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.

```java
ReadKafkaTopicOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ReadKafkaTopicOptions.class);

Pipeline p = Pipeline.create(options);
```

The pipeline reads messages from the specified Kafka topic using `KafkaIO.read()`. The chained `withBootstrapServers`, `withTopic`, `withKeyDeserializer`, and `withValueDeserializer` calls configure the Kafka source. The pipeline then applies a `ParDo` transform that logs each message's key and value and emits the formatted string downstream.

```java
.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers(options.getBootstrapServer())
    .withTopic(options.getTopicName())
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
)
.apply("Log message", ParDo.of(new DoFn<KafkaRecord<Integer, String>, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        KV<Integer, String> kv = c.element().getKV();
        String message = String.format("Message Key: %s, Message Value: %s",
            kv.getKey(), kv.getValue());
        LOG.info(message);
        c.output(message);
    }
})
);
```
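
If the downstream logic does not need Kafka metadata such as topic, partition, or offset, `KafkaIO` also offers `withoutMetadata()`, which yields plain key-value pairs. A minimal sketch of that variant, reusing the same options as above:

```java
// Variant of the read step above: withoutMetadata() drops the KafkaRecord
// wrapper, so downstream transforms receive KV<Integer, String> directly.
p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers(options.getBootstrapServer())
    .withTopic(options.getTopicName())
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());
```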

Finally, the `run` method executes the pipeline.

```java
p.run();
```

learning/prompts/code-explanation/java/06_io_tfrecord.md (116 additions & 0 deletions)

Prompt:
What does this code do?

```java
package tfrecord;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTFRecordFile {

    private static final Logger LOG =
        LoggerFactory.getLogger(ReadTFRecordFile.class);

    public interface ReadTFRecordFileOptions extends PipelineOptions {

        @Description("A glob file pattern to read TFRecords from")
        @Validation.Required
        String getFilePattern();

        void setFilePattern(String filePattern);
    }

    public static void main(String[] args) {
        ReadTFRecordFileOptions options =
            PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(ReadTFRecordFileOptions.class);

        Pipeline p = Pipeline.create(options);

        p.apply(
                "Read from TFRecord file",
                TFRecordIO.read()
                    .from(options.getFilePattern())
                    .withCompression(Compression.UNCOMPRESSED))
            .apply(
                "Convert and log",
                ParDo.of(
                    new DoFn<byte[], String>() {
                        @DoFn.ProcessElement
                        public void processElement(ProcessContext c) {
                            String output =
                                new String(c.element(), Charsets.UTF_8);
                            LOG.info("Output: {}", output);
                            c.output(output);
                        }
                    }));

        p.run();
    }
}
```

Response:
This code snippet uses the Apache Beam TFRecordIO connector to read records from TFRecord files.

The `ReadTFRecordFileOptions` interface defines the options that can be set when running the pipeline. In this case, it specifies the glob file pattern to read TFRecords from; set it with the `--filePattern` command-line argument. The `@Description` annotation provides a description of the option, and the `@Validation.Required` annotation makes the option mandatory, so option parsing fails if it is missing.

```java
public interface ReadTFRecordFileOptions extends PipelineOptions {

    @Description("A glob file pattern to read TFRecords from")
    @Validation.Required
    String getFilePattern();

    void setFilePattern(String filePattern);
}
```
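
As an aside, and purely as an assumption about how one might extend this interface (it is not in the original file), Beam's `@Default` annotations can supply a fallback value instead of requiring the option:

```java
// Hypothetical variant: replaces @Validation.Required with a default glob,
// so the pipeline runs even when --filePattern is omitted.
// Requires: import org.apache.beam.sdk.options.Default;
@Description("A glob file pattern to read TFRecords from")
@Default.String("/tmp/input/*.tfrecord")
String getFilePattern();
```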

In the `main` method, the `PipelineOptionsFactory` class creates a `ReadTFRecordFileOptions` object from the command-line arguments. The `Pipeline.create` method creates a new pipeline with the specified options.

```java
ReadTFRecordFileOptions options =
    PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(ReadTFRecordFileOptions.class);

Pipeline p = Pipeline.create(options);
```

The `TFRecordIO.read` method configures the TFRecord input. The `from` method specifies the file pattern, and the `withCompression` method sets the compression type to `UNCOMPRESSED`, so the files are read without decompression. (When no compression is set, `TFRecordIO` infers it from the file extension.) The `apply` method then adds the transform to the pipeline, followed by a `ParDo` that decodes each record as UTF-8 and logs it.

```java
p.apply(
        "Read from TFRecord file",
        TFRecordIO.read()
            .from(options.getFilePattern())
            .withCompression(Compression.UNCOMPRESSED))
    .apply(
        "Convert and log",
        ParDo.of(
            new DoFn<byte[], String>() {
                @DoFn.ProcessElement
                public void processElement(ProcessContext c) {
                    String output =
                        new String(c.element(), Charsets.UTF_8);
                    LOG.info("Output: {}", output);
                    c.output(output);
                }
            }));
```
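
For completeness, `TFRecordIO` also provides a writer. A minimal sketch of the reverse operation, with an illustrative output path and suffix that are not part of the original file:

```java
// records is a PCollection<byte[]>; each element is written as one TFRecord.
// The path prefix, suffix, and compression below are placeholder choices.
records.apply(
    "Write to TFRecord file",
    TFRecordIO.write()
        .to("/tmp/output/records")
        .withSuffix(".tfrecord")
        .withCompression(Compression.GZIP));
```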

Finally, the `run` method executes the pipeline.

```java
p.run();
```