From cfa5c6305fb2dcea91524d83fbf7272ead9bcfb7 Mon Sep 17 00:00:00 2001 From: twosom <72733442+twosom@users.noreply.github.com> Date: Mon, 14 Oct 2024 11:58:24 +0900 Subject: [PATCH] Fix flaky mqtt test (#32765) * fix mqtt flaky test * extract common method for connect to Mqtt --- .../apache/beam/sdk/io/mqtt/MqttIOTest.java | 78 ++++++++++--------- 1 file changed, 41 insertions(+), 37 deletions(-) diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 64b0728c879a..3ee6ed577a07 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -69,6 +69,18 @@ @RunWith(JUnit4.class) public class MqttIOTest { + /** Functional interface used to verify the connection status of an MQTT client. */ + @FunctionalInterface + interface ConnectionCondition { + /** + * Evaluates whether the given {@link Connection} satisfies the condition. + * + * @param connection the MQTT connection to check + * @return {@code true} if the condition is met, {@code false} otherwise + */ + boolean check(Connection connection); + } + private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class); private BrokerService brokerService; @@ -123,18 +135,7 @@ public void testReadNoClientId() throws Exception { new Thread( () -> { try { - LOG.info( - "Waiting pipeline connected to the MQTT broker before sending " - + "messages ..."); - boolean pipelineConnected = false; - while (!pipelineConnected) { - Thread.sleep(1000); - for (Connection connection : brokerService.getBroker().getClients()) { - if (!connection.getConnectionId().isEmpty()) { - pipelineConnected = true; - } - } - } + doConnect(connection -> !connection.getConnectionId().isEmpty()); for (int i = 0; i < 10; i++) { publishConnection.publish( topicName, @@ -185,18 +186,7 @@ public void testRead() throws Exception { new Thread( () -> { try { - LOG.info( - "Waiting pipeline connected to the MQTT broker before sending " - + "messages ..."); - boolean pipelineConnected = false; - while (!pipelineConnected) { - for (Connection connection : brokerService.getBroker().getClients()) { - if (connection.getConnectionId().startsWith("READ_PIPELINE")) { - pipelineConnected = true; - } - } - Thread.sleep(1000); - } + doConnect(connection -> connection.getConnectionId().startsWith("READ_PIPELINE")); for (int i = 0; i < 10; i++) { publishConnection.publish( "READ_TOPIC", @@ -225,7 +215,8 @@ public void testReadWithMetadata() throws Exception { MqttIO.readWithMetadata() .withConnectionConfiguration( MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, wildcardTopic)) - .withMaxNumRecords(10); + .withMaxNumRecords(10) + .withMaxReadTime(Duration.standardSeconds(5)); final PCollection output = pipeline.apply(mqttReaderWithMetadata); PAssert.that(output) @@ -251,18 +242,7 @@ public void testReadWithMetadata() throws Exception { new Thread( () -> { try { - LOG.info( - "Waiting pipeline connected to the MQTT broker before sending " - + "messages ..."); - boolean pipelineConnected = false; - while (!pipelineConnected) { - Thread.sleep(1000); - for (Connection connection : brokerService.getBroker().getClients()) { - if (!connection.getConnectionId().isEmpty()) { - pipelineConnected = true; - } - } - } + doConnect(connection -> !connection.getConnectionId().isEmpty()); for (int i = 0; i < 5; i++) { publishConnection.publish( topic1, @@ -581,6 +561,30 @@ public void testReadObject() throws Exception { assertEquals(cp1.oldestMessageTimestamp, cp2.oldestMessageTimestamp); } + /** + * Attempts to establish a connection to the MQTT broker by checking each available client + * connection until the specified condition is met. + * + *

This method repeatedly checks the connection status of each MQTT client using the provided + * {@link ConnectionCondition}. It blocks execution within a loop, sleeping for 1 second between + * each check, until the condition is satisfied. + * + * @param condition the condition used to verify the connection status + * @throws Exception if any error occurs during the connection process + */ + private void doConnect(ConnectionCondition condition) throws Exception { + LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ..."); + boolean pipelineConnected = false; + while (!pipelineConnected) { + for (Connection connection : brokerService.getBroker().getClients()) { + if (condition.check(connection)) { + pipelineConnected = true; + } + } + Thread.sleep(1000); + } + } + @After public void stopBroker() throws Exception { if (brokerService != null) {