Skip to content

Commit

Permalink
ConsumerIT: add a test: allow to override offsetReset with autoResetO…
Browse files Browse the repository at this point in the history
…ffset from extra properties (#35845)

ConsumerIT: add a test: allow to override offsetReset with autoResetOffset from extra properties #automerge
GitOrigin-RevId: a9159b9c15445daa243e84a0b4bcaeae8db8b29d
  • Loading branch information
natansil authored and wix-oss committed Oct 5, 2023
1 parent ebcd88a commit 97179da
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions core/src/it/scala/com/wixpress/dst/greyhound/core/ConsumerIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,41 @@ class ConsumerIT extends BaseTestWithSharedEnv[Env, TestResources] {
}
}

s"allow to override offsetReset with autoResetOffset from extra properties${parallelConsumerString(useParallelConsumer)}" in
ZIO.scoped {
for {
r <- getShared
TestResources(kafka, producer) = r
_ <- ZIO.debug(">>>> starting test: earliestTest")
topic <- kafka.createRandomTopic(prefix = "core-from-earliest")
group <- randomGroup

queue <- Queue.unbounded[ConsumerRecord[String, String]]
handler = RecordHandler(queue.offer(_: ConsumerRecord[String, String]))
.withDeserializers(StringSerde, StringSerde)
.ignore

record = ProducerRecord(topic, "bar", Some("foo"))
_ <- producer.produce(record, StringSerde, StringSerde)

message <- RecordConsumer
.make(
configFor(
kafka,
group,
topic,
mutateEventLoop = _.copy(consumePartitionInParallel = useParallelConsumer, maxParallelism = 8)
)
.copy(offsetReset = Latest, extraProperties = Map("auto.offset.reset" -> "earliest")),
handler
)
.flatMap { _ => queue.take }
.timeout(10.seconds)
} yield {
message.get must (beRecordWithKey("foo") and beRecordWithValue("bar"))
}
}

s"not lose messages while throttling after rebalance${parallelConsumerString(useParallelConsumer)}" in
ZIO.scoped {
for {
Expand Down

0 comments on commit 97179da

Please sign in to comment.