From 4c0d5f06e9b633e9c4f232c532b2f4fce21fe0fc Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Wed, 1 Jun 2016 10:29:27 +0200 Subject: [PATCH 1/7] Adding more configurations to livy interpreter --- .../org/apache/zeppelin/livy/LivyHelper.java | 48 +++++++++++++------ .../zeppelin/livy/LivySparkInterpreter.java | 15 ++++++ 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index 27fc422948d..95807d68df0 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -60,27 +60,45 @@ public class LivyHelper { public Integer createSession(InterpreterContext context, String kind) throws Exception { try { + Map conf = new HashMap(); + + conf.put("spark.master", property.getProperty("zeppelin.livy.master")); + + conf.put("spark.driver.cores", property.getProperty("spark.driver.cores")); + conf.put("spark.executor.cores", property.getProperty("spark.executor.cores")); + conf.put("spark.driver.memory", property.getProperty("spark.driver.memory")); + conf.put("spark.executor.memory", property.getProperty("spark.executor.memory")); + + if (!property.getProperty("spark.dynamicAllocation.enabled").equals("true")) { + conf.put("spark.executor.instances", property.getProperty("spark.executor.instances")); + } + + if (property.getProperty("spark.dynamicAllocation.enabled").equals("true")) { + conf.put("spark.dynamicAllocation.enabled", + property.getProperty("spark.dynamicAllocation.enabled")); + conf.put("spark.shuffle.service.enabled", "true"); + conf.put("spark.dynamicAllocation.cachedExecutorIdleTimeout", + property.getProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout")); + conf.put("spark.dynamicAllocation.minExecutors", + property.getProperty("spark.dynamicAllocation.minExecutors")); + conf.put("spark.dynamicAllocation.initialExecutors", + property.getProperty("spark.dynamicAllocation.initialExecutors")); + conf.put("spark.dynamicAllocation.maxExecutors", + property.getProperty("spark.dynamicAllocation.maxExecutors")); + } + + String confData = gson.toJson(conf); + String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", - "POST", + "POST", "{" + "\"kind\": \"" + kind + "\", " + - "\"master\": \"" + property.getProperty("zeppelin.livy.master") + "\", " + - "\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() + "\"" + + "\"conf\": " + confData + ", " + + "\"proxyUser\": " + context.getAuthenticationInfo().getUser() + "}", context.getParagraphId() ); - if (json.contains("CreateInteractiveRequest[\\\"master\\\"]")) { - json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", - "POST", - "{" + - "\"kind\": \"" + kind + "\", " + - "\"conf\":{\"spark.master\": \"" - + property.getProperty("zeppelin.livy.master") + "\"}," + - "\"proxyUser\": \"" + context.getAuthenticationInfo().getUser() + "\"" + - "}", - context.getParagraphId() - ); - } + Map jsonMap = (Map) gson.fromJson(json, new TypeToken>() { }.getType()); diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 23a6379bebf..4ec0c39212a 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -46,6 +46,21 @@ public class LivySparkInterpreter extends Interpreter { new InterpreterPropertyBuilder() .add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.") .add("zeppelin.livy.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077") + .add("spark.driver.cores", "1", "Driver cores. ex) 1, 2") + .add("spark.driver.memory", "512m", "Driver memory. ex) 512m, 32g") + .add("spark.executor.instances", "3", "Executor instances. ex) 1, 4") + .add("spark.executor.cores", "1", "Num cores per executor. ex) 1, 4") + .add("spark.executor.memory", "512m", + "Executor memory per worker instance. ex) 512m, 32g") + .add("spark.dynamicAllocation.enabled", "false", "Use dynamic resource allocation") + .add("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s", + "Remove an executor which has cached data blocks") + .add("spark.dynamicAllocation.minExecutors", "0", + "Lower bound for the number of executors if dynamic allocation is enabled. ") + .add("spark.dynamicAllocation.initialExecutors", "1", + "Initial number of executors to run if dynamic allocation is enabled. ") + .add("spark.dynamicAllocation.maxExecutors", "10", + "Upper bound for the number of executors if dynamic allocation is enabled. ") .build() ); } From 72474b94c9df78ba8e571b93ccdb50555c10c708 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Wed, 1 Jun 2016 10:29:27 +0200 Subject: [PATCH 2/7] Adding more configurations to livy interpreter --- docs/interpreter/livy.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index 295a5080f40..b3e9aacb0dc 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -44,6 +44,26 @@ Additional requirements for the Livy interpreter are: 1000 Max number of SparkSQL result to display. + + spark.driver.cores + 1 + Driver cores. ex) 1, 2. + + + spark.driver.memory + 512m + Driver memory. ex) 512m, 32g. + + + spark.executor.instances + 3 + Executor instances. ex) 1, 4. + + + spark.executor.cores + 1 + Max number of SparkSQL result to display. + @@ -105,3 +125,6 @@ The session would have timed out, you may need to restart the interpreter. > Blacklisted configuration values in session config: spark.master edit `conf/spark-blacklist.conf` file in livy server and comment out `#spark.master` line. + +if you choose to work on livy https://github.com/cloudera/hue/tree/master/apps/spark/java, +copy `spark-user-configurable-options.template` to `spark-user-configurable-options.conf` file in livy server and comment out `#spark.master` From f988af0d4256372ae8b857bcccef505e6b9a4c98 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Fri, 3 Jun 2016 14:52:31 +0200 Subject: [PATCH 3/7] Supporting all spark configurations --- docs/interpreter/livy.md | 4 +-- .../org/apache/zeppelin/livy/LivyHelper.java | 31 +++++-------------- .../zeppelin/livy/LivySparkInterpreter.java | 2 +- 3 files changed, 11 insertions(+), 26 deletions(-) diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index b3e9aacb0dc..ae89f6421f3 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -30,7 +30,7 @@ Additional requirements for the Livy interpreter are: Description - zeppelin.livy.master + spark.master local[*] Spark master uri. ex) spark://masterhost:7077 @@ -126,5 +126,5 @@ The session would have timed out, you may need to restart the interpreter. edit `conf/spark-blacklist.conf` file in livy server and comment out `#spark.master` line. -if you choose to work on livy https://github.com/cloudera/hue/tree/master/apps/spark/java, +if you choose to work on livy in `apps/spark/java` directory in https://github.com/cloudera/hue , copy `spark-user-configurable-options.template` to `spark-user-configurable-options.conf` file in livy server and comment out `#spark.master` diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index 95807d68df0..5ae2e67d675 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -20,6 +20,7 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; + import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; @@ -40,7 +41,9 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; @@ -62,29 +65,11 @@ public Integer createSession(InterpreterContext context, String kind) throws Exc try { Map conf = new HashMap(); - conf.put("spark.master", property.getProperty("zeppelin.livy.master")); - - conf.put("spark.driver.cores", property.getProperty("spark.driver.cores")); - conf.put("spark.executor.cores", property.getProperty("spark.executor.cores")); - conf.put("spark.driver.memory", property.getProperty("spark.driver.memory")); - conf.put("spark.executor.memory", property.getProperty("spark.executor.memory")); - - if (!property.getProperty("spark.dynamicAllocation.enabled").equals("true")) { - conf.put("spark.executor.instances", property.getProperty("spark.executor.instances")); - } - - if (property.getProperty("spark.dynamicAllocation.enabled").equals("true")) { - conf.put("spark.dynamicAllocation.enabled", - property.getProperty("spark.dynamicAllocation.enabled")); - conf.put("spark.shuffle.service.enabled", "true"); - conf.put("spark.dynamicAllocation.cachedExecutorIdleTimeout", - property.getProperty("spark.dynamicAllocation.cachedExecutorIdleTimeout")); - conf.put("spark.dynamicAllocation.minExecutors", - property.getProperty("spark.dynamicAllocation.minExecutors")); - conf.put("spark.dynamicAllocation.initialExecutors", - property.getProperty("spark.dynamicAllocation.initialExecutors")); - conf.put("spark.dynamicAllocation.maxExecutors", - property.getProperty("spark.dynamicAllocation.maxExecutors")); + Iterator> it = property.entrySet().iterator(); + while (it.hasNext()) { + Entry pair = it.next(); + if (pair.getKey().toString().startsWith("spark.") && !pair.getValue().toString().isEmpty()) + conf.put(pair.getKey().toString(), pair.getValue().toString()); } String confData = gson.toJson(conf); diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 4ec0c39212a..0afa8df3d0b 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -45,7 +45,7 @@ public class LivySparkInterpreter extends Interpreter { LivySparkInterpreter.class.getName(), new InterpreterPropertyBuilder() .add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.") - .add("zeppelin.livy.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077") + .add("spark.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077") .add("spark.driver.cores", "1", "Driver cores. ex) 1, 2") .add("spark.driver.memory", "512m", "Driver memory. ex) 512m, 32g") .add("spark.executor.instances", "3", "Executor instances. ex) 1, 4") From ddd99e13347215f8fb2ca61eafd6e3cc1e8398f7 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Fri, 3 Jun 2016 15:37:31 +0200 Subject: [PATCH 4/7] Removing default values to properties which belongs spark --- docs/interpreter/livy.md | 1 + .../zeppelin/livy/LivySparkInterpreter.java | 20 +++++++++---------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index ae89f6421f3..b1a7752e4d2 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -23,6 +23,7 @@ Additional requirements for the Livy interpreter are: * Livy server. ### Configuration +We added some common configurations for spark, and you can set any configuration you want which should start with `spark.` diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 0afa8df3d0b..9d15035c749 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -46,20 +46,20 @@ public class LivySparkInterpreter extends Interpreter { new InterpreterPropertyBuilder() .add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.") .add("spark.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077") - .add("spark.driver.cores", "1", "Driver cores. ex) 1, 2") - .add("spark.driver.memory", "512m", "Driver memory. ex) 512m, 32g") - .add("spark.executor.instances", "3", "Executor instances. ex) 1, 4") - .add("spark.executor.cores", "1", "Num cores per executor. ex) 1, 4") - .add("spark.executor.memory", "512m", + .add("spark.driver.cores", "", "Driver cores. ex) 1, 2") + .add("spark.driver.memory", "", "Driver memory. ex) 512m, 32g") + .add("spark.executor.instances", "", "Executor instances. ex) 1, 4") + .add("spark.executor.cores", "", "Num cores per executor. ex) 1, 4") + .add("spark.executor.memory", "", "Executor memory per worker instance. ex) 512m, 32g") - .add("spark.dynamicAllocation.enabled", "false", "Use dynamic resource allocation") - .add("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s", + .add("spark.dynamicAllocation.enabled", "", "Use dynamic resource allocation") + .add("spark.dynamicAllocation.cachedExecutorIdleTimeout", "", "Remove an executor which has cached data blocks") - .add("spark.dynamicAllocation.minExecutors", "0", + .add("spark.dynamicAllocation.minExecutors", "", "Lower bound for the number of executors if dynamic allocation is enabled. ") - .add("spark.dynamicAllocation.initialExecutors", "1", + .add("spark.dynamicAllocation.initialExecutors", "", "Initial number of executors to run if dynamic allocation is enabled. ") - .add("spark.dynamicAllocation.maxExecutors", "10", + .add("spark.dynamicAllocation.maxExecutors", "", "Upper bound for the number of executors if dynamic allocation is enabled. ") .build() ); From c2b36f5cf82cc39ec6ee5352b49c8dc095fc2fcf Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Sat, 4 Jun 2016 18:33:55 +0200 Subject: [PATCH 5/7] Prefixing all spark properties with livy.* --- docs/interpreter/livy.md | 12 +++++----- .../org/apache/zeppelin/livy/LivyHelper.java | 5 +++-- .../zeppelin/livy/LivySparkInterpreter.java | 22 +++++++++---------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index b1a7752e4d2..ad6ff47eae4 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -23,7 +23,7 @@ Additional requirements for the Livy interpreter are: * Livy server. ### Configuration -We added some common configurations for spark, and you can set any configuration you want which should start with `spark.` +We added some common configurations for spark, and you can set any configuration you want which should start with `livy.spark.`
Property
@@ -31,7 +31,7 @@ We added some common configurations for spark, and you can set any configuration - + @@ -46,22 +46,22 @@ We added some common configurations for spark, and you can set any configuration - + - + - + - + diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java index 5ae2e67d675..7f3517eea5a 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivyHelper.java @@ -68,8 +68,9 @@ public Integer createSession(InterpreterContext context, String kind) throws Exc Iterator> it = property.entrySet().iterator(); while (it.hasNext()) { Entry pair = it.next(); - if (pair.getKey().toString().startsWith("spark.") && !pair.getValue().toString().isEmpty()) - conf.put(pair.getKey().toString(), pair.getValue().toString()); + if (pair.getKey().toString().startsWith("livy.spark.") && + !pair.getValue().toString().isEmpty()) + conf.put(pair.getKey().toString().substring(5), pair.getValue().toString()); } String confData = gson.toJson(conf); diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java index 9d15035c749..0a29d795f6d 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkInterpreter.java @@ -45,21 +45,21 @@ public class LivySparkInterpreter extends Interpreter { LivySparkInterpreter.class.getName(), new InterpreterPropertyBuilder() .add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.") - .add("spark.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077") - .add("spark.driver.cores", "", "Driver cores. ex) 1, 2") - .add("spark.driver.memory", "", "Driver memory. ex) 512m, 32g") - .add("spark.executor.instances", "", "Executor instances. ex) 1, 4") - .add("spark.executor.cores", "", "Num cores per executor. ex) 1, 4") - .add("spark.executor.memory", "", + .add("livy.spark.master", LOCAL, "Spark master uri. ex) spark://masterhost:7077") + .add("livy.spark.driver.cores", "", "Driver cores. ex) 1, 2") + .add("livy.spark.driver.memory", "", "Driver memory. ex) 512m, 32g") + .add("livy.spark.executor.instances", "", "Executor instances. ex) 1, 4") + .add("livy.spark.executor.cores", "", "Num cores per executor. ex) 1, 4") + .add("livy.spark.executor.memory", "", "Executor memory per worker instance. ex) 512m, 32g") - .add("spark.dynamicAllocation.enabled", "", "Use dynamic resource allocation") - .add("spark.dynamicAllocation.cachedExecutorIdleTimeout", "", + .add("livy.spark.dynamicAllocation.enabled", "", "Use dynamic resource allocation") + .add("livy.spark.dynamicAllocation.cachedExecutorIdleTimeout", "", "Remove an executor which has cached data blocks") - .add("spark.dynamicAllocation.minExecutors", "", + .add("livy.spark.dynamicAllocation.minExecutors", "", "Lower bound for the number of executors if dynamic allocation is enabled. ") - .add("spark.dynamicAllocation.initialExecutors", "", + .add("livy.spark.dynamicAllocation.initialExecutors", "", "Initial number of executors to run if dynamic allocation is enabled. ") - .add("spark.dynamicAllocation.maxExecutors", "", + .add("livy.spark.dynamicAllocation.maxExecutors", "", "Upper bound for the number of executors if dynamic allocation is enabled. ") .build() ); From bb2d5dc0bc30b23bd58a8e4d4f0e04d758786d9d Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Mon, 6 Jun 2016 01:04:30 +0200 Subject: [PATCH 6/7] Adding spark guide link --- docs/interpreter/livy.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index ad6ff47eae4..d7a1e4f5fcc 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -23,7 +23,11 @@ Additional requirements for the Livy interpreter are: * Livy server. ### Configuration -We added some common configurations for spark, and you can set any configuration you want which should start with `livy.spark.` +We added some common configurations for spark, and you can set any configuration you want. +This link contains all spark configurations: http://spark.apache.org/docs/latest/configuration.html#available-properties. +And instead of starting property with `spark.` it should be replaced with `livy.spark.`. +Example: `spark.master` to `livy.spark.master` +
PropertyDescription
spark.masterlivy.spark.master local[*] Spark master uri. ex) spark://masterhost:7077
Max number of SparkSQL result to display.
spark.driver.coreslivy.spark.driver.cores 1 Driver cores. ex) 1, 2.
spark.driver.memorylivy.spark.driver.memory 512m Driver memory. ex) 512m, 32g.
spark.executor.instanceslivy.spark.executor.instances 3 Executor instances. ex) 1, 4.
spark.executor.coreslivy.spark.executor.cores 1 Max number of SparkSQL result to display.
From ccf3c82528d9c6a1be535e6b695780e6de0727c8 Mon Sep 17 00:00:00 2001 From: mahmoudelgamal Date: Mon, 6 Jun 2016 10:18:24 +0200 Subject: [PATCH 7/7] Adding all configurations to doc --- docs/interpreter/livy.md | 40 +++++++++++++++++++++++++++++++++++----- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index d7a1e4f5fcc..225cd817bee 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -51,23 +51,53 @@ Example: `spark.master` to `livy.spark.master` - + - + - + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Property
livy.spark.driver.cores1 Driver cores. ex) 1, 2.
livy.spark.driver.memory512m Driver memory. ex) 512m, 32g.
livy.spark.executor.instances3 Executor instances. ex) 1, 4.
livy.spark.executor.cores1Max number of SparkSQL result to display.Num cores per executor. ex) 1, 4.
livy.spark.executor.memoryExecutor memory per worker instance. ex) 512m, 32g.
livy.spark.dynamicAllocation.enabledUse dynamic resource allocation. ex) True, False.
livy.spark.dynamicAllocation.cachedExecutorIdleTimeoutRemove an executor which has cached data blocks.
livy.spark.dynamicAllocation.minExecutorsLower bound for the number of executors.
livy.spark.dynamicAllocation.initialExecutorsInitial number of executors to run.
livy.spark.dynamicAllocation.maxExecutorsUpper bound for the number of executors.