diff --git a/src/clojure/zero_one/geni/interop.clj b/src/clojure/zero_one/geni/interop.clj index 99152472..7aa17951 100644 --- a/src/clojure/zero_one/geni/interop.clj +++ b/src/clojure/zero_one/geni/interop.clj @@ -3,6 +3,7 @@ [camel-snake-kebab.core :refer [->kebab-case]] [clojure.java.data :as j] [clojure.string :refer [replace-first]] + [clojure.walk :as walk] [zero-one.geni.utils :refer [ensure-coll]]) (:import (java.io ByteArrayOutputStream) @@ -52,6 +53,9 @@ (defn ->scala-seq [coll] (JavaConversions/asScalaBuffer (seq coll))) +(defn ->scala-tuple2 [coll] + (Tuple2. (first coll) (second coll))) + (defn scala-tuple->vec [p] (->> (.productArity p) (range) @@ -70,6 +74,10 @@ (defn ->scala-function3 [f] (reify Function3 (apply [_ x y z] (f x y z)))) +(defn optional->nillable [value] + (when (.isPresent value) + (.get value))) + (defmacro with-scala-out-str [& body] `(let [out-buffer# (ByteArrayOutputStream.)] (Console/withOut @@ -77,6 +85,13 @@ (->scala-function0 (fn [] ~@body))) (.toString out-buffer# "UTF-8"))) +(defn spark-conf->map [conf] + (->> conf + .getAll + (map scala-tuple->vec) + (into {}) + walk/keywordize-keys)) + (defn ->dense-vector [values] (let [[x & xs] values] (Vectors/dense x (->scala-seq xs)))) diff --git a/src/clojure/zero_one/geni/rdd.clj b/src/clojure/zero_one/geni/rdd.clj index 932463de..fa5bc98f 100644 --- a/src/clojure/zero_one/geni/rdd.clj +++ b/src/clojure/zero_one/geni/rdd.clj @@ -26,14 +26,96 @@ (org.apache.spark.api.java JavaSparkContext) (org.apache.spark.sql SparkSession))) +(def value (memfn value)) + +;; Java Spark Context (defn java-spark-context [spark] (JavaSparkContext/fromSparkContext (.sparkContext spark))) +(defn app-name + ([] (app-name @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .appName))) + +(defn broadcast + ([value] (broadcast @zero-one.geni.defaults/spark value)) + ([spark value] (-> spark java-spark-context (.broadcast value)))) + +(defn checkpoint-dir + ([] (checkpoint-dir @zero-one.geni.defaults/spark)) + ([spark] + (-> spark java-spark-context .getCheckpointDir interop/optional->nillable))) + +(defn conf + ([] (conf @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .getConf interop/spark-conf->map))) + +(defn default-min-partitions + ([] (default-min-partitions @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .defaultMinPartitions))) + +(defn default-parallelism + ([] (default-parallelism @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .defaultParallelism))) + +(defn empty-rdd + ([] (empty-rdd @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .emptyRDD))) + +(defn jars + ([] (jars @zero-one.geni.defaults/spark)) + ([spark] (->> spark java-spark-context .jars (into [])))) + +(defn local? + ([] (local? @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .isLocal))) +(def is-local local?) + +(defn local-property + ([k] (local-property @zero-one.geni.defaults/spark k)) + ([spark k] (-> spark java-spark-context (.getLocalProperty k)))) + +(defn master + ([] (master @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .master))) + (defn parallelise ([data] (parallelise @zero-one.geni.defaults/spark data)) ([spark data] (-> spark java-spark-context (.parallelize data)))) (def parallelize parallelise) +(defn parallelise-doubles + ([data] (parallelise-doubles @zero-one.geni.defaults/spark data)) + ([spark data] + (-> spark + java-spark-context + (.parallelizeDoubles (clojure.core/map double data))))) +(def parallelize-doubles parallelise-doubles) + +(defn parallelise-pairs + ([data] (parallelise-pairs @zero-one.geni.defaults/spark data)) + ([spark data] + (-> spark + java-spark-context + (.parallelizePairs (clojure.core/map interop/->scala-tuple2 data))))) +(def parallelize-pairs parallelise-pairs) + +(defn persistent-rdds + ([] (persistent-rdds @zero-one.geni.defaults/spark)) + ([spark] (->> spark java-spark-context .getPersistentRDDs (into {})))) + +(defn resources + ([] (resources @zero-one.geni.defaults/spark)) + ([spark] (->> spark java-spark-context .resources (into {})))) + +(defn spark-context + ([] (spark-context @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .sc))) +(def sc spark-context) + +(defn spark-home + ([] (spark-home @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .getSparkHome interop/optional->nillable))) + (defmulti text-file (fn [head & _] (class head))) (defmethod text-file :default ([path] (text-file @zero-one.geni.defaults/spark path)) @@ -42,6 +124,10 @@ ([spark path] (-> spark java-spark-context (.textFile path))) ([spark path min-partitions] (-> spark java-spark-context (.textFile path min-partitions)))) +(defn version + ([] (version @zero-one.geni.defaults/spark)) + ([spark] (-> spark java-spark-context .version))) + ;; Getters (defn context [rdd] (JavaSparkContext/fromSparkContext (.context rdd))) @@ -58,9 +144,7 @@ (defn num-partitions [rdd] (.getNumPartitions rdd)) (defn partitioner [rdd] - (let [maybe-partitioner (.partitioner rdd)] - (when (.isPresent maybe-partitioner) - (.get maybe-partitioner)))) + (interop/optional->nillable (.partitioner rdd))) (defn partitions [rdd] (.partitions rdd)) @@ -396,11 +480,5 @@ none off-heap]) -;; JavaSparkContext: -;; TODO: broadcast, default-min-partitions, default-parallelism, empty-rdd -;; TODO: checkpoint-dir, spark-conf, local-property, persistent-rdds, spark-home -;; TODO: local?, jars, master, parallelise-doubles, parallelise-pairs, resources, -;; TODO: spark-context/sc, union, version - ;; Others: ;; TODO: name unmangling / setting callsite name diff --git a/src/clojure/zero_one/geni/spark.clj b/src/clojure/zero_one/geni/spark.clj index 950af4d8..d603ac98 100644 --- a/src/clojure/zero_one/geni/spark.clj +++ b/src/clojure/zero_one/geni/spark.clj @@ -38,7 +38,4 @@ (->> spark-session .sparkContext .getConf - .getAll - (map interop/scala-tuple->vec) - (into {}) - clojure.walk/keywordize-keys)) + interop/spark-conf->map)) diff --git a/test/zero_one/geni/rdd_test.clj b/test/zero_one/geni/rdd_test.clj index 27188c07..7c57460d 100644 --- a/test/zero_one/geni/rdd_test.clj +++ b/test/zero_one/geni/rdd_test.clj @@ -8,7 +8,8 @@ [zero-one.geni.rdd :as rdd] [zero-one.geni.test-resources :refer [create-temp-file!]]) (:import - (org.apache.spark.api.java JavaSparkContext))) + (org.apache.spark SparkContext) + (org.apache.spark.api.java JavaRDD JavaSparkContext))) (def dummy-rdd (rdd/text-file "test/resources/rdd.txt")) @@ -16,6 +17,25 @@ (def dummy-pair-rdd (rdd/map-to-pair dummy-rdd aot/to-pair)) +(facts "On JavaSparkContext methods" :rdd + (fact "expected static fields" + (rdd/app-name) => "Geni App" + (rdd/value (rdd/broadcast [1 2 3])) => [1 2 3] + (rdd/checkpoint-dir) => string? + (rdd/conf) => map? + (rdd/default-min-partitions) => integer? + (rdd/default-parallelism) => integer? + (rdd/empty-rdd) => (partial instance? JavaRDD) + (rdd/jars) => vector? + (rdd/local?) => true + (rdd/local-property "abc") => nil? + (rdd/master) => "local[*]" + (rdd/persistent-rdds) => map? + (rdd/resources) => {} + (rdd/spark-home) => nil? + (rdd/sc) => (partial instance? SparkContext) + (rdd/version) => "3.0.0")) + (facts "On repartitioning" :rdd (fact "partition-by works" (-> dummy-rdd @@ -199,7 +219,7 @@ (rdd/count read-rdd) => (rdd/count write-rdd)))) (facts "On basic RDD fields" :rdd - (let [rdd (rdd/parallelise [1])] + (let [rdd (rdd/parallelise-doubles [1])] (rdd/context rdd) => (partial instance? JavaSparkContext) (rdd/id rdd) => integer? (rdd/name rdd) => nil? @@ -275,6 +295,8 @@ (rdd/map-to-pair aot/to-pair) rdd/group-by-key rdd/num-partitions) => #(< 1 %) + (-> (rdd/parallelise-pairs [[1 2] [3 4]]) + rdd/collect) => [[1 2] [3 4]] (-> dummy-pair-rdd (rdd/group-by-key 7) rdd/num-partitions) => 7