Skip to content

Commit

Permalink
feat(CE): Add HTTP Model Source Connector (#466)
Browse files Browse the repository at this point in the history
Co-authored-by: TivonB-AI2 <[email protected]>
  • Loading branch information
github-actions[bot] and TivonB-AI2 authored Nov 13, 2024
1 parent bcd6730 commit 3e39637
Show file tree
Hide file tree
Showing 8 changed files with 344 additions and 2 deletions.
3 changes: 2 additions & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.13.2)
multiwoven-integrations (0.14.0)
MailchimpMarketing
activesupport
async-websocket
aws-sdk-athena
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
require_relative "integrations/source/databrics_model/client"
require_relative "integrations/source/aws_sagemaker_model/client"
require_relative "integrations/source/google_vertex_model/client"
require_relative "integrations/source/http_model/client"

# Destination
require_relative "integrations/destination/klaviyo/client"
Expand Down
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.13.2"
VERSION = "0.14.0"

ENABLED_SOURCES = %w[
Snowflake
Expand All @@ -19,6 +19,7 @@ module Integrations
DatabricksModel
AwsSagemakerModel
VertexModel
HttpModel
].freeze

ENABLED_DESTINATIONS = %w[
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Source
module HttpModel
include Multiwoven::Integrations::Core
class Client < SourceConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
url_host = connection_config[:url_host]
headers = connection_config[:headers]
response = Multiwoven::Integrations::Core::HttpClient.request(
url_host,
HTTP_GET,
headers: headers
)
if success?(response)
success_status
else
failure_status(nil)
end
rescue StandardError => e
handle_exception(e, {
context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION",
type: "error"
})
failure_status(e)
end

def discover(_connection_config = nil)
catalog_json = read_json(CATALOG_SPEC_PATH)
catalog = build_catalog(catalog_json)
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(e, {
context: "HTTP MODEL:DISCOVER:EXCEPTION",
type: "error"
})
end

def read(sync_config)
connection_config = sync_config.source.connection_specification
connection_config = connection_config.with_indifferent_access
# The server checks the ConnectorQueryType.
# If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol.
# This query is then sent to the AI/ML model.
payload = JSON.parse(sync_config.model.query)
run_model(connection_config, payload)
rescue StandardError => e
handle_exception(e, {
context: "HTTP MODEL:READ:EXCEPTION",
type: "error"
})
end

private

def run_model(connection_config, payload)
connection_config = connection_config.with_indifferent_access
url_host = connection_config[:url_host]
headers = connection_config[:headers]
config = connection_config[:config]
config[:timeout] ||= 30
response = send_request(url_host, payload, headers, config)
process_response(response)
rescue StandardError => e
handle_exception(e, context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error")
end

def process_response(response)
if success?(response)
data = JSON.parse(response.body)
[RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message]
else
create_log_message("HTTP MODEL:RUN_MODEL", "error", "request failed")
end
end

def send_request(url, payload, headers, config)
Multiwoven::Integrations::Core::HttpClient.request(
url,
HTTP_POST,
payload: payload,
headers: headers,
config: config
)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"request_rate_limit": 600,
"request_rate_limit_unit": "minute",
"request_rate_concurrency": 10,
"streams": []
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "HttpModel",
"title": "HTTP Model",
"connector_type": "source",
"category": "AI Model",
"documentation_url": "https://docs.mutliwoven.com",
"github_issue_label": "source-http-model",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/source/http-model",
"stream_type": "user_defined",
"connector_query_type": "ai_ml",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Http Model",
"type": "object",
"required": ["url_host"],
"properties": {
"url_host": {
"type": "string",
"title": "URL",
"order": 0
},
"headers": {
"type": "string",
"title": "Http Headers",
"order": 1
},
"config": {
"title": "",
"type": "object",
"properties": {
"timeout": {
"type": "integer",
"minimum": 0,
"default": 30,
"title": "Http Timeout",
"order": 0
}
},
"order": 2
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# frozen_string_literal: true

RSpec.describe Multiwoven::Integrations::Source::HttpModel::Client do
include WebMock::API

before(:each) do
WebMock.disable_net_connect!(allow_localhost: true)
end

let(:client) { described_class.new }
let(:mock_http_session) { double("Net::Http::Session") }

let(:payload) do
{
queries: "Hello there"
}
end

let(:sync_config_json) do
{
source: {
name: "DestinationConnectorName",
type: "destination",
connection_specification: {
url_host: "https://your-subdomain",
headers: {
"Accept" => "application/json",
"Authorization" => "Bearer test_token",
"Content-Type" => "application/json"
},
config: {
timeout: 25
}
}
},
destination: {
name: "Http",
type: "destination",
connection_specification: {
example_destination_key: "example_destination_value"
}
},
model: {
name: "ExampleModel",
query: payload.to_json,
query_type: "ai_ml",
primary_key: "id"
},
stream: {
name: "example_stream",
json_schema: { "field1": "type1" },
request_method: "POST",
request_rate_limit: 4,
rate_limit_unit_seconds: 1
},
sync_mode: "full_refresh",
cursor_field: "timestamp",
destination_sync_mode: "upsert",
sync_id: "1"
}
end

let(:sync_config) { Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) }

before do
allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
end

describe "#check_connection" do
context "when the connection is successful" do
let(:response_body) { { "message" => "success" }.to_json }
before do
response = Net::HTTPSuccess.new("1.1", "200", "Unauthorized")
response.content_type = "application/json"
url = sync_config_json[:source][:connection_specification][:url_host]
headers = sync_config_json[:source][:connection_specification][:headers]
allow(response).to receive(:body).and_return(response_body)
allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
.with(url,
"GET",
headers: headers)
.and_return(response)
end

it "returns a successful connection status" do
response = client.check_connection(sync_config_json[:source][:connection_specification])
expect(response).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
expect(response.connection_status.status).to eq("succeeded")
end
end

context "when the connection fails" do
let(:response_body) { { "message" => "failed" }.to_json }
before do
response = Net::HTTPSuccess.new("1.1", "401", "Unauthorized")
response.content_type = "application/json"
url = sync_config_json[:source][:connection_specification][:url_host]
headers = sync_config_json[:source][:connection_specification][:headers]
allow(response).to receive(:body).and_return(response_body)
allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
.with(url,
"GET",
headers: headers)
.and_return(response)
end

it "returns a failed connection status with an error message" do
response = client.check_connection(sync_config_json[:source][:connection_specification])

expect(response).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
expect(response.connection_status.status).to eq("failed")
end
end
end

describe "#discover" do
it "successfully returns the catalog message" do
message = client.discover(nil)
catalog = message.catalog
expect(catalog).to be_a(Multiwoven::Integrations::Protocol::Catalog)
expect(catalog.request_rate_limit).to eql(600)
expect(catalog.request_rate_limit_unit).to eql("minute")
expect(catalog.request_rate_concurrency).to eql(10)
end

it "handles exceptions during discovery" do
allow(client).to receive(:read_json).and_raise(StandardError.new("test error"))
expect(client).to receive(:handle_exception).with(
an_instance_of(StandardError),
hash_including(context: "HTTP MODEL:DISCOVER:EXCEPTION", type: "error")
)
client.discover
end
end

describe "#read" do
context "when the read is successful" do
let(:response_body) { { "message" => "Hello! how can I help" }.to_json }
before do
response = Net::HTTPSuccess.new("1.1", "200", "Unauthorized")
response.content_type = "application/json"
url = sync_config_json[:source][:connection_specification][:url_host]
headers = sync_config_json[:source][:connection_specification][:headers]
config = sync_config_json[:source][:connection_specification][:config]
allow(response).to receive(:body).and_return(response_body)
allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
.with(url,
"POST",
payload: JSON.parse(payload.to_json),
headers: headers,
config: config)
.and_return(response)
end

it "successfully reads records" do
records = client.read(sync_config)
expect(records).to be_an(Array)
expect(records.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage)
expect(records.first.record.data).to eq(JSON.parse(response_body))
end
end

context "when the write operation fails" do
let(:response_body) { { "message" => "failed" }.to_json }
before do
response = Net::HTTPSuccess.new("1.1", "401", "Unauthorized")
response.content_type = "application/json"
url = sync_config_json[:source][:connection_specification][:url_host]
headers = sync_config_json[:source][:connection_specification][:headers]
config = sync_config_json[:source][:connection_specification][:config]
allow(response).to receive(:body).and_return(response_body)
allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
.with(url,
"GET",
headers: headers,
config: config)
.and_return(response)
end

it "handles exceptions during reading" do
error_instance = StandardError.new("test error")
allow(client).to receive(:run_model).and_raise(error_instance)
expect(client).to receive(:handle_exception).with(
error_instance,
hash_including(context: "HTTP MODEL:READ:EXCEPTION", type: "error")
)

client.read(sync_config)
end
end
end
end

0 comments on commit 3e39637

Please sign in to comment.