Skip to content

Commit

Permalink
Built the basic flow for the async pipeline - testing is remaining
Browse files Browse the repository at this point in the history
  • Loading branch information
jprakash-db committed Oct 29, 2024
1 parent ecdddba commit e637408
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
7 changes: 7 additions & 0 deletions src/databricks/sql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ def execute(
self,
operation: str,
parameters: Optional[TParameterCollection] = None,
perform_async = True
) -> "Cursor":
"""
Execute a query and wait for execution to complete.
Expand Down Expand Up @@ -796,6 +797,7 @@ def execute(
cursor=self,
use_cloud_fetch=self.connection.use_cloud_fetch,
parameters=prepared_params,
perform_async=perform_async,
)
self.active_result_set = ResultSet(
self.connection,
Expand All @@ -812,6 +814,11 @@ def execute(

return self

def executeAsync(self,
operation: str,
parameters: Optional[TParameterCollection] = None,):
return execute(operation, parameters, True)

def executemany(self, operation, seq_of_parameters):
"""
Execute the operation once for every set of passed in parameters.
Expand Down
14 changes: 9 additions & 5 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,7 @@ def execute_command(
cursor,
use_cloud_fetch=True,
parameters=[],
perform_async=False,
):
assert session_handle is not None

Expand Down Expand Up @@ -846,7 +847,8 @@ def execute_command(
parameters=parameters,
)
resp = self.make_request(self._client.ExecuteStatement, req)
return self._handle_execute_response(resp, cursor)

return self._handle_execute_response(resp, cursor, perform_async)

def get_catalogs(self, session_handle, max_rows, max_bytes, cursor):
assert session_handle is not None
Expand Down Expand Up @@ -934,14 +936,16 @@ def get_columns(
resp = self.make_request(self._client.GetColumns, req)
return self._handle_execute_response(resp, cursor)

def _handle_execute_response(self, resp, cursor):
def _handle_execute_response(self, resp, cursor, perform_async=False):
cursor.active_op_handle = resp.operationHandle
self._check_direct_results_for_error(resp.directResults)

final_operation_state = self._wait_until_command_done(
if perform_async:
final_operation_state=ttypes.TStatusCode.STILL_EXECUTING_STATUS
else:
final_operation_state=self._wait_until_command_done(
resp.operationHandle,
resp.directResults and resp.directResults.operationStatus,
)
resp.directResults and resp.directResults.operationStatus)

return self._results_message_to_execute_response(resp, final_operation_state)

Expand Down

0 comments on commit e637408

Please sign in to comment.