diff --git a/pom.xml b/pom.xml
index 2c125a2..1b7aa3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,49 @@
             <artifactId>maven-compiler-plugin</artifactId>
             <version>3.3</version>
           </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>2.4</version>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <configuration>
+                  <transformers>
+                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                      <mainClass>org.openchai.tensorflow.TfServer</mainClass>
+                      <manifestEntries>
+                        <Implementation-Version>100.0</Implementation-Version>
+                      </manifestEntries>
+                    </transformer>
+                  </transformers>
+                  <artifactSet>
+                    <includes>
+                      <include>*.*</include>
+                    </includes>
+                  </artifactSet>
+                  <filters>
+                    <filter>
+                      <artifact>log4j:log4j:jar:</artifact>
+                      <excludes>
+                        <exclude>META-INF/*.SF</exclude>
+                        <exclude>META-INF/*.DSA</exclude>
+                        <exclude>META-INF/*.RSA</exclude>
+                        <exclude>META-INF/ECLIPSE*</exclude>
+                        <exclude>META-INF/license/*</exclude>
+                      </excludes>
+                    </filter>
+                  </filters>
+                  <createDependencyReducedPom>false</createDependencyReducedPom>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644
index 0000000..e329512
--- /dev/null
+++ b/src/main/resources/log4j.properties
@@ -0,0 +1,20 @@
+# Set everything to be logged to the console (stderr)
+log4j.rootCategory=WARN, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.spark=WARN
+log4j.logger.parquet=ERROR
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.spark.sql.execution.datasources.parquet=ERROR
+log4j.logger.org.apache.spark.sql.execution.datasources.FileScanRDD=ERROR
+log4j.logger.org.apache.hadoop.io.compress.CodecPool=ERROR
diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala
index cf92e05..b71d4fe 100644
--- a/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala
+++ b/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala
@@ -29,23 +29,22 @@ import scala.collection.mutable
import scala.reflect.runtime.universe._
/**
- * Abstract class for parameter case classes.
- * This overrides the [[toString]] method to print all case class fields by name and value.
- *
- * @tparam T Concrete parameter class.
- */
+ * Abstract class for parameter case classes.
+ * This overrides the [[toString]] method to print all case class fields by name and value.
+ * @tparam T Concrete parameter class.
+ */
abstract class AbstractParams[T: TypeTag] {
private def tag: TypeTag[T] = typeTag[T]
/**
- * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
- * {
- * [field name]:\t[field value]\n
- * [field name]:\t[field value]\n
- * ...
- * }
- */
+ * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
+ * {
+ * [field name]:\t[field value]\n
+ * [field name]:\t[field value]\n
+ * ...
+ * }
+ */
override def toString: String = {
val tpe = tag.tpe
val allAccessors = tpe.declarations.collect {
@@ -63,44 +62,40 @@ abstract class AbstractParams[T: TypeTag] {
}
/**
- * An example Latent Dirichlet Allocation (LDA) app. Run with
- * {{{
- * ./bin/run-example mllib.LDAExample [options]
- * }}}
- * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
- */
+ * An example Latent Dirichlet Allocation (LDA) app. Run with
+ * {{{
+ * ./bin/run-example mllib.LDAExample [options]
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
object LDAExample {
private case class Params(
- input: Seq[String] = Seq.empty,
- master: String = "local",
- k: Int = 20,
- maxIterations: Int = 10,
- maxInnerIterations: Int = 5,
- docConcentration: Double = 0.01,
- topicConcentration: Double = 0.01,
- vocabSize: Int = 10000,
- stopwordFile: String = "",
- checkpointDir: Option[String] = None,
- checkpointInterval: Int = 10,
- optimizer: String = "online",
- gibbsSampler: String = "alias",
- gibbsAlphaAS: Double = 0.1,
- gibbsPrintPerplexity: Boolean = false,
- gibbsEdgePartitioner: String = "none",
- partitions: Int = 2,
- logLevel: String = "info",
- psMasterAddr: String = null
- ) extends AbstractParams[Params]
+ input: Seq[String] = Seq.empty,
+ k: Int = 20,
+ maxIterations: Int = 10,
+ maxInnerIterations: Int = 5,
+ docConcentration: Double = 0.01,
+ topicConcentration: Double = 0.01,
+ vocabSize: Int = 10000,
+ stopwordFile: String = "",
+ checkpointDir: Option[String] = None,
+ checkpointInterval: Int = 10,
+ optimizer:String = "em",
+ gibbsSampler:String = "alias",
+ gibbsAlphaAS:Double = 0.1,
+ gibbsPrintPerplexity:Boolean = false,
+ gibbsEdgePartitioner:String = "none",
+ partitions:Int = 2,
+ logLevel:String = "info",
+ psMasterAddr:String = null
+ ) extends AbstractParams[Params]
def main(args: Array[String]) {
val defaultParams = Params()
val parser = new OptionParser[Params]("LDAExample") {
head("LDAExample: an example LDA app for plain text data.")
- opt[String]("master")
- .text(s"spark master. default: ${defaultParams.master}")
- .action((x, c) => c.copy(master = x))
opt[Int]("k")
.text(s"number of topics. default: ${defaultParams.k}")
.action((x, c) => c.copy(k = x))
@@ -112,28 +107,28 @@ object LDAExample {
.action((x, c) => c.copy(maxInnerIterations = x))
opt[Double]("docConcentration")
.text(s"amount of topic smoothing to use (> 1.0) (-1=auto)." +
- s" default: ${defaultParams.docConcentration}")
+ s" default: ${defaultParams.docConcentration}")
.action((x, c) => c.copy(docConcentration = x))
opt[Double]("topicConcentration")
.text(s"amount of term (word) smoothing to use (> 1.0) (-1=auto)." +
- s" default: ${defaultParams.topicConcentration}")
+ s" default: ${defaultParams.topicConcentration}")
.action((x, c) => c.copy(topicConcentration = x))
opt[Int]("vocabSize")
.text(s"number of distinct word types to use, chosen by frequency. (-1=all)" +
- s" default: ${defaultParams.vocabSize}")
+ s" default: ${defaultParams.vocabSize}")
.action((x, c) => c.copy(vocabSize = x))
opt[String]("stopwordFile")
.text(s"filepath for a list of stopwords. Note: This must fit on a single machine." +
- s" default: ${defaultParams.stopwordFile}")
+ s" default: ${defaultParams.stopwordFile}")
.action((x, c) => c.copy(stopwordFile = x))
opt[String]("checkpointDir")
.text(s"Directory for checkpointing intermediate results." +
- s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
- s" default: ${defaultParams.checkpointDir}")
+ s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
+ s" default: ${defaultParams.checkpointDir}")
.action((x, c) => c.copy(checkpointDir = Some(x)))
opt[Int]("checkpointInterval")
.text(s"Iterations between each checkpoint. Only used if checkpointDir is set." +
- s" default: ${defaultParams.checkpointInterval}")
+ s" default: ${defaultParams.checkpointInterval}")
.action((x, c) => c.copy(checkpointInterval = x))
opt[String]("optimizer")
.text(s"available optimizer are online and gibbs, default: ${defaultParams.optimizer}")
@@ -161,7 +156,7 @@ object LDAExample {
.action((x, c) => c.copy(psMasterAddr = x))
arg[String]("...")
.text("input paths (directories) to plain text corpora." +
- " Each text file line should hold 1 document.")
+ " Each text file line should hold 1 document.")
.unbounded()
.required()
.action((x, c) => c.copy(input = c.input :+ x))
@@ -175,11 +170,11 @@ object LDAExample {
}
}
- // private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]):LDAOptimizer = {
- private def createOptimizer(params: Params): LDAOptimizer = {
+// private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]):LDAOptimizer = {
+ private def createOptimizer(params: Params):LDAOptimizer = {
params.optimizer match {
case "online" => val optimizer = new OnlineLDAOptimizer
- optimizer
+ optimizer
case "gibbs" =>
val optimizer = new GibbsLDAOptimizer
optimizer.setSampler(params.gibbsSampler)
@@ -188,18 +183,16 @@ object LDAExample {
optimizer.setAlphaAS(params.gibbsAlphaAS.toFloat)
optimizer
case _ =>
- throw new IllegalArgumentException(s"available optimizers are online and gibbs, but got ${params.optimizer}")
+ throw new IllegalArgumentException(s"available optimizers are em, online and gibbs, but got ${params.optimizer}")
}
}
/**
- * run LDA
- *
- * @param params
- */
+ * run LDA
+ * @param params
+ */
private def run(params: Params) {
val conf = new SparkConf().setAppName(s"LDAExample with $params")
- .setMaster(params.master)
val sc = new SparkContext(conf)
val logLevel = Level.toLevel(params.logLevel, Level.INFO)
@@ -258,16 +251,15 @@ object LDAExample {
}
/**
- * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
- *
- * @return (corpus, vocabulary as array, total token count in corpus)
- */
+ * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
+ * @return (corpus, vocabulary as array, total token count in corpus)
+ */
private def preprocess(
- sc: SparkContext,
- paths: Seq[String],
- vocabSize: Int,
- partitions: Int,
- stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = {
+ sc: SparkContext,
+ paths: Seq[String],
+ vocabSize: Int,
+ partitions:Int,
+ stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = {
// Get dataset of document texts
// One document per line in each text file. If the input consists of many small files,
@@ -328,24 +320,21 @@ object LDAExample {
}
/**
- * Simple Tokenizer.
- *
- * TODO: Formalize the interface, and make this a public class in mllib.feature
- */
+ * Simple Tokenizer.
+ *
+ * TODO: Formalize the interface, and make this a public class in mllib.feature
+ */
private class SimpleTokenizer(sc: SparkContext, stopwordFile: String) extends Serializable {
- private val stopwords: Set[String] = Set("article", "write", "writes", "entry", "date", "udel", "said",
- "tell", "think", "know", "just", "newsgroup", "line", "like", "does", "going", "make", "thanks","year","years") ++
- (if (stopwordFile.isEmpty) {
- Set.empty[String]
- } else {
- val stopwordText = sc.textFile(stopwordFile).collect()
- stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
- })
-
- // Matches sequences of Unicode alnum letters
- // private val allWordRegex = "^(\\p{L}*)$".r
- private val allWordRegex = "(\\b[^\\s]+\\b)".r
+ private val stopwords: Set[String] = if (stopwordFile.isEmpty) {
+ Set.empty[String]
+ } else {
+ val stopwordText = sc.textFile(stopwordFile).collect()
+ stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
+ }
+
+ // Matches sequences of Unicode letters
+ private val allWordRegex = "^(\\p{L}*)$".r
// Ignore words shorter than this length.
private val minWordLength = 3
diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala
index 54cb1ea..adadce9 100644
--- a/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala
+++ b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala
@@ -3,6 +3,7 @@ package org.apache.spark.mllib.topicModeling
import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Vector => BV, _}
import breeze.numerics.{abs, digamma, exp, _}
import org.apache.spark.mllib.linalg.{SparseVector, Vector}
+import org.apache.spark.mllib.topicModeling.OnlineHDPExample.OnlineHDPParams
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
@@ -20,7 +21,7 @@ class SuffStats(
}
}
-object OnlineHDPOptimizer extends Serializable {
+object OnlineHDP extends Serializable {
val rhot_bound = 0.0
def log_normalize(v: BDV[Double]): (BDV[Double], Double) = {
@@ -82,21 +83,20 @@ object OnlineHDPOptimizer extends Serializable {
* Implemented based on the paper "Online Variational Inference for the Hierarchical Dirichlet Process" (Chong Wang, John Paisley and David M. Blei)
*/
-class OnlineHDPOptimizer(
- val corpus: RDD[(Long, Vector)],
- val chunkSize: Int = 256,
- val m_kappa: Double = 1.0,
- var m_tau: Double = 64.0,
- val m_K: Int = 15,
- val m_T: Int = 150,
- val m_alpha: Double = 1,
- val m_gamma: Double = 1,
- val m_eta: Double = 0.01,
- val m_scale: Double = 1.0,
- val m_var_converge: Double = 0.0001,
- val iterations: Int = 10
- ) extends Serializable {
-
+class OnlineHDP(corpus: RDD[(Long,Vector)], hdpParams: OnlineHDPParams) extends Serializable {
+
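+  // Hyper-parameters below are copied from the shared OnlineHDPParams built by the example driver.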
+ val op = hdpParams
+ val chunkSize: Int = op.chunkSize
+ val m_kappa: Double = op.kappa
+ val tau: Double = op.tau
+ val m_K: Int = op.k
+ val m_T: Int = op.vocabSize
+ val m_alpha: Double = op.alpha
+ val m_gamma: Double = op.gamma
+ val m_eta: Double = op.eta
+ val m_scale: Double = op.scale
+ val m_var_converge: Double = op.var_converge
+ val iterations: Int = op.maxIterations
val lda_alpha: Double = 1D
val lda_beta: Double = 1D
@@ -109,12 +109,12 @@ class OnlineHDPOptimizer(
var m_varphi_ss: BDV[Double] = BDV.zeros[Double](m_T) // T
// T * W
- val m_lambda: BDM[Double] = BDM.rand(m_T, m_W) * (m_D.toDouble) * 100.0 / (m_T * m_W).toDouble - m_eta
+ val m_lambda: BDM[Double] = BDM.rand(m_T, m_W) * (m_D * 100.0 / (m_T * m_W).toDouble - m_eta)
// T * W
- val m_Elogbeta = OnlineHDPOptimizer.dirichletExpectation(m_lambda + m_eta)
+ val m_Elogbeta = OnlineHDP.dirichletExpectation(m_lambda + m_eta)
- m_tau = m_tau + 1
+ var m_tau = tau + 1
var m_updatect = 0
var m_status_up_to_date = true
@@ -124,16 +124,15 @@ class OnlineHDPOptimizer(
val rhot_bound = 0.0
-
- def update(docs: RDD[(Long, Vector)]): Unit = {
- for (i <- 1 to iterations) {
+ def update(docs: RDD[(Long, Vector)]): Seq[LDAMetrics] = {
+ for (i <- 1 to iterations) yield {
val chunk = docs
update_chunk(chunk)
}
}
- def update_chunk(chunk: RDD[(Long, Vector)], update: Boolean = true): (Double, Int) = {
+ def update_chunk(chunk: RDD[(Long, Vector)], update: Boolean = true): LDAMetrics = {
// Find the unique words in this chunk...
val unique_words = scala.collection.mutable.Map[Int, Int]()
val raw_word_list = ArrayBuffer[Int]()
@@ -174,7 +173,7 @@ class OnlineHDPOptimizer(
val ss = new SuffStats(m_T, Wt, chunk.count().toInt)
- val Elogsticks_1st: BDV[Double] = OnlineHDPOptimizer.expect_log_sticks(m_var_sticks) // global sticks
+ val Elogsticks_1st: BDV[Double] = OnlineHDP.expect_log_sticks(m_var_sticks) // global sticks
// run variational inference on some new docs
var score = 0.0
@@ -186,7 +185,7 @@ class OnlineHDPOptimizer(
val dict = unique_words.toMap
val wl = doc_word_ids.toList
- val doc_score = doc_e_step(doc, ss, Elogsticks_1st,
+ val doc_score = doc_e_step(doc, hdpParams, ss, Elogsticks_1st,
word_list, dict, wl,
new BDV[Double](doc_word_counts), m_var_converge)
count += sum(doc_word_counts)
@@ -197,7 +196,8 @@ class OnlineHDPOptimizer(
update_lambda(ss, word_list)
}
- (score, count.toInt)
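+    // exp(-score/count): per-token perplexity derived from the accumulated variational score.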
+ val perplexity = math.exp(-1.0 * (score / count))
+ LDAMetrics("OnlineHDP", count.toInt, score, perplexity)
}
@@ -223,13 +223,14 @@ class OnlineHDPOptimizer(
// update top level sticks
// 2 * T - 1
m_var_sticks(0, ::) := (m_varphi_ss(0 to m_T - 2) + 1.0).t
- val var_phi_sum = flipud(m_varphi_ss(1 to m_varphi_ss.length - 1)) // T - 1
+ val var_phi_sum = flipud(m_varphi_ss(1 until m_varphi_ss.length)) // T - 1
m_var_sticks(1, ::) := (flipud(accumulate(var_phi_sum)) + m_gamma).t
}
def doc_e_step(doc: (Long, Vector),
+ hdpParams: OnlineHDPParams,
ss: SuffStats,
Elogsticks_1st: BDV[Double],
word_list: List[Int],
@@ -246,7 +247,7 @@ class OnlineHDPOptimizer(
v(0, ::) := 1.0
v(1, ::) := m_alpha
- var Elogsticks_2nd = OnlineHDPOptimizer.expect_log_sticks(v)
+ var Elogsticks_2nd = OnlineHDP.expect_log_sticks(v)
// back to the uniform
var phi: BDM[Double] = BDM.ones[Double](doc_word_ids.size, m_K) * 1.0 / m_K.toDouble // Wt * K
@@ -257,28 +258,27 @@ class OnlineHDPOptimizer(
val eps = 1e-100
var iter = 0
- val max_iter = 100
var var_phi_out: BDM[Double] = BDM.ones[Double](1, 1)
// not yet support second level optimization yet, to be done in the future
- while (iter < max_iter && (converge < 0.0 || converge > var_converge)) {
+ while (iter < hdpParams.maxIterations && (converge < 0.0 || converge > var_converge)) {
// var_phi
val (log_var_phi: BDM[Double], var_phi: BDM[Double]) =
if (iter < 3) {
val element = Elogbeta_doc.copy // T * Wt
- for (i <- 0 to element.rows - 1) {
+ for (i <- 0 until element.rows) {
element(i, ::) := (element(i, ::).t :* doc_word_counts).t
}
var var_phi: BDM[Double] = phi.t * element.t // K * Wt * Wt * T => K * T
- val (log_var_phi, log_norm) = OnlineHDPOptimizer.log_normalize(var_phi)
+ val (log_var_phi, log_norm) = OnlineHDP.log_normalize(var_phi)
var_phi = exp(log_var_phi)
(log_var_phi, var_phi)
}
else {
val element = Elogbeta_doc.copy
- for (i <- 0 to element.rows - 1) {
+ for (i <- 0 until element.rows) {
element(i, ::) := (element(i, ::).t :* doc_word_counts).t
}
val product: BDM[Double] = phi.t * element.t
@@ -287,7 +287,7 @@ class OnlineHDPOptimizer(
}
var var_phi: BDM[Double] = product
- val (log_var_phi, log_norm) = OnlineHDPOptimizer.log_normalize(var_phi)
+ val (log_var_phi, log_norm) = OnlineHDP.log_normalize(var_phi)
var_phi = exp(log_var_phi)
(log_var_phi, var_phi)
}
@@ -296,7 +296,7 @@ class OnlineHDPOptimizer(
// phi
if (iter < 3) {
phi = (var_phi * Elogbeta_doc).t
- val (log_phi, log_norm) = OnlineHDPOptimizer.log_normalize(phi)
+ val (log_phi, log_norm) = OnlineHDP.log_normalize(phi)
phi = exp(log_phi)
(log_phi, log_norm)
}
@@ -307,7 +307,7 @@ class OnlineHDPOptimizer(
product(i, ::) := (product(i, ::).t + Elogsticks_2nd).t
}
phi = product
- val (log_phi, log_norm) = OnlineHDPOptimizer.log_normalize(phi)
+ val (log_phi, log_norm) = OnlineHDP.log_normalize(phi)
phi = exp(log_phi)
(log_phi, log_norm)
}
@@ -324,14 +324,14 @@ class OnlineHDPOptimizer(
val t_sum = sum(selected(::, *)).inner // toDenseVector
val phi_cum = flipud(t_sum)
v(1, ::) := (flipud(accumulate(phi_cum)) + m_alpha).t
- Elogsticks_2nd = OnlineHDPOptimizer.expect_log_sticks(v)
+ Elogsticks_2nd = OnlineHDP.expect_log_sticks(v)
likelihood = 0.0
// compute likelihood
// var_phi part/ C in john's notation
val diff = log_var_phi.copy
- for (i <- 0 to diff.rows - 1) {
+ for (i <- 0 until diff.rows) {
diff(i, ::) := (Elogsticks_1st :- diff(i, ::).t).t
}
@@ -340,7 +340,7 @@ class OnlineHDPOptimizer(
// v part/ v in john's notation, john's beta is alpha here
val log_alpha = log(m_alpha)
likelihood += (m_K - 1) * log_alpha
- val dig_sum = (digamma(sum(v(::, *)))).inner // .toDenseVector
+ val dig_sum = digamma(sum(v(::, *))).inner // .toDenseVector
val vCopy = v.copy
for (i <- 0 until v.cols) {
vCopy(::, i) := BDV[Double](1.0, m_alpha) - vCopy(::, i)
@@ -368,6 +368,7 @@ class OnlineHDPOptimizer(
}
likelihood += sum(phi.t :* (var_phi * Elogbeta_docCopy))
+ println(s"Likelihood = $likelihood")
val converge = (likelihood - old_likelihood) / abs(old_likelihood)
old_likelihood = likelihood
@@ -382,7 +383,7 @@ class OnlineHDPOptimizer(
// update the suff_stat ss
// this time it only contains information from one doc
val sumPhiOut = sum(var_phi_out(::, *))
- ss.m_var_sticks_ss += sumPhiOut.inner // .toDenseVector
+ ss.m_var_sticks_ss += sumPhiOut.inner
val phiCopy = phi.copy.t
for (i <- 0 until phi.rows) {
@@ -394,7 +395,7 @@ class OnlineHDPOptimizer(
ss.m_var_beta_ss(::, chunkids(i)) := ss.m_var_beta_ss(::, chunkids(i)) + middleResult(::, i)
}
- return likelihood
+ likelihood
}
}
\ No newline at end of file
diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDPExample.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDPExample.scala
new file mode 100644
index 0000000..dea0896
--- /dev/null
+++ b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDPExample.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.topicModeling
+
+import java.text.BreakIterator
+
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+import scopt.OptionParser
+
+import scala.collection.mutable
+import scala.reflect.runtime.universe._
+
+
+/**
+ * An example Online Hierarchical Dirichlet Process (HDP) app. Run with
+ * {{{
+ * ./bin/run-example mllib.OnlineHDPExample [options]
+ * }}}
+ */
+object OnlineHDPExample {
+
+ case class OnlineHDPParams(
+ input: Seq[String] = Seq.empty,
+ master: String = "local",
+ k: Int = 20,
+ vocabSize: Int = 10000,
+ chunkSize: Int = 256,
+ kappa: Double = 1.0,
+ tau: Double = 64.0,
+ alpha: Double = 1,
+ gamma: Double = 1,
+ eta: Double = 0.01,
+ scale: Double = 1.0,
+ var_converge: Double = 0.0001,
+ maxIterations: Int = 10,
+ checkpointDir: Option[String] = None,
+ checkpointInterval: Int = 10,
+ maxDocs: Int = -1,
+ partitions: Int = 2,
+ stopwordFile: String = "",
+ logLevel: String = "info"
+ ) extends AbstractParams[OnlineHDPParams] {
+
+ }
+
+ def main(args: Array[String]) = {
+ parseAndRun(args)
+ }
+
+ def parseAndRun(args: Array[String]) = {
+
+ val parser = getParser(args)
+ val results = parser.parse(args, OnlineHDPParams()).map { params=>
+ run(params)
+ }.getOrElse {
+ parser.showUsageAsError
+ sys.exit(1)
+ }
+ results
+ }
+
+ def getParser(args: Array[String]) = {
+ val defaultParams = OnlineHDPParams()
+
+    val parser = new OptionParser[OnlineHDPParams]("OnlineHDPExample") {
+      head("OnlineHDPExample: an example online HDP app for plain text data.")
+ opt[String]("master")
+ .text(s"spark master. default: ${defaultParams.master}")
+ .action((x, c) => c.copy(master = x))
+ opt[Int]("k")
+ .text(s"number of topics. default: ${defaultParams.k}")
+ .action((x, c) => c.copy(k = x))
+ opt[Int]("maxDocs")
+ .text(s"max number of documents: will truncate the corpus if needbe. default: ${defaultParams.maxDocs}")
+ .action((x, c) => c.copy(maxDocs = x))
+ opt[Int]("maxIterations")
+ .text(s"number of iterations of learning. default: ${defaultParams.maxIterations}")
+ .action((x, c) => c.copy(maxIterations = x))
+ opt[Int]("vocabSize")
+ .text(s"number of distinct word types to use, chosen by frequency. (-1=all)" +
+ s" default: ${defaultParams.vocabSize}")
+ .action((x, c) => c.copy(vocabSize = x))
+ opt[String]("stopwordFile")
+ .text(s"filepath for a list of stopwords. Note: This must fit on a single machine." +
+ s" default: ${defaultParams.stopwordFile}")
+ .action((x, c) => c.copy(stopwordFile = x))
+ opt[String]("checkpointDir")
+ .text(s"Directory for checkpointing intermediate results." +
+ s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
+ s" default: ${defaultParams.checkpointDir}")
+ .action((x, c) => c.copy(checkpointDir = Some(x)))
+ opt[Int]("checkpointInterval")
+ .text(s"Iterations between each checkpoint. Only used if checkpointDir is set." +
+ s" default: ${defaultParams.checkpointInterval}")
+ .action((x, c) => c.copy(checkpointInterval = x))
+ opt[Int]("partitions")
+ .text(s"Minimum edge partitions, default: ${defaultParams.partitions}")
+ .action((x, c) => c.copy(partitions = x))
+ opt[String]("logLevel")
+ .text(s"Log level, default: ${defaultParams.logLevel}")
+ .action((x, c) => c.copy(logLevel = x))
+ arg[String]("...")
+ .text("input paths (directories) to plain text corpora." +
+ " Each text file line should hold 1 document.")
+ .unbounded()
+ .required()
+ .action((x, c) => c.copy(input = c.input :+ x))
+ }
+ parser
+ }
+
+ def run(params: OnlineHDPParams) {
+
+ val conf = new SparkConf().setAppName(s"LDAExample with $params")
+ .setMaster(params.master)
+ val sc = new SparkContext(conf)
+
+ val logLevel = Level.toLevel(params.logLevel, Level.INFO)
+ Logger.getRootLogger.setLevel(logLevel)
+ println(s"Setting log level to $logLevel")
+
+ // Load documents, and prepare them for LDA.
+ val preprocessStart = System.nanoTime()
+ val (corpus, actualCorpusSize, vocabArray, actualNumTokens) =
+ preprocess(sc, params.input, params.vocabSize, params.partitions, params.stopwordFile, params.maxDocs)
+ val actualVocabSize = vocabArray.size
+ val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
+
+ println()
+ println(s"Corpus summary:")
+ println(s"\t Training set size: $actualCorpusSize documents")
+ println(s"\t Vocabulary size: $actualVocabSize terms")
+ println(s"\t Training set size: $actualNumTokens tokens")
+ println(s"\t Preprocessing time: $preprocessElapsed sec")
+ println()
+
+
+ if (params.checkpointDir.nonEmpty) {
+ sc.setCheckpointDir(params.checkpointDir.get)
+ }
+
+ val startTime = System.nanoTime()
+ val lda = new OnlineHDP(corpus, params)
+ val results = lda.update(corpus) // Run one honkin big chunk for this example
+ val elapsed = (System.nanoTime() - startTime) / 1e9
+
+ println(s"Finished training ${getClass.getSimpleName}")
+ println(s"Results\n${results.mkString(",")}")
+ sc.stop()
+ results
+ }
+
+ /**
+ * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
+ *
+ * @return (corpus, vocabulary as array, total token count in corpus)
+ */
+ private def preprocess(
+ sc: SparkContext,
+ paths: Seq[String],
+ vocabSize: Int,
+ partitions: Int,
+ stopwordFile: String,
+ maxDocs: Int): (RDD[(Long, Vector)], Long, Array[String], Long) = {
+
+ // Get dataset of document texts
+ // One document per line in each text file. If the input consists of many small files,
+ // this can result in a large number of small partitions, which can degrade performance.
+ // In this case, consider using coalesce() to create fewer, larger partitions.
+ println(s"Loading corpus from ${paths.mkString(",")} ..")
+ val textRDD1: RDD[String] = sc.textFile(paths.mkString(","), Math.max(1, partitions))
+
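+    // Optionally down-sample the corpus so that at most maxDocs documents are processed (maxDocs == -1 keeps everything).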
+ val textRDD2 = if (maxDocs == -1) {
+ println("Processing all documents in the corpus ..")
+ textRDD1
+ } else {
+ val nDocs = textRDD1.count
+ if (nDocs > maxDocs) {
+ val sampleFrac = maxDocs.toDouble / nDocs
+ println(s"Processing only $maxDocs (out of $nDocs) - ${((sampleFrac*1000).toInt).toDouble/10.0}% ..")
+ textRDD1.sample(withReplacement = false, sampleFrac, 1)
+ } else {
+ textRDD1
+ }
+ }
+ val textRDD = textRDD2.coalesce(partitions)
+
+ // Split text into words
+ val tokenizer = new SimpleTokenizer(sc, stopwordFile)
+ val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) =>
+ id -> tokenizer.getWords(text)
+ }
+ tokenized.cache()
+
+ // Counts words: RDD[(word, wordCount)]
+ val wordCounts: RDD[(String, Long)] = tokenized
+ .flatMap { case (_, tokens) => tokens.map(_ -> 1L) }
+ .reduceByKey(_ + _)
+ wordCounts.cache()
+ val fullVocabSize = wordCounts.count()
+ // Select vocab
+ // (vocab: Map[word -> id], total tokens after selecting vocab)
+ val (vocab: Map[String, Int], selectedTokenCount: Long) = {
+ val tmpSortedWC: Array[(String, Long)] = if (vocabSize == -1 || fullVocabSize <= vocabSize) {
+ // Use all terms
+ wordCounts.collect().sortBy(-_._2)
+ } else {
+ // Sort terms to select vocab
+ wordCounts.sortBy(_._2, ascending = false).take(vocabSize)
+ }
+ (tmpSortedWC.map(_._1).zipWithIndex.toMap, tmpSortedWC.map(_._2).sum)
+ }
+
+ val documents = tokenized.map { case (id, tokens) =>
+ // Filter tokens by vocabulary, and create word count vector representation of document.
+ val wc = new mutable.HashMap[Int, Int]()
+ tokens.foreach { term =>
+ if (vocab.contains(term)) {
+ val termIndex = vocab(term)
+ wc(termIndex) = wc.getOrElse(termIndex, 0) + 1
+ }
+ }
+ val indices = wc.keys.toArray.sorted
+ val values = indices.map(i => wc(i).toDouble)
+
+ val sb = Vectors.sparse(vocab.size, indices, values)
+ (id, sb)
+ }.filter(_._2.asInstanceOf[SparseVector].values.length > 0).cache
+ val corpusSize = documents.count
+ tokenized.unpersist(false)
+
+ val vocabArray = new Array[String](vocab.size)
+ vocab.foreach { case (term, i) => vocabArray(i) = term }
+
+ (documents, corpusSize, vocabArray, selectedTokenCount)
+ }
+}
+
diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/TopicMetrics.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/TopicMetrics.scala
new file mode 100644
index 0000000..68fd1e3
--- /dev/null
+++ b/src/main/scala/org/apache/spark/mllib/topicModeling/TopicMetrics.scala
@@ -0,0 +1,25 @@
+package org.apache.spark.mllib.topicModeling
+
+object TopicMetrics {
+
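+  /** Truncates d to precis decimal places (via toInt, so toward zero) to keep printed metrics compact. */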
+ def round(d: Double, precis: Int = 6): Double = (d * math.pow(10, precis)).toInt / math.pow(10, precis)
+}
+
+case class TopicMetrics(topicNum: Int, termWeights: Seq[(String, Double)]) {
+
+ import TopicMetrics._
+
+ override def toString: String = s"Topic-$topicNum: ${termWeights.map{ case (t,w) =>(t,round(w))}.mkString("{"," ","}")}"
+ def toCsv = s"${termWeights.map{ case (term,w) => s"$term:${round(w)}"}.mkString(""," ","")}"
+}
+
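+/** Summary of one LDA/HDP run: test name, document count, (rounded) log-likelihood and log-perplexity, plus optional per-topic term weights. */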
+case class LDAMetrics(testName: String, nDocs: Int, var logLikelihood: Double, var logPerplexity: Double,
+ topicMetrics: Seq[TopicMetrics] = Seq.empty[TopicMetrics]) {
+
+ import TopicMetrics._
+
+ logLikelihood = round(logLikelihood,6)
+
+ logPerplexity = round(logPerplexity,6)
+
+}