Skip to content

Commit

Permalink
feat: Add a --merge-state flag to meltano run to merge the curren…
Browse files Browse the repository at this point in the history
…t pipeline state with that of the latest run (meltano#8258)

* Removed --verbose from CLI

* Removed Verbose From CLI

* Resolved Build Error

* feat: Added a flag that tells Meltano whether to merge or overwrite state at the end of a run

* Remove global flag

* Update tests/meltano/core/runner/test_runner.py

* [pre-commit.ci] auto fixes from pre-commit.ci hooks

for more information, see https://pre-commit.ci

* Update docs

* Add option to `run`

* Add integration test

* Add test case to matrix

* Fix shebang

* Use --environment

* Link from docs to example

* Add state output to md example

* Improve wording a bit

* docs: Add a note explaining that setting a value with `--from-file` for objects and arrays expects valid JSON contents

Also fixed some typos.

---------

Co-authored-by: Rohan Arora <[email protected]>
Co-authored-by: Rohan Arora <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Dec 5, 2023
1 parent f2358d3 commit 8d86e65
Show file tree
Hide file tree
Showing 18 changed files with 426 additions and 22 deletions.
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
- { integration_test: "meltano-config", needs_postgres: false}
- { integration_test: "meltano-annotations", needs_postgres: false}
- { integration_test: "meltano-manifest", needs_postgres: false}
- { integration_test: "meltano-run-merge-states", needs_postgres: false}
- { integration_test: "meltano-expand-envvars-in-array", needs_postgres: false}
fail-fast: false

Expand Down
20 changes: 15 additions & 5 deletions docs/docs/reference/command-line-interface.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,16 @@ meltano add extractor this-will-be-ignored --from-ref tap-spotify--matatika.yml
# The above also applies to the plugin variant, if provided
meltano add extractor this-will-be-ignored --variant this-will-also-be-ignored --from-ref tap-spotify--matatika.yml

# Once added, the custom plugin defintion can be updated by removing the plugin
# Once added, the custom plugin definition can be updated by removing the plugin
# and re-adding it with the same `meltano add --from-ref` command
meltano remove extractor tap-spotify
meltano add extractor tap-spotify --from-ref tap-spotify--matatika.yml
```

Using `--from-ref` allows you to add a plugin before it is avilable on [Meltano Hub](https://hub.meltano.com/), such as during development or testing of a plugin. It can also be used to try out plugins that have their [definition](/concepts/project#custom-plugin-definitions) published an accessible at a public URL, external to the Hub.
Using `--from-ref` allows you to add a plugin before it is available on [Meltano Hub](https://hub.meltano.com/), such as during development or testing of a plugin. It can also be used to try out plugins that have their [definition](/concepts/project#custom-plugin-definitions) published an accessible at a public URL, external to the Hub.

:::note
Meltano will throw an error if the referenced plugin definiton is invalid or missing any required properties - see the [Meltano Hub plugin definition syntax](/reference/plugin-definition-syntax) for more information.
Meltano will throw an error if the referenced plugin definition is invalid or missing any required properties - see the [Meltano Hub plugin definition syntax](/reference/plugin-definition-syntax) for more information.
:::

By default, `meltano add` will attempt to install the plugin after adding it. Use `--no-install` to skip this behavior:
Expand Down Expand Up @@ -454,6 +454,10 @@ meltano config <plugin> set <name> --from-file ./file.txt
uuidgen | meltano config <plugin> set <name> --from-file -
```

:::info
<p>When setting a config value for an <code>object</code> or <code>array</code> setting, the file contents must be valid JSON.</p>
:::

## `docs`

Open the Meltano documentation site in the default browser.
Expand Down Expand Up @@ -495,6 +499,8 @@ meltano el <extractor> <loader> [--state-id TEXT]
- One or more `--select <entity>` options can be passed to only extract records for matching [selected entities](#select).
Similarly, `--exclude <entity>` can be used to extract records for all selected entities _except_ for those specified.

- A `--merge-state` flag can be passed to merge state with that of previous runs.

Notes:

- The entities that are currently selected for extraction can be discovered using [`meltano select --list <extractor>`](#select).
Expand Down Expand Up @@ -915,7 +921,7 @@ Run a set of command blocks in series.
Command blocks are specified as a list of plugin names, e.g. `meltano run some_tap some_mapping some_target some_plugin:some_cmd` and
are run in the order they are specified from left to right. A failure in any block will cause the entire run to abort.

Multiple commmand blocks can be chained together or repeated, and extractor/loader pairs will automatically be linked to perform EL work.
Multiple command blocks can be chained together or repeated, and extractor/loader pairs will automatically be linked to perform EL work.
If you have an active environment defined, a State ID is autogenerated for each extractor/loader pair and used to store and look up the [incremental replication state](/guide/integration#incremental-replication-state) in the [system database](/guide/production#storing-metadata).
This allows subsequent runs with the same extractor and loader combinations to start where the previous run ended.
The format of the generated id's is `<environment_name>:<tap_name>-to-<target_name>(:<state_id_suffix)`.
Expand Down Expand Up @@ -949,6 +955,7 @@ meltano run --state-id-suffix=<STATE_ID_SUFFIX> tap-gitlab target-postgres
- `--full-refresh` will force a full refresh and ignore the prior state. The new state after completion will still be updated with the execution results, unless `--no-state-update` is also specified.
- `--force` will force a job run even if a conflicting job with the same generated ID is in progress.
- `--state-id-suffix` define a custom suffix to generate a state ID with for each EL pair.
- `--merge-state` will merge state with that of previous runs. See the [example in the Meltano repository](https://github.com/meltano/meltano/blob/main/integration/example-library/meltano-run-merge-states/index.md).

Examples:

Expand All @@ -967,6 +974,9 @@ meltano --environment=dev run --force tap-gitlab target-postgres tap-salesforce
# run a pipeline with a custom state ID suffix
# the autogenerated ID for the EL pair will be 'dev:tap-gitlab-to-target-postgres:pipeline-alias'
meltano --environment=dev --state-id-suffix pipeline-alias run tap-gitlab hide-secrets target-postgres

# run a pipeline, merging state with that of previous runs.
meltano --environment=dev run --merge-state tap-gitlab target-postgres
```

### Using `run` with Environments
Expand Down Expand Up @@ -1437,7 +1447,7 @@ meltano state copy <src_state_id> <dst_state_id>
#### Examples
```bash
# Use prod state to update dev environemnt
# Use prod state to update dev environment
meltano state copy prod:tap-gitlab-to-target-jsonl dev:tap-gitlab-to-target-jsonl
```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/venv
/.meltano
.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
version: 1
default_environment: dev
project_id: a3e6d53c-8ccc-4cac-a89c-08b70120f243
environments:
- name: dev
- name: staging
- name: prod
send_anonymous_usage_stats: false
plugins:
extractors:
- name: tap-with-state
namespace: tap_with_state
variant: custom
executable: ./tap.py
capabilities:
- discover
- catalog
- state
settings:
- name: ts
kind: date_iso8601
description: Dummy timestamp
select: ["*.*"]
loaders:
- name: target-jsonl
variant: andyh1203
pip_url: target-jsonl
89 changes: 89 additions & 0 deletions integration/example-library/meltano-run-merge-states/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Get setup

This example shows how state from sequential invocations of `meltano run` can be merged together to create a state object that combines bookmarks from streams synced in different runs.

This is useful for when you want to backfill/refresh a single stream, without losing the state of other streams.

```shell
meltano install
```

## Without state merging (default)

### Extract all streams

```shell
TAP_WITH_STATE_TS='2023-01-01T00:00:00Z' \
meltano run tap-with-state target-jsonl --state-id-suffix=no-merge
```

### Extract a single stream

Run a 'full refresh' pipeline of a single stream.

```shell
TAP_WITH_STATE_TS='2023-01-01T01:00:00Z' \
TAP_WITH_STATE__SELECT_FILTER='["stream_1"]' \
meltano run tap-with-state target-jsonl --full-refresh --state-id-suffix=no-merge
```

Note that the state will only contain the bookmark for `stream_1`.

```shell
meltano --environment=dev state get dev:tap-with-state-to-target-jsonl:no-merge
```

```json
{
"singer_state": {
"bookmarks": {
"stream_1": {
"created_at": "2023-01-01T01:00:00Z"
}
}
}
}
```

## With state merging

### Extract all streams

```shell
TAP_WITH_STATE_TS='2023-01-01T00:00:00Z' \
meltano run tap-with-state target-jsonl --state-id-suffix=merge
```

### Filter a single stream, merging states

Run a 'full refresh' pipeline of a single stream, but merge the current pipelines state with the latest stored state.

```shell
TAP_WITH_STATE_TS='2023-01-01T01:00:00Z' \
TAP_WITH_STATE__SELECT_FILTER='["stream_1"]' \
meltano run tap-with-state target-jsonl --full-refresh --state-id-suffix=merge --merge-state
```

Note that the state will now contain both the new bookmark for `stream_1` and the old bookmarks for the other streams.

```shell
meltano --environment=dev state get dev:tap-with-state-to-target-jsonl:merge
```

```json
{
"singer_state": {
"bookmarks": {
"stream_1": {
"created_at": "2023-01-01T01:00:00Z"
},
"stream_2": {
"created_at": "2023-01-01T00:00:00Z"
},
"stream_3": {
"created_at": "2023-01-01T00:00:00Z"
}
}
}
}
```
27 changes: 27 additions & 0 deletions integration/example-library/meltano-run-merge-states/meltano.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
version: 1
default_environment: dev
project_id: a3e6d53c-8ccc-4cac-a89c-08b70120f243
environments:
- name: dev
- name: staging
- name: prod
send_anonymous_usage_stats: false
plugins:
extractors:
- name: tap-with-state
namespace: tap_with_state
variant: custom
executable: ./tap.py
capabilities:
- discover
- catalog
- state
settings:
- name: ts
kind: date_iso8601
description: Dummy timestamp
select: ["*.*"]
loaders:
- name: target-jsonl
variant: andyh1203
pip_url: target-jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"plugin_type": "loaders",
"name": "target-jsonl",
"namespace": "target_jsonl",
"variant": "andyh1203",
"label": "JSON Lines (JSONL)",
"docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203",
"repo": "https://github.com/andyh1203/target-jsonl",
"pip_url": "target-jsonl",
"description": "JSONL loader",
"logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png",
"settings": [
{
"name": "destination_path",
"kind": "string",
"value": "output",
"label": "Destination Path",
"description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n"
},
{
"name": "do_timestamp_file",
"kind": "boolean",
"value": false,
"label": "Include Timestamp in File Names",
"description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n"
},
{
"name": "custom_name",
"kind": "string",
"label": "Custom File Name Override",
"description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
}
]
}
Loading

0 comments on commit 8d86e65

Please sign in to comment.