From 21b51fb089bc49ee1fda7fa71a7551354cda6984 Mon Sep 17 00:00:00 2001 From: Anthony Khong Date: Mon, 28 Sep 2020 08:53:12 +0700 Subject: [PATCH] More dstream methods (#221) * Added more DStream methods * Back to 100% coverage --- src/clojure/zero_one/geni/streaming.clj | 32 ++++++++++++++---- test/zero_one/geni/streaming_test.clj | 45 ++++++++++++++++++++++--- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/src/clojure/zero_one/geni/streaming.clj b/src/clojure/zero_one/geni/streaming.clj index c4b54855..23aaab43 100644 --- a/src/clojure/zero_one/geni/streaming.clj +++ b/src/clojure/zero_one/geni/streaming.clj @@ -1,7 +1,9 @@ (ns zero-one.geni.streaming (:refer-clojure :exclude [count filter - print]) + map + print + reduce]) (:require [potemkin :refer [import-vars]] [zero-one.geni.rdd.function :as function] @@ -16,9 +18,9 @@ (org.apache.spark.sql SparkSession))) ;; TODO: count-by-value-and-window, -;; TODO: foreachRDD, map, map-partitions-to-pair, reduce, reduce-by-window -;; TODO: repartition, slice, transform, transform-to-pair, transform-with, -;; TODO: transform-with-to-pair, window, wrap-rdd +;; TODO: foreachRDD, reduce-by-window +;; TODO: slice, transform, transform-to-pair, transform-with, +;; TODO: transform-with-to-pair, window (defn milliseconds [t] (Milliseconds/apply t)) @@ -91,8 +93,17 @@ (defn glom [dstream] (.glom dstream)) -(defn map-to-pair [rdd f] - (.mapToPair rdd (function/pair-function f))) +(defn map [dstream f] + (.map dstream (function/function f))) + +(defn map-partitions [dstream f] + (.mapPartitions dstream (function/flat-map-function f))) + +(defn map-partitions-to-pair [dstream f] + (.mapPartitionsToPair dstream (function/pair-flat-map-function f))) + +(defn map-to-pair [dstream f] + (.mapToPair dstream (function/pair-function f))) (defn persist ([dstream] (.persist dstream)) @@ -102,12 +113,21 @@ ([dstream] (.print dstream)) ([dstream num] (.print dstream num))) +(defn reduce [dstream f] + (.reduce dstream (function/function2 f))) + +(defn repartition [dstream num-partitions] + (.repartition dstream num-partitions)) + (defn slide-duration [dstream] (.slideDuration dstream)) (defn union [left right] (.union left right)) +(defn wrap-rdd [dstream rdd] + (.wrapRDD dstream (.rdd rdd))) + ;; Pair DStream (defn ->java-dstream [dstream] (.toJavaDStream dstream)) diff --git a/test/zero_one/geni/streaming_test.clj b/test/zero_one/geni/streaming_test.clj index 6f9cc52e..589a13e1 100644 --- a/test/zero_one/geni/streaming_test.clj +++ b/test/zero_one/geni/streaming_test.clj @@ -6,9 +6,10 @@ [midje.sweet :refer [facts fact throws =>]] [zero-one.geni.aot-functions :as aot] [zero-one.geni.defaults :as defaults] + [zero-one.geni.rdd :as rdd] [zero-one.geni.streaming :as streaming]) (:import - (org.apache.spark.streaming Duration StreamingContext) + (org.apache.spark.streaming Duration StreamingContext Time) (org.apache.spark.streaming.api.java JavaDStream JavaStreamingContext) (org.apache.spark.streaming.dstream DStream))) @@ -65,6 +66,39 @@ (slurp "test/resources/rdd.txt")) (facts "On DStream methods" :streaming + (stream-results + {:content dummy-text + :fn #(-> % + (streaming/repartition 2) + (streaming/map count) + (streaming/reduce +)) + :expected "2709\n"}) + => "2709\n" + (stream-results + {:content dummy-text + :fn #(-> % + (streaming/map-partitions-to-pair aot/mapcat-split-spaces) + (streaming/reduce-by-key + 2) + streaming/->java-dstream) + :finish-fn #(count (string/split % #"\n")) + :expected 23}) + => 23 + (stream-results + {:content dummy-text + :fn #(streaming/map-partitions % aot/map-split-spaces) + :finish-fn #(->> (string/split % #"\n") + (map count) + (reduce +)) + :expected 2313}) + => 2313 + (stream-results + {:content dummy-text + :fn #(streaming/map % count) + :finish-fn #(->> (string/split % #"\n") + (map edn/read-string) + (reduce +)) + :expected 2709}) + => 2709 (stream-results {:content dummy-text :fn #(-> % @@ -155,11 +189,13 @@ ;;; TODO: chain with other method => (throws java.lang.IllegalArgumentException)) -(facts "On durations" :streaming +(facts "On durations and times" :streaming (fact "durations instantiatable" (mapv (fn [f] (f 1) => (partial instance? Duration)) - [streaming/milliseconds streaming/seconds streaming/minutes]))) + [streaming/milliseconds streaming/seconds streaming/minutes])) + (fact "time coerceable" + (streaming/->time 123) => (Time. 123))) (facts "On StreamingContext" :streaming (let [context (streaming/streaming-context @defaults/spark (streaming/seconds 1))] @@ -172,4 +208,5 @@ streaming/memory-only)] dstream => (partial instance? JavaDStream) (streaming/dstream dstream) => (partial instance? DStream) - (streaming/context dstream) => (partial instance? StreamingContext))))) + (streaming/context dstream) => (partial instance? StreamingContext) + (rdd/collect (streaming/wrap-rdd dstream (rdd/parallelise [1 2 3]))) => [1 2 3]))))