From 68dd88dbbffbb52c232165e74d19ecf03482ffec Mon Sep 17 00:00:00 2001 From: TivonB-AI2 <124182151+TivonB-AI2@users.noreply.github.com> Date: Fri, 2 Aug 2024 05:02:31 -0400 Subject: [PATCH] feat(CE): add oracle db destination connector (#277) --- .github/workflows/integrations-ci.yml | 8 + .github/workflows/integrations-main.yml | 8 + integrations/Gemfile | 2 + integrations/Gemfile.lock | 6 +- integrations/lib/multiwoven/integrations.rb | 2 + .../destination/oracle_db/client.rb | 112 ++++++++++++++ .../destination/oracle_db/config/meta.json | 15 ++ .../destination/oracle_db/config/spec.json | 47 ++++++ .../destination/oracle_db/icon.svg | 4 + .../lib/multiwoven/integrations/rollout.rb | 3 +- integrations/multiwoven-integrations.gemspec | 1 + .../destination/oracle_db/client_spec.rb | 146 ++++++++++++++++++ 12 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 integrations/lib/multiwoven/integrations/destination/oracle_db/client.rb create mode 100644 integrations/lib/multiwoven/integrations/destination/oracle_db/config/meta.json create mode 100644 integrations/lib/multiwoven/integrations/destination/oracle_db/config/spec.json create mode 100644 integrations/lib/multiwoven/integrations/destination/oracle_db/icon.svg create mode 100644 integrations/spec/multiwoven/integrations/destination/oracle_db/client_spec.rb diff --git a/.github/workflows/integrations-ci.yml b/.github/workflows/integrations-ci.yml index 4e8c44d9..1eca6149 100644 --- a/.github/workflows/integrations-ci.yml +++ b/.github/workflows/integrations-ci.yml @@ -31,6 +31,14 @@ jobs: sudo mv libduckdb/libduckdb.so /usr/local/lib sudo ldconfig /usr/local/lib + - name: Download and Install Oracle Instant Client + run: | + sudo apt-get install -y libaio1 alien + wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-basic-19.6.0.0.0-1.x86_64.rpm + wget 
http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-devel-19.6.0.0.0-1.x86_64.rpm + sudo alien -i --scripts oracle-instantclient*.rpm + rm -f oracle-instantclient*.rpm + - name: Install dependencies run: | gem install bundler diff --git a/.github/workflows/integrations-main.yml b/.github/workflows/integrations-main.yml index d9ecb8ad..fd43f306 100644 --- a/.github/workflows/integrations-main.yml +++ b/.github/workflows/integrations-main.yml @@ -39,6 +39,14 @@ jobs: sudo mv libduckdb/libduckdb.so /usr/local/lib sudo ldconfig /usr/local/lib + - name: Download and Install Oracle Instant Client + run: | + sudo apt-get install -y libaio1 alien + wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-basic-19.6.0.0.0-1.x86_64.rpm + wget http://yum.oracle.com/repo/OracleLinux/OL7/oracle/instantclient/x86_64/getPackage/oracle-instantclient19.6-devel-19.6.0.0.0-1.x86_64.rpm + sudo alien -i --scripts oracle-instantclient*.rpm + rm -f oracle-instantclient*.rpm + - name: Install dependencies run: bundle install working-directory: ./integrations diff --git a/integrations/Gemfile b/integrations/Gemfile index b28b59ce..1b82fbae 100644 --- a/integrations/Gemfile +++ b/integrations/Gemfile @@ -67,6 +67,8 @@ gem "mysql2" gem "aws-sdk-sts" +gem "ruby-oci8" + group :development, :test do gem "simplecov", require: false gem "simplecov_json_formatter", require: false diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index fb0b8ee2..5843cd3a 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . 
specs: - multiwoven-integrations (0.5.2) + multiwoven-integrations (0.6.0) activesupport async-websocket aws-sdk-athena @@ -28,6 +28,7 @@ PATH rake restforce ruby-limiter + ruby-oci8 ruby-odbc rubyzip sequel @@ -275,6 +276,8 @@ GEM rubocop-ast (1.31.3) parser (>= 3.3.1.0) ruby-limiter (2.3.0) + ruby-oci8 (2.2.12) + ruby-oci8 (2.2.12-x64-mingw-ucrt) ruby-progressbar (1.13.0) ruby2_keywords (0.0.5) rubyzip (2.3.2) @@ -357,6 +360,7 @@ DEPENDENCIES rspec (~> 3.0) rubocop (~> 1.21) ruby-limiter + ruby-oci8 ruby-odbc! rubyzip sequel diff --git a/integrations/lib/multiwoven/integrations.rb b/integrations/lib/multiwoven/integrations.rb index d305120f..dd8636cb 100644 --- a/integrations/lib/multiwoven/integrations.rb +++ b/integrations/lib/multiwoven/integrations.rb @@ -31,6 +31,7 @@ require "duckdb" require "iterable-api-client" require "aws-sdk-sts" +require "ruby-oci8" # Service require_relative "integrations/config" @@ -78,6 +79,7 @@ require_relative "integrations/destination/iterable/client" require_relative "integrations/destination/maria_db/client" require_relative "integrations/destination/databricks_lakehouse/client" +require_relative "integrations/destination/oracle_db/client" module Multiwoven module Integrations diff --git a/integrations/lib/multiwoven/integrations/destination/oracle_db/client.rb b/integrations/lib/multiwoven/integrations/destination/oracle_db/client.rb new file mode 100644 index 00000000..da848bf8 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/oracle_db/client.rb @@ -0,0 +1,134 @@
+# frozen_string_literal: true
+
+module Multiwoven::Integrations::Destination
+  module Oracle
+    include Multiwoven::Integrations::Core
+
+    # Destination connector that writes synced records to an Oracle database
+    # through the OCI8 driver.
+    class Client < DestinationConnector
+      # Opens a connection with the given settings and reports the outcome.
+      #
+      # @param connection_config [Hash] host, port, sid, username and password
+      # @return a ConnectionStatus message: "succeeded", or "failed" carrying the
+      #   message of whatever StandardError the connection attempt raised
+      def check_connection(connection_config)
+        connection_config = connection_config.with_indifferent_access
+        create_connection(connection_config)
+        ConnectionStatus.new(
+          status: ConnectionStatusType["succeeded"]
+        ).to_multiwoven_message
+      rescue StandardError => e
+        ConnectionStatus.new(
+          status: ConnectionStatusType["failed"], message: e.message
+        ).to_multiwoven_message
+      end
+
+      # Lists the columns of every table owned by the configured user and returns
+      # them as a catalog with one stream per table.
+      #
+      # @param connection_config [Hash] host, port, sid, username and password
+      # @return the Catalog message, or the result of handle_exception on error
+      def discover(connection_config)
+        connection_config = connection_config.with_indifferent_access
+        # The owner is sent as a bind variable rather than interpolated into the
+        # SQL text, so a quote in the username cannot change the statement.
+        owner = connection_config[:username].upcase
+        query = "SELECT table_name, column_name, data_type, nullable
+                 FROM all_tab_columns
+                 WHERE owner = :1
+                 ORDER BY table_name, column_id"
+        conn = create_connection(connection_config)
+        cursor = conn.exec(query, owner)
+        records = []
+        while (row = cursor.fetch)
+          records << row
+        end
+        Catalog.new(streams: create_streams(records)).to_multiwoven_message
+      rescue StandardError => e
+        # Same (exception, metadata) call shape that #write uses.
+        handle_exception(e, {
+                           context: "ORACLE:DISCOVER:EXCEPTION",
+                           type: "error"
+                         })
+      end
+
+      # Writes each record with its own statement and commit, so one failing
+      # record does not stop the rest of the batch.
+      #
+      # @param sync_config [SyncConfig] supplies connection settings, stream name,
+      #   primary key and the sync ids used for logging
+      # @param records [Array<Hash>] rows to write
+      # @param action [String] query type handed to QueryBuilder
+      # @return the tracking message with success/failure counts and per-record logs
+      def write(sync_config, records, action = "destination_insert")
+        connection_config = sync_config.destination.connection_specification.with_indifferent_access
+        table_name = sync_config.stream.name
+        primary_key = sync_config.model.primary_key
+        conn = create_connection(connection_config)
+
+        write_success = 0
+        write_failure = 0
+        log_message_array = []
+
+        records.each do |record|
+          query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
+          # Strip only the trailing statement terminator; semicolons inside the
+          # record's values are data and must reach the database unchanged.
+          query = query.sub(/;\s*\z/, "")
+          logger.debug("ORACLE:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
+          begin
+            response = conn.exec(query)
+            conn.exec("COMMIT")
+            write_success += 1
+            log_message_array << log_request_response("info", query, response)
+          rescue StandardError => e
+            handle_exception(e, {
+                               context: "ORACLE:RECORD:WRITE:EXCEPTION",
+                               type: "error",
+                               sync_id: sync_config.sync_id,
+                               sync_run_id: sync_config.sync_run_id
+                             })
+            write_failure += 1
+            log_message_array << log_request_response("error", query, e.message)
+          end
+        end
+        tracking_message(write_success, write_failure, log_message_array)
+      rescue StandardError => e
+        handle_exception(e, {
+                           context: "ORACLE:RECORD:WRITE:EXCEPTION",
+                           type: "error",
+                           sync_id: sync_config.sync_id,
+                           sync_run_id: sync_config.sync_run_id
+                         })
+      end
+
+      private
+
+      # Connects using "host:port/sid" as the connect string.
+      # NOTE(review): no caller logs the connection off; confirm whether it
+      # should be closed once check_connection, discover and write finish.
+      def create_connection(connection_config)
+        OCI8.new(connection_config[:username], connection_config[:password], "#{connection_config[:host]}:#{connection_config[:port]}/#{connection_config[:sid]}")
+      end
+
+      # Turns the grouped table metadata into one protocol Stream per table.
+      def create_streams(records)
+        group_by_table(records).map do |_, r|
+          Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
+        end
+      end
+
+      # Groups [table_name, column_name, data_type, nullable] rows by table.
+      #
+      # @return [Hash] table name => { tablename:, columns: [...] }, with tables
+      #   and columns in the order the rows arrived
+      def group_by_table(records)
+        records.each_with_object({}) do |(table_name, column_name, data_type, nullable), tables|
+          table = (tables[table_name] ||= { tablename: table_name, columns: [] })
+          table[:columns] << { column_name: column_name, data_type: data_type, is_nullable: nullable == "Y" }
+        end
+      end
+    end
+  end
+end
diff --git a/integrations/lib/multiwoven/integrations/destination/oracle_db/config/meta.json b/integrations/lib/multiwoven/integrations/destination/oracle_db/config/meta.json new file mode 100644 index 00000000..5662698c --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/oracle_db/config/meta.json @@ -0,0 +1,15 @@ +{ + "data": { + "name": "Oracle", + "title": "Oracle", + "connector_type": "destination", + "category": "Database", + "documentation_url": "https://docs.squared.ai/guides/data-integration/destination/oracle", + "github_issue_label": "destination-oracle", + "icon": "icon.svg", + "license": "MIT", + "release_stage": "alpha", + "support_level": "community", + "tags": ["language:ruby", "multiwoven"] + } +} diff --git a/integrations/lib/multiwoven/integrations/destination/oracle_db/config/spec.json
b/integrations/lib/multiwoven/integrations/destination/oracle_db/config/spec.json new file mode 100644 index 00000000..2eed10f6 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/oracle_db/config/spec.json @@ -0,0 +1,47 @@ +{ + "documentation_url": "https://docs.squared.ai/guides/data-integration/destination/oracle", + "stream_type": "dynamic", + "connector_query_type": "raw_sql", + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Oracle", + "type": "object", + "required": ["host", "port", "sid", "username", "password"], + "properties": { + "host": { + "description": "The Oracle host.", + "examples": ["localhost"], + "type": "string", + "title": "Host", + "order": 0 + }, + "port": { + "description": "The Oracle port number.", + "examples": ["1521"], + "type": "string", + "title": "Port", + "order": 1 + }, + "sid": { + "description": "The name of your service in Oracle.", + "examples": ["ORCLPDB1"], + "type": "string", + "title": "SID", + "order": 2 + }, + "username": { + "description": "The username used to authenticate and connect.", + "type": "string", + "title": "Username", + "order": 3 + }, + "password": { + "description": "The password corresponding to the username used for authentication.", + "type": "string", + "multiwoven_secret": true, + "title": "Password", + "order": 4 + } + } + } +} \ No newline at end of file diff --git a/integrations/lib/multiwoven/integrations/destination/oracle_db/icon.svg b/integrations/lib/multiwoven/integrations/destination/oracle_db/icon.svg new file mode 100644 index 00000000..3f4e051f --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/oracle_db/icon.svg @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index 76ece7b2..03ec5a0a 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ 
b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.5.2" + VERSION = "0.6.0" ENABLED_SOURCES = %w[ Snowflake @@ -34,6 +34,7 @@ module Integrations Iterable MariaDB DatabricksLakehouse + Oracle ].freeze end end diff --git a/integrations/multiwoven-integrations.gemspec b/integrations/multiwoven-integrations.gemspec index 34aec384..498a2d19 100644 --- a/integrations/multiwoven-integrations.gemspec +++ b/integrations/multiwoven-integrations.gemspec @@ -53,6 +53,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "rake" spec.add_runtime_dependency "restforce" spec.add_runtime_dependency "ruby-limiter" + spec.add_runtime_dependency "ruby-oci8" spec.add_runtime_dependency "ruby-odbc" spec.add_runtime_dependency "rubyzip" spec.add_runtime_dependency "sequel" diff --git a/integrations/spec/multiwoven/integrations/destination/oracle_db/client_spec.rb b/integrations/spec/multiwoven/integrations/destination/oracle_db/client_spec.rb new file mode 100644 index 00000000..7774e2e5 --- /dev/null +++ b/integrations/spec/multiwoven/integrations/destination/oracle_db/client_spec.rb @@ -0,0 +1,149 @@
+# frozen_string_literal: true
+
+RSpec.describe Multiwoven::Integrations::Destination::Oracle::Client do
+  include WebMock::API
+
+  before(:each) do
+    WebMock.disable_net_connect!(allow_localhost: true)
+  end
+
+  let(:client) { described_class.new }
+  # Keys match what the connector reads: host, port, sid, username, password.
+  let(:connection_config) do
+    {
+      host: "localhost",
+      port: "1521",
+      sid: "PDB1",
+      username: "oracle_user",
+      password: "oracle_password"
+    }
+  end
+  let(:sync_config_json) do
+    {
+      source: {
+        name: "Sample Source Connector",
+        type: "source",
+        connection_specification: {
+          private_api_key: "test_api_key"
+        }
+      },
+      destination: {
+        name: "Oracle",
+        type: "destination",
+        connection_specification: connection_config
+      },
+      model: {
+        name: "ExampleModel",
+        query: "SELECT col1, col2, col3 FROM test_table_1",
+        query_type: "raw_sql",
+        primary_key: "col1"
+      },
+      sync_mode: "incremental",
+      destination_sync_mode: "insert",
+      stream: {
+        name: "table",
+        action: "create",
+        json_schema: {},
+        supported_sync_modes: %w[incremental]
+      }
+    }
+  end
+
+  # OCI8.new is stubbed in the examples below, so no database is contacted.
+  let(:oracle_connection) { instance_double(OCI8) }
+  let(:cursor) { instance_double("OCI8::Cursor") }
+
+  describe "#check_connection" do
+    context "when the connection is successful" do
+      it "returns a succeeded connection status" do
+        allow(OCI8).to receive(:new).and_return(oracle_connection)
+        allow(oracle_connection).to receive(:exec).and_return(true)
+        message = client.check_connection(sync_config_json[:destination][:connection_specification])
+        result = message.connection_status
+        expect(result.status).to eq("succeeded")
+        expect(result.message).to be_nil
+      end
+    end
+
+    context "when the connection fails" do
+      it "returns a failed connection status with an error message" do
+        allow(client).to receive(:create_connection).and_raise(StandardError, "Connection failed")
+        message = client.check_connection(sync_config_json[:destination][:connection_specification])
+        result = message.connection_status
+        expect(result.status).to eq("failed")
+        expect(result.message).to include("Connection failed")
+      end
+    end
+  end
+
+  describe "#discover" do
+    it "discovers schema successfully" do
+      # One row per column: table_name, column_name, data_type, nullable.
+      response = %w[test_table col1 NUMBER Y]
+      allow(OCI8).to receive(:new).and_return(oracle_connection)
+      allow(oracle_connection).to receive(:exec).and_return(cursor)
+      allow(cursor).to receive(:fetch).and_return(response, nil)
+      message = client.discover(sync_config_json[:destination][:connection_specification])
+      expect(message.catalog).to be_an(Multiwoven::Integrations::Protocol::Catalog)
+      first_stream = message.catalog.streams.first
+      expect(first_stream).to be_a(Multiwoven::Integrations::Protocol::Stream)
+      expect(first_stream.name).to eq("test_table")
+      expect(first_stream.json_schema).to be_an(Hash)
+      expect(first_stream.json_schema["type"]).to eq("object")
+      expect(first_stream.json_schema["properties"]).to eq({ "col1" => { "type" => "string" } })
+    end
+  end
+
+  describe "#write" do
+    context "when the write operation is successful" do
+      it "increments the success count" do
+        sync_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(
+          sync_config_json.to_json
+        )
+        record = [
+          { "col1" => 1, "col2" => "first", "col3" => 1.1 }
+        ]
+        allow(OCI8).to receive(:new).and_return(oracle_connection)
+        allow(oracle_connection).to receive(:exec).and_return(1)
+        allow(cursor).to receive(:fetch).and_return(1, nil)
+        response = client.write(sync_config, record)
+        expect(response.tracking.success).to eq(record.size)
+        expect(response.tracking.failed).to eq(0)
+        log_message = response.tracking.logs.first
+        expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
+        expect(log_message.level).to eql("info")
+        expect(log_message.message).to include("request")
+        expect(log_message.message).to include("response")
+      end
+    end
+
+    context "when the write operation fails" do
+      it "increments the failure count" do
+        sync_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(
+          sync_config_json.to_json
+        )
+        record = [
+          { "col1" => 1, "col2" => "first", "col3" => 1.1 }
+        ]
+        allow(OCI8).to receive(:new).and_return(oracle_connection)
+        allow(oracle_connection).to receive(:exec).and_raise(StandardError, "Test error")
+        response = client.write(sync_config, record)
+        expect(response.tracking.failed).to eq(record.size)
+        expect(response.tracking.success).to eq(0)
+        log_message = response.tracking.logs.first
+        expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
+        expect(log_message.level).to eql("error")
+        expect(log_message.message).to include("request")
+        expect(log_message.message).to include("{\"request\":\"INSERT INTO table (col1, col2, col3) VALUES ('1', 'first', '1.1')\",\"response\":\"Test error\",\"level\":\"error\"}")
+      end
+    end
+  end
+
+  describe "#meta_data" do
+    # change this to rollout validation for all connector rolling out
+    it "client class_name and meta name is same" do
+      meta_name = client.class.to_s.split("::")[-2]
+      expect(client.send(:meta_data)[:data][:name]).to eq(meta_name)
+    end
+  end
+end