diff --git a/airbyte/_future_cdk/sql_processor.py b/airbyte/_future_cdk/sql_processor.py index 468ec45a..5a2bff5e 100644 --- a/airbyte/_future_cdk/sql_processor.py +++ b/airbyte/_future_cdk/sql_processor.py @@ -444,14 +444,22 @@ def _ensure_compatible_table_schema( if column_name in existing_schema: existing_column_spec = existing_schema[column_name] if self._is_size_expansion_needed(existing_column_spec, new_column_spec): - alterations.append(self._generate_alter_column_statement(table_name, column_name, new_column_spec)) + alterations.append( + self._generate_alter_column_statement( + table_name, column_name, new_column_spec + ) + ) if alterations: self._execute_alterations(alterations) def _get_stream_schema(self, stream_name: str) -> dict: +<<<<<<< HEAD """ Retrieve the schema for the specified stream. +======= + """Retrieve the schema for the specified stream. +>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e :param stream_name: Name of the stream :return: Dictionary of the stream's schema with column names as keys @@ -461,8 +469,12 @@ def _get_stream_schema(self, stream_name: str) -> dict: pass def _get_table_schema(self, table_name: str) -> dict: +<<<<<<< HEAD """ Retrieve the schema of the specified table. +======= + """Retrieve the schema of the specified table. +>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e :param table_name: Name of the table :return: Dictionary of existing schema with column names as keys @@ -474,47 +486,55 @@ def _get_table_schema(self, table_name: str) -> dict: schema = {} for row in result: +<<<<<<< HEAD if 'Field' in row: schema[row['Field']] = row else: # Handle the missing column scenario raise KeyError(f"Expected column 'Field' not found in the result of DESCRIBE {table_name}") +======= + schema[row["Field"]] = row +>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e return schema def _is_size_expansion_needed(self, existing_spec: dict, new_spec: dict) -> bool: """Check if the column size needs to be expanded. +<<<<<<< HEAD +======= + +>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e :param existing_spec: Specification of the existing column :param new_spec: Specification of the new column :return: True if size expansion is needed, False otherwise """ - existing_type = existing_spec['Type'] - new_type = new_spec['Type'] - - if '(' in existing_type and '(' in new_type: - existing_size = int(existing_type.split('(')[1].rstrip(')')) - new_size = int(new_type.split('(')[1].rstrip(')')) + existing_type = existing_spec["Type"] + new_type = new_spec["Type"] + + if "(" in existing_type and "(" in new_type: + existing_size = int(existing_type.split("(")[1].rstrip(")")) + new_size = int(new_type.split("(")[1].rstrip(")")) return new_size > existing_size return False - def _generate_alter_column_statement(self, table_name: str, column_name: str, column_spec: dict) -> str: - """ - Generate an ALTER TABLE statement for expanding column size. - + def _generate_alter_column_statement( + self, table_name: str, column_name: str, column_spec: dict + ) -> str: + """Generate an ALTER TABLE statement for expanding column size. + :param table_name: Name of the table :param column_name: Name of the column :param column_spec: New column specification :return: ALTER TABLE statement as a string """ - new_type = column_spec['Type'] + new_type = column_spec["Type"] return f"ALTER TABLE {table_name} MODIFY {column_name} {new_type}" def _execute_alterations(self, alterations: list[str]) -> None: - """ - Execute a list of ALTER TABLE statements. - + """Execute a list of ALTER TABLE statements. + :param alterations: List of ALTER TABLE statements """ with self.get_sql_connection() as conn: @@ -539,7 +559,6 @@ def _create_table( """ _ = self._execute_sql(cmd) - @final def _get_sql_column_definitions( self,