diff --git a/sdks/java/io/pulsar/build.gradle b/sdks/java/io/pulsar/build.gradle index 9ee14b0933ed..aab5aa742b6d 100644 --- a/sdks/java/io/pulsar/build.gradle +++ b/sdks/java/io/pulsar/build.gradle @@ -43,5 +43,9 @@ dependencies { testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testImplementation "org.testcontainers:pulsar:1.15.3" testImplementation "org.assertj:assertj-core:2.9.1" + testRuntimeOnly library.java.slf4j_jdk14 +} +test { + testLogging.showStandardStreams = true } diff --git a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java index fc881f33e67e..6c6cbc22da90 100644 --- a/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java +++ b/sdks/java/io/pulsar/src/main/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFn.java @@ -93,6 +93,7 @@ public void initPulsarClients() throws Exception { // Close connection to Pulsar clients @Teardown public void teardown() throws Exception { + LOG.info("Client closed"); this.client.close(); this.admin.close(); } @@ -164,18 +165,31 @@ public ProcessContinuation processElement( return ProcessContinuation.resume(); } Long currentTimestamp = message.getPublishTime(); + + // validate when pulsar client seek for the start timestamp, the next timestamp should be + // greater than the start restriction + if(currentTimestamp < tracker.currentRestriction().getFrom()) { + LOG.warn("Initial restriction ({}) is greather than Current timestamp ({})", tracker.currentRestriction().getFrom(), currentTimestamp); + return ProcessContinuation.resume(); + } // if tracker.tryclaim() return true, sdf must execute work otherwise // doFn must exit processElement() without doing any work associated // or claiming more work + LOG.info("From time {}", tracker.currentRestriction().getFrom()); + LOG.info("Current time before claimed {}", currentTimestamp); + LOG.info("To time {}", tracker.currentRestriction().getTo()); if (!tracker.tryClaim(currentTimestamp)) { reader.close(); return ProcessContinuation.stop(); } + LOG.info("Current time claimed {}", currentTimestamp); + if (pulsarSourceDescriptor.getEndMessageId() != null) { MessageId currentMsgId = message.getMessageId(); boolean hasReachedEndMessageId = currentMsgId.compareTo(pulsarSourceDescriptor.getEndMessageId()) == 0; if (hasReachedEndMessageId) { + reader.close(); return ProcessContinuation.stop(); } } @@ -202,12 +216,13 @@ public WatermarkEstimator newWatermarkEstimator( @NewTracker public OffsetRangeTracker restrictionTracker( @Element PulsarSourceDescriptor pulsarSource, @Restriction OffsetRange restriction) { + LOG.info("Restriction Tracker {} to {}", restriction.getFrom(), restriction.getTo()); if (restriction.getTo() < Long.MAX_VALUE) { - return new OffsetRangeTracker(restriction); + return new OffsetRangeTracker(new OffsetRange(restriction.getFrom(), restriction.getTo())); } - PulsarLatestOffsetEstimator offsetEstimator = new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic()); + LOG.info("GrowableOffsetRangeTracker from {}", restriction.getFrom()); return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator); } @@ -235,6 +250,7 @@ private PulsarLatestOffsetEstimator(PulsarAdmin admin, String topic) { @Override public long estimate() { Message msg = memoizedBacklog.get(); + LOG.info("LatestMsgTime {}", msg.getPublishTime()); return msg.getPublishTime(); } } diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java index 834fd0427532..574fa6223041 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/FakePulsarReader.java @@ -39,6 +39,7 @@ public class FakePulsarReader implements Reader { private long endTimestamp; private boolean reachedEndOfTopic; private int numberOfMessages; + private boolean useWrongSeek; public FakePulsarReader(String topic, int numberOfMessages) { this.numberOfMessages = numberOfMessages; @@ -66,6 +67,7 @@ public void setMock(String topic, int numberOfMessages) { public void reset() { this.reachedEndOfTopic = false; this.currentMsg = 0; + this.useWrongSeek = false; emptyMockRecords(); setMock(topic, numberOfMessages); } @@ -82,6 +84,14 @@ public long getEndTimestamp() { return this.endTimestamp; } + public void setWrongSeek(boolean flag) { + this.useWrongSeek = flag; + } + + public long getMessageTimestamp(int position) { + return this.fakeMessages.get(position).getPublishTime(); + } + @Override public String getTopic() { return this.topic; @@ -141,6 +151,10 @@ public void seek(MessageId messageId) throws PulsarClientException {} @Override public void seek(long timestamp) throws PulsarClientException { for (int i = 0; i < fakeMessages.size(); i++) { + if(useWrongSeek) { + currentMsg = 1; + break; + } if (timestamp == fakeMessages.get(i).getPublishTime()) { currentMsg = i; break; diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java index eeb6a5d7652c..adc25acdfe38 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java @@ -59,7 +59,6 @@ public class PulsarIOTest { private static final String TOPIC = "PULSAR_IO_TEST"; protected static PulsarContainer pulsarContainer; - protected static PulsarClient client; private long endExpectedTime = 0; private long startTime = 0; @@ -69,9 +68,7 @@ public class PulsarIOTest { @Rule public final transient TestPipeline testPipeline = TestPipeline.create(); public List> receiveMessages() throws PulsarClientException { - if (client == null) { - initClient(); - } + PulsarClient client = initClient(); List> messages = new ArrayList<>(); Consumer consumer = client.newConsumer().topic(TOPIC).subscriptionName("receiveMockMessageFn").subscribe(); @@ -84,11 +81,13 @@ public List> receiveMessages() throws PulsarClientException { consumer.negativeAcknowledge(msg); } } + consumer.close(); + client.close(); return messages; } public List produceMessages() throws PulsarClientException { - client = initClient(); + PulsarClient client = initClient(); Producer producer = client.newProducer().topic(TOPIC).create(); Consumer consumer = client.newConsumer().topic(TOPIC).subscriptionName("produceMockMessageFn").subscribe(); @@ -136,7 +135,6 @@ private static void setupPulsarContainer() { @BeforeClass public static void setup() throws PulsarClientException { setupPulsarContainer(); - client = initClient(); } @AfterClass @@ -148,16 +146,19 @@ public static void afterClass() { @Test @SuppressWarnings({"rawtypes"}) - public void testPulsarFunctionality() throws Exception { - try (Consumer consumer = - client.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe(); - Producer producer = client.newProducer().topic(TOPIC).create(); ) { + public void testPulsarFunctionality() { + try { + PulsarClient client = initClient(); + Consumer consumer = client.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe(); + Producer producer = client.newProducer().topic(TOPIC).create(); String messageTxt = "testing pulsar functionality"; producer.send(messageTxt.getBytes(StandardCharsets.UTF_8)); CompletableFuture future = consumer.receiveAsync(); Message message = future.get(5, TimeUnit.SECONDS); assertEquals(messageTxt, new String(message.getData(), StandardCharsets.UTF_8)); client.close(); + } catch (Exception e) { + LOG.error(e.getMessage()); } } @@ -165,6 +166,7 @@ public void testPulsarFunctionality() throws Exception { public void testReadFromSimpleTopic() { try { List inputsMock = produceMessages(); + PulsarIO.Read reader = PulsarIO.read() .withClientUrl(pulsarContainer.getPulsarBrokerUrl()) @@ -174,6 +176,9 @@ public void testReadFromSimpleTopic() { .withEndTimestamp(endExpectedTime) .withPublishTime(); testPipeline.apply(reader).apply(ParDo.of(new PulsarRecordsMetric())); + LOG.warn("Test PulsarIO read from simple topic"); + LOG.warn("StartTime {}", startTime); + LOG.warn("EndTime {}", endExpectedTime); PipelineResult pipelineResult = testPipeline.run(); MetricQueryResults metrics = diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java index 273a1915d2bb..c0480c8a8a02 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java @@ -160,6 +160,22 @@ public void testProcessElementWhenHasReachedEndTopic() throws Exception { assertEquals(DoFn.ProcessContinuation.stop(), result); } + @Test + public void testProcessElementWhenPulsarSeeksWrongTimestamp() throws Exception { + MockOutputReceiver receiver = new MockOutputReceiver(); + fakePulsarReader.setWrongSeek(true); + long startOffset = fakePulsarReader.getMessageTimestamp(3); + long endOffset = fakePulsarReader.getEndTimestamp(); + OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset)); + DoFn.ProcessContinuation result = + dofnInstance.processElement( + PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL), + tracker, + null, + (DoFn.OutputReceiver) receiver); + assertEquals(DoFn.ProcessContinuation.resume(), result); + } + private static class MockOutputReceiver implements DoFn.OutputReceiver { private final List records = new ArrayList<>();