Skip to content

Commit

Permalink
[greyhound] parallel consumer OffsetsAndGaps (#33605)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: f178c94663c7cbcb22bd7266c3e15919d8997d8c
  • Loading branch information
ben-wattelman authored and wix-oss committed Sep 3, 2023
1 parent 87d8c14 commit 199c66c
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.wixpress.dst.greyhound.core.consumer

import com.wixpress.dst.greyhound.core.{Offset, TopicPartition}
import zio._

trait OffsetsAndGaps {
def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]]

def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]]

def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit]

def contains(partition: TopicPartition, offset: Offset): UIO[Boolean]
}

object OffsetsAndGaps {
def make: UIO[OffsetsAndGaps] =
Ref.make(Map.empty[TopicPartition, OffsetAndGaps]).map { ref =>
new OffsetsAndGaps {
override def getCommittableAndClear: UIO[Map[TopicPartition, OffsetAndGaps]] =
ref.modify(offsetsAndGaps => {
val committable = offsetsAndGaps.filter(_._2.committable)
val updated = offsetsAndGaps.mapValues(_.markCommitted)
(committable, updated)
})

override def gapsForPartition(partition: TopicPartition): UIO[Seq[Gap]] =
ref.get.map(_.get(partition).fold(Seq.empty[Gap])(_.gaps.sortBy(_.start)))

override def update(partition: TopicPartition, batch: Seq[Offset]): UIO[Unit] =
ref.update { offsetsAndGaps =>
val sortedBatch = batch.sorted
val maxBatchOffset = sortedBatch.last
val maybeOffsetAndGaps = offsetsAndGaps.get(partition)
val prevOffset = maybeOffsetAndGaps.fold(-1L)(_.offset)
val partitionOffsetAndGaps = maybeOffsetAndGaps.fold(OffsetAndGaps(maxBatchOffset))(identity)

val newGaps = gapsInBatch(sortedBatch, prevOffset)

val updatedGaps = updateGapsByOffsets(
partitionOffsetAndGaps.gaps ++ newGaps,
sortedBatch
)

offsetsAndGaps + (partition -> OffsetAndGaps(maxBatchOffset max prevOffset, updatedGaps))
}.unit

override def contains(partition: TopicPartition, offset: Offset): UIO[Boolean] =
ref.get.map(_.get(partition).fold(false)(_.contains(offset)))

private def gapsInBatch(batch: Seq[Offset], prevLastOffset: Offset): Seq[Gap] =
batch.sorted
.foldLeft(Seq.empty[Gap], prevLastOffset) {
case ((gaps, lastOffset), offset) =>
if (offset <= lastOffset) (gaps, lastOffset)
else if (offset == lastOffset + 1) (gaps, offset)
else {
val newGap = Gap(lastOffset + 1, offset - 1)
(newGap +: gaps, offset)
}
}
._1
.reverse

private def updateGapsByOffsets(gaps: Seq[Gap], offsets: Seq[Offset]): Seq[Gap] = {
val gapsToOffsets = gaps.map(gap => gap -> offsets.filter(o => o >= gap.start && o <= gap.end)).toMap
gapsToOffsets.flatMap {
case (gap, offsets) =>
if (offsets.isEmpty) Seq(gap)
else if (offsets.size == (gap.size)) Seq.empty[Gap]
else gapsInBatch(offsets ++ Seq(gap.start - 1, gap.end + 1), gap.start - 2)
}.toSeq
}
}
}
}

case class Gap(start: Offset, end: Offset) {
def contains(offset: Offset): Boolean = start <= offset && offset <= end

def size: Long = end - start + 1
}

case class OffsetAndGaps(offset: Offset, gaps: Seq[Gap], committable: Boolean = true) {
def contains(offset: Offset): Boolean = gaps.exists(_.contains(offset))

def markCommitted: OffsetAndGaps = copy(committable = false)
}

object OffsetAndGaps {
def apply(offset: Offset): OffsetAndGaps = OffsetAndGaps(offset, Seq.empty[Gap])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.wixpress.dst.greyhound.core.consumer

import com.wixpress.dst.greyhound.core.TopicPartition
import com.wixpress.dst.greyhound.core.consumer.OffsetGapsTest._
import com.wixpress.dst.greyhound.core.testkit.BaseTestNoEnv

class OffsetsAndGapsTestGapsTest extends BaseTestNoEnv {

"calculate gaps created by handled batch" in {
for {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
currentGaps <- offsetGaps.gapsForPartition(topicPartition)
} yield currentGaps must beEqualTo(Seq(Gap(0L, 0L), Gap(2L, 2L), Gap(4L, 6L)))
}

"update offset and gaps according to handled batch" in {
for {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
_ <- offsetGaps.update(topicPartition, Seq(2L, 5L))
getCommittableAndClear <- offsetGaps.getCommittableAndClear
} yield getCommittableAndClear must havePair(topicPartition -> OffsetAndGaps(7L, Seq(Gap(0L, 0L), Gap(4L, 4L), Gap(6L, 6L))))
}

"clear committable offsets" in {
for {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
_ <- offsetGaps.getCommittableAndClear
getCommittableAndClear <- offsetGaps.getCommittableAndClear
} yield getCommittableAndClear must beEmpty
}

"do not clear gaps on retrieving current" in {
for {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(topicPartition, Seq(1L, 3L, 7L))
_ <- offsetGaps.gapsForPartition(topicPartition)
currentGaps <- offsetGaps.gapsForPartition(topicPartition)
} yield currentGaps must beEqualTo(Seq(Gap(0L, 0L), Gap(2L, 2L), Gap(4L, 6L)))
}

"update with larger offset" in {
val partition0 = TopicPartition(topic, 0)
val partition1 = TopicPartition(topic, 1)

for {
offsetGaps <- OffsetsAndGaps.make
_ <- offsetGaps.update(partition0, Seq(1L))
_ <- offsetGaps.update(partition0, Seq(0L))
_ <- offsetGaps.update(partition1, Seq(0L))
current <- offsetGaps.getCommittableAndClear
} yield current must havePairs(partition0 -> OffsetAndGaps(1L, Seq()), partition1 -> OffsetAndGaps(0L, Seq()))
}

}

object OffsetGapsTest {
val topic = "some-topic"
val topicPartition = TopicPartition(topic, 0)
}

0 comments on commit 199c66c

Please sign in to comment.