Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added Athena Connector #83

Merged
merged 26 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a3c3bce
feat: Added Athena Connector
TivonB-AI2 Apr 26, 2024
4773a03
feat: Added AWS Athena connector
TivonB-AI2 Apr 26, 2024
5968529
feat: Added AWS Athena connector
TivonB-AI2 Apr 27, 2024
8b591b0
feat: Added AWS Athena connector
TivonB-AI2 Apr 27, 2024
82a4ac2
feat: Added AWS Athena connector
TivonB-AI2 Apr 27, 2024
0aa549f
feat: Added AWS Athena connector
TivonB-AI2 Apr 29, 2024
5d28978
feat: Added AWS Athena connector
TivonB-AI2 Apr 29, 2024
c7a3fc7
feat: Added AWS Athena connector
TivonB-AI2 Apr 29, 2024
58b4722
feat: Added AWS Athena connector
TivonB-AI2 May 1, 2024
8c26d70
feat: Added AWS Athena connector
TivonB-AI2 May 2, 2024
9d572ce
feat: Added AWS Athena connector
TivonB-AI2 May 2, 2024
62436e1
feat: Added AWS Athena connector
TivonB-AI2 May 6, 2024
1c45dbf
feat: Added AWS Athena connector
TivonB-AI2 May 7, 2024
ef6873e
feat: Added AWS Athena connector
TivonB-AI2 May 7, 2024
a4ad659
feat: Added AWS Athena connector
TivonB-AI2 May 7, 2024
7ad8605
feat: Added AWS Athena connector
TivonB-AI2 May 8, 2024
8858657
Merge branch 'main' into feat/add-athena-connector
TivonB-AI2 May 8, 2024
7c148e6
Merge branch 'main' into feat/add-athena-connector
TivonB-AI2 May 9, 2024
58d06b7
fix: Added Athena Connector
TivonB-AI2 May 11, 2024
219da67
Merge branch 'main' into feat/add-athena-connector
TivonB-AI2 May 13, 2024
52c048a
fix: Added Athena Connector
TivonB-AI2 May 13, 2024
b09a3fd
fix: Added Athena Connector
TivonB-AI2 May 13, 2024
b2d1a2f
feat: Added Athena Connector
TivonB-AI2 May 14, 2024
176a3ef
feat: Added Athena Connector
TivonB-AI2 May 14, 2024
3317138
feat: Added Athena Connector
TivonB-AI2 May 14, 2024
a7adb29
feat: Added Athena Connector
TivonB-AI2 May 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion integrations/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ gem "async-websocket", "~> 0.8.0"

gem "git"

gem "hubspot-api-client"
gem "hubspot-api-client", "~> 17.2.0"
karthik-sivadas marked this conversation as resolved.
Show resolved Hide resolved

gem "ruby-limiter"

Expand All @@ -51,6 +51,8 @@ gem "net-sftp"

gem "csv"

gem "aws-sdk-athena"

group :development, :test do
gem "simplecov", require: false
gem "simplecov_json_formatter", require: false
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
require_relative "integrations/source/postgresql/client"
require_relative "integrations/source/databricks/client"
require_relative "integrations/source/salesforce_consumer_goods_cloud/client"
require_relative "integrations/source/aws_athena/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations/rollout.rb
TivonB-AI2 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Integrations
Postgresql
Databricks
SalesforceConsumerGoodsCloud
AWSAthena
].freeze

