diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_where_delete_expr.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_where_delete_expr.groovy index 3870876a57f3d9..d40b66ad4a8f33 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load_where_delete_expr.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_where_delete_expr.groovy @@ -31,17 +31,56 @@ suite("test_stream_load_where_delete_expr", "p0") { ( "replication_num" = "1" );""" + + def uniquePartitionSql = """CREATE TABLE ${tableName}( + user_id BIGINT NOT NULL COMMENT "用户 ID", + age INT COMMENT "用户年龄", + name VARCHAR(20) COMMENT "用户姓名" + + ) + ENGINE=OLAP + UNIQUE KEY(user_id,age) + PARTITION BY RANGE(`age`) + ( + PARTITION `p1` VALUES LESS THAN ("35"), + PARTITION `p2` VALUES LESS THAN ("65") + ) + DISTRIBUTED BY HASH(user_id) BUCKETS 10 + PROPERTIES + ( + "replication_num" = "1" + );""" + + def dupPartitionSql = """CREATE TABLE ${tableName}( + user_id BIGINT NOT NULL COMMENT "用户 ID", + age INT COMMENT "用户年龄", + name VARCHAR(20) COMMENT "用户姓名" + ) + ENGINE=OLAP + DUPLICATE KEY(user_id,age) + PARTITION BY RANGE(`age`) + ( + PARTITION `p1` VALUES LESS THAN ("35"), + PARTITION `p2` VALUES LESS THAN ("65") + ) + DISTRIBUTED BY HASH(user_id) BUCKETS 10 + PROPERTIES + ( + "replication_num" = "1" + );""" def dupSql = """CREATE TABLE ${tableName}( user_id BIGINT NOT NULL COMMENT "用户 ID", name VARCHAR(20) COMMENT "用户姓名", age INT COMMENT "用户年龄" ) - DUPLICATE KEY(user_id) - DISTRIBUTED BY HASH(user_id) BUCKETS 10 + DUPLICATE KEY(user_id) + DISTRIBUTED BY HASH(user_id) BUCKETS 10 PROPERTIES ( "replication_num" = "1" );""" + + sql """ DROP TABLE IF EXISTS ${tableName} """ sql uniqueSql streamLoad { @@ -182,7 +221,162 @@ suite("test_stream_load_where_delete_expr", "p0") { assertEquals(5, json.NumberLoadedRows) } } - + + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql uniquePartitionSql + streamLoad { + table "${tableName}" + set 'columns', 'user_id, name, age' + set 'column_separator', ',' + set 'column_separator', ',' + set 'partitions', 'p1, p2' + set 'max_filter_ratio', '0.5' + set 'where', 'age>=35' + + file 'streamload_2.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) + } + } + + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql uniquePartitionSql + + streamLoad { + table "${tableName}" + set 'columns', 'user_id, name, age' + set 'column_separator', ',' + set 'where', 'age>=35 or name="Olivia"' + set 'partitions', 'p1, p2' + set 'max_filter_ratio', '0.5' + file 'streamload_2.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(5, json.NumberLoadedRows) + } + } + + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql uniquePartitionSql + + streamLoad { + table "${tableName}" + set 'columns', 'user_id, name, age' + set 'column_separator', ',' + set 'partitions', 'p1, p2' + set 'max_filter_ratio', '0.5' + file 'streamload_1.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(9, json.NumberLoadedRows) + } + } + + + streamLoad { + table "${tableName}" + set 'columns', 'user_id, name, age' + set 'column_separator', ',' + set 'merge_type', 'DELETE' + file 'streamload_3.csv' + set 'partitions', 'p1, p2' + set 'max_filter_ratio', '0.5' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(8, json.NumberLoadedRows) + } + } + + + /** + * dupPartition + */ + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql dupPartitionSql + streamLoad { + table "${tableName}" + set 'columns', 'user_id, name, age' + set 'column_separator', ',' + set 'where', 'age>=35' + set 'partitions', 'p1, p2' + set 'max_filter_ratio', '0.5' + file 'streamload_2.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) + } + } + + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql dupPartitionSql + + streamLoad { + table "${tableName}" + set 'columns', 'user_id, name, age' + set 'column_separator', ',' + set 'where', 'age>=35 or name="Olivia"' + set 'partitions', 'p1, p2' + set 'max_filter_ratio', '0.5' + file 'streamload_2.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(10, json.NumberTotalRows) + assertEquals(5, json.NumberLoadedRows) + } + } + }