Skip to content

Commit

Permalink
Merge pull request #12 from Multiwoven/feat/sftp-compression-support-…
Browse files Browse the repository at this point in the history
…zip-ee

feat: Added zip support for sftp
  • Loading branch information
afthabvp authored May 7, 2024
2 parents e9a5546 + d405020 commit 4776be6
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 26 deletions.
2 changes: 2 additions & 0 deletions integrations/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ gem "net-sftp"

gem "csv"

gem "rubyzip"

group :development, :test do
gem "simplecov", require: false
gem "simplecov_json_formatter", require: false
Expand Down
5 changes: 4 additions & 1 deletion integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
multiwoven-integrations (0.1.63)
multiwoven-integrations (0.1.64)
activesupport
async-websocket
csv
Expand All @@ -24,6 +24,7 @@ PATH
restforce
ruby-limiter
ruby-odbc
rubyzip
sequel
slack-ruby-client
stripe
Expand Down Expand Up @@ -229,6 +230,7 @@ GEM
ruby-limiter (2.3.0)
ruby-progressbar (1.13.0)
ruby2_keywords (0.0.5)
rubyzip (2.3.2)
sequel (5.75.0)
bigdecimal
signet (0.18.0)
Expand Down Expand Up @@ -292,6 +294,7 @@ DEPENDENCIES
rubocop (~> 1.21)
ruby-limiter
ruby-odbc!
rubyzip
sequel
simplecov
simplecov_json_formatter
Expand Down
1 change: 1 addition & 0 deletions integrations/lib/multiwoven/integrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
require "net/sftp"
require "csv"
require "securerandom"
require "zip"

# Service
require_relative "integrations/config"
Expand Down
59 changes: 48 additions & 11 deletions integrations/lib/multiwoven/integrations/destination/sftp/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,58 @@ def write(sync_config, records, _action = "insert")
file_path = generate_file_path(sync_config)
local_file_name = generate_local_file_name(sync_config)
csv_content = generate_csv_content(records)
records_size = records.size
write_success = 0
write_failure = 0

case connection_config[:format][:compression_type]
when CompressionType.enum("zip")
write_success = write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
when CompressionType.enum("un_compressed")
write_success = write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
else
raise ArgumentError, "Unsupported compression type: #{connection_config[:format][:compression_type]}"
end
write_failure = records.size - write_success
tracking_message(write_success, write_failure)
rescue StandardError => e
handle_exception(
"SFTP:WRITE:EXCEPTION",
"error",
e
)
end

def write_compressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
write_success = 0
Tempfile.create([local_file_name, ".zip"]) do |tempfile|
Zip::File.open(tempfile.path, Zip::File::CREATE) do |zipfile|
zipfile.get_output_stream("#{local_file_name}.csv") { |f| f.write(csv_content) }
end
with_sftp_client(connection_config) do |sftp|
sftp.upload!(tempfile.path, file_path)
write_success = records_size
rescue StandardError => e
handle_exception("SFTP:RECORD:WRITE:EXCEPTION", "error", e)
write_success = 0
end
end
write_success
end

def write_uncompressed_data(connection_config, file_path, local_file_name, csv_content, records_size)
write_success = 0
Tempfile.create([local_file_name, ".csv"]) do |tempfile|
tempfile.write(csv_content)
tempfile.close
with_sftp_client(connection_config) do |sftp|
sftp.upload!(tempfile.path, file_path)
write_success += records.size
write_success = records_size
rescue StandardError => e
handle_exception("SFTP:RECORD:WRITE:EXCEPTION", "error", e)
write_failure += records.size
write_success = 0
end
end
tracking_message(write_success, write_failure)
rescue StandardError => e
handle_exception(
"SFTP:WRITE:EXCEPTION",
"error",
e
)
write_success
end

def clear_all_records(sync_config)
Expand All @@ -82,7 +113,13 @@ def clear_all_records(sync_config)
def generate_file_path(sync_config)
connection_specification = sync_config.destination.connection_specification.with_indifferent_access
timestamp = Time.now.strftime("%Y%m%d-%H%M%S")
file_name = "#{connection_specification[:file_name]}_#{timestamp}.csv"
format = connection_specification[:format]
extension = if format[:compression_type] == "un_compressed"
format[:format_type]
else
format[:compression_type]
end
file_name = "#{connection_specification[:file_name]}_#{timestamp}.#{extension}"
File.join(connection_specification[:destination_path], file_name)
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"documentation_url": "https://docs.multiwoven.com/integrations/destination/klaviyo",
"documentation_url": "https://docs.multiwoven.com/destinations/file-storage/sftp",
"stream_type": "static",
"connection_specification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "SFTP",
"required": ["host", "username", "password", "destination_path"],
"required": ["host", "username", "password", "destination_path", "format" ],
"properties": {
"host": {
"title": "Host",
Expand Down Expand Up @@ -45,7 +45,29 @@
"type": "string",
"description": "Name of the file to be written.",
"order": 5
},
"format": {
"title": "Output Format",
"type": "object",
"description": "Format of the data output.",
"order": 6,
"required": ["format_type"],
"properties": {
"format_type": {
"title": "File Format Type",
"type": "string",
"enum": ["csv"],
"default": "csv"
},
"compression_type": {
"title": "Compression Type",
"description": "Whether the output files should be compressed.",
"type": "string",
"enum": ["un_compressed", "zip"],
"default": "un_compressed"
}
}
}
}
}
}
}
2 changes: 2 additions & 0 deletions integrations/lib/multiwoven/integrations/protocol/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ module Types
LogLevel = Types::String.enum("fatal", "error", "warn", "info", "debug", "trace")
RequestRateLimitingUnit = Types::String.default("minute").enum("minute", "hour", "day")
SchemaMode = Types::String.enum("schema", "schemaless")
FileFormatType = Types::String.enum("csv")
CompressionType = Types::String.enum("un_compressed", "zip")

