Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/Multiwoven/multiwoven into …
Browse files Browse the repository at this point in the history
…cherry-pick-ce-commit-76afbbf89c39fe58d6ac260842bca7120d852d04
  • Loading branch information
pabss-ai2 committed Jul 9, 2024
2 parents 95521db + 55ba79d commit 1763c95
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 20 deletions.
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
require_relative "integrations/destination/zendesk/client"
require_relative "integrations/destination/http/client"
require_relative "integrations/destination/iterable/client"
require_relative "integrations/destination/maria_db/client"

module Multiwoven
module Integrations
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Destination
module MariaDB
include Multiwoven::Integrations::Core
class Client < DestinationConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
rescue StandardError => e
ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = '#{connection_config[:database]}'
ORDER BY table_name, ordinal_position;"

db = create_connection(connection_config)
records = db.fetch(query) do |result|
result.map do |row|
row
end
end
catalog = Catalog.new(streams: create_streams(records))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(
"MARIA:DB:DISCOVER:EXCEPTION",
"error",
e
)
end

def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
db = create_connection(connection_config)

write_success = 0
write_failure = 0

records.each do |record|
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("MARIA:DB:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
db.run(query)
write_success += 1
rescue StandardError => e
handle_exception(e, {
context: "MARIA:DB:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
end
end
tracking_message(write_success, write_failure)
rescue StandardError => e
handle_exception(e, {
context: "MARIA:DB:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
end

private

def create_connection(connection_config)
Sequel.connect(
adapter: "mysql2",
host: connection_config[:host],
port: connection_config[:port],
user: connection_config[:username],
password: connection_config[:password],
database: connection_config[:database]
)
end

def create_streams(records)
group_by_table(records).map do |_, r|
Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
end

def group_by_table(records)
result = {}
records.each_with_index do |entry, index|
table_name = entry[:table_name]
column_data = {
column_name: entry[:column_name],
data_type: entry[:data_type],
is_nullable: entry[:is_nullable] == "YES"
}
result[index] ||= {}
result[index][:tablename] = table_name
result[index][:columns] = [column_data]
end
result
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "MariaDB",
"title": "Maria DB",
"connector_type": "destination",
"category": "Database",
"documentation_url": "https://docs.squared.ai/guides/data-integration/destination/mariadb",
"github_issue_label": "destination-maria-db",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"documentation_url": "https://docs.squared.ai/guides/data-integration/destination/mariadb",
"stream_type": "dynamic",
"connector_query_type": "raw_sql",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Maria DB",
"type": "object",
"required": ["host", "port", "username", "password", "database"],
"properties": {
"host": {
"description": "The hostname or IP address of the server where the MariaDB database is hosted.",
"examples": ["localhost"],
"type": "string",
"title": "Host",
"order": 0
},
"port": {
"description": "The port number on which the MariaDB server is listening for connections.",
"examples": ["3306"],
"type": "string",
"title": "Port",
"order": 1
},
"username": {
"description": "The username used to authenticate and connect to the MariaDB database.",
"examples": ["root"],
"type": "string",
"title": "Username",
"order": 2
},
"password": {
"description": "The password corresponding to the username used for authentication.",
"type": "string",
"multiwoven_secret": true,
"title": "Password",
"order": 3
},
"database": {
"description": "The name of the specific database within the MariaDB server to connect to.",
"examples": ["test"],
"type": "string",
"title": "Database",
"order": 4
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ module Integrations
Zendesk
Http
Iterable
MariaDB
].freeze
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# frozen_string_literal: true

RSpec.describe Multiwoven::Integrations::Destination::MariaDB::Client do
include WebMock::API

before(:each) do
WebMock.disable_net_connect!(allow_localhost: true)
end

let(:client) { described_class.new }
let(:connection_config) do
{
host: "127.0.0.1",
port: "3306",
username: "Test_service",
password: ENV["MARIADB_PASSWORD"],
database: "test_database"
}
end
let(:sync_config_json) do
{
source: {
name: "Sample Source Connector",
type: "source",
connection_specification: {
private_api_key: "test_api_key"
}
},
destination: {
name: "MariaDB",
type: "destination",
connection_specification: connection_config
},
model: {
name: "ExampleModel",
query: "SELECT col1, col2, col3 FROM test_table_1",
query_type: "raw_sql",
primary_key: "col1"
},
sync_mode: "incremental",
destination_sync_mode: "insert",
stream: {
name: "Test_Table",
action: "create",
json_schema: { "field1": "type1" },
supported_sync_modes: %w[full_refresh incremental]
}
}
end

let(:sequel_client) { instance_double(Sequel::Database) }
let(:table) { double("Table") }

describe "#check_connection" do
context "when the connection is successful" do
it "returns a succeeded connection status" do
allow_any_instance_of(Multiwoven::Integrations::Destination::MariaDB::Client).to receive(:create_connection).and_return(sequel_client)
message = client.check_connection(sync_config_json[:destination][:connection_specification])
result = message.connection_status
expect(result.status).to eq("succeeded")
expect(result.message).to be_nil
end
end

context "when the connection fails" do
it "returns a failed connection status with an error message" do
allow_any_instance_of(Multiwoven::Integrations::Destination::MariaDB::Client).to receive(:create_connection).and_raise(StandardError, "Connection failed")
message = client.check_connection(sync_config_json[:destination][:connection_specification])
result = message.connection_status
expect(result.status).to eq("failed")
expect(result.message).to include("Connection failed")
end
end
end

describe "#discover" do
it "discovers schema successfully" do
dataset = [
{ table_name: "test_table", column_name: "col1", data_type: "int", is_nullable: "YES" },
{ table_name: "test_table", column_name: "col2", data_type: "varchar", is_nullable: "YES" },
{ table_name: "test_table", column_name: "col3", data_type: "float", is_nullable: "YES" }
]
allow(sequel_client).to receive(:fetch).and_return(dataset)
allow(client).to receive(:create_connection).and_return(sequel_client)

message = client.discover(sync_config_json[:destination][:connection_specification])
expect(message.catalog).to be_an(Multiwoven::Integrations::Protocol::Catalog)
first_stream = message.catalog.streams.first
expect(first_stream).to be_a(Multiwoven::Integrations::Protocol::Stream)
expect(first_stream.name).to eq("test_table")
expect(first_stream.json_schema).to be_an(Hash)
expect(first_stream.json_schema["type"]).to eq("object")
expect(first_stream.json_schema["properties"]).to eq({ "col1" => { "type" => "string" } })
end
end

describe "#write" do
context "when the write operation is successful" do
before do
allow_any_instance_of(Multiwoven::Integrations::Source::MariaDB::Client).to receive(:create_connection).and_return(sequel_client)
end

it "increments the success count" do
sync_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(
sync_config_json.to_json
)
records = [
{ "table_name" => "external_table", "value_attribute" => { "Col1" => 400, "Col2" => 4.4, "Col3" => "Fourth" }.to_json },
{ "table_name" => "external_table", "value_attribute" => { "Col1" => 500, "Col2" => 5.5, "Col3" => "Fifth" }.to_json },
{ "table_name" => "external_table", "value_attribute" => { "Col1" => 600, "Col2" => 6.6, "Col3" => "Sixth" }.to_json }
]
allow(client).to receive(:create_connection).and_return(sequel_client)
allow(sequel_client).to receive(:run).and_return(nil)
response = client.write(sync_config, records)
expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
end
end

context "when the write operation fails" do
before do
allow_any_instance_of(Multiwoven::Integrations::Destination::MariaDB::Client).to receive(:create_connection).and_return(sequel_client)
end
it "increments the failure count" do
sync_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(
sync_config_json.to_json
)
records = [
{ "table_name" => "external_table", "value_attribute" => { "Col1" => 400, "Col2" => 4.4, "Col3" => "Fourth" } },
{ "table_name" => "external_table", "value_attribute" => { "Col1" => 500, "Col2" => 5.5, "Col3" => "Fifth" } },
{ "table_name" => "external_table", "value_attribute" => { "Col1" => 600, "Col2" => 6.6, "Col3" => "Sixth" } }
]
allow(client).to receive(:create_connection).and_return(sequel_client)
allow(sequel_client).to receive(:run).and_raise(StandardError)
response = client.write(sync_config, records)
expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
end
end
end

describe "#meta_data" do
# change this to rollout validation for all connector rolling out
it "client class_name and meta name is same" do
meta_name = client.class.to_s.split("::")[-2]
expect(client.send(:meta_data)[:data][:name]).to eq(meta_name)
end
end
end
Loading

0 comments on commit 1763c95

Please sign in to comment.