Skip to content

Commit

Permalink
Add support for INSERT OVERWRITE
Browse files Browse the repository at this point in the history
  • Loading branch information
Becket Qin authored and becketqin committed Apr 25, 2024
1 parent e1b1981 commit d4514b0
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static <T> SingleOutputSqlTransform<T> of(Class<T> outputClass) {

/**
* Create a {@link StatementOnlySqlTransform} which takes a full script of SQL statements and
* execute them. The statements must have at least one <code>INSERT INTO</code> statement.
* execute them. The statements must have at least one <code>INSERT</code> statement.
*
* @return A {@link StatementOnlySqlTransform}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class StatementOnlyFlinkSqlTransformTranslator
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<PBegin, PDone>>{
private static final Logger LOG = LoggerFactory.getLogger(StatementOnlyFlinkSqlTransformTranslator.class);
public static final String FLINK_STATEMENT_ONLY_SQL_URN = "beam:transform:flink:sql-statements-only:v1";
private static final String INSERT_INTO = "INSERT INTO";
private static final String INSERT = "INSERT";

@Override
public void translateNode(PTransform<PBegin, PDone> transform, FlinkStreamingTranslationContext context) {
Expand All @@ -53,7 +53,7 @@ public void translateNode(PTransform<PBegin, PDone> transform, FlinkStreamingTra
for (String statement : sqlTransform.getStatements()) {
combinedStatements.add(statement);
try {
if (isInsertIntoStatement(statement)) {
if (isInsertStatement(statement)) {
ss.addInsertSql(statement);
} else {
// Not an insert into statement. Treat it as a DDL.
Expand Down Expand Up @@ -103,7 +103,7 @@ public static class FlinkSqlTransformsRegistrar
}

// ------------------- private helper methods -----------------
private static boolean isInsertIntoStatement(String statement) {
return statement.substring(0, INSERT_INTO.length()).toUpperCase().startsWith(INSERT_INTO);
private static boolean isInsertStatement(String statement) {
return statement.substring(0, INSERT.length()).toUpperCase().startsWith(INSERT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ public void testEmptyStatements() {
pipeline.run(getPipelineOptions());
}

@Test
public void testInsertOverwrite() {
SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");

Pipeline pipeline = Pipeline.create();
StatementOnlySqlTransform transform = SqlTransform.ofStatements()
.withCatalog("MyCatalog", catalog)
.addStatement("INSERT OVERWRITE MyCatalog.TestDatabase.OrdersVerify SELECT * FROM MyCatalog.TestDatabase.Orders;");

pipeline.apply(transform);
pipeline.run(getPipelineOptions());
}

// ----------------
private void testBasics(boolean isStreaming) {
SerializableCatalog catalog = TestingInMemCatalogFactory.getCatalog("TestCatalog");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.types.logical.BigIntType;
Expand Down Expand Up @@ -90,6 +91,12 @@ public DynamicTableSink createDynamicTableSink(Context tableSinkFactoryContext)
final List<String> columnsToVerify = schema.getColumnNames();

return new SerializableDynamicTableSink() {

@Override
public void applyOverwrite(boolean b) {
// Do nothing.
}

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return ChangelogMode.all();
Expand Down Expand Up @@ -231,7 +238,7 @@ private static ExpectedResultTable readRowsFromFile(String path, boolean hasHead
}
}

interface SerializableDynamicTableSink extends DynamicTableSink, Serializable {
interface SerializableDynamicTableSink extends DynamicTableSink, SupportsOverwrite, Serializable {

}

Expand Down

0 comments on commit d4514b0

Please sign in to comment.