Replaced Resultset with VOTableDocument.
aratikakadiya committed Dec 10, 2024
1 parent 570f299 commit 6cea8cb
Showing 3 changed files with 64 additions and 60 deletions.
cadc-dali/build.gradle: 4 changes (2 additions & 2 deletions)
@@ -28,8 +28,8 @@ dependencies {
implementation 'org.opencadc:cadc-util:[1.6,)'
implementation 'org.opencadc:cadc-uws:[1.0.4,)'

- implementation 'org.apache.parquet:parquet-avro:1.13.1'
- implementation 'org.apache.parquet:parquet-hadoop:1.13.1'
+ implementation 'org.apache.parquet:parquet-avro:1.15.0'
+ implementation 'org.apache.parquet:parquet-hadoop:1.15.0'
implementation 'org.apache.hadoop:hadoop-common:3.3.6'
implementation 'org.apache.hadoop:hadoop-mapreduce-client-core:3.3.6'

ca/nrc/cadc/dali/tables/parquet/DynamicSchemaGenerator.java
@@ -1,5 +1,6 @@
package ca.nrc.cadc.dali.tables.parquet;

+ import ca.nrc.cadc.dali.tables.votable.VOTableField;
import org.apache.avro.Schema;
import org.apache.log4j.Logger;

@@ -15,21 +16,19 @@ public class DynamicSchemaGenerator {

private static final Logger log = Logger.getLogger(DynamicSchemaGenerator.class);

- public static Schema generateSchema(ResultSet rs) {
+ public static Schema generateSchema(List<VOTableField> voFields) {
// List to hold Avro fields
List<Schema.Field> fields = new ArrayList<>();

try {
- ResultSetMetaData metaData = rs.getMetaData();
- int columnCount = metaData.getColumnCount();
+ int columnCount = voFields.size();
log.debug("Resultset Metadata Column count = " + columnCount);
- for (int i = 1; i <= columnCount; i++) {
- String columnName = metaData.getColumnName(i);
- Schema.Field field = new Schema.Field(columnName, getAvroFieldType(metaData.getColumnType(i)), null, null);
+ for (VOTableField voField : voFields) {
+ String columnName = voField.getName();
+ Schema.Field field = new Schema.Field(columnName.replaceAll("\"",""), getAvroFieldType(voField.getDatatype()), null, null);
fields.add(field);
}
log.debug("Schema.Field count = " + fields.size());
- } catch (SQLException e) {
+ } catch (Exception e) {
log.debug("Failure while retriving metadata from ResultSet", e);
throw new RuntimeException("Failure while retriving metadata from ResultSet : " + e.getMessage(), e);
}
@@ -42,38 +41,36 @@ public static Schema generateSchema(ResultSet rs) {
return schema;
}

- private static Schema getAvroFieldType(int sqlType) {
+ private static Schema getAvroFieldType(String sqlType) {
// Map SQL data types to Avro data types
Schema fieldType;
switch (sqlType) {
- case Types.INTEGER:
+ case "int":
fieldType = Schema.create(Schema.Type.INT);
break;
- case Types.BIGINT:
+ case "long":
fieldType = Schema.create(Schema.Type.LONG);
break;
- case Types.FLOAT:
+ case "float":
fieldType = Schema.create(Schema.Type.FLOAT);
break;
- case Types.DOUBLE:
+ case "double":
fieldType = Schema.create(Schema.Type.DOUBLE);
break;
- case Types.VARCHAR:
- case Types.CHAR:
+ case "char":
fieldType = Schema.create(Schema.Type.STRING);
break;
- case Types.BOOLEAN:
+ case "boolean":
fieldType = Schema.create(Schema.Type.BOOLEAN);
break;
- case Types.DATE:
- case Types.TIMESTAMP:
- fieldType = Schema.create(Schema.Type.STRING); // TODO: Discuss(Double is for Start/EndDate, IntTime in VOTable)
+ case "date":
+ case "timestamp":
+ fieldType = Schema.create(Schema.Type.STRING); // TODO: TBD
break;
default:
- fieldType = Schema.create(Schema.Type.STRING); // Default to STRING for unknown types
+ fieldType = Schema.create(Schema.Type.STRING);
}
- return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), fieldType)); // TODO: Discuss
+ return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), fieldType)); // TODO: TBD
}
}
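For illustration, a minimal sketch of how the new generateSchema(List<VOTableField>) entry point might be exercised. The column names, the wrapper class name, and the two-argument VOTableField(name, datatype) constructor are assumptions for the example; per the switch above, each column maps to a nullable Avro union (null plus the mapped type), and unrecognized datatypes fall back to string.

import java.util.Arrays;
import java.util.List;

import org.apache.avro.Schema;

import ca.nrc.cadc.dali.tables.parquet.DynamicSchemaGenerator;
import ca.nrc.cadc.dali.tables.votable.VOTableField;

public class SchemaGenerationSketch {
    public static void main(String[] args) {
        // Hypothetical columns; a real caller would take these from the table's field list.
        List<VOTableField> fields = Arrays.asList(
                new VOTableField("ra", "double"),     // maps to ["null", "double"]
                new VOTableField("target", "char"));  // maps to ["null", "string"]

        Schema schema = DynamicSchemaGenerator.generateSchema(fields);

        // Print the generated Avro record schema as pretty JSON.
        System.out.println(schema.toString(true));
    }
}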

ca/nrc/cadc/dali/tables/parquet/ParquetWriter.java
@@ -1,6 +1,8 @@
package ca.nrc.cadc.dali.tables.parquet;

import ca.nrc.cadc.dali.tables.TableWriter;
+ import ca.nrc.cadc.dali.tables.votable.VOTableDocument;
+ import ca.nrc.cadc.dali.tables.votable.VOTableResource;
import ca.nrc.cadc.dali.util.FormatFactory;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -16,9 +18,10 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
- import java.sql.ResultSet;
+ import java.util.Iterator;
+ import java.util.List;

- public class ParquetWriter implements TableWriter<ResultSet> {
+ public class ParquetWriter implements TableWriter<VOTableDocument> {

private static final Logger log = Logger.getLogger(ParquetWriter.class);

@@ -49,56 +52,60 @@ public void setFormatFactory(FormatFactory ff) {
}

@Override
- public void write(ResultSet tm, OutputStream out) throws IOException {
+ public void write(VOTableDocument tm, OutputStream out) throws IOException {
write(tm, out, Long.MAX_VALUE);
}

@Override
- public void write(ResultSet resultSet, OutputStream out, Long maxRec) throws IOException {
+ public void write(VOTableDocument resultSet, OutputStream out, Long maxRec) throws IOException {
log.debug("ParquetWriter Write service called. MaxRec = " + maxRec);
-         Schema schema = DynamicSchemaGenerator.generateSchema(resultSet);
-
-         OutputFile outputFile = outputFileFromStream(out);
-
-         try (org.apache.parquet.hadoop.ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
-                 .withSchema(schema)
-                 .withCompressionCodec(CompressionCodecName.SNAPPY)
-                 .withRowGroupSize(Long.valueOf(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE))
-                 .withPageSize(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE)
-                 .withConf(new Configuration())
-                 .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                 .withValidation(false)
-                 .withDictionaryEncoding(false)
-                 .build()) {
-
-             int i = 1;
-             while (resultSet.next() && i <= maxRec) {
-                 GenericRecord record = new GenericData.Record(schema);
-                 for (Schema.Field field : schema.getFields()) {
-                     String columnName = field.name();
-                     Object value = resultSet.getObject(columnName);
-                     record.put(columnName, value);
-                 }
-                 writer.write(record);
-                 i++;
-             }
-             log.debug("Total Records generated= " + (i - 1));
-         } catch (Exception e) {
-             throw new IOException("error while writing", e);
-         }
-         out.close();
+         for (VOTableResource resource : resultSet.getResources()) {
+             Schema schema = DynamicSchemaGenerator.generateSchema(resource.getTable().getFields());
+             OutputFile outputFile = outputFileFromStream(out);
+
+             try (org.apache.parquet.hadoop.ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(outputFile)
+                     .withSchema(schema)
+                     .withCompressionCodec(CompressionCodecName.SNAPPY)
+                     .withRowGroupSize(Long.valueOf(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE))
+                     .withPageSize(org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE)
+                     .withConf(new Configuration())
+                     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                     .withValidation(false)
+                     .withDictionaryEncoding(false)
+                     .build()) {
+
+                 Iterator<List<Object>> iterator = resource.getTable().getTableData().iterator();
+                 int recordCount = 1;
+
+                 while (iterator.hasNext() && recordCount <= maxRec) {
+                     GenericRecord record = new GenericData.Record(schema);
+                     List<Object> rowData = iterator.next();
+                     int columnIndex = 0;
+
+                     for (Schema.Field field : schema.getFields()) {
+                         String columnName = field.name();
+                         record.put(columnName, rowData.get(columnIndex));
+                         columnIndex++;
+                     }
+
+                     writer.write(record);
+                     recordCount++;
+                 }
+                 log.debug("Total Records generated= " + (recordCount - 1));
+             } catch (Exception e) {
+                 throw new IOException("error while writing", e);
+             }
+         }
+         out.close();
}

@Override
- public void write(ResultSet tm, Writer out) {
+ public void write(VOTableDocument tm, Writer out) {
throw new UnsupportedOperationException("This method for Parquet Writer is not supported.");
}

@Override
- public void write(ResultSet resultSet, Writer out, Long maxRec) {
+ public void write(VOTableDocument resultSet, Writer out, Long maxRec) {
throw new UnsupportedOperationException("This method for Parquet Writer is not supported.");
}
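For context, a minimal usage sketch of the VOTableDocument-based writer introduced by this commit. It assumes a no-argument ParquetWriter constructor and obtains the input document with VOTableReader; the file names are placeholders.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;

import ca.nrc.cadc.dali.tables.parquet.ParquetWriter;
import ca.nrc.cadc.dali.tables.votable.VOTableDocument;
import ca.nrc.cadc.dali.tables.votable.VOTableReader;

public class ParquetWriteSketch {
    public static void main(String[] args) throws Exception {
        // Parse an existing VOTable XML document (placeholder file name).
        VOTableDocument doc;
        try (FileInputStream in = new FileInputStream("result-votable.xml")) {
            doc = new VOTableReader().read(in);
        }

        // Write each resource's table to Parquet, capped at 1000 rows per table (maxRec).
        ParquetWriter writer = new ParquetWriter();  // assumed no-argument constructor
        try (OutputStream out = new FileOutputStream("result.parquet")) {
            writer.write(doc, out, 1000L);
        }
    }
}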

