Skip to content

Commit

Permalink
fix CIM load options
Browse files Browse the repository at this point in the history
  • Loading branch information
derrickoswald committed Nov 24, 2017
1 parent 47d9e94 commit 570867f
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,32 @@ public int hashCode ()
public String toString ()
{
StringBuilder sb = new StringBuilder ();

sb.append ("[@");
sb.append ("[master: ");
sb.append (getMaster ());
sb.append (" + ");
sb.append (" + cassandra: ");
sb.append (getCassandra ());
sb.append (": ");
for (String key: getProperties ().keySet ())
if (0 != getJars ().size ())
{
sb.append (key);
sb.append ("=");
sb.append (getProperties ().get (key));
sb.append (" properties (");
for (String key: getProperties ().keySet ())
{
sb.append (key);
sb.append ("=");
sb.append (getProperties ().get (key));
sb.append (" ");
}
sb.setLength (sb.length () - 1);
sb.append (")");
}
if (0 != getJars ().size ())
{
sb.append (" (");
sb.append (" jars (");
for (String jar: getJars ())
{
sb.append (jar);
sb.append (" ");
}
sb.setLength (sb.length () - 1);
sb.append (")");
}
sb.append ("]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,19 @@ public void connect (Subject subject, ConnectionRequestInfo info)
if ((null != cassandra) && !cassandra.equals (""))
configuration.set ("spark.cassandra.connection.host", cassandra);

if (null != System.getProperty ("SPARK_HOME"))
System.setProperty ("spark.home", System.getProperty ("SPARK_HOME"));
if (null != System.getProperty ("HADOOP_HOME"))
{
// ToDo: read from conf/spark-defaults.conf
System.setProperty ("spark.driver.extraLibraryPath", System.getProperty ("HADOOP_HOME") + "/lib/native");
System.setProperty ("spark.executor.extraLibraryPath", System.getProperty ("HADOOP_HOME") + "/lib/native");
}
configuration.set ("spark.sql.warehouse.dir", "file:/tmp/spark-warehouse");
// need hive jars too:
// configuration.set ("spark.sql.catalogImplementation", "hive");
configuration.set ("spark.submit.deployMode", "client");

// add the other properties
for (String key : _RequestInfo.getProperties ().keySet ())
configuration.set (key, _RequestInfo.getProperties ().get (key));
Expand All @@ -223,9 +236,6 @@ public void connect (Subject subject, ConnectionRequestInfo info)
jars[size] = j2ee;
configuration.setJars (jars);

if (null != logger)
logger.println ("SparkConf = " + configuration.toDebugString ());

// so far, it only works for Spark standalone (as above with master set to spark://sandbox:7077
// here are some options I tried for Yarn access master set to "yarn-client" that didn't work
// configuration.setMaster ("yarn-client"); // assumes a resource manager is specified in yarn-site.xml, e.g. sandbox:8032
Expand All @@ -235,6 +245,9 @@ public void connect (Subject subject, ConnectionRequestInfo info)
// register CIMReader classes
configuration.registerKryoClasses (CIMClasses.list ());

if (null != logger)
logger.println ("SparkConf:\n" + configuration.toDebugString ());

// setting spark.executor.memory as a property of SparkConf doesn't work:
if (null != _RequestInfo.getProperties ().get ("spark.executor.memory"))
System.setProperty ("spark.executor.memory", _RequestInfo.getProperties ().get ("spark.executor.memory"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package ch.ninecode.cim.cimweb
import javax.json.Json
import javax.json.JsonStructure

import ch.ninecode.cim.CIMEdges
import ch.ninecode.cim.CIMJoin
import ch.ninecode.cim.CIMNetworkTopologyProcessor
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.HashMap

Expand All @@ -24,25 +28,75 @@ case class LoadCIMFileFunction (paths: Array[String], options: Iterable[(String,
for (f <- files)
ff.add (f)
response.add ("files", ff)
val reader_options = new HashMap[String, String] ()
if (null != options)
reader_options ++= options
else

// establish default options if needed
val op = if (null == options)
{
reader_options.put ("StorageLevel", "MEMORY_AND_DISK_SER")
reader_options.put ("ch.ninecode.cim.make_edges", "false")
reader_options.put ("ch.ninecode.cim.do_join", "false")
reader_options.put ("ch.ninecode.cim.do_topo", "false")
reader_options.put ("ch.ninecode.cim.do_topo_islands", "false")
reader_options.put ("ch.ninecode.cim.do_deduplication", if (1 < files.length) "true" else "false")
List (
("StorageLevel", "MEMORY_AND_DISK_SER"),
("ch.ninecode.cim.do_about", "false"),
("ch.ninecode.cim.do_normalize", "false"),
("ch.ninecode.cim.do_deduplication", if (1 < files.length) "true" else "false"),
("ch.ninecode.cim.make_edges", "false"),
("ch.ninecode.cim.do_join", "false"),
("ch.ninecode.cim.do_topo_islands", "false"),
("ch.ninecode.cim.do_topo", "false"),
("ch.ninecode.cim.split_maxsize", "67108864")
)
}
else
options

// echo settings to the response
val opts = Json.createObjectBuilder
for (pair <- reader_options)
for (pair <- op)
opts.add (pair._1, pair._2)
response.add ("options", opts)

// there is a problem (infinite loop) if post processing is done in the CIMReader
// so we extract out topo, edge, and join processing
var topo = false
var isld = false
var join = false
var edge = false
val reader_options = new HashMap[String, String] ()
for (option op)
option._1 match
{
case "ch.ninecode.cim.do_topo"
topo = isld || (try { option._2.toBoolean } catch { case _: Throwable => false })
case "ch.ninecode.cim.do_topo_islands"
isld = try { option._2.toBoolean } catch { case _: Throwable => false }
topo = topo || isld
case "ch.ninecode.cim.do_join"
join = try { option._2.toBoolean } catch { case _: Throwable => false }
case "ch.ninecode.cim.make_edges"
edge = try { option._2.toBoolean } catch { case _: Throwable => false }
case _
reader_options.put (option._1, option._2)
}
reader_options.put ("path", files.mkString (",")) // ToDo: why is this still needed?

val elements = spark.read.format ("ch.ninecode.cim").options (reader_options).load (files:_*)
val count = elements.count
var count = elements.count
if (topo)
{
val ntp = new CIMNetworkTopologyProcessor (spark, org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER, true)
val elements2 = ntp.process (isld)
count = elements2.count
}
if (join)
{
val join = new CIMJoin (spark, StorageLevel.fromString ("MEMORY_AND_DISK_SER"))
val elements3 = join.do_join ()
count = elements3.count
}
if (edge)
{
val edges = new CIMEdges (spark, StorageLevel.fromString ("MEMORY_AND_DISK_SER"))
val elements4 = edges.make_edges (topo)
count = elements4.count
}
response.add ("elements", count)
}
catch
Expand Down
10 changes: 8 additions & 2 deletions CIMWeb/src/main/scala/ch/ninecode/cim/cimweb/LoadFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ class LoadFile extends RESTful
@Produces (Array (MediaType.APPLICATION_JSON))
def getFile (
@PathParam ("path") path: String,
@DefaultValue ("false") @MatrixParam ("do_about") do_about: String,
@DefaultValue ("false") @MatrixParam ("do_normalize") do_normalize: String,
@DefaultValue ("false") @MatrixParam ("do_deduplication") do_deduplication: String,
@DefaultValue ("false") @MatrixParam ("make_edges") make_edges: String,
@DefaultValue ("false") @MatrixParam ("do_join") do_join: String,
@DefaultValue ("false") @MatrixParam ("do_topo") do_topo: String,
@DefaultValue ("false") @MatrixParam ("do_topo_islands") do_topo_islands: String,
@DefaultValue ("false") @MatrixParam ("do_topo") do_topo: String,
@DefaultValue ("67108864") @MatrixParam ("split_maxsize") split_maxsize: String,
@DefaultValue ("false") @MatrixParam ("header") header: String,
@DefaultValue ("false") @MatrixParam ("ignoreLeadingWhiteSpace") ignoreLeadingWhiteSpace: String,
@DefaultValue ("false") @MatrixParam ("ignoreTrailingWhiteSpace") ignoreTrailingWhiteSpace: String,
Expand Down Expand Up @@ -73,11 +76,14 @@ class LoadFile extends RESTful
val function = filetype match
{
case "CIM" // see https://github.com/derrickoswald/CIMReader#reader-api
options.put ("ch.ninecode.cim.do_about", do_about)
options.put ("ch.ninecode.cim.do_normalize", do_normalize)
options.put ("ch.ninecode.cim.do_deduplication", do_deduplication)
options.put ("ch.ninecode.cim.make_edges", make_edges)
options.put ("ch.ninecode.cim.do_join", do_join)
options.put ("ch.ninecode.cim.do_topo", do_topo)
options.put ("ch.ninecode.cim.do_topo_islands", do_topo_islands)
options.put ("ch.ninecode.cim.do_topo", do_topo)
options.put ("ch.ninecode.cim.split_maxsize", split_maxsize)
LoadCIMFileFunction (files, options)
case "CSV" // see https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader
options.put ("header", header)
Expand Down
67 changes: 64 additions & 3 deletions CIMWeb/src/main/webapp/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,51 @@
<a href="#" class="dropdown-toggle" data-toggle="dropdown" aria-expanded="false">Load <span class="caret"></span></a>
<ul class="dropdown-menu" role="menu">
<li class="dropdown-header">CIM</li>
<li>
<div class="select">
<label for="storage_level">
<select id='storage_level' class='form-control' name='storage_level'>
<option value="NONE">NONE</option>
<option value="DISK_ONLY">DISK_ONLY</option>
<option value="DISK_ONLY_2">DISK_ONLY_2</option>
<option value="MEMORY_ONLY">MEMORY_ONLY</option>
<option value="MEMORY_ONLY_2">MEMORY_ONLY_2</option>
<option value="MEMORY_ONLY_SER">MEMORY_ONLY_SER</option>
<option value="MEMORY_ONLY_SER_2">MEMORY_ONLY_SER_2</option>
<option value="MEMORY_AND_DISK">MEMORY_AND_DISK</option>
<option value="MEMORY_AND_DISK_2">MEMORY_AND_DISK_2</option>
<option value="MEMORY_AND_DISK_SER" selected>MEMORY_AND_DISK_SER</option>
<option value="MEMORY_AND_DISK_SER_2">MEMORY_AND_DISK_SER_2</option>
<option value="OFF_HEAP">OFF_HEAP</option>
</select>
StorageLevel
</label>
</div>
</li>
<li>
<div class="checkbox">
<label for="do_about">
<input id="do_about" type="checkbox"/>
Merge rdf:about
</label>
</div>
</li>
<li>
<div class="checkbox">
<label for="do_normalize">
<input id="do_normalize" type="checkbox"/>
Normalize data
</label>
</div>
</li>
<li>
<div class="checkbox">
<label for="do_deduplication">
<input id="do_deduplication" type="checkbox"/>
Deduplicate
</label>
</div>
</li>
<li>
<div class="checkbox">
<label for="make_edges">
Expand All @@ -63,9 +108,9 @@
</li>
<li>
<div class="checkbox">
<label for="do_topo">
<input id="do_topo" type="checkbox"/>
Make topology
<label for="do_join">
<input id="do_join" type="checkbox"/>
Join NIS-ISU
</label>
</div>
</li>
Expand All @@ -77,6 +122,22 @@
</label>
</div>
</li>
<li>
<div class="checkbox">
<label for="do_topo">
<input id="do_topo" type="checkbox"/>
Make topology
</label>
</div>
</li>
<li>
<div class="input">
<label for="split_maxsize">
<input id="split_maxsize" type="text" class="form-control" placeholder="split size (bytes)" style="margin-left: 15px; width: 120px;" value="67108864">
</label>
Split size
</div>div>
</li>
<li role="separator" class="divider"></li>
<li class="dropdown-header">CSV</li>
<li>
Expand Down
Loading

0 comments on commit 570867f

Please sign in to comment.