ENABLED_DESTINATIONS = %w[
Expand Down
121 changes: 121 additions & 0 deletions integrations/lib/multiwoven/integrations/source/aws_athena/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# frozen_string_literal: true

require "aws-sdk-athena"
TivonB-AI2 marked this conversation as resolved.
Show resolved Hide resolved

module Multiwoven::Integrations::Source
module AWSAthena
TivonB-AI2 marked this conversation as resolved.
Show resolved Hide resolved
include Multiwoven::Integrations::Core
class Client < SourceConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
athena_client = create_connection(connection_config)
athena_client.list_work_groups
ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
rescue StandardError => e
ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = '#{connection_config[:schema]}' ORDER BY table_name, ordinal_position;"
db = create_connection(connection_config)
response = db.start_query_execution(
TivonB-AI2 marked this conversation as resolved.
Show resolved Hide resolved
query_string: query,
query_execution_context: { database: connection_config[:schema] },
result_configuration: { output_location: connection_config[:output_location] }
)
query_execution_id = response[:query_execution_id]
wait_for_query_completion(db, query_execution_id)

results = transform_results(db.get_query_results(query_execution_id: query_execution_id))
catalog = Catalog.new(streams: create_streams(results))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(
"AWS:ATHENA:DISCOVER:EXCEPTION",
"error",
e
)
end

def read(sync_config)
connection_config = sync_config.source.connection_specification
connection_config = connection_config.with_indifferent_access
query = sync_config.model.query
query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil?

db = create_connection(connection_config)
response = db.start_query_execution(
query_string: query,
query_execution_context: { database: sync_config[:source][:connection_specification][:schema] },
result_configuration: { output_location: sync_config[:source][:connection_specification][:output_location] }
)
query_execution_id = response[:query_execution_id]
wait_for_query_completion(db, query_execution_id)
results = transform_results(db.get_query_results(query_execution_id: query_execution_id))
query(results)
rescue StandardError => e
handle_exception(
"AWS:ATHENA:READ:EXCEPTION",
"error",
e
)
end

private

def create_connection(connection_config)
Aws.config.update({ credentials: Aws::Credentials.new(connection_config[:access_key], connection_config[:secret_access_key]), region: connection_config[:region] })
Aws::Athena::Client.new
end

def create_streams(records)
group_by_table(records).map do |_, r|
Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
end

def wait_for_query_completion(db, query_execution_id)
TivonB-AI2 marked this conversation as resolved.
Show resolved Hide resolved
loop do
response = db.get_query_execution(query_execution_id: query_execution_id)
status = response.query_execution.status.state
break if %w[SUCCEEDED FAILED CANCELLED].include?(status)

sleep 1
TivonB-AI2 marked this conversation as resolved.
Show resolved Hide resolved
end
end

def transform_results(results)
columns = results.result_set.result_set_metadata.column_info.map(&:name)
rows = results.result_set.rows.map do |row|
row.data.map(&:var_char_value)
end
rows.map { |row| columns.zip(row).to_h }
end

def query(queries)
records = []
queries.map do |row|
records << RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
records
end

def group_by_table(records)
result = {}
records.each_with_index do |entry, index|
table_name = entry["table_name"]
column_data = {
column_name: entry["column_name"],
data_type: entry["data_type"],
is_nullable: entry["is_nullable"] == "YES"
}
result[index] ||= {}
result[index][:tablename] = table_name
result[index][:columns] = [column_data]
end
result
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "AWSAthena",
"title": "AWS Athena",
"connector_type": "source",
"category": "Data Warehouse",
"documentation_url": "https://docs.mutliwoven.com",
"github_issue_label": "source-aws-athena",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/sources/athena",
"stream_type": "dynamic",
"connector_query_type": "raw_sql",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AWS Athena",
"type": "object",
"required": ["region", "workgroup", "catalog", "database", "output_location"],
"properties": {
"access_key": {
"description": "The AWS Access Key ID to use for authentication.",
"examples": ["AWSATHENAEXAMPLE"],
"type": "string",
"title": "Personal Access Key",
"order": 0
},
"secret_access_key": {
"description": "The AWS Secret Access Key to use for authentication.",
"examples": ["AWS/ATHENA/EXAMPLEKEY"],
"type": "string",
"title": "Secret Access Key",
TivonB-AI2 marked this conversation as resolved.
Show resolved Hide resolved
"order": 1
},
"region": {
"description": "AWS region where Athena is located.",
"examples": ["ATHENA_REGION"],
"type": "string",
"title": "Secret Access Key",
"order": 2
},
"workgroup": {
"description": "The Athena workgroup you previously set up in AWS.",
"examples": ["ATHENA_WORKGROUP"],
"type": "string",
"title": "Workgroup",
"order": 3
},
"catalog": {
"description": "The Data catalog name within Athena.",
"examples": ["ATHENA_CATALOG"],
"type": "string",
"title": "Catalog",
"order": 4
},
"schema": {
"description": "The specific Athena database/schema to connect to.",
"examples": ["ATHENA_DB"],
"type": "string",
"title": "Database",
"order": 5
},
"output_location": {
"description": "S3 path for query output.",
"examples": ["s3://example-bucket-name/query-results/"],
"type": "string",
"title": "Query",
"order": 6
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions integrations/multiwoven-integrations.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Gem::Specification.new do |spec|

spec.add_runtime_dependency "activesupport"
spec.add_runtime_dependency "async-websocket"
spec.add_runtime_dependency "aws-sdk-athena"
spec.add_runtime_dependency "csv"
spec.add_runtime_dependency "dry-schema"
spec.add_runtime_dependency "dry-struct"
Expand Down
Loading
Loading