Skip to content

Commit

Permalink
fix(mssql-cdc): support case sensitve db, schema, table name (#18032)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Aug 14, 2024
1 parent 3a856bb commit 69422aa
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 9 deletions.
71 changes: 70 additions & 1 deletion e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@ control substitution on
system ok
sqlcmd -C -d master -Q 'create database mydb;' -b

system ok
sqlcmd -C -d master -Q 'create database UpperDB COLLATE SQL_Latin1_General_CP1_CS_AS;' -b

system ok
sqlcmd -C -i e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_prepare.sql -b

system ok
sqlcmd -C -d UpperDB -Q "CREATE SCHEMA UpperSchema;" -b

system ok
sqlcmd -C -d UpperDB -Q "EXEC sys.sp_cdc_enable_db; CREATE TABLE UpperSchema.UpperTable (ID INT PRIMARY KEY, Name VARCHAR(100)); EXEC sys.sp_cdc_enable_table @source_schema = 'UpperSchema', @source_name = 'UpperTable', @role_name = NULL; INSERT INTO UpperSchema.UpperTable VALUES (1, 'Alice');" -b

# ------------ validate stage ------------

# invalid address, comment this test out because it takes long to wait for TCP connection timeout.
Expand Down Expand Up @@ -114,6 +123,17 @@ CREATE TABLE sqlserver_all_data_types (
database.name = '${SQLCMDDBNAME}',
);

# invalid dbname
statement error does not match db_name
CREATE SOURCE upper_mssql_source WITH (
connector = 'sqlserver-cdc',
hostname = '${SQLCMDSERVER:sqlserver-server}',
port = '${SQLCMDPORT:1433}',
username = '${SQLCMDUSER:SA}',
password = '${SQLCMDPASSWORD}',
database.name = 'upperdb',
);

# ------------ Create source/table/mv stage ------------
# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON`
statement ok
Expand All @@ -126,6 +146,16 @@ CREATE SOURCE mssql_source WITH (
database.name = '${SQLCMDDBNAME}',
);

statement ok
CREATE SOURCE upper_mssql_source WITH (
connector = 'sqlserver-cdc',
hostname = '${SQLCMDSERVER:sqlserver-server}',
port = '${SQLCMDPORT:1433}',
username = '${SQLCMDUSER:SA}',
password = '${SQLCMDPASSWORD}',
database.name = 'UpperDB',
);

statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source
create materialized view mv as select * from mssql_source;

Expand Down Expand Up @@ -250,6 +280,34 @@ CREATE TABLE shared_sqlserver_all_data_types (
PRIMARY KEY (id)
) from mssql_source table 'dbo.sqlserver_all_data_types';

statement error Sql Server table 'UpperSchema'.'UpperTable' doesn't exist in 'mydb'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
PRIMARY KEY ("ID")
) from mssql_source table 'UpperSchema.UpperTable';

statement error Column 'name' not found in the upstream database
CREATE TABLE upper_table (
"ID" INT,
name VARCHAR,
PRIMARY KEY ("ID")
) from upper_mssql_source table 'UpperSchema.UpperTable';

statement error Sql Server table 'upperSchema'.'upperTable' doesn't exist in 'UpperDB'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
PRIMARY KEY ("ID")
) from upper_mssql_source table 'upperSchema.upperTable';

statement ok
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
PRIMARY KEY ("ID")
) from upper_mssql_source table 'UpperSchema.UpperTable';

statement ok
create materialized view shared_orders_cnt as select count(*) as cnt from shared_orders;

Expand Down Expand Up @@ -307,6 +365,9 @@ SELECT * from shared_sqlserver_all_data_types order by id;
system ok
sqlcmd -C -i e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc_insert.sql -b

system ok
sqlcmd -C -d UpperDB -Q "INSERT INTO UpperSchema.UpperTable VALUES (11, 'Alice');" -b

sleep 10s

# ------------ recover cluster ------------
Expand All @@ -332,7 +393,6 @@ select cnt from shared_sqlserver_all_data_types_cnt;
----
6


query III
select * from shared_orders order by order_id;
----
Expand All @@ -359,6 +419,15 @@ SELECT * from shared_sqlserver_all_data_types order by id;
12 t 255 -32768 -2147483648 -9223372036854775808 -10 -10000 -10000 aa \xff 1990-01-01 13:59:59.123 2000-01-01 11:00:00.123 1990-01-01 00:00:01.123+00:00
13 t 127 32767 2147483647 9223372036854775807 -10 10000 10000 zzzz \xffffffff 2999-12-31 23:59:59.999 2099-12-31 23:59:59.999 2999-12-31 23:59:59.999+00:00

query TT
SELECT * from upper_table order by "ID";
----
1 Alice
11 Alice

# ------------ drop stage ------------
statement ok
drop source upper_mssql_source cascade;

