diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index f7e091580680..299a4f222e3a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -39,7 +39,7 @@ public abstract class KafkaReadSchemaTransformConfiguration {
   public static final Set<String> VALID_START_OFFSET_VALUES =
       Sets.newHashSet("earliest", "latest");
 
-  public static final String VALID_FORMATS_STR = "AVRO,JSON";
+  public static final String VALID_FORMATS_STR = "RAW,AVRO,JSON";
 
   public static final Set<String> VALID_DATA_FORMATS =
       Sets.newHashSet(VALID_FORMATS_STR.split(","));
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 0c091bf9ba84..fcba5c355df1 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -49,6 +49,7 @@
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -112,16 +113,56 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
     consumerConfigs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
     consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
+    String format = configuration.getFormat();
+
+    if (format != null && format.equals("RAW")) {
+      if (inputSchema != null) {
+        throw new IllegalArgumentException(
+            "To read from Kafka in RAW format, you can't provide a schema.");
+      }
+      Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+      SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);
+      return new SchemaTransform() {
+        @Override
+        public PCollectionRowTuple expand(PCollectionRowTuple input) {
+          KafkaIO.Read<byte[], byte[]> kafkaRead =
+              KafkaIO.readBytes()
+                  .withConsumerConfigUpdates(consumerConfigs)
+                  .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
+                  .withTopic(configuration.getTopic())
+                  .withBootstrapServers(configuration.getBootstrapServers());
+          if (isTest) {
+            kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
+          }
+
+          PCollection<byte[]> kafkaValues =
+              input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
+
+          PCollectionTuple outputTuple =
+              kafkaValues.apply(
+                  ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
+                      .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+          return PCollectionRowTuple.of(
+              "output",
+              outputTuple.get(OUTPUT_TAG).setRowSchema(rawSchema),
+              "errors",
+              outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+        }
+      };
+    }
+
     if (inputSchema != null && !inputSchema.isEmpty()) {
       assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
           : "To read from Kafka, a schema must be provided directly or though Confluent "
               + "Schema Registry, but not both.";
+
       final Schema beamSchema =
-          Objects.equals(configuration.getFormat(), "JSON")
+          Objects.equals(format, "JSON")
              ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
              : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
       SerializableFunction<byte[], Row> valueMapper =
-          Objects.equals(configuration.getFormat(), "JSON")
+          Objects.equals(format, "JSON")
              ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
              : AvroUtils.getAvroBytesToRowFunction(beamSchema);
       return new SchemaTransform() {
@@ -193,6 +234,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
     }
   }
 
+  public static SerializableFunction<byte[], Row> getRawBytesToRowFunction(Schema rawSchema) {
+    return new SimpleFunction<byte[], Row>() {
+      @Override
+      public Row apply(byte[] input) {
+        return Row.withSchema(rawSchema).addValue(input).build();
+      }
+    };
+  }
+
   @Override
   public String identifier() {
     return "beam:schematransform:org.apache.beam:kafka_read:v1";
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index 6b9dde4dc952..367f0f7a535e 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -147,4 +147,22 @@ public void testBuildTransformWithJsonSchema() throws IOException {
                     StandardCharsets.UTF_8))
             .build());
   }
+
+  @Test
+  public void testBuildTransformWithRawFormat() throws IOException {
+    ServiceLoader<SchemaTransformProvider> serviceLoader =
+        ServiceLoader.load(SchemaTransformProvider.class);
+    List<SchemaTransformProvider> providers =
+        StreamSupport.stream(serviceLoader.spliterator(), false)
+            .filter(provider -> provider.getClass() == KafkaReadSchemaTransformProvider.class)
+            .collect(Collectors.toList());
+    KafkaReadSchemaTransformProvider kafkaProvider =
+        (KafkaReadSchemaTransformProvider) providers.get(0);
+    kafkaProvider.from(
+        KafkaReadSchemaTransformConfiguration.builder()
+            .setTopic("anytopic")
+            .setBootstrapServers("anybootstrap")
+            .setFormat("RAW")
+            .build());
+  }
 }
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 9ad4f53ba1f6..a80cca86fe41 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -46,6 +46,27 @@
   config:
     gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
 
+- type: renaming
+  transforms:
+    'ReadFromKafka': 'ReadFromKafka'
+  config:
+    mappings:
+      'ReadFromKafka':
+        'schema': 'schema'
+        'consumer_config': 'consumerConfigUpdates'
+        'format': 'format'
+        'topic': 'topic'
+        'bootstrap_servers': 'bootstrapServers'
+        'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
+        'confluent_schema_registry_subject': 'confluentSchemaRegistrySubject'
+        'auto_offset_reset_config': 'autoOffsetResetConfig'
+    underlying_provider:
+      type: beamJar
+      transforms:
+        'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'
+      config:
+        gradle_target: 'sdks:java:io:expansion-service:shadowJar'
+
 - type: python
   transforms:
     'ReadFromBigQuery': 'apache_beam.yaml.yaml_io.read_from_bigquery'