Async commit completed with callback (monix#94)
Add consumer polling (monix#94)

AutoCommit polling logic (monix#94)

Remove nested task loop

Remove redundant atomic refs

Add test for monix#101

Apply changes to older versions
paualarco committed Jan 17, 2021
1 parent 0b8c94e commit 0ad10cb
Showing 36 changed files with 665 additions and 106 deletions.
1 change: 1 addition & 0 deletions kafka-0.10.x/src/main/resources/monix/kafka/default.conf
@@ -77,4 +77,5 @@ kafka {
monix.observable.commit.type = "sync"
# Possible values: before-ack, after-ack or no-ack
monix.observable.commit.order = "after-ack"
monix.observable.poll.interval.ms = 100
}
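For reference, a hedged sketch of overriding the new setting from code (assuming the `KafkaConsumerConfig.default` factory and the `pollInterval` field added in this commit):

import monix.kafka.KafkaConsumerConfig
import scala.concurrent.duration._

// Poll every 100ms while the previous batch is still in flight; keep this
// value well below max.poll.interval.ms so the broker keeps seeing polls.
val consumerCfg: KafkaConsumerConfig = KafkaConsumerConfig.default.copy(
  groupId = "kafka-tests",    // hypothetical group id
  pollInterval = 100.millis   // mirrors monix.observable.poll.interval.ms
)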
7 changes: 2 additions & 5 deletions kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala
@@ -24,16 +24,13 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback
*/
trait Commit {
def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit]
final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null)
def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit]
}

private[kafka] object Commit {

val empty: Commit = new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task.unit
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
}
}
@@ -46,12 +46,6 @@ final class CommittableOffset private[kafka] (
* to use batched commit with [[CommittableOffsetBatch]] class.
*/
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))

/** Asynchronously commits [[offset]] to Kafka. It is recommended
* to use batched commit with [[CommittableOffsetBatch]] class.
*/
def commitAsync(callback: OffsetCommitCallback): Task[Unit] =
commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback)
}
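A hedged usage sketch (assumed `consumerCfg` and topic name): after this change the `Task` returned by `commitAsync()` completes only once the broker acknowledges the commit, not as soon as the request is handed to the consumer.

import monix.eval.Task
import monix.kafka.KafkaConsumerObservable

// Process each record, then commit its offset before pulling the next one;
// the Task from commitAsync completes on broker acknowledgement.
val processed: Task[Unit] =
  KafkaConsumerObservable
    .manualCommit[String, String](consumerCfg, List("my-topic"))
    .mapEval(msg => Task(println(msg.record.value())) >> msg.committableOffset.commitAsync())
    .completedL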

object CommittableOffset {
@@ -198,6 +198,9 @@ import scala.concurrent.duration._
* Specifies when the commit should happen, like before we receive the
* acknowledgement from downstream, or afterwards.
*
* @param pollInterval is the `monix.observable.poll.interval.ms` setting.
*   Specifies the time between `KafkaConsumer#poll` calls.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
@@ -243,7 +246,8 @@ final case class KafkaConsumerConfig(
retryBackoffTime: FiniteDuration,
observableCommitType: ObservableCommitType,
observableCommitOrder: ObservableCommitOrder,
observableSeekOnStart: ObservableSeekOnStart,
observableSeekToEndOnStart: Boolean,
pollInterval: FiniteDuration,
properties: Map[String, String]) {

def toMap: Map[String, String] = properties ++ Map(
@@ -428,7 +432,8 @@ object KafkaConsumerConfig {
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
observableSeekOnStart = ObservableSeekOnStart(config.getString("monix.observable.seek.onStart")),
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
pollInterval = config.getInt("monix.observable.poll.interval.ms").millis,
properties = Map.empty
)
}
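A simplified, hedged sketch of how the new key is parsed, following the same `config.getInt(...).millis` pattern as the loader above (the real file nests the key under `kafka { ... }`):

import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

// Parse the poll interval the same way KafkaConsumerConfig.apply does.
val conf = ConfigFactory.parseString("monix.observable.poll.interval.ms = 100")
val pollInterval = conf.getInt("monix.observable.poll.interval.ms").millis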
@@ -16,7 +16,7 @@

package monix.kafka

import monix.eval.Task
import monix.eval.{Fiber, Task}
import monix.execution.Ack.{Continue, Stop}
import monix.execution.{Ack, Callback, Cancelable}
import monix.kafka.config.ObservableCommitOrder
@@ -41,7 +41,11 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
protected def config: KafkaConsumerConfig
protected def consumer: Task[Consumer[K, V]]

/** Creates a task that polls the source, then feeds the downstream
@volatile
protected var isAcked = true

/**
* Creates a task that polls the source, then feeds the downstream
* subscriber, returning the resulting acknowledgement
*/
protected def ackTask(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Ack]
@@ -67,8 +71,10 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
if (config.observableSeekOnStart.isSeekEnd) c.seekToEnd(Nil.asJavaCollection)
else if (config.observableSeekOnStart.isSeekBeginning) c.seekToBeginning(Nil.asJavaCollection)
// A task to execute on both cancellation and normal termination
val onCancel = cancelTask(c)
runLoop(c, out).guarantee(onCancel)
pollConsumer(c).loopForever.start.flatMap { pollFiber =>
val onCancel = cancelTask(c, pollFiber)
runLoop(c, out).guarantee(onCancel)
}
}
feedTask.runAsync(cb)
}
@@ -89,10 +95,12 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
/* Returns a `Task` that triggers the closing of the
* Kafka Consumer connection.
*/
private def cancelTask(consumer: Consumer[K, V]): Task[Unit] = {
private def cancelTask(consumer: KafkaConsumer[K, V], pollFiber: Fiber[Nothing]): Task[Unit] = {
// Forced asynchronous boundary
val cancelTask = Task.evalAsync {
consumer.synchronized(blocking(consumer.close()))
val cancelTask = pollFiber.cancel.flatMap { _ =>
Task.evalAsync {
consumer.synchronized(blocking(consumer.close()))
}
}

// By applying memoization, we are turning this
@@ -101,6 +109,27 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
// at most once
cancelTask.memoize
}

/* Returns a task that continuously polls the `KafkaConsumer` while the
* subscriber is still processing the last fed batch.
* This allows the consumer to process async commit callbacks and also keeps
* the consumer alive even during long batch processing.
*/
private def pollConsumer(consumer: KafkaConsumer[K, V]): Task[Unit] = {
Task
.sleep(config.pollInterval)
.flatMap { _ =>
if (!isAcked) {
Task.evalAsync {
consumer.synchronized {
blocking(consumer.poll(0))
}
}
} else {
Task.unit
}
}
}
}
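The same background-poll pattern can be sketched in isolation (hedged, assumed names): the periodic poll runs on its own fiber so the main loop can cancel it on shutdown, which is exactly what `cancelTask` does with `pollFiber.cancel` above.

import monix.eval.Task
import scala.concurrent.duration._

object BackgroundPollSketch {
  @volatile var isAcked = true // flipped around Observer.feed, as in the code above

  // One tick: sleep for the configured interval, then poll only if the
  // subscriber has not yet acknowledged the last batch.
  def tick(poll: () => Unit, interval: FiniteDuration): Task[Unit] =
    Task.sleep(interval).flatMap { _ =>
      if (!isAcked) Task.evalAsync(poll()) else Task.unit
    }

  // loopForever never completes, so it must run on a separate fiber;
  // cancelling that fiber is what stops the background polling.
  def start(poll: () => Unit, interval: FiniteDuration) =
    tick(poll, interval).loopForever.start
}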

object KafkaConsumerObservable {
@@ -68,12 +68,16 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (
s.executeAsync { () =>
val ackFuture =
try consumer.synchronized {
val assignment = consumer.assignment()
if (cancelable.isCanceled) Stop
else {
consumer.resume(assignment)
val next = blocking(consumer.poll(pollTimeoutMillis))
consumer.pause(assignment)
if (shouldCommitBefore) consumerCommit(consumer)
// Feeding the observer happens on the Subscriber's scheduler
// if any asynchronous boundaries happen
isAcked = false
Observer.feed(out, next.asScala)(out.scheduler)
}
} catch {
@@ -83,6 +87,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (

ackFuture.syncOnComplete {
case Success(ack) =>
isAcked = true
// The `streamError` flag protects against contract violations
// (i.e. onSuccess/onError should happen only once).
// Not really required, but we don't want to depend on the
@@ -106,6 +111,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (
}

case Failure(ex) =>
isAcked = true
asyncCb.onError(ex)
}
}
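The resume/poll/pause dance above is what makes the background `poll(0)` safe: partitions stay paused between batches, so the extra polls heartbeat and serve callbacks without fetching records. A hedged standalone sketch (assumed consumer instance):

import org.apache.kafka.clients.consumer.Consumer
import scala.concurrent.blocking

// Fetch one batch with the assignment resumed, then pause it again so the
// background poll(0) keeps the consumer alive without pulling new records.
def pollPaused[K, V](consumer: Consumer[K, V], pollTimeoutMillis: Long) =
  consumer.synchronized {
    val assignment = consumer.assignment()
    consumer.resume(assignment)
    val next = blocking(consumer.poll(pollTimeoutMillis))
    consumer.pause(assignment)
    next
  }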
@@ -16,6 +16,8 @@

package monix.kafka

import java.util

import monix.eval.Task
import monix.execution.Ack.Stop
import monix.execution.cancelables.BooleanCancelable
Expand Down Expand Up @@ -48,17 +50,40 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) =>
k -> new OffsetAndMetadata(v)
}.asJava))))

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task {
blocking(
consumer.synchronized(
consumer.commitAsync(
batch.map { case (k, v) =>
k -> new OffsetAndMetadata(v)
}.asJava,
callback)))
}
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = {
Task
.async0[Unit] { (s, cb) =>
val asyncCb = Callback.forked(cb)(s)
s.executeAsync { () =>
consumer.synchronized {
try {
val offsets = batch.map {
case (k, v) => k -> new OffsetAndMetadata(v)
}.asJava
consumer.commitAsync(
offsets,
new OffsetCommitCallback {
override def onComplete(
offsets: util.Map[TopicPartition, OffsetAndMetadata],
exception: Exception): Unit = {
if (exception != null) {
if (!asyncCb.tryOnError(exception))
s.reportFailure(exception)
} else {
asyncCb.tryOnSuccess(())
}
}
}
)
} catch {
case NonFatal(ex) =>
if (!asyncCb.tryOnError(ex))
s.reportFailure(ex)
}
}
}
}
}
}

override protected def ackTask(consumer: Consumer[K, V], out: Subscriber[CommittableMessage[K, V]]): Task[Ack] =
@@ -73,9 +98,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
s.executeAsync { () =>
val ackFuture =
try consumer.synchronized {
val assignment = consumer.assignment()
if (cancelable.isCanceled) Stop
else {
consumer.resume(assignment)
val next = blocking(consumer.poll(pollTimeoutMillis))
consumer.pause(assignment)
// Feeding the observer happens on the Subscriber's scheduler
// if any asynchronous boundaries happen
val result = next.asScala.map { record =>
Expand All @@ -86,6 +114,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
record.offset() + 1,
commit))
}
isAcked = false
Observer.feed(out, result)(out.scheduler)
}
} catch {
@@ -95,6 +124,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (

ackFuture.syncOnComplete {
case Success(ack) =>
isAcked = true
// The `streamError` flag protects against contract violations
// (i.e. onSuccess/onError should happen only once).
// Not really required, but we don't want to depend on the
@@ -117,6 +147,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
}

case Failure(ex) =>
isAcked = true
asyncCb.onError(ex)
}
}
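The `commitBatchAsync` rewrite above is the heart of this commit; distilled into a hedged standalone sketch, the `Task.async0` pattern for adapting `KafkaConsumer.commitAsync`'s callback looks like this:

import java.util
import monix.eval.Task
import monix.execution.Callback
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import scala.util.control.NonFatal

// The Task completes when the broker's commit callback fires, not when the
// commit request is merely enqueued on the consumer.
def commitAsyncTask[K, V](
    consumer: Consumer[K, V],
    offsets: util.Map[TopicPartition, OffsetAndMetadata]): Task[Unit] =
  Task.async0[Unit] { (s, cb) =>
    val asyncCb = Callback.forked(cb)(s) // forces an async boundary for the callback
    s.executeAsync { () =>
      try consumer.synchronized {
        consumer.commitAsync(offsets, new OffsetCommitCallback {
          override def onComplete(
              committed: util.Map[TopicPartition, OffsetAndMetadata],
              exception: Exception): Unit =
            if (exception != null) {
              if (!asyncCb.tryOnError(exception)) s.reportFailure(exception)
            } else asyncCb.tryOnSuccess(())
        })
      } catch {
        case NonFatal(ex) => if (!asyncCb.tryOnError(ex)) s.reportFailure(ex)
      }
    }
  }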
@@ -18,9 +18,7 @@ class MergeByCommitCallbackTest extends FunSuite with KafkaTestKit with ScalaChe

val commitCallbacks: List[Commit] = List.fill(4)(new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task.unit
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
})

val committableOffsetsGen: Gen[CommittableOffset] = for {
@@ -138,6 +138,38 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit {
}
}

test("manual async commit consumer test when subscribed to topics list") {
withRunningKafka {

val count = 10000
val topicName = "monix-kafka-manual-commit-tests"

val producer = KafkaProducerSink[String, String](producerCfg, io)
val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerCfg, List(topicName))

val pushT = Observable
.range(0, count)
.map(msg => new ProducerRecord(topicName, "obs", msg.toString))
.bufferIntrospective(1024)
.consumeWith(producer)

val listT = consumer
.executeOn(io)
.bufferTumbling(count)
.map { messages =>
messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset))
}
.mapEval { case (values, batch) => Task.shift *> batch.commitAsync().map(_ => values -> batch.offsets) }
.headL

val ((result, offsets), _) =
Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds)

val properOffsets = Map(new TopicPartition(topicName, 0) -> 10000)
assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets)
}
}

test("publish to closed producer when subscribed to topics list") {
withRunningKafka {
val producer = KafkaProducer[String, String](producerCfg, io)
@@ -155,4 +187,35 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit {
assert(first.isDefined && second.isRight && third.isEmpty)
}
}

test("slow batches processing doesn't cause rebalancing") {
withRunningKafka {
val count = 10000

val consumerConfig = consumerCfg.copy(
maxPollInterval = 200.millis,
pollInterval = 100.millis
)

val producer = KafkaProducerSink[String, String](producerCfg, io)
val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io)

val pushT = Observable
.range(0, count)
.map(msg => new ProducerRecord(topicName, "obs", msg.toString))
.bufferIntrospective(1024)
.consumeWith(producer)

val listT = consumer
.take(count)
.map(_.value())
.bufferTumbling(count / 4)
.mapEval(s => Task.sleep(2.second) >> Task.delay(s))
.flatMap(Observable.fromIterable)
.toListL

val (result, _) = Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds)
assert(result.map(_.toInt).sum === (0 until count).sum)
}
}
}
1 change: 1 addition & 0 deletions kafka-0.11.x/src/main/resources/monix/kafka/default.conf
@@ -79,4 +79,5 @@ kafka {
monix.observable.commit.type = "sync"
# Possible values: before-ack, after-ack or no-ack
monix.observable.commit.order = "after-ack"
monix.observable.poll.interval.ms = 100
}
7 changes: 2 additions & 5 deletions kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala
@@ -24,16 +24,13 @@ import org.apache.kafka.clients.consumer.OffsetCommitCallback
*/
trait Commit {
def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit]
final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null)
def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit]
}

private[kafka] object Commit {

val empty: Commit = new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task.unit
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
}
}