Skip to content

Commit

Permalink
We keep set semantics for the core rdf triples, by merging multiple g…
Browse files Browse the repository at this point in the history
…raph memberships into one list column
  • Loading branch information
JervenBolleman committed Dec 18, 2023
1 parent 81e5b4c commit bc13d4d
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 35 deletions.
43 changes: 31 additions & 12 deletions src/main/java/swiss/sib/swissprot/r2s2/loading/LoadIntoTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,36 @@ public final class LoadIntoTable implements AutoCloseable {
private final IRI predicate;
private final Table table;
private final Inserter inserter;
private volatile int c = 1;

private final Lock lock = new ReentrantLock();
private final Connection conn;

private interface Inserter {
private interface Inserter extends AutoCloseable {
public void add(String[] subjParts, Resource subj, String[] objParts, Value obj, int tempGraphId)
throws SQLException;

public void close() throws SQLException;

public default long count() {
return 0L;
}
}

private record JdbcInserter(PreparedStatement stat) implements Inserter {
public JdbcInserter(Connection conn, String name, GroupOfColumns subjectColumns, GroupOfColumns objectColumns)
throws SQLException {
this(conn.prepareStatement("insert into " + name + '('
this(generateSql(conn, name, subjectColumns, objectColumns));
}

private static PreparedStatement generateSql(Connection conn, String name, GroupOfColumns subjectColumns,
GroupOfColumns objectColumns) throws SQLException {
String insertViaSql = "insert into " + name + '('
+ Stream.concat(subjectColumns.columns().stream(), objectColumns.columns().stream())
.map(Column::name).collect(Collectors.joining(", "))
+ ") values (" + Stream.concat(subjectColumns.columns().stream(), objectColumns.columns().stream())
.map(c -> "?").collect(Collectors.joining(", "))
+ ")"));
+ ")";
return conn.prepareStatement(insertViaSql);
}

public void add(String[] subjParts, Resource subj, String[] objParts, Value obj, int tempGraphId)
Expand Down Expand Up @@ -103,14 +113,17 @@ private int add(String[] parseO, Value v, PreparedStatement stat, int index) thr
}
}

private class DuckDbInserter implements Inserter {
private static class DuckDbInserter implements Inserter {
private final DuckDBAppender appender;
private final DuckDBConnection conn;
private volatile int count = 1;
private final String tableName;

public DuckDbInserter(DuckDBConnection conn) throws SQLException {
public DuckDbInserter(DuckDBConnection conn, String tableName) throws SQLException {
super();
this.conn = conn;
this.appender = conn.createAppender("", table.name());
this.tableName = tableName;
this.appender = conn.createAppender("", tableName);
}

public void add(String[] subjParts, Resource subj, String[] objParts, Value obj, int tempGraphId)
Expand All @@ -120,11 +133,11 @@ public void add(String[] subjParts, Resource subj, String[] objParts, Value obj,
add(objParts, obj, appender);
appender.append(tempGraphId);
appender.endRow();
if (c % FLUSH_EVERY_X == 0) {
if (count % FLUSH_EVERY_X == 0) {
appender.flush();
logger.info("Flushed " + table.name() + " now has " + c + " rows");
logger.info("Flushed " + tableName + " now has " + count + " rows");
}
c++;
count++;
}

public void close() throws SQLException {
Expand All @@ -142,6 +155,10 @@ private void add(String[] parseO, Value subjectS, DuckDBAppender appender) throw
}
}
}

/**
 * Returns the number of rows appended through this inserter so far.
 *
 * The internal counter is initialised to 1 and incremented after every
 * appended row, so it is always one ahead of the number of rows written;
 * the offset is removed here so that callers get the real row count.
 *
 * @return number of rows appended so far
 */
@Override
public long count() {
    return count - 1L;
}
}

public LoadIntoTable(Statement template, Connection masterConn, TemporaryIriIdMap tgid, TempIriId predicate,
Expand Down Expand Up @@ -170,7 +187,8 @@ public LoadIntoTable(Statement template, Connection masterConn, TemporaryIriIdMa
final String tableName = tableName(predicate, namespaces, subjectKind, objectKind, lang, datatype);
this.table = makeTable(predicate, subjectColumns, objectColumns, tableName);
if (masterConn instanceof DuckDBConnection) {
this.inserter = new DuckDbInserter((DuckDBConnection) ((DuckDBConnection) masterConn).duplicate());
this.inserter = new DuckDbInserter((DuckDBConnection) ((DuckDBConnection) masterConn).duplicate(),
this.table.name());
} else {
this.inserter = new JdbcInserter(masterConn, this.table.name(), subjectColumns, objectColumns);
}
Expand Down Expand Up @@ -210,8 +228,8 @@ private static String tableName(TempIriId predicate, Map<String, String> namespa
public void close() throws SQLException {
if (!closed) {
this.inserter.close();
logger.info("Closed " + table.name() + " now has " + this.inserter.count() + " rows");
}
logger.info("Closed " + table.name() + " now has " + c + " rows");
closed = true;
}

Expand Down Expand Up @@ -270,6 +288,7 @@ public void write(Statement statement) throws SQLException {
int tempGraphId = getTemporaryGraphId(statement);
write(subjectS, objectS, tempGraphId);
} else {
logger.error("Statement without an attached graph.");
Loader.Failures.NO_GRAPH.exit();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package swiss.sib.swissprot.r2s2.optimization;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -14,6 +16,7 @@
import swiss.sib.swissprot.r2s2.sql.Column;
import swiss.sib.swissprot.r2s2.sql.GroupOfColumns;
import swiss.sib.swissprot.r2s2.sql.PredicateMap;
import swiss.sib.swissprot.r2s2.sql.SqlDatatype;
import swiss.sib.swissprot.r2s2.sql.Table;

public class ReintroduceRdfSetSemantics {
Expand All @@ -25,36 +28,118 @@ public static void optimize(Connection conn, Table table) {
try {
String tempName = table.name() + "_temp";

try (Statement st = conn.createStatement()) {
String columns = allColumns(table).map(Column::definition).collect(Collectors.joining(", "));
final String sql = "CREATE TABLE " + tempName + "(" + columns + ")";
log.info("Running:" + sql);
st.execute(sql);
}
try (Statement st = conn.createStatement()) {
String columns = allColumns(table).map(Column::name).collect(Collectors.joining(", "));
final String sql = "INSERT INTO " + tempName + "(" + columns + ") SELECT DISTINCT " + columns
+ " FROM " + table.name() + " ORDER BY " + columns;
log.info("Running:" + sql);
st.execute(sql);
}
try (Statement st = conn.createStatement()) {
final String sql = "DROP TABLE " + table.name();
log.info("Running:" + sql);
st.execute(sql);
}
try (Statement st = conn.createStatement()) {
final String sql = "ALTER TABLE " + tempName + " RENAME TO " + table.name();
log.info("Running:" + sql);
st.execute(sql);
// if more than one graph per subject/object

Map<Boolean, List<Column>> m = table.subject().columns().stream()
.collect(Collectors.groupingBy(GroupOfColumns::isAGraphColumn));

List<Column> nonGraphColumns = m.get(Boolean.FALSE);
List<Column> graphColumns = m.get(Boolean.TRUE);
boolean subjectsInOneGraph = seeIfSubjectsAreInMultipleGraphs(table.name(), conn,
nonGraphColumns, graphColumns);
if (!subjectsInOneGraph) {
assert graphColumns.size() == 1;
graphColumns.get(0);

createNewTableForOneSubjectMultipleGraphs(conn, table, tempName, nonGraphColumns, graphColumns);
} else {
createNewTableForOneSubjectOneGraph(conn, table, tempName);
}

renameTemporaryTableIntoFinalName(conn, table, tempName);
JdbcUtil.commitIfNeeded(conn);
} catch (SQLException e) {
throw new IllegalStateException(e);
}
}
}

/**
 * Swaps the temporary table into place: drops the table called
 * {@code table.name()} and then renames {@code tempName} to that name.
 * Each SQL command is logged before it is run and is executed on its own
 * JDBC statement.
 *
 * @param conn     connection on which both commands are executed
 * @param table    table whose name is dropped and then re-used
 * @param tempName name of the table that takes over {@code table.name()}
 * @throws SQLException if creating a statement or executing a command fails
 */
private static void renameTemporaryTableIntoFinalName(Connection conn, Table table, String tempName)
        throws SQLException {
    final String finalName = table.name();
    final String[] commands = {
            "DROP TABLE " + finalName,
            "ALTER TABLE " + tempName + " RENAME TO " + finalName };
    for (final String command : commands) {
        try (Statement st = conn.createStatement()) {
            log.info("Running:" + command);
            st.execute(command);
        }
    }
}

/**
 * Creates {@code tempName} and fills it with one row per distinct
 * combination of non-graph column values, collecting the distinct graph
 * values of that combination into a list.
 *
 * The previous statement was not valid SQL: the {@code LIST(DISTINCT ...)}
 * aggregate was placed in the INSERT target column list, there was no space
 * before {@code GROUP BY}, and {@code SELECT DISTINCT} over all columns was
 * combined with a GROUP BY over only the subject columns.
 *
 * @param conn            connection on which the statements are executed
 * @param table           source table; its graph columns are switched to
 *                        {@link SqlDatatype#GRAPH_IRIS_LIST}
 * @param tempName        name of the table to create and fill
 * @param nonGraphColumns non-graph subject columns; kept for signature
 *                        compatibility, grouping uses every non-graph column
 *                        of the table so object columns are preserved too
 * @param graphColumns    graph columns whose values are merged into lists
 * @throws SQLException if creating a statement or executing the SQL fails
 */
private static void createNewTableForOneSubjectMultipleGraphs(Connection conn, Table table, String tempName,
        List<Column> nonGraphColumns, List<Column> graphColumns) throws SQLException {
    // Must happen before the CREATE TABLE below.
    // NOTE(review): assumes Column::definition reflects the datatype set here - confirm.
    for (Column gc : graphColumns) {
        gc.setDatatype(SqlDatatype.GRAPH_IRIS_LIST);
    }
    try (Statement st = conn.createStatement()) {
        String definitions = allColumns(table).map(Column::definition).collect(Collectors.joining(", "));
        final String sql = "CREATE TABLE " + tempName + "(" + definitions + ")";
        log.info("Running:" + sql);
        st.execute(sql);
    }

    // Every column that is not a graph column takes part in the grouping key.
    String groupingColumnNames = allColumns(table).filter(c -> !graphColumns.contains(c)).map(Column::name)
            .collect(Collectors.joining(", "));
    String graphColumnNames = graphColumns.stream().map(Column::name).collect(Collectors.joining(", "));
    // One aggregate per graph column; LIST(DISTINCT a, b) would not be valid.
    String graphAggregates = graphColumns.stream().map(c -> "LIST(DISTINCT " + c.name() + ")")
            .collect(Collectors.joining(", "));
    try (Statement st = conn.createStatement()) {
        final String sql = "INSERT INTO " + tempName + "(" + groupingColumnNames + ", " + graphColumnNames
                + ") SELECT " + groupingColumnNames + ", " + graphAggregates + " FROM " + table.name()
                + " GROUP BY " + groupingColumnNames + " ORDER BY " + groupingColumnNames;
        log.info("Running:" + sql);
        st.execute(sql);
    }
}

/**
 * Creates {@code tempName} with the column definitions of {@code table}
 * and copies the distinct rows of {@code table} into it, ordered by all
 * columns. Each SQL command is logged before it is executed.
 *
 * @param conn     connection on which the statements are executed
 * @param table    source table providing column definitions, names and rows
 * @param tempName name of the table to create and fill
 * @throws SQLException if creating a statement or executing the SQL fails
 */
private static void createNewTableForOneSubjectOneGraph(Connection conn, Table table, String tempName)
        throws SQLException {
    // Step 1: empty copy of the table structure.
    try (Statement createStatement = conn.createStatement()) {
        final String definitions = allColumns(table).map(Column::definition)
                .collect(Collectors.joining(", "));
        final String createSql = String.join("", "CREATE TABLE ", tempName, "(", definitions, ")");
        log.info("Running:" + createSql);
        createStatement.execute(createSql);
    }
    // Step 2: de-duplicated, sorted copy of the rows.
    try (Statement copyStatement = conn.createStatement()) {
        final String names = allColumns(table).map(Column::name).collect(Collectors.joining(", "));
        final String copySql = String.join("", "INSERT INTO ", tempName, "(", names, ") SELECT DISTINCT ",
                names, " FROM ", table.name(), " ORDER BY ", names);
        log.info("Running:" + copySql);
        copyStatement.execute(copySql);
    }
}

/**
 * Checks whether every combination of non-graph column values is associated
 * with at most one distinct graph value.
 *
 * Despite the method name, the result is {@code true} when NO combination
 * occurs in more than one graph, and {@code false} as soon as one does.
 *
 * Changes from the earlier version: the leftover debug query (a full table
 * scan that read the hard-coded column index 4 and therefore failed when
 * fewer columns were selected) is removed, the log message now describes
 * what was actually found, and a missing list of non-graph columns no
 * longer causes a NullPointerException.
 *
 * @param tableName       table to inspect
 * @param conn            connection on which the query is executed
 * @param nonGraphColumns columns forming the grouping key; may be null
 * @param graphColumns    graph columns to count distinct values of; may be null
 * @return {@code true} if no grouping key has more than one distinct graph
 *         value, or if either column list is null or empty
 * @throws SQLException if creating the statement or running the query fails
 */
private static boolean seeIfSubjectsAreInMultipleGraphs(String tableName, Connection conn,
        List<Column> nonGraphColumns, List<Column> graphColumns) throws SQLException {
    if (graphColumns == null || graphColumns.isEmpty()) {
        return true;
    }
    // Without grouping columns the query below cannot be built; fall back to
    // the same answer as for "no graph columns".
    if (nonGraphColumns == null || nonGraphColumns.isEmpty()) {
        return true;
    }
    String nonGraphColumnNames = nonGraphColumns.stream().map(Column::name).collect(Collectors.joining(", "));
    String graphColumnNames = graphColumns.stream().map(Column::name).collect(Collectors.joining(", "));
    String sql = "SELECT " + nonGraphColumnNames + " FROM " + tableName + " GROUP BY " + nonGraphColumnNames
            + " HAVING (COUNT(DISTINCT " + graphColumnNames + ") > 1)";
    log.info("Running:" + sql);
    try (Statement st = conn.createStatement(); ResultSet rs = st.executeQuery(sql)) {
        // A single returned row is enough: it is a key with more than one graph.
        if (rs.next()) {
            log.info("At least one subject occurs in more than one graph.");
            return false;
        }
        return true;
    }
}

public static Stream<Column> allColumns(Table table) {
final Stream<Column> subjectColums = table.subject().columns().stream();
final Stream<Column> objectColums = table.objects().stream().map(PredicateMap::groupOfColumns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
public enum SqlDatatype {
BOOLEAN("boolean"), NUMERIC("numeric"), TEXT("text"), DATE("date"), TIMESTAMP("timestamp"), INTERVAL("interval"),
BLOB("blob"), LIST("list"), STRUCT("struct"), MAP("map"), UNION("union"), INTEGER("integer"), BIGINT("bigint"),
DOUBLE("double"), FLOAT("float"), GRAPH_IRIS("graph_iris"), SCHEME("scheme"), HOST("host"), GYEAR("gyear");
DOUBLE("double"), FLOAT("float"), GRAPH_IRIS("graph_iris"), SCHEME("scheme"), HOST("host"), GYEAR("gyear"), GRAPH_IRIS_LIST("graph_iris[]");

private final String sql;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package swiss.sib.swissprot.r2s2.optimization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.junit.jupiter.api.Test;

import swiss.sib.swissprot.r2s2.loading.LoadIntoTable;
import swiss.sib.swissprot.r2s2.loading.TemporaryIriIdMap;
import swiss.sib.swissprot.r2s2.loading.TemporaryIriIdMap.TempIriId;
import swiss.sib.swissprot.r2s2.sql.Table;

/**
 * Loads the same subject/predicate/object in two different graphs into an
 * in-memory DuckDB table and checks the number of rows that were stored.
 *
 * NOTE(review): the test only exercises {@link LoadIntoTable}; it does not
 * call ReintroduceRdfSetSemantics.optimize - confirm whether that is intended.
 */
public class ReintroduceRdfSetSemanticsTest {
    private static final String NS = "http://example.org/";
    private static final ValueFactory vf = SimpleValueFactory.getInstance();
    private static final IRI zeroIri = vf.createIRI(NS, "1");
    private static final IRI oneIri = vf.createIRI(NS, "2");
    private static final IRI zeroGraph = vf.createIRI(NS, "zeroGraph");
    private static final IRI oneGraph = vf.createIRI(NS, "oneGraph");

    /** Two statements that differ only in their graph end up as two rows after loading. */
    @Test
    void twoGraphsOneSubject() throws SQLException, IOException {
        final Map<String, String> namespaces = Map.of(RDF.PREFIX, RDF.NAMESPACE, "ex", NS);
        final TemporaryIriIdMap predicateIds = new TemporaryIriIdMap();
        final TempIriId rdfType = predicateIds.temporaryIriId(RDF.TYPE);
        try (Connection conn = DriverManager.getConnection("jdbc:duckdb:")) {
            final Statement inZeroGraph = vf.createStatement(oneIri, rdfType, zeroIri, zeroGraph);
            final Statement inOneGraph = vf.createStatement(oneIri, rdfType, zeroIri, oneGraph);

            final Table loaded = load(namespaces, conn, inZeroGraph, inOneGraph, predicateIds, rdfType);
            final long storedRows = countAllRows(loaded.name(), conn);
            assertEquals(2, storedRows);
        }
    }

    /** Runs {@code SELECT COUNT(*)} on the given table and returns the single value. */
    private long countAllRows(String name, Connection conn) throws SQLException {
        final String countSql = "SELECT COUNT(*) FROM " + name;
        try (var statement = conn.createStatement()) {
            try (var rows = statement.executeQuery(countSql)) {
                assertTrue(rows.next());
                return rows.getLong(1);
            }
        }
    }

    /**
     * Writes both statements through one {@link LoadIntoTable}, asserting
     * before each write that the loader accepts the statement, and returns
     * the loader's table once the loader has been closed.
     */
    private Table load(Map<String, String> ns, Connection conn, Statement test, Statement test2, TemporaryIriIdMap p,
            TempIriId pt) throws SQLException, IOException {
        try (LoadIntoTable loader = new LoadIntoTable(test, conn, p, pt, ns)) {
            for (final Statement statement : new Statement[] { test, test2 }) {
                assertTrue(loader.testForAcceptance(statement));
                loader.write(statement);
            }
            return loader.table();
        }
    }
}

0 comments on commit bc13d4d

Please sign in to comment.