diff --git a/build.sbt b/build.sbt index a56dc7cf..fbe95e66 100644 --- a/build.sbt +++ b/build.sbt @@ -116,6 +116,8 @@ lazy val sharedSettings = Seq( testForkedParallel in IntegrationTest := false, concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), + licenses := Seq("APL2" -> url("http://www.apache.org/licenses/LICENSE-2.0.txt")), + homepage := Some(url("https://github.com/monix/monix-kafka")), headerLicense := Some(HeaderLicense.Custom( """|Copyright (c) 2014-2018 by The Monix Project Developers. | @@ -132,9 +134,28 @@ lazy val sharedSettings = Seq( |limitations under the License.""" .stripMargin)), + scmInfo := Some( + ScmInfo( + url("https://github.com/monix/monix-kafka"), + "scm:git@github.com:monix/monix-kafka.git" + )), + + developers := List( + Developer( + id="alexelcu", + name="Alexandru Nedelcu", + email="noreply@alexn.org", + url=url("https://alexn.org") + ), + Developer( + id="pgawrys", + name="Piotr Gawryƛ", + email="pgawrys2@gmail.com", + url=url("https://github.com/Avasil") + )), + // -- Settings meant for deployment on oss.sonatype.org publishMavenStyle := true, - publishTo := { val nexus = "https://oss.sonatype.org/" if (isSnapshot.value) @@ -144,28 +165,7 @@ lazy val sharedSettings = Seq( }, publishArtifact in Test := false, - pomIncludeRepository := { _ => false }, // removes optional dependencies - - pomExtra := - https://github.com/monixio/monix-kafka/ - - - Apache License, Version 2.0 - https://www.apache.org/licenses/LICENSE-2.0 - repo - - - - git@github.com:monixio/monix-kafka.git - scm:git:git@github.com:monixio/monix-kafka.git - - - - alexelcu - Alexandru Nedelcu - https://alexn.org/ - - + pomIncludeRepository := { _ => false } // removes optional dependencies ) def mimaSettings(projectName: String) = Seq( diff --git a/kafka-0.8.x/src/main/resources/monix/kafka/default.conf b/kafka-0.8.x/src/main/resources/monix/kafka/default.conf deleted file mode 100644 index 3669b9e5..00000000 --- a/kafka-0.8.x/src/main/resources/monix/kafka/default.conf +++ /dev/null @@ -1,68 +0,0 @@ -kafka { - - # Producer specific settings - - bootstrap.servers = "localhost:9092" - acks = "1" - buffer.memory = 33554432 - compression.type = "none" - retries = 0 - - batch.size = 16384 - client.id = "" - - linger.ms = 0 - max.request.size = 1048576 - - receive.buffer.bytes = 32768 - send.buffer.bytes = 131072 - - timeout.ms = 30000 - - block.on.buffer.full = true - - metadata.fetch.timeout.ms = 60000 - metadata.max.age.ms = 300000 - - reconnect.backoff.ms = 10 - retry.backoff.ms = 100 - - # Consumer specific settings - - group.id = "" - zookeeper.connect = "" - - consumer.id = null - - socket.timeout.ms = 30000 - socket.receive.buffer.bytes = 65536 - fetch.message.max.bytes = 1048576 - num.consumer.fetchers = 1 - auto.commit.enable = true - auto.commit.interval.ms = 60000 - queued.max.message.chunks = 2 - rebalance.max.retries = 4 - fetch.min.bytes = 1 - fetch.wait.max.ms = 100 - rebalance.backoff.ms = 2000 - refresh.leader.backoff.ms = 200 - auto.offset.reset = "largest" - consumer.timeout.ms = -1 - exclude.internal.topics = true - partition.assignment.strategy = "range" - zookeeper.session.timeout.ms = 6000 - zookeeper.connection.timeout.ms = 6000 - zookeeper.sync.time.ms = 2000 - offsets.storage = "zookeeper" - offsets.channel.backoff.ms = 1000 - offsets.channel.socket.timeout.ms = 10000 - offsets.commit.max.retries = 5 - dual.commit.enabled = true - - # Monix specific settings - - # Number of requests that KafkaProducerSink - # can push in parallel - monix.producer.sink.parallelism = 100 - -} \ No newline at end of file diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/Deserializer.scala b/kafka-0.8.x/src/main/scala/monix/kafka/Deserializer.scala deleted file mode 100644 index 81a34289..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/Deserializer.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import kafka.serializer.{DefaultDecoder, StringDecoder, Decoder => KafkaDecoder} - -/** Wraps a Kafka `Decoder`, provided for - * convenience, since it can be implicitly fetched - * from the context. - * - * @param className is the full package path to the Kafka `Decoder` - * - * @param classType is the `java.lang.Class` for [[className]] - * - * @param constructor creates an instance of [[classType]]. - * This is defaulted with a `Deserializer.Constructor[A]` function that creates a - * new instance using an assumed empty or nullable constructor. - * Supplying this parameter allows for manual provision of the `Decoder`. - */ -final case class Deserializer[A]( - className: String, - classType: Class[_ <: KafkaDecoder[A]], - constructor: Deserializer.Constructor[A] = Deserializer.reflectCreate[A] _) { - - /** Creates a new instance. */ - def create(): KafkaDecoder[A] = - constructor(this) -} - -object Deserializer { - - /** Alias for the function that provides an instance of - * the Kafka `Decoder`. - */ - type Constructor[A] = (Deserializer[A]) => KafkaDecoder[A] - - private def reflectCreate[A](d: Deserializer[A]): KafkaDecoder[A] = { - val constructor = d.classType.getDeclaredConstructors()(0) - constructor.getParameterCount match { - case 0 => d.classType.newInstance() - case 1 => constructor.newInstance(null).asInstanceOf[KafkaDecoder[A]] - } - } - - implicit val forStrings: Deserializer[String] = - Deserializer[String]( - className = "kafka.serializer.StringDecoder", - classType = classOf[StringDecoder] - ) - - implicit val forByteArray: Deserializer[Array[Byte]] = - Deserializer[Array[Byte]]( - className = "kafka.serializer.DefaultDecoder", - classType = classOf[DefaultDecoder] - ) -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala b/kafka-0.8.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala deleted file mode 100644 index 5f3f9b39..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import java.io.File -import java.util.Properties -import scala.concurrent.duration._ - -import com.typesafe.config.{Config, ConfigFactory} - -import monix.kafka.config._ - -/** Configuration for Kafka Consumer. - * - * For the official documentation on the available configuration - * options, see - * [[https://kafka.apache.org/082/documentation.html#consumerconfigs Consumer Configs]] - * on `kafka.apache.org`. - * - * @param groupId is the `group.id` setting, a unique string - * that identifies the consumer group this consumer - * belongs to. - * - * @param zookeeperConnect is the `zookeeper.connect` setting, - * a list of host/port pairs to use for establishing - * the initial connection to the Zookeeper cluster. - * - * @param consumerId is the `consumer.id` setting, a unique string - * that identifies the consumer (will be autogenerated if not set). - * - * @param socketTimeout is the `socket.timeout.ms` setting, - * the socket timeout for network requests. - * - * @param socketReceiveBufferInBytes is the `socket.receive.buffer.bytes` - * setting, the size of the socket receive buffer for network requests. - * - * @param fetchMessageMaxBytes is the `fetch.message.max.bytes` - * setting, the maximum amount of data per-partition the - * server will return. - * - * @param numConsumerFetchers is the `num.consumer.fetchers` setting, - * the number of fetcher threads to spawn. - * - * @param autoCommitEnable is the `auto.commit.enable` setting. - * If true the consumer's offset will be periodically committed - * in the background. - * - * @param autoCommitInterval is the `auto.commit.interval.ms` setting, - * the frequency of autocommits. - * - * @param queuedMaxMessageChunks is the `queued.max.message.chunks` - * setting, the maximum number of message chunks that consumer - * may buffer. - * - * @param rebalanceMaxRetries is the `rebalance.max.retries` setting, - * the number of attempts to rebalance the consumer group when a new - * consumer joins. - * - * @param fetchMinBytes is the `fetch.min.bytes` setting, - * the minimum amount of data the server should return - * for a fetch request. - * - * @param fetchWaitMaxTime is the `fetch.wait.max.ms` setting, - * the maximum amount of time the server will block before - * answering the fetch request if there isn't sufficient data to - * immediately satisfy the requirement given by fetch.min.bytes. - * - * @param rebalanceBackoffTime is the `rebalance.backoff.m` setting. - * The amount of time to wait before attempting to rebalance the - * consumer group. - * - * @param refreshLeaderBackoffTime is the `refresh.leader.backoff.ms` - * setting. The amount of time to wait before trying to elect - * a new leader for a consumer group that has lost one. - * - * @param autoOffsetReset is the `auto.offset.reset` setting, - * specifying what to do when there is no initial offset in - * Kafka or if the current offset does not exist any more - * on the server (e.g. because that data has been deleted). - * - * @param consumerTimeout is the `consumer.timeout.ms` setting, - * which specifies the amount of time to wait before throwing - * an exception when there's nothing to consume. - * - * @param excludeInternalTopics is the `exclude.internal.topics` setting. - * Whether records from internal topics (such as offsets) should be - * exposed to the consumer. If set to true the only way to receive - * records from an internal topic is subscribing to it. - * - * @param partitionAssignmentStrategy is the `partition.assignment.strategy` - * setting, which chooses how partitions will be assigned to consumer - * streams (`range` or `roundrobin`). Note that `roundrobin` strategy - * results in a more even load distribution, but will not work when - * consuming from multiple topics. - * - * @param clientId is the `client.id` setting, - * an id string to pass to the server when making requests. - * The purpose of this is to be able to track the source of - * requests beyond just ip/port by allowing a logical application - * name to be included in server-side request logging. - * - * @param zookeeperSessionTimeout is the `zookeeper.session.timeout.ms` - * setting, the maximum amount of time to wait for a heartbeat before - * initiating a rebalance. - * - * @param zookeeperConnectionTimeout is the `zookeeper.connection.timeout.ms` - * setting, the maximum amount of time the client will wait to - * establish a connection to ZooKeeper. - * - * @param zookeeperSyncTime is the `zookeeper.sync.time.ms` setting, - * the maximum lag allowed for ZK followers. - * - * @param offsetsStorage is the `offsets.storage` setting, that controls - * where offsets are stored (`zookeeper` or `kafka`). - * - * @param offsetsChannelBackoffTime is the `offsets.channel.backoff.ms` - * setting, the backoff period when reconnecting the offsets channel - * or retrying failed offset fetch/commit requests. - * - * @param offsetsChannelSocketTimeout is the `offsets.channel.socket.timeout.ms` - * setting. Socket timeout when reading responses for offset fetch/commit - * requests. - * - * @param offsetsCommitMaxRetries is the `offsets.commit.max.retries` setting, - * The maximum amount of retries for commiting the offset. This retry - * count only applies to offset commits during shut-down. It does not - * apply to commits originating from the auto-commit thread. - * - * @param dualCommitEnabled is the `dual.commit.enabled` setting, which - * can be used to dual commit offsets to ZooKeeper if using - * `kafka` as `offsets.storage`. This is required during migration - * from ZooKeeper-based offset storage to Kafka-based offset storage. - * - */ -final case class KafkaConsumerConfig( - groupId: String, - zookeeperConnect: String, - consumerId: String, - socketTimeout: FiniteDuration, - socketReceiveBufferInBytes: Int, - fetchMessageMaxBytes: Int, - numConsumerFetchers: Int, - autoCommitEnable: Boolean, - autoCommitInterval: FiniteDuration, - queuedMaxMessageChunks: Int, - rebalanceMaxRetries: Int, - fetchMinBytes: Int, - fetchWaitMaxTime: FiniteDuration, - rebalanceBackoffTime: FiniteDuration, - refreshLeaderBackoffTime: FiniteDuration, - autoOffsetReset: AutoOffsetReset, - consumerTimeout: FiniteDuration, - excludeInternalTopics: Boolean, - partitionAssignmentStrategy: PartitionAssignmentStrategy, - clientId: String, - zookeeperSessionTimeout: FiniteDuration, - zookeeperConnectionTimeout: FiniteDuration, - zookeeperSyncTime: FiniteDuration, - offsetsStorage: OffsetsStorage, - offsetsChannelBackoffTime: FiniteDuration, - offsetsChannelSocketTimeout: FiniteDuration, - offsetsCommitMaxRetries: Int, - dualCommitEnabled: Boolean) { - - def toMap: Map[String, String] = Map( - "group.id" -> groupId, - "zookeeper.connect" -> zookeeperConnect, - "consumer.id" -> consumerId, - "socket.timeout.ms" -> socketTimeout.toMillis.toString, - "socket.receive.buffer.bytes" -> socketReceiveBufferInBytes.toString, - "fetch.message.max.bytes" -> fetchMessageMaxBytes.toString, - "num.consumer.fetchers" -> numConsumerFetchers.toString, - "auto.commit.enable" -> autoCommitEnable.toString, - "auto.commit.interval.ms" -> autoCommitInterval.toMillis.toString, - "queued.max.message.chunks" -> queuedMaxMessageChunks.toString, - "rebalance.max.retries" -> rebalanceMaxRetries.toString, - "fetch.min.bytes" -> fetchMinBytes.toString, - "fetch.wait.max.ms" -> fetchWaitMaxTime.toMillis.toString, - "rebalance.backoff.ms" -> rebalanceBackoffTime.toMillis.toString, - "refresh.leader.backoff.ms" -> refreshLeaderBackoffTime.toMillis.toString, - "auto.offset.reset" -> autoOffsetReset.id, - "consumer.timeout.ms" -> consumerTimeout.toMillis.toString, - "exclude.internal.topics" -> excludeInternalTopics.toString, - "partition.assignment.strategy" -> partitionAssignmentStrategy.id, - "client.id" -> clientId, - "zookeeper.session.timeout.ms" -> zookeeperSessionTimeout.toMillis.toString, - "zookeeper.connection.timeout.ms" -> zookeeperConnectionTimeout.toMillis.toString, - "zookeeper.sync.time.ms" -> zookeeperSyncTime.toMillis.toString, - "offsets.storage" -> offsetsStorage.id, - "offsets.channel.backoff.ms" -> offsetsChannelBackoffTime.toMillis.toString, - "offsets.channel.socket.timeout.ms" -> offsetsChannelSocketTimeout.toMillis.toString, - "offsets.commit.max.retries" -> offsetsCommitMaxRetries.toString, - "dual.commit.enabled" -> dualCommitEnabled.toString - ) - - def toProperties: Properties = { - val props = new Properties() - for ((k, v) <- toMap; if v != null) props.put(k, v) - props - } -} - -object KafkaConsumerConfig { - private val defaultRootPath = "kafka" - - lazy private val defaultConf: Config = - ConfigFactory.load("monix/kafka/default.conf").getConfig(defaultRootPath) - - /** Returns the default configuration, specified the `monix-kafka` project - * in `monix/kafka/default.conf`. - */ - lazy val default: KafkaConsumerConfig = - apply(defaultConf, includeDefaults = false) - - /** Loads the [[KafkaConsumerConfig]] either from a file path or - * from a resource, if `config.file` or `config.resource` are - * defined, or otherwise returns the default config. - * - * If you want to specify a `config.file`, you can configure the - * Java process on execution like so: - * {{{ - * java -Dconfig.file=/path/to/application.conf - * }}} - * - * Or if you want to specify a `config.resource` to be loaded - * from the executable's distributed JAR or classpath: - * {{{ - * java -Dconfig.resource=com/company/mySpecial.conf - * }}} - * - * In case neither of these are specified, then the configuration - * loaded is the default one, from the `monix-kafka` project, specified - * in `monix/kafka/default.conf`. - */ - def load(): KafkaConsumerConfig = - Option(System.getProperty("config.file")).map(f => new File(f)) match { - case Some(file) if file.exists() => - loadFile(file) - case None => - Option(System.getProperty("config.resource")) match { - case Some(resource) => - loadResource(resource) - case None => - default - } - } - - /** Loads a [[KafkaConsumerConfig]] from a project resource. - * - * @param resourceBaseName is the resource from where to load the config - * @param rootPath is the config root path (e.g. `kafka`) - * @param includeDefaults should be `true` in case you want to fallback - * to the default values provided by the `monix-kafka` library - * in `monix/kafka/default.conf` - */ - def loadResource(resourceBaseName: String, rootPath: String = defaultRootPath, includeDefaults: Boolean = true): KafkaConsumerConfig = - apply(ConfigFactory.load(resourceBaseName).getConfig(rootPath), includeDefaults) - - /** Loads a [[KafkaConsumerConfig]] from a specified file. - * - * @param file is the configuration path from where to load the config - * @param rootPath is the config root path (e.g. `kafka`) - * @param includeDefaults should be `true` in case you want to fallback - * to the default values provided by the `monix-kafka` library - * in `monix/kafka/default.conf` - */ - def loadFile(file: File, rootPath: String = defaultRootPath, includeDefaults: Boolean = true): KafkaConsumerConfig = - apply(ConfigFactory.parseFile(file).resolve().getConfig(rootPath), includeDefaults) - - /** Loads the [[KafkaConsumerConfig]] from a parsed - * `com.typesafe.config.Config` reference. - * - * NOTE that this method doesn't assume any path prefix for loading the - * configuration settings, so it does NOT assume a root path like `kafka`. - * In case case you need that, you can always do: - * - * {{{ - * KafkaConsumerConfig(globalConfig.getConfig("kafka")) - * }}} - * - * @param source is the typesafe `Config` object to read from - * @param includeDefaults should be `true` in case you want to fallback - * to the default values provided by the `monix-kafka` library - * in `monix/kafka/default.conf` - */ - def apply(source: Config, includeDefaults: Boolean = true): KafkaConsumerConfig = { - val config = if (!includeDefaults) source else source.withFallback(defaultConf) - - KafkaConsumerConfig( - groupId = config.getString("group.id"), - zookeeperConnect = config.getString("zookeeper.connect"), - consumerId = if (config.getIsNull("consumer.id")) null else config.getString("consumer.id"), - socketTimeout = config.getInt("socket.timeout.ms").millis, - socketReceiveBufferInBytes = config.getInt("socket.receive.buffer.bytes"), - fetchMessageMaxBytes = config.getInt("fetch.message.max.bytes"), - numConsumerFetchers = config.getInt("num.consumer.fetchers"), - autoCommitEnable = config.getBoolean("auto.commit.enable"), - autoCommitInterval = config.getInt("auto.commit.interval.ms").millis, - queuedMaxMessageChunks = config.getInt("queued.max.message.chunks"), - rebalanceMaxRetries = config.getInt("rebalance.max.retries"), - fetchMinBytes = config.getInt("fetch.min.bytes"), - fetchWaitMaxTime = config.getInt("fetch.wait.max.ms").millis, - rebalanceBackoffTime = config.getInt("rebalance.backoff.ms").millis, - refreshLeaderBackoffTime = config.getInt("refresh.leader.backoff.ms").millis, - autoOffsetReset = AutoOffsetReset(config.getString("auto.offset.reset")), - consumerTimeout = config.getInt("consumer.timeout.ms").millis, - excludeInternalTopics = config.getBoolean("exclude.internal.topics"), - partitionAssignmentStrategy = PartitionAssignmentStrategy(config.getString("partition.assignment.strategy")), - clientId = config.getString("client.id"), - zookeeperSessionTimeout = config.getInt("zookeeper.session.timeout.ms").millis, - zookeeperConnectionTimeout = config.getInt("zookeeper.connection.timeout.ms").millis, - zookeeperSyncTime = config.getInt("zookeeper.sync.time.ms").millis, - offsetsStorage = OffsetsStorage(config.getString("offsets.storage")), - offsetsChannelBackoffTime = config.getInt("offsets.channel.backoff.ms").millis, - offsetsChannelSocketTimeout = config.getInt("offsets.channel.socket.timeout.ms").millis, - offsetsCommitMaxRetries = config.getInt("offsets.commit.max.retries"), - dualCommitEnabled = config.getBoolean("dual.commit.enabled") - ) - } -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala b/kafka-0.8.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala deleted file mode 100644 index ec6a802b..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaConsumerObservable.scala +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import kafka.consumer._ -import kafka.message.MessageAndMetadata -import monix.eval.{Callback, Task} -import monix.execution.Cancelable -import monix.execution.cancelables.{BooleanCancelable, MultiAssignmentCancelable} -import monix.reactive.Observable -import monix.reactive.observers.Subscriber -import scala.concurrent.blocking -import scala.util.control.NonFatal - -/** Exposes an `Observable` that consumes a Kafka stream by - * means of a Kafka Consumer client. - * - * In order to get initialized, it needs a configuration. See the - * [[KafkaConsumerConfig]] needed and see `monix/kafka/default.conf`, - * (in the resource files) that is exposing all default values. - */ -final class KafkaConsumerObservable[K, V] private - (config: KafkaConsumerConfig, consumer: Task[ConsumerConnector], topicMap: Map[String, Int]) - (implicit K: Deserializer[K], V: Deserializer[V]) - extends Observable[MessageAndMetadata[K,V]] { - - def unsafeSubscribeFn(out: Subscriber[MessageAndMetadata[K, V]]): Cancelable = { - import out.scheduler - - val conn = MultiAssignmentCancelable() - val callback = new Callback[ConsumerConnector] { - def onError(ex: Throwable): Unit = - out.onError(ex) - def onSuccess(value: ConsumerConnector): Unit = - init(value, conn, out) - } - - val c = consumer.runAsync(callback) - conn.orderedUpdate(c, order=1) - } - - private def init( - connector: ConsumerConnector, - conn: MultiAssignmentCancelable, - out: Subscriber[MessageAndMetadata[K, V]]): Unit = { - - val streamsMap = connector.createMessageStreams( - topicCountMap = topicMap, - keyDecoder = K.create(), - valueDecoder = V.create() - ) - - val streamsList = streamsMap.values.flatten.toList - val rawStream = streamsList match { - case Nil => - // Optimization for an empty sequence - Observable.empty - case s :: Nil => - // Optimization when subscribed to a single topic - Observable.fromIterator(streamToIterator(s, conn)) - case list => - // Got a list of kafka streams, at least one per topic, so merge - Observable.fromIterator(list.iterator).mergeMap { s => - Observable.fromIterator(streamToIterator(s, conn)) - // Each stream needs to start on its own thread, so fork - .executeWithFork - } - } - - - val connectorSubscription = Cancelable(() => - out.scheduler.executeAsync { () => - blocking { - connector.shutdown() - } - }) - - val cancelable = rawStream - // In case of error or completion we need to close the connector - .doOnTerminate(_ => connectorSubscription.cancel()) - // This might be problematic if the shutdown isn't thread-safe: - .doOnSubscriptionCancel(() => connectorSubscription.cancel()) - // Ensuring we have an asynchronous boundary - .executeWithFork - .unsafeSubscribeFn(out) - - conn.orderedUpdate(cancelable, order = 2) - } - - /** Wraps a `KafkaStream` into an `Iterator` implementation that uses - * Scala's `blocking` context when calling `hasNext` and `next`, - * thus making it possible to play well with Scala's `global`. - */ - private def streamToIterator(source: KafkaStream[K,V], conn: BooleanCancelable): Iterator[MessageAndMetadata[K,V]] = - new Iterator[MessageAndMetadata[K,V]] { - private[this] val stream = source.iterator() - - def hasNext: Boolean = { - // The hasNext operation is blocking until there is an element available - try blocking(stream.hasNext()) catch { - case NonFatal(ex) => - if (conn.isCanceled) { - // If the connection was cancelled, ignore the error! - false - } else { - throw ex - } - } - } - - def next(): MessageAndMetadata[K, V] = { - // Unfortunately we cannot catch errors here, however it should be - // OK since `hasNext` is the method that advances to the next element - blocking(stream.next()) - } - } -} - -object KafkaConsumerObservable { - /** Builds a [[KafkaConsumerObservable]] instance. - * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * - * @param topicsMap is the list of Kafka topics to subscribe to. - * - * @param connector is a factory for the `kafka.consumer.ConsumerConnector` - * instance to use for consuming from Kafka - */ - def apply[K,V]( - cfg: KafkaConsumerConfig, - connector: Task[ConsumerConnector], - topicsMap: Map[String, Int]) - (implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K,V] = - new KafkaConsumerObservable[K,V](cfg, connector, topicsMap) - - /** Builds a [[KafkaConsumerObservable]] instance. - * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * - * @param topics is the list of Kafka topics to subscribe to. - * - * @param connector is a factory for the `kafka.consumer.ConsumerConnector` - * instance to use for consuming from Kafka - */ - def apply[K,V]( - cfg: KafkaConsumerConfig, - connector: Task[ConsumerConnector], - topics: List[String]) - (implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K,V] = - new KafkaConsumerObservable[K,V](cfg, connector, topics.map(_ -> 1).toMap) - - /** Builds a [[KafkaConsumerObservable]] instance. - * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * - * @param topicsMap is the list of Kafka topics to subscribe to. - */ - def apply[K,V](cfg: KafkaConsumerConfig, topicsMap: Map[String, Int]) - (implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K,V] = { - - val consumer = createConnector[K,V](cfg) - apply(cfg, consumer, topicsMap) - } - - /** Builds a [[KafkaConsumerObservable]] instance. - * - * @param cfg is the [[KafkaConsumerConfig]] needed for initializing the - * consumer; also make sure to see `monix/kafka/default.conf` for - * the default values being used. - * - * @param topics is the list of Kafka topics to subscribe to. - */ - def apply[K,V](cfg: KafkaConsumerConfig, topics: List[String]) - (implicit K: Deserializer[K], V: Deserializer[V]): KafkaConsumerObservable[K,V] = - apply(cfg, topics.map(_ -> 1).toMap)(K,V) - - /** Returns a `Task` for creating a consumer instance. */ - def createConnector[K,V](config: KafkaConsumerConfig) - (implicit K: Deserializer[K], V: Deserializer[V]): Task[ConsumerConnector] = { - - Task { - blocking { - Consumer.create(new ConsumerConfig(config.toProperties)) - } - } - } -} \ No newline at end of file diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducer.scala b/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducer.scala deleted file mode 100644 index 6775c044..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducer.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import com.typesafe.scalalogging.StrictLogging -import monix.eval.Task -import monix.execution.atomic.Atomic -import monix.execution.cancelables.SingleAssignmentCancelable -import monix.execution.{Cancelable, Scheduler} -import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata, KafkaProducer => ApacheKafkaProducer} - -import scala.util.control.NonFatal - -/** Wraps the Kafka Producer. */ -trait KafkaProducer[K,V] extends Serializable { - def send(topic: String, value: V): Task[RecordMetadata] - def send(topic: String, key: K, value: V): Task[RecordMetadata] - def send(record: ProducerRecord[K,V]): Task[RecordMetadata] - def close(): Task[Unit] -} - -object KafkaProducer { - /** Builds a [[KafkaProducer]] instance. */ - def apply[K,V](config: KafkaProducerConfig, io: Scheduler) - (implicit K: Serializer[K], V: Serializer[V]): KafkaProducer[K,V] = - new Implementation[K,V](config, io) - - private final class Implementation[K,V](config: KafkaProducerConfig, io: Scheduler) - (implicit K: Serializer[K], V: Serializer[V]) - extends KafkaProducer[K,V] with StrictLogging { - - // Alias needed for being precise when blocking - self => - - // MUST BE synchronized by `self` - private[this] var isCanceled = false - - // Gets initialized on the first `send` - private lazy val producerRef = { - logger.info(s"Kafka producer connecting to servers: ${config.bootstrapServers.mkString(",")}") - new ApacheKafkaProducer[K,V](config.toProperties, K.create(), V.create()) - } - - def send(topic: String, value: V): Task[RecordMetadata] = - send(new ProducerRecord[K,V](topic, value)) - - def send(topic: String, key: K, value: V): Task[RecordMetadata] = - send(new ProducerRecord[K,V](topic, key, value)) - - def send(record: ProducerRecord[K,V]): Task[RecordMetadata] = - Task.unsafeCreate[RecordMetadata] { (context, cb) => - // Forcing asynchronous boundary on the I/O scheduler! - io.executeAsync(() => self.synchronized { - val s = context.scheduler - if (!isCanceled) { - val isActive = Atomic(true) - val cancelable = SingleAssignmentCancelable() - context.connection.push(cancelable) - - try { - // Force evaluation - val producer = producerRef - - // Using asynchronous API - val future = producer.send(record, new Callback { - def onCompletion(meta: RecordMetadata, exception: Exception): Unit = - if (isActive.getAndSet(false)) { - context.connection.pop() - if (exception != null) - cb.asyncOnError(exception)(s) - else - cb.asyncOnSuccess(meta)(s) - } else if (exception != null) { - s.reportFailure(exception) - } - }) - - cancelable := Cancelable(() => future.cancel(false)) - } - catch { - case NonFatal(ex) => - // Needs synchronization, otherwise we are violating the contract - if (isActive.compareAndSet(expect = true, update = false)) { - context.connection.pop() - cb.asyncOnError(ex)(s) - } else { - s.reportFailure(ex) - } - } - } else { - val ex = new IllegalStateException("KafkaProducer connection is closed") - cb.asyncOnError(ex)(s) - } - }) - } - - def close(): Task[Unit] = - Task.unsafeCreate { (context, cb) => - // Forcing asynchronous boundary on I/O scheduler! - io.executeAsync { () => - self.synchronized { - val s = context.scheduler - if (isCanceled) { - cb.asyncOnSuccess(())(s) - } else { - isCanceled = true - try { - producerRef.close() - cb.asyncOnSuccess(())(s) - } catch { - case NonFatal(ex) => - cb.asyncOnError(ex)(s) - } - } - } - } - } - } -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala b/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala deleted file mode 100644 index b9715390..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import java.io.File -import java.util.Properties -import scala.concurrent.duration._ - -import com.typesafe.config.{Config, ConfigFactory} - -import monix.kafka.config._ - -/** The Kafka Producer config. - * - * For the official documentation on the available configuration - * options, see - * [[https://kafka.apache.org/082/documentation.html#newproducerconfigs Producer Configs]] - * on `kafka.apache.org`. - * - * @param bootstrapServers is the `bootstrap.servers` setting - * and represents the list of servers to connect to. - * - * @param acks is the `acks` setting and represents - * the number of acknowledgments the producer requires the leader to - * have received before considering a request complete. - * See [[monix.kafka.config.Acks Acks]]. - * - * @param bufferMemoryInBytes is the `buffer.memory` setting and - * represents the total bytes of memory the producer - * can use to buffer records waiting to be sent to the server. - * - * @param compressionType is the `compression.type` setting and specifies - * what compression algorithm to apply to all the generated data - * by the producer. The default is none (no compression applied). - * - * @param retries is the `retries` setting. A value greater than zero will - * cause the client to resend any record whose send fails with - * a potentially transient error. - * - * @param batchSizeInBytes is the `batch.size` setting. - * The producer will attempt to batch records together into fewer - * requests whenever multiple records are being sent to the - * same partition. This setting specifies the maximum number of - * records to batch together. - * - * @param clientId is the `client.id` setting, - * an id string to pass to the server when making requests. - * The purpose of this is to be able to track the source of - * requests beyond just ip/port by allowing a logical application - * name to be included in server-side request logging. - * - * @param lingerTime is the `linger.ms` setting - * and specifies to buffer records for more efficient batching, - * up to the maximum batch size or for the maximum `lingerTime`. - * If zero, then no buffering will happen, but if different - * from zero, then records will be delayed in absence of load. - * - * @param maxRequestSizeInBytes is the `max.request.size` setting - * and represents the maximum size of a request in bytes. - * This is also effectively a cap on the maximum record size. - * - * @param receiveBufferInBytes is the `receive.buffer.bytes` setting - * being the size of the TCP receive buffer (SO_RCVBUF) to use - * when reading data. - * - * @param sendBufferInBytes is the `send.buffer.bytes` setting, - * being the size of the TCP send buffer (SO_SNDBUF) to use - * when sending data. - * - * @param timeout is the `timeout.ms` setting, - * a configuration the controls the maximum amount of time - * the server will wait for acknowledgments from followers to meet - * the acknowledgment requirements the producer has specified with - * the `acks` configuration. - * - * @param blockOnBufferFull is the `block.on.buffer.full` setting, - * which controls whether producer stops accepting new - * records (blocks) or throws errors when the memory buffer - * is exhausted. - * - * @param metadataFetchTimeout is the `metadata.fetch.timeout.ms` setting. - * The period of time in milliseconds after which we force a - * refresh of metadata even if we haven't seen any partition - * leadership changes to proactively discover any new brokers - * or partitions. - * - * @param metadataMaxAge is the `metadata.max.age.ms` setting. - * The period of time in milliseconds after which we force a - * refresh of metadata even if we haven't seen any partition - * leadership changes to proactively discover any new brokers - * or partitions. - * - * @param reconnectBackoffTime is the `reconnect.backoff.ms` setting. - * The amount of time to wait before attempting to reconnect to a - * given host. This avoids repeatedly connecting to a host in a - * tight loop. This backoff applies to all requests sent by the - * consumer to the broker. - * - * @param retryBackoffTime is the `retry.backoff.ms` setting. - * The amount of time to wait before attempting to retry a failed - * request to a given topic partition. This avoids repeatedly - * sending requests in a tight loop under some failure scenarios. - * - * @param monixSinkParallelism is the `monix.producer.sink.parallelism` - * setting indicating how many requests the [[KafkaProducerSink]] - * can execute in parallel. - */ -case class KafkaProducerConfig( - bootstrapServers: List[String], - acks: Acks, - bufferMemoryInBytes: Int, - compressionType: CompressionType, - retries: Int, - batchSizeInBytes: Int, - clientId: String, - lingerTime: FiniteDuration, - maxRequestSizeInBytes: Int, - receiveBufferInBytes: Int, - sendBufferInBytes: Int, - timeout: FiniteDuration, - blockOnBufferFull: Boolean, - metadataFetchTimeout: FiniteDuration, - metadataMaxAge: FiniteDuration, - reconnectBackoffTime: FiniteDuration, - retryBackoffTime: FiniteDuration, - monixSinkParallelism: Int) { - - def toProperties: Properties = { - val props = new Properties() - for ((k, v) <- toMap; if v != null) props.put(k, v) - props - } - - def toMap: Map[String, String] = Map( - "bootstrap.servers" -> bootstrapServers.mkString(","), - "acks" -> acks.id, - "buffer.memory" -> bufferMemoryInBytes.toString, - "compression.type" -> compressionType.id, - "retries" -> retries.toString, - "batch.size" -> batchSizeInBytes.toString, - "client.id" -> clientId, - "linger.ms" -> lingerTime.toMillis.toString, - "max.request.size" -> maxRequestSizeInBytes.toString, - "receive.buffer.bytes" -> receiveBufferInBytes.toString, - "send.buffer.bytes" -> sendBufferInBytes.toString, - "timeout.ms" -> timeout.toMillis.toString, - "block.on.buffer.full" -> blockOnBufferFull.toString, - "metadata.fetch.timeout.ms" -> metadataFetchTimeout.toMillis.toString, - "metadata.max.age.ms" -> metadataMaxAge.toMillis.toString, - "reconnect.backoff.ms" -> reconnectBackoffTime.toMillis.toString, - "retry.backoff.ms" -> retryBackoffTime.toMillis.toString - ) -} - -object KafkaProducerConfig { - private val defaultRootPath = "kafka" - - lazy private val defaultConf: Config = - ConfigFactory.load("monix/kafka/default.conf").getConfig(defaultRootPath) - - /** Returns the default configuration, specified the `monix-kafka` project - * in `monix/kafka/default.conf`. - */ - lazy val default: KafkaProducerConfig = - apply(defaultConf, includeDefaults = false) - - /** Loads the [[KafkaProducerConfig]] either from a file path or - * from a resource, if `config.file` or `config.resource` are - * defined, or otherwise returns the default config. - * - * If you want to specify a `config.file`, you can configure the - * Java process on execution like so: - * {{{ - * java -Dconfig.file=/path/to/application.conf - * }}} - * - * Or if you want to specify a `config.resource` to be loaded - * from the executable's distributed JAR or classpath: - * {{{ - * java -Dconfig.resource=com/company/mySpecial.conf - * }}} - * - * In case neither of these are specified, then the configuration - * loaded is the default one, from the `monix-kafka` project, specified - * in `monix/kafka/default.conf`. - */ - def load(): KafkaProducerConfig = - Option(System.getProperty("config.file")).map(f => new File(f)) match { - case Some(file) if file.exists() => - loadFile(file) - case None => - Option(System.getProperty("config.resource")) match { - case Some(resource) => - loadResource(resource) - case None => - default - } - } - - /** Loads a [[KafkaProducerConfig]] from a project resource. - * - * @param resourceBaseName is the resource from where to load the config - * @param rootPath is the config root path (e.g. `kafka`) - * @param includeDefaults should be `true` in case you want to fallback - * to the default values provided by the `monix-kafka` library - * in `monix/kafka/default.conf` - */ - def loadResource(resourceBaseName: String, rootPath: String = defaultRootPath, includeDefaults: Boolean = true): KafkaProducerConfig = - apply(ConfigFactory.load(resourceBaseName).getConfig(rootPath), includeDefaults) - - /** Loads a [[KafkaProducerConfig]] from a specified file. - * - * @param file is the configuration path from where to load the config - * @param rootPath is the config root path (e.g. `kafka`) - * @param includeDefaults should be `true` in case you want to fallback - * to the default values provided by the `monix-kafka` library - * in `monix/kafka/default.conf` - */ - def loadFile(file: File, rootPath: String = defaultRootPath, includeDefaults: Boolean = true): KafkaProducerConfig = - apply(ConfigFactory.parseFile(file).resolve().getConfig(rootPath), includeDefaults) - - /** Loads the [[KafkaProducerConfig]] from a parsed - * `com.typesafe.config.Config` reference. - * - * NOTE that this method doesn't assume any path prefix for loading the - * configuration settings, so it does NOT assume a root path like `kafka`. - * In case case you need that, you can always do: - * - * {{{ - * KafkaProducerConfig(globalConfig.getConfig("kafka")) - * }}} - * - * @param source is the typesafe `Config` object to read from - * @param includeDefaults should be `true` in case you want to fallback - * to the default values provided by the `monix-kafka` library - * in `monix/kafka/default.conf` - */ - def apply(source: Config, includeDefaults: Boolean = true): KafkaProducerConfig = { - val config = if (!includeDefaults) source else source.withFallback(defaultConf) - - KafkaProducerConfig( - bootstrapServers = config.getString("bootstrap.servers").trim.split("\\s*,\\s*").toList, - acks = Acks(config.getString("acks")), - bufferMemoryInBytes = config.getInt("buffer.memory"), - compressionType = CompressionType(config.getString("compression.type")), - retries = config.getInt("retries"), - batchSizeInBytes = config.getInt("batch.size"), - clientId = config.getString("client.id"), - lingerTime = config.getInt("linger.ms").millis, - maxRequestSizeInBytes = config.getInt("max.request.size"), - receiveBufferInBytes = config.getInt("receive.buffer.bytes"), - sendBufferInBytes = config.getInt("send.buffer.bytes"), - timeout = config.getInt("timeout.ms").millis, - blockOnBufferFull = config.getBoolean("block.on.buffer.full"), - metadataFetchTimeout = config.getInt("metadata.fetch.timeout.ms").millis, - metadataMaxAge = config.getInt("metadata.max.age.ms").millis, - reconnectBackoffTime = config.getInt("reconnect.backoff.ms").millis, - retryBackoffTime = config.getInt("retry.backoff.ms").millis, - monixSinkParallelism = config.getInt("monix.producer.sink.parallelism") - ) - } -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducerSink.scala deleted file mode 100644 index 4e36e97c..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import com.typesafe.scalalogging.StrictLogging -import monix.eval.{Callback, Coeval, Task} -import monix.execution.Ack.{Continue, Stop} -import monix.execution.cancelables.AssignableCancelable -import monix.execution.{Ack, Scheduler} -import monix.reactive.Consumer -import monix.reactive.observers.Subscriber -import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata} -import scala.concurrent.Future -import scala.util.control.NonFatal -import scala.util.{Failure, Success} - -/** A `monix.reactive.Consumer` that pushes incoming messages into - * a [[KafkaProducer]]. - */ -final class KafkaProducerSink[K,V] private ( - producer: Coeval[KafkaProducer[K,V]], - shouldTerminate: Boolean, - parallelism: Int) - extends Consumer[Seq[ProducerRecord[K,V]], Unit] - with StrictLogging with Serializable { - - require(parallelism >= 1, "parallelism >= 1") - - def createSubscriber(cb: Callback[Unit], s: Scheduler) = { - val out = new Subscriber[Seq[ProducerRecord[K,V]]] { self => - implicit val scheduler = s - private[this] val p = producer.memoize - private[this] var isActive = true - - private def sendAll(batch: Seq[ProducerRecord[K,V]]): Seq[Task[RecordMetadata]] = - for (record <- batch) yield { - try p.value.send(record) - catch { case NonFatal(ex) => Task.raiseError(ex) } - } - - def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] = - self.synchronized { - if (!isActive) Stop else { - val sendTask: Task[Seq[RecordMetadata]] = - if (parallelism == 1) - Task.sequence(sendAll(list)) - else { - val batches = list.sliding(parallelism, parallelism) - val tasks = for (b <- batches) yield Task.gather(sendAll(b)) - Task.sequence(tasks.toList).map(_.flatten) - } - - val recovered = sendTask.map(_ => Continue).onErrorHandle { ex => - logger.error("Unexpected error in KafkaProducerSink", ex) - Continue - } - - recovered.runAsync - } - } - - def terminate(cb: => Unit): Unit = - self.synchronized { - if (isActive) { - isActive = false - - if (!shouldTerminate) cb else - Task(p.value.close()).flatten.materialize.runAsync.foreach { - case Success(_) => cb - case Failure(ex) => - logger.error("Unexpected error in KafkaProducerSink", ex) - cb - } - } - } - - def onError(ex: Throwable): Unit = - terminate(cb.onError(ex)) - def onComplete(): Unit = - terminate(cb.onSuccess(())) - } - - (out, AssignableCancelable.dummy) - } -} - -object KafkaProducerSink { - /** Builder for [[KafkaProducerSink]]. */ - def apply[K,V](config: KafkaProducerConfig, io: Scheduler) - (implicit K: Serializer[K], V: Serializer[V]): KafkaProducerSink[K,V] = { - - val producer = Coeval(KafkaProducer[K,V](config, io)) - new KafkaProducerSink(producer, shouldTerminate = true, - parallelism = config.monixSinkParallelism) - } - - /** Builder for [[KafkaProducerSink]]. */ - def apply[K,V](producer: Coeval[KafkaProducer[K,V]], parallelism: Int): KafkaProducerSink[K,V] = - new KafkaProducerSink(producer, shouldTerminate = false, - parallelism = parallelism) -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/Serializer.scala b/kafka-0.8.x/src/main/scala/monix/kafka/Serializer.scala deleted file mode 100644 index af0db995..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/Serializer.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import org.apache.kafka.common.serialization._ -import org.apache.kafka.common.serialization.{Serializer => KafkaSerializer} - -/** Wraps a Kafka `Serializer`, provided for - * convenience, since it can be implicitly fetched - * from the context. - * - * @param className is the full package path to the Kafka `Serializer` - * - * @param classType is the `java.lang.Class` for [[className]] - * - * @param constructor creates an instance of [[classType]]. - * This is defaulted with a `Serializer.Constructor[A]` function that creates a - * new instance using an assumed empty constructor. - * Supplying this parameter allows for manual provision of the `Serializer`. - */ -final case class Serializer[A]( - className: String, - classType: Class[_ <: KafkaSerializer[A]], - constructor: Serializer.Constructor[A] = (s: Serializer[A]) => s.classType.newInstance()) { - - /** Creates a new instance. */ - def create(): KafkaSerializer[A] = - constructor(this) -} - -object Serializer { - - /** Alias for the function that provides an instance of - * the Kafka `Serializer`. - */ - type Constructor[A] = (Serializer[A]) => KafkaSerializer[A] - - implicit val forStrings: Serializer[String] = - Serializer[String]( - className = "org.apache.kafka.common.serialization.StringSerializer", - classType = classOf[StringSerializer] - ) - - implicit val forByteArray: Serializer[Array[Byte]] = - Serializer[Array[Byte]]( - className = "org.apache.kafka.common.serialization.ByteArraySerializer", - classType = classOf[ByteArraySerializer] - ) - -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/config/Acks.scala b/kafka-0.8.x/src/main/scala/monix/kafka/config/Acks.scala deleted file mode 100644 index 0018d00c..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/config/Acks.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka.config - -import com.typesafe.config.ConfigException.BadValue - -/** Enumeration for specifying the `acks` setting in - * [[monix.kafka.KafkaProducerConfig KafkaProducerConfig]]. - * - * Represents the number of acknowledgments the producer requires - * the leader to have received before considering a request complete. - * This controls the durability of records that are sent. - * - * For the available options see: - * - * - [[Acks.Zero]] - * - [[Acks.NonZero]] - * - [[Acks.All]] - */ -sealed trait Acks extends Product with Serializable { - def id: String -} - -object Acks { - @throws(classOf[BadValue]) - def apply(id: String): Acks = - id match { - case All.id => All - case Number(nrStr) => - val nr = nrStr.toInt - if (nr == 0) Zero else NonZero(nr) - case _ => - throw new BadValue("kafka.acks", s"Invalid value: $id") - } - - /** If set to zero then the producer will not wait - * for any acknowledgment from the server at all. - * - * The record will be immediately added to the socket buffer and - * considered sent. No guarantee can be made that the server has received - * the record in this case, and the retries configuration will not - * take effect (as the client won't generally know of any failures). - * The offset given back for each record will always be set to -1. - */ - case object Zero extends Acks { - val id = "0" - } - - /** This will mean the leader will write the record to its local - * log but will respond without awaiting full acknowledgement - * from all followers. - * - * In this case should the leader fail immediately after acknowledging - * the record but before the followers have replicated it then the - * record will be lost. - */ - final case class NonZero(nr: Int) extends Acks { - require(nr > 0, "nr > 0") - val id = nr.toString - } - - /** This means the leader will wait for the - * full set of in-sync replicas to acknowledge the record. - * - * This guarantees that the record will not be lost as long as - * at least one in-sync replica remains alive. This is the strongest - * available guarantee. - */ - case object All extends Acks { - val id = "all" - } - - // Regular expression for parsing IDs - private val Number = """^(\d+)$""".r -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/config/AutoOffsetReset.scala b/kafka-0.8.x/src/main/scala/monix/kafka/config/AutoOffsetReset.scala deleted file mode 100644 index 547c294c..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/config/AutoOffsetReset.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka.config - -import com.typesafe.config.ConfigException.BadValue - -/** What to do when there is no initial offset in Kafka or if the - * current offset does not exist any more on the server - * (e.g. because that data has been deleted). - * - * Available choices: - * - * - [[AutoOffsetReset.Smallest]] - * - [[AutoOffsetReset.Largest]] - */ -sealed trait AutoOffsetReset extends Serializable { - def id: String -} - -object AutoOffsetReset { - @throws(classOf[BadValue]) - def apply(id: String): AutoOffsetReset = - id.trim.toLowerCase match { - case Smallest.id => Smallest - case Largest.id => Largest - case _ => - throw new BadValue("kafka.auto.offset.reset", s"Invalid value: $id") - } - - /** Automatically reset the offset to the earliest offset. */ - case object Smallest extends AutoOffsetReset { - val id = "smallest" - } - - /** Automatically reset the offset to the latest offset. */ - case object Largest extends AutoOffsetReset { - val id = "largest" - } - -} - - diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/config/ClassName.scala b/kafka-0.8.x/src/main/scala/monix/kafka/config/ClassName.scala deleted file mode 100644 index 19193ce8..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/config/ClassName.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka.config - -import scala.reflect.ClassTag - -abstract class ClassName[T](implicit T: ClassTag[T]) extends Serializable { - def className: String - - val classType: Class[_ <: T] = - Class.forName(className).asInstanceOf[Class[_ <: T]] - - require( - findClass(classType :: Nil, T.runtimeClass), - s"Given type $className does not implement ${T.runtimeClass}" - ) - - private def findClass(stack: List[Class[_]], searched: Class[_]): Boolean = - stack match { - case Nil => false - case x :: xs => - if (x == searched) true else { - val superClass: List[Class[_]] = Option(x.getSuperclass).toList - val rest = superClass ::: x.getInterfaces.toList ::: xs - findClass(rest, searched) - } - } -} diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/config/CompressionType.scala b/kafka-0.8.x/src/main/scala/monix/kafka/config/CompressionType.scala deleted file mode 100644 index b2c1d684..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/config/CompressionType.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka.config - -import com.typesafe.config.ConfigException.BadValue - -/** The compression type for all data generated by the producer, - * the `compression.type` from the Kafka Producer configuration. - * - * The default is none (i.e. no compression). Compression is of full - * batches of data, so the efficacy of batching will also impact - * the compression ratio (more batching means better compression). - * - * Valid values: - * - * - [[CompressionType.Uncompressed]] - * - [[CompressionType.Gzip]] - * - [[CompressionType.Snappy]] - * - [[CompressionType.Lz4]] - */ -sealed trait CompressionType extends Serializable { - def id: String -} - -object CompressionType { - @throws(classOf[BadValue]) - def apply(id: String): CompressionType = - id match { - case Uncompressed.id => Uncompressed - case Gzip.id => Gzip - case Snappy.id => Snappy - case Lz4.id => Lz4 - case _ => - throw new BadValue("kafka.compression.type", s"Invalid value: $id") - } - - case object Uncompressed extends CompressionType { - val id = "none" - } - - case object Gzip extends CompressionType { - val id = "gzip" - } - - case object Snappy extends CompressionType { - val id = "snappy" - } - - case object Lz4 extends CompressionType { - val id = "lz4" - } -} \ No newline at end of file diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/config/OffsetsStorage.scala b/kafka-0.8.x/src/main/scala/monix/kafka/config/OffsetsStorage.scala deleted file mode 100644 index c823ad6d..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/config/OffsetsStorage.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka.config - -import com.typesafe.config.ConfigException.BadValue - -/** Where offsets should be stored. - * - * Available choices: - * - * - [[OffsetsStorage.Zookeeper]] - * - [[OffsetsStorage.Kafka]] - */ -sealed trait OffsetsStorage extends Serializable { - def id: String -} - -object OffsetsStorage { - @throws(classOf[BadValue]) - def apply(id: String): OffsetsStorage = - id.trim.toLowerCase match { - case Zookeeper.id => Zookeeper - case Kafka.id => Kafka - case _ => - throw new BadValue("kafka.partition.assignment.strategy", s"Invalid value: $id") - } - - /** Store offsets in Zookeeper. */ - case object Zookeeper extends OffsetsStorage { - val id = "zookeeper" - } - - /** Store offsets in Kafka. */ - case object Kafka extends OffsetsStorage { - val id = "kafka" - } - -} - - - diff --git a/kafka-0.8.x/src/main/scala/monix/kafka/config/PartitionAssignmentStrategy.scala b/kafka-0.8.x/src/main/scala/monix/kafka/config/PartitionAssignmentStrategy.scala deleted file mode 100644 index de7ade4a..00000000 --- a/kafka-0.8.x/src/main/scala/monix/kafka/config/PartitionAssignmentStrategy.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka.config - -import com.typesafe.config.ConfigException.BadValue - -/** A strategy for assigning partitions to consumer streams - * - * Available choices: - * - * - [[PartitionAssignmentStrategy.Range]] - * - [[PartitionAssignmentStrategy.Roundrobin]] - */ -sealed trait PartitionAssignmentStrategy extends Serializable { - def id: String -} - -object PartitionAssignmentStrategy { - @throws(classOf[BadValue]) - def apply(id: String): PartitionAssignmentStrategy = - id.trim.toLowerCase match { - case Range.id => Range - case Roundrobin.id => Roundrobin - case _ => - throw new BadValue("kafka.partition.assignment.strategy", s"Invalid value: $id") - } - - /** Use range method for assigning partitions. */ - case object Range extends PartitionAssignmentStrategy { - val id = "range" - } - - /** Use roundrobin method for assigning partitions. */ - case object Roundrobin extends PartitionAssignmentStrategy { - val id = "roundrobin" - } - -} - - - diff --git a/kafka-0.8.x/src/test/resources/logback.xml b/kafka-0.8.x/src/test/resources/logback.xml deleted file mode 100644 index 5a3e8a64..00000000 --- a/kafka-0.8.x/src/test/resources/logback.xml +++ /dev/null @@ -1,11 +0,0 @@ - - - - %d{yyyyMMdd-HH:mm:ss.SSSZ} [%thread] %-5level %logger - %msg%n - - - - - - - diff --git a/kafka-0.8.x/src/test/scala/monix/kafka/MonixKafkaTest.scala b/kafka-0.8.x/src/test/scala/monix/kafka/MonixKafkaTest.scala deleted file mode 100644 index 0e026624..00000000 --- a/kafka-0.8.x/src/test/scala/monix/kafka/MonixKafkaTest.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix.kafka - -import monix.eval.Task -import monix.execution.Scheduler.Implicits.global -import monix.kafka.config.AutoOffsetReset -import monix.reactive.Observable -import org.apache.kafka.clients.producer.ProducerRecord -import org.scalatest.FunSuite -import scala.concurrent.Await -import scala.concurrent.duration._ - -class MonixKafkaTest extends FunSuite { - val topicName = "monix-kafka-tests" - - test("full producer/consumer test") { - val count = 10000 - val producerCfg = KafkaProducerConfig.default.copy( - bootstrapServers = List("127.0.0.1:9092") - ) - - val consumerCfg = KafkaConsumerConfig.default.copy( - zookeeperConnect = "127.0.0.1:2181", - groupId = "kafka-tests", - autoOffsetReset = AutoOffsetReset.Smallest - ) - - val producer = KafkaProducerSink[String,String](producerCfg, io) - val consumer = KafkaConsumerObservable[String,String](consumerCfg, List(topicName)).executeOn(io) - - val pushT = Observable.range(0, count) - .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) - .bufferIntrospective(1024) - .consumeWith(producer) - - val listT = consumer - .take(count) - .map(_.message()) - .toListL - - val (result, _) = Await.result(Task.zip2(Task.fork(listT), Task.fork(pushT)).runAsync, 5.minutes) - assert(result.map(_.toInt).sum === (0 until count).sum) - } -} diff --git a/kafka-0.8.x/src/test/scala/monix/kafka/package.scala b/kafka-0.8.x/src/test/scala/monix/kafka/package.scala deleted file mode 100644 index 0960ca50..00000000 --- a/kafka-0.8.x/src/test/scala/monix/kafka/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2014-2016 by its authors. Some rights reserved. - * See the project homepage at: https://github.com/monixio/monix-kafka - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package monix - -import monix.execution.Scheduler - -package object kafka { - /** I/O scheduler meant for tests. */ - lazy val io = Scheduler.io("monix-kafka-tests") -}