
SQL PTransform Update. #112

Merged: 3 commits into linkedin:li_trunk on Feb 24, 2024
Conversation

becketqin (Collaborator)

The existing SqlPTransform has two limitations by design:

  1. It only supports batch mode.
  2. It assumes there is always a downstream PTransform after the SqlPTransform, i.e. INSERT INTO statements are not supported.

This patch removes these two restrictions.
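
Below is a hedged usage sketch of the resulting behavior, assuming a (statements, catalogs) constructor for illustration; the actual class, StatementOnlySqlTransform, is quoted later in this conversation.

// Hedged sketch, not the actual API: a statement-only SQL transform
// terminates the pipeline itself (PBegin -> PDone), so the sink comes from
// an INSERT INTO statement rather than from a downstream PTransform.
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(new StatementOnlySqlTransform(statements, catalogs)); // constructor shape assumed
pipeline.run().waitUntilFinish();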


@xinyuiscool left a comment


Looks good. I have a few minor questions to help me understand. Thanks.


@Override
public PDone expand(PBegin input) {
  return PDone.in(input.getPipeline());
}


Shall we log the statements and catalogs here? It seems better to log them somewhere for debugging purposes.

Btw, there is an overridable "PTransform.validate(PipelineOptions)" method that you can use to do validations. I think we can double-check things like the statements not being empty and the catalog being valid.

@becketqin (Collaborator, Author)


We are logging the full statements in the translator. I changed that logging to info level and added a debug-level log here. Good point about validation. I added an empty-statement check there.
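
For reference, a minimal sketch of what that check could look like, using Beam's overridable PTransform.validate(PipelineOptions) hook mentioned above (the exact check in the patch may differ):

@Override
public void validate(PipelineOptions options) {
  // Assumed check: fail early, at pipeline construction time, if no
  // statements were provided.
  if (statements == null || statements.isEmpty()) {
    throw new IllegalStateException("SQL statements must not be empty.");
  }
}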

StreamStatementSet ss = tEnv.createStatementSet();
for (String statement : sqlTransform.getStatements()) {
  combinedStatements.add(statement);
  if (statement.substring(0, INSERT_INTO.length()).toUpperCase().startsWith(INSERT_INTO)) {


nit: maybe move this line of logic into a static method like isInsertIntoStatement()? A bit easier to read.
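
A minimal sketch of the suggested helper (the name comes from the review comment; the implementation is assumed and simplifies the quoted check, whose substring-then-startsWith combination is redundant and would throw on statements shorter than INSERT_INTO):

// Sketch of the suggested extraction; implementation assumed.
private static boolean isInsertIntoStatement(String statement) {
  // trim() guards against leading whitespace; plain startsWith avoids
  // substring() throwing on short statements.
  return statement.trim().toUpperCase().startsWith(INSERT_INTO);
}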

}
// Now attach everything to StreamExecutionEnv.
ss.attachAsDataStream();
LOG.debug("Executing SQL statements:\n {}", combinedStatements);


I would just make it an info log so it's clear :)

public class StatementOnlySqlTransform extends PTransform<PBegin, PDone> {

  private final List<String> statements;
  private final Map<String, SerializableCatalog> catalogs;


Just curious: can we support multiple catalogs here? Curious what the use case would look like.

@becketqin (Collaborator, Author)


Yes, sometimes the datasets may come from different external storage systems. For example, we may have a job reading from Hive, MySQL and Kafka at the same time. In this case, there might be three catalogs, one for each external system.
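
To illustrate (a hedged sketch; the catalog names and instances are invented), each external system gets its own catalog registered on the Flink TableEnvironment, and statements can then address tables across systems with fully qualified names:

// Hedged sketch: one catalog per external system (instances assumed).
tEnv.registerCatalog("hive", hiveCatalog);
tEnv.registerCatalog("mysql", mysqlCatalog);
tEnv.registerCatalog("kafka", kafkaCatalog);
// A statement can then join across systems, e.g.
// SELECT ... FROM hive.sales.orders o JOIN kafka.events.clicks c ON ...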

    ss.addInsertSql(statement);
  } else {
    // Not an insert into statement. Treat it as a DDL.
    tEnv.executeSql(statement);

@xinyuiscool (Feb 15, 2024)


Can there be valid normal query statements without INSERT INTO? I am not very sure what Flink SQL statements look like today. Maybe everything except DDL has INSERT INTO at the beginning?

@becketqin (Collaborator, Author)


Query logic in SQL is always expressed as a SELECT statement. The query logic has to be represented by some entity, and that entity will be either a Table or a View. So any SELECT statement will be part of either a CREATE TABLE statement or a CREATE VIEW statement. INSERT INTO effectively binds a SELECT statement to an existing table instead of creating a new one.

From the execution perspective, only INSERT INTO triggers an action to run the query. Without an INSERT INTO statement, the queries only define temporary views or tables. That is why we can call tEnv.executeSql() for all the statements other than INSERT INTO: they will not actually trigger an execution. And we have to append all the INSERT INTO statements to a StatementSet so that multiple INSERT INTOs can be executed in the same Flink job.
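
To make the dispatch concrete, here is a hedged sketch of a statement script and the path each statement kind takes (table and connector details are invented):

// Hedged sketch; statement contents invented for illustration.
List<String> statements = Arrays.asList(
    // DDL: executed immediately via tEnv.executeSql(); does not run a job.
    "CREATE TABLE clicks (user_id STRING, url STRING) WITH ('connector' = 'kafka')",
    // A SELECT wrapped in a view: also executeSql(); still no job.
    "CREATE TEMPORARY VIEW click_counts AS "
        + "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id",
    // DML: appended to the StreamStatementSet; this is what triggers execution.
    "INSERT INTO user_stats SELECT * FROM click_counts");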

@becketqin force-pushed the sql_ptransform_update branch from d30d083 to 5ea8e10 on February 22, 2024

@roborahul left a comment

@becketqin force-pushed the sql_ptransform_update branch from 5ea8e10 to b645f96 on February 23, 2024

@xinyuiscool left a comment


Looks great. Thanks.

@github-actions bot added the build label on Feb 24, 2024
@becketqin merged commit ff2d3ea into linkedin:li_trunk on Feb 24, 2024
6 checks passed
@@ -114,6 +114,16 @@ public static <T> SingleOutputSqlTransform<T> of(Class<T> outputClass) {
  return new SingleOutputSqlTransform<>(of(Integer.class, outputClass));
}

/**
 * Create a {@link StatementOnlySqlTransform} which takes a full script of SQL statements and
 * executes them. The statements must have at least one <code>INSERT INTO</code> statement.

@venkata91 (Mar 2, 2024)


@becketqin Sorry for commenting after the PR is merged. A couple of clarifying questions:

  1. What if the SQL statements have an INSERT OVERWRITE instead of INSERT INTO? Is that not a valid StatementOnlySqlTransform?
  2. Similarly, how about having a CREATE TABLE AS SELECT without INSERT INTO? Should we support that as well?

@becketqin (Collaborator, Author)


That is a good point. We should support both. I'll have a follow-up patch.

@becketqin (Collaborator, Author)


I created the PR. However, we currently cannot support CTAS because the OSS Flink StreamStatementSet.attachAsDataStream() does not support it yet. We will need to make a change to OSS Flink first.
