diff --git a/src/clojure/zero_one/geni/streaming.clj b/src/clojure/zero_one/geni/streaming.clj index 4a7c6932..c4b54855 100644 --- a/src/clojure/zero_one/geni/streaming.clj +++ b/src/clojure/zero_one/geni/streaming.clj @@ -1,5 +1,6 @@ (ns zero-one.geni.streaming (:refer-clojure :exclude [count + filter print]) (:require [potemkin :refer [import-vars]] @@ -14,8 +15,8 @@ Time) (org.apache.spark.sql SparkSession))) -;; TODO: count-by-value-and-window, dstream, filter, flat-map-to-pair, flat-map-values -;; TODO: foreachRDD, map, map-partitions-to-pair, map-to-pair, reduce, reduce-by-window +;; TODO: count-by-value-and-window, +;; TODO: foreachRDD, map, map-partitions-to-pair, reduce, reduce-by-window ;; TODO: repartition, slice, transform, transform-to-pair, transform-with, ;; TODO: transform-with-to-pair, window, wrap-rdd @@ -36,10 +37,10 @@ (.textFileStream context path)) (defmulti save-as-text-files! (fn [head & _] (class head))) -(defmethod save-as-text-files! JavaDStream [d-stream path] - (save-as-text-files! (.dstream d-stream) path)) -(defmethod save-as-text-files! :default [d-stream path] - (.saveAsTextFiles d-stream path "")) +(defmethod save-as-text-files! JavaDStream [dstream path] + (save-as-text-files! (.dstream dstream) path)) +(defmethod save-as-text-files! :default [dstream path] + (.saveAsTextFiles dstream path "")) (defn start! [context] (future (.start context))) @@ -50,48 +51,73 @@ (defn stop! [context] (future (.stop context false true))) -(defn cache [d-stream] - (.cache d-stream)) +(defn cache [dstream] + (.cache dstream)) -(defn checkpoint [d-stream interval] - (.checkpoint d-stream interval)) +(defn checkpoint [dstream interval] + (.checkpoint dstream interval)) -(defn context [d-stream] - (.context d-stream)) +(defn context [dstream] + (.context dstream)) (defn ->time [value] (if (instance? Time value) value (Time. value))) -(defn compute [d-stream t] - (.compute d-stream (->time t))) +(defn compute [dstream t] + (.compute dstream (->time t))) -(defn count [d-stream] - (.count d-stream)) +(defn count [dstream] + (.count dstream)) (defn count-by-value - ([d-stream] (.countByValue d-stream)) - ([d-stream num-partitions] (.countByValue d-stream num-partitions))) + ([dstream] (.countByValue dstream)) + ([dstream num-partitions] (.countByValue dstream num-partitions))) -(defn flat-map [d-stream f] - (.flatMap d-stream (function/flat-map-function f))) +(def dstream (memfn dstream)) -(defn glom [d-stream] - (.glom d-stream)) +(defn filter [dstream f] + (.filter dstream (function/function f))) + +(defn flat-map [dstream f] + (.flatMap dstream (function/flat-map-function f))) + +(defn flat-map-to-pair [dstream f] + (.flatMapToPair dstream (function/pair-flat-map-function f))) +(def mapcat-to-pair flat-map-to-pair) + +(defn flat-map-values [dstream f] + (.flatMapValues dstream (function/flat-map-function f))) + +(defn glom [dstream] + (.glom dstream)) + +(defn map-to-pair [rdd f] + (.mapToPair rdd (function/pair-function f))) (defn persist - ([d-stream] (.persist d-stream)) - ([d-stream storage-level] (.persist d-stream storage-level))) + ([dstream] (.persist dstream)) + ([dstream storage-level] (.persist dstream storage-level))) (defn print - ([d-stream] (.print d-stream)) - ([d-stream num] (.print d-stream num))) + ([dstream] (.print dstream)) + ([dstream num] (.print dstream num))) -(defn slide-duration [d-stream] - (.slideDuration d-stream)) +(defn slide-duration [dstream] + (.slideDuration dstream)) (defn union [left right] (.union left right)) +;; Pair DStream +(defn ->java-dstream [dstream] + (.toJavaDStream dstream)) + +(defn reduce-by-key + ([dstream f] + (.reduceByKey dstream (function/function2 f))) + ([dstream f partitions-or-partitioner] + (.reduceByKey dstream (function/function2 f) partitions-or-partitioner))) + (import-vars [zero-one.geni.storage disk-only diff --git a/test/zero_one/geni/streaming_test.clj b/test/zero_one/geni/streaming_test.clj index dac27f1e..6f9cc52e 100644 --- a/test/zero_one/geni/streaming_test.clj +++ b/test/zero_one/geni/streaming_test.clj @@ -1,15 +1,16 @@ (ns zero-one.geni.streaming-test (:require [clojure.edn :as edn] - [clojure.string :as string] [clojure.java.io :as io] + [clojure.string :as string] [midje.sweet :refer [facts fact throws =>]] [zero-one.geni.aot-functions :as aot] [zero-one.geni.defaults :as defaults] [zero-one.geni.streaming :as streaming]) (:import + (org.apache.spark.streaming Duration StreamingContext) (org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext) - (org.apache.spark.streaming Duration StreamingContext))) + (org.apache.spark.streaming.dstream DStream))) (defn create-random-temp-file! [file-name] (let [temp-dir (System/getProperty "java.io.tmpdir") @@ -37,69 +38,120 @@ input-stream (streaming/text-file-stream context (-> read-file .getParent .toString)) - d-stream ((:fn opts identity) input-stream)] + dstream ((:fn opts identity) input-stream)] (spit read-file "") - (streaming/save-as-text-files! d-stream (.toString write-file)) + (streaming/save-as-text-files! dstream (.toString write-file)) @(streaming/start! context) (Thread/sleep (:sleep-ms opts 50)) (spit read-file (str (:content opts "Hello World!"))) - ((:action-fn opts identity) d-stream) + ((:action-fn opts identity) dstream) (Thread/sleep (:sleep-ms opts 50)) (streaming/await-termination! context) @(streaming/stop! context) (let [result (written-content write-file) n-retries (:n-retries opts 0) - max-retries (:max-tries opts 5)] - (if (and (= result "") + max-retries (:max-tries opts 5) + expected (:expected opts result) + finish-fn (:finish-fn opts identity)] + (if (and (or (= result "") + (not= (finish-fn result) expected)) (< n-retries max-retries)) (do (println "Retrying stream-results ...") (stream-results (assoc opts :n-retries (inc n-retries)))) - result)))) + (finish-fn result))))) (def dummy-text (slurp "test/resources/rdd.txt")) (facts "On DStream methods" :streaming - (stream-results {:content dummy-text - :fn (comp streaming/count streaming/count-by-value)}) + (stream-results + {:content dummy-text + :fn #(-> % + (streaming/map-to-pair aot/to-pair) + (streaming/flat-map-values aot/to-pair) + streaming/->java-dstream) + :finish-fn #(-> % (string/split #"\n") distinct count) + :expected 6}) + => 6 + (stream-results + {:content dummy-text + :fn #(-> % + (streaming/flat-map-to-pair aot/split-spaces-and-pair) + (streaming/reduce-by-key +) + streaming/->java-dstream) + :finish-fn #(-> % (string/split #"\n") distinct count) + :expected 23}) + => 23 + (stream-results + {:content dummy-text + :fn #(-> % + (streaming/flat-map aot/split-spaces) + (streaming/filter aot/equals-lewis)) + :finish-fn #(set (string/split % #"\n")) + :expected #{"Lewis"}}) + => #{"Lewis"} + (stream-results + {:content dummy-text + :fn (comp streaming/count streaming/count-by-value)}) => #(string/includes? % "0\n") - (stream-results {:content dummy-text - :fn (comp streaming/count #(streaming/count-by-value % 1))}) + (stream-results + {:content dummy-text + :fn (comp streaming/count #(streaming/count-by-value % 1))}) => #(string/includes? % "0\n") (stream-results {:content dummy-text - :action-fn #(assert (nil? (streaming/compute % (+ 100 (System/currentTimeMillis)))))}) + :action-fn #(let [now (System/currentTimeMillis)] + (assert (nil? (streaming/compute % (+ 100 now)))))}) => string? - (-> (stream-results - {:content dummy-text - :fn #(streaming/flat-map % aot/split-spaces)}) - (string/split #"\n") - count) - => pos?) + (stream-results + {:content dummy-text + :fn #(streaming/flat-map % aot/split-spaces) + :finish-fn #(count (string/split % #"\n")) + :expected 522}) + => 522) (facts "On DStream testing" :streaming - (stream-results {:content (range 10) :fn streaming/cache}) + (stream-results + {:content (range 10) + :fn streaming/cache}) => (str (range 10) "\n") - (stream-results {:content (range 10) - :fn #(streaming/checkpoint % (streaming/milliseconds 200))}) + (stream-results + {:content (range 10) + :fn #(streaming/checkpoint % (streaming/milliseconds 200))}) => (str (range 10) "\n") - (stream-results {:content "abc\ndef" :fn streaming/count}) + (stream-results + {:content "abc\ndef" + :fn streaming/count}) => #(->> (string/split % #"\n") (map edn/read-string) (every? int?)) - (stream-results {:content "abc\ndef" :fn streaming/glom}) + (stream-results + {:content "abc\ndef" + :fn streaming/glom}) => #(string/includes? % "[abc") - (stream-results {:content "abc\ndef" :fn streaming/persist}) - => #(string/includes? % "abc\n") - (stream-results {:content "abc\ndef" - :fn #(streaming/persist % streaming/memory-only)}) + (stream-results + {:content "abc\ndef" + :fn streaming/persist + :expected "abc\ndef\n"}) => "abc\ndef\n" - (stream-results {:content (range 10) :fn streaming/print}) + (stream-results + {:content "abc\ndef" + :fn #(streaming/persist % streaming/memory-only)}) + => "abc\ndef\n" + (stream-results + {:content (range 10) + :fn streaming/print}) => (throws java.lang.NullPointerException) - (stream-results {:content (range 10) :fn #(streaming/print % 2)}) + (stream-results + {:content (range 10) + :fn #(streaming/print % 2)}) => (throws java.lang.NullPointerException) - (stream-results {:content (range 10) :fn #(streaming/union % %)}) + (stream-results + {:content (range 10) + :fn #(streaming/union % %)}) => "(0 1 2 3 4 5 6 7 8 9)\n(0 1 2 3 4 5 6 7 8 9)\n" - (stream-results {:content (range 10) :fn streaming/slide-duration}) + (stream-results + {:content (range 10) + :fn streaming/slide-duration}) ;;; TODO: chain with other method => (throws java.lang.IllegalArgumentException)) @@ -113,10 +165,11 @@ (let [context (streaming/streaming-context @defaults/spark (streaming/seconds 1))] (fact "streaming context instantiatable" context => (partial instance? JavaStreamingContext)) - (fact "retrieving context from a d-stream" - (let [d-stream (streaming/socket-text-stream context - "localhost" - 9999 - streaming/memory-only)] - d-stream => (partial instance? JavaDStream) - (streaming/context d-stream) => (partial instance? StreamingContext))))) + (fact "retrieving context from a dstream" + (let [dstream (streaming/socket-text-stream context + "localhost" + 9999 + streaming/memory-only)] + dstream => (partial instance? JavaDStream) + (streaming/dstream dstream) => (partial instance? DStream) + (streaming/context dstream) => (partial instance? StreamingContext)))))