Skip to content

Commit

Permalink
Improve the readability of the printed schema
Browse files Browse the repository at this point in the history
  • Loading branch information
exyi committed May 2, 2024
1 parent 1d930b7 commit ffcc01c
Showing 1 changed file with 44 additions and 16 deletions.
60 changes: 44 additions & 16 deletions cli/src/postgres_cloner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,37 +196,69 @@ pub fn execute_copy(pg_args: &PostgresConnArgs, query: &str, output_file: &PathB
}

fn format_schema(schema: &ParquetType, indent: u32) -> String {
fn format_time_unit(u: &parquet::format::TimeUnit) -> &str {
match u {
basic::TimeUnit::MILLIS(_) => "ms",
basic::TimeUnit::MICROS(_) => "µs",
basic::TimeUnit::NANOS(_) => "ns",
}
}
fn format_logical_type(t: &LogicalType) -> String {
match t {
LogicalType::Decimal { scale, precision } => format!("Decimal({}, {})", precision, scale),
LogicalType::Time { is_adjusted_to_u_t_c, unit } =>
format!("Time({}, utc={:?})", format_time_unit(unit), is_adjusted_to_u_t_c),
LogicalType::Timestamp { is_adjusted_to_u_t_c, unit } =>
format!("Timestamp({}, utc={:?})", format_time_unit(unit), is_adjusted_to_u_t_c),
LogicalType::Integer { bit_width, is_signed } => {
let sign = if *is_signed { "" } else { "U" };
format!("{}Int{}", sign, bit_width)
}
_ => format!("{:?}", t)
}
}
let basic_info = schema.get_basic_info();
let mut additional_info =
basic_info.logical_type().map(|lt| format!("{:?}", lt))
let logical_type =
basic_info.logical_type().map(|lt| format_logical_type(&lt))
.or_else(||
match basic_info.converted_type() {
ConvertedType::NONE => None,
c => Some(c.to_string())
}
).unwrap_or("".to_string());
);

match schema {
ParquetType::PrimitiveType { basic_info, physical_type, type_length, scale, precision } => {
if *precision > 0 {
let (primary_type, mut additional_info) = match logical_type {
Some(lt) => (lt, physical_type.to_string()),
None => (physical_type.to_string(), "".to_owned())
};

if *precision > 0 && !matches!(basic_info.logical_type(), Some(LogicalType::Decimal { .. })) {
additional_info += &format!(" precision: {}", precision);
}
if *scale > 0 {
if *scale > 0 && !matches!(basic_info.logical_type(), Some(LogicalType::Decimal { .. })) {
additional_info += &format!(" scale: {}", scale);
}
if !additional_info.trim().is_empty() {
additional_info = format!(" ({})", additional_info.trim());
}
let byte_size = if *type_length >= 0 { format!(" [{}b]", *type_length) } else { "".to_string() };

format!("{} {}: {}{}{}", basic_info.repetition(), basic_info.name(), physical_type, byte_size, additional_info)
format!("{} {}: {}{}{}",
basic_info.repetition().to_string().to_lowercase(),
basic_info.name(),
primary_type,
byte_size,
additional_info)
},
ParquetType::GroupType { basic_info, fields } => {
let mut additional_info = logical_type.unwrap_or_else(|| "".to_owned());
let fields_str = fields.iter()
.map(|f| "\t".repeat(indent as usize) + " * " + &format_schema(f, indent + 1))
.collect::<Vec<_>>();

let rp = if basic_info.has_repetition() && basic_info.repetition() != Repetition::REQUIRED { basic_info.repetition().to_string() + " " } else { "".to_string() };
let rp = if basic_info.has_repetition() && basic_info.repetition() != Repetition::REQUIRED { basic_info.repetition().to_string().to_lowercase() + " " } else { "".to_string() };

if !additional_info.trim().is_empty() {
additional_info = format!(" ({})", additional_info.trim());
Expand Down Expand Up @@ -286,14 +318,10 @@ fn map_schema_column<TRow: PgAbstractRow + Clone + 'static>(
*mapping.get(&e.name).unwrap_or_else(|| panic!("Could not map enum value {}. Was new enum case added while pg2parquet is running?", &e.name))
))
},
SchemaSettingsEnumHandling::Text | SchemaSettingsEnumHandling::PlainText => {
let logical_type = if settings.enum_handling == SchemaSettingsEnumHandling::Text {
LogicalType::Enum
} else {
LogicalType::String
};
Ok(resolve_primitive::<PgEnum, ByteArrayType, _>(c.col_name(), c, Some(logical_type), None))
},
SchemaSettingsEnumHandling::Text =>
Ok(resolve_primitive::<PgEnum, ByteArrayType, _>(c.col_name(), c, Some(LogicalType::Enum), None)),
SchemaSettingsEnumHandling::PlainText =>
Ok(resolve_primitive::<PgEnum, ByteArrayType, _>(c.col_name(), c, Some(LogicalType::String), None)),
}
Kind::Array(ref element_type) => {
let list_column = c.nest("list", 0).as_array();
Expand Down Expand Up @@ -409,7 +437,7 @@ fn map_simple_type<TRow: PgAbstractRow + Clone + 'static>(
"char" => resolve_primitive::<i8, Int32Type, _>(name, c, Some(LogicalType::Integer { bit_width: 8, is_signed: false }), None),
"bytea" => resolve_primitive::<Vec<u8>, ByteArrayType, _>(name, c, None, None),
"name" | "text" | "xml" | "bpchar" | "varchar" | "citext" =>
resolve_primitive::<String, ByteArrayType, _>(name, c, None, Some(ConvertedType::UTF8)),
resolve_primitive::<String, ByteArrayType, _>(name, c, Some(LogicalType::String), Some(ConvertedType::UTF8)),
"jsonb" | "json" =>
resolve_primitive::<PgRawJsonb, ByteArrayType, _>(name, c, Some(match s.json_handling {
SchemaSettingsJsonHandling::Text => LogicalType::String,
Expand Down

0 comments on commit ffcc01c

Please sign in to comment.