Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/Multiwoven/multiwoven into …
Browse files Browse the repository at this point in the history
…cherry-pick-ce-commit-76afbbf89c39fe58d6ac260842bca7120d852d04
  • Loading branch information
pabss-ai2 committed Jul 8, 2024
2 parents 386d337 + e830d85 commit 95521db
Show file tree
Hide file tree
Showing 30 changed files with 337 additions and 95 deletions.
1 change: 1 addition & 0 deletions integrations/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ GEM

PLATFORMS
arm64-darwin-22
arm64-darwin-23
x64-mingw-ucrt
x86_64-darwin-23

Expand Down
17 changes: 17 additions & 0 deletions integrations/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ Multiwoven integrations is the collection of connectors built on top of [Multiwo
Multiwoven protocol is an open source standard for moving data between data sources to any third-part destinations.
Anyone can build a connetor with basic ruby knowledge using the protocol.

## Prerequisites

Before you begin the installation, ensure you have the following dependencies installed:

- **MySQL Client**
- Command: `brew install mysql-client`
- Description: Required for database interactions.

- **Zstandard (zstd)**
- Command: `brew install zstd`
- Description: Needed for data compression and decompression.

- **OpenSSL 3**
- Command: `brew install openssl@3`
- Description: Essential for secure communication.


### Installation

Install the gem and add to the application's Gemfile by executing:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ def write(_sync_config, _records, _action = "destination_insert")
raise "Not implemented"
# return Protocol::TrackingMessage
end

def tracking_message(success, failure, log_message_array)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure, logs: log_message_array
).to_multiwoven_message
end
end
end
end
8 changes: 8 additions & 0 deletions integrations/lib/multiwoven/integrations/core/utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def report_exception(exception, meta = {})
reporter&.report(exception, meta)
end

def log_request_response(level, request, response)
Integrations::Protocol::LogMessage.new(
name: self.class.name,
level: level,
message: { request: request.to_s, response: response.to_s, level: level }.to_json
)
end

