Skip to content

Commit

Permalink
[mysql] Fallback to build schema by 'desc table' case when parsing DD…
Browse files Browse the repository at this point in the history
…L failure (apache#949)
  • Loading branch information
Cleverdada authored Mar 16, 2022
1 parent bd6936b commit 410e4fa
Show file tree
Hide file tree
Showing 4 changed files with 912 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges.TableChange;
import io.debezium.schema.SchemaChangeEvent;
import org.apache.commons.lang3.StringUtils;

import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -40,6 +42,9 @@

/** A component used to get schema by table path. */
public class MySqlSchema {
private static final String SHOW_CREATE_TABLE = "SHOW CREATE TABLE ";
private static final String DESC_TABLE = "DESC ";

private final MySqlConnectorConfig connectorConfig;
private final MySqlDatabaseSchema databaseSchema;
private final Map<TableId, TableChange> schemasByTableId;
Expand All @@ -58,7 +63,7 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
// read schema from cache first
TableChange schema = schemasByTableId.get(tableId);
if (schema == null) {
schema = readTableSchema(jdbc, tableId);
schema = buildTableSchema(jdbc, tableId);
schemasByTableId.put(tableId, schema);
}
return schema;
Expand All @@ -68,38 +73,92 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
// Helpers
// ------------------------------------------------------------------------------------------

private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
private TableChange buildTableSchema(JdbcConnection jdbc, TableId tableId) {
final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
final String sql = "SHOW CREATE TABLE " + quote(tableId);
String showCreateTable = SHOW_CREATE_TABLE + quote(tableId);
buildSchemaByShowCreateTable(jdbc, tableId, tableChangeMap);
if (!tableChangeMap.containsKey(tableId)) {
// fallback to desc table
String descTable = DESC_TABLE + quote(tableId);
buildSchemaByDescTable(jdbc, descTable, tableId, tableChangeMap);
if (!tableChangeMap.containsKey(tableId)) {
throw new FlinkRuntimeException(
String.format(
"Can't obtain schema for table %s by running %s and %s ",
tableId, showCreateTable, descTable));
}
}
return tableChangeMap.get(tableId);
}

private void buildSchemaByShowCreateTable(
JdbcConnection jdbc, TableId tableId, Map<TableId, TableChange> tableChangeMap) {
final String sql = SHOW_CREATE_TABLE + quote(tableId);
try {
jdbc.query(
sql,
rs -> {
if (rs.next()) {
final String ddl = rs.getString(2);
final MySqlOffsetContext offsetContext =
MySqlOffsetContext.initial(connectorConfig);
List<SchemaChangeEvent> schemaChangeEvents =
databaseSchema.parseSnapshotDdl(
ddl, tableId.catalog(), offsetContext, Instant.now());
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
for (TableChange tableChange :
schemaChangeEvent.getTableChanges()) {
tableChangeMap.put(tableId, tableChange);
}
}
parseSchemaByDdl(ddl, tableId, tableChangeMap);
}
});
} catch (SQLException e) {
throw new FlinkRuntimeException(
String.format("Failed to read schema for table %s by running %s", tableId, sql),
e);
}
if (!tableChangeMap.containsKey(tableId)) {
throw new FlinkRuntimeException(
String.format("Can't obtain schema for table %s by running %s", tableId, sql));
}

private void parseSchemaByDdl(
String ddl, TableId tableId, Map<TableId, TableChange> tableChangeMap) {
final MySqlOffsetContext offsetContext = MySqlOffsetContext.initial(connectorConfig);
List<SchemaChangeEvent> schemaChangeEvents =
databaseSchema.parseSnapshotDdl(
ddl, tableId.catalog(), offsetContext, Instant.now());
for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
for (TableChange tableChange : schemaChangeEvent.getTableChanges()) {
tableChangeMap.put(tableId, tableChange);
}
}
}

return tableChangeMap.get(tableId);
private void buildSchemaByDescTable(
JdbcConnection jdbc,
String descTable,
TableId tableId,
Map<TableId, TableChange> tableChangeMap) {
List<MySqlFieldDefinition> fieldMetas = new ArrayList<>();
List<String> primaryKeys = new ArrayList<>();
try {
jdbc.query(
descTable,
rs -> {
while (rs.next()) {
MySqlFieldDefinition meta = new MySqlFieldDefinition();
meta.setColumnName(rs.getString("Field"));
meta.setColumnType(rs.getString("Type"));
meta.setNullable(
StringUtils.equalsIgnoreCase(rs.getString("Null"), "YES"));
meta.setKey("PRI".equalsIgnoreCase(rs.getString("Key")));
meta.setUnique("UNI".equalsIgnoreCase(rs.getString("Key")));
meta.setDefaultValue(rs.getString("Default"));
meta.setExtra(rs.getString("Extra"));
if (meta.isKey()) {
primaryKeys.add(meta.getColumnName());
}
fieldMetas.add(meta);
}
});
parseSchemaByDdl(
new MySqlTableDefinition(tableId, fieldMetas, primaryKeys).toDdl(),
tableId,
tableChangeMap);
} catch (SQLException e) {
throw new FlinkRuntimeException(
String.format(
"Failed to read schema for table %s by running %s", tableId, descTable),
e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.schema;

import org.apache.flink.util.CollectionUtil;

import com.ververica.cdc.connectors.mysql.source.utils.StatementUtils;
import io.debezium.relational.TableId;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.stream.Collectors;

import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote;

/** used to generate table definition in ddl with "desc table". */
public class MySqlTableDefinition {
TableId tableId;
List<MySqlFieldDefinition> fieldDefinitions;
List<String> primaryKeys;

public MySqlTableDefinition(
TableId tableId,
List<MySqlFieldDefinition> fieldDefinitions,
List<String> primaryKeys) {
this.tableId = tableId;
this.fieldDefinitions = fieldDefinitions;
this.primaryKeys = primaryKeys;
}

String toDdl() {
return String.format(
"CREATE TABLE %s (\n\t %s %s );",
quote(tableId), fieldDefinitions(), pkDefinition());
}

private String fieldDefinitions() {
return fieldDefinitions.stream()
.map(MySqlFieldDefinition::toDdl)
.collect(Collectors.joining(", \n\t"));
}

private String pkDefinition() {
StringBuilder pkDefinition = new StringBuilder();
if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
pkDefinition.append(",");
pkDefinition.append(
String.format(
"PRIMARY KEY ( %s )",
primaryKeys.stream()
.map(StatementUtils::quote)
.collect(Collectors.joining(","))));
}
return pkDefinition.toString();
}
}

/** used to generate field definition in ddl with "desc table". */
class MySqlFieldDefinition {
private String columnName;
private String columnType;
private boolean nullable;
private boolean key;
private String defaultValue;
private String extra;
private boolean unique;

public String getColumnName() {
return columnName;
}

public void setColumnName(String columnName) {
this.columnName = columnName;
}

public String getColumnType() {
return columnType;
}

public void setColumnType(String columnType) {
this.columnType = columnType;
}

public void setNullable(boolean nullable) {
this.nullable = nullable;
}

public String getDefaultValue() {
return StringUtils.isEmpty(defaultValue) ? "" : "DEFAULT " + defaultValue;
}

public void setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
}

public boolean isUnsigned() {
return StringUtils.containsIgnoreCase(columnType, "unsigned");
}

public boolean isNullable() {
return nullable;
}

public boolean isKey() {
return key;
}

public void setKey(boolean key) {
this.key = key;
}

public String getExtra() {
return extra;
}

public void setExtra(String extra) {
this.extra = extra;
}

public boolean isUnique() {
return unique;
}

public void setUnique(boolean unique) {
this.unique = unique;
}

public String toDdl() {
return quote(columnName) + " " + columnType + " " + (nullable ? "" : "NOT NULL");
}
}
Loading

0 comments on commit 410e4fa

Please sign in to comment.