diff --git a/zoe-cli/src/commands/topics.kt b/zoe-cli/src/commands/topics.kt index 584f864..b0ae983 100644 --- a/zoe-cli/src/commands/topics.kt +++ b/zoe-cli/src/commands/topics.kt @@ -12,7 +12,6 @@ import com.adevinta.oss.zoe.cli.config.Format import com.adevinta.oss.zoe.cli.utils.batches import com.adevinta.oss.zoe.cli.utils.fetch import com.adevinta.oss.zoe.cli.utils.globalTermColors -import com.adevinta.oss.zoe.core.functions.PartitionProgress import com.adevinta.oss.zoe.core.utils.logger import com.adevinta.oss.zoe.core.utils.toJsonNode import com.adevinta.oss.zoe.service.* @@ -174,7 +173,7 @@ class TopicsConsume : CliktCommand( formatter, stop ) - .onEach { if (it is RecordOrProgress.Progress && !continuously) log(it.progress) } + .onEach { if (it is RecordOrProgress.Progress && !continuously) log(it.range) } .filter { it is RecordOrProgress.Record } .map { it as RecordOrProgress.Record } .map { if (verbose) it.record.toJsonNode() else it.record.formatted } @@ -183,21 +182,24 @@ class TopicsConsume : CliktCommand( ctx.term.output.format(records) { echo(it) } } - private fun log(progress: Iterable) = progress.forEach { + private fun log(progress: Iterable) = progress.forEach { it.progress.run { val message = "progress on partition ${String.format("%02d", it.partition)}\t" + "timestamp -> ${currentTimestamp?.let { ts -> dateFmt.format(Date(ts)) }}\t" + - "consumed -> $recordsCount / ${it.latestOffset - startOffset} (${it.percent()}%)" + "consumed -> $numberOfRecords / ${it.until?.let { until -> until - it.from } ?: "Inf"} " + + "(${it.percent()}%)" logger.info(ctx.term.colors.yellow(message)) } } private val dateFmt = SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - private fun PartitionProgress.percent(): Int = with(progress) { - val percent = ((currentOffset - startOffset) / (latestOffset - startOffset).toDouble() * 100) - if (percent.isNaN()) -1 else percent.roundToInt() + + private fun ConsumptionRange.percent(): Int { + val percent = + until?.let { until -> ((progress.currentOffset ?: from - from) / (until - from).toDouble() * 100) } + return percent?.roundToInt() ?: -1 } } diff --git a/zoe-core/src/functions/poll.kt b/zoe-core/src/functions/poll.kt index f3222bc..91d2a04 100644 --- a/zoe-core/src/functions/poll.kt +++ b/zoe-core/src/functions/poll.kt @@ -8,14 +8,14 @@ package com.adevinta.oss.zoe.core.functions -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.NullNode import com.adevinta.oss.zoe.core.utils.consumer import com.adevinta.oss.zoe.core.utils.jmespath import com.adevinta.oss.zoe.core.utils.json import com.adevinta.oss.zoe.core.utils.match +import com.fasterxml.jackson.annotation.JsonSubTypes +import com.fasterxml.jackson.annotation.JsonTypeInfo +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.NullNode import org.apache.avro.generic.GenericDatumWriter import org.apache.avro.generic.GenericRecord import org.apache.avro.io.EncoderFactory @@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import java.io.ByteArrayOutputStream import java.time.Duration -import kotlin.math.max /** * Polls records from kafka @@ -90,44 +89,18 @@ data class PolledRecord( val formatted: JsonNode ) -private fun Consumer<*, *>.subscribe(topic: String, subscription: Subscription) { - - val partitions = partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) } - val endOffsets = endOffsets(partitions) - val startOffsets = beginningOffsets(partitions) - - when (subscription) { - is Subscription.FromBeginning -> { - val effectivePartitions = subscription.partitions?.map { TopicPartition(topic, it) } ?: partitions - assign(effectivePartitions) - startOffsets.filter { it.key in effectivePartitions }.forEach { (partition, startOffset) -> - seek(partition, startOffset) - } - } - is Subscription.FromTimestamp -> { - val effectivePartitions = subscription.partitions?.map { TopicPartition(topic, it) } ?: partitions - assign(effectivePartitions) - offsetsForTimes(effectivePartitions.map { it to subscription.ts }.toMap()).forEach { (partition, offset) -> - seek(partition, offset?.offset() ?: endOffsets[partition]!!) - } - } - is Subscription.OffsetStepBack -> { - val effectivePartitions = subscription.partitions?.map { TopicPartition(topic, it) } ?: partitions - assign(effectivePartitions) - endOffsets.filter { it.key in effectivePartitions }.forEach { (partition, endOffset) -> - seek(partition, max(endOffset - subscription.offsetsBack, startOffsets[partition]!!)) - } +private fun Consumer<*, *>.subscribe(topic: String, subscription: Subscription) = when (subscription) { + is Subscription.AssignPartitions -> { + assign(subscription.partitions.map { TopicPartition(topic, it.key) }) + subscription.partitions.forEach { (partition, offset) -> + seek(TopicPartition(topic, partition), offset) } - is Subscription.AssignPartitions -> { - assign(subscription.partitions.map { TopicPartition(topic, it.key) }) - subscription.partitions.forEach { (partition, offset) -> - seek(TopicPartition(topic, partition), offset) - } - } - is Subscription.WithGroupId -> subscribe(listOf(topic)) } + + is Subscription.WithGroupId -> subscribe(listOf(topic)) } + private fun Consumer.pollAsSequence(timeoutMs: Long): Pair>, ProgressListener> { val progressListener = ProgressListener(this) val records = sequence { @@ -254,8 +227,8 @@ data class Progress( data class PollConfig( val topic: String, + val subscription: Subscription, val props: Map = mapOf(), - val subscription: Subscription = Subscription.OffsetStepBack(10, null), val filter: List = listOf(), val query: String? = null, val timeoutMs: Long = 10000, @@ -266,15 +239,9 @@ data class PollConfig( @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") @JsonSubTypes( JsonSubTypes.Type(value = Subscription.WithGroupId::class, name = "withGroupId"), - JsonSubTypes.Type(value = Subscription.FromTimestamp::class, name = "fromTimestamp"), - JsonSubTypes.Type(value = Subscription.OffsetStepBack::class, name = "offsetStepBack"), - JsonSubTypes.Type(value = Subscription.AssignPartitions::class, name = "assignPartitions"), - JsonSubTypes.Type(value = Subscription.FromBeginning::class, name = "fromBeginning") + JsonSubTypes.Type(value = Subscription.AssignPartitions::class, name = "assignPartitions") ) sealed class Subscription { object WithGroupId : Subscription() - data class FromBeginning(val partitions: Set?) : Subscription() - data class FromTimestamp(val ts: Long, val partitions: Set?) : Subscription() - data class OffsetStepBack(val offsetsBack: Long, val partitions: Set?) : Subscription() data class AssignPartitions(val partitions: Map) : Subscription() } \ No newline at end of file diff --git a/zoe-service/resources/pod.template.json b/zoe-service/resources/pod.template.json index 40fa969..6f8029a 100644 --- a/zoe-service/resources/pod.template.json +++ b/zoe-service/resources/pod.template.json @@ -32,6 +32,7 @@ { "name": "zoe", "image": "wlezzar/zoe:latest", + "imagePullPolicy": "Always", "args": [], "resources": { "limits": { diff --git a/zoe-service/src/runners/extensions.kt b/zoe-service/src/runners/extensions.kt index ce706da..34b8f47 100644 --- a/zoe-service/src/runners/extensions.kt +++ b/zoe-service/src/runners/extensions.kt @@ -50,7 +50,7 @@ suspend fun ZoeRunner.offsets(config: GroupConfig): GroupOffsetsResponse { } suspend fun ZoeRunner.queryOffsets(config: OffsetQueriesRequest): OffsetQueriesResponse { - logger.info("querying offsets : $config") + logger.info("querying offsets : ${config.queries}") return launchAwait(queryOffsets.name(), config).parseJson() } diff --git a/zoe-service/src/service.kt b/zoe-service/src/service.kt index fce834b..cc08824 100644 --- a/zoe-service/src/service.kt +++ b/zoe-service/src/service.kt @@ -156,7 +156,7 @@ class ZoeService( OffsetQueriesRequest( props = props, topic = topic, - queries = listOfNotNull(endQuery, startQuery) + queries = listOfNotNull(endQuery, startQuery, topicEndQuery) ) ) @@ -176,7 +176,17 @@ class ZoeService( ?: throw IllegalStateException("topic end offset (part: $partition) not found in: $responses") } - ConsumptionRange(partition, from = nonNullStartOffset, until = endOffset?.offset) + ConsumptionRange( + partition, + from = nonNullStartOffset, + until = endOffset?.offset, + progress = ConsumptionProgress( + currentOffset = null, + nextOffset = nonNullStartOffset, + currentTimestamp = null, + numberOfRecords = 0 + ) + ) } } @@ -354,69 +364,58 @@ class ZoeService( formatter: String ): Flow = flow { - var globalProgress = emptyMap() var currentRange = range do { val config = PollConfig( - topic, - props, - Subscription.AssignPartitions(currentRange.map { it.partition to it.from }.toMap()), - filter, - query, - timeoutPerBatch, + topic = topic, + subscription = Subscription.AssignPartitions( + currentRange.map { it.partition to it.progress.nextOffset }.toMap() + ), + props = props, + filter = filter, + query = query, + timeoutMs = timeoutPerBatch, numberOfRecords = recordsPerBatch, jsonifier = formatter ) val (records, progress) = runner.poll(config) - currentRange = updateConsumptionRange(currentRange = currentRange, progress = progress) - globalProgress = updatedGlobalProgress(progress, currentGlobalProgress = globalProgress) - for (record in records) { emit(RecordOrProgress.Record(record)) } - emit(RecordOrProgress.Progress(globalProgress.values)) - - } while (currentRange.isNotEmpty()) + currentRange = updateConsumptionRange(currentRange = currentRange, progress = progress) - } + emit(RecordOrProgress.Progress(currentRange)) + } while (currentRange.isNotEmpty()) - private fun updatedGlobalProgress( - progress: Iterable, - currentGlobalProgress: Map - ) = currentGlobalProgress.toMutableMap().apply { - progress.forEach { newPartitionProgress -> - val (_, partition, _, _, newProgress) = newPartitionProgress - this[partition] = - this[partition] - ?.let { previousPartitionProgress -> - val (_, _, _, _, previousProgress) = previousPartitionProgress - previousPartitionProgress.copy( - progress = Progress( - startOffset = previousProgress.startOffset, - currentOffset = newProgress.currentOffset, - currentTimestamp = newProgress.currentTimestamp, - recordsCount = newProgress.recordsCount + previousProgress.recordsCount - ) - ) - } - ?: newPartitionProgress - } } private fun updateConsumptionRange( progress: Iterable, currentRange: List ): List { - val currentOffsets = progress.map { it.partition to it.progress.currentOffset }.toMap() + val currentOffsets = progress.map { it.partition to it.progress }.toMap() return currentRange - .map { it.copy(from = currentOffsets[it.partition] ?: it.from) } - .filter { it.until == null || it.from >= it.until } + .map { + it.copy( + progress = currentOffsets[it.partition] + ?.let { lastProgress -> + ConsumptionProgress( + currentOffset = lastProgress.currentOffset, + nextOffset = lastProgress.currentOffset + 1, + currentTimestamp = lastProgress.currentTimestamp, + numberOfRecords = lastProgress.recordsCount + it.progress.numberOfRecords + ) + } + ?: it.progress + ) + } + .filter { it.until == null || it.progress.nextOffset <= it.until } } } @@ -425,7 +424,7 @@ private fun Iterable.splitIntoGroupsBy(count: Int, by: (T) -> Int): Colle sealed class RecordOrProgress { data class Record(val record: PolledRecord) : RecordOrProgress() - data class Progress(val progress: Iterable) : RecordOrProgress() + data class Progress(val range: Iterable) : RecordOrProgress() } sealed class ConsumeFrom { @@ -438,7 +437,15 @@ sealed class ConsumeFrom { data class ConsumptionRange( val partition: Int, val from: Long, - val until: Long? + val until: Long?, + val progress: ConsumptionProgress +) + +data class ConsumptionProgress( + val currentOffset: Long?, + val nextOffset: Long, + val currentTimestamp: Long?, + val numberOfRecords: Long ) class TopicAliasOrRealName(val value: String) diff --git a/zoe-service/test/ZoeServiceTest.kt b/zoe-service/test/ZoeServiceTest.kt index ed33a74..380031d 100644 --- a/zoe-service/test/ZoeServiceTest.kt +++ b/zoe-service/test/ZoeServiceTest.kt @@ -1,6 +1,5 @@ package com.adevinta.oss.zoe.service -import com.adevinta.oss.zoe.core.functions.OffsetQueriesResponse import com.adevinta.oss.zoe.core.functions.TopicPartitionOffset import com.adevinta.oss.zoe.core.utils.toJsonNode import com.adevinta.oss.zoe.service.config.Cluster @@ -27,12 +26,12 @@ object ZoeServiceTest : Spek({ simulator(readSpeedPerPoll = 2) { cluster("local") { topic(name = "input", partitions = 5) { - message(partition = 0, offset = 0, timestamp = 0, content = """{"id": 1}""".toJsonNode()) - message(partition = 0, offset = 1, timestamp = 1, content = """{"id": 2}""".toJsonNode()) - message(partition = 0, offset = 2, timestamp = 3, content = """{"id": 3}""".toJsonNode()) - message(partition = 1, offset = 0, timestamp = 1, content = """{"id": 5}""".toJsonNode()) - message(partition = 1, offset = 1, timestamp = 2, content = """{"id": 6}""".toJsonNode()) - message(partition = 2, offset = 0, timestamp = 1, content = """{"id": 6}""".toJsonNode()) + message("""{"id": 1}""".toJsonNode(), key = "0-0", partition = 0, offset = 0, timestamp = 0) + message("""{"id": 2}""".toJsonNode(), key = "0-1", partition = 0, offset = 1, timestamp = 1) + message("""{"id": 3}""".toJsonNode(), key = "0-2", partition = 0, offset = 2, timestamp = 3) + message("""{"id": 5}""".toJsonNode(), key = "1-0", partition = 1, offset = 0, timestamp = 1) + message("""{"id": 6}""".toJsonNode(), key = "1-1", partition = 1, offset = 1, timestamp = 2) + message("""{"id": 6}""".toJsonNode(), key = "2-0", partition = 2, offset = 0, timestamp = 1) } } } @@ -67,7 +66,10 @@ object ZoeServiceTest : Spek({ it("receives all the records") { Assert.assertEquals( - "didn't receive all records (${readResponse.size}): $readResponse", + """didn't receive the right number of records: + | - received: $readResponse + | - expected: ${runner.state.clusters.first().topics.first().messages.values} + """.trimMargin(), 6, readResponse.filterIsInstance().size ) @@ -100,27 +102,6 @@ object ZoeServiceTest : Spek({ } } - describe("Can read from an offset stepback") { - lateinit var readResponse: List - - beforeEachTest { - runBlocking { - readResponse = - service - .readWithDefaultValues( - cluster = "cluster", - topic = "input", - from = ConsumeFrom.OffsetStepBack(1) - ) - .toList() - } - } - - it("receives one message per partition") { - Assert.assertEquals(2, readResponse.filterIsInstance().size) - } - } - describe("Can request offsets from timestamps") { val ts = 2L val topic = "input" diff --git a/zoe-service/test/simulator/simulator.kt b/zoe-service/test/simulator/simulator.kt index f693ceb..0cac661 100644 --- a/zoe-service/test/simulator/simulator.kt +++ b/zoe-service/test/simulator/simulator.kt @@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode import java.util.concurrent.CompletableFuture import kotlin.math.min -class ZoeRunnerSimulator(private val state: RunnerState) : ZoeRunner { +class ZoeRunnerSimulator(val state: RunnerState) : ZoeRunner { override val name: String = "simulator" @@ -48,24 +48,11 @@ class ZoeRunnerSimulator(private val state: RunnerState) : ZoeRunner { val candidates = when (val subscription = payload.subscription) { - is Subscription.FromBeginning -> - topic.messagesForPartitions(subscription.partitions ?: topic.partitions) - is Subscription.AssignPartitions -> topic .messagesForPartitions(subscription.partitions.keys) .filter { it.offset >= subscription.partitions.getValue(it.partition) } - is Subscription.FromTimestamp -> - topic - .messagesForPartitions(subscription.partitions ?: topic.partitions) - .filter { it.timestamp >= subscription.ts } - - is Subscription.OffsetStepBack -> - topic - .messagesForPartitions(subscription.partitions ?: topic.partitions) - .filter { it.offset >= topic.latestOffset(it.partition)!! - subscription.offsetsBack } - is Subscription.WithGroupId -> throw IllegalArgumentException("subscription not supported: $subscription") } @@ -89,9 +76,8 @@ class ZoeRunnerSimulator(private val state: RunnerState) : ZoeRunner { .toList(), progress = records - .groupBy { (topic.name to it.partition) } - .map { (topicPartition, partitionRecords) -> - val (_, partition) = topicPartition + .groupBy { it.partition } + .map { (partition, partitionRecords) -> PartitionProgress( topic = topic.name, partition = partition,