Move import_csv to its own class
forsbergplustwo committed Aug 23, 2023
1 parent 5b223ee commit 6c3a107
Showing 5 changed files with 193 additions and 116 deletions.
117 changes: 3 additions & 114 deletions app/models/payment_history.rb
@@ -3,7 +3,6 @@
# 2. Split the CSV import into separate class
# 3. Split the Partner API import into separate class

require "zip"
require "graphql/client"
require "graphql/client/http"

@@ -12,6 +11,8 @@ class PaymentHistory < ActiveRecord::Base

THROTTLE_MIN_TIME_PER_CALL = 0.3

YEARS_TO_IMPORT = 4.years.freeze

USAGE_CHARGE_TYPES = [
"App sale – usage",
"Usage application fee",
@@ -152,119 +153,7 @@ class PaymentHistory < ActiveRecord::Base

class << self
def default_start_date
4.years.ago.to_date
end

def import_csv(current_user, last_calculated_metric_date, filename)
temp = Tempfile.new("import")
s3 = Aws::S3::Client.new
file = s3.get_object(
{bucket: "partner-metrics",
key: filename}, target: temp.path
)
if filename.include?(".zip")
Zip.on_exists_proc = true
Zip.continue_on_exists_proc = true
Zip::File.open(temp.path) do |zip_file|
# Handle entries one by one
zip_file.each do |entry|
file = Tempfile.new("extracted")
# Extract to file/directory/symlink
Rails.logger.info("Extracting #{entry.name}")
entry.extract(file)
end
end
else
file = temp
end
current_user.payment_histories.where("payment_date > ?", last_calculated_metric_date).delete_all
options = {
converters: :all,
header_converters: :symbol
}
chunk_count = 0
chunk_payments = []
CsvHashReader.foreach(file, options) do |csv|
next if csv[:charge_creation_time].blank? || csv[:partner_share].to_f == 0.0 || Date.parse(csv[:charge_creation_time]) < last_calculated_metric_date
chunk_payments << process_csv_row(csv, current_user)
chunk_count += 1
if chunk_count % 3000 == 0
save_chunk(chunk_payments, current_user)
chunk_payments = []
current_user.update(import: "Importing (#{chunk_count} rows processed)", import_status: 100)
GC.start
end
end
save_chunk(chunk_payments, current_user)
temp.close
file.close
Rails.logger.info("Total chunks: #{chunk_count}")
rescue => e
current_user.update(import: "Failed", import_status: 100, partner_api_errors: "Error: #{e.message}")
raise e
end

def process_csv_row(csv, current_user)
record = current_user.payment_histories.new(
shop: csv[:shop],
shop_country: csv[:shop_country],
payment_date: csv[:charge_creation_time],
app_title: csv[:app_title].presence || "Unknown",
revenue: csv[:partner_share]
)
record[:charge_type] =
case csv[:charge_type]
when "RecurringApplicationFee",
"Recurring application fee",
"App sale – recurring",
"App sale – subscription",
"App sale – 30-day subscription",
"App sale – yearly subscription"
"recurring_revenue"
when "OneTimeApplicationFee",
"Usage application fee",
"ThemePurchaseFee",
"One time application fee",
"Theme purchase fee",
"App sale – one-time",
"App sale – usage",
"Service sale"
# STUPID: For my apps, I want Usage charges counted as "recurring" and not "one_time", others don't
if USAGE_CHARGE_TYPES.include?(csv[:charge_type]) && current_user.count_usage_charges_as_recurring == true
"recurring_revenue"
else
"onetime_revenue"
end
when "AffiliateFee",
"Affiliate fee",
"Development store referral commission",
"Affiliate referral commission",
"Shopify Plus referral commission"
"affiliate_revenue"
when "Manual",
"ApplicationDowngradeAdjustment",
"ApplicationCredit",
"AffiliateFeeRefundAdjustment",
"Application credit",
"Application downgrade adjustment",
"Application fee refund adjustment",
"App credit",
"App refund",
"App credit refund",
"Development store commission adjustment",
"Payout correction",
"App downgrade",
"Service refund"
"refund"
else
csv[:charge_type]
end
record
end

def save_chunk(chunk_payments, current_user)
PaymentHistory.import(chunk_payments, validate: false, no_returning: true) if chunk_payments.present?
true
YEARS_TO_IMPORT.ago.to_date
end

def import_partner_api(current_user, last_calculated_metric_date)
184 changes: 184 additions & 0 deletions app/models/payment_history/csv_importer.rb
@@ -0,0 +1,184 @@
require "zip"

class PaymentHistory::CsvImporter
attr_accessor :user, :calculate_from_date, :temp_files

CSV_READER_OPTIONS = {
converters: :all,
header_converters: :symbol
}.freeze

SAVE_EVERY_N_ROWS = 1000

UNKNOWN_APP_TITLE = "Unknown".freeze

CSV_REVENUE_TYPES = {
"recurring_revenue" => [
"RecurringApplicationFee",
"Recurring application fee",
"App sale – recurring",
"App sale – subscription",
"App sale – 30-day subscription",
"App sale – yearly subscription"
],
"onetime_revenue" => [
"OneTimeApplicationFee",
"ThemePurchaseFee",
"One time application fee",
"Theme purchase fee",
"App sale – one-time",
"App sale – usage",
"Service sale"
],
"affiliate_revenue" => [
"AffiliateFee",
"Affiliate fee",
"Development store referral commission",
"Affiliate referral commission",
"Shopify Plus referral commission"
],
"refund" => [
"Manual",
"ApplicationDowngradeAdjustment",
"ApplicationCredit",
"AffiliateFeeRefundAdjustment",
"Application credit",
"Application downgrade adjustment",
"Application fee refund adjustment",
"App credit",
"App refund",
"App credit refund",
"Development store commission adjustment",
"Payout correction",
"App downgrade",
"Service refund"
],
"usage_revenue" => [
"App sale – usage",
"Usage application fee",
"AppUsageSale"
]
}.freeze

def initialize(user:, filename:)
@user = user
@calculate_from_date = user.calculate_from_date
@temp_files = {}
@temp_files[:csv] = prepare_csv_file(filename)
@rows_processed_count = 0
@batch_of_payments = []
end

def import!
clear_old_payments
import_new_payments
rescue => e
handle_import_error(e)
ensure
close_and_unlink_temp_files
end

private

def clear_old_payments
user.payment_histories.where("payment_date > ?", calculate_from_date).delete_all
end

def import_new_payments
# Loops through the CSV file, saving in chunks of N rows; stops once a row predates
# calculate_from_date (the export is assumed to be ordered newest-first)
CsvHashReader.foreach(@temp_files[:csv], CSV_READER_OPTIONS) do |csv_row|
next if irrelevant_row?(csv_row)
break if row_too_old?(csv_row)

@batch_of_payments << new_payment(csv_row)

@rows_processed_count += 1
if @rows_processed_count % SAVE_EVERY_N_ROWS == 0
save_chunk(@batch_of_payments)
user.update(import: "Importing (#{@rows_processed_count} rows processed)", import_status: 100)
end
end
# Save any remaining rows
save_chunk(@batch_of_payments)
true
end

def new_payment(csv_row)
user.payment_histories.new(
app_title: csv_row[:app_title].presence || UNKNOWN_APP_TITLE,
charge_type: calculate_charge_type(csv_row),
shop: csv_row[:shop],
shop_country: csv_row[:shop_country],
payment_date: csv_row[:charge_creation_time],
revenue: csv_row[:partner_share].to_f
)
end

def save_chunk(payments)
# Uses "activerecord-import", which is much faster than saving each row individually
PaymentHistory.import(payments, validate: false, no_returning: true) if payments.present?
@batch_of_payments = []
end

def irrelevant_row?(csv_row)
csv_row[:charge_creation_time].blank? || csv_row[:partner_share].to_f == 0.0
end

def row_too_old?(csv_row)
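# Lexicographic comparison matches chronological order here, assuming
# charge_creation_time starts with an ISO-8601 date (YYYY-MM-DD)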
csv_row[:charge_creation_time] < calculate_from_date.to_s
end

def calculate_charge_type(csv_row)
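# Maps the raw CSV charge type to a revenue bucket via CSV_REVENUE_TYPES,
# e.g. "Recurring application fee" => "recurring_revenue"; unmapped types return nil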
charge_type = CSV_REVENUE_TYPES.find { |_key, value| value.include?(csv_row[:charge_type]) }&.first
if charge_type == "usage_revenue"
charge_type = user.count_usage_charges_as_recurring ? "recurring_revenue" : "onetime_revenue"
end
charge_type
end

def prepare_csv_file(filename)
file = fetch_from_s3(filename)
if zipped?(filename)
extract_zip_file(file)
else
file
end
end

def zipped?(filename)
filename.include?(".zip")
end

def fetch_from_s3(filename)
temp_files[:s3_download] = Tempfile.new("s3_download")
s3 = Aws::S3::Client.new
s3.get_object({bucket: "partner-metrics", key: filename}, target: temp_files[:s3_download].path)
temp_files[:s3_download]
end

def extract_zip_file(zipped_file)
temp_files[:unzipped] = Tempfile.new("unzipped")
Zip.on_exists_proc = true
Zip.continue_on_exists_proc = true
Zip::File.open(zipped_file.path) do |zip_file|
zip_file.each do |entry|
entry.extract(temp_files[:unzipped])
end
end
temp_files[:unzipped]
end

def handle_import_error(e)
# TODO: Create a generic import status class
user.update(import: "Failed", import_status: 100, partner_api_errors: "Error: #{e.message}")
# Resque swallows errors, so we need to log them here
Rails.logger.error("Error importing CSV: #{e.message}")
Rails.logger.error(e.backtrace.join("\n"))
raise e
end

def close_and_unlink_temp_files
temp_files.each_value(&:close)
temp_files.each_value(&:unlink)
end
end
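The importer is driven from a background worker (see the ImportWorker change below); a minimal usage sketch, with a hypothetical S3 object key:

PaymentHistory::CsvImporter.new(user: current_user, filename: "payouts_export.csv.zip").import!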
4 changes: 4 additions & 0 deletions app/models/user.rb
@@ -29,6 +29,10 @@ def oldest_metric_date
metrics.minimum("metric_date")
end

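# Recalculate from the newest stored metric date, falling back to the default start date on a first import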
def calculate_from_date
newest_metric_date || PaymentHistory.default_start_date
end

# TODO: DRY the following methods up

def yearly_revenue_per_product(date:, charge_type: nil)
1 change: 1 addition & 0 deletions app/workers/import_metrics_worker.rb
@@ -1,4 +1,5 @@
# TODO: Convert to ActiveJob -> Sidekiq

class ImportMetricsWorker
@queue = :import_queue

3 changes: 1 addition & 2 deletions app/workers/import_worker.rb
@@ -1,5 +1,4 @@
# TODO: Convert to ActiveJob -> Sidekiq
# Split into two workers: ImportCsvJob and ImportPartnerApiJob

class ImportWorker
@queue = :import_queue
@@ -11,7 +10,7 @@ def self.perform(current_user_id, filename = nil)
last_calculated_metric = current_user.newest_metric_date || PaymentHistory.default_start_date

if !filename.nil?
PaymentHistory.import_csv(current_user, last_calculated_metric, filename)
PaymentHistory::CsvImporter.new(user: current_user, filename: filename).import!
else
PaymentHistory.import_partner_api(current_user, last_calculated_metric)
end
