From 67a39a50df3f677d9d1e8c8519f80b222c1101fb Mon Sep 17 00:00:00 2001 From: "gabry.wu" Date: Fri, 20 Oct 2023 05:56:41 +0800 Subject: [PATCH] make BeamKafkaTable.createKafkaRead to be protected, and we can override it appending options to KafkaIO.Read (#29051) --- .../sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java | 2 +- .../sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java index f1ec20831a4c..ab1817f6d75c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java @@ -110,7 +110,7 @@ public PCollection buildIOReader(PBegin begin) { .setRowSchema(getSchema()); } - KafkaIO.Read createKafkaRead() { + protected KafkaIO.Read createKafkaRead() { KafkaIO.Read kafkaRead; if (topics != null) { kafkaRead = diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java index 44b4dbe21aca..158b0345bd8b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java @@ -61,7 +61,7 @@ public KafkaTestTable(Schema beamSchema, List topics, int partitionsPerT } @Override - KafkaIO.Read createKafkaRead() { + protected KafkaIO.Read createKafkaRead() { return super.createKafkaRead().withConsumerFactoryFn(this::mkMockConsumer); }