diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala index 81a1c06f..5163f24c 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Dispatcher.scala @@ -117,7 +117,7 @@ object Dispatcher { override def revoke(partitions: Set[TopicPartition]): URIO[GreyhoundMetrics, Unit] = workers .modify { workers => - val revoked = workers.filterKeys(partitions.contains) + val revoked = workers.filterKeys(partitions.contains) val remaining = workers -- partitions (revoked, remaining) @@ -246,13 +246,23 @@ object Dispatcher { queue <- Queue.dropping[Record](capacity) internalState <- TRef.make(WorkerInternalState.empty).commit fiber <- - (reportWorkerRunningInInterval(every = 60.seconds, internalState)(partition, group, clientId).forkDaemon *> - (if (consumeInParallel) - pollBatch(status, internalState, handle, queue, group, clientId, partition, consumerAttributes, maxParallelism, updateBatch, currentGaps) - else pollOnce(status, internalState, handle, queue, group, clientId, partition, consumerAttributes)) - .repeatWhile(_ == true)) - .interruptible - .forkDaemon + (reportWorkerRunningInInterval(every = 60.seconds, internalState)(partition, group, clientId).forkDaemon *> + (if (consumeInParallel) + pollBatch( + status, + internalState, + handle, + queue, + group, + clientId, + partition, + consumerAttributes, + maxParallelism, + updateBatch, + currentGaps + ) + else pollOnce(status, internalState, handle, queue, group, clientId, partition, consumerAttributes)) + .repeatWhile(_ == true)).interruptible.forkDaemon } yield new Worker { override def submit(record: Record): URIO[Any, Boolean] = queue @@ -406,17 +416,18 @@ object Dispatcher { ) groupedRecords = records.groupBy(_.key).values // todo: add sub-grouping for records without key latestCommitGaps <- currentGaps(records.map(r => TopicPartition(r.topic, r.partition)).toSet) - _ <- ZIO - .foreachParDiscard(groupedRecords)(sameKeyRecords => - ZIO.foreach(sameKeyRecords) { record => - if (shouldRecordBeHandled(record, latestCommitGaps)) { - handle(record).interruptible.ignore *> updateBatch(sameKeyRecords).interruptible - } else - report(SkippedPreviouslyHandledRecord(record, group, clientId, consumerAttributes)) - - } - ) - .withParallelism(maxParallelism) + _ <- report(InvokingHandlersInParallel(Math.max(groupedRecords.size, maxParallelism))) *> + ZIO + .foreachParDiscard(groupedRecords)(sameKeyRecords => + ZIO.foreach(sameKeyRecords) { record => + if (shouldRecordBeHandled(record, latestCommitGaps)) { + handle(record).interruptible.ignore *> updateBatch(sameKeyRecords).interruptible + } else + report(SkippedPreviouslyHandledRecord(record, group, clientId, consumerAttributes)) + + } + ) + .withParallelism(maxParallelism) res <- isActive(internalState) } yield res } @@ -568,6 +579,8 @@ object DispatcherMetric { currentExecutionStarted: Option[Long] ) extends DispatcherMetric + case class InvokingHandlersInParallel(numHandlers: Int) extends DispatcherMetric + case class SkippedPreviouslyHandledRecord(record: Record, group: Group, clientId: ClientId, attributes: Map[String, String]) extends DispatcherMetric diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala index fd291da0..62f9c638 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala @@ -296,7 +296,7 @@ object EventLoop { val offsetsAndMetadataToCommit = OffsetsAndGaps.toOffsetsAndMetadata(committable) consumer .commitWithMetadata(offsetsAndMetadataToCommit) - .tap(_ => report(CommittedOffsetsAndMetadata(offsetsAndMetadataToCommit))) + .tap(_ => ZIO.when(offsetsAndMetadataToCommit.nonEmpty)(report(CommittedOffsetsAndMetadata(offsetsAndMetadataToCommit)))) .catchAll { t => report(FailedToCommitOffsetsAndMetadata(t, offsetsAndMetadataToCommit)) *> offsetsAndGaps.setCommittable(committable) } @@ -331,7 +331,8 @@ object EventLoop { .commitWithMetadataOnRebalance(OffsetsAndGaps.toOffsetsAndMetadata(committable)) .catchAll { _ => offsetsAndGaps.setCommittable(committable) *> DelayedRebalanceEffect.zioUnit } runtime <- ZIO.runtime[Any] - } yield tle.catchAll { _ => zio.Unsafe.unsafe { implicit s => + } yield tle.catchAll { _ => + zio.Unsafe.unsafe { implicit s => runtime.unsafe .run(offsetsAndGaps.setCommittable(committable)) .getOrThrowFiberFailure() @@ -407,6 +408,14 @@ object EventLoopMetric { case class FailedToUpdatePositions(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric + case class FailedToUpdateGapsOnPartitionAssignment( + t: Throwable, + clientId: ClientId, + group: Group, + partitions: Set[TopicPartition], + attributes: Map[String, String] = Map.empty + ) extends EventLoopMetric + case class FailedToFetchCommittedGaps(t: Throwable, clientId: ClientId, attributes: Map[String, String] = Map.empty) extends EventLoopMetric diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala index 2704b5d6..b2e68bca 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala @@ -11,16 +11,19 @@ trait OffsetsAndGaps { def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] - def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] + def update(partition: TopicPartition, batch: Seq[Offset], prevCommittedOffset: Option[Offset]): UIO[Unit] def update(record: ConsumerRecord[_, _]): UIO[Unit] = - update(RecordTopicPartition(record), Seq(record.offset)) + update(RecordTopicPartition(record), Seq(record.offset), None) def update(records: Chunk[ConsumerRecord[_, _]]): UIO[Unit] = { val sortedBatch = records.sortBy(_.offset) - update(RecordTopicPartition(sortedBatch.head), sortedBatch.map(_.offset) ++ Seq(sortedBatch.last.offset + 1)) + update(RecordTopicPartition(sortedBatch.head), sortedBatch.map(_.offset) ++ Seq(sortedBatch.last.offset + 1), None) } + def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] = + update(partition, batch, None) + def setCommittable(offsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit] def contains(partition: TopicPartition, offset: Offset): UIO[Boolean] @@ -40,7 +43,7 @@ object OffsetsAndGaps { override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] = ref.get.map(_.get(partition).fold(Seq.empty[Gap])(_.gaps.sortBy(_.start))) - override def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] = + override def update(partition: TopicPartition, batch: Seq[Offset], prevCommittedOffset: Option[Offset]): UIO[Unit] = ref.update { offsetsAndGaps => val sortedBatch = batch.sorted val maxBatchOffset = sortedBatch.last