From eefc51712c97515e5913f6e5fceb4e98f5cd3b18 Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Tue, 19 Sep 2023 11:47:44 +0800 Subject: [PATCH] [fix] [auto-recovery] Fix pulsar ledger auditor dead lock problem. (#21181) --- .../PulsarLedgerAuditorManager.java | 13 ++ .../replication/AuditorPeriodicCheckTest.java | 8 +- .../replication/AutoRecoveryMainTest.java | 201 ++++++++++++++++++ 3 files changed, 218 insertions(+), 4 deletions(-) create mode 100644 pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java index bc35380fec19c..d664ecdcd2016 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerAuditorManager.java @@ -27,6 +27,7 @@ import org.apache.pulsar.metadata.api.coordination.LeaderElection; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; @Slf4j @@ -38,6 +39,7 @@ class PulsarLedgerAuditorManager implements LedgerAuditorManager { private final LeaderElection leaderElection; private LeaderElectionState leaderElectionState; private String bookieId; + private boolean sessionExpired = false; PulsarLedgerAuditorManager(MetadataStoreExtended store, String ledgersRoot) { this.coordinationService = new CoordinationServiceImpl(store); @@ -47,6 +49,14 @@ class PulsarLedgerAuditorManager implements LedgerAuditorManager { this.leaderElection = coordinationService.getLeaderElection(String.class, electionPath, this::handleStateChanges); this.leaderElectionState = LeaderElectionState.NoLeader; + store.registerSessionListener(event -> { + if (SessionEvent.SessionLost == event) { + synchronized (this) { + sessionExpired = true; + notifyAll(); + } + } + }); } private void handleStateChanges(LeaderElectionState state) { @@ -71,6 +81,9 @@ public void tryToBecomeAuditor(String bookieId, Consumer listener) while (true) { try { synchronized (this) { + if (sessionExpired) { + throw new IllegalStateException("Zookeeper session expired, give up to become auditor."); + } if (leaderElectionState == LeaderElectionState.Leading) { return; } else { diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java index c761d46c62266..901361dd3a277 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java @@ -44,8 +44,8 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterTest; -import org.testng.annotations.BeforeTest; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; /** @@ -68,7 +68,7 @@ public AuditorPeriodicCheckTest() throws Exception { Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver"); } - @BeforeTest + @BeforeMethod @Override public void setUp() throws Exception { super.setUp(); @@ -99,7 +99,7 @@ public void setUp() throws Exception { driver.initialize(serverConfiguration, NullStatsLogger.INSTANCE); } - @AfterTest + @AfterMethod @Override public void tearDown() throws Exception { if (null != driver) { diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java new file mode 100644 index 0000000000000..d12ee177ece69 --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.replication; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.AssertJUnit.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.util.TestUtils; +import org.apache.pulsar.metadata.bookkeeper.PulsarLedgerManagerFactory; +import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Test the AuditorPeer. + */ +public class AutoRecoveryMainTest extends BookKeeperClusterTestCase { + + public AutoRecoveryMainTest() throws Exception { + super(3); + Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver"); + Class.forName("org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver"); + } + + @BeforeMethod + @Override + public void setUp() throws Exception { + super.setUp(); + } + + /** + * Test that, if an autorecovery looses its ZK connection/session it will + * shutdown. + */ + @Test + public void testAutoRecoverySessionLoss() throws Exception { + confByIndex(0).setMetadataServiceUri( + zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", "")); + confByIndex(1).setMetadataServiceUri( + zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", "")); + confByIndex(2).setMetadataServiceUri( + zkUtil.getMetadataServiceUri().replaceAll("zk://", "metadata-store:").replaceAll("/ledgers", "")); + /* + * initialize three AutoRecovery instances. + */ + AutoRecoveryMain main1 = new AutoRecoveryMain(confByIndex(0)); + AutoRecoveryMain main2 = new AutoRecoveryMain(confByIndex(1)); + AutoRecoveryMain main3 = new AutoRecoveryMain(confByIndex(2)); + + /* + * start main1, make sure all the components are started and main1 is + * the current Auditor + */ + PulsarMetadataClientDriver pulsarMetadataClientDriver1 = startAutoRecoveryMain(main1); + ZooKeeper zk1 = getZk(pulsarMetadataClientDriver1); + + // Wait until auditor gets elected + for (int i = 0; i < 10; i++) { + try { + if (main1.auditorElector.getCurrentAuditor() != null) { + break; + } else { + Thread.sleep(1000); + } + } catch (IOException e) { + Thread.sleep(1000); + } + } + BookieId currentAuditor = main1.auditorElector.getCurrentAuditor(); + assertNotNull(currentAuditor); + Auditor auditor1 = main1.auditorElector.getAuditor(); + assertEquals("Current Auditor should be AR1", currentAuditor, BookieImpl.getBookieId(confByIndex(0))); + Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertNotNull(auditor1); + assertTrue("Auditor of AR1 should be running", auditor1.isRunning()); + }); + + + /* + * start main2 and main3 + */ + PulsarMetadataClientDriver pulsarMetadataClientDriver2 = startAutoRecoveryMain(main2); + ZooKeeper zk2 = getZk(pulsarMetadataClientDriver2); + + PulsarMetadataClientDriver pulsarMetadataClientDriver3 = startAutoRecoveryMain(main3); + ZooKeeper zk3 = getZk(pulsarMetadataClientDriver3); + + + /* + * make sure AR1 is still the current Auditor and AR2's and AR3's + * auditors are not running. + */ + assertEquals("Current Auditor should still be AR1", currentAuditor, BookieImpl.getBookieId(confByIndex(0))); + Awaitility.await().untilAsserted(() -> { + assertTrue("AR2's Auditor should not be running", (main2.auditorElector.getAuditor() == null + || !main2.auditorElector.getAuditor().isRunning())); + assertTrue("AR3's Auditor should not be running", (main3.auditorElector.getAuditor() == null + || !main3.auditorElector.getAuditor().isRunning())); + }); + + + /* + * expire zk2 and zk1 sessions. + */ + zkUtil.expireSession(zk2); + zkUtil.expireSession(zk1); + + /* + * wait for some time for all the components of AR1 and AR2 are + * shutdown. + */ + for (int i = 0; i < 10; i++) { + if (!main1.auditorElector.isRunning() && !main1.replicationWorker.isRunning() + && !main1.isAutoRecoveryRunning() && !main2.auditorElector.isRunning() + && !main2.replicationWorker.isRunning() && !main2.isAutoRecoveryRunning()) { + break; + } + Thread.sleep(1000); + } + + /* + * the AR3 should be current auditor. + */ + currentAuditor = main3.auditorElector.getCurrentAuditor(); + assertEquals("Current Auditor should be AR3", currentAuditor, BookieImpl.getBookieId(confByIndex(2))); + Awaitility.await().untilAsserted(() -> { + assertNotNull(main3.auditorElector.getAuditor()); + assertTrue("Auditor of AR3 should be running", main3.auditorElector.getAuditor().isRunning()); + }); + + Awaitility.waitAtMost(100, TimeUnit.SECONDS).untilAsserted(() -> { + /* + * since AR3 is current auditor, AR1's auditor should not be running + * anymore. + */ + assertFalse("AR1's auditor should not be running", auditor1.isRunning()); + + /* + * components of AR2 and AR3 should not be running since zk1 and zk2 + * sessions are expired. + */ + assertFalse("Elector1 should have shutdown", main1.auditorElector.isRunning()); + assertFalse("RW1 should have shutdown", main1.replicationWorker.isRunning()); + assertFalse("AR1 should have shutdown", main1.isAutoRecoveryRunning()); + assertFalse("Elector2 should have shutdown", main2.auditorElector.isRunning()); + assertFalse("RW2 should have shutdown", main2.replicationWorker.isRunning()); + assertFalse("AR2 should have shutdown", main2.isAutoRecoveryRunning()); + }); + + } + + /* + * start autoRecoveryMain and make sure all its components are running and + * myVote node is existing + */ + PulsarMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain autoRecoveryMain) throws Exception { + autoRecoveryMain.start(); + PulsarMetadataClientDriver pulsarMetadataClientDriver = (PulsarMetadataClientDriver) autoRecoveryMain.bkc + .getMetadataClientDriver(); + TestUtils.assertEventuallyTrue("autoRecoveryMain components should be running", + () -> autoRecoveryMain.auditorElector.isRunning() + && autoRecoveryMain.replicationWorker.isRunning() && autoRecoveryMain.isAutoRecoveryRunning()); + return pulsarMetadataClientDriver; + } + + private ZooKeeper getZk(PulsarMetadataClientDriver pulsarMetadataClientDriver) throws Exception { + PulsarLedgerManagerFactory pulsarLedgerManagerFactory = + (PulsarLedgerManagerFactory) pulsarMetadataClientDriver.getLedgerManagerFactory(); + Field field = pulsarLedgerManagerFactory.getClass().getDeclaredField("store"); + field.setAccessible(true); + ZKMetadataStore zkMetadataStore = (ZKMetadataStore) field.get(pulsarLedgerManagerFactory); + return zkMetadataStore.getZkClient(); + } +}