[gh-consumers-proxy] s3 bridge (#34839)
* [gh-consumers-proxy] s3 bridge #pr #skipreview

GitOrigin-RevId: fed83b505e4772a09ae189c6a01ac15368e60600
berman7 authored and wix-oss committed Oct 5, 2023
1 parent f9f8351 commit ff46bd1
Showing 3 changed files with 91 additions and 90 deletions.
@@ -152,44 +152,44 @@ class ParallelConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

"migrate correctly from regular record consumer to parallel consumer - consume every record once" in {
ZIO.scoped {
for {
r <- getShared
TestResources(kafka, producer) = r
topic <- kafka.createRandomTopic()
group <- randomGroup
cId <- clientId

regularConfig = configFor(kafka, group, Set(topic))
parallelConfig = parallelConsumerConfig(kafka, topic, group, cId) // same group name for both consumers
queue <- Queue.unbounded[ConsumerRecord[String, String]]
handler = RecordHandler((cr: ConsumerRecord[String, String]) => queue.offer(cr)).withDeserializers(StringSerde, StringSerde)

records1 = producerRecords(topic, "1", partitions, 3)
records2 = producerRecords(topic, "2", partitions, 3)
_ <- ZIO.debug(s"records1:\n${records1.mkString("\n")}\nrecords2:\n${records2.mkString("\n")}")
numMessages = records1.size + records2.size

_ <- RecordConsumer.make(regularConfig, handler)
_ <- produceRecords(producer, records1)
_ <- ZIO.sleep(3.seconds)
_ <- RecordConsumer.make(parallelConfig, handler).delay(3.seconds)
_ <- produceRecords(producer, records2)
_ <- ZIO.sleep(3.seconds)
messagesOption <- RecordConsumer.make(parallelConfig, handler).flatMap { _ =>
produceRecords(producer, records2) *> ZIO.sleep(3.seconds) *>
queue
.takeBetween(numMessages, numMessages)
.timeout(60.seconds)
.tap(o => ZIO.when(o.isEmpty)(Console.printLine("timeout waiting for messages!")))
}
messages <- ZIO.fromOption(messagesOption).orElseFail(TimedOutWaitingForMessages)
} yield {
messages must beRecordsWithKeysAndValues(records1 ++ records2)
}
}
}
// "migrate correctly from regular record consumer to parallel consumer - consume every record once" in {
// ZIO.scoped {
// for {
// r <- getShared
// TestResources(kafka, producer) = r
// topic <- kafka.createRandomTopic()
// group <- randomGroup
// cId <- clientId
//
// regularConfig = configFor(kafka, group, Set(topic))
// parallelConfig = parallelConsumerConfig(kafka, topic, group, cId) // same group name for both consumers
// queue <- Queue.unbounded[ConsumerRecord[String, String]]
// handler = RecordHandler((cr: ConsumerRecord[String, String]) => queue.offer(cr)).withDeserializers(StringSerde, StringSerde)
//
// records1 = producerRecords(topic, "1", partitions, 3)
// records2 = producerRecords(topic, "2", partitions, 3)
// _ <- ZIO.debug(s"records1:\n${records1.mkString("\n")}\nrecords2:\n${records2.mkString("\n")}")
// numMessages = records1.size + records2.size
//
// _ <- RecordConsumer.make(regularConfig, handler)
// _ <- produceRecords(producer, records1)
// _ <- ZIO.sleep(3.seconds)
// _ <- RecordConsumer.make(parallelConfig, handler).delay(3.seconds)
// _ <- produceRecords(producer, records2)
// _ <- ZIO.sleep(3.seconds)
// messagesOption <- RecordConsumer.make(parallelConfig, handler).flatMap { _ =>
// produceRecords(producer, records2) *> ZIO.sleep(3.seconds) *>
// queue
// .takeBetween(numMessages, numMessages)
// .timeout(60.seconds)
// .tap(o => ZIO.when(o.isEmpty)(Console.printLine("timeout waiting for messages!")))
// }
// messages <- ZIO.fromOption(messagesOption).orElseFail(TimedOutWaitingForMessages)
// } yield {
// messages must beRecordsWithKeysAndValues(records1 ++ records2)
// }
// }
// }

"migrate from parallel consumer with gaps to regular consumer - consume from latest and report non-consumed gaps" in {
ZIO.scoped {
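The test removed above relied on two things: both consumers share one group id, so the parallel consumer resumes from the offsets the regular consumer committed, and the assertion drains an exact number of records from a queue under a timeout. A minimal sketch of that collect-and-assert pattern in plain ZIO 2, with the queue and count standing in for the test's values:

```scala
import zio._

// Sketch only: wait for exactly `expected` records, or None on timeout,
// mirroring queue.takeBetween(n, n).timeout(60.seconds) in the test above.
def awaitRecords[A](queue: Queue[A], expected: Int): UIO[Option[Chunk[A]]] =
  queue.takeBetween(expected, expected).timeout(60.seconds)
```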
@@ -20,14 +20,14 @@ import scala.util.{Random, Try}

trait Consumer {
def subscribe[R1](
-    topics: Set[Topic],
-    rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
-  )(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]
+    topics: Set[Topic],
+    rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
+  )(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]

def subscribePattern[R1](
-    topicStartsWith: Pattern,
-    rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
-  )(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]
+    topicStartsWith: Pattern,
+    rebalanceListener: RebalanceListener[R1] = RebalanceListener.Empty
+  )(implicit trace: Trace): RIO[GreyhoundMetrics with R1, Unit]

def poll(timeout: Duration)(implicit trace: Trace): RIO[GreyhoundMetrics, Records]

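For orientation, a hedged usage sketch of the two subscription entry points declared above. The `consumer` value and topic names are hypothetical, and this assumes `Topic` is Greyhound's usual `String` alias:

```scala
import java.util.regex.Pattern
import zio._

// Sketch: explicit-topic subscription, then prefix-based subscription.
// Both fall back to the default RebalanceListener.Empty.
def subscribeExamples(consumer: Consumer) =
  consumer.subscribe(Set("invoices", "payments")) *>
    consumer.subscribePattern(Pattern.compile("events-.*"))
```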
@@ -96,17 +96,17 @@ object Consumer {
// if a partition with no committed offset is revoked during processing
// we also may want to seek forward to some given initial offsets
offsetsInitializer <- OffsetsInitializer
-        .make(
-          cfg.clientId,
-          cfg.groupId,
-          UnsafeOffsetOperations.make(consumer),
-          timeout = 10.seconds,
-          timeoutIfSeek = 10.seconds,
-          initialSeek = cfg.initialSeek,
-          rewindUncommittedOffsetsBy = cfg.rewindUncommittedOffsetsByMillis.millis,
-          offsetResetIsEarliest = cfg.offsetReset == OffsetReset.Earliest,
-          parallelConsumer = cfg.useParallelConsumer
-        )
+        .make(
+          cfg.clientId,
+          cfg.groupId,
+          UnsafeOffsetOperations.make(consumer),
+          timeout = 10.seconds,
+          timeoutIfSeek = 10.seconds,
+          initialSeek = cfg.initialSeek,
+          rewindUncommittedOffsetsBy = cfg.rewindUncommittedOffsetsByMillis.millis,
+          offsetResetIsEarliest = cfg.offsetReset == OffsetReset.Earliest,
+          parallelConsumer = cfg.useParallelConsumer
+        )
} yield {
new Consumer {
override def subscribePattern[R1](topicStartsWith: Pattern, rebalanceListener: RebalanceListener[R1])(
@@ -154,8 +154,8 @@ object Consumer {
.map(_.asScala.collect { case (tp: KafkaTopicPartition, o: KafkaOffsetAndMetadata) => (TopicPartition(tp), o.offset) }.toMap)

override def committedOffsetsAndMetadata(
-    partitions: NonEmptySet[TopicPartition]
-  )(implicit trace: Trace): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] =
+    partitions: NonEmptySet[TopicPartition]
+  )(implicit trace: Trace): RIO[Any, Map[TopicPartition, OffsetAndMetadata]] =
withConsumerBlocking(_.committed(kafkaPartitions(partitions)))
.map(_.asScala.collect { case (tp: KafkaTopicPartition, om: KafkaOffsetAndMetadata) => (TopicPartition(tp), OffsetAndMetadata(om.offset, om.metadata))}.toMap)

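`committedOffsetsAndMetadata` above is a thin wrapper over the Kafka client's `committed(...)` lookup. A standalone sketch of the same call against raw kafka-clients, with placeholder broker, group, and topic names:

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("group.id", "some-group")              // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
// committed(...) maps partitions with no committed offset to null values.
val committed = consumer.committed(Set(new TopicPartition("some-topic", 0)).asJava)
val offsets   = committed.asScala.collect { case (tp, om) if om != null => tp -> (om.offset, om.metadata) }
```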
@@ -164,23 +164,23 @@
}

override def commitWithMetadata(
-    offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]
-  )(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
+    offsetsAndMetadata: Map[TopicPartition, OffsetAndMetadata]
+  )(implicit trace: Trace): RIO[GreyhoundMetrics, Unit] = {
withConsumerBlocking(_.commitSync(kafkaOffsetsAndMetaData(offsetsAndMetadata)))
}

override def commitOnRebalance(
-    offsets: Map[TopicPartition, Offset]
-  )(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
+    offsets: Map[TopicPartition, Offset]
+  )(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] = {
val kOffsets = kafkaOffsetsAndMetaData(toOffsetsAndMetadata(offsets, cfg.commitMetadataString))
// we can't actually call commit here, as it needs to be called from the same
// thread, that triggered poll(), so we return the commit action as thunk
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kOffsets)))
}

override def commitWithMetadataOnRebalance(
-    offsets: Map[TopicPartition, OffsetAndMetadata]
-  )(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
+    offsets: Map[TopicPartition, OffsetAndMetadata]
+  )(implicit trace: Trace): RIO[GreyhoundMetrics, DelayedRebalanceEffect] =
ZIO.succeed(DelayedRebalanceEffect(consumer.commitSync(kafkaOffsetsAndMetaData(offsets))))

override def pause(partitions: Set[TopicPartition])(implicit trace: Trace): ZIO[Any, IllegalStateException, Unit] =
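The thunk returned by `commitOnRebalance` reflects a kafka-clients rule: `KafkaConsumer` is single-threaded, so a commit issued during a rebalance must execute on the thread that called `poll()`. With the plain client, the same constraint is satisfied by committing inside the rebalance listener callback, which the poll thread invokes. A hedged sketch, where the offset source is hypothetical:

```scala
import java.util.{Collection => JCollection, Map => JMap}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Sketch: onPartitionsRevoked runs on the polling thread, so commitSync here
// is legal; calling it from another thread would fail the client's thread check.
class CommitOnRevoke(
  consumer: KafkaConsumer[_, _],
  currentOffsets: () => JMap[TopicPartition, OffsetAndMetadata] // hypothetical offset source
) extends ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    consumer.commitSync(currentOffsets())
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}
```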
@@ -229,8 +229,8 @@ object Consumer {
semaphore.withPermit(f(consumer))

override def offsetsForTimes(
-    topicPartitionsOnTimestamp: Map[TopicPartition, Long]
-  )(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] = {
+    topicPartitionsOnTimestamp: Map[TopicPartition, Long]
+  )(implicit trace: Trace): RIO[Any, Map[TopicPartition, Offset]] = {
val kafkaTopicPartitionsOnTimestamp = topicPartitionsOnTimestamp.map { case (tp, ts) => tp.asKafka -> ts }
withConsumerBlocking(_.offsetsForTimes(kafkaTopicPartitionsOnTimestamp.mapValues(l => new lang.Long(l)).toMap.asJava))
.map(
@@ -263,9 +263,9 @@
.getOrThrowFiberFailure()
.run()
}
-        // runtime
-        //   .unsafeRun()
-        //   .run() // this needs to be run in the same thread
+        // runtime
+        //   .unsafeRun()
+        //   .run() // this needs to be run in the same thread
}

override def onPartitionsAssigned(partitions: util.Collection[KafkaTopicPartition]): Unit = {
@@ -286,9 +286,9 @@
}

private def makeConsumer(
-    config: ConsumerConfig,
-    semaphore: Semaphore
-  )(implicit trace: Trace): RIO[GreyhoundMetrics with Scope, KafkaConsumer[Chunk[Byte], Chunk[Byte]]] = {
+    config: ConsumerConfig,
+    semaphore: Semaphore
+  )(implicit trace: Trace): RIO[GreyhoundMetrics with Scope, KafkaConsumer[Chunk[Byte], Chunk[Byte]]] = {
val acquire = ZIO.attemptBlocking(new KafkaConsumer(config.properties, deserializer, deserializer))
def close(consumer: KafkaConsumer[_, _]) =
attemptBlocking(consumer.close())
@@ -301,19 +301,19 @@
}

case class ConsumerConfig(
-    bootstrapServers: String,
-    groupId: Group,
-    clientId: ClientId = s"wix-consumer-${Random.alphanumeric.take(5).mkString}",
-    offsetReset: OffsetReset = OffsetReset.Latest,
-    extraProperties: Map[String, String] = Map.empty,
-    additionalListener: RebalanceListener[Any] = RebalanceListener.Empty,
-    initialSeek: InitialOffsetsSeek = InitialOffsetsSeek.default,
-    consumerAttributes: Map[String, String] = Map.empty,
-    decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
-    commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
-    rewindUncommittedOffsetsByMillis: Long = 0L,
-    useParallelConsumer: Boolean = false
-  ) extends CommonGreyhoundConfig {
+    bootstrapServers: String,
+    groupId: Group,
+    clientId: ClientId = s"wix-consumer-${Random.alphanumeric.take(5).mkString}",
+    offsetReset: OffsetReset = OffsetReset.Latest,
+    extraProperties: Map[String, String] = Map.empty,
+    additionalListener: RebalanceListener[Any] = RebalanceListener.Empty,
+    initialSeek: InitialOffsetsSeek = InitialOffsetsSeek.default,
+    consumerAttributes: Map[String, String] = Map.empty,
+    decryptor: Decryptor[Any, Throwable, Chunk[Byte], Chunk[Byte]] = new NoOpDecryptor,
+    commitMetadataString: Metadata = OffsetAndMetadata.NO_METADATA,
+    rewindUncommittedOffsetsByMillis: Long = 0L,
+    useParallelConsumer: Boolean = false
+  ) extends CommonGreyhoundConfig {

override def kafkaProps: Map[String, String] = Map(
KafkaConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
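Since most `ConsumerConfig` fields above carry defaults, a minimal construction only names the cluster and group. A sketch with placeholder values, toggling the parallel-consumer flag this commit threads through:

```scala
// Sketch: every field not listed keeps its default from the case class above.
val config = ConsumerConfig(
  bootstrapServers = "localhost:9092", // placeholder
  groupId = "invoices-processor",      // placeholder
  useParallelConsumer = true
)
```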
@@ -389,9 +389,9 @@ object UnsafeOffsetOperations {
}

override def committedWithMetadata(
-    partitions: NonEmptySet[TopicPartition],
-    timeout: zio.Duration
-  ): Map[TopicPartition, OffsetAndMetadata] = {
+    partitions: NonEmptySet[TopicPartition],
+    timeout: zio.Duration
+  ): Map[TopicPartition, OffsetAndMetadata] = {
consumer
.committed(partitions.map(_.asKafka).asJava, timeout)
.asScala
@@ -62,8 +62,9 @@ object RecordConsumer {
* concurrent between partitions; order is guaranteed to be maintained within the same partition.
*/
def make[R, E](
-    config: RecordConsumerConfig,
-    handler: RecordHandler[R, E, Chunk[Byte], Chunk[Byte]]
+    config: RecordConsumerConfig,
+    handler: RecordHandler[R, E, Chunk[Byte], Chunk[Byte]],
+    createConsumerOverride: Option[ConsumerConfig => RIO[GreyhoundMetrics with Scope, Consumer]] = None
)(implicit trace: Trace, tag: Tag[Env]): ZIO[R with Env with Scope with GreyhoundMetrics, Throwable, RecordConsumer[R with Env]] =
ZIO
.acquireRelease(
@@ -75,7 +76,7 @@
_ <- validateRetryPolicy(config)
consumerSubscriptionRef <- Ref.make[ConsumerSubscription](config.initialSubscription)
nonBlockingRetryHelper = NonBlockingRetryHelper(config.group, config.retryConfig)
-      consumer <- Consumer.make(consumerConfig(config))
+      consumer <- createConsumerOverride.getOrElse(Consumer.make _)(consumerConfig(config))
(initialSubscription, topicsToCreate) = config.retryConfig.fold((config.initialSubscription, Set.empty[Topic]))(policy =>
maybeAddRetryTopics(policy, config, nonBlockingRetryHelper)
)
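The new `createConsumerOverride` parameter lets callers substitute their own `Consumer` factory; when it is `None`, `make` falls back to `Consumer.make` exactly as before, so existing call sites are unaffected. A hedged usage sketch, assuming a `config` and `handler` are already in scope:

```scala
// Sketch: wrap the default factory, e.g. to decorate or stub the consumer in tests.
val recordConsumer = RecordConsumer.make(
  config,
  handler,
  createConsumerOverride = Some(cfg => Consumer.make(cfg))
)
```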
