Skip to content

Commit

Permalink
Fix flaky mqtt test (apache#32765)
Browse files Browse the repository at this point in the history
* fix mqtt flaky test

* extract common method for connect to Mqtt
  • Loading branch information
twosom authored and reeba212 committed Dec 4, 2024
1 parent 7535ac3 commit cfa5c63
Showing 1 changed file with 41 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@
@RunWith(JUnit4.class)
public class MqttIOTest {

/** Functional interface used to verify the connection status of an MQTT client. */
@FunctionalInterface
interface ConnectionCondition {
/**
* Evaluates whether the given {@link Connection} satisfies the condition.
*
* @param connection the MQTT connection to check
* @return {@code true} if the condition is met, {@code false} otherwise
*/
boolean check(Connection connection);
}

private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);

private BrokerService brokerService;
Expand Down Expand Up @@ -123,18 +135,7 @@ public void testReadNoClientId() throws Exception {
new Thread(
() -> {
try {
LOG.info(
"Waiting pipeline connected to the MQTT broker before sending "
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
Thread.sleep(1000);
for (Connection connection : brokerService.getBroker().getClients()) {
if (!connection.getConnectionId().isEmpty()) {
pipelineConnected = true;
}
}
}
doConnect(connection -> !connection.getConnectionId().isEmpty());
for (int i = 0; i < 10; i++) {
publishConnection.publish(
topicName,
Expand Down Expand Up @@ -185,18 +186,7 @@ public void testRead() throws Exception {
new Thread(
() -> {
try {
LOG.info(
"Waiting pipeline connected to the MQTT broker before sending "
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
for (Connection connection : brokerService.getBroker().getClients()) {
if (connection.getConnectionId().startsWith("READ_PIPELINE")) {
pipelineConnected = true;
}
}
Thread.sleep(1000);
}
doConnect(connection -> connection.getConnectionId().startsWith("READ_PIPELINE"));
for (int i = 0; i < 10; i++) {
publishConnection.publish(
"READ_TOPIC",
Expand Down Expand Up @@ -225,7 +215,8 @@ public void testReadWithMetadata() throws Exception {
MqttIO.readWithMetadata()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, wildcardTopic))
.withMaxNumRecords(10);
.withMaxNumRecords(10)
.withMaxReadTime(Duration.standardSeconds(5));

final PCollection<MqttRecord> output = pipeline.apply(mqttReaderWithMetadata);
PAssert.that(output)
Expand All @@ -251,18 +242,7 @@ public void testReadWithMetadata() throws Exception {
new Thread(
() -> {
try {
LOG.info(
"Waiting pipeline connected to the MQTT broker before sending "
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
Thread.sleep(1000);
for (Connection connection : brokerService.getBroker().getClients()) {
if (!connection.getConnectionId().isEmpty()) {
pipelineConnected = true;
}
}
}
doConnect(connection -> !connection.getConnectionId().isEmpty());
for (int i = 0; i < 5; i++) {
publishConnection.publish(
topic1,
Expand Down Expand Up @@ -581,6 +561,30 @@ public void testReadObject() throws Exception {
assertEquals(cp1.oldestMessageTimestamp, cp2.oldestMessageTimestamp);
}

/**
* Attempts to establish a connection to the MQTT broker by checking each available client
* connection until the specified condition is met.
*
* <p>This method repeatedly checks the connection status of each MQTT client using the provided
* {@link ConnectionCondition}. It blocks execution within a loop, sleeping for 1 second between
* each check, until the condition is satisfied.
*
* @param condition the condition used to verify the connection status
* @throws Exception if any error occurs during the connection process
*/
private void doConnect(ConnectionCondition condition) throws Exception {
LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
for (Connection connection : brokerService.getBroker().getClients()) {
if (condition.check(connection)) {
pipelineConnected = true;
}
}
Thread.sleep(1000);
}
}

@After
public void stopBroker() throws Exception {
if (brokerService != null) {
Expand Down

0 comments on commit cfa5c63

Please sign in to comment.