diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala index d6070e9d..a062ccd5 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsAndGaps.scala @@ -8,6 +8,7 @@ import com.wixpress.dst.greyhound.core.{Offset, OffsetAndMetadata, TopicPartitio import zio._ import java.util.Base64 +import scala.util.Try trait OffsetsAndGaps { def init(committedOffsets: Map[TopicPartition, OffsetAndGaps]): UIO[Unit] @@ -111,11 +112,9 @@ object OffsetsAndGaps { def parseGapsString(rawOffsetAndGapsString: String): Option[OffsetAndGaps] = { val offsetAndGapsString = - if (rawOffsetAndGapsString.nonEmpty) - new String( - GzipCompression.decompress(Base64.getDecoder.decode(rawOffsetAndGapsString)).getOrElse(Array.empty) - ) - else "" + if (rawOffsetAndGapsString.nonEmpty) { + Try(new String(GzipCompression.decompress(Base64.getDecoder.decode(rawOffsetAndGapsString)).getOrElse(Array.empty))).getOrElse("") + } else "" val lastHandledOffsetSeparatorIndex = offsetAndGapsString.indexOf(LAST_HANDLED_OFFSET_SEPARATOR) if (lastHandledOffsetSeparatorIndex < 0) None @@ -132,13 +131,19 @@ object OffsetsAndGaps { } } - def firstGapOffset(gapsString: String): Option[Offset] = { + private def firstGapOffset(gapsString: String): Option[Offset] = { val maybeOffsetAndGaps = parseGapsString(gapsString) maybeOffsetAndGaps match { case Some(offsetAndGaps) if offsetAndGaps.gaps.nonEmpty => Some(offsetAndGaps.gaps.minBy(_.start).start) case _ => None } } + + def gapsSmallestOffsets(offsets: Map[TopicPartition, Option[OffsetAndMetadata]]): Map[TopicPartition, OffsetAndMetadata] = + offsets + .collect { case (tp, Some(om)) => tp -> om } + .map(tpom => tpom._1 -> (firstGapOffset(tpom._2.metadata), tpom._2.metadata)) + .collect { case (tp, (Some(offset), metadata)) => tp -> OffsetAndMetadata(offset, metadata) } } case class Gap(start: Offset, end: Offset) { diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala index 234e7ba9..31b5d2bd 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializer.scala @@ -2,7 +2,6 @@ package com.wixpress.dst.greyhound.core.consumer import java.time.Clock import com.wixpress.dst.greyhound.core.consumer.ConsumerMetric.{CommittedMissingOffsets, CommittedMissingOffsetsFailed, SkippedGapsOnInitialization} -import com.wixpress.dst.greyhound.core.consumer.OffsetsAndGaps.{firstGapOffset, parseGapsString} import com.wixpress.dst.greyhound.core.{ClientId, Group, Offset, OffsetAndMetadata, TopicPartition} import com.wixpress.dst.greyhound.core.metrics.{GreyhoundMetric, GreyhoundMetrics} import zio.{URIO, ZIO} @@ -82,10 +81,7 @@ class OffsetsInitializer( val seekToEndPartitions = seekTo.collect { case (k, SeekTo.SeekToEnd) => k }.toSet val toPause = seekTo.collect { case (k, SeekTo.Pause) => k } val seekToEndOffsets = fetchEndOffsets(seekToEndPartitions, timeout).mapValues(OffsetAndMetadata.apply) - val gapsSmallestOffsets = currentCommittedOffsets - .collect { case (tp, Some(om)) => tp -> om } - .map(tpom => tpom._1 -> (firstGapOffset(tpom._2.metadata), tpom._2.metadata)) - .collect { case (tp, (Some(offset), metadata)) => tp -> OffsetAndMetadata(offset, metadata) } + val gapsSmallestOffsets = OffsetsAndGaps.gapsSmallestOffsets(currentCommittedOffsets) val seekToGapsOffsets = if (parallelConsumer) gapsSmallestOffsets else Map.empty val toOffsets = seekToOffsets ++ seekToEndOffsets ++ seekToGapsOffsets @@ -96,7 +92,7 @@ class OffsetsInitializer( private def reportSkippedGaps(currentCommittedOffsets: Map[TopicPartition, Option[OffsetAndMetadata]]) = { val skippedGaps = currentCommittedOffsets .collect { case (tp, Some(om)) => tp -> om } - .map(tpom => tpom._1 -> parseGapsString(tpom._2.metadata)) + .map(tpom => tpom._1 -> OffsetsAndGaps.parseGapsString(tpom._2.metadata)) .collect { case (tp, Some(gaps)) => tp -> gaps } reporter(SkippedGapsOnInitialization(clientId, group, skippedGaps)) } diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala index 2b8ffb0f..32aeb11a 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/OffsetsInitializerTest.scala @@ -192,6 +192,24 @@ class OffsetsInitializerTest extends SpecificationWithJUnit with Mockito { ) } + "commit offsets even when unknown metadata string is present in committed offsets" in + new ctx() { + givenCommittedOffsetsAndMetadata(partitions)(Map(p1 -> OffsetAndMetadata(randomInt, randomStr))) + givenPositions(p2 -> p2Pos, p3 -> p3Pos) + + committer.initializeOffsets(partitions) + + val missingOffsets = Map( + p2 -> p2Pos, + p3 -> p3Pos + ) + there was + one(offsetOps).commitWithMetadata( + missingOffsets.mapValues(OffsetAndMetadata(_)), + timeout + ) + } + class ctx(val seekTo: Map[TopicPartition, SeekTo] = Map.empty, offsetReset: OffsetReset = OffsetReset.Latest) extends Scope { private val metricsLogRef = new AtomicReference(Seq.empty[GreyhoundMetric]) def reported = metricsLogRef.get @@ -226,6 +244,12 @@ class OffsetsInitializerTest extends SpecificationWithJUnit with Mockito { offsetOps.committedWithMetadata(partitions, timeout) returns result.mapValues(OffsetAndMetadata(_)) } + def givenCommittedOffsetsAndMetadata(partitions: Set[TopicPartition], timeout: zio.Duration = timeout)( + result: Map[TopicPartition, OffsetAndMetadata] + ) = { + offsetOps.committedWithMetadata(partitions, timeout) returns result + } + def givenEndOffsets(partitions: Set[TopicPartition], timeout: zio.Duration = timeout)(result: Map[TopicPartition, Long]) = { offsetOps.endOffsets(partitions, timeout) returns result }