Skip to content

Commit

Permalink
feat: Added Athena Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
TivonB-AI2 committed Apr 26, 2024
1 parent 0dbb0b0 commit a3c3bce
Show file tree
Hide file tree
Showing 6 changed files with 422 additions and 0 deletions.
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
require_relative "integrations/source/postgresql/client"
require_relative "integrations/source/databricks/client"
require_relative "integrations/source/salesforce_consumer_goods_cloud/client"
require_relative "integrations/source/athena/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
107 changes: 107 additions & 0 deletions integrations/lib/multiwoven/integrations/source/athena/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# frozen_string_literal: true

require "pg"

module Multiwoven::Integrations::Source
module AWSAthena
include Multiwoven::Integrations::Core
class Client < SourceConnector
# Verifies that a connection can be opened with the supplied configuration.
#
# @param connection_config [Hash] connector settings; converted with
#   with_indifferent_access before being handed to create_connection.
# @return [Object] a ConnectionStatus ("succeeded" or "failed", the latter
#   carrying the exception message) converted with to_multiwoven_message.
def check_connection(connection_config)
  connection_config = connection_config.with_indifferent_access
  db = create_connection(connection_config)
  ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
rescue StandardError => e
  # create_connection raises a plain RuntimeError for an unsupported auth
  # type, so rescuing only PG::Error would let that escape instead of being
  # reported as a failed check. PG::Error is a StandardError and is still
  # handled here.
  ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
ensure
  # The connection is only needed to prove it can be opened; release it.
  db&.close
end

# Builds a catalog of the tables and columns found in the configured schema.
#
# Queries information_schema.columns filtered by the configured :schema and
# :database, then turns the rows into streams via create_streams.
#
# @param connection_config [Hash] connector settings; :schema and :database
#   are read here and the whole hash is passed to create_connection.
# @return [Object] the Catalog converted with to_multiwoven_message, or the
#   value returned by handle_exception when a StandardError is raised.
def discover(connection_config)
  connection_config = connection_config.with_indifferent_access
  # The schema and database names are bound as $1/$2 instead of being
  # interpolated, so quotes in the configured values cannot alter the query.
  query = <<~SQL
    SELECT table_name, column_name, data_type, is_nullable
    FROM information_schema.columns
    WHERE table_schema = $1 AND table_catalog = $2
    ORDER BY table_name, ordinal_position;
  SQL
  params = [connection_config[:schema], connection_config[:database]]

  db = create_connection(connection_config)
  # Block form: the rows are copied into a plain Array inside the block.
  records = db.exec_params(query, params, &:to_a)
  Catalog.new(streams: create_streams(records)).to_multiwoven_message
rescue StandardError => e
  handle_exception(
    "AWS:ATHENA:DISCOVER:EXCEPTION",
    "error",
    e
  )
ensure
  db&.close
end

# Runs the sync's model query against the source.
#
# The query is passed through batched_query whenever a limit or an offset is
# present on the sync configuration.
#
# @param sync_config [Object] supplies source.connection_specification,
#   model.query, limit and offset.
# @return [Object] whatever #query returns, or the value returned by
#   handle_exception when a StandardError is raised.
def read(sync_config)
  settings = sync_config.source.connection_specification.with_indifferent_access
  sql = sync_config.model.query
  paginate = !sync_config.limit.nil? || !sync_config.offset.nil?
  sql = batched_query(sql, sync_config.limit, sync_config.offset) if paginate

  db = create_connection(settings)
  query(db, sql)
rescue StandardError => e
  handle_exception("AWS:ATHENA:READ:EXCEPTION", "error", e)
ensure
  db&.close
end

private

# Executes a statement and wraps every returned row in a RecordMessage.
#
# @param connection [Object] object responding to exec with a block.
# @param query [String] the statement to execute.
# @return [Array] one to_multiwoven_message result per row, each stamped
#   with the current epoch second.
def query(connection, query)
  connection.exec(query) do |rows|
    rows.each_with_object([]) do |record, messages|
      message = RecordMessage.new(data: record, emitted_at: Time.now.to_i)
      messages << message.to_multiwoven_message
    end
  end
end

