Skip to content

Commit

Permalink
feat(CE): add oracle db source connector (#274)
Browse files Browse the repository at this point in the history
Co-authored-by: TivonB-AI2 <[email protected]>
  • Loading branch information
github-actions[bot] and TivonB-AI2 authored Aug 2, 2024
1 parent 108cc82 commit a74d841
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 2 deletions.
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.6.0)
multiwoven-integrations (0.7.0)
activesupport
async-websocket
aws-sdk-athena
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
require_relative "integrations/source/clickhouse/client"
require_relative "integrations/source/amazon_s3/client"
require_relative "integrations/source/maria_db/client"
require_relative "integrations/source/oracle_db/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.6.0"
VERSION = "0.7.0"

ENABLED_SOURCES = %w[
Snowflake
Expand All @@ -15,6 +15,7 @@ module Integrations
Clickhouse
AmazonS3
MariaDB
Oracle
].freeze

ENABLED_DESTINATIONS = %w[
Expand Down
127 changes: 127 additions & 0 deletions integrations/lib/multiwoven/integrations/source/oracle_db/client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Source
module Oracle
include Multiwoven::Integrations::Core
class Client < SourceConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
create_connection(connection_config)
ConnectionStatus.new(
status: ConnectionStatusType["succeeded"]
).to_multiwoven_message
rescue StandardError => e
ConnectionStatus.new(
status: ConnectionStatusType["failed"], message: e.message
).to_multiwoven_message
end

def discover(connection_config)
records = []
connection_config = connection_config.with_indifferent_access
query = "SELECT table_name, column_name, data_type, nullable
FROM all_tab_columns
WHERE owner = '#{connection_config[:username].upcase}'
ORDER BY table_name, column_id"
conn = create_connection(connection_config)
cursor = conn.exec(query)
while (row = cursor.fetch)
records << row
end
catalog = Catalog.new(streams: create_streams(records))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(
"ORACLE:DISCOVER:EXCEPTION",
"error",
e
)
end

def read(sync_config)
connection_config = sync_config.source.connection_specification.with_indifferent_access
query = sync_config.model.query
db = create_connection(connection_config)
query(db, query)
rescue StandardError => e
handle_exception(e, {
context: "ORACLE:READ:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
end

private

def create_connection(connection_config)
OCI8.new(connection_config[:username], connection_config[:password], "#{connection_config[:host]}:#{connection_config[:port]}/#{connection_config[:sid]}")
end

def create_streams(records)
group_by_table(records).map do |_, r|
Multiwoven::Integrations::Protocol::Stream.new(name: r[:tablename], action: StreamAction["fetch"], json_schema: convert_to_json_schema(r[:columns]))
end
end

def query(connection, query)
records = []
query = reformat_query(query)
cursor = connection.exec(query)
columns = cursor.get_col_names
while (row = cursor.fetch)
data_hash = columns.zip(row).to_h
records << RecordMessage.new(data: data_hash, emitted_at: Time.now.to_i).to_multiwoven_message
end
records
end

def group_by_table(records)
result = {}
records.each_with_index do |entry, index|
table_name = entry[0]
column_data = {
column_name: entry[1],
data_type: entry[2],
is_nullable: entry[3] == "Y"
}
result[index] ||= {}
result[index][:tablename] = table_name
result[index][:columns] = [column_data]
end
result.values.group_by { |entry| entry[:tablename] }.transform_values do |entries|
{ tablename: entries.first[:tablename], columns: entries.flat_map { |entry| entry[:columns] } }
end
end

def reformat_query(sql_query)
offset = nil
limit = nil

sql_query = sql_query.gsub(";", "")

if sql_query.match?(/LIMIT (\d+)/i)
limit = sql_query.match(/LIMIT (\d+)/i)[1].to_i
sql_query.sub!(/LIMIT \d+/i, "")
end

if sql_query.match?(/OFFSET (\d+)/i)
offset = sql_query.match(/OFFSET (\d+)/i)[1].to_i
sql_query.sub!(/OFFSET \d+/i, "")
end

sql_query.strip!

if offset && limit
"#{sql_query} OFFSET #{offset} ROWS FETCH NEXT #{limit} ROWS ONLY"
elsif offset
"#{sql_query} OFFSET #{offset} ROWS"
elsif limit
"#{sql_query} FETCH NEXT #{limit} ROWS ONLY"
else
sql_query
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "Oracle",
"title": "Oracle",
"connector_type": "source",
"category": "Database",
"documentation_url": "https://docs.squared.ai/guides/data-integration/source/oracle",
"github_issue_label": "source-oracle",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"documentation_url": "https://docs.squared.ai/guides/data-integration/source/oracle",
"stream_type": "dynamic",
"connector_query_type": "raw_sql",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Oracle",
"type": "object",
"required": ["host", "port", "sid", "username", "password"],
"properties": {
"host": {
"description": "The Oracle host.",
"examples": ["localhost"],
"type": "string",
"title": "Host",
"order": 0
},
"port": {
"description": "The Oracle port number.",
"examples": ["1521"],
"type": "string",
"title": "Port",
"order": 1
},
"sid": {
"description": "The name of your service in Oracle.",
"examples": ["ORCLPDB1"],
"type": "string",
"title": "SID",
"order": 2
},
"username": {
"description": "The username used to authenticate and connect.",
"type": "string",
"title": "Username",
"order": 3
},
"password": {
"description": "The password corresponding to the username used for authentication.",
"type": "string",
"multiwoven_secret": true,
"title": "Password",
"order": 4
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# frozen_string_literal: true

RSpec.describe Multiwoven::Integrations::Source::Oracle::Client do
let(:client) { Multiwoven::Integrations::Source::Oracle::Client.new }
let(:sync_config) do
{
"source": {
"name": "OracleConnector",
"type": "source",
"connection_specification": {
"host": "localhost",
"port": "1521",
"servicename": "PDB1",
"username": "oracle_user",
"password": "oracle_password"
}
},
"destination": {
"name": "DestinationConnectorName",
"type": "destination",
"connection_specification": {
"example_destination_key": "example_destination_value"
}
},
"model": {
"name": "OracleDB Model",
"query": "SELECT col1, col2, col3 FROM test_table",
"query_type": "raw_sql",
"primary_key": "id"
},
"stream": {
"name": "example_stream", "action": "create",
"json_schema": { "field1": "type1" },
"supported_sync_modes": %w[full_refresh incremental],
"source_defined_cursor": true,
"default_cursor_field": ["field1"],
"source_defined_primary_key": [["field1"], ["field2"]],
"namespace": "exampleNamespace",
"url": "https://api.example.com/data",
"method": "GET"
},
"sync_mode": "full_refresh",
"cursor_field": "timestamp",
"destination_sync_mode": "upsert",
"sync_id": "1"
}
end

let(:oracle_connection) { instance_double(OCI8) }
let(:cursor) { instance_double("OCI8::Cursor") }

describe "#check_connection" do
context "when the connection is successful" do
it "returns a succeeded connection status" do
allow(OCI8).to receive(:new).and_return(oracle_connection)
allow(oracle_connection).to receive(:exec).and_return(true)
message = client.check_connection(sync_config[:source][:connection_specification])
result = message.connection_status
expect(result.status).to eq("succeeded")
expect(result.message).to be_nil
end
end

context "when the connection fails" do
it "returns a failed connection status with an error message" do
allow(client).to receive(:create_connection).and_raise(StandardError, "Connection failed")
message = client.check_connection(sync_config[:source][:connection_specification])
result = message.connection_status
expect(result.status).to eq("failed")
expect(result.message).to include("Connection failed")
end
end
end

# read and #discover tests for MariaDB
describe "#read" do
it "reads records successfully" do
s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
columns = %w[col1 col2 col3]
response = %w[1 First Row Text First Row Additional Text]
allow(OCI8).to receive(:new).and_return(oracle_connection)
allow(oracle_connection).to receive(:exec).and_return(cursor)
allow(cursor).to receive(:get_col_names).and_return(columns, nil)
allow(cursor).to receive(:fetch).and_return(response, nil)
records = client.read(s_config)
expect(records).to be_an(Array)
expect(records).not_to be_empty
expect(records.first).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
end

it "reads records successfully with limit" do
s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
s_config.limit = 100
s_config.offset = 1
columns = %w[col1 col2 col3]
response = %w[1 First Row Text First Row Additional Text]
allow(OCI8).to receive(:new).and_return(oracle_connection)
allow(oracle_connection).to receive(:exec).and_return(cursor)
allow(cursor).to receive(:get_col_names).and_return(columns, nil)
allow(cursor).to receive(:fetch).and_return(response, nil)
records = client.read(s_config)
expect(records).to be_an(Array)
expect(records).not_to be_empty
expect(records.first).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
end

it "read records failure" do
s_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config.to_json)
s_config.sync_run_id = "2"
allow(client).to receive(:create_connection).and_raise(StandardError, "test error")
expect(client).to receive(:handle_exception).with(
an_instance_of(StandardError), {
context: "ORACLE:READ:EXCEPTION",
type: "error",
sync_id: "1",
sync_run_id: "2"
}
)
client.read(s_config)
end
end

describe "#discover" do
it "discovers schema successfully" do
response = %w[test_table col1 NUMBER Y]
allow(OCI8).to receive(:new).and_return(oracle_connection)
allow(oracle_connection).to receive(:exec).and_return(cursor)
allow(cursor).to receive(:fetch).and_return(response, nil)
message = client.discover(sync_config[:source][:connection_specification])
expect(message.catalog).to be_an(Multiwoven::Integrations::Protocol::Catalog)
first_stream = message.catalog.streams.first
expect(first_stream).to be_a(Multiwoven::Integrations::Protocol::Stream)
expect(first_stream.name).to eq("test_table")
expect(first_stream.json_schema).to be_an(Hash)
expect(first_stream.json_schema["type"]).to eq("object")
expect(first_stream.json_schema["properties"]).to eq({ "col1" => { "type" => "string" } })
end
end

describe "#meta_data" do
# change this to rollout validation for all connector rolling out
it "client class_name and meta name is same" do
meta_name = client.class.to_s.split("::")[-2]
expect(client.send(:meta_data)[:data][:name]).to eq(meta_name)
end
end

describe "method definition" do
it "defines a private #query method" do
expect(described_class.private_instance_methods).to include(:query)
end
end
end

0 comments on commit a74d841

Please sign in to comment.