Skip to content

Commit

Permalink
Allow distinct source and target projects (#294)
Browse files Browse the repository at this point in the history
* use target.project for destination

* update project var to source_project

* dynamic source based on combined_dataset var

* doc updates

* conditionally apply prehook

* fix quotes

* dynamically swap source project as well as dataset
  • Loading branch information
adamribaudo-velir authored Jan 4, 2024
1 parent ac3e8c3 commit 6c6d756
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,25 @@ This package assumes that you have an existing DBT project with a BigQuery profi
```
vars:
ga4:
project: "your_gcp_project"
dataset: "your_ga4_dataset"
source_project: "my_source_gcp_project" # Project that contains raw GA4 data
property_ids: [11111111] # Array of properties to process
start_date: "YYYYMMDD" # Earliest date to load
static_incremental_days: 3 # Number of days to scan and reprocess on each run
```

## Required Variables (Multi-Project Instance)

When processing multiple properties at a time, the required variables change slightly. See [Multi-Property Support](#multi-property-support) section for details on configuring multiple GA4 properties as a source.

```
vars:
ga4:
source_project: "my_source_gcp_project" # Project that contains raw GA4 data
combined_dataset: "my_combined_data" # Dataset where multi-property data is cloned
    property_ids: [11111111, 22222222] # Array of properties to process
start_date: "YYYYMMDD" # Earliest date to load
static_incremental_days: 3 # Number of days to scan and reprocess on each run
```
See [Multi-Property Support](#multi-property-support) section for details on configuring multiple GA4 properties as a source.

## Optional Variables

Expand Down Expand Up @@ -295,14 +308,14 @@ Overriding the package's default channel mapping makes use of dbt's dispatch ove

# Multi-Property Support

Multiple GA4 properties are supported by listing out the project IDs in the `property_ids` variable. In this scenario, the `static_incremental_days` variable is required and the `dataset` variable will define the target dataset where source data will be copied.
Multiple GA4 properties are supported by listing out the property IDs in the `property_ids` variable. In this scenario, the `static_incremental_days` variable is required and the `combined_dataset` variable will define the dataset (in your profile's target project) where source data will be copied.

```
vars:
ga4:
property_ids: [11111111, 22222222, 33333333]
static_incremental_days: 3
dataset: "my_combined_dataset"
combined_dataset: "my_combined_dataset"
```

With these variables set, the `combine_property_data` macro will run as a pre-hook to `base_ga4_events` and clone shards to the target dataset. The number of days' worth of data to clone during incremental runs will be based on the `static_incremental_days` variable.
Expand Down
12 changes: 6 additions & 6 deletions macros/combine_property_data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

{% macro default__combine_property_data() %}

create schema if not exists `{{var('project')}}.{{var('dataset')}}`;
create schema if not exists `{{target.project}}.{{var('combined_dataset')}}`;

{# If incremental, then use static_incremental_days variable to find earliest shard to copy #}
{% if not should_full_refresh() %}
Expand All @@ -18,20 +18,20 @@
{% for property_id in var('property_ids') %}
{%- set schema_name = "analytics_" + property_id|string -%}
{# Copy intraday tables #}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('project')) -%}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_intraday_%', database=var('source_project')) -%}
{% for relation in relations %}
{%- set relation_suffix = relation.identifier|replace('events_intraday_', '') -%}
{%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
CREATE OR REPLACE TABLE `{{var('project')}}.{{var('dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_intraday_{{relation_suffix}}`;
{%- endif -%}
{% endfor %}
{# Copy daily tables and drop old intraday table #}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('project')) -%}
{%- set relations = dbt_utils.get_relations_by_pattern(schema_pattern=schema_name, table_pattern='events_%', exclude='events_intraday_%', database=var('source_project')) -%}
{% for relation in relations %}
{%- set relation_suffix = relation.identifier|replace('events_', '') -%}
{%- if relation_suffix|int >= earliest_shard_to_retrieve|int -%}
CREATE OR REPLACE TABLE `{{var('project')}}.{{var('dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
DROP TABLE IF EXISTS `{{var('project')}}.{{var('dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
CREATE OR REPLACE TABLE `{{target.project}}.{{var('combined_dataset')}}.events_{{relation_suffix}}{{property_id}}` CLONE `{{var('source_project')}}.analytics_{{property_id}}.events_{{relation_suffix}}`;
DROP TABLE IF EXISTS `{{target.project}}.{{var('combined_dataset')}}.events_intraday_{{relation_suffix}}{{property_id}}`;
{%- endif -%}
{% endfor %}
{% endfor %}
Expand Down
2 changes: 1 addition & 1 deletion models/staging/base/base_ga4__events.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

{{
config(
pre_hook="{{ ga4.combine_property_data() }}" if var('property_ids', false) else "",
pre_hook="{{ ga4.combine_property_data() }}" if var('combined_dataset', false) else "",
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by={
Expand Down
10 changes: 8 additions & 2 deletions models/staging/src_ga4.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ version: 2

sources:
- name: ga4
database: "{{var('project')}}"
schema: "{{var('dataset')}}"
database: | # Source from target.project if multi-property, otherwise source from source_project
{%- if var('combined_dataset', false) != false -%} {{target.project}}
{%- else -%} {{var('source_project')}}
{%- endif -%}
schema: | # Source from combined property dataset if set, otherwise source from original GA4 property
{%- if var('combined_dataset', false) != false -%} {{var('combined_dataset')}}
{%- else -%} analytics_{{var('property_ids')[0]}}
{%- endif -%}
tables:
- name: events
identifier: events_* # Scan across all sharded event tables. Use the 'start_date' variable to limit this scan
Expand Down

0 comments on commit 6c6d756

Please sign in to comment.