From e463120a68d36f7dcf9ea489d4252f5c3b8a3efd Mon Sep 17 00:00:00 2001 From: GovindHede Date: Wed, 7 Aug 2024 00:22:34 +0530 Subject: [PATCH 1/3] Ensure compatible table schema by adding missing columns and expanding column sizes --- airbyte/_future_cdk/sql_processor.py | 84 +++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 3 deletions(-) diff --git a/airbyte/_future_cdk/sql_processor.py b/airbyte/_future_cdk/sql_processor.py index 925ab5e3..f157bcb7 100644 --- a/airbyte/_future_cdk/sql_processor.py +++ b/airbyte/_future_cdk/sql_processor.py @@ -424,18 +424,95 @@ def _ensure_compatible_table_schema( stream_name: str, table_name: str, ) -> None: - """Return true if the given table is compatible with the stream's schema. + """Ensure the given table is compatible with the stream's schema. Raises an exception if the table schema is not compatible with the schema of the input stream. """ - # TODO: Expand this to check for column types and sizes. - # https://github.com/airbytehq/pyairbyte/issues/321 + # First, add any missing columns to the table self._add_missing_columns_to_table( stream_name=stream_name, table_name=table_name, ) + # Now, ensure column sizes are compatible + stream_schema = self._get_stream_schema(stream_name) + existing_schema = self._get_table_schema(table_name) + + alterations = [] + for column_name, new_column_spec in stream_schema.items(): + if column_name in existing_schema: + existing_column_spec = existing_schema[column_name] + if self._is_size_expansion_needed(existing_column_spec, new_column_spec): + alterations.append(self._generate_alter_column_statement(table_name, column_name, new_column_spec)) + + if alterations: + self._execute_alterations(alterations) + + def _get_stream_schema(self, stream_name: str) -> dict: + """ + Retrieve the schema for the specified stream. + + :param stream_name: Name of the stream + :return: Dictionary of the stream's schema with column names as keys and their specifications as values + """ + # Implement this method to fetch the schema from the stream + pass + + def _get_table_schema(self, table_name: str) -> dict: + """ + Retrieve the schema of the specified table. + + :param table_name: Name of the table + :return: Dictionary of existing schema with column names as keys and their specifications as values + """ + query = f"DESCRIBE {table_name}" + with self.get_sql_connection() as conn: + result = conn.execute(query).fetchall() + schema = {} + for row in result: + schema[row['Field']] = row + return schema + + def _is_size_expansion_needed(self, existing_spec: dict, new_spec: dict) -> bool: + """ + Check if the column size needs to be expanded. + + :param existing_spec: Specification of the existing column + :param new_spec: Specification of the new column + :return: True if size expansion is needed, False otherwise + """ + existing_type = existing_spec['Type'] + new_type = new_spec['Type'] + + if '(' in existing_type and '(' in new_type: + existing_size = int(existing_type.split('(')[1].rstrip(')')) + new_size = int(new_type.split('(')[1].rstrip(')')) + return new_size > existing_size + return False + + def _generate_alter_column_statement(self, table_name: str, column_name: str, column_spec: dict) -> str: + """ + Generate an ALTER TABLE statement for expanding column size. + + :param table_name: Name of the table + :param column_name: Name of the column + :param column_spec: New column specification + :return: ALTER TABLE statement as a string + """ + new_type = column_spec['Type'] + return f"ALTER TABLE {table_name} MODIFY {column_name} {new_type}" + + def _execute_alterations(self, alterations: list[str]) -> None: + """ + Execute a list of ALTER TABLE statements. + + :param alterations: List of ALTER TABLE statements + """ + with self.get_sql_connection() as conn: + for alter_statement in alterations: + conn.execute(alter_statement) + @final def _create_table( self, @@ -454,6 +531,7 @@ def _create_table( """ _ = self._execute_sql(cmd) + @final def _get_sql_column_definitions( self, From 750bafe0e6683c865d9f9d10aabd18ceddc0384e Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 9 Aug 2024 21:01:12 +0000 Subject: [PATCH 2/3] Auto-fix lint and format issues --- airbyte/_future_cdk/sql_processor.py | 52 ++++++++++++++-------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/airbyte/_future_cdk/sql_processor.py b/airbyte/_future_cdk/sql_processor.py index f157bcb7..6ec86e1d 100644 --- a/airbyte/_future_cdk/sql_processor.py +++ b/airbyte/_future_cdk/sql_processor.py @@ -444,15 +444,18 @@ def _ensure_compatible_table_schema( if column_name in existing_schema: existing_column_spec = existing_schema[column_name] if self._is_size_expansion_needed(existing_column_spec, new_column_spec): - alterations.append(self._generate_alter_column_statement(table_name, column_name, new_column_spec)) + alterations.append( + self._generate_alter_column_statement( + table_name, column_name, new_column_spec + ) + ) if alterations: self._execute_alterations(alterations) def _get_stream_schema(self, stream_name: str) -> dict: - """ - Retrieve the schema for the specified stream. - + """Retrieve the schema for the specified stream. + :param stream_name: Name of the stream :return: Dictionary of the stream's schema with column names as keys and their specifications as values """ @@ -460,9 +463,8 @@ def _get_stream_schema(self, stream_name: str) -> dict: pass def _get_table_schema(self, table_name: str) -> dict: - """ - Retrieve the schema of the specified table. - + """Retrieve the schema of the specified table. + :param table_name: Name of the table :return: Dictionary of existing schema with column names as keys and their specifications as values """ @@ -471,42 +473,41 @@ def _get_table_schema(self, table_name: str) -> dict: result = conn.execute(query).fetchall() schema = {} for row in result: - schema[row['Field']] = row + schema[row["Field"]] = row return schema def _is_size_expansion_needed(self, existing_spec: dict, new_spec: dict) -> bool: - """ - Check if the column size needs to be expanded. - + """Check if the column size needs to be expanded. + :param existing_spec: Specification of the existing column :param new_spec: Specification of the new column :return: True if size expansion is needed, False otherwise """ - existing_type = existing_spec['Type'] - new_type = new_spec['Type'] - - if '(' in existing_type and '(' in new_type: - existing_size = int(existing_type.split('(')[1].rstrip(')')) - new_size = int(new_type.split('(')[1].rstrip(')')) + existing_type = existing_spec["Type"] + new_type = new_spec["Type"] + + if "(" in existing_type and "(" in new_type: + existing_size = int(existing_type.split("(")[1].rstrip(")")) + new_size = int(new_type.split("(")[1].rstrip(")")) return new_size > existing_size return False - def _generate_alter_column_statement(self, table_name: str, column_name: str, column_spec: dict) -> str: - """ - Generate an ALTER TABLE statement for expanding column size. - + def _generate_alter_column_statement( + self, table_name: str, column_name: str, column_spec: dict + ) -> str: + """Generate an ALTER TABLE statement for expanding column size. + :param table_name: Name of the table :param column_name: Name of the column :param column_spec: New column specification :return: ALTER TABLE statement as a string """ - new_type = column_spec['Type'] + new_type = column_spec["Type"] return f"ALTER TABLE {table_name} MODIFY {column_name} {new_type}" def _execute_alterations(self, alterations: list[str]) -> None: - """ - Execute a list of ALTER TABLE statements. - + """Execute a list of ALTER TABLE statements. + :param alterations: List of ALTER TABLE statements """ with self.get_sql_connection() as conn: @@ -531,7 +532,6 @@ def _create_table( """ _ = self._execute_sql(cmd) - @final def _get_sql_column_definitions( self, From 19d417bc140be5e774e5f1b61b16b48aea56cadc Mon Sep 17 00:00:00 2001 From: GovindHede Date: Sun, 11 Aug 2024 00:53:39 +0530 Subject: [PATCH 3/3] Fix _get_table_schema method to handle missing 'Field' column --- airbyte/_future_cdk/sql_processor.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/airbyte/_future_cdk/sql_processor.py b/airbyte/_future_cdk/sql_processor.py index f157bcb7..468ec45a 100644 --- a/airbyte/_future_cdk/sql_processor.py +++ b/airbyte/_future_cdk/sql_processor.py @@ -452,9 +452,10 @@ def _ensure_compatible_table_schema( def _get_stream_schema(self, stream_name: str) -> dict: """ Retrieve the schema for the specified stream. - + :param stream_name: Name of the stream - :return: Dictionary of the stream's schema with column names as keys and their specifications as values + :return: Dictionary of the stream's schema with column names as keys + and their specifications as values """ # Implement this method to fetch the schema from the stream pass @@ -462,22 +463,29 @@ def _get_stream_schema(self, stream_name: str) -> dict: def _get_table_schema(self, table_name: str) -> dict: """ Retrieve the schema of the specified table. - + :param table_name: Name of the table - :return: Dictionary of existing schema with column names as keys and their specifications as values + :return: Dictionary of existing schema with column names as keys + and their specifications as values """ query = f"DESCRIBE {table_name}" with self.get_sql_connection() as conn: result = conn.execute(query).fetchall() + schema = {} for row in result: - schema[row['Field']] = row + if 'Field' in row: + schema[row['Field']] = row + else: + # Handle the missing column scenario + raise KeyError(f"Expected column 'Field' not found in the result of DESCRIBE {table_name}") + return schema + + def _is_size_expansion_needed(self, existing_spec: dict, new_spec: dict) -> bool: - """ - Check if the column size needs to be expanded. - + """Check if the column size needs to be expanded. :param existing_spec: Specification of the existing column :param new_spec: Specification of the new column :return: True if size expansion is needed, False otherwise