Skip to content

Commit

Permalink
Infer avro schema if not provided
Browse files Browse the repository at this point in the history
  • Loading branch information
cavemandaveman committed Jul 27, 2018
1 parent cace192 commit bac8b6d
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 8 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ Determines what hashing algorithm should be used to perform the encryption
- ~~Add support for more hashing algorithms~~
- ~~Support salting~~
- Allow choice of Avro compression (Snappy, bzip2, etc.)
- Infer Avro schema if not passed in
- ~~Infer Avro schema if not passed in~~
- Better unit tests for Avro
6 changes: 3 additions & 3 deletions nifi-encrypt-value-nar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-bundle</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
</parent>

<artifactId>nifi-encrypt-value-nar</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
Expand All @@ -34,7 +34,7 @@
<dependency>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-processors</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion nifi-encrypt-value-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-bundle</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
</parent>

<artifactId>nifi-encrypt-value-processors</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,14 @@ public void process(InputStream in, OutputStream out) throws IOException {
JsonGenerator jsonGen = jsonFactory.createGenerator(baos);

Schema schema = null;

if (flowFormat.equals("AVRO")) {
schema = new Schema.Parser().parse(schemaString);
try {
schema = new Schema.Parser().parse(schemaString);
} catch (NullPointerException e) {
schema = FormatStream.getEmbeddedSchema(in);
in.reset();
}
in = FormatStream.avroToJson(in, schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class FormatStream {
private static final Logger logger = LoggerFactory.getLogger(FormatStream.class);

public static InputStream avroToJson(InputStream in, Schema schema) throws IOException {
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
DatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileStream<Object> streamReader = new DataFileStream<Object>(in, reader);
DatumWriter<Object> writer = new ExtendedGenericDatumWriter<>(schema);

Expand Down Expand Up @@ -87,6 +87,14 @@ public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream,
return baos;
}

public static Schema getEmbeddedSchema(InputStream in) throws IOException {
DatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileStream<Object> streamReader = new DataFileStream<Object>(in, reader);
streamReader.close();

return streamReader.getSchema();
}

private static InputStream convertStream(ByteArrayOutputStream baos) throws IOException {
PipedInputStream pin = new PipedInputStream();
PipedOutputStream pout = new PipedOutputStream(pin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public void setSchema() throws IOException {
@Test
public void testSHA512() throws IOException {
testEncryption("SHA-512");
runner.clearTransferState();
testEncryptionNoSchema("SHA-512");
}

@Test
Expand Down Expand Up @@ -78,4 +80,18 @@ private void testEncryption(final String hashAlgorithm) throws IOException {
runner.assertAllFlowFilesTransferred(EncryptValueRelationships.REL_SUCCESS, 1);
}

private void testEncryptionNoSchema(final String hashAlgorithm) throws IOException {
runner.setProperty(EncryptValueProperties.FIELD_NAMES, "first_name,last_name,card_number");
runner.setProperty(EncryptValueProperties.FLOW_FORMAT, "AVRO");
runner.setProperty(EncryptValueProperties.HASH_ALG, hashAlgorithm);
runner.setProperty(EncryptValueProperties.SALT, "ef3de698a8956f6eff8b7344407d861b7");
runner.setValidateExpressionUsage(false);

runner.enqueue(unencryptedFile);

runner.run();
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(EncryptValueRelationships.REL_SUCCESS, 1);
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-encrypt-value-bundle</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
<packaging>pom</packaging>

<modules>
Expand Down

0 comments on commit bac8b6d

Please sign in to comment.