Skip to content

Commit

Permalink
feat(CE): destination/microsoft excel (#314)
Browse files Browse the repository at this point in the history
Co-authored-by: TivonB-AI2 <[email protected]>
  • Loading branch information
RafaelOAiSquared and TivonB-AI2 authored Aug 19, 2024
1 parent 38fbcb9 commit 46f1840
Show file tree
Hide file tree
Showing 16 changed files with 512 additions and 20 deletions.
2 changes: 1 addition & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.7.9)
multiwoven-integrations (0.8.0)
activesupport
async-websocket
aws-sdk-athena
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
require_relative "integrations/destination/maria_db/client"
require_relative "integrations/destination/databricks_lakehouse/client"
require_relative "integrations/destination/oracle_db/client"
require_relative "integrations/destination/microsoft_excel/client"

module Multiwoven
module Integrations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def success_status
end

def failure_status(error)
ConnectionStatus.new(status: ConnectionStatusType["failed"], message: error.message).to_multiwoven_message
message = error&.message || "failed"
ConnectionStatus.new(status: ConnectionStatusType["failed"], message: message).to_multiwoven_message
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions integrations/lib/multiwoven/integrations/core/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ module Constants
AIRTABLE_BASES_ENDPOINT = "https://api.airtable.com/v0/meta/bases"
AIRTABLE_GET_BASE_SCHEMA_ENDPOINT = "https://api.airtable.com/v0/meta/bases/{baseId}/tables"

MS_EXCEL_AUTH_ENDPOINT = "https://graph.microsoft.com/v1.0/me"
MS_EXCEL_TABLE_ROW_WRITE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\
"workbook/worksheets/%<sheet_name>s/tables/%<table_name>s/rows"
MS_EXCEL_TABLE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/workbook/"\
"worksheets/sheet/tables?$select=name"
MS_EXCEL_FILES_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/root/children"
MS_EXCEL_WORKSHEETS_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\
"workbook/worksheets"
MS_EXCEL_SHEET_RANGE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\
"workbook/worksheets/%<sheet_name>s/range(address='A1:Z1')/usedRange?$select=values"

AWS_ACCESS_KEY_ID = ENV["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = ENV["AWS_SECRET_ACCESS_KEY"]

Expand All @@ -44,6 +55,7 @@ module Constants
HTTP_POST = "POST"
HTTP_PUT = "PUT"
HTTP_DELETE = "DELETE"
HTTP_PATCH = "PATCH"

# google sheets
GOOGLE_SHEETS_SCOPE = "https://www.googleapis.com/auth/drive"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ def tracking_message(success, failure, log_message_array)
success: success, failed: failure, logs: log_message_array
).to_multiwoven_message
end

def auth_headers(access_token)
{
"Accept" => "application/json",
"Authorization" => "Bearer #{access_token}",
"Content-Type" => "application/json"
}
end
end
end
end
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/core/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ def build_request(method, uri, payload, headers)
when Constants::HTTP_GET then Net::HTTP::Get
when Constants::HTTP_POST then Net::HTTP::Post
when Constants::HTTP_PUT then Net::HTTP::Put
when Constants::HTTP_PATCH then Net::HTTP::Patch
when Constants::HTTP_DELETE then Net::HTTP::Delete
else raise ArgumentError, "Unsupported HTTP method: #{method}"
end

request = request_class.new(uri)
headers.each { |key, value| request[key] = value }
request.body = payload.to_json if payload && %w[POST PUT].include?(method.upcase)
request.body = payload.to_json if payload && %w[POST PUT PATCH].include?(method.upcase)
request
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,6 @@ def create_payload(records)
}
end

def auth_headers(access_token)
{
"Accept" => "application/json",
"Authorization" => "Bearer #{access_token}",
"Content-Type" => "application/json"
}
end

def base_id_exists?(bases, base_id)
return if extract_bases(bases).any? { |base| base["id"] == base_id }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,6 @@ def extract_schema_and_data(records, json_schema)
[schema, data]
end

def auth_headers(access_token)
{
"Accept" => "application/json",
"Authorization" => "Bearer #{access_token}",
"Content-Type" => "application/json"
}
end

def ad_account_exists?(response, ad_account_id)
return if extract_data(response).any? { |ad_account| ad_account["id"] == "act_#{ad_account_id}" }

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# frozen_string_literal: true

module Multiwoven::Integrations::Destination
module MicrosoftExcel
include Multiwoven::Integrations::Core
class Client < DestinationConnector
prepend Multiwoven::Integrations::Core::RateLimiter
def check_connection(connection_config)
connection_config = connection_config.with_indifferent_access
drive_id = create_connection(connection_config)
if drive_id
success_status
else
failure_status(nil)
end
rescue StandardError => e
handle_exception(e, {
context: "MICROSOFT:EXCEL:CHECK_CONNECTION:EXCEPTION",
type: "error"
})
failure_status(e)
end

def discover(connection_config)
catalog_json = read_json(CATALOG_SPEC_PATH)
connection_config = connection_config.with_indifferent_access
token = connection_config[:token]
drive_id = create_connection(connection_config)
records = get_file(token, drive_id)
records.each do |record|
file_id = record[:id]
record[:worksheets] = get_file_data(token, drive_id, file_id)
end
catalog = Catalog.new(streams: create_streams(records, catalog_json))
catalog.to_multiwoven_message
rescue StandardError => e
handle_exception(e, {
context: "MICROSOFT:EXCEL:DISCOVER:EXCEPTION",
type: "error"
})
end

