From f36e57645c72f932ba31ea852a4e4ef1f9c5b28a Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Fri, 28 Mar 2014 15:09:23 -0700
Subject: [PATCH] Log warning on partition recompute

---
 .../src/main/scala/org/apache/spark/rdd/RDD.scala | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1b43040c6..b6aa3377d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -22,6 +22,7 @@ import java.util.Random
 import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
@@ -221,12 +222,22 @@ abstract class RDD[T: ClassTag](
     }
   }
 
+  private val previouslyComputed = new HashSet[Partition]
   /**
    * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
    */
   private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
   {
-    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
+    if (isCheckpointed) {
+      firstParent[T].iterator(split, context)
+    } else {
+      if (previouslyComputed.contains(split)) {
+        logWarning("Recomputing RDD %d, partition %d".format(id, split.index))
+      } else {
+        previouslyComputed.add(split)
+      }
+      compute(split, context)
+    }
   }
 
   // Transformations (return a new RDD)
@@ -1045,6 +1056,8 @@ abstract class RDD[T: ClassTag](
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 
+  private[spark] val computeSites = new ArrayBuffer[String]
+
   private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
 
   /** Returns the first parent RDD */
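
Note (not part of the commit): a minimal sketch of one way the new warning can
fire, assuming the patched RDD.scala. The object and value names below
(RecomputeDemo, base, doubled) are illustrative. When two lineages in the same
job share an uncached parent, each consumer pulls the parent partition through
computeOrReadCheckpoint, so the second pull finds the partition in
previouslyComputed and logs "Recomputing RDD <id>, partition <index>".

    import org.apache.spark.SparkContext

    object RecomputeDemo {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "recompute-demo")
        // Uncached parent shared by two lineages.
        val base = sc.parallelize(1 to 100, 4).map(_ * 2)
        val doubled = base.map(_ + 1)
        // zip evaluates each partition of `base` twice within the same task:
        // once directly and once through `doubled`. The second evaluation
        // hits previouslyComputed and triggers the logWarning added above.
        base.zip(doubled).count()
        sc.stop()
      }
    }

Calling base.cache() before the action would make the second pull read the
partition from the block store instead, so no warning would be logged: the
warning flags exactly the repeated work that caching or checkpointing could
avoid.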