Commit
fix(iceberg): fix map - arrow convert & fix sink nested types to iceberg (#18463)

Signed-off-by: xxchan <[email protected]>
xxchan authored Sep 11, 2024
1 parent 7833509 commit 00150a3
Showing 13 changed files with 199 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -71,12 +71,12 @@ e2e_test/generated/*
scale-test.tar.zst
simulation-it-test.tar.zst


# hummock-trace
.trace

# spark binary
e2e_test/iceberg/spark-*-bin*
e2e_test/iceberg/metastore_db

**/poetry.lock

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -135,7 +135,8 @@ tonic-build = { package = "madsim-tonic-build", version = "0.5" }
otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "e6cd165b9bc85783b42c106e99186b86b73e3507" }
prost = { version = "0.13" }
prost-build = { version = "0.13" }
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "1860eb315183a5f3f72b4097c1e40d49407f8373", features = [
# branch dylan/fix_parquet_nested_type_field_id
icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "3f4724158acee37a4785f56670a1427993a58739", features = [
"prometheus",
] }
arrow-array-iceberg = { package = "arrow-array", version = "52" }
42 changes: 37 additions & 5 deletions e2e_test/iceberg/main.py
@@ -55,16 +55,23 @@ def execute_slt(args, slt):

def verify_result(args, verify_sql, verify_schema, verify_data):
tc = unittest.TestCase()
print(f"Executing sql: {verify_sql}")

time.sleep(3)
print(f"verify_result:\nExecuting sql: {verify_sql}")
spark = get_spark(args)
df = spark.sql(verify_sql).collect()
print(f"Result:")
print(f"================")
for row in df:
print(row)
print(f"================")
rows = verify_data.splitlines()
tc.assertEqual(len(df), len(rows))
tc.assertEqual(len(df), len(rows), "row length mismatch")
tc.assertEqual(len(verify_schema), len(df[0]), "column length mismatch")
for row1, row2 in zip(df, rows):
print(f"Row1: {row1}, Row 2: {row2}")
row2 = row2.split(",")
# New parsing logic for row2
row2 = parse_row(row2)
for idx, ty in enumerate(verify_schema):
if ty == "int" or ty == "long":
tc.assertEqual(row1[idx], int(row2[idx]))
@@ -89,7 +96,7 @@ def verify_result(args, verify_sql, verify_schema, verify_data):
else:
tc.assertEqual(row1[idx], decimal.Decimal(row2[idx]))
else:
tc.fail(f"Unsupported type {ty}")
tc.assertEqual(str(row1[idx]), str(row2[idx]))

def compare_sql(args, cmp_sqls):
assert len(cmp_sqls) == 2
@@ -113,6 +120,32 @@ def drop_table(args, drop_sqls):
spark.sql(sql)


def parse_row(row):
result = []
current = ""
parenthesis_count = {"{": 0, "[": 0, "(": 0}
for char in row:
if char in parenthesis_count:
parenthesis_count[char] += 1
elif char == "}":
parenthesis_count["{"] -= 1
elif char == "]":
parenthesis_count["["] -= 1
elif char == ")":
parenthesis_count["("] -= 1

if char == "," and all(value == 0 for value in parenthesis_count.values()):
result.append(current.strip())
current = ""
else:
current += char

if current:
result.append(current.strip())

return result


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Test script for iceberg")
parser.add_argument("-t", dest="test_case", type=str, help="Test case file")
@@ -151,4 +184,3 @@ def drop_table(args, drop_sqls):
execute_slt(config, verify_slt)
if drop_sqls is not None and drop_sqls != "":
drop_table(config, drop_sqls)
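
The new `parse_row` helper above replaces the old `row2.split(",")`: it splits an expected-data row only on top-level commas, so map, array, and struct literals such as `{1: 100, 2: 200}`, `[1, None, 3]`, and `Row(a=1, b=2)` survive as single fields. A minimal illustration (the sample row below is a shortened variant of a `verify_data` row, made up for demonstration):

```python
# Illustrative only: parse_row (defined above) keeps commas that sit
# inside {...}, [...] or (...) so nested literals stay in one field.
row = "2,2.22,{3: 300},[1, None, 3],Row(a=3, b=None)"
assert parse_row(row) == ["2", "2.22", "{3: 300}", "[1, None, 3]", "Row(a=3, b=None)"]
```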

2 changes: 2 additions & 0 deletions e2e_test/iceberg/start_spark_connect_server.sh
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

set -ex

ICEBERG_VERSION=1.4.3
e2e_test/iceberg/test_case/iceberg_sink_no_partition_append_only_table.slt
@@ -16,7 +16,10 @@ v_bool boolean,
v_date date,
v_timestamp timestamptz,
v_ts_ntz timestamp,
v_decimal decimal
v_decimal decimal,
v_map map(int, int),
v_array int[],
v_struct struct<a int,b int>
);

statement ok
@@ -41,10 +44,10 @@ CREATE SINK s6 AS select * from mv6 WITH (

statement ok
INSERT INTO t6 VALUES
(1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11),
(2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22),
(3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf'),
(4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf');
(1, 1, 1000, 1.1, 1.11, '1-1', true, '2022-03-11', '2022-03-11 01:00:00Z'::timestamptz, '2022-03-11 01:00:00',1.11, map {1:100,2:200}, array[1,2,3], row(1,2)),
(2, 2, 2000, 2.2, 2.22, '2-2', false, '2022-03-12', '2022-03-12 02:00:00Z'::timestamptz, '2022-03-12 02:00:00',2.22, map {3:300}, array[1,null,3], row(3,null)),
(3, 3, 3000, 3.3, 3.33, '3-3', true, '2022-03-13', '2022-03-13 03:00:00Z'::timestamptz, '2022-03-13 03:00:00','inf', null, null, null),
(4, 4, 4000, 4.4, 4.44, '4-4', false, '2022-03-14', '2022-03-14 04:00:00Z'::timestamptz, '2022-03-14 04:00:00','-inf', null, null, null);

statement ok
FLUSH;
@@ -53,7 +56,7 @@ sleep 5s

statement ok
INSERT INTO t6 VALUES
(5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00','nan');
(5, 5, 5000, 5.5, 5.55, '5-5', true, '2022-03-15', '2022-03-15 05:00:00Z'::timestamptz, '2022-03-15 05:00:00','nan', null, null, null);

statement ok
FLUSH;
17 changes: 10 additions & 7 deletions e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -13,24 +13,27 @@ init_sqls = [
v_date date,
v_timestamp timestamp,
v_ts_ntz timestamp_ntz,
v_decimal decimal(10,5)
v_decimal decimal(10,5),
v_map map<int, int>,
v_array array<int>,
v_struct struct<a:int, b:int>
) USING iceberg TBLPROPERTIES ('format-version'='2');
'''
]

slt = 'test_case/iceberg_sink_no_partition_append_only_table.slt'

verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz','decimal']
verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz','decimal', 'map', 'array', 'struct']

verify_sql = 'SELECT * FROM demo_db.no_partition_append_only_table ORDER BY id ASC'


verify_data = """
1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00,1.11
2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00,2.22
3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00,99999.99999
4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00,-99999.99999
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none
1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00,1.11,{1: 100, 2: 200},[1, 2, 3],Row(a=1, b=2)
2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00,2.22,{3: 300},[1, None, 3],Row(a=3, b=None)
3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00,99999.99999,None,None,None
4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00,-99999.99999,None,None,None
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00,none,None,None,None
"""

verify_slt = 'test_case/iceberg_sink_no_partition_append_only_table_verify.slt'
2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/no_partition_upsert.toml
@@ -15,7 +15,7 @@ init_sqls = [

slt = 'test_case/iceberg_sink_no_partition_upsert_table.slt'

verify_schema = ['int','int','long','string']
verify_schema = ['int','int','long','string','date']

verify_sql = 'SELECT * FROM demo_db.no_partition_upsert_table ORDER BY id, v1 ASC'

2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/partition_upsert.toml
@@ -16,7 +16,7 @@ init_sqls = [

slt = 'test_case/iceberg_sink_partition_upsert_table.slt'

verify_schema = ['int','int','long','string']
verify_schema = ['int','int','long','string', 'date']

verify_sql = 'SELECT * FROM demo_db.partition_upsert_table ORDER BY id, v1 ASC'

2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/range_partition_upsert.toml
@@ -16,7 +16,7 @@ init_sqls = [

slt = 'test_case/iceberg_sink_range_partition_upsert_table.slt'

verify_schema = ['int','int','long','string']
verify_schema = ['int','int','long','string','date']

verify_sql = 'SELECT * FROM demo_db.range_partition_upsert_table ORDER BY id, v1 ASC'

11 changes: 8 additions & 3 deletions src/common/src/array/arrow/arrow_impl.rs
@@ -448,12 +448,17 @@ pub trait ToArrow {
#[inline]
fn map_type_to_arrow(&self, map_type: &MapType) -> Result<arrow_schema::DataType, ArrayError> {
let sorted = false;
let list_type = map_type.clone().into_list();
// "key" is always non-null
let key = self
.to_arrow_field("key", map_type.key())?
.with_nullable(false);
let value = self.to_arrow_field("value", map_type.value())?;
Ok(arrow_schema::DataType::Map(
Arc::new(arrow_schema::Field::new(
"entries",
self.list_type_to_arrow(&list_type)?,
true,
arrow_schema::DataType::Struct([Arc::new(key), Arc::new(value)].into()),
// "entries" is always non-null
false,
)),
sorted,
))
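
The fix above builds the Arrow `Map` type directly: a non-nullable `"entries"` struct whose `"key"` child is non-nullable and whose `"value"` child is nullable, matching Arrow's canonical map layout, instead of wrapping the converted list type (which made `entries` a nullable list rather than a non-nullable struct). A small pyarrow sketch of the same layout, purely for illustration (the commit itself targets arrow-rs):

```python
import pyarrow as pa

# Arrow's canonical map layout: map<K, V> is a list of non-nullable
# "entries" structs; "key" is non-nullable, "value" is nullable.
map_type = pa.map_(pa.string(), pa.int32())
arr = pa.array([[("a", 1), ("b", 2)], None, [("a", 1)]], type=map_type)

print(arr.type.key_field.nullable)   # False
print(arr.type.item_field.nullable)  # True
```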
117 changes: 117 additions & 0 deletions src/common/src/array/arrow/arrow_udf.rs
@@ -125,6 +125,7 @@ impl FromArrow for UdfArrowConvert {

#[cfg(test)]
mod tests {

use super::*;
use crate::array::*;

@@ -205,4 +206,120 @@ mod tests {
.unwrap();
assert_eq!(rw_array.as_list(), &array);
}

#[test]
fn map() {
let map_type = MapType::from_kv(DataType::Varchar, DataType::Int32);
let rw_map_type = DataType::Map(map_type.clone());
let mut builder = MapArrayBuilder::with_type(3, rw_map_type.clone());
builder.append_owned(Some(
MapValue::try_from_kv(
ListValue::from_str("{a,b,c}", &DataType::List(Box::new(DataType::Varchar)))
.unwrap(),
ListValue::from_str("{1,2,3}", &DataType::List(Box::new(DataType::Int32))).unwrap(),
)
.unwrap(),
));
builder.append_owned(None);
builder.append_owned(Some(
MapValue::try_from_kv(
ListValue::from_str("{a,c}", &DataType::List(Box::new(DataType::Varchar))).unwrap(),
ListValue::from_str("{1,3}", &DataType::List(Box::new(DataType::Int32))).unwrap(),
)
.unwrap(),
));
let rw_array = builder.finish();

let arrow_map_type = UdfArrowConvert::default()
.map_type_to_arrow(&map_type)
.unwrap();
expect_test::expect![[r#"
Map(
Field {
name: "entries",
data_type: Struct(
[
Field {
name: "key",
data_type: Utf8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
Field {
name: "value",
data_type: Int32,
nullable: true,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
],
),
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
false,
)
"#]]
.assert_debug_eq(&arrow_map_type);
let rw_map_type_new = UdfArrowConvert::default()
.from_field(&arrow_schema::Field::new(
"map",
arrow_map_type.clone(),
true,
))
.unwrap();
assert_eq!(rw_map_type, rw_map_type_new);
let arrow = UdfArrowConvert::default()
.map_to_arrow(&arrow_map_type, &rw_array)
.unwrap();
expect_test::expect![[r#"
MapArray
[
StructArray
[
-- child 0: "key" (Utf8)
StringArray
[
"a",
"b",
"c",
]
-- child 1: "value" (Int32)
PrimitiveArray<Int32>
[
1,
2,
3,
]
],
null,
StructArray
[
-- child 0: "key" (Utf8)
StringArray
[
"a",
"c",
]
-- child 1: "value" (Int32)
PrimitiveArray<Int32>
[
1,
3,
]
],
]
"#]]
.assert_debug_eq(&arrow);

let rw_array_new = UdfArrowConvert::default()
.from_map_array(arrow.as_any().downcast_ref().unwrap())
.unwrap();
assert_eq!(&rw_array, rw_array_new.as_map());
}
}
12 changes: 9 additions & 3 deletions src/connector/src/sink/iceberg/mod.rs
@@ -1375,15 +1375,21 @@ pub fn try_matches_arrow_schema(
(ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
(ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
(ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
(left, right) => left == right,
// cases where left != right (metadata, field name mismatch)
//
// all nested types: in iceberg `field_id` will always be present, but RW doesn't have it:
// {"PARQUET:field_id": ".."}
//
// map: The standard name in arrow is "entries", "key", "value".
// in iceberg-rs, it's called "key_value"
(left, right) => left.equals_datatype(right),
};
if !compatible {
bail!("Field {}'s type not compatible, risingwave converted data type {}, iceberg's data type: {}",
bail!("field {}'s type is incompatible\nRisingWave converted data type: {}\niceberg's data type: {}",
arrow_field.name(), converted_arrow_data_type, arrow_field.data_type()
);
}
}

Ok(())
}
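
The relaxed comparison matters because `left == right` on nested Arrow types also compares field names and metadata: Iceberg writers stamp a `PARQUET:field_id` metadata entry on every nested field, and iceberg-rs names the map entry struct `key_value` instead of the standard `entries`, while RisingWave's converted schema has neither. `equals_datatype` compares only the logical structure. A pyarrow sketch of the same effect on field metadata (illustrative; the commit's actual check is the Rust code above):

```python
import pyarrow as pa

# Iceberg-written field: carries PARQUET:field_id metadata.
iceberg_field = pa.field("v_map", pa.map_(pa.int32(), pa.int32()),
                         metadata={"PARQUET:field_id": "12"})
# RisingWave-converted field: same logical type, no metadata.
rw_field = pa.field("v_map", pa.map_(pa.int32(), pa.int32()))

print(iceberg_field.equals(rw_field, check_metadata=True))   # False
print(iceberg_field.equals(rw_field, check_metadata=False))  # True
```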

