From bac8b6dca64b6fbbd5f28a288550441d08f84121 Mon Sep 17 00:00:00 2001 From: cavemandaveman Date: Fri, 27 Jul 2018 18:11:43 -0500 Subject: [PATCH] Infer avro schema if not provided --- README.md | 2 +- nifi-encrypt-value-nar/pom.xml | 6 +++--- nifi-encrypt-value-processors/pom.xml | 2 +- .../processors/encryptvalue/EncryptValue.java | 8 +++++++- .../processors/util/FormatStream.java | 10 +++++++++- .../encryptvalue/EncryptValueAvroTest.java | 16 ++++++++++++++++ pom.xml | 2 +- 7 files changed, 38 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index e157fe0..88d5425 100644 --- a/README.md +++ b/README.md @@ -59,5 +59,5 @@ Determines what hashing algorithm should be used to perform the encryption - ~~Add support for more hashing algorithms~~ - ~~Support salting~~ - Allow choice of Avro compression (Snappy, bzip2, etc.) -- Infer Avro schema if not passed in +- ~~Infer Avro schema if not passed in~~ - Better unit tests for Avro diff --git a/nifi-encrypt-value-nar/pom.xml b/nifi-encrypt-value-nar/pom.xml index 3fe295f..e1d7082 100644 --- a/nifi-encrypt-value-nar/pom.xml +++ b/nifi-encrypt-value-nar/pom.xml @@ -19,11 +19,11 @@ com.nineteen04labs nifi-encrypt-value-bundle - 18.07.4 + 18.07.5 nifi-encrypt-value-nar - 18.07.4 + 18.07.5 nar true @@ -34,7 +34,7 @@ com.nineteen04labs nifi-encrypt-value-processors - 18.07.4 + 18.07.5 diff --git a/nifi-encrypt-value-processors/pom.xml b/nifi-encrypt-value-processors/pom.xml index eacddad..f52b828 100644 --- a/nifi-encrypt-value-processors/pom.xml +++ b/nifi-encrypt-value-processors/pom.xml @@ -20,7 +20,7 @@ com.nineteen04labs nifi-encrypt-value-bundle - 18.07.4 + 18.07.5 nifi-encrypt-value-processors diff --git a/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/encryptvalue/EncryptValue.java b/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/encryptvalue/EncryptValue.java index 19da8cf..1f36036 100644 --- a/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/encryptvalue/EncryptValue.java +++ b/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/encryptvalue/EncryptValue.java @@ -112,8 +112,14 @@ public void process(InputStream in, OutputStream out) throws IOException { JsonGenerator jsonGen = jsonFactory.createGenerator(baos); Schema schema = null; + if (flowFormat.equals("AVRO")) { - schema = new Schema.Parser().parse(schemaString); + try { + schema = new Schema.Parser().parse(schemaString); + } catch (NullPointerException e) { + schema = FormatStream.getEmbeddedSchema(in); + in.reset(); + } in = FormatStream.avroToJson(in, schema); } diff --git a/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java b/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java index de424ac..7d14891 100644 --- a/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java +++ b/nifi-encrypt-value-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java @@ -43,7 +43,7 @@ public class FormatStream { private static final Logger logger = LoggerFactory.getLogger(FormatStream.class); public static InputStream avroToJson(InputStream in, Schema schema) throws IOException { - GenericDatumReader reader = new GenericDatumReader(); + DatumReader reader = new GenericDatumReader(); DataFileStream streamReader = new DataFileStream(in, reader); DatumWriter writer = new ExtendedGenericDatumWriter<>(schema); @@ -87,6 +87,14 @@ public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream, return baos; } + public static Schema getEmbeddedSchema(InputStream in) throws IOException { + DatumReader reader = new GenericDatumReader(); + DataFileStream streamReader = new DataFileStream(in, reader); + streamReader.close(); + + return streamReader.getSchema(); + } + private static InputStream convertStream(ByteArrayOutputStream baos) throws IOException { PipedInputStream pin = new PipedInputStream(); PipedOutputStream pout = new PipedOutputStream(pin); diff --git a/nifi-encrypt-value-processors/src/test/java/com/nineteen04labs/processors/encryptvalue/EncryptValueAvroTest.java b/nifi-encrypt-value-processors/src/test/java/com/nineteen04labs/processors/encryptvalue/EncryptValueAvroTest.java index 90f7f09..4b43b05 100644 --- a/nifi-encrypt-value-processors/src/test/java/com/nineteen04labs/processors/encryptvalue/EncryptValueAvroTest.java +++ b/nifi-encrypt-value-processors/src/test/java/com/nineteen04labs/processors/encryptvalue/EncryptValueAvroTest.java @@ -42,6 +42,8 @@ public void setSchema() throws IOException { @Test public void testSHA512() throws IOException { testEncryption("SHA-512"); + runner.clearTransferState(); + testEncryptionNoSchema("SHA-512"); } @Test @@ -78,4 +80,18 @@ private void testEncryption(final String hashAlgorithm) throws IOException { runner.assertAllFlowFilesTransferred(EncryptValueRelationships.REL_SUCCESS, 1); } + private void testEncryptionNoSchema(final String hashAlgorithm) throws IOException { + runner.setProperty(EncryptValueProperties.FIELD_NAMES, "first_name,last_name,card_number"); + runner.setProperty(EncryptValueProperties.FLOW_FORMAT, "AVRO"); + runner.setProperty(EncryptValueProperties.HASH_ALG, hashAlgorithm); + runner.setProperty(EncryptValueProperties.SALT, "ef3de698a8956f6eff8b7344407d861b7"); + runner.setValidateExpressionUsage(false); + + runner.enqueue(unencryptedFile); + + runner.run(); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(EncryptValueRelationships.REL_SUCCESS, 1); + } + } diff --git a/pom.xml b/pom.xml index ee1a30b..610470f 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ com.nineteen04labs nifi-encrypt-value-bundle - 18.07.4 + 18.07.5 pom