Skip to content

Commit

Permalink
feat(CE): Add databricks lakehouse destination
Browse files Browse the repository at this point in the history
  • Loading branch information
TivonB-AI2 authored and ai-squared committed Jul 17, 2024
1 parent fba4c85 commit 02c1d62
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 2 deletions.
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.4.1)
multiwoven-integrations (0.5.0)
activesupport
async-websocket
aws-sdk-athena
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
require_relative "integrations/destination/http/client"
require_relative "integrations/destination/iterable/client"
require_relative "integrations/destination/maria_db/client"
require_relative "integrations/destination/databricks_lakehouse/client"

module Multiwoven
module Integrations
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# frozen_string_literal: true

module Multiwoven
module Integrations
module Destination
module DatabricksLakehouse
include Multiwoven::Integrations::Core
class Client < DestinationConnector
MAX_CHUNK_SIZE = 10
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
db = create_connection(connection_config)
response = db.get("/api/2.0/clusters/list")
if response.status == 200
success_status
else
failure_status(nil)
end
rescue StandardError => e
handle_exception(e, {
context: "DATABRICKS:LAKEHOUSE:CHECK_CONNECTION:EXCEPTION",
type: "error"
})
failure_status(e)
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
table_query = "SHOW TABLES IN #{connection_config[:catalog]}.#{connection_config[:schema]};"
db = create_connection(connection_config)
records = []
table_response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], table_query).to_json)
table_response_body = JSON.parse(table_response.body)
table_response_body["result"]["data_array"].each do |table|
table_name = table[1]
query = "DESCRIBE TABLE #{connection_config[:catalog]}.#{connection_config[:schema]}.#{table_name};"
column_response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query).to_json)
column_response_body = JSON.parse(column_response.body)
records << [table_name, column_response_body["result"]["data_array"]]
end
catalog = Catalog.new(streams: create_streams(records))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(
"DATABRICKS:LAKEHOUSE:DISCOVER:EXCEPTION",
"error",
e
)
end

def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = "#{connection_config[:catalog]}.#{connection_config[:schema]}.#{sync_config.stream.name}"
primary_key = sync_config.model.primary_key
db = create_connection(connection_config)
write_success = 0
write_failure = 0
log_message_array = []

records.each do |record|
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("DATABRICKS:LAKEHOUSE:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
arg = ["/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query)]
response = db.post("/api/2.0/sql/statements", generate_body(connection_config[:warehouse_id], query).to_json)
if response.status == 200
write_success += 1
else
write_failure += 1
end
log_message_array << log_request_response("info", arg, response)
rescue StandardError => e
handle_exception(e, {
context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
end
end
tracking_message(write_success, write_failure)
rescue StandardError => e
handle_exception(e, {
context: "DATABRICKS:LAKEHOUSE:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
end

private

def create_connection(connection_config)
Faraday.new(url: connection_config[:host]) do |conn|
conn.headers["Authorization"] = "Bearer #{connection_config[:api_token]}"
conn.headers["Content-Type"] = "application/json"
conn.adapter Faraday.default_adapter
end
end

def generate_body(warehouse_id, query)
{
warehouse_id: warehouse_id,
statement: query,
wait_timeout: "15s"
}
end

def create_streams(records)
message = []
group_by_table(records).each_value do |r|
message << Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
message
end

def group_by_table(records)
result = {}
records.each_with_index do |entries, index|
table_name = records[index][0]
column = []
entry_data = entries[1]
entry_data.each do |entry|
column << {
column_name: entry[0],
data_type: entry[1],
is_nullable: true
}
end
result[index] ||= {}
result[index][:tablename] = table_name
result[index][:columns] = column
end
result
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "DatabricksLakehouse",
"title": "Databricks Lakehouse",
"connector_type": "destination",
"category": "Marketing Automation",
"documentation_url": "https://docs.multiwoven.com/destinations/databricks_lakehouse",
"github_issue_label": "destination-databricks-lakehouse",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/destination/databrick_lakehouse",
"stream_type": "static",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Databricks Lakehouse",
"type": "object",
"required": ["host", "api_token", "warehouse_id", "catalog", "schema"],
"properties": {
"host": {
"description": "The databrick lakehouse host domain.",
"type": "string",
"title": "Host",
"order": 0
},
"api_token": {
"description": "The databrick lakehouse api token.",
"type": "string",
"multiwoven_secret": true,
"title": "API Token",
"order": 1
},"warehouse_id": {
"description": "The databrick lakehouse warehouse ID.",
"type": "string",
"title": "Warehouse ID",
"order": 2
},
"catalog": {
"description": "The name of the catalog",
"default": "hive_metastore",
"type": "string",
"title": "Databricks catalog",
"order": 3
},
"schema": {
"description": "The default schema tables are written.",
"default": "default",
"type": "string",
"title": "Database schema",
"order": 4
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.4.1"
VERSION = "0.5.0"

ENABLED_SOURCES = %w[
Snowflake
Expand Down Expand Up @@ -33,6 +33,7 @@ module Integrations
Http
Iterable
MariaDB
DatabricksLakehouse
].freeze
end
end
Loading

0 comments on commit 02c1d62

Please sign in to comment.