From 38731f783eff377893950c52c355c226c085331a Mon Sep 17 00:00:00 2001 From: Anthony Khong Date: Tue, 1 Sep 2020 08:45:40 +0700 Subject: [PATCH] Added methods for partitioner (#181) --- src/clojure/zero_one/geni/partitioner.clj | 20 ++++++++ src/clojure/zero_one/geni/rdd.clj | 60 ++++++++++++++--------- test/zero_one/geni/partitioner_test.clj | 12 +++++ test/zero_one/geni/rdd_test.clj | 27 +++++++++- 4 files changed, 94 insertions(+), 25 deletions(-) create mode 100644 src/clojure/zero_one/geni/partitioner.clj create mode 100644 test/zero_one/geni/partitioner_test.clj diff --git a/src/clojure/zero_one/geni/partitioner.clj b/src/clojure/zero_one/geni/partitioner.clj new file mode 100644 index 00000000..bfaa88b6 --- /dev/null +++ b/src/clojure/zero_one/geni/partitioner.clj @@ -0,0 +1,20 @@ +(ns zero-one.geni.partitioner + (:refer-clojure :exclude [partition]) + (:import + (org.apache.spark HashPartitioner))) + +(defn hash-partitioner [partitions] + (HashPartitioner. partitions)) + +(defn num-partitions [partitioner] + (.numPartitions partitioner)) + +(defn get-partition [partitioner k] + (.getPartition partitioner k)) + +(defn equals [left right] + (.equals left right)) +(def equals? equals) + +(defn hash-code [partitioner] + (.hashCode partitioner)) diff --git a/src/clojure/zero_one/geni/rdd.clj b/src/clojure/zero_one/geni/rdd.clj index 7d5c3946..932463de 100644 --- a/src/clojure/zero_one/geni/rdd.clj +++ b/src/clojure/zero_one/geni/rdd.clj @@ -11,6 +11,7 @@ max min name + partition-by reduce take vals]) @@ -56,11 +57,10 @@ (defn num-partitions [rdd] (.getNumPartitions rdd)) -; TODO: must be able to test the use of partitioner -;(defn partitioner [rdd] -; (let [maybe-partitioner (.partitioner rdd)] -; (when (.isPresent maybe-partitioner) -; (.get maybe-partitioner)))) +(defn partitioner [rdd] + (let [maybe-partitioner (.partitioner rdd)] + (when (.isPresent maybe-partitioner) + (.get maybe-partitioner)))) (defn partitions [rdd] (.partitions rdd)) @@ -148,13 +148,18 @@ (defn repartition [rdd num-partitions] (.repartition rdd num-partitions)) +(defn repartition-and-sort-within-partitions + ([rdd partitioner] (.repartitionAndSortWithinPartitions rdd partitioner)) + ([rdd partitioner cmp] (.repartitionAndSortWithinPartitions rdd partitioner cmp))) + (defn sample ([rdd with-replacement fraction] (.sample rdd with-replacement fraction)) ([rdd with-replacement fraction seed] (.sample rdd with-replacement fraction seed))) (defn subtract ([left right] (.subtract left right)) - ([left right num-partitions] (.subtract left right num-partitions))) + ([left right partitions-or-partitioner] + (.subtract left right partitions-or-partitioner))) (defn top ([rdd n] (-> rdd (.top n) seq interop/->clojure)) @@ -247,6 +252,7 @@ (.saveAsTextFile rdd path)) ;; PairRDD Transformations + (defn aggregate-by-key ([rdd zero seq-fn comb-fn] (.aggregateByKey rdd @@ -266,12 +272,12 @@ (function/function create-fn) (function/function2 merge-value-fn) (function/function2 merge-combiner-fn))) - ([rdd create-fn merge-value-fn merge-combiner-fn num-partitions] + ([rdd create-fn merge-value-fn merge-combiner-fn partitions-or-partitioner] (.combineByKey rdd (function/function create-fn) (function/function2 merge-value-fn) (function/function2 merge-combiner-fn) - num-partitions))) + partitions-or-partitioner))) (defn count-by-key [rdd] (into {} (.countByKey rdd))) @@ -288,20 +294,20 @@ (defn count-approx-distinct-by-key ([rdd relative-sd] (.countApproxDistinctByKey rdd relative-sd)) - ([rdd relative-sd num-partitions] - (.countApproxDistinctByKey rdd relative-sd num-partitions))) + ([rdd relative-sd partitions-or-partitioner] + (.countApproxDistinctByKey rdd relative-sd partitions-or-partitioner))) (defn flat-map-values [rdd f] (.flatMapValues rdd (function/flat-map-function f))) (defn fold-by-key ([rdd zero f] (.foldByKey rdd zero (function/function2 f))) - ([rdd zero num-partitions f] - (.foldByKey rdd zero num-partitions (function/function2 f)))) + ([rdd zero partitions-or-partitioner f] + (.foldByKey rdd zero partitions-or-partitioner (function/function2 f)))) (defn full-outer-join ([left right] (.fullOuterJoin left right)) - ([left right num-partitions] (.fullOuterJoin left right num-partitions))) + ([left right partitions-or-partitioner] (.fullOuterJoin left right partitions-or-partitioner))) (defn group-by ([rdd f] (.groupBy rdd (function/function f))) @@ -309,28 +315,34 @@ (defn join ([left right] (.join left right)) - ([left right num-partitions] (.join left right num-partitions))) + ([left right partitions-or-partitioner] (.join left right partitions-or-partitioner))) (defn keys [rdd] (.keys rdd)) (defn left-outer-join ([left right] (.leftOuterJoin left right)) - ([left right num-partitions] (.leftOuterJoin left right num-partitions))) + ([left right partitions-or-partitioner] + (.leftOuterJoin left right partitions-or-partitioner))) (defn map-values [rdd f] (.mapValues rdd (function/function f))) +(defn partition-by [rdd partitioner] + (.partitionBy rdd partitioner)) + (defn reduce-by-key ([rdd f] (.reduceByKey rdd (function/function2 f))) - ([rdd f num-partitions] (.reduceByKey rdd (function/function2 f) num-partitions))) + ([rdd f partitions-or-partitioner] + (.reduceByKey rdd (function/function2 f) partitions-or-partitioner))) (defn reduce-by-key-locally [rdd f] (into {} (.reduceByKeyLocally rdd (function/function2 f)))) (defn right-outer-join ([left right] (.rightOuterJoin left right)) - ([left right num-partitions] (.rightOuterJoin left right num-partitions))) + ([left right partitions-or-partitioner] + (.rightOuterJoin left right partitions-or-partitioner))) (defn sample-by-key ([rdd with-replacement fractions] @@ -350,7 +362,8 @@ (defn subtract-by-key ([left right] (.subtractByKey left right)) - ([left right num-partitions] (.subtractByKey left right num-partitions))) + ([left right partitions-or-partitioner] + (.subtractByKey left right partitions-or-partitioner))) (defn values [rdd] (.values rdd)) @@ -368,7 +381,6 @@ (defn is-initial-value-final [result] (.isInitialValueFinal result)) (def final? is-initial-value-final) -;; Polymorphic (import-vars [zero-one.geni.storage disk-only @@ -384,9 +396,11 @@ none off-heap]) +;; JavaSparkContext: +;; TODO: broadcast, default-min-partitions, default-parallelism, empty-rdd +;; TODO: checkpoint-dir, spark-conf, local-property, persistent-rdds, spark-home +;; TODO: local?, jars, master, parallelise-doubles, parallelise-pairs, resources, +;; TODO: spark-context/sc, union, version + ;; Others: -;; TODO: broadcast, partitioner ;; TODO: name unmangling / setting callsite name -;; PairRDD -;; TODO: cogroup-partitioned -;; TODO: repartition-and-sort-within-partitions diff --git a/test/zero_one/geni/partitioner_test.clj b/test/zero_one/geni/partitioner_test.clj new file mode 100644 index 00000000..05ae7ff7 --- /dev/null +++ b/test/zero_one/geni/partitioner_test.clj @@ -0,0 +1,12 @@ +(ns zero-one.geni.partitioner-test + (:require + [midje.sweet :refer [facts =>]] + [zero-one.geni.partitioner :as partitioner])) + +(facts "On partitioner fields" :rdd + (let [partitioner (partitioner/hash-partitioner 12)] + (partitioner/num-partitions partitioner) => 12 + (partitioner/get-partition partitioner 123) => int? + (partitioner/equals? partitioner partitioner) => true + (partitioner/hash-code partitioner) => int?)) + diff --git a/test/zero_one/geni/rdd_test.clj b/test/zero_one/geni/rdd_test.clj index 755b53df..27188c07 100644 --- a/test/zero_one/geni/rdd_test.clj +++ b/test/zero_one/geni/rdd_test.clj @@ -4,6 +4,7 @@ [midje.sweet :refer [facts fact =>]] [zero-one.geni.aot-functions :as aot] [zero-one.geni.defaults] + [zero-one.geni.partitioner :as partitioner] [zero-one.geni.rdd :as rdd] [zero-one.geni.test-resources :refer [create-temp-file!]]) (:import @@ -15,6 +16,24 @@ (def dummy-pair-rdd (rdd/map-to-pair dummy-rdd aot/to-pair)) +(facts "On repartitioning" :rdd + (fact "partition-by works" + (-> dummy-rdd + (rdd/map-to-pair aot/to-pair) + (rdd/partition-by (partitioner/hash-partitioner 11)) + rdd/num-partitions) => 11) + (fact "repartition-and-sort-within-partitions works" + (-> dummy-rdd + (rdd/map-to-pair aot/to-pair) + (rdd/repartition-and-sort-within-partitions (partitioner/hash-partitioner 1)) + rdd/collect + distinct) => #(= % (sort %)) + (-> (rdd/parallelise [1 2 3 4 5 4 3 2 1]) + (rdd/map-to-pair aot/to-pair) + (rdd/repartition-and-sort-within-partitions (partitioner/hash-partitioner 1) >) + rdd/collect + distinct) => #(= % (reverse (sort %))))) + (facts "On basic PairRDD transformations" :rdd (fact "cogroup work" (let [left (rdd/flat-map-to-pair dummy-rdd aot/split-spaces-and-pair) @@ -187,8 +206,12 @@ (rdd/checkpointed? rdd) => false (rdd/empty? (rdd/parallelise [])) => true (rdd/empty? rdd) => false - (rdd/empty? rdd) => false)) - ; (rdd/partitioner rdd) => nil?)) + (rdd/empty? rdd) => false + (rdd/partitioner rdd) => nil? + (-> dummy-rdd + (rdd/map-to-pair aot/to-pair) + (rdd/group-by-key (partitioner/hash-partitioner 123)) + rdd/partitioner) => (complement nil?))) (facts "On basic PartialResult" :rdd (let [result (rdd/count-approx dummy-rdd 1000)]