
Commit

removed unnecessary code and dependencies
nestorSag committed Jul 25, 2018
1 parent 9abbbf9 commit bba4eff
Showing 4 changed files with 11 additions and 136 deletions.
6 changes: 3 additions & 3 deletions src/main/scala/models/mixture/GradientGaussianMixture.scala
@@ -3,7 +3,7 @@ package com.github.gradientgmm.models
import com.github.gradientgmm.components.{UpdatableGaussianComponent, UpdatableWeights, Utils}
import com.github.gradientgmm.optim.algorithms.{Optimizable, Optimizer, GradientAscent}

-import breeze.linalg.{diag, eigSym, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, trace, sum}
+import breeze.linalg.{diag, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, trace, sum}
import breeze.numerics.sqrt

import org.apache.spark.rdd.RDD
@@ -215,7 +215,7 @@ class GradientGaussianMixture private[models] (
* Create a StreamingGaussianMixture object using the model's current state
*/
-def streamingModel: StreamingGaussianMixture = {
+def asStreamingModel: StreamingGaussianMixture = {
StreamingGaussianMixture(weights.weights,gaussians,optim)
}

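For reference, a minimal sketch of the renamed conversion in use. Everything here except asStreamingModel and the two class names is hypothetical:

// hypothetical: a GradientGaussianMixture fitted beforehand on a batch RDD
val batchModel: GradientGaussianMixture = ???
// convert the fitted batch model into its streaming counterpart
val streamingModel: StreamingGaussianMixture = batchModel.asStreamingModel
// streamingModel can now consume DStreams via its step and predict methods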
@@ -299,7 +299,7 @@ object GradientGaussianMixture{

val sc = data.sparkContext
val d = data.take(1)(0).size //get data dimensionality
-val n = math.min(dataSize,pointsPerCl*k).toInt
+val n = math.min(dataSize,pointsPerCl*k).toInt //in case the data has too few points
var samples = sc.parallelize(data.takeSample(withReplacement = false, n, seed))

//create kmeans model
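The hunk ends at the //create kmeans model comment; the construction that follows in the file presumably mirrors the K-means code deleted from StreamingGaussianMixture below. As a standalone sketch (the names nIters, k, seed and samples are taken from that deleted code):

import org.apache.spark.mllib.clustering.KMeans

// fit K-means on the sampled points to seed the mixture's means
val kmeansModel = new KMeans()
  .setMaxIterations(nIters)
  .setK(k)
  .setSeed(seed)
  .run(samples)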
137 changes: 6 additions & 131 deletions src/main/scala/models/mixture/StreamingGaussianMixture.scala
@@ -3,17 +3,12 @@ package com.github.gradientgmm.models
import com.github.gradientgmm.components.{UpdatableGaussianComponent, UpdatableWeights, Utils}
import com.github.gradientgmm.optim.algorithms.{Optimizable, Optimizer, GradientAscent}

-import breeze.linalg.{diag, eigSym, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, trace, sum}
-import breeze.numerics.sqrt

-import org.apache.spark.mllib.linalg.{Matrix => SM, Vector => SV, Vectors => SVS, Matrices => SMS}
-import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
+import org.apache.spark.mllib.linalg.{Vector => SV}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.rdd.RDD

-
-import org.apache.log4j.Logger

/**
* Gradient-based Gaussian Mixture model for streaming data
* See ''Hosseini, Reshad & Sra, Suvrit. (2017). An Alternative to EM for Gaussian Mixture Models: Batch and Stochastic Riemannian Optimization''
@@ -34,9 +29,9 @@ class StreamingGaussianMixture private[models] (
*/
def step(data: DStream[SV]) {
-data.foreachRDD { (rdd, time) =>
-step(rdd)
-}
+    data.foreachRDD { (rdd, time) =>
+      step(rdd)
+    }
}

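As a usage sketch, step can be fed from any DStream of Spark vectors. The stream setup below is hypothetical (a socket source of whitespace-separated doubles); only model.step comes from the diff:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.mllib.linalg.Vectors

val conf = new SparkConf().setAppName("streaming-gmm-example")
val ssc = new StreamingContext(conf, Seconds(5))
val vectors = ssc.socketTextStream("localhost", 9999)
  .map(line => Vectors.dense(line.split(" ").map(_.toDouble)))

model.step(vectors) // update the mixture on every incoming RDD
ssc.start()
ssc.awaitTermination()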
/**
@@ -49,7 +44,7 @@
data.foreachRDD { (rdd, time) =>
predict(rdd)
}
-}
+  }

/**
* Soft cluster membership prediction for streaming data
@@ -69,10 +64,9 @@ class StreamingGaussianMixture private[models] (

object StreamingGaussianMixture{
/**
-* Creates a new StreamingGaussianMixture instance
+* Creates a new StreamingGaussianMixture instance with a GradientAscent optimizer
* @param weights Array of weights
* @param gaussians Array of mixture components
-* @param optim Optimizer object
*/
def apply(
@@ -103,123 +97,4 @@ object StreamingGaussianMixture{
optim)
}

-  /**
-  * Creates a new StreamingGaussianMixture instance initialized with the
-  * results of a K-means model fitted with a sample of the data
-  * @param data training data in the form of an RDD of Spark vectors
-  * @param optim Optimizer object
-  * @param k Number of components in the mixture
-  * @param nSamples Number of data points to train the K-means model
-  * @param nIters Number of iterations allowed for the K-means model
-  * @param seed random seed
-  */
-  def initialize(
-    data: RDD[SV],
-    optim: Optimizer,
-    k: Int,
-    nSamples: Int,
-    nIters: Int,
-    seed: Long = 0): StreamingGaussianMixture = {
-
-    val sc = data.sparkContext
-    val d = data.take(1)(0).size
-    val n = math.max(nSamples,2*k)
-    var samples = sc.parallelize(data.takeSample(withReplacement = false, n, seed))
-
-    //create kmeans model
-    val kmeansModel = new KMeans()
-      .setMaxIterations(nIters)
-      .setK(k)
-      .setSeed(seed)
-      .run(samples)
-
-    val means = kmeansModel.clusterCenters.map{case v => Utils.toBDV(v.toArray)}
-
-    //add means to sample points to avoid having cluster with zero points
-    samples = samples.union(sc.parallelize(means.map{case v => SVS.dense(v.toArray)}))
-
-    // broadcast values to compute sample covariance matrices
-    val kmm = sc.broadcast(kmeansModel)
-    val scMeans = sc.broadcast(means)
-
-    // get empirical cluster proportions to initialize the mixture/s weights
-    //add 1 to counts to avoid division by zero
-    val proportions = samples
-      .map{case s => (kmm.value.predict(s),1)}
-      .reduceByKey(_ + _)
-      .sortByKey()
-      .collect()
-      .map{case (k,p) => p.toDouble}
-
-    val scProportions = sc.broadcast(proportions)
-
-    //get empirical covariance matrices
-    //also add a rescaled identity matrix to avoid starting with singular matrices
-    val pseudoCov = samples
-      .map{case v => {
-        val prediction = kmm.value.predict(v)
-        val denom = math.sqrt(scProportions.value(prediction))
-        (prediction,(Utils.toBDV(v.toArray)-scMeans.value(prediction))/denom) }} // x => (x-mean)
-      .map{case (k,v) => (k,v*v.t)}
-      .reduceByKey(_ + _)
-      .map{case (k,v) => {
-        val avgVariance = math.max(1e-4,trace(v))/d
-        (k,v + BDM.eye[Double](d) * avgVariance)
-      }}
-      .sortByKey()
-      .collect()
-      .map{case (k,m) => m}
-
-    new StreamingGaussianMixture(
-      new UpdatableWeights(proportions.map{case p => p/(n+k)}),
-      (0 to k-1).map{case i => UpdatableGaussianComponent(means(i),pseudoCov(i))}.toArray,
-      optim)
-
-  }
-
-  /**
-  * Fit a Gaussian Mixture Model (see [[https://en.wikipedia.org/wiki/Mixture_model#Gaussian_mixture_model]]).
-  * The model is initialized using a K-means algorithm over a small sample and then
-  * fitting the resulting parameters to the data using this {GMMOptimization} object
-  * @param data Data to fit the model
-  * @param optim Optimization algorithm
-  * @param k Number of mixture components (clusters)
-  * @param batchSize number of samples processed per iteration
-  * @param maxIter maximum number of gradient ascent steps allowed
-  * @param convTol log-likelihood change tolerance for stopping criteria
-  * @param startingSampleSize Sample size for the K-means algorithm
-  * @param kMeansIters Number of iterations allowed for the K-means algorithm
-  * @param seed Random seed
-  * @return Fitted model
-  */
-  def fit(
-    data: RDD[SV],
-    optim: Optimizer = new GradientAscent(),
-    k: Int = 2,
-    batchSize: Option[Int] = None,
-    maxIter: Int = 100,
-    convTol: Double = 1e-6,
-    startingSampleSize: Int = 50,
-    kMeansIters: Int = 20,
-    seed: Int = 0): StreamingGaussianMixture = {
-
-    val model = initialize(
-      data,
-      optim,
-      k,
-      startingSampleSize,
-      kMeansIters,
-      seed)
-
-    if(batchSize.isDefined){
-      model.setBatchSize(batchSize.get)
-    }
-
-    model
-      .setMaxIter(maxIter)
-      .setConvergenceTol(convTol)
-      .step(data)
-
-    model
-  }
}
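The deleted initializer regularizes each sample covariance by adding a rescaled identity so that no starting matrix is singular. That step is self-contained enough to sketch on its own with Breeze; the function name regularize is hypothetical:

import breeze.linalg.{trace, DenseMatrix => BDM}

// add a rescaled identity matrix to a d x d sample covariance
// so the mixture never starts from a singular matrix
def regularize(cov: BDM[Double], d: Int): BDM[Double] = {
  val avgVariance = math.max(1e-4, trace(cov)) / d
  cov + BDM.eye[Double](d) * avgVariance
}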
@@ -2,7 +2,7 @@ package com.github.gradientgmm.models

import com.github.gradientgmm.components.{UpdatableGaussianComponent, UpdatableWeights, Utils}

-import breeze.linalg.{diag, eigSym, max, DenseMatrix => BDM, DenseVector => BDV, Vector => BV}
+import breeze.linalg.{diag, max, DenseMatrix => BDM, DenseVector => BDV, Vector => BV}

import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD
@@ -1,6 +1,6 @@
package com.github.gradientgmm.optim.algorithms

-import breeze.linalg.{diag, eigSym, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, trace, sum}
+import breeze.linalg.{diag, DenseMatrix => BDM, DenseVector => BDV, Vector => BV, trace, sum}
import breeze.numerics.sqrt

import org.apache.spark.mllib.linalg.{Vector => SV}
