Skip to content

Commit

Permalink
feat: Support partitioning, clustering, and protection properties for BQ tables (#116)
Browse files Browse the repository at this point in the history

* feat: modify BQ table template to use partitioning and clustering

* added BQ table optional properties on pipeline.yaml sample

* added deletion_protection property for BQ tables

* tests for new optional BQ table properties

* fix: partition Google Trends tables (#118)
  • Loading branch information
adlersantos authored Jul 12, 2021
1 parent 0c08a1f commit 288c5a2
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 4 deletions.
80 changes: 80 additions & 0 deletions datasets/google_trends/_terraform/top_terms_pipeline.tf
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,47 @@ resource "google_bigquery_table" "top_terms" {
table_id = "top_terms"

description = "Daily top 25 terms in the United States with score, ranking, time, and designated market area"
time_partitioning {
type = "DAY"

field = "refresh_date"

require_partition_filter = false
}


schema = <<EOF
[
{
"name": "rank",
"type": "INTEGER"
},
{
"name": "refresh_date",
"type": "DATE"
},
{
"name": "dma_name",
"type": "STRING"
},
{
"name": "dma_id",
"type": "INTEGER"
},
{
"name": "term",
"type": "STRING"
},
{
"name": "week",
"type": "DATE"
},
{
"name": "score",
"type": "INTEGER"
}
]
EOF
depends_on = [
google_bigquery_dataset.google_trends
]
Expand All @@ -41,7 +81,47 @@ resource "google_bigquery_table" "top_rising_terms" {
table_id = "top_rising_terms"

description = "Daily top rising terms in the United States with score, ranking, time, and designated market area"
time_partitioning {
type = "DAY"

field = "refresh_date"

require_partition_filter = false
}


schema = <<EOF
[
{
"name": "rank",
"type": "INTEGER"
},
{
"name": "refresh_date",
"type": "DATE"
},
{
"name": "dma_name",
"type": "STRING"
},
{
"name": "dma_id",
"type": "INTEGER"
},
{
"name": "term",
"type": "STRING"
},
{
"name": "week",
"type": "DATE"
},
{
"name": "score",
"type": "INTEGER"
}
]
EOF
depends_on = [
google_bigquery_dataset.google_trends
]
Expand Down
70 changes: 70 additions & 0 deletions datasets/google_trends/top_terms/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,80 @@ resources:
- type: bigquery_table
table_id: top_terms
description: "Daily top 25 terms in the United States with score, ranking, time, and designated market area"
time_partitioning:
type: "DAY"
field: "refresh_date"
require_partition_filter: false
schema: |-
[
{
"name": "rank",
"type": "INTEGER"
},
{
"name": "refresh_date",
"type": "DATE"
},
{
"name": "dma_name",
"type": "STRING"
},
{
"name": "dma_id",
"type": "INTEGER"
},
{
"name": "term",
"type": "STRING"
},
{
"name": "week",
"type": "DATE"
},
{
"name": "score",
"type": "INTEGER"
}
]
- type: bigquery_table
table_id: top_rising_terms
description: "Daily top rising terms in the United States with score, ranking, time, and designated market area"
time_partitioning:
type: "DAY"
field: "refresh_date"
require_partition_filter: false
schema: |-
[
{
"name": "rank",
"type": "INTEGER"
},
{
"name": "refresh_date",
"type": "DATE"
},
{
"name": "dma_name",
"type": "STRING"
},
{
"name": "dma_id",
"type": "INTEGER"
},
{
"name": "term",
"type": "STRING"
},
{
"name": "week",
"type": "DATE"
},
{
"name": "score",
"type": "INTEGER"
}
]
dag:
initialize:
Expand Down
22 changes: 22 additions & 0 deletions samples/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,31 @@ resources:
# table_id
table_id: PIPELINE_FOLDER_NAME

# Optional Properties:
# Description of the table
description: "This is a table description."

# Time-based partitioning configuration. There is no need for this property
# if you have a relatively small dataset to host on a BigQuery table.
time_partitioning:

# The supported types are DAY, HOUR, MONTH, and YEAR, which will generate one partition per day, hour, month, and year, respectively.
type: "DAY"

# If set to true, queries over this table require a partition filter that can be used for partition elimination to be specified.
require_partition_filter: false

# Specifies column names to use for data clustering. Up to four top-level columns are allowed, and should be specified in descending priority order.
clustering:
- "column_1"
- "column_2"
- "column_3"

# The table cannot be deleted without first disabling this property.
# Unless this field is set to false in Terraform state, a `terraform destroy`
# or `terraform apply` that would delete the table will fail.
deletion_protection: true

dag:
# The DAG acronym stands for directed acyclic graph. This block represents
# your data pipeline along with every property and configuration it needs to
Expand Down
13 changes: 13 additions & 0 deletions templates/terraform/google_bigquery_table.tf.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ resource "google_bigquery_table" "{{ tf_resource_name }}" {
{% if description -%}
description = {{ description|tojson }}
{%- endif %}
{% if time_partitioning -%}
time_partitioning {
{%- for key, val in time_partitioning.items() %}
{{ key }} = {{ val|tojson }}
{% endfor -%}
}
{%- endif %}
{% if clustering -%}
clustering = {{ clustering|tojson }}
{%- endif %}
{% if deletion_protection -%}
deletion_protection = {{ deletion_protection|tojson }}
{%- endif %}
{% if schema -%}
schema = <<EOF
{{ schema }}
Expand Down
20 changes: 16 additions & 4 deletions tests/scripts/test_generate_terraform.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ def test_dataset_tf_has_no_bq_dataset_description_when_unspecified(
assert not re.search(r"description\s+\=", result.group(1))


def test_pipeline_tf_contains_bq_table_description_when_specified(
def test_pipeline_tf_contains_optional_properties_when_specified(
dataset_path,
pipeline_path,
project_id,
Expand Down Expand Up @@ -571,10 +571,13 @@ def test_pipeline_tf_contains_bq_table_description_when_specified(
)
assert bq_table
assert bq_table["description"]
assert bq_table["time_partitioning"]
assert bq_table["clustering"]
assert bq_table["deletion_protection"]

# Match the "google_bigquery_table" properties, i.e. any lines between the
# curly braces, in the *_pipeline.tf file
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)\}"
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)^\}"
bq_table_tf_string = re.compile(regexp, flags=re.MULTILINE | re.DOTALL)

for path_prefix in (
Expand All @@ -587,9 +590,12 @@ def test_pipeline_tf_contains_bq_table_description_when_specified(

assert re.search(r"table_id\s+\=", result.group(1))
assert re.search(r"description\s+\=", result.group(1))
assert re.search(r"time_partitioning\s+\{", result.group(1))
assert re.search(r"clustering\s+\=", result.group(1))
assert re.search(r"deletion_protection\s+\=", result.group(1))


def test_pipeline_tf_has_no_bq_table_description_when_unspecified(
def test_pipeline_tf_has_no_optional_properties_when_unspecified(
dataset_path,
pipeline_path,
project_id,
Expand All @@ -608,6 +614,9 @@ def test_pipeline_tf_has_no_bq_table_description_when_unspecified(
(r for r in config["resources"] if r["type"] == "bigquery_table"), None
)
del bq_table["description"]
del bq_table["time_partitioning"]
del bq_table["clustering"]
del bq_table["deletion_protection"]
with open(pipeline_path / "pipeline.yaml", "w") as file:
yaml.dump(config, file)

Expand All @@ -624,7 +633,7 @@ def test_pipeline_tf_has_no_bq_table_description_when_unspecified(

# Match the "google_bigquery_table" properties, i.e. any lines between the
# curly braces, in the *_pipeline.tf file
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)\}"
regexp = r"\"google_bigquery_table\" \"" + bq_table["table_id"] + r"\" \{(.*?)^\}"
bq_table_tf_string = re.compile(regexp, flags=re.MULTILINE | re.DOTALL)

for path_prefix in (
Expand All @@ -637,6 +646,9 @@ def test_pipeline_tf_has_no_bq_table_description_when_unspecified(

assert re.search(r"table_id\s+\=", result.group(1))
assert not re.search(r"description\s+\=", result.group(1))
assert not re.search(r"time_partitioning\s+\{", result.group(1))
assert not re.search(r"clustering\s+\=", result.group(1))
assert not re.search(r"deletion_protection\s+\=", result.group(1))


def test_bq_table_can_have_a_description_with_newlines_and_quotes(
Expand Down

0 comments on commit 288c5a2

Please sign in to comment.