scalafmt and producer tweaks
Avasil committed Jan 1, 2021
1 parent bf109be commit c7231f9
Showing 33 changed files with 288 additions and 328 deletions.
5 changes: 2 additions & 3 deletions kafka-0.10.x/src/main/scala/monix/kafka/Commit.scala
@@ -20,9 +20,8 @@ import monix.eval.Task
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.OffsetCommitCallback

-/**
- * Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context.
- * */
+/** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context.
+  */
trait Commit {
def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit]
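For reference, the trait above is easy to stub: the sketch below (the object name is hypothetical, not part of this commit) mirrors the `Commit.empty` no-op instance that `CommittableOffsetBatch.empty` relies on further down in this diff.

    import monix.eval.Task
    import org.apache.kafka.clients.consumer.OffsetCommitCallback
    import org.apache.kafka.common.TopicPartition

    // No-op Commit: completes immediately without touching Kafka.
    object NoOpCommit extends Commit {
      def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] = Task.unit
      def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
        Task.unit
    }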
kafka-0.10.x/src/main/scala/monix/kafka/CommittableMessage.scala
@@ -18,7 +18,6 @@ package monix.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord

-/**
- * Represents data consumed from Kafka and [[CommittableOffset]] built from it
- * */
+/** Represents data consumed from Kafka and [[CommittableOffset]] built from it
+  */
final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset)
15 changes: 6 additions & 9 deletions kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffset.scala
@@ -37,22 +37,19 @@ final class CommittableOffset private[kafka] (
val offset: Long,
private[kafka] val commitCallback: Commit) {

-/**
- * Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended
+/** Synchronously commits [[offset]] for the [[topicPartition]] to Kafka. It is recommended
  * to use batched commit with [[CommittableOffsetBatch]] class.
- * */
+ */
def commitSync(): Task[Unit] = commitCallback.commitBatchSync(Map(topicPartition -> offset))

-/**
- * Asynchronously commits [[offset]] to Kafka. It is recommended
+/** Asynchronously commits [[offset]] to Kafka. It is recommended
  * to use batched commit with [[CommittableOffsetBatch]] class.
- * */
+ */
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(Map(topicPartition -> offset))

-/**
- * Asynchronously commits [[offset]] to Kafka. It is recommended
+/** Asynchronously commits [[offset]] to Kafka. It is recommended
  * to use batched commit with [[CommittableOffsetBatch]] class.
- * */
+ */
def commitAsync(callback: OffsetCommitCallback): Task[Unit] =
commitCallback.commitBatchAsync(Map(topicPartition -> offset), callback)
}
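As a usage sketch (not part of this commit; `processRecord`, the topic, and config values are assumptions), each record's offset can be committed individually right after processing — though, as the Scaladoc above recommends, batching via CommittableOffsetBatch is preferable (see the sketch after the next file):

    import monix.eval.Task
    import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}

    // Placeholder for application logic.
    def processRecord(value: String): Task[Unit] = Task.unit

    val consumerCfg = KafkaConsumerConfig.default.copy(
      bootstrapServers = List("127.0.0.1:9092"),
      groupId = "example-group"
    )

    // Process each record, then commit its offset asynchronously.
    val stream =
      KafkaConsumerObservable
        .manualCommit[String, String](consumerCfg, List("my-topic"))
        .mapEval { msg =>
          processRecord(msg.record.value())
            .flatMap(_ => msg.committableOffset.commitAsync())
        }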
kafka-0.10.x/src/main/scala/monix/kafka/CommittableOffsetBatch.scala
@@ -37,25 +37,21 @@ import org.apache.kafka.common.TopicPartition
*/
final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartition, Long], commitCallback: Commit) {

-/**
- * Synchronously commits [[offsets]] to Kafka
- * */
+/** Synchronously commits [[offsets]] to Kafka
+ */
def commitSync(): Task[Unit] = commitCallback.commitBatchSync(offsets)

-/**
- * Asynchronously commits [[offsets]] to Kafka
- * */
+/** Asynchronously commits [[offsets]] to Kafka
+ */
def commitAsync(): Task[Unit] = commitCallback.commitBatchAsync(offsets)

-/**
- * Asynchronously commits [[offsets]] to Kafka
- * */
+/** Asynchronously commits [[offsets]] to Kafka
+ */
def commitAsync(callback: OffsetCommitCallback): Task[Unit] = commitCallback.commitBatchAsync(offsets)

-/**
- * Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified
+/** Adds new [[CommittableOffset]] to batch. Added offset replaces previous one specified
  * for same topic and partition.
- * */
+ */
def updated(committableOffset: CommittableOffset): CommittableOffsetBatch =
new CommittableOffsetBatch(
offsets.updated(committableOffset.topicPartition, committableOffset.offset),
@@ -65,19 +61,17 @@ final class CommittableOffsetBatch private[kafka] (val offsets: Map[TopicPartiti

object CommittableOffsetBatch {

-/**
- * Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold:
+/** Creates empty [[CommittableOffsetBatch]]. Can be used as neutral element in fold:
* {{{
* offsets.foldLeft(CommittableOffsetBatch.empty)(_ updated _)
* }}}
- * */
+ */
val empty: CommittableOffsetBatch = new CommittableOffsetBatch(Map.empty, Commit.empty)

-/**
- * Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with
+/** Builds [[CommittableOffsetBatch]] from offsets sequence. Be careful with
* sequence order. If there is more than once offset for a topic and partition in the
* sequence then the last one will remain.
- * */
+ */
def apply(offsets: Seq[CommittableOffset]): CommittableOffsetBatch =
if (offsets.nonEmpty) {
val aggregatedOffsets = offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, o) =>
@@ -88,15 +82,13 @@ object CommittableOffsetBatch {
empty
}

-/**
- * Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets
+/** Builds [[CommittableOffsetBatch]] list from offsets sequence by merging the offsets
* that have the same commit callback. This will help when the committable offsets are
* from different consumers.
* {{{
* CommittableOffsetBatch.mergeByCommitCallback(offsets)
* }}}
- *
- * */
+ */
def mergeByCommitCallback(committableOffsets: Seq[CommittableOffset]): List[CommittableOffsetBatch] = {
if (committableOffsets.nonEmpty) {
committableOffsets
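Putting the batch API together, a consuming sketch (topic and config values assumed, not from this commit) that buffers offsets and commits them as one batch, in the spirit of the fold example above:

    import scala.concurrent.duration._
    import monix.kafka.{CommittableOffsetBatch, KafkaConsumerConfig, KafkaConsumerObservable}

    val consumerCfg = KafkaConsumerConfig.default.copy(
      bootstrapServers = List("127.0.0.1:9092"),
      groupId = "example-group"
    )

    // Commit at most once per second (or per 1000 records), in a single batch.
    val batched =
      KafkaConsumerObservable
        .manualCommit[String, String](consumerCfg, List("my-topic"))
        .map(_.committableOffset)
        .bufferTimedAndCounted(1.second, 1000)
        .mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())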
kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala
@@ -40,10 +40,9 @@ trait KafkaConsumerObservable[K, V, Out] extends Observable[Out] {
protected def config: KafkaConsumerConfig
protected def consumer: Task[Consumer[K, V]]

-/**
- * Creates a task that polls the source, then feeds the downstream
+/** Creates a task that polls the source, then feeds the downstream
  * subscriber, returning the resulting acknowledgement
- * */
+ */
protected def ackTask(consumer: Consumer[K, V], out: Subscriber[Out]): Task[Ack]

override final def unsafeSubscribeFn(out: Subscriber[Out]): Cancelable = {
@@ -128,8 +127,8 @@ object KafkaConsumerObservable {
*
* @param topics is the list of Kafka topics to subscribe to.
*/
-  def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(
-      implicit K: Deserializer[K],
+  def apply[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
+      K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {

val consumer = createConsumer[K, V](cfg, topics)
Expand All @@ -144,16 +143,15 @@ object KafkaConsumerObservable {
*
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
*/
-  def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(
-      implicit K: Deserializer[K],
+  def apply[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
+      K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, ConsumerRecord[K, V]] = {

val consumer = createConsumer[K, V](cfg, topicsRegex)
apply(cfg, consumer)
}

-/**
- * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
+/** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
* and forcibly disables auto commits in configuration.
* Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
*
Expand All @@ -175,7 +173,7 @@ object KafkaConsumerObservable {
* @param consumer is a factory for the
* `org.apache.kafka.clients.consumer.KafkaConsumer`
* instance to use for consuming from Kafka
- * */
+ */
def manualCommit[K, V](
cfg: KafkaConsumerConfig,
consumer: Task[Consumer[K, V]]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {
@@ -184,8 +182,7 @@
new KafkaConsumerObservableManualCommit[K, V](manualCommitConfig, consumer)
}

-/**
- * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
+/** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
* and forcibly disables auto commits in configuration.
* Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
*
Expand All @@ -205,17 +202,16 @@ object KafkaConsumerObservable {
* observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly!
*
* @param topics is the list of Kafka topics to subscribe to.
- * */
-  def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(
-      implicit K: Deserializer[K],
+ */
+  def manualCommit[K, V](cfg: KafkaConsumerConfig, topics: List[String])(implicit
+      K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {

val consumer = createConsumer[K, V](cfg, topics)
manualCommit(cfg, consumer)
}

-/**
- * Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
+/** Builds a [[KafkaConsumerObservable]] instance with ability to manual commit offsets
* and forcibly disables auto commits in configuration.
* Such instances emit [[CommittableMessage]] instead of Kafka's ConsumerRecord.
*
Expand All @@ -235,18 +231,18 @@ object KafkaConsumerObservable {
* observable commit order will turned to [[monix.kafka.config.ObservableCommitOrder.NoAck NoAck]] forcibly!
*
* @param topicsRegex is the pattern of Kafka topics to subscribe to.
- * */
-  def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(
-      implicit K: Deserializer[K],
+ */
+  def manualCommit[K, V](cfg: KafkaConsumerConfig, topicsRegex: Regex)(implicit
+      K: Deserializer[K],
V: Deserializer[V]): KafkaConsumerObservable[K, V, CommittableMessage[K, V]] = {

val consumer = createConsumer[K, V](cfg, topicsRegex)
manualCommit(cfg, consumer)
}

/** Returns a `Task` for creating a consumer instance given list of topics. */
-  def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(
-      implicit K: Deserializer[K],
+  def createConsumer[K, V](config: KafkaConsumerConfig, topics: List[String])(implicit
+      K: Deserializer[K],
V: Deserializer[V]): Task[Consumer[K, V]] = {

import collection.JavaConverters._
@@ -261,8 +257,8 @@
}

/** Returns a `Task` for creating a consumer instance given topics regex. */
-  def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)(
-      implicit K: Deserializer[K],
+  def createConsumer[K, V](config: KafkaConsumerConfig, topicsRegex: Regex)(implicit
+      K: Deserializer[K],
V: Deserializer[V]): Task[Consumer[K, V]] = {
Task.evalAsync {
val configMap = config.toJavaMap
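For reference, a minimal auto-committing consumer built with the `apply` overloads above (a sketch; topic and config values are assumptions — the Regex overload is used the same way, e.g. with "my-topic-.*".r):

    import monix.eval.Task
    import monix.kafka.{KafkaConsumerConfig, KafkaConsumerObservable}

    val consumerCfg = KafkaConsumerConfig.default.copy(
      bootstrapServers = List("127.0.0.1:9092"),
      groupId = "example-group"
    )

    // Emits Kafka's ConsumerRecord directly; offsets are committed per the config.
    val firstTen: Task[List[String]] =
      KafkaConsumerObservable[String, String](consumerCfg, List("my-topic"))
        .take(10)
        .map(_.value())
        .toListL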
kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerObservableManualCommit.scala
@@ -45,15 +45,19 @@ final class KafkaConsumerObservableManualCommit[K, V] private[kafka] (
case class CommitWithConsumer(consumer: Consumer[K, V]) extends Commit {

override def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit] =
-      Task(blocking(consumer.synchronized(consumer.commitSync(batch.map {
-        case (k, v) => k -> new OffsetAndMetadata(v)
+      Task(blocking(consumer.synchronized(consumer.commitSync(batch.map { case (k, v) =>
+        k -> new OffsetAndMetadata(v)
}.asJava))))

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
Task {
-        blocking(consumer.synchronized(consumer.commitAsync(batch.map {
-          case (k, v) => k -> new OffsetAndMetadata(v)
-        }.asJava, callback)))
+        blocking(
+          consumer.synchronized(
+            consumer.commitAsync(
+              batch.map { case (k, v) =>
+                k -> new OffsetAndMetadata(v)
+              }.asJava,
+              callback)))
}
}

33 changes: 18 additions & 15 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducer.scala
@@ -26,12 +26,12 @@ import org.apache.kafka.clients.producer.{
RecordMetadata,
Callback => KafkaCallback,
Producer => ApacheProducer,
-  KafkaProducer => ApacheKafkaProducer}
+  KafkaProducer => ApacheKafkaProducer
+}

import scala.util.control.NonFatal

-/**
- * Wraps the Kafka Producer.
+/** Wraps the Kafka Producer.
*
* Calling `producer.send` returns a `Task[Option[RecordMetadata]]`
* which can then be run and transformed into a `Future`.
@@ -55,10 +55,10 @@ trait KafkaProducer[K, V] extends Serializable {
object KafkaProducer {

/** Builds a [[KafkaProducer]] instance. */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
-      implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit
+      K: Serializer[K],
V: Serializer[V]): KafkaProducer[K, V] = {
-    val producerRef: Coeval[ApacheProducer[K, V]] = Coeval.evalOnce {
+    val producerRef: Coeval[ApacheProducer[K, V]] = Coeval {
val keySerializer = K.create()
val valueSerializer = V.create()
val configJavaMap = config.toJavaMap
@@ -70,21 +70,24 @@ object KafkaProducer {
}

/** Builds a [[KafkaProducer]] instance with provided Apache Producer. */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(
-      implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(implicit
+      K: Serializer[K],
V: Serializer[V]): KafkaProducer[K, V] = {
new Implementation[K, V](config, sc, producerRef)
}

-  private final class Implementation[K, V](config: KafkaProducerConfig, sc: Scheduler, producerRef: Coeval[ApacheProducer[K, V]])(
-      implicit K: Serializer[K],
-      V: Serializer[V])
+  private final class Implementation[K, V](
+      config: KafkaProducerConfig,
+      sc: Scheduler,
+      producer: Coeval[ApacheProducer[K, V]])(implicit K: Serializer[K], V: Serializer[V])
extends KafkaProducer[K, V] with StrictLogging {

private val isCanceled = Atomic(false)

+    lazy val producerRef = producer.value()

def underlying: Task[ApacheProducer[K, V]] =
-      producerRef.to[Task]
+      Task.eval(producerRef)

def send(topic: String, value: V): Task[Option[RecordMetadata]] =
send(new ProducerRecord[K, V](topic, value))
Expand All @@ -104,8 +107,8 @@ object KafkaProducer {
val isActive = Atomic(true)
val cancelable = SingleAssignCancelable()
try {
-        // Using asynchronous API
-        val future = producerRef.value().send(
+        // Using asynchronous API
+        val future = producerRef.send(
record,
new KafkaCallback {
def onCompletion(meta: RecordMetadata, exception: Exception): Unit =
@@ -153,7 +156,7 @@
asyncCb.onSuccess(())
} else {
try {
-            producerRef.value().close()
+            producerRef.close()
asyncCb.onSuccess(())
} catch {
case NonFatal(ex) =>
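The producer tweak visible above swaps `Coeval.evalOnce` for a plain `Coeval` forced through a `lazy val`, so the underlying Apache producer is still created at most once, on first use. A usage sketch (topic and config values assumed, not from this commit):

    import monix.eval.Task
    import monix.execution.Scheduler.Implicits.global
    import monix.kafka.{KafkaProducer, KafkaProducerConfig}

    val producerCfg = KafkaProducerConfig.default.copy(
      bootstrapServers = List("127.0.0.1:9092")
    )

    val producer = KafkaProducer[String, String](producerCfg, global)

    // `send` yields None if the producer was already closed.
    val roundTrip: Task[Unit] =
      for {
        meta <- producer.send("my-topic", "hello")
        _ <- Task.eval(println(meta.map(_.offset())))
        _ <- producer.close()
      } yield ()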
kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
@@ -109,13 +109,13 @@ object KafkaProducerSink extends StrictLogging {
}

/** Builder for [[KafkaProducerSink]]. */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(
-      implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler)(implicit
+      K: Serializer[K],
V: Serializer[V]): KafkaProducerSink[K, V] = apply(config, sc, onSendErrorDefault)

/** Builder for [[KafkaProducerSink]]. */
-  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(
-      implicit K: Serializer[K],
+  def apply[K, V](config: KafkaProducerConfig, sc: Scheduler, onSendError: Throwable => Task[Ack])(implicit
+      K: Serializer[K],
V: Serializer[V]): KafkaProducerSink[K, V] = {

val producer = Coeval(KafkaProducer[K, V](config, sc))
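A sink usage sketch (topic and config values assumed, not from this commit); each buffered batch is handed to the sink, and send failures are routed through the `onSendError` handler of the second builder:

    import monix.execution.Scheduler.Implicits.global
    import monix.kafka.{KafkaProducerConfig, KafkaProducerSink}
    import monix.reactive.Observable
    import org.apache.kafka.clients.producer.ProducerRecord

    val producerCfg = KafkaProducerConfig.default.copy(
      bootstrapServers = List("127.0.0.1:9092")
    )

    val sink = KafkaProducerSink[String, String](producerCfg, global)

    // Stream records into Kafka in introspected batches.
    val completed =
      Observable
        .range(0, 1000)
        .map(i => new ProducerRecord[String, String]("my-topic", i.toString))
        .bufferIntrospective(1024)
        .consumeWith(sink)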
5 changes: 2 additions & 3 deletions kafka-0.11.x/src/main/scala/monix/kafka/Commit.scala
@@ -20,9 +20,8 @@ import monix.eval.Task
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.OffsetCommitCallback

-/**
- * Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context.
- * */
+/** Callback for batched commit realized as closure in [[KafkaConsumerObservable]] context.
+  */
trait Commit {
def commitBatchSync(batch: Map[TopicPartition, Long]): Task[Unit]
def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit]
kafka-0.11.x/src/main/scala/monix/kafka/CommittableMessage.scala
@@ -18,7 +18,6 @@ package monix.kafka

import org.apache.kafka.clients.consumer.ConsumerRecord

-/**
- * Represents data consumed from Kafka and [[CommittableOffset]] built from it
- * */
+/** Represents data consumed from Kafka and [[CommittableOffset]] built from it
+  */
final case class CommittableMessage[K, V](record: ConsumerRecord[K, V], committableOffset: CommittableOffset)