From a0930b4b00b7b86f57c34fd80f5a642986559596 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Tue, 7 May 2024 10:35:23 -0700 Subject: [PATCH] [improve] Retry re-validating ResourceLock with backoff after errors (#22617) (cherry picked from commit b0542b3312ef915e7467d9550d19a3e7ca6aca99) --- .../broker/service/AbstractReplicator.java | 2 +- .../PulsarMetadataEventSynchronizer.java | 2 +- .../broker/service/TopicPoliciesService.java | 4 +- ...PersistentDispatcherMultipleConsumers.java | 2 +- ...sistentDispatcherSingleActiveConsumer.java | 3 +- .../persistent/PersistentReplicator.java | 2 +- .../pendingack/impl/PendingAckHandleImpl.java | 2 +- .../common/naming/NamespaceBundleFactory.java | 2 +- ...temTopicBasedTopicPoliciesServiceTest.java | 4 +- .../client/impl/ConnectionHandlerTest.java | 2 + .../pulsar/client/impl/RetryUtilTest.java | 2 + .../client/impl/BinaryProtoLookupService.java | 2 + .../pulsar/client/impl/ConnectionHandler.java | 1 + .../pulsar/client/impl/ConsumerImpl.java | 2 + .../impl/PatternMultiTopicsConsumerImpl.java | 2 + .../pulsar/client/impl/ProducerImpl.java | 1 + .../pulsar/client/impl/PulsarClientImpl.java | 2 + .../pulsar/client/impl/TopicListWatcher.java | 1 + .../impl/TransactionMetaStoreHandler.java | 2 + .../apache/pulsar/client/util/RetryUtil.java | 2 +- .../pulsar/client/impl/ConsumerImplTest.java | 1 + .../apache/pulsar/common/util}/Backoff.java | 2 +- .../pulsar/common/util}/BackoffBuilder.java | 5 ++- .../pulsar/common/util}/BackoffTest.java | 2 +- .../coordination/impl/LockManagerImpl.java | 10 ++--- .../coordination/impl/ResourceLockImpl.java | 37 +++++++++++++++++-- .../pulsar/metadata/LockManagerTest.java | 31 ++++++++++++++++ 27 files changed, 106 insertions(+), 24 deletions(-) rename {pulsar-client/src/main/java/org/apache/pulsar/client/impl => pulsar-common/src/main/java/org/apache/pulsar/common/util}/Backoff.java (99%) rename {pulsar-client/src/main/java/org/apache/pulsar/client/impl => pulsar-common/src/main/java/org/apache/pulsar/common/util}/BackoffBuilder.java (91%) rename {pulsar-client/src/test/java/org/apache/pulsar/client/impl => pulsar-common/src/test/java/org/apache/pulsar/common/util}/BackoffTest.java (99%) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 394fad21ae6dc..869a4bc81d310 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -36,10 +36,10 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.StringInterner; import org.slf4j.Logger; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java index 8727d1ce345f9..8b2ebf200537e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java @@ -37,8 +37,8 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataEvent; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index aa3a6aaeff29f..2a222e28e2aea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -24,13 +24,13 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; -import org.apache.pulsar.client.impl.Backoff; -import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.jetbrains.annotations.NotNull; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 9d0dba798ad88..2f99d57aa4532 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -68,11 +68,11 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.stats.TopicMetricBean; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 530230eeb5c12..c306467e986ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -51,8 +51,9 @@ import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.impl.Backoff; + import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.compaction.CompactedTopicUtils; import org.apache.pulsar.compaction.Compactor; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index b9f6c0b4fe1b0..2aa8e9b748c32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -56,7 +56,6 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; @@ -65,6 +64,7 @@ import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 5ed271c6fd414..9d07af4d26c44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -59,11 +59,11 @@ import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TxnID; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; import org.apache.pulsar.common.stats.PositionInPendingAckStats; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RecoverTimeRecord; import org.apache.pulsar.common.util.collections.BitSetRecyclable; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index c136ed42f8119..2b285cbb0e2ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -51,10 +51,10 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.resources.LocalPoliciesResources; import org.apache.pulsar.broker.resources.PulsarResources; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.policies.data.loadbalancer.BundleData; import org.apache.pulsar.stats.CacheMetricsCollector; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 16eda4e86c015..4166513496db0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -46,8 +46,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.Backoff; -import org.apache.pulsar.client.impl.BackoffBuilder; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java index f29d62db5f480..add0a5f3eefe4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java @@ -32,6 +32,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java index 604c468b1de45..58d1709cd1a01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.client.impl; import org.apache.pulsar.client.util.RetryUtil; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.Test; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 8ceb8e44975c8..9634c60f09a27 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -45,6 +45,8 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 77a85c255a8f8..6403d48d7be0c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.HandlerState.State; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.Backoff; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 1e6dc401f93b7..bcee5c38f9a31 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -117,6 +117,8 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.CompletableFutureCancellationHandler; import org.apache.pulsar.common.util.ExceptionHandler; import org.apache.pulsar.common.util.FutureUtil; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 4d179f7d914c2..ec7ff7930c0ac 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -43,6 +43,8 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index a41de6d10512d..426ac5009b59e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -91,6 +91,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RelativeTimeUtil; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index fdabb5fa8cfa5..b92e039e5facd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -84,6 +84,8 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java index 86adf69f06e0f..4e635e0d2e8d2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java @@ -31,6 +31,7 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.BackoffBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 0b5174a015118..2a43ca20beb38 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -46,6 +46,8 @@ import org.apache.pulsar.common.api.proto.Subscription; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java index 93501d7b6c18b..912cb7d7c5832 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java @@ -22,7 +22,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.common.util.Backoff; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index 9995246c175e1..0c47d17098eb9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -49,6 +49,7 @@ import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.ScheduledExecutorProvider; +import org.apache.pulsar.common.util.Backoff; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java similarity index 99% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java index daaf349940035..4eab85f3c41be 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.impl; +package org.apache.pulsar.common.util; import com.google.common.annotations.VisibleForTesting; import java.time.Clock; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java similarity index 91% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java index 9913393fa9aa9..69b390300815b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BackoffBuilder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.impl; +package org.apache.pulsar.common.util; import java.time.Clock; import java.util.concurrent.TimeUnit; @@ -32,8 +32,11 @@ public class BackoffBuilder { public BackoffBuilder() { this.initial = 0; + this.unitInitial = TimeUnit.MILLISECONDS; this.max = 0; + this.unitMax = TimeUnit.MILLISECONDS; this.mandatoryStop = 0; + this.unitMandatoryStop = TimeUnit.MILLISECONDS; this.clock = Clock.systemDefaultZone(); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java similarity index 99% rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java index 7f13acb769492..b3786236a70ef 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.impl; +package org.apache.pulsar.common.util; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java index 4da6b7998a0c4..b6b5c57ccea39 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.java @@ -27,7 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.util.FutureUtil; @@ -52,7 +52,7 @@ class LockManagerImpl implements LockManager { private final MetadataCache cache; private final MetadataSerde serde; private final FutureUtil.Sequencer sequencer; - private final ExecutorService executor; + private final ScheduledExecutorService executor; private enum State { Ready, Closed @@ -60,13 +60,13 @@ private enum State { private State state = State.Ready; - LockManagerImpl(MetadataStoreExtended store, Class clazz, ExecutorService executor) { + LockManagerImpl(MetadataStoreExtended store, Class clazz, ScheduledExecutorService executor) { this(store, new JSONMetadataSerdeSimpleType<>( TypeFactory.defaultInstance().constructSimpleType(clazz, null)), executor); } - LockManagerImpl(MetadataStoreExtended store, MetadataSerde serde, ExecutorService executor) { + LockManagerImpl(MetadataStoreExtended store, MetadataSerde serde, ScheduledExecutorService executor) { this.store = store; this.cache = store.getMetadataCache(serde); this.serde = serde; @@ -83,7 +83,7 @@ public CompletableFuture> readLock(String path) { @Override public CompletableFuture> acquireLock(String path, T value) { - ResourceLockImpl lock = new ResourceLockImpl<>(store, serde, path); + ResourceLockImpl lock = new ResourceLockImpl<>(store, serde, path, executor); CompletableFuture> result = new CompletableFuture<>(); lock.acquire(value).thenRun(() -> { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java index 93c994b2436b9..692f224594cae 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java @@ -21,8 +21,13 @@ import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.pulsar.common.util.Backoff; +import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataSerde; @@ -44,7 +49,10 @@ public class ResourceLockImpl implements ResourceLock { private long version; private final CompletableFuture expiredFuture; private boolean revalidateAfterReconnection = false; + private final Backoff backoff; private final FutureUtil.Sequencer sequencer; + private final ScheduledExecutorService executor; + private ScheduledFuture revalidateTask; private enum State { Init, @@ -55,7 +63,8 @@ private enum State { private State state; - public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, String path) { + ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, String path, + ScheduledExecutorService executor) { this.store = store; this.serde = serde; this.path = path; @@ -63,6 +72,11 @@ public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde serde, Str this.expiredFuture = new CompletableFuture<>(); this.sequencer = FutureUtil.Sequencer.create(); this.state = State.Init; + this.executor = executor; + this.backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMax(60, TimeUnit.SECONDS) + .create(); } @Override @@ -93,6 +107,10 @@ public synchronized CompletableFuture release() { } state = State.Releasing; + if (revalidateTask != null) { + revalidateTask.cancel(true); + } + CompletableFuture result = new CompletableFuture<>(); store.delete(path, Optional.of(version)) @@ -210,8 +228,15 @@ synchronized CompletableFuture revalidateIfNeededAfterReconnection() { * This method is thread-safe and it will perform multiple re-validation operations in turn. */ synchronized CompletableFuture silentRevalidateOnce() { + if (state != State.Valid) { + return CompletableFuture.completedFuture(null); + } + return sequencer.sequential(() -> revalidate(value)) - .thenRun(() -> log.info("Successfully revalidated the lock on {}", path)) + .thenRun(() -> { + log.info("Successfully revalidated the lock on {}", path); + backoff.reset(); + }) .exceptionally(ex -> { synchronized (ResourceLockImpl.this) { Throwable realCause = FutureUtil.unwrapCompletionException(ex); @@ -225,8 +250,12 @@ synchronized CompletableFuture silentRevalidateOnce() { // Continue assuming we hold the lock, until we can revalidate it, either // on Reconnected or SessionReestablished events. revalidateAfterReconnection = true; - log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path, - realCause.getMessage()); + + long delayMillis = backoff.next(); + log.warn("Failed to revalidate the lock at {}: {} - Retrying in {} seconds", path, + realCause.getMessage(), delayMillis / 1000.0); + revalidateTask = + executor.schedule(this::silentRevalidateOnce, delayMillis, TimeUnit.MILLISECONDS); } } return null; diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java index 05e6d4a3845e2..ebd60bad5507d 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java @@ -35,6 +35,7 @@ import java.util.function.Supplier; import lombok.Cleanup; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException; @@ -352,4 +353,34 @@ public void testCleanUpStateWhenRevalidationGotLockBusy(String provider, Supplie } }); } + + @Test(dataProvider = "impl") + public void lockDeletedAndReacquired(String provider, Supplier urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + MetadataCache cache = store.getMetadataCache(String.class); + + @Cleanup + CoordinationService coordinationService = new CoordinationServiceImpl(store); + + @Cleanup + LockManager lockManager = coordinationService.getLockManager(String.class); + + String key = newKey(); + ResourceLock lock = lockManager.acquireLock(key, "lock").join(); + assertEquals(lock.getValue(), "lock"); + var res = cache.get(key).join(); + assertTrue(res.isPresent()); + assertEquals(res.get(), "lock"); + + store.delete(key, Optional.empty()).join(); + + Awaitility.await().untilAsserted(() -> { + Optional val = store.get(key).join(); + assertTrue(val.isPresent()); + assertFalse(lock.getLockExpiredFuture().isDone()); + }); + } }