make GreyhoundNG RetryConfig serializable (so can be sent over gRPC and persisted) (#35207)

* make GreyhoundNG RetryConfig serializable (so can be sent over gRPC and persisted) #pr

* fix lp build

* full cross-repo

* Initial cross-repo check

* Updating cross-repo check

* Updating cross-repo check

* Updating cross-repo check

* Updating cross-repo check

* CR changes

---------

Co-authored-by: wixapiregistry <[email protected]>
GitOrigin-RevId: feaf45b3c77f9074a3445bffdd6f748d3df77056
2 people authored and wix-oss committed Sep 3, 2023
1 parent dd1aee1 commit cab79d4
Showing 8 changed files with 204 additions and 71 deletions.
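The pattern running through the hunks below is the removal of function values from the retry configuration: call sites stop building RetryConfigForTopic(() => ..., NonBlockingBackoffPolicy(...)) with a thunk and use the new nonBlockingRetryConfigForTopic / finiteBlockingRetryConfigForTopic helpers instead, blocking intervals are wrapped in FiniteBlockingBackoffPolicy, and blockingBackoffs(topic) now returns the intervals directly rather than a () => List. With only data left inside, the config can be serialized, sent over gRPC and persisted, as the commit message says. A minimal sketch of that shape with simplified stand-in types (the real definitions live in com.wixpress.dst.greyhound.core.consumer.retry and may differ; the helper bodies here are an assumption inferred from how the call sites change):

    import scala.concurrent.duration._

    // Stand-in policy types: plain data, so a config built from them can be
    // serialized, unlike one that captures a `() => List[Duration]` thunk.
    case class FiniteBlockingBackoffPolicy(intervals: List[FiniteDuration])
    case class NonBlockingBackoffPolicy(intervals: List[FiniteDuration])

    // Stand-in per-topic config mirroring the two-part shape used in this diff.
    case class RetryConfigForTopic(
      blockingBackoffs: FiniteBlockingBackoffPolicy,
      nonBlockingBackoffs: NonBlockingBackoffPolicy
    )

    object RetryConfigForTopic {
      // Same names as the helpers imported in the test below; bodies are assumed.
      def nonBlockingRetryConfigForTopic(intervals: List[FiniteDuration]): RetryConfigForTopic =
        RetryConfigForTopic(FiniteBlockingBackoffPolicy(Nil), NonBlockingBackoffPolicy(intervals))

      def finiteBlockingRetryConfigForTopic(intervals: List[FiniteDuration]): RetryConfigForTopic =
        RetryConfigForTopic(FiniteBlockingBackoffPolicy(intervals), NonBlockingBackoffPolicy(Nil))
    }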
@@ -6,6 +6,7 @@ import com.wixpress.dst.greyhound.core.consumer._
import com.wixpress.dst.greyhound.core.consumer.domain.ConsumerSubscription.{TopicPattern, Topics}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler}
import com.wixpress.dst.greyhound.core.consumer.retry.NonBlockingRetryHelper.fixedRetryTopic
+import com.wixpress.dst.greyhound.core.consumer.retry.RetryConfigForTopic.{finiteBlockingRetryConfigForTopic, nonBlockingRetryConfigForTopic}
import com.wixpress.dst.greyhound.core.consumer.retry._
import com.wixpress.dst.greyhound.core.producer.{Encryptor, ProducerRecord}
import com.wixpress.dst.greyhound.core.testkit.{eventuallyZ, AwaitableRef, BaseTestWithSharedEnv}
@@ -40,8 +41,8 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
done <- Promise.make[Nothing, ConsumerRecord[String, String]]
retryConfig = ZRetryConfig
.perTopicRetries {
-case `topic` => RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(1.second :: Nil))
-case `anotherTopic` => RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(1.second :: Nil))
+case `topic` => nonBlockingRetryConfigForTopic(1.second :: Nil)
+case `anotherTopic` => nonBlockingRetryConfigForTopic(1.second :: Nil)
}
.copy(produceEncryptor = _ => ZIO.succeed(dummyEncryptor))
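For illustration, a hedged, self-contained sketch of a per-topic mapping like the one in the hunk above, using a local stand-in instead of Greyhound's RetryConfigForTopic; the backticked patterns match against the existing topic values rather than binding new names (topic names here are hypothetical):

    import scala.concurrent.duration._

    object PerTopicRetryExample {
      // Local stand-in for the per-topic retry config used above.
      case class TopicRetry(nonBlockingBackoffs: List[FiniteDuration])

      // Hypothetical topic names, playing the role of `topic` and `anotherTopic`.
      val topic        = "some-topic"
      val anotherTopic = "another-topic"

      // One non-blocking retry after 1 second per topic, mirroring
      // nonBlockingRetryConfigForTopic(1.second :: Nil) in the hunk above.
      val perTopic: PartialFunction[String, TopicRetry] = {
        case `topic`        => TopicRetry(1.second :: Nil)
        case `anotherTopic` => TopicRetry(1.second :: Nil)
      }
    }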

@@ -76,7 +77,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
retryConfig =
ZRetryConfig
.finiteBlockingRetry(100.millis, 100.millis)
-.withCustomRetriesFor { case `topic2` => RetryConfigForTopic(() => 300.millis :: Nil, NonBlockingBackoffPolicy.empty) }
+.withCustomRetriesFor { case `topic2` => finiteBlockingRetryConfigForTopic(300.millis :: Nil) }
retryHandler = failingBlockingRecordHandlerWith(consumedValuesRef, Set(topic, topic2)).withDeserializers(StringSerde, StringSerde)
_ <- RecordConsumer.make(configFor(kafka, group, retryConfig, topic, topic2), retryHandler).flatMap { _ =>
producer.produce(ProducerRecord(topic, "bar", Some("foo")), StringSerde, StringSerde) *> Clock.sleep(2.seconds) *>
@@ -225,7 +226,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
invocations <- Ref.make(0)
done <- Promise.make[Nothing, ConsumerRecord[String, String]]
retryConfig = ZRetryConfig.retryForPattern(
-RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(Seq(1.second, 1.second, 1.seconds)))
+nonBlockingRetryConfigForTopic(List(1.second, 1.second, 1.seconds))
)
retryHandler = failingRecordHandler(invocations, done).withDeserializers(StringSerde, StringSerde)
success <- RecordConsumer
@@ -258,7 +259,9 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
group <- randomGroup
originalTopicCallCount <- Ref.make[Int](0)
retryTopicCallCount <- Ref.make[Int](0)
-retryConfig = ZRetryConfig.blockingFollowedByNonBlockingRetry(List(1.second), NonBlockingBackoffPolicy(List(1.seconds)))
+retryConfig =
+ZRetryConfig
+.blockingFollowedByNonBlockingRetry(FiniteBlockingBackoffPolicy(List(1.second)), NonBlockingBackoffPolicy(List(1.seconds)))
retryHandler = failingBlockingNonBlockingRecordHandler(originalTopicCallCount, retryTopicCallCount, topic).withDeserializers(
StringSerde,
StringSerde
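The hunk above also changes blockingFollowedByNonBlockingRetry to take the blocking intervals wrapped in FiniteBlockingBackoffPolicy instead of a bare List, so both halves of the plan are explicit policy values. A hedged sketch of such a combined plan with simplified stand-ins (not the real Greyhound API):

    import scala.concurrent.duration._

    object BlockingThenNonBlockingExample {
      // Stand-ins for the two policy types named in the diff.
      case class FiniteBlockingBackoffPolicy(intervals: List[FiniteDuration])
      case class NonBlockingBackoffPolicy(intervals: List[FiniteDuration])

      // First retry in place (blocking) after 1 second, then hand the record off
      // to one non-blocking retry after another second, as in the updated test.
      case class BlockingThenNonBlocking(
        blocking: FiniteBlockingBackoffPolicy,
        nonBlocking: NonBlockingBackoffPolicy
      )

      val plan = BlockingThenNonBlocking(
        FiniteBlockingBackoffPolicy(List(1.second)),
        NonBlockingBackoffPolicy(List(1.second))
      )
    }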
@@ -281,9 +284,7 @@ class RetryIT extends BaseTestWithSharedEnv[Env, TestResources] {
for {
r <- getShared
TestResources(kafka, _) = r
-retryConfig = ZRetryConfig.retryForPattern(
-RetryConfigForTopic(() => Nil, NonBlockingBackoffPolicy(Seq(1.second, 1.second, 1.seconds)))
-)
+retryConfig = ZRetryConfig.retryForPattern(nonBlockingRetryConfigForTopic(List(1.second, 1.second, 1.seconds)))
handler = RecordHandler { _: ConsumerRecord[String, String] => ZIO.unit }.withDeserializers(StringSerde, StringSerde)
_ <- RecordConsumer
.make(configFor(kafka, "group", retryConfig, "topic"), handler)
@@ -3,7 +3,7 @@ package com.wixpress.dst.greyhound.core.consumer.retry
import java.util.concurrent.TimeUnit
import com.wixpress.dst.greyhound.core.{Group, TopicPartition}
import com.wixpress.dst.greyhound.core.consumer.domain.{ConsumerRecord, RecordHandler}
-import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, IgnoringOnce, Blocking => InternalBlocking}
+import com.wixpress.dst.greyhound.core.consumer.retry.BlockingState.{Blocked, Blocking => InternalBlocking, IgnoringOnce}
import com.wixpress.dst.greyhound.core.consumer.retry.RetryRecordHandlerMetric.{BlockingRetryHandlerInvocationFailed, DoneBlockingBeforeRetry, NoRetryOnNonRetryableFailure}
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics
import com.wixpress.dst.greyhound.core.metrics.GreyhoundMetrics.report
@@ -30,7 +30,11 @@ private[retry] object BlockingRetryRecordHandler {
override def handle(record: ConsumerRecord[K, V])(implicit trace: Trace): ZIO[GreyhoundMetrics with R, Nothing, LastHandleResult] = {
val topicPartition = TopicPartition(record.topic, record.partition)

-def pollBlockingStateWithSuspensions(record: ConsumerRecord[K, V], interval: Duration, start: Long): URIO[GreyhoundMetrics, PollResult] = {
+def pollBlockingStateWithSuspensions(
+record: ConsumerRecord[K, V],
+interval: Duration,
+start: Long
+): URIO[GreyhoundMetrics, PollResult] = {
for {
shouldBlock <- blockingStateResolver.resolve(record)
shouldPollAgain <-
@@ -73,7 +77,8 @@
case error =>
interval
.map { interval =>
-report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *> blockOnErrorFor(record, interval)
+report(BlockingRetryHandlerInvocationFailed(topicPartition, record.offset, error.toString)) *>
+blockOnErrorFor(record, interval)
}
.getOrElse(ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false)))
}
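The rewrapped lines above keep the same error path: when a blocking interval is still configured, the failure is reported and the handler blocks for that interval; when no interval is left, the loop stops. A plain-Scala sketch of that Option-driven branch, with a stand-in for LastHandleResult (field names taken from this file, everything else simplified):

    import scala.concurrent.duration._

    object BlockOnErrorExample {
      // Stand-in for the result type used above: did the last invocation succeed,
      // and should the blocking-retry loop continue?
      case class LastHandleResult(lastHandleSucceeded: Boolean, shouldContinue: Boolean)

      // Mirrors the control flow above without ZIO: report and block when an
      // interval is configured, otherwise finish with a non-continuing result.
      def afterFailure(interval: Option[FiniteDuration])(
        reportAndBlock: FiniteDuration => LastHandleResult
      ): LastHandleResult =
        interval
          .map(reportAndBlock)
          .getOrElse(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))
    }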
@@ -94,12 +99,13 @@
if (nonBlockingHandler.isHandlingRetryTopicMessage(group, record)) {
ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))
} else {
-val durationsIncludingForInvocationWithNoErrorHandling = retryConfig.blockingBackoffs(record.topic)().map(Some(_)) :+ None
+val durations = retryConfig.blockingBackoffs(record.topic)
+val durationsIncludingForInvocationWithNoErrorHandling = durations.map(Some(_)) :+ None
for {
result <- retryEvery(record, durationsIncludingForInvocationWithNoErrorHandling) { (rec, interval) =>
-handleAndMaybeBlockOnErrorFor(rec, interval)
-}
-_ <- maybeBackToStateBlocking
+handleAndMaybeBlockOnErrorFor(rec, interval)
+}
+_ <- maybeBackToStateBlocking
} yield result
}
}
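The substantive change above is that retryConfig.blockingBackoffs(record.topic) now returns the backoff intervals directly; the extra () application of the old thunk is gone. The list is then extended with a trailing None, producing one final invocation after which no blocking happens. A small self-contained sketch of that transformation (the two 1-second intervals are just example values):

    import scala.concurrent.duration._

    object DurationsExample {
      // Backoffs as they would come from the data-only retry config.
      val durations: List[FiniteDuration] = List(1.second, 1.second)

      // Same shape as durationsIncludingForInvocationWithNoErrorHandling above:
      // one Some(interval) per blocking retry plus a final None for the last
      // attempt, which is not followed by any blocking.
      val durationsIncludingForInvocationWithNoErrorHandling: List[Option[FiniteDuration]] =
        durations.map(Some(_)) :+ None
      // == List(Some(1 second), Some(1 second), None)
    }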
@@ -111,7 +117,7 @@ private[retry] object BlockingRetryRecordHandler {
ZIO.succeed(as.iterator).flatMap { i =>
def loop(retryAttempt: Option[RetryAttempt]): ZIO[R, E, LastHandleResult] =
if (i.hasNext) {
-val nextDelay = i.next
+val nextDelay = i.next
val recordWithAttempt = retryAttempt.fold(record) { attempt =>
record.copy(headers = record.headers ++ RetryAttempt.toHeaders(attempt))
}
@@ -127,8 +133,7 @@
}
else ZIO.succeed(result)
}
-}
-else ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))
+} else ZIO.succeed(LastHandleResult(lastHandleSucceeded = false, shouldContinue = false))

loop(None)
}
@@ -11,7 +11,8 @@ import java.time.Instant

/**
* Description of a retry attempt
-* @param attempt contains which attempt is it, starting from 0 including blocking and non-blocking attempts
+* @param attempt
+contains which attempt is it, starting from 0 including blocking and non-blocking attempts
*/
case class RetryAttempt(
originalTopic: Topic,
@@ -33,10 +34,10 @@ object RetryAttempt {
private def toChunk(str: String): Chunk[Byte] = Chunk.fromArray(str.getBytes)

def toHeaders(attempt: RetryAttempt): Headers = Headers(
-RetryHeader.Submitted -> toChunk(attempt.submittedAt.toEpochMilli.toString),
-RetryHeader.Backoff -> toChunk(attempt.backoff.toMillis.toString),
+RetryHeader.Submitted -> toChunk(attempt.submittedAt.toEpochMilli.toString),
+RetryHeader.Backoff -> toChunk(attempt.backoff.toMillis.toString),
RetryHeader.OriginalTopic -> toChunk(attempt.originalTopic),
-RetryHeader.RetryAttempt -> toChunk(attempt.attempt.toString),
+RetryHeader.RetryAttempt -> toChunk(attempt.attempt.toString)
)

private case class RetryAttemptHeaders(
@@ -49,14 +50,14 @@ object RetryAttempt {
private def fromHeaders(headers: Headers): Task[RetryAttemptHeaders] =
for {
submitted <- headers.get(RetryHeader.Submitted, instantDeserializer)
-backoff <- headers.get(RetryHeader.Backoff, durationDeserializer)
-topic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde)
-attempt <- headers.get(RetryHeader.RetryAttempt, longDeserializer)
+backoff <- headers.get(RetryHeader.Backoff, durationDeserializer)
+topic <- headers.get[String](RetryHeader.OriginalTopic, StringSerde)
+attempt <- headers.get(RetryHeader.RetryAttempt, longDeserializer)
} yield RetryAttemptHeaders(topic, attempt.map(_.toInt), submitted, backoff)

/** @return None on infinite blocking retries */
def maxBlockingAttempts(topic: Topic, retryConfig: Option[RetryConfig]): Option[Int] =
-retryConfig.map(_.blockingBackoffs(topic)()).fold(Option(0)) {
+retryConfig.map(_.blockingBackoffs(topic)).fold(Option(0)) {
case finite if finite.hasDefiniteSize => Some(finite.size)
case _ => None
}
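maxBlockingAttempts above drops the same extra () application and reads the backoffs straight off the config. Its fold returns Some(0) when there is no retry config at all, Some(n) for a finite backoff sequence and None for an infinite one. A hedged stand-alone sketch of the same fold, with a plain Seq of durations standing in for the real config:

    import scala.concurrent.duration._

    object MaxBlockingAttemptsExample {
      // None on infinite blocking retries, Some(0) with no config, Some(n) otherwise,
      // mirroring the fold in RetryAttempt.maxBlockingAttempts above.
      def maxBlockingAttempts(blockingBackoffs: Option[Seq[FiniteDuration]]): Option[Int] =
        blockingBackoffs.fold(Option(0)) {
          case finite if finite.hasDefiniteSize => Some(finite.size)
          case _                                => None
        }

      val noConfig = maxBlockingAttempts(None)                                 // Some(0)
      val finite   = maxBlockingAttempts(Some(List(1.second, 1.second)))       // Some(2)
      val infinite = maxBlockingAttempts(Some(LazyList.continually(1.second))) // None
    }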
@@ -68,31 +69,29 @@ object RetryAttempt {
}

def extract(
-headers: Headers,
-topic: Topic,
-group: Group,
-subscription: ConsumerSubscription,
-retryConfig: Option[RetryConfig],
+headers: Headers,
+topic: Topic,
+group: Group,
+subscription: ConsumerSubscription,
+retryConfig: Option[RetryConfig]
)(implicit trace: Trace): UIO[Option[RetryAttempt]] = {

def maybeNonBlockingAttempt(hs: RetryAttemptHeaders): Option[RetryAttempt] =
for {
-submitted <- hs.submittedAt
-backoff <- hs.backoff
+submitted <- hs.submittedAt
+backoff <- hs.backoff
TopicAttempt(originalTopic, attempt) <- attemptNumberFromTopic(subscription, topic, hs.originalTopic, group)
-blockingRetries = maxBlockingAttempts(originalTopic, retryConfig).getOrElse(0)
+blockingRetries = maxBlockingAttempts(originalTopic, retryConfig).getOrElse(0)
} yield RetryAttempt(originalTopic, blockingRetries + attempt, submitted, backoff)

def maybeBlockingAttempt(hs: RetryAttemptHeaders): Option[RetryAttempt] =
for {
-submitted <- hs.submittedAt
-backoff <- hs.backoff
+submitted <- hs.submittedAt
+backoff <- hs.backoff
originalTopic <- hs.originalTopic if originalTopic == topic
-attempt <- hs.attempt
+attempt <- hs.attempt
} yield RetryAttempt(originalTopic, attempt, submitted, backoff)

-fromHeaders(headers).map { hs =>
-maybeNonBlockingAttempt(hs) orElse maybeBlockingAttempt(hs)
-}
+fromHeaders(headers).map { hs => maybeNonBlockingAttempt(hs) orElse maybeBlockingAttempt(hs) }
}.catchAll(_ => ZIO.none)
}
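Note how extract above numbers attempts: for a record that arrived on a non-blocking retry topic, the attempt index recovered from the topic name is offset by the number of blocking attempts that ran first, so the resulting RetryAttempt.attempt counts both kinds, as the scaladoc above states. A tiny arithmetic illustration with hypothetical values:

    object AttemptNumberExample {
      // Hypothetical setup: 2 blocking retries configured before going non-blocking.
      val blockingRetries    = 2
      // Attempt index carried by the non-blocking retry topic (0-based).
      val nonBlockingAttempt = 1

      // Overall attempt number, as in maybeNonBlockingAttempt above:
      // blocking attempts come first, then the non-blocking ones.
      val overallAttempt = blockingRetries + nonBlockingAttempt // == 3
    }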