Skip to content

Commit

Permalink
Resolved merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
GovindHede committed Aug 10, 2024
2 parents 19d417b + 750bafe commit 5b3a971
Showing 1 changed file with 35 additions and 16 deletions.
51 changes: 35 additions & 16 deletions airbyte/_future_cdk/sql_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,22 @@ def _ensure_compatible_table_schema(
if column_name in existing_schema:
existing_column_spec = existing_schema[column_name]
if self._is_size_expansion_needed(existing_column_spec, new_column_spec):
alterations.append(self._generate_alter_column_statement(table_name, column_name, new_column_spec))
alterations.append(
self._generate_alter_column_statement(
table_name, column_name, new_column_spec
)
)

if alterations:
self._execute_alterations(alterations)

def _get_stream_schema(self, stream_name: str) -> dict:
<<<<<<< HEAD
"""
Retrieve the schema for the specified stream.
=======
"""Retrieve the schema for the specified stream.
>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e

:param stream_name: Name of the stream
:return: Dictionary of the stream's schema with column names as keys
Expand All @@ -461,8 +469,12 @@ def _get_stream_schema(self, stream_name: str) -> dict:
pass
def _get_table_schema(self, table_name: str) -> dict:
<<<<<<< HEAD
"""
Retrieve the schema of the specified table.
=======
"""Retrieve the schema of the specified table.
>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e
:param table_name: Name of the table
:return: Dictionary of existing schema with column names as keys
Expand All @@ -474,47 +486,55 @@ def _get_table_schema(self, table_name: str) -> dict:

schema = {}
for row in result:
<<<<<<< HEAD
if 'Field' in row:
schema[row['Field']] = row
else:
# Handle the missing column scenario
raise KeyError(f"Expected column 'Field' not found in the result of DESCRIBE {table_name}")

=======
schema[row["Field"]] = row
>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e
return schema



def _is_size_expansion_needed(self, existing_spec: dict, new_spec: dict) -> bool:
"""Check if the column size needs to be expanded.
<<<<<<< HEAD
=======
>>>>>>> 750bafe0e6683c865d9f9d10aabd18ceddc0384e
:param existing_spec: Specification of the existing column
:param new_spec: Specification of the new column
:return: True if size expansion is needed, False otherwise
"""
existing_type = existing_spec['Type']
new_type = new_spec['Type']
if '(' in existing_type and '(' in new_type:
existing_size = int(existing_type.split('(')[1].rstrip(')'))
new_size = int(new_type.split('(')[1].rstrip(')'))
existing_type = existing_spec["Type"]
new_type = new_spec["Type"]

if "(" in existing_type and "(" in new_type:
existing_size = int(existing_type.split("(")[1].rstrip(")"))
new_size = int(new_type.split("(")[1].rstrip(")"))
return new_size > existing_size
return False

def _generate_alter_column_statement(self, table_name: str, column_name: str, column_spec: dict) -> str:
"""
Generate an ALTER TABLE statement for expanding column size.
def _generate_alter_column_statement(
self, table_name: str, column_name: str, column_spec: dict
) -> str:
"""Generate an ALTER TABLE statement for expanding column size.
:param table_name: Name of the table
:param column_name: Name of the column
:param column_spec: New column specification
:return: ALTER TABLE statement as a string
"""
new_type = column_spec['Type']
new_type = column_spec["Type"]
return f"ALTER TABLE {table_name} MODIFY {column_name} {new_type}"

def _execute_alterations(self, alterations: list[str]) -> None:
"""
Execute a list of ALTER TABLE statements.
"""Execute a list of ALTER TABLE statements.
:param alterations: List of ALTER TABLE statements
"""
with self.get_sql_connection() as conn:
Expand All @@ -539,7 +559,6 @@ def _create_table(
"""
_ = self._execute_sql(cmd)


@final
def _get_sql_column_definitions(
self,
Expand Down

0 comments on commit 5b3a971

Please sign in to comment.