Skip to content

Commit

Permalink
Merge branch 'main' into chore/add-request-response-log-airtable
Browse files Browse the repository at this point in the history
  • Loading branch information
TivonB-AI2 authored Aug 12, 2024
2 parents fbd2c57 + d022a23 commit fe27a80
Show file tree
Hide file tree
Showing 44 changed files with 474 additions and 591 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,15 @@ def extract_spreadsheet_id(link)

# Batch has a limit of sending 2MB data. So creating a chunk of records to meet that limit
def process_record_chunks(records, sync_config)
log_message_array = []
write_success = 0
write_failure = 0

records.each_slice(MAX_CHUNK_SIZE) do |chunk|
values = prepare_chunk_values(chunk, sync_config.stream)
update_sheet_values(values, sync_config.stream.name)
request, response = *update_sheet_values(values, sync_config.stream.name)
write_success += values.size
log_message_array << log_request_response("info", request, response)
rescue StandardError => e
handle_exception(e, {
context: "GOOGLE_SHEETS:RECORD:WRITE:EXCEPTION",
Expand All @@ -168,9 +170,9 @@ def process_record_chunks(records, sync_config)
sync_run_id: sync_config.sync_run_id
})
write_failure += chunk.size
log_message_array << log_request_response("error", request, e.message)
end

tracking_message(write_success, write_failure)
tracking_message(write_success, write_failure, log_message_array)
end

# We need to format the data to adhere to google sheets API format. This converts the sync mapped data to 2D array format expected by google sheets API
Expand Down Expand Up @@ -199,19 +201,14 @@ def update_sheet_values(values, stream_name)
)

# TODO: Remove & this is added for the test to pass we need
@client&.batch_update_values(@spreadsheet_id, batch_update_request)
response = @client&.batch_update_values(@spreadsheet_id, batch_update_request)
[batch_update_request, response]
end

def load_catalog
read_json(CATALOG_SPEC_PATH)
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end

def delete_extra_sheets(sheet_ids)
# Leave one sheet intact as a spreadsheet must have at least one sheet.
# Delete all other sheets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,31 +54,35 @@ def initialize_client(connection_config)
end

def process_records(records, stream)
log_message_array = []
write_success = 0
write_failure = 0
records.each do |record_object|
record = extract_data(record_object, stream.json_schema[:properties])
response = process_stream(record, stream)
request, response = *process_stream(record, stream)
if response.success?
write_success += 1
else
write_failure += 1
end
log_message_array << log_request_response("info", request, response.body)
rescue StandardError => e
handle_exception("ITERABLE:WRITE:EXCEPTION", "error", e)
write_failure += 1
log_message_array << log_request_response("error", request, e.message)
end
tracking_message(write_success, write_failure)
tracking_message(write_success, write_failure, log_message_array)
end

def process_stream(record, stream)
klass = ::Iterable.const_get(stream.name).new(*initialize_params(stream, record))
item_attrs = initialize_attribute(stream, record)
if stream.name == "CatalogItems"
klass.send(@action, item_attrs)
else
klass.send(@action)
end
response = if stream.name == "CatalogItems"
klass.send("create", item_attrs)
else
klass.send("create")
end
[item_attrs, response]
end

def initialize_params(stream, record)
Expand All @@ -97,12 +101,6 @@ def initialize_attribute(stream, record)
end
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end

def load_catalog
read_json(CATALOG_SPEC_PATH)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def write(sync_config, records, action = "destination_insert")
primary_key = sync_config.model.primary_key
db = create_connection(connection_config)

log_message_array = []
write_success = 0
write_failure = 0