# Opens a connection using access-key authentication.
#
# @param connection_config [Hash] connector settings. :credentials must be a
#   hash whose :auth_type is "access_key/secret_access_key"; :region,
#   :workgroup, :catalog, :database and :output_location are read from the
#   top level.
# @return [Object] whatever PG.connect returns.
# @raise [RuntimeError] "Unsupported Auth type" when :credentials is missing
#   or names any other auth type.
def create_connection(connection_config)
  credentials = connection_config[:credentials]
  raise "Unsupported Auth type" unless credentials && credentials[:auth_type] == "access_key/secret_access_key"

  # The keys are nested under :credentials (the same place :auth_type is read
  # from); the top-level lookup is kept only as a fallback for flat configs.
  # NOTE(review): these keyword names do not look like standard PostgreSQL
  # connection parameters -- confirm PG.connect accepts them for Athena.
  PG.connect(
    access_key: credentials[:access_key] || connection_config[:access_key],
    secret_access_key: credentials[:secret_access_key] || connection_config[:secret_access_key],
    region: connection_config[:region],
    workgroup: connection_config[:workgroup],
    catalog: connection_config[:catalog],
    dbname: connection_config[:database],
    output_location: connection_config[:output_location]
  )
end

# Converts column rows into one fetch Stream per table.
#
# @param records [Enumerable<Hash>] column rows, grouped with group_by_table.
# @return [Array] one Stream per table, whose json_schema is produced by
#   convert_to_json_schema from that table's columns.
def create_streams(records)
  tables = group_by_table(records)
  tables.map do |table|
    Multiwoven::Integrations::Protocol::Stream.new(
      name: table[:tablename],
      action: StreamAction["fetch"],
      json_schema: convert_to_json_schema(table[:columns])
    )
  end
end

# Groups flat column rows by their "table_name".
#
# @param records [Enumerable<Hash>] rows with the string keys "table_name",
#   "column_name", "data_type" and "is_nullable".
# @return [Array<Hash>] one { tablename:, columns: } hash per table, in order
#   of first appearance; each column is { column_name:, type:, optional: },
#   where optional is true only when "is_nullable" equals "YES".
def group_by_table(records)
  rows_by_table = records.group_by { |row| row["table_name"] }

  rows_by_table.map do |name, rows|
    described = rows.map do |row|
      nullable = row["is_nullable"] == "YES"
      { column_name: row["column_name"], type: row["data_type"], optional: nullable }
    end

    { tablename: name, columns: described }
  end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "AWSAthena",
"title": "AWS Athena",
"connector_type": "source",
"category": "Data Warehouse",
"documentation_url": "https://docs.multiwoven.com",
"github_issue_label": "source-aws-athena",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/sources/athena",
"stream_type": "dynamic",
"connector_query_type": "raw_sql",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AWS Athena",
"type": "object",
"required": ["region", "workgroup", "catalog", "database", "output_location", "schema"],
"properties": {
"credentials": {
"title": "",
"type": "object",
"required": ["auth_type", "access_key", "secret_access_key"],
"properties": {
"auth_type": {
"type": "string",
"default": "access_key/secret_access_key",
"order": 0,
"readOnly": true
},
"access_key": {
"description": "The AWS Access Key ID to use for authentication.",
"examples": ["AWSATHENAEXAMPLE"],
"type": "string",
"title": "Access Key ID",
"order": 1
},
"secret_access_key": {
"description": "The AWS Secret Access Key to use for authentication.",
"examples": ["AWS/ATHENA/EXAMPLEKEY"],
"type": "string",
"title": "Secret Access Key",
"order": 2
}
},
"order": 0
},
"region": {
"description": "AWS region where Athena is located.",
"examples": ["ATHENA_REGION"],
"type": "string",
"title": "Region",
"order": 1
},
"workgroup": {
"description": "The Athena workgroup you previously set up in AWS.",
"examples": ["ATHENA_WORKGROUP"],
"type": "string",
"title": "Workgroup",
"order": 2
},
"catalog": {
"description": "The Data catalog name within Athena.",
"examples": ["ATHENA_CATALOG"],
"type": "string",
"title": "Catalog",
"order": 3
},
"database": {
"description": "The specific Athena database to connect to.",
"examples": ["ATHENA_DB"],
"type": "string",
"title": "Database",
"order": 4
},
"output_location": {
"description": "S3 path for query output.",
"examples": ["s3://example-bucket-name/query-results/"],
"type": "string",
"title": "Output Location",
"order": 5
},
"schema": {
"description": "The schema within the Athena database.",
"examples": ["ATHENA_SCHEMA"],
"type": "string",
"title": "Schema",
"order": 6
}
}
}
}
22 changes: 22 additions & 0 deletions integrations/lib/multiwoven/integrations/source/athena/icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit a3c3bce

Please sign in to comment.