[Benchmark] Support index field in connectors
Adds index support for mutable, DuckDB, and PostgreSQL. HyPer is not yet
supported because index support is currently disabled in
`tableauhyperapi`. The `method` field is ignored by all connectors except
mutable.
marcelmaltry committed Feb 16, 2024
1 parent 1d37833 commit 68fce79
Showing 5 changed files with 67 additions and 2 deletions.
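For reference, a table entry in the benchmark description can now carry an `indexes` field that maps index names to their specification; each specification holds a required `attributes` entry (a string or a list of strings) and an optional `method` entry. A hypothetical example, shaped exactly as the connectors below consume it (all names made up):

indexes = {
    'orders_date_idx': {
        'method': 'array',                          # optional; honored only by mutable
        'attributes': ['o_orderdate', 'o_custkey'], # str or list[str]
    },
}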
22 changes: 22 additions & 0 deletions benchmark/database_connectors/connector.py
@@ -111,6 +111,28 @@ def check_with_scale_factors(params: dict[str, Any]) -> bool:
return False


# Generates statements for creating indexes.
@staticmethod
def generate_create_index_stmts(table_name: str, indexes: dict[str, dict[str, Any]]) -> list[str]:
create_indexes: list[str] = list()
for index_name, index in indexes.items():
method: str | None = index.get('method') # ignored
attributes: str | list[str] = index['attributes']
if isinstance(attributes, list):
attributes = ', '.join(attributes)
create_indexes.append(f'CREATE INDEX {index_name} ON "{table_name}" ({attributes});')
return create_indexes


# Generates statements for dropping indexes.
@staticmethod
def generate_drop_index_stmts(indexes: dict[str, dict[str, Any]]) -> list[str]:
drop_indexes: list[str] = list()
for index_name in indexes:
drop_indexes.append(f'DROP INDEX IF EXISTS {index_name};')
return drop_indexes
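As a quick illustration, with a hypothetical table and index name the two helpers above yield the following (assuming the enclosing class is the Connector ABC):

Connector.generate_create_index_stmts('orders', {'orders_date_idx': {'attributes': ['o_orderdate', 'o_custkey']}})
# -> ['CREATE INDEX orders_date_idx ON "orders" (o_orderdate, o_custkey);']
Connector.generate_drop_index_stmts({'orders_date_idx': {'attributes': ['o_orderdate', 'o_custkey']}})
# -> ['DROP INDEX IF EXISTS orders_date_idx;']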


#===================================================================================================================
# Start the shell with `command` and pass `query` to its stdin. If the process does not respond after `timeout`
# milliseconds, raise an ExperimentTimeoutExpired(). Return the stdout of the process containing the
9 changes: 8 additions & 1 deletion benchmark/database_connectors/duckdb.py
@@ -82,6 +82,8 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
# Create tables from tmp tables with scale factor
for table_name, table in params['data'].items():
statements.append(f'DELETE FROM "{table_name}";') # empty existing table
drop_indexes: list[str] = self.generate_drop_index_stmts(table.get('indexes', dict()))
statements.extend(drop_indexes) # drop indexes
sf: float | int
if table.get('scale_factors') is not None:
scale_factors = table['scale_factors']
@@ -96,6 +98,8 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
header: int = int(table.get('header', 0))
num_rows: int = round((table['lines_in_file'] - header) * sf)
statements.append(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}{COMPLETE_TABLE_SUFFIX}" LIMIT {num_rows};')
create_indexes: list[str] = self.generate_create_index_stmts(table_name, table.get('indexes', dict()))
statements.extend(create_indexes) # create indexes

statements.append(".timer on")
statements.append(query_stmt) # Actual query from this case
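For a hypothetical table 'orders' with the index from above, a scale factor of 0.5, and 1,000,000 data lines without a header, the setup sketched here would assemble roughly this statement list (COMPLETE_TABLE_SUFFIX is the module's constant, left symbolic; the query is made up):

statements = [
    'DELETE FROM "orders";',
    'DROP INDEX IF EXISTS orders_date_idx;',
    f'INSERT INTO "orders" SELECT * FROM "orders{COMPLETE_TABLE_SUFFIX}" LIMIT 500000;',
    'CREATE INDEX orders_date_idx ON "orders" (o_orderdate, o_custkey);',
    '.timer on',
    'SELECT COUNT(*) FROM "orders";',
]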
@@ -130,13 +134,16 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
statements.extend(complete_tables)
statements.extend(actual_tables)

# Dropping and recreating tables in between runs removes any cache influences
# Dropping and recreating tables and indexes in between runs removes any cache influences
refill_stmts: list[str] = list()
for name, table in params['data'].items():
refill_stmts.append(f'DROP TABLE "{name}";')
# Dropping a table also drops its indexes
refill_stmts.extend(actual_tables)
for name, table in params['data'].items():
refill_stmts.append(f'INSERT INTO "{name}" (SELECT * FROM "{name}{COMPLETE_TABLE_SUFFIX}");')
create_indexes: list[str] = self.generate_create_index_stmts(name, table.get('indexes', dict()))
refill_stmts.extend(create_indexes)

for _ in range(n_runs):
statements.extend(refill_stmts)
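For the same hypothetical table, every run is then preceded by a refill sequence along these lines; the CREATE TABLE statement comes from `actual_tables` and its schema is elided:

refill_stmts = [
    'DROP TABLE "orders";',  # implicitly drops orders_date_idx
    'CREATE TABLE "orders" (...);',
    f'INSERT INTO "orders" (SELECT * FROM "orders{COMPLETE_TABLE_SUFFIX}");',
    'CREATE INDEX orders_date_idx ON "orders" (o_orderdate, o_custkey);',
]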
8 changes: 8 additions & 0 deletions benchmark/database_connectors/hyper.py
@@ -92,6 +92,10 @@ def _execute(n_runs: int, params: dict[str, Any]) -> ConnectorResult:
# Set up tables
for table_name, table in params['data'].items():
connection.execute_command(f'DELETE FROM "{table_name}";') # Empty table first
# Index support is currently disabled in tableauhyperapi
# drop_indexes: list[str] = HyPer.generate_drop_index_stmts(table.get('indexes', dict()))
# for stmt in drop_indexes:
# connection.execute_command(stmt) # Drop indexes

sf: float | int
if table.get('scale_factors') is not None:
@@ -107,6 +111,10 @@ def _execute(n_runs: int, params: dict[str, Any]) -> ConnectorResult:
header: int = int(table.get('header', 0))
num_rows: int = round((table['lines_in_file'] - header) * sf)
connection.execute_command(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}_tmp" LIMIT {num_rows};')
# Index support is currently disabled in tableauhyperapi
# create_indexes: list[str] = HyPer.generate_create_index_stmts(table_name, table.get('indexes', dict()))
# for stmt in create_indexes:
# connection.execute_command(stmt) # Create indexes

# Execute query
with connection.execute_query(query) as result:
21 changes: 21 additions & 0 deletions benchmark/database_connectors/mutable.py
@@ -257,6 +257,10 @@ def get_setup_statements(self, suite: str, path_to_data: str, data: dict[str, di
import_str += ' HAS HEADER SKIP HEADER'
statements.append(import_str + ';')

# Generate CREATE INDEX statements for the current table
create_indexes: list[str] = self.generate_create_index_stmts(table_name, table.get('indexes', dict()))
statements.extend(create_indexes)

return statements


@@ -283,6 +287,23 @@ def parse_results(results: str, pattern: str) -> list[float]:
return durations


# Overrides `generate_create_index_stmts` from Connector ABC
@staticmethod
def generate_create_index_stmts(table_name: str, indexes: dict[str, dict[str, Any]]) -> list[str]:
create_indexes: list[str] = list()
for index_name, index in indexes.items():
method: str | None = index.get('method')
attributes: str | list[str] = index['attributes']
if isinstance(attributes, list):
attributes = ', '.join(attributes)
index_str: str = f'CREATE INDEX {index_name} ON {table_name}'
if method:
index_str += f' USING {method}'
index_str += f' ({attributes});'
create_indexes.append(index_str)
return create_indexes
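With a `method` given, this override produces, for example (class name and method value are assumptions):

Mutable.generate_create_index_stmts('orders', {'orders_date_idx': {'method': 'array', 'attributes': 'o_orderdate'}})
# -> ['CREATE INDEX orders_date_idx ON orders USING array (o_orderdate);']

Note that, unlike the base implementation, the table name is not quoted here.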


# Overrides `print_command` from Connector ABC
def print_command(self, command: str | bytes | Sequence[str | bytes], query: str, indent: str = '') -> None:
assert isinstance(command, Sequence) and isinstance(command[0], str) and not isinstance(command, str), \
9 changes: 8 additions & 1 deletion benchmark/database_connectors/postgresql.py
@@ -93,7 +93,11 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
header: int = int(table.get('header', 0))
num_rows: int = round((table['lines_in_file'] - header) * sf)
cursor.execute(f'DELETE FROM "{table_name}";') # empty existing table
drop_indexes: list[str] = self.generate_drop_index_stmts(table.get('indexes', dict()))
if drop_indexes:  # psycopg raises ProgrammingError on an empty query string
    cursor.execute(''.join(drop_indexes))
cursor.execute(f'INSERT INTO "{table_name}" SELECT * FROM "{table_name}{COMPLETE_TABLE_SUFFIX}" LIMIT {num_rows};') # copy data with scale factor
create_indexes: list[str] = self.generate_create_index_stmts(table_name, table.get('indexes', dict()))
if create_indexes:
    cursor.execute(''.join(create_indexes))
finally:
connection.close()
del connection
@@ -129,13 +133,16 @@ def execute(self, n_runs: int, params: dict[str, Any]) -> ConnectorResult:
# Prepare db
actual_tables: list[str] = self.prepare_db(params)

# Dropping and recreating tables in between runs removes any cache influences
# Dropping and recreating tables and indexes in between runs removes any cache influences
refill_stmts: list[str] = list()
for name, table in params['data'].items():
refill_stmts.append(f'DROP TABLE "{name}";')
# Dropping a table also drops its indexes
refill_stmts.extend(actual_tables)
for name, table in params['data'].items():
refill_stmts.append(f'INSERT INTO "{name}" (SELECT * FROM "{name}{COMPLETE_TABLE_SUFFIX}");')
create_indexes: list[str] = self.generate_create_index_stmts(name, table.get('indexes', dict()))
refill_stmts.extend(create_indexes)

# Write cases/queries to a file that will be passed to the command to execute
with open(TMP_SQL_FILE, "w") as tmp: