Optimize extract from marketo #88

Open · wants to merge 16 commits into base: master
39 changes: 32 additions & 7 deletions README.md
@@ -20,16 +20,14 @@ This tap:
> pip install tap-marketo
```

2. Get your Endpoint, Identity, Client ID and Client Secret
2. Get your Endpoint, Client ID and Client Secret

**Endpoint and Identity**
**Endpoint**

The base URL contains the account id (a.k.a. Munchkin id) and is therefore unique for each Marketo subscription.
Your base URL is found by logging into Marketo and navigating to the Admin > Integration > Web Services menu.
It is labeled as “Endpoint:” underneath the “REST API” section as shown in the following screenshots.

Identity is found directly below the endpoint entry.

http://developers.marketo.com/rest-api/base-url/

**Client ID and Secret**
@@ -40,13 +38,32 @@ This tap:

3. Create the config file

Create a JSON file called `config.json` containing the Endpoint, Identity, Client ID and Client Secret.
Create a JSON file called `config.json` containing the Endpoint, Start Date, Client ID, and Client Secret (and, optionally, an Attribution Window).

**Start Date**

Determines how much historical data will be extracted. Please be aware that the larger the time period and amount of data, the longer the initial extraction can be expected to take.

**Attribution Window**

[Optional] The attribution window is used to set an earlier export_start
for incremental replication of the **leads** stream. This allows the tap to _catch_
leads that may have been missed in the prior export.

`attribution_window` may be specified as a combination of days, hours, minutes, and seconds. This parameter is quite useful for moderate-frequency incremental bulk extracts (e.g. once an hour), since it gives users a way to avoid re-extracting all leads updated in the 1 day prior (the default attribution window).

Examples of valid attribution windows:
* 1 day
* 12:00:00
* 01:30:00
* 1 day 06:55:00

`attribution_window` defaults to `1 day` if not specified.
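As a rough sketch of what the window does (using hypothetical bookmark values), the parsed window becomes a Python `timedelta` that is subtracted from the stored `updatedAt` bookmark to pick the export start:

```python
from datetime import datetime, timedelta

# Hypothetical bookmark value for illustration; the real value comes from the state.
latest_updated_at = datetime(2021, 6, 2, 12, 0, 0)

# "1 day 06:55:00" parses to this offset.
attribution_window = timedelta(days=1, hours=6, minutes=55)

# The next incremental export starts earlier by the window, re-capturing
# leads the prior export may have missed.
export_start = latest_updated_at - attribution_window
print(export_start)  # 2021-06-01 05:05:00
```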
```json
{"endpoint": "your-endpoint",
"identity": "your-identity",
"start_date": "earliest-date-to-sync",
"client_id": "your-client_id",
"client_secret": "your-client-secret"}
"client_secret": "your-client-secret",
"attribution_window": "buffer-time-subtracted-from-updatedAt-for-leads-stream"}
```

4. [Optional] Create the initial state file
@@ -70,7 +87,15 @@ This tap:
tap-marketo --config config.json [--state state.json]
```

## Config Parameters

| parameter name | description | examples | required or optional |
| ------ | ------ | ------ | ------ |
| endpoint | Base URL for the REST API, specific to your Marketo account (in the example to the right, 123-ABC-123 would be your Marketo account id) | https://123-ABC-123.mktorest.com/rest | required |
| start_date | The earliest date to use as a filter for the replication key during the initial sync or a full refresh | 2020-01-01T00:00:00Z | required |
| client_id | The client ID used to authenticate with the Marketo REST API, generated through the web UI by your Marketo admin (a random dash-separated alphanumeric string) | a134dakfj-kldjk-39487fh3-ad834bi30 (note: actual length may differ) | required |
| client_secret | The client secret used to authenticate with the Marketo REST API, generated through the web UI by your Marketo admin (a random alphanumeric string) | akdj498abalj314klja934 (note: actual length may differ) | required |
| attribution_window | A string specifying a duration of time (a combination of days, hours, and minutes) to subtract from the latest `updatedAt` value stored in the state for the leads stream | 1 day, 12:00:00, 01:30:00, 1 day 06:55:00, 00:20:00 | optional |
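As a worked example, a `config.json` filled in with the placeholder values from the table above might look like this:

```json
{"endpoint": "https://123-ABC-123.mktorest.com/rest",
 "start_date": "2020-01-01T00:00:00Z",
 "client_id": "a134dakfj-kldjk-39487fh3-ad834bi30",
 "client_secret": "akdj498abalj314klja934",
 "attribution_window": "1 day"}
```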
---

Copyright © 2017 Stitch
54 changes: 51 additions & 3 deletions tap_marketo/__init__.py
@@ -2,19 +2,24 @@

# Marketo Docs are located at http://developers.marketo.com/rest-api/

from datetime import datetime, timedelta
import itertools
import re

import pendulum
import singer
from singer import bookmarks

from tap_marketo.client import Client
from tap_marketo.discover import discover
from tap_marketo.sync import sync, determine_replication_key
import tap_marketo.sync as sync_
from singer.bookmarks import (
    get_bookmark,
    write_bookmark,
    get_currently_syncing,
    set_currently_syncing,
)
from singer.catalog import Catalog

REQUIRED_CONFIG_KEYS = [
"start_date",
Expand All @@ -32,7 +37,47 @@
]


ATTRIBUTION_WINDOW_README = """
`attribution_window` may be specified as a combination of days, hours, minutes, and seconds. This parameter is
quite useful for moderate-frequency incremental bulk extracts (e.g. once an hour),
since it gives users a way to avoid re-extracting all leads updated in the 1 day prior (the default attribution window).
Examples of valid attribution windows: `1 day`, `1 days`, `2 day`, `10 days`, `10:00:00`, `1 day 05:00:00`
"""

def parse_attribution_window(attribution_window_string):
    """
    Parse the optional config parameter `attribution_window` into a timedelta.
    The attribution window is used to set an earlier export_start
    for incremental replication of the leads stream.
    See ATTRIBUTION_WINDOW_README for the accepted formats.
    """
    errstr = f"`{attribution_window_string}` is not a valid attribution window."
    # Accept "N day(s)", "HH:MM:SS", or "N day(s) HH:MM:SS".
    pat = r'^((?P<day>\d+)\s+days?)?(\s+)?(?P<time>\d{2}:\d{2}:\d{2})?$'
    match = re.match(pat, attribution_window_string)
    if not match:
        raise ValueError(errstr)
    groups = match.groupdict()
    delta_day = groups["day"] or '0'
    delta_time = groups["time"] or '00:00:00'
    try:
        parsed_time = datetime.strptime(delta_time, '%H:%M:%S')
        return timedelta(
            days=int(delta_day),
            hours=parsed_time.hour,
            minutes=parsed_time.minute,
            seconds=parsed_time.second,
        )
    except ValueError as exc:
        raise ValueError(errstr) from exc
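# Illustrative examples of the mapping performed above (assumed values, not exhaustive):
#   parse_attribution_window("1 day")          -> timedelta(days=1)
#   parse_attribution_window("12:00:00")       -> timedelta(hours=12)
#   parse_attribution_window("1 day 06:55:00") -> timedelta(days=1, hours=6, minutes=55)
#   parse_attribution_window("soon")           -> raises ValueError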


def validate_state(config, catalog, state):
    if isinstance(catalog, Catalog):
        catalog = catalog.to_dict()

    for stream in catalog["streams"]:
        for mdata in stream['metadata']:
            if mdata['breadcrumb'] == [] and mdata['metadata'].get('selected') != True:
@@ -42,7 +87,7 @@ def validate_state(config, catalog, state):
                set_currently_syncing(state, None)
                break

        replication_key = determine_replication_key(stream['tap_stream_id'])
        replication_key = sync_.determine_replication_key(stream['tap_stream_id'])
        if not replication_key:
            continue

@@ -61,12 +106,15 @@ def validate_state(config, catalog, state):
    return state

def _main(config, properties, state, discover_mode=False):
    if 'attribution_window' in config:
        config['attribution_window'] = parse_attribution_window(config['attribution_window'])
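        # The parsed timedelta is later subtracted from the leads stream's
        # `updatedAt` bookmark so the next export starts earlier (see the
        # attribution window notes in the README).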

    client = Client(**config)
    if discover_mode:
        discover(client)
    elif properties:
        state = validate_state(config, properties, state)
        sync(client, properties, config, state)
        sync_.sync(client, properties, config, state)


def main():
30 changes: 20 additions & 10 deletions tap_marketo/client.py
@@ -5,7 +5,7 @@
import pendulum
import requests
import singer

import json

# By default, jobs will run for 3 hours and be polled every 5 minutes.
JOB_TIMEOUT = 60 * 180
@@ -244,7 +244,7 @@ def create_export(self, stream_type, fields, query):

        endpoint = self.get_bulk_endpoint(stream_type, "create")
        endpoint_name = "{}_create".format(stream_type)
        singer.log_info('Scheduling export job with query %s', query)
        singer.log_info('Scheduling export job for %s with query %s', stream_type, json.dumps(query))
        data = self.request("POST", endpoint, endpoint_name=endpoint_name, json=payload)
        return data["result"][0]["exportId"]

@@ -298,10 +298,6 @@ def get_export_status(self, stream_type, export_id):
        endpoint_name = "{}_poll".format(stream_type)
        return self.request("GET", endpoint, endpoint_name=endpoint_name)

    def poll_export(self, stream_type, export_id):
        # http://developers.marketo.com/rest-api/bulk-extract/#polling_job_status
        return self.get_export_status(stream_type, export_id)["result"][0]["status"]

    def stream_export(self, stream_type, export_id):
        # http://developers.marketo.com/rest-api/bulk-extract/#retrieving_your_data
        endpoint = self.get_bulk_endpoint(stream_type, "file", export_id)
@@ -313,7 +309,24 @@ def wait_for_export(self, stream_type, export_id):
        # exceeds the job timeout time.
        timeout_time = pendulum.utcnow().add(seconds=self.job_timeout)
        while pendulum.utcnow() < timeout_time:
            status = self.poll_export(stream_type, export_id)
            export_status = self.get_export_status(stream_type, export_id)["result"][0]
            status = export_status["status"]

            if status == "Completed":
                size_mb = export_status["fileSize"] / (1024 * 1024)
                rows = export_status["numberOfRecords"]
                wait_sec = pendulum.parse(export_status["startedAt"]) - pendulum.parse(export_status["queuedAt"])
                prep_sec = pendulum.parse(export_status["finishedAt"]) - pendulum.parse(export_status["startedAt"])
                summary = {
                    "file_size": "{size_mb:.2f} MB".format(size_mb=size_mb),
                    "num_records": rows,
                    "export_prep_time_sec": round(prep_sec.total_seconds()),
                    "export_wait_time_sec": round(wait_sec.total_seconds())
                }
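                # Example summary (hypothetical values):
                # {"file_size": "12.34 MB", "num_records": 250000,
                #  "export_prep_time_sec": 180, "export_wait_time_sec": 45}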
                msg = 'export %s status is %s with statistics %s'
                singer.log_info(msg, export_id, status, json.dumps(summary))
                return True

            singer.log_info("export %s status is %s", export_id, status)

            if status == "Created":
@@ -325,9 +338,6 @@ def wait_for_export(self, stream_type, export_id):
                # Cancelled and failed exports fail the current sync.
                raise ExportFailed(status)

            elif status == "Completed":
                return True

            time.sleep(self.poll_interval)

        raise ExportFailed("Export timed out after {} minutes".format(self.job_timeout / 60))