def create_log_message(context, type, exception)
Integrations::Protocol::LogMessage.new(
name: context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def write(sync_config, records, action = "destination_insert")
connection_config = sync_config.destination.connection_specification.with_indifferent_access
table_name = sync_config.stream.name
primary_key = sync_config.model.primary_key
log_message_array = []
db = create_connection(connection_config)

write_success = 0
Expand All @@ -55,8 +56,9 @@ def write(sync_config, records, action = "destination_insert")
query = Multiwoven::Integrations::Core::QueryBuilder.perform(action, table_name, record, primary_key)
logger.debug("POSTGRESQL:WRITE:QUERY query = #{query} sync_id = #{sync_config.sync_id} sync_run_id = #{sync_config.sync_run_id}")
begin
db.exec(query)
response = db.exec(query)
write_success += 1
log_message_array << log_request_response("info", query, response)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
Expand All @@ -65,9 +67,10 @@ def write(sync_config, records, action = "destination_insert")
sync_run_id: sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", query, e.message)
end
end
tracking_message(write_success, write_failure)
tracking_message(write_success, write_failure, log_message_array)
rescue StandardError => e
handle_exception(e, {
context: "POSTGRESQL:RECORD:WRITE:EXCEPTION",
Expand Down Expand Up @@ -119,12 +122,6 @@ def group_by_table(records)
}
end
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ def process_records(records, stream)
write_success = 0
write_failure = 0
properties = stream.json_schema[:properties]
log_message_array = []

records.each do |record_object|
record = extract_data(record_object, properties)
process_record(stream, record)
args = [stream.name, "Id", record]
response = send_data_to_salesforce(args)
write_success += 1
log_message_array << log_request_response("info", args, response)
rescue StandardError => e
# TODO: add sync_id and sync run id to the logs
handle_exception(e, {
Expand All @@ -93,18 +97,14 @@ def process_records(records, stream)
sync_run_id: @sync_config.sync_run_id
})
write_failure += 1
log_message_array << log_request_response("error", args, e.message)
end
tracking_message(write_success, write_failure)
end

def process_record(stream, record)
send_data_to_salesforce(stream.name, record)
tracking_message(write_success, write_failure, log_message_array)
end

def send_data_to_salesforce(stream_name, record = {})
def send_data_to_salesforce(args)
method_name = "upsert!"
args = [stream_name, "Id", record]
@logger.debug("sync_id: #{@sync_config.sync_id}, sync_run_id: #{@sync_config.sync_run_id}, record: #{record}")
@logger.debug("sync_id: #{@sync_config.sync_id}, sync_run_id: #{@sync_config.sync_run_id}, args: #{args}")
@client.send(method_name, *args)
end

Expand All @@ -124,12 +124,6 @@ def load_catalog
read_json(CATALOG_SPEC_PATH)
end

def tracking_message(success, failure)
Multiwoven::Integrations::Protocol::TrackingMessage.new(
success: success, failed: failure
).to_multiwoven_message
end

def log_debug(message)
Multiwoven::Integrations::Service.logger.debug(message)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ class TrackingMessage < ProtocolModel
attribute :success, Types::Integer.default(0)
attribute :failed, Types::Integer.default(0)
attribute? :meta, Types::Hash
attribute? :logs, Types::Array.of(LogMessage)

def to_multiwoven_message
MultiwovenMessage.new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,38 @@ module Integrations::Core
expect { connector.write(sync_config, records) }.to raise_error("Not implemented")
end
end

describe "#tracking_message" do
let(:log_message_data) do
Multiwoven::Integrations::Protocol::LogMessage.new(
name: self.class.name,
level: "info",
message: { request: "Sample req", response: "Sample req", level: "info" }.to_json
)
end
it "returns a MultiwovenMessage with tracking information" do
connector = described_class.new
success = 2
failure = 1

multiwoven_message = connector.tracking_message(success, failure, [log_message_data])

expect(multiwoven_message).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
expect(multiwoven_message.type).to eq("tracking")

tracking_message = multiwoven_message.tracking
expect(tracking_message).to be_a(Multiwoven::Integrations::Protocol::TrackingMessage)
expect(tracking_message.success).to eq(success)
expect(tracking_message.failed).to eq(failure)

logs = tracking_message.logs
expect(logs).to be_an(Array)
expect(logs.size).to eq(1)
expect(logs.first).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(logs.first.level).to eq("info")
expect(logs.first.message).to eq("{\"request\":\"Sample req\",\"response\":\"Sample req\",\"level\":\"info\"}")
end
end
end
end
end
18 changes: 18 additions & 0 deletions integrations/spec/multiwoven/integrations/core/utils_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,24 @@ module Core
expect(dummy_class.hash_to_string(hash)).to eq("key1 = 1, key2 = 2.5, key3 = true, key4 = ")
end
end

describe "#log_request_response" do
let(:level) { "info" }
let(:request) { { user_id: 1, action: "create" } }
let(:response) { { status: "success" } }

it "creates a LogMessage object with correct attributes" do
log_message = dummy_class.log_request_response(level, request, response)

expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eq(level)

parsed_message = JSON.parse(log_message.message)
expect(parsed_message["request"]).to eq(request.to_s)
expect(parsed_message["response"]).to eq(response.to_s)
expect(parsed_message["level"]).to eq(level)
end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@

tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking
expect(tracking.success).to eql(1)
log_message = tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end

it "write records successfully on update record action destination_update" do
Expand All @@ -125,6 +131,13 @@

tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)], "destination_update").tracking
expect(tracking.success).to eql(1)
expect(tracking.logs.count).to eql(1)
log_message = tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

Expand All @@ -139,6 +152,12 @@

tracking = subject.write(s_config, [records.first.data.transform_keys(&:to_s)]).tracking
expect(tracking.failed).to eql(1)
expect(tracking.logs.count).to eql(1)
log_message = tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("error")
expect(log_message.message).to include("request")
expect(log_message.message).to include("\"response\":\"test error\"")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@
response = client.write(sync_config, records)
expect(response.tracking.success).to eq(records.size)
expect(response.tracking.failed).to eq(0)
expect(response.tracking.logs.count).to eql(2)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("info")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end

