From fb4c3ee32e3dc6e07cde9779cd10cbc4dfa3d5ea Mon Sep 17 00:00:00 2001 From: TivonB-AI2 <124182151+TivonB-AI2@users.noreply.github.com> Date: Tue, 13 Aug 2024 12:36:51 -0400 Subject: [PATCH] Resolve conflict in cherry-pick of 58adff6e7c1cc40357ca3a4df1697fcf2cdd8552 and change the commit message --- integrations/Gemfile.lock | 4 + .../multiwoven/integrations/core/constants.rb | 14 ++ .../destination/microsoft_excel/client.rb | 198 ++++++++++++++++ .../microsoft_excel/config/catalog.json | 7 + .../lib/multiwoven/integrations/rollout.rb | 4 + .../microsoft_excel/client_spec.rb | 224 ++++++++++++++++++ 6 files changed, 451 insertions(+) create mode 100644 integrations/lib/multiwoven/integrations/destination/microsoft_excel/client.rb create mode 100644 integrations/lib/multiwoven/integrations/destination/microsoft_excel/config/catalog.json create mode 100644 integrations/spec/multiwoven/integrations/destination/microsoft_excel/client_spec.rb diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index 71ed2acb..5a4d3247 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,11 @@ GIT PATH remote: . 
specs: +<<<<<<< HEAD multiwoven-integrations (0.7.9) +======= + multiwoven-integrations (0.8.3) +>>>>>>> 58adff6e (fix(CE): fix discover and table url (#380)) activesupport async-websocket aws-sdk-athena diff --git a/integrations/lib/multiwoven/integrations/core/constants.rb b/integrations/lib/multiwoven/integrations/core/constants.rb index 45b106f0..74a07663 100644 --- a/integrations/lib/multiwoven/integrations/core/constants.rb +++ b/integrations/lib/multiwoven/integrations/core/constants.rb @@ -36,6 +36,20 @@ module Constants AIRTABLE_BASES_ENDPOINT = "https://api.airtable.com/v0/meta/bases" AIRTABLE_GET_BASE_SCHEMA_ENDPOINT = "https://api.airtable.com/v0/meta/bases/{baseId}/tables" +<<<<<<< HEAD +======= + MS_EXCEL_AUTH_ENDPOINT = "https://graph.microsoft.com/v1.0/me" + MS_EXCEL_TABLE_ROW_WRITE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\ + "workbook/worksheets/%<sheet_name>s/tables/%<table_name>s/rows" + MS_EXCEL_TABLE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/workbook/"\ + "worksheets/%<sheet_name>s/tables?$select=name" + MS_EXCEL_FILES_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/root/children" + MS_EXCEL_WORKSHEETS_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\ + "workbook/worksheets" + MS_EXCEL_SHEET_RANGE_API = "https://graph.microsoft.com/v1.0/drives/%<drive_id>s/items/%<item_id>s/"\ + "workbook/worksheets/%<sheet_name>s/range(address='A1:Z1')/usedRange?$select=values" + +>>>>>>> 58adff6e (fix(CE): fix discover and table url (#380)) AWS_ACCESS_KEY_ID = ENV["AWS_ACCESS_KEY_ID"] AWS_SECRET_ACCESS_KEY = ENV["AWS_SECRET_ACCESS_KEY"] diff --git a/integrations/lib/multiwoven/integrations/destination/microsoft_excel/client.rb b/integrations/lib/multiwoven/integrations/destination/microsoft_excel/client.rb new file mode 100644 index 00000000..cc120a68 --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/microsoft_excel/client.rb @@ -0,0 +1,198 @@ +# frozen_string_literal: true + +module Multiwoven::Integrations::Destination + module MicrosoftExcel +
include Multiwoven::Integrations::Core + class Client < DestinationConnector + prepend Multiwoven::Integrations::Core::RateLimiter + def check_connection(connection_config) + connection_config = connection_config.with_indifferent_access + drive_id = create_connection(connection_config) + if drive_id + success_status + else + failure_status(nil) + end + rescue StandardError => e + handle_exception(e, { + context: "MICROSOFT:EXCEL:CHECK_CONNECTION:EXCEPTION", + type: "error" + }) + failure_status(e) + end + + def discover(connection_config) + catalog_json = read_json(CATALOG_SPEC_PATH) + connection_config = connection_config.with_indifferent_access + token = connection_config[:token] + drive_id = create_connection(connection_config) + records = get_file(token, drive_id) + records.each do |record| + file_id = record[:id] + record[:worksheets] = get_file_data(token, drive_id, file_id) + end + catalog = Catalog.new(streams: create_streams(records, catalog_json)) + catalog.to_multiwoven_message + rescue StandardError => e + handle_exception(e, { + context: "MICROSOFT:EXCEL:DISCOVER:EXCEPTION", + type: "error" + }) + end + + def write(sync_config, records, _action = "destination_insert") + connection_config = sync_config.destination.connection_specification.with_indifferent_access + token = connection_config[:token] + file_name = sync_config.stream.name.split(", ").first + sheet_name = sync_config.stream.name.split(", ").last + drive_id = create_connection(connection_config) + excel_files = get_file(token, drive_id) + worksheet = excel_files.find { |file| file[:name] == file_name } + item_id = worksheet[:id] + table = get_table(token, drive_id, item_id, sheet_name) + write_url = format(MS_EXCEL_TABLE_ROW_WRITE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name, + table_name: table["name"]) + payload = { values: records.map(&:values) } + process_write_request(write_url, payload, token, sync_config) + end + + private + + def 
create_connection(connection_config) + token = connection_config[:token] + response = Multiwoven::Integrations::Core::HttpClient.request( + MS_EXCEL_AUTH_ENDPOINT, + HTTP_GET, + headers: auth_headers(token) + ) + JSON.parse(response.body)["id"] + end + + def get_table(token, drive_id, item_id, sheet_name) + table_url = format(MS_EXCEL_TABLE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name) + response = Multiwoven::Integrations::Core::HttpClient.request( + table_url, + HTTP_GET, + headers: auth_headers(token) + ) + JSON.parse(response.body)["value"].first + end + + def get_file(token, drive_id) + url = format(MS_EXCEL_FILES_API, drive_id: drive_id) + response = Multiwoven::Integrations::Core::HttpClient.request( + url, + HTTP_GET, + headers: auth_headers(token) + ) + files = JSON.parse(response.body)["value"] + excel_files = files.select { |file| file["name"].match(/\.(xlsx|xls|xlsm)$/) } + excel_files.map { |file| { name: file["name"], id: file["id"] } } + end + + def get_all_sheets(token, drive_id, item_id) + base_url = format(MS_EXCEL_WORKSHEETS_API, drive_id: drive_id, item_id: item_id) + worksheet_response = Multiwoven::Integrations::Core::HttpClient.request( + base_url, + HTTP_GET, + headers: auth_headers(token) + ) + JSON.parse(worksheet_response.body)["value"] + end + + def get_file_data(token, drive_id, item_id) + result = [] + worksheets_data = get_all_sheets(token, drive_id, item_id) + worksheets_data.each do |sheet| + sheet_name = sheet["name"] + sheet_url = format(MS_EXCEL_SHEET_RANGE_API, drive_id: drive_id, item_id: item_id, sheet_name: sheet_name) + + sheet_response = Multiwoven::Integrations::Core::HttpClient.request( + sheet_url, + HTTP_GET, + headers: auth_headers(token) + ) + sheets_data = JSON.parse(sheet_response.body) + column_names = if sheets_data.key?("error") + ["Column A"] + else + sheets_data["values"].first + end + result << { + sheet_name: sheet_name, + column_names: column_names + } + end + result + end + + def 
create_streams(records, catalog_json) + group_by_table(records).flat_map do |_, record| + record.map do |_, r| + Multiwoven::Integrations::Protocol::Stream.new( + name: r[:workbook], + action: StreamAction["fetch"], + json_schema: convert_to_json_schema(r[:columns]), + request_rate_limit: catalog_json["request_rate_limit"] || 60, + request_rate_limit_unit: catalog_json["request_rate_limit_unit"] || "minute", + request_rate_concurrency: catalog_json["request_rate_concurrency"] || 1 + ) + end + end + end + + def group_by_table(records) + result = {} + + records.each_with_index do |entries, entries_index| + entries[:worksheets].each_with_index do |sheet, entry_index| + workbook_sheet = "#{entries[:name]}, #{sheet[:sheet_name]}" + columns = sheet[:column_names].map do |column_name| + column_name = "empty column" if column_name.empty? + { + column_name: column_name, + data_type: "String", + is_nullable: true + } + end + result[entries_index] ||= {} + result[entries_index][entry_index] = { workbook: workbook_sheet, columns: columns } + end + end + result + end + + def process_write_request(write_url, payload, token, sync_config) + write_success = 0 + write_failure = 0 + log_message_array = [] + + begin + response = Multiwoven::Integrations::Core::HttpClient.request( + write_url, + HTTP_POST, + payload: payload, + headers: auth_headers(token) + ) + if success?(response) + write_success += 1 + else + write_failure += 1 + end + log_message_array << log_request_response("info", [HTTP_POST, write_url, payload], response) + rescue StandardError => e + handle_exception(e, { + context: "MICROSOFT:EXCEL:RECORD:WRITE:EXCEPTION", + type: "error", + sync_id: sync_config.sync_id, + sync_run_id: sync_config.sync_run_id + }) + write_failure += 1 + log_message_array << log_request_response("error", [HTTP_POST, write_url, payload], e.message) + end + + tracking_message(write_success, write_failure, log_message_array) + end + end + end +end diff --git 
a/integrations/lib/multiwoven/integrations/destination/microsoft_excel/config/catalog.json b/integrations/lib/multiwoven/integrations/destination/microsoft_excel/config/catalog.json new file mode 100644 index 00000000..ace9ffee --- /dev/null +++ b/integrations/lib/multiwoven/integrations/destination/microsoft_excel/config/catalog.json @@ -0,0 +1,7 @@ +{ + "request_rate_limit": 6000, + "request_rate_limit_unit": "minute", + "request_rate_concurrency": 1, + "streams": [] +} + diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index 62c6d503..0071b80b 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,11 @@ module Multiwoven module Integrations +<<<<<<< HEAD VERSION = "0.7.9" +======= + VERSION = "0.8.3" +>>>>>>> 58adff6e (fix(CE): fix discover and table url (#380)) ENABLED_SOURCES = %w[ Snowflake diff --git a/integrations/spec/multiwoven/integrations/destination/microsoft_excel/client_spec.rb b/integrations/spec/multiwoven/integrations/destination/microsoft_excel/client_spec.rb new file mode 100644 index 00000000..46a5280d --- /dev/null +++ b/integrations/spec/multiwoven/integrations/destination/microsoft_excel/client_spec.rb @@ -0,0 +1,224 @@ +# frozen_string_literal: true + +RSpec.describe Multiwoven::Integrations::Destination::MicrosoftExcel::Client do + include WebMock::API + + before(:each) do + WebMock.disable_net_connect!(allow_localhost: true) + end + + let(:client) { described_class.new } + let(:connection_config) do + { + token: "test" + } + end + let(:sync_config_json) do + { + source: { + name: "Sample Source Connector", + type: "source", + connection_specification: { + private_api_key: "test_api_key" + } + }, + destination: { + name: "Databricks", + type: "destination", + connection_specification: connection_config + }, + model: { + name: "ExampleModel", + query: "SELECT col1, col2, col3 FROM 
test_table_1", + query_type: "raw_sql", + primary_key: "col1" + }, + sync_mode: "incremental", + destination_sync_mode: "insert", + stream: { + name: "test_table.xlsx, sheet", + action: "create", + json_schema: {}, + supported_sync_modes: %w[incremental], + request_rate_limit: 4, + rate_limit_unit_seconds: 1 + } + } + end + + let(:response_body) { { "id" => "DRIVE1" }.to_json } + let(:successful_update_response_body) { { "values" => [["400", "4.4", "Fourth"]] }.to_json } + let(:failed_update_response_body) { { "values" => [["400", "4.4", "Fourth"]] }.to_json } + + describe "#check_connection" do + context "when the connection is successful" do + it "returns a succeeded connection status" do + stub_request(:get, "https://graph.microsoft.com/v1.0/me") + .to_return(status: 200, body: response_body, headers: {}) + + allow(client).to receive(:create_connection).and_return("DRIVE1") + + message = client.check_connection(sync_config_json[:destination][:connection_specification]) + result = message.connection_status + expect(result.status).to eq("succeeded") + expect(result.message).to be_nil + end + end + + context "when the connection fails" do + it "returns a failed connection status with an error message" do + allow(client).to receive(:create_connection).and_raise(StandardError, "Connection failed") + message = client.check_connection(sync_config_json[:destination][:connection_specification]) + result = message.connection_status + expect(result.status).to eq("failed") + expect(result.message).to include("Connection failed") + end + end + end + + describe "#discover" do + it "discovers schema successfully" do + stub_request(:get, "https://graph.microsoft.com/v1.0/me") + .to_return(status: 200, body: response_body, headers: {}) + stub_request(:get, "https://graph.microsoft.com/v1.0/drives/DRIVE1/root/children") + .to_return( + status: 200, + body: { + "value" => [ + { "id" => "file1_id", "name" => "test_file.xlsx" } + ] + }.to_json, + headers: {} + ) + allow(client).to 
receive(:get_all_sheets).and_return([ + { "name" => "Sheet1" } + ]) + stub_request(:get, "https://graph.microsoft.com/v1.0/drives/DRIVE1/items/file1_id/workbook/worksheets/Sheet1/"\ + "range(address='A1:Z1')/usedRange?$select=values") + .to_return( + status: 200, + body: { + "values" => [%w[col1 col2 col3]] + }.to_json, + headers: {} + ) + + message = client.discover(connection_config) + catalog = message.catalog + expect(catalog).to be_a(Multiwoven::Integrations::Protocol::Catalog) + expect(catalog.streams.first.request_rate_limit).to eql(6000) + expect(catalog.streams.first.request_rate_limit_unit).to eql("minute") + expect(catalog.streams.first.request_rate_concurrency).to eql(1) + expect(catalog.streams.count).to eql(1) + expect(catalog.streams[0].supported_sync_modes).to eql(%w[incremental]) + end + end + + describe "#write" do + context "when the write operation is successful" do + it "increments the success count" do + stub_request(:get, "https://graph.microsoft.com/v1.0/me") + .to_return(status: 200, body: response_body, headers: {}) + + stub_request(:get, "https://graph.microsoft.com/v1.0/drives/DRIVE1/root/children") + .to_return( + status: 200, + body: { + "value" => [ + { "id" => "file1_id", "name" => "test_table.xlsx" } + ] + }.to_json, + headers: {} + ) + + stub_request(:get, "https://graph.microsoft.com/v1.0/drives/DRIVE1/items/file1_id/workbook/worksheets/sheet/"\ + "tables?$select=name") + .to_return( + status: 200, + body: { + "value" => [ + { "name" => "Table1" } + ] + }.to_json, + headers: {} + ) + + stub_request(:post, "https://graph.microsoft.com/v1.0/drives/DRIVE1/items/file1_id/workbook/worksheets/"\ + "sheet/tables/Table1/rows") + .to_return(status: 201, body: successful_update_response_body, headers: {}) + + sync_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) + records = [{ "Col1" => 400, "Col2" => 4.4, "Col3" => "Fourth" }] + + message = client.write(sync_config, records) + tracker = 
message.tracking + + expect(tracker.success).to eq(records.count) + expect(tracker.failed).to eq(0) + log_message = message.tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("info") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") + end + end + + context "when the write operation fails" do + it "increments the failure count" do + stub_request(:get, "https://graph.microsoft.com/v1.0/me") + .to_return(status: 200, body: response_body, headers: {}) + + stub_request(:get, "https://graph.microsoft.com/v1.0/drives/DRIVE1/root/children") + .to_return( + status: 200, + body: { + "value" => [ + { "id" => "file1_id", "name" => "test_table.xlsx" } + ] + }.to_json, + headers: {} + ) + + stub_request(:get, "https://graph.microsoft.com/v1.0/drives/DRIVE1/items/file1_id/workbook/worksheets/sheet/"\ + "tables?$select=name") + .to_return( + status: 200, + body: { + "value" => [ + { "name" => "Table1" } + ] + }.to_json, + headers: {} + ) + + stub_request(:post, + "https://graph.microsoft.com/v1.0/drives/DRIVE1/items/file1_id/workbook/worksheets/"\ + "sheet/tables/Table1/rows") + .to_return(status: 400, body: failed_update_response_body, headers: {}) + + sync_config = Multiwoven::Integrations::Protocol::SyncConfig.from_json(sync_config_json.to_json) + records = [{ "Col1" => 400, "Col2" => 4.4, "Col3" => "Fourth" }] + + message = client.write(sync_config, records) + tracker = message.tracking + expect(tracker.failed).to eq(records.count) + expect(tracker.success).to eq(0) + log_message = message.tracking.logs.first + expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage) + expect(log_message.level).to eql("info") + + expect(log_message.message).to include("request") + expect(log_message.message).to include("response") + end + end + end + + describe "#meta_data" do + # change this to rollout validation for all connector 
rolling out + it "client class_name and meta name is same" do + meta_name = client.class.to_s.split("::")[-2] + expect(client.send(:meta_data)[:data][:name]).to eq(meta_name) + end + end +end