diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index b268e97f..80bacca5 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . specs: - multiwoven-integrations (0.14.2) + multiwoven-integrations (0.15.0) MailchimpMarketing activesupport async-websocket diff --git a/integrations/lib/multiwoven/integrations.rb b/integrations/lib/multiwoven/integrations.rb index ea783a62..f8164b95 100644 --- a/integrations/lib/multiwoven/integrations.rb +++ b/integrations/lib/multiwoven/integrations.rb @@ -93,6 +93,7 @@ require_relative "integrations/destination/microsoft_excel/client" require_relative "integrations/destination/microsoft_sql/client" require_relative "integrations/destination/mailchimp/client" +require_relative "integrations/destination/ais_data_store/client" module Multiwoven module Integrations diff --git a/integrations/lib/multiwoven/integrations/destination/ais_data_store/client.rb b/integrations/lib/multiwoven/integrations/destination/ais_data_store/client.rb new file mode 100644 index 00000000..67d878d6 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/ais_data_store/client.rb @@ -0,0 +1,127 @@ +# frozen_string_literal: true + +require "pg" + +module Multiwoven::Integrations::Destination + module AISDataStore + include Multiwoven::Integrations::Core + class Client < DestinationConnector + def check_connection(connection_config) + connection_config = connection_config.with_indifferent_access + create_connection(connection_config) + ConnectionStatus.new( + status: ConnectionStatusType["succeeded"] + ).to_multiwoven_message + rescue PG::Error => e + ConnectionStatus.new( + status: ConnectionStatusType["failed"], message: e.message + ).to_multiwoven_message + end + + def discover(connection_config) + connection_config = connection_config.with_indifferent_access + query = "SELECT table_name, column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_schema = '#{connection_config[:schema]}' AND table_catalog = '#{connection_config[:database]}' + ORDER BY table_name, ordinal_position;" + + db = create_connection(connection_config) + records = db.exec(query) do |result| + result.map do |row| + row + end + end + catalog = Catalog.new(streams: create_streams(records)) + catalog.to_multiwoven_message + rescue StandardError => e + handle_exception(e, { + context: "POSTGRESQL:DISCOVER:EXCEPTION", + type: "error" + }) + ensure + db&.close + end + + def write(sync_config, records, action = "destination_insert") + connection_config = sync_config.destination.connection_specification.with_indifferent_access + table_name = sync_config.stream.name + primary_key = sync_config.model.primary_key + log_message_array = [] + db = create_connection(connection_config) + + write_success = 0 + write_failure = 0 + + records.each do |record| + query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key) + logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}") + begin + response = db.exec(query) + write_success += 1 + log_message_array << log_request_response("info", query, response) + rescue StandardError => e + handle_exception(e, { + context: "POSTGRESQL:RECORD:WRITE:EXCEPTION", + type: "error", + sync_id: sync_config.sync_id, + sync_run_id: sync_config.sync_run_id + }) + write_failure += 1 + log_message_array << log_request_response("error", query, e.message) + end + end + tracking_message(write_success, write_failure, log_message_array) + rescue StandardError => e + handle_exception(e, { + context: "POSTGRESQL:RECORD:WRITE:EXCEPTION", + type: "error", + sync_id: sync_config.sync_id, + sync_run_id: sync_config.sync_run_id + }) + end + + private + + def query(connection, query) + connection.exec(query) do |result| + result.map do |row| + RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message + end + end + end + + def create_connection(connection_config) + raise "Unsupported Auth type" unless connection_config[:credentials][:auth_type] == "username/password" + + PG.connect( + host: connection_config[:host], + dbname: connection_config[:database], + user: connection_config[:credentials][:username], + password: connection_config[:credentials][:password], + port: connection_config[:port] + ) + end + + def create_streams(records) + group_by_table(records).map do |r| + Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns])) + end + end + + def group_by_table(records) + records.group_by { |entry| entry["table_name"] }.map do |table_name, columns| + { + tablename: table_name, + columns: columns.map do |column| + { + column_name: column["column_name"], + type: column["data_type"], + optional: column["is_nullable"] == "YES" + } + end + } + end + end + end + end +end diff --git a/integrations/lib/multiwoven/integrations/destination/ais_data_store/config/meta.json b/integrations/lib/multiwoven/integrations/destination/ais_data_store/config/meta.json new file mode 100644 index 00000000..194991f1 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/ais_data_store/config/meta.json @@ -0,0 +1,15 @@ +{ + "data": { + "name": "AISDataStore", + "title": "AIS Data Store", + "connector_type": "destination", + "category": "Database", + "documentation_url": "https://docs.mutliwoven.com", + "github_issue_label": "destination-postgresql", + "icon": "icon.svg", + "license": "MIT", + "release_stage": "alpha", + "support_level": "community", + "tags": ["language:ruby", "multiwoven"] + } +} diff --git a/integrations/lib/multiwoven/integrations/destination/ais_data_store/config/spec.json b/integrations/lib/multiwoven/integrations/destination/ais_data_store/config/spec.json new file mode 100644 index 00000000..7ce099e6 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/ais_data_store/config/spec.json @@ -0,0 +1,68 @@ +{ + "documentation_url": "https://docs.multiwoven.com/integrations/sources/postgresql", + "stream_type": "dynamic", + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Postgresql", + "type": "object", + "required": ["host", "port", "database", "schema"], + "properties": { + "credentials": { + "title": "", + "type": "object", + "required": ["auth_type", "username", "password"], + "properties": { + "auth_type": { + "type": "string", + "default": "username/password", + "order": 0, + "readOnly": true + }, + "username": { + "description": "Username refers to your individual PostgreSQL login credentials. At a minimum, the user associated with these credentials must be granted read access to the data intended for synchronization.", + "examples": ["POSTGRESQL_USER"], + "type": "string", + "title": "Username", + "order": 1 + }, + "password": { + "description": "This field requires the password associated with the user account specified in the preceding section.", + "type": "string", + "multiwoven_secret": true, + "title": "Password", + "order": 2 + } + }, + "order": 0 + }, + "host": { + "description": "The hostname or IP address of your PostgreSQL server.", + "examples": ["127.0.0.1"], + "type": "string", + "title": "Host", + "order": 1 + }, + "port": { + "description": "The port number for your PostgreSQL server, which defaults to 5432, may vary based on your configuration. ", + "examples": ["5432"], + "type": "string", + "title": "Port", + "order": 2 + }, + "database": { + "description": "The specific PostgreSQL database to connect to.", + "examples": ["POSTGRESQL_DB"], + "type": "string", + "title": "Database", + "order": 3 + }, + "schema": { + "description": "The schema within the PostgreSQL database.", + "examples": ["POSTGRESQL_SCHEMA"], + "type": "string", + "title": "Schema", + "order": 4 + } + } + } +} diff --git a/integrations/lib/multiwoven/integrations/destination/ais_data_store/icon.svg b/integrations/lib/multiwoven/integrations/destination/ais_data_store/icon.svg index 277ee307..eb2d575c 100644 --- a/integrations/lib/multiwoven/integrations/destination/ais_data_store/icon.svg +++ b/integrations/lib/multiwoven/integrations/destination/ais_data_store/icon.svg @@ -1,4 +1,11 @@ +<<<<<<< HEAD +======= + + + + +>>>>>>> 70e1bd5d (feat(CE): AIS Data store destination connector (#573)) diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index 4abff0f7..6d9488b8 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.14.2" + VERSION = "0.15.0" ENABLED_SOURCES = %w[ Snowflake @@ -43,6 +43,7 @@ module Integrations MicrosoftExcel MicrosoftSql Mailchimp + AISDataStore ].freeze end end diff --git a/integrations/spec/multiwoven/integrations/destination/ais_data_store/client_spec.rb b/integrations/spec/multiwoven/integrations/destination/ais_data_store/client_spec.rb new file mode 100644 index 00000000..8419c92f --- /dev/null +++ b/integrations/spec/multiwoven/integrations/destination/ais_data_store/client_spec.rb @@ -0,0 +1,215 @@ +# frozen_string_literal: true + +RSpec.describe Multiwoven::Integrations::Destination::AISDataStore::Client do + let(:client) { Multiwoven::Integrations::Destination::AISDataStore::Client.new } + let(:sync_config) do + { + "source": { + "name": "PostgresqlSourceConnector", + "type": "source", + "connection_specification": { + "credentials": { + "auth_type": "username/password", + "username": ENV["POSTGRESQL_USERNAME"], + "password": ENV["POSTGRESQL_PASSWORD"] + }, + "host": "test.pg.com", + "port": "8080", + "database": "test_database", + "schema": "test_schema" + } + }, + "destination": { + "name": "AISDataStore", + "type": "destination", + "connection_specification": { + "credentials": { + "auth_type": "username/password", + "username": ENV["POSTGRESQL_USERNAME"], + "password": ENV["POSTGRESQL_PASSWORD"] + }, + "host": "test.pg.com", + "port": "8080", + "database": "test_database", + "schema": "test_schema" + } + }, + "model": { + "name": "ExamplePostgresqlModel", + "query": "SELECT * FROM contacts;", + "query_type": "raw_sql", + "primary_key": "id" + }, + "stream": { + "name": "users", "action": "create", + "json_schema": { "user_id": "string", "email": "string", "location": "string" }, + "supported_sync_modes": %w[full_refresh incremental] + }, + "sync_mode": "full_refresh", + "cursor_field": "timestamp", + "destination_sync_mode": "upsert", + "sync_id": "1" + } + end + + let(:pg_connection) { instance_double(PG::Connection) } + let(:pg_result) { instance_double(PG::Result) } + + let(:records) do + [ + Multiwoven::Integrations::Protocol::RecordMessage.new( + data: { + email: "user1@example.com", + location: "New York", + user_id: 1 + }, + emitted_at: Time.now.to_i + ), + Multiwoven::Integrations::Protocol::RecordMessage.new( + data: { + email: "user2@example.com", + location: "San Francisco", + user_id: 2 + }, + emitted_at: Time.now.to_i + ) + ] + end + + describe "#check_connection" do + context "when the connection is successful" do + it "returns a succeeded connection status" do + allow(Sequel).to receive(:postgres).and_return(true) + allow(PG).to receive(:connect).and_return(pg_connection) + message = client.check_connection(sync_config[:source][:connection_specification]) + result = message.connection_status + + expect(result.status).to eq("succeeded") + expect(result.message).to be_nil + end + end + + context "when the connection fails" do + it "returns a failed connection status with an error message" do + allow(PG).to receive(:connect).and_raise(PG::Error.new("Connection failed")) + + message = client.check_connection(sync_config[:source][:connection_specification]) + result = message.connection_status + expect(result.status).to eq("failed") + expect(result.message).to include("Connection failed") + end + end + end + + # write specs + + describe "#write" do + context "success" do + it "write records successfully" do + s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) + s_config.sync_run_id = "33" + allow(PG).to receive(:connect).and_return(pg_connection) + + allow(pg_connection).to receive(:exec).and_return(true) + + tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking + expect(tracking.success).to eql(1) + log_message = tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("info") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") + end + + it "write records successfully on update record action destination_update" do + s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) + s_config.sync_run_id = "33" + allow(PG).to receive(:connect).and_return(pg_connection) + + allow(pg_connection).to receive(:exec).and_return(true) + + tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)], "destination_update").tracking + expect(tracking.success).to eql(1) + expect(tracking.logs.count).to eql(1) + log_message = tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("info") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") + end + end + + context "failure" do + it "handle record write failures" do + s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json) + s_config.sync_run_id = "34" + + allow(PG).to receive(:connect).and_return(pg_connection) + + allow(pg_connection).to receive(:exec).and_raise(StandardError.new("test error")) + + tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking + expect(tracking.failed).to eql(1) + expect(tracking.logs.count).to eql(1) + log_message = tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("error") + expect(log_message.message).to include("request") + expect(log_message.message).to include("\"response\":\"test error\"") + end + end + end + + describe "#discover" do + it "discovers schema successfully" do + allow(PG).to receive(:connect).and_return(pg_connection) + discovery_query = "SELECT table_name, column_name, data_type, is_nullable\n" \ + " FROM information_schema.columns\n" \ + " WHERE table_schema = 'test_schema' AND table_catalog = 'test_database'\n" \ + " ORDER BY table_name, ordinal_position;" + allow(pg_connection).to receive(:exec).with(discovery_query).and_return( + [ + { + "table_name" => "combined_users", "column_name" => "city", "data_type" => "varchar", "is_nullable" => "YES" + } + ] + ) + allow(pg_connection).to receive(:close).and_return(true) + message = client.discover(sync_config[:source][:connection_specification]) + + expect(message.catalog).to be_an(Multiwoven::Integrations::Protocol::Catalog) + first_stream = message.catalog.streams.first + expect(first_stream).to be_a(Multiwoven::Integrations::Protocol::Stream) + expect(first_stream.name).to eq("combined_users") + expect(first_stream.json_schema).to be_an(Hash) + expect(first_stream.json_schema["type"]).to eq("object") + expect(first_stream.json_schema["properties"]).to eq({ "city" => { "type" => %w[string null] } }) + end + + it "discover schema failure" do + allow(client).to receive(:create_connection).and_raise(StandardError.new("test error")) + expect(client).to receive(:handle_exception).with( + an_instance_of(StandardError), { + context: "POSTGRESQL:DISCOVER:EXCEPTION", + type: "error" + } + ) + client.discover(sync_config[:source][:connection_specification]) + end + end + + describe "#meta_data" do + it "client class_name and meta name is same" do + meta_name = client.class.to_s.split("::")[-2] + expect(client.send(:meta_data)[:data][:name]).to eq(meta_name) + end + end + + describe "method definition" do + it "defines a private #query method" do + expect(described_class.private_instance_methods).to include(:query) + end + end +end