From 4d1e76084713e9e7c42d11602d4db87cef6e5524 Mon Sep 17 00:00:00 2001 From: Anthony Khong Date: Wed, 2 Sep 2020 08:42:17 +0700 Subject: [PATCH] Sparkplug serialisation model (#183) * Sparkplugs serialisation instead of sparklings * Return to 100% test coverage * Mention sparkplug in all related source files * Fixed typo --- src/clojure/zero_one/geni/rdd/function.clj | 81 ++++++++-- src/java/geni/rdd/function/ComparatorFn.java | 27 ++++ src/java/geni/rdd/function/FlatMapFn1.java | 30 ++++ src/java/geni/rdd/function/FlatMapFn2.java | 30 ++++ .../geni/rdd/function/FlatMapFunction.java | 19 --- .../geni/rdd/function/FlatMapFunction2.java | 17 -- src/java/geni/rdd/function/Fn1.java | 28 ++++ src/java/geni/rdd/function/Fn2.java | 28 ++++ src/java/geni/rdd/function/Fn3.java | 28 ++++ src/java/geni/rdd/function/Function.java | 13 -- src/java/geni/rdd/function/Function2.java | 13 -- src/java/geni/rdd/function/Function3.java | 13 -- src/java/geni/rdd/function/PairFlatMapFn.java | 42 +++++ .../rdd/function/PairFlatMapFunction.java | 30 ---- .../{PairFunction.java => PairFn.java} | 40 +++-- .../geni/rdd/function/SerializableFn.java | 145 ++++++++++++++++++ src/java/geni/rdd/function/VoidFn.java | 28 ++++ src/java/geni/rdd/function/VoidFunction.java | 15 -- .../AbstractSerializableWrappedIFn.java | 35 ----- src/java/geni/rdd/serialization/Utils.java | 59 ------- test/zero_one/geni/rdd_function_test.clj | 31 ++++ 21 files changed, 516 insertions(+), 236 deletions(-) create mode 100644 src/java/geni/rdd/function/ComparatorFn.java create mode 100644 src/java/geni/rdd/function/FlatMapFn1.java create mode 100644 src/java/geni/rdd/function/FlatMapFn2.java delete mode 100644 src/java/geni/rdd/function/FlatMapFunction.java delete mode 100644 src/java/geni/rdd/function/FlatMapFunction2.java create mode 100644 src/java/geni/rdd/function/Fn1.java create mode 100644 src/java/geni/rdd/function/Fn2.java create mode 100644 src/java/geni/rdd/function/Fn3.java delete mode 100644 
src/java/geni/rdd/function/Function.java delete mode 100644 src/java/geni/rdd/function/Function2.java delete mode 100644 src/java/geni/rdd/function/Function3.java create mode 100644 src/java/geni/rdd/function/PairFlatMapFn.java delete mode 100644 src/java/geni/rdd/function/PairFlatMapFunction.java rename src/java/geni/rdd/function/{PairFunction.java => PairFn.java} (58%) create mode 100644 src/java/geni/rdd/function/SerializableFn.java create mode 100644 src/java/geni/rdd/function/VoidFn.java delete mode 100644 src/java/geni/rdd/function/VoidFunction.java delete mode 100644 src/java/geni/rdd/serialization/AbstractSerializableWrappedIFn.java delete mode 100644 src/java/geni/rdd/serialization/Utils.java create mode 100644 test/zero_one/geni/rdd_function_test.clj diff --git a/src/clojure/zero_one/geni/rdd/function.clj b/src/clojure/zero_one/geni/rdd/function.clj index 44db2704..a0025ca0 100644 --- a/src/clojure/zero_one/geni/rdd/function.clj +++ b/src/clojure/zero_one/geni/rdd/function.clj @@ -1,13 +1,68 @@ -(ns zero-one.geni.rdd.function) - -(defmacro gen-function [cls wrapper-name] - `(defn ~wrapper-name [f#] - (new ~(symbol (str "zero_one.geni.rdd.function." cls)) f#))) - -(gen-function Function function) -(gen-function Function2 function2) -(gen-function VoidFunction void-function) -(gen-function FlatMapFunction flat-map-function) -(gen-function FlatMapFunction2 flat-map-function2) -(gen-function PairFlatMapFunction pair-flat-map-function) -(gen-function PairFunction pair-function) +;; Taken from https://github.com/amperity/sparkplug +(ns zero-one.geni.rdd.function + (:require + [clojure.string :as str]) + (:import + (java.lang.reflect Field Modifier) + (java.util HashSet))) + +(defn access-field [^Field field obj] + (try + (.setAccessible field true) + (.get field obj) + (catch Exception _ nil))) ;; Original was IllegalAccessException + +(defn walk-object-vars [^HashSet references ^HashSet visited obj] + (when-not (or (nil? obj) + (boolean? obj) + (string? 
obj) + (number? obj) + (keyword? obj) + (symbol? obj) + (instance? clojure.lang.Ref obj) + (.contains visited obj)) + (.add visited obj) + (if (var? obj) + (let [ns-sym (ns-name (:ns (meta obj)))] + (.add references ns-sym)) + (do + (when (map? obj) + (doall + (for [entry obj] + (walk-object-vars references visited entry)))) + (doall + (for [^Field field (.getDeclaredFields (class obj))] + (when (or (not (map? obj)) (Modifier/isStatic (.getModifiers field))) + (let [value (access-field field obj)] + (when (or (ifn? value) (map? value)) + (walk-object-vars references visited value)))))))))) + +(defn namespace-references [^Object obj] + (let [obj-ns (-> (.. obj getClass getName) + (Compiler/demunge) + (str/split #"/") + (first) + (symbol)) + references (HashSet.) + visited (HashSet.)] + (when-not (class? (resolve obj-ns)) + (.add references obj-ns)) + (walk-object-vars references visited obj) + (disj (set references) 'clojure.core))) + +(defmacro ^:private gen-function + [fn-name constructor] + (let [class-sym (symbol (str "zero_one.geni.rdd.function." 
fn-name))] + `(defn ~(vary-meta constructor assoc :tag class-sym) + ~(str "Construct a new serializable " fn-name " function wrapping `f`.") + [~'f] + (let [references# (namespace-references ~'f)] + (new ~class-sym ~'f (mapv str references#)))))) + +(gen-function Fn1 function) +(gen-function Fn2 function2) +(gen-function FlatMapFn1 flat-map-function) +(gen-function FlatMapFn2 flat-map-function2) +(gen-function PairFlatMapFn pair-flat-map-function) +(gen-function PairFn pair-function) +(gen-function VoidFn void-function) diff --git a/src/java/geni/rdd/function/ComparatorFn.java b/src/java/geni/rdd/function/ComparatorFn.java new file mode 100644 index 00000000..0d4ecd3e --- /dev/null +++ b/src/java/geni/rdd/function/ComparatorFn.java @@ -0,0 +1,27 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Collection; +import java.util.Comparator; + + +/** + * Compatibility wrapper for a `Comparator` of two arguments. + */ +public class ComparatorFn extends SerializableFn implements Comparator { + + public ComparatorFn(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public int compare(Object v1, Object v2) { + return (int)f.invoke(v1, v2); + } + +} diff --git a/src/java/geni/rdd/function/FlatMapFn1.java b/src/java/geni/rdd/function/FlatMapFn1.java new file mode 100644 index 00000000..11d77ed7 --- /dev/null +++ b/src/java/geni/rdd/function/FlatMapFn1.java @@ -0,0 +1,30 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Iterator; +import java.util.Collection; + +import org.apache.spark.api.java.function.FlatMapFunction; + + +/** + * Compatibility wrapper for a Spark `FlatMapFunction` of one argument. 
+ */ +public class FlatMapFn1 extends SerializableFn implements FlatMapFunction { + + public FlatMapFn1(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public Iterator call(Object v1) throws Exception { + Collection results = (Collection)f.invoke(v1); + return results.iterator(); + } + +} diff --git a/src/java/geni/rdd/function/FlatMapFn2.java b/src/java/geni/rdd/function/FlatMapFn2.java new file mode 100644 index 00000000..99f79343 --- /dev/null +++ b/src/java/geni/rdd/function/FlatMapFn2.java @@ -0,0 +1,30 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Iterator; +import java.util.Collection; + +import org.apache.spark.api.java.function.FlatMapFunction2; + + +/** + * Compatibility wrapper for a Spark `FlatMapFunction2` of two arguments. + */ +public class FlatMapFn2 extends SerializableFn implements FlatMapFunction2 { + + public FlatMapFn2(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public Iterator call(Object v1, Object v2) throws Exception { + Collection results = (Collection)f.invoke(v1, v2); + return results.iterator(); + } + +} diff --git a/src/java/geni/rdd/function/FlatMapFunction.java b/src/java/geni/rdd/function/FlatMapFunction.java deleted file mode 100644 index 59799fd6..00000000 --- a/src/java/geni/rdd/function/FlatMapFunction.java +++ /dev/null @@ -1,19 +0,0 @@ -package zero_one.geni.rdd.function; - -import java.util.Iterator; -import java.lang.Object; -import java.util.Collection; - -import clojure.lang.IFn; - -public class FlatMapFunction extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.FlatMapFunction { - public FlatMapFunction(IFn func) { - super(func); - } - - @SuppressWarnings("unchecked") - public java.util.Iterator call(Object v1) throws Exception { - 
return (java.util.Iterator) ((Collection) f.invoke(v1)).iterator(); - } - -} diff --git a/src/java/geni/rdd/function/FlatMapFunction2.java b/src/java/geni/rdd/function/FlatMapFunction2.java deleted file mode 100644 index b3d18b16..00000000 --- a/src/java/geni/rdd/function/FlatMapFunction2.java +++ /dev/null @@ -1,17 +0,0 @@ -package zero_one.geni.rdd.function; -import java.util.Iterator; -import java.lang.Object; -import java.util.Collection; - -import clojure.lang.IFn; - -public class FlatMapFunction2 extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.FlatMapFunction2 { - public FlatMapFunction2(IFn func) { - super(func); - } - - @SuppressWarnings("unchecked") - public java.util.Iterator call(Object v1, Object v2) throws Exception { - return (java.util.Iterator) ((Collection) f.invoke(v1, v2)).iterator(); - } -} diff --git a/src/java/geni/rdd/function/Fn1.java b/src/java/geni/rdd/function/Fn1.java new file mode 100644 index 00000000..34acc350 --- /dev/null +++ b/src/java/geni/rdd/function/Fn1.java @@ -0,0 +1,28 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Collection; + +import org.apache.spark.api.java.function.Function; + + +/** + * Compatibility wrapper for a Spark `Function` of one argument. 
+ */ +public class Fn1 extends SerializableFn implements Function { + + public Fn1(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public Object call(Object v1) throws Exception { + return f.invoke(v1); + } + +} diff --git a/src/java/geni/rdd/function/Fn2.java b/src/java/geni/rdd/function/Fn2.java new file mode 100644 index 00000000..4630e94e --- /dev/null +++ b/src/java/geni/rdd/function/Fn2.java @@ -0,0 +1,28 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Collection; + +import org.apache.spark.api.java.function.Function2; + + +/** + * Compatibility wrapper for a Spark `Function2` of two arguments. + */ +public class Fn2 extends SerializableFn implements Function2 { + + public Fn2(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public Object call(Object v1, Object v2) throws Exception { + return f.invoke(v1, v2); + } + +} diff --git a/src/java/geni/rdd/function/Fn3.java b/src/java/geni/rdd/function/Fn3.java new file mode 100644 index 00000000..633bf734 --- /dev/null +++ b/src/java/geni/rdd/function/Fn3.java @@ -0,0 +1,28 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Collection; + +import org.apache.spark.api.java.function.Function3; + + +/** + * Compatibility wrapper for a Spark `Function3` of three arguments. 
+ */ +public class Fn3 extends SerializableFn implements Function3 { + + public Fn3(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public Object call(Object v1, Object v2, Object v3) throws Exception { + return f.invoke(v1, v2, v3); + } + +} diff --git a/src/java/geni/rdd/function/Function.java b/src/java/geni/rdd/function/Function.java deleted file mode 100644 index 1a863e11..00000000 --- a/src/java/geni/rdd/function/Function.java +++ /dev/null @@ -1,13 +0,0 @@ -package zero_one.geni.rdd.function; - -import clojure.lang.IFn; - -public class Function extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function, org.apache.spark.sql.api.java.UDF1 { - public Function(IFn func) { - super(func); - } - - public Object call(Object v1) throws Exception { - return f.invoke(v1); - } -} diff --git a/src/java/geni/rdd/function/Function2.java b/src/java/geni/rdd/function/Function2.java deleted file mode 100644 index 532eec8d..00000000 --- a/src/java/geni/rdd/function/Function2.java +++ /dev/null @@ -1,13 +0,0 @@ -package zero_one.geni.rdd.function; - -import clojure.lang.IFn; - -public class Function2 extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function2, org.apache.spark.sql.api.java.UDF2 { - public Function2(IFn func) { - super(func); - } - - public Object call(Object v1, Object v2) throws Exception { - return f.invoke(v1, v2); - } -} diff --git a/src/java/geni/rdd/function/Function3.java b/src/java/geni/rdd/function/Function3.java deleted file mode 100644 index 415ebb6f..00000000 --- a/src/java/geni/rdd/function/Function3.java +++ /dev/null @@ -1,13 +0,0 @@ -package zero_one.geni.rdd.function; - -import clojure.lang.IFn; - -public class Function3 extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function3, 
org.apache.spark.sql.api.java.UDF3 { - public Function3(IFn func) { - super(func); - } - - public Object call(Object v1, Object v2, Object v3) throws Exception { - return f.invoke(v1, v2, v3); - } -} diff --git a/src/java/geni/rdd/function/PairFlatMapFn.java b/src/java/geni/rdd/function/PairFlatMapFn.java new file mode 100644 index 00000000..82cac9b1 --- /dev/null +++ b/src/java/geni/rdd/function/PairFlatMapFn.java @@ -0,0 +1,42 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Collection; +import java.util.Iterator; + +import org.apache.spark.api.java.function.PairFlatMapFunction; + +import scala.Tuple2; + + +/** + * Compatibility wrapper for a Spark `PairFlatMapFunction` of one argument + * which returns a sequence of pairs. + */ +public class PairFlatMapFn extends SerializableFn implements PairFlatMapFunction { + + public PairFlatMapFn(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public Iterator> call(Object v1) throws Exception { + Collection result = (Collection)f.invoke(v1); + Iterator results = result.iterator(); + return new Iterator>() { + public boolean hasNext() { + return results.hasNext(); + } + + public Tuple2 next() { + return PairFn.coercePair(f, results.next()); + } + }; + } + +} diff --git a/src/java/geni/rdd/function/PairFlatMapFunction.java b/src/java/geni/rdd/function/PairFlatMapFunction.java deleted file mode 100644 index 19a7ce71..00000000 --- a/src/java/geni/rdd/function/PairFlatMapFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -package zero_one.geni.rdd.function; -import java.util.Iterator; -import java.lang.Object; -import java.util.Collection; - - - -import clojure.lang.IFn; -import scala.Tuple2; - -public class PairFlatMapFunction extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.PairFlatMapFunction { - public 
PairFlatMapFunction(IFn func) { - super(func); - } - - @SuppressWarnings("unchecked") - public Iterator> call(Object v1) throws Exception { - Collection result = (Collection)f.invoke(v1); - Iterator results = result.iterator(); - return new Iterator>() { - public boolean hasNext() { - return results.hasNext(); - } - - public Tuple2 next() { - return PairFunction.coercePair(f, results.next()); - } - }; - } -} diff --git a/src/java/geni/rdd/function/PairFunction.java b/src/java/geni/rdd/function/PairFn.java similarity index 58% rename from src/java/geni/rdd/function/PairFunction.java rename to src/java/geni/rdd/function/PairFn.java index 15b20cac..1a4b9bbd 100644 --- a/src/java/geni/rdd/function/PairFunction.java +++ b/src/java/geni/rdd/function/PairFn.java @@ -1,43 +1,65 @@ +// Taken from https://github.com/amperity/sparkplug package zero_one.geni.rdd.function; + import clojure.lang.IFn; import clojure.lang.IMapEntry; import clojure.lang.IPersistentVector; + +import java.util.Collection; + +import org.apache.spark.api.java.function.PairFunction; + import scala.Tuple2; -public class PairFunction extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.PairFunction { - public PairFunction(IFn func) { - super(func); + +/** + * Compatibility wrapper for a Spark `PairFunction` of one argument which + * returns a pair. + */ +public class PairFn extends SerializableFn implements PairFunction { + + public PairFn(IFn f, Collection namespaces) { + super(f, namespaces); } + + @Override @SuppressWarnings("unchecked") public Tuple2 call(Object v1) throws Exception { return coercePair(f, f.invoke(v1)); } + /** - * Coerce a result value into a Scala `Tuple2` as the result of a function. 
- * - * @param f the function which produced the result, to report in error messages - * @param result object to try to coerce - * @return a Scala tuple with two values - */ + * Coerce a result value into a Scala `Tuple2` as the result of a function. + * + * @param f the function which produced the result, to report in error messages + * @param result object to try to coerce + * @return a Scala tuple with two values + */ public static Tuple2 coercePair(IFn f, Object result) { + // Null can't be coerced. if (result == null) { throw new RuntimeException("Wrapped pair function " + f + " returned a null"); + // Scala tuples can be returned directly. } else if (result instanceof Tuple2) { return (Tuple2)result; + // Use key/value from Clojure map entries to construct a tuple. } else if (result instanceof IMapEntry) { IMapEntry entry = (IMapEntry)result; return new Tuple2(entry.key(), entry.val()); + // Try to generically coerce a sequential result into a tuple. } else if (result instanceof IPersistentVector) { IPersistentVector vector = (IPersistentVector)result; if (vector.count() != 2) { throw new RuntimeException("Wrapped pair function " + f + " returned a vector without exactly two values: " + vector.count()); } return new Tuple2(vector.nth(0), vector.nth(1)); + // Unknown type, can't coerce. 
} else { throw new RuntimeException("Wrapped pair function " + f + " returned an invalid pair type: " + result.getClass().getName()); } } + } diff --git a/src/java/geni/rdd/function/SerializableFn.java b/src/java/geni/rdd/function/SerializableFn.java new file mode 100644 index 00000000..5c317182 --- /dev/null +++ b/src/java/geni/rdd/function/SerializableFn.java @@ -0,0 +1,145 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.Compiler; +import clojure.lang.IFn; +import clojure.lang.RT; +import clojure.lang.Symbol; +import clojure.lang.Var; + +import java.io.IOException; +import java.io.InvalidObjectException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Base class for function classes built for interop with Spark and Scala. + * + * This class is designed to be serialized across computation boundaries in a + * manner compatible with Spark and Kryo, while ensuring that required code is + * loaded upon deserialization. + */ +public abstract class SerializableFn implements Serializable { + + private static final Logger logger = LoggerFactory.getLogger(SerializableFn.class); + private static final Var require = RT.var("clojure.core", "require"); + + protected IFn f; + protected List namespaces; + + + /** + * Default empty constructor. + */ + private SerializableFn() { + } + + + /** + * Construct a new serializable wrapper for the function with an explicit + * set of required namespaces. 
+ * + * @param fn Clojure function to wrap + * @param namespaces collection of namespaces required + */ + protected SerializableFn(IFn fn, Collection namespaces) { + this.f = fn; + List namespaceColl = new ArrayList(namespaces); + Collections.sort(namespaceColl); + this.namespaces = Collections.unmodifiableList(namespaceColl); + } + + + /** + * Serialize the function to the provided output stream. + * An unspoken part of the `Serializable` interface. + * + * @param out stream to write the function to + */ + private void writeObject(ObjectOutputStream out) throws IOException { + try { + logger.trace("Serializing " + f); + // Write the function class name + // This is only used for debugging + out.writeObject(f.getClass().getName()); + // Write out the referenced namespaces. + out.writeInt(namespaces.size()); + for (String ns : namespaces) { + out.writeObject(ns); + } + // Write out the function itself. + out.writeObject(f); + } catch (IOException ex) { + logger.error("Error serializing function " + f, ex); + throw ex; + } catch (RuntimeException ex){ + logger.error("Error serializing function " + f, ex); + throw ex; + } + } + + + /** + * Deserialize a function from the provided input stream. + * An unspoken part of the `Serializable` interface. + * + * @param in stream to read the function from + */ + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String className = ""; + try { + // Read the function class name. + className = (String)in.readObject(); + logger.trace("Deserializing " + className); + // Read the referenced namespaces and load them. + int nsCount = in.readInt(); + this.namespaces = new ArrayList(nsCount); + for (int i = 0; i < nsCount; i++) { + String ns = (String)in.readObject(); + namespaces.add(ns); + requireNamespace(ns); + } + // Read the function itself. 
+ this.f = (IFn)in.readObject(); + } catch (IOException ex) { + logger.error("IO error deserializing function " + className, ex); + throw ex; + } catch (ClassNotFoundException ex) { + logger.error("Class error deserializing function " + className, ex); + throw ex; + } catch (RuntimeException ex) { + logger.error("Error deserializing function " + className, ex); + throw ex; + } + } + + + /** + * Load the namespace specified by the given symbol. + * + * @param namespace string designating the namespace to load + */ + private static void requireNamespace(String namespace) { + try { + logger.trace("(require " + namespace + ")"); + synchronized (RT.REQUIRE_LOCK) { + Symbol sym = Symbol.intern(namespace); + require.invoke(sym); + } + } catch (Exception ex) { + logger.warn("Error loading namespace " + namespace, ex); + } + } + +} diff --git a/src/java/geni/rdd/function/VoidFn.java b/src/java/geni/rdd/function/VoidFn.java new file mode 100644 index 00000000..69324f8b --- /dev/null +++ b/src/java/geni/rdd/function/VoidFn.java @@ -0,0 +1,28 @@ +// Taken from https://github.com/amperity/sparkplug +package zero_one.geni.rdd.function; + + +import clojure.lang.IFn; + +import java.util.Collection; + +import org.apache.spark.api.java.function.VoidFunction; + + +/** + * Compatibility wrapper for a Spark `VoidFunction` of one argument. 
+ */ +public class VoidFn extends SerializableFn implements VoidFunction { + + public VoidFn(IFn f, Collection namespaces) { + super(f, namespaces); + } + + + @Override + @SuppressWarnings("unchecked") + public void call(Object v1) throws Exception { + f.invoke(v1); + } + +} diff --git a/src/java/geni/rdd/function/VoidFunction.java b/src/java/geni/rdd/function/VoidFunction.java deleted file mode 100644 index bcf9acc9..00000000 --- a/src/java/geni/rdd/function/VoidFunction.java +++ /dev/null @@ -1,15 +0,0 @@ -package zero_one.geni.rdd.function; - -import clojure.lang.IFn; - -public class VoidFunction extends zero_one.geni.rdd.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.VoidFunction { - - @SuppressWarnings("unchecked") - public void call(Object v1) throws Exception { - f.invoke(v1); - } - - public VoidFunction(IFn func) { - super(func); - } -} diff --git a/src/java/geni/rdd/serialization/AbstractSerializableWrappedIFn.java b/src/java/geni/rdd/serialization/AbstractSerializableWrappedIFn.java deleted file mode 100644 index 0fc361da..00000000 --- a/src/java/geni/rdd/serialization/AbstractSerializableWrappedIFn.java +++ /dev/null @@ -1,35 +0,0 @@ -package zero_one.geni.rdd.serialization; - -import clojure.lang.IFn; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import static zero_one.geni.rdd.serialization.Utils.readIFn; -import static zero_one.geni.rdd.serialization.Utils.writeIFn; - -/** - * Created by cbetz on 03.12.14. 
- */ -public abstract class AbstractSerializableWrappedIFn implements Serializable { - protected IFn f; - - public AbstractSerializableWrappedIFn() { - } - - public AbstractSerializableWrappedIFn(IFn func) { - f = func; - } - - private void writeObject(ObjectOutputStream out) throws IOException { - writeIFn(out, f); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - f = readIFn(in); - } - - -} diff --git a/src/java/geni/rdd/serialization/Utils.java b/src/java/geni/rdd/serialization/Utils.java deleted file mode 100644 index e3455b69..00000000 --- a/src/java/geni/rdd/serialization/Utils.java +++ /dev/null @@ -1,59 +0,0 @@ -package zero_one.geni.rdd.serialization; - -import java.lang.ClassNotFoundException; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.io.ObjectInputStream; - -import clojure.lang.IFn; -import clojure.lang.RT; -import clojure.lang.Var; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class Utils { - final static Logger logger = LoggerFactory.getLogger(Utils.class); - - static final Var require = RT.var("clojure.core", "require"); - static final Var symbol = RT.var("clojure.core", "symbol"); - - private Utils() { - } - - public static void requireNamespace(String namespace) { - try { - if (!"clojure.lang.Keyword".equals(namespace)) { // Keyword is a special case. 
- require.invoke(symbol.invoke(namespace)); - } - } catch (Exception e) { - logger.warn ("Error deserializing function (require " + namespace +") " ,e); - } - } - - public static void writeIFn(ObjectOutputStream out, IFn f) throws IOException { - try { - logger.debug("Serializing " + f ); - out.writeObject(f.getClass().getName()); - out.writeObject(f); - } catch (Exception e) { - logger.warn("Error serializing IFn " + f,e); - } - } - - public static IFn readIFn(ObjectInputStream in) throws IOException, ClassNotFoundException { - String clazz = "", namespace = ""; - try { - clazz = (String) in.readObject(); - namespace = clazz.split("\\$")[0]; - - requireNamespace(namespace); - - IFn f = (IFn) in.readObject(); - logger.debug("Deserializing " + f ); - return f; - } catch (Exception e) { - logger.warn("Error deserializing object (clazz: " + clazz + ", namespace: " + namespace + ")" ,e); - } - return null; - } -} diff --git a/test/zero_one/geni/rdd_function_test.clj b/test/zero_one/geni/rdd_function_test.clj new file mode 100644 index 00000000..1db447d0 --- /dev/null +++ b/test/zero_one/geni/rdd_function_test.clj @@ -0,0 +1,31 @@ +(ns zero-one.geni.rdd-function-test + (:require + [clojure.set :as set] + [midje.sweet :refer [facts fact =>]] + [zero-one.geni.rdd.function :as function]) + (:import + (java.util HashSet))) + +(facts "On serialisability" + (fact "On access-field" + (for [field (-> HashSet .getDeclaredFields seq)] + (function/access-field field {})) => #(and (integer? (first %)) + (nil? (second %)) + (not (nil? (nth % 2))))) + (fact "On namespace-references" + (function/namespace-references function/namespace-references) + => (partial set/subset? #{'clojure.string 'zero-one.geni.rdd.function}) + (function/namespace-references clojure.lang.Keyword) => #{}) + (fact "On walk-object-vars" + (doall + (for [obj [nil true "abc" 123 :def 'ghi (ref {})]] + (let [refs (HashSet.) 
+ visited (HashSet.)] + (function/walk-object-vars refs visited obj) + (into #{} refs) => empty? + (into #{} visited) => empty?))) + (let [refs (HashSet.) + visited (HashSet.)] + (function/walk-object-vars refs visited {:abc function/walk-object-vars}) + (into #{} visited) => (complement empty?) + (into #{} refs) => (partial set/subset? #{'clojure.core}))))