Skip to content

Commit

Permalink
use offset queries when reading from topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Walid Lezzar committed Mar 16, 2020
1 parent 08b65d4 commit 3721cf0
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 143 deletions.
16 changes: 9 additions & 7 deletions zoe-cli/src/commands/topics.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.adevinta.oss.zoe.cli.config.Format
import com.adevinta.oss.zoe.cli.utils.batches
import com.adevinta.oss.zoe.cli.utils.fetch
import com.adevinta.oss.zoe.cli.utils.globalTermColors
import com.adevinta.oss.zoe.core.functions.PartitionProgress
import com.adevinta.oss.zoe.core.utils.logger
import com.adevinta.oss.zoe.core.utils.toJsonNode
import com.adevinta.oss.zoe.service.*
Expand Down Expand Up @@ -174,7 +173,7 @@ class TopicsConsume : CliktCommand(
formatter,
stop
)
.onEach { if (it is RecordOrProgress.Progress && !continuously) log(it.progress) }
.onEach { if (it is RecordOrProgress.Progress && !continuously) log(it.range) }
.filter { it is RecordOrProgress.Record }
.map { it as RecordOrProgress.Record }
.map { if (verbose) it.record.toJsonNode() else it.record.formatted }
Expand All @@ -183,21 +182,24 @@ class TopicsConsume : CliktCommand(
ctx.term.output.format(records) { echo(it) }
}

