Skip to content

Commit

Permalink
[greyhound] parallel consumer - add visibility (#34908)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 611f9b35285657e84dfaf69cf2af1744b54672e8
  • Loading branch information
ben-wattelman authored and wix-oss committed Oct 5, 2023
1 parent ff46bd1 commit f24b312
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object Dispatcher {
startPaused: Boolean = false,
consumeInParallel: Boolean = false,
maxParallelism: Int = 1,
updateBatch: Chunk[Record] => UIO[Unit] = _ => ZIO.unit,
updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit,
currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]] = _ =>
ZIO.succeed(Map.empty)
)(implicit trace: Trace): UIO[Dispatcher[R]] =
Expand All @@ -73,7 +73,7 @@ object Dispatcher {

override def submitBatch(records: Records): URIO[R with Env, SubmitResult] =
for {
_ <- report(SubmittingRecordBatch(group, clientId, records.size, consumerAttributes))
_ <- report(SubmittingRecordBatch(group, clientId, records.size, records, consumerAttributes))
allSamePartition = records.map(r => RecordTopicPartition(r)).distinct.size == 1
submitResult <- if (allSamePartition) {
val partition = RecordTopicPartition(records.head)
Expand Down Expand Up @@ -240,7 +240,7 @@ object Dispatcher {
consumerAttributes: Map[String, String],
consumeInParallel: Boolean,
maxParallelism: Int,
updateBatch: Chunk[Record] => UIO[Unit] = _ => ZIO.unit,
updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit] = _ => ZIO.unit,
currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]]
)(implicit trace: Trace): URIO[R with Env, Worker] = for {
queue <- Queue.dropping[Record](capacity)
Expand Down Expand Up @@ -355,7 +355,7 @@ object Dispatcher {
partition: TopicPartition,
consumerAttributes: Map[String, String],
maxParallelism: Int,
updateBatch: Chunk[Record] => UIO[Unit],
updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit],
currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]]
)(implicit trace: Trace): ZIO[R with GreyhoundMetrics, Any, Boolean] =
internalState.update(s => s.cleared).commit *>
Expand Down Expand Up @@ -391,8 +391,8 @@ object Dispatcher {
clientId: ClientId,
partition: TopicPartition,
consumerAttributes: Map[ClientId, ClientId],
maxParallelism: RuntimeFlags,
updateBatch: Chunk[Record] => UIO[Unit],
maxParallelism: Int,
updateBatch: Chunk[Record] => URIO[GreyhoundMetrics, Unit],
currentGaps: Set[TopicPartition] => ZIO[GreyhoundMetrics, Nothing, Map[TopicPartition, Option[OffsetAndGaps]]]
): ZIO[R with GreyhoundMetrics, Throwable, Boolean] =
for {
Expand Down Expand Up @@ -511,8 +511,13 @@ object DispatcherMetric {
case class SubmittingRecord[K, V](group: Group, clientId: ClientId, record: ConsumerRecord[K, V], attributes: Map[String, String])
extends DispatcherMetric

case class SubmittingRecordBatch[K, V](group: Group, clientId: ClientId, numRecords: Int, attributes: Map[String, String])
extends DispatcherMetric
case class SubmittingRecordBatch[K, V](
group: Group,
clientId: ClientId,
numRecords: Int,
records: Records,
attributes: Map[String, String]
) extends DispatcherMetric

case class HandlingRecord[K, V](
group: Group,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object EventLoop {
offsetsAndGaps <- OffsetsAndGaps.make
handle = if (config.consumePartitionInParallel) { cr: Record => handler.handle(cr) }
else handler.andThen(offsets.update).handle(_)
updateBatch = { records: Chunk[Record] => offsetsAndGaps.update(records) }
updateBatch = { records: Chunk[Record] => report(HandledBatch(records)) *> offsetsAndGaps.update(records) }
currentGaps = { partitions: Set[TopicPartition] => currentGapsForPartitions(partitions, clientId)(consumer) }
_ <- report(CreatingDispatcher(clientId, group, consumerAttributes, config.startPaused))
dispatcher <- Dispatcher.make(
Expand Down Expand Up @@ -278,7 +278,9 @@ object EventLoop {
report(PartitionThrottled(partition, partitionToRecords._2.map(_.offset).min, consumer.config.consumerAttributes)).as(acc)
else
dispatcher.submitBatch(partitionToRecords._2.toSeq).flatMap {
case SubmitResult.Submitted => ZIO.succeed(acc)
case SubmitResult.Submitted =>
report(SubmittedBatch(partitionToRecords._2.size, partitionToRecords._1, partitionToRecords._2.map(_.offset))) *>
ZIO.succeed(acc)
case RejectedBatch(firstRejected) =>
report(HighWatermarkReached(partition, firstRejected.offset, consumer.config.consumerAttributes)) *>
consumer.pause(firstRejected).fold(_ => acc, _ => acc + partition)
Expand All @@ -292,7 +294,12 @@ object EventLoop {
private def commitOffsetsAndGaps(consumer: Consumer, offsetsAndGaps: OffsetsAndGaps): URIO[GreyhoundMetrics, Unit] =
offsetsAndGaps.getCommittableAndClear.flatMap { committable =>
val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable)
consumer.commitWithMetadata(offsetsAndMetadataToCommit).catchAll { _ => offsetsAndGaps.setCommittable(committable) }
consumer
.commitWithMetadata(offsetsAndMetadataToCommit)
.tap(_ => report(CommittedOffsetsAndMetadata(offsetsAndMetadataToCommit)))
.catchAll { t =>
report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable)
}
}

private def commitOffsetsOnRebalance(
Expand Down Expand Up @@ -396,6 +403,8 @@ object EventLoopMetric {
attributes: Map[String, String] = Map.empty
) extends EventLoopMetric

case class SubmittedBatch(numSubmitted: Int, partition: TopicPartition, offsets: Iterable[Offset]) extends EventLoopMetric

case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric

case class FailedToFetchCommittedGaps(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty)
Expand All @@ -410,6 +419,13 @@ object EventLoopMetric {
case class CreatingPollOnceFiber(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class AwaitingPartitionsAssignment(clientId: ClientId, group: Group, attributes: Map[String, String]) extends EventLoopMetric

case class CommittedOffsetsAndMetadata(offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]) extends EventLoopMetric

case class FailedToCommitOffsetsAndMetadata(t: Throwable, offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata])
extends EventLoopMetric

case class HandledBatch(records: Records) extends EventLoopMetric
}

sealed trait EventLoopState
Expand Down

0 comments on commit f24b312

Please sign in to comment.