diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala index eb666b7b..a69fcda9 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala @@ -20,9 +20,8 @@ import monix.eval.Task import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.OffsetCommitCallback -/** - * Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. - * */ +/** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. + */ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableMessage.scala b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableMessage.scala index 19a8ddc6..80788ff0 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableMessage.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableMessage.scala @@ -18,7 +18,6 @@ package monix.kafka import org.apache.kafka.clients.consumer.ConsumerRecord -/** - * Represents data consumed from Kafka and [[CommittableOffset]] built from it - * */ +/** Represents data consumed from Kafka and [[CommittableOffset]] built from it + */ final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset) diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala index f0620885..6285de46 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -37,22 +37,19 @@ final class CommittableOffset private[kafka] ( val offset: Long, private[kafka] val commitCallback: Commit) { - /** - * Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended + /** Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. 
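The commit helpers above come in single-offset and batched flavours, and the scaladoc consistently steers users toward batching. A minimal sketch of both styles, assuming a `msg: CommittableMessage[String, String]` and an `offsets: Seq[CommittableOffset]` collected from a manual-commit observable (both names are illustrative):
{{{
// one network round-trip per record:
val one: Task[Unit] = msg.committableOffset.commitAsync()

// preferred: fold the offsets into a single batch, then commit once
val all: Task[Unit] =
  offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _).commitSync()
}}}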
- * */ + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala index 8b2b098e..213ecd1e 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala @@ -37,25 +37,21 @@ import org.apache.kafka.common.TopicPartition */ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) { - /** - * Synchronously commits [[offsets]] to Kafka - * */ + /** Synchronously commits [[offsets]] to Kafka + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified + /** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified * for same topic and partition. - * */ + */ def updated(committableOffset: CommittableOffset): CommittableOffsetBatch = new CommittableOffsetBatch( offsets.updated(committableOffset.topicPartition, committableOffset.offset), @@ -65,19 +61,17 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti object CommittableOffsetBatch { - /** - * Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: + /** Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: * {{{ * offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _) * }}} - * */ + */ val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty) - /** - * Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with + /** Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with * sequence order. If there is more than once offset for a topic and partition in the * sequence then the last one will remain. - * */ + */ def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch = if (offsets.nonEmpty) { val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) => @@ -88,15 +82,13 @@ object CommittableOffsetBatch { empty } - /** - * Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets + /** Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets * that have the same commit callback. This will help when the committable offsets are * from different consumers. 
* {{{ * CommittableOffsetBatch.mergeByCommitCallback(offsets) * }}} - * - * */ + */ def mergeByCommitCallback(committableOffsets: Seq[CommittableOffset]): List[CommittableOffsetBatch] = { if (committableOffsets.nonEmpty) { committableOffsets diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index c9d73ba6..102c4059 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -40,10 +40,9 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig protected def consumer: Task[Consumer[K, V]] - /** - * Creates a task that polls the source, then feeds the downstream + /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement - * */ + */ protected def ackTask(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Ack] override final def unsafeSubscribeFn(out: Subscriber[Out]): Cancelable = { @@ -128,8 +127,8 @@ object KafkaConsumerObservable { * * @param topics is the list of Kafka topics to subscribe to. */ - def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { val consumer = createConsumer[K, V](cfg, topics) @@ -144,16 +143,15 @@ object KafkaConsumerObservable { * * @param topicsRegex is the pattern of Kafka topics to subscribe to. */ - def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)( - implicit K: Deserializer[K], + def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { val consumer = createConsumer[K, V](cfg, topicsRegex) apply(cfg, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -175,7 +173,7 @@ object KafkaConsumerObservable { * @param consumer is a factory for the * `org.apache.kafka.clients.consumer.KafkaConsumer` * instance to use for consuming from Kafka - * */ + */ def manualCommit[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { @@ -184,8 +182,7 @@ object KafkaConsumerObservable { new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -205,17 +202,16 @@ object KafkaConsumerObservable { * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * * @param topics is the list of Kafka topics to subscribe to.
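Taken together, the manualCommit builders above support a consume-transform-commit pipeline. A sketch under assumed names (`consumerCfg` for the config, `process` returning a `Task`), batching commits by time and count:
{{{
import scala.concurrent.duration._

val pipeline: Task[Unit] =
  KafkaConsumerObservable
    .manualCommit[String, String](consumerCfg, List("my-topic"))
    .mapEval(msg => process(msg.record).map(_ => msg.committableOffset))
    .bufferTimedAndCounted(5.seconds, 100)
    .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())
    .completedL
}}}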
- * */ - def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { val consumer = createConsumer[K, V](cfg, topics) manualCommit(cfg, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -235,9 +231,9 @@ object KafkaConsumerObservable { * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * * @param topicsRegex is the pattern of Kafka topics to subscribe to. - * */ - def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)( - implicit K: Deserializer[K], + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { val consumer = createConsumer[K, V](cfg, topicsRegex) @@ -245,8 +241,8 @@ } /** Returns a `Task` for creating a consumer instance given list of topics. */ - def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): Task[Consumer[K, V]] = { import collection.JavaConverters._ @@ -261,8 +257,8 @@ } /** Returns a `Task` for creating a consumer instance given topics regex.
*/ - def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)( - implicit K: Deserializer[K], + def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], V: Deserializer[V]): Task[Consumer[K, V]] = { Task.evalAsync { val configMap = config.toJavaMap diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 79fea16e..fe04ec60 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -45,15 +45,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( case class CommitWithConsumer(consumer: Consumer[K, V]) extends Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = - Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { - case (k, v) => k -> new OffsetAndMetadata(v) + Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => + k -> new OffsetAndMetadata(v) }.asJava)))) override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task { - blocking(consumer.synchronized(consumer.commitAsync(batch.map { - case (k, v) => k -> new OffsetAndMetadata(v) - }.asJava, callback))) + blocking( + consumer.synchronized( + consumer.commitAsync( + batch.map { case (k, v) => + k -> new OffsetAndMetadata(v) + }.asJava, + callback))) } } diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducer.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducer.scala index 74131ede..7dff84b0 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducer.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducer.scala @@ -26,12 +26,12 @@ import org.apache.kafka.clients.producer.{ RecordMetadata, Callback => KafkaCallback, Producer => ApacheProducer, - KafkaProducer => ApacheKafkaProducer} + KafkaProducer => ApacheKafkaProducer +} import scala.util.control.NonFatal -/** - * Wraps the Kafka Producer. +/** Wraps the Kafka Producer. * * Calling `producer.send` returns a `Task[Option[RecordMetadata]]` * which can then be run and transformed into a `Future`. @@ -55,10 +55,10 @@ trait KafkaProducer[K, V] extends Serializable { object KafkaProducer { /** Builds a [[KafkaProducer]] instance. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducer[K, V] = { - val producerRef: Coeval[ApacheProducer[K, V]] = Coeval.evalOnce { + val producerRef: Coeval[ApacheProducer[K, V]] = Coeval { val keySerializer = K.create() val valueSerializer = V.create() val configJavaMap = config.toJavaMap @@ -70,21 +70,24 @@ object KafkaProducer { } /** Builds a [[KafkaProducer]] instance with provided Apache Producer. 
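Whichever builder is used, the resulting wrapper exposes `send` as a pure `Task` that only touches Kafka when run, as the wrapper's own scaladoc notes. A usage sketch, with `producerCfg: KafkaProducerConfig` and a monix `Scheduler` named `io` assumed:
{{{
val producer = KafkaProducer[String, String](producerCfg, io)
// evaluates to None if the producer was already closed when the send ran
val meta: Task[Option[RecordMetadata]] = producer.send("my-topic", "value")
val shutdown: Task[Unit] = producer.close()
}}}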
*/ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducer[K, V] = { new Implementation[K, V](config, sc, producerRef) } - private final class Implementation[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])( - implicit K: Serializer[K], - V: Serializer[V]) + private final class Implementation[K, V]( + config: KafkaProducerConfig, + sc: Scheduler, + producer: Coeval[ApacheProducer[K, V]])(implicit K: Serializer[K], V: Serializer[V]) extends KafkaProducer[K, V] with StrictLogging { private val isCanceled = Atomic(false) + lazy val producerRef = producer.value() + def underlying: Task[ApacheProducer[K, V]] = - producerRef.to[Task] + Task.eval(producerRef) def send(topic: String, value: V): Task[Option[RecordMetadata]] = send(new ProducerRecord[K, V](topic, value)) @@ -104,8 +107,8 @@ object KafkaProducer { val isActive = Atomic(true) val cancelable = SingleAssignCancelable() try { - // Using asynchronous API - val future = producerRef.value().send( + // Using asynchronous API + val future = producerRef.send( record, new KafkaCallback { def onCompletion(meta: RecordMetadata, exception: Exception): Unit = @@ -153,7 +156,7 @@ object KafkaProducer { asyncCb.onSuccess(()) } else { try { - producerRef.value().close() + producerRef.close() asyncCb.onSuccess(()) } catch { case NonFatal(ex) => diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index 78d0efb7..8c16ca37 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -109,13 +109,13 @@ object KafkaProducerSink extends StrictLogging { } /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducerSink[K, V] = apply(config, sc, onSendErrorDefault) /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducerSink[K, V] = { val producer = Coeval(KafkaProducer[K, V](config, sc)) diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala index eb666b7b..a69fcda9 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala @@ -20,9 +20,8 @@ import monix.eval.Task import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.OffsetCommitCallback -/** - * Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. - * */ +/** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. 
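A Commit implementation only has to provide the two batch operations of the trait shown here; a no-op instance, presumably similar to the `Commit.empty` that backs `CommittableOffsetBatch.empty`, can be sketched as:
{{{
val noopCommit: Commit = new Commit {
  def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
    Task.unit
  def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
    Task.unit
}
}}}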
+ */ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableMessage.scala b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableMessage.scala index 19a8ddc6..80788ff0 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableMessage.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableMessage.scala @@ -18,7 +18,6 @@ package monix.kafka import org.apache.kafka.clients.consumer.ConsumerRecord -/** - * Represents data consumed from Kafka and [[CommittableOffset]] built from it - * */ +/** Represents data consumed from Kafka and [[CommittableOffset]] built from it + */ final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset) diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala index f0620885..6285de46 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -37,22 +37,19 @@ final class CommittableOffset private[kafka] ( val offset: Long, private[kafka] val commitCallback: Commit) { - /** - * Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended + /** Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala index a7ed6f7a..92cfcbf7 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala @@ -37,25 +37,21 @@ import org.apache.kafka.common.TopicPartition */ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) { - /** - * Synchronously commits [[offsets]] to Kafka - * */ + /** Synchronously commits [[offsets]] to Kafka + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Adds new [[CommittableOffset]] to batch. 
Added offset replaces previous one specified + /** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified * for same topic and partition. - * */ + */ def updated(committableOffset: CommittableOffset): CommittableOffsetBatch = new CommittableOffsetBatch( offsets.updated(committableOffset.topicPartition, committableOffset.offset), @@ -65,19 +61,17 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti object CommittableOffsetBatch { - /** - * Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: + /** Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: * {{{ * offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _) * }}} - * */ + */ val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty) - /** - * Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with + /** Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with * sequence order. If there is more than once offset for a topic and partition in the * sequence then the last one will remain. - * */ + */ def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch = if (offsets.nonEmpty) { val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) => @@ -88,15 +82,13 @@ object CommittableOffsetBatch { empty } - /** - * Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets + /** Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets * that have the same commit callback. This will help when the committable offsets are * from different consumers. * {{{ * CommittableOffsetBatch.mergeByCommitCallback(offsets) * }}} - * - * */ + */ def mergeByCommitCallback(committableOffsets: Seq[CommittableOffset]): List[CommittableOffsetBatch] = { if (committableOffsets.nonEmpty) { committableOffsets diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index c9d73ba6..102c4059 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -40,10 +40,9 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig protected def consumer: Task[Consumer[K, V]] - /** - * Creates a task that polls the source, then feeds the downstream + /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement - * */ + */ protected def ackTask(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Ack] override final def unsafeSubscribeFn(out: Subscriber[Out]): Cancelable = { @@ -128,8 +127,8 @@ object KafkaConsumerObservable { * * @param topics is the list of Kafka topics to subscribe to. */ - def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { val consumer = createConsumer[K, V](cfg, topics) @@ -144,16 +143,15 @@ object KafkaConsumerObservable { * * @param topicsRegex is the pattern of Kafka topics to subscribe to. 
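For instance, subscribing to every topic that matches a prefix through the regex builder above (`cfg` assumed):
{{{
val logs = KafkaConsumerObservable[String, String](cfg, "logs-.*".r)
}}}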
*/ - def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)( - implicit K: Deserializer[K], + def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { val consumer = createConsumer[K, V](cfg, topicsRegex) apply(cfg, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -175,7 +173,7 @@ object KafkaConsumerObservable { * @param consumer is a factory for the * `org.apache.kafka.clients.consumer.KafkaConsumer` * instance to use for consuming from Kafka - * */ + */ def manualCommit[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { @@ -184,8 +182,7 @@ object KafkaConsumerObservable { new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -205,17 +202,16 @@ object KafkaConsumerObservable { * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * * @param topics is the list of Kafka topics to subscribe to. - * */ - def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { val consumer = createConsumer[K, V](cfg, topics) manualCommit(cfg, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -235,9 +231,9 @@ object KafkaConsumerObservable { * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * * @param topicsRegex is the pattern of Kafka topics to subscribe to. - * */ - def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)( - implicit K: Deserializer[K], + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { val consumer = createConsumer[K, V](cfg, topicsRegex) @@ -245,8 +241,8 @@ } /** Returns a `Task` for creating a consumer instance given list of topics. */ - def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): Task[Consumer[K, V]] = { import collection.JavaConverters._ @@ -261,8 +257,8 @@ } /** Returns a `Task` for creating a consumer instance given topics regex.
*/ - def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)( - implicit K: Deserializer[K], + def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], V: Deserializer[V]): Task[Consumer[K, V]] = { Task.evalAsync { val configMap = config.toJavaMap diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 79fea16e..fe04ec60 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -45,15 +45,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( case class CommitWithConsumer(consumer: Consumer[K, V]) extends Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = - Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { - case (k, v) => k -> new OffsetAndMetadata(v) + Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => + k -> new OffsetAndMetadata(v) }.asJava)))) override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task { - blocking(consumer.synchronized(consumer.commitAsync(batch.map { - case (k, v) => k -> new OffsetAndMetadata(v) - }.asJava, callback))) + blocking( + consumer.synchronized( + consumer.commitAsync( + batch.map { case (k, v) => + k -> new OffsetAndMetadata(v) + }.asJava, + callback))) } } diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducer.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducer.scala index 18b3e8e3..7dff84b0 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducer.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducer.scala @@ -26,12 +26,12 @@ import org.apache.kafka.clients.producer.{ RecordMetadata, Callback => KafkaCallback, Producer => ApacheProducer, - KafkaProducer => ApacheKafkaProducer} + KafkaProducer => ApacheKafkaProducer +} import scala.util.control.NonFatal -/** - * Wraps the Kafka Producer. +/** Wraps the Kafka Producer. * * Calling `producer.send` returns a `Task[Option[RecordMetadata]]` * which can then be run and transformed into a `Future`. @@ -55,10 +55,10 @@ trait KafkaProducer[K, V] extends Serializable { object KafkaProducer { /** Builds a [[KafkaProducer]] instance. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducer[K, V] = { - val producerRef: Coeval[ApacheProducer[K, V]] = Coeval.evalOnce { + val producerRef: Coeval[ApacheProducer[K, V]] = Coeval { val keySerializer = K.create() val valueSerializer = V.create() val configJavaMap = config.toJavaMap @@ -70,21 +70,24 @@ object KafkaProducer { } /** Builds a [[KafkaProducer]] instance with provided Apache Producer. 
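The overload documented here accepts any pre-built Apache producer wrapped in a `Coeval`, which is handy for tests or custom construction. A sketch, where `props`, `keySer` and `valueSer` are assumed to be a config map and Kafka serializers:
{{{
val ref = Coeval(new ApacheKafkaProducer[String, String](props, keySer, valueSer))
val producer = KafkaProducer[String, String](producerCfg, io, ref)
}}}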
*/ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducer[K, V] = { new Implementation[K, V](config, sc, producerRef) } - private final class Implementation[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])( - implicit K: Serializer[K], - V: Serializer[V]) + private final class Implementation[K, V]( + config: KafkaProducerConfig, + sc: Scheduler, + producer: Coeval[ApacheProducer[K, V]])(implicit K: Serializer[K], V: Serializer[V]) extends KafkaProducer[K, V] with StrictLogging { private val isCanceled = Atomic(false) + lazy val producerRef = producer.value() + def underlying: Task[ApacheProducer[K, V]] = - producerRef.to[Task] + Task.eval(producerRef) def send(topic: String, value: V): Task[Option[RecordMetadata]] = send(new ProducerRecord[K, V](topic, value)) @@ -105,7 +108,7 @@ object KafkaProducer { val cancelable = SingleAssignCancelable() try { // Using asynchronous API - val future = producerRef.value().send( + val future = producerRef.send( record, new KafkaCallback { def onCompletion(meta: RecordMetadata, exception: Exception): Unit = @@ -153,7 +156,7 @@ object KafkaProducer { asyncCb.onSuccess(()) } else { try { - producerRef.value().close() + producerRef.close() asyncCb.onSuccess(()) } catch { case NonFatal(ex) => diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index 78d0efb7..8c16ca37 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -109,13 +109,13 @@ object KafkaProducerSink extends StrictLogging { } /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducerSink[K, V] = apply(config, sc, onSendErrorDefault) /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducerSink[K, V] = { val producer = Coeval(KafkaProducer[K, V](config, sc)) diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala b/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala index eb666b7b..a69fcda9 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/Commit.scala @@ -20,9 +20,8 @@ import monix.eval.Task import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.OffsetCommitCallback -/** - * Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. - * */ +/** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. 
+ */ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableMessage.scala b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableMessage.scala index 19a8ddc6..80788ff0 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableMessage.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableMessage.scala @@ -18,7 +18,6 @@ package monix.kafka import org.apache.kafka.clients.consumer.ConsumerRecord -/** - * Represents data consumed from Kafka and [[CommittableOffset]] built from it - * */ +/** Represents data consumed from Kafka and [[CommittableOffset]] built from it + */ final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset) diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala index f0620885..6285de46 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -37,22 +37,19 @@ final class CommittableOffset private[kafka] ( val offset: Long, private[kafka] val commitCallback: Commit) { - /** - * Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended + /** Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala index 8b2b098e..213ecd1e 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala @@ -37,25 +37,21 @@ import org.apache.kafka.common.TopicPartition */ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) { - /** - * Synchronously commits [[offsets]] to Kafka - * */ + /** Synchronously commits [[offsets]] to Kafka + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Adds new [[CommittableOffset]] to batch. 
Added offset replaces previous one specified + /** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified * for same topic and partition. - * */ + */ def updated(committableOffset: CommittableOffset): CommittableOffsetBatch = new CommittableOffsetBatch( offsets.updated(committableOffset.topicPartition, committableOffset.offset), @@ -65,19 +61,17 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti object CommittableOffsetBatch { - /** - * Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: + /** Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: * {{{ * offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _) * }}} - * */ + */ val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty) - /** - * Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with + /** Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with * sequence order. If there is more than once offset for a topic and partition in the * sequence then the last one will remain. - * */ + */ def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch = if (offsets.nonEmpty) { val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) => @@ -88,15 +82,13 @@ object CommittableOffsetBatch { empty } - /** - * Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets + /** Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets * that have the same commit callback. This will help when the committable offsets are * from different consumers. * {{{ * CommittableOffsetBatch.mergeByCommitCallback(offsets) * }}} - * - * */ + */ def mergeByCommitCallback(committableOffsets: Seq[CommittableOffset]): List[CommittableOffsetBatch] = { if (committableOffsets.nonEmpty) { committableOffsets diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index 70c9cfa9..e1d89b77 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -37,10 +37,9 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig protected def consumer: Task[Consumer[K, V]] - /** - * Creates a task that polls the source, then feeds the downstream + /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement - * */ + */ protected def ackTask(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Ack] override final def unsafeSubscribeFn(out: Subscriber[Out]): Cancelable = { @@ -125,16 +124,15 @@ object KafkaConsumerObservable { * * @param topics is the list of Kafka topics to subscribe to. */ - def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { val consumer = createConsumer[K, V](cfg, topics) apply(cfg, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration.
* Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -156,7 +154,7 @@ object KafkaConsumerObservable { * @param consumer is a factory for the * `org.apache.kafka.clients.consumer.KafkaConsumer` * instance to use for consuming from Kafka - * */ + */ def manualCommit[K, V]( cfg: KafkaConsumerConfig, consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { @@ -165,8 +163,7 @@ object KafkaConsumerObservable { new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord. * @@ -186,9 +183,9 @@ object KafkaConsumerObservable { * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly! * * @param topics is the list of Kafka topics to subscribe to. - * */ - def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + */ + def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = { val consumer = createConsumer[K, V](cfg, topics) @@ -196,8 +193,8 @@ } /** Returns a `Task` for creating a consumer instance given list of topics. */ - def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): Task[Consumer[K, V]] = { import collection.JavaConverters._ diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala index 79fea16e..fe04ec60 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala @@ -45,15 +45,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] ( case class CommitWithConsumer(consumer: Consumer[K, V]) extends Commit { override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = - Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { - case (k, v) => k -> new OffsetAndMetadata(v) + Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) => + k -> new OffsetAndMetadata(v) }.asJava)))) override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] = Task { - blocking(consumer.synchronized(consumer.commitAsync(batch.map { - case (k, v) => k -> new OffsetAndMetadata(v) - }.asJava, callback))) + blocking( + consumer.synchronized( + consumer.commitAsync( + batch.map { case (k, v) => + k -> new OffsetAndMetadata(v) + }.asJava, + callback))) } } diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducer.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducer.scala index 037f43fe..7dff84b0 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducer.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducer.scala @@ -31,8 +31,7 @@ import org.apache.kafka.clients.producer.{ import scala.util.control.NonFatal
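The KafkaProducer hunks that follow (mirroring the 0.10.x and 0.11.x modules above) swap `Coeval.evalOnce` for a plain `Coeval` whose result is cached by a `lazy val` inside `Implementation`. Both give once-only construction; a minimal sketch of the equivalence:
{{{
val make: Coeval[String] = Coeval { println("built"); "producer" }

lazy val cached = make.value()   // new style: built on first access, then reused
val memoized = Coeval.evalOnce { println("built"); "producer" }   // old style: memoized inside the Coeval
}}}
One consequence of the new style is that the cache lives in the wrapper instance rather than in the Coeval itself, so each Implementation constructs and owns its own producer.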
-/** - * Wraps the Kafka Producer. +/** Wraps the Kafka Producer. * * Calling `producer.send` returns a `Task[Option[RecordMetadata]]` * which can then be run and transformed into a `Future`. @@ -56,10 +55,10 @@ trait KafkaProducer[K, V] extends Serializable { object KafkaProducer { /** Builds a [[KafkaProducer]] instance. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducer[K, V] = { - val producerRef: Coeval[ApacheProducer[K, V]] = Coeval.evalOnce { + val producerRef: Coeval[ApacheProducer[K, V]] = Coeval { val keySerializer = K.create() val valueSerializer = V.create() val configJavaMap = config.toJavaMap @@ -71,21 +70,24 @@ object KafkaProducer { } /** Builds a [[KafkaProducer]] instance with provided Apache Producer. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducer[K, V] = { new Implementation[K, V](config, sc, producerRef) } - private final class Implementation[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])( - implicit K: Serializer[K], - V: Serializer[V]) + private final class Implementation[K, V]( + config: KafkaProducerConfig, + sc: Scheduler, + producer: Coeval[ApacheProducer[K, V]])(implicit K: Serializer[K], V: Serializer[V]) extends KafkaProducer[K, V] with StrictLogging { private val isCanceled = Atomic(false) + lazy val producerRef = producer.value() + def underlying: Task[ApacheProducer[K, V]] = - producerRef.to[Task] + Task.eval(producerRef) def send(topic: String, value: V): Task[Option[RecordMetadata]] = send(new ProducerRecord[K, V](topic, value)) @@ -106,7 +108,7 @@ object KafkaProducer { val cancelable = SingleAssignCancelable() try { // Using asynchronous API - val future = producerRef.value().send( + val future = producerRef.send( record, new KafkaCallback { def onCompletion(meta: RecordMetadata, exception: Exception): Unit = @@ -154,7 +156,7 @@ object KafkaProducer { asyncCb.onSuccess(()) } else { try { - producerRef.value().close() + producerRef.close() asyncCb.onSuccess(()) } catch { case NonFatal(ex) => diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index 78d0efb7..8c16ca37 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -109,13 +109,13 @@ object KafkaProducerSink extends StrictLogging { } /** Builder for [[KafkaProducerSink]]. */ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducerSink[K, V] = apply(config, sc, onSendErrorDefault) /** Builder for [[KafkaProducerSink]]. 
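A typical use of the sink builders documented here is to drive batches of records into Kafka with `consumeWith`; topic and config names are assumed:
{{{
val records: Observable[Seq[ProducerRecord[String, String]]] =
  Observable.now(Seq(new ProducerRecord("my-topic", "key", "value")))

val drained: Task[Unit] =
  records.consumeWith(KafkaProducerSink[String, String](producerCfg, io))
}}}
The three-argument builder additionally lets `onSendError` decide whether a failed send should `Continue` or `Stop` the stream.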
*/ - def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])( - implicit K: Serializer[K], + def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(implicit + K: Serializer[K], V: Serializer[V]): KafkaProducerSink[K, V] = { val producer = Coeval(KafkaProducer[K, V](config, sc)) diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala index eb666b7b..a69fcda9 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/Commit.scala @@ -20,9 +20,8 @@ import monix.eval.Task import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.OffsetCommitCallback -/** - * Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. - * */ +/** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context. + */ trait Commit { def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableMessage.scala b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableMessage.scala index 19a8ddc6..80788ff0 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableMessage.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableMessage.scala @@ -18,7 +18,6 @@ package monix.kafka import org.apache.kafka.clients.consumer.ConsumerRecord -/** - * Represents data consumed from Kafka and [[CommittableOffset]] built from it - * */ +/** Represents data consumed from Kafka and [[CommittableOffset]] built from it + */ final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset) diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala index f0620885..6285de46 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffset.scala @@ -37,22 +37,19 @@ final class CommittableOffset private[kafka] ( val offset: Long, private[kafka] val commitCallback: Commit) { - /** - * Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended + /** Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. - * */ + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset)) - /** - * Asynchronously commits [[offset]] to Kafka. It is recommended + /** Asynchronously commits [[offset]] to Kafka. It is recommended * to use batched commit with [[CommittableOffsetBatch]] class. 
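The callback-taking variant above surfaces Kafka's native `OffsetCommitCallback`; for example, a callback that logs failed commits (with `msg` an assumed `CommittableMessage`):
{{{
val logErrors = new OffsetCommitCallback {
  def onComplete(committed: java.util.Map[TopicPartition, OffsetAndMetadata], ex: Exception): Unit =
    if (ex != null) println(s"commit failed: ${ex.getMessage}")
}

val commit: Task[Unit] = msg.committableOffset.commitAsync(logErrors)
}}}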
- * */ + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback) } diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala index 8b2b098e..213ecd1e 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala @@ -37,25 +37,21 @@ import org.apache.kafka.common.TopicPartition */ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) { - /** - * Synchronously commits [[offsets]] to Kafka - * */ + /** Synchronously commits [[offsets]] to Kafka + */ def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Asynchronously commits [[offsets]] to Kafka - * */ + /** Asynchronously commits [[offsets]] to Kafka + */ def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets) - /** - * Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified + /** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified * for same topic and partition. - * */ + */ def updated(committableOffset: CommittableOffset): CommittableOffsetBatch = new CommittableOffsetBatch( offsets.updated(committableOffset.topicPartition, committableOffset.offset), @@ -65,19 +61,17 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti object CommittableOffsetBatch { - /** - * Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: + /** Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold: * {{{ * offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _) * }}} - * */ + */ val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty) - /** - * Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with + /** Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with * sequence order. If there is more than once offset for a topic and partition in the * sequence then the last one will remain. - * */ + */ def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch = if (offsets.nonEmpty) { val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) => @@ -88,15 +82,13 @@ object CommittableOffsetBatch { empty } - /** - * Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets + /** Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets * that have the same commit callback. This will help when the committable offsets are * from different consumers. 
* {{{ * CommittableOffsetBatch.mergeByCommitCallback(offsets) * }}} - * - * */ + */ def mergeByCommitCallback(committableOffsets: Seq[CommittableOffset]): List[CommittableOffsetBatch] = { if (committableOffsets.nonEmpty) { committableOffsets diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala index 3b4d1746..46faf215 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala @@ -157,7 +157,7 @@ import scala.concurrent.duration._ * * @param clientRack is the `client.rack` setting. * A rack identifier for this client. - * This can be any string value which indicates where this client is physically located. + * This can be any string value which indicates where this client is physically located. * It corresponds with the broker config 'broker.rack' * * @param fetchMaxWaitTime is the `fetch.max.wait.ms` setting, diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala index e2e7384c..8143036e 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala @@ -39,10 +39,9 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] { protected def config: KafkaConsumerConfig protected def consumer: Task[Consumer[K, V]] - /** - * Creates a task that polls the source, then feeds the downstream + /** Creates a task that polls the source, then feeds the downstream * subscriber, returning the resulting acknowledgement - * */ + */ protected def ackTask(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Ack] override final def unsafeSubscribeFn(out: Subscriber[Out]): Cancelable = { @@ -127,8 +126,8 @@ object KafkaConsumerObservable { * * @param topics is the list of Kafka topics to subscribe to. */ - def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])( - implicit K: Deserializer[K], + def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { val consumer = createConsumer[K, V](cfg, topics) @@ -143,16 +142,15 @@ object KafkaConsumerObservable { * * @param topicsRegex is the pattern of Kafka topics to subscribe to. */ - def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)( - implicit K: Deserializer[K], + def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit + K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = { val consumer = createConsumer[K, V](cfg, topicsRegex) apply(cfg, consumer) } - /** - * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets + /** Builds a [[KafkaConsumerObservable]] instance with the ability to manually commit offsets * and forcibly disables auto commits in configuration. * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
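Offsets taken from these CommittableMessage streams can even span several consumers, each carrying its own commit callback; `mergeByCommitCallback`, shown earlier in this file, groups them so that every batch is committed through the right consumer. A sketch with `offsetsA` and `offsetsB` assumed to come from two manual-commit streams:
{{{
val batches: List[CommittableOffsetBatch] =
  CommittableOffsetBatch.mergeByCommitCallback(offsetsA ++ offsetsB)

val commitAll: Task[Unit] =
  Task.traverse(batches)(_.commitAsync()).map(_ => ())
}}}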
    *
@@ -174,7 +172,7 @@ object KafkaConsumerObservable {
    * @param consumer is a factory for the
    *        `org.apache.kafka.clients.consumer.KafkaConsumer`
    *        instance to use for consuming from Kafka
-    * */
+    */
   def manualCommit[K, V](
     cfg: KafkaConsumerConfig,
     consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
@@ -183,8 +181,7 @@ object KafkaConsumerObservable {
     new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer)
   }

-  /**
-    * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
+  /** Builds a [[KafkaConsumerObservable]] instance that allows manual commit of offsets
     * and forcibly disables auto commits in configuration.
     * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
     *
@@ -204,17 +201,16 @@ object KafkaConsumerObservable {
    * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly!
    *
    * @param topics is the list of Kafka topics to subscribe to.
-    * */
-  def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(
-    implicit K: Deserializer[K],
+    */
+  def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
+    K: Deserializer[K],
     V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {

     val consumer = createConsumer[K, V](cfg, topics)
     manualCommit(cfg, consumer)
   }

-  /**
-    * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
+  /** Builds a [[KafkaConsumerObservable]] instance that allows manual commit of offsets
     * and forcibly disables auto commits in configuration.
     * Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
     *
@@ -234,9 +230,9 @@ object KafkaConsumerObservable {
    * observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly!
    *
    * @param topicsRegex is the pattern of Kafka topics to subscribe to.
-    * */
-  def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(
-    implicit K: Deserializer[K],
+    */
+  def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
+    K: Deserializer[K],
     V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {

     val consumer = createConsumer[K, V](cfg, topicsRegex)
@@ -244,8 +240,8 @@
   }

   /** Returns a `Task` for creating a consumer instance given list of topics. */
-  def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(
-    implicit K: Deserializer[K],
+  def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(implicit
+    K: Deserializer[K],
     V: Deserializer[V]): Task[Consumer[K, V]] = {

     import collection.JavaConverters._
@@ -260,8 +256,8 @@ object KafkaConsumerObservable {
   }

   /** Returns a `Task` for creating a consumer instance given topics regex.
    */
-  def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)(
-    implicit K: Deserializer[K],
+  def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)(implicit
+    K: Deserializer[K],
     V: Deserializer[V]): Task[Consumer[K, V]] = {
     Task.evalAsync {
       val configMap = config.toJavaMap
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
index 79fea16e..fe04ec60 100644
--- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
+++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
@@ -45,15 +45,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
   case class CommitWithConsumer(consumer: Consumer[K, V]) extends Commit {

     override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
-      Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
-        case (k, v) => k -> new OffsetAndMetadata(v)
+      Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) =>
+        k -> new OffsetAndMetadata(v)
       }.asJava))))

     override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
       Task {
-        blocking(consumer.synchronized(consumer.commitAsync(batch.map {
-          case (k, v) => k -> new OffsetAndMetadata(v)
-        }.asJava, callback)))
+        blocking(
+          consumer.synchronized(
+            consumer.commitAsync(
+              batch.map { case (k, v) =>
+                k -> new OffsetAndMetadata(v)
+              }.asJava,
+              callback)))
       }
   }
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducer.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducer.scala
index 18b3e8e3..7dff84b0 100644
--- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducer.scala
+++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducer.scala
@@ -26,12 +26,12 @@ import org.apache.kafka.clients.producer.{
   RecordMetadata,
   Callback => KafkaCallback,
   Producer => ApacheProducer,
-  KafkaProducer => ApacheKafkaProducer}
+  KafkaProducer => ApacheKafkaProducer
+}

 import scala.util.control.NonFatal

-/**
-  * Wraps the Kafka Producer.
+/** Wraps the Kafka Producer.
   *
   * Calling `producer.send` returns a `Task[Option[RecordMetadata]]`
   * which can then be run and transformed into a `Future`.
@@ -55,10 +55,10 @@ trait KafkaProducer[K, V] extends Serializable {
 object KafkaProducer {

   /** Builds a [[KafkaProducer]] instance. */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
-    implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit
+    K: Serializer[K],
     V: Serializer[V]): KafkaProducer[K, V] = {
-    val producerRef: Coeval[ApacheProducer[K, V]] = Coeval.evalOnce {
+    val producerRef: Coeval[ApacheProducer[K, V]] = Coeval {
       val keySerializer = K.create()
       val valueSerializer = V.create()
       val configJavaMap = config.toJavaMap
@@ -70,21 +70,24 @@ object KafkaProducer {
   }

   /** Builds a [[KafkaProducer]] instance with provided Apache Producer.
    */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(
-    implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(implicit
+    K: Serializer[K],
     V: Serializer[V]): KafkaProducer[K, V] = {
     new Implementation[K, V](config, sc, producerRef)
   }

-  private final class Implementation[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(
-    implicit K: Serializer[K],
-    V: Serializer[V])
+  private final class Implementation[K, V](
+    config: KafkaProducerConfig,
+    sc: Scheduler,
+    producer: Coeval[ApacheProducer[K, V]])(implicit K: Serializer[K], V: Serializer[V])
     extends KafkaProducer[K, V] with StrictLogging {

     private val isCanceled = Atomic(false)

+    lazy val producerRef = producer.value()
+
     def underlying: Task[ApacheProducer[K, V]] =
-      producerRef.to[Task]
+      Task.eval(producerRef)

     def send(topic: String, value: V): Task[Option[RecordMetadata]] =
       send(new ProducerRecord[K, V](topic, value))
@@ -105,7 +108,7 @@ object KafkaProducer {
         val cancelable = SingleAssignCancelable()
         try {
           // Using asynchronous API
-          val future = producerRef.value().send(
+          val future = producerRef.send(
             record,
             new KafkaCallback {
               def onCompletion(meta: RecordMetadata, exception: Exception): Unit =
@@ -153,7 +156,7 @@ object KafkaProducer {
             asyncCb.onSuccess(())
           } else {
             try {
-              producerRef.value().close()
+              producerRef.close()
               asyncCb.onSuccess(())
             } catch {
               case NonFatal(ex) =>
diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
index 78d0efb7..8c16ca37 100644
--- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
+++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
@@ -109,13 +109,13 @@ object KafkaProducerSink extends StrictLogging {
   }

   /** Builder for [[KafkaProducerSink]]. */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
-    implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit
+    K: Serializer[K],
     V: Serializer[V]): KafkaProducerSink[K, V] =
     apply(config, sc, onSendErrorDefault)

   /** Builder for [[KafkaProducerSink]]. */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(
-    implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(implicit
+    K: Serializer[K],
     V: Serializer[V]): KafkaProducerSink[K, V] = {
     val producer = Coeval(KafkaProducer[K, V](config, sc))
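
For reference, a minimal usage sketch of the manual-commit consumer API touched by this diff, in the spirit of the `foldLeft(CommittableOffsetBatch.empty)(_ updated _)` Scaladoc example above. The broker address, topic, and group id are illustrative placeholders, and the buffering parameters are arbitrary:

{{{
  import monix.eval.Task
  import monix.kafka._
  import scala.concurrent.duration._

  val consumerCfg = KafkaConsumerConfig.default.copy(
    bootstrapServers = List("localhost:9092"), // assumed local broker
    groupId = "example-group"                  // illustrative group id
  )

  // manualCommit forcibly disables auto commits and emits
  // CommittableMessage instead of plain ConsumerRecord.
  val consumeAndCommit: Task[Unit] =
    KafkaConsumerObservable
      .manualCommit[String, String](consumerCfg, List("example-topic"))
      .bufferTimedAndCounted(5.seconds, 100)
      .mapEval { messages =>
        // Fold the offsets into a single batch and commit once per buffer.
        val batch = messages
          .map(_.committableOffset)
          .foldLeft(CommittableOffsetBatch.empty)(_ updated _)
        batch.commitSync()
      }
      .completedL
}}}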
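The `Coeval.evalOnce` to `Coeval` change above, combined with the new `lazy val producerRef` in `Implementation`, keeps the one-producer-per-instance behaviour but moves the memoization into the implementation class: the Apache producer is still created at most once, on first use. A sketch of the producer API as it reads after this diff; the scheduler name, broker address, and topic are assumptions:

{{{
  import monix.eval.Task
  import monix.execution.Scheduler
  import monix.kafka._

  implicit val io: Scheduler = Scheduler.io(name = "kafka-producer")

  val producerCfg = KafkaProducerConfig.default.copy(
    bootstrapServers = List("localhost:9092") // assumed local broker
  )
  val producer = KafkaProducer[String, String](producerCfg, io)

  // The underlying producer is instantiated lazily by the first send
  // and reused afterwards; close() releases it.
  val roundTrip: Task[Unit] =
    for {
      _ <- producer.send("example-topic", "some-value")
      _ <- producer.close()
    } yield ()
}}}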
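Similarly, a sketch for the `KafkaProducerSink` builders reformatted at the end of the diff. The sink consumes batches of records, so the observable is buffered first; names and buffer size are again illustrative:

{{{
  import monix.execution.Scheduler
  import monix.kafka._
  import monix.reactive.Observable
  import org.apache.kafka.clients.producer.ProducerRecord

  implicit val io: Scheduler = Scheduler.io(name = "kafka-sink")

  val producerCfg = KafkaProducerConfig.default.copy(
    bootstrapServers = List("localhost:9092") // assumed local broker
  )

  // The sink accepts Seq[ProducerRecord[K, V]], hence bufferIntrospective.
  val produceAll: Task[Unit] =
    Observable
      .fromIterable(List("a", "b", "c"))
      .map(v => new ProducerRecord[String, String]("example-topic", v))
      .bufferIntrospective(1024)
      .consumeWith(KafkaProducerSink[String, String](producerCfg, io))
}}}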