diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
index 876ef9a49e8a..1dbe37791c05 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
@@ -41,6 +41,7 @@
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -60,7 +61,7 @@ public class KafkaWriteSchemaTransformProvider
     extends TypedSchemaTransformProvider<
         KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration> {
 
-  public static final String SUPPORTED_FORMATS_STR = "JSON,AVRO";
+  public static final String SUPPORTED_FORMATS_STR = "RAW,JSON,AVRO";
   public static final Set<String> SUPPORTED_FORMATS =
       Sets.newHashSet(SUPPORTED_FORMATS_STR.split(","));
   public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
@@ -131,10 +132,18 @@ public void finish() {
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       Schema inputSchema = input.get("input").getSchema();
-      final SerializableFunction<Row, byte[]> toBytesFn =
-          configuration.getFormat().equals("JSON")
-              ? JsonUtils.getRowToJsonBytesFunction(inputSchema)
-              : AvroUtils.getRowToAvroBytesFunction(inputSchema);
+      final SerializableFunction<Row, byte[]> toBytesFn;
+      if (configuration.getFormat().equals("RAW")) {
+        int numFields = inputSchema.getFields().size();
+        if (numFields != 1) {
+          throw new IllegalArgumentException("Expecting exactly one field, found " + numFields);
+        }
+        toBytesFn = getRowToRawBytesFunction(inputSchema.getField(0).getName());
+      } else if (configuration.getFormat().equals("JSON")) {
+        toBytesFn = JsonUtils.getRowToJsonBytesFunction(inputSchema);
+      } else {
+        toBytesFn = AvroUtils.getRowToAvroBytesFunction(inputSchema);
+      }
 
       final Map<String, String> configOverrides = configuration.getProducerConfigUpdates();
       PCollectionTuple outputTuple =
@@ -163,6 +172,19 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
     }
   }
 
+  public static SerializableFunction<Row, byte[]> getRowToRawBytesFunction(String rowFieldName) {
+    return new SimpleFunction<Row, byte[]>() {
+      @Override
+      public byte[] apply(Row input) {
+        byte[] rawBytes = input.getBytes(rowFieldName);
+        if (rawBytes == null) {
+          throw new NullPointerException();
+        }
+        return rawBytes;
+      }
+    };
+  }
+
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
     return "beam:schematransform:org.apache.beam:kafka_write:v1";
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java
index 8d01ebe8233c..3a7769f42d72 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProviderTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.getRowToRawBytesFunction;
+
+import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.io.kafka.KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransform.ErrorCounterFn;
@@ -47,6 +50,9 @@ public class KafkaWriteSchemaTransformProviderTest {
 
   private static final Schema BEAMSCHEMA =
       Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
+
+  private static final Schema BEAMRAWSCHEMA =
+      Schema.of(Schema.Field.of("payload", Schema.FieldType.BYTES));
   private static final Schema ERRORSCHEMA = KafkaWriteSchemaTransformProvider.ERROR_SCHEMA;
 
   private static final List<Row> ROWS =
@@ -55,9 +61,27 @@ public class KafkaWriteSchemaTransformProviderTest {
           Row.withSchema(BEAMSCHEMA).withFieldValue("name", "b").build(),
           Row.withSchema(BEAMSCHEMA).withFieldValue("name", "c").build());
 
+  private static final List<Row> RAW_ROWS;
+
+  static {
+    try {
+      RAW_ROWS =
+          Arrays.asList(
+              Row.withSchema(BEAMRAWSCHEMA).withFieldValue("payload", "a".getBytes("UTF8")).build(),
+              Row.withSchema(BEAMRAWSCHEMA).withFieldValue("payload", "b".getBytes("UTF8")).build(),
+              Row.withSchema(BEAMRAWSCHEMA)
+                  .withFieldValue("payload", "c".getBytes("UTF8"))
+                  .build());
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   final SerializableFunction<Row, byte[]> valueMapper =
       JsonUtils.getRowToJsonBytesFunction(BEAMSCHEMA);
 
+  final SerializableFunction<Row, byte[]> valueRawMapper = getRowToRawBytesFunction("payload");
+
   @Rule public transient TestPipeline p = TestPipeline.create();
 
   @Test
@@ -79,4 +103,24 @@ public void testKafkaErrorFnSuccess() throws Exception {
     PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(msg);
     p.run().waitUntilFinish();
   }
+
+  @Test
+  public void testKafkaErrorFnRawSuccess() throws Exception {
+    List<KV<byte[], byte[]>> msg =
+        Arrays.asList(
+            KV.of(new byte[1], "a".getBytes("UTF8")),
+            KV.of(new byte[1], "b".getBytes("UTF8")),
+            KV.of(new byte[1], "c".getBytes("UTF8")));
+
+    PCollection<Row> input = p.apply(Create.of(RAW_ROWS));
+    PCollectionTuple output =
+        input.apply(
+            ParDo.of(new ErrorCounterFn("Kafka-write-error-counter", valueRawMapper))
+                .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+    output.get(ERROR_TAG).setRowSchema(ERRORSCHEMA);
+
+    PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(msg);
+    p.run().waitUntilFinish();
+  }
 }
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 6499ffc7e55a..b8fe0660b127 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -49,6 +49,7 @@
 - type: renaming
   transforms:
     'ReadFromKafka': 'ReadFromKafka'
+    'WriteToKafka': 'WriteToKafka'
   config:
     mappings:
       'ReadFromKafka':
@@ -60,10 +61,16 @@
         'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
         'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject'
         'auto_offset_reset_config': 'autoOffsetResetConfig'
+      'WriteToKafka':
+        'format': 'format'
+        'topic': 'topic'
+        'bootstrap_servers': 'bootstrapServers'
+        'producer_config_updates': 'ProducerConfigUpdates'
     underlying_provider:
       type: beamJar
       transforms:
         'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'
+        'WriteToKafka': 'beam:schematransform:org.apache.beam:kafka_write:v1'
       config:
         gradle_target: 'sdks:java:io:expansion-service:shadowJar'
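
For context, a minimal sketch of how the newly registered `WriteToKafka` mapping might be used from a Beam YAML pipeline. This is illustrative only: the upstream transform name, topic, and broker address are assumptions, not part of this change; the config keys are the snake_case names mapped in standard_io.yaml above, and with `format: RAW` the input rows must carry exactly one BYTES field, per the check added in KafkaWriteSchemaTransformProvider.

```yaml
# Illustrative sketch, not part of this change; names and addresses are assumptions.
- type: WriteToKafka
  input: ProducePayloadBytes            # hypothetical upstream transform emitting rows with a single BYTES field
  config:
    format: "RAW"                       # also accepts JSON or AVRO per SUPPORTED_FORMATS_STR
    topic: "my-topic"                   # assumed topic name
    bootstrap_servers: "localhost:9092" # assumed broker address
```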