Commit

Async commit completed with callback (monix#94)
voidconductor committed Aug 26, 2019
1 parent 69e3f9c commit c51f7dd
Showing 5 changed files with 87 additions and 28 deletions.
7 changes: 2 additions & 5 deletions kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala
@@ -18,22 +18,19 @@ package monix.kafka
 
 import monix.eval.Task
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.clients.consumer.OffsetCommitCallback
 
 /**
   * Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context.
   * */
 trait Commit {
   def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
-  def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit]
-  final def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Unit] = commitBatchAsync(batch, null)
+  def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Task[Unit]]
 }
 
 private[kafka] object Commit {
 
   val empty: Commit = new Commit {
     override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
-    override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
-      Task.unit
+    override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Task[Unit]] = Task.pure(Task.unit)
   }
 }
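The shape of the new API is worth spelling out: commitBatchAsync now returns Task[Task[Unit]]. Running the outer Task dispatches the commit to the consumer; the inner Task completes once the broker acknowledges it (or fails). A minimal caller sketch, assuming commit is any Commit instance (the helper name is illustrative):

import monix.eval.Task
import org.apache.kafka.common.TopicPartition

// Dispatch the commit and also wait for the broker's acknowledgement.
def commitAndAwait(commit: Commit, batch: Map[TopicPartition, Long]): Task[Unit] =
  commit.commitBatchAsync(batch).flatten

Dropping the inner Task instead of flattening gives fire-and-forget semantics: the commit is still dispatched, but nothing waits for the acknowledgement.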
10 changes: 1 addition & 9 deletions kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala
@@ -17,7 +17,6 @@
 package monix.kafka
 
 import monix.eval.Task
-import org.apache.kafka.clients.consumer.OffsetCommitCallback
 import org.apache.kafka.common.TopicPartition
 
 /** Represents offset for specified topic and partition that can be
@@ -47,14 +46,7 @@ final class CommittableOffset private[kafka] (
     * Asynchronously commits [[offset]] to Kafka. It is recommended
     * to use batched commit with [[CommittableOffsetBatch]] class.
     * */
-  def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))
-
-  /**
-    * Asynchronously commits [[offset]] to Kafka. It is recommended
-    * to use batched commit with [[CommittableOffsetBatch]] class.
-    * */
-  def commitAsync(callback: OffsetCommitCallback): Task[Unit] =
-    commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback)
+  def commitAsync(): Task[Task[Unit]] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))
 }
 
 object CommittableOffset {
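The single-offset commitAsync follows the same Task[Task[Unit]] shape. A short sketch of the two ways to consume it, with illustrative helper names:

import monix.eval.Task
import monix.kafka.CommittableOffset

// Fire-and-forget: run the outer Task, drop the inner acknowledgement.
def dispatchOnly(offset: CommittableOffset): Task[Unit] =
  offset.commitAsync().map(_ => ())

// Dispatch and then wait for the broker's acknowledgement.
def dispatchAndAwait(offset: CommittableOffset): Task[Unit] =
  offset.commitAsync().flatten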
8 changes: 1 addition & 7 deletions kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala
@@ -17,7 +17,6 @@
 package monix.kafka
 
 import monix.eval.Task
-import org.apache.kafka.clients.consumer.OffsetCommitCallback
 import org.apache.kafka.common.TopicPartition
 
 /** Batch of Kafka offsets which can be committed together.
@@ -45,12 +44,7 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti
   /**
     * Asynchronously commits [[offsets]] to Kafka
     * */
-  def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets)
-
-  /**
-    * Asynchronously commits [[offsets]] to Kafka
-    * */
-  def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets)
+  def commitAsync(): Task[Task[Unit]] = commitCallback.commitBatchAsync(offsets)
 
   /**
     * Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified
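A sketch of how a consumer stream might use the batched variant, modelled on the new test further below (stream and batchSize are illustrative parameters):

import monix.eval.Task
import monix.kafka.{CommittableMessage, CommittableOffsetBatch}
import monix.reactive.Observable

// Commit one batch per buffer and wait for the broker's acknowledgement
// before pulling the next buffer.
def commitEachBatch[K, V](
    stream: Observable[CommittableMessage[K, V]],
    batchSize: Int): Observable[Unit] =
  stream
    .bufferTumbling(batchSize)
    .mapEval { messages =>
      CommittableOffsetBatch(messages.map(_.committableOffset))
        .commitAsync()
        .flatten // inner Task: completes when the OffsetCommitCallback fires
    }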
48 changes: 42 additions & 6 deletions kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
@@ -16,8 +16,11 @@
 
 package monix.kafka
 
+import java.util
+
 import monix.eval.Task
 import monix.execution.Ack.Stop
+import monix.execution.atomic.Atomic
 import monix.execution.cancelables.BooleanCancelable
 import monix.execution.{Ack, Callback}
 import monix.reactive.Observer
@@ -47,12 +50,45 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
       Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
         case (k, v) => k -> new OffsetAndMetadata(v)
       }.asJava))))
-    override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
-      Task {
-        blocking(consumer.synchronized(consumer.commitAsync(batch.map {
-          case (k, v) => k -> new OffsetAndMetadata(v)
-        }.asJava, callback)))
-      }
+    override def commitBatchAsync(batch: Map[TopicPartition, Long]): Task[Task[Unit]] = {
+      Task
+        .async0[Unit] { (s, cb) =>
+          val asyncCb = Callback.forked(cb)(s)
+          s.executeAsync { () =>
+            consumer.synchronized {
+              val isActive = Atomic(true)
+              try {
+                val offsets = batch.map {
+                  case (k, v) => k -> new OffsetAndMetadata(v)
+                }.asJava
+                consumer.commitAsync(
+                  offsets,
+                  new OffsetCommitCallback {
+                    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception)
+                      : Unit = {
+                      if (isActive.compareAndSet(expect = true, update = false)) {
+                        if (exception != null)
+                          asyncCb.onError(exception)
+                        else
+                          asyncCb.onSuccess(())
+                      } else if (exception != null)
+                        s.reportFailure(exception)
+                    }
+                  }
+                )
+              } catch {
+                case NonFatal(ex) =>
+                  if (isActive.compareAndSet(expect = true, update = false)) {
+                    asyncCb.onError(ex)
+                  } else
+                    s.reportFailure(ex)
+              }
+            }
+          }
+        }
+        .start
+        .map(_.join)
+    }
   }
 
   override protected def ackTask(consumer: KafkaConsumer[K, V], out: Subscriber[CommittableMessage[K, V]]): Task[Ack] =
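The implementation is a reusable shape: Task.async0 registers the OffsetCommitCallback, the Atomic(true) flag ensures the Monix Callback is completed at most once (a late exception is routed to Scheduler.reportFailure instead), and .start.map(_.join) is what splits dispatch from acknowledgement. A minimal sketch of that last step, with illustrative names:

import monix.eval.Task

// `register` stands for any callback-registering Task (e.g. the async0 block
// above). Forking it lets the outer Task complete as soon as the work is
// dispatched; joining the fiber yields an inner Task that completes only
// when the callback eventually fires.
def dispatchThenAwait[A](register: Task[A]): Task[Task[A]] =
  register.start.map(_.join)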
42 changes: 41 additions & 1 deletion kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala
@@ -17,8 +17,9 @@
 
 package monix.kafka
 
+import cats.effect.concurrent.Deferred
 import cats.syntax.apply._
-import monix.eval.Task
+import monix.eval.{Coeval, Task}
 import monix.execution.Scheduler.Implicits.global
 import monix.kafka.config.AutoOffsetReset
 import monix.reactive.Observable
@@ -140,6 +141,45 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit {
     }
   }
 
+  test("manual async commit consumer test when subscribed to topics list") {
+    withRunningKafka {
+
+      val count = 10000
+      val topicName = "monix-kafka-manual-commit-tests"
+
+      val producer = KafkaProducerSink[String, String](producerCfg, io)
+      val consumer = KafkaConsumerObservable.manualCommit[String, String](consumerCfg, List(topicName))
+
+      val pushT = Observable
+        .range(0, count)
+        .map(msg => new ProducerRecord(topicName, "obs", msg.toString))
+        .bufferIntrospective(1024)
+        .consumeWith(producer)
+
+      val promise = Deferred.in[Coeval, Task, Task[(Seq[String], Map[TopicPartition, Long])]].value()
+
+      val listT = consumer
+        .executeOn(io)
+        .bufferTumbling(count)
+        .map { messages =>
+          messages.map(_.record.value()) -> CommittableOffsetBatch(messages.map(_.committableOffset))
+        }
+        .mapEval { case (values, batch) => Task.shift *> batch.commitAsync().map(_.map(_ => values -> batch.offsets)) }
+        .mapEval { complete =>
+          promise.complete(complete)
+        }
+        .subscribe()
+
+
+      val ((result, offsets), _) =
+        Await.result(Task.parZip2(promise.get.flatten, pushT.executeAsync).runToFuture, 60.seconds)
+
+      val properOffsets = Map(new TopicPartition(topicName, 0) -> 10000)
+      assert(result.map(_.toInt).sum === (0 until count).sum && offsets === properOffsets)
+      listT.cancel()
+    }
+  }
+
   test("publish to closed producer when subscribed to topics list") {
     withRunningKafka {
       val producer = KafkaProducer[String, String](producerCfg, io)
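One detail of the test worth unpacking is the Deferred.in construction: the allocation effect runs in Coeval, so it can be forced synchronously with .value(), while the resulting Deferred itself operates in Task. A minimal sketch of just that pattern, with an illustrative Int payload:

import cats.effect.concurrent.Deferred
import monix.eval.{Coeval, Task}

// Allocate in Coeval (forced eagerly), complete and read in Task.
val promise: Deferred[Task, Int] = Deferred.in[Coeval, Task, Int].value()

// get backpressures until complete is called; complete succeeds only once.
val roundTrip: Task[Int] = promise.complete(42).flatMap(_ => promise.get)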
