Skip to content

Commit

Permalink
change test code
Browse files Browse the repository at this point in the history
  • Loading branch information
twosom committed Nov 10, 2024
1 parent 259241c commit 7e7d655
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.beam.sdk.coders.ByteArrayCoder;
Expand Down Expand Up @@ -325,6 +326,8 @@ public void testReadWithMultipleTopics() throws Exception {
client.setHost("tcp://localhost:" + port);
final BlockingConnection publishConnection = client.blockingConnection();
publishConnection.connect();

final CountDownLatch publishLatch = new CountDownLatch(3);
Thread publisherThread =
new Thread(
() -> {
Expand All @@ -337,20 +340,23 @@ public void testReadWithMultipleTopics() throws Exception {
QoS.EXACTLY_ONCE,
false);
}
publishLatch.countDown();
for (int i = 5; i < 10; i++) {
publishConnection.publish(
topic2,
("This is test " + i).getBytes(StandardCharsets.UTF_8),
QoS.EXACTLY_ONCE,
false);
}
publishLatch.countDown();
for (int i = 10; i < 15; i++) {
publishConnection.publish(
topic3,
("This is test " + i).getBytes(StandardCharsets.UTF_8),
QoS.EXACTLY_ONCE,
false);
}
publishLatch.countDown();

} catch (Exception e) {
// nothing to do
Expand All @@ -359,8 +365,8 @@ public void testReadWithMultipleTopics() throws Exception {

publisherThread.start();
pipeline.run();

publishConnection.disconnect();
publishLatch.await();
publisherThread.join();
}

Expand Down

0 comments on commit 7e7d655

Please sign in to comment.