Commit 33112d1

Merge pull request #110 from schireson/dc/allow-null-backup-name-field

DanCardin authored Jul 6, 2023 · 2 parents 2e1454f + a5a670f

Showing 9 changed files with 90 additions and 20 deletions.
15 changes: 15 additions & 0 deletions docs/source/config/table.md
@@ -47,6 +47,21 @@ tables:
       - '*.*'
 ```
+
+````{note}
+`name` **can** also be omitted entirely, with some caveats. The "name" field
+populates the `{table}` value templated into queries and location paths (**both**
+of which include `{table}` by default).
+
+Thus, if you omit the "name" field, you must also provide concrete "query"
+and "location" fields.
+
+```yaml
+tables:
+  - query: select * from for_example_a_view
+    location: backups/public.for_example_a_view
+```
+````
 
 ### Globbing
 
 Using common globbing rules:
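For comparison, the note above leans on the default templates shown later in `src/databudgie/config.py`: a named entry gets `{table}` filled in from `name`, while an unnamed entry must spell both fields out. A sketch of the two forms side by side:

```yaml
tables:
  # Named: `{table}` is filled from `name` by the default templates,
  # so query resolves to `select * from public.customer`
  # and location resolves to `backups/public.customer`.
  - name: public.customer

  # Unnamed: there is no `name` to template, so both fields are concrete.
  - query: select * from for_example_a_view
    location: backups/public.for_example_a_view
```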
3 changes: 3 additions & 0 deletions src/databudgie/adapter/base.py
@@ -173,6 +173,9 @@ def materialize_table_dependencies(
     tables = set()
     dependent_table_ops = []
     for table_op in table_ops:
+        if table_op.full_name is None:
+            continue
+
         tables.add(table_op.full_name)
 
         if not table_op.raw_conf.follow_foreign_keys:
22 changes: 11 additions & 11 deletions src/databudgie/backup.py
@@ -85,7 +85,7 @@ def backup_ddl(
 
     for table_op in table_ops:
         schema_op = table_op.schema_op()
-        if schema_op.name in schemas:
+        if not schema_op or schema_op.name in schemas:
             continue
 
         if not table_op.raw_conf.ddl:
@@ -115,10 +115,10 @@ def backup_ddl(
     console.info("Finished backing up schema DDL")
 
     for table_op in table_ops:
-        if not table_op.raw_conf.ddl:
+        if not table_op.full_name or not table_op.raw_conf.ddl:
             continue
 
-        progress.update(task, description=f"Backing up DDL: {table_op.full_name}")
+        progress.update(task, description=f"Backing up DDL: {table_op.pretty_name}")
         result = adapter.export_table_ddl(table_op.full_name)
 
         filename = storage.write_buffer(
@@ -128,7 +128,7 @@
             name=table_op.full_name,
         )
 
-        console.trace(f"Uploaded {table_op.full_name} to {filename}")
+        console.trace(f"Uploaded {table_op.pretty_name} to {filename}")
         table_names.append(table_op.full_name)
 
     console.info("Finished backing up DDL")
@@ -158,11 +158,11 @@ def backup_sequences(
     task = progress.add_task("Backing up sequence positions", total=len(table_ops))
 
     for table_op in table_ops:
-        progress.update(task, description=f"Backing up sequence position: {table_op.full_name}")
-
-        if not table_op.raw_conf.sequences:
+        if not table_op.full_name or not table_op.raw_conf.sequences:
             continue
 
+        progress.update(task, description=f"Backing up sequence position: {table_op.pretty_name}")
+
         sequences = table_sequences.get(table_op.full_name)
         if not sequences:
             continue
@@ -182,7 +182,7 @@
             name=table_op.full_name,
         )
 
-        console.trace(f"Wrote {table_op.full_name} sequences to {filename}")
+        console.trace(f"Wrote {table_op.pretty_name} sequences to {filename}")
 
     console.info("Finished backing up sequence positions")
 
@@ -198,7 +198,7 @@
     task = progress.add_task("Backing up tables", total=len(table_ops))
 
     for table_op in table_ops:
-        progress.update(task, description=f"Backing up table: {table_op.full_name}")
+        progress.update(task, description=f"Backing up table: {table_op.pretty_name}")
 
         if not table_op.raw_conf.data:
             continue
@@ -233,7 +233,7 @@ def backup(
     path = table_op.location()
 
     if table_op.raw_conf.skip_if_exists and storage.path_exists(path):
-        console.trace(f"Skipping {table_op.full_name} due to `skip_if_exists`")
+        console.trace(f"Skipping {table_op.pretty_name} due to `skip_if_exists`")
         return
 
     buffer = adapter.export_query(table_op.query())
@@ -245,4 +245,4 @@
         path, buffer, file_type=FileTypes.data, name=table_op.full_name, compression=compression
     )
 
-    console.trace(f"Uploaded {table_op.full_name} to {filename}")
+    console.trace(f"Uploaded {table_op.pretty_name} to {filename}")
2 changes: 1 addition & 1 deletion src/databudgie/config.py
@@ -236,7 +236,7 @@ def from_collection(cls, collection: list | dict | None) -> dict[str, Connection
 
 @dataclass
 class BackupTableConfig(Config):
-    name: str
+    name: str | None = None
     location: str = "backups/{table}"
     query: str = "select * from {table}"
     compression: str | None = None
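A minimal sketch of what the relaxed field allows, assuming `BackupTableConfig` can be constructed directly with keyword arguments and that its remaining (unshown) fields are likewise defaulted:

```python
from databudgie.config import BackupTableConfig

# `name` may now be omitted; it defaults to None.
unnamed = BackupTableConfig(
    query="select * from for_example_a_view",
    location="backups/public.for_example_a_view",
)
assert unnamed.name is None

# A named entry keeps the `{table}`-templated defaults.
named = BackupTableConfig(name="public.customer")
assert named.location == "backups/{table}"
```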
8 changes: 5 additions & 3 deletions src/databudgie/restore.py
@@ -113,7 +113,7 @@ def restore_all_ddl(
 
     for table_op in table_ops:
         schema_op = table_op.schema_op()
-        if schema_op.name in schema_names:
+        if not schema_op or schema_op.name in schema_names:
             continue
 
         if not table_op.raw_conf.ddl:
@@ -199,7 +199,7 @@ def truncate_tables(table_ops: Sequence[TableOp], adapter: Adapter, console: Con
     for table_op in table_ops:
         data = table_op.raw_conf.data
         truncate = table_op.raw_conf.truncate
-        if not data or not truncate:
+        if not data or not truncate or table_op.full_name is None:
             continue
 
         progress.update(task, description=f"[trace]Truncating {table_op.full_name}[/trace]", advance=1)
@@ -220,7 +220,7 @@ def restore_tables(
     task = progress.add_task("Restoring tables", total=len(table_ops))
 
     for table_op in table_ops:
-        if not table_op.raw_conf.data:
+        if not table_op.full_name or not table_op.raw_conf.data:
             continue
 
         progress.update(task, description=f"Restoring table: {table_op.full_name}")
@@ -246,6 +246,8 @@ def restore(
     console: Console = default_console,
 ) -> None:
     """Restore a CSV file from S3 to the database."""
+    assert table_op.full_name
+
     # Force table_name to be fully qualified
     schema, table = parse_table(table_op.full_name)
     table_name = f"{schema}.{table}"
27 changes: 23 additions & 4 deletions src/databudgie/table_op.py
@@ -41,9 +41,9 @@ class TableOp(Generic[T]):
     table matching that criteria.
     """
 
-    schema: str
-    table_name: str
-    full_name: str
+    schema: str | None
+    table_name: str | None
+    full_name: str | None
     raw_conf: T
 
     @classmethod
@@ -61,9 +61,20 @@ def query(self) -> str:
 
         return query.format(table=self.full_name)
 
-    def schema_op(self) -> SchemaOp:
+    def schema_op(self) -> SchemaOp | None:
+        if self.schema is None:
+            return None
+
         return SchemaOp(self.schema, self.raw_conf)
 
+    @property
+    def pretty_name(self) -> str:
+        if self.full_name:
+            return self.full_name
+
+        assert isinstance(self.raw_conf, BackupTableConfig)
+        return self.raw_conf.query
+
 
 def expand_table_ops(
     session: Session,
@@ -89,8 +100,13 @@
 
     # expand table globs into fully qualified mappings to the config.
     matching_tables: dict[str, list[T]] = {}
+    unnamed_tables: list[T] = []
     for table_conf in tables:
         pattern = table_conf.name
+        if pattern is None:
+            unnamed_tables.append(table_conf)
+            continue
+
         if "." not in pattern:
             pattern = f"{default_schema_name}.{pattern}"
 
@@ -128,4 +144,7 @@
         table_op = TableOp.from_name(table, raw_conf=table_conf)
         result.append(table_op)
 
+    for unnamed_table in unnamed_tables:
+        result.append(TableOp(None, None, None, unnamed_table))
+
     return result
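In miniature, the expansion now makes two passes: named configs glob-expand against the live table list as before, while unnamed configs bypass matching entirely and emerge as ops whose identity fields are all `None`. A sketch using hypothetical stand-in types (`Conf`, `expand`), not databudgie's signatures:

```python
from dataclasses import dataclass
from fnmatch import fnmatch


@dataclass
class Conf:
    name: str | None = None
    query: str = "select * from {table}"


def expand(confs: list[Conf], live_tables: list[str]) -> list[tuple[str | None, Conf]]:
    named, unnamed = [], []
    for conf in confs:
        (unnamed if conf.name is None else named).append(conf)

    result: list[tuple[str | None, Conf]] = []
    for conf in named:
        # Bare names glob against the default schema, as in the real code.
        pattern = conf.name if "." in conf.name else f"public.{conf.name}"
        result.extend((t, conf) for t in live_tables if fnmatch(t, pattern))

    # Unnamed configs cannot match live tables; they pass through unkeyed.
    result.extend((None, conf) for conf in unnamed)
    return result


print(expand([Conf(name="public.*"), Conf(query="select * from a_view")],
             ["public.customer", "other.thing"]))
# -> [('public.customer', Conf(...)), (None, Conf(...))]
```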
4 changes: 4 additions & 0 deletions tests/conftest.py
@@ -1,3 +1,5 @@
+import logging
+
 import boto3
 import pytest
 from freezegun import freeze_time
@@ -7,6 +9,8 @@
 from databudgie.config import RootConfig
 from tests.mockmodels.models import Base
 
+logging.basicConfig(level="INFO")
+
 
 @pytest.fixture(scope="session")
 def pmr_postgres_config():
20 changes: 20 additions & 0 deletions tests/test_backup.py
@@ -177,6 +177,26 @@ def test_backup_failure(pg):
     assert console.call_count == 2
 
 
+def test_backup_unnamed_table(pg, mf, s3_resource):
+    """Validate that an unnamed table can be backed up."""
+    customer = mf.customer.new(external_id="cid_123")
+
+    config = RootConfig.from_dict(
+        {
+            "backup": {
+                "location": "s3://sample-bucket/databudgie/test/{table}",
+                "tables": [{"query": "select * from public.customer"}],
+                **s3_config,
+            },
+        }
+    )
+    backup_all(pg, config.backup)
+
+    _validate_backup_contents(
+        get_file_buffer("s3://sample/databudgie/test/None/2021-04-26T09:00:00.csv", s3_resource), [customer]
+    )
+
+
 def _validate_backup_contents(buffer, expected_contents: List[Customer]):
     """Validate the contents of a backup file. Columns from the file will be raw."""
 
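One detail worth calling out in the new test: the expected key contains a literal `None` path segment. The backup location template keeps its `{table}` placeholder, and for an unnamed op the value formatted in is `full_name = None`, which `str.format` renders as the string `"None"`:

```python
location = "s3://sample-bucket/databudgie/test/{table}"
full_name = None  # unnamed table op: no configured name

# str.format stringifies None, producing the "None" path segment.
print(location.format(table=full_name))
# -> s3://sample-bucket/databudgie/test/None
```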
9 changes: 8 additions & 1 deletion tests/utils.py
@@ -5,6 +5,7 @@
 from pathlib import Path
 from typing import List
 
+from botocore.exceptions import ClientError
 from mypy_boto3_s3.service_resource import S3ServiceResource
 
 from databudgie.s3 import is_s3_path, S3Location
@@ -49,7 +50,13 @@ def get_file_buffer(filename, s3_resource=None):
         assert s3_resource
         location = S3Location(filename)
         uploaded_object = s3_resource.Object("sample-bucket", location.key)
-        uploaded_object.download_fileobj(buffer)
+
+        try:
+            uploaded_object.download_fileobj(buffer)
+        except ClientError:
+            log.info(str(list(s3_resource.Bucket("sample-bucket").objects.all())))
+
+            raise
     else:
         try:
             with open(filename, "rb") as f:
