Skip to content

Commit

Permalink
Added methods for partitioner (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
anthony-khong authored Sep 1, 2020
1 parent 56ebd27 commit 38731f7
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 25 deletions.
20 changes: 20 additions & 0 deletions src/clojure/zero_one/geni/partitioner.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
(ns zero-one.geni.partitioner
(:refer-clojure :exclude [partition])
(:import
(org.apache.spark HashPartitioner)))

(defn hash-partitioner [partitions]
(HashPartitioner. partitions))

(defn num-partitions [partitioner]
(.numPartitions partitioner))

(defn get-partition [partitioner k]
(.getPartition partitioner k))

(defn equals [left right]
(.equals left right))
(def equals? equals)

(defn hash-code [partitioner]
(.hashCode partitioner))
60 changes: 37 additions & 23 deletions src/clojure/zero_one/geni/rdd.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
max
min
name
partition-by
reduce
take
vals])
Expand Down Expand Up @@ -56,11 +57,10 @@

(defn num-partitions [rdd] (.getNumPartitions rdd))

; TODO: must be able to test the use of partitioner
;(defn partitioner [rdd]
; (let [maybe-partitioner (.partitioner rdd)]
; (when (.isPresent maybe-partitioner)
; (.get maybe-partitioner))))
(defn partitioner [rdd]
(let [maybe-partitioner (.partitioner rdd)]
(when (.isPresent maybe-partitioner)
(.get maybe-partitioner))))

(defn partitions [rdd] (.partitions rdd))

Expand Down Expand Up @@ -148,13 +148,18 @@
(defn repartition [rdd num-partitions]
(.repartition rdd num-partitions))

(defn repartition-and-sort-within-partitions
([rdd partitioner] (.repartitionAndSortWithinPartitions rdd partitioner))
([rdd partitioner cmp] (.repartitionAndSortWithinPartitions rdd partitioner cmp)))

(defn sample
([rdd with-replacement fraction] (.sample rdd with-replacement fraction))
([rdd with-replacement fraction seed] (.sample rdd with-replacement fraction seed)))

(defn subtract
([left right] (.subtract left right))
([left right num-partitions] (.subtract left right num-partitions)))
([left right partitions-or-partitioner]
(.subtract left right partitions-or-partitioner)))

(defn top
([rdd n] (-> rdd (.top n) seq interop/->clojure))
Expand Down Expand Up @@ -247,6 +252,7 @@
(.saveAsTextFile rdd path))

;; PairRDD Transformations

(defn aggregate-by-key
([rdd zero seq-fn comb-fn]
(.aggregateByKey rdd
Expand All @@ -266,12 +272,12 @@
(function/function create-fn)
(function/function2 merge-value-fn)
(function/function2 merge-combiner-fn)))
([rdd create-fn merge-value-fn merge-combiner-fn num-partitions]
([rdd create-fn merge-value-fn merge-combiner-fn partitions-or-partitioner]
(.combineByKey rdd
(function/function create-fn)
(function/function2 merge-value-fn)
(function/function2 merge-combiner-fn)
num-partitions)))
partitions-or-partitioner)))

(defn count-by-key [rdd]
(into {} (.countByKey rdd)))
Expand All @@ -288,49 +294,55 @@

(defn count-approx-distinct-by-key
([rdd relative-sd] (.countApproxDistinctByKey rdd relative-sd))
([rdd relative-sd num-partitions]
(.countApproxDistinctByKey rdd relative-sd num-partitions)))
([rdd relative-sd partitions-or-partitioner]
(.countApproxDistinctByKey rdd relative-sd partitions-or-partitioner)))

(defn flat-map-values [rdd f]
(.flatMapValues rdd (function/flat-map-function f)))

(defn fold-by-key
([rdd zero f] (.foldByKey rdd zero (function/function2 f)))
([rdd zero num-partitions f]
(.foldByKey rdd zero num-partitions (function/function2 f))))
([rdd zero partitions-or-partitioner f]
(.foldByKey rdd zero partitions-or-partitioner (function/function2 f))))

(defn full-outer-join
([left right] (.fullOuterJoin left right))
([left right num-partitions] (.fullOuterJoin left right num-partitions)))
([left right partitions-or-partitioner] (.fullOuterJoin left right partitions-or-partitioner)))

(defn group-by
([rdd f] (.groupBy rdd (function/function f)))
([rdd f num-partitions] (.groupBy rdd (function/function f) num-partitions)))

(defn join
([left right] (.join left right))
([left right num-partitions] (.join left right num-partitions)))
([left right partitions-or-partitioner] (.join left right partitions-or-partitioner)))

(defn keys [rdd]
(.keys rdd))

(defn left-outer-join
([left right] (.leftOuterJoin left right))
([left right num-partitions] (.leftOuterJoin left right num-partitions)))
([left right partitions-or-partitioner]
(.leftOuterJoin left right partitions-or-partitioner)))

