Skip to content

Commit

Permalink
feat: disable specifying copy options when create stage. (#16925)
Browse files Browse the repository at this point in the history
feat disable specifying copy options when create stage.
  • Loading branch information
youngsofun authored Nov 25, 2024
1 parent cf150d9 commit 682039f
Show file tree
Hide file tree
Showing 31 changed files with 502 additions and 524 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ arrow-schema = { workspace = true }
async-backtrace = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true, features = ["serde"] }
databend-common-ast = { workspace = true }
databend-common-auth = { workspace = true }
databend-common-base = { workspace = true }
databend-common-exception = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/common/storage/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use databend_common_ast::ast::OnErrorMode;
use databend_common_exception::ErrorCode;
use databend_common_meta_app::principal::OnErrorMode;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;
Expand Down
91 changes: 1 addition & 90 deletions src/meta/app/src/principal/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ use crate::storage::StorageParams;
// internalStageParams
// directoryTableParams
// [ FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] ) } ]
// [ COPY_OPTIONS = ( copyOptions ) ]
// [ COMMENT = '<string_literal>' ]
//
// -- External stage
// CREATE [ OR REPLACE ] [ TEMPORARY ] STAGE [ IF NOT EXISTS ] <external_stage_name>
// externalStageParams
// directoryTableParams
// [ FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] ) } ]
// [ COPY_OPTIONS = ( copyOptions ) ]
// [ COMMENT = '<string_literal>' ]
//
//
Expand All @@ -52,7 +50,6 @@ use crate::storage::StorageParams;
// 's3://<bucket>[/<path>/]'
// [ { CREDENTIALS = ( { { AWS_KEY_ID = '<string>' AWS_SECRET_KEY = '<string>' [ AWS_TOKEN = '<string>' ] } | AWS_ROLE = '<string>' } ) ) } ]
//
// copyOptions ::=
// ON_ERROR = { CONTINUE | SKIP_FILE | SKIP_FILE_<num> | SKIP_FILE_<num>% | ABORT_STATEMENT }
// SIZE_LIMIT = <num>

Expand Down Expand Up @@ -403,7 +400,7 @@ pub struct StageParams {
pub storage: StorageParams,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Copy)]
pub enum OnErrorMode {
Continue,
SkipFileNum(u64),
Expand Down Expand Up @@ -509,92 +506,6 @@ pub struct CopyOptions {
pub detailed_output: bool,
}

impl CopyOptions {
pub fn apply(&mut self, opts: &BTreeMap<String, String>, ignore_unknown: bool) -> Result<()> {
if opts.is_empty() {
return Ok(());
}
for (k, v) in opts.iter() {
match k.as_str() {
"on_error" => {
let on_error = OnErrorMode::from_str(v)?;
self.on_error = on_error;
}
"size_limit" => {
let size_limit = usize::from_str(v)?;
self.size_limit = size_limit;
}
"max_files" => {
let max_files = usize::from_str(v)?;
self.max_files = max_files;
}
"split_size" => {
let split_size = usize::from_str(v)?;
self.split_size = split_size;
}
"purge" => {
let purge = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
})?;
self.purge = purge;
}
"single" => {
let single = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
})?;
self.single = single;
}
"max_file_size" => {
let max_file_size = usize::from_str(v)?;
self.max_file_size = max_file_size;
}
"disable_variant_check" => {
let disable_variant_check = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!(
"Cannot parse disable_variant_check: {} as bool",
v
))
})?;
self.disable_variant_check = disable_variant_check;
}
"return_failed_only" => {
let return_failed_only = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!(
"Cannot parse return_failed_only: {} as bool",
v
))
})?;
self.return_failed_only = return_failed_only;
}
_ => {
if !ignore_unknown {
return Err(ErrorCode::BadArguments(format!(
"Unknown stage copy option {}",
k
)));
}
}
}
}
Ok(())
}
}

impl Display for CopyOptions {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "OnErrorMode {}", self.on_error)?;
write!(f, "SizeLimit {}", self.size_limit)?;
write!(f, "MaxFiles {}", self.max_files)?;
write!(f, "SplitSize {}", self.split_size)?;
write!(f, "Purge {}", self.purge)?;
write!(f, "DisableVariantCheck {}", self.disable_variant_check)?;
write!(f, "ReturnFailedOnly {}", self.return_failed_only)?;
write!(f, "MaxFileSize {}", self.max_file_size)?;
write!(f, "Single {}", self.single)?;
write!(f, "DetailedOutput {}", self.detailed_output)
}
}

#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)]
#[serde(default)]
pub struct StageInfo {
Expand Down
19 changes: 11 additions & 8 deletions src/query/ast/src/ast/format/syntax/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,36 +188,39 @@ pub(crate) fn pretty_copy_into_table(copy_stmt: CopyIntoTableStmt) -> RcDoc<'sta
} else {
RcDoc::nil()
})
.append(if !copy_stmt.validation_mode.is_empty() {
.append(if !copy_stmt.options.validation_mode.is_empty() {
RcDoc::line()
.append(RcDoc::text("VALIDATION_MODE = "))
.append(RcDoc::text(copy_stmt.validation_mode))
.append(RcDoc::text(copy_stmt.options.validation_mode))
} else {
RcDoc::nil()
})
.append(if copy_stmt.size_limit != 0 {
.append(if copy_stmt.options.size_limit != 0 {
RcDoc::line()
.append(RcDoc::text("SIZE_LIMIT = "))
.append(RcDoc::text(format!("{}", copy_stmt.size_limit)))
.append(RcDoc::text(format!("{}", copy_stmt.options.size_limit)))
} else {
RcDoc::nil()
})
.append(if copy_stmt.max_files != 0 {
.append(if copy_stmt.options.max_files != 0 {
RcDoc::line()
.append(RcDoc::text("MAX_FILES = "))
.append(RcDoc::text(format!("{}", copy_stmt.max_files)))
.append(RcDoc::text(format!("{}", copy_stmt.options.max_files)))
} else {
RcDoc::nil()
})
.append(
RcDoc::line()
.append(RcDoc::text("PURGE = "))
.append(RcDoc::text(format!("{}", copy_stmt.purge))),
.append(RcDoc::text(format!("{}", copy_stmt.options.purge))),
)
.append(
RcDoc::line()
.append(RcDoc::text("DISABLE_VARIANT_CHECK = "))
.append(RcDoc::text(format!("{}", copy_stmt.disable_variant_check))),
.append(RcDoc::text(format!(
"{}",
copy_stmt.options.disable_variant_check
))),
)
}

Expand Down
Loading

0 comments on commit 682039f

Please sign in to comment.