private fun log(progress: Iterable<PartitionProgress>) = progress.forEach {
private fun log(progress: Iterable<ConsumptionRange>) = progress.forEach {
it.progress.run {
val message =
"progress on partition ${String.format("%02d", it.partition)}\t" +
"timestamp -> ${currentTimestamp?.let { ts -> dateFmt.format(Date(ts)) }}\t" +
"consumed -> $recordsCount / ${it.latestOffset - startOffset} (${it.percent()}%)"
"consumed -> $numberOfRecords / ${it.until?.let { until -> until - it.from } ?: "Inf"} " +
"(${it.percent()}%)"

logger.info(ctx.term.colors.yellow(message))
}
}

private val dateFmt = SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
private fun PartitionProgress.percent(): Int = with(progress) {
val percent = ((currentOffset - startOffset) / (latestOffset - startOffset).toDouble() * 100)
if (percent.isNaN()) -1 else percent.roundToInt()

private fun ConsumptionRange.percent(): Int {
val percent =
until?.let { until -> ((progress.currentOffset ?: from - from) / (until - from).toDouble() * 100) }
return percent?.roundToInt() ?: -1
}
}

Expand Down
61 changes: 14 additions & 47 deletions zoe-core/src/functions/poll.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

package com.adevinta.oss.zoe.core.functions

import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import com.adevinta.oss.zoe.core.utils.consumer
import com.adevinta.oss.zoe.core.utils.jmespath
import com.adevinta.oss.zoe.core.utils.json
import com.adevinta.oss.zoe.core.utils.match
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.EncoderFactory
Expand All @@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import java.io.ByteArrayOutputStream
import java.time.Duration
import kotlin.math.max

/**
* Polls records from kafka
Expand Down Expand Up @@ -90,44 +89,18 @@ data class PolledRecord(
val formatted: JsonNode
)

private fun Consumer<*, *>.subscribe(topic: String, subscription: Subscription) {

val partitions = partitionsFor(topic).map { TopicPartition(it.topic(), it.partition()) }
val endOffsets = endOffsets(partitions)
val startOffsets = beginningOffsets(partitions)

when (subscription) {
is Subscription.FromBeginning -> {
val effectivePartitions = subscription.partitions?.map { TopicPartition(topic, it) } ?: partitions
assign(effectivePartitions)
startOffsets.filter { it.key in effectivePartitions }.forEach { (partition, startOffset) ->
seek(partition, startOffset)
}
}
is Subscription.FromTimestamp -> {
val effectivePartitions = subscription.partitions?.map { TopicPartition(topic, it) } ?: partitions
assign(effectivePartitions)
offsetsForTimes(effectivePartitions.map { it to subscription.ts }.toMap()).forEach { (partition, offset) ->
seek(partition, offset?.offset() ?: endOffsets[partition]!!)
}
}
is Subscription.OffsetStepBack -> {
val effectivePartitions = subscription.partitions?.map { TopicPartition(topic, it) } ?: partitions
assign(effectivePartitions)
endOffsets.filter { it.key in effectivePartitions }.forEach { (partition, endOffset) ->
seek(partition, max(endOffset - subscription.offsetsBack, startOffsets[partition]!!))
}
private fun Consumer<*, *>.subscribe(topic: String, subscription: Subscription) = when (subscription) {
is Subscription.AssignPartitions -> {
assign(subscription.partitions.map { TopicPartition(topic, it.key) })
subscription.partitions.forEach { (partition, offset) ->
seek(TopicPartition(topic, partition), offset)
}
is Subscription.AssignPartitions -> {
assign(subscription.partitions.map { TopicPartition(topic, it.key) })
subscription.partitions.forEach { (partition, offset) ->
seek(TopicPartition(topic, partition), offset)
}
}
is Subscription.WithGroupId -> subscribe(listOf(topic))
}

is Subscription.WithGroupId -> subscribe(listOf(topic))
}


private fun <K, V> Consumer<K, V>.pollAsSequence(timeoutMs: Long): Pair<Sequence<ConsumerRecord<K, V>>, ProgressListener> {
val progressListener = ProgressListener(this)
val records = sequence {
Expand Down Expand Up @@ -254,8 +227,8 @@ data class Progress(

data class PollConfig(
val topic: String,
val subscription: Subscription,
val props: Map<String, String> = mapOf(),
val subscription: Subscription = Subscription.OffsetStepBack(10, null),
val filter: List<String> = listOf(),
val query: String? = null,
val timeoutMs: Long = 10000,
Expand All @@ -266,15 +239,9 @@ data class PollConfig(
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(
JsonSubTypes.Type(value = Subscription.WithGroupId::class, name = "withGroupId"),
JsonSubTypes.Type(value = Subscription.FromTimestamp::class, name = "fromTimestamp"),
JsonSubTypes.Type(value = Subscription.OffsetStepBack::class, name = "offsetStepBack"),
JsonSubTypes.Type(value = Subscription.AssignPartitions::class, name = "assignPartitions"),
JsonSubTypes.Type(value = Subscription.FromBeginning::class, name = "fromBeginning")
JsonSubTypes.Type(value = Subscription.AssignPartitions::class, name = "assignPartitions")
)
sealed class Subscription {
object WithGroupId : Subscription()
data class FromBeginning(val partitions: Set<Int>?) : Subscription()
data class FromTimestamp(val ts: Long, val partitions: Set<Int>?) : Subscription()
data class OffsetStepBack(val offsetsBack: Long, val partitions: Set<Int>?) : Subscription()
data class AssignPartitions(val partitions: Map<Int, Long>) : Subscription()
}
1 change: 1 addition & 0 deletions zoe-service/resources/pod.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
{
"name": "zoe",
"image": "wlezzar/zoe:latest",
"imagePullPolicy": "Always",
"args": [],
"resources": {
"limits": {
Expand Down
2 changes: 1 addition & 1 deletion zoe-service/src/runners/extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ suspend fun ZoeRunner.offsets(config: GroupConfig): GroupOffsetsResponse {
}

suspend fun ZoeRunner.queryOffsets(config: OffsetQueriesRequest): OffsetQueriesResponse {
logger.info("querying offsets : $config")
logger.info("querying offsets : ${config.queries}")
return launchAwait(queryOffsets.name(), config).parseJson()
}

Expand Down
91 changes: 49 additions & 42 deletions zoe-service/src/service.kt
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class ZoeService(
OffsetQueriesRequest(
props = props,
topic = topic,
queries = listOfNotNull(endQuery, startQuery)
queries = listOfNotNull(endQuery, startQuery, topicEndQuery)
)
)

Expand All @@ -176,7 +176,17 @@ class ZoeService(
?: throw IllegalStateException("topic end offset (part: $partition) not found in: $responses")
}

ConsumptionRange(partition, from = nonNullStartOffset, until = endOffset?.offset)
ConsumptionRange(
partition,
from = nonNullStartOffset,
until = endOffset?.offset,
progress = ConsumptionProgress(
currentOffset = null,
nextOffset = nonNullStartOffset,
currentTimestamp = null,
numberOfRecords = 0
)
)
}
}

Expand Down Expand Up @@ -354,69 +364,58 @@ class ZoeService(
formatter: String
): Flow<RecordOrProgress> = flow {

var globalProgress = emptyMap<Int, PartitionProgress>()
var currentRange = range

do {

val config = PollConfig(
topic,
props,
Subscription.AssignPartitions(currentRange.map { it.partition to it.from }.toMap<Int, Long>()),
filter,
query,
timeoutPerBatch,
topic = topic,
subscription = Subscription.AssignPartitions(
currentRange.map { it.partition to it.progress.nextOffset }.toMap()
),
props = props,
filter = filter,
query = query,
timeoutMs = timeoutPerBatch,
numberOfRecords = recordsPerBatch,
jsonifier = formatter
)

val (records, progress) = runner.poll(config)

currentRange = updateConsumptionRange(currentRange = currentRange, progress = progress)
globalProgress = updatedGlobalProgress(progress, currentGlobalProgress = globalProgress)

for (record in records) {
emit(RecordOrProgress.Record(record))
}

emit(RecordOrProgress.Progress(globalProgress.values))

} while (currentRange.isNotEmpty())
currentRange = updateConsumptionRange(currentRange = currentRange, progress = progress)

}
emit(RecordOrProgress.Progress(currentRange))

} while (currentRange.isNotEmpty())

private fun updatedGlobalProgress(
progress: Iterable<PartitionProgress>,
currentGlobalProgress: Map<Int, PartitionProgress>
) = currentGlobalProgress.toMutableMap().apply {
progress.forEach { newPartitionProgress ->
val (_, partition, _, _, newProgress) = newPartitionProgress
this[partition] =
this[partition]
?.let { previousPartitionProgress ->
val (_, _, _, _, previousProgress) = previousPartitionProgress
previousPartitionProgress.copy(
progress = Progress(
startOffset = previousProgress.startOffset,
currentOffset = newProgress.currentOffset,
currentTimestamp = newProgress.currentTimestamp,
recordsCount = newProgress.recordsCount + previousProgress.recordsCount
)
)
}
?: newPartitionProgress
}
}

private fun updateConsumptionRange(
progress: Iterable<PartitionProgress>,
currentRange: List<ConsumptionRange>
): List<ConsumptionRange> {
val currentOffsets = progress.map { it.partition to it.progress.currentOffset }.toMap()
val currentOffsets = progress.map { it.partition to it.progress }.toMap()
return currentRange
.map { it.copy(from = currentOffsets[it.partition] ?: it.from) }
.filter { it.until == null || it.from >= it.until }
.map {
it.copy(
progress = currentOffsets[it.partition]
?.let { lastProgress ->
ConsumptionProgress(
currentOffset = lastProgress.currentOffset,
nextOffset = lastProgress.currentOffset + 1,
currentTimestamp = lastProgress.currentTimestamp,
numberOfRecords = lastProgress.recordsCount + it.progress.numberOfRecords
)
}
?: it.progress
)
}
.filter { it.until == null || it.progress.nextOffset <= it.until }
}
}

Expand All @@ -425,7 +424,7 @@ private fun <T> Iterable<T>.splitIntoGroupsBy(count: Int, by: (T) -> Int): Colle

sealed class RecordOrProgress {
data class Record(val record: PolledRecord) : RecordOrProgress()
data class Progress(val progress: Iterable<PartitionProgress>) : RecordOrProgress()
data class Progress(val range: Iterable<ConsumptionRange>) : RecordOrProgress()
}

sealed class ConsumeFrom {
Expand All @@ -438,7 +437,15 @@ sealed class ConsumeFrom {
data class ConsumptionRange(
val partition: Int,
val from: Long,
val until: Long?
val until: Long?,
val progress: ConsumptionProgress
)

data class ConsumptionProgress(
val currentOffset: Long?,
val nextOffset: Long,
val currentTimestamp: Long?,
val numberOfRecords: Long
)

class TopicAliasOrRealName(val value: String)
Expand Down
39 changes: 10 additions & 29 deletions zoe-service/test/ZoeServiceTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.adevinta.oss.zoe.service

import com.adevinta.oss.zoe.core.functions.OffsetQueriesResponse
import com.adevinta.oss.zoe.core.functions.TopicPartitionOffset
import com.adevinta.oss.zoe.core.utils.toJsonNode
import com.adevinta.oss.zoe.service.config.Cluster
Expand All @@ -27,12 +26,12 @@ object ZoeServiceTest : Spek({
simulator(readSpeedPerPoll = 2) {
cluster("local") {
topic(name = "input", partitions = 5) {
message(partition = 0, offset = 0, timestamp = 0, content = """{"id": 1}""".toJsonNode())
message(partition = 0, offset = 1, timestamp = 1, content = """{"id": 2}""".toJsonNode())
message(partition = 0, offset = 2, timestamp = 3, content = """{"id": 3}""".toJsonNode())
message(partition = 1, offset = 0, timestamp = 1, content = """{"id": 5}""".toJsonNode())
message(partition = 1, offset = 1, timestamp = 2, content = """{"id": 6}""".toJsonNode())
message(partition = 2, offset = 0, timestamp = 1, content = """{"id": 6}""".toJsonNode())
message("""{"id": 1}""".toJsonNode(), key = "0-0", partition = 0, offset = 0, timestamp = 0)
message("""{"id": 2}""".toJsonNode(), key = "0-1", partition = 0, offset = 1, timestamp = 1)
message("""{"id": 3}""".toJsonNode(), key = "0-2", partition = 0, offset = 2, timestamp = 3)
message("""{"id": 5}""".toJsonNode(), key = "1-0", partition = 1, offset = 0, timestamp = 1)
message("""{"id": 6}""".toJsonNode(), key = "1-1", partition = 1, offset = 1, timestamp = 2)
message("""{"id": 6}""".toJsonNode(), key = "2-0", partition = 2, offset = 0, timestamp = 1)
}
}
}
Expand Down Expand Up @@ -67,7 +66,10 @@ object ZoeServiceTest : Spek({

it("receives all the records") {
Assert.assertEquals(
"didn't receive all records (${readResponse.size}): $readResponse",
"""didn't receive the right number of records:
| - received: $readResponse
| - expected: ${runner.state.clusters.first().topics.first().messages.values}
""".trimMargin(),
6,
readResponse.filterIsInstance<RecordOrProgress.Record>().size
)
Expand Down Expand Up @@ -100,27 +102,6 @@ object ZoeServiceTest : Spek({
}
}

describe("Can read from an offset stepback") {
lateinit var readResponse: List<RecordOrProgress>

beforeEachTest {
runBlocking {
readResponse =
service
.readWithDefaultValues(
cluster = "cluster",
topic = "input",
from = ConsumeFrom.OffsetStepBack(1)
)
.toList()
}
}

it("receives one message per partition") {
Assert.assertEquals(2, readResponse.filterIsInstance<RecordOrProgress.Record>().size)
}
}

describe("Can request offsets from timestamps") {
val ts = 2L
val topic = "input"
Expand Down
Loading

0 comments on commit 3721cf0

Please sign in to comment.