Skip to content

Commit

Permalink
Auto-fix lint and format issues
Browse files Browse the repository at this point in the history
  • Loading branch information
octavia-squidington-iii committed Aug 9, 2024
1 parent e463120 commit 750bafe
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions airbyte/_future_cdk/sql_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,25 +444,27 @@ def _ensure_compatible_table_schema(
if column_name in existing_schema:
existing_column_spec = existing_schema[column_name]
if self._is_size_expansion_needed(existing_column_spec, new_column_spec):
alterations.append(self._generate_alter_column_statement(table_name, column_name, new_column_spec))
alterations.append(
self._generate_alter_column_statement(
table_name, column_name, new_column_spec
)
)

if alterations:
self._execute_alterations(alterations)

def _get_stream_schema(self, stream_name: str) -> dict:
"""
Retrieve the schema for the specified stream.
"""Retrieve the schema for the specified stream.
:param stream_name: Name of the stream
:return: Dictionary of the stream's schema with column names as keys and their specifications as values
"""
# Implement this method to fetch the schema from the stream
pass

def _get_table_schema(self, table_name: str) -> dict:
"""
Retrieve the schema of the specified table.
"""Retrieve the schema of the specified table.
:param table_name: Name of the table
:return: Dictionary of existing schema with column names as keys and their specifications as values
"""
Expand All @@ -471,42 +473,41 @@ def _get_table_schema(self, table_name: str) -> dict:
result = conn.execute(query).fetchall()
schema = {}
for row in result:
schema[row['Field']] = row
schema[row["Field"]] = row
return schema

def _is_size_expansion_needed(self, existing_spec: dict, new_spec: dict) -> bool:
"""
Check if the column size needs to be expanded.
"""Check if the column size needs to be expanded.
:param existing_spec: Specification of the existing column
:param new_spec: Specification of the new column
:return: True if size expansion is needed, False otherwise
"""
existing_type = existing_spec['Type']
new_type = new_spec['Type']
if '(' in existing_type and '(' in new_type:
existing_size = int(existing_type.split('(')[1].rstrip(')'))
new_size = int(new_type.split('(')[1].rstrip(')'))
existing_type = existing_spec["Type"]
new_type = new_spec["Type"]

if "(" in existing_type and "(" in new_type:
existing_size = int(existing_type.split("(")[1].rstrip(")"))
new_size = int(new_type.split("(")[1].rstrip(")"))
return new_size > existing_size
return False

def _generate_alter_column_statement(self, table_name: str, column_name: str, column_spec: dict) -> str:
"""
Generate an ALTER TABLE statement for expanding column size.
def _generate_alter_column_statement(
self, table_name: str, column_name: str, column_spec: dict
) -> str:
"""Generate an ALTER TABLE statement for expanding column size.
:param table_name: Name of the table
:param column_name: Name of the column
:param column_spec: New column specification
:return: ALTER TABLE statement as a string
"""
new_type = column_spec['Type']
new_type = column_spec["Type"]
return f"ALTER TABLE {table_name} MODIFY {column_name} {new_type}"

def _execute_alterations(self, alterations: list[str]) -> None:
"""
Execute a list of ALTER TABLE statements.
"""Execute a list of ALTER TABLE statements.
:param alterations: List of ALTER TABLE statements
"""
with self.get_sql_connection() as conn:
Expand All @@ -531,7 +532,6 @@ def _create_table(
"""
_ = self._execute_sql(cmd)


@final
def _get_sql_column_definitions(
self,
Expand Down

0 comments on commit 750bafe

Please sign in to comment.