From a61be7aee99a21ad0e7202c78edeb9e7427ec74b Mon Sep 17 00:00:00 2001 From: Dan Crankshaw Date: Tue, 25 Mar 2014 07:03:41 +0000 Subject: [PATCH] More timing logging and fixing the iterator issue. --- .../spark/graphx/WikiPipelineBenchmark.scala | 12 ++++++- .../spark/graphx/impl/RoutingTable.scala | 34 +++++++++++-------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala index 86e52ce20..4aa2e7931 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/WikiPipelineBenchmark.scala @@ -77,6 +77,7 @@ object WikiPipelineBenchmark extends Logging { val startTime = System.currentTimeMillis logWarning("starting pagerank") // GRAPH VIEW + val ccStartTime = System.currentTimeMillis val ccGraph = ConnectedComponents.run(currentGraph).cache val zeroVal = new JTreeSet[VertexId]() val seqOp = (s: JTreeSet[VertexId], vtuple: (VertexId, VertexId)) => { @@ -89,9 +90,14 @@ object WikiPipelineBenchmark extends Logging { } // TABLE VIEW val numCCs = ccGraph.vertices.aggregate(zeroVal)(seqOp, combOp).size() + val ccEndTime = System.currentTimeMillis + logWarning(s"Connected Components TIMEX: ${(ccEndTime - ccStartTime)/1000.0}") logWarning(s"Number of connected components for iteration $i: $numCCs") + val prStartTime = System.currentTimeMillis val pr = PageRank.run(currentGraph, 20).cache pr.vertices.count + val prEndTime = System.currentTimeMillis + logWarning(s"Pagerank TIMEX: ${(prEndTime - prStartTime)/1000.0}") logWarning("Pagerank completed") // TABLE VIEW val prAndTitle = currentGraph.outerJoinVertices(pr.vertices)({(id: VertexId, title: String, rank: Option[Double]) => (title, rank.getOrElse(0.0))}).cache @@ -106,7 +112,11 @@ object WikiPipelineBenchmark extends Logging { } val newGraph = currentGraph.subgraph(x => true, filterTop20).cache newGraph.vertices.count - logWarning(s"TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}") + logWarning(s"TOTAL_TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}") + currentGraph.unpersistVertices(blocking = false) + ccGraph.unpersistVertices(blocking = false) + pr.unpersistVertices(blocking = false) + prAndTitle.unpersistVertices(blocking = false) currentGraph = newGraph } currentGraph diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala index fe44e1ee0..352459719 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala @@ -49,24 +49,28 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = { // Determine which vertices each edge partition needs by creating a mapping from vid to pid. val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => - val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() - val numEdges = edgePartition.size - val vSet = new VertexSet - if (includeSrcAttr) { // Add src vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.srcIds(i)) - i += 1 + if (iter.hasNext) { + val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() + val numEdges = edgePartition.size + val vSet = new VertexSet + if (includeSrcAttr) { // Add src vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.srcIds(i)) + i += 1 + } } - } - if (includeDstAttr) { // Add dst vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.dstIds(i)) - i += 1 + if (includeDstAttr) { // Add dst vertices to the set. + var i = 0 + while (i < numEdges) { + vSet.add(edgePartition.dstIds(i)) + i += 1 + } } + vSet.iterator.map { vid => (vid, pid) } + } else { + Iterator.empty } - vSet.iterator.map { vid => (vid, pid) } } val numPartitions = vertices.partitions.size