Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Clickhouse connector #162

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integrations/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ gem "aws-sdk-athena"

gem "rubyzip"

gem "faraday"

group :development, :test do
gem "simplecov", require: false
gem "simplecov_json_formatter", require: false
Expand Down
3 changes: 2 additions & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ GEM
multipart-post (~> 2.0)

PLATFORMS
arm64-darwin-23
x64-mingw-ucrt

DEPENDENCIES
activesupport
Expand All @@ -313,6 +313,7 @@ DEPENDENCIES
dry-schema
dry-struct
dry-types
faraday
git
google-apis-sheets_v4
google-cloud-bigquery
Expand Down
3 changes: 3 additions & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
require "aws-sdk-athena"
require "zip"
require "zendesk_api"
require "faraday"
require "base64"

# Service
require_relative "integrations/config"
Expand All @@ -51,6 +53,7 @@
require_relative "integrations/source/databricks/client"
require_relative "integrations/source/salesforce_consumer_goods_cloud/client"
require_relative "integrations/source/aws_athena/client"
require_relative "integrations/source/clickhouse/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.1.68"
VERSION = "0.1.69"

ENABLED_SOURCES = %w[
Snowflake
Expand All @@ -12,6 +12,7 @@ module Integrations
Databricks
SalesforceConsumerGoodsCloud
AwsAthena
Clickhouse
].freeze

ENABLED_DESTINATIONS = %w[
Expand Down
102 changes: 102 additions & 0 deletions integrations/lib/multiwoven/integrations/source/clickhouse/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Source
module Clickhouse
include Multiwoven::Integrations::Core
class Client < SourceConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
ConnectionStatus.new(
status: ConnectionStatusType["succeeded"]
).to_multiwoven_message
rescue StandardError => e
ConnectionStatus.new(
status: ConnectionStatusType["failed"], message: e.message
).to_multiwoven_message
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = '#{connection_config[:database]}' ORDER BY table_name, ordinal_position;"
db = create_connection(connection_config)
records = query_execution(db, query)
catalog = Catalog.new(streams: create_streams(records))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(
"CLICKHOUSE:DISCOVER:EXCEPTION",
"error",
e
)
end

def read(sync_config)
connection_config = sync_config.source.connection_specification
connection_config = connection_config.with_indifferent_access
query = sync_config.model.query
query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil?
db = create_connection(connection_config)
query(db, query)
rescue StandardError => e
handle_exception(
"CLICKHOUSE:READ:EXCEPTION",
"error",
e
)
end

private

def query(connection, query)
query_execution(connection, query).map do |row|
RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
end

def create_connection(connection_config)
@auth_token = Base64.strict_encode64("#{connection_config[:username]}:#{connection_config[:password]}")
Faraday.new(connection_config[:host]) do |faraday|
faraday.request :url_encoded
faraday.adapter Faraday.default_adapter
end
end

def query_execution(connection, query)
response = connection.post do |req|
req.url "/"
req.headers["Authorization"] = "Basic #{@auth_token}"
req.headers["Content-Type"] = "text/plain"
req.body = query
end
column_names = query[/SELECT (.*?) FROM/i, 1].split(",").map(&:strip)
response.body.strip.split("\n").map do |row|
columns = row.split("\t")
column_names.zip(columns).to_h
end
end

def create_streams(records)
group_by_table(records).map do |_, r|
Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
end

def group_by_table(records)
result = {}
records.each_with_index do |entry, index|
table_name = entry["table_name"]
column_data = {
column_name: entry["column_name"],
data_type: entry["data_type"].gsub(/Nullable\((\w+)\)/, '\1').downcase.gsub!(/\d+/, ""),
is_nullable: entry["is_nullable"] == "1"
}
result[index] ||= {}
result[index][:tablename] = table_name
result[index][:columns] = [column_data]
end
result
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "Clickhouse",
"title": "ClickHouse",
"connector_type": "source",
"category": "Data Warehouse",
"documentation_url": "https://docs.multiwoven.com/integrations/sources/clickhouse",
"github_issue_label": "source-clickhouse",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/sources/clickhouse",
"stream_type": "dynamic",
"connector_query_type": "raw_sql",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ClickHouse",
"type": "object",
"required": ["url", "username", "password", "database"],
"properties": {
"url": {
"description": "The ClickHouse host url to connect.",
"examples": ["tu61szglca.us-west-2.aws.clickhouse.cloud"],
"type": "string",
"title": "Personal URL",
"order": 0
},
"username": {
"description": "The username for ClickHouse.",
"examples": ["Default"],
"type": "string",
"title": "Username",
"order": 1
},
"password": {
"description": "The password for ClickHouse.",
"examples": ["Default"],
"type": "string",
"multiwoven_secret": true,
"title": "Password",
"order": 2
},
"database": {
"description": "The ClickHouse database.",
"examples": ["default"],
"type": "string",
"title": "Database",
"order": 3
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading