Skip to content

Commit

Permalink
Better retrying dstream test fn (#220)
Browse files Browse the repository at this point in the history
* Failed attempt to convert everything to JavaDStream

* Made do with less powerful test for streaming

* Added TODOs + more methods

* Better streaming test retrying

* Better test + more methods
  • Loading branch information
anthony-khong authored Sep 27, 2020
1 parent 1a3a4ba commit aa19725
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 67 deletions.
82 changes: 54 additions & 28 deletions src/clojure/zero_one/geni/streaming.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns zero-one.geni.streaming
(:refer-clojure :exclude [count
filter
print])
(:require
[potemkin :refer [import-vars]]
Expand All @@ -14,8 +15,8 @@
Time)
(org.apache.spark.sql SparkSession)))

;; TODO: count-by-value-and-window, dstream, filter, flat-map-to-pair, flat-map-values
;; TODO: foreachRDD, map, map-partitions-to-pair, map-to-pair, reduce, reduce-by-window
;; TODO: count-by-value-and-window,
;; TODO: foreachRDD, map, map-partitions-to-pair, reduce, reduce-by-window
;; TODO: repartition, slice, transform, transform-to-pair, transform-with,
;; TODO: transform-with-to-pair, window, wrap-rdd

Expand All @@ -36,10 +37,10 @@
(.textFileStream context path))

(defmulti save-as-text-files!
  "Multimethod that dispatches on the class of its first argument; any
  further arguments are ignored when choosing the implementation."
  (fn [stream & _more]
    (class stream)))
(defmethod save-as-text-files! JavaDStream [d-stream path]
(save-as-text-files! (.dstream d-stream) path))
(defmethod save-as-text-files! :default [d-stream path]
(.saveAsTextFiles d-stream path ""))
;; JavaDStream: unwrap with `.dstream` and dispatch again on the unwrapped value.
(defmethod save-as-text-files! JavaDStream [java-stream path]
  (let [inner (.dstream java-stream)]
    (save-as-text-files! inner path)))

;; Any other class: call `.saveAsTextFiles` with `path` and an empty string.
(defmethod save-as-text-files! :default [stream path]
  (-> stream (.saveAsTextFiles path "")))

(defn start!
  "Returns a future whose body calls `.start` on `context`. Dereference the
  future to block until that call has returned."
  [context]
  (future
    (-> context .start)))
Expand All @@ -50,48 +51,73 @@
(defn stop!
  "Returns a future whose body calls `.stop` on `context` with the arguments
  `false` and `true`.

  NOTE(review): the two booleans are presumably Spark's stop-Spark-context
  and stop-gracefully flags, in that order - confirm against the Spark API."
  [context]
  (future
    (-> context (.stop false true))))

(defn cache [d-stream]
(.cache d-stream))
(defn cache
  "Calls `.cache` on `dstream` and returns the result."
  [dstream]
  (-> dstream .cache))

(defn checkpoint [d-stream interval]
(.checkpoint d-stream interval))
(defn checkpoint
  "Calls `.checkpoint` on `dstream`, passing `interval` through unchanged,
  and returns the result."
  [dstream interval]
  (-> dstream (.checkpoint interval)))

(defn context [d-stream]
(.context d-stream))
(defn context
  "Calls `.context` on `dstream` and returns the result."
  [dstream]
  (-> dstream .context))

(defn ->time
  "Coerces `value` to a `Time`: an existing `Time` instance is returned
  as-is, anything else is passed to the `Time` constructor."
  [value]
  (cond-> value
    (not (instance? Time value)) (Time.)))

(defn compute [d-stream t]
(.compute d-stream (->time t)))
(defn compute
  "Calls `.compute` on `dstream` for the time `t`. `t` is first coerced with
  `->time`, so it may be a `Time` or any value `->time` accepts."
  [dstream t]
  (let [at (->time t)]
    (.compute dstream at)))