def write(sync_config, records, _action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
token = connection_config[:token]
file_name = sync_config.stream.name.split(", ").first
sheet_name = sync_config.stream.name.split(", ").last
drive_id = create_connection(connection_config)
excel_files = get_file(token, drive_id)
worksheet = excel_files.find { |file| file[:name] == file_name }
item_id = worksheet[:id]

table = get_table(token, drive_id, item_id)
write_url = format(MS_EXCEL_TABLE_ROW_WRITE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name,
table_name: table["name"])
payload = { values: records.map(&:values) }
process_write_request(write_url, payload, token, sync_config)
end

private

def create_connection(connection_config)
token = connection_config[:token]
response = Multiwoven::Integrations::Core::HttpClient.request(
MS_EXCEL_AUTH_ENDPOINT,
HTTP_GET,
headers: auth_headers(token)
)
JSON.parse(response.body)["id"]
end

def get_table(token, drive_id, item_id)
table_url = format(MS_EXCEL_TABLE_API, drive_id: drive_id, item_id: item_id)
response = Multiwoven::Integrations::Core::HttpClient.request(
table_url,
HTTP_GET,
headers: auth_headers(token)
)
JSON.parse(response.body)["value"].first
end

def get_file(token, drive_id)
url = format(MS_EXCEL_FILES_API, drive_id: drive_id)
response = Multiwoven::Integrations::Core::HttpClient.request(
url,
HTTP_GET,
headers: auth_headers(token)
)
files = JSON.parse(response.body)["value"]
excel_files = files.select { |file| file["name"].match(/\.(xlsx|xls|xlsm)$/) }
excel_files.map { |file| { name: file["name"], id: file["id"] } }
end

def get_all_sheets(token, drive_id, item_id)
base_url = format(MS_EXCEL_WORKSHEETS_API, drive_id: drive_id, item_id: item_id)
worksheet_response = Multiwoven::Integrations::Core::HttpClient.request(
base_url,
HTTP_GET,
headers: auth_headers(token)
)
JSON.parse(worksheet_response.body)["value"]
end

def get_file_data(token, drive_id, item_id)
result = []
worksheets_data = get_all_sheets(token, drive_id, item_id)
worksheets_data.each do |sheet|
sheet_name = sheet["name"]
sheet_url = format(MS_EXCEL_SHEET_RANGE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name)

sheet_response = Multiwoven::Integrations::Core::HttpClient.request(
sheet_url,
HTTP_GET,
headers: auth_headers(token)
)
sheets_data = JSON.parse(sheet_response.body)
result << {
sheet_name: sheet_name,
column_names: sheets_data["values"].first
}
end
result
end

def create_streams(records, catalog_json)
group_by_table(records).flat_map do |_, record|
record.map do |_, r|
Multiwoven::Integrations::Protocol::Stream.new(
name: r[:workbook],
action: StreamAction["fetch"],
json_schema: convert_to_json_schema(r[:columns]),
request_rate_limit: catalog_json["request_rate_limit"] || 60,
request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute",
request_rate_concurrency: catalog_json["request_rate_concurrency"] || 1
)
end
end
end

def group_by_table(records)
result = {}

records.each_with_index do |entries, entries_index|
entries[:worksheets].each_with_index do |sheet, entry_index|
workbook_sheet = "#{entries[:name]}, #{sheet[:sheet_name]}"
columns = sheet[:column_names].map do |column_name|
column_name = "empty column" if column_name.empty?
{
column_name: column_name,
data_type: "String",
is_nullable: true
}
end
result[entries_index] ||= {}
result[entries_index][entry_index] = { workbook: workbook_sheet, columns: columns }
end
end
result
end

def process_write_request(write_url, payload, token, sync_config)
write_success = 0
write_failure = 0
log_message_array = []

begin
response = Multiwoven::Integrations::Core::HttpClient.request(
write_url,
HTTP_POST,
payload: payload,
headers: auth_headers(token)
)
if success?(response)
write_success += 1
else
write_failure += 1
end
log_message_array << log_request_response("info", [HTTP_POST, write_url, payload], response)
rescue StandardError => e
handle_exception(e, {
context: "MICROSOFT:EXCEL:RECORD:WRITE:EXCEPTION",
type: "error",
sync_id: sync_config.sync_id,
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", [HTTP_POST, write_url, payload], e.message)
end

tracking_message(write_success, write_failure, log_message_array)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"request_rate_limit": 6000,
"request_rate_limit_unit": "minute",
"request_rate_concurrency": 10,
"streams": []
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"data": {
"name": "MicrosoftExcel",
"title": "Microsoft Excel",
"connector_type": "destination",
"category": "Database",
"documentation_url": "https://docs.squared.ai/guides/data-integration/destination/microsoft_excel",
"github_issue_label": "destination-microsoft-excel",
"icon": "icon.svg",
"license": "MIT",
"release_stage": "alpha",
"support_level": "community",
"tags": ["language:ruby", "multiwoven"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"documentation_url": "https://docs.squared.ai/guides/data-integration/destination/microsoft_excel",
"stream_type": "dynamic",
"connector_query_type": "raw_sql",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Microsoft Excel",
"type": "object",
"required": ["token"],
"properties": {
"token": {
"description": "Token from Microsoft Graph.",
"type": "string",
"title": "Token",
"order": 0
}
}
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.7.9"
VERSION = "0.8.0"

ENABLED_SOURCES = %w[
Snowflake
Expand Down Expand Up @@ -36,6 +36,7 @@ module Integrations
MariaDB
DatabricksLakehouse
Oracle
MicrosoftExcel
].freeze
end
end
Loading

0 comments on commit 46f1840

Please sign in to comment.