Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[docs] Add docs for reworked Fivetran API #26026

Open
wants to merge 7 commits into
base: maxime/move-fivetran-docs-to-legacy-page
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions docs/content/integrations/fivetran/fivetran.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
---
title: "Using Dagster with Fivetran"
description: Represent your Fivetran connectors in Dagster
---

# Using Dagster with Fivetran

This guide provides instructions for using Dagster with Fivetran using the `dagster-fivetran` library. Your Fivetran connector tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Fivetran assets and data assets you are already modeling in Dagster. You can also use Dagster to orchestrate Fivetran connectors, allowing you to trigger syncs for these on a cadence or based on upstream data changes.

## What you'll learn

- How to represent Fivetran assets in the Dagster asset graph, including lineage to other Dagster assets.
- How to customize asset definition metadata for these Fivetran assets.
- How to materialize Fivetran connector tables from Dagster.
- How to customize how Fivetran connector tables are materialized.

<details>
<summary>Prerequisites</summary>

- The `dagster` and `dagster-fivetran` libraries installed in your environment
- Familiarity with asset definitions and the Dagster asset graph
- Familiarity with Dagster resources
- Familiarity with Fivetran concepts, like connectors and connector tables
- A Fivetran workspace
- A Fivetran API key and API secret. For more information, see [Getting Started](https://fivetran.com/docs/rest-api/getting-started) in the Fivetran REST API documentation.

</details>

## Represent Fivetran assets in the asset graph

To load Fivetran assets into the Dagster asset graph, you must first construct a <PyObject module="dagster_fivetran" object="FivetranWorkspace" /> resource, which allows Dagster to communicate with your Fivetran workspace. You'll need to supply your account ID, API key and API secret. See [Getting Started](https://fivetran.com/docs/rest-api/getting-started) in the Fivetran REST API documentation for more information on how to create your API key and API secret.

Dagster can automatically load all connector tables from your Fivetran workspace as asset specs. Call the <PyObject module="dagster_fivetran" method="load_fivetran_asset_specs" /> function, which returns list of <PyObject object="AssetSpec" />s representing your Fivetran assets. You can then include these asset specs in your <PyObject object="Definitions" /> object:

```python file=/integrations/fivetran/representing-fivetran-assets.py
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(
assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
)
```

### Sync and materialize Fivetran assets

You can use Dagster to sync Fivetran connectors and materialize Fivetran connector tables. You can use the <PyObject module="dagster_fivetran" method="fivetran_assets" /> decorator to create the assets definition for a given connector, or the <PyObject module="dagster_fivetran" method="build_fivetran_assets_definitions" /> factory to create all assets definitions for your Fivetran workspace.

```python file=/integrations/fivetran/sync-and-materialize-fivetran-assets.py
from dagster_fivetran import (
FivetranWorkspace,
build_fivetran_assets_definitions,
fivetran_assets,
)

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


# Creating assets definition for a given connector using the `@fivetran_assets` decorator
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
yield from fivetran.sync_and_poll(context=context)


# Alternatively, creating all assets definitions for the Fivetran workspace
# using the `build_fivetran_assets_definitions` factory
all_fivetran_assets = build_fivetran_assets_definitions(fivetran_workspace)

defs = dg.Definitions(
assets=[*all_fivetran_assets],
resources={"fivetran": fivetran_workspace},
)
```

### Customize asset definition metadata for Fivetran assets

By default, Dagster will generate asset specs for each Fivetran asset and populate default metadata. You can further customize asset properties by passing a custom <PyObject module="dagster_fivetran" object="DagsterFivetranTranslator" /> subclass to the <PyObject module="dagster_fivetran" method="load_fivetran_asset_specs" /> function, the <PyObject module="dagster_fivetran" method="fivetran_assets" /> decorator or the <PyObject module="dagster_fivetran" method="build_fivetran_assets_definitions" /> factory.

```python file=/integrations/fivetran/customize-fivetran-asset-defs.py
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
build_fivetran_assets_definitions,
fivetran_assets,
load_fivetran_asset_specs,
)

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


# A translator class lets us customize properties of the built
# Fivetran assets, such as the owners or asset key
class MyCustomFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(props)
# We customize the metadata, asset key prefix and team owner tag for all assets
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
owners=["team:my_team"],
).merge_attributes(metadata={"custom": "metadata"})


# When loading asset specs
fivetran_specs = load_fivetran_asset_specs(
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)


# Alternatively, when creating assets definition for a given connector using the `@fivetran_assets` decorator
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
dagster_fivetran_translator=MyCustomFivetranTranslator(),
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
yield from fivetran.sync_and_poll(context=context)


# Alternatively, when creating all assets definitions for the Fivetran workspace
# using the `build_fivetran_assets_definitions` factory
all_fivetran_assets = build_fivetran_assets_definitions(
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)

defs = dg.Definitions(
assets=[*all_fivetran_assets], resources={"fivetran": fivetran_workspace}
)
```

Note that `super()` is called in each of the overridden methods to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.

### Load Fivetran assets from multiple workspaces

Definitions from multiple Fivetran workspaces can be combined by instantiating multiple <PyObject module="dagster_fivetran" object="FivetranWorkspace" /> resources and merging their specs. This lets you view all your Fivetran assets in a single asset graph:

```python file=/integrations/fivetran/multiple-fivetran-workspaces.py
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

sales_fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_SALES_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_SALES_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_SALES_API_SECRET"),
)
marketing_fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_MARKETING_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_MARKETING_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_MARKETING_API_SECRET"),
)

sales_fivetran_specs = load_fivetran_asset_specs(sales_fivetran_workspace)
marketing_fivetran_specs = load_fivetran_asset_specs(marketing_fivetran_workspace)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
assets=[*sales_fivetran_specs, *marketing_fivetran_specs],
resources={
"marketing_fivetran": marketing_fivetran_workspace,
"sales_fivetran": sales_fivetran_workspace,
},
)
```
29 changes: 16 additions & 13 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-fivetran.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,31 @@ This library provides a Dagster integration with `Fivetran <https://www.fivetran
.. currentmodule:: dagster_fivetran


Resources
=========
Assets (Fivetran API)
====================

.. autoconfigurable:: FivetranResource
:annotation: ResourceDefinition

Assets
======
.. autoclass:: FivetranWorkspace

.. autofunction:: load_assets_from_fivetran_instance

.. autofunction:: build_fivetran_assets
.. autoclass:: DagsterFivetranTranslator

.. autodecorator:: fivetran_assets

Ops
===
.. autofunction:: load_fivetran_asset_specs

.. autoconfigurable:: fivetran_sync_op
.. autofunction:: build_fivetran_assets_definitions


Legacy
======

.. autoconfigurable:: fivetran_resource
:annotation: ResourceDefinition

.. autoconfigurable:: FivetranResource
:annotation: ResourceDefinition

.. autofunction:: load_assets_from_fivetran_instance

.. autofunction:: build_fivetran_assets

.. autoconfigurable:: fivetran_sync_op
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
build_fivetran_assets_definitions,
fivetran_assets,
load_fivetran_asset_specs,
)

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


# A translator class lets us customize properties of the built
# Fivetran assets, such as the owners or asset key
class MyCustomFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(props)
# We customize the metadata, asset key prefix and team owner tag for all assets
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
owners=["team:my_team"],
).merge_attributes(metadata={"custom": "metadata"})


# When loading asset specs
fivetran_specs = load_fivetran_asset_specs(
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)


# Alternatively, when creating assets definition for a given connector using the `@fivetran_assets` decorator
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
dagster_fivetran_translator=MyCustomFivetranTranslator(),
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
yield from fivetran.sync_and_poll(context=context)


# Alternatively, when creating all assets definitions for the Fivetran workspace
# using the `build_fivetran_assets_definitions` factory
all_fivetran_assets = build_fivetran_assets_definitions(
fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)

defs = dg.Definitions(
assets=[*all_fivetran_assets], resources={"fivetran": fivetran_workspace}
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

sales_fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_SALES_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_SALES_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_SALES_API_SECRET"),
)
marketing_fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_MARKETING_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_MARKETING_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_MARKETING_API_SECRET"),
)

sales_fivetran_specs = load_fivetran_asset_specs(sales_fivetran_workspace)
marketing_fivetran_specs = load_fivetran_asset_specs(marketing_fivetran_workspace)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
assets=[*sales_fivetran_specs, *marketing_fivetran_specs],
resources={
"marketing_fivetran": marketing_fivetran_workspace,
"sales_fivetran": sales_fivetran_workspace,
},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(
assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from dagster_fivetran import (
FivetranWorkspace,
build_fivetran_assets_definitions,
fivetran_assets,
)

import dagster as dg

fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


# Creating assets definition for a given connector using the `@fivetran_assets` decorator
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(
context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
yield from fivetran.sync_and_poll(context=context)


# Alternatively, creating all assets definitions for the Fivetran workspace
# using the `build_fivetran_assets_definitions` factory
all_fivetran_assets = build_fivetran_assets_definitions(fivetran_workspace)

defs = dg.Definitions(
assets=[*all_fivetran_assets],
resources={"fivetran": fivetran_workspace},
)
Loading