Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CE): stream support in http model #532

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.15.10)
multiwoven-integrations (0.15.11)
MailchimpMarketing
activesupport
async-websocket
Expand Down
2 changes: 2 additions & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
require_relative "integrations/core/base_connector"
require_relative "integrations/core/source_connector"
require_relative "integrations/core/destination_connector"
require_relative "integrations/core/http_helper"
require_relative "integrations/core/http_client"
require_relative "integrations/core/streaming_http_client"
require_relative "integrations/core/query_builder"

# Source
Expand Down
30 changes: 2 additions & 28 deletions integrations/lib/multiwoven/integrations/core/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,14 @@
module Multiwoven
module Integrations::Core
class HttpClient
extend HttpHelper
class << self
def request(url, method, payload: nil, headers: {}, config: {})
uri = URI(url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = (uri.scheme == "https")

# Set timeout if provided
if config[:timeout]
timeout_value = config[:timeout].to_f
http.open_timeout = timeout_value
http.read_timeout = timeout_value
end

http = configure_http(uri, config)
request = build_request(method, uri, payload, headers)
http.request(request)
end

private

def build_request(method, uri, payload, headers)
request_class = case method.upcase
when Constants::HTTP_GET then Net::HTTP::Get
when Constants::HTTP_POST then Net::HTTP::Post
when Constants::HTTP_PUT then Net::HTTP::Put
when Constants::HTTP_PATCH then Net::HTTP::Patch
when Constants::HTTP_DELETE then Net::HTTP::Delete
else raise ArgumentError, "Unsupported HTTP method: #{method}"
end

request = request_class.new(uri)
headers.each { |key, value| request[key] = value }
request.body = payload.to_json if payload && %w[POST PUT PATCH].include?(method.upcase)
request
end
end
end
end
Expand Down
36 changes: 36 additions & 0 deletions integrations/lib/multiwoven/integrations/core/http_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# frozen_string_literal: true

module Multiwoven
module Integrations::Core
module HttpHelper
def build_request(method, uri, payload, headers)
request_class = case method.upcase
when Constants::HTTP_GET then Net::HTTP::Get
when Constants::HTTP_POST then Net::HTTP::Post
when Constants::HTTP_PUT then Net::HTTP::Put
when Constants::HTTP_PATCH then Net::HTTP::Patch
when Constants::HTTP_DELETE then Net::HTTP::Delete
else raise ArgumentError, "Unsupported HTTP method: #{method}"
end

request = request_class.new(uri)
headers.each { |key, value| request[key] = value }
request.body = payload.to_json if payload && %w[POST PUT PATCH].include?(method.upcase)
request
end

def configure_http(uri, config)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = (uri.scheme == "https")

if config[:timeout]
timeout_value = config[:timeout].to_f
http.open_timeout = timeout_value
http.read_timeout = timeout_value
end

http
end
end
end
end
22 changes: 22 additions & 0 deletions integrations/lib/multiwoven/integrations/core/source_connector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,28 @@ def batched_query(sql_query, limit, offset)
# Appending the LIMIT and OFFSET clauses to the SQL query
"#{sql_query} LIMIT #{limit} OFFSET #{offset}"
end

def send_request(options = {})
Multiwoven::Integrations::Core::HttpClient.request(
options[:url],
options[:http_method],
payload: options[:payload],
headers: options[:headers],
config: options[:config]
)
end

def send_streaming_request(options = {})
Multiwoven::Integrations::Core::StreamingHttpClient.request(
options[:url],
options[:http_method],
payload: options[:payload],
headers: options[:headers],
config: options[:config]
) do |chunk|
yield chunk if block_given? # Pass each chunk for processing (streaming response)
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module Multiwoven
module Integrations::Core
class StreamingHttpClient
extend HttpHelper
class << self
def request(url, method, payload: nil, headers: {}, config: {})
uri = URI(url)
http = configure_http(uri, config)
request = build_request(method, uri, payload, headers)
http.request(request) do |response|
response.read_body do |chunk|
yield chunk if block_given? # Pass each response chunk
end
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.15.10"
VERSION = "0.15.11"

ENABLED_SOURCES = %w[
Snowflake
Expand Down
108 changes: 63 additions & 45 deletions integrations/lib/multiwoven/integrations/source/http_model/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,17 @@ module HttpModel
include Multiwoven::Integrations::Core
class Client < SourceConnector
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
url_host = connection_config[:url_host]
http_method = connection_config[:http_method]
headers = connection_config[:headers]
payload = JSON.parse(connection_config[:request_format])
config = connection_config[:config]
config[:timeout] ||= 30
response = send_request(url_host, http_method, payload, headers, config)
if success?(response)
success_status
else
failure_status(nil)
end
connection_config = prepare_config(connection_config)
response = send_request(
url: connection_config[:url_host],
http_method: connection_config[:http_method],
payload: JSON.parse(connection_config[:request_format]),
headers: connection_config[:headers],
config: connection_config[:config]
)
success?(response) ? success_status : failure_status(nil)
rescue StandardError => e
handle_exception(e, {
context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION",
type: "error"
})
handle_exception(e, { context: "HTTP MODEL:CHECK_CONNECTION:EXCEPTION", type: "error" })
failure_status(e)
end

Expand All @@ -31,40 +24,66 @@ def discover(_connection_config = nil)
catalog = build_catalog(catalog_json)
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(e, {
context: "HTTP MODEL:DISCOVER:EXCEPTION",
type: "error"
})
handle_exception(e, { context: "HTTP MODEL:DISCOVER:EXCEPTION", type: "error" })
end

def read(sync_config)
connection_config = sync_config.source.connection_specification
connection_config = connection_config.with_indifferent_access
connection_config = prepare_config(sync_config.source.connection_specification)
stream = connection_config[:is_stream] ||= false
# The server checks the ConnectorQueryType.
# If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol.
# This query is then sent to the AI/ML model.
payload = JSON.parse(sync_config.model.query)
run_model(connection_config, payload)
payload = parse_json(sync_config.model.query)

if stream
run_model_stream(connection_config, payload) { |message| yield message if block_given? }
else
run_model(connection_config, payload)
end
rescue StandardError => e
handle_exception(e, {
context: "HTTP MODEL:READ:EXCEPTION",
type: "error"
})
handle_exception(e, { context: "HTTP MODEL:READ:EXCEPTION", type: "error" })
end

private

def prepare_config(config)
config.with_indifferent_access.tap do |conf|
conf[:config][:timeout] ||= 30
end
end

def parse_json(json_string)
JSON.parse(json_string)
rescue JSON::ParserError => e
handle_exception(e, { context: "HTTP MODEL:PARSE_JSON:EXCEPTION", type: "error" })
{}
end

def run_model(connection_config, payload)
connection_config = connection_config.with_indifferent_access
url_host = connection_config[:url_host]
headers = connection_config[:headers]
config = connection_config[:config]
http_method = connection_config[:http_method]
config[:timeout] ||= 30
response = send_request(url_host, http_method, payload, headers, config)
response = send_request(
url: connection_config[:url_host],
http_method: connection_config[:http_method],
payload: payload,
headers: connection_config[:headers],
config: connection_config[:config]
)
process_response(response)
rescue StandardError => e
handle_exception(e, context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error")
handle_exception(e, { context: "HTTP MODEL:RUN_MODEL:EXCEPTION", type: "error" })
end

def run_model_stream(connection_config, payload)
send_streaming_request(
url: connection_config[:url_host],
http_method: connection_config[:http_method],
payload: payload,
headers: connection_config[:headers],
config: connection_config[:config]
) do |chunk|
process_streaming_response(chunk) { |message| yield message if block_given? }
end
rescue StandardError => e
handle_exception(e, { context: "HTTP MODEL:RUN_STREAM_MODEL:EXCEPTION", type: "error" })
end

def process_response(response)
Expand All @@ -74,16 +93,15 @@ def process_response(response)
else
create_log_message("HTTP MODEL:RUN_MODEL", "error", "request failed: #{response.body}")
end
rescue StandardError => e
handle_exception(e, { context: "HTTP MODEL:PROCESS_RESPONSE:EXCEPTION", type: "error" })
end

def send_request(url, http_method, payload, headers, config)
Multiwoven::Integrations::Core::HttpClient.request(
url,
http_method,
payload: payload,
headers: headers,
config: config
)
def process_streaming_response(chunk)
data = JSON.parse(chunk)
yield [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message] if block_given?
rescue StandardError => e
handle_exception(e, { context: "HTTP MODEL:PROCESS_STREAMING_RESPONSE:EXCEPTION", type: "error" })
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@
"title": "URL",
"order": 1
},
"is_stream": {
"type": "boolean",
"title": "Streaming Enabled",
"description": "Enables data streaming for such as chat, when supported by the model. When true, messages and model data are processed in chunks for immediate delivery, enhancing responsiveness. Default is false, processing only after the entire response is received.",
"default": false,
"order": 2
},
"headers": {
"title": "HTTP Headers",
"description": "Custom headers to include in the HTTP request. Useful for authentication, content type specifications, and other request metadata.",
"order": 2,
"order": 3,
"additionalProperties": {
"type": "string"
},
Expand All @@ -42,21 +49,21 @@
"order": 0
}
},
"order": 3
"order": 4
},
"request_format": {
"title": "Request Format",
"description": "Sample Request Format",
"type": "string",
"x-request-format": true,
"order": 4
"order": 5
},
"response_format": {
"title": "Response Format",
"description": "Sample Response Format",
"type": "string",
"x-response-format": true,
"order": 5
"order": 6
}
}
}
Expand Down
Loading