From 9f3a291563260786965982f39233f45aa9c028d0 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Thu, 23 Jun 2022 11:33:47 -0700 Subject: [PATCH 1/2] pulsar connector accepts auth params --- .../apache/beam/sdk/io/pulsar/PulsarIO.java | 36 ++++++++++++++++++- .../sdk/io/pulsar/PulsarSourceDescriptor.java | 15 ++++++-- .../sdk/io/pulsar/ReadFromPulsarDoFn.java | 11 +++++- .../beam/sdk/io/pulsar/WriteToPulsarDoFn.java | 9 ++++- .../sdk/io/pulsar/ReadFromPulsarDoFnTest.java | 14 ++++---- 5 files changed, 73 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java index 6d0f0a08a019..aab602eaf3a9 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java @@ -51,6 +51,10 @@ public abstract static class Read extends PTransform expand(PBegin input) { getEndTimestamp(), getEndMessageId(), getClientUrl(), - getAdminUrl()))) + getAdminUrl(), + getAuthPluginClassName(), + getAuthParameters()))) .apply(ParDo.of(new ReadFromPulsarDoFn(this))) .setCoder(PulsarMessageCoder.of()); } @@ -156,6 +174,10 @@ public abstract static class Write extends PTransform, PDone abstract String getClientUrl(); + abstract @Nullable String getAuthPluginClassName(); + + abstract @Nullable String getAuthParameters(); + abstract Builder builder(); @AutoValue.Builder @@ -164,6 +186,10 @@ abstract static class Builder { abstract Builder setClientUrl(String clientUrl); + abstract Read.Builder setAuthPluginClassName(String authPluginClassName); + + abstract Read.Builder setAuthParameters(String authParameters); + abstract Write build(); } @@ -175,6 +201,14 @@ public Write withClientUrl(String clientUrl) { return builder().setClientUrl(clientUrl).build(); } + public Read withAuthPluginClassName(String pluginClassName) { + return builder().setAuthPluginClassName(pluginClassName).build(); + } + + public Read withAuthParameters(String parameters) { + return builder().setAuthParameters(parameters).build(); + } + @Override public PDone expand(PCollection input) { input.apply(ParDo.of(new WriteToPulsarDoFn(this))); diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java index 427d37d1d72a..36d9fbc53846 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java @@ -50,14 +50,25 @@ public abstract class PulsarSourceDescriptor implements Serializable { @SchemaFieldName("admin_url") abstract String getAdminUrl(); + @SchemaFieldName("authPluginClassName") + @Nullable + abstract String getAuthPluginClassName(); + + @SchemaFieldName("authParams") + @Nullable + abstract String getAuthParams(); + public static PulsarSourceDescriptor of( String topic, Long startOffsetTimestamp, Long endOffsetTimestamp, MessageId endMessageId, String clientUrl, - String adminUrl) { + String adminUrl, + String authPluginClassName, + String authParams) { return new AutoValue_PulsarSourceDescriptor( - topic, startOffsetTimestamp, endOffsetTimestamp, endMessageId, clientUrl, adminUrl); + topic, startOffsetTimestamp, endOffsetTimestamp, endMessageId, + clientUrl, adminUrl, authPluginClassName, authParams); } } diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index fc881f33e67e..df93105e47d1 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -53,6 +53,8 @@ public class ReadFromPulsarDoFn extends DoFn, Instant> extractOutputTimestampFn; @@ -60,11 +62,14 @@ public ReadFromPulsarDoFn(PulsarIO.Read transform) { this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn(); this.clientUrl = transform.getClientUrl(); this.adminUrl = transform.getAdminUrl(); + this.authPluginClassName = transform.getAuthPluginClassName(); + this.authParameters = transform.getAuthParameters(); this.pulsarClientSerializableFunction = transform.getPulsarClient(); } // Open connection to Pulsar clients @Setup + // TODO add auth related public void initPulsarClients() throws Exception { if (this.clientUrl == null) { this.clientUrl = PulsarIOUtils.SERVICE_URL; @@ -76,7 +81,10 @@ public void initPulsarClients() throws Exception { if (this.client == null) { this.client = pulsarClientSerializableFunction.apply(this.clientUrl); if (this.client == null) { - this.client = PulsarClient.builder().serviceUrl(clientUrl).build(); + this.client = PulsarClient.builder() + .serviceUrl(clientUrl) + .authentication(authPluginClassName, authParameters) + .build(); } } @@ -84,6 +92,7 @@ public void initPulsarClients() throws Exception { this.admin = PulsarAdmin.builder() .serviceHttpUrl(adminUrl) + .authentication(authPluginClassName, authParameters) .tlsTrustCertsFilePath(null) .allowTlsInsecureConnection(false) .build(); diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java index 9659940e02bd..63c5c40a2514 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java @@ -31,15 +31,22 @@ public class WriteToPulsarDoFn extends DoFn { private PulsarClient client; private String clientUrl; private String topic; + private String authPluginClassName; + private String authParameters; WriteToPulsarDoFn(PulsarIO.Write transform) { this.clientUrl = transform.getClientUrl(); this.topic = transform.getTopic(); + this.authPluginClassName = transform.getAuthPluginClassName(); + this.authParameters = transform.getAuthParameters(); } @Setup public void setup() throws PulsarClientException { - client = PulsarClient.builder().serviceUrl(clientUrl).build(); + client = PulsarClient.builder() + .serviceUrl(clientUrl) + .authentication(authPluginClassName, authParameters) + .build(); producer = client.newProducer().topic(topic).compressionType(CompressionType.LZ4).create(); } diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java index 273a1915d2bb..5b04c2760816 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java @@ -77,7 +77,7 @@ public void testInitialRestrictionWhenHasStartOffset() throws Exception { OffsetRange result = dofnInstance.getInitialRestriction( PulsarSourceDescriptor.of( - TOPIC, expectedStartOffset, null, null, SERVICE_URL, ADMIN_URL)); + TOPIC, expectedStartOffset, null, null, SERVICE_URL, ADMIN_URL, null, null)); assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result); } @@ -87,7 +87,7 @@ public void testInitialRestrictionWithConsumerPosition() throws Exception { OffsetRange result = dofnInstance.getInitialRestriction( PulsarSourceDescriptor.of( - TOPIC, expectedStartOffset, null, null, SERVICE_URL, ADMIN_URL)); + TOPIC, expectedStartOffset, null, null, SERVICE_URL, ADMIN_URL, null, null)); assertEquals(new OffsetRange(expectedStartOffset, Long.MAX_VALUE), result); } @@ -97,7 +97,7 @@ public void testInitialRestrictionWithConsumerEndPosition() throws Exception { long endOffset = fakePulsarReader.getEndTimestamp(); OffsetRange result = dofnInstance.getInitialRestriction( - PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL)); + PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL, null, null)); assertEquals(new OffsetRange(startOffset, endOffset), result); } @@ -108,7 +108,7 @@ public void testProcessElement() throws Exception { long endOffset = fakePulsarReader.getEndTimestamp(); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset)); PulsarSourceDescriptor descriptor = - PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL); + PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL, null, null); DoFn.ProcessContinuation result = dofnInstance.processElement(descriptor, tracker, null, (DoFn.OutputReceiver) receiver); int expectedResultWithoutCountingLastOffset = NUMBEROFMESSAGES - 1; @@ -123,7 +123,7 @@ public void testProcessElementWhenEndMessageIdIsDefined() throws Exception { MessageId endMessageId = DefaultImplementation.newMessageId(50L, 50L, 50); DoFn.ProcessContinuation result = dofnInstance.processElement( - PulsarSourceDescriptor.of(TOPIC, null, null, endMessageId, SERVICE_URL, ADMIN_URL), + PulsarSourceDescriptor.of(TOPIC, null, null, endMessageId, SERVICE_URL, ADMIN_URL, null, null), tracker, null, (DoFn.OutputReceiver) receiver); @@ -138,7 +138,7 @@ public void testProcessElementWithEmptyRecords() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)); DoFn.ProcessContinuation result = dofnInstance.processElement( - PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL), + PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL, null, null), tracker, null, (DoFn.OutputReceiver) receiver); @@ -153,7 +153,7 @@ public void testProcessElementWhenHasReachedEndTopic() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0L, Long.MAX_VALUE)); DoFn.ProcessContinuation result = dofnInstance.processElement( - PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL), + PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL, null, null), tracker, null, (DoFn.OutputReceiver) receiver); From 9367986c1bb1b00335b7d465b18641cadc3fc987 Mon Sep 17 00:00:00 2001 From: Neng Lu Date: Mon, 18 Jul 2022 16:03:38 -0700 Subject: [PATCH 2/2] format and type fix --- .../java/org/apache/beam/sdk/io/pulsar/PulsarIO.java | 8 ++++---- .../beam/sdk/io/pulsar/PulsarSourceDescriptor.java | 10 ++++++++-- .../apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 3 ++- .../apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java | 3 ++- .../beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java | 9 ++++++--- 5 files changed, 22 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java index aab602eaf3a9..f5594389cf0a 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarIO.java @@ -186,9 +186,9 @@ abstract static class Builder { abstract Builder setClientUrl(String clientUrl); - abstract Read.Builder setAuthPluginClassName(String authPluginClassName); + abstract Builder setAuthPluginClassName(String authPluginClassName); - abstract Read.Builder setAuthParameters(String authParameters); + abstract Builder setAuthParameters(String authParameters); abstract Write build(); } @@ -201,11 +201,11 @@ public Write withClientUrl(String clientUrl) { return builder().setClientUrl(clientUrl).build(); } - public Read withAuthPluginClassName(String pluginClassName) { + public Write withAuthPluginClassName(String pluginClassName) { return builder().setAuthPluginClassName(pluginClassName).build(); } - public Read withAuthParameters(String parameters) { + public Write withAuthParameters(String parameters) { return builder().setAuthParameters(parameters).build(); } diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java index 36d9fbc53846..0006720e5166 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/PulsarSourceDescriptor.java @@ -68,7 +68,13 @@ public static PulsarSourceDescriptor of( String authPluginClassName, String authParams) { return new AutoValue_PulsarSourceDescriptor( - topic, startOffsetTimestamp, endOffsetTimestamp, endMessageId, - clientUrl, adminUrl, authPluginClassName, authParams); + topic, + startOffsetTimestamp, + endOffsetTimestamp, + endMessageId, + clientUrl, + adminUrl, + authPluginClassName, + authParams); } } diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index df93105e47d1..7346fd4da021 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -81,7 +81,8 @@ public void initPulsarClients() throws Exception { if (this.client == null) { this.client = pulsarClientSerializableFunction.apply(this.clientUrl); if (this.client == null) { - this.client = PulsarClient.builder() + this.client = + PulsarClient.builder() .serviceUrl(clientUrl) .authentication(authPluginClassName, authParameters) .build(); diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java index 63c5c40a2514..37beb1f7f7d4 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/WriteToPulsarDoFn.java @@ -43,7 +43,8 @@ public class WriteToPulsarDoFn extends DoFn { @Setup public void setup() throws PulsarClientException { - client = PulsarClient.builder() + client = + PulsarClient.builder() .serviceUrl(clientUrl) .authentication(authPluginClassName, authParameters) .build(); diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java index 5b04c2760816..8d8323cae2e3 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java @@ -97,7 +97,8 @@ public void testInitialRestrictionWithConsumerEndPosition() throws Exception { long endOffset = fakePulsarReader.getEndTimestamp(); OffsetRange result = dofnInstance.getInitialRestriction( - PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL, null, null)); + PulsarSourceDescriptor.of( + TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL, null, null)); assertEquals(new OffsetRange(startOffset, endOffset), result); } @@ -108,7 +109,8 @@ public void testProcessElement() throws Exception { long endOffset = fakePulsarReader.getEndTimestamp(); OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset)); PulsarSourceDescriptor descriptor = - PulsarSourceDescriptor.of(TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL, null, null); + PulsarSourceDescriptor.of( + TOPIC, startOffset, endOffset, null, SERVICE_URL, ADMIN_URL, null, null); DoFn.ProcessContinuation result = dofnInstance.processElement(descriptor, tracker, null, (DoFn.OutputReceiver) receiver); int expectedResultWithoutCountingLastOffset = NUMBEROFMESSAGES - 1; @@ -123,7 +125,8 @@ public void testProcessElementWhenEndMessageIdIsDefined() throws Exception { MessageId endMessageId = DefaultImplementation.newMessageId(50L, 50L, 50); DoFn.ProcessContinuation result = dofnInstance.processElement( - PulsarSourceDescriptor.of(TOPIC, null, null, endMessageId, SERVICE_URL, ADMIN_URL, null, null), + PulsarSourceDescriptor.of( + TOPIC, null, null, endMessageId, SERVICE_URL, ADMIN_URL, null, null), tracker, null, (DoFn.OutputReceiver) receiver);