Skip to content

Commit

Permalink
fix: loading parquet with missing_field_as gets wrong results.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Jan 2, 2024
1 parent dabea9f commit 522b49a
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/query/storages/parquet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#![feature(let_chains)]
#![feature(core_intrinsics)]
#![feature(int_roundings)]
#![feature(box_patterns)]
#![allow(clippy::diverging_sub_expression)]

mod parquet2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use databend_common_arrow::arrow::compute::cast::can_cast_types;
use databend_common_arrow::arrow::datatypes::Field as ArrowField;
use databend_common_catalog::plan::Projection;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
Expand Down Expand Up @@ -76,6 +79,7 @@ impl RowGroupReaderForCopy {
let arrow_schema = infer_schema_with_extension(file_metadata)?;
let schema_descr = file_metadata.schema_descr_ptr();
let parquet_table_schema = arrow_to_table_schema(&arrow_schema)?;
let mut pushdown_columns = vec![];
let mut output_projection = vec![];

let mut num_inputs = 0;
Expand All @@ -87,10 +91,11 @@ impl RowGroupReaderForCopy {
.position(|f| f.name() == field_name)
{
Some(pos) => {
pushdown_columns.push(pos);
let from_field = parquet_table_schema.field(pos);
let expr = Expr::ColumnRef {
span: None,
id: num_inputs,
id: pos,
data_type: from_field.data_type().into(),
display_name: from_field.name().clone(),
};
Expand All @@ -100,7 +105,7 @@ impl RowGroupReaderForCopy {
expr
} else if can_cast_types(
ArrowField::from(from_field).data_type(),
ArrowField::from(from_field).data_type(),
ArrowField::from(to_field).data_type(),
) {
Expr::Cast {
span: None,
Expand Down Expand Up @@ -133,13 +138,34 @@ impl RowGroupReaderForCopy {
"not column name match in parquet file {location}",
)));
}

pushdown_columns.sort();
let mapping = pushdown_columns
.clone()
.into_iter()
.enumerate()
.map(|(i, pos)| (pos, i))
.collect::<HashMap<_, _>>();
for expr in output_projection.iter_mut() {
match expr {
Expr::ColumnRef { id, .. } => *id = mapping[id],
Expr::Cast {
expr: box Expr::ColumnRef { id, .. },
..
} => *id = mapping[id],
_ => {}
}
}
let pushdowns = PushDownInfo {
projection: Some(Projection::Columns(pushdown_columns)),
..Default::default()
};
let mut reader_builder = ParquetRSReaderBuilder::create_with_parquet_schema(
ctx,
op,
Arc::new(parquet_table_schema),
schema_descr,
);
)
.with_push_downs(Some(&pushdowns));
reader_builder.build_output()?;

let row_group_reader_builder = reader_builder.create_no_prefetch_policy_builder()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ impl ParquetTableForCopy {

let operator = init_stage_operator(&stage_table_info.stage_info)?;
// User set the files.
let files = stage_table_info.list_files(None).await?;
let files = stage_table_info.files_to_copy.as_ref().expect(
"ParquetTableForCopy::do_read_partitions must be called with files_to_copy set",
);
let file_infos = files
.iter()
.map(|f| (f.path.clone(), f.size))
Expand Down
Binary file removed tests/data/parquet/diff_schema/c1c2c3.parquet
Binary file not shown.
Binary file removed tests/data/parquet/diff_schema/c2c3c4.parquet
Binary file not shown.
Binary file added tests/data/parquet/diff_schema/f1.parquet
Binary file not shown.
Binary file added tests/data/parquet/diff_schema/f2.parquet
Binary file not shown.
12 changes: 7 additions & 5 deletions tests/data/parquet/diff_schema/gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import pyarrow as pa
import pyarrow.parquet as pq


df = pd.DataFrame({"c1": range(1, 11), "c2": np.int8(range(2, 12)), "c3": range(3, 13)})
df = pd.DataFrame({"c1": range(110, 120), "c2": np.int16(range(120, 130)), "c3": range(130, 140)})
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, f"c1c2c3.parquet", row_group_size=3)
pq.write_table(table, f"f1.parquet", row_group_size=3)

df = pd.DataFrame(
{"c2": range(220, 230), "c4": map(str, range(240, 250)), "c5": range(250, 260), "c6": range(260, 270)})

df = pd.DataFrame({"c2": range(12, 22), "c3": range(13, 23), "c4": map(str, range(14, 24))})
df = df[["c6", "c5", "c2", "c4"]]
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table( table, f"c2c3c4.parquet", row_group_size=3)
pq.write_table(table, f"f2.parquet", row_group_size=3)
Original file line number Diff line number Diff line change
@@ -1,38 +1,57 @@
statement ok
drop table if exists c1234
drop table if exists t1

statement ok
create table c1234 (c1 int, c2 int, c3 int64, c4 string default 'ok')
create table t1 (c1 int, c2 int, c3 int64, c4 string default 'ok')

query
select * from infer_schema(location => '@data/parquet/diff_schema/f1.parquet')
----
c1 BIGINT 1 0
c2 SMALLINT 1 1
c3 BIGINT 1 2

query
select * from infer_schema(location => '@data/parquet/diff_schema/f2.parquet')
----
c6 BIGINT 1 0
c5 BIGINT 1 1
c2 BIGINT 1 2
c4 VARCHAR 1 3

query error get diff schema
copy into c1234 from @data/parquet/diff_schema/ file_format=(type=parquet) pattern='.*[.]parquet'
copy into t1 from @data/parquet/diff_schema/ file_format=(type=parquet) pattern='.*[.]parquet'

query
copy into t1 from @data/parquet/diff_schema/ file_format=(type=parquet missing_field_as='field_default') pattern='.*[.]parquet'
----
parquet/diff_schema/f1.parquet 10 0 NULL NULL
parquet/diff_schema/f2.parquet 10 0 NULL NULL

query
copy into c1234 from @data/parquet/diff_schema/ file_format=(type=parquet missing_field_as='field_default') pattern='.*[.]parquet'
select * from t1 order by c1,c2,c3,c4
----
parquet/diff_schema/c1c2c3.parquet 10 0 NULL NULL
parquet/diff_schema/c2c3c4.parquet 10 0 NULL NULL
110 120 130 ok
111 121 131 ok
112 122 132 ok
113 123 133 ok
114 124 134 ok
115 125 135 ok
116 126 136 ok
117 127 137 ok
118 128 138 ok
119 129 139 ok
NULL 220 NULL 240
NULL 221 NULL 241
NULL 222 NULL 242
NULL 223 NULL 243
NULL 224 NULL 244
NULL 225 NULL 245
NULL 226 NULL 246
NULL 227 NULL 247
NULL 228 NULL 248
NULL 229 NULL 249

query
select * from c1234 order by c1,c2,c3,c4
copy into t1 from @data/parquet/diff_schema/ file_format=(type=parquet missing_field_as='field_default') pattern='.*[.]parquet'
----
1 2 3 ok
2 3 4 ok
3 4 5 ok
4 5 6 ok
5 6 7 ok
6 7 8 ok
7 8 9 ok
8 9 10 ok
9 10 11 ok
10 11 12 ok
NULL 12 13 14
NULL 13 14 15
NULL 14 15 16
NULL 15 16 17
NULL 16 17 18
NULL 17 18 19
NULL 18 19 20
NULL 19 20 21
NULL 20 21 22
NULL 21 22 23

0 comments on commit 522b49a

Please sign in to comment.