diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 69d9e19c5706..1faa03040ff7 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -48,6 +48,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.FutureConnection; @@ -71,16 +73,28 @@ *

To configure a MQTT source, you have to provide a MQTT connection configuration including * {@code ClientId}, a {@code ServerURI}, a {@code Topic} pattern, and optionally {@code username} * and {@code password} to connect to the MQTT broker. The following example illustrates how to - * configure the source with multiple topics: + * configure the source with a single topic: * *

{@code
  * pipeline.apply(
  *   MqttIO.read()
  *    .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
  *      "tcp://host:11883",
- *      "topic1",
- *      "topic2",
- *      "topic3"))
+ *      "my_topic"))
+ * );
+ * }
+ * + *

For subscribing to multiple topics, use {@link + * ConnectionConfiguration#createWithMultipleTopics(String, Iterable)}. This allows subscribing to + * multiple topics with a single configuration: + * + *

{@code
+ * pipeline.apply(
+ *   MqttIO.read()
+ *    .withConnectionConfiguration(MqttIO.ConnectionConfiguration.createWithMultipleTopics(
+ *      "tcp://host:11883",
+ *      Arrays.asList("topic1", "topic2", "topic3")))
+ * );
  * }
* *

Reading with Metadata from a MQTT broker

@@ -91,17 +105,16 @@ * and payload. This allows you to implement business logic that can differ depending on the topic * from which the message was received. * - *

You can also subscribe to multiple topics with {@code readWithMetadata}, as shown in the - * following example: + *

Similar to the {@code read} method, you can also subscribe to multiple topics with {@code + * readWithMetadata}: * *

{@code
  * PCollection records = pipeline.apply(
  *   MqttIO.readWithMetadata()
- *    .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
+ *    .withConnectionConfiguration(MqttIO.ConnectionConfiguration.createWithMultipleTopics(
  *      "tcp://host:11883",
- *      "topic1",
- *      "topic2",
- *      "topic3"))
+ *      Arrays.asList("topic1", "topic2", "topic3")))
+ * );
  * }
* *

By using the topic information, you can apply different processing logic depending on the @@ -252,23 +265,36 @@ abstract static class Builder { * @param topic The MQTT getTopic pattern. * @return A connection configuration to the MQTT broker. */ - public static ConnectionConfiguration create( - String serverUri, String topic, @Nullable String... additionalTopics) { + public static ConnectionConfiguration create(String serverUri, String topic) { checkArgument(serverUri != null, "serverUri can not be null"); checkArgument(topic != null, "topic can not be null"); + return new AutoValue_MqttIO_ConnectionConfiguration.Builder() + .setServerUri(serverUri) + .setTopic(topic) + .build(); + } - if (additionalTopics != null && additionalTopics.length > 0) { - List topics = new ArrayList<>(additionalTopics.length + 1); - Collections.addAll(topics, additionalTopics); - Collections.addAll(topics, topic); - return new AutoValue_MqttIO_ConnectionConfiguration.Builder() - .setServerUri(serverUri) - .setTopicList(topics) - .build(); + /** + * Creates a connection configuration to the MQTT broker for multiple topics. This method allows + * subscribing to multiple topics simultaneously. + * + * @param serverUri The MQTT broker URI. + * @param topics An iterable collection of MQTT topic patterns to subscribe to. + * @return A connection configuration to the MQTT broker with the specified topics. + */ + public static ConnectionConfiguration createWithMultipleTopics( + String serverUri, Iterable topics) { + checkArgument(serverUri != null, "serverUri can not be null"); + checkArgument(topics != null, "topics can not be null"); + int topicsSize = Iterables.size(topics); + checkArgument(topicsSize > 0, "topics can not be empty"); + + if (topicsSize == 1) { + return create(serverUri, Iterables.getOnlyElement(topics)); } else { return new AutoValue_MqttIO_ConnectionConfiguration.Builder() .setServerUri(serverUri) - .setTopic(topic) + .setTopicList(Lists.newArrayList(topics)) .build(); } } diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 410c462d4fe4..39c02b0c5a50 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -33,6 +33,7 @@ import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -278,7 +279,7 @@ public void testReadWithMetadata() throws Exception { } @Test(timeout = 60 * 1000) - public void testReadWithTopicArray() throws Exception { + public void testReadWithMultipleTopics() throws Exception { final String topic1 = "some-topic-1"; final String topic2 = "some-topic-2"; final String topic3 = "some-topic-3"; @@ -286,7 +287,8 @@ public void testReadWithTopicArray() throws Exception { final Read mqttReaderWithTopicArray = MqttIO.readWithMetadata() .withConnectionConfiguration( - MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topic1, topic2)) + MqttIO.ConnectionConfiguration.createWithMultipleTopics( + "tcp://localhost:" + port, Arrays.asList(topic1, topic2))) .withMaxNumRecords(15) .withMaxReadTime(Duration.standardSeconds(5)); @@ -636,15 +638,15 @@ public void testWriteWithPayloadFn() { } @Test - public void testWriteWithMultipleTopic() { + public void testWriteWithMultipleTopics() { final IllegalArgumentException exception = assertThrows( IllegalArgumentException.class, () -> MqttIO.write() .withConnectionConfiguration( - MqttIO.ConnectionConfiguration.create( - "serverUri", "topic1", "topic2", "topic3")) + MqttIO.ConnectionConfiguration.createWithMultipleTopics( + "serverUri", Arrays.asList("topic1", "topic2", "topic3"))) .expand(pipeline.apply(Create.of(new byte[] {})))); assertEquals("can not have multiple topics", exception.getMessage()); @@ -688,9 +690,10 @@ public void testCreateConnectionWithSingleTopic() { } @Test - public void testCreateConnectionWithMultipleTopic() { + public void testCreateConnectionWithMultipleTopics() { final MqttIO.ConnectionConfiguration connectionConfiguration = - MqttIO.ConnectionConfiguration.create("serverUri", "topic1", "topic2", "topic3", "topic4"); + MqttIO.ConnectionConfiguration.createWithMultipleTopics( + "serverUri", Arrays.asList("topic1", "topic2", "topic3", "topic4")); assertNull(connectionConfiguration.getTopic()); assertNotNull(connectionConfiguration.getTopicList());