From 9dd96706ba2de216596f999e0d7cd37f693c4b0b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 26 Dec 2024 13:48:07 +0530 Subject: [PATCH 1/5] feat(CE): stream support in http model (#532) Co-authored-by: afthab vp --- integrations/Gemfile.lock | 2 +- integrations/lib/multiwoven/integrations.rb | 2 + .../integrations/core/http_client.rb | 30 +---- .../integrations/core/http_helper.rb | 36 ++++++ .../integrations/core/source_connector.rb | 22 ++++ .../core/streaming_http_client.rb | 21 ++++ .../lib/multiwoven/integrations/rollout.rb | 2 +- .../integrations/source/http_model/client.rb | 108 ++++++++++-------- .../source/http_model/config/spec.json | 15 ++- .../core/streaming_http_client_spec.rb | 54 +++++++++ .../source/http_model/client_spec.rb | 70 +++++++++++- 11 files changed, 280 insertions(+), 82 deletions(-) create mode 100644 integrations/lib/multiwoven/integrations/core/http_helper.rb create mode 100644 integrations/lib/multiwoven/integrations/core/streaming_http_client.rb create mode 100644 integrations/spec/multiwoven/integrations/core/streaming_http_client_spec.rb diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index 31c38bc6..f801d94b 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . 
specs: - multiwoven-integrations (0.15.10) + multiwoven-integrations (0.15.11) MailchimpMarketing activesupport async-websocket diff --git a/integrations/lib/multiwoven/integrations.rb b/integrations/lib/multiwoven/integrations.rb index f8164b95..6c3ac56f 100644 --- a/integrations/lib/multiwoven/integrations.rb +++ b/integrations/lib/multiwoven/integrations.rb @@ -52,7 +52,9 @@ require_relative "integrations/core/base_connector" require_relative "integrations/core/source_connector" require_relative "integrations/core/destination_connector" +require_relative "integrations/core/http_helper" require_relative "integrations/core/http_client" +require_relative "integrations/core/streaming_http_client" require_relative "integrations/core/query_builder" # Source diff --git a/integrations/lib/multiwoven/integrations/core/http_client.rb b/integrations/lib/multiwoven/integrations/core/http_client.rb index 0e36b49a..fe0d2f1b 100644 --- a/integrations/lib/multiwoven/integrations/core/http_client.rb +++ b/integrations/lib/multiwoven/integrations/core/http_client.rb @@ -3,40 +3,14 @@ module Multiwoven module Integrations::Core class HttpClient + extend HttpHelper class << self def request(url, method, payload: nil, headers: {}, config: {}) uri = URI(url) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = (uri.scheme == "https") - - # Set timeout if provided - if config[:timeout] - timeout_value = config[:timeout].to_f - http.open_timeout = timeout_value - http.read_timeout = timeout_value - end - + http = configure_http(uri, config) request = build_request(method, uri, payload, headers) http.request(request) end - - private - - def build_request(method, uri, payload, headers) - request_class = case method.upcase - when Constants::HTTP_GET then Net::HTTP::Get - when Constants::HTTP_POST then Net::HTTP::Post - when Constants::HTTP_PUT then Net::HTTP::Put - when Constants::HTTP_PATCH then Net::HTTP::Patch - when Constants::HTTP_DELETE then Net::HTTP::Delete - else raise 
ArgumentError, "Unsupported HTTP method: #{method}" - end - - request = request_class.new(uri) - headers.each { |key, value| request[key] = value } - request.body = payload.to_json if payload && %w[POST PUT PATCH].include?(method.upcase) - request - end end end end diff --git a/integrations/lib/multiwoven/integrations/core/http_helper.rb b/integrations/lib/multiwoven/integrations/core/http_helper.rb new file mode 100644 index 00000000..f4a8df46 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/core/http_helper.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module Multiwoven + module Integrations::Core + module HttpHelper + def build_request(method, uri, payload, headers) + request_class = case method.upcase + when Constants::HTTP_GET then Net::HTTP::Get + when Constants::HTTP_POST then Net::HTTP::Post + when Constants::HTTP_PUT then Net::HTTP::Put + when Constants::HTTP_PATCH then Net::HTTP::Patch + when Constants::HTTP_DELETE then Net::HTTP::Delete + else raise ArgumentError, "Unsupported HTTP method: #{method}" + end + + request = request_class.new(uri) + headers.each { |key, value| request[key] = value } + request.body = payload.to_json if payload && %w[POST PUT PATCH].include?(method.upcase) + request + end + + def configure_http(uri, config) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = (uri.scheme == "https") + + if config[:timeout] + timeout_value = config[:timeout].to_f + http.open_timeout = timeout_value + http.read_timeout = timeout_value + end + + http + end + end + end +end diff --git a/integrations/lib/multiwoven/integrations/core/source_connector.rb b/integrations/lib/multiwoven/integrations/core/source_connector.rb index 76ac2264..ebe9e6bd 100644 --- a/integrations/lib/multiwoven/integrations/core/source_connector.rb +++ b/integrations/lib/multiwoven/integrations/core/source_connector.rb @@ -39,6 +39,28 @@ def batched_query(sql_query, limit, offset) # Appending the LIMIT and OFFSET clauses to the SQL query 
"#{sql_query} LIMIT #{limit} OFFSET #{offset}" end + + def send_request(options = {}) + Multiwoven::Integrations::Core::HttpClient.request( + options[:url], + options[:http_method], + payload: options[:payload], + headers: options[:headers], + config: options[:config] + ) + end + + def send_streaming_request(options = {}) + Multiwoven::Integrations::Core::StreamingHttpClient.request( + options[:url], + options[:http_method], + payload: options[:payload], + headers: options[:headers], + config: options[:config] + ) do |chunk| + yield chunk if block_given? # Pass each chunk for processing (streaming response) + end + end end end end diff --git a/integrations/lib/multiwoven/integrations/core/streaming_http_client.rb b/integrations/lib/multiwoven/integrations/core/streaming_http_client.rb new file mode 100644 index 00000000..ad8aa306 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/core/streaming_http_client.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +module Multiwoven + module Integrations::Core + class StreamingHttpClient + extend HttpHelper + class << self + def request(url, method, payload: nil, headers: {}, config: {}) + uri = URI(url) + http = configure_http(uri, config) + request = build_request(method, uri, payload, headers) + http.request(request) do |response| + response.read_body do |chunk| + yield chunk if block_given? 
# Pass each response chunk + end + end + end + end + end + end +end diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index 961bc54b..f7dbce86 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.15.10" + VERSION = "0.15.11" ENABLED_SOURCES = %w[ Snowflake diff --git a/integrations/lib/multiwoven/integrations/source/http_model/client.rb b/integrations/lib/multiwoven/integrations/source/http_model/client.rb index e5c8c045..8fe2883d 100644 --- a/integrations/lib/multiwoven/integrations/source/http_model/client.rb +++ b/integrations/lib/multiwoven/integrations/source/http_model/client.rb @@ -5,24 +5,17 @@ module HttpModel include Multiwoven::Integrations::Core class Client < SourceConnector def check_connection(connection_config) - connection_config = connection_config.with_indifferent_access - url_host = connection_config[:url_host] - http_method = connection_config[:http_method] - headers = connection_config[:headers] - payload = JSON.parse(connection_config[:request_format]) - config = connection_config[:config] - config[:timeout] ||= 30 - response = send_request(url_host, http_method, payload, headers, config) - if success?(response) - success_status - else - failure_status(nil) - end + connection_config = prepare_config(connection_config) + response = send_request( + url: connection_config[:url_host], + http_method: connection_config[:http_method], + payload: JSON.parse(connection_config[:request_format]), + headers: connection_config[:headers], + config: connection_config[:config] + ) + success?(response) ? 
success_status : failure_status(nil) rescue StandardError => e - handle_exception(e, { - context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION", - type: "error" - }) + handle_exception(e, { context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION", type: "error" }) failure_status(e) end @@ -31,40 +24,66 @@ def discover(_connection_config = nil) catalog = build_catalog(catalog_json) catalog.to_multiwoven_message rescue StandardError => e - handle_exception(e, { - context: "HTTP MODEL:DISCOVER:EXCEPTION", - type: "error" - }) + handle_exception(e, { context: "HTTP MODEL:DISCOVER:EXCEPTION", type: "error" }) end def read(sync_config) - connection_config = sync_config.source.connection_specification - connection_config = connection_config.with_indifferent_access + connection_config = prepare_config(sync_config.source.connection_specification) + stream = connection_config[:is_stream] ||= false # The server checks the ConnectorQueryType. # If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol. # This query is then sent to the AI/ML model. - payload = JSON.parse(sync_config.model.query) - run_model(connection_config, payload) + payload = parse_json(sync_config.model.query) + + if stream + run_model_stream(connection_config, payload) { |message| yield message if block_given? 
} + else + run_model(connection_config, payload) + end rescue StandardError => e - handle_exception(e, { - context: "HTTP MODEL:READ:EXCEPTION", - type: "error" - }) + handle_exception(e, { context: "HTTP MODEL:READ:EXCEPTION", type: "error" }) end private + def prepare_config(config) + config.with_indifferent_access.tap do |conf| + conf[:config][:timeout] ||= 30 + end + end + + def parse_json(json_string) + JSON.parse(json_string) + rescue JSON::ParserError => e + handle_exception(e, { context: "HTTP MODEL:PARSE_JSON:EXCEPTION", type: "error" }) + {} + end + def run_model(connection_config, payload) - connection_config = connection_config.with_indifferent_access - url_host = connection_config[:url_host] - headers = connection_config[:headers] - config = connection_config[:config] - http_method = connection_config[:http_method] - config[:timeout] ||= 30 - response = send_request(url_host, http_method, payload, headers, config) + response = send_request( + url: connection_config[:url_host], + http_method: connection_config[:http_method], + payload: payload, + headers: connection_config[:headers], + config: connection_config[:config] + ) process_response(response) rescue StandardError => e - handle_exception(e, context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error") + handle_exception(e, { context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error" }) + end + + def run_model_stream(connection_config, payload) + send_streaming_request( + url: connection_config[:url_host], + http_method: connection_config[:http_method], + payload: payload, + headers: connection_config[:headers], + config: connection_config[:config] + ) do |chunk| + process_streaming_response(chunk) { |message| yield message if block_given? 
} + end + rescue StandardError => e + handle_exception(e, { context: "HTTP MODEL:RUN_STREAM_MODEL:EXCEPTION", type: "error" }) end def process_response(response) @@ -74,16 +93,15 @@ def process_response(response) else create_log_message("HTTP MODEL:RUN_MODEL", "error", "request failed: #{response.body}") end + rescue StandardError => e + handle_exception(e, { context: "HTTP MODEL:PROCESS_RESPONSE:EXCEPTION", type: "error" }) end - def send_request(url, http_method, payload, headers, config) - Multiwoven::Integrations::Core::HttpClient.request( - url, - http_method, - payload: payload, - headers: headers, - config: config - ) + def process_streaming_response(chunk) + data = JSON.parse(chunk) + yield [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message] if block_given? + rescue StandardError => e + handle_exception(e, { context: "HTTP MODEL:PROCESS_STREAMING_RESPONSE:EXCEPTION", type: "error" }) end end end diff --git a/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json b/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json index 2b6989bb..9e83dc8b 100644 --- a/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json +++ b/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json @@ -19,10 +19,17 @@ "title": "URL", "order": 1 }, + "is_stream": { + "type": "boolean", + "title": "Streaming Enabled", + "description": "Enables data streaming for such as chat, when supported by the model. When true, messages and model data are processed in chunks for immediate delivery, enhancing responsiveness. Default is false, processing only after the entire response is received.", + "default": false, + "order": 2 + }, "headers": { "title": "HTTP Headers", "description": "Custom headers to include in the HTTP request. 
Useful for authentication, content type specifications, and other request metadata.", - "order": 2, + "order": 3, "additionalProperties": { "type": "string" }, @@ -42,21 +49,21 @@ "order": 0 } }, - "order": 3 + "order": 4 }, "request_format": { "title": "Request Format", "description": "Sample Request Format", "type": "string", "x-request-format": true, - "order": 4 + "order": 5 }, "response_format": { "title": "Response Format", "description": "Sample Response Format", "type": "string", "x-response-format": true, - "order": 5 + "order": 6 } } } diff --git a/integrations/spec/multiwoven/integrations/core/streaming_http_client_spec.rb b/integrations/spec/multiwoven/integrations/core/streaming_http_client_spec.rb new file mode 100644 index 00000000..c01c8c6e --- /dev/null +++ b/integrations/spec/multiwoven/integrations/core/streaming_http_client_spec.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module Multiwoven + module Integrations::Core + RSpec.describe StreamingHttpClient do + describe ".request" do + let(:url) { "https://example.com/api/stream" } + let(:method) { "GET" } + let(:headers) { { "Authorization" => "Bearer token" } } + let(:config) { { timeout: 5 } } + let(:mock_response) { double("Net::HTTPResponse", code: "200") } + + it "makes a streaming HTTP request" do + http = double("Net::HTTP") + allow(Net::HTTP).to receive(:new).and_return(http) + allow(http).to receive(:use_ssl=) + allow(http).to receive(:open_timeout=) + allow(http).to receive(:read_timeout=) + allow(http).to receive(:request) do |&block| + block.call(mock_response) + end + + allow(mock_response).to receive(:read_body) do |&block| + block.call("chunk1") + block.call("chunk2") + block.call("chunk3") + end + chunks = [] + described_class.request(url, method, headers: headers, config: config) do |chunk| + chunks << chunk + end + expect(chunks).to eq(%w[chunk1 chunk2 chunk3]) + end + + it "handles errors gracefully" do + http = double("Net::HTTP") + allow(Net::HTTP).to 
receive(:new).and_return(http) + allow(http).to receive(:use_ssl=) + allow(http).to receive(:open_timeout=) + allow(http).to receive(:read_timeout=) + allow(http).to receive(:request) do |&block| + block.call(mock_response) + end + + allow(mock_response).to receive(:read_body).and_raise(StandardError, "Network error") + + expect do + described_class.request(url, method, headers: headers, config: config) { |_| } + end.to raise_error(StandardError, "Network error") + end + end + end + end +end diff --git a/integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb b/integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb index c69e0ae2..cce8315f 100644 --- a/integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb +++ b/integrations/spec/multiwoven/integrations/source/http_model/client_spec.rb @@ -63,7 +63,10 @@ end let(:sync_config) { Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) } - + let(:sync_config_stream) do + sync_config_json[:source][:connection_specification][:is_stream] = true + Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) + end before do allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) end @@ -144,7 +147,7 @@ context "when the read is successful" do let(:response_body) { { "message" => "Hello! 
how can I help" }.to_json } before do - response = Net::HTTPSuccess.new("1.1", "200", "Unauthorized") + response = Net::HTTPSuccess.new("1.1", "200", "success") response.content_type = "application/json" url = sync_config_json[:source][:connection_specification][:url_host] http_method = sync_config_json[:source][:connection_specification][:http_method] @@ -168,7 +171,7 @@ end end - context "when the write operation fails" do + context "when the read operation fails" do let(:response_body) { { "message" => "failed" }.to_json } before do response = Net::HTTPSuccess.new("1.1", "401", "Unauthorized") @@ -198,4 +201,65 @@ end end end + + describe "#read with is_stream = true" do + context "when the read is successful" do + before do + payload = sync_config_json[:model][:query] + streaming_chunk_first = { "message" => "streaming data 1" }.to_json + streaming_chunk_second = { "message" => "streaming data 2" }.to_json + + allow(Multiwoven::Integrations::Core::StreamingHttpClient).to receive(:request) + .with(sync_config_json[:source][:connection_specification][:url_host], + sync_config_json[:source][:connection_specification][:http_method], + payload: JSON.parse(payload), + headers: sync_config_json[:source][:connection_specification][:headers], + config: sync_config_json[:source][:connection_specification][:config]) + .and_yield(streaming_chunk_first) + .and_yield(streaming_chunk_second) + + response = Net::HTTPSuccess.new("1.1", "200", "success") + response.content_type = "application/json" + end + + it "streams data and processes chunks" do + results = [] + client.read(sync_config_stream) { |message| results << message } + expect(results.first).to be_an(Array) + expect(results.first.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(results.first.first.record.data["message"]).to eq("streaming data 1") + + expect(results.last).to be_an(Array) + expect(results.last.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + 
expect(results.last.first.record.data["message"]).to eq("streaming data 2") + end + end + + context "when streaming fails on a chunk" do + let(:streaming_chunk_first) { { "message" => "streaming data chunk 1" }.to_json } + + before do + url = sync_config_json[:source][:connection_specification][:url_host] + http_method = sync_config_json[:stream][:request_method] + headers = sync_config_json[:source][:connection_specification][:headers] + config = sync_config_json[:source][:connection_specification][:config] + allow(Multiwoven::Integrations::Core::StreamingHttpClient).to receive(:request) + .with(url, + http_method, + payload: JSON.parse(payload.to_json), + headers: headers, + config: config) + .and_yield(streaming_chunk_first) + .and_raise(StandardError, "Streaming error on chunk 2") + end + + it "handles streaming errors gracefully" do + results = [] + client.read(sync_config_stream) { |message| results << message } + expect(results.last).to be_an(Array) + expect(results.last.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(results.last.first.record.data["message"]).to eq("streaming data chunk 1") + end + end + end end From 9bb9389510421e6fd40ab7db4a34455b19457641 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 26 Dec 2024 13:50:50 +0530 Subject: [PATCH 2/5] chore(CE): list api accept per page (#543) Co-authored-by: afthab vp --- .../spec/requests/api/v1/models_controller_spec.rb | 12 ++++++++++++ .../requests/api/v1/sync_runs_controller_spec.rb | 8 +++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/server/spec/requests/api/v1/models_controller_spec.rb b/server/spec/requests/api/v1/models_controller_spec.rb index 80a378e4..447c9ecf 100644 --- a/server/spec/requests/api/v1/models_controller_spec.rb +++ b/server/spec/requests/api/v1/models_controller_spec.rb @@ -63,6 +63,18 @@ expect(response_hash.dig(:links, :first)).to 
include("http://www.example.com/api/v1/models?page=1&per_page=20") end + it "returns success and no models when the data is empty" do + workspace.models.destroy_all + get "/api/v1/models", headers: auth_headers(user, workspace_id) + expect(response).to have_http_status(:ok) + response_hash = JSON.parse(response.body).with_indifferent_access + expect(response_hash[:data].count).to eql(0) + expect(response_hash.dig(:links, :first)).to include("http://www.example.com/api/v1/models?page=1") + expect(response_hash.dig(:links, :last)).to include("http://www.example.com/api/v1/models?page=1") + expect(response_hash.dig(:links, :next)).to be_nil + expect(response_hash.dig(:links, :prev)).to be_nil + end + it "returns success and all mode for viewer role" do workspace.workspace_users.first.update(role: viewer_role) get "/api/v1/models", headers: auth_headers(user, workspace_id) diff --git a/server/spec/requests/api/v1/sync_runs_controller_spec.rb b/server/spec/requests/api/v1/sync_runs_controller_spec.rb index a3e2f08a..278b277e 100644 --- a/server/spec/requests/api/v1/sync_runs_controller_spec.rb +++ b/server/spec/requests/api/v1/sync_runs_controller_spec.rb @@ -38,15 +38,13 @@ context "when it is an authenticated user" do it "returns success and fetch sync " do -<<<<<<< HEAD - get "/api/v1/syncs/#{sync.id}/sync_runs", headers: auth_headers(user, workspace_id) -======= get "/api/v1/syncs/#{sync.id}/sync_runs?page=1&per_page=20", headers: auth_headers(user, workspace_id) response_hash = JSON.parse(response.body).with_indifferent_access ->>>>>>> ac183819 (chore(CE): list api accept per page (#732)) expect(response).to have_http_status(:ok) - response_hash = JSON.parse(response.body).with_indifferent_access expect(response_hash[:data].size).to eq(2) + first_row_date = DateTime.parse(response_hash[:data].first.dig(:attributes, :updated_at)) + second_row_date = DateTime.parse(response_hash[:data].last.dig(:attributes, :updated_at)) + expect(first_row_date).to be > 
second_row_date response_hash[:data].each_with_index do |row, _index| sync_run = sync_runs.find { |sr| sr.id == row[:id].to_i } From c0ce381379ab14370827a16d3e013996144a8a43 Mon Sep 17 00:00:00 2001 From: "Rafael E. O'Neill" <106079170+RafaelOAiSquared@users.noreply.github.com> Date: Thu, 26 Dec 2024 04:25:54 -0400 Subject: [PATCH 3/5] fix(CE): pagination fix for empty data (#546) Co-authored-by: afthab vp --- server/config/initializers/custom_pagination_links.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/config/initializers/custom_pagination_links.rb b/server/config/initializers/custom_pagination_links.rb index a57ca3ce..e39f93cc 100644 --- a/server/config/initializers/custom_pagination_links.rb +++ b/server/config/initializers/custom_pagination_links.rb @@ -23,8 +23,8 @@ def pages_from {}.tap do |pages| pages[:first] = FIRST_PAGE pages[:prev] = first_page? ? nil : collection.current_page - FIRST_PAGE - pages[:next] = last_page? ? nil : collection.current_page + FIRST_PAGE - pages[:last] = collection.total_pages + pages[:next] = (!last_page? && collection.total_pages > 1) ? 
collection.current_page + FIRST_PAGE : nil + pages[:last] = [collection.total_pages, FIRST_PAGE].max end end From ed80bd86cefa8a86702d1950a9e270af7131ba2f Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 26 Dec 2024 14:50:31 +0530 Subject: [PATCH 4/5] feat(CE): Open AI ai ml source connector (#547) Co-authored-by: afthab vp --- integrations/Gemfile.lock | 2 +- integrations/lib/multiwoven/integrations.rb | 1 + .../multiwoven/integrations/core/constants.rb | 2 + .../lib/multiwoven/integrations/rollout.rb | 3 +- .../source/http_model/config/meta.json | 2 +- .../integrations/source/open_ai/client.rb | 117 ++++++++ .../source/open_ai/config/catalog.json | 6 + .../source/open_ai/config/meta.json | 15 + .../source/open_ai/config/spec.json | 54 ++++ .../integrations/source/open_ai/icon.svg | 1 + .../source/open_ai/client_spec.rb | 260 ++++++++++++++++++ 11 files changed, 460 insertions(+), 3 deletions(-) create mode 100644 integrations/lib/multiwoven/integrations/source/open_ai/client.rb create mode 100644 integrations/lib/multiwoven/integrations/source/open_ai/config/catalog.json create mode 100644 integrations/lib/multiwoven/integrations/source/open_ai/config/meta.json create mode 100644 integrations/lib/multiwoven/integrations/source/open_ai/config/spec.json create mode 100644 integrations/lib/multiwoven/integrations/source/open_ai/icon.svg create mode 100644 integrations/spec/multiwoven/integrations/source/open_ai/client_spec.rb diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index f801d94b..58b9073a 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . 
specs: - multiwoven-integrations (0.15.11) + multiwoven-integrations (0.16.0) MailchimpMarketing activesupport async-websocket diff --git a/integrations/lib/multiwoven/integrations.rb b/integrations/lib/multiwoven/integrations.rb index 6c3ac56f..1218e832 100644 --- a/integrations/lib/multiwoven/integrations.rb +++ b/integrations/lib/multiwoven/integrations.rb @@ -73,6 +73,7 @@ require_relative "integrations/source/aws_sagemaker_model/client" require_relative "integrations/source/google_vertex_model/client" require_relative "integrations/source/http_model/client" +require_relative "integrations/source/open_ai/client" # Destination require_relative "integrations/destination/klaviyo/client" diff --git a/integrations/lib/multiwoven/integrations/core/constants.rb b/integrations/lib/multiwoven/integrations/core/constants.rb index fcabb17f..7a02845c 100644 --- a/integrations/lib/multiwoven/integrations/core/constants.rb +++ b/integrations/lib/multiwoven/integrations/core/constants.rb @@ -63,6 +63,8 @@ module Constants # google sheets GOOGLE_SHEETS_SCOPE = "https://www.googleapis.com/auth/drive" GOOGLE_SPREADSHEET_ID_REGEX = %r{/d/([-\w]{20,})/}.freeze + + OPEN_AI_URL = "https://api.openai.com/v1/chat/completions" end end end diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index f7dbce86..add3445d 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.15.11" + VERSION = "0.16.0" ENABLED_SOURCES = %w[ Snowflake @@ -20,6 +20,7 @@ module Integrations AwsSagemakerModel VertexModel HttpModel + OpenAI ].freeze ENABLED_DESTINATIONS = %w[ diff --git a/integrations/lib/multiwoven/integrations/source/http_model/config/meta.json b/integrations/lib/multiwoven/integrations/source/http_model/config/meta.json index f1e85784..373ea430 100644 --- 
a/integrations/lib/multiwoven/integrations/source/http_model/config/meta.json +++ b/integrations/lib/multiwoven/integrations/source/http_model/config/meta.json @@ -4,7 +4,7 @@ "title": "HTTP Model Endpoint", "connector_type": "source", "category": "AI Model", - "documentation_url": "https://docs.mutliwoven.com", + "documentation_url": "https://docs.mutltiwoven.com", "github_issue_label": "source-http-model", "icon": "icon.svg", "license": "MIT", diff --git a/integrations/lib/multiwoven/integrations/source/open_ai/client.rb b/integrations/lib/multiwoven/integrations/source/open_ai/client.rb new file mode 100644 index 00000000..cce398de --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/open_ai/client.rb @@ -0,0 +1,117 @@ +# frozen_string_literal: true + +module Multiwoven::Integrations::Source + module OpenAI + include Multiwoven::Integrations::Core + class Client < SourceConnector + def check_connection(connection_config) + connection_config = prepare_config(connection_config) + response = send_request( + url: OPEN_AI_URL, + http_method: HTTP_POST, + payload: JSON.parse(connection_config[:request_format]), + headers: auth_headers(connection_config[:api_key]), + config: connection_config[:config] + ) + success?(response) ? success_status : failure_status(nil) + rescue StandardError => e + handle_exception(e, { context: "OPEN AI:CHECK_CONNECTION:EXCEPTION", type: "error" }) + failure_status(e) + end + + def discover(_connection_config = nil) + catalog_json = read_json(CATALOG_SPEC_PATH) + catalog = build_catalog(catalog_json) + catalog.to_multiwoven_message + rescue StandardError => e + handle_exception(e, { context: "OPEN AI:DISCOVER:EXCEPTION", type: "error" }) + end + + def read(sync_config) + connection_config = prepare_config(sync_config.source.connection_specification) + stream = connection_config[:is_stream] ||= false + # The server checks the ConnectorQueryType. 
+ # If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol. + # This query is then sent to the AI/ML model. + payload = parse_json(sync_config.model.query) + + if stream + run_model_stream(connection_config, payload) { |message| yield message if block_given? } + else + run_model(connection_config, payload) + end + rescue StandardError => e + handle_exception(e, { context: "OPEN AI:READ:EXCEPTION", type: "error" }) + end + + private + + def prepare_config(config) + config.with_indifferent_access.tap do |conf| + conf[:config][:timeout] ||= 30 + end + end + + def parse_json(json_string) + JSON.parse(json_string) + rescue JSON::ParserError => e + handle_exception(e, { context: "OPEN AI:PARSE_JSON:EXCEPTION", type: "error" }) + {} + end + + def run_model(connection_config, payload) + response = send_request( + url: OPEN_AI_URL, + http_method: HTTP_POST, + payload: payload, + headers: auth_headers(connection_config[:api_key]), + config: connection_config[:config] + ) + process_response(response) + rescue StandardError => e + handle_exception(e, { context: "OPEN AI:RUN_MODEL:EXCEPTION", type: "error" }) + end + + def run_model_stream(connection_config, payload) + send_streaming_request( + url: OPEN_AI_URL, + http_method: HTTP_POST, + payload: payload, + headers: auth_headers(connection_config[:api_key]), + config: connection_config[:config] + ) do |chunk| + process_streaming_response(chunk) { |message| yield message if block_given? 
} + end + rescue StandardError => e + handle_exception(e, { context: "OPEN AI:RUN_STREAM_MODEL:EXCEPTION", type: "error" }) + end + + def process_response(response) + if success?(response) + data = JSON.parse(response.body) + [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message] + else + create_log_message("OPEN AI:RUN_MODEL", "error", "request failed: #{response.body}") + end + rescue StandardError => e + handle_exception(e, { context: "OPEN AI:PROCESS_RESPONSE:EXCEPTION", type: "error" }) + end + + def extract_data_entries(chunk) + chunk.split(/^data: /).map(&:strip).reject(&:empty?) + end + + def process_streaming_response(chunk) + data_entries = extract_data_entries(chunk) + data_entries.each do |entry| + next if entry == "[DONE]" + + data = parse_json(entry) + yield [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message] if block_given? + rescue StandardError => e + handle_exception(e, { context: "OPEN AI:PROCESS_STREAMING_RESPONSE:EXCEPTION", type: "error", entry: entry }) + end + end + end + end +end diff --git a/integrations/lib/multiwoven/integrations/source/open_ai/config/catalog.json b/integrations/lib/multiwoven/integrations/source/open_ai/config/catalog.json new file mode 100644 index 00000000..dacb788b --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/open_ai/config/catalog.json @@ -0,0 +1,6 @@ +{ + "request_rate_limit": 600, + "request_rate_limit_unit": "minute", + "request_rate_concurrency": 10, + "streams": [] +} diff --git a/integrations/lib/multiwoven/integrations/source/open_ai/config/meta.json b/integrations/lib/multiwoven/integrations/source/open_ai/config/meta.json new file mode 100644 index 00000000..8cb0b153 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/open_ai/config/meta.json @@ -0,0 +1,15 @@ +{ + "data": { + "name": "OpenAI", + "title": "OpenAI Model Endpoint", + "connector_type": "source", + "category": "AI Model", + "documentation_url": 
"https://docs.multiwoven.com", + "github_issue_label": "source-open-ai-model", + "icon": "icon.svg", + "license": "MIT", + "release_stage": "alpha", + "support_level": "community", + "tags": ["language:ruby", "multiwoven"] + } +} diff --git a/integrations/lib/multiwoven/integrations/source/open_ai/config/spec.json b/integrations/lib/multiwoven/integrations/source/open_ai/config/spec.json new file mode 100644 index 00000000..b123a89c --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/open_ai/config/spec.json @@ -0,0 +1,54 @@ +{ + "documentation_url": "https://docs.multiwoven.com/integrations/source/open-ai-endpoint", + "stream_type": "user_defined", + "connector_query_type": "ai_ml", + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Open AI Endpoint", + "type": "object", + "required": ["api_key", "request_format", "response_format"], + "properties": { + "api_key": { + "type": "string", + "multiwoven_secret": true, + "title": "API Key", + "order": 0 + }, + "is_stream": { + "type": "boolean", + "title": "Streaming Enabled", + "description": "Enables data streaming for use cases such as chat, when supported by the model. When true, messages and model data are processed in chunks for immediate delivery, enhancing responsiveness. 
Default is false, processing only after the entire response is received.", + "default": false, + "order": 1 + }, + "config": { + "title": "", + "type": "object", + "properties": { + "timeout": { + "type": "string", + "default": "30", + "title": "HTTP Timeout", + "description": "The maximum time, in seconds, to wait for a response from the server before the request is canceled.", + "order": 0 + } + }, + "order": 2 + }, + "request_format": { + "title": "Request Format", + "description": "Sample Request Format", + "type": "string", + "x-request-format": true, + "order": 3 + }, + "response_format": { + "title": "Response Format", + "description": "Sample Response Format", + "type": "string", + "x-response-format": true, + "order": 4 + } + } + } +} diff --git a/integrations/lib/multiwoven/integrations/source/open_ai/icon.svg b/integrations/lib/multiwoven/integrations/source/open_ai/icon.svg new file mode 100644 index 00000000..859d7af3 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/source/open_ai/icon.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/integrations/spec/multiwoven/integrations/source/open_ai/client_spec.rb b/integrations/spec/multiwoven/integrations/source/open_ai/client_spec.rb new file mode 100644 index 00000000..0ccf4a85 --- /dev/null +++ b/integrations/spec/multiwoven/integrations/source/open_ai/client_spec.rb @@ -0,0 +1,260 @@ +# frozen_string_literal: true + +RSpec.describe Multiwoven::Integrations::Source::OpenAI::Client do + include WebMock::API + + before(:each) do + WebMock.disable_net_connect!(allow_localhost: true) + end + + let(:client) { described_class.new } + let(:mock_http_session) { double("Net::Http::Session") } + let(:api_key) { "test_api_key" } + let(:payload) do + { + queries: "Hello there" + } + end + + let(:sync_config_json) do + { + source: { + name: "DestinationConnectorName", + type: "destination", + connection_specification: { + api_key: api_key, + config: { + timeout: 25 + }, + request_format: 
payload.to_json + } + }, + destination: { + name: "Http", + type: "destination", + connection_specification: { + example_destination_key: "example_destination_value" + } + }, + model: { + name: "ExampleModel", + query: payload.to_json, + query_type: "ai_ml", + primary_key: "id" + }, + stream: { + name: "example_stream", + json_schema: { "field1": "type1" }, + request_method: "POST", + request_rate_limit: 4, + rate_limit_unit_seconds: 1 + }, + sync_mode: "full_refresh", + cursor_field: "timestamp", + destination_sync_mode: "upsert", + sync_id: "1" + } + end + + let(:sync_config) { Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) } + let(:sync_config_stream) do + sync_config_json[:source][:connection_specification][:is_stream] = true + Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) + end + before do + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + end + let(:headers) do + { + "Accept" => "application/json", + "Authorization" => "Bearer #{api_key}", + "Content-Type" => "application/json" + } + end + let(:endpoint) { "https://api.openai.com/v1/chat/completions" } + + describe "#check_connection" do + context "when the connection is successful" do + let(:response_body) { { "message" => "success" }.to_json } + before do + response = Net::HTTPSuccess.new("1.1", "200", "Unauthorized") + response.content_type = "application/json" + config = sync_config_json[:source][:connection_specification][:config] + allow(response).to receive(:body).and_return(response_body) + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + .with(endpoint, + "POST", + payload: JSON.parse(payload.to_json), + headers: headers, + config: config) + .and_return(response) + end + + it "returns a successful connection status" do + response = client.check_connection(sync_config_json[:source][:connection_specification]) + expect(response).to 
be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage) + expect(response.connection_status.status).to eq("succeeded") + end + end + + context "when the connection fails" do + let(:response_body) { { "message" => "failed" }.to_json } + before do + response = Net::HTTPSuccess.new("1.1", "401", "Unauthorized") + response.content_type = "application/json" + allow(response).to receive(:body).and_return(response_body) + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + .with(endpoint, + "POST", + headers: headers) + .and_return(response) + end + + it "returns a failed connection status with an error message" do + response = client.check_connection(sync_config_json[:source][:connection_specification]) + + expect(response).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage) + expect(response.connection_status.status).to eq("failed") + end + end + end + + describe "#discover" do + it "successfully returns the catalog message" do + message = client.discover(nil) + catalog = message.catalog + expect(catalog).to be_a(Multiwoven::Integrations::Protocol::Catalog) + expect(catalog.request_rate_limit).to eql(600) + expect(catalog.request_rate_limit_unit).to eql("minute") + expect(catalog.request_rate_concurrency).to eql(10) + end + + it "handles exceptions during discovery" do + allow(client).to receive(:read_json).and_raise(StandardError.new("test error")) + expect(client).to receive(:handle_exception).with( + an_instance_of(StandardError), + hash_including(context: "OPEN AI:DISCOVER:EXCEPTION", type: "error") + ) + client.discover + end + end + + describe "#read" do + context "when the read is successful" do + let(:response_body) { { "message" => "Hello! 
how can I help" }.to_json } + before do + response = Net::HTTPSuccess.new("1.1", "200", "success") + response.content_type = "application/json" + config = sync_config_json[:source][:connection_specification][:config] + allow(response).to receive(:body).and_return(response_body) + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + .with(endpoint, + "POST", + payload: JSON.parse(payload.to_json), + headers: headers, + config: config) + .and_return(response) + end + + it "successfully reads records" do + records = client.read(sync_config) + expect(records).to be_an(Array) + expect(records.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(records.first.record.data).to eq(JSON.parse(response_body)) + end + end + + context "when the read operation fails" do + let(:response_body) { { "message" => "failed" }.to_json } + before do + response = Net::HTTPSuccess.new("1.1", "401", "Unauthorized") + response.content_type = "application/json" + config = sync_config_json[:source][:connection_specification][:config] + allow(response).to receive(:body).and_return(response_body) + allow(Multiwoven::Integrations::Core::HttpClient).to receive(:request) + .with(endpoint, + "POST", + headers: headers, + config: config) + .and_return(response) + end + + it "handles exceptions during reading" do + error_instance = StandardError.new("test error") + allow(client).to receive(:run_model).and_raise(error_instance) + expect(client).to receive(:handle_exception).with( + error_instance, + hash_including(context: "OPEN AI:READ:EXCEPTION", type: "error") + ) + + client.read(sync_config) + end + end + end + + describe "#read with is_stream = true" do + context "when the read is successful" do + before do + payload = sync_config_json[:model][:query] + streaming_chunk_first = <<~DATA + data: {"choices":[{"delta":{"content":"How I "}}]} + + data: {"choices":[{"delta":{"content":"can help "}}]} + DATA + streaming_chunk_second = "data: 
{\"choices\":[{\"delta\":{\"content\":\"you\"}}]}\n\n" + + allow(Multiwoven::Integrations::Core::StreamingHttpClient).to receive(:request) + .with(endpoint, + "POST", + payload: JSON.parse(payload), + headers: headers, + config: sync_config_json[:source][:connection_specification][:config]) + .and_yield(streaming_chunk_first) + .and_yield(streaming_chunk_second) + + response = Net::HTTPSuccess.new("1.1", "200", "success") + response.content_type = "application/json" + end + + it "streams data and processes chunks" do + results = [] + client.read(sync_config_stream) { |message| results << message } + expect(results.first).to be_an(Array) + expect(results.first.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(results.first.first.record.data.dig("choices", 0, "delta", "content")).to eq("How I ") + + expect(results[1]).to be_an(Array) + expect(results[1].first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(results[1].first.record.data.dig("choices", 0, "delta", "content")).to eq("can help ") + + expect(results[2]).to be_an(Array) + expect(results[2].first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(results[2].first.record.data.dig("choices", 0, "delta", "content")).to eq("you") + end + end + + context "when streaming fails on a chunk" do + let(:streaming_chunk_first) { { "message" => "streaming data chunk 1" }.to_json } + + before do + config = sync_config_json[:source][:connection_specification][:config] + allow(Multiwoven::Integrations::Core::StreamingHttpClient).to receive(:request) + .with(endpoint, + "POST", + payload: JSON.parse(payload.to_json), + headers: headers, + config: config) + .and_yield(streaming_chunk_first) + .and_raise(StandardError, "Streaming error on chunk 2") + end + + it "handles streaming errors gracefully" do + results = [] + client.read(sync_config_stream) { |message| results << message } + expect(results.last).to be_an(Array) + 
expect(results.last.first.record).to be_a(Multiwoven::Integrations::Protocol::RecordMessage) + expect(results.last.first.record.data["message"]).to eq("streaming data chunk 1") + end + end + end +end From f30b2c3ab193426b2e3dc5d361ff00d55ce2ee79 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 27 Dec 2024 15:12:39 +0530 Subject: [PATCH 5/5] chore(CE): Update HTTP model spec (#551) Co-authored-by: TivonB-AI2 <124182151+TivonB-AI2@users.noreply.github.com> --- integrations/Gemfile.lock | 2 +- integrations/lib/multiwoven/integrations/rollout.rb | 2 +- .../multiwoven/integrations/source/http_model/config/spec.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index 58b9073a..cd85a166 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . specs: - multiwoven-integrations (0.16.0) + multiwoven-integrations (0.16.1) MailchimpMarketing activesupport async-websocket diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index add3445d..6f6409fb 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.16.0" + VERSION = "0.16.1" ENABLED_SOURCES = %w[ Snowflake diff --git a/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json b/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json index 9e83dc8b..194808b4 100644 --- a/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json +++ b/integrations/lib/multiwoven/integrations/source/http_model/config/spec.json @@ -6,7 +6,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "HTTP Model Endpoint", "type": "object", - "required": ["url_host"], + "required": 
["url_host", "http_method"], "properties": { "http_method": { "type": "string",