Skip to content

Commit

Permalink
Merge branch 'main' into split2
Browse files Browse the repository at this point in the history
  • Loading branch information
dqhl76 authored Nov 26, 2024
2 parents 3a7fccf + 04df094 commit 7056d31
Show file tree
Hide file tree
Showing 36 changed files with 584 additions and 582 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 31 additions & 44 deletions src/common/column/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,20 @@ pub struct Buffer<T> {
/// the internal byte buffer.
data: Arc<Bytes<T>>,

/// The offset into the buffer.
offset: usize,
/// Pointer into `data` valid
///
/// We store a pointer instead of an offset to avoid pointer arithmetic
/// which causes LLVM to fail to vectorise code correctly
ptr: *const T,

// the length of the buffer. Given a region `data` of N bytes, [offset..offset+length] is visible
// to this buffer.
length: usize,
}

unsafe impl<T: Send> Send for Buffer<T> {}
unsafe impl<T: Sync> Sync for Buffer<T> {}

impl<T: PartialEq> PartialEq for Buffer<T> {
#[inline]
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -101,9 +107,10 @@ impl<T> Buffer<T> {
/// Auxiliary method to create a new Buffer
pub(crate) fn from_bytes(bytes: Bytes<T>) -> Self {
let length = bytes.len();
let ptr = bytes.as_ptr();
Buffer {
data: Arc::new(bytes),
offset: 0,
ptr,
length,
}
}
Expand All @@ -130,24 +137,7 @@ impl<T> Buffer<T> {
/// Returns the byte slice stored in this buffer
#[inline]
pub fn as_slice(&self) -> &[T] {
// Safety:
// invariant of this struct `offset + length <= data.len()`
debug_assert!(self.offset + self.length <= self.data.len());
unsafe {
self.data
.get_unchecked(self.offset..self.offset + self.length)
}
}

/// Returns the byte slice stored in this buffer
/// # Safety
/// `index` must be smaller than `len`
#[inline]
pub(super) unsafe fn get_unchecked(&self, index: usize) -> &T {
// Safety:
// invariant of this function
debug_assert!(index < self.length);
unsafe { self.data.get_unchecked(self.offset + index) }
self
}

/// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
Expand Down Expand Up @@ -193,20 +183,20 @@ impl<T> Buffer<T> {
/// The caller must ensure `offset + length <= self.len()`
#[inline]
pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
self.offset += offset;
self.ptr = self.ptr.add(offset);
self.length = length;
}

/// Returns a pointer to the start of this buffer.
#[inline]
pub(crate) fn data_ptr(&self) -> *const T {
self.data.deref().as_ptr()
self.data.as_ptr()
}

/// Returns the offset of this buffer.
#[inline]
pub fn offset(&self) -> usize {
self.offset
unsafe { self.ptr.offset_from(self.data_ptr()) as usize }
}

/// # Safety
Expand Down Expand Up @@ -253,10 +243,11 @@ impl<T> Buffer<T> {
/// * has not been imported from the c data interface (FFI)
#[inline]
pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
let offset = self.offset();
Arc::get_mut(&mut self.data)
.and_then(|b| b.get_vec())
// Safety: the invariant of this struct
.map(|x| unsafe { x.get_unchecked_mut(self.offset..self.offset + self.length) })
.map(|x| unsafe { x.get_unchecked_mut(offset..offset + self.length) })
}

/// Get the strong count of underlying `Arc` data buffer.
Expand All @@ -269,28 +260,14 @@ impl<T> Buffer<T> {
Arc::weak_count(&self.data)
}

/// Returns its internal representation
#[must_use]
pub fn into_inner(self) -> (Arc<Bytes<T>>, usize, usize) {
let Self {
data,
offset,
length,
} = self;
(data, offset, length)
}

/// Creates a `[Bitmap]` from its internal representation.
/// This is the inverted from `[Bitmap::into_inner]`
///
/// # Safety
/// Callers must ensure all invariants of this struct are upheld.
pub unsafe fn from_inner_unchecked(data: Arc<Bytes<T>>, offset: usize, length: usize) -> Self {
Self {
data,
offset,
length,
}
let ptr = data.as_ptr().add(offset);
Self { data, ptr, length }
}
}

Expand All @@ -313,8 +290,9 @@ impl<T> From<Vec<T>> for Buffer<T> {
#[inline]
fn from(p: Vec<T>) -> Self {
let bytes: Bytes<T> = p.into();
let ptr = bytes.as_ptr();
Self {
offset: 0,
ptr,
length: bytes.len(),
data: Arc::new(bytes),
}
Expand All @@ -326,7 +304,15 @@ impl<T> std::ops::Deref for Buffer<T> {

#[inline]
fn deref(&self) -> &[T] {
self.as_slice()
debug_assert!(self.offset() + self.length <= self.data.len());
unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
}
}

impl<T> AsRef<[T]> for Buffer<T> {
#[inline]
fn as_ref(&self) -> &[T] {
self
}
}

Expand Down Expand Up @@ -375,8 +361,9 @@ impl<T: crate::types::NativeType> From<arrow_buffer::Buffer> for Buffer<T> {

impl<T: crate::types::NativeType> From<Buffer<T>> for arrow_buffer::Buffer {
fn from(value: Buffer<T>) -> Self {
let offset = value.offset();
crate::buffer::to_buffer(value.data).slice_with_length(
value.offset * std::mem::size_of::<T>(),
offset * std::mem::size_of::<T>(),
value.length * std::mem::size_of::<T>(),
)
}
Expand Down
3 changes: 3 additions & 0 deletions src/common/column/tests/it/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ fn from_slice() {
let buffer = Buffer::<i32>::from(vec![0, 1, 2]);
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.as_slice(), &[0, 1, 2]);

assert_eq!(unsafe { *buffer.get_unchecked(1) }, 1);
assert_eq!(unsafe { *buffer.get_unchecked(2) }, 2);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ arrow-schema = { workspace = true }
async-backtrace = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true, features = ["serde"] }
databend-common-ast = { workspace = true }
databend-common-auth = { workspace = true }
databend-common-base = { workspace = true }
databend-common-exception = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/common/storage/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use databend_common_ast::ast::OnErrorMode;
use databend_common_exception::ErrorCode;
use databend_common_meta_app::principal::OnErrorMode;
use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;
Expand Down
91 changes: 1 addition & 90 deletions src/meta/app/src/principal/user_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ use crate::storage::StorageParams;
// internalStageParams
// directoryTableParams
// [ FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] ) } ]
// [ COPY_OPTIONS = ( copyOptions ) ]
// [ COMMENT = '<string_literal>' ]
//
// -- External stage
// CREATE [ OR REPLACE ] [ TEMPORARY ] STAGE [ IF NOT EXISTS ] <external_stage_name>
// externalStageParams
// directoryTableParams
// [ FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] ) } ]
// [ COPY_OPTIONS = ( copyOptions ) ]
// [ COMMENT = '<string_literal>' ]
//
//
Expand All @@ -52,7 +50,6 @@ use crate::storage::StorageParams;
// 's3://<bucket>[/<path>/]'
// [ { CREDENTIALS = ( { { AWS_KEY_ID = '<string>' AWS_SECRET_KEY = '<string>' [ AWS_TOKEN = '<string>' ] } | AWS_ROLE = '<string>' } ) ) } ]
//
// copyOptions ::=
// ON_ERROR = { CONTINUE | SKIP_FILE | SKIP_FILE_<num> | SKIP_FILE_<num>% | ABORT_STATEMENT }
// SIZE_LIMIT = <num>

Expand Down Expand Up @@ -403,7 +400,7 @@ pub struct StageParams {
pub storage: StorageParams,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Copy)]
pub enum OnErrorMode {
Continue,
SkipFileNum(u64),
Expand Down Expand Up @@ -509,92 +506,6 @@ pub struct CopyOptions {
pub detailed_output: bool,
}

impl CopyOptions {
pub fn apply(&mut self, opts: &BTreeMap<String, String>, ignore_unknown: bool) -> Result<()> {
if opts.is_empty() {
return Ok(());
}
for (k, v) in opts.iter() {
match k.as_str() {
"on_error" => {
let on_error = OnErrorMode::from_str(v)?;
self.on_error = on_error;
}
"size_limit" => {
let size_limit = usize::from_str(v)?;
self.size_limit = size_limit;
}
"max_files" => {
let max_files = usize::from_str(v)?;
self.max_files = max_files;
}
"split_size" => {
let split_size = usize::from_str(v)?;
self.split_size = split_size;
}
"purge" => {
let purge = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!("Cannot parse purge: {} as bool", v))
})?;
self.purge = purge;
}
"single" => {
let single = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!("Cannot parse single: {} as bool", v))
})?;
self.single = single;
}
"max_file_size" => {
let max_file_size = usize::from_str(v)?;
self.max_file_size = max_file_size;
}
"disable_variant_check" => {
let disable_variant_check = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!(
"Cannot parse disable_variant_check: {} as bool",
v
))
})?;
self.disable_variant_check = disable_variant_check;
}
"return_failed_only" => {
let return_failed_only = bool::from_str(v).map_err(|_| {
ErrorCode::StrParseError(format!(
"Cannot parse return_failed_only: {} as bool",
v
))
})?;
self.return_failed_only = return_failed_only;
}
_ => {
if !ignore_unknown {
return Err(ErrorCode::BadArguments(format!(
"Unknown stage copy option {}",
k
)));
}
}
}
}
Ok(())
}
}

