Skip to content

Commit

Permalink
feat(CE): integration changes for sync record log (#223)
Browse files Browse the repository at this point in the history
* Resolve conflict in cherry-pick of 32aaa10b522739667ba42574b646b6b9ad6e80a3 and change the commit message

* chore(CE): conflict resolve

* chore(CE): update gem lock

---------

Co-authored-by: afthab vp <[email protected]>
Co-authored-by: afthab vp <[email protected]>
  • Loading branch information
3 people authored Jul 4, 2024
1 parent 800fe3d commit eebc9dc
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 27 deletions.
3 changes: 2 additions & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.3.4)
multiwoven-integrations (0.3.5)
activesupport
async-websocket
aws-sdk-athena
Expand Down Expand Up @@ -326,6 +326,7 @@ GEM

PLATFORMS
arm64-darwin-22
arm64-darwin-23
x64-mingw-ucrt
x86_64-darwin-23

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ def write(_sync_config, _records, _action = "destination_insert")
raise "Not implemented"
# return Protocol::TrackingMessage
end

def tracking_message(success, failure, log_message_array)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure, logs: log_message_array
).to_multiwoven_message
end
end
end
end
8 changes: 8 additions & 0 deletions integrations/lib/multiwoven/integrations/core/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def report_exception(exception, meta = {})
reporter&.report(exception, meta)
end

def log_request_response(level, request, response)
Integrations::Protocol::LogMessage.new(
name: self.class.name,
level: level,
message: { request: request.to_s, response: response.to_s, level: level }.to_json
)
end

def create_log_message(context, type, exception)
Integrations::Protocol::LogMessage.new(
name: context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
log_message_array = []
db = create_connection(connection_config)

write_success = 0
Expand All @@ -55,8 +56,9 @@ def write(sync_config, records, action = "destination_insert")
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
db.exec(query)
response = db.exec(query)
write_success += 1
log_message_array << log_request_response("info", query, response)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
Expand All @@ -65,9 +67,10 @@ def write(sync_config, records, action = "destination_insert")
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", query, e.message)
end
end
tracking_message(write_success, write_failure)
tracking_message(write_success, write_failure, log_message_array)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
Expand Down Expand Up @@ -119,12 +122,6 @@ def group_by_table(records)
}
end
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ def process_records(records, stream)
write_success = 0
write_failure = 0
properties = stream.json_schema[:properties]
log_message_array = []

