diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index 46e7fa7f4af5..a31df4811af5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -167,7 +167,7 @@ protected Map<String, String> extractRowData(
         Map<String, String> resultMap = new HashMap<>();
         linkedHashMap.forEach(
                 (key, value) -> {
-                    paimonFieldTypes.put(key, DataTypes.STRING());
+                    paimonFieldTypes.put(applyCaseSensitiveFieldName(key), DataTypes.STRING());
                     resultMap.put(key, value);
                 });
 
@@ -177,7 +177,9 @@ protected Map<String, String> extractRowData(
                     resultMap.put(
                             computedColumn.columnName(),
                             computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
-                    paimonFieldTypes.put(computedColumn.columnName(), computedColumn.columnType());
+                    paimonFieldTypes.put(
+                            applyCaseSensitiveFieldName(computedColumn.columnName()),
+                            computedColumn.columnType());
                 });
 
         return resultMap;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index c49fbc430ed7..42ac8c948fdd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -89,7 +89,7 @@ protected void extractFieldTypesFromDatabaseSchema() {
                 .forEachRemaining(
                         fieldName -> {
                             String fieldType = schema.get(fieldName).asText();
-                            fieldTypes.put(applyCaseSensitiveFieldName(fieldName), fieldType);
+                            fieldTypes.put(fieldName, fieldType);
                         });
         this.fieldTypes = fieldTypes;
     }
@@ -131,7 +131,9 @@ protected LinkedHashMap<String, DataType> setPaimonFieldType() {
         LinkedHashMap<String, DataType> paimonFieldTypes = new LinkedHashMap<>();
         fieldTypes.forEach(
                 (name, type) ->
-                        paimonFieldTypes.put(name, MySqlTypeUtils.toDataType(type, typeMapping)));
+                        paimonFieldTypes.put(
+                                applyCaseSensitiveFieldName(name),
+                                MySqlTypeUtils.toDataType(type, typeMapping)));
         return paimonFieldTypes;
     }
 
@@ -168,7 +170,9 @@ protected Map<String, String> extractRowData(
             JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) {
         fieldTypes.forEach(
                 (name, type) ->
-                        paimonFieldTypes.put(name, MySqlTypeUtils.toDataType(type, typeMapping)));
+                        paimonFieldTypes.put(
+                                applyCaseSensitiveFieldName(name),
+                                MySqlTypeUtils.toDataType(type, typeMapping)));
         Map<String, String> jsonMap =
                 OBJECT_MAPPER.convertValue(record, new TypeReference<Map<String, String>>() {});
         if (jsonMap == null) {
@@ -191,7 +195,9 @@ protected Map<String, String> extractRowData(
             resultMap.put(
                     computedColumn.columnName(),
                     computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
-            paimonFieldTypes.put(computedColumn.columnName(), computedColumn.columnType());
+            paimonFieldTypes.put(
+                    applyCaseSensitiveFieldName(computedColumn.columnName()),
+                    computedColumn.columnType());
         }
         return resultMap;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 79c01d474e7e..d7a2bbc8fd03 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.DataType;
@@ -570,4 +571,42 @@ public void testCatalogAndTableConfig() {
         assertThat(action.tableConfig())
                 .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
     }
+
+    @Test
+    @Timeout(60)
+    public void testCaseInsensitive() throws Exception {
+        final String topic = "case-insensitive";
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Write the Canal json into Kafka -------------------
+        writeRecordsToKafka(
+                topic, readLines("kafka/canal/database/case-insensitive/canal-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "canal-json");
+        kafkaConfig.put("topic", topic);
+
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), "test-case-insensitive"))
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t1");
+        FileStoreTable table = getFileStoreTable("t1");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        waitForResult(
+                Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+                table,
+                rowType,
+                Collections.singletonList("k1"));
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
index c169b89489ba..538b109a292e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncDatabaseActionITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
 import org.apache.paimon.types.DataType;
@@ -462,4 +463,46 @@ private void includingAndExcludingTablesImpl(
         waitingTables(existedTables);
         assertTableNotExists(notExistedTables);
     }
+
+    @Test
+    @Timeout(60)
+    public void testCaseInsensitive() throws Exception {
+        final String topic = "case-insensitive";
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Write the ogg json into Kafka -------------------
+
+        writeRecordsToKafka(topic, readLines("kafka/ogg/database/case-insensitive/ogg-data-1.txt"));
+
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put("value.format", "ogg-json");
+        kafkaConfig.put("topic", topic);
+
+        KafkaSyncDatabaseAction action =
+                syncDatabaseActionBuilder(kafkaConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), "test-case-insensitive"))
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        waitingTables("t1");
+        FileStoreTable table = getFileStoreTable("t1");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys1 = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]",
+                        "+I[102, car battery, 12V car battery, 8.100000381469727]");
+        waitForResult(expected, table, rowType, primaryKeys1);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 2a06b4fc41e7..deb11739a3a9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -18,10 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mongodb;
 
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.action.cdc.mysql.TestCaseInsensitiveCatalogFactory;
-import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -30,7 +27,6 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -187,9 +183,6 @@ public void testMongoDBNestedDataSynchronizationAndVerification() throws Exception {
     @Test
     @Timeout(60)
     public void testDynamicTableCreationInMongoDB() throws Exception {
-        catalog =
-                new TestCaseInsensitiveCatalogFactory()
-                        .createCatalog(CatalogContext.create(new Path(warehouse)));
         String dbName = database + UUID.randomUUID();
         writeRecordsToMongoDB("test-data-5", dbName, "database");
         Map<String, String> mongodbConfig = getBasicMongoDBConfig();
@@ -197,11 +190,10 @@ public void testDynamicTableCreationInMongoDB() throws Exception {
         MongoDBSyncDatabaseAction action =
                 syncDatabaseActionBuilder(mongodbConfig)
                         .withTableConfig(getBasicTableConfig())
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), "test-case-insensitive"))
                         .build();
-        Field catalogField = ActionBase.class.getDeclaredField("catalog");
-        catalogField.setAccessible(true);
-        Object newCatalog = catalog;
-        catalogField.set(action, newCatalog);
         runActionWithDefaultEnv(action);
 
         waitingTables("t3");
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
new file mode 100644
index 000000000000..ebf431813cfd
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k1":"5","v0":"five","V1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
+{"data":[{"K1":"7","v0":"seven","V1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"K1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/database/case-insensitive/ogg-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/database/case-insensitive/ogg-data-1.txt
new file mode 100644
index 000000000000..957a40d8fc10
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/ogg/database/case-insensitive/ogg-data-1.txt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+{"table":"PAIMON_SYNC_DATABASE.T1","pos":"00000000000000000000143","primary_keys":["ID"],"after":{"ID":101,"NAME":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"op_type":"I", "current_ts":"2020-05-13T13:39:35.766000", "op_ts":"2020-05-13 15:40:06.000000"}
+{"table":"PAIMON_SYNC_DATABASE.T1","pos":"00000000000000000000144","primary_keys":["id"],"after":{"ID":102,"name":"car battery","description":"12V car battery","WEIGHT":8.100000381469727},"op_type":"I","op_ts":"2020-05-13 15:40:07.000000"}
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
index 55a0bc0039e1..0d111933fbb6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordSchemaBuilder.java
@@ -21,8 +21,12 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
 
 /** Schema builder for {@link RichCdcMultiplexRecord}. */
 public class RichCdcMultiplexRecordSchemaBuilder
@@ -44,12 +48,16 @@ public Optional<Schema> build(RichCdcMultiplexRecord record) {
 
         for (Map.Entry<String, DataType> entry : record.fieldTypes().entrySet()) {
             builder.column(
-                    caseSensitive ? entry.getKey() : entry.getKey().toLowerCase(),
-                    entry.getValue(),
-                    null);
+                    caseSensitiveConversion(entry.getKey(), caseSensitive), entry.getValue(), null);
         }
 
-        Schema schema = builder.primaryKey(record.primaryKeys()).build();
+        List<String> primaryKeys =
+                caseSensitive
+                        ? record.primaryKeys()
+                        : record.primaryKeys().stream()
+                                .map(String::toLowerCase)
+                                .collect(Collectors.toList());
+        Schema schema = builder.primaryKey(primaryKeys).build();
 
         return Optional.of(schema);
     }
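
For reference, a minimal standalone sketch of the convention this patch enforces: when the catalog is case-insensitive, field names and primary keys must be normalized at the same point where the Paimon schema is built, so that mixed-case names like "V1" and "v1" collapse into one column and the primary-key list still matches a real column. The local `caseSensitiveConversion` below is a stand-in with the behavior the patch's static import suggests (lowercase when insensitive), not the actual `org.apache.paimon.utils.StringUtils` method; the sample names come from the canal test data above.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CaseInsensitiveSketch {

    /** Stand-in for the presumed behavior of StringUtils.caseSensitiveConversion. */
    static String caseSensitiveConversion(String name, boolean caseSensitive) {
        return caseSensitive ? name : name.toLowerCase();
    }

    public static void main(String[] args) {
        boolean caseSensitive = false; // e.g. a "test-case-insensitive" metastore

        // Field names as they arrive from Canal/Ogg records, with mixed casing.
        Map<String, String> fieldTypes = new LinkedHashMap<>();
        fieldTypes.put("k1", "INT");
        fieldTypes.put("v0", "VARCHAR(10)");
        fieldTypes.put("V1", "INT");

        // Normalize once, at schema-build time, so duplicate-cased names collapse.
        Map<String, String> paimonFieldTypes = new LinkedHashMap<>();
        fieldTypes.forEach(
                (name, type) ->
                        paimonFieldTypes.put(caseSensitiveConversion(name, caseSensitive), type));

        // Primary keys need the same normalization; otherwise primaryKey() would
        // reference a column name (e.g. "K1") that no longer exists in the schema.
        List<String> primaryKeys =
                Stream.of("K1")
                        .map(pk -> caseSensitiveConversion(pk, caseSensitive))
                        .collect(Collectors.toList());

        System.out.println(paimonFieldTypes); // {k1=INT, v0=VARCHAR(10), v1=INT}
        System.out.println(primaryKeys); // [k1]
    }
}

This mirrors the two halves of the diff: the parsers apply `applyCaseSensitiveFieldName` to every key they put into `paimonFieldTypes` (including computed columns), and `RichCdcMultiplexRecordSchemaBuilder` lowercases `record.primaryKeys()` so the declared primary key matches the already-converted column names.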