(defn map-values [rdd f]
(.mapValues rdd (function/function f)))

(defn partition-by [rdd partitioner]
(.partitionBy rdd partitioner))

(defn reduce-by-key
([rdd f] (.reduceByKey rdd (function/function2 f)))
([rdd f num-partitions] (.reduceByKey rdd (function/function2 f) num-partitions)))
([rdd f partitions-or-partitioner]
(.reduceByKey rdd (function/function2 f) partitions-or-partitioner)))

(defn reduce-by-key-locally [rdd f]
(into {} (.reduceByKeyLocally rdd (function/function2 f))))

(defn right-outer-join
([left right] (.rightOuterJoin left right))
([left right num-partitions] (.rightOuterJoin left right num-partitions)))
([left right partitions-or-partitioner]
(.rightOuterJoin left right partitions-or-partitioner)))

(defn sample-by-key
([rdd with-replacement fractions]
Expand All @@ -350,7 +362,8 @@

(defn subtract-by-key
([left right] (.subtractByKey left right))
([left right num-partitions] (.subtractByKey left right num-partitions)))
([left right partitions-or-partitioner]
(.subtractByKey left right partitions-or-partitioner)))

(defn values [rdd]
(.values rdd))
Expand All @@ -368,7 +381,6 @@
(defn is-initial-value-final [result] (.isInitialValueFinal result))
(def final? is-initial-value-final)

;; Polymorphic
(import-vars
[zero-one.geni.storage
disk-only
Expand All @@ -384,9 +396,11 @@
none
off-heap])

;; JavaSparkContext:
;; TODO: broadcast, default-min-partitions, default-parallelism, empty-rdd
;; TODO: checkpoint-dir, spark-conf, local-property, persistent-rdds, spark-home
;; TODO: local?, jars, master, parallelise-doubles, parallelise-pairs, resources,
;; TODO: spark-context/sc, union, version

;; Others:
;; TODO: broadcast, partitioner
;; TODO: name unmangling / setting callsite name
;; PairRDD
;; TODO: cogroup-partitioned
;; TODO: repartition-and-sort-within-partitions
12 changes: 12 additions & 0 deletions test/zero_one/geni/partitioner_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(ns zero-one.geni.partitioner-test
(:require
[midje.sweet :refer [facts =>]]
[zero-one.geni.partitioner :as partitioner]))

(facts "On partitioner fields" :rdd
(let [partitioner (partitioner/hash-partitioner 12)]
(partitioner/num-partitions partitioner) => 12
(partitioner/get-partition partitioner 123) => int?
(partitioner/equals? partitioner partitioner) => true
(partitioner/hash-code partitioner) => int?))

27 changes: 25 additions & 2 deletions test/zero_one/geni/rdd_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[midje.sweet :refer [facts fact =>]]
[zero-one.geni.aot-functions :as aot]
[zero-one.geni.defaults]
[zero-one.geni.partitioner :as partitioner]
[zero-one.geni.rdd :as rdd]
[zero-one.geni.test-resources :refer [create-temp-file!]])
(:import
Expand All @@ -15,6 +16,24 @@
(def dummy-pair-rdd
(rdd/map-to-pair dummy-rdd aot/to-pair))

(facts "On repartitioning" :rdd
(fact "partition-by works"
(-> dummy-rdd
(rdd/map-to-pair aot/to-pair)
(rdd/partition-by (partitioner/hash-partitioner 11))
rdd/num-partitions) => 11)
(fact "repartition-and-sort-within-partitions works"
(-> dummy-rdd
(rdd/map-to-pair aot/to-pair)
(rdd/repartition-and-sort-within-partitions (partitioner/hash-partitioner 1))
rdd/collect
distinct) => #(= % (sort %))
(-> (rdd/parallelise [1 2 3 4 5 4 3 2 1])
(rdd/map-to-pair aot/to-pair)
(rdd/repartition-and-sort-within-partitions (partitioner/hash-partitioner 1) >)
rdd/collect
distinct) => #(= % (reverse (sort %)))))

(facts "On basic PairRDD transformations" :rdd
(fact "cogroup work"
(let [left (rdd/flat-map-to-pair dummy-rdd aot/split-spaces-and-pair)
Expand Down Expand Up @@ -187,8 +206,12 @@
(rdd/checkpointed? rdd) => false
(rdd/empty? (rdd/parallelise [])) => true
(rdd/empty? rdd) => false
(rdd/empty? rdd) => false))
; (rdd/partitioner rdd) => nil?))
(rdd/empty? rdd) => false
(rdd/partitioner rdd) => nil?
(-> dummy-rdd
(rdd/map-to-pair aot/to-pair)
(rdd/group-by-key (partitioner/hash-partitioner 123))
rdd/partitioner) => (complement nil?)))

(facts "On basic PartialResult" :rdd
(let [result (rdd/count-approx dummy-rdd 1000)]
Expand Down

0 comments on commit 38731f7

Please sign in to comment.