diff --git a/cadc-dali/build.gradle b/cadc-dali/build.gradle
index 369867ec..f0d1b516 100644
--- a/cadc-dali/build.gradle
+++ b/cadc-dali/build.gradle
@@ -28,8 +28,8 @@ dependencies {
     implementation 'org.opencadc:cadc-util:[1.6,)'
     implementation 'org.opencadc:cadc-uws:[1.0.4,)'

-    implementation 'org.apache.parquet:parquet-avro:1.13.1'
-    implementation 'org.apache.parquet:parquet-hadoop:1.13.1'
+    implementation 'org.apache.parquet:parquet-avro:1.15.0'
+    implementation 'org.apache.parquet:parquet-hadoop:1.15.0'

     implementation 'org.apache.hadoop:hadoop-common:3.3.6'
     implementation 'org.apache.hadoop:hadoop-mapreduce-client-core:3.3.6'
diff --git a/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/DynamicSchemaGenerator.java b/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/DynamicSchemaGenerator.java
index 84b0285e..09ccbdc2 100644
--- a/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/DynamicSchemaGenerator.java
+++ b/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/DynamicSchemaGenerator.java
@@ -1,5 +1,6 @@
 package ca.nrc.cadc.dali.tables.parquet;

+import ca.nrc.cadc.dali.tables.votable.VOTableField;
 import org.apache.avro.Schema;

 import org.apache.log4j.Logger;
@@ -15,21 +16,19 @@ public class DynamicSchemaGenerator {

     private static final Logger log = Logger.getLogger(DynamicSchemaGenerator.class);

-    public static Schema generateSchema(ResultSet rs) {
+    public static Schema generateSchema(List<VOTableField> voFields) {
         // List to hold Avro fields
         List<Schema.Field> fields = new ArrayList<>();
-        try {
-            ResultSetMetaData metaData = rs.getMetaData();
-            int columnCount = metaData.getColumnCount();
+        try {
+            int columnCount = voFields.size();
             log.debug("Resultset Metadata Column count = " + columnCount);
-            for (int i = 1; i <= columnCount; i++) {
-                String columnName = metaData.getColumnName(i);
-                Schema.Field field = new Schema.Field(columnName, getAvroFieldType(metaData.getColumnType(i)), null, null);
+            for (VOTableField voField : voFields) {
+                String columnName = voField.getName();
+                Schema.Field field = new Schema.Field(columnName.replaceAll("\"", ""), getAvroFieldType(voField.getDatatype()), null, null);
                 fields.add(field);
             }
             log.debug("Schema.Field count = " + fields.size());
-        } catch (SQLException e) {
+        } catch (Exception e) {
             log.debug("Failure while retriving metadata from ResultSet", e);
             throw new RuntimeException("Failure while retriving metadata from ResultSet : " + e.getMessage(), e);
         }
@@ -42,38 +41,36 @@ public static Schema generateSchema(ResultSet rs) {
         return schema;
     }

-    private static Schema getAvroFieldType(int sqlType) {
+    private static Schema getAvroFieldType(String sqlType) {
         // Map SQL data types to Avro data types
         Schema fieldType;
         switch (sqlType) {
-            case Types.INTEGER:
+            case "int":
                 fieldType = Schema.create(Schema.Type.INT);
                 break;
-            case Types.BIGINT:
+            case "long":
                 fieldType = Schema.create(Schema.Type.LONG);
                 break;
-            case Types.FLOAT:
+            case "float":
                 fieldType = Schema.create(Schema.Type.FLOAT);
                 break;
-            case Types.DOUBLE:
+            case "double":
                 fieldType = Schema.create(Schema.Type.DOUBLE);
                 break;
-            case Types.VARCHAR:
-            case Types.CHAR:
+            case "char":
                 fieldType = Schema.create(Schema.Type.STRING);
                 break;
-            case Types.BOOLEAN:
+            case "boolean":
                 fieldType = Schema.create(Schema.Type.BOOLEAN);
                 break;
-            case Types.DATE:
-            case Types.TIMESTAMP:
-                fieldType = Schema.create(Schema.Type.STRING); // TODO: Discuss(Double is for Start/EndDate, IntTime in VOTable)
+            case "date":
+            case "timestamp":
+                fieldType = Schema.create(Schema.Type.STRING); // TODO: TBD
                 break;
             default:
-                fieldType = Schema.create(Schema.Type.STRING); // Default to STRING for unknown types
+                fieldType = Schema.create(Schema.Type.STRING);
         }
-        return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), fieldType)); // TODO: Discuss
-
+        return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), fieldType)); // TODO: TBD
     }
 }
