This repository has been archived by the owner on Oct 19, 2020. It is now read-only.

Input d stream update state by key #1

Open · wants to merge 3 commits into master
8 changes: 6 additions & 2 deletions README.md
@@ -3,8 +3,6 @@
# spark-kafka
Spark-kafka is a library that facilitates batch loading data from Kafka into Spark, and from Spark into Kafka.

This library does not provide a Kafka Input DStream for Spark Streaming. For that please take a look at the spark-streaming-kafka library that is part of Spark itself.

## SimpleConsumerConfig
This is the configuration that KafkaRDD needs to consume data from Kafka. It includes metadata.broker.list (a comma-separated list of Kafka brokers for bootstrapping) and some SimpleConsumer related settings such as timeouts and buffer sizes. Only metadata.broker.list is required.
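
A minimal sketch of building this configuration from `java.util.Properties` (the broker host names are placeholders; `SimpleConsumerConfig(props)` is the factory used elsewhere in this library):

```scala
import java.util.Properties
import com.tresata.spark.kafka.SimpleConsumerConfig

// Only metadata.broker.list is required; the SimpleConsumer timeouts and
// buffer sizes fall back to their defaults when not set.
val props = new Properties()
props.setProperty("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder brokers
val consumerConfig = SimpleConsumerConfig(props)
```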

@@ -21,5 +19,11 @@ writeToKafka can also be used in Spark Streaming to save the underlying RDDs of

The unit test infrastructure was copied from/inspired by spark-streaming-kafka.

# KafkaInputDStream
KafkaInputDStream is an InputDStream for Spark Streaming that provides exactly-once message delivery semantics. It intends to address some known problems in the spark-streaming-kafka library that ships with Spark, in particular the following (a usage sketch follows the list):
1. A Spark Receiver can't be restarted after it dies. Our KafkaInputDStream doesn't extend ReceiverInputDStream.
2. https://issues.apache.org/jira/browse/SPARK-3146
3. Known issues in Spark Streaming section of http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
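
A usage sketch, adapted from the demo `main` method in KafkaInputDStream.scala; the master URL, checkpoint path, broker list and topic name are placeholders:

```scala
import java.util.Properties
import kafka.api.OffsetRequest
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.tresata.spark.kafka.KafkaInputDStream

val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaInputDStreamExample")

val kafkaProps = new Properties()
kafkaProps.setProperty("metadata.broker.list", "broker1:9092,broker2:9092")

val ssc = StreamingContext.getOrCreate("/tmp/KafkaInputDStream", () => {
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("/tmp/KafkaInputDStream")
  // cap on new messages read per partition per batch, picked up implicitly by KafkaInputDStream
  implicit val maxMessages = 10
  val stream = KafkaInputDStream(ssc, kafkaProps, "someTopic", startTime = OffsetRequest.LatestTime)
  stream.count().print()
  ssc
})

ssc.start()
ssc.awaitTermination()
```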

Have fun!
Team @ Tresata
112 changes: 112 additions & 0 deletions pom.xml
@@ -0,0 +1,112 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.tresata</groupId>
<artifactId>spark-kafka</artifactId>
<version>0.3</version>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
<version>2.1.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>

<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
4 changes: 3 additions & 1 deletion project/Build.scala
@@ -24,7 +24,9 @@ object ProjectBuild extends Build {
exclude("jline", "jline"),
"org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
"org.slf4j" % "slf4j-log4j12" % "1.6.6" % "test",
"org.scalatest" %% "scalatest" % "2.2.0" % "test"
"org.scalatest" %% "scalatest" % "2.2.0" % "test",
"org.apache.spark" % "spark-streaming-kafka" % "1.1.0" % "compile",
"org.apache.commons" % "commons-pool2" % "2.2" % "compile"
),
publishMavenStyle := true,
pomIncludeRepository := { x => false },
@@ -0,0 +1,37 @@
package com.tresata.kafka.utils;

import com.tresata.spark.kafka.Broker;
import com.tresata.spark.kafka.SimpleConsumerConfig;
import kafka.consumer.SimpleConsumer;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

/**
* commons-pool2 keyed factory that creates one Kafka SimpleConsumer per Broker,
* using the socket timeout and receive buffer size from SimpleConsumerConfig.
*
* Created by vincentye on 12/18/14.
*/
public class PooledSimpleConsumerFactory extends BaseKeyedPooledObjectFactory<Broker, SimpleConsumer> {

SimpleConsumerConfig config;
String clientId;
public PooledSimpleConsumerFactory(SimpleConsumerConfig config, String clientId){
this.config = config;
this.clientId = clientId;
}

@Override
public SimpleConsumer create(Broker key) throws Exception {
return new SimpleConsumer(key.host(), key.port(), config.socketTimeoutMs(), config.socketReceiveBufferBytes(), clientId);
}

@Override
public PooledObject<SimpleConsumer> wrap(SimpleConsumer value) {
return new DefaultPooledObject<SimpleConsumer>(value);
}

@Override
public void destroyObject(Broker key, PooledObject<SimpleConsumer> p) throws Exception {
p.getObject().close();
super.destroyObject(key, p);
}
}
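
For context, a hypothetical sketch (not part of this change) of how the factory could back a keyed consumer pool using commons-pool2's standard GenericKeyedObjectPool:

```scala
import kafka.consumer.SimpleConsumer
import org.apache.commons.pool2.impl.GenericKeyedObjectPool
import com.tresata.kafka.utils.PooledSimpleConsumerFactory
import com.tresata.spark.kafka.{Broker, SimpleConsumerConfig}

// One pooled SimpleConsumer per Broker key.
def makePool(config: SimpleConsumerConfig, clientId: String): GenericKeyedObjectPool[Broker, SimpleConsumer] =
  new GenericKeyedObjectPool(new PooledSimpleConsumerFactory(config, clientId))

// Borrow a consumer for a broker, run f, and always return the consumer to the pool.
def withConsumer[T](pool: GenericKeyedObjectPool[Broker, SimpleConsumer], broker: Broker)(f: SimpleConsumer => T): T = {
  val consumer = pool.borrowObject(broker)
  try f(consumer) finally pool.returnObject(broker, consumer)
}
```
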
106 changes: 106 additions & 0 deletions src/main/scala/com/tresata/spark/kafka/KafkaInputDStream.scala
@@ -0,0 +1,106 @@
package com.tresata.spark.kafka

import java.util.Properties

import kafka.api.OffsetRequest
import kafka.consumer.SimpleConsumer
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
import org.apache.spark.{SparkConf, Accumulator}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, Time, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions, InputDStream}

import scala.collection.mutable

/**
* Created by vincentye on 12/18/14.
*/
private class KafkaInputDStream(@transient val ssc_ : StreamingContext, val config: Properties, val topic: String, val startOffsets: mutable.Map[Int, Long] = mutable.Map.empty,
startTime: Long = OffsetRequest.EarliestTime) extends InputDStream[(Int, (Long, Long))](ssc_){
override def start(): Unit = {
// context.addStreamingListener(new StreamingListener{
// override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
// offsetsMap.get(batchCompleted.batchInfo.batchTime.milliseconds).foreach(v => commit(v.mapValues(_._2)))
// }
// })
}

override def stop(): Unit = {}

// private val offsetAccum: Accumulator[Map[Int, Long]] = ssc_.sparkContext.accumulator(offsets)(OffsetAccumulatorParam)

// val offsetsMap:mutable.Map[Long, Map[Int, (Long, Long)]] = mutable.Map.empty
@transient lazy val simpleConfig: SimpleConsumerConfig = {
SimpleConsumerConfig(config)
}

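// For each batch interval: look up the partition leaders and offsets from the brokers (retrying
// while a partition has no leader) and emit a small single-partition RDD of
// (partition -> (startOffset, latestOffset)) ranges; startOffsets is then advanced to the latest
// offsets so the next batch picks up where this one stopped.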
override def compute(validTime: Time): Option[RDD[(Int, (Long, Long))]] = {
import KafkaOffsetRDD._
val brokers = brokerList(simpleConfig)
val (startElseOffsets, stopOffsets) = retryIfNoLeader({
val leaders = partitionLeaders(topic, brokers, simpleConfig)
(partitionOffsets(topic, startTime, leaders, simpleConfig), partitionOffsets(topic, OffsetRequest.LatestTime, leaders, simpleConfig))
}, simpleConfig)
val offsets = startElseOffsets.map{ case (partition, startOffset) => (partition, (startOffsets.getOrElse(partition,startOffset), stopOffsets(partition))) }


// val ret = Some(KafkaOffsetRDD(ssc_.sparkContext, topic, startOffsets.toMap, startTime, OffsetRequest.LatestTime, simpleConfig))
startOffsets ++= offsets.mapValues(_._2)
// offsetsMap += (validTime.milliseconds -> offsets)
// ret
this.log.info(s"offsets RDD: ${offsets}")
Some(context.sparkContext.makeRDD(offsets.toSeq, 1))
}
}


object KafkaInputDStream {
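// Per-partition state update for updateStateByKey: the next offset range starts at the previous
// stop offset and ends at min(latest available offset, previous stop + maxMessages), never moving
// backwards; with no previous state the range is (start, min(stop, start + maxMessages)). This
// caps every batch at maxMessages new messages per partition.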
private def updateFunc(partitionOffset: Seq[(Long, Long)], state: Option[(Long, Long)])(implicit maxMessages: Int): Option[(Long, Long)] = {
state.map(state => (state._2, Math.max(Math.min(partitionOffset.head._2, state._2 + maxMessages), state._2))).orElse(partitionOffset.headOption.map{ case (start, stop) => (start, Math.min(stop, start + maxMessages))})
}

private def simpleConsumer(broker: Broker, config: SimpleConsumerConfig): SimpleConsumer =
new SimpleConsumer(broker.host, broker.port, config.socketTimeoutMs, config.socketReceiveBufferBytes, config.clientId)

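// Wires the pieces together: the offset-range stream is folded with updateStateByKey (using a
// single partition) so the consumed ranges live in checkpointed state, and transform turns each
// batch's ranges into a KafkaRDD of the actual messages.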
def apply(@transient ssc_ : StreamingContext, config: Properties, topic: String, nextOffsets: mutable.Map[Int, Long] = mutable.Map.empty,
startTime: Long = OffsetRequest.EarliestTime)(implicit maxMessages: Int): DStream[PartitionOffsetMessage] = {
new KafkaInputDStream(ssc_, config, topic, nextOffsets, startTime)
.updateStateByKey(updateFunc _, 1)
.transform(rdd =>
new KafkaRDD(rdd.sparkContext, topic, rdd.collect().toMap, config)
)


}
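// Local demo: a checkpointed StreamingContext that every 10 seconds counts the messages read
// from the topic, reading at most maxMessages new messages per partition per batch.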
def main(args: Array[String]): Unit ={
val sConf = new SparkConf()
sConf.setMaster("local[*]")
sConf.setAppName("KafkaInputDStream")

val kafkaProps = new Properties()
//kafkaProps.setProperty("zookeeper.connect","zk11,zk12,zk13/kafka08_adstream")
kafkaProps.setProperty("metadata.broker.list", "pkafka201,pkafka202,pkafka203,pkafka204,pkafka205,pkafka206,pkafka207,pkafka208")

val ssc = StreamingContext.getOrCreate("/tmp/KafkaInputDStream",
() => {
val ssc = new StreamingContext(sConf, Seconds(10))
ssc.checkpoint("/tmp/KafkaInputDStream")
implicit val maxMessages = 10
val dstream = KafkaInputDStream(ssc, kafkaProps, "impressionsAvroStream", startTime = OffsetRequest.LatestTime)

dstream.count().print()
ssc
})


// val offsets = mutable.Map[Int, Long](0 -> 476389581, 5 -> 471471490, 10 -> 467098771, 14 -> 481670846, 1 -> 471358914, 6 -> 470971738, 9 -> 470376574, 13 -> 474983439, 2 -> 472933046, 17 -> 474255748, 12 -> 475283012, 7 -> 477454529, 3 -> 472251285, 18 -> 476095049, 16 -> 477408456, 11 -> 476708031, 8 -> 481643634, 19 -> 470556533, 4 -> 479206186, 15 -> 467984037)


ssc.start()
ssc.awaitTermination()
}

}

