From 60378dfa8f6cd90e0ab86adb95568822e910a291 Mon Sep 17 00:00:00 2001 From: Ben Wattelman <82799628+ben-wattelman@users.noreply.github.com> Date: Tue, 4 Apr 2023 12:03:59 +0300 Subject: [PATCH] [greyhound] parallel consumer OffsetsAndGaps (#33605) GitOrigin-RevId: f178c94663c7cbcb22bd7266c3e15919d8997d8c --- .../core/consumer/OffsetsAndGaps.scala | 92 +++++++++++++++++++ .../core/consumer/OffsetsAndGapsTest.scala | 62 +++++++++++++ 2 files changed, 154 insertions(+) create mode 100644 core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala create mode 100644 core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala new file mode 100644 index 00000000..f5718555 --- /dev/null +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala @@ -0,0 +1,92 @@ +package com.wixpress.dst.greyhound.core.consumer + +import com.wixpress.dst.greyhound.core.{Offset, TopicPartition} +import zio._ + +trait OffsetsAndGaps { + def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] + + def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] + + def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] + + def contains(partition: TopicPartition, offset: Offset): UIO[Boolean] +} + +object OffsetsAndGaps { + def make: UIO[OffsetsAndGaps] = + Ref.make(Map.empty[TopicPartition, OffsetAndGaps]).map { ref => + new OffsetsAndGaps { + override def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] = + ref.modify(offsetsAndGaps => { + val committable = offsetsAndGaps.filter(_._2.committable) + val updated = offsetsAndGaps.mapValues(_.markCommitted) + (committable, updated) + }) + + override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] = + ref.get.map(_.get(partition).fold(Seq.empty[Gap])(_.gaps.sortBy(_.start))) + + override def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] = + ref.update { offsetsAndGaps => + val sortedBatch = batch.sorted + val maxBatchOffset = sortedBatch.last + val maybeOffsetAndGaps = offsetsAndGaps.get(partition) + val prevOffset = maybeOffsetAndGaps.fold(-1L)(_.offset) + val partitionOffsetAndGaps = maybeOffsetAndGaps.fold(OffsetAndGaps(maxBatchOffset))(identity) + + val newGaps = gapsInBatch(sortedBatch, prevOffset) + + val updatedGaps = updateGapsByOffsets( + partitionOffsetAndGaps.gaps ++ newGaps, + sortedBatch + ) + + offsetsAndGaps + (partition -> OffsetAndGaps(maxBatchOffset max prevOffset, updatedGaps)) + }.unit + + override def contains(partition: TopicPartition, offset: Offset): UIO[Boolean] = + ref.get.map(_.get(partition).fold(false)(_.contains(offset))) + + private def gapsInBatch(batch: Seq[Offset], prevLastOffset: Offset): Seq[Gap] = + batch.sorted + .foldLeft(Seq.empty[Gap], prevLastOffset) { + case ((gaps, lastOffset), offset) => + if (offset <= lastOffset) (gaps, lastOffset) + else if (offset == lastOffset + 1) (gaps, offset) + else { + val newGap = Gap(lastOffset + 1, offset - 1) + (newGap +: gaps, offset) + } + } + ._1 + .reverse + + private def updateGapsByOffsets(gaps: Seq[Gap], offsets: Seq[Offset]): Seq[Gap] = { + val gapsToOffsets = gaps.map(gap => gap -> offsets.filter(o => o >= gap.start && o <= gap.end)).toMap + gapsToOffsets.flatMap { + case (gap, offsets) => + if (offsets.isEmpty) Seq(gap) + else if (offsets.size == (gap.size)) Seq.empty[Gap] + else gapsInBatch(offsets ++ Seq(gap.start - 1, gap.end + 1), gap.start - 2) + }.toSeq + } + } + } +} + +case class Gap(start: Offset, end: Offset) { + def contains(offset: Offset): Boolean = start <= offset && offset <= end + + def size: Long = end - start + 1 +} + +case class OffsetAndGaps(offset: Offset, gaps: Seq[Gap], committable: Boolean = true) { + def contains(offset: Offset): Boolean = gaps.exists(_.contains(offset)) + + def markCommitted: OffsetAndGaps = copy(committable = false) +} + +object OffsetAndGaps { + def apply(offset: Offset): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap]) +} diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala new file mode 100644 index 00000000..df765ce5 --- /dev/null +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGapsTest.scala @@ -0,0 +1,62 @@ +package com.wixpress.dst.greyhound.core.consumer + +import com.wixpress.dst.greyhound.core.TopicPartition +import com.wixpress.dst.greyhound.core.consumer.OffsetGapsTest._ +import com.wixpress.dst.greyhound.core.testkit.BaseTestNoEnv + +class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv { + + "calculate gaps created by handled batch" in { + for { + offsetGaps <- OffsetsAndGaps.make + _ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) + currentGaps <- offsetGaps.gapsForPartition(topicPartition) + } yield currentGaps must beEqualTo(Seq(Gap(0L, 0L), Gap(2L, 2L), Gap(4L, 6L))) + } + + "update offset and gaps according to handled batch" in { + for { + offsetGaps <- OffsetsAndGaps.make + _ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) + _ <- offsetGaps.update(topicPartition, Seq(2L, 5L)) + getCommittableAndClear <- offsetGaps.getCommittableAndClear + } yield getCommittableAndClear must havePair(topicPartition -> OffsetAndGaps(7L, Seq(Gap(0L, 0L), Gap(4L, 4L), Gap(6L, 6L)))) + } + + "clear committable offsets" in { + for { + offsetGaps <- OffsetsAndGaps.make + _ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) + _ <- offsetGaps.getCommittableAndClear + getCommittableAndClear <- offsetGaps.getCommittableAndClear + } yield getCommittableAndClear must beEmpty + } + + "do not clear gaps on retrieving current" in { + for { + offsetGaps <- OffsetsAndGaps.make + _ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L)) + _ <- offsetGaps.gapsForPartition(topicPartition) + currentGaps <- offsetGaps.gapsForPartition(topicPartition) + } yield currentGaps must beEqualTo(Seq(Gap(0L, 0L), Gap(2L, 2L), Gap(4L, 6L))) + } + + "update with larger offset" in { + val partition0 = TopicPartition(topic, 0) + val partition1 = TopicPartition(topic, 1) + + for { + offsetGaps <- OffsetsAndGaps.make + _ <- offsetGaps.update(partition0, Seq(1L)) + _ <- offsetGaps.update(partition0, Seq(0L)) + _ <- offsetGaps.update(partition1, Seq(0L)) + current <- offsetGaps.getCommittableAndClear + } yield current must havePairs(partition0 -> OffsetAndGaps(1L, Seq()), partition1 -> OffsetAndGaps(0L, Seq())) + } + +} + +object OffsetGapsTest { + val topic = "some-topic" + val topicPartition = TopicPartition(topic, 0) +}