Skip to content

Commit

Permalink
Persistence Component: Introduce Postgres Sink (finos#2737)
Browse files Browse the repository at this point in the history
  • Loading branch information
kumuwu authored Apr 11, 2024
1 parent ce2a688 commit 939caa6
Show file tree
Hide file tree
Showing 135 changed files with 7,112 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,9 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.DigestUdf;
import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.ToArrayFunction;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.LowerCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.UpperCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.api.IngestStatus;
import org.finos.legend.engine.persistence.components.relational.api.IngestorResult;
import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection;
Expand Down Expand Up @@ -177,22 +173,6 @@ private BigQuerySink()
(executor, sink, dataset) -> sink.constructDatasetFromDatabase(dataset, new BigQueryDataTypeToLogicalDataTypeMapping()));
}

@Override
public Optional<Optimizer> optimizerForCaseConversion(CaseConversion caseConversion)
{
switch (caseConversion)
{
case TO_LOWER:
return Optional.of(new LowerCaseOptimizer());
case TO_UPPER:
return Optional.of(new UpperCaseOptimizer());
case NONE:
return Optional.empty();
default:
throw new IllegalArgumentException("Unrecognized case conversion: " + caseConversion);
}
}

@Override
public Executor<SqlGen, TabularData, SqlPlan> getRelationalExecutor(RelationalConnection relationalConnection)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public Dataset constructDatasetFromDatabase(Dataset dataset, TypeMapping typeMap
String databaseName = dataset.datasetReference().database().orElse(null);
try
{
if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping))
if (!(typeMapping instanceof JdbcPropertiesToLogicalDataTypeMapping))
{
throw new IllegalStateException("Only JdbcPropertiesToLogicalDataTypeMapping allowed in constructDatasetFromDatabase");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,25 @@ public interface JdbcPropertiesToLogicalDataTypeMapping extends TypeMapping
String SMALLINT = "SMALLINT";
String INTEGER = "INTEGER";
String BIGINT = "BIGINT";
String INT2 = "INT2";
String INT4 = "INT4";
String INT8 = "INT8";
String DECIMAL = "DECIMAL";
String NUMERIC = "NUMERIC";
String DOUBLE = "DOUBLE";
String REAL = "REAL";
String FLOAT4 = "FLOAT4";
String FLOAT8 = "FLOAT8";
String CHAR = "CHAR";
String VARCHAR = "VARCHAR";
String TEXT = "TEXT";
String BPCHAR = "BPCHAR";
String CLOB = "CLOB";
String BINARY = "BINARY";
String VARBINARY = "VARBINARY";
String BYTEA = "BYTEA";
String BIT = "BIT";
String BOOL = "BOOL";
String BOOLEAN = "BOOLEAN";
String DATE = "DATE";
String TIME = "TIME";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2024 Goldman Sachs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.finos.legend.engine.persistence.components.relational.sqldom.schema;

public class Text extends VariableSizeDataType
{

public Text()
{
super("TEXT");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
</dependency>

<!-- TEST -->
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-relational-ansi</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@
import org.finos.legend.engine.persistence.components.logicalplan.values.StagedFilesFieldValue;
import org.finos.legend.engine.persistence.components.logicalplan.values.ToArrayFunction;
import org.finos.legend.engine.persistence.components.logicalplan.values.TryCastFunction;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.LowerCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.UpperCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.api.DataError;
import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection;
import org.finos.legend.engine.persistence.components.relational.api.ApiUtils;
Expand Down Expand Up @@ -212,22 +209,6 @@ public Executor<SqlGen, TabularData, SqlPlan> getRelationalExecutor(RelationalCo
}
}

@Override
public Optional<Optimizer> optimizerForCaseConversion(CaseConversion caseConversion)
{
switch (caseConversion)
{
case TO_LOWER:
return Optional.of(new LowerCaseOptimizer());
case TO_UPPER:
return Optional.of(new UpperCaseOptimizer());
case NONE:
return Optional.empty();
default:
throw new IllegalArgumentException("Unrecognized case conversion: " + caseConversion);
}
}

@Override
public IngestorResult performBulkLoad(Datasets datasets, Executor<SqlGen, TabularData, SqlPlan> executor, SqlPlan ingestSqlPlan, Map<StatisticName, SqlPlan> statisticsSqlPlan, Map<String, PlaceholderValue> placeHolderKeyValues)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public DataType getDataType(FieldType type)
case TIMESTAMP:
case DATETIME:
dataType = new Timestamp();
type.scale().ifPresent(dataType::setScale);
type.length().ifPresent(dataType::setLength);
break;
case TIMESTAMP_TZ:
dataType = new TimestampWithTimezone();
type.scale().ifPresent(dataType::setScale);
type.length().ifPresent(dataType::setLength);
break;
case DATE:
dataType = new Date();
Expand All @@ -96,7 +96,7 @@ public DataType getDataType(FieldType type)
break;
case TIME:
dataType = new Time();
type.scale().ifPresent(dataType::setScale);
type.length().ifPresent(dataType::setLength);
break;
case NUMERIC:
dataType = new Numeric();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ public FieldType getDataType(String typeName, String dataType, Integer columnSiz
case DATE:
return FieldType.builder().dataType(DataType.DATE).build();
case TIME:
return FieldType.builder().dataType(DataType.TIME).scale(decimalDigits).build();
return FieldType.builder().dataType(DataType.TIME).length(decimalDigits).build();
case TIMESTAMP:
return FieldType.builder().dataType(DataType.TIMESTAMP).scale(decimalDigits).build();
return FieldType.builder().dataType(DataType.TIMESTAMP).length(decimalDigits).build();
case TIMESTAMP_WITH_TIME_ZONE:
return FieldType.builder().dataType(DataType.TIMESTAMP_TZ).scale(decimalDigits).build();
return FieldType.builder().dataType(DataType.TIMESTAMP_TZ).length(decimalDigits).build();
case JSON:
return FieldType.builder().dataType(DataType.JSON).build();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class TestUtils
public static Field nullableIntIncome = Field.builder().name(incomeName).type(FieldType.of(DataType.INTEGER, Optional.empty(), Optional.empty())).fieldAlias(incomeName).build();
public static Field decimalIncome = Field.builder().name(incomeName).type(FieldType.of(DataType.DECIMAL, 10, 2)).fieldAlias(incomeName).build();
public static Field startTime = Field.builder().name(startTimeName).type(FieldType.of(DataType.DATETIME, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(startTimeName).build();
public static Field startTimeTimestamp = Field.builder().name(startTimeName).type(FieldType.of(DataType.TIMESTAMP, null, 6)).primaryKey(true).fieldAlias(startTimeName).build();
public static Field startTimeTimestamp = Field.builder().name(startTimeName).type(FieldType.of(DataType.TIMESTAMP, 6, null)).primaryKey(true).fieldAlias(startTimeName).build();
public static Field expiryDate = Field.builder().name(expiryDateName).type(FieldType.of(DataType.DATE, Optional.empty(), Optional.empty())).fieldAlias(expiryDateName).build();
public static Field expiryDatePk = Field.builder().name(expiryDateName).type(FieldType.of(DataType.DATE, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(expiryDateName).build();
public static Field date = Field.builder().name(dateName).type(FieldType.of(DataType.DATE, Optional.empty(), Optional.empty())).primaryKey(true).fieldAlias(dateName).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@
import org.finos.legend.engine.persistence.components.logicalplan.operations.Create;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Show;
import org.finos.legend.engine.persistence.components.logicalplan.operations.Update;
import org.finos.legend.engine.persistence.components.optimizer.Optimizer;
import org.finos.legend.engine.persistence.components.relational.CaseConversion;
import org.finos.legend.engine.persistence.components.relational.RelationalSink;
import org.finos.legend.engine.persistence.components.relational.SqlPlan;
import org.finos.legend.engine.persistence.components.relational.ansi.AnsiSqlSink;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.LowerCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.ansi.optimizer.UpperCaseOptimizer;
import org.finos.legend.engine.persistence.components.relational.api.RelationalConnection;
import org.finos.legend.engine.persistence.components.executor.RelationalExecutionHelper;
import org.finos.legend.engine.persistence.components.relational.executor.RelationalExecutor;
Expand Down Expand Up @@ -68,7 +64,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class MemSqlSink extends AnsiSqlSink
Expand Down Expand Up @@ -176,23 +171,6 @@ public Executor<SqlGen, TabularData, SqlPlan> getRelationalExecutor(RelationalCo
}
}


@Override
public Optional<Optimizer> optimizerForCaseConversion(CaseConversion caseConversion)
{
switch (caseConversion)
{
case TO_LOWER:
return Optional.of(new LowerCaseOptimizer());
case TO_UPPER:
return Optional.of(new UpperCaseOptimizer());
case NONE:
return Optional.empty();
default:
throw new IllegalArgumentException("Unrecognized case conversion: " + caseConversion);
}
}

static final ValidateMainDatasetSchema VALIDATE_MAIN_DATASET_SCHEMA = new ValidateMainDatasetSchema()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2024 Goldman Sachs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component</artifactId>
<version>4.43.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>legend-engine-xt-persistence-component-relational-postgres</artifactId>
<packaging>jar</packaging>
<name>Legend Engine - XT - Persistence - Component - Relational Postgres</name>

<dependencies>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-logical-plan</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-physical-plan</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-relational-core</artifactId>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-relational-ansi</artifactId>
</dependency>

<!-- DRIVER -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.2</version>
<scope>runtime</scope>
</dependency>
<!-- DRIVER -->

<!-- TEST -->
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-relational-test</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-persistence-component-relational-ansi</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 939caa6

Please sign in to comment.