Commit 3bbce88

OnlineHDPExample is running. The statistics generated are suspect.

sboesch committed Dec 22, 2017
1 parent 8a77eed

Showing 6 changed files with 458 additions and 124 deletions.
pom.xml: 43 additions, 0 deletions

@@ -125,6 +125,49 @@
       <artifactId>maven-compiler-plugin</artifactId>
       <version>3.3</version>
     </plugin>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <version>2.4</version>
+      <executions>
+        <execution>
+          <phase>package</phase>
+          <goals>
+            <goal>shade</goal>
+          </goals>
+          <configuration>
+            <transformers>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                <manifestEntries>
+                  <Main-Class>org.openchai.tensorflow.TfServer</Main-Class>
+                  <Build-Number>100.0</Build-Number>
+                </manifestEntries>
+              </transformer>
+            </transformers>
+            <artifactSet>
+              <!--<excludes>-->
+              <!--<exclude>org.apache.spark:*</exclude>-->
+              <!--</excludes>-->
+            </artifactSet>
+            <filters>
+              <filter>
+                <artifact>*.*</artifact>
+                <excludes>
+                  <!--<exclude>org.apache.maven:lib:tests</exclude>-->
+                  <exclude>log4j:log4j:jar:</exclude>
+                  <exclude>META-INF/*.SF</exclude>
+                  <exclude>META-INF/*.DSA</exclude>
+                  <exclude>META-INF/*.RSA</exclude>
+                  <exclude>META-INF/ECLIPSE*</exclude>
+                  <exclude>META-INF/license/*</exclude>
+                </excludes>
+              </filter>
+            </filters>
+            <minimizeJar>false</minimizeJar>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
   </plugins>
 </build>
</project>
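
The ManifestResourceTransformer stamps Main-Class and Build-Number into the fat jar's MANIFEST.MF, and the filter drops signature files (META-INF/*.SF and friends) that would otherwise fail verification in a merged jar. A small Scala sketch for reading the stamped entries back at runtime (ManifestCheck is a hypothetical helper, not part of this commit):

    import java.util.jar.Manifest

    // Scan every MANIFEST.MF on the classpath and print the entries
    // that the ManifestResourceTransformer wrote into the shaded jar.
    object ManifestCheck {
      def main(args: Array[String]): Unit = {
        val urls = getClass.getClassLoader.getResources("META-INF/MANIFEST.MF")
        while (urls.hasMoreElements) {
          val in = urls.nextElement().openStream()
          try {
            val attrs = new Manifest(in).getMainAttributes
            Option(attrs.getValue("Build-Number")).foreach { bn =>
              println(s"Main-Class=${attrs.getValue("Main-Class")} Build-Number=$bn")
            }
          } finally in.close()
        }
      }
    }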
src/main/resources/log4j.properties: 20 additions, 0 deletions
@@ -0,0 +1,20 @@
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+# Settings to quiet third-party logs that are too verbose
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.spark=WARN
+log4j.logger.parquet=ERROR
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.spark.sql.execution.datasources.parquet=ERROR
+log4j.logger.org.apache.spark.sql.execution.datasources.FileScanRDD=ERROR
+log4j.logger.org.apache.hadoop.io.compress.CodecPool=ERROR
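
Because a properties file can be shadowed by another log4j.properties bundled into the shaded jar, the same quieting is sometimes applied programmatically. A minimal log4j 1.x sketch (added for illustration, not part of the commit):

    import org.apache.log4j.{Level, LogManager, Logger}

    // Mirror the log4j.properties settings in code: WARN by default,
    // with the chattiest Spark/Hadoop loggers turned down further.
    def quietLogs(): Unit = {
      Logger.getRootLogger.setLevel(Level.WARN)
      Seq("org.apache.spark", "org.apache.hadoop", "org.eclipse.jetty")
        .foreach(n => LogManager.getLogger(n).setLevel(Level.WARN))
      Seq("org.apache.spark.sql.execution.datasources.parquet",
          "org.apache.spark.sql.execution.datasources.FileScanRDD",
          "org.apache.hadoop.io.compress.CodecPool")
        .foreach(n => LogManager.getLogger(n).setLevel(Level.ERROR))
    }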
src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala: 72 additions, 83 deletions

@@ -29,23 +29,22 @@ import scala.collection.mutable
 import scala.reflect.runtime.universe._

 /**
- * Abstract class for parameter case classes.
- * This overrides the [[toString]] method to print all case class fields by name and value.
- *
- * @tparam T Concrete parameter class.
- */
+  * Abstract class for parameter case classes.
+  * This overrides the [[toString]] method to print all case class fields by name and value.
+  * @tparam T Concrete parameter class.
+  */
 abstract class AbstractParams[T: TypeTag] {

   private def tag: TypeTag[T] = typeTag[T]

   /**
-   * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
-   * {
-   *   [field name]:\t[field value]\n
-   *   [field name]:\t[field value]\n
-   *   ...
-   * }
-   */
+    * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
+    * {
+    *   [field name]:\t[field value]\n
+    *   [field name]:\t[field value]\n
+    *   ...
+    * }
+    */
   override def toString: String = {
     val tpe = tag.tpe
     val allAccessors = tpe.declarations.collect {
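
As an illustration (hypothetical, not part of the commit): any case class extending AbstractParams[T] inherits this reflective toString, which is what makes the Params dump in run() readable:

    // Hypothetical example: the reflective toString prints fields by name.
    case class DemoParams(k: Int = 20, optimizer: String = "em")
      extends AbstractParams[DemoParams]

    println(DemoParams())
    // {
    //   k:	20,
    //   optimizer:	em
    // }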
@@ -63,44 +62,40 @@ abstract class AbstractParams[T: TypeTag] {
 }

 /**
- * An example Latent Dirichlet Allocation (LDA) app. Run with
- * {{{
- * ./bin/run-example mllib.LDAExample [options] <input>
- * }}}
- * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
- */
+  * An example Latent Dirichlet Allocation (LDA) app. Run with
+  * {{{
+  * ./bin/run-example mllib.LDAExample [options] <input>
+  * }}}
+  * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+  */
 object LDAExample {

   private case class Params(
-    input: Seq[String] = Seq.empty,
-    master: String = "local",
-    k: Int = 20,
-    maxIterations: Int = 10,
-    maxInnerIterations: Int = 5,
-    docConcentration: Double = 0.01,
-    topicConcentration: Double = 0.01,
-    vocabSize: Int = 10000,
-    stopwordFile: String = "",
-    checkpointDir: Option[String] = None,
-    checkpointInterval: Int = 10,
-    optimizer: String = "online",
-    gibbsSampler: String = "alias",
-    gibbsAlphaAS: Double = 0.1,
-    gibbsPrintPerplexity: Boolean = false,
-    gibbsEdgePartitioner: String = "none",
-    partitions: Int = 2,
-    logLevel: String = "info",
-    psMasterAddr: String = null
-  ) extends AbstractParams[Params]
+      input: Seq[String] = Seq.empty,
+      k: Int = 20,
+      maxIterations: Int = 10,
+      maxInnerIterations: Int = 5,
+      docConcentration: Double = 0.01,
+      topicConcentration: Double = 0.01,
+      vocabSize: Int = 10000,
+      stopwordFile: String = "",
+      checkpointDir: Option[String] = None,
+      checkpointInterval: Int = 10,
+      optimizer:String = "em",
+      gibbsSampler:String = "alias",
+      gibbsAlphaAS:Double = 0.1,
+      gibbsPrintPerplexity:Boolean = false,
+      gibbsEdgePartitioner:String = "none",
+      partitions:Int = 2,
+      logLevel:String = "info",
+      psMasterAddr:String = null
+    ) extends AbstractParams[Params]

   def main(args: Array[String]) {
     val defaultParams = Params()

     val parser = new OptionParser[Params]("LDAExample") {
       head("LDAExample: an example LDA app for plain text data.")
-      opt[String]("master")
-        .text(s"spark master. default: ${defaultParams.master}")
-        .action((x, c) => c.copy(master = x))
       opt[Int]("k")
         .text(s"number of topics. default: ${defaultParams.k}")
         .action((x, c) => c.copy(k = x))
@@ -112,28 +107,28 @@ object LDAExample {
         .action((x, c) => c.copy(maxInnerIterations = x))
       opt[Double]("docConcentration")
         .text(s"amount of topic smoothing to use (> 1.0) (-1=auto)." +
-        s" default: ${defaultParams.docConcentration}")
+          s" default: ${defaultParams.docConcentration}")
         .action((x, c) => c.copy(docConcentration = x))
       opt[Double]("topicConcentration")
         .text(s"amount of term (word) smoothing to use (> 1.0) (-1=auto)." +
-        s" default: ${defaultParams.topicConcentration}")
+          s" default: ${defaultParams.topicConcentration}")
         .action((x, c) => c.copy(topicConcentration = x))
       opt[Int]("vocabSize")
         .text(s"number of distinct word types to use, chosen by frequency. (-1=all)" +
-        s" default: ${defaultParams.vocabSize}")
+          s" default: ${defaultParams.vocabSize}")
         .action((x, c) => c.copy(vocabSize = x))
       opt[String]("stopwordFile")
         .text(s"filepath for a list of stopwords. Note: This must fit on a single machine." +
-        s" default: ${defaultParams.stopwordFile}")
+          s" default: ${defaultParams.stopwordFile}")
         .action((x, c) => c.copy(stopwordFile = x))
       opt[String]("checkpointDir")
         .text(s"Directory for checkpointing intermediate results." +
-        s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
-        s" default: ${defaultParams.checkpointDir}")
+          s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
+          s" default: ${defaultParams.checkpointDir}")
         .action((x, c) => c.copy(checkpointDir = Some(x)))
       opt[Int]("checkpointInterval")
         .text(s"Iterations between each checkpoint. Only used if checkpointDir is set." +
-        s" default: ${defaultParams.checkpointInterval}")
+          s" default: ${defaultParams.checkpointInterval}")
         .action((x, c) => c.copy(checkpointInterval = x))
       opt[String]("optimizer")
         .text(s"available optimizers are online and gibbs, default: ${defaultParams.optimizer}")
@@ -161,7 +156,7 @@
         .action((x, c) => c.copy(psMasterAddr = x))
       arg[String]("<input>...")
         .text("input paths (directories) to plain text corpora." +
-        " Each text file line should hold 1 document.")
+          " Each text file line should hold 1 document.")
         .unbounded()
         .required()
         .action((x, c) => c.copy(input = c.input :+ x))
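
The options above follow the standard scopt 3.x immutable-config pattern: each .action returns an updated copy of Params, and parse folds the arguments through it. A stripped-down sketch of the same pattern with a hypothetical two-field config:

    import scopt.OptionParser

    case class Conf(k: Int = 20, input: Seq[String] = Seq.empty)

    val parser = new OptionParser[Conf]("demo") {
      opt[Int]("k").text("number of topics").action((x, c) => c.copy(k = x))
      arg[String]("<input>...").unbounded().required()
        .action((x, c) => c.copy(input = c.input :+ x))
    }

    // parse returns Some(config) on success, None after printing usage.
    parser.parse(Array("--k", "50", "docs.txt"), Conf()) match {
      case Some(conf) => println(conf)
      case None       => sys.exit(1)
    }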
@@ -175,11 +170,11 @@
     }
   }

-  // private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]):LDAOptimizer = {
-  private def createOptimizer(params: Params): LDAOptimizer = {
+  // private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]):LDAOptimizer = {
+  private def createOptimizer(params: Params):LDAOptimizer = {
     params.optimizer match {
       case "online" => val optimizer = new OnlineLDAOptimizer
-        optimizer
+          optimizer
       case "gibbs" =>
         val optimizer = new GibbsLDAOptimizer
         optimizer.setSampler(params.gibbsSampler)
@@ -188,18 +183,16 @@
         optimizer.setAlphaAS(params.gibbsAlphaAS.toFloat)
         optimizer
       case _ =>
-        throw new IllegalArgumentException(s"available optimizers are online and gibbs, but got ${params.optimizer}")
+        throw new IllegalArgumentException(s"available optimizers are em, online and gibbs, but got ${params.optimizer}")
     }
   }

   /**
-   * run LDA
-   *
-   * @param params
-   */
+    * run LDA
+    * @param params
+    */
   private def run(params: Params) {
     val conf = new SparkConf().setAppName(s"LDAExample with $params")
-      .setMaster(params.master)
     val sc = new SparkContext(conf)

     val logLevel = Level.toLevel(params.logLevel, Level.INFO)
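
The commit removes .setMaster(params.master) along with the --master option, so the master now has to come from the launch environment (typically spark-submit --master ...). A minimal master-agnostic fallback for local IDE runs might look like this (an assumption, not code from the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Only default to local[*] when no master was supplied externally,
    // so spark-submit --master still wins.
    val conf = new SparkConf().setAppName("LDAExample")
    if (!conf.contains("spark.master")) conf.setMaster("local[*]")
    val sc = new SparkContext(conf)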
@@ -258,16 +251,15 @@
   }

   /**
-   * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
-   *
-   * @return (corpus, vocabulary as array, total token count in corpus)
-   */
+    * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
+    * @return (corpus, vocabulary as array, total token count in corpus)
+    */
   private def preprocess(
-      sc: SparkContext,
-      paths: Seq[String],
-      vocabSize: Int,
-      partitions: Int,
-      stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = {
+      sc: SparkContext,
+      paths: Seq[String],
+      vocabSize: Int,
+      partitions:Int,
+      stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = {

     // Get dataset of document texts
     // One document per line in each text file. If the input consists of many small files,
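
preprocess ends by turning each tokenized document into a sparse term-count vector over the vocabSize most frequent terms. A hypothetical helper showing just that final step (not the code above):

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    // Count each in-vocabulary token and pack the counts into a sparse Vector.
    def termCounts(tokens: Seq[String], vocab: Map[String, Int]): Vector = {
      val counts = scala.collection.mutable.Map.empty[Int, Double].withDefaultValue(0.0)
      tokens.foreach(t => vocab.get(t).foreach(i => counts(i) += 1.0))
      Vectors.sparse(vocab.size, counts.toSeq)
    }

    // termCounts(Seq("topic", "model", "topic"), Map("topic" -> 0, "model" -> 1))
    // == (2,[0,1],[2.0,1.0])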
@@ -328,24 +320,21 @@
   }

   /**
-   * Simple Tokenizer.
-   *
-   * TODO: Formalize the interface, and make this a public class in mllib.feature
-   */
+    * Simple Tokenizer.
+    *
+    * TODO: Formalize the interface, and make this a public class in mllib.feature
+    */
   private class SimpleTokenizer(sc: SparkContext, stopwordFile: String) extends Serializable {

-    private val stopwords: Set[String] = Set("article", "write", "writes", "entry", "date", "udel", "said",
-      "tell", "think", "know", "just", "newsgroup", "line", "like", "does", "going", "make", "thanks","year","years") ++
-      (if (stopwordFile.isEmpty) {
-        Set.empty[String]
-      } else {
-        val stopwordText = sc.textFile(stopwordFile).collect()
-        stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
-      })
-
-    // Matches sequences of Unicode alnum letters
-    // private val allWordRegex = "^(\\p{L}*)$".r
-    private val allWordRegex = "(\\b[^\\s]+\\b)".r
+    private val stopwords: Set[String] = if (stopwordFile.isEmpty) {
+      Set.empty[String]
+    } else {
+      val stopwordText = sc.textFile(stopwordFile).collect()
+      stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
+    }
+
+    // Matches sequences of Unicode letters
+    private val allWordRegex = "^(\\p{L}*)$".r

     // Ignore words shorter than this length.
     private val minWordLength = 3
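
Further down (not shown in this hunk), the tokenizer applies these three fields together. An illustrative sketch of that filtering, assuming stopwords, allWordRegex, and minWordLength from above are in scope (not the class's actual method):

    // Keep lowercase tokens that are pure letter sequences, long enough,
    // and not stopwords: the filtering the fields above exist to support.
    def keepToken(raw: String): Option[String] = raw.toLowerCase match {
      case allWordRegex(term) if term.length >= minWordLength && !stopwords.contains(term) =>
        Some(term)
      case _ => None
    }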
