Skip to content

Commit

Permalink
[enhance](regression-test) test_stream_load_where_delete_expr
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 committed Oct 9, 2024
1 parent 5d2e3a4 commit 7d51291
Showing 1 changed file with 197 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,56 @@ suite("test_stream_load_where_delete_expr", "p0") {
(
"replication_num" = "1"
);"""

def uniquePartitionSql = """CREATE TABLE ${tableName}(
user_id BIGINT NOT NULL COMMENT "用户 ID",
age INT COMMENT "用户年龄",
name VARCHAR(20) COMMENT "用户姓名"
)
ENGINE=OLAP
UNIQUE KEY(user_id,age)
PARTITION BY RANGE(`age`)
(
PARTITION `p1` VALUES LESS THAN ("35"),
PARTITION `p2` VALUES LESS THAN ("65")
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES
(
"replication_num" = "1"
);"""

def dupPartitionSql = """CREATE TABLE ${tableName}(
user_id BIGINT NOT NULL COMMENT "用户 ID",
age INT COMMENT "用户年龄",
name VARCHAR(20) COMMENT "用户姓名"
)
ENGINE=OLAP
DUPLICATE KEY(user_id,age)
PARTITION BY RANGE(`age`)
(
PARTITION `p1` VALUES LESS THAN ("35"),
PARTITION `p2` VALUES LESS THAN ("65")
)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES
(
"replication_num" = "1"
);"""
def dupSql = """CREATE TABLE ${tableName}(
user_id BIGINT NOT NULL COMMENT "用户 ID",
name VARCHAR(20) COMMENT "用户姓名",
age INT COMMENT "用户年龄"
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES
(
"replication_num" = "1"
);"""


sql """ DROP TABLE IF EXISTS ${tableName} """
sql uniqueSql
streamLoad {
Expand Down Expand Up @@ -182,7 +221,162 @@ suite("test_stream_load_where_delete_expr", "p0") {
assertEquals(5, json.NumberLoadedRows)
}
}




sql """ DROP TABLE IF EXISTS ${tableName} """
sql uniquePartitionSql
streamLoad {
table "${tableName}"
set 'columns', 'user_id, name, age'
set 'column_separator', ','
set 'column_separator', ','
set 'partitions', 'p1, p2'
set 'max_filter_ratio', '0.5'
set 'where', 'age>=35'

file 'streamload_2.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(10, json.NumberTotalRows)
assertEquals(4, json.NumberLoadedRows)
}
}


sql """ DROP TABLE IF EXISTS ${tableName} """
sql uniquePartitionSql

streamLoad {
table "${tableName}"
set 'columns', 'user_id, name, age'
set 'column_separator', ','
set 'where', 'age>=35 or name="Olivia"'
set 'partitions', 'p1, p2'
set 'max_filter_ratio', '0.5'
file 'streamload_2.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(10, json.NumberTotalRows)
assertEquals(5, json.NumberLoadedRows)
}
}


sql """ DROP TABLE IF EXISTS ${tableName} """
sql uniquePartitionSql

streamLoad {
table "${tableName}"
set 'columns', 'user_id, name, age'
set 'column_separator', ','
set 'partitions', 'p1, p2'
set 'max_filter_ratio', '0.5'
file 'streamload_1.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(10, json.NumberTotalRows)
assertEquals(9, json.NumberLoadedRows)
}
}


streamLoad {
table "${tableName}"
set 'columns', 'user_id, name, age'
set 'column_separator', ','
set 'merge_type', 'DELETE'
file 'streamload_3.csv'
set 'partitions', 'p1, p2'
set 'max_filter_ratio', '0.5'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(10, json.NumberTotalRows)
assertEquals(8, json.NumberLoadedRows)
}
}


/**
* dupPartition
*/
sql """ DROP TABLE IF EXISTS ${tableName} """
sql dupPartitionSql
streamLoad {
table "${tableName}"
set 'columns', 'user_id, name, age'
set 'column_separator', ','
set 'where', 'age>=35'
set 'partitions', 'p1, p2'
set 'max_filter_ratio', '0.5'
file 'streamload_2.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(10, json.NumberTotalRows)
assertEquals(4, json.NumberLoadedRows)
}
}


sql """ DROP TABLE IF EXISTS ${tableName} """
sql dupPartitionSql

streamLoad {
table "${tableName}"
set 'columns', 'user_id, name, age'
set 'column_separator', ','
set 'where', 'age>=35 or name="Olivia"'
set 'partitions', 'p1, p2'
set 'max_filter_ratio', '0.5'
file 'streamload_2.csv'
time 10000 // limit inflight 10s

check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(10, json.NumberTotalRows)
assertEquals(5, json.NumberLoadedRows)
}
}

}

0 comments on commit 7d51291

Please sign in to comment.