Expand All @@ -50,6 +51,7 @@ def write(sync_config, records, action = "destination_insert")
begin
db.run(query)
write_success += 1
log_message_array << log_request_response("info", query, "Successful")
rescue StandardError => e
handle_exception(e, {
context: "MARIA:DB:RECORD:WRITE:EXCEPTION",
Expand All @@ -58,9 +60,10 @@ def write(sync_config, records, action = "destination_insert")
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", query, e.message)
end
end
tracking_message(write_success, write_failure)
tracking_message(write_success, write_failure, log_message_array)
rescue StandardError => e
handle_exception(e, {
context: "MARIA:DB:RECORD:WRITE:EXCEPTION",
Expand Down Expand Up @@ -106,12 +109,6 @@ def group_by_table(records)
{ tablename: entries.first[:tablename], columns: entries.flat_map { |entry| entry[:columns] } }
end
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ def configure_slack(api_token)
end

def process_records(records, stream)
log_message_array = []
write_success = 0
write_failure = 0
records.each do |record_object|
process_record(stream, record_object.with_indifferent_access)
request, response = *process_record(stream, record_object.with_indifferent_access)
write_success += 1
log_message_array << log_request_response("info", request, response)
rescue StandardError => e
write_failure += 1
handle_exception(e, {
Expand All @@ -70,8 +72,9 @@ def process_records(records, stream)
sync_id: @sync_config.sync_id,
sync_run_id: @sync_config.sync_run_id
})
log_message_array << log_request_response("error", request, e.message)
end
tracking_message(write_success, write_failure)
tracking_message(write_success, write_failure, log_message_array)
end

def process_record(stream, record)
Expand All @@ -80,7 +83,8 @@ def process_record(stream, record)

def send_data_to_slack(stream_name, record = {})
args = build_args(stream_name, record)
@client.send(stream_name, **args)
response = @client.send(stream_name, **args)
[args, response]
end

def build_args(stream_name, record)
Expand Down Expand Up @@ -114,12 +118,6 @@ def failure_status(error)
def load_catalog
read_json(CATALOG_SPEC_PATH)
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,24 @@ def authenticate_client
end

def process_records(records, stream)
log_message_array = []
write_success = 0
write_failure = 0

records.each do |record|
zendesk_data = prepare_record_data(record, stream.name)
plural_stream_name = pluralize_stream_name(stream.name.downcase)
args = [plural_stream_name, @action, zendesk_data]

if @action == "create"
@client.send(plural_stream_name).create!(zendesk_data)
response = @client.send(plural_stream_name).create!(zendesk_data)
else
existing_record = @client.send(plural_stream_name).find(id: record[:id])
existing_record.update!(zendesk_data)
response = existing_record.update!(zendesk_data)
end

write_success += 1
log_message_array << log_request_response("info", args, response)
rescue StandardError => e
handle_exception(e, {
context: "ZENDESK:WRITE:EXCEPTION",
Expand All @@ -87,9 +90,9 @@ def process_records(records, stream)
sync_run_id: @sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", args, e.message)
end

tracking_message(write_success, write_failure)
tracking_message(write_success, write_failure, log_message_array)
end

def pluralize_stream_name(name)
Expand Down Expand Up @@ -122,12 +125,6 @@ def prepare_record_data(record, type)
def load_catalog
read_json(CATALOG_SPEC_PATH)
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,31 @@

expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

context "when the write operation fails" do
before do
batch_update_request = instance_double(Google::Apis::SheetsV4::BatchUpdateValuesRequest)
allow(google_sheets_service).to receive(:batch_update_values)
.with(@spreadsheet_id, batch_update_request)
.and_raise(Google::Apis::ClientError.new("Invalid request"))
allow(@client).to receive(:update_sheet_values).and_raise(StandardError.new("Failed to update_sheet_values"))
end

it "increments the failure count" do
response = client.write(sync_config, records)

expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("error")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
describe "#write" do
context "when the write operation is successful" do
before do
allow_any_instance_of(::Iterable::CatalogItems).to receive(:create).and_return(double(success?: true))
allow_any_instance_of(::Iterable::CatalogItems).to receive(:create).and_return(double(success?: true, body: 1))
end

it "increments the success count" do
Expand All @@ -117,12 +117,18 @@
response = client.write(sync_config, records)
expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

context "when the write operation fails" do
before do
allow_any_instance_of(::Iterable::CatalogItems).to receive(:create).and_return(double(success?: false))
allow_any_instance_of(::Iterable::CatalogItems).to receive(:create).and_return(double(success?: false, body: 1))
end
it "increments the failure count" do
sync_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(
Expand All @@ -131,6 +137,12 @@
response = client.write(sync_config, records)
expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@
response = client.write(sync_config, records)
expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

Expand All @@ -135,6 +141,12 @@
response = client.write(sync_config, records)
expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("error")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@

expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

Expand All @@ -142,6 +148,12 @@

expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("error")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end
end
Expand Down
Loading

0 comments on commit fe27a80

Please sign in to comment.