From 68fce79eeeceb81c1a8cb2443ae1b1f249696a4f Mon Sep 17 00:00:00 2001 From: Marcel Maltry Date: Wed, 14 Feb 2024 14:58:09 +0100 Subject: [PATCH] [Benchmark] Support index field in connectors Index support added for mutable, DuckDB, and PostgreSQL. Notably, HyPer is not yet supported as index support is currently disabled in `tableauhyperapi`. The method field is ignored by all databases but mutable. --- benchmark/database_connectors/connector.py | 22 +++++++++++++++++++++ benchmark/database_connectors/duckdb.py | 9 ++++++++- benchmark/database_connectors/hyper.py | 8 ++++++++ benchmark/database_connectors/mutable.py | 21 ++++++++++++++++++++ benchmark/database_connectors/postgresql.py | 9 ++++++++- 5 files changed, 67 insertions(+), 2 deletions(-) diff --git a/benchmark/database_connectors/connector.py b/benchmark/database_connectors/connector.py index 3aa648aa..b2f49477 100644 --- a/benchmark/database_connectors/connector.py +++ b/benchmark/database_connectors/connector.py @@ -111,6 +111,28 @@ def check_with_scale_factors(params: dict[str, Any]) -> bool: return False + # Generates statements for creating indexes. + @staticmethod + def generate_create_index_stmts(table_name: str, indexes: dict[str, dict[str, Any]]) -> list[str]: + create_indexes: list[str] = list() + for index_name, index in indexes.items(): + method: str | None = index.get('method') # ignored + attributes: str | list[str] = index['attributes'] + if isinstance(attributes, list): + attributes = ', '.join(attributes) + create_indexes.append(f'CREATE INDEX {index_name} ON "{table_name}" ({attributes});') + return create_indexes + + + # Generates statements for dropping indexes. 
+ @staticmethod + def generate_drop_index_stmts(indexes: dict[str, dict[str, Any]]) -> list[str]: + drop_indexes: list[str] = list() + for index_name, _ in indexes.items(): + drop_indexes.append(f'DROP INDEX IF EXISTS {index_name};') + return drop_indexes + + #=================================================================================================================== # Start the shell with `command` and pass `query` to its stdin. If the process does not respond after `timeout` # milliseconds, raise a ExperimentTimeoutExpired(). Return the stdout of the process containing the diff --git a/benchmark/database_connectors/duckdb.py b/benchmark/database_connectors/duckdb.py index 8be762f6..b67a63a0 100644 --- a/benchmark/database_connectors/duckdb.py +++ b/benchmark/database_connectors/duckdb.py @@ -82,6 +82,8 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult: # Create tables from tmp tables with scale factor for table_name, table in params['data'].items(): statements.append(f'DELETE FROM "{table_name}";') # empty existing table + drop_indexes: list[str] = self.generate_drop_index_stmts(table.get('indexes', dict())) + statements.extend(drop_indexes) # drop indexes sf: float | int if table.get('scale_factors') is not None: scale_factors = table['scale_factors'] @@ -96,6 +98,8 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult: header: int = int(table.get('header', 0)) num_rows: int = round((table['lines_in_file'] - header) * sf) statements.append(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}{COMPLETE_TABLE_SUFFIX}" LIMIT {num_rows};') + create_indexes: list[str] = self.generate_create_index_stmts(table_name, table.get('indexes', dict())) + statements.extend(create_indexes) # create indexes statements.append(".timer on") statements.append(query_stmt) # Actual query from this case @@ -130,13 +134,16 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult: 
statements.extend(complete_tables) statements.extend(actual_tables) - # Dropping and recreating tables in between runs removes any cache influences + # Dropping and recreating tables and indexes in between runs removes any cache influences refill_stmts: list[str] = list() for name, table in params['data'].items(): refill_stmts.append(f'DROP TABLE "{name}";') + # Dropping a table also drops its indexes refill_stmts.extend(actual_tables) for name, table in params['data'].items(): refill_stmts.append(f'INSERT INTO "{name}" (SELECT * FROM "{name}{COMPLETE_TABLE_SUFFIX}");') + create_indexes: list[str] = self.generate_create_index_stmts(name, table.get('indexes', dict())) + refill_stmts.extend(create_indexes) for _ in range(n_runs): statements.extend(refill_stmts) diff --git a/benchmark/database_connectors/hyper.py b/benchmark/database_connectors/hyper.py index a9037377..93251e05 100644 --- a/benchmark/database_connectors/hyper.py +++ b/benchmark/database_connectors/hyper.py @@ -92,6 +92,10 @@ def _execute(n_runs: int, params: dict[str, Any]) -> ConnectorResult: # Set up tables for table_name, table in params['data'].items(): connection.execute_command(f'DELETE FROM "{table_name}";') # Empty table first + # Index support is currently disabled in tableauhyperapi + # drop_indexes: list[str] = HyPer.generate_drop_index_stmts(table.get('indexes', dict())) + # for stmt in drop_indexes: + # connection.execute_command(stmt) # Drop indexes sf: float | int if table.get('scale_factors') is not None: @@ -107,6 +111,10 @@ def _execute(n_runs: int, params: dict[str, Any]) -> ConnectorResult: header: int = int(table.get('header', 0)) num_rows: int = round((table['lines_in_file'] - header) * sf) connection.execute_command(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}_tmp" LIMIT {num_rows};') + # Index support is currently disabled in tableauhyperapi + # create_indexes: list[str] = HyPer.generate_create_index_stmts(table_name, table.get('indexes', dict())) + # for stmt in 
create_indexes: + # connection.execute_command(stmt) # Create indexes # Execute query with connection.execute_query(query) as result: diff --git a/benchmark/database_connectors/mutable.py b/benchmark/database_connectors/mutable.py index 76c8292a..cdab6b9a 100644 --- a/benchmark/database_connectors/mutable.py +++ b/benchmark/database_connectors/mutable.py @@ -257,6 +257,10 @@ def get_setup_statements(self, suite: str, path_to_data: str, data: dict[str, di import_str += ' HAS HEADER SKIP HEADER' statements.append(import_str + ';') + # Create CREATE INDEX statements for current table + create_indexes: list[str] = self.generate_create_index_stmts(table_name, table.get('indexes', dict())) + statements.extend(create_indexes) + return statements @@ -283,6 +287,23 @@ def parse_results(results: str, pattern: str) -> list[float]: return durations + # Overrides `generate_create_index_stmts` from Connector ABC + @staticmethod + def generate_create_index_stmts(table_name: str, indexes: dict[str, dict[str, Any]]) -> list[str]: + create_indexes: list[str] = list() + for index_name, index in indexes.items(): + method: str | None = index.get('method') + attributes: str | list[str] = index['attributes'] + if isinstance(attributes, list): + attributes = ', '.join(attributes) + index_str: str = f'CREATE INDEX {index_name} ON {table_name}' + if method: + index_str += f' USING {method}' + index_str += f' ({attributes});' + create_indexes.append(index_str) + return create_indexes + + # Overrides `print_command` from Connector ABC def print_command(self, command: str | bytes | Sequence[str | bytes], query: str, indent: str = '') -> None: assert isinstance(command, Sequence) and isinstance(command[0], str) and not isinstance(command, str), \ diff --git a/benchmark/database_connectors/postgresql.py b/benchmark/database_connectors/postgresql.py index 87fca930..92292263 100644 --- a/benchmark/database_connectors/postgresql.py +++ b/benchmark/database_connectors/postgresql.py @@ -93,7 +93,11 
@@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult: header: int = int(table.get('header', 0)) num_rows: int = round((table['lines_in_file'] - header) * sf) cursor.execute(f'DELETE FROM "{table_name}";') # empty existing table + drop_indexes: list[str] = self.generate_drop_index_stmts(table.get('indexes', dict())) + for stmt in drop_indexes: cursor.execute(stmt) # no-op when table defines no indexes cursor.execute(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}{COMPLETE_TABLE_SUFFIX}" LIMIT {num_rows};') # copy data with scale factor + create_indexes: list[str] = self.generate_create_index_stmts(table_name, table.get('indexes', dict())) + for stmt in create_indexes: cursor.execute(stmt) # no-op when table defines no indexes finally: connection.close() del connection @@ -129,13 +133,16 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult: # Prepare db actual_tables: list[str] = self.prepare_db(params) - # Dropping and recreating tables in between runs removes any cache influences + # Dropping and recreating tables and indexes in between runs removes any cache influences refill_stmts: list[str] = list() for name, table in params['data'].items(): refill_stmts.append(f'DROP TABLE "{name}";') + # Dropping a table also drops its indexes refill_stmts.extend(actual_tables) for name, table in params['data'].items(): refill_stmts.append(f'INSERT INTO "{name}" (SELECT * FROM "{name}{COMPLETE_TABLE_SUFFIX}");') + create_indexes: list[str] = self.generate_create_index_stmts(name, table.get('indexes', dict())) + refill_stmts.extend(create_indexes) # Write cases/queries to a file that will be passed to the command to execute with open(TMP_SQL_FILE, "w") as tmp: