Skip to content

Commit

Permalink
add createWithMultipleTopics method
Browse files Browse the repository at this point in the history
  • Loading branch information
twosom committed Nov 5, 2024
1 parent 2885acf commit d5fce0d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.FutureConnection;
Expand All @@ -71,16 +73,28 @@
* <p>To configure a MQTT source, you have to provide a MQTT connection configuration including
* {@code ClientId}, a {@code ServerURI}, a {@code Topic} pattern, and optionally {@code username}
* and {@code password} to connect to the MQTT broker. The following example illustrates how to
* configure the source with multiple topics:
* configure the source with a single topic:
*
* <pre>{@code
* pipeline.apply(
* MqttIO.read()
* .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
* "tcp://host:11883",
* "topic1",
* "topic2",
* "topic3"))
* "my_topic"))
* );
* }</pre>
*
* <p>For subscribing to multiple topics, use {@link
* ConnectionConfiguration#createWithMultipleTopics(String, Iterable)}. This allows subscribing to
* multiple topics with a single configuration:
*
* <pre>{@code
* pipeline.apply(
* MqttIO.read()
* .withConnectionConfiguration(MqttIO.ConnectionConfiguration.createWithMultipleTopics(
* "tcp://host:11883",
* Arrays.asList("topic1", "topic2", "topic3")))
* );
* }</pre>
*
* <h3>Reading with Metadata from a MQTT broker</h3>
Expand All @@ -91,17 +105,16 @@
* and payload. This allows you to implement business logic that can differ depending on the topic
* from which the message was received.
*
* <p>You can also subscribe to multiple topics with {@code readWithMetadata}, as shown in the
* following example:
* <p>Similar to the {@code read} method, you can also subscribe to multiple topics with {@code
* readWithMetadata}:
*
* <pre>{@code
* PCollection<MqttRecord> records = pipeline.apply(
* MqttIO.readWithMetadata()
* .withConnectionConfiguration(MqttIO.ConnectionConfiguration.create(
* .withConnectionConfiguration(MqttIO.ConnectionConfiguration.createWithMultipleTopics(
* "tcp://host:11883",
* "topic1",
* "topic2",
* "topic3"))
* Arrays.asList("topic1", "topic2", "topic3")))
* );
* }</pre>
*
* <p>By using the topic information, you can apply different processing logic depending on the
Expand Down Expand Up @@ -252,23 +265,36 @@ abstract static class Builder {
* @param topic The MQTT getTopic pattern.
* @return A connection configuration to the MQTT broker.
*/
public static ConnectionConfiguration create(
String serverUri, String topic, @Nullable String... additionalTopics) {
public static ConnectionConfiguration create(String serverUri, String topic) {
checkArgument(serverUri != null, "serverUri can not be null");
checkArgument(topic != null, "topic can not be null");
return new AutoValue_MqttIO_ConnectionConfiguration.Builder()
.setServerUri(serverUri)
.setTopic(topic)
.build();
}

if (additionalTopics != null && additionalTopics.length > 0) {
List<String> topics = new ArrayList<>(additionalTopics.length + 1);
Collections.addAll(topics, additionalTopics);
Collections.addAll(topics, topic);
return new AutoValue_MqttIO_ConnectionConfiguration.Builder()
.setServerUri(serverUri)
.setTopicList(topics)
.build();
/**
* Creates a connection configuration to the MQTT broker for multiple topics. This method allows
* subscribing to multiple topics simultaneously.
*
* @param serverUri The MQTT broker URI.
* @param topics An iterable collection of MQTT topic patterns to subscribe to.
* @return A connection configuration to the MQTT broker with the specified topics.
*/
public static ConnectionConfiguration createWithMultipleTopics(
String serverUri, Iterable<String> topics) {
checkArgument(serverUri != null, "serverUri can not be null");
checkArgument(topics != null, "topics can not be null");
int topicsSize = Iterables.size(topics);
checkArgument(topicsSize > 0, "topics can not be empty");

if (topicsSize == 1) {
return create(serverUri, Iterables.getOnlyElement(topics));
} else {
return new AutoValue_MqttIO_ConnectionConfiguration.Builder()
.setServerUri(serverUri)
.setTopic(topic)
.setTopicList(Lists.newArrayList(topics))
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -278,15 +279,16 @@ public void testReadWithMetadata() throws Exception {
}

@Test(timeout = 60 * 1000)
public void testReadWithTopicArray() throws Exception {
public void testReadWithMultipleTopics() throws Exception {
final String topic1 = "some-topic-1";
final String topic2 = "some-topic-2";
final String topic3 = "some-topic-3";

final Read<MqttRecord> mqttReaderWithTopicArray =
MqttIO.readWithMetadata()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create("tcp://localhost:" + port, topic1, topic2))
MqttIO.ConnectionConfiguration.createWithMultipleTopics(
"tcp://localhost:" + port, Arrays.asList(topic1, topic2)))
.withMaxNumRecords(15)
.withMaxReadTime(Duration.standardSeconds(5));

Expand Down Expand Up @@ -636,15 +638,15 @@ public void testWriteWithPayloadFn() {
}

@Test
public void testWriteWithMultipleTopic() {
public void testWriteWithMultipleTopics() {
final IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() ->
MqttIO.write()
.withConnectionConfiguration(
MqttIO.ConnectionConfiguration.create(
"serverUri", "topic1", "topic2", "topic3"))
MqttIO.ConnectionConfiguration.createWithMultipleTopics(
"serverUri", Arrays.asList("topic1", "topic2", "topic3")))
.expand(pipeline.apply(Create.of(new byte[] {}))));

assertEquals("can not have multiple topics", exception.getMessage());
Expand Down Expand Up @@ -688,9 +690,10 @@ public void testCreateConnectionWithSingleTopic() {
}

@Test
public void testCreateConnectionWithMultipleTopic() {
public void testCreateConnectionWithMultipleTopics() {
final MqttIO.ConnectionConfiguration connectionConfiguration =
MqttIO.ConnectionConfiguration.create("serverUri", "topic1", "topic2", "topic3", "topic4");
MqttIO.ConnectionConfiguration.createWithMultipleTopics(
"serverUri", Arrays.asList("topic1", "topic2", "topic3", "topic4"));

assertNull(connectionConfiguration.getTopic());
assertNotNull(connectionConfiguration.getTopicList());
Expand Down

0 comments on commit d5fce0d

Please sign in to comment.