Skip to content

Commit

Permalink
feat(CE): AIS Data store destination connector (#462)
Browse files Browse the repository at this point in the history
Co-authored-by: afthab vp <[email protected]>
  • Loading branch information
github-actions[bot] and afthabvp authored Nov 13, 2024
1 parent 15b9caf commit 8642e55
Show file tree
Hide file tree
Showing 8 changed files with 436 additions and 2 deletions.
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.14.2)
multiwoven-integrations (0.15.0)
MailchimpMarketing
activesupport
async-websocket
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
require_relative "integrations/destination/microsoft_excel/client"
require_relative "integrations/destination/microsoft_sql/client"
require_relative "integrations/destination/mailchimp/client"
require_relative "integrations/destination/ais_data_store/client"

module Multiwoven
module Integrations
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# frozen_string_literal: true

require "pg"

module Multiwoven::Integrations::Destination
module AISDataStore
include Multiwoven::Integrations::Core
class Client < DestinationConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
ConnectionStatus.new(
status: ConnectionStatusType["succeeded"]
).to_multiwoven_message
rescue PG::Error => e
ConnectionStatus.new(
status: ConnectionStatusType["failed"], message: e.message
).to_multiwoven_message
end

def discover(connection_config)
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = '#{connection_config[:schema]}' AND table_catalog = '#{connection_config[:database]}'
ORDER BY table_name, ordinal_position;"

db = create_connection(connection_config)
records = db.exec(query) do |result|
result.map do |row|
row
end
end
catalog = Catalog.new(streams: create_streams(records))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:DISCOVER:EXCEPTION",
type: "error"
})
ensure
db&.close
end

def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
log_message_array = []
db = create_connection(connection_config)

write_success = 0
write_failure = 0

records.each do |record|
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
response = db.exec(query)
write_success += 1
log_message_array << log_request_response("info", query, response)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", query, e.message)
end
end
tracking_message(write_success, write_failure, log_message_array)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
end

private

def query(connection, query)
connection.exec(query) do |result|
result.map do |row|
RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
end
end
end

def create_connection(connection_config)
raise "Unsupported Auth type" unless connection_config[:credentials][:auth_type] == "username/password"

PG.connect(
host: connection_config[:host],
dbname: connection_config[:database],
user: connection_config[:credentials][:username],
password: connection_config[:credentials][:password],
port: connection_config[:port]
)
end

def create_streams(records)
group_by_table(records).map do |r|
Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
end

def group_by_table(records)
records.group_by { |entry| entry["table_name"] }.map do |table_name, columns|
{
tablename: table_name,
columns: columns.map do |column|
{
column_name: column["column_name"],
type: column["data_type"],
optional: column["is_nullable"] == "YES"
}
end
}
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "AISDataStore",
"title": "AIS Data Store",
"connector_type": "destination",
"category": "Database",
"documentation_url": "https://docs.mutliwoven.com",
"github_issue_label": "destination-postgresql",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/sources/postgresql",
"stream_type": "dynamic",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Postgresql",
"type": "object",
"required": ["host", "port", "database", "schema"],
"properties": {
"credentials": {
"title": "",
"type": "object",
"required": ["auth_type", "username", "password"],
"properties": {
"auth_type": {
"type": "string",
"default": "username/password",
"order": 0,
"readOnly": true
},
"username": {
"description": "Username refers to your individual PostgreSQL login credentials. At a minimum, the user associated with these credentials must be granted read access to the data intended for synchronization.",
"examples": ["POSTGRESQL_USER"],
"type": "string",
"title": "Username",
"order": 1
},
"password": {
"description": "This field requires the password associated with the user account specified in the preceding section.",
"type": "string",
"multiwoven_secret": true,
"title": "Password",
"order": 2
}
},
"order": 0
},
"host": {
"description": "The hostname or IP address of your PostgreSQL server.",
"examples": ["127.0.0.1"],
"type": "string",
"title": "Host",
"order": 1
},
"port": {
"description": "The port number for your PostgreSQL server, which defaults to 5432, may vary based on your configuration. ",
"examples": ["5432"],
"type": "string",
"title": "Port",
"order": 2
},
"database": {
"description": "The specific PostgreSQL database to connect to.",
"examples": ["POSTGRESQL_DB"],
"type": "string",
"title": "Database",
"order": 3
},
"schema": {
"description": "The schema within the PostgreSQL database.",
"examples": ["POSTGRESQL_SCHEMA"],
"type": "string",
"title": "Schema",
"order": 4
}
}
}
}
Loading

0 comments on commit 8642e55

Please sign in to comment.