Skip to content

Commit

Permalink
feat: Added AWS Athena connector
Browse files Browse the repository at this point in the history
  • Loading branch information
TivonB-AI2 committed May 8, 2024
1 parent a4ad659 commit 7ad8605
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def check_connection(connection_config)
def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = '#{connection_config[:schema]}' ORDER BY table_name, ordinal_position;"
results = query_execution(connection_config, query)
db = create_connection(connection_config)
results = query_execution(db, query)
catalog = Catalog.new(streams: create_streams(results))
catalog.to_multiwoven_message
rescue StandardError => e
Expand All @@ -32,8 +33,8 @@ def read(sync_config)
connection_config = connection_config.with_indifferent_access
query = sync_config.model.query
query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil?
results = query_execution(connection_config, query)
query(results)
db = create_connection(connection_config)
query(db, query)
rescue StandardError => e
handle_exception(
"AWS:ATHENA:READ:EXCEPTION",
Expand All @@ -46,15 +47,16 @@ def read(sync_config)

def create_connection(connection_config)
Aws.config.update({ credentials: Aws::Credentials.new(connection_config[:access_key], connection_config[:secret_access_key]), region: connection_config[:region] })
@database = connection_config[:schema]
@output_location = connection_config[:output_location]
Aws::Athena::Client.new
end

def query_execution(connection_config, query)
db = create_connection(connection_config)
def query_execution(db, query)
response = db.start_query_execution(
query_string: query,
query_execution_context: { database: connection_config[:schema] },
result_configuration: { output_location: connection_config[:output_location] }
query_execution_context: { database: @database },
result_configuration: { output_location: @output_location }
)
query_execution_id = response[:query_execution_id]
loop do
Expand All @@ -79,9 +81,9 @@ def transform_results(results)
rows.map { |row| columns.zip(row).to_h }
end

def query(queries)
def query(db, query)
records = []
queries.map do |row|
query_execution(db, query).map do |row|
records << RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
records
Expand Down

0 comments on commit 7ad8605

Please sign in to comment.