Skip to content

Commit

Permalink
[greyhound] parallel consumer - add gaps limit (#35313)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: af1fd4bede9f00146454b2f28657bc1c845ae682
  • Loading branch information
ben-wattelman authored and wix-oss committed Oct 5, 2023
1 parent 370e44a commit c0b828d
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ object Dispatcher {
maxParallelism: Int = 1,
updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit,
currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, OffsetAndGaps]] = _ => ZIO.succeed(Map.empty),
gapsSizeLimit: Int = 256,
init: Promise[Nothing, Unit]
)(implicit trace: Trace): UIO[Dispatcher[R]] =
for {
Expand Down Expand Up @@ -85,8 +86,9 @@ object Dispatcher {
submitResult <- if (allSamePartition) {
val partition = RecordTopicPartition(records.head)
for {
worker <- workerFor(partition, records.head.offset)
submitted <- worker.submitBatch(records)
worker <- workerFor(partition, records.head.offset)
currentGaps <- currentGaps(Set(partition))
submitted <- worker.submitBatch(records, currentGaps)
} yield submitted
} else ZIO.succeed(SubmitBatchResult(success = false, Some(records.minBy(_.offset))))

Expand Down Expand Up @@ -172,6 +174,7 @@ object Dispatcher {
consumerAttributes,
consumeInParallel,
maxParallelism,
gapsSizeLimit,
updateBatch,
currentGaps
)
Expand Down Expand Up @@ -234,7 +237,7 @@ object Dispatcher {
trait Worker {
def submit(record: Record): URIO[Any, Boolean]

def submitBatch(records: Records): URIO[Any, SubmitBatchResult]
def submitBatch(records: Records, currentGaps: Map[TopicPartition, OffsetAndGaps]): URIO[Env, SubmitBatchResult]

def expose: URIO[Any, WorkerExposedState]

Expand All @@ -257,6 +260,7 @@ object Dispatcher {
consumerAttributes: Map[String, String],
consumeInParallel: Boolean,
maxParallelism: Int,
gapsSizeLimit: Int,
updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit,
currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, OffsetAndGaps]]
)(implicit trace: Trace): URIO[R with Env, Worker] = for {
Expand Down Expand Up @@ -295,23 +299,13 @@ object Dispatcher {
)

override def submitBatch(
records: Records
): URIO[Any, SubmitBatchResult] =
queue
.offerAll(records)
.tap(notInserted =>
ZIO.when(notInserted.nonEmpty) {
Clock
.currentTime(TimeUnit.MILLISECONDS)
.flatMap(now =>
internalState.update(s => if (s.reachedHighWatermarkSince.nonEmpty) s else s.reachedHighWatermark(now)).commit
)
}
)
.map(rejected => {
val isSuccess = rejected.isEmpty
SubmitBatchResult(isSuccess, if (isSuccess) None else Some(rejected.minBy(_.offset)))
})
records: Records,
currentGaps: Map[TopicPartition, OffsetAndGaps]
): URIO[Env, SubmitBatchResult] = {
val gapsSize = OffsetAndGaps.gapsSize(currentGaps)
if (gapsSize + records.size <= gapsSizeLimit) submitBatchToQueue(queue, records, internalState)
else submitBatchPartially(group, clientId, partition, consumerAttributes, gapsSizeLimit, queue, internalState, records, gapsSize)
}

override def expose: URIO[Any, WorkerExposedState] = (queue.size zip internalState.get.commit)
.flatMap {
Expand Down Expand Up @@ -340,6 +334,61 @@ object Dispatcher {
internalState.get.flatMap(state => STM.check(state.currentExecutionStarted.isEmpty)).commit
}

/**
 * Submits only as many records as the remaining gaps budget allows and rejects the rest.
 *
 * Records are ordered by offset; the lowest `gapsSizeLimit - gapsSize` offsets are offered to the
 * queue and everything after them is dropped. Whenever records are dropped, a
 * [[DroppedRecordsDueToGapsSizeLimit]] metric is reported carrying the number of dropped records
 * and the lowest dropped offset.
 *
 * @param group              consumer group, forwarded to the reported metric
 * @param clientId           client id, forwarded to the reported metric
 * @param partition          partition the batch belongs to, forwarded to the reported metric
 * @param consumerAttributes consumer attributes, forwarded to the reported metric
 * @param gapsSizeLimit      maximum allowed total gaps size
 * @param queue              worker queue the accepted records are offered to
 * @param internalState      worker state, passed through to `submitBatchToQueue`
 * @param records            the batch to submit; the caller is expected to invoke this only when
 *                           `gapsSize + records.size > gapsSizeLimit`
 * @param gapsSize           current total gaps size
 * @return an unsuccessful [[SubmitBatchResult]] whose `firstRejected` is the lowest-offset record that
 *         was not submitted (or the queue's own first rejected record if the queue refused some of the
 *         accepted prefix). If, contrary to the caller's contract, the whole batch fits, the result of
 *         submitting the whole batch is returned.
 */
private def submitBatchPartially[R](
  group: Group,
  clientId: ClientId,
  partition: TopicPartition,
  consumerAttributes: Map[String, String],
  gapsSizeLimit: Int,
  queue: Queue[Record],
  internalState: TRef[WorkerInternalState],
  records: Records,
  gapsSize: Int
) = {
  // Clamp at zero: if the gaps already exceed the limit the capacity must not go negative,
  // otherwise taking "capacity + 1" records could yield an empty collection.
  val remainingCapacity = math.max(gapsSizeLimit - gapsSize, 0)
  val sortedRecords     = records.sortBy(_.offset)
  val recordsToSubmit   = sortedRecords.take(remainingCapacity)

  sortedRecords.drop(remainingCapacity).headOption match {
    case None =>
      // Nothing exceeds the budget (the caller normally rules this out): submit the batch as-is.
      submitBatchToQueue(queue, records, internalState)

    case Some(firstNotSubmitted) =>
      val rejection = SubmitBatchResult(success = false, firstRejected = Some(firstNotSubmitted))
      val reportDropped = report(
        DroppedRecordsDueToGapsSizeLimit(
          records.size - recordsToSubmit.size, // number of records actually dropped
          firstNotSubmitted.offset,
          group,
          partition,
          clientId,
          consumerAttributes
        )
      )

      if (recordsToSubmit.isEmpty) {
        // no records can be submitted
        reportDropped *> ZIO.succeed(rejection)
      } else {
        reportDropped *>
          submitBatchToQueue(queue, recordsToSubmit, internalState).map {
            // The accepted prefix went in completely: the batch is still rejected from the first dropped record.
            case SubmitBatchResult(true, _)              => rejection
            // The queue refused part of the accepted prefix: its first rejected record comes earlier.
            case SubmitBatchResult(false, firstRejected) => SubmitBatchResult(success = false, firstRejected = firstRejected)
          }
      }
  }
}

/**
 * Offers `records` to the worker queue and summarizes the outcome.
 *
 * When the queue does not accept every record, the current time is stored in the worker state via
 * `reachedHighWatermark`, unless `reachedHighWatermarkSince` is already set.
 *
 * @param queue         queue the records are offered to
 * @param records       records to enqueue
 * @param internalState worker state updated when the queue rejects records
 * @return a successful result with no rejected record when everything was enqueued; otherwise an
 *         unsuccessful result carrying the rejected record with the lowest offset
 */
private def submitBatchToQueue[R](
  queue: Queue[Record],
  records: Records,
  internalState: TRef[WorkerInternalState]
): URIO[Any, SubmitBatchResult] = {
  // Keeps an existing timestamp; only the first rejection since it was cleared sets it.
  val markHighWatermarkReached =
    for {
      now <- Clock.currentTime(TimeUnit.MILLISECONDS)
      _   <- internalState
               .update(state => if (state.reachedHighWatermarkSince.isEmpty) state.reachedHighWatermark(now) else state)
               .commit
    } yield ()

  for {
    rejected <- queue.offerAll(records)
    _        <- ZIO.when(rejected.nonEmpty)(markHighWatermarkReached)
  } yield {
    if (rejected.isEmpty) SubmitBatchResult(success = true, firstRejected = None)
    else SubmitBatchResult(success = false, firstRejected = Some(rejected.minBy(_.offset)))
  }
}

private def pollOnce[R](
state: Ref[DispatcherState],
internalState: TRef[WorkerInternalState],
Expand Down Expand Up @@ -579,6 +628,16 @@ object DispatcherMetric {
clientId: ClientId,
attributes: Map[String, String]
) extends DispatcherMetric

/**
 * Dispatcher metric reported when a submitted batch is cut short (or rejected entirely) because
 * submitting it would exceed the gaps size limit.
 *
 * @param numRecords         record count supplied by the reporter
 *                           (NOTE(review): the name suggests "number of dropped records" — confirm every call site reports that)
 * @param firstDroppedOffset lowest offset among the records that were not submitted
 * @param group              consumer group of the reporting worker
 * @param partition          partition the affected batch belongs to
 * @param clientId           client id of the reporting worker
 * @param attributes         consumer attributes attached to the metric
 */
case class DroppedRecordsDueToGapsSizeLimit(
numRecords: Int,
firstDroppedOffset: Long,
group: Group,
partition: TopicPartition,
clientId: ClientId,
attributes: Map[String, String]
) extends DispatcherMetric

case class FailToUpdateCurrentExecutionStarted(
record: Record,
group: Group,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object EventLoop {
config.maxParallelism,
updateBatch,
currentGaps,
config.gapsSizeLimit,
offsetsAndGapsInit
)
positionsRef <- Ref.make(Map.empty[TopicPartition, Offset])
Expand Down Expand Up @@ -396,7 +397,8 @@ case class EventLoopConfig(
delayResumeOfPausedPartition: Long,
startPaused: Boolean,
consumePartitionInParallel: Boolean,
maxParallelism: Int
maxParallelism: Int,
gapsSizeLimit: Int
)

object EventLoopConfig {
Expand All @@ -409,7 +411,8 @@ object EventLoopConfig {
delayResumeOfPausedPartition = 0,
startPaused = false,
consumePartitionInParallel = false,
maxParallelism = 1
maxParallelism = 1,
gapsSizeLimit = 256 // todo: calculate actual gaps limit based on the max metadata size
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,7 @@ object OffsetAndGaps {
def apply(offset: Offset): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap])

def apply(offset: Offset, committable: Boolean): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap], committable)

/**
 * Total size of all gaps across the given partitions.
 *
 * The sum is accumulated as a `Long` and saturated at `Int.MaxValue`, so a very large total cannot
 * wrap around to a small or negative `Int` and slip under a limit it is compared against.
 *
 * @param gaps offsets and gaps per partition
 * @return the summed size of every gap, capped at `Int.MaxValue`
 */
def gapsSize(gaps: Map[TopicPartition, OffsetAndGaps]): Int = {
  val total: Long = gaps.valuesIterator.flatMap(_.gaps).map(_.size.toLong).sum
  math.min(total, Int.MaxValue.toLong).toInt
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,29 @@ class DispatcherTest extends BaseTest[TestMetrics with TestClock] {
} yield result must beEqualTo(SubmitResult.RejectedBatch(record.copy(offset = 5L))))
}

// Verifies that a batch larger than the gaps size limit is rejected starting at the first record
// beyond the limit.
"reject records and return first rejected when gaps limit is reached" in
// highWatermark (20) is well above the batch size, so the rejection asserted below is attributed
// to the gaps limit rather than to the high watermark.
new ctx(highWatermark = 20) {
val gapsSizeLimit = 5
run(for {
ref <- Ref.make[Map[TopicPartition, ShutdownPromise]](Map.empty)
init <- getInit
dispatcher <-
Dispatcher
.make[Any](
"group",
"clientId",
// third argument (presumably the record handler — confirm against Dispatcher.make) never completes
_ => ZIO.never,
lowWatermark,
highWatermark,
workersShutdownRef = ref,
init = init,
gapsSizeLimit = gapsSizeLimit
)
// 7 records with offsets 0..6: two more than gapsSizeLimit
records = (0 until 7).map(i => record.copy(offset = i.toLong))
result <- submitBatch(dispatcher, records)
// offsets 0..4 fit within the limit of 5, so offset 5 is the first rejected record
} yield result must beEqualTo(SubmitResult.RejectedBatch(record.copy(offset = 5L))))
}

"resume paused partitions" in
new ctx(lowWatermark = 3, highWatermark = 7) {
run(
Expand Down

0 comments on commit c0b828d

Please sign in to comment.