@@ -29,23 +29,22 @@ import scala.collection.mutable
 import scala.reflect.runtime.universe._
 
 /**
- * Abstract class for parameter case classes.
- * This overrides the [[toString]] method to print all case class fields by name and value.
- *
- * @tparam T Concrete parameter class.
- */
+ * Abstract class for parameter case classes.
+ * This overrides the [[toString]] method to print all case class fields by name and value.
+ * @tparam T Concrete parameter class.
+ */
 abstract class AbstractParams[T: TypeTag] {
 
   private def tag: TypeTag[T] = typeTag[T]
 
   /**
-   * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
-   * {
-   *   [field name]:\t[field value]\n
-   *   [field name]:\t[field value]\n
-   *   ...
-   * }
-   */
+   * Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
+   * {
+   *   [field name]:\t[field value]\n
+   *   [field name]:\t[field value]\n
+   *   ...
+   * }
+   */
   override def toString: String = {
     val tpe = tag.tpe
     val allAccessors = tpe.declarations.collect {
@@ -63,44 +62,40 @@ abstract class AbstractParams[T: TypeTag] {
 }
 
 /**
- * An example Latent Dirichlet Allocation (LDA) app. Run with
- * {{{
- * ./bin/run-example mllib.LDAExample [options] <input>
- * }}}
- * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
- */
+ * An example Latent Dirichlet Allocation (LDA) app. Run with
+ * {{{
+ * ./bin/run-example mllib.LDAExample [options] <input>
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
 object LDAExample {
 
   private case class Params(
-      input: Seq[String] = Seq.empty,
-      master: String = "local",
-      k: Int = 20,
-      maxIterations: Int = 10,
-      maxInnerIterations: Int = 5,
-      docConcentration: Double = 0.01,
-      topicConcentration: Double = 0.01,
-      vocabSize: Int = 10000,
-      stopwordFile: String = "",
-      checkpointDir: Option[String] = None,
-      checkpointInterval: Int = 10,
-      optimizer: String = "online",
-      gibbsSampler: String = "alias",
-      gibbsAlphaAS: Double = 0.1,
-      gibbsPrintPerplexity: Boolean = false,
-      gibbsEdgePartitioner: String = "none",
-      partitions: Int = 2,
-      logLevel: String = "info",
-      psMasterAddr: String = null
-  ) extends AbstractParams[Params]
+      input: Seq[String] = Seq.empty,
+      k: Int = 20,
+      maxIterations: Int = 10,
+      maxInnerIterations: Int = 5,
+      docConcentration: Double = 0.01,
+      topicConcentration: Double = 0.01,
+      vocabSize: Int = 10000,
+      stopwordFile: String = "",
+      checkpointDir: Option[String] = None,
+      checkpointInterval: Int = 10,
+      optimizer: String = "em",
+      gibbsSampler: String = "alias",
+      gibbsAlphaAS: Double = 0.1,
+      gibbsPrintPerplexity: Boolean = false,
+      gibbsEdgePartitioner: String = "none",
+      partitions: Int = 2,
+      logLevel: String = "info",
+      psMasterAddr: String = null
+  ) extends AbstractParams[Params]
 
   def main(args: Array[String]) {
     val defaultParams = Params()
 
     val parser = new OptionParser[Params]("LDAExample") {
       head("LDAExample: an example LDA app for plain text data.")
-      opt[String]("master")
-        .text(s"spark master. default: ${defaultParams.master}")
-        .action((x, c) => c.copy(master = x))
       opt[Int]("k")
         .text(s"number of topics. default: ${defaultParams.k}")
         .action((x, c) => c.copy(k = x))
@@ -112,28 +107,28 @@ object LDAExample {
         .action((x, c) => c.copy(maxInnerIterations = x))
       opt[Double]("docConcentration")
         .text(s"amount of topic smoothing to use (> 1.0) (-1=auto). " +
-          s"default: ${defaultParams.docConcentration}")
+          s"default: ${defaultParams.docConcentration}")
         .action((x, c) => c.copy(docConcentration = x))
       opt[Double]("topicConcentration")
         .text(s"amount of term (word) smoothing to use (> 1.0) (-1=auto). " +
-          s"default: ${defaultParams.topicConcentration}")
+          s"default: ${defaultParams.topicConcentration}")
         .action((x, c) => c.copy(topicConcentration = x))
       opt[Int]("vocabSize")
         .text(s"number of distinct word types to use, chosen by frequency. (-1=all) " +
-          s"default: ${defaultParams.vocabSize}")
+          s"default: ${defaultParams.vocabSize}")
         .action((x, c) => c.copy(vocabSize = x))
       opt[String]("stopwordFile")
         .text(s"filepath for a list of stopwords. Note: This must fit on a single machine. " +
-          s"default: ${defaultParams.stopwordFile}")
+          s"default: ${defaultParams.stopwordFile}")
         .action((x, c) => c.copy(stopwordFile = x))
       opt[String]("checkpointDir")
         .text(s"Directory for checkpointing intermediate results. " +
-          s"Checkpointing helps with recovery and eliminates temporary shuffle files on disk. " +
-          s"default: ${defaultParams.checkpointDir}")
+          s"Checkpointing helps with recovery and eliminates temporary shuffle files on disk. " +
+          s"default: ${defaultParams.checkpointDir}")
         .action((x, c) => c.copy(checkpointDir = Some(x)))
       opt[Int]("checkpointInterval")
         .text(s"Iterations between each checkpoint. Only used if checkpointDir is set. " +
-          s"default: ${defaultParams.checkpointInterval}")
+          s"default: ${defaultParams.checkpointInterval}")
         .action((x, c) => c.copy(checkpointInterval = x))
       opt[String]("optimizer")
         .text(s"available optimizer are online and gibbs, default: ${defaultParams.optimizer}")
@@ -161,7 +156,7 @@ object LDAExample {
         .action((x, c) => c.copy(psMasterAddr = x))
       arg[String]("<input>...")
         .text("input paths (directories) to plain text corpora." +
-          " Each text file line should hold 1 document.")
+          " Each text file line should hold 1 document.")
         .unbounded()
         .required()
         .action((x, c) => c.copy(input = c.input :+ x))
@@ -175,11 +170,11 @@ object LDAExample {
     }
   }
 
-  // private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]): LDAOptimizer = {
-  private def createOptimizer(params: Params): LDAOptimizer = {
+  // private def createOptimizer(params: Params, lineRdd: RDD[Int], columnRdd: RDD[Int]): LDAOptimizer = {
+  private def createOptimizer(params: Params): LDAOptimizer = {
     params.optimizer match {
       case "online" => val optimizer = new OnlineLDAOptimizer
-        optimizer
+        optimizer
       case "gibbs" =>
         val optimizer = new GibbsLDAOptimizer
         optimizer.setSampler(params.gibbsSampler)
@@ -188,18 +183,16 @@ object LDAExample {
         optimizer.setAlphaAS(params.gibbsAlphaAS.toFloat)
         optimizer
       case _ =>
-        throw new IllegalArgumentException(s"available optimizers are online and gibbs, but got ${params.optimizer}")
+        throw new IllegalArgumentException(s"available optimizers are em, online and gibbs, but got ${params.optimizer}")
     }
   }
 
   /**
-   * run LDA
-   *
-   * @param params
-   */
+   * run LDA
+   * @param params
+   */
   private def run(params: Params) {
     val conf = new SparkConf().setAppName(s"LDAExample with $params")
-      .setMaster(params.master)
     val sc = new SparkContext(conf)
 
     val logLevel = Level.toLevel(params.logLevel, Level.INFO)
@@ -258,16 +251,15 @@ object LDAExample {
   }
 
   /**
-   * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
-   *
-   * @return (corpus, vocabulary as array, total token count in corpus)
-   */
+   * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
+   * @return (corpus, vocabulary as array, total token count in corpus)
+   */
   private def preprocess(
-      sc: SparkContext,
-      paths: Seq[String],
-      vocabSize: Int,
-      partitions: Int,
-      stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = {
+      sc: SparkContext,
+      paths: Seq[String],
+      vocabSize: Int,
+      partitions: Int,
+      stopwordFile: String): (RDD[(Long, Vector)], Long, Array[String], Long) = {
 
     // Get dataset of document texts
     // One document per line in each text file. If the input consists of many small files,
@@ -328,24 +320,21 @@ object LDAExample {
 }
 
 /**
- * Simple Tokenizer.
- *
- * TODO: Formalize the interface, and make this a public class in mllib.feature
- */
+ * Simple Tokenizer.
+ *
+ * TODO: Formalize the interface, and make this a public class in mllib.feature
+ */
 private class SimpleTokenizer(sc: SparkContext, stopwordFile: String) extends Serializable {
 
-  private val stopwords: Set[String] = Set("article", "write", "writes", "entry", "date", "udel", "said",
-    "tell", "think", "know", "just", "newsgroup", "line", "like", "does", "going", "make", "thanks", "year", "years") ++
-    (if (stopwordFile.isEmpty) {
-      Set.empty[String]
-    } else {
-      val stopwordText = sc.textFile(stopwordFile).collect()
-      stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
-    })
-
-  // Matches sequences of Unicode alnum letters
-  // private val allWordRegex = "^(\\p{L}*)$".r
-  private val allWordRegex = "(\\b[^\\s]+\\b)".r
+  private val stopwords: Set[String] = if (stopwordFile.isEmpty) {
+    Set.empty[String]
+  } else {
+    val stopwordText = sc.textFile(stopwordFile).collect()
+    stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
+  }
+
+  // Matches sequences of Unicode letters
+  private val allWordRegex = "^(\\p{L}*)$".r
 
   // Ignore words shorter than this length.
   private val minWordLength = 3
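
Note (not part of the patch): for readers wondering how the optimizer selected by createOptimizer above is actually consumed, here is a minimal sketch of wiring it into the RDD-based MLlib LDA API. The trainLDA helper and the corpus argument are hypothetical names introduced only for illustration; corpus stands for the (docId, termCounts) RDD that preprocess() returns.

import org.apache.spark.mllib.clustering.{LDA, LDAModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical helper showing where createOptimizer(params) plugs in.
def trainLDA(params: Params, corpus: RDD[(Long, Vector)]): LDAModel = {
  new LDA()
    .setK(params.k)
    .setMaxIterations(params.maxIterations)
    .setDocConcentration(params.docConcentration)
    .setTopicConcentration(params.topicConcentration)
    .setCheckpointInterval(params.checkpointInterval)
    .setOptimizer(createOptimizer(params)) // "online" or "gibbs", as matched above
    .run(corpus)
}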