From 4a6d9d7d26fa141f5545100748c4d8fe99d3be5b Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Tue, 26 Apr 2022 15:49:00 -0500 Subject: [PATCH 01/14] remove one client usage for all the tests to avoid race conditions --- .../beam/sdk/io/pulsar/PulsarIOTest.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java index eeb6a5d7652c..61649fbcc639 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java @@ -59,7 +59,6 @@ public class PulsarIOTest { private static final String TOPIC = "PULSAR_IO_TEST"; protected static PulsarContainer pulsarContainer; - protected static PulsarClient client; private long endExpectedTime = 0; private long startTime = 0; @@ -69,9 +68,7 @@ public class PulsarIOTest { @Rule public final transient TestPipeline testPipeline = TestPipeline.create(); public List> receiveMessages() throws PulsarClientException { - if (client == null) { - initClient(); - } + PulsarClient client = initClient(); List> messages = new ArrayList<>(); Consumer consumer = client.newConsumer().topic(TOPIC).subscriptionName("receiveMockMessageFn").subscribe(); @@ -84,11 +81,13 @@ public List> receiveMessages() throws PulsarClientException { consumer.negativeAcknowledge(msg); } } + consumer.close(); + client.close(); return messages; } public List produceMessages() throws PulsarClientException { - client = initClient(); + PulsarClient client = initClient(); Producer producer = client.newProducer().topic(TOPIC).create(); Consumer consumer = client.newConsumer().topic(TOPIC).subscriptionName("produceMockMessageFn").subscribe(); @@ -136,7 +135,6 @@ private static void setupPulsarContainer() { @BeforeClass public static void setup() throws PulsarClientException { setupPulsarContainer(); - client = initClient(); } @AfterClass @@ -148,16 +146,19 @@ public static void afterClass() { @Test @SuppressWarnings({"rawtypes"}) - public void testPulsarFunctionality() throws Exception { - try (Consumer consumer = - client.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe(); - Producer producer = client.newProducer().topic(TOPIC).create(); ) { + public void testPulsarFunctionality() { + try { + PulsarClient client = initClient(); + Consumer consumer = client.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe(); + Producer producer = client.newProducer().topic(TOPIC).create(); String messageTxt = "testing pulsar functionality"; producer.send(messageTxt.getBytes(StandardCharsets.UTF_8)); CompletableFuture future = consumer.receiveAsync(); Message message = future.get(5, TimeUnit.SECONDS); assertEquals(messageTxt, new String(message.getData(), StandardCharsets.UTF_8)); client.close(); + } catch (Exception e) { + LOG.error(e.getMessage()); } } From 197319aa4e1824d67c9b514921c8c7813bf1167a Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 27 Apr 2022 13:31:01 -0500 Subject: [PATCH 02/14] add info logs --- .../org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 6 +++++- .../java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index fc881f33e67e..c503a1549e09 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -171,6 +171,9 @@ public ProcessContinuation processElement( reader.close(); return ProcessContinuation.stop(); } + LOG.info("From time ", tracker.currentRestriction().getFrom()); + LOG.info("To time ", tracker.currentRestriction().getTo()); + if (pulsarSourceDescriptor.getEndMessageId() != null) { MessageId currentMsgId = message.getMessageId(); boolean hasReachedEndMessageId = @@ -205,9 +208,9 @@ public OffsetRangeTracker restrictionTracker( if (restriction.getTo() < Long.MAX_VALUE) { return new OffsetRangeTracker(restriction); } - PulsarLatestOffsetEstimator offsetEstimator = new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic()); + LOG.info("RestrictionFrom", restriction.getFrom()); return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator); } @@ -235,6 +238,7 @@ private PulsarLatestOffsetEstimator(PulsarAdmin admin, String topic) { @Override public long estimate() { Message msg = memoizedBacklog.get(); + LOG.info("LatestMsgTime", msg.getPublishTime()); return msg.getPublishTime(); } } diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java index 61649fbcc639..2e693160e262 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java @@ -176,6 +176,9 @@ public void testReadFromSimpleTopic() { .withPublishTime(); testPipeline.apply(reader).apply(ParDo.of(new PulsarRecordsMetric())); + LOG.info("StartTime", startTime); + LOG.info("EndTime", endExpectedTime); + PipelineResult pipelineResult = testPipeline.run(); MetricQueryResults metrics = pipelineResult From e216d4e0b3ee6329599fee3b5020d6ed8c13b5cf Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 27 Apr 2022 14:31:04 -0500 Subject: [PATCH 03/14] missing placeholder for logger --- .../org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 8 ++++---- .../java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index c503a1549e09..bc588534c8c2 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -171,8 +171,8 @@ public ProcessContinuation processElement( reader.close(); return ProcessContinuation.stop(); } - LOG.info("From time ", tracker.currentRestriction().getFrom()); - LOG.info("To time ", tracker.currentRestriction().getTo()); + LOG.info("From time {}", tracker.currentRestriction().getFrom()); + LOG.info("To time {}", tracker.currentRestriction().getTo()); if (pulsarSourceDescriptor.getEndMessageId() != null) { MessageId currentMsgId = message.getMessageId(); @@ -210,7 +210,7 @@ public OffsetRangeTracker restrictionTracker( } PulsarLatestOffsetEstimator offsetEstimator = new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic()); - LOG.info("RestrictionFrom", restriction.getFrom()); + LOG.info("RestrictionFrom {}", restriction.getFrom()); return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator); } @@ -238,7 +238,7 @@ private PulsarLatestOffsetEstimator(PulsarAdmin admin, String topic) { @Override public long estimate() { Message msg = memoizedBacklog.get(); - LOG.info("LatestMsgTime", msg.getPublishTime()); + LOG.info("LatestMsgTime {}", msg.getPublishTime()); return msg.getPublishTime(); } } diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java index 2e693160e262..de89c711b457 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java @@ -176,9 +176,9 @@ public void testReadFromSimpleTopic() { .withPublishTime(); testPipeline.apply(reader).apply(ParDo.of(new PulsarRecordsMetric())); - LOG.info("StartTime", startTime); - LOG.info("EndTime", endExpectedTime); - + LOG.info("StartTime {}", startTime); + LOG.info("EndTime {}", endExpectedTime); + PipelineResult pipelineResult = testPipeline.run(); MetricQueryResults metrics = pipelineResult From 2132b5ea689ba62b2600f56bd0d434cc2451af97 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Tue, 3 May 2022 14:38:04 -0500 Subject: [PATCH 04/14] add testruntime with slf4j --- sdks/java/io/pulsar/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/pulsar/build.gradle b/sdks/java/io/pulsar/build.gradle index 9ee14b0933ed..e453693c603b 100644 --- a/sdks/java/io/pulsar/build.gradle +++ b/sdks/java/io/pulsar/build.gradle @@ -43,5 +43,6 @@ dependencies { testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testImplementation "org.testcontainers:pulsar:1.15.3" testImplementation "org.assertj:assertj-core:2.9.1" + testRuntimeOnly library.java.slf4j_jdk14 } From 7beb96095fdd6540fa51d806ec37fc285d3524c4 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Thu, 5 May 2022 12:10:59 -0500 Subject: [PATCH 05/14] change logs from info to warn --- .../org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 8 ++++---- .../java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index bc588534c8c2..fbbfb3e87740 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -171,8 +171,8 @@ public ProcessContinuation processElement( reader.close(); return ProcessContinuation.stop(); } - LOG.info("From time {}", tracker.currentRestriction().getFrom()); - LOG.info("To time {}", tracker.currentRestriction().getTo()); + LOG.warn("From time {}", tracker.currentRestriction().getFrom()); + LOG.warn("To time {}", tracker.currentRestriction().getTo()); if (pulsarSourceDescriptor.getEndMessageId() != null) { MessageId currentMsgId = message.getMessageId(); @@ -210,7 +210,7 @@ public OffsetRangeTracker restrictionTracker( } PulsarLatestOffsetEstimator offsetEstimator = new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic()); - LOG.info("RestrictionFrom {}", restriction.getFrom()); + LOG.warn("RestrictionFrom {}", restriction.getFrom()); return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator); } @@ -238,7 +238,7 @@ private PulsarLatestOffsetEstimator(PulsarAdmin admin, String topic) { @Override public long estimate() { Message msg = memoizedBacklog.get(); - LOG.info("LatestMsgTime {}", msg.getPublishTime()); + LOG.warn("LatestMsgTime {}", msg.getPublishTime()); return msg.getPublishTime(); } } diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java index de89c711b457..adc25acdfe38 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java @@ -166,6 +166,7 @@ public void testPulsarFunctionality() { public void testReadFromSimpleTopic() { try { List inputsMock = produceMessages(); + PulsarIO.Read reader = PulsarIO.read() .withClientUrl(pulsarContainer.getPulsarBrokerUrl()) @@ -175,9 +176,9 @@ public void testReadFromSimpleTopic() { .withEndTimestamp(endExpectedTime) .withPublishTime(); testPipeline.apply(reader).apply(ParDo.of(new PulsarRecordsMetric())); - - LOG.info("StartTime {}", startTime); - LOG.info("EndTime {}", endExpectedTime); + LOG.warn("Test PulsarIO read from simple topic"); + LOG.warn("StartTime {}", startTime); + LOG.warn("EndTime {}", endExpectedTime); PipelineResult pipelineResult = testPipeline.run(); MetricQueryResults metrics = From a289904506c8efcb58783bce7199919077716ffb Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Thu, 5 May 2022 17:23:24 -0500 Subject: [PATCH 06/14] add test logging flag --- sdks/java/io/pulsar/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/pulsar/build.gradle b/sdks/java/io/pulsar/build.gradle index e453693c603b..6a853d063c17 100644 --- a/sdks/java/io/pulsar/build.gradle +++ b/sdks/java/io/pulsar/build.gradle @@ -44,5 +44,5 @@ dependencies { testImplementation "org.testcontainers:pulsar:1.15.3" testImplementation "org.assertj:assertj-core:2.9.1" testRuntimeOnly library.java.slf4j_jdk14 - + testLogging.showStandardStreams = true } From 5296c6d2b04dfe5e85bd2ce9318591e50e3172cd Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Fri, 6 May 2022 10:51:50 -0500 Subject: [PATCH 07/14] fix gradle to use testloggin flag --- sdks/java/io/pulsar/build.gradle | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/java/io/pulsar/build.gradle b/sdks/java/io/pulsar/build.gradle index 6a853d063c17..aab5aa742b6d 100644 --- a/sdks/java/io/pulsar/build.gradle +++ b/sdks/java/io/pulsar/build.gradle @@ -44,5 +44,8 @@ dependencies { testImplementation "org.testcontainers:pulsar:1.15.3" testImplementation "org.assertj:assertj-core:2.9.1" testRuntimeOnly library.java.slf4j_jdk14 +} + +test { testLogging.showStandardStreams = true } From b0e29ba6790954ac60cdb3d88af15a0072c5783c Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Wed, 1 Jun 2022 15:09:46 -0500 Subject: [PATCH 08/14] log current timestamp --- .../org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index fbbfb3e87740..09bafaad0b71 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -171,8 +171,9 @@ public ProcessContinuation processElement( reader.close(); return ProcessContinuation.stop(); } - LOG.warn("From time {}", tracker.currentRestriction().getFrom()); - LOG.warn("To time {}", tracker.currentRestriction().getTo()); + LOG.info("From time {}", tracker.currentRestriction().getFrom()); + LOG.info("Current time {}", currentTimestamp); + LOG.info("To time {}", tracker.currentRestriction().getTo()); if (pulsarSourceDescriptor.getEndMessageId() != null) { MessageId currentMsgId = message.getMessageId(); @@ -238,7 +239,7 @@ private PulsarLatestOffsetEstimator(PulsarAdmin admin, String topic) { @Override public long estimate() { Message msg = memoizedBacklog.get(); - LOG.warn("LatestMsgTime {}", msg.getPublishTime()); + LOG.info("LatestMsgTime {}", msg.getPublishTime()); return msg.getPublishTime(); } } From 57ecc981d9c3e921e9601fc4fcb3c95056934b1b Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Mon, 4 Jul 2022 10:38:04 -0500 Subject: [PATCH 09/14] restriction changed and logs added --- .../beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index 09bafaad0b71..4eaced04a97d 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -53,6 +53,7 @@ public class ReadFromPulsarDoFn extends DoFn, Instant> extractOutputTimestampFn; @@ -61,6 +62,7 @@ public ReadFromPulsarDoFn(PulsarIO.Read transform) { this.clientUrl = transform.getClientUrl(); this.adminUrl = transform.getAdminUrl(); this.pulsarClientSerializableFunction = transform.getPulsarClient(); + this.failOverTimestamp = 0L; } // Open connection to Pulsar clients @@ -167,6 +169,7 @@ public ProcessContinuation processElement( // if tracker.tryclaim() return true, sdf must execute work otherwise // doFn must exit processElement() without doing any work associated // or claiming more work + LOG.info("Current time before claimed {}", currentTimestamp); if (!tracker.tryClaim(currentTimestamp)) { reader.close(); return ProcessContinuation.stop(); @@ -183,6 +186,7 @@ public ProcessContinuation processElement( return ProcessContinuation.stop(); } } + failOverTimestamp = currentTimestamp; PulsarMessage pulsarMessage = new PulsarMessage(message.getTopicName(), message.getPublishTime(), message); Instant outputTimestamp = extractOutputTimestampFn.apply(message); @@ -206,12 +210,17 @@ public WatermarkEstimator newWatermarkEstimator( @NewTracker public OffsetRangeTracker restrictionTracker( @Element PulsarSourceDescriptor pulsarSource, @Restriction OffsetRange restriction) { + LOG.info("Restriction Tracker {} to {}", restriction.getFrom(), restriction.getTo()); if (restriction.getTo() < Long.MAX_VALUE) { - return new OffsetRangeTracker(restriction); + if(restriction.getFrom() < failOverTimestamp && restriction.getTo() > failOverTimestamp) { + LOG.info("Restriction Tracker with limit {} to {}", failOverTimestamp, restriction.getTo()); + return new OffsetRangeTracker(new OffsetRange(failOverTimestamp, restriction.getTo())); + } + return new OffsetRangeTracker(new OffsetRange(restriction.getFrom(), restriction.getTo())); } PulsarLatestOffsetEstimator offsetEstimator = new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic()); - LOG.warn("RestrictionFrom {}", restriction.getFrom()); + LOG.info("RestrictionFrom {}", restriction.getFrom()); return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator); } From a35b9323ff5e77d9a429f6b2376dc03881c5620f Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Thu, 7 Jul 2022 10:32:00 -0500 Subject: [PATCH 10/14] add log for failover --- .../org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index 4eaced04a97d..762e27614c10 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -187,6 +187,7 @@ public ProcessContinuation processElement( } } failOverTimestamp = currentTimestamp; + LOG.info("Failover timestamp {}", failOverTimestamp); PulsarMessage pulsarMessage = new PulsarMessage(message.getTopicName(), message.getPublishTime(), message); Instant outputTimestamp = extractOutputTimestampFn.apply(message); @@ -210,7 +211,7 @@ public WatermarkEstimator newWatermarkEstimator( @NewTracker public OffsetRangeTracker restrictionTracker( @Element PulsarSourceDescriptor pulsarSource, @Restriction OffsetRange restriction) { - LOG.info("Restriction Tracker {} to {}", restriction.getFrom(), restriction.getTo()); + LOG.info("Restriction Tracker {} to {} with failovertimestamp {}", restriction.getFrom(), restriction.getTo(), failOverTimestamp); if (restriction.getTo() < Long.MAX_VALUE) { if(restriction.getFrom() < failOverTimestamp && restriction.getTo() > failOverTimestamp) { LOG.info("Restriction Tracker with limit {} to {}", failOverTimestamp, restriction.getTo()); @@ -220,7 +221,7 @@ public OffsetRangeTracker restrictionTracker( } PulsarLatestOffsetEstimator offsetEstimator = new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic()); - LOG.info("RestrictionFrom {}", restriction.getFrom()); + LOG.info("GrowableOffsetRangeTracker from {}", restriction.getFrom()); return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator); } From d02030c1f1f163c463309d2ae37b028d6483ef5f Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Thu, 7 Jul 2022 12:56:38 -0500 Subject: [PATCH 11/14] add validation when pulsar seeks for the subscription --- .../sdk/io/pulsar/ReadFromPulsarDoFn.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index 762e27614c10..ec3e7ab3f9c2 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -95,6 +95,7 @@ public void initPulsarClients() throws Exception { // Close connection to Pulsar clients @Teardown public void teardown() throws Exception { + LOG.info("Client closed"); this.client.close(); this.admin.close(); } @@ -166,28 +167,34 @@ public ProcessContinuation processElement( return ProcessContinuation.resume(); } Long currentTimestamp = message.getPublishTime(); + + // validate when pulsar client seek for the start timestamp, the next timestamp should be + // greater than the start restriction + if(currentTimestamp < tracker.currentRestriction().getFrom()) { + reader.close(); + return ProcessContinuation.stop(); + } // if tracker.tryclaim() return true, sdf must execute work otherwise // doFn must exit processElement() without doing any work associated // or claiming more work + LOG.info("From time {}", tracker.currentRestriction().getFrom()); LOG.info("Current time before claimed {}", currentTimestamp); + LOG.info("To time {}", tracker.currentRestriction().getTo()); if (!tracker.tryClaim(currentTimestamp)) { reader.close(); return ProcessContinuation.stop(); } - LOG.info("From time {}", tracker.currentRestriction().getFrom()); - LOG.info("Current time {}", currentTimestamp); - LOG.info("To time {}", tracker.currentRestriction().getTo()); + LOG.info("Current time claimed {}", currentTimestamp); if (pulsarSourceDescriptor.getEndMessageId() != null) { MessageId currentMsgId = message.getMessageId(); boolean hasReachedEndMessageId = currentMsgId.compareTo(pulsarSourceDescriptor.getEndMessageId()) == 0; if (hasReachedEndMessageId) { + reader.close(); return ProcessContinuation.stop(); } } - failOverTimestamp = currentTimestamp; - LOG.info("Failover timestamp {}", failOverTimestamp); PulsarMessage pulsarMessage = new PulsarMessage(message.getTopicName(), message.getPublishTime(), message); Instant outputTimestamp = extractOutputTimestampFn.apply(message); @@ -211,12 +218,8 @@ public WatermarkEstimator newWatermarkEstimator( @NewTracker public OffsetRangeTracker restrictionTracker( @Element PulsarSourceDescriptor pulsarSource, @Restriction OffsetRange restriction) { - LOG.info("Restriction Tracker {} to {} with failovertimestamp {}", restriction.getFrom(), restriction.getTo(), failOverTimestamp); + LOG.info("Restriction Tracker {} to {}", restriction.getFrom(), restriction.getTo()); if (restriction.getTo() < Long.MAX_VALUE) { - if(restriction.getFrom() < failOverTimestamp && restriction.getTo() > failOverTimestamp) { - LOG.info("Restriction Tracker with limit {} to {}", failOverTimestamp, restriction.getTo()); - return new OffsetRangeTracker(new OffsetRange(failOverTimestamp, restriction.getTo())); - } return new OffsetRangeTracker(new OffsetRange(restriction.getFrom(), restriction.getTo())); } PulsarLatestOffsetEstimator offsetEstimator = From 20c872887d7b969c0816a215b9a30e3fe078689e Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Thu, 7 Jul 2022 13:58:57 -0500 Subject: [PATCH 12/14] add validation when pulsar retrieves in seek function an invalid offset --- .../beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 3 +-- .../sdk/io/pulsar/ReadFromPulsarDoFnTest.java | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index ec3e7ab3f9c2..189cb89fffe0 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -53,7 +53,6 @@ public class ReadFromPulsarDoFn extends DoFn, Instant> extractOutputTimestampFn; @@ -62,7 +61,6 @@ public ReadFromPulsarDoFn(PulsarIO.Read transform) { this.clientUrl = transform.getClientUrl(); this.adminUrl = transform.getAdminUrl(); this.pulsarClientSerializableFunction = transform.getPulsarClient(); - this.failOverTimestamp = 0L; } // Open connection to Pulsar clients @@ -171,6 +169,7 @@ public ProcessContinuation processElement( // validate when pulsar client seek for the start timestamp, the next timestamp should be // greater than the start restriction if(currentTimestamp < tracker.currentRestriction().getFrom()) { + LOG.warn("Initial restriction ({}) is greather than Current timestamp ({})", tracker.currentRestriction().getFrom(), currentTimestamp); reader.close(); return ProcessContinuation.stop(); } diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java index 273a1915d2bb..1b418136bd2e 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java @@ -160,6 +160,22 @@ public void testProcessElementWhenHasReachedEndTopic() throws Exception { assertEquals(DoFn.ProcessContinuation.stop(), result); } + @Test + public void testProcessElementWhenPulsarSeeksWrongTimestamp() throws Exception { + MockOutputReceiver receiver = new MockOutputReceiver(); + fakePulsarReader.setWrongSeek(true); + long startOffset = fakePulsarReader.getMessageTimestamp(3); + long endOffset = fakePulsarReader.getEndTimestamp(); + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset)); + DoFn.ProcessContinuation result = + dofnInstance.processElement( + PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL), + tracker, + null, + (DoFn.OutputReceiver) receiver); + assertEquals(DoFn.ProcessContinuation.stop(), result); + } + private static class MockOutputReceiver implements DoFn.OutputReceiver { private final List records = new ArrayList<>(); From 7bed031fe46623cfb3bce33a4460a07720fde401 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Thu, 7 Jul 2022 15:20:21 -0500 Subject: [PATCH 13/14] changes in fakepulsareader for unit test --- .../beam/sdk/io/pulsar/FakePulsarReader.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java index 834fd0427532..574fa6223041 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java @@ -39,6 +39,7 @@ public class FakePulsarReader implements Reader { private long endTimestamp; private boolean reachedEndOfTopic; private int numberOfMessages; + private boolean useWrongSeek; public FakePulsarReader(String topic, int numberOfMessages) { this.numberOfMessages = numberOfMessages; @@ -66,6 +67,7 @@ public void setMock(String topic, int numberOfMessages) { public void reset() { this.reachedEndOfTopic = false; this.currentMsg = 0; + this.useWrongSeek = false; emptyMockRecords(); setMock(topic, numberOfMessages); } @@ -82,6 +84,14 @@ public long getEndTimestamp() { return this.endTimestamp; } + public void setWrongSeek(boolean flag) { + this.useWrongSeek = flag; + } + + public long getMessageTimestamp(int position) { + return this.fakeMessages.get(position).getPublishTime(); + } + @Override public String getTopic() { return this.topic; @@ -141,6 +151,10 @@ public void seek(MessageId messageId) throws PulsarClientException {} @Override public void seek(long timestamp) throws PulsarClientException { for (int i = 0; i < fakeMessages.size(); i++) { + if(useWrongSeek) { + currentMsg = 1; + break; + } if (timestamp == fakeMessages.get(i).getPublishTime()) { currentMsg = i; break; From 345452c6819e4c56891dd6145d751323ee4c89b2 Mon Sep 17 00:00:00 2001 From: Marco Robles Date: Fri, 8 Jul 2022 11:30:50 -0500 Subject: [PATCH 14/14] change process continuation to avoid skip a value in process element --- .../java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java | 3 +-- .../org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index 189cb89fffe0..6c6cbc22da90 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -170,8 +170,7 @@ public ProcessContinuation processElement( // greater than the start restriction if(currentTimestamp < tracker.currentRestriction().getFrom()) { LOG.warn("Initial restriction ({}) is greather than Current timestamp ({})", tracker.currentRestriction().getFrom(), currentTimestamp); - reader.close(); - return ProcessContinuation.stop(); + return ProcessContinuation.resume(); } // if tracker.tryclaim() return true, sdf must execute work otherwise // doFn must exit processElement() without doing any work associated diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java index 1b418136bd2e..c0480c8a8a02 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java @@ -173,7 +173,7 @@ public void testProcessElementWhenPulsarSeeksWrongTimestamp() throws Exception { tracker, null, (DoFn.OutputReceiver) receiver); - assertEquals(DoFn.ProcessContinuation.stop(), result); + assertEquals(DoFn.ProcessContinuation.resume(), result); } private static class MockOutputReceiver implements DoFn.OutputReceiver {