-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a PTransform implementation to support Flink SQL. #90
Conversation
@yananhao12 Will you help take a look? Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great. Have a few comments about simplifying the APIs a bit. Also want to see whether we can keep the API with types in Beam so we don't expose Flink types in the mix.
* Transform. | ||
* @see StreamTableEnvironment#createTemporaryView(String, DataStream) | ||
*/ | ||
FlinkSql<InputT, OutputT> withMainInputTableTypeInformation(TypeInformation<InputT> typeInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we expose this as Beam API, shall we stick to Beam's TypeDescriptor instead of Flink's TypeInformation directly? It seems we can converte between these two classes. Same for the other usage of TypeInformation in the public api methods in this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not exposed to the end user. But the user facing API does ask for the TypeInformation
. The purpose of letting users to provide TypeInformation
is to make Beam record -> Flink Table row conversion more efficient.
I agree that it would be good if we can hide this, but that may come at a cost.
More specifically, Flink has many different types of TypeInformation
, including AtomicType
, ObjectArrayType
, BasicArrayType
, ListType
, CompositeType
, etc. Using a specific type of TypeInformation
will make the java object to row conversion more efficient.
For example, if a class is a POJO, then PojoTypeInfo
will be used, and it is a CompositeType
, so when performing DataStream -> Table conversion, Flink will be able to convert the POJO objects to Flink Table Rows directly by having each POJO field as a Table column. For Avro records, we can use AvroTypeInfo
, which is also a CompositeType
.
If we let user only specify the Beam TypeDescriptor
/ Coder
here, then we will only have AtomicType
because the CoderTypeInformation
which wraps the Beam Coder
is an AtomicType
. That means the each record in Beam PCollection
will be put as a whole into a single column with the column data type of RAW
. And users will need to use a UDTF to expand a RAW record into multiple columns, which is an extra step. This introduces additional data format conversion cost.
There might be one way to hide TypeInformation
without compromising performance for a set of records type is following (taking Avro record as an example):
- let users pass in a
Coder
here - Introduce a new
Coder
type calledAvroCoder
which wraps anAvroTypeInfo
. Users will callAvroCoder.of(MyAvroSpecificRecord.class)
to create theAvroCoder
. - In the SQL transform translator, check and see if the Coder is an instance of
AvroCoder
. If so, use theTypeInformation
of the specific record.
The downside of this solution is any record type that wants to be accelerated needs a framework level code change. And the acceleration contract is kind of implicit.
We will be facing similar situation for DataType
of the output table as well, if we want to also hide DataType
from users.
I am leaving the code unchanged as of now. Once we decide which way to go, I can update the patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting. I was thinking since for avro, we will have the coder (oss AvroCoder) defined, which will contain schema, so in theory we can generate the corresponding Flink TypeInfo object. But it requires specific logic for avro (as you mentioned above), so probably only for LinkedIn and not worth the extra effort.
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSql.java
Outdated
Show resolved
Hide resolved
* @param <InputT> the input data type. | ||
* @param <OutputT> the output data type. | ||
*/ | ||
private static class FlinkSQLTransformTranslator<InputT, OutputT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we move this to a separate class under package ...flink.translation.sql? Not sure whether we will have slightly variations of this in the future version of flink. i think it might be better to separate this from the FlinkSql API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can separate this class out.
Package wise, at this point, all the translators are in FlinkStreamingTransformTranslators
class. So theoretically speaking we should put this class there as well. However, if we do that, we will need to put the entire FlinkStreamingTransformTranslators
class into the source-override directory. I am a little reluctant to do so because this makes the critical code distributed all over the places.
I actually tried a few ways to organize the packages and ended up the current way, which puts all the sql related stuff in the same package. It seems making the code more readable because I don't have to jump over places, and also supporting multiple Flink versions becomes easier as I just need to add a single package to the source-override dir.
So it seems better to keep FlinkSQLTransformTranslator
in the current package, but make it a separate class. In the future, if we need to support multiple versions, potentially we just need to overwrite this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. In beam typically the transform and translator are separated due to the transform is engine-agnostic. Since this transform is for Flink only, I guess making it in the same package doesn't matter too much.
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSql.java
Outdated
Show resolved
Hide resolved
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSql.java
Outdated
Show resolved
Hide resolved
* | ||
* @param name the name of the table to be set as the main output of this Sql transform. | ||
*/ | ||
FlinkSql<InputT, OutputT> withMainOutputTableName(String name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As commented above, seems using a default should be enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method only needs to be called if there are multiple queries. In that case, users will need specify a result table from a query as the main output. If there is only one query in the transform, the result table of that query will be treated as the main output by default, so users don't need to call this method.
schemaRegistry.getFromRowFunction(outputTypeDescriptor)); | ||
} catch (NoSuchSchemaException e) { | ||
try { | ||
out.setCoder(coderRegistry.getCoder(outputTypeDescriptor)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we check the type is Beam row or not to set up either the schema coder or the specific type coder? Seems if the user uses the specific types, the schema coder will do extra serde to get to the types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite sure I understand that you mean. This piece of logic is the same as in ParDo.SingleOutput#expand()
. From what I understand, it looks up the coder in the SchemaRegistry first, then CoderRegistry. If the type is a Beam row, would the coder be available in the SchemaRegistry already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the thing I thought about: after applying the FlinkSQLTransform. there can be two apis to use:
-
use typed api:
input.apply(FlinkSQLTransform).apply(MapElements.of(....)).apply(Count.perKey()) -
use schema api:
input.apply(FlinkSQLTransform).apply(Select.fieldNames(..)).apply(Group.byField())
This patch will set the coder to use schema coder, so the second case should be supported naturally. I was thinking about the first case. It still should work as normal given Beam can use schema to serde the record for grouping.
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/TableInfo.java
Show resolved
Hide resolved
RecordsVerifier<T> recordsVerifier = new RecordsVerifier<>(file, clazz); | ||
final int expectedNumRecords = recordsVerifier.expectedRecords.size(); | ||
|
||
pCollection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why we are not using PAssert to verify the content of pCollection? Seems easier that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I forgot that.
import org.junit.Test; | ||
|
||
/** The unit tests for the Flink SQL PTransform. */ | ||
public class FlinkSqlPTransformTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add another test to have Row based transform, e..g Beam schema transform like Select or Filter, to make sure we can use those too? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think those are supported because Beam Row is not supported in Flink SQL to begin with.
If the records of the input PCollection are Beam Rows, the entire record will be converted to a single column of type RAW
, and it is up the user to further parse this Beam Row to a Flink Row. We can potentially provide a built-in UDTF to do this, but that is something orthogonal to the SQL PTransform itself. And after the Beam Row is converted to a Flink Row, all the select or filter will be using Flink SQL instead of Beam Select / Filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, seems you already registered the schema coder for the output PCollection. That's all needed for supporting schema Transforms. I asked here so we can verify that the output schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! In the middle still...
Two main comments/questions:
a) if FlinkSQL is an end-user facing PTransform, would suggest keeping it engine agnostic
b) if FlinkSQL is always reading a physical data source as the input, shouldn't we model it as a BeamIO?
* <p>See {@link SingleOutputSqlTransform} for more details regarding how to specify the input | ||
* table, output table and SQL logic. | ||
*/ | ||
public class FlinkSql<InputT, OutputT> implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QQ: is this FlinkSql class exposed to Beam API users? If it is, my concern is that this is leakage of abstraction of underneath engine details to the user. Shouldn't we go w/ a more generic SqlPTransform name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a fair point. Will update the class name to Sql
instead.
* .withAdditionalOutputTable(new TupleTag<ProductAndSales>("SalesByProduct") {}); | ||
* | ||
* PCollectionTuple outputs = pipeline | ||
* .apply("DummyInput", Create.empty(TextualIntegerCoder.of())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we require a dummy input in this pipeline? As I realize from earlier discussion on the reader functions needed to implement this SQLPTransform, this SQLPTransform is designed to directly read from data sources, right? Shouldn't we just provide a SqlIO instead?
* @param ddl the table definition | ||
* @return this {@link SingleOutputSqlTransform} itself. | ||
*/ | ||
public SingleOutputSqlTransform<InputT, OutputT> withDDL(String ddl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed offline, we should add withCatalog()
API as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xinyuiscool @nickpan47 Thanks for the review. I just updated the patch to address most of the comments, including:
- Rename
FlinkSql
toSqlTransform
- Add catalog support (both via DDL and via catalog instance)
- Add UDF support.
- Avoid letting user create dummy input PCollection.
- Some other class structure clean up.
The main thing left is whether we should expose TypeInformation. I don't have a strong opinion on this. There are pros and cons. Once we make the decision I'll update the patch.
* <p>See {@link SingleOutputSqlTransform} for more details regarding how to specify the input | ||
* table, output table and SQL logic. | ||
*/ | ||
public class FlinkSql<InputT, OutputT> implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a fair point. Will update the class name to Sql
instead.
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSql.java
Outdated
Show resolved
Hide resolved
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSql.java
Outdated
Show resolved
Hide resolved
* | ||
* @param name the name of the table to be set as the main output of this Sql transform. | ||
*/ | ||
FlinkSql<InputT, OutputT> withMainOutputTableName(String name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method only needs to be called if there are multiple queries. In that case, users will need specify a result table from a query as the main output. If there is only one query in the transform, the result table of that query will be treated as the main output by default, so users don't need to call this method.
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/FlinkSql.java
Outdated
Show resolved
Hide resolved
runners/flink/1.15/src/main/java/org/apache/beam/runners/flink/transform/sql/TableInfo.java
Show resolved
Hide resolved
schemaRegistry.getFromRowFunction(outputTypeDescriptor)); | ||
} catch (NoSuchSchemaException e) { | ||
try { | ||
out.setCoder(coderRegistry.getCoder(outputTypeDescriptor)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite sure I understand that you mean. This piece of logic is the same as in ParDo.SingleOutput#expand()
. From what I understand, it looks up the coder in the SchemaRegistry first, then CoderRegistry. If the type is a Beam row, would the coder be available in the SchemaRegistry already?
import org.junit.Test; | ||
|
||
/** The unit tests for the Flink SQL PTransform. */ | ||
public class FlinkSqlPTransformTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think those are supported because Beam Row is not supported in Flink SQL to begin with.
If the records of the input PCollection are Beam Rows, the entire record will be converted to a single column of type RAW
, and it is up the user to further parse this Beam Row to a Flink Row. We can potentially provide a built-in UDTF to do this, but that is something orthogonal to the SQL PTransform itself. And after the Beam Row is converted to a Flink Row, all the select or filter will be using Flink SQL instead of Beam Select / Filter.
RecordsVerifier<T> recordsVerifier = new RecordsVerifier<>(file, clazz); | ||
final int expectedNumRecords = recordsVerifier.expectedRecords.size(); | ||
|
||
pCollection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I forgot that.
* Transform. | ||
* @see StreamTableEnvironment#createTemporaryView(String, DataStream) | ||
*/ | ||
FlinkSql<InputT, OutputT> withMainInputTableTypeInformation(TypeInformation<InputT> typeInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not exposed to the end user. But the user facing API does ask for the TypeInformation
. The purpose of letting users to provide TypeInformation
is to make Beam record -> Flink Table row conversion more efficient.
I agree that it would be good if we can hide this, but that may come at a cost.
More specifically, Flink has many different types of TypeInformation
, including AtomicType
, ObjectArrayType
, BasicArrayType
, ListType
, CompositeType
, etc. Using a specific type of TypeInformation
will make the java object to row conversion more efficient.
For example, if a class is a POJO, then PojoTypeInfo
will be used, and it is a CompositeType
, so when performing DataStream -> Table conversion, Flink will be able to convert the POJO objects to Flink Table Rows directly by having each POJO field as a Table column. For Avro records, we can use AvroTypeInfo
, which is also a CompositeType
.
If we let user only specify the Beam TypeDescriptor
/ Coder
here, then we will only have AtomicType
because the CoderTypeInformation
which wraps the Beam Coder
is an AtomicType
. That means the each record in Beam PCollection
will be put as a whole into a single column with the column data type of RAW
. And users will need to use a UDTF to expand a RAW record into multiple columns, which is an extra step. This introduces additional data format conversion cost.
There might be one way to hide TypeInformation
without compromising performance for a set of records type is following (taking Avro record as an example):
- let users pass in a
Coder
here - Introduce a new
Coder
type calledAvroCoder
which wraps anAvroTypeInfo
. Users will callAvroCoder.of(MyAvroSpecificRecord.class)
to create theAvroCoder
. - In the SQL transform translator, check and see if the Coder is an instance of
AvroCoder
. If so, use theTypeInformation
of the specific record.
The downside of this solution is any record type that wants to be accelerated needs a framework level code change. And the acceleration contract is kind of implicit.
We will be facing similar situation for DataType
of the output table as well, if we want to also hide DataType
from users.
I am leaving the code unchanged as of now. Once we decide which way to go, I can update the patch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
44142c9
to
5ff7c04
Compare
5ff7c04
to
7cebd2b
Compare
This reverts commit 4551072.
This patch introduces a new PTransform implementation to support Flink SQL.
So far the Flink SQL is only supported in batch mode. This is because the beam windowing strategy is difficult to apply to the Flink SQL operators.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.