Skip to content

Commit

Permalink
ALS-7810: Finalize data dictionary pfb output
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Nov 20, 2024
1 parent dc0ebe3 commit 842fc23
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

import java.util.Map;

public record Concept(String conceptPath, String name, Map<String, String> meta) {
public record Concept(String type, String conceptPath, String name, String display, String dataset, String description, Map<String, String> meta) {
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing.io;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.Concept;
import edu.harvard.hms.dbmi.avillach.hpds.processing.dictionary.DictionaryService;
import org.apache.avro.Schema;
Expand All @@ -24,7 +26,7 @@
public class PfbWriter implements ResultWriter {

public static final String PATIENT_TABLE_PREFIX = "pic-sure-";
public static final String DRS_URL_TABLE_PREFIX = "drs-url-";
public static final String DATA_DICTIONARY_TABLE_PREFIX = "data-dictionary-";
private Logger log = LoggerFactory.getLogger(PfbWriter.class);

private final DictionaryService dictionaryService;
Expand All @@ -35,7 +37,7 @@ public class PfbWriter implements ResultWriter {
private final String queryId;

private final String patientTableName;
private final String drsUrlTableName;
private final String dataDictionaryTableName;
private SchemaBuilder.FieldAssembler<Schema> entityFieldAssembler;

private List<String> originalFields;
Expand All @@ -44,7 +46,7 @@ public class PfbWriter implements ResultWriter {
private File file;
private Schema entitySchema;
private Schema patientDataSchema;
private Schema drsUriSchema;
private Schema dataDictionarySchema;
private Schema relationSchema;

private static final Set<String> SINGULAR_FIELDS = Set.of("patient_id");
Expand All @@ -54,7 +56,7 @@ public PfbWriter(File tempFile, String queryId, DictionaryService dictionaryServ
this.queryId = queryId;
this.dictionaryService = dictionaryService;
this.patientTableName = formatFieldName(PATIENT_TABLE_PREFIX + queryId);
this.drsUrlTableName = formatFieldName(DRS_URL_TABLE_PREFIX + queryId);
this.dataDictionaryTableName = formatFieldName(DATA_DICTIONARY_TABLE_PREFIX + queryId);
entityFieldAssembler = SchemaBuilder.record("entity")
.namespace("edu.harvard.dbmi")
.fields();
Expand Down Expand Up @@ -85,10 +87,14 @@ public void writeHeader(String[] data) {
originalFields = List.of(data);
formattedFields = originalFields.stream().map(this::formatFieldName).collect(Collectors.toList());

drsUriSchema = SchemaBuilder.record(drsUrlTableName)
dataDictionarySchema = SchemaBuilder.record(dataDictionaryTableName)
.fields()
.requiredString("concept_path")
.name("drs_uri").type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault()
.nullableString("type", "null")
.nullableString("display", "null")
.nullableString("dataset", "null")
.nullableString("description", "null")
.endRecord();

SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record(patientTableName)
Expand All @@ -103,7 +109,7 @@ public void writeHeader(String[] data) {
});
patientDataSchema = patientRecords.endRecord();

Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema, drsUriSchema);
Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema, dataDictionarySchema);

entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault();
entityFieldAssembler.nullableString("id", "null");
Expand All @@ -122,36 +128,52 @@ public void writeHeader(String[] data) {
}

writeMetadata();
writeDrsUris();
writeDataDictionary();
}

private void writeDrsUris() {
private void writeDataDictionary() {
GenericRecord entityRecord = new GenericData.Record(entitySchema);;
Map<String, Concept> conceptMap = Map.of();
try {
conceptMap = dictionaryService.getConcepts(originalFields).stream()
.collect(Collectors.toMap(Concept::conceptPath, Function.identity()));
} catch (RuntimeException e) {
log.error("Error fetching DRS URIs from dictionary service", e);
log.error("Error fetching concepts from dictionary service", e);
return;
}

for (int i = 0; i < formattedFields.size(); i++) {
GenericRecord drsUriData = new GenericData.Record(drsUriSchema);
drsUriData.put("concept_path", formattedFields.get(i));
String formattedField = formattedFields.get(i);
if ("patient_id".equals(formattedField)) {
continue;
}
GenericRecord dataDictionaryData = new GenericData.Record(dataDictionarySchema);
dataDictionaryData.put("concept_path", formattedField);

Concept concept = conceptMap.get(originalFields.get(i));
List<String> drsUris = List.of();
if (concept != null) {
Map<String, String> meta = concept.meta();
if (meta != null) {
drsUris = new ArrayList<>(meta.keySet().stream().toList());
drsUris.addAll(meta.values().stream().toList());
String drsUriJson = meta.get("drs_uri");
if (drsUriJson != null) {
try {
String[] drsUriArray = new ObjectMapper().readValue(drsUriJson, String[].class);
drsUris = List.of(drsUriArray);
} catch (JsonProcessingException e) {
log.error("Error parsing drs_uri as json: " + drsUriJson);
}
}
}
dataDictionaryData.put("type", concept.type());
dataDictionaryData.put("display", concept.display());
dataDictionaryData.put("dataset", concept.dataset());
dataDictionaryData.put("description", concept.description());
}
drsUriData.put("drs_uri", drsUris);
dataDictionaryData.put("drs_uri", drsUris);

entityRecord.put("object", drsUriData);
entityRecord.put("name", drsUrlTableName);
entityRecord.put("object", dataDictionaryData);
entityRecord.put("name", dataDictionaryTableName);
entityRecord.put("id", "null");
entityRecord.put("relations", List.of());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void writeValidPFB() {
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"), UUID.randomUUID().toString(), dictionaryService);

Mockito.when(dictionaryService.getConcepts(List.of("patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\")))
.thenReturn(List.of(new Concept("\\demographics\\age\\", "age", Map.of("drs_uri", "a-drs.uri"))));
.thenReturn(List.of(new Concept("Categorical", "\\demographics\\age\\", "age", "AGE", "demographics", "patient age", Map.of("drs_uri", "a-drs.uri"))));

pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"});
List<List<String>> nullableList = new ArrayList<>();
Expand Down

0 comments on commit 842fc23

Please sign in to comment.