Skip to content

Commit

Permalink
Add new setting enable_new_copy_for_text_formats.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Feb 8, 2024
1 parent f9817e0 commit a0a696d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: None,
}),
("enable_new_copy_for_text_formats", DefaultSettingValue {
value: UserSettingValue::UInt64(1),
desc: "Use new implementation for loading CSV files.",
mode: SettingMode::Both,
range: None,
}),
("timezone", DefaultSettingValue {
value: UserSettingValue::String("UTC".to_owned()),
desc: "Sets the timezone.",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ impl Settings {
self.try_get_u64("input_read_buffer_size")
}

pub fn get_enable_new_copy_for_text_formats(&self) -> Result<u64> {
self.try_get_u64("enable_new_copy_for_text_formats")
}

pub fn get_enable_bushy_join(&self) -> Result<u64> {
self.try_get_u64("enable_bushy_join")
}
Expand Down
8 changes: 6 additions & 2 deletions src/query/storages/stage/src/stage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,15 @@ impl Table for StageTable {
_push_downs: Option<PushDownInfo>,
_dry_run: bool,
) -> Result<(PartStatistics, Partitions)> {
let settings = ctx.get_settings();
let stage_table_info = &self.table_info;
match stage_table_info.stage_info.file_format_params {
FileFormatParams::Parquet(_) => {
ParquetTableForCopy::do_read_partitions(stage_table_info, ctx, _push_downs).await
}
FileFormatParams::Csv(_) => self.read_partitions_simple(stage_table_info).await,
FileFormatParams::Csv(_) if settings.get_enable_new_copy_for_text_formats()? == 1 => {
self.read_partitions_simple(stage_table_info).await
}
_ => self.read_partition_old(&ctx).await,
}
}
Expand All @@ -169,6 +172,7 @@ impl Table for StageTable {
pipeline: &mut Pipeline,
_put_cache: bool,
) -> Result<()> {
let settings = ctx.get_settings();
let stage_table_info =
if let DataSourceInfo::StageSource(stage_table_info) = &plan.source_info {
stage_table_info
Expand All @@ -179,7 +183,7 @@ impl Table for StageTable {
FileFormatParams::Parquet(_) => {
ParquetTableForCopy::do_read_data(ctx, plan, pipeline, _put_cache)
}
FileFormatParams::Csv(_) => {
FileFormatParams::Csv(_) if settings.get_enable_new_copy_for_text_formats()? == 1 => {
let compact_threshold = self.get_block_compact_thresholds_with_default();
RowBasedReadPipelineBuilder {
stage_table_info,
Expand Down

0 comments on commit a0a696d

Please sign in to comment.