-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36879][runtime] Support to convert delete as insert in transform #3787
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @ruanhang1993's nice work, just left some trivial comments.
@@ -316,6 +317,10 @@ private TransformDef toTransformDef(JsonNode transformNode) { | |||
Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY)) | |||
.map(JsonNode::asText) | |||
.orElse(null); | |||
String convertDeleteAsInsert = | |||
Optional.ofNullable(transformNode.get(TRANSFORM_CONVERT_DELETE_AS_INSERT_KEY)) | |||
.map(JsonNode::asText) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can simply call JsonNode::asBoolean
here
@@ -74,4 +77,8 @@ public String getPartitionKey() { | |||
public String getTableOption() { | |||
return tableOption; | |||
} | |||
|
|||
public boolean isconvertDeleteAsInsert() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public boolean isconvertDeleteAsInsert() { | |
public boolean isConvertDeleteAsInsert() { |
@@ -448,6 +455,17 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha | |||
} | |||
} | |||
|
|||
private Optional<DataChangeEvent> concertDeleteAsInsert(DataChangeEvent dataChangeEvent) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private Optional<DataChangeEvent> concertDeleteAsInsert(DataChangeEvent dataChangeEvent) { | |
private Optional<DataChangeEvent> convertDeleteAsInsert(DataChangeEvent dataChangeEvent) { |
Typo
@@ -50,4 +53,8 @@ public Optional<TransformProjection> getProjection() { | |||
public Optional<TransformFilter> getFilter() { | |||
return filter; | |||
} | |||
|
|||
public boolean isconvertDeleteAsInsert() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public boolean isconvertDeleteAsInsert() { | |
public boolean isConvertDeleteAsInsert() { |
Use camelCase
@@ -47,6 +47,7 @@ transform: | |||
partition-keys: product_name | |||
table-options: comment=app order | |||
description: project fields from source table | |||
convert-delete-as-insert: true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docs in Core Concepts / Transform needs to be updated, too
Support to convert delete as insert in transform