(defn count [d-stream]
(.count d-stream))
(defn count
  "Calls `.count` on `dstream` and returns the result. Shadows
  `clojure.core/count`, which this namespace excludes."
  [dstream]
  (-> dstream .count))

;; Delegates to `.countByValue` on the stream; the two-argument arity also
;; forwards `num-partitions`.
;; NOTE(review): this span comes from a rendered diff, so both the pre-rename
;; (`d-stream`) and post-rename (`dstream`) arity lines appear below.
(defn count-by-value
([d-stream] (.countByValue d-stream))
([d-stream num-partitions] (.countByValue d-stream num-partitions)))
([dstream] (.countByValue dstream))
([dstream num-partitions] (.countByValue dstream num-partitions)))

(defn flat-map [d-stream f]
(.flatMap d-stream (function/flat-map-function f)))
(defn dstream
  "Calls the `dstream` method on `java-dstream` and returns the result."
  [java-dstream]
  (.dstream java-dstream))

(defn glom [d-stream]
(.glom d-stream))
(defn filter
  "Calls `.filter` on `dstream` with `f` wrapped by `function/function`.
  Shadows `clojure.core/filter`, which this namespace excludes."
  [dstream f]
  (let [wrapped (function/function f)]
    (.filter dstream wrapped)))

(defn flat-map
  "Calls `.flatMap` on `dstream` with `f` wrapped by
  `function/flat-map-function`."
  [dstream f]
  (let [wrapped (function/flat-map-function f)]
    (.flatMap dstream wrapped)))

(defn flat-map-to-pair
  "Calls `.flatMapToPair` on `dstream` with `f` wrapped by
  `function/pair-flat-map-function`."
  [dstream f]
  (let [wrapped (function/pair-flat-map-function f)]
    (.flatMapToPair dstream wrapped)))

;; Alias: bound to the same function value as `flat-map-to-pair`.
(def mapcat-to-pair flat-map-to-pair)

(defn flat-map-values
  "Calls `.flatMapValues` on `dstream` with `f` wrapped by
  `function/flat-map-function`."
  [dstream f]
  (let [wrapped (function/flat-map-function f)]
    (.flatMapValues dstream wrapped)))

(defn glom
  "Calls `.glom` on `dstream` and returns the result."
  [dstream]
  (-> dstream .glom))

(defn map-to-pair
  "Calls `.mapToPair` on `dstream` with `f` wrapped by
  `function/pair-function`.

  The first parameter is named `dstream` to match the other stream wrappers
  in this namespace; it is positional, so existing callers are unaffected."
  [dstream f]
  (.mapToPair dstream (function/pair-function f)))

;; Delegates to `.persist` on the stream; the two-argument arity also forwards
;; `storage-level`.
;; NOTE(review): this span comes from a rendered diff, so both the pre-rename
;; (`d-stream`) and post-rename (`dstream`) arity lines appear below.
(defn persist
([d-stream] (.persist d-stream))
([d-stream storage-level] (.persist d-stream storage-level)))
([dstream] (.persist dstream))
([dstream storage-level] (.persist dstream storage-level)))

;; Delegates to `.print` on the stream; the two-argument arity also forwards
;; `num`. Shadows `clojure.core/print`, which this namespace excludes.
;; NOTE(review): this span comes from a rendered diff, so both the pre-rename
;; (`d-stream`) and post-rename (`dstream`) arity lines appear below.
(defn print
([d-stream] (.print d-stream))
([d-stream num] (.print d-stream num)))
([dstream] (.print dstream))
([dstream num] (.print dstream num)))

(defn slide-duration [d-stream]
(.slideDuration d-stream))
(defn slide-duration
  "Calls `.slideDuration` on `dstream` and returns the result."
  [dstream]
  (-> dstream .slideDuration))

(defn union
  "Calls `.union` on `left` with `right` as its argument and returns the
  result."
  [left right]
  (-> left (.union right)))

;; Pair DStream
(defn ->java-dstream
  "Calls `.toJavaDStream` on `dstream` and returns the result."
  [dstream]
  (-> dstream .toJavaDStream))

