Skip to content

Commit

Permalink
feat: EMPTY as default for TSV and NDJSON. (#13321)
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun authored Oct 18, 2023
1 parent fc80217 commit c22249d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use bstr::ByteSlice;
use common_exception::Result;
use common_expression::ColumnBuilder;
use common_expression::Scalar;
use common_expression::TableSchemaRef;
use common_formats::FieldDecoder;
use common_formats::FieldJsonAstDecoder;
Expand Down Expand Up @@ -44,6 +45,7 @@ impl InputFormatNDJson {
buf: &[u8],
columns: &mut [ColumnBuilder],
schema: &TableSchemaRef,
default_values: &Option<Vec<Scalar>>,
) -> std::result::Result<(), FileParseError> {
let mut json: serde_json::Value =
serde_json::from_reader(buf).map_err(|e| FileParseError::InvalidNDJsonRow {
Expand All @@ -68,20 +70,32 @@ impl InputFormatNDJson {
for ((column_index, field), column) in
schema.fields().iter().enumerate().zip(columns.iter_mut())
{
let value = if field_decoder.ident_case_sensitive {
&json[field.name().to_owned()]
let field_name = if field_decoder.ident_case_sensitive {
field.name().to_owned()
} else {
&json[field.name().to_lowercase()]
field.name().to_lowercase()
};
field_decoder.read_field(column, value).map_err(|e| {
FileParseError::ColumnDecodeError {
column_index,
column_name: field.name().to_owned(),
column_type: field.data_type.to_string(),
decode_error: e.to_string(),
column_data: truncate_column_data(value.to_string()),
let value = &json[field_name];
if value == &serde_json::Value::Null {
match default_values {
None => {
column.push_default();
}
Some(values) => {
column.push(values[column_index].as_ref());
}
}
})?;
} else {
field_decoder.read_field(column, value).map_err(|e| {
FileParseError::ColumnDecodeError {
column_index,
column_name: field.name().to_owned(),
column_type: field.data_type.to_string(),
decode_error: e.to_string(),
column_data: truncate_column_data(value.to_string()),
}
})?;
}
}
}
Ok(())
Expand Down Expand Up @@ -126,7 +140,13 @@ impl InputFormatTextBase for InputFormatNDJson {
let buf = &batch.data[start..*end];
let buf = buf.trim();
if !buf.is_empty() {
if let Err(e) = Self::read_row(field_decoder, buf, columns, &builder.ctx.schema) {
if let Err(e) = Self::read_row(
field_decoder,
buf,
columns,
&builder.ctx.schema,
&builder.ctx.default_values,
) {
builder.ctx.on_error(
e,
Some((columns, builder.num_rows)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;

use common_exception::Result;
use common_expression::ColumnBuilder;
use common_expression::Scalar;
use common_expression::TableSchemaRef;
use common_formats::FieldDecoder;
use common_formats::FieldDecoderRowBased;
Expand Down Expand Up @@ -50,9 +51,17 @@ impl InputFormatTSV {
col_data: &[u8],
column_index: usize,
schema: &TableSchemaRef,
default_values: &Option<Vec<Scalar>>,
) -> std::result::Result<(), FileParseError> {
if col_data.is_empty() {
builder.push_default();
match default_values {
None => {
builder.push_default();
}
Some(values) => {
builder.push(values[column_index].as_ref());
}
}
Ok(())
} else {
let mut reader = Cursor::new(col_data);
Expand All @@ -75,6 +84,7 @@ impl InputFormatTSV {
columns: &mut Vec<ColumnBuilder>,
schema: &TableSchemaRef,
columns_to_read: &Option<Vec<usize>>,
default_values: &Option<Vec<Scalar>>,
) -> std::result::Result<(), FileParseError> {
let num_columns = columns.len();
let mut column_index = 0;
Expand All @@ -94,6 +104,7 @@ impl InputFormatTSV {
&buf[field_start..field_end],
column_index,
schema,
default_values,
) {
error = Some(e);
break;
Expand Down Expand Up @@ -123,6 +134,7 @@ impl InputFormatTSV {
&buf[field_start..field_end],
column_index,
schema,
default_values,
) {
error = Some(err);
break;
Expand Down Expand Up @@ -216,6 +228,7 @@ impl InputFormatTextBase for InputFormatTSV {
columns,
schema,
&builder.projection,
&builder.ctx.default_values,
) {
builder.ctx.on_error(
e,
Expand Down
18 changes: 18 additions & 0 deletions tests/sqllogictests/suites/stage/formats/ndjson/ndjson_default
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
statement ok
drop table if exists t

statement ok
create table t(b int, b1 int, b2 int default 1)

query TIITI
copy into t from @data/ndjson/json_sample.ndjson file_format = (type = NDJSON)
----
ndjson/json_sample.ndjson 4 0 NULL NULL

query
select * from t order by b;
----
1 NULL 1
2 NULL 1
3 NULL 1
4 NULL 1
15 changes: 15 additions & 0 deletions tests/sqllogictests/suites/stage/formats/tsv/tsv_default
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
statement ok
drop table if exists it

statement ok
create table it(c1 int default 1, c2 int, c3 int null, c4 int not null, c5 int not null default 5)

query TIITI
copy into it from @data/csv/empty.csv file_format = (type = TSV, field_delimiter=',')
----
csv/empty.csv 1 0 NULL NULL

query
select * from it;
----
1 2 NULL 0 5

1 comment on commit c22249d

@vercel
Copy link

@vercel vercel bot commented on c22249d Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.