From 3bbce8881e2a6ff7e7fff62cc1fc8cfaf1ce0334 Mon Sep 17 00:00:00 2001 From: sboesch Date: Thu, 21 Dec 2017 19:31:42 -0800 Subject: [PATCH] OnlineHDPExample is running. The statistics generated are suspect. --- pom.xml | 43 +++ src/main/resources/log4j.properties | 20 ++ .../mllib/topicModeling/LDAExample.scala | 155 +++++------ .../spark/mllib/topicModeling/OnlineHDP.scala | 83 +++--- .../topicModeling/OnlineHDPExample.scala | 256 ++++++++++++++++++ .../mllib/topicModeling/TopicMetrics.scala | 25 ++ 6 files changed, 458 insertions(+), 124 deletions(-) create mode 100644 src/main/resources/log4j.properties create mode 100644 src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDPExample.scala create mode 100644 src/main/scala/org/apache/spark/mllib/topicModeling/TopicMetrics.scala diff --git a/pom.xml b/pom.xml index 2c125a2..1b7aa3e 100644 --- a/pom.xml +++ b/pom.xml @@ -125,6 +125,49 @@ maven-compiler-plugin 3.3 + + org.apache.maven.plugins + maven-shade-plugin + 2.4 + + + package + + shade + + + + + + org.openchai.tensorflow.TfServer + 100.0 + + + + + + + + + + + *.* + + + log4j:log4j:jar: + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/ECLIPSE* + META-INF/license/* + + + + false + + + + diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..e329512 --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,20 @@ +# Log everything at WARN level and above to the console (stderr) +log4j.rootCategory=WARN, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +# Settings to quiet third party logs that are too verbose +log4j.logger.org.eclipse.jetty=WARN 
+log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR +log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO +log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO +log4j.logger.org.apache.spark=WARN +log4j.logger.parquet=ERROR +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.spark=WARN +log4j.logger.org.apache.spark.sql.execution.datasources.parquet=ERROR +log4j.logger.org.apache.spark.sql.execution.datasources.FileScanRDD=ERROR +log4j.logger.org.apache.hadoop.io.compress.CodecPool=ERROR diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala index cf92e05..b71d4fe 100644 --- a/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala +++ b/src/main/scala/org/apache/spark/mllib/topicModeling/LDAExample.scala @@ -29,23 +29,22 @@ import scala.collection.mutable import scala.reflect.runtime.universe._ /** - * Abstract class for parameter case classes. - * This overrides the [[toString]] method to print all case class fields by name and value. - * - * @tparam T Concrete parameter class. - */ + * Abstract class for parameter case classes. + * This overrides the [[toString]] method to print all case class fields by name and value. + * @tparam T Concrete parameter class. + */ abstract class AbstractParams[T: TypeTag] { private def tag: TypeTag[T] = typeTag[T] /** - * Finds all case class fields in concrete class instance, and outputs them in JSON-style format: - * { - * [field name]:\t[field value]\n - * [field name]:\t[field value]\n - * ... - * } - */ + * Finds all case class fields in concrete class instance, and outputs them in JSON-style format: + * { + * [field name]:\t[field value]\n + * [field name]:\t[field value]\n + * ... 
+ * } + */ override def toString: String = { val tpe = tag.tpe val allAccessors = tpe.declarations.collect { @@ -63,44 +62,40 @@ abstract class AbstractParams[T: TypeTag] { } /** - * An example Latent Dirichlet Allocation (LDA) app. Run with - * {{{ - * ./bin/run-example mllib.LDAExample [options] - * }}} - * If you use it as a template to create your own app, please use `spark-submit` to submit your app. - */ + * An example Latent Dirichlet Allocation (LDA) app. Run with + * {{{ + * ./bin/run-example mllib.LDAExample [options] + * }}} + * If you use it as a template to create your own app, please use `spark-submit` to submit your app. + */ object LDAExample { private case class Params( - input: Seq[String] = Seq.empty, - master: String = "local", - k: Int = 20, - maxIterations: Int = 10, - maxInnerIterations: Int = 5, - docConcentration: Double = 0.01, - topicConcentration: Double = 0.01, - vocabSize: Int = 10000, - stopwordFile: String = "", - checkpointDir: Option[String] = None, - checkpointInterval: Int = 10, - optimizer: String = "online", - gibbsSampler: String = "alias", - gibbsAlphaAS: Double = 0.1, - gibbsPrintPerplexity: Boolean = false, - gibbsEdgePartitioner: String = "none", - partitions: Int = 2, - logLevel: String = "info", - psMasterAddr: String = null - ) extends AbstractParams[Params] + input: Seq[String] = Seq.empty, + k: Int = 20, + maxIterations: Int = 10, + maxInnerIterations: Int = 5, + docConcentration: Double = 0.01, + topicConcentration: Double = 0.01, + vocabSize: Int = 10000, + stopwordFile: String = "", + checkpointDir: Option[String] = None, + checkpointInterval: Int = 10, + optimizer:String = "em", + gibbsSampler:String = "alias", + gibbsAlphaAS:Double = 0.1, + gibbsPrintPerplexity:Boolean = false, + gibbsEdgePartitioner:String = "none", + partitions:Int = 2, + logLevel:String = "info", + psMasterAddr:String = null + ) extends AbstractParams[Params] def main(args: Array[String]) { val defaultParams = Params() val parser = new 
OptionParser[Params]("LDAExample") { head("LDAExample: an example LDA app for plain text data.") - opt[String]("master") - .text(s"spark master. default: ${defaultParams.master}") - .action((x, c) => c.copy(master = x)) opt[Int]("k") .text(s"number of topics. default: ${defaultParams.k}") .action((x, c) => c.copy(k = x)) @@ -112,28 +107,28 @@ object LDAExample { .action((x, c) => c.copy(maxInnerIterations = x)) opt[Double]("docConcentration") .text(s"amount of topic smoothing to use (> 1.0) (-1=auto)." + - s" default: ${defaultParams.docConcentration}") + s" default: ${defaultParams.docConcentration}") .action((x, c) => c.copy(docConcentration = x)) opt[Double]("topicConcentration") .text(s"amount of term (word) smoothing to use (> 1.0) (-1=auto)." + - s" default: ${defaultParams.topicConcentration}") + s" default: ${defaultParams.topicConcentration}") .action((x, c) => c.copy(topicConcentration = x)) opt[Int]("vocabSize") .text(s"number of distinct word types to use, chosen by frequency. (-1=all)" + - s" default: ${defaultParams.vocabSize}") + s" default: ${defaultParams.vocabSize}") .action((x, c) => c.copy(vocabSize = x)) opt[String]("stopwordFile") .text(s"filepath for a list of stopwords. Note: This must fit on a single machine." + - s" default: ${defaultParams.stopwordFile}") + s" default: ${defaultParams.stopwordFile}") .action((x, c) => c.copy(stopwordFile = x)) opt[String]("checkpointDir") .text(s"Directory for checkpointing intermediate results." + - s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." + - s" default: ${defaultParams.checkpointDir}") + s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." + + s" default: ${defaultParams.checkpointDir}") .action((x, c) => c.copy(checkpointDir = Some(x))) opt[Int]("checkpointInterval") .text(s"Iterations between each checkpoint. Only used if checkpointDir is set." 
+ - s" default: ${defaultParams.checkpointInterval}") + s" default: ${defaultParams.checkpointInterval}") .action((x, c) => c.copy(checkpointInterval = x)) opt[String]("optimizer") .text(s"available optimizer are online and gibbs, default: ${defaultParams.optimizer}") @@ -161,7 +156,7 @@ object LDAExample { .action((x, c) => c.copy(psMasterAddr = x)) arg[String]("...") .text("input paths (directories) to plain text corpora." + - " Each text file line should hold 1 document.") + " Each text file line should hold 1 document.") .unbounded() .required() .action((x, c) => c.copy(input = c.input :+ x)) @@ -175,11 +170,11 @@ object LDAExample { } } - // private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]):LDAOptimizer = { - private def createOptimizer(params: Params): LDAOptimizer = { +// private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]):LDAOptimizer = { + private def createOptimizer(params: Params):LDAOptimizer = { params.optimizer match { case "online" => val optimizer = new OnlineLDAOptimizer - optimizer + optimizer case "gibbs" => val optimizer = new GibbsLDAOptimizer optimizer.setSampler(params.gibbsSampler) @@ -188,18 +183,16 @@ object LDAExample { optimizer.setAlphaAS(params.gibbsAlphaAS.toFloat) optimizer case _ => - throw new IllegalArgumentException(s"available optimizers are online and gibbs, but got ${params.optimizer}") + throw new IllegalArgumentException(s"available optimizers are em, online and gibbs, but got ${params.optimizer}") } } /** - * run LDA - * - * @param params - */ + * run LDA + * @param params + */ private def run(params: Params) { val conf = new SparkConf().setAppName(s"LDAExample with $params") - .setMaster(params.master) val sc = new SparkContext(conf) val logLevel = Level.toLevel(params.logLevel, Level.INFO) @@ -258,16 +251,15 @@ object LDAExample { } /** - * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors. 
- * - * @return (corpus, vocabulary as array, total token count in corpus) - */ + * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors. + * @return (corpus, vocabulary as array, total token count in corpus) + */ private def preprocess( - sc: SparkContext, - paths: Seq[String], - vocabSize: Int, - partitions: Int, - stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = { + sc: SparkContext, + paths: Seq[String], + vocabSize: Int, + partitions:Int, + stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = { // Get dataset of document texts // One document per line in each text file. If the input consists of many small files, @@ -328,24 +320,21 @@ object LDAExample { } /** - * Simple Tokenizer. - * - * TODO: Formalize the interface, and make this a public class in mllib.feature - */ + * Simple Tokenizer. + * + * TODO: Formalize the interface, and make this a public class in mllib.feature + */ private class SimpleTokenizer(sc: SparkContext, stopwordFile: String) extends Serializable { - private val stopwords: Set[String] = Set("article", "write", "writes", "entry", "date", "udel", "said", - "tell", "think", "know", "just", "newsgroup", "line", "like", "does", "going", "make", "thanks","year","years") ++ - (if (stopwordFile.isEmpty) { - Set.empty[String] - } else { - val stopwordText = sc.textFile(stopwordFile).collect() - stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet - }) - - // Matches sequences of Unicode alnum letters - // private val allWordRegex = "^(\\p{L}*)$".r - private val allWordRegex = "(\\b[^\\s]+\\b)".r + private val stopwords: Set[String] = if (stopwordFile.isEmpty) { + Set.empty[String] + } else { + val stopwordText = sc.textFile(stopwordFile).collect() + stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet + } + + // Matches sequences of Unicode letters + private val allWordRegex = "^(\\p{L}*)$".r // Ignore words shorter than this length. 
private val minWordLength = 3 diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala index 54cb1ea..adadce9 100644 --- a/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala +++ b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala @@ -3,6 +3,7 @@ package org.apache.spark.mllib.topicModeling import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Vector => BV, _} import breeze.numerics.{abs, digamma, exp, _} import org.apache.spark.mllib.linalg.{SparseVector, Vector} +import org.apache.spark.mllib.topicModeling.OnlineHDPExample.OnlineHDPParams import org.apache.spark.rdd.RDD import scala.collection.mutable.ArrayBuffer @@ -20,7 +21,7 @@ class SuffStats( } } -object OnlineHDPOptimizer extends Serializable { +object OnlineHDP extends Serializable { val rhot_bound = 0.0 def log_normalize(v: BDV[Double]): (BDV[Double], Double) = { @@ -82,21 +83,20 @@ object OnlineHDPOptimizer extends Serializable { * Implemented based on the paper "Online Variational Inference for the Hierarchical Dirichlet Process" (Chong Wang, John Paisley and David M. 
Blei) */ -class OnlineHDPOptimizer( - val corpus: RDD[(Long, Vector)], - val chunkSize: Int = 256, - val m_kappa: Double = 1.0, - var m_tau: Double = 64.0, - val m_K: Int = 15, - val m_T: Int = 150, - val m_alpha: Double = 1, - val m_gamma: Double = 1, - val m_eta: Double = 0.01, - val m_scale: Double = 1.0, - val m_var_converge: Double = 0.0001, - val iterations: Int = 10 - ) extends Serializable { - +class OnlineHDP(corpus: RDD[(Long,Vector)], hdpParams: OnlineHDPParams) extends Serializable { + + val op = hdpParams + val chunkSize: Int = op.chunkSize + val m_kappa: Double = op.kappa + val tau: Double = op.tau + val m_K: Int = op.k + val m_T: Int = op.vocabSize + val m_alpha: Double = op.alpha + val m_gamma: Double = op.gamma + val m_eta: Double = op.eta + val m_scale: Double = op.scale + val m_var_converge: Double = op.var_converge + val iterations: Int = op.maxIterations val lda_alpha: Double = 1D val lda_beta: Double = 1D @@ -109,12 +109,12 @@ class OnlineHDPOptimizer( var m_varphi_ss: BDV[Double] = BDV.zeros[Double](m_T) // T // T * W - val m_lambda: BDM[Double] = BDM.rand(m_T, m_W) * (m_D.toDouble) * 100.0 / (m_T * m_W).toDouble - m_eta + val m_lambda: BDM[Double] = BDM.rand(m_T, m_W) * (m_D * 100.0 / (m_T * m_W).toDouble - m_eta) // T * W - val m_Elogbeta = OnlineHDPOptimizer.dirichletExpectation(m_lambda + m_eta) + val m_Elogbeta = OnlineHDP.dirichletExpectation(m_lambda + m_eta) - m_tau = m_tau + 1 + var m_tau = tau + 1 var m_updatect = 0 var m_status_up_to_date = true @@ -124,16 +124,15 @@ class OnlineHDPOptimizer( val rhot_bound = 0.0 - - def update(docs: RDD[(Long, Vector)]): Unit = { - for (i <- 1 to iterations) { + def update(docs: RDD[(Long, Vector)]): Seq[LDAMetrics] = { + for (i <- 1 to iterations) yield { val chunk = docs update_chunk(chunk) } } - def update_chunk(chunk: RDD[(Long, Vector)], update: Boolean = true): (Double, Int) = { + def update_chunk(chunk: RDD[(Long, Vector)], update: Boolean = true): LDAMetrics = { // Find the unique words in 
this chunk... val unique_words = scala.collection.mutable.Map[Int, Int]() val raw_word_list = ArrayBuffer[Int]() @@ -174,7 +173,7 @@ class OnlineHDPOptimizer( val ss = new SuffStats(m_T, Wt, chunk.count().toInt) - val Elogsticks_1st: BDV[Double] = OnlineHDPOptimizer.expect_log_sticks(m_var_sticks) // global sticks + val Elogsticks_1st: BDV[Double] = OnlineHDP.expect_log_sticks(m_var_sticks) // global sticks // run variational inference on some new docs var score = 0.0 @@ -186,7 +185,7 @@ class OnlineHDPOptimizer( val dict = unique_words.toMap val wl = doc_word_ids.toList - val doc_score = doc_e_step(doc, ss, Elogsticks_1st, + val doc_score = doc_e_step(doc, hdpParams, ss, Elogsticks_1st, word_list, dict, wl, new BDV[Double](doc_word_counts), m_var_converge) count += sum(doc_word_counts) @@ -197,7 +196,8 @@ class OnlineHDPOptimizer( update_lambda(ss, word_list) } - (score, count.toInt) + val perplexity = math.exp(-1.0 * (score / count)) + LDAMetrics("OnlineHDP", count.toInt, score, perplexity) } @@ -223,13 +223,14 @@ class OnlineHDPOptimizer( // update top level sticks // 2 * T - 1 m_var_sticks(0, ::) := (m_varphi_ss(0 to m_T - 2) + 1.0).t - val var_phi_sum = flipud(m_varphi_ss(1 to m_varphi_ss.length - 1)) // T - 1 + val var_phi_sum = flipud(m_varphi_ss(1 until m_varphi_ss.length)) // T - 1 m_var_sticks(1, ::) := (flipud(accumulate(var_phi_sum)) + m_gamma).t } def doc_e_step(doc: (Long, Vector), + hdpParams: OnlineHDPParams, ss: SuffStats, Elogsticks_1st: BDV[Double], word_list: List[Int], @@ -246,7 +247,7 @@ class OnlineHDPOptimizer( v(0, ::) := 1.0 v(1, ::) := m_alpha - var Elogsticks_2nd = OnlineHDPOptimizer.expect_log_sticks(v) + var Elogsticks_2nd = OnlineHDP.expect_log_sticks(v) // back to the uniform var phi: BDM[Double] = BDM.ones[Double](doc_word_ids.size, m_K) * 1.0 / m_K.toDouble // Wt * K @@ -257,28 +258,27 @@ class OnlineHDPOptimizer( val eps = 1e-100 var iter = 0 - val max_iter = 100 var var_phi_out: BDM[Double] = BDM.ones[Double](1, 1) // not yet 
support second level optimization yet, to be done in the future - while (iter < max_iter && (converge < 0.0 || converge > var_converge)) { + while (iter < hdpParams.maxIterations && (converge < 0.0 || converge > var_converge)) { // var_phi val (log_var_phi: BDM[Double], var_phi: BDM[Double]) = if (iter < 3) { val element = Elogbeta_doc.copy // T * Wt - for (i <- 0 to element.rows - 1) { + for (i <- 0 until element.rows) { element(i, ::) := (element(i, ::).t :* doc_word_counts).t } var var_phi: BDM[Double] = phi.t * element.t // K * Wt * Wt * T => K * T - val (log_var_phi, log_norm) = OnlineHDPOptimizer.log_normalize(var_phi) + val (log_var_phi, log_norm) = OnlineHDP.log_normalize(var_phi) var_phi = exp(log_var_phi) (log_var_phi, var_phi) } else { val element = Elogbeta_doc.copy - for (i <- 0 to element.rows - 1) { + for (i <- 0 until element.rows) { element(i, ::) := (element(i, ::).t :* doc_word_counts).t } val product: BDM[Double] = phi.t * element.t @@ -287,7 +287,7 @@ class OnlineHDPOptimizer( } var var_phi: BDM[Double] = product - val (log_var_phi, log_norm) = OnlineHDPOptimizer.log_normalize(var_phi) + val (log_var_phi, log_norm) = OnlineHDP.log_normalize(var_phi) var_phi = exp(log_var_phi) (log_var_phi, var_phi) } @@ -296,7 +296,7 @@ class OnlineHDPOptimizer( // phi if (iter < 3) { phi = (var_phi * Elogbeta_doc).t - val (log_phi, log_norm) = OnlineHDPOptimizer.log_normalize(phi) + val (log_phi, log_norm) = OnlineHDP.log_normalize(phi) phi = exp(log_phi) (log_phi, log_norm) } @@ -307,7 +307,7 @@ class OnlineHDPOptimizer( product(i, ::) := (product(i, ::).t + Elogsticks_2nd).t } phi = product - val (log_phi, log_norm) = OnlineHDPOptimizer.log_normalize(phi) + val (log_phi, log_norm) = OnlineHDP.log_normalize(phi) phi = exp(log_phi) (log_phi, log_norm) } @@ -324,14 +324,14 @@ class OnlineHDPOptimizer( val t_sum = sum(selected(::, *)).inner // toDenseVector val phi_cum = flipud(t_sum) v(1, ::) := (flipud(accumulate(phi_cum)) + m_alpha).t - Elogsticks_2nd = 
OnlineHDPOptimizer.expect_log_sticks(v) + Elogsticks_2nd = OnlineHDP.expect_log_sticks(v) likelihood = 0.0 // compute likelihood // var_phi part/ C in john's notation val diff = log_var_phi.copy - for (i <- 0 to diff.rows - 1) { + for (i <- 0 until diff.rows) { diff(i, ::) := (Elogsticks_1st :- diff(i, ::).t).t } @@ -340,7 +340,7 @@ class OnlineHDPOptimizer( // v part/ v in john's notation, john's beta is alpha here val log_alpha = log(m_alpha) likelihood += (m_K - 1) * log_alpha - val dig_sum = (digamma(sum(v(::, *)))).inner // .toDenseVector + val dig_sum = digamma(sum(v(::, *))).inner // .toDenseVector val vCopy = v.copy for (i <- 0 until v.cols) { vCopy(::, i) := BDV[Double](1.0, m_alpha) - vCopy(::, i) @@ -368,6 +368,7 @@ class OnlineHDPOptimizer( } likelihood += sum(phi.t :* (var_phi * Elogbeta_docCopy)) + println(s"Likelihood = $likelihood") val converge = (likelihood - old_likelihood) / abs(old_likelihood) old_likelihood = likelihood @@ -382,7 +383,7 @@ class OnlineHDPOptimizer( // update the suff_stat ss // this time it only contains information from one doc val sumPhiOut = sum(var_phi_out(::, *)) - ss.m_var_sticks_ss += sumPhiOut.inner // .toDenseVector + ss.m_var_sticks_ss += sumPhiOut.inner val phiCopy = phi.copy.t for (i <- 0 until phi.rows) { @@ -394,7 +395,7 @@ class OnlineHDPOptimizer( ss.m_var_beta_ss(::, chunkids(i)) := ss.m_var_beta_ss(::, chunkids(i)) + middleResult(::, i) } - return likelihood + likelihood } } \ No newline at end of file diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDPExample.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDPExample.scala new file mode 100644 index 0000000..dea0896 --- /dev/null +++ b/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDPExample.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.topicModeling + +import java.text.BreakIterator + +import org.apache.log4j.{Level, Logger} +import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.{SparkConf, SparkContext} +import scopt.OptionParser + +import scala.collection.mutable +import scala.reflect.runtime.universe._ + + +/** + * An example Online Hierarchical Dirichlet Process (HDP) app. 
Run with + * {{{ + * ./bin/run-example mllib.OnlineHDPExample [options] + * }}} + */ +object OnlineHDPExample { + + case class OnlineHDPParams( + input: Seq[String] = Seq.empty, + master: String = "local", + k: Int = 20, + vocabSize: Int = 10000, + chunkSize: Int = 256, + kappa: Double = 1.0, + tau: Double = 64.0, + alpha: Double = 1, + gamma: Double = 1, + eta: Double = 0.01, + scale: Double = 1.0, + var_converge: Double = 0.0001, + maxIterations: Int = 10, + checkpointDir: Option[String] = None, + checkpointInterval: Int = 10, + maxDocs: Int = -1, + partitions: Int = 2, + stopwordFile: String = "", + logLevel: String = "info" + ) extends AbstractParams[OnlineHDPParams] { + + } + + def main(args: Array[String]) = { + parseAndRun(args) + } + + def parseAndRun(args: Array[String]) = { + + val parser = getParser(args) + val results = parser.parse(args, OnlineHDPParams()).map { params=> + run(params) + }.getOrElse { + parser.showUsageAsError + sys.exit(1) + } + results + } + + def getParser(args: Array[String]) = { + val defaultParams = OnlineHDPParams() + + val parser = new OptionParser[OnlineHDPParams]("LDAExample") { + head("LDAExample: an example LDA app for plain text data.") + opt[String]("master") + .text(s"spark master. default: ${defaultParams.master}") + .action((x, c) => c.copy(master = x)) + opt[Int]("k") + .text(s"number of topics. default: ${defaultParams.k}") + .action((x, c) => c.copy(k = x)) + opt[Int]("maxDocs") + .text(s"max number of documents: will truncate the corpus if needbe. default: ${defaultParams.maxDocs}") + .action((x, c) => c.copy(maxDocs = x)) + opt[Int]("maxIterations") + .text(s"number of iterations of learning. default: ${defaultParams.maxIterations}") + .action((x, c) => c.copy(maxIterations = x)) + opt[Int]("vocabSize") + .text(s"number of distinct word types to use, chosen by frequency. 
(-1=all)" + + s" default: ${defaultParams.vocabSize}") + .action((x, c) => c.copy(vocabSize = x)) + opt[String]("stopwordFile") + .text(s"filepath for a list of stopwords. Note: This must fit on a single machine." + + s" default: ${defaultParams.stopwordFile}") + .action((x, c) => c.copy(stopwordFile = x)) + opt[String]("checkpointDir") + .text(s"Directory for checkpointing intermediate results." + + s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." + + s" default: ${defaultParams.checkpointDir}") + .action((x, c) => c.copy(checkpointDir = Some(x))) + opt[Int]("checkpointInterval") + .text(s"Iterations between each checkpoint. Only used if checkpointDir is set." + + s" default: ${defaultParams.checkpointInterval}") + .action((x, c) => c.copy(checkpointInterval = x)) + opt[Int]("partitions") + .text(s"Minimum edge partitions, default: ${defaultParams.partitions}") + .action((x, c) => c.copy(partitions = x)) + opt[String]("logLevel") + .text(s"Log level, default: ${defaultParams.logLevel}") + .action((x, c) => c.copy(logLevel = x)) + arg[String]("...") + .text("input paths (directories) to plain text corpora." + + " Each text file line should hold 1 document.") + .unbounded() + .required() + .action((x, c) => c.copy(input = c.input :+ x)) + } + parser + } + + def run(params: OnlineHDPParams) { + + val conf = new SparkConf().setAppName(s"LDAExample with $params") + .setMaster(params.master) + val sc = new SparkContext(conf) + + val logLevel = Level.toLevel(params.logLevel, Level.INFO) + Logger.getRootLogger.setLevel(logLevel) + println(s"Setting log level to $logLevel") + + // Load documents, and prepare them for LDA. 
+ val preprocessStart = System.nanoTime() + val (corpus, actualCorpusSize, vocabArray, actualNumTokens) = + preprocess(sc, params.input, params.vocabSize, params.partitions, params.stopwordFile, params.maxDocs) + val actualVocabSize = vocabArray.size + val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9 + + println() + println(s"Corpus summary:") + println(s"\t Training set size: $actualCorpusSize documents") + println(s"\t Vocabulary size: $actualVocabSize terms") + println(s"\t Training set size: $actualNumTokens tokens") + println(s"\t Preprocessing time: $preprocessElapsed sec") + println() + + + if (params.checkpointDir.nonEmpty) { + sc.setCheckpointDir(params.checkpointDir.get) + } + + val startTime = System.nanoTime() + val lda = new OnlineHDP(corpus, params) + val results = lda.update(corpus) // Run one honkin big chunk for this example + val elapsed = (System.nanoTime() - startTime) / 1e9 + + println(s"Finished training ${getClass.getSimpleName}") + println(s"Results\n${results.mkString(",")}") + sc.stop() + results + } + + /** + * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors. + * + * @return (corpus, number of documents in corpus, vocabulary as array, total token count in corpus) + */ + private def preprocess( + sc: SparkContext, + paths: Seq[String], + vocabSize: Int, + partitions: Int, + stopwordFile: String, + maxDocs: Int): (RDD[(Long, Vector)], Long, Array[String], Long) = { + + // Get dataset of document texts + // One document per line in each text file. If the input consists of many small files, + // this can result in a large number of small partitions, which can degrade performance. + // In this case, consider using coalesce() to create fewer, larger partitions. 
+ println(s"Loading corpus from ${paths.mkString(",")} ..") + val textRDD1: RDD[String] = sc.textFile(paths.mkString(","), Math.max(1, partitions)) + + val textRDD2 = if (maxDocs == -1) { + println("Processing all documents in the corpus ..") + textRDD1 + } else { + val nDocs = textRDD1.count + if (nDocs > maxDocs) { + val sampleFrac = maxDocs.toDouble / nDocs + println(s"Processing only $maxDocs (out of $nDocs) - ${((sampleFrac*1000).toInt).toDouble/10.0}% ..") + textRDD1.sample(withReplacement = false, sampleFrac, 1) + } else { + textRDD1 + } + } + val textRDD = textRDD2.coalesce(partitions) + + // Split text into words + val tokenizer = new SimpleTokenizer(sc, stopwordFile) + val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) => + id -> tokenizer.getWords(text) + } + tokenized.cache() + + // Counts words: RDD[(word, wordCount)] + val wordCounts: RDD[(String, Long)] = tokenized + .flatMap { case (_, tokens) => tokens.map(_ -> 1L) } + .reduceByKey(_ + _) + wordCounts.cache() + val fullVocabSize = wordCounts.count() + // Select vocab + // (vocab: Map[word -> id], total tokens after selecting vocab) + val (vocab: Map[String, Int], selectedTokenCount: Long) = { + val tmpSortedWC: Array[(String, Long)] = if (vocabSize == -1 || fullVocabSize <= vocabSize) { + // Use all terms + wordCounts.collect().sortBy(-_._2) + } else { + // Sort terms to select vocab + wordCounts.sortBy(_._2, ascending = false).take(vocabSize) + } + (tmpSortedWC.map(_._1).zipWithIndex.toMap, tmpSortedWC.map(_._2).sum) + } + + val documents = tokenized.map { case (id, tokens) => + // Filter tokens by vocabulary, and create word count vector representation of document. 
+ val wc = new mutable.HashMap[Int, Int]() + tokens.foreach { term => + if (vocab.contains(term)) { + val termIndex = vocab(term) + wc(termIndex) = wc.getOrElse(termIndex, 0) + 1 + } + } + val indices = wc.keys.toArray.sorted + val values = indices.map(i => wc(i).toDouble) + + val sb = Vectors.sparse(vocab.size, indices, values) + (id, sb) + }.filter(_._2.asInstanceOf[SparseVector].values.length > 0).cache + val corpusSize = documents.count + tokenized.unpersist(false) + + val vocabArray = new Array[String](vocab.size) + vocab.foreach { case (term, i) => vocabArray(i) = term } + + (documents, corpusSize, vocabArray, selectedTokenCount) + } +} + diff --git a/src/main/scala/org/apache/spark/mllib/topicModeling/TopicMetrics.scala b/src/main/scala/org/apache/spark/mllib/topicModeling/TopicMetrics.scala new file mode 100644 index 0000000..68fd1e3 --- /dev/null +++ b/src/main/scala/org/apache/spark/mllib/topicModeling/TopicMetrics.scala @@ -0,0 +1,25 @@ +package org.apache.spark.mllib.topicModeling + +object TopicMetrics { + + def round(d: Double, precis: Int = 6): Double = (d * math.pow(10, precis)).toInt / math.pow(10, precis) +} + +case class TopicMetrics(topicNum: Int, termWeights: Seq[(String, Double)]) { + + import TopicMetrics._ + + override def toString: String = s"Topic-$topicNum: ${termWeights.map{ case (t,w) =>(t,round(w))}.mkString("{"," ","}")}" + def toCsv = s"${termWeights.map{ case (term,w) => s"$term:${round(w)}"}.mkString(""," ","")}" +} + +case class LDAMetrics(testName: String, nDocs: Int, var logLikelihood: Double, var logPerplexity: Double, + topicMetrics: Seq[TopicMetrics] = Seq.empty[TopicMetrics]) { + + import TopicMetrics._ + + logLikelihood = round(logLikelihood,6) + + logPerplexity = round(logPerplexity,6) + +}