diff --git a/cli/src/postgres_cloner.rs b/cli/src/postgres_cloner.rs index 9d87cda..dbe1a5a 100644 --- a/cli/src/postgres_cloner.rs +++ b/cli/src/postgres_cloner.rs @@ -196,22 +196,48 @@ pub fn execute_copy(pg_args: &PostgresConnArgs, query: &str, output_file: &PathB } fn format_schema(schema: &ParquetType, indent: u32) -> String { + fn format_time_unit(u: &parquet::format::TimeUnit) -> &str { + match u { + basic::TimeUnit::MILLIS(_) => "ms", + basic::TimeUnit::MICROS(_) => "µs", + basic::TimeUnit::NANOS(_) => "ns", + } + } + fn format_logical_type(t: &LogicalType) -> String { + match t { + LogicalType::Decimal { scale, precision } => format!("Decimal({}, {})", precision, scale), + LogicalType::Time { is_adjusted_to_u_t_c, unit } => + format!("Time({}, utc={:?})", format_time_unit(unit), is_adjusted_to_u_t_c), + LogicalType::Timestamp { is_adjusted_to_u_t_c, unit } => + format!("Timestamp({}, utc={:?})", format_time_unit(unit), is_adjusted_to_u_t_c), + LogicalType::Integer { bit_width, is_signed } => { + let sign = if *is_signed { "" } else { "U" }; + format!("{}Int{}", sign, bit_width) + } + _ => format!("{:?}", t) + } + } let basic_info = schema.get_basic_info(); - let mut additional_info = - basic_info.logical_type().map(|lt| format!("{:?}", lt)) + let logical_type = + basic_info.logical_type().map(|lt| format_logical_type(<)) .or_else(|| match basic_info.converted_type() { ConvertedType::NONE => None, c => Some(c.to_string()) } - ).unwrap_or("".to_string()); + ); match schema { ParquetType::PrimitiveType { basic_info, physical_type, type_length, scale, precision } => { - if *precision > 0 { + let (primary_type, mut additional_info) = match logical_type { + Some(lt) => (lt, physical_type.to_string()), + None => (physical_type.to_string(), "".to_owned()) + }; + + if *precision > 0 && !matches!(basic_info.logical_type(), Some(LogicalType::Decimal { .. })) { additional_info += &format!(" precision: {}", precision); } - if *scale > 0 { + if *scale > 0 && !matches!(basic_info.logical_type(), Some(LogicalType::Decimal { .. })) { additional_info += &format!(" scale: {}", scale); } if !additional_info.trim().is_empty() { @@ -219,14 +245,20 @@ fn format_schema(schema: &ParquetType, indent: u32) -> String { } let byte_size = if *type_length >= 0 { format!(" [{}b]", *type_length) } else { "".to_string() }; - format!("{} {}: {}{}{}", basic_info.repetition(), basic_info.name(), physical_type, byte_size, additional_info) + format!("{} {}: {}{}{}", + basic_info.repetition().to_string().to_lowercase(), + basic_info.name(), + primary_type, + byte_size, + additional_info) }, ParquetType::GroupType { basic_info, fields } => { + let mut additional_info = logical_type.unwrap_or_else(|| "".to_owned()); let fields_str = fields.iter() .map(|f| "\t".repeat(indent as usize) + " * " + &format_schema(f, indent + 1)) .collect::>(); - let rp = if basic_info.has_repetition() && basic_info.repetition() != Repetition::REQUIRED { basic_info.repetition().to_string() + " " } else { "".to_string() }; + let rp = if basic_info.has_repetition() && basic_info.repetition() != Repetition::REQUIRED { basic_info.repetition().to_string().to_lowercase() + " " } else { "".to_string() }; if !additional_info.trim().is_empty() { additional_info = format!(" ({})", additional_info.trim()); @@ -286,14 +318,10 @@ fn map_schema_column( *mapping.get(&e.name).unwrap_or_else(|| panic!("Could not map enum value {}. Was new enum case added while pg2parquet is running?", &e.name)) )) }, - SchemaSettingsEnumHandling::Text | SchemaSettingsEnumHandling::PlainText => { - let logical_type = if settings.enum_handling == SchemaSettingsEnumHandling::Text { - LogicalType::Enum - } else { - LogicalType::String - }; - Ok(resolve_primitive::(c.col_name(), c, Some(logical_type), None)) - }, + SchemaSettingsEnumHandling::Text => + Ok(resolve_primitive::(c.col_name(), c, Some(LogicalType::Enum), None)), + SchemaSettingsEnumHandling::PlainText => + Ok(resolve_primitive::(c.col_name(), c, Some(LogicalType::String), None)), } Kind::Array(ref element_type) => { let list_column = c.nest("list", 0).as_array(); @@ -409,7 +437,7 @@ fn map_simple_type( "char" => resolve_primitive::(name, c, Some(LogicalType::Integer { bit_width: 8, is_signed: false }), None), "bytea" => resolve_primitive::, ByteArrayType, _>(name, c, None, None), "name" | "text" | "xml" | "bpchar" | "varchar" | "citext" => - resolve_primitive::(name, c, None, Some(ConvertedType::UTF8)), + resolve_primitive::(name, c, Some(LogicalType::String), Some(ConvertedType::UTF8)), "jsonb" | "json" => resolve_primitive::(name, c, Some(match s.json_handling { SchemaSettingsJsonHandling::Text => LogicalType::String,