Skip to content

Commit

Permalink
More dstream methods (#221)
Browse files Browse the repository at this point in the history
* Added more DStream methods

* Back to 100% coverage
  • Loading branch information
anthony-khong authored Sep 28, 2020
1 parent aa19725 commit 21b51fb
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 10 deletions.
32 changes: 26 additions & 6 deletions src/clojure/zero_one/geni/streaming.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns zero-one.geni.streaming
(:refer-clojure :exclude [count
filter
print])
map
print
reduce])
(:require
[potemkin :refer [import-vars]]
[zero-one.geni.rdd.function :as function]
Expand All @@ -16,9 +18,9 @@
(org.apache.spark.sql SparkSession)))

;; TODO: count-by-value-and-window,
;; TODO: foreachRDD, map, map-partitions-to-pair, reduce, reduce-by-window
;; TODO: repartition, slice, transform, transform-to-pair, transform-with,
;; TODO: transform-with-to-pair, window, wrap-rdd
;; TODO: foreachRDD, reduce-by-window
;; TODO: slice, transform, transform-to-pair, transform-with,
;; TODO: transform-with-to-pair, window

(defn milliseconds [t] (Milliseconds/apply t))

Expand Down Expand Up @@ -91,8 +93,17 @@
(defn glom [dstream]
(.glom dstream))

(defn map-to-pair [rdd f]
(.mapToPair rdd (function/pair-function f)))
(defn map [dstream f]
(.map dstream (function/function f)))

(defn map-partitions [dstream f]
(.mapPartitions dstream (function/flat-map-function f)))

(defn map-partitions-to-pair [dstream f]
(.mapPartitionsToPair dstream (function/pair-flat-map-function f)))

(defn map-to-pair [dstream f]
(.mapToPair dstream (function/pair-function f)))

(defn persist
([dstream] (.persist dstream))
Expand All @@ -102,12 +113,21 @@
([dstream] (.print dstream))
([dstream num] (.print dstream num)))

(defn reduce [dstream f]
(.reduce dstream (function/function2 f)))

(defn repartition [dstream num-partitions]
(.repartition dstream num-partitions))

(defn slide-duration [dstream]
(.slideDuration dstream))

(defn union [left right]
(.union left right))

(defn wrap-rdd [dstream rdd]
(.wrapRDD dstream (.rdd rdd)))

;; Pair DStream
(defn ->java-dstream [dstream]
(.toJavaDStream dstream))
Expand Down
45 changes: 41 additions & 4 deletions test/zero_one/geni/streaming_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
[midje.sweet :refer [facts fact throws =>]]
[zero-one.geni.aot-functions :as aot]
[zero-one.geni.defaults :as defaults]
[zero-one.geni.rdd :as rdd]
[zero-one.geni.streaming :as streaming])
(:import
(org.apache.spark.streaming Duration StreamingContext)
(org.apache.spark.streaming Duration StreamingContext Time)
(org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext)
(org.apache.spark.streaming.dstream DStream)))

Expand Down Expand Up @@ -65,6 +66,39 @@
(slurp "test/resources/rdd.txt"))

(facts "On DStream methods" :streaming
(stream-results
{:content dummy-text
:fn #(-> %
(streaming/repartition 2)
(streaming/map count)
(streaming/reduce +))
:expected "2709\n"})
=> "2709\n"
(stream-results
{:content dummy-text
:fn #(-> %
(streaming/map-partitions-to-pair aot/mapcat-split-spaces)
(streaming/reduce-by-key + 2)
streaming/->java-dstream)
:finish-fn #(count (string/split % #"\n"))
:expected 23})
=> 23
(stream-results
{:content dummy-text
:fn #(streaming/map-partitions % aot/map-split-spaces)
:finish-fn #(->> (string/split % #"\n")
(map count)
(reduce +))
:expected 2313})
=> 2313
(stream-results
{:content dummy-text
:fn #(streaming/map % count)
:finish-fn #(->> (string/split % #"\n")
(map edn/read-string)
(reduce +))
:expected 2709})
=> 2709
(stream-results
{:content dummy-text
:fn #(-> %
Expand Down Expand Up @@ -155,11 +189,13 @@
;;; TODO: chain with other method
=> (throws java.lang.IllegalArgumentException))

(facts "On durations" :streaming
(facts "On durations and times" :streaming
(fact "durations instantiatable"
(mapv
(fn [f] (f 1) => (partial instance? Duration))
[streaming/milliseconds streaming/seconds streaming/minutes])))
[streaming/milliseconds streaming/seconds streaming/minutes]))
(fact "time coerceable"
(streaming/->time 123) => (Time. 123)))

(facts "On StreamingContext" :streaming
(let [context (streaming/streaming-context @defaults/spark (streaming/seconds 1))]
Expand All @@ -172,4 +208,5 @@
streaming/memory-only)]
dstream => (partial instance? JavaDStream)
(streaming/dstream dstream) => (partial instance? DStream)
(streaming/context dstream) => (partial instance? StreamingContext)))))
(streaming/context dstream) => (partial instance? StreamingContext)
(rdd/collect (streaming/wrap-rdd dstream (rdd/parallelise [1 2 3]))) => [1 2 3]))))

0 comments on commit 21b51fb

Please sign in to comment.