From 682039ffc942e5edec3f2a358e8b295622cb102b Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 25 Nov 2024 15:08:42 +0800 Subject: [PATCH] feat: disable specifying copy options when create stage. (#16925) feat disable specifying copy options when create stage. --- Cargo.lock | 2 + src/common/storage/Cargo.toml | 1 + src/common/storage/src/copy.rs | 2 +- src/meta/app/src/principal/user_stage.rs | 91 +--- src/query/ast/src/ast/format/syntax/dml.rs | 19 +- src/query/ast/src/ast/statements/copy.rs | 223 ++++++++-- src/query/ast/src/ast/statements/principal.rs | 60 --- src/query/ast/src/ast/statements/stage.rs | 15 - src/query/ast/src/parser/copy.rs | 24 +- src/query/ast/src/parser/statement.rs | 11 - src/query/ast/src/parser/token.rs | 2 - .../ast/tests/it/testdata/stmt-error.txt | 4 +- src/query/ast/tests/it/testdata/stmt.txt | 392 ++++++++++-------- .../plan/datasource/datasource_info/stage.rs | 4 +- .../interpreter_copy_into_location.rs | 1 + .../interpreter_copy_into_table.rs | 11 +- .../src/interpreters/interpreter_replace.rs | 13 +- .../builders/builder_copy_into_table.rs | 9 +- .../pipelines/builders/builder_on_finished.rs | 12 +- src/query/service/src/sessions/query_ctx.rs | 3 + .../service/src/sessions/query_ctx_shared.rs | 2 +- .../it/storages/testdata/columns_table.txt | 1 - .../physical_copy_into_table.rs | 1 - .../sql/src/planner/binder/copy_into_table.rs | 74 +--- src/query/sql/src/planner/binder/ddl/stage.rs | 16 - .../sql/src/planner/plans/copy_into_table.rs | 17 +- src/query/sql/src/planner/plans/insert.rs | 2 - src/query/storages/stage/Cargo.toml | 1 + .../storages/stage/src/read/error_handler.rs | 2 +- .../storages/stage/src/read/load_context.rs | 7 +- src/query/storages/system/src/stages_table.rs | 4 - 31 files changed, 502 insertions(+), 524 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc38613c7e9b..b25f028bcba7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4073,6 +4073,7 @@ dependencies = [ "async-backtrace", "chrono", "dashmap 6.1.0", + "databend-common-ast", "databend-common-auth", "databend-common-base", "databend-common-exception", @@ -4461,6 +4462,7 @@ dependencies = [ "async-trait", "bstr", "csv-core", + "databend-common-ast", "databend-common-base", "databend-common-building", "databend-common-catalog", diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml index bc47fa73a375..f1990cdb5d73 100644 --- a/src/common/storage/Cargo.toml +++ b/src/common/storage/Cargo.toml @@ -15,6 +15,7 @@ arrow-schema = { workspace = true } async-backtrace = { workspace = true } chrono = { workspace = true } dashmap = { workspace = true, features = ["serde"] } +databend-common-ast = { workspace = true } databend-common-auth = { workspace = true } databend-common-base = { workspace = true } databend-common-exception = { workspace = true } diff --git a/src/common/storage/src/copy.rs b/src/common/storage/src/copy.rs index 95ea628b7a6f..97726787d452 100644 --- a/src/common/storage/src/copy.rs +++ b/src/common/storage/src/copy.rs @@ -14,8 +14,8 @@ use dashmap::mapref::entry::Entry; use dashmap::DashMap; +use databend_common_ast::ast::OnErrorMode; use databend_common_exception::ErrorCode; -use databend_common_meta_app::principal::OnErrorMode; use serde::Deserialize; use serde::Serialize; use thiserror::Error; diff --git a/src/meta/app/src/principal/user_stage.rs b/src/meta/app/src/principal/user_stage.rs index c116bb98aae8..ddeeca304de7 100644 --- a/src/meta/app/src/principal/user_stage.rs +++ b/src/meta/app/src/principal/user_stage.rs @@ -34,7 +34,6 @@ use crate::storage::StorageParams; // internalStageParams // directoryTableParams // [ FILE_FORMAT = ( { FORMAT_NAME = '' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] ) } ] -// [ COPY_OPTIONS = ( copyOptions ) ] // [ COMMENT = '' ] // // -- External stage @@ -42,7 +41,6 @@ use crate::storage::StorageParams; // externalStageParams // directoryTableParams // [ FILE_FORMAT = ( { FORMAT_NAME = '' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] ) } ] -// [ COPY_OPTIONS = ( copyOptions ) ] // [ COMMENT = '' ] // // @@ -52,7 +50,6 @@ use crate::storage::StorageParams; // 's3://[//]' // [ { CREDENTIALS = ( { { AWS_KEY_ID = '' AWS_SECRET_KEY = '' [ AWS_TOKEN = '' ] } | AWS_ROLE = '' } ) ) } ] // -// copyOptions ::= // ON_ERROR = { CONTINUE | SKIP_FILE | SKIP_FILE_ | SKIP_FILE_% | ABORT_STATEMENT } // SIZE_LIMIT = @@ -403,7 +400,7 @@ pub struct StageParams { pub storage: StorageParams, } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Copy)] pub enum OnErrorMode { Continue, SkipFileNum(u64), @@ -509,92 +506,6 @@ pub struct CopyOptions { pub detailed_output: bool, } -impl CopyOptions { - pub fn apply(&mut self, opts: &BTreeMap, ignore_unknown: bool) -> Result<()> { - if opts.is_empty() { - return Ok(()); - } - for (k, v) in opts.iter() { - match k.as_str() { - "on_error" => { - let on_error = OnErrorMode::from_str(v)?; - self.on_error = on_error; - } - "size_limit" => { - let size_limit = usize::from_str(v)?; - self.size_limit = size_limit; - } - "max_files" => { - let max_files = usize::from_str(v)?; - self.max_files = max_files; - } - "split_size" => { - let split_size = usize::from_str(v)?; - self.split_size = split_size; - } - "purge" => { - let purge = bool::from_str(v).map_err(|_| { - ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v)) - })?; - self.purge = purge; - } - "single" => { - let single = bool::from_str(v).map_err(|_| { - ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v)) - })?; - self.single = single; - } - "max_file_size" => { - let max_file_size = usize::from_str(v)?; - self.max_file_size = max_file_size; - } - "disable_variant_check" => { - let disable_variant_check = bool::from_str(v).map_err(|_| { - ErrorCode::StrParseError(format!( - "Cannot parse disable_variant_check: {} as bool", - v - )) - })?; - self.disable_variant_check = disable_variant_check; - } - "return_failed_only" => { - let return_failed_only = bool::from_str(v).map_err(|_| { - ErrorCode::StrParseError(format!( - "Cannot parse return_failed_only: {} as bool", - v - )) - })?; - self.return_failed_only = return_failed_only; - } - _ => { - if !ignore_unknown { - return Err(ErrorCode::BadArguments(format!( - "Unknown stage copy option {}", - k - ))); - } - } - } - } - Ok(()) - } -} - -impl Display for CopyOptions { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "OnErrorMode {}", self.on_error)?; - write!(f, "SizeLimit {}", self.size_limit)?; - write!(f, "MaxFiles {}", self.max_files)?; - write!(f, "SplitSize {}", self.split_size)?; - write!(f, "Purge {}", self.purge)?; - write!(f, "DisableVariantCheck {}", self.disable_variant_check)?; - write!(f, "ReturnFailedOnly {}", self.return_failed_only)?; - write!(f, "MaxFileSize {}", self.max_file_size)?; - write!(f, "Single {}", self.single)?; - write!(f, "DetailedOutput {}", self.detailed_output) - } -} - #[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)] #[serde(default)] pub struct StageInfo { diff --git a/src/query/ast/src/ast/format/syntax/dml.rs b/src/query/ast/src/ast/format/syntax/dml.rs index fc270b5e43b0..440368b25e7a 100644 --- a/src/query/ast/src/ast/format/syntax/dml.rs +++ b/src/query/ast/src/ast/format/syntax/dml.rs @@ -188,36 +188,39 @@ pub(crate) fn pretty_copy_into_table(copy_stmt: CopyIntoTableStmt) -> RcDoc<'sta } else { RcDoc::nil() }) - .append(if !copy_stmt.validation_mode.is_empty() { + .append(if !copy_stmt.options.validation_mode.is_empty() { RcDoc::line() .append(RcDoc::text("VALIDATION_MODE = ")) - .append(RcDoc::text(copy_stmt.validation_mode)) + .append(RcDoc::text(copy_stmt.options.validation_mode)) } else { RcDoc::nil() }) - .append(if copy_stmt.size_limit != 0 { + .append(if copy_stmt.options.size_limit != 0 { RcDoc::line() .append(RcDoc::text("SIZE_LIMIT = ")) - .append(RcDoc::text(format!("{}", copy_stmt.size_limit))) + .append(RcDoc::text(format!("{}", copy_stmt.options.size_limit))) } else { RcDoc::nil() }) - .append(if copy_stmt.max_files != 0 { + .append(if copy_stmt.options.max_files != 0 { RcDoc::line() .append(RcDoc::text("MAX_FILES = ")) - .append(RcDoc::text(format!("{}", copy_stmt.max_files))) + .append(RcDoc::text(format!("{}", copy_stmt.options.max_files))) } else { RcDoc::nil() }) .append( RcDoc::line() .append(RcDoc::text("PURGE = ")) - .append(RcDoc::text(format!("{}", copy_stmt.purge))), + .append(RcDoc::text(format!("{}", copy_stmt.options.purge))), ) .append( RcDoc::line() .append(RcDoc::text("DISABLE_VARIANT_CHECK = ")) - .append(RcDoc::text(format!("{}", copy_stmt.disable_variant_check))), + .append(RcDoc::text(format!( + "{}", + copy_stmt.options.disable_variant_check + ))), ) } diff --git a/src/query/ast/src/ast/statements/copy.rs b/src/query/ast/src/ast/statements/copy.rs index 3cfaac47e08f..241268dbd4dc 100644 --- a/src/query/ast/src/ast/statements/copy.rs +++ b/src/query/ast/src/ast/statements/copy.rs @@ -14,8 +14,10 @@ use std::collections::BTreeMap; use std::collections::HashSet; +use std::fmt; use std::fmt::Display; use std::fmt::Formatter; +use std::str::FromStr; use derive_visitor::Drive; use derive_visitor::DriveMut; @@ -55,36 +57,29 @@ pub struct CopyIntoTableStmt { // files to load pub files: Option>, pub pattern: Option, - pub force: bool, - // copy options - /// TODO(xuanwo): parse into validation_mode directly. - pub validation_mode: String, - pub size_limit: usize, - pub max_files: usize, - pub split_size: usize, - pub purge: bool, - pub disable_variant_check: bool, - pub return_failed_only: bool, - pub on_error: String, + pub options: CopyIntoTableOptions, } impl CopyIntoTableStmt { - pub fn apply_option(&mut self, opt: CopyIntoTableOption) { + pub fn apply_option( + &mut self, + opt: CopyIntoTableOption, + ) -> std::result::Result<(), &'static str> { match opt { CopyIntoTableOption::Files(v) => self.files = Some(v), CopyIntoTableOption::Pattern(v) => self.pattern = Some(v), CopyIntoTableOption::FileFormat(v) => self.file_format = v, - CopyIntoTableOption::ValidationMode(v) => self.validation_mode = v, - CopyIntoTableOption::SizeLimit(v) => self.size_limit = v, - CopyIntoTableOption::MaxFiles(v) => self.max_files = v, - CopyIntoTableOption::SplitSize(v) => self.split_size = v, - CopyIntoTableOption::Purge(v) => self.purge = v, - CopyIntoTableOption::Force(v) => self.force = v, - CopyIntoTableOption::DisableVariantCheck(v) => self.disable_variant_check = v, - CopyIntoTableOption::ReturnFailedOnly(v) => self.return_failed_only = v, - CopyIntoTableOption::OnError(v) => self.on_error = v, + CopyIntoTableOption::SizeLimit(v) => self.options.size_limit = v, + CopyIntoTableOption::MaxFiles(v) => self.options.max_files = v, + CopyIntoTableOption::SplitSize(v) => self.options.split_size = v, + CopyIntoTableOption::Purge(v) => self.options.purge = v, + CopyIntoTableOption::Force(v) => self.options.force = v, + CopyIntoTableOption::DisableVariantCheck(v) => self.options.disable_variant_check = v, + CopyIntoTableOption::ReturnFailedOnly(v) => self.options.return_failed_only = v, + CopyIntoTableOption::OnError(v) => self.options.on_error = OnErrorMode::from_str(&v)?, } + Ok(()) } } @@ -117,32 +112,119 @@ impl Display for CopyIntoTableStmt { write!(f, " FILE_FORMAT = ({})", self.file_format)?; } - if !self.validation_mode.is_empty() { - write!(f, "VALIDATION_MODE = {}", self.validation_mode)?; + if !self.options.validation_mode.is_empty() { + write!(f, "VALIDATION_MODE = {}", self.options.validation_mode)?; } - if self.size_limit != 0 { - write!(f, " SIZE_LIMIT = {}", self.size_limit)?; + if self.options.size_limit != 0 { + write!(f, " SIZE_LIMIT = {}", self.options.size_limit)?; } - if self.max_files != 0 { - write!(f, " MAX_FILES = {}", self.max_files)?; + if self.options.max_files != 0 { + write!(f, " MAX_FILES = {}", self.options.max_files)?; } - if self.split_size != 0 { - write!(f, " SPLIT_SIZE = {}", self.split_size)?; + if self.options.split_size != 0 { + write!(f, " SPLIT_SIZE = {}", self.options.split_size)?; } - write!(f, " PURGE = {}", self.purge)?; - write!(f, " FORCE = {}", self.force)?; - write!(f, " DISABLE_VARIANT_CHECK = {}", self.disable_variant_check)?; - write!(f, " ON_ERROR = {}", self.on_error)?; - write!(f, " RETURN_FAILED_ONLY = {}", self.return_failed_only)?; + write!(f, " PURGE = {}", self.options.purge)?; + write!(f, " FORCE = {}", self.options.force)?; + write!( + f, + " DISABLE_VARIANT_CHECK = {}", + self.options.disable_variant_check + )?; + write!(f, " ON_ERROR = {}", self.options.on_error)?; + write!( + f, + " RETURN_FAILED_ONLY = {}", + self.options.return_failed_only + )?; Ok(()) } } +#[derive( + serde::Serialize, serde::Deserialize, Debug, Clone, Default, PartialEq, Drive, DriveMut, Eq, +)] +pub struct CopyIntoTableOptions { + pub on_error: OnErrorMode, + pub size_limit: usize, + pub max_files: usize, + pub split_size: usize, + pub force: bool, + pub purge: bool, + pub disable_variant_check: bool, + pub return_failed_only: bool, + pub validation_mode: String, +} + +impl CopyIntoTableOptions { + fn parse_uint(k: &str, v: &String) -> std::result::Result { + usize::from_str(v).map_err(|e| format!("can not parse {}={} as uint: {}", k, v, e)) + } + fn parse_bool(k: &str, v: &String) -> std::result::Result { + bool::from_str(v).map_err(|e| format!("can not parse {}={} as bool: {}", k, v, e)) + } + + pub fn apply( + &mut self, + opts: &BTreeMap, + ignore_unknown: bool, + ) -> std::result::Result<(), String> { + if opts.is_empty() { + return Ok(()); + } + for (k, v) in opts.iter() { + match k.as_str() { + "on_error" => { + let on_error = OnErrorMode::from_str(v)?; + self.on_error = on_error; + } + "size_limit" => { + self.size_limit = Self::parse_uint(k, v)?; + } + "max_files" => { + self.max_files = Self::parse_uint(k, v)?; + } + "split_size" => { + self.split_size = Self::parse_uint(k, v)?; + } + "purge" => { + self.purge = Self::parse_bool(k, v)?; + } + "disable_variant_check" => { + self.disable_variant_check = Self::parse_bool(k, v)?; + } + "return_failed_only" => { + self.return_failed_only = Self::parse_bool(k, v)?; + } + _ => { + if !ignore_unknown { + return Err(format!("Unknown stage copy option {}", k)); + } + } + } + } + Ok(()) + } +} + +impl Display for CopyIntoTableOptions { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "OnErrorMode {}", self.on_error)?; + write!(f, "SizeLimit {}", self.size_limit)?; + write!(f, "MaxFiles {}", self.max_files)?; + write!(f, "SplitSize {}", self.split_size)?; + write!(f, "Purge {}", self.purge)?; + write!(f, "DisableVariantCheck {}", self.disable_variant_check)?; + write!(f, "ReturnFailedOnly {}", self.return_failed_only)?; + Ok(()) + } +} + #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)] pub struct CopyIntoLocationOptions { pub single: bool, @@ -239,7 +321,7 @@ impl Display for CopyIntoTableSource { #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub enum CopyIntoLocationSource { Query(Box), - /// it will be rewrite as `(SELECT * FROM table)` + /// it will be rewritten as `(SELECT * FROM table)` Table(TableRef), } @@ -482,7 +564,6 @@ pub enum CopyIntoTableOption { Files(Vec), Pattern(LiteralStringOrVariable), FileFormat(FileFormatOptions), - ValidationMode(String), SizeLimit(usize), MaxFiles(usize), SplitSize(usize), @@ -563,3 +644,73 @@ impl Display for FileFormatValue { } } } + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Drive, DriveMut, Eq)] +pub enum OnErrorMode { + Continue, + SkipFileNum(u64), + AbortNum(u64), +} + +impl Default for OnErrorMode { + fn default() -> Self { + Self::AbortNum(1) + } +} + +impl Display for OnErrorMode { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self { + OnErrorMode::Continue => { + write!(f, "continue") + } + OnErrorMode::SkipFileNum(n) => { + if *n <= 1 { + write!(f, "skipfile") + } else { + write!(f, "skipfile_{}", n) + } + } + OnErrorMode::AbortNum(n) => { + if *n <= 1 { + write!(f, "abort") + } else { + write!(f, "abort_{}", n) + } + } + } + } +} + +const ERROR_MODE_MSG: &str = + "OnError must one of {{ CONTINUE | SKIP_FILE | SKIP_FILE_ | ABORT | ABORT_ }}"; +impl FromStr for OnErrorMode { + type Err = &'static str; + + fn from_str(s: &str) -> std::result::Result { + match s.to_uppercase().as_str() { + "" | "ABORT" => Ok(OnErrorMode::AbortNum(1)), + "CONTINUE" => Ok(OnErrorMode::Continue), + "SKIP_FILE" => Ok(OnErrorMode::SkipFileNum(1)), + v => { + if v.starts_with("ABORT_") { + let num_str = v.replace("ABORT_", ""); + let nums = num_str.parse::(); + match nums { + Ok(n) if n < 1 => Err(ERROR_MODE_MSG), + Ok(n) => Ok(OnErrorMode::AbortNum(n)), + Err(_) => Err(ERROR_MODE_MSG), + } + } else { + let num_str = v.replace("SKIP_FILE_", ""); + let nums = num_str.parse::(); + match nums { + Ok(n) if n < 1 => Err(ERROR_MODE_MSG), + Ok(n) => Ok(OnErrorMode::SkipFileNum(n)), + Err(_) => Err(ERROR_MODE_MSG), + } + } + } + } + } +} diff --git a/src/query/ast/src/ast/statements/principal.rs b/src/query/ast/src/ast/statements/principal.rs index accfcdcb695f..fe574ebb70c9 100644 --- a/src/query/ast/src/ast/statements/principal.rs +++ b/src/query/ast/src/ast/statements/principal.rs @@ -201,63 +201,3 @@ impl Display for ShareGrantObjectPrivilege { } } } - -#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] -pub struct CopyOptions { - pub on_error: OnErrorMode, - pub size_limit: usize, - pub max_files: usize, - pub split_size: usize, - pub purge: bool, - pub disable_variant_check: bool, - pub return_failed_only: bool, - pub max_file_size: usize, - pub single: bool, - pub detailed_output: bool, -} - -impl Display for CopyOptions { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "OnErrorMode {}", self.on_error)?; - write!(f, "SizeLimit {}", self.size_limit)?; - write!(f, "MaxFiles {}", self.max_files)?; - write!(f, "SplitSize {}", self.split_size)?; - write!(f, "Purge {}", self.purge)?; - write!(f, "DisableVariantCheck {}", self.disable_variant_check)?; - write!(f, "ReturnFailedOnly {}", self.return_failed_only)?; - write!(f, "MaxFileSize {}", self.max_file_size)?; - write!(f, "Single {}", self.single)?; - write!(f, "DetailedOutput {}", self.detailed_output) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] -pub enum OnErrorMode { - Continue, - SkipFileNum(u64), - AbortNum(u64), -} - -impl Display for OnErrorMode { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - match self { - OnErrorMode::Continue => { - write!(f, "continue") - } - OnErrorMode::SkipFileNum(n) => { - if *n <= 1 { - write!(f, "skipfile") - } else { - write!(f, "skipfile_{}", n) - } - } - OnErrorMode::AbortNum(n) => { - if *n <= 1 { - write!(f, "abort") - } else { - write!(f, "abort_{}", n) - } - } - } - } -} diff --git a/src/query/ast/src/ast/statements/stage.rs b/src/query/ast/src/ast/statements/stage.rs index d8017c848198..428d40e4239c 100644 --- a/src/query/ast/src/ast/statements/stage.rs +++ b/src/query/ast/src/ast/statements/stage.rs @@ -35,9 +35,6 @@ pub struct CreateStageStmt { pub location: Option, pub file_format_options: FileFormatOptions, - pub on_error: String, - pub size_limit: usize, - pub validation_mode: String, pub comments: String, } @@ -61,18 +58,6 @@ impl Display for CreateStageStmt { write!(f, " FILE_FORMAT = ({})", self.file_format_options)?; } - if !self.on_error.is_empty() { - write!(f, " ON_ERROR = '{}'", self.on_error)?; - } - - if self.size_limit != 0 { - write!(f, " SIZE_LIMIT = {}", self.size_limit)?; - } - - if !self.validation_mode.is_empty() { - write!(f, " VALIDATION_MODE = {}", self.validation_mode)?; - } - if !self.comments.is_empty() { write!(f, " COMMENTS = '{}'", self.comments)?; } diff --git a/src/query/ast/src/parser/copy.rs b/src/query/ast/src/parser/copy.rs index e7a601a4eac3..7692e3b92b09 100644 --- a/src/query/ast/src/parser/copy.rs +++ b/src/query/ast/src/parser/copy.rs @@ -41,6 +41,7 @@ use crate::parser::stage::file_location; use crate::parser::statement::hint; use crate::parser::token::TokenKind::COPY; use crate::parser::token::TokenKind::*; +use crate::parser::ErrorKind; use crate::parser::Input; pub fn copy_into_table(i: Input) -> IResult { @@ -51,7 +52,7 @@ pub fn copy_into_table(i: Input) -> IResult { }), )); - map( + map_res( rule! { #with? ~ COPY ~ #hint? @@ -69,20 +70,15 @@ pub fn copy_into_table(i: Input) -> IResult { files: Default::default(), pattern: Default::default(), file_format: Default::default(), - validation_mode: Default::default(), - size_limit: Default::default(), - max_files: Default::default(), - split_size: Default::default(), - purge: Default::default(), - force: Default::default(), - disable_variant_check: Default::default(), - on_error: "abort".to_string(), - return_failed_only: Default::default(), + + options: Default::default(), }; for opt in opts { - copy_stmt.apply_option(opt); + copy_stmt + .apply_option(opt) + .map_err(|e| nom::Err::Failure(ErrorKind::Other(e)))?; } - Statement::CopyIntoTable(copy_stmt) + Ok(Statement::CopyIntoTable(copy_stmt)) }, )(i) } @@ -157,10 +153,6 @@ fn copy_into_table_option(i: Input) -> IResult { map(rule! { #file_format_clause }, |options| { CopyIntoTableOption::FileFormat(options) }), - map( - rule! { VALIDATION_MODE ~ "=" ~ #literal_string }, - |(_, _, validation_mode)| CopyIntoTableOption::ValidationMode(validation_mode), - ), map( rule! { SIZE_LIMIT ~ "=" ~ #literal_u64 }, |(_, _, size_limit)| CopyIntoTableOption::SizeLimit(size_limit as usize), diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 1705f94753fe..b92fabeb0516 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1557,9 +1557,6 @@ pub fn statement_body(i: Input) -> IResult { ~ ( #stage_name ) ~ ( (URL ~ ^"=")? ~ #uri_location )? ~ ( #file_format_clause )? - ~ ( ON_ERROR ~ ^"=" ~ ^#ident )? - ~ ( SIZE_LIMIT ~ ^"=" ~ ^#literal_u64 )? - ~ ( VALIDATION_MODE ~ ^"=" ~ ^#ident )? ~ ( (COMMENT | COMMENTS) ~ ^"=" ~ ^#literal_string )? }, |( @@ -1570,9 +1567,6 @@ pub fn statement_body(i: Input) -> IResult { stage, url_opt, file_format_opt, - on_error_opt, - size_limit_opt, - validation_mode_opt, comment_opt, )| { let create_option = @@ -1582,11 +1576,6 @@ pub fn statement_body(i: Input) -> IResult { stage_name: stage.to_string(), location: url_opt.map(|(_, location)| location), file_format_options: file_format_opt.unwrap_or_default(), - on_error: on_error_opt.map(|v| v.2.to_string()).unwrap_or_default(), - size_limit: size_limit_opt.map(|v| v.2 as usize).unwrap_or_default(), - validation_mode: validation_mode_opt - .map(|v| v.2.to_string()) - .unwrap_or_default(), comments: comment_opt.map(|v| v.2).unwrap_or_default(), })) }, diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index 657ca0ccf0c7..232cd7210a23 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -1220,8 +1220,6 @@ pub enum TokenKind { VACUUM, #[token("VALUES", ignore(ascii_case))] VALUES, - #[token("VALIDATION_MODE", ignore(ascii_case))] - VALIDATION_MODE, #[token("VARBINARY", ignore(ascii_case))] VARBINARY, #[token("VARCHAR", ignore(ascii_case))] diff --git a/src/query/ast/tests/it/testdata/stmt-error.txt b/src/query/ast/tests/it/testdata/stmt-error.txt index 18ec23cfb979..7a5779795204 100644 --- a/src/query/ast/tests/it/testdata/stmt-error.txt +++ b/src/query/ast/tests/it/testdata/stmt-error.txt @@ -321,7 +321,7 @@ error: --> SQL:1:38 | 1 | COPY INTO mytable FROM 's3://bucket' CONECTION= (); - | ^^^^^^^^^ unexpected `CONECTION`, expecting `CONNECTION`, `ON_ERROR`, `RETURN_FAILED_ONLY`, `FORMAT`, `VALIDATION_MODE`, `FORCE`, `PATTERN`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `MAX_FILES`, `DISABLE_VARIANT_CHECK`, `SPLIT_SIZE`, or `;` + | ^^^^^^^^^ unexpected `CONECTION`, expecting `CONNECTION`, `ON_ERROR`, `RETURN_FAILED_ONLY`, `FORMAT`, `FORCE`, `PATTERN`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `MAX_FILES`, `DISABLE_VARIANT_CHECK`, `SPLIT_SIZE`, or `;` ---------- Input ---------- @@ -331,7 +331,7 @@ error: --> SQL:1:33 | 1 | COPY INTO mytable FROM @mystage CONNECTION = (); - | ^^^^^^^^^^ unexpected `CONNECTION`, expecting `ON_ERROR`, `RETURN_FAILED_ONLY`, `FORMAT`, `FORCE`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `VALIDATION_MODE`, `DISABLE_VARIANT_CHECK`, `PATTERN`, `MAX_FILES`, `SPLIT_SIZE`, or `;` + | ^^^^^^^^^^ unexpected `CONNECTION`, expecting `ON_ERROR`, `RETURN_FAILED_ONLY`, `FORMAT`, `FORCE`, `FILES`, `PURGE`, `SIZE_LIMIT`, `FILE_FORMAT`, `DISABLE_VARIANT_CHECK`, `PATTERN`, `MAX_FILES`, `SPLIT_SIZE`, or `;` ---------- Input ---------- diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 8bd0d0d7a129..206022cca94e 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -11232,9 +11232,6 @@ CreateStage( file_format_options: FileFormatOptions { options: {}, }, - on_error: "", - size_limit: 0, - validation_mode: "", comments: "", }, ) @@ -11276,9 +11273,6 @@ CreateStage( ), }, }, - on_error: "", - size_limit: 0, - validation_mode: "", comments: "", }, ) @@ -11320,9 +11314,6 @@ CreateStage( ), }, }, - on_error: "", - size_limit: 0, - validation_mode: "", comments: "", }, ) @@ -11364,9 +11355,6 @@ CreateStage( ), }, }, - on_error: "", - size_limit: 0, - validation_mode: "", comments: "", }, ) @@ -11408,9 +11396,6 @@ CreateStage( ), }, }, - on_error: "", - size_limit: 0, - validation_mode: "", comments: "", }, ) @@ -13898,15 +13883,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -13965,15 +13954,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14041,15 +14034,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 10, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 10, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14117,15 +14114,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 3000, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 3000, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14197,15 +14198,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14277,15 +14282,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14332,15 +14341,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 0, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 0, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14387,15 +14400,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 0, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 0, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14458,15 +14475,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14715,15 +14736,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14782,15 +14807,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14849,15 +14878,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14916,15 +14949,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: true, - validation_mode: "", - size_limit: 0, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 0, + max_files: 0, + split_size: 0, + force: true, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -14992,15 +15029,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 10, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: true, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 10, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: true, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -15062,15 +15103,19 @@ CopyIntoTable( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 0, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 0, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, ) @@ -20887,15 +20932,19 @@ CreatePipe( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 0, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 0, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, }, ) @@ -20948,15 +20997,19 @@ CreatePipe( }, files: None, pattern: None, - force: false, - validation_mode: "", - size_limit: 0, - max_files: 0, - split_size: 0, - purge: false, - disable_variant_check: false, - return_failed_only: false, - on_error: "abort", + options: CopyIntoTableOptions { + on_error: AbortNum( + 1, + ), + size_limit: 0, + max_files: 0, + split_size: 0, + force: false, + purge: false, + disable_variant_check: false, + return_failed_only: false, + validation_mode: "", + }, }, }, ) @@ -22611,9 +22664,6 @@ CreateStage( ), }, }, - on_error: "", - size_limit: 0, - validation_mode: "", comments: "", }, ) diff --git a/src/query/catalog/src/plan/datasource/datasource_info/stage.rs b/src/query/catalog/src/plan/datasource/datasource_info/stage.rs index 06d7219925ab..c48ca6f63444 100644 --- a/src/query/catalog/src/plan/datasource/datasource_info/stage.rs +++ b/src/query/catalog/src/plan/datasource/datasource_info/stage.rs @@ -18,6 +18,7 @@ use std::fmt::Formatter; use std::sync::Arc; use databend_common_ast::ast::CopyIntoLocationOptions; +use databend_common_ast::ast::CopyIntoTableOptions; use databend_common_exception::Result; use databend_common_expression::RemoteExpr; use databend_common_expression::TableSchema; @@ -42,6 +43,7 @@ pub struct StageTableInfo { pub duplicated_files_detected: Vec, pub is_select: bool, pub copy_into_location_options: CopyIntoLocationOptions, + pub copy_into_table_options: CopyIntoTableOptions, } impl StageTableInfo { @@ -95,6 +97,6 @@ impl Display for StageTableInfo { write!(f, "StageParam {}", self.stage_info.stage_params.storage)?; write!(f, "IsTemporary {}", self.stage_info.is_temporary)?; write!(f, "FileFormatParams {}", self.stage_info.file_format_params)?; - write!(f, "CopyOption {}", self.stage_info.copy_options) + Ok(()) } } diff --git a/src/query/service/src/interpreters/interpreter_copy_into_location.rs b/src/query/service/src/interpreters/interpreter_copy_into_location.rs index c771f4d2eac0..7046f7ae3f17 100644 --- a/src/query/service/src/interpreters/interpreter_copy_into_location.rs +++ b/src/query/service/src/interpreters/interpreter_copy_into_location.rs @@ -112,6 +112,7 @@ impl CopyIntoLocationInterpreter { is_select: false, default_values: None, copy_into_location_options: options.clone(), + copy_into_table_options: Default::default(), }, })); diff --git a/src/query/service/src/interpreters/interpreter_copy_into_table.rs b/src/query/service/src/interpreters/interpreter_copy_into_table.rs index ff1894c9db06..1b2ecd128ebf 100644 --- a/src/query/service/src/interpreters/interpreter_copy_into_table.rs +++ b/src/query/service/src/interpreters/interpreter_copy_into_table.rs @@ -155,7 +155,6 @@ impl CopyIntoTableInterpreter { required_source_schema: plan.required_source_schema.clone(), stage_table_info: plan.stage_table_info.clone(), table_info: to_table.get_table_info().clone(), - force: plan.force, write_mode: plan.write_mode, validation_mode: plan.validation_mode.clone(), project_columns, @@ -184,8 +183,7 @@ impl CopyIntoTableInterpreter { let return_all = !self .plan .stage_table_info - .stage_info - .copy_options + .copy_into_table_options .return_failed_only; let cs = self.ctx.get_copy_status(); @@ -249,9 +247,8 @@ impl CopyIntoTableInterpreter { let copied_files_meta_req = PipelineBuilder::build_upsert_copied_files_to_meta_req( ctx.clone(), to_table.as_ref(), - &plan.stage_table_info.stage_info, &files_to_copy, - plan.force, + &plan.stage_table_info.copy_into_table_options, )?; to_table.commit_insertion( @@ -282,7 +279,7 @@ impl CopyIntoTableInterpreter { PipelineBuilder::set_purge_files_on_finished( ctx.clone(), files_to_be_deleted, - plan.stage_table_info.stage_info.copy_options.purge, + &plan.stage_table_info.copy_into_table_options, plan.stage_table_info.stage_info.clone(), main_pipeline, )?; @@ -300,7 +297,7 @@ impl CopyIntoTableInterpreter { // unfortunately, hooking the on_finished callback of a "blank" pipeline, // e.g. `PipelineBuildResult::create` leads to runtime error (during pipeline execution). - if self.plan.stage_table_info.stage_info.copy_options.purge + if self.plan.stage_table_info.copy_into_table_options.purge && !self .plan .stage_table_info diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index 0aea9d1f2e8a..6018939ae07a 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_ast::ast::CopyIntoTableOptions; use databend_common_catalog::lock::LockTableOption; use databend_common_catalog::table::TableExt; use databend_common_catalog::table_context::TableContext; @@ -98,11 +99,11 @@ impl Interpreter for ReplaceInterpreter { .add_lock_guard(self.plan.lock_guard.clone()); // purge - if let Some((files, stage_info)) = purge_info { + if let Some((files, stage_info, options)) = purge_info { PipelineBuilder::set_purge_files_on_finished( self.ctx.clone(), files.into_iter().map(|v| v.path).collect(), - stage_info.copy_options.purge, + &options, stage_info, &mut pipeline.main_pipeline, )?; @@ -128,7 +129,10 @@ impl Interpreter for ReplaceInterpreter { impl ReplaceInterpreter { async fn build_physical_plan( &self, - ) -> Result<(Box, Option<(Vec, StageInfo)>)> { + ) -> Result<( + Box, + Option<(Vec, StageInfo, CopyIntoTableOptions)>, + )> { let plan = &self.plan; let table = self .ctx @@ -373,7 +377,7 @@ impl ReplaceInterpreter { ctx: Arc, source: &'a InsertInputSource, schema: DataSchemaRef, - purge_info: &mut Option<(Vec, StageInfo)>, + purge_info: &mut Option<(Vec, StageInfo, CopyIntoTableOptions)>, ) -> Result { match source { InsertInputSource::Values(source) => self @@ -399,6 +403,7 @@ impl ReplaceInterpreter { *purge_info = Some(( copy_plan.stage_table_info.files_to_copy.unwrap_or_default(), copy_plan.stage_table_info.stage_info.clone(), + copy_plan.stage_table_info.copy_into_table_options.clone(), )); Ok(ReplaceSourceCtx { root: Box::new(physical_plan), diff --git a/src/query/service/src/pipelines/builders/builder_copy_into_table.rs b/src/query/service/src/pipelines/builders/builder_copy_into_table.rs index 361cb573b406..a843cbba728b 100644 --- a/src/query/service/src/pipelines/builders/builder_copy_into_table.rs +++ b/src/query/service/src/pipelines/builders/builder_copy_into_table.rs @@ -16,6 +16,7 @@ use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; +use databend_common_ast::ast::CopyIntoTableOptions; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -26,7 +27,6 @@ use databend_common_expression::DataSchemaRefExt; use databend_common_expression::Scalar; use databend_common_meta_app::principal::FileFormatParams; use databend_common_meta_app::principal::ParquetFileFormatParams; -use databend_common_meta_app::principal::StageInfo; use databend_common_meta_app::schema::TableCopiedFileInfo; use databend_common_meta_app::schema::UpsertTableCopiedFileReq; use databend_common_pipeline_core::Pipeline; @@ -195,9 +195,8 @@ impl PipelineBuilder { pub(crate) fn build_upsert_copied_files_to_meta_req( ctx: Arc, to_table: &dyn Table, - stage_info: &StageInfo, copied_files: &[StageFileInfo], - force: bool, + options: &CopyIntoTableOptions, ) -> Result> { let mut copied_file_tree = BTreeMap::new(); for file in copied_files { @@ -216,7 +215,7 @@ impl PipelineBuilder { let expire_hours = ctx.get_settings().get_load_file_metadata_expire_hours()?; let upsert_copied_files_request = { - if stage_info.copy_options.purge && force { + if options.purge && options.force { // if `purge-after-copy` is enabled, and in `force` copy mode, // we do not need to upsert copied files into meta server info!( @@ -231,7 +230,7 @@ impl PipelineBuilder { let req = UpsertTableCopiedFileReq { file_info: copied_file_tree, ttl: Some(Duration::from_hours(expire_hours)), - insert_if_not_exists: !force, + insert_if_not_exists: !options.force, }; Some(req) } diff --git a/src/query/service/src/pipelines/builders/builder_on_finished.rs b/src/query/service/src/pipelines/builders/builder_on_finished.rs index 1c53f44a0556..a3b69a8a5a4d 100644 --- a/src/query/service/src/pipelines/builders/builder_on_finished.rs +++ b/src/query/service/src/pipelines/builders/builder_on_finished.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Instant; +use databend_common_ast::ast::CopyIntoTableOptions; use databend_common_base::runtime::GlobalIORuntime; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -34,7 +35,7 @@ impl PipelineBuilder { pub fn set_purge_files_on_finished( ctx: Arc, files: Vec, - copy_purge_option: bool, + options: &CopyIntoTableOptions, stage_info: StageInfo, main_pipeline: &mut Pipeline, ) -> Result<()> { @@ -42,11 +43,14 @@ impl PipelineBuilder { let txn_mgr = ctx.txn_mgr(); let mut txn_mgr = txn_mgr.lock(); let is_active = txn_mgr.is_active(); - if is_active && copy_purge_option { + if is_active && options.purge { txn_mgr.add_need_purge_files(stage_info.clone(), files.clone()); } is_active }; + + let on_error_mode = options.on_error.clone(); + let purge = options.purge; // set on_finished callback. main_pipeline.set_on_finished(move |info: &ExecutionInfo| { match &info.res { @@ -58,7 +62,7 @@ impl PipelineBuilder { for (file_name, e) in error_map { error!( "copy(on_error={}): file {} encounter error {},", - stage_info.copy_options.on_error, + on_error_mode, file_name, e.to_string() ); @@ -67,7 +71,7 @@ impl PipelineBuilder { // 2. Try to purge copied files if purge option is true, if error will skip. // If a file is already copied(status with AlreadyCopied) we will try to purge them. - if !is_active && copy_purge_option { + if !is_active && purge { Self::try_purge_files(ctx.clone(), &stage_info, &files).await; } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 9cd8430de768..85df6db7987e 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1368,6 +1368,7 @@ impl TableContext for QueryContext { is_select: true, default_values: None, copy_into_location_options: Default::default(), + copy_into_table_options: Default::default(), }; OrcTable::try_create(info).await } @@ -1385,6 +1386,7 @@ impl TableContext for QueryContext { is_select: true, default_values: None, copy_into_location_options: Default::default(), + copy_into_table_options: Default::default(), }; StageTable::try_create(info) } @@ -1420,6 +1422,7 @@ impl TableContext for QueryContext { is_select: true, default_values: None, copy_into_location_options: Default::default(), + copy_into_table_options: Default::default(), }; StageTable::try_create(info) } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 145057beb63e..429a30cc4970 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -237,7 +237,7 @@ impl QueryContextShared { } pub fn get_on_error_mode(&self) -> Option { - self.on_error_mode.read().clone() + *self.on_error_mode.read() } pub fn set_on_error_mode(&self, mode: OnErrorMode) { diff --git a/src/query/service/tests/it/storages/testdata/columns_table.txt b/src/query/service/tests/it/storages/testdata/columns_table.txt index b18e7992fae0..745b091b3c1a 100644 --- a/src/query/service/tests/it/storages/testdata/columns_table.txt +++ b/src/query/service/tests/it/storages/testdata/columns_table.txt @@ -81,7 +81,6 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'constraint_catalog' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | | 'constraint_name' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | | 'constraint_schema' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | -| 'copy_options' | 'system' | 'stages' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'cpu_usage' | 'system' | 'query_log' | 'UInt32' | 'INT UNSIGNED' | '' | '' | 'NO' | '' | | 'create_time' | 'information_schema' | 'tables' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'background_jobs' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | diff --git a/src/query/sql/src/executor/physical_plans/physical_copy_into_table.rs b/src/query/sql/src/executor/physical_plans/physical_copy_into_table.rs index eb8ebb1e4e4e..1fd92f5c3ce5 100644 --- a/src/query/sql/src/executor/physical_plans/physical_copy_into_table.rs +++ b/src/query/sql/src/executor/physical_plans/physical_copy_into_table.rs @@ -34,7 +34,6 @@ pub struct CopyIntoTable { pub required_source_schema: DataSchemaRef, pub write_mode: CopyIntoTableMode, pub validation_mode: ValidationMode, - pub force: bool, pub stage_table_info: StageTableInfo, pub table_info: TableInfo, diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 84adfb0b719d..de9dd5c660e7 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use databend_common_ast::ast::ColumnID as AstColumnID; use databend_common_ast::ast::ColumnRef; +use databend_common_ast::ast::CopyIntoTableOptions; use databend_common_ast::ast::CopyIntoTableSource; use databend_common_ast::ast::CopyIntoTableStmt; use databend_common_ast::ast::Expr; @@ -52,7 +53,6 @@ use databend_common_meta_app::principal::EmptyFieldAs; use databend_common_meta_app::principal::FileFormatOptionsReader; use databend_common_meta_app::principal::FileFormatParams; use databend_common_meta_app::principal::NullAs; -use databend_common_meta_app::principal::OnErrorMode; use databend_common_meta_app::principal::StageInfo; use databend_common_meta_app::principal::COPY_MAX_FILES_PER_COMMIT; use databend_common_storage::StageFilesInfo; @@ -150,12 +150,22 @@ impl<'a> Binder { .get_table(&catalog_name, &database_name, &table_name) .await?; - let validation_mode = ValidationMode::from_str(stmt.validation_mode.as_str()) + let validation_mode = ValidationMode::from_str(stmt.options.validation_mode.as_str()) .map_err(ErrorCode::SyntaxException)?; let (mut stage_info, path) = resolve_file_location(self.ctx.as_ref(), location).await?; - self.apply_copy_into_table_options(stmt, &mut stage_info) - .await?; + if !stmt.file_format.is_empty() { + stage_info.file_format_params = self.try_resolve_file_format(&stmt.file_format).await?; + } + + if !(stmt.options.purge && stmt.options.force) + && stmt.options.max_files > COPY_MAX_FILES_PER_COMMIT + { + return Err(ErrorCode::InvalidArgument(format!( + "max_files {} is too large, max_files should be less than {COPY_MAX_FILES_PER_COMMIT}", + stmt.options.max_files + ))); + } let pattern = match &stmt.pattern { None => None, Some(pattern) => Some(Self::resolve_copy_pattern(self.ctx.clone(), pattern)?), @@ -193,7 +203,6 @@ impl<'a> Binder { is_transform, no_file_to_copy: false, from_attachment: false, - force: stmt.force, stage_table_info: StageTableInfo { schema: stage_schema, files_info, @@ -203,13 +212,13 @@ impl<'a> Binder { is_select: false, default_values, copy_into_location_options: Default::default(), + copy_into_table_options: stmt.options.clone(), }, values_consts: vec![], required_source_schema: required_values_schema.clone(), required_values_schema: required_values_schema.clone(), write_mode: CopyIntoTableMode::Copy, query: None, - enable_distributed: false, }) } @@ -266,7 +275,7 @@ impl<'a> Binder { pub(crate) async fn bind_attachment( &mut self, attachment: StageAttachment, - ) -> Result<(StageInfo, StageFilesInfo)> { + ) -> Result<(StageInfo, StageFilesInfo, CopyIntoTableOptions)> { let (mut stage_info, path) = resolve_stage_location(self.ctx.as_ref(), &attachment.location[1..]).await?; @@ -287,16 +296,18 @@ impl<'a> Binder { } stage_info.file_format_params = params; } + let mut copy_options = CopyIntoTableOptions::default(); if let Some(ref options) = attachment.copy_options { - stage_info.copy_options.apply(options, true)?; + copy_options.apply(options, true)?; } + copy_options.force = true; let files_info = StageFilesInfo { path, files: None, pattern: None, }; - Ok((stage_info, files_info)) + Ok((stage_info, files_info, copy_options)) } /// Bind COPY INFO FROM @@ -326,7 +337,7 @@ impl<'a> Binder { let thread_num = self.ctx.get_settings().get_max_threads()? as usize; - let (stage_info, files_info) = self.bind_attachment(attachment).await?; + let (stage_info, files_info, options) = self.bind_attachment(attachment).await?; // list the files to be copied in binding phase // note that, this method(`bind_copy_from_attachment`) are used by @@ -353,7 +364,6 @@ impl<'a> Binder { required_source_schema: data_schema.clone(), required_values_schema, values_consts: const_columns, - force: true, stage_table_info: StageTableInfo { schema: stage_schema, files_info, @@ -363,6 +373,7 @@ impl<'a> Binder { is_select: false, default_values: Some(default_values), copy_into_location_options: Default::default(), + copy_into_table_options: options, }, write_mode, query: None, @@ -441,8 +452,7 @@ impl<'a> Binder { // disable variant check to allow copy invalid JSON into tables let disable_variant_check = plan .stage_table_info - .stage_info - .copy_options + .copy_into_table_options .disable_variant_check; if disable_variant_check { let hints = Hint { @@ -474,44 +484,6 @@ impl<'a> Binder { Ok(Plan::CopyIntoTable(Box::new(plan))) } - #[async_backtrace::framed] - pub async fn apply_copy_into_table_options( - &mut self, - stmt: &CopyIntoTableStmt, - stage: &mut StageInfo, - ) -> Result<()> { - if !stmt.file_format.is_empty() { - stage.file_format_params = self.try_resolve_file_format(&stmt.file_format).await?; - } - - stage.copy_options.on_error = - OnErrorMode::from_str(&stmt.on_error).map_err(ErrorCode::SyntaxException)?; - - if stmt.size_limit != 0 { - stage.copy_options.size_limit = stmt.size_limit; - } - - stage.copy_options.split_size = stmt.split_size; - stage.copy_options.purge = stmt.purge; - stage.copy_options.disable_variant_check = stmt.disable_variant_check; - stage.copy_options.return_failed_only = stmt.return_failed_only; - - if stmt.max_files != 0 { - stage.copy_options.max_files = stmt.max_files; - } - - if !(stage.copy_options.purge && stmt.force) - && stage.copy_options.max_files > COPY_MAX_FILES_PER_COMMIT - { - return Err(ErrorCode::InvalidArgument(format!( - "max_files {} is too large, max_files should be less than {COPY_MAX_FILES_PER_COMMIT}", - stage.copy_options.max_files - ))); - } - - Ok(()) - } - #[async_backtrace::framed] pub(crate) async fn prepared_values( &self, diff --git a/src/query/sql/src/planner/binder/ddl/stage.rs b/src/query/sql/src/planner/binder/ddl/stage.rs index 2036ba4f8c2e..9e61e5904493 100644 --- a/src/query/sql/src/planner/binder/ddl/stage.rs +++ b/src/query/sql/src/planner/binder/ddl/stage.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::str::FromStr; - use databend_common_ast::ast::CreateStageStmt; use databend_common_ast::ast::FileFormatOptions; use databend_common_ast::ast::UriLocation; @@ -21,7 +19,6 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::FileFormatOptionsReader; use databend_common_meta_app::principal::FileFormatParams; -use databend_common_meta_app::principal::OnErrorMode; use databend_common_meta_app::principal::StageInfo; use databend_common_storage::init_operator; @@ -59,9 +56,6 @@ impl Binder { stage_name, location, file_format_options, - on_error, - size_limit, - validation_mode: _, comments: _, } = stmt; @@ -103,16 +97,6 @@ impl Binder { stage_info.file_format_params = self.try_resolve_file_format(file_format_options).await?; } - // Copy options. - { - // on_error. - if !on_error.is_empty() { - stage_info.copy_options.on_error = - OnErrorMode::from_str(on_error).map_err(ErrorCode::SyntaxException)?; - } - - stage_info.copy_options.size_limit = *size_limit; - } Ok(Plan::CreateStage(Box::new(CreateStagePlan { create_option: create_option.clone().into(), diff --git a/src/query/sql/src/planner/plans/copy_into_table.rs b/src/query/sql/src/planner/plans/copy_into_table.rs index d11050411cfd..75f30f612dce 100644 --- a/src/query/sql/src/planner/plans/copy_into_table.rs +++ b/src/query/sql/src/planner/plans/copy_into_table.rs @@ -130,7 +130,6 @@ pub struct CopyIntoTablePlan { pub write_mode: CopyIntoTableMode, pub validation_mode: ValidationMode, - pub force: bool, pub stage_table_info: StageTableInfo, pub query: Option>, @@ -146,7 +145,7 @@ impl CopyIntoTablePlan { let start = Instant::now(); let stage_table_info = &self.stage_table_info; - let max_files = stage_table_info.stage_info.copy_options.max_files; + let max_files = stage_table_info.copy_into_table_options.max_files; let max_files = if max_files == 0 { None } else { @@ -155,15 +154,16 @@ impl CopyIntoTablePlan { let thread_num = ctx.get_settings().get_max_threads()? as usize; let operator = init_stage_operator(&stage_table_info.stage_info)?; + let options = &stage_table_info.copy_into_table_options; let all_source_file_infos = if operator.info().native_capability().blocking { - if self.force { + if options.force { stage_table_info .files_info .blocking_list(&operator, max_files) } else { stage_table_info.files_info.blocking_list(&operator, None) } - } else if self.force { + } else if options.force { stage_table_info .files_info .list(&operator, thread_num, max_files) @@ -187,10 +187,8 @@ impl CopyIntoTablePlan { start.elapsed() )); - let (need_copy_file_infos, duplicated) = if self.force { - if !self.stage_table_info.stage_info.copy_options.purge - && all_source_file_infos.len() > COPY_MAX_FILES_PER_COMMIT - { + let (need_copy_file_infos, duplicated) = if options.force { + if !options.purge && all_source_file_infos.len() > COPY_MAX_FILES_PER_COMMIT { return Err(ErrorCode::Internal(COPY_MAX_FILES_COMMIT_MSG)); } info!( @@ -261,7 +259,6 @@ impl Debug for CopyIntoTablePlan { table_name, no_file_to_copy, validation_mode, - force, stage_table_info, query, .. @@ -274,8 +271,6 @@ impl Debug for CopyIntoTablePlan { write!(f, ", no_file_to_copy: {no_file_to_copy:?}")?; write!(f, ", validation_mode: {validation_mode:?}")?; write!(f, ", from: {stage_table_info:?}")?; - write!(f, " force: {force}")?; - write!(f, " is_from: {force}")?; write!(f, " query: {query:?}")?; Ok(()) } diff --git a/src/query/sql/src/planner/plans/insert.rs b/src/query/sql/src/planner/plans/insert.rs index f20a2c0830ee..7685f4c6c382 100644 --- a/src/query/sql/src/planner/plans/insert.rs +++ b/src/query/sql/src/planner/plans/insert.rs @@ -163,7 +163,6 @@ pub(crate) fn format_insert_source( required_source_schema, write_mode, validation_mode, - force, stage_table_info, enable_distributed, .. @@ -191,7 +190,6 @@ pub(crate) fn format_insert_source( )), FormatTreeNode::new(format!("write_mode: {write_mode}")), FormatTreeNode::new(format!("validation_mode: {validation_mode}")), - FormatTreeNode::new(format!("force: {force}")), FormatTreeNode::new(format!("stage_table_info: {stage_table_info}")), FormatTreeNode::new(format!("enable_distributed: {enable_distributed}")), ]; diff --git a/src/query/storages/stage/Cargo.toml b/src/query/storages/stage/Cargo.toml index 6988e16c3afb..e8f6e7044f1e 100644 --- a/src/query/storages/stage/Cargo.toml +++ b/src/query/storages/stage/Cargo.toml @@ -16,6 +16,7 @@ async-backtrace = { workspace = true } async-trait = { workspace = true } bstr = { workspace = true } csv-core = { workspace = true } +databend-common-ast = { workspace = true } databend-common-base = { workspace = true } databend-common-catalog = { workspace = true } databend-common-compress = { workspace = true } diff --git a/src/query/storages/stage/src/read/error_handler.rs b/src/query/storages/stage/src/read/error_handler.rs index f56453f96a39..3f9e22b3851a 100644 --- a/src/query/storages/stage/src/read/error_handler.rs +++ b/src/query/storages/stage/src/read/error_handler.rs @@ -15,9 +15,9 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +use databend_common_ast::ast::OnErrorMode; use databend_common_exception::Result; use databend_common_expression::ColumnBuilder; -use databend_common_meta_app::principal::OnErrorMode; use databend_common_storage::FileParseError; use databend_common_storage::FileStatus; diff --git a/src/query/storages/stage/src/read/load_context.rs b/src/query/storages/stage/src/read/load_context.rs index 2f82bdd2c982..8088c771b466 100644 --- a/src/query/storages/stage/src/read/load_context.rs +++ b/src/query/storages/stage/src/read/load_context.rs @@ -57,14 +57,15 @@ impl LoadContext { pos_projection: Option>, block_compact_thresholds: BlockThresholds, ) -> Result { - let copy_options = &stage_table_info.stage_info.copy_options; let settings = ctx.get_settings(); let func_ctx = ctx.get_function_context()?; let is_select = stage_table_info.is_select; let mut file_format_options_ext = FileFormatOptionsExt::create_from_settings(&settings, is_select)?; - file_format_options_ext.disable_variant_check = copy_options.disable_variant_check; - let on_error_mode = copy_options.on_error.clone(); + file_format_options_ext.disable_variant_check = stage_table_info + .copy_into_table_options + .disable_variant_check; + let on_error_mode = stage_table_info.copy_into_table_options.on_error.clone(); let fields = stage_table_info .schema .fields() diff --git a/src/query/storages/system/src/stages_table.rs b/src/query/storages/system/src/stages_table.rs index 13a03d1ed9fa..d597a634db31 100644 --- a/src/query/storages/system/src/stages_table.rs +++ b/src/query/storages/system/src/stages_table.rs @@ -79,7 +79,6 @@ impl AsyncSystemTable for StagesTable { let mut name: Vec = Vec::with_capacity(stages.len()); let mut stage_type: Vec = Vec::with_capacity(stages.len()); let mut stage_params: Vec = Vec::with_capacity(stages.len()); - let mut copy_options: Vec = Vec::with_capacity(stages.len()); let mut file_format_options: Vec = Vec::with_capacity(stages.len()); let mut comment: Vec = Vec::with_capacity(stages.len()); let mut number_of_files: Vec> = Vec::with_capacity(stages.len()); @@ -97,7 +96,6 @@ impl AsyncSystemTable for StagesTable { ); stage_type.push(stage.stage_type.clone().to_string()); stage_params.push(format!("{:?}", stage.stage_params)); - copy_options.push(format!("{:?}", stage.copy_options)); file_format_options.push(format!("{:?}", stage.file_format_params)); // TODO(xuanwo): we will remove this line. match stage.stage_type { @@ -117,7 +115,6 @@ impl AsyncSystemTable for StagesTable { StringType::from_data(name), StringType::from_data(stage_type), StringType::from_data(stage_params), - StringType::from_data(copy_options), StringType::from_data(file_format_options), UInt64Type::from_opt_data(number_of_files), StringType::from_opt_data(creator), @@ -134,7 +131,6 @@ impl StagesTable { TableField::new("name", TableDataType::String), TableField::new("stage_type", TableDataType::String), TableField::new("stage_params", TableDataType::String), - TableField::new("copy_options", TableDataType::String), TableField::new("file_format_options", TableDataType::String), // NULL for external stage TableField::new(