Skip to content

Commit

Permalink
Resolve conflict in cherry-pick of 58adff6e7c1cc40357ca3a4df1697fcf2c…
Browse files Browse the repository at this point in the history
…dd8552 and change the commit message
  • Loading branch information
TivonB-AI2 authored and RafaelOAiSquared committed Aug 15, 2024
1 parent 13c17f9 commit fb4c3ee
Show file tree
Hide file tree
Showing 6 changed files with 451 additions and 0 deletions.
4 changes: 4 additions & 0 deletions integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ GIT
PATH
remote: .
specs:
<<<<<<< HEAD
multiwoven-integrations (0.7.9)
=======
multiwoven-integrations (0.8.3)
>>>>>>> 58adff6e (fix(CE): fix discover and table url (#380))
activesupport
async-websocket
aws-sdk-athena
Expand Down
14 changes: 14 additions & 0 deletions integrations/lib/multiwoven/integrations/core/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,20 @@ module Constants
AIRTABLE_BASES_ENDPOINT = "https://api.airtable.com/v0/meta/bases"
AIRTABLE_GET_BASE_SCHEMA_ENDPOINT = "https://api.airtable.com/v0/meta/bases/{baseId}/tables"

# Microsoft Graph endpoints used by the Microsoft Excel destination.
# The %<name>s placeholders are named references filled in with Kernel#format.
MS_EXCEL_AUTH_ENDPOINT = "https://graph.microsoft.com/v1.0/me"
MS_EXCEL_TABLE_ROW_WRITE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\
"workbook/worksheets/%<sheet_name>s/tables/%<table_name>s/rows"
MS_EXCEL_TABLE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/workbook/"\
"worksheets/%<sheet_name>s/tables?$select=name"
MS_EXCEL_FILES_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/root/children"
MS_EXCEL_WORKSHEETS_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\
"workbook/worksheets"
# Requests only the used part of A1:Z1, i.e. the first row of the sheet.
MS_EXCEL_SHEET_RANGE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\
"workbook/worksheets/%<sheet_name>s/range(address='A1:Z1')/usedRange?$select=values"

AWS_ACCESS_KEY_ID = ENV["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = ENV["AWS_SECRET_ACCESS_KEY"]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Destination
module MicrosoftExcel
include Multiwoven::Integrations::Core
class Client < DestinationConnector
prepend Multiwoven::Integrations::Core::RateLimiter
# Checks whether the configured credentials can reach Microsoft Graph.
#
# @param connection_config [Hash] connection settings handed to create_connection
# @return the value of success_status when create_connection yields a truthy id,
#   otherwise the value of failure_status (called with nil, or with the rescued error)
def check_connection(connection_config)
  config = connection_config.with_indifferent_access
  return success_status if create_connection(config)

  failure_status(nil)
rescue StandardError => e
  error_meta = { context: "MICROSOFT:EXCEL:CHECK_CONNECTION:EXCEPTION", type: "error" }
  handle_exception(e, error_meta)
  failure_status(e)
end

# Builds the catalog of streams from the Excel workbooks found in the drive.
#
# Each workbook returned by get_file is annotated in place with a :worksheets
# entry (from get_file_data) before being turned into streams.
#
# @param connection_config [Hash] connection settings; :token is read from it
# @return the catalog's to_multiwoven_message result, or the value of
#   handle_exception when any StandardError is raised along the way
def discover(connection_config)
  spec = read_json(CATALOG_SPEC_PATH)
  config = connection_config.with_indifferent_access
  access_token = config[:token]
  drive = create_connection(config)
  workbooks = get_file(access_token, drive)
  workbooks.each do |workbook|
    workbook[:worksheets] = get_file_data(access_token, drive, workbook[:id])
  end
  Catalog.new(streams: create_streams(workbooks, spec)).to_multiwoven_message
rescue StandardError => e
  error_meta = { context: "MICROSOFT:EXCEL:DISCOVER:EXCEPTION", type: "error" }
  handle_exception(e, error_meta)
end

# Appends the given records as rows to the first table of the target worksheet.
#
# The stream name has the form "<file name>, <sheet name>" (built in
# group_by_table). The workbook is resolved by matching that name against the
# files actually present in the drive, so file or sheet names that themselves
# contain ", " still resolve to the right workbook and sheet.
#
# @param sync_config [#destination, #stream] sync settings; the destination's
#   connection_specification supplies :token and stream.name selects the target
# @param records [Array<Hash>] rows to append; only each hash's values are sent
# @param _action [String] unused
# @return the result of process_write_request
# @raise [RuntimeError] when no workbook matches the stream name, or the sheet has no table
def write(sync_config, records, _action = "destination_insert")
  connection_config = sync_config.destination.connection_specification.with_indifferent_access
  token = connection_config[:token]
  stream_name = sync_config.stream.name
  drive_id = create_connection(connection_config)

  workbook = get_file(token, drive_id).find { |file| stream_name.start_with?("#{file[:name]}, ") }
  # Fail with an actionable message instead of a NoMethodError on nil.
  raise "No Excel workbook found for stream '#{stream_name}'" unless workbook

  item_id = workbook[:id]
  sheet_name = stream_name.delete_prefix("#{workbook[:name]}, ")
  table = get_table(token, drive_id, item_id, sheet_name)
  raise "No table found in worksheet '#{sheet_name}' of '#{workbook[:name]}'" unless table

  write_url = format(MS_EXCEL_TABLE_ROW_WRITE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name,
                                                   table_name: table["name"])
  payload = { values: records.map(&:values) }
  process_write_request(write_url, payload, token, sync_config)
end

private

# Calls MS_EXCEL_AUTH_ENDPOINT with the configured token.
#
# @param connection_config [Hash] connection settings; :token is read from it
# @return the "id" field of the parsed JSON response (nil when absent);
#   callers in this class use it as the drive id
def create_connection(connection_config)
  headers = auth_headers(connection_config[:token])
  response = Multiwoven::Integrations::Core::HttpClient.request(MS_EXCEL_AUTH_ENDPOINT, HTTP_GET, headers: headers)
  profile = JSON.parse(response.body)
  profile["id"]
end

# Looks up the tables of a worksheet and returns the first one.
#
# @param token [String] value passed to auth_headers
# @param drive_id [String] drive placeholder value for MS_EXCEL_TABLE_API
# @param item_id [String] workbook item placeholder value
# @param sheet_name [String] worksheet placeholder value
# @return the first element of the response's "value" list (nil when the list is empty)
def get_table(token, drive_id, item_id, sheet_name)
  url = format(MS_EXCEL_TABLE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name)
  response = Multiwoven::Integrations::Core::HttpClient.request(url, HTTP_GET, headers: auth_headers(token))
  tables = JSON.parse(response.body)["value"]
  tables.first
end

# Lists the Excel workbooks among the children of the drive root.
#
# A child counts as a workbook when its name ends in .xlsx, .xls or .xlsm.
# The check is anchored to the end of the whole string and ignores case, so
# "REPORT.XLSX" is included and a name that merely has a matching line
# in the middle is not.
#
# @param token [String] value passed to auth_headers
# @param drive_id [String] drive placeholder value for MS_EXCEL_FILES_API
# @return [Array<Hash>] one `{ name:, id: }` hash per workbook, in response order
def get_file(token, drive_id)
  url = format(MS_EXCEL_FILES_API, drive_id: drive_id)
  response = Multiwoven::Integrations::Core::HttpClient.request(
    url,
    HTTP_GET,
    headers: auth_headers(token)
  )
  files = JSON.parse(response.body)["value"]
  excel_files = files.select { |file| file["name"].match?(/\.(?:xlsx|xls|xlsm)\z/i) }
  excel_files.map { |file| { name: file["name"], id: file["id"] } }
end

# Fetches the worksheets of one workbook.
#
# @param token [String] value passed to auth_headers
# @param drive_id [String] drive placeholder value for MS_EXCEL_WORKSHEETS_API
# @param item_id [String] workbook item placeholder value
# @return the "value" entry of the parsed JSON response
def get_all_sheets(token, drive_id, item_id)
  url = format(MS_EXCEL_WORKSHEETS_API, drive_id: drive_id, item_id: item_id)
  response = Multiwoven::Integrations::Core::HttpClient.request(url, HTTP_GET, headers: auth_headers(token))
  parsed = JSON.parse(response.body)
  parsed["value"]
end

# Collects the header row of every worksheet in a workbook.
#
# For each sheet from get_all_sheets, requests MS_EXCEL_SHEET_RANGE_API and
# takes the first row of the returned "values". When the response carries an
# "error" key, the sheet is reported with the single placeholder column
# "Column A".
#
# @param token [String] value passed to auth_headers
# @param drive_id [String] drive placeholder value
# @param item_id [String] workbook item placeholder value
# @return [Array<Hash>] one `{ sheet_name:, column_names: }` hash per worksheet
def get_file_data(token, drive_id, item_id)
  get_all_sheets(token, drive_id, item_id).map do |sheet|
    name = sheet["name"]
    range_url = format(MS_EXCEL_SHEET_RANGE_API, drive_id: drive_id, item_id: item_id, sheet_name: name)
    response = Multiwoven::Integrations::Core::HttpClient.request(range_url, HTTP_GET, headers: auth_headers(token))
    parsed = JSON.parse(response.body)
    header_row = parsed.key?("error") ? ["Column A"] : parsed["values"].first

    { sheet_name: name, column_names: header_row }
  end
end

# Turns discovered workbooks into protocol streams, one per worksheet.
#
# Rate-limit settings come from catalog_json and fall back to 60 requests per
# "minute" with a concurrency of 1 when a key is missing.
#
# @param records [Array<Hash>] workbooks as expected by group_by_table
# @param catalog_json [Hash] parsed catalog spec with optional rate-limit keys
# @return [Array] streams in workbook order, then worksheet order
def create_streams(records, catalog_json)
  sheets = group_by_table(records).values.flat_map(&:values)
  sheets.map do |sheet|
    Multiwoven::Integrations::Protocol::Stream.new(
      name: sheet[:workbook],
      action: StreamAction["fetch"],
      json_schema: convert_to_json_schema(sheet[:columns]),
      request_rate_limit: catalog_json["request_rate_limit"] || 60,
      request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute",
      request_rate_concurrency: catalog_json["request_rate_concurrency"] || 1
    )
  end
end

# Indexes every worksheet of every workbook by position.
#
# Header cells are converted with to_s before the blank check, because a
# header read from JSON may be a number, boolean or nil rather than a string;
# calling empty? on those would raise NoMethodError. Blank headers are
# reported as "empty column".
#
# @param records [Array<Hash>] workbooks with :name and :worksheets, where
#   each worksheet has :sheet_name and :column_names
# @return [Hash{Integer => Hash{Integer => Hash}}] workbook index => sheet
#   index => `{ workbook: "<file name>, <sheet name>", columns: [...] }`;
#   workbooks without worksheets get no entry
def group_by_table(records)
  records.each_with_index.with_object({}) do |(workbook, workbook_index), grouped|
    workbook[:worksheets].each_with_index do |sheet, sheet_index|
      columns = sheet[:column_names].map do |column_name|
        label = column_name.to_s
        label = "empty column" if label.empty?
        { column_name: label, data_type: "String", is_nullable: true }
      end
      grouped[workbook_index] ||= {}
      grouped[workbook_index][sheet_index] = {
        workbook: "#{workbook[:name]}, #{sheet[:sheet_name]}",
        columns: columns
      }
    end
  end
end

# Sends one POST with the payload and reports the outcome.
#
# The request counts as a single success or a single failure, depending on
# success?(response). Any StandardError is passed to handle_exception, counted
# as a failure and logged with its message.
#
# @param write_url [String] target URL
# @param payload [Hash] request body
# @param token [String] value passed to auth_headers
# @param sync_config [#sync_id, #sync_run_id] ids attached to the error report
# @return the result of tracking_message(successes, failures, log messages)
def process_write_request(write_url, payload, token, sync_config)
  tally = { success: 0, failure: 0 }
  logs = []

  begin
    response = Multiwoven::Integrations::Core::HttpClient.request(
      write_url, HTTP_POST, payload: payload, headers: auth_headers(token)
    )
    outcome = success?(response) ? :success : :failure
    tally[outcome] += 1
    logs << log_request_response("info", [HTTP_POST, write_url, payload], response)
  rescue StandardError => e
    error_meta = {
      context: "MICROSOFT:EXCEL:RECORD:WRITE:EXCEPTION",
      type: "error",
      sync_id: sync_config.sync_id,
      sync_run_id: sync_config.sync_run_id
    }
    handle_exception(e, error_meta)
    tally[:failure] += 1
    logs << log_request_response("error", [HTTP_POST, write_url, payload], e.message)
  end

  tracking_message(tally[:success], tally[:failure], logs)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"request_rate_limit": 6000,
"request_rate_limit_unit": "minute",
"request_rate_concurrency": 1,
"streams": []
}

4 changes: 4 additions & 0 deletions integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

module Multiwoven
module Integrations
<<<<<<< HEAD
VERSION = "0.7.9"
=======
VERSION = "0.8.3"
>>>>>>> 58adff6e (fix(CE): fix discover and table url (#380))

ENABLED_SOURCES = %w[
Snowflake
Expand Down
Loading

0 comments on commit fb4c3ee

Please sign in to comment.