From cab79d413453c74ade9cd64187370365498728cc Mon Sep 17 00:00:00 2001 From: Natan Silnitsky Date: Tue, 13 Jun 2023 18:22:32 +0300 Subject: [PATCH] make GreyhoundNG RetryConfig serializable (so can be sent over gRpc and persisted) (#35207) * make GreyhoundNG RetryConfig serializable (so can be sent over gRpc and persisted) #pr * fix lp build * full cross-repo * Initial cross-repo check * Updating cross-repo check * Updating cross-repo check * Updating cross-repo check * Updating cross-repo check * CR changes --------- Co-authored-by: wixapiregistry <58037308+wixapiregistry@users.noreply.github.com> GitOrigin-RevId: feaf45b3c77f9074a3445bffdd6f748d3df77056 --- .../dst/greyhound/core/retry/RetryIT.scala | 17 +- .../retry/BlockingRetryRecordHandler.scala | 25 +-- .../core/consumer/retry/RetryAttempt.scala | 43 +++-- .../core/consumer/retry/RetryConfig.scala | 158 ++++++++++++++++-- .../RetryConsumerRecordHandlerTest.scala | 12 +- .../consumer/retry/ZRetryConfigTest.scala | 8 +- .../scala/GreyhoundConsumersBuilder.scala | 10 +- .../greyhound/scala/RetryConfigBuilder.scala | 2 +- 8 files changed, 204 insertions(+), 71 deletions(-) diff --git a/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/RetryIT.scala b/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/RetryIT.scala index ed382807..834be9f0 100644 --- a/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/RetryIT.scala +++ b/core/src/it/scala/com/wixpress/dst/greyhound/core/retry/RetryIT.scala @@ -6,6 +6,7 @@ import com.wixpress.dst.greyhound.core.consumer._ import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics} import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler} import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper.fixedRetryTopic +import com.wixpress.dst.greyhound.core.consumer.retry.RetryConfigForTopic.{finiteBlockingRetryConfigForTopic, nonBlockingRetryConfigForTopic} import com.wixpress.dst.greyhound.core.consumer.retry._ import com.wixpress.dst.greyhound.core.producer.{Encryptor, ProducerRecord} import com.wixpress.dst.greyhound.core.testkit.{eventuallyZ, AwaitableRef, BaseTestWithSharedEnv} @@ -40,8 +41,8 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] { done <- Promise.make[Nothing, ConsumerRecord[String, String]] retryConfig = ZRetryConfig .perTopicRetries { - case `topic` => RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(1.second :: Nil)) - case `anotherTopic` => RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(1.second :: Nil)) + case `topic` => nonBlockingRetryConfigForTopic(1.second :: Nil) + case `anotherTopic` => nonBlockingRetryConfigForTopic(1.second :: Nil) } .copy(produceEncryptor = _ => ZIO.succeed(dummyEncryptor)) @@ -76,7 +77,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] { retryConfig = ZRetryConfig .finiteBlockingRetry(100.millis, 100.millis) - .withCustomRetriesFor { case `topic2` => RetryConfigForTopic(() => 300.millis :: Nil, NonBlockingBackoffPolicy.empty) } + .withCustomRetriesFor { case `topic2` => finiteBlockingRetryConfigForTopic(300.millis :: Nil) } retryHandler = failingBlockingRecordHandlerWith(consumedValuesRef, Set(topic, topic2)).withDeserializers(StringSerde, StringSerde) _ <- RecordConsumer.make(configFor(kafka, group, retryConfig, topic, topic2), retryHandler).flatMap { _ => producer.produce(ProducerRecord(topic, "bar", Some("foo")), StringSerde, StringSerde) *> Clock.sleep(2.seconds) *> @@ -225,7 +226,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] { invocations <- Ref.make(0) done <- Promise.make[Nothing, ConsumerRecord[String, String]] retryConfig = ZRetryConfig.retryForPattern( - RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(Seq(1.second, 1.second, 1.seconds))) + nonBlockingRetryConfigForTopic(List(1.second, 1.second, 1.seconds)) ) retryHandler = failingRecordHandler(invocations, done).withDeserializers(StringSerde, StringSerde) success <- RecordConsumer @@ -258,7 +259,9 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] { group <- randomGroup originalTopicCallCount <- Ref.make[Int](0) retryTopicCallCount <- Ref.make[Int](0) - retryConfig = ZRetryConfig.blockingFollowedByNonBlockingRetry(List(1.second), NonBlockingBackoffPolicy(List(1.seconds))) + retryConfig = + ZRetryConfig + .blockingFollowedByNonBlockingRetry(FiniteBlockingBackoffPolicy(List(1.second)), NonBlockingBackoffPolicy(List(1.seconds))) retryHandler = failingBlockingNonBlockingRecordHandler(originalTopicCallCount, retryTopicCallCount, topic).withDeserializers( StringSerde, StringSerde @@ -281,9 +284,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] { for { r <- getShared TestResources(kafka, _) = r - retryConfig = ZRetryConfig.retryForPattern( - RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(Seq(1.second, 1.second, 1.seconds))) - ) + retryConfig = ZRetryConfig.retryForPattern(nonBlockingRetryConfigForTopic(List(1.second, 1.second, 1.seconds))) handler = RecordHandler { _: ConsumerRecord[String, String] => ZIO.unit }.withDeserializers(StringSerde, StringSerde) _ <- RecordConsumer .make(configFor(kafka, "group", retryConfig, "topic"), handler) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala index 2f5d59a9..5544e200 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/BlockingRetryRecordHandler.scala @@ -3,7 +3,7 @@ package com.wixpress.dst.greyhound.core.consumer.retry import java.util.concurrent.TimeUnit import com.wixpress.dst.greyhound.core.{Group, TopicPartition} import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler} -import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, IgnoringOnce, Blocking => InternalBlocking} +import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, Blocking => InternalBlocking, IgnoringOnce} import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.{BlockingRetryHandlerInvocationFailed, DoneBlockingBeforeRetry, NoRetryOnNonRetryableFailure} import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report @@ -30,7 +30,11 @@ private[retry] object BlockingRetryRecordHandler { override def handle(record: ConsumerRecord[K, V])(implicit trace: Trace): ZIO[GreyhoundMetrics with R, Nothing, LastHandleResult] = { val topicPartition = TopicPartition(record.topic, record.partition) - def pollBlockingStateWithSuspensions(record: ConsumerRecord[K, V], interval: Duration, start: Long): URIO[GreyhoundMetrics, PollResult] = { + def pollBlockingStateWithSuspensions( + record: ConsumerRecord[K, V], + interval: Duration, + start: Long + ): URIO[GreyhoundMetrics, PollResult] = { for { shouldBlock <- blockingStateResolver.resolve(record) shouldPollAgain <- @@ -73,7 +77,8 @@ private[retry] object BlockingRetryRecordHandler { case error => interval .map { interval => - report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *> blockOnErrorFor(record, interval) + report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *> + blockOnErrorFor(record, interval) } .getOrElse(ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))) } @@ -94,12 +99,13 @@ private[retry] object BlockingRetryRecordHandler { if (nonBlockingHandler.isHandlingRetryTopicMessage(group, record)) { ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)) } else { - val durationsIncludingForInvocationWithNoErrorHandling = retryConfig.blockingBackoffs(record.topic)().map(Some(_)) :+ None + val durations = retryConfig.blockingBackoffs(record.topic) + val durationsIncludingForInvocationWithNoErrorHandling = durations.map(Some(_)) :+ None for { result <- retryEvery(record, durationsIncludingForInvocationWithNoErrorHandling) { (rec, interval) => - handleAndMaybeBlockOnErrorFor(rec, interval) - } - _ <- maybeBackToStateBlocking + handleAndMaybeBlockOnErrorFor(rec, interval) + } + _ <- maybeBackToStateBlocking } yield result } } @@ -111,7 +117,7 @@ private[retry] object BlockingRetryRecordHandler { ZIO.succeed(as.iterator).flatMap { i => def loop(retryAttempt: Option[RetryAttempt]): ZIO[R, E, LastHandleResult] = if (i.hasNext) { - val nextDelay = i.next + val nextDelay = i.next val recordWithAttempt = retryAttempt.fold(record) { attempt => record.copy(headers = record.headers ++ RetryAttempt.toHeaders(attempt)) } @@ -127,8 +133,7 @@ private[retry] object BlockingRetryRecordHandler { } else ZIO.succeed(result) } - } - else ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)) + } else ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)) loop(None) } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttempt.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttempt.scala index 3fa3cba8..9de958e8 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttempt.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryAttempt.scala @@ -11,7 +11,8 @@ import java.time.Instant /** * Description of a retry attempt - * @param attempt contains which attempt is it, starting from 0 including blocking and non-blocking attempts + * @param attempt + * contains which attempt is it, starting from 0 including blocking and non-blocking attempts */ case class RetryAttempt( originalTopic: Topic, @@ -33,10 +34,10 @@ object RetryAttempt { private def toChunk(str: String): Chunk[Byte] = Chunk.fromArray(str.getBytes) def toHeaders(attempt: RetryAttempt): Headers = Headers( - RetryHeader.Submitted -> toChunk(attempt.submittedAt.toEpochMilli.toString), - RetryHeader.Backoff -> toChunk(attempt.backoff.toMillis.toString), + RetryHeader.Submitted -> toChunk(attempt.submittedAt.toEpochMilli.toString), + RetryHeader.Backoff -> toChunk(attempt.backoff.toMillis.toString), RetryHeader.OriginalTopic -> toChunk(attempt.originalTopic), - RetryHeader.RetryAttempt -> toChunk(attempt.attempt.toString), + RetryHeader.RetryAttempt -> toChunk(attempt.attempt.toString) ) private case class RetryAttemptHeaders( @@ -49,14 +50,14 @@ object RetryAttempt { private def fromHeaders(headers: Headers): Task[RetryAttemptHeaders] = for { submitted <- headers.get(RetryHeader.Submitted, instantDeserializer) - backoff <- headers.get(RetryHeader.Backoff, durationDeserializer) - topic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde) - attempt <- headers.get(RetryHeader.RetryAttempt, longDeserializer) + backoff <- headers.get(RetryHeader.Backoff, durationDeserializer) + topic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde) + attempt <- headers.get(RetryHeader.RetryAttempt, longDeserializer) } yield RetryAttemptHeaders(topic, attempt.map(_.toInt), submitted, backoff) /** @return None on infinite blocking retries */ def maxBlockingAttempts(topic: Topic, retryConfig: Option[RetryConfig]): Option[Int] = - retryConfig.map(_.blockingBackoffs(topic)()).fold(Option(0)) { + retryConfig.map(_.blockingBackoffs(topic)).fold(Option(0)) { case finite if finite.hasDefiniteSize => Some(finite.size) case _ => None } @@ -68,31 +69,29 @@ object RetryAttempt { } def extract( - headers: Headers, - topic: Topic, - group: Group, - subscription: ConsumerSubscription, - retryConfig: Option[RetryConfig], + headers: Headers, + topic: Topic, + group: Group, + subscription: ConsumerSubscription, + retryConfig: Option[RetryConfig] )(implicit trace: Trace): UIO[Option[RetryAttempt]] = { def maybeNonBlockingAttempt(hs: RetryAttemptHeaders): Option[RetryAttempt] = for { - submitted <- hs.submittedAt - backoff <- hs.backoff + submitted <- hs.submittedAt + backoff <- hs.backoff TopicAttempt(originalTopic, attempt) <- attemptNumberFromTopic(subscription, topic, hs.originalTopic, group) - blockingRetries = maxBlockingAttempts(originalTopic, retryConfig).getOrElse(0) + blockingRetries = maxBlockingAttempts(originalTopic, retryConfig).getOrElse(0) } yield RetryAttempt(originalTopic, blockingRetries + attempt, submitted, backoff) def maybeBlockingAttempt(hs: RetryAttemptHeaders): Option[RetryAttempt] = for { - submitted <- hs.submittedAt - backoff <- hs.backoff + submitted <- hs.submittedAt + backoff <- hs.backoff originalTopic <- hs.originalTopic if originalTopic == topic - attempt <- hs.attempt + attempt <- hs.attempt } yield RetryAttempt(originalTopic, attempt, submitted, backoff) - fromHeaders(headers).map { hs => - maybeNonBlockingAttempt(hs) orElse maybeBlockingAttempt(hs) - } + fromHeaders(headers).map { hs => maybeNonBlockingAttempt(hs) orElse maybeBlockingAttempt(hs) } }.catchAll(_ => ZIO.none) } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConfig.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConfig.scala index 5632f042..96dacd2c 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConfig.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConfig.scala @@ -15,8 +15,24 @@ case class RetryConfig( produceRetryBackoff: Duration = 5.seconds, produceEncryptor: ConsumerRecord[_, _] => UIO[Encryptor] = _ => ZIO.succeed(NoOpEncryptor)(zio.Trace.empty) ) { - def blockingBackoffs(topic: Topic) = - get(topic)(_.blockingBackoffs)(ifEmpty = () => Nil) + def blockingBackoffs(topic: Topic): Seq[ZDuration] = + get(topic) { + case RetryConfigForTopic(FiniteBlockingBackoffPolicy(intervals), _, _) if intervals.nonEmpty => intervals + case RetryConfigForTopic(_, InfiniteFixedBackoff(fixed), _) => Stream.continually(fixed) + case RetryConfigForTopic(_, expMax: InfiniteExponentialBackoffsMaxInterval, _) => + exponentialBackoffs(expMax.initialInterval, expMax.maximalInterval, expMax.backOffMultiplier, infiniteRetryMaxInteval = true) + case RetryConfigForTopic(_, expMult: InfiniteExponentialBackoffsMaxMultiplication, _) => + exponentialBackoffs( + expMult.initialInterval, + expMult.maxMultiplications, + expMult.backOffMultiplier, + infiniteRetryMaxInterval = true + ) + case _ => Nil + }(ifEmpty = Nil) + + def finiteBlockingBackoffs(topic: Topic) = + get(topic)(_.finiteBlockingBackoffs)(ifEmpty = FiniteBlockingBackoffPolicy.empty) def retryType(originalTopic: Topic): RetryType = get(originalTopic)(_.retryType)(ifEmpty = NoRetries) @@ -43,11 +59,46 @@ case class RetryConfig( else ifEmpty ) +} + +trait InfiniteBlockingBackoffPolicy { + def nonEmpty: Boolean +} + +case object EmptyInfiniteBlockingBackoffPolicy extends InfiniteBlockingBackoffPolicy { + override def nonEmpty: Boolean = false +} + +sealed case class InfiniteFixedBackoff(interval: ZDuration) extends InfiniteBlockingBackoffPolicy { + override def nonEmpty: Boolean = true +} + +sealed case class InfiniteExponentialBackoffsMaxInterval( + initialInterval: ZDuration, + maximalInterval: ZDuration, + backOffMultiplier: Float +) extends InfiniteBlockingBackoffPolicy { + override def nonEmpty: Boolean = true +} + +sealed case class InfiniteExponentialBackoffsMaxMultiplication( + initialInterval: ZDuration, + maxMultiplications: Int, + backOffMultiplier: Float +) extends InfiniteBlockingBackoffPolicy { + override def nonEmpty: Boolean = true +} +case class FiniteBlockingBackoffPolicy(intervals: List[ZDuration]) { + def nonEmpty = intervals.nonEmpty +} + +object FiniteBlockingBackoffPolicy { + val empty = FiniteBlockingBackoffPolicy(Nil) } case class NonBlockingBackoffPolicy( - intervals: Seq[ZDuration], + intervals: List[ZDuration], recordMutate: ProducerRecord[Chunk[Byte], Chunk[Byte]] => ProducerRecord[Chunk[Byte], Chunk[Byte]] = identity ) { def nonEmpty = intervals.nonEmpty @@ -59,22 +110,48 @@ object NonBlockingBackoffPolicy { val empty = NonBlockingBackoffPolicy(Nil) } -case class RetryConfigForTopic(blockingBackoffs: () => Seq[ZDuration], nonBlockingBackoffs: NonBlockingBackoffPolicy) { - def nonEmpty: Boolean = blockingBackoffs().nonEmpty || nonBlockingBackoffs.nonEmpty +case class RetryConfigForTopic( + finiteBlockingBackoffs: FiniteBlockingBackoffPolicy, + infiniteBlockingBackoffs: InfiniteBlockingBackoffPolicy, + nonBlockingBackoffs: NonBlockingBackoffPolicy +) { + def nonEmpty: Boolean = finiteBlockingBackoffs.nonEmpty || infiniteBlockingBackoffs.nonEmpty || nonBlockingBackoffs.nonEmpty def retryType: RetryType = - if (blockingBackoffs.apply().nonEmpty) { + if (finiteBlockingBackoffs.nonEmpty) { if (nonBlockingBackoffs.nonEmpty) BlockingFollowedByNonBlocking else Blocking - } else { + } else if (infiniteBlockingBackoffs.nonEmpty) + Blocking + else { NonBlocking } } object RetryConfigForTopic { - val empty = RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy.empty) + val empty = RetryConfigForTopic(FiniteBlockingBackoffPolicy.empty, EmptyInfiniteBlockingBackoffPolicy, NonBlockingBackoffPolicy.empty) + + def nonBlockingRetryConfigForTopic(intervals: List[ZDuration]) = + RetryConfigForTopic(FiniteBlockingBackoffPolicy.empty, EmptyInfiniteBlockingBackoffPolicy, NonBlockingBackoffPolicy(intervals)) + + def finiteBlockingRetryConfigForTopic(intervals: List[ZDuration]) = + RetryConfigForTopic(FiniteBlockingBackoffPolicy(intervals), EmptyInfiniteBlockingBackoffPolicy, NonBlockingBackoffPolicy.empty) + + def infiniteBlockingRetryConfigForTopic(interval: ZDuration) = + RetryConfigForTopic(FiniteBlockingBackoffPolicy.empty, InfiniteFixedBackoff(interval), NonBlockingBackoffPolicy.empty) + + def infiniteBlockingRetryConfigForTopic( + initialInterval: ZDuration, + maxInterval: ZDuration, + backOffMultiplier: Float + ) = + RetryConfigForTopic( + FiniteBlockingBackoffPolicy.empty, + InfiniteExponentialBackoffsMaxInterval(initialInterval, maxInterval, backOffMultiplier), + NonBlockingBackoffPolicy.empty + ) } object ZRetryConfig { @@ -82,18 +159,27 @@ object ZRetryConfig { forAllTopics( RetryConfigForTopic( nonBlockingBackoffs = NonBlockingBackoffPolicy(firstRetry :: otherRetries.toList), - blockingBackoffs = () => List.empty + finiteBlockingBackoffs = FiniteBlockingBackoffPolicy.empty, + infiniteBlockingBackoffs = EmptyInfiniteBlockingBackoffPolicy ) ) def finiteBlockingRetry(firstRetry: ZDuration, otherRetries: ZDuration*): RetryConfig = forAllTopics( - RetryConfigForTopic(blockingBackoffs = () => firstRetry :: otherRetries.toList, nonBlockingBackoffs = NonBlockingBackoffPolicy.empty) + RetryConfigForTopic( + finiteBlockingBackoffs = FiniteBlockingBackoffPolicy(firstRetry :: otherRetries.toList), + infiniteBlockingBackoffs = EmptyInfiniteBlockingBackoffPolicy, + nonBlockingBackoffs = NonBlockingBackoffPolicy.empty + ) ) def infiniteBlockingRetry(interval: ZDuration): RetryConfig = forAllTopics( - RetryConfigForTopic(blockingBackoffs = () => Stream.continually(interval), nonBlockingBackoffs = NonBlockingBackoffPolicy.empty) + RetryConfigForTopic( + infiniteBlockingBackoffs = InfiniteFixedBackoff(interval), + finiteBlockingBackoffs = FiniteBlockingBackoffPolicy.empty, + nonBlockingBackoffs = NonBlockingBackoffPolicy.empty + ) ) def exponentialBackoffBlockingRetry( @@ -101,32 +187,68 @@ object ZRetryConfig { maximalInterval: ZDuration, backOffMultiplier: Float, infiniteRetryMaxInterval: Boolean - ): RetryConfig = + ): RetryConfig = { + val (finite, infinite) = if (infiniteRetryMaxInterval) { + ( + FiniteBlockingBackoffPolicy.empty, + InfiniteExponentialBackoffsMaxInterval(initialInterval, maximalInterval, backOffMultiplier) + ) + } else { + ( + FiniteBlockingBackoffPolicy( + exponentialBackoffs(initialInterval, maximalInterval, backOffMultiplier, infiniteRetryMaxInterval).toList + ), + EmptyInfiniteBlockingBackoffPolicy + ) + } forAllTopics( RetryConfigForTopic( - blockingBackoffs = () => exponentialBackoffs(initialInterval, maximalInterval, backOffMultiplier, infiniteRetryMaxInterval), + finiteBlockingBackoffs = finite, + infiniteBlockingBackoffs = infinite, nonBlockingBackoffs = NonBlockingBackoffPolicy.empty ) ) + } def exponentialBackoffBlockingRetry( initialInterval: ZDuration, maxMultiplications: Int, backOffMultiplier: Float, infiniteRetryMaxInterval: Boolean - ): RetryConfig = + ): RetryConfig = { + val (finite, infinite) = if (infiniteRetryMaxInterval) { + ( + FiniteBlockingBackoffPolicy.empty, + InfiniteExponentialBackoffsMaxMultiplication(initialInterval, maxMultiplications, backOffMultiplier) + ) + } else { + ( + FiniteBlockingBackoffPolicy( + exponentialBackoffs(initialInterval, maxMultiplications, backOffMultiplier, infiniteRetryMaxInterval).toList + ), + EmptyInfiniteBlockingBackoffPolicy + ) + } forAllTopics( RetryConfigForTopic( - blockingBackoffs = () => exponentialBackoffs(initialInterval, maxMultiplications, backOffMultiplier, infiniteRetryMaxInterval), + finiteBlockingBackoffs = finite, + infiniteBlockingBackoffs = infinite, nonBlockingBackoffs = NonBlockingBackoffPolicy.empty ) ) + } def blockingFollowedByNonBlockingRetry( - blockingBackoffs: NonEmptyList[ZDuration], + blockingBackoffs: FiniteBlockingBackoffPolicy, nonBlockingBackoffs: NonBlockingBackoffPolicy ): RetryConfig = - forAllTopics(RetryConfigForTopic(blockingBackoffs = () => blockingBackoffs, nonBlockingBackoffs = nonBlockingBackoffs)) + forAllTopics( + RetryConfigForTopic( + finiteBlockingBackoffs = blockingBackoffs, + nonBlockingBackoffs = nonBlockingBackoffs, + infiniteBlockingBackoffs = EmptyInfiniteBlockingBackoffPolicy + ) + ) def perTopicRetries(configs: PartialFunction[Topic, RetryConfigForTopic]) = RetryConfig(configs, None) @@ -178,7 +300,7 @@ object RetryConfig { def blockingFollowedByNonBlockingRetry(blockingBackoffs: NonEmptyList[Duration], nonBlockingBackoffs: List[Duration]): RetryConfig = ZRetryConfig.blockingFollowedByNonBlockingRetry( - blockingBackoffs = blockingBackoffs.map(ZDuration.fromScala), + blockingBackoffs = FiniteBlockingBackoffPolicy(blockingBackoffs.map(ZDuration.fromScala)), nonBlockingBackoffs = NonBlockingBackoffPolicy(nonBlockingBackoffs.map(ZDuration.fromScala)) ) } diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala index 0797ba25..ef9fccb1 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/RetryConsumerRecordHandlerTest.scala @@ -6,6 +6,7 @@ import com.wixpress.dst.greyhound.core._ import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.Topics import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler} import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, Blocking => InternalBlocking, IgnoringAll, IgnoringOnce} +import com.wixpress.dst.greyhound.core.consumer.retry.RetryConfigForTopic.nonBlockingRetryConfigForTopic import com.wixpress.dst.greyhound.core.consumer.retry.RetryConsumerRecordHandlerTest.{offset, partition, _} import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.{BlockingIgnoredForAllFor, BlockingIgnoredOnceFor, BlockingRetryHandlerInvocationFailed, NoRetryOnNonRetryableFailure} import com.wixpress.dst.greyhound.core.producer.{ProducerError, ProducerRecord} @@ -311,7 +312,10 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics group, failingHandlerWith(handleCountRef), ZRetryConfig - .blockingFollowedByNonBlockingRetry(List(10.millis, 500.millis), NonBlockingBackoffPolicy(List(1.second))), + .blockingFollowedByNonBlockingRetry( + FiniteBlockingBackoffPolicy(List(10.millis, 500.millis)), + NonBlockingBackoffPolicy(List(1.second)) + ), producer, Topics(Set(topic)), blockingState, @@ -336,9 +340,7 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics topic <- randomTopicName otherTopic <- randomTopicName blockingState <- Ref.make[Map[BlockingTarget, BlockingState]](Map.empty) - policy = ZRetryConfig.perTopicRetries { - case `otherTopic` => RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(1.second :: Nil)) - } + policy = ZRetryConfig.perTopicRetries { case `otherTopic` => nonBlockingRetryConfigForTopic(1.second :: Nil) } retryHandler = RetryRecordHandler.withRetries( group, failingHandler, @@ -410,7 +412,7 @@ class RetryConsumerRecordHandlerTest extends BaseTest[TestClock with TestMetrics retryHelper, awaitShutdown = _ => ZIO.succeed(awaitShutdown) ) - val headers = RetryAttempt.toHeaders(RetryAttempt(topic, 0, now, 3.seconds)) + val headers = RetryAttempt.toHeaders(RetryAttempt(topic, 0, now, 3.seconds)) for { key <- bytes value <- bytes diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/ZRetryConfigTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/ZRetryConfigTest.scala index ac6dca03..6535207d 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/ZRetryConfigTest.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/retry/ZRetryConfigTest.scala @@ -29,12 +29,12 @@ class ZRetryConfigTest extends SpecificationWithJUnit { val absMult = abs(params.mult) val safeInit = if (init < 10) 10 else init - for (i <- 0 to max) yield { backoffs()(i) } mustEqual (pow(1 + absMult, i) * safeInit).toLong.millis + for (i <- 0 to max) yield { backoffs(i) } mustEqual (pow(1 + absMult, i) * safeInit).toLong.millis val maxMult = math.max(0, max) val lastDurationToCheck = abs(max + 1) * 2 val firstDurationToCheck = math.max(0, max + 1) for (i <- firstDurationToCheck to lastDurationToCheck) - yield backoffs()(i) mustEqual (pow(1 + absMult, maxMult) * safeInit).toLong.millis + yield backoffs(i) mustEqual (pow(1 + absMult, maxMult) * safeInit).toLong.millis } } @@ -57,11 +57,11 @@ class ZRetryConfigTest extends SpecificationWithJUnit { val absMult = abs(params.mult) val safeInit = if (init < 10) 10 else init - for (i <- 0 to maxMult) yield { backoffs()(i) } mustEqual (pow(1 + absMult, i) * safeInit).toLong.millis + for (i <- 0 to maxMult) yield { backoffs(i) } mustEqual (pow(1 + absMult, i) * safeInit).toLong.millis val lastDurationToCheck = abs(maxMult + 1) * 2 val firstDurationToCheck = math.max(0, maxMult + 1) for (i <- firstDurationToCheck to lastDurationToCheck) - yield backoffs()(i) mustEqual (pow(1 + absMult, maxMult) * safeInit).toLong.millis + yield backoffs(i) mustEqual (pow(1 + absMult, maxMult) * safeInit).toLong.millis } } } diff --git a/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/GreyhoundConsumersBuilder.scala b/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/GreyhoundConsumersBuilder.scala index f3924695..484acc95 100644 --- a/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/GreyhoundConsumersBuilder.scala +++ b/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/GreyhoundConsumersBuilder.scala @@ -5,9 +5,9 @@ import com.wixpress.dst.greyhound.core import com.wixpress.dst.greyhound.core.consumer.batched.{BatchConsumer, BatchConsumerConfig, BatchEventLoopConfig} import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.Topics import com.wixpress.dst.greyhound.core.consumer.domain.{RecordHandler => CoreRecordHandler} -import com.wixpress.dst.greyhound.core.consumer.retry.{NonBlockingBackoffPolicy, RetryConfig => CoreRetryConfig, RetryConfigForTopic} +import com.wixpress.dst.greyhound.core.consumer.retry.{EmptyInfiniteBlockingBackoffPolicy, FiniteBlockingBackoffPolicy, InfiniteBlockingBackoffPolicy, NonBlockingBackoffPolicy, RetryConfigForTopic, RetryConfig => CoreRetryConfig} import com.wixpress.dst.greyhound.core.consumer.{RecordConsumer, RecordConsumerConfig} -import com.wixpress.dst.greyhound.core.{consumer, Group, NonEmptySet, Topic} +import com.wixpress.dst.greyhound.core.{Group, NonEmptySet, Topic, consumer} import com.wixpress.dst.greyhound.future.GreyhoundRuntime import com.wixpress.dst.greyhound.future.GreyhoundRuntime.Env import zio._ @@ -138,7 +138,11 @@ class GreyhoundConsumersBuilder(val config: GreyhoundConfig) { retryConfig.map(config => { val forTopic = - RetryConfigForTopic(() => config.blockingBackoffs().asScala, NonBlockingBackoffPolicy(config.nonBlockingBackoffs.asScala)) + RetryConfigForTopic( + FiniteBlockingBackoffPolicy(config.blockingBackoffs().asScala.toList), + EmptyInfiniteBlockingBackoffPolicy, + NonBlockingBackoffPolicy(config.nonBlockingBackoffs.asScala.toList) + ) CoreRetryConfig({ case _ => forTopic }, None) }) diff --git a/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/RetryConfigBuilder.scala b/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/RetryConfigBuilder.scala index bd0f5538..795844b1 100644 --- a/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/RetryConfigBuilder.scala +++ b/java-interop/src/main/java/com/wixpress/dst/greyhound/scala/RetryConfigBuilder.scala @@ -44,7 +44,7 @@ object RetryConfigBuilder { } private def fromCoreRetryConfig(coreRetryConfig: com.wixpress.dst.greyhound.core.consumer.retry.RetryConfig): RetryConfig = { - val blocking: util.List[Duration] = seqAsJavaList(coreRetryConfig.blockingBackoffs("").apply) + val blocking: util.List[Duration] = seqAsJavaList(coreRetryConfig.blockingBackoffs("")) val nonBlocking: util.List[Duration] = seqAsJavaList(coreRetryConfig.nonBlockingBackoffs("").intervals) new RetryConfig(blocking, nonBlocking) }