class ProtocolModel < Dry::Struct
extend Multiwoven::Integrations::Core::Utils
Expand Down
2 changes: 1 addition & 1 deletion integrations/lib/multiwoven/integrations/rollout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Multiwoven
module Integrations
VERSION = "0.1.63"
VERSION = "0.1.64"

ENABLED_SOURCES = %w[
Snowflake
Expand Down
1 change: 1 addition & 0 deletions integrations/multiwoven-integrations.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency "restforce"
spec.add_runtime_dependency "ruby-limiter"
spec.add_runtime_dependency "ruby-odbc"
spec.add_runtime_dependency "rubyzip"
spec.add_runtime_dependency "sequel"
spec.add_runtime_dependency "slack-ruby-client"
spec.add_runtime_dependency "stripe"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
port: 22,
password: "test_password",
destination_path: "/multiwoven",
file_name: "test"
file_name: "test",
format: {
format_type: "csv",
compression_type: "un_compressed"
}
}.with_indifferent_access
end
let(:sync_config_json) do
Expand Down Expand Up @@ -58,6 +62,18 @@
end
let(:csv_content) { "id,name\n1,Test Record\n" }

def sync_config
Multiwoven::Integrations::Protocol::SyncConfig.from_json(
sync_config_json.to_json
)
end

def sync_config_compressed_zip
sync_config_json[:destination][:connection_specification][:format][:compression_type] = "zip"
Multiwoven::Integrations::Protocol::SyncConfig.from_json(
sync_config_json.to_json
)
end
describe "#check_connection" do
it "successfully checks connection" do
expect(client).to receive(:with_sftp_client).and_yield(double)
Expand Down Expand Up @@ -92,7 +108,7 @@
end

describe "#write" do
it "successfully writes records" do
it "successfully writes records with un_compressed" do
allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session)
allow(client).to receive(:generate_csv_content).and_return(csv_content)
allow(mock_sftp_session).to receive(:upload!).and_return(true)
Expand All @@ -101,7 +117,16 @@
expect(response.tracking.failed).to eq(0)
end

it "handles the failure and increments the failure count" do
it "successfully writes records with compressed" do
allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session)
allow(client).to receive(:generate_csv_content).and_return(csv_content)
allow(mock_sftp_session).to receive(:upload!).and_return(true)
response = client.write(sync_config_compressed_zip, records, "insert")
expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
end

it "handles the failure with un_compressed " do
allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session)
allow(mock_sftp_session).to receive(:upload!).and_raise(StandardError, "SFTP upload failed")
response = client.write(sync_config, records, "insert")
Expand All @@ -111,6 +136,15 @@
expect(response.tracking.success).to eq(0)
end

it "handles the failure with compressed " do
allow(client).to receive(:with_sftp_client).and_yield(mock_sftp_session)
allow(mock_sftp_session).to receive(:upload!).and_raise(StandardError, "SFTP upload failed")
response = client.write(sync_config_compressed_zip, records, "insert")

expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
end

it "handles write failure with_sftp_client" do
allow(client).to receive(:with_sftp_client).and_raise(StandardError.new("write failed"))
response = client.write(sync_config, records, "insert")
Expand Down Expand Up @@ -185,14 +219,14 @@
end

describe "#generate_file_path" do
it "generate file" do
expect(client.send(:generate_file_path, sync_config)).to include("/multiwoven/test_")
it "generate csv file" do
file_path = client.send(:generate_file_path, sync_config)
expect(file_path).to match(%r{/multiwoven/test_\d{8}-\d{6}\.csv\z})
end
end

def sync_config
Multiwoven::Integrations::Protocol::SyncConfig.from_json(
sync_config_json.to_json
)
it "generate zip file" do
file_path = client.send(:generate_file_path, sync_config_compressed_zip)
expect(file_path).to match(%r{/multiwoven/test_\d{8}-\d{6}\.zip\z})
end
end
end
23 changes: 23 additions & 0 deletions server/0001-feat-add-compression-support-for-sftp-103.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
From 7fac85d2241ad4e7e34ead928c24ecef29e6475c Mon Sep 17 00:00:00 2001
From: afthab vp <[email protected]>
Date: Fri, 3 May 2024 21:58:09 +0530
Subject: [PATCH] feat: add compression support for sftp (#103)

* feat: Add zip support in sftp write

* feat: add zip support

* fix: support both csv and zip

* fix: add spec

* fix: revert the server change

* fix: revert server changes

* fix: revert server batchquery

* fix: pr comments resolved
--
2.43.1

0 comments on commit 4776be6

Please sign in to comment.