statement ok
drop source mssql_source cascade;
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void validateDbConfig() {
throw ValidatorUtils.invalidArgument(
"Sql Server's DB_NAME() '"
+ res.getString(1)
+ "' does not match db_name'"
+ "' does not match db_name '"
+ dbName
+ "'.");
}
Expand Down Expand Up @@ -142,8 +142,8 @@ private void validateTableSchema() throws SQLException {
if (res.getInt(1) == 0) {
throw ValidatorUtils.invalidArgument(
String.format(
"Sql Server table '%s'.'%s' doesn't exist",
schemaName, tableName));
"Sql Server table '%s'.'%s' doesn't exist in '%s'",
schemaName, tableName, dbName));
}
}
}
Expand Down Expand Up @@ -182,7 +182,20 @@ private void validateTableSchema() throws SQLException {
throw ValidatorUtils.invalidArgument("Primary key mismatch");
}
}

// Check whether the db is case-sensitive
boolean isCaseSensitive = false;
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("sqlserver.case.sensitive"))) {
stmt.setString(1, this.dbName);
var res = stmt.executeQuery();
while (res.next()) {
var caseSensitive = res.getInt(2);
if (caseSensitive == 1) {
isCaseSensitive = true;
}
}
}
// Check whether source schema match table schema on upstream
// All columns defined must exist in upstream database
try (var stmt =
Expand All @@ -196,15 +209,19 @@ private void validateTableSchema() throws SQLException {
while (res.next()) {
var field = res.getString(1);
var dataType = res.getString(2);
schema.put(field.toLowerCase(), dataType);
if (isCaseSensitive) {
schema.put(field, dataType);
} else {
schema.put(field.toLowerCase(), dataType);
}
}

for (var e : tableSchema.getColumnTypes().entrySet()) {
// skip validate internal columns
if (e.getKey().startsWith(ValidatorUtils.INTERNAL_COLUMN_PREFIX)) {
continue;
}
var dataType = schema.get(e.getKey().toLowerCase());
var dataType = schema.get(isCaseSensitive ? e.getKey() : e.getKey().toLowerCase());
if (dataType == null) {
throw ValidatorUtils.invalidArgument(
"Column '" + e.getKey() + "' not found in the upstream database");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ GROUP BY r1.rolname \
tmp AS (SELECT DISTINCT(UNNEST(m)) AS members FROM base) \
SELECT ARRAY_AGG(members) AS members FROM tmp
sqlserver.db.cdc.enabled=SELECT name, is_cdc_enabled FROM sys.databases WHERE name = DB_NAME()
sqlserver.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
sqlserver.table=SELECT count(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
sqlserver.table.cdc.enabled=SELECT COUNT(*) FROM cdc.change_tables AS ct INNER JOIN sys.tables AS t ON ct.source_object_id = t.object_id INNER JOIN sys.schemas AS s ON t.schema_id = s.schema_id WHERE s.name = ? AND t.name = ?
sqlserver.pk=SELECT k.column_name FROM information_schema.table_constraints t INNER JOIN information_schema.key_column_usage k ON t.constraint_name = k.constraint_name AND t.table_name = k.table_name WHERE t.constraint_type = 'PRIMARY KEY' AND t.table_schema = ? AND t.table_name = ?
sqlserver.pk=SELECT k.column_name FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS t INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE k ON t.constraint_name = k.constraint_name AND t.table_name = k.table_name WHERE t.constraint_type = 'PRIMARY KEY' AND t.table_schema = ? AND t.table_name = ?
sqlserver.table_schema=SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION
sqlserver.has.perms=SELECT HAS_PERMS_BY_NAME('cdc.' + ct.capture_instance + '_CT', 'OBJECT', 'SELECT') FROM cdc.change_tables AS ct INNER JOIN sys.tables AS t ON ct.source_object_id = t.object_id INNER JOIN sys.schemas AS s ON t.schema_id = s.schema_id WHERE s.name = ? AND t.name = ?
sqlserver.sql.agent.enabled=SELECT sys.fn_cdc_get_max_lsn()
sqlserver.case.sensitive=WITH collations AS (SELECT name, CASE WHEN description like '%case-insensitive%' THEN 0 WHEN description like '%case-sensitive%' THEN 1 END isCaseSensitive FROM sys.fn_helpcollations()) SELECT * FROM collations WHERE name = CONVERT(varchar, DATABASEPROPERTYEX( ? ,'collation'));
citus.distributed_table=select citus_table_type from citus_tables where table_name=?::regclass
postgres.rds.role.check=SELECT r.rolname, r.rolsuper, r.rolinherit, r.rolcreaterole, r.rolcreatedb, r.rolcanlogin, r.rolconnlimit, r.rolvaliduntil, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid) as memberof , r.rolreplication , r.rolbypassrls FROM pg_catalog.pg_roles r WHERE r.rolname = ?

0 comments on commit 69422aa

Please sign in to comment.