Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14269] PulsarIOTest.testReadFromSimpleTopic flaky test #17473

Closed
wants to merge 14 commits into from
Closed
4 changes: 4 additions & 0 deletions sdks/java/io/pulsar/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,9 @@ dependencies {
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation "org.testcontainers:pulsar:1.15.3"
testImplementation "org.assertj:assertj-core:2.9.1"
testRuntimeOnly library.java.slf4j_jdk14
}

test {
testLogging.showStandardStreams = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void initPulsarClients() throws Exception {
// Close connection to Pulsar clients
@Teardown
public void teardown() throws Exception {
LOG.info("Client closed");
this.client.close();
this.admin.close();
}
Expand Down Expand Up @@ -164,18 +165,31 @@ public ProcessContinuation processElement(
return ProcessContinuation.resume();
}
Long currentTimestamp = message.getPublishTime();

// validate when pulsar client seek for the start timestamp, the next timestamp should be
// greater than the start restriction
if(currentTimestamp < tracker.currentRestriction().getFrom()) {
LOG.warn("Initial restriction ({}) is greather than Current timestamp ({})", tracker.currentRestriction().getFrom(), currentTimestamp);
return ProcessContinuation.resume();
}
// if tracker.tryclaim() return true, sdf must execute work otherwise
// doFn must exit processElement() without doing any work associated
// or claiming more work
LOG.info("From time {}", tracker.currentRestriction().getFrom());
LOG.info("Current time before claimed {}", currentTimestamp);
LOG.info("To time {}", tracker.currentRestriction().getTo());
if (!tracker.tryClaim(currentTimestamp)) {
reader.close();
return ProcessContinuation.stop();
}
LOG.info("Current time claimed {}", currentTimestamp);

if (pulsarSourceDescriptor.getEndMessageId() != null) {
MessageId currentMsgId = message.getMessageId();
boolean hasReachedEndMessageId =
currentMsgId.compareTo(pulsarSourceDescriptor.getEndMessageId()) == 0;
if (hasReachedEndMessageId) {
reader.close();
return ProcessContinuation.stop();
}
}
Expand All @@ -202,12 +216,13 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
@NewTracker
public OffsetRangeTracker restrictionTracker(
@Element PulsarSourceDescriptor pulsarSource, @Restriction OffsetRange restriction) {
LOG.info("Restriction Tracker {} to {}", restriction.getFrom(), restriction.getTo());
if (restriction.getTo() < Long.MAX_VALUE) {
return new OffsetRangeTracker(restriction);
return new OffsetRangeTracker(new OffsetRange(restriction.getFrom(), restriction.getTo()));
}

PulsarLatestOffsetEstimator offsetEstimator =
new PulsarLatestOffsetEstimator(this.admin, pulsarSource.getTopic());
LOG.info("GrowableOffsetRangeTracker from {}", restriction.getFrom());
return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
}

Expand Down Expand Up @@ -235,6 +250,7 @@ private PulsarLatestOffsetEstimator(PulsarAdmin admin, String topic) {
@Override
public long estimate() {
Message<byte[]> msg = memoizedBacklog.get();
LOG.info("LatestMsgTime {}", msg.getPublishTime());
return msg.getPublishTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class FakePulsarReader implements Reader<byte[]> {
private long endTimestamp;
private boolean reachedEndOfTopic;
private int numberOfMessages;
private boolean useWrongSeek;

public FakePulsarReader(String topic, int numberOfMessages) {
this.numberOfMessages = numberOfMessages;
Expand Down Expand Up @@ -66,6 +67,7 @@ public void setMock(String topic, int numberOfMessages) {
public void reset() {
this.reachedEndOfTopic = false;
this.currentMsg = 0;
this.useWrongSeek = false;
emptyMockRecords();
setMock(topic, numberOfMessages);
}
Expand All @@ -82,6 +84,14 @@ public long getEndTimestamp() {
return this.endTimestamp;
}

public void setWrongSeek(boolean flag) {
this.useWrongSeek = flag;
}

public long getMessageTimestamp(int position) {
return this.fakeMessages.get(position).getPublishTime();
}

@Override
public String getTopic() {
return this.topic;
Expand Down Expand Up @@ -141,6 +151,10 @@ public void seek(MessageId messageId) throws PulsarClientException {}
@Override
public void seek(long timestamp) throws PulsarClientException {
for (int i = 0; i < fakeMessages.size(); i++) {
if(useWrongSeek) {
currentMsg = 1;
break;
}
if (timestamp == fakeMessages.get(i).getPublishTime()) {
currentMsg = i;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class PulsarIOTest {

private static final String TOPIC = "PULSAR_IO_TEST";
protected static PulsarContainer pulsarContainer;
protected static PulsarClient client;

private long endExpectedTime = 0;
private long startTime = 0;
Expand All @@ -69,9 +68,7 @@ public class PulsarIOTest {
@Rule public final transient TestPipeline testPipeline = TestPipeline.create();

public List<Message<byte[]>> receiveMessages() throws PulsarClientException {
if (client == null) {
initClient();
}
PulsarClient client = initClient();
List<Message<byte[]>> messages = new ArrayList<>();
Consumer<byte[]> consumer =
client.newConsumer().topic(TOPIC).subscriptionName("receiveMockMessageFn").subscribe();
Expand All @@ -84,11 +81,13 @@ public List<Message<byte[]>> receiveMessages() throws PulsarClientException {
consumer.negativeAcknowledge(msg);
}
}
consumer.close();
client.close();
return messages;
}

public List<PulsarMessage> produceMessages() throws PulsarClientException {
client = initClient();
PulsarClient client = initClient();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
Consumer<byte[]> consumer =
client.newConsumer().topic(TOPIC).subscriptionName("produceMockMessageFn").subscribe();
Expand Down Expand Up @@ -136,7 +135,6 @@ private static void setupPulsarContainer() {
@BeforeClass
public static void setup() throws PulsarClientException {
setupPulsarContainer();
client = initClient();
}

@AfterClass
Expand All @@ -148,23 +146,27 @@ public static void afterClass() {

@Test
@SuppressWarnings({"rawtypes"})
public void testPulsarFunctionality() throws Exception {
try (Consumer consumer =
client.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create(); ) {
public void testPulsarFunctionality() {
try {
PulsarClient client = initClient();
Consumer consumer = client.newConsumer().topic(TOPIC).subscriptionName("PulsarIO_IT").subscribe();
Producer<byte[]> producer = client.newProducer().topic(TOPIC).create();
String messageTxt = "testing pulsar functionality";
producer.send(messageTxt.getBytes(StandardCharsets.UTF_8));
CompletableFuture<Message> future = consumer.receiveAsync();
Message message = future.get(5, TimeUnit.SECONDS);
assertEquals(messageTxt, new String(message.getData(), StandardCharsets.UTF_8));
client.close();
} catch (Exception e) {
LOG.error(e.getMessage());
}
}

@Test
public void testReadFromSimpleTopic() {
try {
List<PulsarMessage> inputsMock = produceMessages();

PulsarIO.Read reader =
PulsarIO.read()
.withClientUrl(pulsarContainer.getPulsarBrokerUrl())
Expand All @@ -174,6 +176,9 @@ public void testReadFromSimpleTopic() {
.withEndTimestamp(endExpectedTime)
.withPublishTime();
testPipeline.apply(reader).apply(ParDo.of(new PulsarRecordsMetric()));
LOG.warn("Test PulsarIO read from simple topic");
LOG.warn("StartTime {}", startTime);
LOG.warn("EndTime {}", endExpectedTime);

PipelineResult pipelineResult = testPipeline.run();
MetricQueryResults metrics =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,22 @@ public void testProcessElementWhenHasReachedEndTopic() throws Exception {
assertEquals(DoFn.ProcessContinuation.stop(), result);
}

@Test
public void testProcessElementWhenPulsarSeeksWrongTimestamp() throws Exception {
MockOutputReceiver receiver = new MockOutputReceiver();
fakePulsarReader.setWrongSeek(true);
long startOffset = fakePulsarReader.getMessageTimestamp(3);
long endOffset = fakePulsarReader.getEndTimestamp();
OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(startOffset, endOffset));
DoFn.ProcessContinuation result =
dofnInstance.processElement(
PulsarSourceDescriptor.of(TOPIC, null, null, null, SERVICE_URL, ADMIN_URL),
tracker,
null,
(DoFn.OutputReceiver) receiver);
assertEquals(DoFn.ProcessContinuation.resume(), result);
}

private static class MockOutputReceiver implements DoFn.OutputReceiver<PulsarMessage> {

private final List<PulsarMessage> records = new ArrayList<>();
Expand Down