Commit fdaf5f3

[greyhound] parallel consumer - add offsets and gaps init (#35027)

GitOrigin-RevId: fe53459fd02e8e627356cbe68d71e4d4bc6b3d24
ben-wattelman authored and wix-oss committed Oct 5, 2023
1 parent 8c9be80 commit fdaf5f3

Showing 6 changed files with 179 additions and 50 deletions.
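In outline: the event loop now seeds its in-memory offsets-and-gaps state from the gaps persisted in committed-offset metadata once partitions are assigned, then completes a promise that the dispatcher awaits before handling its first batch in parallel-consume mode. A minimal sketch of that handshake, with simplified signatures that are assumptions rather than the library's exact API:

import zio._

// Sketch of the init handshake this commit introduces. `seedFromMetadata`
// stands in for the real initializeOffsetsAndGaps further down; signatures
// here are simplified assumptions, not Greyhound's API.
def eventLoopSide(seedFromMetadata: UIO[Unit], init: Promise[Nothing, Unit]): UIO[Unit] =
  seedFromMetadata *> init.succeed(()).unit // seed the state, then signal readiness

def dispatcherSide(init: Promise[Nothing, Unit], handleBatch: UIO[Unit]): UIO[Unit] =
  init.await *> handleBatch // the first batch waits until the seeded state is visible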
============================================================
@@ -96,7 +96,7 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
   for {
     r <- getShared
     TestResources(kafka, producer) = r
-    topic <- kafka.createRandomTopic()
+    topic <- kafka.createRandomTopic(partitions = 1)
     group <- randomGroup
     cId <- clientId
     partition = 0
@@ -129,10 +129,11 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
     consumer <- makeParallelConsumer(handler, kafka, topic, group, cId, drainTimeout = drainTimeout, startPaused = true)
     _ <- produceRecords(producer, Seq(slowRecord))
     _ <- produceRecords(producer, fastRecords)
+    _ <- ZIO.sleep(2.seconds)
     // produce is done synchronously to make sure all records are produced before consumer starts, so all records are polled at once
     _ <- consumer.resume
     _ <- fastMessagesLatch.await
-    _ <- ZIO.sleep(2.second) // sleep to ensure commit is done before rebalance
+    _ <- ZIO.sleep(3.second) // sleep to ensure commit is done before rebalance
     // start another consumer to trigger a rebalance before slow handler is done
     _ <- makeParallelConsumer(
            handler,
@@ -141,11 +142,11 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
            group,
            cId,
            drainTimeout = drainTimeout,
-           onAssigned = assigned => ZIO.when(assigned.nonEmpty)(finishRebalance.succeed())
+           onAssigned = _ => finishRebalance.succeed()
          )
   } yield ()

-  _ <- eventuallyZ(numProcessedMessges.get, 20.seconds)(_ == allMessages)
+  _ <- eventuallyZ(numProcessedMessges.get, 25.seconds)(_ == allMessages)
 } yield {
   ok
 }
@@ -209,8 +210,7 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
     regularConfig = configFor(kafka, group, Set(topic))
     _ <- metricsQueue.take
            .flatMap {
-             case m: SkippedGapsOnInitialization =>
-               ZIO.debug(s">>> got SkippedGapsOnInitialization with gaps: ${m.gaps}") *> skippedGaps.update(_ + 1)
+             case _: SkippedGapsOnInitialization => skippedGaps.update(_ + 1)
              case _ => ZIO.unit
            }
            .repeat(Schedule.forever)
@@ -242,6 +242,7 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
     parallelConsumer <- makeParallelConsumer(parallelConsumerHandler, kafka, topic, group, cId, startPaused = true)
     _ <- produceRecords(producer, Seq(slowRecord))
     _ <- produceRecords(producer, fastRecords)
+    _ <- ZIO.sleep(3.seconds)
     // produce is done synchronously to make sure all records are produced before consumer starts, so all records are polled at once
     _ <- parallelConsumer.resume
     _ <- fastMessagesLatch.await
============================================================
@@ -55,13 +55,15 @@ object Dispatcher {
     consumeInParallel: Boolean = false,
     maxParallelism: Int = 1,
     updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit,
-    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]] = _ =>
-      ZIO.succeed(Map.empty)
+    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, OffsetAndGaps]] = _ => ZIO.succeed(Map.empty),
+    init: Promise[Nothing, Unit]
   )(implicit trace: Trace): UIO[Dispatcher[R]] =
     for {
-      p <- Promise.make[Nothing, Unit]
-      state <- Ref.make[DispatcherState](if (startPaused) DispatcherState.Paused(p) else DispatcherState.Running)
-      workers <- Ref.make(Map.empty[TopicPartition, Worker])
+      p <- Promise.make[Nothing, Unit]
+      state <- Ref.make[DispatcherState](if (startPaused) DispatcherState.Paused(p) else DispatcherState.Running)
+      initState <-
+        Ref.make[DispatcherInitState](if (consumeInParallel) DispatcherInitState.NotInitialized else DispatcherInitState.Initialized)
+      workers <- Ref.make(Map.empty[TopicPartition, Worker])
     } yield new Dispatcher[R] {
       override def submit(record: Record): URIO[R with Env, SubmitResult] =
         for {
@@ -73,15 +75,20 @@

       override def submitBatch(records: Records): URIO[R with Env, SubmitResult] =
         for {
-          _ <- report(SubmittingRecordBatch(group, clientId, records.size, records, consumerAttributes))
-          allSamePartition = records.map(r => RecordTopicPartition(r)).distinct.size == 1
-          submitResult <- if (allSamePartition) {
-                            val partition = RecordTopicPartition(records.head)
-                            for {
-                              worker <- workerFor(partition, records.head.offset)
-                              submitted <- worker.submitBatch(records)
-                            } yield submitted
-                          } else ZIO.succeed(SubmitBatchResult(success = false, Some(records.minBy(_.offset))))
+          _ <- report(SubmittingRecordBatch(group, clientId, records.size, records, consumerAttributes))
+          currentInitState <- initState.get
+          _ <- currentInitState match {
+                 case DispatcherInitState.NotInitialized => init.await *> initState.set(DispatcherInitState.Initialized)
+                 case _ => ZIO.unit
+               }
+          allSamePartition = records.map(r => RecordTopicPartition(r)).distinct.size == 1
+          submitResult <- if (allSamePartition) {
+                            val partition = RecordTopicPartition(records.head)
+                            for {
+                              worker <- workerFor(partition, records.head.offset)
+                              submitted <- worker.submitBatch(records)
+                            } yield submitted
+                          } else ZIO.succeed(SubmitBatchResult(success = false, Some(records.minBy(_.offset))))

         } yield
           if (allSamePartition && submitResult.success) Submitted
@@ -212,6 +219,16 @@

   }

+  sealed trait DispatcherInitState
+
+  object DispatcherInitState {
+
+    case object NotInitialized extends DispatcherInitState
+
+    case object Initialized extends DispatcherInitState
+
+  }
+
   case class Task(record: Record, complete: UIO[Unit])

   trait Worker {
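DispatcherInitState acts as a one-shot gate: in parallel-consume mode the dispatcher starts NotInitialized, and the first submitBatch awaits the init promise before flipping to Initialized (see the submitBatch hunk above). A self-contained sketch of the same pattern, with simplified names that are assumptions, not the library's API:

import zio._

sealed trait InitState
object InitState {
  case object NotInitialized extends InitState
  case object Initialized    extends InitState
}

// One-shot gate: the first caller blocks on `init`; later calls pass through.
def awaitInitOnce(state: Ref[InitState], init: Promise[Nothing, Unit]): UIO[Unit] =
  state.get.flatMap {
    case InitState.NotInitialized => init.await *> state.set(InitState.Initialized)
    case InitState.Initialized    => ZIO.unit
  }

A concurrent double-read of NotInitialized is benign here, as in the diff: awaiting an already-completed promise returns immediately.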
@@ -241,7 +258,7 @@
     consumeInParallel: Boolean,
     maxParallelism: Int,
     updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit,
-    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]]
+    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, OffsetAndGaps]]
   )(implicit trace: Trace): URIO[R with Env, Worker] = for {
     queue <- Queue.dropping[Record](capacity)
     internalState <- TRef.make(WorkerInternalState.empty).commit
@@ -366,7 +383,7 @@
     consumerAttributes: Map[String, String],
     maxParallelism: Int,
     updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit],
-    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]]
+    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, OffsetAndGaps]]
   )(implicit trace: Trace): ZIO[R with GreyhoundMetrics, Any, Boolean] =
     internalState.update(s => s.cleared).commit *>
       state.get.flatMap {
@@ -403,7 +420,7 @@
     consumerAttributes: Map[ClientId, ClientId],
     maxParallelism: Int,
     updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit],
-    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]]
+    currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, OffsetAndGaps]]
   ): ZIO[R with GreyhoundMetrics, Throwable, Boolean] =
     for {
       _ <- report(TookAllRecordsFromQueue(records.size, records, group, clientId, consumerAttributes))
@@ -432,15 +449,11 @@
       } yield res
     }

-  private def shouldRecordBeHandled(record: Record, maybeGaps: Map[TopicPartition, Option[OffsetAndGaps]]): Boolean = {
-    maybeGaps.get(TopicPartition(record.topic, record.partition)) match {
-      case Some(maybeOffsetAndGapsForPartition) =>
-        maybeOffsetAndGapsForPartition match {
-          case Some(offsetAndGapsForPartition) if offsetAndGapsForPartition.gaps.nonEmpty =>
-            record.offset > offsetAndGapsForPartition.offset || offsetAndGapsForPartition.gaps.exists(_.contains(record.offset))
-          case _ => true
-        }
-      case None => true
+  private def shouldRecordBeHandled(record: Record, gaps: Map[TopicPartition, OffsetAndGaps]): Boolean = {
+    gaps.get(TopicPartition(record.topic, record.partition)) match {
+      case Some(offsetAndGapsForPartition) if offsetAndGapsForPartition.gaps.nonEmpty =>
+        record.offset > offsetAndGapsForPartition.offset || offsetAndGapsForPartition.gaps.exists(_.contains(record.offset))
+      case _ => true
     }
   }
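The rewrite above also simplifies the filtering rule: a record still needs handling when its offset lies beyond the last handled offset or inside a recorded gap; anything else was already processed and is skipped. A standalone illustration with simplified types (assuming inclusive gap bounds; these are not the library's classes):

// Standalone re-implementation of the gap-filtering rule, for illustration only.
case class Gap(start: Long, end: Long) {
  def contains(offset: Long): Boolean = offset >= start && offset <= end
}
case class PartitionState(offset: Long, gaps: Seq[Gap]) // last handled offset plus holes

def shouldHandle(recordOffset: Long, state: Option[PartitionState]): Boolean =
  state match {
    case Some(s) if s.gaps.nonEmpty =>
      recordOffset > s.offset || s.gaps.exists(_.contains(recordOffset))
    case _ => true // no tracked state, or no gaps: handle the record
  }

// Last handled offset 102, with offsets 95..97 never handled (a gap):
val st = Some(PartitionState(102L, Seq(Gap(95L, 97L))))
assert(shouldHandle(96L, st))  // inside the gap: must still be handled
assert(!shouldHandle(99L, st)) // below 102 and not in a gap: already done
assert(shouldHandle(103L, st)) // beyond the last handled offset: new work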
============================================================
@@ -43,9 +43,10 @@ object EventLoop {
       offsetsAndGaps <- OffsetsAndGaps.make
       handle = if (config.consumePartitionInParallel) { cr: Record => handler.handle(cr) }
                else handler.andThen(offsets.update).handle(_)
-      updateBatch = { records: Chunk[Record] => report(HandledBatch(records)) *> offsetsAndGaps.update(records) }
-      currentGaps = { partitions: Set[TopicPartition] => currentGapsForPartitions(partitions, clientId)(consumer) }
+      updateBatch = { records: Chunk[Record] => report(HandledBatch(records)) *> updateGapsByBatch(records, offsetsAndGaps) }
+      currentGaps = { partitions: Set[TopicPartition] => offsetsAndGaps.offsetsAndGapsForPartitions(partitions) }
       _ <- report(CreatingDispatcher(clientId, group, consumerAttributes, config.startPaused))
+      offsetsAndGapsInit <- Promise.make[Nothing, Unit]
       dispatcher <- Dispatcher.make(
                       group,
                       clientId,
@@ -60,11 +61,12 @@
                       config.consumePartitionInParallel,
                       config.maxParallelism,
                       updateBatch,
-                      currentGaps
+                      currentGaps,
+                      offsetsAndGapsInit
                     )
       positionsRef <- Ref.make(Map.empty[TopicPartition, Offset])
       pausedPartitionsRef <- Ref.make(Set.empty[TopicPartition])
-      partitionsAssigned <- Promise.make[Nothing, Unit]
+      partitionsAssigned <- Promise.make[Nothing, Set[TopicPartition]]
       // TODO how to handle errors in subscribe?
       rebalanceListener = listener(
                             pausedPartitionsRef,
@@ -86,7 +88,20 @@
             .repeatWhile(_ == true)
             .forkDaemon
       _ <- report(AwaitingPartitionsAssignment(clientId, group, consumerAttributes))
-      _ <- partitionsAssigned.await
+      partitions <- partitionsAssigned.await
+      _ <- if (config.consumePartitionInParallel) {
+             report(AwaitingOffsetsAndGapsInit(clientId, group, consumerAttributes)) *>
+               initializeOffsetsAndGaps( // init must be performed on the main thread, not in the rebalance listener, as it involves calling the SDK
+                 offsetsAndGaps,
+                 partitions,
+                 consumer,
+                 clientId,
+                 group,
+                 consumerAttributes,
+                 offsetsAndGapsInit
+               ) *> offsetsAndGapsInit.await
+
+           } else offsetsAndGapsInit.succeed()
       env <- ZIO.environment[Env]
     } yield (dispatcher, fiber, offsets, positionsRef, running, rebalanceListener.provideEnvironment(env))

@@ -181,7 +196,7 @@
     pausedPartitionsRef: Ref[Set[TopicPartition]],
     config: EventLoopConfig,
     dispatcher: Dispatcher[_],
-    partitionsAssigned: Promise[Nothing, Unit],
+    partitionsAssigned: Promise[Nothing, Set[TopicPartition]],
     group: Group,
     consumer0: Consumer,
     clientId: ClientId,
@@ -207,7 +222,7 @@
       }

       override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
-        partitionsAssigned.succeed(())
+        partitionsAssigned.succeed(partitions)
     }
   }
@@ -245,6 +260,25 @@
       _ <- pausedRef.update(_ => pausedTopics)
     } yield records

+  private def initializeOffsetsAndGaps(
+    offsetsAndGaps: OffsetsAndGaps,
+    partitions: Set[TopicPartition],
+    consumer: Consumer,
+    clientId: ClientId,
+    group: Group,
+    attributes: Map[String, String],
+    offsetsAndGapsInit: Promise[Nothing, Unit]
+  ) = for {
+    committedOffsetsAndMetadata <- consumer.committedOffsetsAndMetadata(partitions)
+    initialOffsetsAndGaps =
+      committedOffsetsAndMetadata.mapValues(om =>
+        OffsetsAndGaps.parseGapsString(om.metadata).fold(OffsetAndGaps(om.offset - 1, committable = false))(identity)
+      )
+    _ <- offsetsAndGaps.init(initialOffsetsAndGaps)
+    _ <- report(InitializedOffsetsAndGaps(clientId, group, initialOffsetsAndGaps, attributes))
+    _ <- offsetsAndGapsInit.succeed(())
+  } yield ()
+
private def submitRecordsSequentially[R2, R1](
consumer: Consumer,
dispatcher: Dispatcher[R2],
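A note on the om.offset - 1 fallback in initializeOffsetsAndGaps above: a committed Kafka offset denotes the next offset to consume, so when no gaps metadata survives parsing, the last handled offset is taken to be one less than the committed offset, and the entry is seeded with committable = false so this placeholder is not immediately re-committed. Worked through with the commit's own helpers (the committed offset value here is hypothetical):

val committedOffset: Long = 100L // hypothetical committed offset for one partition
val seeded: OffsetAndGaps =
  OffsetsAndGaps.parseGapsString("") // None for empty or unparseable metadata, per the test below
    .fold(OffsetAndGaps(committedOffset - 1, committable = false))(identity)
// seeded == OffsetAndGaps(99L, Seq.empty, committable = false)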
@@ -340,6 +374,9 @@
       }
     }

+  private def updateGapsByBatch(records: Chunk[Record], offsetsAndGaps: OffsetsAndGaps) =
+    offsetsAndGaps.update(records)
+
   private def currentGapsForPartitions(partitions: Set[TopicPartition], clientId: ClientId)(
     consumer: Consumer
   ): ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]] =
Expand Down Expand Up @@ -429,6 +466,15 @@ object EventLoopMetric {

case class AwaitingPartitionsAssignment(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class AwaitingOffsetsAndGapsInit(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class InitializedOffsetsAndGaps(
clientId: ClientId,
group: Group,
initial: Map[TopicPartition, OffsetAndGaps],
attributes: Map[String, String]
) extends EventLoopMetric

case class CommittedOffsetsAndMetadata(offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]) extends EventLoopMetric

case class FailedToCommitOffsetsAndMetadata(t: Throwable, offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata])
Expand Down
============================================================
@@ -7,10 +7,14 @@ import com.wixpress.dst.greyhound.core.{Offset, OffsetAndMetadata, TopicPartition}
 import zio._

 trait OffsetsAndGaps {
+  def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit]
+
   def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]]

   def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]]

+  def offsetsAndGapsForPartitions(partitions: Set[TopicPartition]): UIO[Map[TopicPartition, OffsetAndGaps]]
+
   def update(partition: TopicPartition, batch: Seq[Offset], prevCommittedOffset: Option[Offset]): UIO[Unit]

   def update(record: ConsumerRecord[_, _]): UIO[Unit] =
@@ -33,6 +37,9 @@ object OffsetsAndGaps {
   def make: UIO[OffsetsAndGaps] =
     Ref.make(Map.empty[TopicPartition, OffsetAndGaps]).map { ref =>
       new OffsetsAndGaps {
+        override def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit] =
+          ref.update(_ => committedOffsets)
+
         override def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] =
           ref.modify(offsetsAndGaps => {
             val committable = offsetsAndGaps.filter(_._2.committable)
@@ -43,6 +50,9 @@
         override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] =
           ref.get.map(_.get(partition).fold(Seq.empty[Gap])(_.gaps.sortBy(_.start)))

+        override def offsetsAndGapsForPartitions(partitions: Set[TopicPartition]): UIO[Map[TopicPartition, OffsetAndGaps]] =
+          ref.get.map(_.filterKeys(partitions.contains))
+
         override def update(partition: TopicPartition, batch: Seq[Offset], prevCommittedOffset: Option[Offset]): UIO[Unit] =
           ref.update { offsetsAndGaps =>
             val sortedBatch = batch.sorted
@@ -153,4 +163,6 @@ object OffsetAndGaps {
   val LAST_HANDLED_OFFSET_SEPARATOR = "#"

   def apply(offset: Offset): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap])
+
+  def apply(offset: Offset, committable: Boolean): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap], committable)
 }
============================================================
@@ -1,6 +1,8 @@
 package com.wixpress.dst.greyhound.core.consumer

 import com.wixpress.dst.greyhound.core.TopicPartition
+import com.wixpress.dst.greyhound.core.consumer.Gap.GAP_SEPARATOR
+import com.wixpress.dst.greyhound.core.consumer.OffsetAndGaps.LAST_HANDLED_OFFSET_SEPARATOR
 import com.wixpress.dst.greyhound.core.consumer.OffsetGapsTest._
 import com.wixpress.dst.greyhound.core.testkit.BaseTestNoEnv
Expand Down Expand Up @@ -54,6 +56,26 @@ class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {
} yield current must havePairs(partition0 -> OffsetAndGaps(1L, Seq()), partition1 -> OffsetAndGaps(0L, Seq()))
}

"init with given offsets and calculate subsequent gaps accordingly" in {
val partition0 = TopicPartition(topic, 0)
val partition1 = TopicPartition(topic, 1)
val initialCommittedOffsets =
Map(partition0 -> OffsetAndGaps(100L, committable = false), partition1 -> OffsetAndGaps(200L, committable = false))
for {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.init(initialCommittedOffsets)
_ <- offsetGaps.update(partition0, Seq(101L, 102L))
_ <- offsetGaps.update(partition1, Seq(203L, 204L))
current <- offsetGaps.getCommittableAndClear
} yield current must havePairs(partition0 -> OffsetAndGaps(102L, Seq()), partition1 -> OffsetAndGaps(204L, Seq(Gap(201L, 202L))))
}

"parse gaps from string" in {
val gaps = Seq(s"10${LAST_HANDLED_OFFSET_SEPARATOR}0${GAP_SEPARATOR}1", s"10${LAST_HANDLED_OFFSET_SEPARATOR}", "")
val expected = Seq(Some(OffsetAndGaps(10, Seq(Gap(0, 1)))), Some(OffsetAndGaps(10, Seq())), None)
gaps.map(OffsetsAndGaps.parseGapsString).must(beEqualTo(expected))
}

}

object OffsetGapsTest {
Expand Down
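The new "parse gaps from string" test pins down the metadata encoding: the last handled offset, then LAST_HANDLED_OFFSET_SEPARATOR ("#" per the diff above), then the gaps joined by Gap.GAP_SEPARATOR, whose literal value is not visible in this diff. A worked reading of the three cases, kept symbolic where the diff does not show the literal:

// Worked reading of the three parse cases exercised by the test above.
val withGap = s"10${LAST_HANDLED_OFFSET_SEPARATOR}0${GAP_SEPARATOR}1"
// parseGapsString(withGap) == Some(OffsetAndGaps(10, Seq(Gap(0, 1)))) -- offset 10, gap [0, 1] unhandled
val noGaps = s"10$LAST_HANDLED_OFFSET_SEPARATOR"
// parseGapsString(noGaps)  == Some(OffsetAndGaps(10, Seq()))          -- offset 10, no holes
val empty = ""
// parseGapsString(empty)   == None                                    -- nothing persisted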
[diff for the sixth changed file was not captured on this page]