Expand All @@ -207,6 +214,13 @@
response = client.write(sync_config, records)
expect(response.tracking.failed).to eq(records.size)
expect(response.tracking.success).to eq(0)
expect(response.tracking.logs.count).to eql(2)
log_message = response.tracking.logs.first
expect(log_message).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(log_message.level).to eql("error")

expect(log_message.message).to include("request")
expect(log_message.message).to include("response")
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,26 @@ module Integrations::Protocol

RSpec.describe Multiwoven::Integrations::Protocol::TrackingMessage do
describe "#to_multiwoven_message" do
let(:log_message_data) do
Multiwoven::Integrations::Protocol::LogMessage.new(
name: self.class.name,
level: "info",
message: { request: "Sample req", response: "Sample req", level: "info" }.to_json
)
end
let(:tracking_message) do
Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 3, failed: 1)
Multiwoven::Integrations::Protocol::TrackingMessage.new(success: 3, failed: 1, logs: [log_message_data])
end

it "converts to a MultiwovenMessage" do
multiwoven_message = tracking_message.to_multiwoven_message

expect(multiwoven_message).to be_a(Multiwoven::Integrations::Protocol::MultiwovenMessage)
expect(multiwoven_message.type).to eq("tracking")
expect(multiwoven_message.tracking).to eq(tracking_message)
expect(multiwoven_message.tracking.logs.first).to be_a(Multiwoven::Integrations::Protocol::LogMessage)
expect(multiwoven_message.tracking.logs.first.level).to eq("info")
expect(multiwoven_message.tracking.logs.first.message)
.to eq("{\"request\":\"Sample req\",\"response\":\"Sample req\",\"level\":\"info\"}")
end
end
end
Expand Down
25 changes: 15 additions & 10 deletions release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,29 @@

All notable changes to this project will be documented in this file.

## [0.13.0] - 2024-06-24
## [0.14.0] - 2024-07-01

### 🚀 Features

- Added Iterable Connector
- *(CE)* Workspace settings and useQueryWrapper (#193)
- *(CE)* Add amazon s3 source connector
- *(CE)* Separate code climate reports and badges (#203)
- *(CE)* Add mariaDB source connector (#208)
- *(CE)* Lock user login attempts (#182)
- *(CE)* Sync records error log (#211)
- *(CE)* Add table selector as model query type (#243) (#209)
- *(CE)* Force refresh catalog when refresh flag is set true (#248) (#217)
- *(CE)* Add table selector as model query type (#234) (#213)

### 🐛 Bug Fixes

- *(CE)* EE to CE Commit Sync PAT issue
- *(CE)* Github URL issues
- *(CE)* Change Github PAT to SSH PRIVATE KEY (#196)
- *(CE)* UI Maintainability and workspace id on page refresh (#228) (#200)
- *(CE)* CE sync commit not working for multiple commits (#201)
- *(CE)* Skip verify_authorized in logout (#204)
- *(CE)* Json error field added in sync record (#205)
- *(CE)* Add mariadb-dev in DockerFile (#235)
- *(CE)* Signup error response (#214)

### ⚙️ Miscellaneous Tasks

- *(CE)* Add GitHub actions for syncing previous days CE commits
- *(CE)* Update the role policies (#206)
- *(CE)* Update server gem (#227)
- *(CE)* Pundit policy at role permissions level (#210)

<!-- generated by git-cliff -->
2 changes: 1 addition & 1 deletion server/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ gem "interactor", "~> 3.0"

gem "ruby-odbc", git: "https://github.com/Multiwoven/ruby-odbc.git"

gem "multiwoven-integrations", "~> 0.3.4"
gem "multiwoven-integrations", "~> 0.3.5"

gem "temporal-ruby", github: "coinbase/temporal-ruby"

Expand Down
Loading

0 comments on commit 95521db

Please sign in to comment.