Skip to content

Commit

Permalink
* Handling SHUFFLE_MAP_BYPASS task type in SGX worker - return a reco…
Browse files Browse the repository at this point in the history
…rd-partition mapping iterator

* ShuffleMapTask checks if SGX is enabled and calls the appropriate shuffle interface
  • Loading branch information
pgaref committed Jul 30, 2019
1 parent 9440547 commit 3bc0d07
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ private[spark] class SGXWorker(closuseSer: SerializerInstance, dataSer: Serializ
logInfo(s"Executing ${funcArray.size} (pipelined) funcs")

eval_type match {

case SGXFunctionType.SHUFFLE_MAP_BYPASS =>
logDebug(s"ShuffleMap Bypass #Partitions ${numOfPartitions}")
val iterator = new ReaderIterator(inSock, dataSer).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]
val sgxPartitioner = new SGXPartitioner(numOfPartitions)
// Mapping of encrypted keys to partitions (needed by the shuffler Writter)
val keyMapping = scala.collection.mutable.Map[Any, Any]()
while (iterator.hasNext) {
val record = iterator.next()
keyMapping(record._1) = sgxPartitioner.getPartition(record._1)
}
SGXRDD.writeIteratorToStream[Any](keyMapping.toIterator, dataSer, outSock)
outSock.writeInt(SpecialSGXChars.END_OF_DATA_SECTION)
outSock.flush()

case SGXFunctionType.NON_UDF =>
// Read Iterator
val iterator = new ReaderIterator(inSock, dataSer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties

import scala.collection.JavaConverters._
import scala.language.existentials

import org.apache.spark._
import org.apache.spark.api.sgx.{SGXFunctionType, SGXRunner}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriter
import org.apache.spark.util.SGXUtils

/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
Expand Down Expand Up @@ -96,7 +99,18 @@ private[spark] class ShuffleMapTask(
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

if (SparkEnv.get.conf.isSGXWorkerEnabled()) {
val runner = SGXRunner(SGXUtils.toIteratorSizeSGXFunc, SGXFunctionType.SHUFFLE_MAP_BYPASS)
// Need to explicitly set the number of partitions here
val keyMapping = runner.compute(rdd.iterator(partition, context).asInstanceOf[Iterator[Array[Byte]]],
partitionId, context, dep.partitioner.numPartitions).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]
val keyMap = scala.collection.mutable.Map[Any, Integer]()
for (i <- keyMapping) keyMap(i._1) = i._2.asInstanceOf[Integer]
writer.sgxWrite(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]], keyMap.asJava)
} else {
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
}
writer.stop(success = true).get
} catch {
case e: Exception =>
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/util/SGXUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.util

import scala.collection.mutable


object SGXUtils {
/** Closures used for SGX tests should be written here (not in Tests) as SGXRunner is using
Expand All @@ -32,4 +34,17 @@ object SGXUtils {
val mapToList = (iter: Array[Int]) => iter.toList

val flatMapOneToVal: (Int) => TraversableOnce[Int] = (x: Int) => 1 to x


val groupBySum = (s: (String, scala.Iterable[Int])) => (s._1, (s._2.sum))


/**
* Dummy closure to maintain API consisent (used for shuffles - even though not used)
*/
val toIteratorSizeSGXFunc = (itr: Iterator[Any]) => {
val result = new mutable.ArrayBuffer[Any]
itr.foreach(e => result.append(e))
result.toArray.iterator
}
}

0 comments on commit 3bc0d07

Please sign in to comment.