Skip to content

Commit

Permalink
feat(CE): Add mariaDB source connector (Multiwoven#208)
Browse files Browse the repository at this point in the history
  • Loading branch information
TivonB-AI2 authored Jun 26, 2024
1 parent d9bcc51 commit 24457f1
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 2 deletions.
2 changes: 2 additions & 0 deletions integrations/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ gem "duckdb"

gem "iterable-api-client"

gem "mysql2"

group :development, :test do
gem "simplecov", require: false
gem "simplecov_json_formatter", require: false
Expand Down
4 changes: 3 additions & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.2.0)
multiwoven-integrations (0.3.0)
activesupport
async-websocket
aws-sdk-athena
Expand Down Expand Up @@ -212,6 +212,7 @@ GEM
multi_json (1.15.0)
multipart-post (2.4.1)
mutex_m (0.2.0)
mysql2 (0.5.6)
net-sftp (4.0.0)
net-ssh (>= 5.0.0, < 8.0.0)
net-ssh (7.2.3)
Expand Down Expand Up @@ -342,6 +343,7 @@ DEPENDENCIES
hubspot-api-client (~> 17.2.0)
iterable-api-client
multiwoven-integrations!
mysql2
net-sftp
pg
rake (~> 13.0)
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
require_relative "integrations/source/aws_athena/client"
require_relative "integrations/source/clickhouse/client"
require_relative "integrations/source/amazon_s3/client"
require_relative "integrations/source/maria_db/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.2.0"
VERSION = "0.3.0"

ENABLED_SOURCES = %w[
Snowflake
Expand All @@ -14,6 +14,7 @@ module Integrations
AwsAthena
Clickhouse
AmazonS3
MariaDB
].freeze

ENABLED_DESTINATIONS = %w[
Expand Down
92 changes: 92 additions & 0 deletions integrations/lib/multiwoven/integrations/source/maria_db/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Source
module MariaDB
include Multiwoven::Integrations::Core
class Client < SourceConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
rescue StandardError => e
ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable FROM information_schema.columns WHERE table_schema = '#{connection_config[:database]}' ORDER BY table_name, ordinal_position;"
db = create_connection(connection_config)
results = query_execution(db, query)
catalog = Catalog.new(streams: create_streams(results))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(e, {
context: "MARIA:DB:DISCOVER:EXCEPTION",
type: "error"
})
end

def read(sync_config)
connection_config = sync_config.source.connection_specification.with_indifferent_access
query = sync_config.model.query
query = batched_query(query, sync_config.limit, sync_config.offset) unless sync_config.limit.nil? && sync_config.offset.nil?
db = create_connection(connection_config)
query(db, query)
rescue StandardError => e
handle_exception(e, {
context: "MARIA:DB:READ:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
end

private

def create_connection(connection_config)
Sequel.connect(
adapter: "mysql2",
host: connection_config[:host],
port: connection_config[:port],
user: connection_config[:username],
password: connection_config[:password],
database: connection_config[:database]
)
end

def query_execution(db, query)
db.fetch(query).all
end

def create_streams(records)
group_by_table(records).map do |_, r|
Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
end

def query(db, query)
records = []
query_execution(db, query).map do |row|
records << RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
records
end

def group_by_table(records)
result = {}
records.each_with_index do |entry, index|
table_name = entry[:table_name]
column_data = {
column_name: entry[:column_name],
data_type: entry[:data_type],
is_nullable: entry[:is_nullable] == "YES"
}
result[index] ||= {}
result[index][:tablename] = table_name
result[index][:columns] = [column_data]
end
result
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "MariaDB",
"title": "Maria DB",
"connector_type": "source",
"category": "Data Warehouse",
"documentation_url": "https://docs.squared.ai/guides/data-integration/sources/mariadb",
"github_issue_label": "source-maria-db",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"documentation_url": "https://docs.squared.ai/guides/data-integration/sources/mariadb",
"stream_type": "dynamic",
"connector_query_type": "raw_sql",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Maria DB",
"type": "object",
"required": ["host", "port", "username", "password", "database"],
"properties": {
"host": {
"description": "The hostname or IP address of the server where the MariaDB database is hosted.",
"examples": ["localhost"],
"type": "string",
"title": "Host",
"order": 0
},
"port": {
"description": "The port number on which the MariaDB server is listening for connections.",
"examples": ["3306"],
"type": "string",
"title": "Port",
"order": 1
},
"username": {
"description": "The username used to authenticate and connect to the MariaDB database.",
"examples": ["root"],
"type": "string",
"title": "Username",
"order": 2
},
"password": {
"description": "The password corresponding to the username used for authentication.",
"type": "string",
"multiwoven_secret": true,
"title": "Password",
"order": 3
},
"database": {
"description": "The name of the specific database within the MariaDB server to connect to.",
"examples": ["mydatabase"],
"type": "string",
"title": "Database",
"order": 4
}
}
}
}
15 changes: 15 additions & 0 deletions integrations/lib/multiwoven/integrations/source/maria_db/icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 24457f1

Please sign in to comment.