Skip to content

Commit

Permalink
Optimize group-by and apply cljfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
skylee03 committed Feb 3, 2024
1 parent 9e19684 commit dd12645
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 106 deletions.
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
[org.apache.spark/spark-sql_2.12 "3.3.3"]
[org.apache.spark/spark-streaming_2.12 "3.3.3"]]
:main ^:skip-aot datajure.dsl
:plugins [[dev.weavejester/lein-cljfmt "0.12.0"]]
:target-path "target/%s"
:jvm-opts ["--add-opens=java.base/java.nio=ALL-UNNAMED"
"--add-opens=java.base/java.net=ALL-UNNAMED"
Expand Down
5 changes: 2 additions & 3 deletions src/datajure/dsl.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns datajure.dsl
(:refer-clojure :exclude [print])
(:refer-clojure :exclude [print])
(:require [datajure.repl :as repl])
(:gen-class))

Expand Down Expand Up @@ -99,14 +99,13 @@
(filterv #(= 3 (count %))))
group-by-list (:group-by options-map)
sort-by-list (:sort-by options-map)

query-map {:row row-list
:where where-list
:group-by group-by-list
:having having-list
:sort-by sort-by-list
:select select-list}]
`(query-using-map ~dataset ~query-map)))
`(query-using-map ~dataset ~query-map)))
([dataset row-filter-list select-list]
`(query ~dataset ~row-filter-list ~select-list [])))

Expand Down
8 changes: 4 additions & 4 deletions src/datajure/operation_ck.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns datajure.operation-ck
(:refer-clojure :exclude [group-by sort-by])
(:refer-clojure :exclude [group-by sort-by])
(:require [clojure.java.io :refer [make-parents]]
[clojask.dataframe :as ck]))

Expand Down Expand Up @@ -228,7 +228,7 @@
"Select columns of `dataset` according to `query-map`."
[dataset query-map]
(let [select-all-keys (split-col-agg-keys-r dataset (:select query-map))]
(if (empty? select-all-keys)
(ck/compute dataset 8 "./.dsl/select-result.csv")
(ck/compute dataset 8 "./.dsl/select-result.csv" :select select-all-keys)))
(if (empty? select-all-keys)
(ck/compute dataset 8 "./.dsl/select-result.csv")
(ck/compute dataset 8 "./.dsl/select-result.csv" :select select-all-keys)))
dataset)
21 changes: 7 additions & 14 deletions src/datajure/operation_ds.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
(require '[tech.v3.dataset :as ds]
'[tech.v3.dataset.join :as ds-join])


(def ^:private aggregate-function-keywords #{:min :mean :mode :max :sum :sd :skew :n-valid :n-missing :n})

(defn- filter-column-r
Expand Down Expand Up @@ -52,8 +51,6 @@
list-num-missing (descriptive-ds :n-missing)
list-num-total (mapv #(+ %1 %2) list-num-missing list-num-valid)
list-sum (mapv #(if (and (number? %1) (number? %2)) (* %1 %2) nil) list-mean list-num-valid)


min-keys (mapv #(get-key-val %1 %2 :min) list-col list-min)
mean-keys (mapv #(get-key-val %1 %2 :mean) list-col list-mean)
mode-keys (mapv #(get-key-val %1 %2 :mode) list-col list-mode)
Expand All @@ -79,7 +76,6 @@
agg-ds (apply ds/concat (vals descriptive-grouped-map))]
(ds-join/left-join group-by-col first-ds agg-ds))))


(defn- get-combined-group-by-col
"Get the combined form of column names as described by `group-by-col`."
[group-by-col]
Expand All @@ -96,15 +92,13 @@
(let [group-by-col (get query-map :group-by)]
(if (nil? group-by-col)
dataset
(if (or (seq? group-by-col) (list? group-by-col) (vector? group-by-col))
(if (empty? group-by-col)
dataset
(if (= 1 (count group-by-col))
(group-by-single dataset (first group-by-col))
(let [new-col (get-combined-group-by-col group-by-col)
new-dataset (assoc dataset new-col (get-combined-group-by-val dataset group-by-col))]
(group-by-single new-dataset new-col))))
(group-by-single dataset group-by-col)))))
(if (empty? group-by-col)
dataset
(if (= 1 (count group-by-col))
(group-by-single dataset (first group-by-col))
(let [new-col (get-combined-group-by-col group-by-col)
new-dataset (assoc dataset new-col (get-combined-group-by-val dataset group-by-col))]
(group-by-single new-dataset new-col)))))))

(defn having
"Perform the `HAVING` operation on `dataset` by specifying a search condition for a group or an aggregate according to `query-map`."
Expand All @@ -131,7 +125,6 @@
(ds/sort-by-column dataset colname)
(ds/sort-by-column dataset colname compare-fn)))))))


(defn- split-col-agg-keys-r
"Convert aggregation keywords in `mixed-words` from separated form to combined form."
[mixed-words]
Expand Down
1 change: 0 additions & 1 deletion src/datajure/operation_g.clj
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
(get-agg-key first-exp :first)))]
(g/sort dataset colname))))))


(defn- split-col-agg-keys-r
"Convert aggregation keywords in `mixed-words` from separated form to combined form."
[mixed-words]
Expand Down
53 changes: 15 additions & 38 deletions src/datajure/operation_tc.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
[(get-agg-key col-name attr-keyword) val])

(defn- get-description-column-ds
"Convert the statistical information of the columns described by `groupby-col` and `groupby-col-val` from `descriptive-ds` to a dataset."
[descriptive-ds groupby-col groupby-col-val]
"Convert the statistical information of the columns described by `groupby-cols` and `groupby-col-val` from `descriptive-ds` to a dataset."
[descriptive-ds groupby-cols groupby-col-val]
(let [list-col (descriptive-ds :col-name)
list-min (descriptive-ds :min)
list-mean (descriptive-ds :mean)
Expand All @@ -50,8 +50,6 @@
list-num-missing (descriptive-ds :n-missing)
list-num-total (mapv #(+ %1 %2) list-num-missing list-num-valid)
list-sum (mapv #(if (and (number? %1) (number? %2)) (* %1 %2) nil) list-mean list-num-valid)


min-keys (mapv #(get-key-val %1 %2 :min) list-col list-min)
mean-keys (mapv #(get-key-val %1 %2 :mean) list-col list-mean)
mode-keys (mapv #(get-key-val %1 %2 :mode) list-col list-mode)
Expand All @@ -61,48 +59,28 @@
skew-keys (mapv #(get-key-val %1 %2 :skew) list-col list-skew)
num-valid-keys (mapv #(get-key-val %1 %2 :n-valid) list-col list-num-valid)
num-missing-keys (mapv #(get-key-val %1 %2 :n-missing) list-col list-num-missing)
num-total-keys (mapv #(get-key-val %1 %2 :n) list-col list-num-total)]
num-total-keys (mapv #(get-key-val %1 %2 :n) list-col list-num-total)
group-by-list (into [] (map (fn [k] [k [(get groupby-col-val k)]]) groupby-cols))]
(tc/dataset
(into {} (into {} (filter second (reduce into [[[groupby-col groupby-col-val]] min-keys mean-keys mode-keys max-keys sum-keys sd-keys skew-keys num-valid-keys num-missing-keys num-total-keys])))))))

(defn- group-by-single
"Perform `group-by` operation on `dataset` as specified by `group-by-col`."
[dataset group-by-col]
(if (nil? group-by-col)
dataset
(let [grouped-map (tc/group-by dataset group-by-col {:result-type :as-map})
descriptive-grouped-map (into {} (map (fn [[k v]] [k (get-description-column-ds (tc/info v) group-by-col k)]) grouped-map))
fisrt-rows (mapv #(tc/select-rows % [0]) (vals grouped-map))
first-ds (apply tc/concat fisrt-rows)
agg-ds (apply tc/concat (vals descriptive-grouped-map))]
(tc/left-join first-ds agg-ds group-by-col))))


(defn- get-combined-group-by-col
"Get the combined form of column names as described by `group-by-col`."
[group-by-col]
(keyword (apply str (mapv name group-by-col))))

(defn- get-combined-group-by-val
"Get the combined form of columns of `dataset` as described by `group-by-col`."
[dataset group-by-col]
(mapv #(apply str (str %)) (tc/rows (tc/select-columns dataset group-by-col))))
(into {} (into {} (filter second (reduce into [group-by-list min-keys mean-keys mode-keys max-keys sum-keys sd-keys skew-keys num-valid-keys num-missing-keys num-total-keys])))))))

(defn group-by
"Group the records in `dataset` according to `query-map`."
[dataset query-map]
(let [group-by-col (get query-map :group-by)]
(if (nil? group-by-col)
dataset
(if (or (seq? group-by-col) (list? group-by-col) (vector? group-by-col))
(if (empty? group-by-col)
(let [group-by-cols (if (or (seq? group-by-col) (list? group-by-col) (vector? group-by-col))
group-by-col
[group-by-col])]
(if (empty? group-by-cols)
dataset
(if (= 1 (count group-by-col))
(group-by-single dataset (first group-by-col))
(let [new-col (get-combined-group-by-col group-by-col)
new-dataset (assoc dataset new-col (get-combined-group-by-val dataset group-by-col))]
(group-by-single new-dataset new-col))))
(group-by-single dataset group-by-col)))))
(let [grouped-map (tc/group-by dataset group-by-cols {:result-type :as-map})
descriptive-grouped-map (into {} (map (fn [[k v]] [k (get-description-column-ds (tc/info v) group-by-cols k)]) grouped-map))
fisrt-rows (mapv #(tc/select-rows % [0]) (vals grouped-map))
first-ds (apply tc/concat fisrt-rows)
agg-ds (apply tc/concat (vals descriptive-grouped-map))]
(tc/left-join first-ds agg-ds group-by-cols)))))))

(defn having
"Perform the `HAVING` operation on `dataset` by specifying a search condition for a group or an aggregate according to `query-map`."
Expand All @@ -129,7 +107,6 @@
(tc/order-by dataset colname)
(tc/order-by dataset colname compare-fn)))))))


(defn- split-col-agg-keys-r
"Convert aggregation keywords in `mixed-words` from separated form to combined form."
[mixed-words]
Expand Down
92 changes: 46 additions & 46 deletions test/datajure/dsl_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,29 @@
(let [expected (slurp "./test/datajure/tc-expected.txt")
actual (with-out-str (do (dtj/set-backend "tablecloth")
(-> data
(dtj/dataset)
(dtj/query [[:salary #(< 300 %)] [:age #(> 20 %)]] [])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary #(< 1000 %)]] [:age :sum :salary] [:group-by :age])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :sum :salary :sd :salary] [:group-by :age :sort-by :sd :salary >])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :name :sum :salary] [:group-by :age :name])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:salary #(< 0 %)] [:age #(< 24 %)]] [])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary #(< 0 %)] [:age #(< 0 %)]] [:name :age :salary :sum :salary :sd :salary] [:group-by :name :age :sort-by :salary])
(dtj/print))))]
(dtj/dataset)
(dtj/query [[:salary #(< 300 %)] [:age #(> 20 %)]] [])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary #(< 1000 %)]] [:age :sum :salary] [:group-by :age])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :sum :salary :sd :salary] [:group-by :age :sort-by :sd :salary >])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :name :sum :salary] [:group-by :age :name])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:salary #(< 0 %)] [:age #(< 24 %)]] [])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary #(< 0 %)] [:age #(< 0 %)]] [:name :age :salary :sum :salary :sd :salary] [:group-by :name :age :sort-by :salary])
(dtj/print))))]
(is (check actual expected) actual)))

(deftest ck-test
Expand Down Expand Up @@ -104,27 +104,27 @@
(let [expected (slurp "./test/datajure/g-expected.txt")
actual (with-out-str (do (dtj/set-backend "geni")
(-> data
(dtj/dataset)
(dtj/query [[:salary (g/< (g/lit 300) :salary)] [:age (g/> (g/lit 20) :age)]] [])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary (g/< (g/lit 1000) (keyword "sum(salary)"))]] [:age :sum :salary] [:group-by :age])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :sum :salary :sd :salary] [:group-by :age :sort-by :sd :salary])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :name :sum :salary] [:group-by :age :name :sort-by :sum :salary])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:salary (g/< (g/lit 0) :salary)] [:age (g/< (g/lit 24) :age)]] [] [:sort-by :salary])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary (g/< (g/lit 0) (keyword "sum(salary)"))] [:age (g/< (g/lit 0) :age)]] [:name :age :salary :sum :salary :sd :salary] [:group-by :name :age :sort-by :sum :salary])
(dtj/print))))]
(dtj/dataset)
(dtj/query [[:salary (g/< (g/lit 300) :salary)] [:age (g/> (g/lit 20) :age)]] [])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary (g/< (g/lit 1000) (keyword "sum(salary)"))]] [:age :sum :salary] [:group-by :age])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :sum :salary :sd :salary] [:group-by :age :sort-by :sd :salary])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [] [:age :name :sum :salary] [:group-by :age :name :sort-by :sum :salary])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:salary (g/< (g/lit 0) :salary)] [:age (g/< (g/lit 24) :age)]] [] [:sort-by :salary])
(dtj/print))
(-> data
(dtj/dataset)
(dtj/query [[:sum :salary (g/< (g/lit 0) (keyword "sum(salary)"))] [:age (g/< (g/lit 0) :age)]] [:name :age :salary :sum :salary :sd :salary] [:group-by :name :age :sort-by :sum :salary])
(dtj/print))))]
(is (check actual expected) actual)))

0 comments on commit dd12645

Please sign in to comment.