
Merge remote-tracking branch 'upstream/main' into cleanup_roundrobin
viirya committed Jan 8, 2024
2 parents d6aad99 + 0e53c6d commit 04215c5
Showing 57 changed files with 3,653 additions and 1,527 deletions.
15 changes: 7 additions & 8 deletions .github/workflows/rust.yml
@@ -106,7 +106,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=0 -C opt-level=0 -C incremental=false -C codegen-units=256"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"
- name: Verify Working Directory Clean
run: git diff --exit-code

@@ -310,14 +310,13 @@ jobs:
cd datafusion-cli
cargo test --lib --tests --bins --all-features
env:
# do not produce debug symbols to keep memory usage down
# use higher optimization level to overcome Windows rust slowness for tpc-ds
# and speed builds: https://github.com/apache/arrow-datafusion/issues/8696
# Cargo profile docs https://doc.rust-lang.org/cargo/reference/profiles.html?profile-settings#profile-settings
RUSTFLAGS: "-C debuginfo=0 -C opt-level=1 -C target-feature=+crt-static -C incremental=false -C codegen-units=256"
# Minimize producing debug symbols to keep memory usage down
# Set debuginfo=line-tables-only as debuginfo=0 causes immensely slow build
# See for more details: https://github.com/rust-lang/rust/issues/119560
RUSTFLAGS: "-C debuginfo=line-tables-only"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"
macos:
name: cargo test (mac)
runs-on: macos-latest
@@ -357,7 +356,7 @@ jobs:
RUSTFLAGS: "-C debuginfo=0 -C opt-level=0 -C incremental=false -C codegen-units=256"
RUST_BACKTRACE: "1"
# avoid rust stack overflows on tpc-ds tests
RUST_MINSTACK: "3000000"
RUST_MIN_STACK: "3000000"

test-datafusion-pyarrow:
name: cargo test pyarrow (amd64)
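
The env-var fix above matters because Rust's standard library reads RUST_MIN_STACK (a size in bytes) when choosing the default stack for threads it spawns, so the misspelled RUST_MINSTACK was silently ignored and the tpc-ds tests kept the default stack. A standalone sketch (plain std, not repository code) of the explicit per-thread equivalent of RUST_MIN_STACK=3000000:

```rust
use std::thread;

fn main() {
    // With RUST_MIN_STACK=3000000 exported, a plain thread::spawn would get
    // roughly this stack size by default; here it is requested explicitly.
    let worker = thread::Builder::new()
        .stack_size(3_000_000) // bytes
        .spawn(|| {
            // deeply recursive work (e.g. planning large tpc-ds queries) goes here
        })
        .expect("failed to spawn worker thread");
    worker.join().expect("worker thread panicked");
}
```
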
3 changes: 2 additions & 1 deletion datafusion-cli/src/print_options.rs
@@ -141,7 +141,8 @@ impl PrintOptions {
let mut row_count = 0_usize;
let mut with_header = true;

while let Some(Ok(batch)) = stream.next().await {
while let Some(maybe_batch) = stream.next().await {
let batch = maybe_batch?;
row_count += batch.num_rows();
self.format.print_batches(
&mut writer,
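
The print_options.rs change swaps silent truncation for error propagation: `while let Some(Ok(batch))` simply ends the loop on the first `Err` the stream yields, discarding the error, whereas binding the `Option` and applying `?` returns it to the caller. A minimal generic sketch of the same pattern (using the `futures` crate, not the actual `PrintOptions` types):

```rust
use futures::{executor::block_on, stream, StreamExt};

/// Drain a fallible stream, counting items and surfacing the first error.
async fn drain<S, T, E>(mut items: S) -> Result<usize, E>
where
    S: futures::Stream<Item = Result<T, E>> + Unpin,
{
    let mut count = 0;
    // Old shape: `while let Some(Ok(item)) = items.next().await { ... }`
    // would stop here on an Err and silently report a short count.
    while let Some(maybe_item) = items.next().await {
        let _item = maybe_item?; // propagate the error instead of swallowing it
        count += 1;
    }
    Ok(count)
}

fn main() {
    let ok_then_err = stream::iter(vec![Ok::<i32, String>(1), Err("boom".to_string())]);
    assert_eq!(block_on(drain(ok_then_err)), Err("boom".to_string()));
}
```
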
1 change: 1 addition & 0 deletions datafusion-examples/examples/advanced_udf.rs
@@ -40,6 +40,7 @@ use std::sync::Arc;
/// the power of the second argument `a^b`.
///
/// To do so, we must implement the `ScalarUDFImpl` trait.
#[derive(Debug, Clone)]
struct PowUdf {
signature: Signature,
aliases: Vec<String>,
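
A plausible motivation for the new derives: trait-object registries generally want Debug so plans and errors can be printed, and Clone lets one definition be reused after registration. A dependency-free sketch of that general pattern (plain Rust, not DataFusion's actual ScalarUDFImpl machinery):

```rust
use std::fmt::Debug;

// Stand-in for a UDF trait whose implementations a registry stores as trait objects.
trait Udf: Debug {
    fn name(&self) -> &str;
}

#[derive(Debug, Clone)]
struct Pow;

impl Udf for Pow {
    fn name(&self) -> &str {
        "pow"
    }
}

fn main() {
    let original = Pow;
    // Clone hands a copy to the "registry" while keeping `original` usable.
    let registered: Box<dyn Udf> = Box::new(original.clone());
    // Debug lets plans and error messages show the implementation.
    println!("registered {registered:?} as {}", registered.name());
    println!("still holding {original:?}");
}
```
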
1 change: 1 addition & 0 deletions datafusion-examples/examples/advanced_udwf.rs
@@ -34,6 +34,7 @@ use datafusion_expr::{
/// a function `partition_evaluator` that returns the `MyPartitionEvaluator` instance.
///
/// To do so, we must implement the `WindowUDFImpl` trait.
#[derive(Debug, Clone)]
struct SmoothItUdf {
signature: Signature,
}
17 changes: 8 additions & 9 deletions datafusion/common/src/column.rs
@@ -17,6 +17,7 @@

//! Column
use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
use crate::{DFSchema, DataFusionError, OwnedTableReference, Result, SchemaError};
use std::collections::HashSet;
@@ -211,13 +212,13 @@ impl Column {
}
}

Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
_schema_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(self.relation.clone(), self.name)),
valid_fields: schemas
.iter()
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
}))
})
}

/// Qualify column if not done yet.
@@ -299,23 +300,21 @@ impl Column {
}

// If not due to USING columns then due to ambiguous column name
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column::new_unqualified(self.name),
},
));
return _schema_err!(SchemaError::AmbiguousReference {
field: Column::new_unqualified(self.name),
});
}
}
}

Err(DataFusionError::SchemaError(SchemaError::FieldNotFound {
_schema_err!(SchemaError::FieldNotFound {
field: Box::new(self),
valid_fields: schemas
.iter()
.flat_map(|s| s.iter())
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
}))
})
}
}

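
The `_schema_err!` calls above are shorthand for the explicit constructions they replace, except that (per the `schema_err!` macro added to datafusion/common/src/error.rs later in this diff) the expansion also boxes an optional backtrace. A hand-expanded sketch, assuming the `datafusion_common` crate paths:

```rust
use datafusion_common::{Column, DataFusionError, Result, SchemaError};

/// Roughly what `_schema_err!(SchemaError::FieldNotFound { .. })` expands to.
fn missing_column(name: &str, valid_fields: Vec<Column>) -> Result<()> {
    Err(DataFusionError::SchemaError(
        SchemaError::FieldNotFound {
            field: Box::new(Column::new_unqualified(name)),
            valid_fields,
        },
        // the boxed optional backtrace introduced by this commit
        Box::new(Some(DataFusionError::get_back_trace())),
    ))
}

fn main() {
    let err = missing_column("c3", vec![Column::new_unqualified("c1")]).unwrap_err();
    println!("{err}");
}
```
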
39 changes: 17 additions & 22 deletions datafusion/common/src/dfschema.rs
@@ -26,6 +26,7 @@ use std::sync::Arc;

use crate::error::{
unqualified_field_not_found, DataFusionError, Result, SchemaError, _plan_err,
_schema_err,
};
use crate::{
field_not_found, Column, FunctionalDependencies, OwnedTableReference, TableReference,
@@ -141,11 +142,9 @@ impl DFSchema {
if let Some(qualifier) = field.qualifier() {
qualified_names.insert((qualifier, field.name()));
} else if !unqualified_names.insert(field.name()) {
return Err(DataFusionError::SchemaError(
SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string(),
},
));
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
name: field.name().to_string(),
});
}
}

@@ -159,14 +158,12 @@
qualified_names.sort();
for (qualifier, name) in &qualified_names {
if unqualified_names.contains(name) {
return Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column {
relation: Some((*qualifier).clone()),
name: name.to_string(),
},
},
));
return _schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: Some((*qualifier).clone()),
name: name.to_string(),
}
});
}
}
Ok(Self {
@@ -230,9 +227,9 @@
for field in other_schema.fields() {
// skip duplicate columns
let duplicated_field = match field.qualifier() {
Some(q) => self.field_with_name(Some(q), field.name()).is_ok(),
Some(q) => self.has_column_with_qualified_name(q, field.name()),
// for unqualified columns, check as unqualified name
None => self.field_with_unqualified_name(field.name()).is_ok(),
None => self.has_column_with_unqualified_name(field.name()),
};
if !duplicated_field {
self.fields.push(field.clone());
@@ -392,14 +389,12 @@
if fields_without_qualifier.len() == 1 {
Ok(fields_without_qualifier[0])
} else {
Err(DataFusionError::SchemaError(
SchemaError::AmbiguousReference {
field: Column {
relation: None,
name: name.to_string(),
},
},
))
_schema_err!(SchemaError::AmbiguousReference {
field: Column {
relation: None,
name: name.to_string(),
},
})
}
}
}
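
In the duplicate-column check above, the schema now asks a boolean question via the has_column_with_* helpers instead of constructing a full FieldNotFound error only to discard it with .is_ok(), which is cheaper still now that every SchemaError also carries a backtrace string. A small usage sketch, assuming the datafusion_common and arrow_schema crate paths and the DFSchema::try_from_qualified_schema constructor:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};

fn main() -> Result<()> {
    let schema = Schema::new(vec![Field::new("c1", DataType::Int32, true)]);
    let df_schema = DFSchema::try_from_qualified_schema("t1", &schema)?;

    // Plain boolean answers: no SchemaError (and no backtrace string) is built
    // just to be thrown away with .is_ok().
    assert!(df_schema.has_column_with_unqualified_name("c1"));
    assert!(!df_schema.has_column_with_unqualified_name("c2"));
    Ok(())
}
```
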
93 changes: 60 additions & 33 deletions datafusion/common/src/error.rs
@@ -82,7 +82,9 @@ pub enum DataFusionError {
Configuration(String),
/// This error happens with schema-related errors, such as schema inference not possible
/// and non-unique column names.
SchemaError(SchemaError),
/// 2nd argument is for optional backtrace
/// Boxing the optional backtrace to prevent <https://rust-lang.github.io/rust-clippy/master/index.html#/result_large_err>
SchemaError(SchemaError, Box<Option<String>>),
/// Error returned during execution of the query.
/// Examples include files not found, errors in parsing certain types.
Execution(String),
@@ -125,34 +127,6 @@ pub enum SchemaError {
},
}

/// Create a "field not found" DataFusion::SchemaError
pub fn field_not_found<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
DataFusionError::SchemaError(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

impl Display for SchemaError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
@@ -298,7 +272,7 @@ impl Display for DataFusionError {
write!(f, "IO error: {desc}")
}
DataFusionError::SQL(ref desc, ref backtrace) => {
let backtrace = backtrace.clone().unwrap_or("".to_owned());
let backtrace: String = backtrace.clone().unwrap_or("".to_owned());
write!(f, "SQL error: {desc:?}{backtrace}")
}
DataFusionError::Configuration(ref desc) => {
@@ -314,8 +288,10 @@
DataFusionError::Plan(ref desc) => {
write!(f, "Error during planning: {desc}")
}
DataFusionError::SchemaError(ref desc) => {
write!(f, "Schema error: {desc}")
DataFusionError::SchemaError(ref desc, ref backtrace) => {
let backtrace: &str =
&backtrace.as_ref().clone().unwrap_or("".to_owned());
write!(f, "Schema error: {desc}{backtrace}")
}
DataFusionError::Execution(ref desc) => {
write!(f, "Execution error: {desc}")
@@ -356,7 +332,7 @@ impl Error for DataFusionError {
DataFusionError::Internal(_) => None,
DataFusionError::Configuration(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e) => Some(e),
DataFusionError::SchemaError(e, _) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
@@ -556,12 +532,63 @@ macro_rules! arrow_err {
};
}

// Exposes a macro to create `DataFusionError::SchemaError` with optional backtrace
#[macro_export]
macro_rules! schema_datafusion_err {
($ERR:expr) => {
DataFusionError::SchemaError(
$ERR,
Box::new(Some(DataFusionError::get_back_trace())),
)
};
}

// Exposes a macro to create `Err(DataFusionError::SchemaError)` with optional backtrace
#[macro_export]
macro_rules! schema_err {
($ERR:expr) => {
Err(DataFusionError::SchemaError(
$ERR,
Box::new(Some(DataFusionError::get_back_trace())),
))
};
}

// To avoid compiler error when using macro in the same crate:
// macros from the current crate cannot be referred to by absolute paths
pub use internal_datafusion_err as _internal_datafusion_err;
pub use internal_err as _internal_err;
pub use not_impl_err as _not_impl_err;
pub use plan_err as _plan_err;
pub use schema_err as _schema_err;

/// Create a "field not found" DataFusion::SchemaError
pub fn field_not_found<R: Into<OwnedTableReference>>(
qualifier: Option<R>,
name: &str,
schema: &DFSchema,
) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new_unqualified(name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
})
}

#[cfg(test)]
mod test {
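
Because the SchemaError variant now carries a second, boxed-backtrace field, downstream code that matched on it has to bind or ignore that field. A hedged sketch of the consuming side (crate path datafusion_common assumed):

```rust
use datafusion_common::{DataFusionError, SchemaError};

/// Summarize an error, ignoring the backtrace payload added by this commit.
fn describe(err: &DataFusionError) -> String {
    match err {
        // Before this change the variant had a single field:
        //     DataFusionError::SchemaError(SchemaError::FieldNotFound { field, .. })
        DataFusionError::SchemaError(SchemaError::FieldNotFound { field, .. }, _backtrace) => {
            format!("unknown column: {field}")
        }
        DataFusionError::SchemaError(other, _backtrace) => format!("schema error: {other}"),
        other => other.to_string(),
    }
}

fn main() {
    let err = DataFusionError::SchemaError(
        SchemaError::DuplicateUnqualifiedField { name: "c1".to_string() },
        Box::new(None), // no backtrace captured in this sketch
    );
    println!("{}", describe(&err));
}
```
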
