diff --git a/integrations/Gemfile b/integrations/Gemfile index 667bf366..6593aedf 100644 --- a/integrations/Gemfile +++ b/integrations/Gemfile @@ -51,6 +51,8 @@ gem "net-sftp" gem "csv" +gem "rubyzip" + group :development, :test do gem "simplecov", require: false gem "simplecov_json_formatter", require: false diff --git a/integrations/Gemfile.lock b/integrations/Gemfile.lock index 60346af1..8e9a1eb2 100644 --- a/integrations/Gemfile.lock +++ b/integrations/Gemfile.lock @@ -7,7 +7,7 @@ GIT PATH remote: . specs: - multiwoven-integrations (0.1.63) + multiwoven-integrations (0.1.64) activesupport async-websocket csv @@ -24,6 +24,7 @@ PATH restforce ruby-limiter ruby-odbc + rubyzip sequel slack-ruby-client stripe @@ -229,6 +230,7 @@ GEM ruby-limiter (2.3.0) ruby-progressbar (1.13.0) ruby2_keywords (0.0.5) + rubyzip (2.3.2) sequel (5.75.0) bigdecimal signet (0.18.0) @@ -292,6 +294,7 @@ DEPENDENCIES rubocop (~> 1.21) ruby-limiter ruby-odbc! + rubyzip sequel simplecov simplecov_json_formatter diff --git a/integrations/lib/multiwoven/integrations.rb b/integrations/lib/multiwoven/integrations.rb index 16325115..d08da0a6 100644 --- a/integrations/lib/multiwoven/integrations.rb +++ b/integrations/lib/multiwoven/integrations.rb @@ -22,6 +22,7 @@ require "net/sftp" require "csv" require "securerandom" +require "zip" # Service require_relative "integrations/config" diff --git a/integrations/lib/multiwoven/integrations/destination/sftp/client.rb b/integrations/lib/multiwoven/integrations/destination/sftp/client.rb index 8a3cfc44..bbaa3ac0 100644 --- a/integrations/lib/multiwoven/integrations/destination/sftp/client.rb +++ b/integrations/lib/multiwoven/integrations/destination/sftp/client.rb @@ -39,27 +39,58 @@ def write(sync_config, records, _action = "insert") file_path = generate_file_path(sync_config) local_file_name = generate_local_file_name(sync_config) csv_content = generate_csv_content(records) + records_size = records.size write_success = 0 - write_failure = 0 + case connection_config[:format][:compression_type] + when CompressionType.enum("zip") + write_success = write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) + when CompressionType.enum("un_compressed") + write_success = write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) + else + raise ArgumentError, "Unsupported compression type: #{connection_config[:format][:compression_type]}" + end + write_failure = records.size - write_success + tracking_message(write_success, write_failure) + rescue StandardError => e + handle_exception( + "SFTP:WRITE:EXCEPTION", + "error", + e + ) + end + + def write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size) + write_success = 0 + Tempfile.create([local_file_name, ".zip"]) do |tempfile| + Zip::File.open(tempfile.path, Zip::File::CREATE) do |zipfile| + zipfile.get_output_stream("#{local_file_name}.csv") { |f| f.write(csv_content) } + end + with_sftp_client(connection_config) do |sftp| + sftp.upload!(tempfile.path, file_path) + write_success = records_size + rescue StandardError => e + handle_exception("SFTP:RECORD:WRITE:EXCEPTION", "error", e) + write_success = 0 + end + end + write_success + end + + def write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size) + write_success = 0 Tempfile.create([local_file_name, ".csv"]) do |tempfile| tempfile.write(csv_content) tempfile.close with_sftp_client(connection_config) do |sftp| sftp.upload!(tempfile.path, file_path) - write_success += records.size + write_success = records_size rescue StandardError => e handle_exception("SFTP:RECORD:WRITE:EXCEPTION", "error", e) - write_failure += records.size + write_success = 0 end end - tracking_message(write_success, write_failure) - rescue StandardError => e - handle_exception( - "SFTP:WRITE:EXCEPTION", - "error", - e - ) + write_success end def clear_all_records(sync_config) @@ -82,7 +113,13 @@ def clear_all_records(sync_config) def generate_file_path(sync_config) connection_specification = sync_config.destination.connection_specification.with_indifferent_access timestamp = Time.now.strftime("%Y%m%d-%H%M%S") - file_name = "#{connection_specification[:file_name]}_#{timestamp}.csv" + format = connection_specification[:format] + extension = if format[:compression_type] == "un_compressed" + format[:format_type] + else + format[:compression_type] + end + file_name = "#{connection_specification[:file_name]}_#{timestamp}.#{extension}" File.join(connection_specification[:destination_path], file_name) end diff --git a/integrations/lib/multiwoven/integrations/destination/sftp/config/spec.json b/integrations/lib/multiwoven/integrations/destination/sftp/config/spec.json index 264bd1b0..844d5960 100644 --- a/integrations/lib/multiwoven/integrations/destination/sftp/config/spec.json +++ b/integrations/lib/multiwoven/integrations/destination/sftp/config/spec.json @@ -1,10 +1,10 @@ { - "documentation_url": "https://docs.multiwoven.com/integrations/destination/klaviyo", + "documentation_url": "https://docs.multiwoven.com/destinations/file-storage/sftp", "stream_type": "static", "connection_specification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "SFTP", - "required": ["host", "username", "password", "destination_path"], + "required": ["host", "username", "password", "destination_path", "format" ], "properties": { "host": { "title": "Host", @@ -45,7 +45,29 @@ "type": "string", "description": "Name of the file to be written.", "order": 5 + }, + "format": { + "title": "Output Format", + "type": "object", + "description": "Format of the data output.", + "order": 6, + "required": ["format_type"], + "properties": { + "format_type": { + "title": "File Format Type", + "type": "string", + "enum": ["csv"], + "default": "csv" + }, + "compression_type": { + "title": "Compression Type", + "description": "Whether the output files should be compressed.", + "type": "string", + "enum": ["un_compressed", "zip"], + "default": "un_compressed" + } + } } } } -} +} \ No newline at end of file diff --git a/integrations/lib/multiwoven/integrations/protocol/protocol.rb b/integrations/lib/multiwoven/integrations/protocol/protocol.rb index 4b3bc5c3..4550dbc1 100644 --- a/integrations/lib/multiwoven/integrations/protocol/protocol.rb +++ b/integrations/lib/multiwoven/integrations/protocol/protocol.rb @@ -26,6 +26,8 @@ module Types LogLevel = Types::String.enum("fatal", "error", "warn", "info", "debug", "trace") RequestRateLimitingUnit = Types::String.default("minute").enum("minute", "hour", "day") SchemaMode = Types::String.enum("schema", "schemaless") + FileFormatType = Types::String.enum("csv") + CompressionType = Types::String.enum("un_compressed", "zip") class ProtocolModel < Dry::Struct extend Multiwoven::Integrations::Core::Utils diff --git a/integrations/lib/multiwoven/integrations/rollout.rb b/integrations/lib/multiwoven/integrations/rollout.rb index bb85970a..aea3a5ab 100644 --- a/integrations/lib/multiwoven/integrations/rollout.rb +++ b/integrations/lib/multiwoven/integrations/rollout.rb @@ -2,7 +2,7 @@ module Multiwoven module Integrations - VERSION = "0.1.63" + VERSION = "0.1.64" ENABLED_SOURCES = %w[ Snowflake diff --git a/integrations/multiwoven-integrations.gemspec b/integrations/multiwoven-integrations.gemspec index a98ea19c..4513e38c 100644 --- a/integrations/multiwoven-integrations.gemspec +++ b/integrations/multiwoven-integrations.gemspec @@ -49,6 +49,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "restforce" spec.add_runtime_dependency "ruby-limiter" spec.add_runtime_dependency "ruby-odbc" + spec.add_runtime_dependency "rubyzip" spec.add_runtime_dependency "sequel" spec.add_runtime_dependency "slack-ruby-client" spec.add_runtime_dependency "stripe" diff --git a/integrations/spec/multiwoven/integrations/destination/sftp/client_spec.rb b/integrations/spec/multiwoven/integrations/destination/sftp/client_spec.rb index c777361e..a9a930a3 100644 --- a/integrations/spec/multiwoven/integrations/destination/sftp/client_spec.rb +++ b/integrations/spec/multiwoven/integrations/destination/sftp/client_spec.rb @@ -17,7 +17,11 @@ port: 22, password: "test_password", destination_path: "/multiwoven", - file_name: "test" + file_name: "test", + format: { + format_type: "csv", + compression_type: "un_compressed" + } }.with_indifferent_access end let(:sync_config_json) do @@ -58,6 +62,18 @@ end let(:csv_content) { "id,name\n1,Test Record\n" } + def sync_config + Multiwoven::Integrations::Protocol::SyncConfig.from_json( + sync_config_json.to_json + ) + end + + def sync_config_compressed_zip + sync_config_json[:destination][:connection_specification][:format][:compression_type] = "zip" + Multiwoven::Integrations::Protocol::SyncConfig.from_json( + sync_config_json.to_json + ) + end describe "#check_connection" do it "successfully checks connection" do expect(client).to receive(:with_sftp_client).and_yield(double) @@ -92,7 +108,7 @@ end describe "#write" do - it "successfully writes records" do + it "successfully writes records with un_compressed" do allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session) allow(client).to receive(:generate_csv_content).and_return(csv_content) allow(mock_sftp_session).to receive(:upload!).and_return(true) @@ -101,7 +117,16 @@ expect(response.tracking.failed).to eq(0) end - it "handles the failure and increments the failure count" do + it "successfully writes records with compressed" do + allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session) + allow(client).to receive(:generate_csv_content).and_return(csv_content) + allow(mock_sftp_session).to receive(:upload!).and_return(true) + response = client.write(sync_config_compressed_zip, records, "insert") + expect(response.tracking.success).to eq(records.size) + expect(response.tracking.failed).to eq(0) + end + + it "handles the failure with un_compressed " do allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session) allow(mock_sftp_session).to receive(:upload!).and_raise(StandardError, "SFTP upload failed") response = client.write(sync_config, records, "insert") @@ -111,6 +136,15 @@ expect(response.tracking.success).to eq(0) end + it "handles the failure with compressed " do + allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session) + allow(mock_sftp_session).to receive(:upload!).and_raise(StandardError, "SFTP upload failed") + response = client.write(sync_config_compressed_zip, records, "insert") + + expect(response.tracking.failed).to eq(records.size) + expect(response.tracking.success).to eq(0) + end + it "handles write failure with_sftp_client" do allow(client).to receive(:with_sftp_client).and_raise(StandardError.new("write failed")) response = client.write(sync_config, records, "insert") @@ -185,14 +219,14 @@ end describe "#generate_file_path" do - it "generate file" do - expect(client.send(:generate_file_path, sync_config)).to include("/multiwoven/test_") + it "generate csv file" do + file_path = client.send(:generate_file_path, sync_config) + expect(file_path).to match(%r{/multiwoven/test_\d{8}-\d{6}\.csv\z}) end - end - def sync_config - Multiwoven::Integrations::Protocol::SyncConfig.from_json( - sync_config_json.to_json - ) + it "generate zip file" do + file_path = client.send(:generate_file_path, sync_config_compressed_zip) + expect(file_path).to match(%r{/multiwoven/test_\d{8}-\d{6}\.zip\z}) + end end end diff --git a/server/0001-feat-add-compression-support-for-sftp-103.patch b/server/0001-feat-add-compression-support-for-sftp-103.patch new file mode 100644 index 00000000..97d27f8f --- /dev/null +++ b/server/0001-feat-add-compression-support-for-sftp-103.patch @@ -0,0 +1,23 @@ +From 7fac85d2241ad4e7e34ead928c24ecef29e6475c Mon Sep 17 00:00:00 2001 +From: afthab vp +Date: Fri, 3 May 2024 21:58:09 +0530 +Subject: [PATCH] feat: add compression support for sftp (#103) + +* feat: Add zip support in sftp write + +* feat: add zip support + +* fix: support both csv and zip + +* fix: add spec + +* fix: revert the server change + +* fix: revert server changes + +* fix: revert server batchquery + +* fix: pr comments resolved +-- +2.43.1 +