(defn reduce-by-key
  "Calls `.reduceByKey` on `dstream` with `f` wrapped by
  `function/function2`. The three-argument arity passes
  `partitions-or-partitioner` through unchanged as the final argument."
  ([dstream f]
   (let [reducer (function/function2 f)]
     (.reduceByKey dstream reducer)))
  ([dstream f partitions-or-partitioner]
   (let [reducer (function/function2 f)]
     (.reduceByKey dstream reducer partitions-or-partitioner))))

(import-vars
[zero-one.geni.storage
disk-only
Expand Down
131 changes: 92 additions & 39 deletions test/zero_one/geni/streaming_test.clj
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
(ns zero-one.geni.streaming-test
(:require
[clojure.edn :as edn]
[clojure.string :as string]
[clojure.java.io :as io]
[clojure.string :as string]
[midje.sweet :refer [facts fact throws =>]]
[zero-one.geni.aot-functions :as aot]
[zero-one.geni.defaults :as defaults]
[zero-one.geni.streaming :as streaming])
(:import
(org.apache.spark.streaming Duration StreamingContext)
(org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext)
(org.apache.spark.streaming Duration StreamingContext)))
(org.apache.spark.streaming.dstream DStream)))

(defn create-random-temp-file! [file-name]
(let [temp-dir (System/getProperty "java.io.tmpdir")
Expand Down Expand Up @@ -37,69 +38,120 @@
input-stream (streaming/text-file-stream
context
(-> read-file .getParent .toString))
d-stream ((:fn opts identity) input-stream)]
dstream ((:fn opts identity) input-stream)]
(spit read-file "")
(streaming/save-as-text-files! d-stream (.toString write-file))
(streaming/save-as-text-files! dstream (.toString write-file))
@(streaming/start! context)
(Thread/sleep (:sleep-ms opts 50))
(spit read-file (str (:content opts "Hello World!")))
((:action-fn opts identity) d-stream)
((:action-fn opts identity) dstream)
(Thread/sleep (:sleep-ms opts 50))
(streaming/await-termination! context)
@(streaming/stop! context)
(let [result (written-content write-file)
n-retries (:n-retries opts 0)
max-retries (:max-tries opts 5)]
(if (and (= result "")
max-retries (:max-tries opts 5)
expected (:expected opts result)
finish-fn (:finish-fn opts identity)]
(if (and (or (= result "")
(not= (finish-fn result) expected))
(< n-retries max-retries))
(do
(println "Retrying stream-results ...")
(stream-results (assoc opts :n-retries (inc n-retries))))
result))))
(finish-fn result)))))

(def dummy-text
  "Contents of test/resources/rdd.txt as a string, read when this namespace
  is loaded."
  (-> "test/resources/rdd.txt"
      io/file
      slurp))

