diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index a45b3bcbca..0c3fb5dca7 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -1666,7 +1666,16 @@ String[] runNumericCastingWith(String expression) throws Exception { // Generate a projection expression like CAST(tiny AS ) AS tiny, ... private static String generateCastTo(String type) { return "id, " - + Stream.of("tiny", "small", "int", "bigint", "float", "double", "decimal") + + Stream.of( + "tiny", + "small", + "int", + "bigint", + "float", + "double", + "decimal", + "valid_char", + "invalid_char") .map(col -> String.format("CAST(%s_c AS %s) AS %s_c", col, type, col)) .collect(Collectors.joining(", ")); } @@ -1675,99 +1684,99 @@ private static String generateCastTo(String type) { void testNumericCastingsWithTruncation() throws Exception { assertThat(runNumericCastingWith("*")) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0.0, 0.0, 0.00], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 2),`valid_char_c` VARCHAR(17),`invalid_char_c` VARCHAR(17)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11, -12.13, foo], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0.0, 0.0, 0.00, 0, bar], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11, 12.13, baz], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("BOOLEAN"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, true, true, true, true, true, true, true], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, false, false, false, false, false, false, false], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, true, true, true, true, true, true, true], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` BOOLEAN,`valid_char_c` BOOLEAN,`invalid_char_c` BOOLEAN}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, false, false, false, false, false, false, false, false, false], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, true, true, true, true, true, true, true, false, false], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("TINYINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` TINYINT,`valid_char_c` TINYINT,`invalid_char_c` TINYINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("SMALLINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` SMALLINT,`valid_char_c` SMALLINT,`invalid_char_c` SMALLINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("INT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT,`valid_char_c` INT,`invalid_char_c` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("BIGINT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` BIGINT,`valid_char_c` BIGINT,`invalid_char_c` BIGINT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("FLOAT"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` FLOAT,`valid_char_c` FLOAT,`invalid_char_c` FLOAT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DOUBLE"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` DOUBLE,`valid_char_c` DOUBLE,`invalid_char_c` DOUBLE}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11, -12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11, 12.13, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(1, 0)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, null], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, null], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0),`valid_char_c` DECIMAL(1, 0),`invalid_char_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, null, null, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(2, 0)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0),`valid_char_c` DECIMAL(2, 0),`invalid_char_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(3, 1)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1),`valid_char_c` DECIMAL(3, 1),`invalid_char_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1, -12.1, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1, 12.1, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); assertThat(runNumericCastingWith(generateCastTo("DECIMAL(19, 10)"))) .containsExactly( - "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000], op=INSERT, meta=()}", - "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null], op=INSERT, meta=()}"); + "CreateTableEvent{tableId=ns.scm.tbl, schema=columns={`id` BIGINT,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10),`valid_char_c` DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, -6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}", + "DataChangeEvent{tableId=ns.scm.tbl, before=[], after=[2, null, null, null, null, null, null, null, null, null], op=INSERT, meta=()}"); } private List generateSchemaEvolutionEvents(TableId tableId) { @@ -1953,6 +1962,8 @@ private List generateNumericCastingEvents(TableId tableId) { .physicalColumn("float_c", DataTypes.FLOAT()) .physicalColumn("double_c", DataTypes.DOUBLE()) .physicalColumn("decimal_c", DataTypes.DECIMAL(10, 2)) + .physicalColumn("valid_char_c", DataTypes.VARCHAR(17)) + .physicalColumn("invalid_char_c", DataTypes.VARCHAR(17)) .primaryKey("id") .build(); @@ -1969,7 +1980,9 @@ private List generateNumericCastingEvents(TableId tableId) { (long) -5, -6.7f, -8.9d, - DecimalData.fromBigDecimal(new BigDecimal("-10.11"), 10, 2)))); + DecimalData.fromBigDecimal(new BigDecimal("-10.11"), 10, 2), + "-12.13", + "foo"))); events.add( DataChangeEvent.insertEvent( tableId, @@ -1982,7 +1995,9 @@ private List generateNumericCastingEvents(TableId tableId) { (long) 0, 0f, 0d, - DecimalData.fromBigDecimal(BigDecimal.ZERO, 10, 2)))); + DecimalData.fromBigDecimal(BigDecimal.ZERO, 10, 2), + "0", + "bar"))); events.add( DataChangeEvent.insertEvent( tableId, @@ -1995,11 +2010,15 @@ private List generateNumericCastingEvents(TableId tableId) { (long) 5, 6.7f, 8.9d, - DecimalData.fromBigDecimal(new BigDecimal("10.11"), 10, 2)))); + DecimalData.fromBigDecimal(new BigDecimal("10.11"), 10, 2), + "12.13", + "baz"))); events.add( DataChangeEvent.insertEvent( tableId, - generate(schema, 2L, null, null, null, null, null, null, null))); + generate( + schema, 2L, null, null, null, null, null, null, null, null, + null))); } return events; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index 1ea2cdb7c2..6f6d52a3b9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -44,7 +44,10 @@ import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToDate; import static org.apache.flink.cdc.common.utils.DateTimeUtils.timestampMillisToTime; -/** System function utils to support the call of flink cdc pipeline transform. */ +/** + * System function utils to support the call of flink cdc pipeline transform.
+ * {@code castToXxx}-series function returns `null` when conversion is not viable. + */ public class SystemFunctionUtils { private static final Logger LOG = LoggerFactory.getLogger(SystemFunctionUtils.class); @@ -649,7 +652,13 @@ public static Byte castToByte(Object object) { try { return Byte.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Byte#valueOf. + } + try { return Double.valueOf(stringRep).byteValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -673,7 +682,13 @@ public static Short castToShort(Object object) { try { return Short.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Short#valueOf. + } + try { return Double.valueOf(stringRep).shortValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -697,7 +712,13 @@ public static Integer castToInteger(Object object) { try { return Integer.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Integer#valueOf. + } + try { return Double.valueOf(stringRep).intValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -721,7 +742,13 @@ public static Long castToLong(Object object) { try { return Long.valueOf(stringRep); } catch (NumberFormatException e) { + // Ignore this exception because it could still represent a valid floating point number, + // but could not be accepted by Long#valueOf. + } + try { return Double.valueOf(stringRep).longValue(); + } catch (NumberFormatException ignored) { + return null; } } @@ -741,7 +768,11 @@ public static Float castToFloat(Object object) { if (object instanceof Float) { return (Float) object; } - return Float.valueOf(castObjectIntoString(object)); + try { + return Float.valueOf(castObjectIntoString(object)); + } catch (NumberFormatException ignored) { + return null; + } } public static Double castToDouble(Object object) { @@ -760,7 +791,11 @@ public static Double castToDouble(Object object) { if (object instanceof Float) { return ((Float) object).doubleValue(); } - return Double.valueOf(castObjectIntoString(object)); + try { + return Double.valueOf(castObjectIntoString(object)); + } catch (NumberFormatException ignored) { + return null; + } } public static BigDecimal castToBigDecimal(Object object, int precision, int scale) { @@ -770,12 +805,18 @@ public static BigDecimal castToBigDecimal(Object object, int precision, int scal if (object instanceof Boolean) { object = (Boolean) object ? 1 : 0; } - BigDecimal bigDecimal = - new BigDecimal(castObjectIntoString(object), new MathContext(precision)); - bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); + + BigDecimal bigDecimal; + try { + bigDecimal = new BigDecimal(castObjectIntoString(object), new MathContext(precision)); + bigDecimal = bigDecimal.setScale(scale, RoundingMode.HALF_UP); + } catch (NumberFormatException ignored) { + return null; + } + + // If the precision overflows, null will be returned. Otherwise, we may accidentally emit a + // non-serializable object into the pipeline that breaks downstream. if (bigDecimal.precision() > precision) { - // TODO: Handle malformed data properly after dirty data handling is implemented in - // Pipeline Framework. return null; } return bigDecimal;