diff --git a/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/ParquetWriter.java b/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/ParquetWriter.java
index 6359f6f9..fafb9e73 100644
--- a/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/ParquetWriter.java
+++ b/cadc-dali/src/main/java/ca/nrc/cadc/dali/tables/parquet/ParquetWriter.java
@@ -1,6 +1,8 @@
 package ca.nrc.cadc.dali.tables.parquet;

 import ca.nrc.cadc.dali.tables.TableWriter;
+import ca.nrc.cadc.dali.tables.votable.VOTableDocument;
+import ca.nrc.cadc.dali.tables.votable.VOTableResource;
 import ca.nrc.cadc.dali.util.FormatFactory;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -16,9 +18,10 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Writer;
-import java.sql.ResultSet;
+import java.util.Iterator;
+import java.util.List;

-public class ParquetWriter implements TableWriter<ResultSet> {
+public class ParquetWriter implements TableWriter<VOTableDocument> {

     private static final Logger log = Logger.getLogger(ParquetWriter.class);
@@ -49,56 +52,60 @@ public void setFormatFactory(FormatFactory ff) {
     }

     @Override
-    public void write(ResultSet tm, OutputStream out) throws IOException {
+    public void write(VOTableDocument tm, OutputStream out) throws IOException {
         write(tm, out, Long.MAX_VALUE);
     }

     @Override
-    public void write(ResultSet resultSet, OutputStream out, Long maxRec) throws IOException {
+    public void write(VOTableDocument resultSet, OutputStream out, Long maxRec) throws IOException {
         log.debug("ParquetWriter Write service called. MaxRec = " + maxRec);
+        for (VOTableResource resource : resultSet.getResources()) {
+            Schema schema = DynamicSchemaGenerator.generateSchema(resource.getTable().getFields());
+            OutputFile outputFile = outputFileFromStream(out);
+
+            try (org.apache.parquet.hadoop.ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
+                    .withSchema(schema)
+                    .withCompressionCodec(CompressionCodecName.SNAPPY)
+                    .withRowGroupSize(Long.valueOf(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE))
+                    .withPageSize(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE)
+                    .withConf(new Configuration())
+                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                    .withValidation(false)
+                    .withDictionaryEncoding(false)
+                    .build()) {
+
+                Iterator<List<Object>> iterator = resource.getTable().getTableData().iterator();
+                int recordCount = 1;
+
+                while (iterator.hasNext() && recordCount <= maxRec) {
+                    GenericRecord record = new GenericData.Record(schema);
+                    List<Object> rowData = iterator.next();
+                    int columnIndex = 0;
+
+                    for (Schema.Field field : schema.getFields()) {
+                        String columnName = field.name();
+                        record.put(columnName, rowData.get(columnIndex));
+                        columnIndex++;
+                    }
-        Schema schema = DynamicSchemaGenerator.generateSchema(resultSet);
-
-        OutputFile outputFile = outputFileFromStream(out);
-
-        try (org.apache.parquet.hadoop.ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
-                .withSchema(schema)
-                .withCompressionCodec(CompressionCodecName.SNAPPY)
-                .withRowGroupSize(Long.valueOf(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE))
-                .withPageSize(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE)
-                .withConf(new Configuration())
-                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                .withValidation(false)
-                .withDictionaryEncoding(false)
-                .build()) {
-
-            int i = 1;
-            while (resultSet.next() && i <= maxRec) {
-                GenericRecord record = new GenericData.Record(schema);
-                for (Schema.Field field : schema.getFields()) {
-
-                    String columnName = field.name();
-                    Object value = resultSet.getObject(columnName);
-                    record.put(columnName, value);
+                    writer.write(record);
+                    recordCount++;
+                    log.debug("Total Records generated= " + (recordCount - 1));
                 }
-                writer.write(record);
-                i++;
+            } catch (Exception e) {
+                throw new IOException("error while writing", e);
             }
-
-            log.debug("Total Records generated= " + (i - 1));
-            out.close();
-        } catch (Exception e) {
-            throw new IOException("error while writing", e);
         }
+        out.close();
     }

     @Override
-    public void write(ResultSet tm, Writer out) {
+    public void write(VOTableDocument tm, Writer out) {
         throw new UnsupportedOperationException("This method for Parquet Writer is not supported.");
     }

     @Override
-    public void write(ResultSet resultSet, Writer out, Long maxRec) {
+    public void write(VOTableDocument resultSet, Writer out, Long maxRec) {
         throw new UnsupportedOperationException("This method for Parquet Writer is not supported.");
     }