diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index b81231bf..0809ed77 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . specs: - multiwoven-integrations (0.3.4) + multiwoven-integrations (0.3.5) activesupport async-websocket aws-sdk-athena @@ -326,6 +326,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x64-mingw-ucrt x86_64-darwin-23 diff --git a/integrations/lib/multiwoven/integrations/core/destination_connector.rb b/integrations/lib/multiwoven/integrations/core/destination_connector.rb index dff29687..f0595a31 100644 --- a/integrations/lib/multiwoven/integrations/core/destination_connector.rb +++ b/integrations/lib/multiwoven/integrations/core/destination_connector.rb @@ -9,6 +9,12 @@ def write(_sync_config, _records, _action = "destination_insert") raise "Not implemented" # return Protocol::TrackingMessage end + + def tracking_message(success, failure, log_message_array) + Multiwoven::Integrations::Protocol::TrackingMessage.new( + success: success, failed: failure, logs: log_message_array + ).to_multiwoven_message + end end end end diff --git a/integrations/lib/multiwoven/integrations/core/utils.rb b/integrations/lib/multiwoven/integrations/core/utils.rb index 32b4238e..e1078f70 100644 --- a/integrations/lib/multiwoven/integrations/core/utils.rb +++ b/integrations/lib/multiwoven/integrations/core/utils.rb @@ -53,6 +53,14 @@ def report_exception(exception, meta = {}) reporter&.report(exception, meta) end + def log_request_response(level, request, response) + Integrations::Protocol::LogMessage.new( + name: self.class.name, + level: level, + message: { request: request.to_s, response: response.to_s, level: level }.to_json + ) + end + def create_log_message(context, type, exception) Integrations::Protocol::LogMessage.new( name: context, diff --git a/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb b/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb index 89038783..6d943331 100644 --- a/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb +++ b/integrations/lib/multiwoven/integrations/destination/postgresql/client.rb @@ -46,6 +46,7 @@ def write(sync_config, records, action = "destination_insert") connection_config = sync_config.destination.connection_specification.with_indifferent_access table_name = sync_config.stream.name primary_key = sync_config.model.primary_key + log_message_array = [] db = create_connection(connection_config) write_success = 0 @@ -55,8 +56,9 @@ def write(sync_config, records, action = "destination_insert") query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key) logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}") begin - db.exec(query) + response = db.exec(query) write_success += 1 + log_message_array << log_request_response("info", query, response) rescue StandardError => e handle_exception(e, { context: "POSTGRESQL:RECORD:WRITE:EXCEPTION", @@ -65,9 +67,10 @@ def write(sync_config, records, action = "destination_insert") sync_run_id: sync_config.sync_run_id }) write_failure += 1 + log_message_array << log_request_response("error", query, e.message) end end - tracking_message(write_success, write_failure) + tracking_message(write_success, write_failure, log_message_array) rescue StandardError => e handle_exception(e, { context: "POSTGRESQL:RECORD:WRITE:EXCEPTION", @@ -119,12 +122,6 @@ def group_by_table(records) } end end - - def tracking_message(success, failure) - Multiwoven::Integrations::Protocol::TrackingMessage.new( - success: success, failed: failure - ).to_multiwoven_message - end end end end diff --git a/integrations/lib/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client.rb b/integrations/lib/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client.rb index 4754c4bc..dd22b70c 100644 --- a/integrations/lib/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client.rb +++ b/integrations/lib/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client.rb @@ -80,10 +80,14 @@ def process_records(records, stream) write_success = 0 write_failure = 0 properties = stream.json_schema[:properties] + log_message_array = [] + records.each do |record_object| record = extract_data(record_object, properties) - process_record(stream, record) + args = [stream.name, "Id", record] + response = send_data_to_salesforce(args) write_success += 1 + log_message_array << log_request_response("info", args, response) rescue StandardError => e # TODO: add sync_id and sync run id to the logs handle_exception(e, { @@ -93,18 +97,14 @@ def process_records(records, stream) sync_run_id: @sync_config.sync_run_id }) write_failure += 1 + log_message_array << log_request_response("error", args, e.message) end - tracking_message(write_success, write_failure) - end - - def process_record(stream, record) - send_data_to_salesforce(stream.name, record) + tracking_message(write_success, write_failure, log_message_array) end - def send_data_to_salesforce(stream_name, record = {}) + def send_data_to_salesforce(args) method_name = "upsert!" - args = [stream_name, "Id", record] - @logger.debug("sync_id: #{@sync_config.sync_id}, sync_run_id: #{@sync_config.sync_run_id}, record: #{record}") + @logger.debug("sync_id: #{@sync_config.sync_id}, sync_run_id: #{@sync_config.sync_run_id}, args: #{args}") @client.send(method_name, *args) end @@ -124,12 +124,6 @@ def load_catalog read_json(CATALOG_SPEC_PATH) end - def tracking_message(success, failure) - Multiwoven::Integrations::Protocol::TrackingMessage.new( - success: success, failed: failure - ).to_multiwoven_message - end - def log_debug(message) Multiwoven::Integrations::Service.logger.debug(message) end diff --git a/integrations/lib/multiwoven/integrations/protocol/protocol.rb b/integrations/lib/multiwoven/integrations/protocol/protocol.rb index b3381659..9b4c5efd 100644 --- a/integrations/lib/multiwoven/integrations/protocol/protocol.rb +++ b/integrations/lib/multiwoven/integrations/protocol/protocol.rb @@ -197,6 +197,7 @@ class TrackingMessage < ProtocolModel attribute :success, Types::Integer.default(0) attribute :failed, Types::Integer.default(0) attribute? :meta, Types::Hash + attribute? :logs, Types::Array.of(LogMessage) def to_multiwoven_message MultiwovenMessage.new( diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index a55b8e5c..b78c0746 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.3.4" + VERSION = "0.3.5" ENABLED_SOURCES = %w[ Snowflake diff --git a/integrations/spec/multiwoven/integrations/core/destination_connector_spec.rb b/integrations/spec/multiwoven/integrations/core/destination_connector_spec.rb index f28920dd..31cec0d8 100644 --- a/integrations/spec/multiwoven/integrations/core/destination_connector_spec.rb +++ b/integrations/spec/multiwoven/integrations/core/destination_connector_spec.rb @@ -17,6 +17,38 @@ module Integrations::Core expect { connector.write(sync_config, records) }.to raise_error("Not implemented") end end + + describe "#tracking_message" do + let(:log_message_data) do + Multiwoven::Integrations::Protocol::LogMessage.new( + name: self.class.name, + level: "info", + message: { request: "Sample req", response: "Sample req", level: "info" }.to_json + ) + end + it "returns a MultiwovenMessage with tracking information" do + connector = described_class.new + success = 2 + failure = 1 + + multiwoven_message = connector.tracking_message(success, failure, [log_message_data]) + + expect(multiwoven_message).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage) + expect(multiwoven_message.type).to eq("tracking") + + tracking_message = multiwoven_message.tracking + expect(tracking_message).to be_a(Multiwoven::Integrations::Protocol::TrackingMessage) + expect(tracking_message.success).to eq(success) + expect(tracking_message.failed).to eq(failure) + + logs = tracking_message.logs + expect(logs).to be_an(Array) + expect(logs.size).to eq(1) + expect(logs.first).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(logs.first.level).to eq("info") + expect(logs.first.message).to eq("{\"request\":\"Sample req\",\"response\":\"Sample req\",\"level\":\"info\"}") + end + end end end end diff --git a/integrations/spec/multiwoven/integrations/core/utils_spec.rb b/integrations/spec/multiwoven/integrations/core/utils_spec.rb index 48c85e8d..028ae405 100644 --- a/integrations/spec/multiwoven/integrations/core/utils_spec.rb +++ b/integrations/spec/multiwoven/integrations/core/utils_spec.rb @@ -50,6 +50,24 @@ module Core expect(dummy_class.hash_to_string(hash)).to eq("key1 = 1, key2 = 2.5, key3 = true, key4 = ") end end + + describe "#log_request_response" do + let(:level) { "info" } + let(:request) { { user_id: 1, action: "create" } } + let(:response) { { status: "success" } } + + it "creates a LogMessage object with correct attributes" do + log_message = dummy_class.log_request_response(level, request, response) + + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eq(level) + + parsed_message = JSON.parse(log_message.message) + expect(parsed_message["request"]).to eq(request.to_s) + expect(parsed_message["response"]).to eq(response.to_s) + expect(parsed_message["level"]).to eq(level) + end + end end end end diff --git a/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb b/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb index 8860c6f0..b4155222 100644 --- a/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb +++ b/integrations/spec/multiwoven/integrations/destination/postgresql/client_spec.rb @@ -114,6 +114,12 @@ tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking expect(tracking.success).to eql(1) + log_message = tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("info") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") end it "write records successfully on update record action destination_update" do @@ -125,6 +131,13 @@ tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)], "destination_update").tracking expect(tracking.success).to eql(1) + expect(tracking.logs.count).to eql(1) + log_message = tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("info") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") end end @@ -139,6 +152,12 @@ tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking expect(tracking.failed).to eql(1) + expect(tracking.logs.count).to eql(1) + log_message = tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("error") + expect(log_message.message).to include("request") + expect(log_message.message).to include("\"response\":\"test error\"") end end end diff --git a/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client_spec.rb b/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client_spec.rb index 70c6769c..30e17a2b 100644 --- a/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client_spec.rb +++ b/integrations/spec/multiwoven/integrations/destination/salesforce_consumer_goods_cloud/client_spec.rb @@ -194,6 +194,13 @@ response = client.write(sync_config, records) expect(response.tracking.success).to eq(records.size) expect(response.tracking.failed).to eq(0) + expect(response.tracking.logs.count).to eql(2) + log_message = response.tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("info") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") end end @@ -207,6 +214,13 @@ response = client.write(sync_config, records) expect(response.tracking.failed).to eq(records.size) expect(response.tracking.success).to eq(0) + expect(response.tracking.logs.count).to eql(2) + log_message = response.tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("error") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") end end end diff --git a/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb b/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb index e2735380..bcb9e589 100644 --- a/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb +++ b/integrations/spec/multiwoven/integrations/protocol/protocol_spec.rb @@ -457,16 +457,26 @@ module Integrations::Protocol RSpec.describe Multiwoven::Integrations::Protocol::TrackingMessage do describe "#to_multiwoven_message" do + let(:log_message_data) do + Multiwoven::Integrations::Protocol::LogMessage.new( + name: self.class.name, + level: "info", + message: { request: "Sample req", response: "Sample req", level: "info" }.to_json + ) + end let(:tracking_message) do - Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 3, failed: 1) + Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 3, failed: 1, logs: [log_message_data]) end it "converts to a MultiwovenMessage" do multiwoven_message = tracking_message.to_multiwoven_message - expect(multiwoven_message).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage) expect(multiwoven_message.type).to eq("tracking") expect(multiwoven_message.tracking).to eq(tracking_message) + expect(multiwoven_message.tracking.logs.first).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(multiwoven_message.tracking.logs.first.level).to eq("info") + expect(multiwoven_message.tracking.logs.first.message) + .to eq("{\"request\":\"Sample req\",\"response\":\"Sample req\",\"level\":\"info\"}") end end end