(facts "On DStream methods" :streaming
(stream-results {:content dummy-text
:fn (comp streaming/count streaming/count-by-value)})
(stream-results
{:content dummy-text
:fn #(-> %
(streaming/map-to-pair aot/to-pair)
(streaming/flat-map-values aot/to-pair)
streaming/->java-dstream)
:finish-fn #(-> % (string/split #"\n") distinct count)
:expected 6})
=> 6
(stream-results
{:content dummy-text
:fn #(-> %
(streaming/flat-map-to-pair aot/split-spaces-and-pair)
(streaming/reduce-by-key +)
streaming/->java-dstream)
:finish-fn #(-> % (string/split #"\n") distinct count)
:expected 23})
=> 23
(stream-results
{:content dummy-text
:fn #(-> %
(streaming/flat-map aot/split-spaces)
(streaming/filter aot/equals-lewis))
:finish-fn #(set (string/split % #"\n"))
:expected #{"Lewis"}})
=> #{"Lewis"}
(stream-results
{:content dummy-text
:fn (comp streaming/count streaming/count-by-value)})
=> #(string/includes? % "0\n")
(stream-results {:content dummy-text
:fn (comp streaming/count #(streaming/count-by-value % 1))})
(stream-results
{:content dummy-text
:fn (comp streaming/count #(streaming/count-by-value % 1))})
=> #(string/includes? % "0\n")
(stream-results
{:content dummy-text
:action-fn #(assert (nil? (streaming/compute % (+ 100 (System/currentTimeMillis)))))})
:action-fn #(let [now (System/currentTimeMillis)]
(assert (nil? (streaming/compute % (+ 100 now)))))})
=> string?
(-> (stream-results
{:content dummy-text
:fn #(streaming/flat-map % aot/split-spaces)})
(string/split #"\n")
count)
=> pos?)
(stream-results
{:content dummy-text
:fn #(streaming/flat-map % aot/split-spaces)
:finish-fn #(count (string/split % #"\n"))
:expected 522})
=> 522)

(facts "On DStream testing" :streaming
(stream-results {:content (range 10) :fn streaming/cache})
(stream-results
{:content (range 10)
:fn streaming/cache})
=> (str (range 10) "\n")
(stream-results {:content (range 10)
:fn #(streaming/checkpoint % (streaming/milliseconds 200))})
(stream-results
{:content (range 10)
:fn #(streaming/checkpoint % (streaming/milliseconds 200))})
=> (str (range 10) "\n")
(stream-results {:content "abc\ndef" :fn streaming/count})
(stream-results
{:content "abc\ndef"
:fn streaming/count})
=> #(->> (string/split % #"\n") (map edn/read-string) (every? int?))
(stream-results {:content "abc\ndef" :fn streaming/glom})
(stream-results
{:content "abc\ndef"
:fn streaming/glom})
=> #(string/includes? % "[abc")
(stream-results {:content "abc\ndef" :fn streaming/persist})
=> #(string/includes? % "abc\n")
(stream-results {:content "abc\ndef"
:fn #(streaming/persist % streaming/memory-only)})
(stream-results
{:content "abc\ndef"
:fn streaming/persist
:expected "abc\ndef\n"})
=> "abc\ndef\n"
(stream-results {:content (range 10) :fn streaming/print})
(stream-results
{:content "abc\ndef"
:fn #(streaming/persist % streaming/memory-only)})
=> "abc\ndef\n"
(stream-results
{:content (range 10)
:fn streaming/print})
=> (throws java.lang.NullPointerException)
(stream-results {:content (range 10) :fn #(streaming/print % 2)})
(stream-results
{:content (range 10)
:fn #(streaming/print % 2)})
=> (throws java.lang.NullPointerException)
(stream-results {:content (range 10) :fn #(streaming/union % %)})
(stream-results
{:content (range 10)
:fn #(streaming/union % %)})
=> "(0 1 2 3 4 5 6 7 8 9)\n(0 1 2 3 4 5 6 7 8 9)\n"
(stream-results {:content (range 10) :fn streaming/slide-duration})
(stream-results
{:content (range 10)
:fn streaming/slide-duration})
;;; TODO: chain with other method
=> (throws java.lang.IllegalArgumentException))

Expand All @@ -113,10 +165,11 @@
(let [context (streaming/streaming-context @defaults/spark (streaming/seconds 1))]
(fact "streaming context instantiatable"
context => (partial instance? JavaStreamingContext))
(fact "retrieving context from a d-stream"
(let [d-stream (streaming/socket-text-stream context
"localhost"
9999
streaming/memory-only)]
d-stream => (partial instance? JavaDStream)
(streaming/context d-stream) => (partial instance? StreamingContext)))))
(fact "retrieving context from a dstream"
(let [dstream (streaming/socket-text-stream context
"localhost"
9999
streaming/memory-only)]
dstream => (partial instance? JavaDStream)
(streaming/dstream dstream) => (partial instance? DStream)
(streaming/context dstream) => (partial instance? StreamingContext)))))

0 comments on commit aa19725

Please sign in to comment.