From 750bafe0e6683c865d9f9d10aabd18ceddc0384e Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 9 Aug 2024 21:01:12 +0000 Subject: [PATCH] Auto-fix lint and format issues --- airbyte/_future_cdk/sql_processor.py | 52 ++++++++++++++-------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/airbyte/_future_cdk/sql_processor.py b/airbyte/_future_cdk/sql_processor.py index f157bcb7..6ec86e1d 100644 --- a/airbyte/_future_cdk/sql_processor.py +++ b/airbyte/_future_cdk/sql_processor.py @@ -444,15 +444,18 @@ def _ensure_compatible_table_schema( if column_name in existing_schema: existing_column_spec = existing_schema[column_name] if self._is_size_expansion_needed(existing_column_spec, new_column_spec): - alterations.append(self._generate_alter_column_statement(table_name, column_name, new_column_spec)) + alterations.append( + self._generate_alter_column_statement( + table_name, column_name, new_column_spec + ) + ) if alterations: self._execute_alterations(alterations) def _get_stream_schema(self, stream_name: str) -> dict: - """ - Retrieve the schema for the specified stream. - + """Retrieve the schema for the specified stream. + :param stream_name: Name of the stream :return: Dictionary of the stream's schema with column names as keys and their specifications as values """ @@ -460,9 +463,8 @@ def _get_stream_schema(self, stream_name: str) -> dict: pass def _get_table_schema(self, table_name: str) -> dict: - """ - Retrieve the schema of the specified table. - + """Retrieve the schema of the specified table. + :param table_name: Name of the table :return: Dictionary of existing schema with column names as keys and their specifications as values """ @@ -471,42 +473,41 @@ def _get_table_schema(self, table_name: str) -> dict: result = conn.execute(query).fetchall() schema = {} for row in result: - schema[row['Field']] = row + schema[row["Field"]] = row return schema def _is_size_expansion_needed(self, existing_spec: dict, new_spec: dict) -> bool: - """ - Check if the column size needs to be expanded. - + """Check if the column size needs to be expanded. + :param existing_spec: Specification of the existing column :param new_spec: Specification of the new column :return: True if size expansion is needed, False otherwise """ - existing_type = existing_spec['Type'] - new_type = new_spec['Type'] - - if '(' in existing_type and '(' in new_type: - existing_size = int(existing_type.split('(')[1].rstrip(')')) - new_size = int(new_type.split('(')[1].rstrip(')')) + existing_type = existing_spec["Type"] + new_type = new_spec["Type"] + + if "(" in existing_type and "(" in new_type: + existing_size = int(existing_type.split("(")[1].rstrip(")")) + new_size = int(new_type.split("(")[1].rstrip(")")) return new_size > existing_size return False - def _generate_alter_column_statement(self, table_name: str, column_name: str, column_spec: dict) -> str: - """ - Generate an ALTER TABLE statement for expanding column size. - + def _generate_alter_column_statement( + self, table_name: str, column_name: str, column_spec: dict + ) -> str: + """Generate an ALTER TABLE statement for expanding column size. + :param table_name: Name of the table :param column_name: Name of the column :param column_spec: New column specification :return: ALTER TABLE statement as a string """ - new_type = column_spec['Type'] + new_type = column_spec["Type"] return f"ALTER TABLE {table_name} MODIFY {column_name} {new_type}" def _execute_alterations(self, alterations: list[str]) -> None: - """ - Execute a list of ALTER TABLE statements. - + """Execute a list of ALTER TABLE statements. + :param alterations: List of ALTER TABLE statements """ with self.get_sql_connection() as conn: @@ -531,7 +532,6 @@ def _create_table( """ _ = self._execute_sql(cmd) - @final def _get_sql_column_definitions( self,