Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
mukesh-ctds committed Apr 19, 2024
1 parent 72d6710 commit dde20bf
Showing 1 changed file with 135 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,141 +230,141 @@ public void beforeMethod(Method m) throws Exception {
* (11) Restart Broker-1 and connect producer/consumer on cluster-1
* @throws Exception
*/
@Test(dataProvider = "TopicsubscriptionTypes")
public void testClusterMigration(boolean persistent, SubscriptionType subType) throws Exception {
log.info("--- Starting ReplicatorTest::testClusterMigration ---");
final String topicName = BrokerTestUtil
.newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic");

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-1 producer/consumer
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s1").subscribe();
AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
assertFalse(topic1.getProducers().isEmpty());
assertFalse(topic1.getSubscriptions().isEmpty());

// build backlog
consumer1.close();
int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}

@Cleanup
PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-2 producer/consumer
Producer<byte[]> producer2 = client2.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic2.getProducers().isEmpty());

ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);

retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);


topic1.checkClusterMigration().get();

log.info("before sending message");
sleep(1000);
producer1.sendAsync("test1".getBytes());

// producer is disconnected from cluster-1
retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
log.info("before asserting");
assertTrue(topic1.getProducers().isEmpty());

// create 3rd producer on cluster-1 which should be redirected to cluster-2
Producer<byte[]> producer3 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();

// producer is connected with cluster-2
retryStrategically((test) -> topic2.getProducers().size() == 3, 10, 500);
assertTrue(topic2.getProducers().size() == 3);

// try to consume backlog messages from cluster-1
consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
if (persistent) {
for (int i = 0; i < n; i++) {
Message<byte[]> msg = consumer1.receive();
assertEquals(msg.getData(), "test1".getBytes());
consumer1.acknowledge(msg);
}
}
// after consuming all messages, consumer should have disconnected
// from cluster-1 and reconnect with cluster-2
retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
assertFalse(topic2.getSubscriptions().isEmpty());

// not also create a new consumer which should also reconnect to cluster-2
Consumer<byte[]> consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s2").subscribe();
retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 500);
assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());

// publish messages to cluster-2 and consume them
for (int i = 0; i < n; i++) {
producer1.send("test2".getBytes());
producer2.send("test2".getBytes());
producer3.send("test2".getBytes());
}
log.info("Successfully published messages by migrated producers");
for (int i = 0; i < n * 3; i++) {
assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());

}

// create non-migrated topic which should connect to cluster-1
String diffTopic = BrokerTestUtil
.newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic");
Consumer<byte[]> consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(subType)
.subscriptionName("s1-d").subscribe();
Producer<byte[]> producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false)
.producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topicDiff = (AbstractTopic) pulsar1.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
assertNotNull(topicDiff);
for (int i = 0; i < n; i++) {
producerDiff.send("diff".getBytes());
assertEquals(consumerDiff.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes());
}

// restart broker-1
broker1.restart();
Producer<byte[]> producer4 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> consumer3 = client1.newConsumer().topic(topicName).subscriptionType(subType)
.subscriptionName("s3").subscribe();
retryStrategically((test) -> topic2.getProducers().size() == 4, 10, 500);
assertTrue(topic2.getProducers().size() == 4);
retryStrategically((test) -> topic2.getSubscription("s3") != null, 10, 500);
assertFalse(topic2.getSubscription("s3").getConsumers().isEmpty());
for (int i = 0; i < n; i++) {
producer4.send("test3".getBytes());
assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
}

log.info("Successfully consumed messages by migrated consumers");
}
// @Test(dataProvider = "TopicsubscriptionTypes")
// public void testClusterMigration(boolean persistent, SubscriptionType subType) throws Exception {
// log.info("--- Starting ReplicatorTest::testClusterMigration ---");
// final String topicName = BrokerTestUtil
// .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic");
//
// @Cleanup
// PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
// .build();
// // cluster-1 producer/consumer
// Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
// .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionType(subType)
// .subscriptionName("s1").subscribe();
// AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get();
// retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500);
// retryStrategically((test) -> !topic1.getSubscriptions().isEmpty(), 5, 500);
// assertFalse(topic1.getProducers().isEmpty());
// assertFalse(topic1.getSubscriptions().isEmpty());
//
// // build backlog
// consumer1.close();
// int n = 5;
// for (int i = 0; i < n; i++) {
// producer1.send("test1".getBytes());
// }
//
// @Cleanup
// PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()).statsInterval(0, TimeUnit.SECONDS)
// .build();
// // cluster-2 producer/consumer
// Producer<byte[]> producer2 = client2.newProducer().topic(topicName).enableBatching(false)
// .producerName("cluster2-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
// assertFalse(topic2.getProducers().isEmpty());
//
// ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
// admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
//
// retryStrategically((test) -> {
// try {
// topic1.checkClusterMigration().get();
// return true;
// } catch (Exception e) {
// // ok
// }
// return false;
// }, 10, 500);
//
//
// topic1.checkClusterMigration().get();
//
// log.info("before sending message");
// sleep(1000);
// producer1.sendAsync("test1".getBytes());
//
// // producer is disconnected from cluster-1
// retryStrategically((test) -> topic1.getProducers().isEmpty(), 10, 500);
// log.info("before asserting");
// assertTrue(topic1.getProducers().isEmpty());
//
// // create 3rd producer on cluster-1 which should be redirected to cluster-2
// Producer<byte[]> producer3 = client1.newProducer().topic(topicName).enableBatching(false)
// .producerName("cluster1-2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
//
// // producer is connected with cluster-2
// retryStrategically((test) -> topic2.getProducers().size() == 3, 10, 500);
// assertTrue(topic2.getProducers().size() == 3);
//
// // try to consume backlog messages from cluster-1
// consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
// if (persistent) {
// for (int i = 0; i < n; i++) {
// Message<byte[]> msg = consumer1.receive();
// assertEquals(msg.getData(), "test1".getBytes());
// consumer1.acknowledge(msg);
// }
// }
// // after consuming all messages, consumer should have disconnected
// // from cluster-1 and reconnect with cluster-2
// retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
// assertFalse(topic2.getSubscriptions().isEmpty());
//
// // not also create a new consumer which should also reconnect to cluster-2
// Consumer<byte[]> consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType)
// .subscriptionName("s2").subscribe();
// retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 500);
// assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty());
//
// // publish messages to cluster-2 and consume them
// for (int i = 0; i < n; i++) {
// producer1.send("test2".getBytes());
// producer2.send("test2".getBytes());
// producer3.send("test2".getBytes());
// }
// log.info("Successfully published messages by migrated producers");
// for (int i = 0; i < n * 3; i++) {
// assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
// assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test2".getBytes());
//
// }
//
// // create non-migrated topic which should connect to cluster-1
// String diffTopic = BrokerTestUtil
// .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic");
// Consumer<byte[]> consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(subType)
// .subscriptionName("s1-d").subscribe();
// Producer<byte[]> producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false)
// .producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// AbstractTopic topicDiff = (AbstractTopic) pulsar1.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
// assertNotNull(topicDiff);
// for (int i = 0; i < n; i++) {
// producerDiff.send("diff".getBytes());
// assertEquals(consumerDiff.receive(2, TimeUnit.SECONDS).getData(), "diff".getBytes());
// }
//
// // restart broker-1
// broker1.restart();
// Producer<byte[]> producer4 = client1.newProducer().topic(topicName).enableBatching(false)
// .producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// Consumer<byte[]> consumer3 = client1.newConsumer().topic(topicName).subscriptionType(subType)
// .subscriptionName("s3").subscribe();
// retryStrategically((test) -> topic2.getProducers().size() == 4, 10, 500);
// assertTrue(topic2.getProducers().size() == 4);
// retryStrategically((test) -> topic2.getSubscription("s3") != null, 10, 500);
// assertFalse(topic2.getSubscription("s3").getConsumers().isEmpty());
// for (int i = 0; i < n; i++) {
// producer4.send("test3".getBytes());
// assertEquals(consumer1.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
// assertEquals(consumer2.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
// assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes());
// }
//
// log.info("Successfully consumed messages by migrated consumers");
// }

@Test(dataProvider = "TopicsubscriptionTypes")
public void testClusterMigrationWithReplicationBacklog(boolean persistent, SubscriptionType subType) throws Exception {
Expand Down

0 comments on commit dde20bf

Please sign in to comment.