impl Display for CopyOptions {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "OnErrorMode {}", self.on_error)?;
write!(f, "SizeLimit {}", self.size_limit)?;
write!(f, "MaxFiles {}", self.max_files)?;
write!(f, "SplitSize {}", self.split_size)?;
write!(f, "Purge {}", self.purge)?;
write!(f, "DisableVariantCheck {}", self.disable_variant_check)?;
write!(f, "ReturnFailedOnly {}", self.return_failed_only)?;
write!(f, "MaxFileSize {}", self.max_file_size)?;
write!(f, "Single {}", self.single)?;
write!(f, "DetailedOutput {}", self.detailed_output)
}
}

#[derive(serde::Serialize, serde::Deserialize, Default, Clone, Debug, Eq, PartialEq)]
#[serde(default)]
pub struct StageInfo {
Expand Down
19 changes: 11 additions & 8 deletions src/query/ast/src/ast/format/syntax/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,36 +188,39 @@ pub(crate) fn pretty_copy_into_table(copy_stmt: CopyIntoTableStmt) -> RcDoc<'sta
} else {
RcDoc::nil()
})
.append(if !copy_stmt.validation_mode.is_empty() {
.append(if !copy_stmt.options.validation_mode.is_empty() {
RcDoc::line()
.append(RcDoc::text("VALIDATION_MODE = "))
.append(RcDoc::text(copy_stmt.validation_mode))
.append(RcDoc::text(copy_stmt.options.validation_mode))
} else {
RcDoc::nil()
})
.append(if copy_stmt.size_limit != 0 {
.append(if copy_stmt.options.size_limit != 0 {
RcDoc::line()
.append(RcDoc::text("SIZE_LIMIT = "))
.append(RcDoc::text(format!("{}", copy_stmt.size_limit)))
.append(RcDoc::text(format!("{}", copy_stmt.options.size_limit)))
} else {
RcDoc::nil()
})
.append(if copy_stmt.max_files != 0 {
.append(if copy_stmt.options.max_files != 0 {
RcDoc::line()
.append(RcDoc::text("MAX_FILES = "))
.append(RcDoc::text(format!("{}", copy_stmt.max_files)))
.append(RcDoc::text(format!("{}", copy_stmt.options.max_files)))
} else {
RcDoc::nil()
})
.append(
RcDoc::line()
.append(RcDoc::text("PURGE = "))
.append(RcDoc::text(format!("{}", copy_stmt.purge))),
.append(RcDoc::text(format!("{}", copy_stmt.options.purge))),
)
.append(
RcDoc::line()
.append(RcDoc::text("DISABLE_VARIANT_CHECK = "))
.append(RcDoc::text(format!("{}", copy_stmt.disable_variant_check))),
.append(RcDoc::text(format!(
"{}",
copy_stmt.options.disable_variant_check
))),
)
}

Expand Down
Loading

0 comments on commit 7056d31

Please sign in to comment.