From 3e39637e4c75c15e68e0f8f08b8172881b65261e Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Wed, 13 Nov 2024 14:30:15 +0530
Subject: [PATCH] feat(CE): Add HTTP Model Source Connector (#466)

Co-authored-by: TivonB-AI2 <124182151+TivonB-AI2@users.noreply.github.com>
---
 integrations/Gemfile.lock                     |   3 +-
 integrations/lib/multiwoven/integrations.rb   |   1 +
 .../lib/multiwoven/integrations/rollout.rb    |   3 +-
 .../integrations/source/http_model/client.rb  | 116 +++++++++
 .../source/http_model/config/catalog.json     |   6 +
 .../source/http_model/config/meta.json        |  15 ++
 .../source/http_model/config/spec.json        |  38 +++
 .../source/http_model/client_spec.rb          | 227 ++++++++++++++++++
 8 files changed, 407 insertions(+), 2 deletions(-)
 create mode 100644 integrations/lib/multiwoven/integrations/source/http_model/client.rb
 create mode 100644 integrations/lib/multiwoven/integrations/source/http_model/config/catalog.json
 create mode 100644 integrations/lib/multiwoven/integrations/source/http_model/config/meta.json
 create mode 100644 integrations/lib/multiwoven/integrations/source/http_model/config/spec.json
 create mode 100644 integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb

diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock
index a1944352..57f8f681 100644
--- a/integrations/Gemfile.lock
+++ b/integrations/Gemfile.lock
@@ -7,7 +7,8 @@ GIT
 PATH
   remote: .
   specs:
-    multiwoven-integrations (0.13.2)
+    multiwoven-integrations (0.14.0)
+      MailchimpMarketing
       activesupport
       async-websocket
       aws-sdk-athena
diff --git a/integrations/lib/multiwoven/integrations.rb b/integrations/lib/multiwoven/integrations.rb
index d64b7d96..ea783a62 100644
--- a/integrations/lib/multiwoven/integrations.rb
+++ b/integrations/lib/multiwoven/integrations.rb
@@ -70,6 +70,7 @@
 require_relative "integrations/source/databrics_model/client"
 require_relative "integrations/source/aws_sagemaker_model/client"
 require_relative "integrations/source/google_vertex_model/client"
+require_relative "integrations/source/http_model/client"
 
 # Destination
 require_relative "integrations/destination/klaviyo/client"
diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb
index ce530d91..26ea3a0e 100644
--- a/integrations/lib/multiwoven/integrations/rollout.rb
+++ b/integrations/lib/multiwoven/integrations/rollout.rb
@@ -2,7 +2,7 @@
 
 module Multiwoven
   module Integrations
-    VERSION = "0.13.2"
+    VERSION = "0.14.0"
 
     ENABLED_SOURCES = %w[
       Snowflake
@@ -19,6 +19,7 @@ module Integrations
       DatabricksModel
       AwsSagemakerModel
       VertexModel
+      HttpModel
     ].freeze
 
     ENABLED_DESTINATIONS = %w[
diff --git a/integrations/lib/multiwoven/integrations/source/http_model/client.rb b/integrations/lib/multiwoven/integrations/source/http_model/client.rb
new file mode 100644
index 00000000..9d175c89
--- /dev/null
+++ b/integrations/lib/multiwoven/integrations/source/http_model/client.rb
@@ -0,0 +1,116 @@
+# frozen_string_literal: true
+
+module Multiwoven::Integrations::Source
+  module HttpModel
+    include Multiwoven::Integrations::Core
+
+    # Source connector for an AI/ML model served over plain HTTP: the endpoint
+    # is probed with a GET request and queried by POSTing the model payload.
+    class Client < SourceConnector
+      # Timeout applied when the connection config does not provide one.
+      DEFAULT_TIMEOUT = 30
+
+      # Checks that the configured endpoint answers a GET request.
+      #
+      # @param connection_config [Hash] reads :url_host and the optional :headers
+      # @return the success_status message, or the failure_status message when
+      #   the response is not successful or the request raises
+      def check_connection(connection_config)
+        connection_config = connection_config.with_indifferent_access
+        url_host = connection_config[:url_host]
+        # headers is optional in spec.json; send an empty set rather than nil.
+        headers = connection_config[:headers] || {}
+        response = Multiwoven::Integrations::Core::HttpClient.request(
+          url_host,
+          HTTP_GET,
+          headers: headers
+        )
+        if success?(response)
+          success_status
+        else
+          failure_status(nil)
+        end
+      rescue StandardError => e
+        handle_exception(e, {
+                           context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION",
+                           type: "error"
+                         })
+        failure_status(e)
+      end
+
+      # Builds the catalog message from the JSON read at CATALOG_SPEC_PATH.
+      #
+      # @param _connection_config [Hash, nil] unused
+      # @return the catalog converted with to_multiwoven_message
+      def discover(_connection_config = nil)
+        catalog_json = read_json(CATALOG_SPEC_PATH)
+        catalog = build_catalog(catalog_json)
+        catalog.to_multiwoven_message
+      rescue StandardError => e
+        handle_exception(e, {
+                           context: "HTTP MODEL:DISCOVER:EXCEPTION",
+                           type: "error"
+                         })
+      end
+
+      # Sends the model query to the endpoint and returns the parsed response.
+      #
+      # @param sync_config [SyncConfig] supplies the source connection
+      #   specification and model.query, which must hold a JSON document
+      # @return [Array] a single record message when the request succeeds;
+      #   otherwise the value returned by the logging/exception helpers
+      def read(sync_config)
+        connection_config = sync_config.source.connection_specification
+        connection_config = connection_config.with_indifferent_access
+        # The server checks the ConnectorQueryType.
+        # If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol.
+        # This query is then sent to the AI/ML model.
+        payload = JSON.parse(sync_config.model.query)
+        run_model(connection_config, payload)
+      rescue StandardError => e
+        handle_exception(e, {
+                           context: "HTTP MODEL:READ:EXCEPTION",
+                           type: "error"
+                         })
+      end
+
+      private
+
+      # POSTs the payload to the endpoint and converts the response.
+      def run_model(connection_config, payload)
+        connection_config = connection_config.with_indifferent_access
+        url_host = connection_config[:url_host]
+        # Only url_host is required by spec.json, so headers and config are
+        # defaulted instead of being dereferenced when absent.
+        headers = connection_config[:headers] || {}
+        config = connection_config[:config] || {}
+        config[:timeout] ||= DEFAULT_TIMEOUT
+        response = send_request(url_host, payload, headers, config)
+        process_response(response)
+      rescue StandardError => e
+        handle_exception(e, context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error")
+      end
+
+      # Wraps a successful JSON body in a record message; logs a failure otherwise.
+      def process_response(response)
+        if success?(response)
+          data = JSON.parse(response.body)
+          [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message]
+        else
+          create_log_message("HTTP MODEL:RUN_MODEL", "error", "request failed")
+        end
+      end
+
+      # Issues the POST request through the shared HTTP client.
+      def send_request(url, payload, headers, config)
+        Multiwoven::Integrations::Core::HttpClient.request(
+          url,
+          HTTP_POST,
+          payload: payload,
+          headers: headers,
+          config: config
+        )
+      end
+    end
+  end
+end
diff --git a/integrations/lib/multiwoven/integrations/source/http_model/config/catalog.json b/integrations/lib/multiwoven/integrations/source/http_model/config/catalog.json
new file mode 100644
index 00000000..dacb788b
--- /dev/null
+++ b/integrations/lib/multiwoven/integrations/source/http_model/config/catalog.json
@@ -0,0 +1,6 @@
+{
+  "request_rate_limit": 600,
+  "request_rate_limit_unit": "minute",
+  "request_rate_concurrency": 10,
+  "streams": []
+}
diff --git a/integrations/lib/multiwoven/integrations/source/http_model/config/meta.json b/integrations/lib/multiwoven/integrations/source/http_model/config/meta.json
new file mode 100644
index 00000000..dca2f332
--- /dev/null
+++ b/integrations/lib/multiwoven/integrations/source/http_model/config/meta.json
@@ -0,0 +1,15 @@
+{
+  "data": {
+    "name": "HttpModel",
+    "title": "HTTP Model",
+    "connector_type": "source",
+    "category": "AI Model",
+    "documentation_url": "https://docs.multiwoven.com",
+    "github_issue_label": "source-http-model",
+    "icon": "icon.svg",
+    "license": "MIT",
+    "release_stage": "alpha",
+    "support_level": "community",
+    "tags": ["language:ruby", "multiwoven"]
+  }
+}
diff --git a/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json b/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json
new file mode 100644
index 00000000..f270b35d
--- /dev/null
+++ b/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json
@@ -0,0 +1,38 @@
+{
+  "documentation_url": "https://docs.multiwoven.com/integrations/source/http-model",
+  "stream_type": "user_defined",
+  "connector_query_type": "ai_ml",
+  "connection_specification": {
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "title": "Http Model",
+    "type": "object",
+    "required": ["url_host"],
+    "properties": {
+      "url_host": {
+        "type": "string",
+        "title": "URL",
+        "order": 0
+      },
+      "headers": {
+        "type": "object",
+        "additionalProperties": { "type": "string" },
+        "title": "Http Headers",
+        "order": 1
+      },
+      "config": {
+        "title": "",
+        "type": "object",
+        "properties": {
+          "timeout": {
+            "type": "integer",
+            "minimum": 0,
+            "default": 30,
+            "title": "Http Timeout",
+            "order": 0
+          }
+        },
+        "order": 2
+      }
+    }
+  }
+}
diff --git a/integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb b/integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb
new file mode 100644
index 00000000..fb7dd951
--- /dev/null
+++ b/integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb
@@ -0,0 +1,227 @@
+# frozen_string_literal: true
+
+RSpec.describe Multiwoven::Integrations::Source::HttpModel::Client do
+  include WebMock::API
+
+  before(:each) do
+    WebMock.disable_net_connect!(allow_localhost: true)
+  end
+
+  let(:client) { described_class.new }
+
+  let(:payload) do
+    {
+      queries: "Hello there"
+    }
+  end
+
+  let(:sync_config_json) do
+    {
+      source: {
+        name: "HttpModel",
+        type: "source",
+        connection_specification: {
+          url_host: "https://your-subdomain",
+          headers: {
+            "Accept" => "application/json",
+            "Authorization" => "Bearer test_token",
+            "Content-Type" => "application/json"
+          },
+          config: {
+            timeout: 25
+          }
+        }
+      },
+      destination: {
+        name: "Http",
+        type: "destination",
+        connection_specification: {
+          example_destination_key: "example_destination_value"
+        }
+      },
+      model: {
+        name: "ExampleModel",
+        query: payload.to_json,
+        query_type: "ai_ml",
+        primary_key: "id"
+      },
+      stream: {
+        name: "example_stream",
+        json_schema: { "field1": "type1" },
+        request_method: "POST",
+        request_rate_limit: 4,
+        rate_limit_unit_seconds: 1
+      },
+      sync_mode: "full_refresh",
+      cursor_field: "timestamp",
+      destination_sync_mode: "upsert",
+      sync_id: "1"
+    }
+  end
+
+  let(:sync_config) { Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) }
+
+  before do
+    allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
+  end
+
+  describe "#check_connection" do
+    context "when the connection is successful" do
+      let(:response_body) { { "message" => "success" }.to_json }
+      before do
+        response = Net::HTTPSuccess.new("1.1", "200", "OK")
+        response.content_type = "application/json"
+        url = sync_config_json[:source][:connection_specification][:url_host]
+        headers = sync_config_json[:source][:connection_specification][:headers]
+        allow(response).to receive(:body).and_return(response_body)
+        allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
+          .with(url,
+                "GET",
+                headers: headers)
+          .and_return(response)
+      end
+
+      it "returns a successful connection status" do
+        response = client.check_connection(sync_config_json[:source][:connection_specification])
+        expect(response).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
+        expect(response.connection_status.status).to eq("succeeded")
+      end
+    end
+
+    context "when the connection fails" do
+      let(:response_body) { { "message" => "failed" }.to_json }
+      before do
+        response = Net::HTTPUnauthorized.new("1.1", "401", "Unauthorized")
+        response.content_type = "application/json"
+        url = sync_config_json[:source][:connection_specification][:url_host]
+        headers = sync_config_json[:source][:connection_specification][:headers]
+        allow(response).to receive(:body).and_return(response_body)
+        allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
+          .with(url,
+                "GET",
+                headers: headers)
+          .and_return(response)
+      end
+
+      it "returns a failed connection status with an error message" do
+        response = client.check_connection(sync_config_json[:source][:connection_specification])
+
+        expect(response).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
+        expect(response.connection_status.status).to eq("failed")
+      end
+    end
+  end
+
+  describe "#discover" do
+    it "successfully returns the catalog message" do
+      message = client.discover(nil)
+      catalog = message.catalog
+      expect(catalog).to be_a(Multiwoven::Integrations::Protocol::Catalog)
+      expect(catalog.request_rate_limit).to eql(600)
+      expect(catalog.request_rate_limit_unit).to eql("minute")
+      expect(catalog.request_rate_concurrency).to eql(10)
+    end
+
+    it "handles exceptions during discovery" do
+      allow(client).to receive(:read_json).and_raise(StandardError.new("test error"))
+      expect(client).to receive(:handle_exception).with(
+        an_instance_of(StandardError),
+        hash_including(context: "HTTP MODEL:DISCOVER:EXCEPTION", type: "error")
+      )
+      client.discover
+    end
+  end
+
+  describe "#read" do
+    context "when the read is successful" do
+      let(:response_body) { { "message" => "Hello! how can I help" }.to_json }
+      before do
+        response = Net::HTTPSuccess.new("1.1", "200", "OK")
+        response.content_type = "application/json"
+        url = sync_config_json[:source][:connection_specification][:url_host]
+        headers = sync_config_json[:source][:connection_specification][:headers]
+        config = sync_config_json[:source][:connection_specification][:config]
+        allow(response).to receive(:body).and_return(response_body)
+        allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
+          .with(url,
+                "POST",
+                payload: JSON.parse(payload.to_json),
+                headers: headers,
+                config: config)
+          .and_return(response)
+      end
+
+      it "successfully reads records" do
+        records = client.read(sync_config)
+        expect(records).to be_an(Array)
+        expect(records.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage)
+        expect(records.first.record.data).to eq(JSON.parse(response_body))
+      end
+    end
+
+    context "when the connection specification has no config" do
+      let(:response_body) { { "message" => "Hello! how can I help" }.to_json }
+      before do
+        sync_config_json[:source][:connection_specification].delete(:config)
+        response = Net::HTTPSuccess.new("1.1", "200", "OK")
+        response.content_type = "application/json"
+        url = sync_config_json[:source][:connection_specification][:url_host]
+        headers = sync_config_json[:source][:connection_specification][:headers]
+        allow(response).to receive(:body).and_return(response_body)
+        allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
+          .with(url,
+                "POST",
+                payload: JSON.parse(payload.to_json),
+                headers: headers,
+                config: { timeout: 30 })
+          .and_return(response)
+      end
+
+      it "falls back to the default timeout" do
+        records = client.read(sync_config)
+        expect(records).to be_an(Array)
+        expect(records.first.record.data).to eq(JSON.parse(response_body))
+      end
+    end
+
+    context "when the endpoint responds with an error status" do
+      let(:response_body) { { "message" => "failed" }.to_json }
+      before do
+        response = Net::HTTPUnauthorized.new("1.1", "401", "Unauthorized")
+        response.content_type = "application/json"
+        url = sync_config_json[:source][:connection_specification][:url_host]
+        headers = sync_config_json[:source][:connection_specification][:headers]
+        config = sync_config_json[:source][:connection_specification][:config]
+        allow(response).to receive(:body).and_return(response_body)
+        allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request)
+          .with(url,
+                "POST",
+                payload: JSON.parse(payload.to_json),
+                headers: headers,
+                config: config)
+          .and_return(response)
+      end
+
+      it "logs the failure instead of returning records" do
+        expect(client).to receive(:create_log_message)
+          .with("HTTP MODEL:RUN_MODEL", "error", "request failed")
+          .and_call_original
+        records = client.read(sync_config)
+        expect(records).not_to be_an(Array)
+      end
+    end
+
+    context "when the read operation raises" do
+      it "handles exceptions during reading" do
+        error_instance = StandardError.new("test error")
+        allow(client).to receive(:run_model).and_raise(error_instance)
+        expect(client).to receive(:handle_exception).with(
+          error_instance,
+          hash_including(context: "HTTP MODEL:READ:EXCEPTION", type: "error")
+        )
+
+        client.read(sync_config)
+      end
+    end
+  end
+end