Skip to content

Commit

Permalink
Optionally checkpoint in Pregel
Browse files (browse the repository at this point in the history)
  • Loading branch information
ankurdave committed Apr 20, 2014
1 parent bcc79f2 commit c8a7bb1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
9 changes: 8 additions & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ object Pregel extends Logging {
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
activeDirection: EdgeDirection = EdgeDirection.Either,
checkpoint: Boolean = false)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
if (checkpoint) {
graph.edges.checkpoint()
}
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
Expand All @@ -134,6 +138,9 @@ object Pregel extends Logging {
prevG = g
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
g.cache()
if (checkpoint) {
g.vertices.checkpoint()
}

val oldMessages = messages
// Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ object Analytics extends Logging {
var numEPart = 4
var partitionStrategy: Option[PartitionStrategy] = None
var numIterOpt: Option[Int] = None
var checkpointDirOpt: Option[String] = None

options.foreach{
case ("tol", v) => tol = v.toFloat
case ("output", v) => outFname = v
case ("numEPart", v) => numEPart = v.toInt
case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
case ("numIter", v) => numIterOpt = Some(v.toInt)
case ("checkpointDir", v) => checkpointDirOpt = Some(v)
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

Expand All @@ -74,6 +76,10 @@ object Analytics extends Logging {
println("======================================")

val sc = new SparkContext(host, "PageRank(" + fname + ")", conf)
checkpointDirOpt match {
case Some(checkpointDir) => sc.setCheckpointDir(checkpointDir)
case None => {}
}

val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
minEdgePartitions = numEPart).cache()
Expand All @@ -83,8 +89,8 @@ object Analytics extends Logging {
println("GRAPHX: Number of edges " + graph.edges.count)

val pr = (numIterOpt match {
case Some(numIter) => PageRank.run(graph, numIter)
case None => PageRank.runUntilConvergence(graph, tol)
case Some(numIter) => PageRank.run(graph, numIter, checkpoint = checkpointDirOpt.nonEmpty)
case None => PageRank.runUntilConvergence(graph, tol, checkpoint = checkpointDirOpt.nonEmpty)
}).vertices.cache()

println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _))
Expand Down
16 changes: 8 additions & 8 deletions graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ object PageRank extends Logging {
*
*/
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] =
{
graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15, checkpoint: Boolean = false)
: Graph[Double, Double] = {
// Initialize the pagerankGraph with each edge attribute having
// weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[Double, Double] = graph
Expand All @@ -101,8 +101,8 @@ object PageRank extends Logging {
val initialMessage = 0.0

// Execute pregel for a fixed number of iterations.
Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out)(
vertexProgram, sendMessage, messageCombiner)
Pregel(pagerankGraph, initialMessage, numIter, activeDirection = EdgeDirection.Out,
checkpoint = checkpoint)(vertexProgram, sendMessage, messageCombiner)
}

/**
Expand All @@ -120,8 +120,8 @@ object PageRank extends Logging {
* containing the normalized weight.
*/
def runUntilConvergence[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
{
graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15, checkpoint: Boolean = false)
: Graph[Double, Double] = {
// Initialize the pagerankGraph with each edge attribute
// having weight 1/outDegree and each vertex with attribute 1.0.
val pagerankGraph: Graph[(Double, Double), Double] = graph
Expand Down Expand Up @@ -157,8 +157,8 @@ object PageRank extends Logging {
val initialMessage = resetProb / (1.0 - resetProb)

// Execute a dynamic version of Pregel.
Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
vertexProgram, sendMessage, messageCombiner)
Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out,
checkpoint = checkpoint)(vertexProgram, sendMessage, messageCombiner)
.mapVertices((vid, attr) => attr._1)
} // end of runUntilConvergence
}

0 comments on commit c8a7bb1

Please sign in to comment.