Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky mqtt test #32765

Merged
merged 2 commits into from
Oct 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@
@RunWith(JUnit4.class)
public class MqttIOTest {

/** Functional interface used to verify the connection status of an MQTT client. */
@FunctionalInterface
interface ConnectionCondition {
/**
* Evaluates whether the given {@link Connection} satisfies the condition.
*
* @param connection the MQTT connection to check
* @return {@code true} if the condition is met, {@code false} otherwise
*/
boolean check(Connection connection);
}

private static final Logger LOG = LoggerFactory.getLogger(MqttIOTest.class);

private BrokerService brokerService;
Expand Down Expand Up @@ -123,18 +135,7 @@ public void testReadNoClientId() throws Exception {
new Thread(
() -> {
try {
LOG.info(
"Waiting pipeline connected to the MQTT broker before sending "
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
Thread.sleep(1000);
for (Connection connection : brokerService.getBroker().getClients()) {
if (!connection.getConnectionId().isEmpty()) {
pipelineConnected = true;
}
}
}
doConnect(connection -> !connection.getConnectionId().isEmpty());
for (int i = 0; i < 10; i++) {
publishConnection.publish(
topicName,
Expand Down Expand Up @@ -185,18 +186,7 @@ public void testRead() throws Exception {
new Thread(
() -> {
try {
LOG.info(
"Waiting pipeline connected to the MQTT broker before sending "
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
for (Connection connection : brokerService.getBroker().getClients()) {
if (connection.getConnectionId().startsWith("READ_PIPELINE")) {
pipelineConnected = true;
}
}
Thread.sleep(1000);
}
doConnect(connection -> connection.getConnectionId().startsWith("READ_PIPELINE"));
for (int i = 0; i < 10; i++) {
publishConnection.publish(
"READ_TOPIC",
Expand Down Expand Up @@ -225,7 +215,8 @@ public void testReadWithMetadata() throws Exception {
MqttIO.readWithMetadata()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, wildcardTopic))
.withMaxNumRecords(10);
.withMaxNumRecords(10)
.withMaxReadTime(Duration.standardSeconds(5));

final PCollection<MqttRecord> output = pipeline.apply(mqttReaderWithMetadata);
PAssert.that(output)
Expand All @@ -251,18 +242,7 @@ public void testReadWithMetadata() throws Exception {
new Thread(
() -> {
try {
LOG.info(
"Waiting pipeline connected to the MQTT broker before sending "
+ "messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
Thread.sleep(1000);
for (Connection connection : brokerService.getBroker().getClients()) {
if (!connection.getConnectionId().isEmpty()) {
pipelineConnected = true;
}
}
}
doConnect(connection -> !connection.getConnectionId().isEmpty());
for (int i = 0; i < 5; i++) {
publishConnection.publish(
topic1,
Expand Down Expand Up @@ -581,6 +561,30 @@ public void testReadObject() throws Exception {
assertEquals(cp1.oldestMessageTimestamp, cp2.oldestMessageTimestamp);
}

/**
* Attempts to establish a connection to the MQTT broker by checking each available client
* connection until the specified condition is met.
*
* <p>This method repeatedly checks the connection status of each MQTT client using the provided
* {@link ConnectionCondition}. It blocks execution within a loop, sleeping for 1 second between
* each check, until the condition is satisfied.
*
* @param condition the condition used to verify the connection status
* @throws Exception if any error occurs during the connection process
*/
private void doConnect(ConnectionCondition condition) throws Exception {
LOG.info("Waiting pipeline connected to the MQTT broker before sending messages ...");
boolean pipelineConnected = false;
while (!pipelineConnected) {
for (Connection connection : brokerService.getBroker().getClients()) {
if (condition.check(connection)) {
pipelineConnected = true;
}
}
Thread.sleep(1000);
}
}

@After
public void stopBroker() throws Exception {
if (brokerService != null) {
Expand Down
Loading