records.each do |record_object|
record = extract_data(record_object, properties)
process_record(stream, record)
args = [stream.name, "Id", record]
response = send_data_to_salesforce(args)
write_success += 1
log_message_array << log_request_response("info", args, response)
rescue StandardError => e
# TODO: add sync_id and sync run id to the logs
handle_exception(e, {
Expand All @@ -93,18 +97,14 @@ def process_records(records, stream)
sync_run_id: @sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", args, e.message)
end
tracking_message(write_success, write_failure)
end

def process_record(stream, record)
send_data_to_salesforce(stream.name, record)
tracking_message(write_success, write_failure, log_message_array)
end

def send_data_to_salesforce(stream_name, record = {})
def send_data_to_salesforce(args)
method_name = "upsert!"
args = [stream_name, "Id", record]
@logger.debug("sync_id: #{@sync_config.sync_id}, sync_run_id: #{@sync_config.sync_run_id}, record: #{record}")
@logger.debug("sync_id: #{@sync_config.sync_id}, sync_run_id: #{@sync_config.sync_run_id}, args: #{args}")
@client.send(method_name, *args)
end

Expand All @@ -124,12 +124,6 @@ def load_catalog
read_json(CATALOG_SPEC_PATH)
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end

def log_debug(message)
Multiwoven::Integrations::Service.logger.debug(message)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class TrackingMessage < ProtocolModel
attribute :success, Types::Integer.default(0)
attribute :failed, Types::Integer.default(0)
attribute? :meta, Types::Hash
attribute? :logs, Types::Array.of(LogMessage)

def to_multiwoven_message
MultiwovenMessage.new(
Expand Down
2 changes: 1 addition & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.3.4"
VERSION = "0.3.5"

ENABLED_SOURCES = %w[
Snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,38 @@ module Integrations::Core
expect { connector.write(sync_config, records) }.to raise_error("Not implemented")
end
end

describe "#tracking_message" do
let(:log_message_data) do
Multiwoven::Integrations::Protocol::LogMessage.new(
name: self.class.name,
level: "info",
message: { request: "Sample req", response: "Sample req", level: "info" }.to_json
)
end
it "returns a MultiwovenMessage with tracking information" do
connector = described_class.new
success = 2
failure = 1

multiwoven_message = connector.tracking_message(success, failure, [log_message_data])

expect(multiwoven_message).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
expect(multiwoven_message.type).to eq("tracking")

tracking_message = multiwoven_message.tracking
expect(tracking_message).to be_a(Multiwoven::Integrations::Protocol::TrackingMessage)
expect(tracking_message.success).to eq(success)
expect(tracking_message.failed).to eq(failure)

logs = tracking_message.logs
expect(logs).to be_an(Array)
expect(logs.size).to eq(1)
expect(logs.first).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(logs.first.level).to eq("info")
expect(logs.first.message).to eq("{\"request\":\"Sample req\",\"response\":\"Sample req\",\"level\":\"info\"}")
end
end
end
end
end
18 changes: 18 additions & 0 deletions integrations/spec/multiwoven/integrations/core/utils_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ module Core
expect(dummy_class.hash_to_string(hash)).to eq("key1 = 1, key2 = 2.5, key3 = true, key4 = ")
end
end

describe "#log_request_response" do
let(:level) { "info" }
let(:request) { { user_id: 1, action: "create" } }
let(:response) { { status: "success" } }

it "creates a LogMessage object with correct attributes" do
log_message = dummy_class.log_request_response(level, request, response)

expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eq(level)

parsed_message = JSON.parse(log_message.message)
expect(parsed_message["request"]).to eq(request.to_s)
expect(parsed_message["response"]).to eq(response.to_s)
expect(parsed_message["level"]).to eq(level)
end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@

tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking
expect(tracking.success).to eql(1)
log_message = tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end

it "write records successfully on update record action destination_update" do
Expand All @@ -125,6 +131,13 @@

tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)], "destination_update").tracking
expect(tracking.success).to eql(1)
expect(tracking.logs.count).to eql(1)
log_message = tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

Expand All @@ -139,6 +152,12 @@

tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking
expect(tracking.failed).to eql(1)
expect(tracking.logs.count).to eql(1)
log_message = tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("error")
expect(log_message.message).to include("request")
expect(log_message.message).to include("\"response\":\"test error\"")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@
response = client.write(sync_config, records)
expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
expect(response.tracking.logs.count).to eql(2)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

Expand All @@ -207,6 +214,13 @@
response = client.write(sync_config, records)
expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
expect(response.tracking.logs.count).to eql(2)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("error")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,26 @@ module Integrations::Protocol

RSpec.describe Multiwoven::Integrations::Protocol::TrackingMessage do
describe "#to_multiwoven_message" do
let(:log_message_data) do
Multiwoven::Integrations::Protocol::LogMessage.new(
name: self.class.name,
level: "info",
message: { request: "Sample req", response: "Sample req", level: "info" }.to_json
)
end
let(:tracking_message) do
Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 3, failed: 1)
Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 3, failed: 1, logs: [log_message_data])
end

it "converts to a MultiwovenMessage" do
multiwoven_message = tracking_message.to_multiwoven_message

expect(multiwoven_message).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
expect(multiwoven_message.type).to eq("tracking")
expect(multiwoven_message.tracking).to eq(tracking_message)
expect(multiwoven_message.tracking.logs.first).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(multiwoven_message.tracking.logs.first.level).to eq("info")
expect(multiwoven_message.tracking.logs.first.message)
.to eq("{\"request\":\"Sample req\",\"response\":\"Sample req\",\"level\":\"info\"}")
end
end
end
Expand Down

0 comments on commit eebc9dc

Please sign in to comment.