Skip to content

Commit

Permalink
catalog: Move builtin types to correct schema (#23842)
Browse files Browse the repository at this point in the history
Previously, we were accidentally putting all types in the pg_catalog
schema. This commit moves the unsigned types and mz_timestamp to the
mz_catalog schema and the acl_item type to the mz_internal schema.

Fixes #23833
  • Loading branch information
jkosh44 authored Dec 16, 2023
1 parent 9c5df2a commit f2a47ad
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 56 deletions.
52 changes: 52 additions & 0 deletions misc/python/materialize/checks/all_checks/materialize_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check


class MaterializeType(Check):
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
> CREATE view v1 (a) AS SELECT 1::mz_timestamp;
"""
)
)

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent(s))
for s in [
"""
> CREATE VIEW v2 (a) AS SELECT 1::uint4;
""",
"""
> CREATE VIEW v3 (a) AS SELECT 1::uint8;
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT * FROM v1;
1
> SELECT * FROM v2;
1
> SELECT * FROM v3;
1
"""
)
)
11 changes: 1 addition & 10 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5392,16 +5392,7 @@ mod tests {
let catalog = Arc::new(catalog);
let conn_catalog = catalog.for_system_session();

let resolve_type_oid = |item: &str| {
conn_catalog
.resolve_type(&PartialItemName {
database: None,
schema: Some(PG_CATALOG_SCHEMA.into()),
item: item.to_string(),
})
.unwrap_or_else(|_| panic!("unable to resolve type: {item}"))
.oid()
};
let resolve_type_oid = |item: &str| conn_catalog.state().get_system_type(item).oid();
let mut handles = Vec::new();

// Extracted during planning; always panics when executed.
Expand Down
51 changes: 49 additions & 2 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
use std::collections::BTreeMap;

use futures::future::BoxFuture;
use mz_catalog::builtin::{BuiltinType, BUILTINS};
use mz_catalog::durable::Transaction;
use mz_ore::collections::CollectionExt;
use mz_ore::now::{EpochMillis, NowFn};
use mz_repr::namespaces::PG_CATALOG_SCHEMA;
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{Raw, Statement};
use mz_sql::catalog::NameReference;
use mz_sql_parser::ast::{visit_mut, Ident, Raw, RawDataType, Statement};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::sources::GenericSourceConnection;
use once_cell::sync::Lazy;
use semver::Version;
use tracing::info;

Expand Down Expand Up @@ -86,10 +90,10 @@ pub(crate) async fn migrate(
//
// Migration functions may also take `tx` as input to stage
// arbitrary changes to the catalog.

if catalog_version <= Version::new(0, 79, u64::MAX) {
ast_rewrite_create_sink_into_kafka_options_0_80_0(stmt)?;
}
ast_rewrite_rewrite_type_schemas_0_81_0(stmt);

Ok(())
})
Expand Down Expand Up @@ -277,6 +281,49 @@ fn ast_rewrite_create_sink_into_kafka_options_0_80_0(
Ok(())
}

/// Rewrite all non-`pg_catalog` system types to have the correct schema.
fn ast_rewrite_rewrite_type_schemas_0_81_0(stmt: &mut Statement<Raw>) {
use mz_sql::ast::visit_mut::VisitMut;

static NON_PG_CATALOG_TYPES: Lazy<BTreeMap<&'static str, &'static BuiltinType<NameReference>>> =
Lazy::new(|| {
BUILTINS::types()
.filter(|typ| typ.schema != PG_CATALOG_SCHEMA)
.map(|typ| (typ.name, typ))
.collect()
});

struct Rewriter;

impl<'ast> VisitMut<'ast, Raw> for Rewriter {
fn visit_data_type_mut(&mut self, node: &'ast mut RawDataType) {
match node {
RawDataType::Array(_) => {}
RawDataType::List(_) => {}
RawDataType::Map { .. } => {}
RawDataType::Other { name, .. } => {
let name = name.name_mut();
let name = &mut name.0;
let len = name.len();
if len >= 2 {
let item_name = &name[len - 1];
let schema_name = &name[len - 2];

if schema_name.as_str() == PG_CATALOG_SCHEMA {
if let Some(typ) = NON_PG_CATALOG_TYPES.get(item_name.as_str()) {
name[len - 2] = Ident::new_unchecked(typ.schema);
}
}
}
}
}
visit_mut::visit_data_type_mut(self, node);
}
}

Rewriter.visit_statement_mut(stmt);
}

fn _add_to_audit_log(
tx: &mut Transaction,
event_type: mz_audit_log::EventType,
Expand Down
5 changes: 3 additions & 2 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1191,7 +1191,6 @@ impl Catalog {
_ => {}
}
}
let pg_catalog_schema_id = state.get_pg_catalog_schema_id().clone();
for typ in &mut builtin_types {
let element_id = name_to_id_map[typ.name];
typ.details.array_id = element_id_to_array_id.get(&element_id).map(|id| id.clone());
Expand All @@ -1207,13 +1206,15 @@ impl Catalog {
let desc = None;
assert!(!matches!(typ.details.typ, CatalogType::Record { .. }));

let schema_id = state.resolve_system_schema(typ.schema);

state.insert_item(
element_id,
typ.oid,
QualifiedItemName {
qualifiers: ItemQualifiers {
database_spec: ResolvedDatabaseSpecifier::Ambient,
schema_spec: SchemaSpecifier::Id(pg_catalog_schema_id),
schema_spec: SchemaSpecifier::Id(schema_id),
},
item: typ.name.to_owned(),
},
Expand Down
23 changes: 15 additions & 8 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1626,30 +1626,30 @@ impl CatalogState {
Ok(())
}

/// Optimized lookup for a builtin table
/// Optimized lookup for a builtin table.
///
/// Panics if the builtin table doesn't exist in the catalog
/// Panics if the builtin table doesn't exist in the catalog.
pub fn resolve_builtin_table(&self, builtin: &'static BuiltinTable) -> GlobalId {
self.resolve_builtin_object(&Builtin::<IdReference>::Table(builtin))
}

/// Optimized lookup for a builtin log
/// Optimized lookup for a builtin log.
///
/// Panics if the builtin log doesn't exist in the catalog
/// Panics if the builtin log doesn't exist in the catalog.
pub fn resolve_builtin_log(&self, builtin: &'static BuiltinLog) -> GlobalId {
self.resolve_builtin_object(&Builtin::<IdReference>::Log(builtin))
}

/// Optimized lookup for a builtin storage collection
/// Optimized lookup for a builtin storage collection.
///
/// Panics if the builtin storage collection doesn't exist in the catalog
/// Panics if the builtin storage collection doesn't exist in the catalog.
pub fn resolve_builtin_source(&self, builtin: &'static BuiltinSource) -> GlobalId {
self.resolve_builtin_object(&Builtin::<IdReference>::Source(builtin))
}

/// Optimized lookup for a builtin object
/// Optimized lookup for a builtin object.
///
/// Panics if the builtin object doesn't exist in the catalog
/// Panics if the builtin object doesn't exist in the catalog.
pub fn resolve_builtin_object<T: TypeReference>(&self, builtin: &Builtin<T>) -> GlobalId {
let schema_id = &self.ambient_schemas_by_name[builtin.schema()];
let schema = &self.ambient_schemas_by_id[schema_id];
Expand Down Expand Up @@ -1706,6 +1706,13 @@ impl CatalogState {
Err(SqlCatalogError::UnknownSchema(schema_name.into()))
}

/// Optimized lookup for a system schema.
///
/// Panics if the system schema doesn't exist in the catalog.
pub fn resolve_system_schema(&self, name: &str) -> SchemaId {
self.ambient_schemas_by_name[name]
}

pub fn resolve_search_path(
&self,
session: &Session,
Expand Down
36 changes: 29 additions & 7 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ pub const TYPE_ANYCOMPATIBLERANGE: BuiltinType<NameReference> = BuiltinType {

pub const TYPE_LIST: BuiltinType<NameReference> = BuiltinType {
name: "list",
schema: PG_CATALOG_SCHEMA,
schema: MZ_CATALOG_SCHEMA,
oid: mz_pgrepr::oid::TYPE_LIST_OID,
details: CatalogTypeDetails {
typ: CatalogType::Pseudo,
Expand All @@ -1270,7 +1270,7 @@ pub const TYPE_LIST: BuiltinType<NameReference> = BuiltinType {

pub const TYPE_MAP: BuiltinType<NameReference> = BuiltinType {
name: "map",
schema: PG_CATALOG_SCHEMA,
schema: MZ_CATALOG_SCHEMA,
oid: mz_pgrepr::oid::TYPE_MAP_OID,
details: CatalogTypeDetails {
typ: CatalogType::Pseudo,
Expand All @@ -1281,7 +1281,7 @@ pub const TYPE_MAP: BuiltinType<NameReference> = BuiltinType {

pub const TYPE_ANYCOMPATIBLELIST: BuiltinType<NameReference> = BuiltinType {
name: "anycompatiblelist",
schema: PG_CATALOG_SCHEMA,
schema: MZ_CATALOG_SCHEMA,
oid: mz_pgrepr::oid::TYPE_ANYCOMPATIBLELIST_OID,
details: CatalogTypeDetails {
typ: CatalogType::Pseudo,
Expand All @@ -1292,7 +1292,7 @@ pub const TYPE_ANYCOMPATIBLELIST: BuiltinType<NameReference> = BuiltinType {

pub const TYPE_ANYCOMPATIBLEMAP: BuiltinType<NameReference> = BuiltinType {
name: "anycompatiblemap",
schema: PG_CATALOG_SCHEMA,
schema: MZ_CATALOG_SCHEMA,
oid: mz_pgrepr::oid::TYPE_ANYCOMPATIBLEMAP_OID,
details: CatalogTypeDetails {
typ: CatalogType::Pseudo,
Expand Down Expand Up @@ -2886,17 +2886,17 @@ pub const MZ_OBJECTS: BuiltinView = BuiltinView {
sql:
"SELECT id, oid, schema_id, name, type, owner_id, privileges FROM mz_catalog.mz_relations
UNION ALL
SELECT id, oid, schema_id, name, 'sink', owner_id, NULL::mz_aclitem[] FROM mz_catalog.mz_sinks
SELECT id, oid, schema_id, name, 'sink', owner_id, NULL::mz_catalog.mz_aclitem[] FROM mz_catalog.mz_sinks
UNION ALL
SELECT mz_indexes.id, mz_indexes.oid, mz_relations.schema_id, mz_indexes.name, 'index', mz_indexes.owner_id, NULL::mz_aclitem[]
SELECT mz_indexes.id, mz_indexes.oid, mz_relations.schema_id, mz_indexes.name, 'index', mz_indexes.owner_id, NULL::mz_catalog.mz_aclitem[]
FROM mz_catalog.mz_indexes
JOIN mz_catalog.mz_relations ON mz_indexes.on_id = mz_relations.id
UNION ALL
SELECT id, oid, schema_id, name, 'connection', owner_id, privileges FROM mz_catalog.mz_connections
UNION ALL
SELECT id, oid, schema_id, name, 'type', owner_id, privileges FROM mz_catalog.mz_types
UNION ALL
SELECT id, oid, schema_id, name, 'function', owner_id, NULL::mz_aclitem[] FROM mz_catalog.mz_functions
SELECT id, oid, schema_id, name, 'function', owner_id, NULL::mz_catalog.mz_aclitem[] FROM mz_catalog.mz_functions
UNION ALL
SELECT id, oid, schema_id, name, 'secret', owner_id, privileges FROM mz_catalog.mz_secrets",
sensitivity: DataSensitivity::Public,
Expand Down Expand Up @@ -6452,3 +6452,25 @@ pub mod BUILTINS {
BUILTINS_STATIC.iter()
}
}

#[mz_ore::test]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
fn test_builtin_type_schema() {
use mz_pgrepr::oid::FIRST_MATERIALIZE_OID;

for typ in BUILTINS::types() {
if typ.oid < FIRST_MATERIALIZE_OID {
assert_eq!(
typ.schema, PG_CATALOG_SCHEMA,
"{typ:?} should be in {PG_CATALOG_SCHEMA} schema"
);
} else {
// `mz_pgrepr::Type` resolution relies on all non-PG types existing in the mz_catalog
// schema.
assert_eq!(
typ.schema, MZ_CATALOG_SCHEMA,
"{typ:?} should be in {MZ_CATALOG_SCHEMA} schema"
);
}
}
}
11 changes: 9 additions & 2 deletions src/sql/src/plan/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod validate;

use crate::session::vars;
pub(crate) use ddl::PgConfigOptionExtracted;
use mz_pgrepr::oid::{FIRST_MATERIALIZE_OID, FIRST_USER_OID};
use mz_repr::role_id::RoleId;

/// Describes the output of a SQL statement.
Expand Down Expand Up @@ -764,11 +765,17 @@ impl<'a> StatementContext<'a> {
// representation on `pgrepr::Type` promises to
// produce an unqualified type name that does
// not require quoting.
//
let mut ty = if ty.oid() >= FIRST_USER_OID {
sql_bail!("internal error, unexpected user type: {ty:?} ");
} else if ty.oid() < FIRST_MATERIALIZE_OID {
format!("pg_catalog.{}", ty)
} else {
// This relies on all non-PG types existing in `mz_catalog`, which is annoying.
format!("mz_catalog.{}", ty)
};
// TODO(benesch): converting `json` to `jsonb`
// is wrong. We ought to support the `json` type
// directly.
let mut ty = format!("pg_catalog.{}", ty);
if ty == "pg_catalog.json" {
ty = "pg_catalog.jsonb".into();
}
Expand Down
12 changes: 6 additions & 6 deletions test/sqllogictest/aclitem.slt
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ SELECT mz_internal.mz_aclexplode(ARRAY[mz_internal.make_mz_aclitem('u1', 'u2', '
(u2,u1,SELECT,f)

query error MZ_ACL arrays must not contain null values
SELECT mz_internal.mz_aclexplode(array[null]::mz_aclitem[]);
SELECT mz_internal.mz_aclexplode(array[null]::mz_catalog.mz_aclitem[]);

# Test casting to/from aclitem and mz_aclitem

Expand All @@ -324,12 +324,12 @@ SELECT mz_internal.make_mz_aclitem('p', 'u2', 'CREATE, USAGE')::aclitem = makeac
true

query B
SELECT makeaclitem((SELECT oid FROM mz_roles WHERE name = 'materialize'), (SELECT oid FROM mz_roles WHERE name = 'test_role'), 'CREATE, USAGE', false)::mz_aclitem = mz_internal.make_mz_aclitem('u1', 'u2', 'CREATE, USAGE')
SELECT makeaclitem((SELECT oid FROM mz_roles WHERE name = 'materialize'), (SELECT oid FROM mz_roles WHERE name = 'test_role'), 'CREATE, USAGE', false)::mz_catalog.mz_aclitem = mz_internal.make_mz_aclitem('u1', 'u2', 'CREATE, USAGE')
----
true

query B
SELECT makeaclitem(0, (SELECT oid FROM mz_roles WHERE name = 'test_role'), 'CREATE, USAGE', false)::mz_aclitem = mz_internal.make_mz_aclitem('p', 'u2', 'CREATE, USAGE')
SELECT makeaclitem(0, (SELECT oid FROM mz_roles WHERE name = 'test_role'), 'CREATE, USAGE', false)::mz_catalog.mz_aclitem = mz_internal.make_mz_aclitem('p', 'u2', 'CREATE, USAGE')
----
true

Expand All @@ -349,17 +349,17 @@ SELECT mz_internal.make_mz_aclitem('u3251', 's345', 'CREATE')::aclitem::text
NULL

query T
SELECT makeaclitem(99991, (SELECT oid FROM mz_roles WHERE name = 'test_role'), 'CREATE', false)::mz_aclitem
SELECT makeaclitem(99991, (SELECT oid FROM mz_roles WHERE name = 'test_role'), 'CREATE', false)::mz_catalog.mz_aclitem
----
NULL

query T
SELECT makeaclitem((SELECT oid FROM mz_roles WHERE name = 'materialize'), 87398, 'CREATE', false)::mz_aclitem
SELECT makeaclitem((SELECT oid FROM mz_roles WHERE name = 'materialize'), 87398, 'CREATE', false)::mz_catalog.mz_aclitem
----
NULL

query T
SELECT makeaclitem(3251, 345, 'CREATE', false)::mz_aclitem
SELECT makeaclitem(3251, 345, 'CREATE', false)::mz_catalog.mz_aclitem
----
NULL

Expand Down
2 changes: 1 addition & 1 deletion test/sqllogictest/list.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2023,7 +2023,7 @@ SELECT '{{1,2}}'::int4_list_list_c::text;
query error type "bool list" does not exist
CREATE TYPE nested_list AS LIST (ELEMENT TYPE = "bool list")

query error db error: ERROR: cannot reference pseudo type pg_catalog\.list
query error db error: ERROR: cannot reference pseudo type mz_catalog\.list
CREATE TYPE nested_list AS LIST (ELEMENT TYPE = list)

# 🔬🔬 Check each valid non-array element type
Expand Down
Loading

0 comments on commit f2a47ad

Please sign in to comment.