Skip to content

Commit

Permalink
Add consumer polling (monix#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
voidconductor committed Sep 11, 2019
1 parent 3e14966 commit 7884fd9
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 27 deletions.
2 changes: 2 additions & 0 deletions kafka-1.0.x/src/main/resources/monix/kafka/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,6 @@ kafka {
monix.observable.commit.type = "sync"
# Possible values: before-ack, after-ack or no-ack
monix.observable.commit.order = "after-ack"

monix.observable.poll.interval.ms = 100
}
4 changes: 2 additions & 2 deletions kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import org.apache.kafka.common.TopicPartition
* */
trait Commit {
def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Task[Unit]]
def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit]
}

private[kafka] object Commit {

val empty: Commit = new Commit {
override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Task[Unit]] = Task.pure(Task.unit)
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ final class CommittableOffset private[kafka] (
* Asynchronously commits [[offset]] to Kafka. It is recommended
* to use batched commit with [[CommittableOffsetBatch]] class.
* */
def commitAsync(): Task[Task[Unit]] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))
}

object CommittableOffset {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti
/**
* Asynchronously commits [[offsets]] to Kafka
* */
def commitAsync(): Task[Task[Unit]] = commitCallback.commitBatchAsync(offsets)
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets)

/**
* Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ import scala.concurrent.duration._
* Specifies when the commit should happen, like before we receive the
* acknowledgement from downstream, or afterwards.
*
* @param pollInterval is the `monix.observable.poll.interval.ms` setting.
* Specifies time between KafkaConsumer#poll call attempts.
*
* @param properties map of other properties that will be passed to
* the underlying kafka client. Any properties not explicitly handled
* by this object can be set via the map, but in case of a duplicate
Expand Down Expand Up @@ -249,6 +252,7 @@ final case class KafkaConsumerConfig(
observableCommitType: ObservableCommitType,
observableCommitOrder: ObservableCommitOrder,
observableSeekToEndOnStart: Boolean,
pollInterval: FiniteDuration,
properties: Map[String, String]) {

def toMap: Map[String, String] = properties ++ Map(
Expand Down Expand Up @@ -436,6 +440,7 @@ object KafkaConsumerConfig {
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
pollInterval = config.getInt("monix.observable.poll.interval.ms").millis,
properties = Map.empty
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package monix.kafka

import monix.eval.Task
import monix.eval.{Fiber, Task}
import monix.execution.Ack.{Continue, Stop}
import monix.execution.atomic.Atomic
import monix.execution.{Ack, Callback, Cancelable}
import monix.kafka.config.ObservableCommitOrder
import monix.reactive.Observable
Expand All @@ -39,6 +40,8 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
protected def config: KafkaConsumerConfig
protected def consumer: Task[KafkaConsumer[K, V]]

protected val isAcked = Atomic(true)

/**
* Creates a task that polls the source, then feeds the downstream
* subscriber, returning the resulting acknowledgement
Expand All @@ -65,8 +68,10 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
// Skipping all available messages on all partitions
if (config.observableSeekToEndOnStart) c.seekToEnd(Nil.asJavaCollection)
// A task to execute on both cancellation and normal termination
val onCancel = cancelTask(c)
runLoop(c, out).guarantee(onCancel)
pollConsumer(c).loopForever.start.flatMap { pollFiber =>
val onCancel = cancelTask(c, pollFiber)
runLoop(c, out).guarantee(onCancel)
}
}
feedTask.runAsync(cb)
}
Expand All @@ -87,10 +92,12 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
/* Returns a `Task` that triggers the closing of the
* Kafka Consumer connection.
*/
private def cancelTask(consumer: KafkaConsumer[K, V]): Task[Unit] = {
private def cancelTask(consumer: KafkaConsumer[K, V], pollFiber: Fiber[Nothing]): Task[Unit] = {
// Forced asynchronous boundary
val cancelTask = Task.evalAsync {
consumer.synchronized(blocking(consumer.close()))
val cancelTask = pollFiber.cancel.flatMap { _ =>
Task.evalAsync {
consumer.synchronized(blocking(consumer.close()))
}
}

// By applying memoization, we are turning this
Expand All @@ -99,6 +106,28 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
// at most once
cancelTask.memoize
}

/* Returns task that constantly polls the `KafkaConsumer` in case subscriber
* is still processing last fed batch.
* This allows producer process commit calls and also keeps consumer alive even
* with long batch processing.
*/
private def pollConsumer(consumer: KafkaConsumer[K, V]): Task[Unit] = {
Task
.sleep(config.pollInterval)
.flatMap { _ =>
if (!isAcked.get()) {
Task.evalAsync {
consumer.synchronized{
blocking(consumer.poll(0))
}
}
} else {
Task.unit
}
}
.flatMap(_ => pollConsumer(consumer))
}
}

object KafkaConsumerObservable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
case (k, v) => k -> new OffsetAndMetadata(v)
}.asJava))))
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Task[Unit]] = {
override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = {
Task
.async0[Unit] { (s, cb) =>
val asyncCb = Callback.forked(cb)(s)
Expand All @@ -64,8 +64,9 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
consumer.commitAsync(
offsets,
new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception)
: Unit = {
override def onComplete(
offsets: util.Map[TopicPartition, OffsetAndMetadata],
exception: Exception): Unit = {
if (isActive.compareAndSet(expect = true, update = false)) {
if (exception != null)
asyncCb.onError(exception)
Expand All @@ -86,8 +87,6 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
}
}
}
.start
.map(_.join)
}
}

Expand All @@ -103,9 +102,12 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
s.executeAsync { () =>
val ackFuture =
try consumer.synchronized {
val assignment = consumer.assignment()
if (cancelable.isCanceled) Stop
else {
consumer.resume(assignment)
val next = blocking(consumer.poll(pollTimeoutMillis))
consumer.pause(assignment)
// Feeding the observer happens on the Subscriber's scheduler
// if any asynchronous boundaries happen
val result = next.asScala.map { record =>
Expand All @@ -116,6 +118,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
record.offset() + 1,
commit))
}
isAcked.set(false)
Observer.feed(out, result)(out.scheduler)
}
} catch {
Expand All @@ -125,6 +128,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (

ackFuture.syncOnComplete {
case Success(ack) =>
isAcked.set(true)
// The `streamError` flag protects against contract violations
// (i.e. onSuccess/onError should happen only once).
// Not really required, but we don't want to depend on the
Expand All @@ -147,6 +151,7 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
}

case Failure(ex) =>
isAcked.set(true)
asyncCb.onError(ex)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package monix.kafka

import cats.effect.concurrent.{Deferred}
import cats.syntax.apply._
import monix.eval.{Coeval, Task}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.kafka.config.AutoOffsetReset
import monix.reactive.Observable
Expand Down Expand Up @@ -156,27 +155,20 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit {
.bufferIntrospective(1024)
.consumeWith(producer)

val promise = Deferred.in[Coeval, Task, Task[(Seq[String], Map[TopicPartition, Long])]].value()

val listT = consumer
.executeOn(io)
.bufferTumbling(count)
.map { messages =>
messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset))
}
.mapEval { case (values, batch) => Task.shift *> batch.commitAsync().map(_.map(_ => values -> batch.offsets)) }
.mapEval { complete =>
promise.complete(complete)
}
.subscribe()

.mapEval { case (values, batch) => Task.shift *> batch.commitAsync().map(_ => values -> batch.offsets) }
.headL

val ((result, offsets), _) =
Await.result(Task.parZip2(promise.get.flatten, pushT.executeAsync).runToFuture, 60.seconds)
Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds)

val properOffsets = Map(new TopicPartition(topicName, 0) -> 10000)
assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets)
listT.cancel()
}
}

Expand Down

0 comments on commit 7884fd9

Please sign in to comment.