Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CE): AIS Data store destination connector #462

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.14.2)
multiwoven-integrations (0.15.0)
MailchimpMarketing
activesupport
async-websocket
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
require_relative "integrations/destination/microsoft_excel/client"
require_relative "integrations/destination/microsoft_sql/client"
require_relative "integrations/destination/mailchimp/client"
require_relative "integrations/destination/ais_data_store/client"

module Multiwoven
module Integrations
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# frozen_string_literal: true

require "pg"

module Multiwoven::Integrations::Destination
module AISDataStore
include Multiwoven::Integrations::Core
class Client < DestinationConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
ConnectionStatus.new(
status: ConnectionStatusType["succeeded"]
).to_multiwoven_message
rescue PG::Error => e
ConnectionStatus.new(
status: ConnectionStatusType["failed"], message: e.message
).to_multiwoven_message
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = '#{connection_config[:schema]}' AND table_catalog = '#{connection_config[:database]}'
ORDER BY table_name, ordinal_position;"

db = create_connection(connection_config)
records = db.exec(query) do |result|
result.map do |row|
row
end
end
catalog = Catalog.new(streams: create_streams(records))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:DISCOVER:EXCEPTION",
type: "error"
})
ensure
db&.close
end

def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
log_message_array = []
db = create_connection(connection_config)

write_success = 0
write_failure = 0

records.each do |record|
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
response = db.exec(query)
write_success += 1
log_message_array << log_request_response("info", query, response)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", query, e.message)
end
end
tracking_message(write_success, write_failure, log_message_array)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
end

private

def query(connection, query)
connection.exec(query) do |result|
result.map do |row|
RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
end
end

def create_connection(connection_config)
raise "Unsupported Auth type" unless connection_config[:credentials][:auth_type] == "username/password"

PG.connect(
host: connection_config[:host],
dbname: connection_config[:database],
user: connection_config[:credentials][:username],
password: connection_config[:credentials][:password],
port: connection_config[:port]
)
end

def create_streams(records)
group_by_table(records).map do |r|
Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
end

def group_by_table(records)
records.group_by { |entry| entry["table_name"] }.map do |table_name, columns|
{
tablename: table_name,
columns: columns.map do |column|
{
column_name: column["column_name"],
type: column["data_type"],
optional: column["is_nullable"] == "YES"
}
end
}
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "AISDataStore",
"title": "AIS Data Store",
"connector_type": "destination",
"category": "Database",
"documentation_url": "https://docs.mutliwoven.com",
"github_issue_label": "destination-postgresql",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/sources/postgresql",
"stream_type": "dynamic",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Postgresql",
"type": "object",
"required": ["host", "port", "database", "schema"],
"properties": {
"credentials": {
"title": "",
"type": "object",
"required": ["auth_type", "username", "password"],
"properties": {
"auth_type": {
"type": "string",
"default": "username/password",
"order": 0,
"readOnly": true
},
"username": {
"description": "Username refers to your individual PostgreSQL login credentials. At a minimum, the user associated with these credentials must be granted read access to the data intended for synchronization.",
"examples": ["POSTGRESQL_USER"],
"type": "string",
"title": "Username",
"order": 1
},
"password": {
"description": "This field requires the password associated with the user account specified in the preceding section.",
"type": "string",
"multiwoven_secret": true,
"title": "Password",
"order": 2
}
},
"order": 0
},
"host": {
"description": "The hostname or IP address of your PostgreSQL server.",
"examples": ["127.0.0.1"],
"type": "string",
"title": "Host",
"order": 1
},
"port": {
"description": "The port number for your PostgreSQL server, which defaults to 5432, may vary based on your configuration. ",
"examples": ["5432"],
"type": "string",
"title": "Port",
"order": 2
},
"database": {
"description": "The specific PostgreSQL database to connect to.",
"examples": ["POSTGRESQL_DB"],
"type": "string",
"title": "Database",
"order": 3
},
"schema": {
"description": "The schema within the PostgreSQL database.",
"examples": ["POSTGRESQL_SCHEMA"],
"type": "string",
"title": "Schema",
"order": 4
}
}
}
}
Loading
Loading