diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java index 92a2c89f9bc9c..b7805c36b3bf9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java @@ -32,8 +32,14 @@ public interface RawReader { */ static CompletableFuture create(PulsarClient client, String topic, String subscription) { + return create(client, topic, subscription, true); + } + + static CompletableFuture create(PulsarClient client, String topic, String subscription, + boolean createTopicIfDoesNotExist) { CompletableFuture> future = new CompletableFuture<>(); - RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future); + RawReader r = + new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist); return future.thenApply(__ -> r); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index f65232413991f..3d7ad9f58657d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -51,7 +51,8 @@ public class RawReaderImpl implements RawReader { private RawConsumerImpl consumer; public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, - CompletableFuture> consumerFuture) { + CompletableFuture> consumerFuture, + boolean createTopicIfDoesNotExist) { consumerConfiguration = new ConsumerConfigurationData<>(); consumerConfiguration.getTopicNames().add(topic); consumerConfiguration.setSubscriptionName(subscription); @@ -61,8 +62,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription, consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest); consumerConfiguration.setAckReceiptEnabled(true); - consumer = new RawConsumerImpl(client, consumerConfiguration, - consumerFuture); + consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist); } @Override @@ -111,7 +111,7 @@ static class RawConsumerImpl extends ConsumerImpl { final Queue> pendingRawReceives; RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, - CompletableFuture> consumerFuture) { + CompletableFuture> consumerFuture, boolean createTopicIfDoesNotExist) { super(client, conf.getSingleTopic(), conf, @@ -123,7 +123,7 @@ static class RawConsumerImpl extends ConsumerImpl { MessageId.earliest, 0 /* startMessageRollbackDurationInSec */, Schema.BYTES, null, - false + createTopicIfDoesNotExist ); incomingRawMessages = new GrowableArrayBlockingQueue<>(); pendingRawReceives = new ConcurrentLinkedQueue<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java index e93a642c76e4d..983443432ff49 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java @@ -56,7 +56,7 @@ public Compactor(ServiceConfiguration conf, } public CompletableFuture compact(String topic) { - return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION).thenComposeAsync( + return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false).thenComposeAsync( this::compactAndCloseReader, scheduler); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 95d8926c9ff80..d3fcc36a54653 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -36,15 +37,18 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.RawMessage; import org.apache.pulsar.client.api.RawReader; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; import org.testng.Assert; @@ -496,4 +500,23 @@ public void testReadCancellationOnClose() throws Exception { } } } + + @Test + public void testAutoCreateTopic() throws ExecutionException, InterruptedException, PulsarAdminException { + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); + + RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); + TopicStats stats = admin.topics().getStats(topic); + Assert.assertNotNull(stats); + reader.closeAsync().join(); + + String topic2 = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); + try { + reader = RawReader.create(pulsarClient, topic2, subscription, false).get(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof PulsarClientException.TopicDoesNotExistException); + } + reader.closeAsync().join(); + } }