diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/BenchmarkTPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/BenchmarkTPCDS.scala
new file mode 100644
index 00000000..63509f41
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/BenchmarkTPCDS.scala
@@ -0,0 +1,79 @@
+package com.databricks.spark.sql.perf.tpcds
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+
+case class BenchmarkTPCDSConfig(
+    database: String = "",
+    iterations: Int = 3,
+    resultLocation: String = "",
+    scaleFactor: String = "1000",
+    timeout: Int = 60
+)
+
+object BenchmarkTPCDS {
+  def main(args: Array[String]): Unit = {
+    val parser = new scopt.OptionParser[BenchmarkTPCDSConfig]("Benchmark-TPC-DS") {
+      opt[String]('d', "database")
+        .action { (x, c) => c.copy(database = x) }
+        .required()
+      opt[Int]('i', "iterations")
+        .action { (x, c) => c.copy(iterations = x) }
+        .required()
+      opt[String]('s', "scaleFactor")
+        .action((x, c) => c.copy(scaleFactor = x))
+        .required()
+      opt[String]('r', "resultLocation")
+        .action((x, c) => c.copy(resultLocation = x))
+        .required()
+      opt[Int]('t', "timeout")
+        .action((x, c) => c.copy(timeout = x))
+        .required()
+    }
+
+    parser.parse(args, BenchmarkTPCDSConfig()) match {
+      case Some(config) =>
+        run(config)
+      case None =>
+        System.exit(1)
+    }
+  }
+
+  def run(config: BenchmarkTPCDSConfig): Unit = {
+    val conf = new SparkConf()
+    conf.set("spark.hadoop.hive.exec.scratchdir", "/tmp/hive-scratch")
+    conf.set("spark.hadoop.hive.metastore.sasl.enabled", "true")
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.sql.catalogImplementation", "hive")
+    conf.set("spark.sql.broadcastTimeout", "10000")
+    val spark = SparkSession
+      .builder()
+      .appName(getClass.getName)
+      .master("yarn")
+      .config(conf)
+      .getOrCreate()
+
+    spark.sql(s"use ${config.database}")
+
+    val tpcds = new TPCDS(sqlContext = spark.sqlContext)
+    val query_filter = Seq()
+
+    def queries = {
+      val filtered_queries = query_filter match {
+        case Seq() => tpcds.tpcds2_4Queries
+        case _ => tpcds.tpcds2_4Queries.filter(q => query_filter.contains(q.name))
+      }
+      filtered_queries
+    }
+
+    val experiment = tpcds.runExperiment(
+      queries,
+      iterations = config.iterations,
+      resultLocation = config.resultLocation,
+      tags = Map("runtype" -> "benchmark", "database" -> config.database, "scale_factor" -> config.scaleFactor))
+
+    println(experiment.toString)
+    experiment.waitForFinish(config.timeout * 60 * 60)
+
+  }
+}
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/GenTPCDSData.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/GenTPCDSData.scala
index d3414844..5d582352 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/GenTPCDSData.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/GenTPCDSData.scala
@@ -16,10 +16,11 @@
 
 package com.databricks.spark.sql.perf.tpcds
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 
 case class GenTPCDSDataConfig(
-    master: String = "local[*]",
+    master: String = "yarn",
     dsdgenDir: String = null,
     scaleFactor: String = null,
     location: String = null,
@@ -31,7 +32,9 @@ case class GenTPCDSDataConfig(
     clusterByPartitionColumns: Boolean = true,
     filterOutNullPartitionValues: Boolean = true,
     tableFilter: String = "",
-    numPartitions: Int = 100)
+    numPartitions: Int = 100,
+    databaseName: String = "",
+    createHiveTableEnabled: Boolean = false)
 
 /**
  * Gen TPCDS data.
@@ -45,7 +48,7 @@ object GenTPCDSData {
     val parser = new scopt.OptionParser[GenTPCDSDataConfig]("Gen-TPC-DS-data") {
       opt[String]('m', "master")
        .action { (x, c) => c.copy(master = x) }
-        .text("the Spark master to use, default to local[*]")
+        .text("the Spark master to use, default to yarn")
       opt[String]('d', "dsdgenDir")
         .action { (x, c) => c.copy(dsdgenDir = x) }
         .text("location of dsdgen")
@@ -83,6 +86,12 @@ object GenTPCDSData {
       opt[Int]('n', "numPartitions")
         .action((x, c) => c.copy(numPartitions = x))
         .text("how many dsdgen partitions to run - number of input tasks.")
+      opt[Boolean](name = "createHiveTableEnabled")
+        .action((x, c) => c.copy(createHiveTableEnabled = x))
+        .text("boolean flag to create hive external table.")
+      opt[String]("databaseName")
+        .action((x, c) => c.copy(databaseName = x))
+        .text("database name to use for creating hive external tables.")
       help("help")
         .text("prints this usage text")
     }
@@ -96,10 +105,16 @@ object GenTPCDSData {
   }
 
   private def run(config: GenTPCDSDataConfig) {
+    val conf = new SparkConf()
+    conf.set("spark.hadoop.hive.exec.scratchdir", "/tmp/hive-scratch")
+    conf.set("spark.hadoop.hive.metastore.sasl.enabled", "true")
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.sql.catalogImplementation", "hive")
     val spark = SparkSession
       .builder()
       .appName(getClass.getName)
       .master(config.master)
+      .config(conf)
       .getOrCreate()
 
     val tables = new TPCDSTables(spark.sqlContext,
@@ -117,5 +132,14 @@ object GenTPCDSData {
       filterOutNullPartitionValues = config.filterOutNullPartitionValues,
       tableFilter = config.tableFilter,
       numPartitions = config.numPartitions)
+
+    if (config.createHiveTableEnabled) {
+      tables.createExternalTables(
+        config.location,
+        config.format,
+        config.databaseName,
+        overwrite = false,
+        discoverPartitions = true)
+    }
   }
 }