From b247d067e4cdaac581f21ca12e0830116f02892e Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 18 Sep 2024 14:17:25 -0400 Subject: [PATCH] Emit warning when Matt waiting for connection for extended period of time --- .../org/apache/beam/sdk/io/mqtt/MqttIO.java | 24 +++++++++++++++---- .../apache/beam/sdk/io/mqtt/MqttIOTest.java | 5 ++-- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 8b7f0991c2dd..666a138a59bb 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -45,6 +46,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.Message; import org.fusesource.mqtt.client.QoS; @@ -431,8 +433,7 @@ public boolean start() throws IOException { client = spec.connectionConfiguration().createClient(); LOG.debug("Reader client ID is {}", client.getClientId()); checkpointMark.clientId = client.getClientId().toString(); - connection = client.blockingConnection(); - connection.connect(); + connection = createConnection(client); connection.subscribe( new Topic[] {new Topic(spec.connectionConfiguration().getTopic(), QoS.AT_LEAST_ONCE)}); return advance(); @@ -569,8 +570,7 @@ public void createMqttClient() throws Exception { LOG.debug("Starting MQTT writer"); client = spec.connectionConfiguration().createClient(); LOG.debug("MQTT writer client ID is {}", client.getClientId()); - connection = client.blockingConnection(); - connection.connect(); + connection = createConnection(client); } @ProcessElement @@ -590,4 +590,20 @@ public void closeMqttClient() throws Exception { } } } + + /** Create a connected MQTT BlockingConnection from given client, aware of connection timeout. */ + static BlockingConnection createConnection(MQTT client) throws Exception { + FutureConnection futureConnection = client.futureConnection(); + org.fusesource.mqtt.client.Future connecting = futureConnection.connect(); + while (true) { + try { + connecting.await(1, TimeUnit.MINUTES); + } catch (TimeoutException e) { + LOG.warn("Connection to MQTT broker pending after waiting for 1 minute"); + continue; + } + break; + } + return new BlockingConnection(futureConnection); + } } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 30adad708f8d..52c1b80a846a 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -142,8 +142,7 @@ public void testReadNoClientId() throws Exception { publisherThread.join(); } - @Test(timeout = 30 * 1000) - @Ignore("https://github.com/apache/beam/issues/19092 Flake Non-deterministic output.") + @Test(timeout = 40 * 1000) public void testRead() throws Exception { PCollection output = pipeline.apply( @@ -180,12 +179,12 @@ public void testRead() throws Exception { + "messages ..."); boolean pipelineConnected = false; while (!pipelineConnected) { - Thread.sleep(1000); for (Connection connection : brokerService.getBroker().getClients()) { if (connection.getConnectionId().startsWith("READ_PIPELINE")) { pipelineConnected = true; } } + Thread.sleep(2000); } for (int i = 0; i < 10; i++) { publishConnection.publish(