Java spark context methods (#182)
* Some static fields for JavaSparkContext

* Optional spark sessions

* Corrected missed calls
anthony-khong authored Sep 1, 2020
1 parent 38731f7 commit d6377d4
Showing 4 changed files with 127 additions and 15 deletions.
15 changes: 15 additions & 0 deletions src/clojure/zero_one/geni/interop.clj
@@ -3,6 +3,7 @@
   [camel-snake-kebab.core :refer [->kebab-case]]
   [clojure.java.data :as j]
   [clojure.string :refer [replace-first]]
   [clojure.walk :as walk]
   [zero-one.geni.utils :refer [ensure-coll]])
  (:import
   (java.io ByteArrayOutputStream)
@@ -52,6 +53,9 @@
(defn ->scala-seq [coll]
  (JavaConversions/asScalaBuffer (seq coll)))

(defn ->scala-tuple2 [coll]
  (Tuple2. (first coll) (second coll)))

(defn scala-tuple->vec [p]
  (->> (.productArity p)
       (range)
@@ -70,13 +74,24 @@
(defn ->scala-function3 [f]
  (reify Function3 (apply [_ x y z] (f x y z))))

(defn optional->nillable [value]
  (when (.isPresent value)
    (.get value)))

(defmacro with-scala-out-str [& body]
  `(let [out-buffer# (ByteArrayOutputStream.)]
     (Console/withOut
      out-buffer#
      (->scala-function0 (fn [] ~@body)))
     (.toString out-buffer# "UTF-8")))

(defn spark-conf->map [conf]
  (->> conf
       .getAll
       (map scala-tuple->vec)
       (into {})
       walk/keywordize-keys))

(defn ->dense-vector [values]
  (let [[x & xs] values]
    (Vectors/dense x (->scala-seq xs))))
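
For orientation, a brief REPL sketch of how the two new helpers behave. The `java.util.Optional` values below are stand-ins; Spark's own `org.apache.spark.api.java.Optional` exposes the same `isPresent`/`get` methods, which is all `optional->nillable` relies on:

(interop/optional->nillable (java.util.Optional/of 42))   ;; => 42
(interop/optional->nillable (java.util.Optional/empty))   ;; => nil
(interop/->scala-tuple2 [:a 1])                           ;; => scala.Tuple2 of :a and 1
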
96 changes: 87 additions & 9 deletions src/clojure/zero_one/geni/rdd.clj
@@ -26,14 +26,96 @@
   (org.apache.spark.api.java JavaSparkContext)
   (org.apache.spark.sql SparkSession)))

(def value (memfn value))

;; Java Spark Context
(defn java-spark-context [spark]
  (JavaSparkContext/fromSparkContext (.sparkContext spark)))

(defn app-name
  ([] (app-name @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .appName)))

(defn broadcast
  ([value] (broadcast @zero-one.geni.defaults/spark value))
  ([spark value] (-> spark java-spark-context (.broadcast value))))

(defn checkpoint-dir
  ([] (checkpoint-dir @zero-one.geni.defaults/spark))
  ([spark]
   (-> spark java-spark-context .getCheckpointDir interop/optional->nillable)))

(defn conf
  ([] (conf @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .getConf interop/spark-conf->map)))

(defn default-min-partitions
  ([] (default-min-partitions @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .defaultMinPartitions)))

(defn default-parallelism
  ([] (default-parallelism @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .defaultParallelism)))

(defn empty-rdd
  ([] (empty-rdd @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .emptyRDD)))

(defn jars
  ([] (jars @zero-one.geni.defaults/spark))
  ([spark] (->> spark java-spark-context .jars (into []))))

(defn local?
  ([] (local? @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .isLocal)))
(def is-local local?)

(defn local-property
  ([k] (local-property @zero-one.geni.defaults/spark k))
  ([spark k] (-> spark java-spark-context (.getLocalProperty k))))

(defn master
  ([] (master @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .master)))

(defn parallelise
  ([data] (parallelise @zero-one.geni.defaults/spark data))
  ([spark data] (-> spark java-spark-context (.parallelize data))))
(def parallelize parallelise)

(defn parallelise-doubles
  ([data] (parallelise-doubles @zero-one.geni.defaults/spark data))
  ([spark data]
   (-> spark
       java-spark-context
       (.parallelizeDoubles (clojure.core/map double data)))))
(def parallelize-doubles parallelise-doubles)

(defn parallelise-pairs
  ([data] (parallelise-pairs @zero-one.geni.defaults/spark data))
  ([spark data]
   (-> spark
       java-spark-context
       (.parallelizePairs (clojure.core/map interop/->scala-tuple2 data)))))
(def parallelize-pairs parallelise-pairs)

(defn persistent-rdds
  ([] (persistent-rdds @zero-one.geni.defaults/spark))
  ([spark] (->> spark java-spark-context .getPersistentRDDs (into {}))))

(defn resources
  ([] (resources @zero-one.geni.defaults/spark))
  ([spark] (->> spark java-spark-context .resources (into {}))))

(defn spark-context
  ([] (spark-context @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .sc)))
(def sc spark-context)

(defn spark-home
  ([] (spark-home @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .getSparkHome interop/optional->nillable)))

(defmulti text-file (fn [head & _] (class head)))
(defmethod text-file :default
  ([path] (text-file @zero-one.geni.defaults/spark path))
@@ -42,6 +124,10 @@
  ([spark path] (-> spark java-spark-context (.textFile path)))
  ([spark path min-partitions] (-> spark java-spark-context (.textFile path min-partitions))))

(defn version
  ([] (version @zero-one.geni.defaults/spark))
  ([spark] (-> spark java-spark-context .version)))

;; Getters
(defn context [rdd] (JavaSparkContext/fromSparkContext (.context rdd)))

@@ -58,9 +144,7 @@
(defn num-partitions [rdd] (.getNumPartitions rdd))

(defn partitioner [rdd]
  (let [maybe-partitioner (.partitioner rdd)]
    (when (.isPresent maybe-partitioner)
      (.get maybe-partitioner))))
  (interop/optional->nillable (.partitioner rdd)))

(defn partitions [rdd] (.partitions rdd))

@@ -396,11 +480,5 @@
   none
   off-heap])

;; JavaSparkContext:
;; TODO: broadcast, default-min-partitions, default-parallelism, empty-rdd
;; TODO: checkpoint-dir, spark-conf, local-property, persistent-rdds, spark-home
;; TODO: local?, jars, master, parallelise-doubles, parallelise-pairs, resources,
;; TODO: spark-context/sc, union, version

;; Others:
;; TODO: name unmangling / setting callsite name
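
Taken together, these wrappers expose JavaSparkContext methods with or without an explicit SparkSession argument, falling back to the default session. A hedged REPL sketch of typical calls (return values are illustrative and depend on the local Spark setup):

(rdd/master)                                         ;; e.g. "local[*]"
(rdd/default-parallelism)                            ;; e.g. 8
(rdd/value (rdd/broadcast [1 2 3]))                  ;; => [1 2 3]
(rdd/collect (rdd/parallelise-pairs [[1 2] [3 4]]))  ;; => [[1 2] [3 4]]
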
5 changes: 1 addition & 4 deletions src/clojure/zero_one/geni/spark.clj
@@ -38,7 +38,4 @@
  (->> spark-session
       .sparkContext
       .getConf
       .getAll
       (map interop/scala-tuple->vec)
       (into {})
       clojure.walk/keywordize-keys))
       interop/spark-conf->map))
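
The spark.clj function shown here and the new `rdd/conf` both delegate to `interop/spark-conf->map`, so they return the same keywordized map. An illustrative call (the exact keys and values depend on the session's configuration):

(interop/spark-conf->map (.getConf (.sparkContext spark-session)))
;; => {:spark.master "local[*]", :spark.app.name "Geni App", ...}
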
26 changes: 24 additions & 2 deletions test/zero_one/geni/rdd_test.clj
@@ -8,14 +8,34 @@
   [zero-one.geni.rdd :as rdd]
   [zero-one.geni.test-resources :refer [create-temp-file!]])
  (:import
   (org.apache.spark.api.java JavaSparkContext)))
   (org.apache.spark SparkContext)
   (org.apache.spark.api.java JavaRDD JavaSparkContext)))

(def dummy-rdd
  (rdd/text-file "test/resources/rdd.txt"))

(def dummy-pair-rdd
  (rdd/map-to-pair dummy-rdd aot/to-pair))

(facts "On JavaSparkContext methods" :rdd
(fact "expected static fields"
(rdd/app-name) => "Geni App"
(rdd/value (rdd/broadcast [1 2 3])) => [1 2 3]
(rdd/checkpoint-dir) => string?
(rdd/conf) => map?
(rdd/default-min-partitions) => integer?
(rdd/default-parallelism) => integer?
(rdd/empty-rdd) => (partial instance? JavaRDD)
(rdd/jars) => vector?
(rdd/local?) => true
(rdd/local-property "abc") => nil?
(rdd/master) => "local[*]"
(rdd/persistent-rdds) => map?
(rdd/resources) => {}
(rdd/spark-home) => nil?
(rdd/sc) => (partial instance? SparkContext)
(rdd/version) => "3.0.0"))

(facts "On repartitioning" :rdd
(fact "partition-by works"
(-> dummy-rdd
@@ -199,7 +219,7 @@
    (rdd/count read-rdd) => (rdd/count write-rdd))))

(facts "On basic RDD fields" :rdd
(let [rdd (rdd/parallelise [1])]
(let [rdd (rdd/parallelise-doubles [1])]
(rdd/context rdd) => (partial instance? JavaSparkContext)
(rdd/id rdd) => integer?
(rdd/name rdd) => nil?
@@ -275,6 +295,8 @@
        (rdd/map-to-pair aot/to-pair)
        rdd/group-by-key
        rdd/num-partitions) => #(< 1 %)
    (-> (rdd/parallelise-pairs [[1 2] [3 4]])
        rdd/collect) => [[1 2] [3 4]]
    (-> dummy-pair-rdd
        (rdd/group-by-key 7)
        rdd/num-partitions) => 7
