[YAML] Kafka Read Provider #28865

Merged: 4 commits, Oct 27, 2023
@@ -39,7 +39,7 @@ public abstract class KafkaReadSchemaTransformConfiguration {

public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");

public static final String VALID_FORMATS_STR = "AVRO,JSON";
public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON";
public static final Set<String> VALID_DATA_FORMATS =
Sets.newHashSet(VALID_FORMATS_STR.split(","));

@@ -49,6 +49,7 @@
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
+ import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -112,16 +113,56 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

String format = configuration.getFormat();

if (format != null && format.equals("RAW")) {
if (inputSchema != null) {
throw new IllegalArgumentException(
"To read from Kafka in RAW format, you can't provide a schema.");
}
Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);
Contributor:

Wanting to wrap my head around why we'd have bytes with Row. Row tends to imply schemas.

Contributor Author:

Same as before:

- raw: Produces records with a single `payload` field whose contents

It does have a schema; it's just that this schema only has one attribute, `payload`, and the values are raw bytes.

I do agree that this feature might not see widespread use. Ideally, users should utilize schemas for data processing. However, there could be scenarios where this feature comes in handy. For instance, imagine you have events in Kafka in a format like a,b,c, resembling CSV data. In such cases, you might receive raw bytes in the payload, and it becomes your responsibility to parse them downstream in the correct way.

Contributor:
Yeah. Ideally users should be publishing schema'd data, but technically Kafka is just a plumber of bytes and doesn't otherwise impose restrictions. This allows users to at least get at the data and do what they want with it. (Also, perhaps they're using a schema we don't support yet, like protobuf or Cap'n Proto or something.)
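To make the CSV scenario above concrete, here is a rough downstream-parsing sketch (not part of this PR): the `payload` field name matches the rawSchema built just above, while the three-column schema, class name, and delimiter handling are illustrative assumptions.

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ParseRawCsvSketch {
  // Illustrative target schema for records shaped like "a,b,c".
  private static final Schema CSV_SCHEMA =
      Schema.builder().addStringField("a").addStringField("b").addStringField("c").build();

  public static PCollection<Row> parse(PCollection<Row> rawRows) {
    return rawRows
        .apply(
            MapElements.into(TypeDescriptors.rows())
                .via(
                    (Row row) -> {
                      // Decode the single BYTES field and split it; in RAW mode this
                      // parsing is entirely the pipeline author's responsibility.
                      String line = new String(row.getBytes("payload"), StandardCharsets.UTF_8);
                      String[] parts = line.split(",", -1);
                      return Row.withSchema(CSV_SCHEMA)
                          .addValues(parts[0], parts[1], parts[2])
                          .build();
                    }))
        .setRowSchema(CSV_SCHEMA);
  }
}

Real parsing would of course need quoting, malformed-record, and error-output handling beyond this sketch.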

return new SchemaTransform() {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
KafkaIO.Read<byte[], byte[]> kafkaRead =
KafkaIO.readBytes()
.withConsumerConfigUpdates(consumerConfigs)
.withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
.withTopic(configuration.getTopic())
.withBootstrapServers(configuration.getBootstrapServers());
if (isTest) {
kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
}

PCollection<byte[]> kafkaValues =
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());

PCollectionTuple outputTuple =
kafkaValues.apply(
ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

return PCollectionRowTuple.of(
"output",
outputTuple.get(OUTPUT_TAG).setRowSchema(rawSchema),
"errors",
outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
}
};
}

if (inputSchema != null && !inputSchema.isEmpty()) {
assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
: "To read from Kafka, a schema must be provided directly or though Confluent "
+ "Schema Registry, but not both.";
Comment on lines 156 to 158

Contributor:

Ah, getConfluentSchemaRegistry ... So we might just use that more generically, OR we might look [ not now ] to refactor that.

Contributor Author:

Exactly, that was my idea here. Again, it's unclear to me whether this is used by someone somewhere, so I think we still want to support that functionality. That said, it makes total sense to include other schema registry technologies as we keep refactoring this class.

Contributor (@brucearctor, Oct 9, 2023):

My rule of thumb: if there isn't a test, then it isn't important or nobody is using it. So the lack of a test effectively tells me that nobody is using this [otherwise, if they found it important, they would have written a test].

Please keep this in mind, as it also helps reinforce the importance of writing tests. We need tests in place to ensure others do not break things we intend to use.

Naturally, we need to be careful, but tests help us with that.


final Schema beamSchema =
Objects.equals(configuration.getFormat(), "JSON")
Objects.equals(format, "JSON")
? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
: AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
SerializableFunction<byte[], Row> valueMapper =
Objects.equals(configuration.getFormat(), "JSON")
Objects.equals(format, "JSON")
? JsonUtils.getJsonBytesToRowFunction(beamSchema)
: AvroUtils.getAvroBytesToRowFunction(beamSchema);
return new SchemaTransform() {
@@ -193,6 +234,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}

public static SerializableFunction<byte[], Row> getRawBytesToRowFunction(Schema rawSchema) {
return new SimpleFunction<byte[], Row>() {
@Override
public Row apply(byte[] input) {
return Row.withSchema(rawSchema).addValue(input).build();
}
};
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:kafka_read:v1";
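For reference, a minimal sketch of what the getRawBytesToRowFunction added above does in isolation. This is not from the PR or its tests; it assumes the provider lives in the org.apache.beam.sdk.io.kafka package, and the input bytes are arbitrary.

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class RawMapperSketch {
  public static void main(String[] args) {
    // Same single-field schema the RAW branch builds in the provider.
    Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
    Row row =
        KafkaReadSchemaTransformProvider.getRawBytesToRowFunction(rawSchema)
            .apply("a,b,c".getBytes(StandardCharsets.UTF_8));
    // The mapper wraps the bytes as-is; nothing is decoded or validated here.
    System.out.println(row.getBytes("payload").length); // prints 5
  }
}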
@@ -147,4 +147,22 @@ public void testBuildTransformWithJsonSchema() throws IOException {
StandardCharsets.UTF_8))
.build());
}

@Test
public void testBuildTransformWithRawFormat() throws IOException {
ServiceLoader<SchemaTransformProvider> serviceLoader =
ServiceLoader.load(SchemaTransformProvider.class);
List<SchemaTransformProvider> providers =
StreamSupport.stream(serviceLoader.spliterator(), false)
.filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
.collect(Collectors.toList());
KafkaReadSchemaTransformProvider kafkaProvider =
(KafkaReadSchemaTransformProvider) providers.get(0);
kafkaProvider.from(
KafkaReadSchemaTransformConfiguration.builder()
.setTopic("anytopic")
.setBootstrapServers("anybootstrap")
.setFormat("RAW")
.build());
}
}
sdks/python/apache_beam/yaml/standard_io.yaml (21 additions, 0 deletions)

@@ -46,6 +46,27 @@
config:
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'

- type: renaming
transforms:
'ReadFromKafka': 'ReadFromKafka'
config:
mappings:
'ReadFromKafka':
'schema': 'schema'
'consumer_config': 'consumerConfigUpdates'
'format': 'format'
'topic': 'topic'
'bootstrap_servers': 'bootstrapServers'
'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject'
Comment on lines +60 to +61

Contributor:

Do we need the naming of confluent_schema_registry_...? What if using a different schema registry, like Redpanda's?

'auto_offset_reset_config': 'autoOffsetResetConfig'
underlying_provider:
type: beamJar
transforms:
'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'
config:
gradle_target: 'sdks:java:io:expansion-service:shadowJar'

- type: python
transforms:
'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'
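With the ReadFromKafka mapping above in place, a Beam YAML pipeline should be able to use the new RAW format roughly like this (a sketch only; the topic and bootstrap server values are placeholders, not taken from this PR):

pipeline:
  transforms:
    - type: ReadFromKafka
      config:
        format: "RAW"
        topic: "my_topic"                     # placeholder
        bootstrap_servers: "localhost:9092"   # placeholder

The config keys here are the user-facing names declared in the renaming mapping above.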