Skip to content

Commit

Permalink
Merge pull request #422 from aiven/alex-add-delta-stats-to-local-tar-…
Browse files Browse the repository at this point in the history
…backup

Add basebackup mode with delta statistics [BF-356]

#422
  • Loading branch information
rikonen authored Apr 27, 2021
2 parents 2d160a4 + a5b2d60 commit 469e75f
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ disable=

[FORMAT]
max-line-length=125
max-module-lines=1000
max-module-lines=1100

[REPORTS]
output-format=text
Expand Down
10 changes: 10 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,16 @@ tablespaces.
Note that the ``local-tar`` backup mode can not be used on replica servers
prior to PostgreSQL 9.6 unless the pgespresso extension is installed.

When using ``delta`` mode, only changed files are uploaded into the storage.
On every backup snapshot of the data files is taken, this results in a manifest file,
describing the hashes of all the files needed to be backed up.
New hashes are uploaded to the storage and used together with complementary
manifest from control file for restoration.
In order to properly assess the efficiency of ``delta`` mode in comparison with
``local-tar``, one can use ``local-tar-delta-stats`` mode, which behaves the same as
``local-tar``, but also collects the metrics as if it was ``delta`` mode. It can help
in decision making of switching to ``delta`` mode.

``basebackup_threads`` (default ``1``)

How many threads to use for tar, compress and encrypt tasks. Only applies for
Expand Down
131 changes: 121 additions & 10 deletions pghoard/basebackup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
See LICENSE for details
"""
import datetime
import hashlib
import io
import logging
import os
Expand All @@ -18,7 +19,7 @@
from queue import Empty, Queue
from tempfile import NamedTemporaryFile
from threading import Thread
from typing import Optional
from typing import Dict, Optional

import psycopg2

Expand All @@ -29,11 +30,12 @@
from . import common, version, wal
from .basebackup_delta import DeltaBaseBackup
from .common import (
BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass,
BackupFailure, BaseBackupFormat, BaseBackupMode, connection_string_using_pgpass, extract_pghoard_bb_v2_metadata,
replication_connection_string_and_slot_using_pgpass, set_stream_nonblocking,
set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess
)
from .patchedtarfile import tarfile
from .rohmu.delta.common import EMBEDDED_FILE_SIZE

BASEBACKUP_NAME = "pghoard_base_backup"
EMPTY_DIRS = [
Expand Down Expand Up @@ -82,6 +84,26 @@ def from_config(config) -> "CompressionData":
return CompressionData(algorithm=algorithm, level=level)


class HashFile:
def __init__(self, *, path):
self._file = open(path, "rb")
self.hash = hashlib.blake2s()

def __enter__(self):
return self

def __exit__(self, t, v, tb):
self._file.close()

def read(self, n=None):
data = self._file.read(n)
self.hash.update(data)
return data

def __getattr__(self, attr):
return getattr(self._file, attr)


class PGBaseBackup(Thread):
def __init__(
self,
Expand Down Expand Up @@ -126,6 +148,8 @@ def run(self):
self.run_local_tar_basebackup()
elif basebackup_mode == BaseBackupMode.delta:
self.run_local_tar_basebackup(delta=True)
elif basebackup_mode == BaseBackupMode.local_tar_delta_stats:
self.run_local_tar_basebackup(with_delta_stats=True)
elif basebackup_mode == BaseBackupMode.pipe:
self.run_piped_basebackup()
else:
Expand Down Expand Up @@ -409,7 +433,7 @@ def get_control_entries_for_tar(self, *, metadata, pg_control, backup_label):
ti.mtime = mtime
yield ti, None, False

def write_files_to_tar(self, *, files, tar):
def write_files_to_tar(self, *, files, tar, delta_stats=None):
for archive_path, local_path, missing_ok in files:
if not self.running:
raise BackupFailure("thread termination requested")
Expand All @@ -419,7 +443,18 @@ def write_files_to_tar(self, *, files, tar):
continue

try:
tar.add(local_path, arcname=archive_path, recursive=False)
if delta_stats is None:
tar.add(local_path, arcname=archive_path, recursive=False)
else:
if os.path.isdir(local_path):
tar.add(local_path, arcname=archive_path, recursive=False)
else:
with HashFile(path=local_path) as fileobj:
ti = tar.gettarinfo(name=local_path, arcname=archive_path)
tar.addfile(ti, fileobj=fileobj)
if ti.size > EMBEDDED_FILE_SIZE:
# Tiny files are not uploaded separately, they are embed into the manifest, so skip them
delta_stats[fileobj.hash.hexdigest()] = ti.size
except (FileNotFoundError if missing_ok else NoException):
self.log.warning("File %r went away while writing to tar, ignoring", local_path)

Expand Down Expand Up @@ -508,7 +543,15 @@ def compression_data(self) -> CompressionData:
return CompressionData.from_config(self.config)

def tar_one_file(
self, *, temp_dir, chunk_path, files_to_backup, callback_queue, filetype="basebackup_chunk", extra_metadata=None
self,
*,
temp_dir,
chunk_path,
files_to_backup,
callback_queue,
filetype="basebackup_chunk",
extra_metadata=None,
delta_stats=None
):
start_time = time.monotonic()

Expand All @@ -522,7 +565,7 @@ def tar_one_file(
fileobj=raw_output_obj
) as output_obj:
with tarfile.TarFile(fileobj=output_obj, mode="w") as output_tar:
self.write_files_to_tar(files=files_to_backup, tar=output_tar)
self.write_files_to_tar(files=files_to_backup, tar=output_tar, delta_stats=delta_stats)

input_size = output_obj.tell()

Expand Down Expand Up @@ -585,13 +628,14 @@ def wait_for_chunk_transfer_to_complete(self, chunk_count, upload_results, chunk
)
return False

def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index, temp_dir):
def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index, temp_dir, delta_stats=None):
one_chunk_files = chunks[index]
chunk_name, input_size, result_size = self.tar_one_file(
callback_queue=chunk_callback_queue,
chunk_path=chunk_path,
temp_dir=temp_dir,
files_to_backup=one_chunk_files,
delta_stats=delta_stats,
)
self.log.info(
"Queued backup chunk %r for transfer, chunks on disk (including partial): %r, current: %r, total chunks: %r",
Expand All @@ -604,7 +648,9 @@ def handle_single_chunk(self, *, chunk_callback_queue, chunk_path, chunks, index
"files": [chunk[0] for chunk in one_chunk_files]
}

def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):
def create_and_upload_chunks(
self, chunks, data_file_format, temp_base_dir, delta_stats: Optional[Dict[str, int]] = None
):
start_time = time.monotonic()
chunk_files = []
upload_results = []
Expand Down Expand Up @@ -633,6 +679,7 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):
chunks=chunks,
index=i,
temp_dir=temp_base_dir,
delta_stats=delta_stats,
)
pending_compress_and_encrypt_tasks.append(task)
self.chunks_on_disk += 1
Expand All @@ -650,7 +697,31 @@ def create_and_upload_chunks(self, chunks, data_file_format, temp_base_dir):

return chunk_files

def run_local_tar_basebackup(self, delta=False):
def fetch_all_data_files_hashes(self):
hashes: Dict[str, int] = {}

for backup in self.get_remote_basebackups_info(self.site):
if backup["metadata"].get("format") != BaseBackupFormat.v2:
continue

key = os.path.join(self.site_config["prefix"], "basebackup", backup["name"])
bmeta_compressed = self.storage.get_contents_to_string(key)[0]

with rohmufile.file_reader(
fileobj=io.BytesIO(bmeta_compressed),
metadata=backup["metadata"],
key_lookup=lambda key_id: self.site_config["encryption_keys"][key_id]["private"]
) as input_obj:
meta = extract_pghoard_bb_v2_metadata(input_obj)

if "delta_stats" not in meta:
continue

hashes.update(meta["delta_stats"]["hashes"])

return hashes

def run_local_tar_basebackup(self, delta=False, with_delta_stats=False):
control_files_metadata_extra = {}
pgdata = self.site_config["pg_data_directory"]
if not os.path.isdir(pgdata):
Expand Down Expand Up @@ -756,13 +827,53 @@ def run_local_tar_basebackup(self, delta=False):
pgdata=pgdata, tablespaces=tablespaces, target_chunk_size=target_chunk_size
)
chunks_count = len(chunks)

delta_stats: Optional[Dict[str, int]] = None
if with_delta_stats:
delta_stats = {}

# Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0
# is reserved for special files and metadata and will be generated last.
chunk_files = self.create_and_upload_chunks(chunks, data_file_format, temp_base_dir)
chunk_files = self.create_and_upload_chunks(
chunks, data_file_format, temp_base_dir, delta_stats=delta_stats
)

total_size_plain = sum(item["input_size"] for item in chunk_files)
total_size_enc = sum(item["result_size"] for item in chunk_files)

if with_delta_stats:
control_files_metadata_extra["delta_stats"] = {"hashes": delta_stats}

existing_hashes = self.fetch_all_data_files_hashes()
new_hashes = {k: delta_stats[k] for k in set(delta_stats).difference(set(existing_hashes))}

planned_upload_size = sum(new_hashes.values())
planned_upload_count = len(new_hashes)

if existing_hashes:
# Send ratio metrics for every backup except for the first one
planned_total_size = sum(delta_stats.values())
planned_total_count = len(delta_stats)
if planned_total_count:
self.metrics.gauge(
"pghoard.planned_delta_backup_changed_data_files_ratio",
planned_upload_count / planned_total_count
)
if planned_total_size:
self.metrics.gauge(
"pghoard.planned_delta_backup_changed_data_size_ratio",
planned_upload_size / planned_total_size
)
self.metrics.gauge(
"pghoard.planned_delta_backup_remained_data_size_raw",
planned_total_size - planned_upload_size,
)

self.metrics.increase("pghoard.planned_delta_backup_total_size", inc_value=planned_upload_size)
self.metrics.gauge("pghoard.planned_delta_backup_upload_size", planned_upload_size)
self.metrics.increase("pghoard.planned_delta_backup_total_files", inc_value=planned_upload_count)
self.metrics.gauge("pghoard.planned_delta_backup_upload_files", planned_upload_count)

control_files_metadata_extra["chunks"] = chunk_files

# Everything is now tarred up, grab the latest pg_control and stop the backup process
Expand Down
1 change: 1 addition & 0 deletions pghoard/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class BaseBackupMode(StrEnum):
basic = "basic"
delta = "delta"
local_tar = "local-tar"
local_tar_delta_stats = "local-tar-delta-stats"
pipe = "pipe"


Expand Down
2 changes: 1 addition & 1 deletion requirements.dev.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Use pip for build requirements to harmonize between OS versions
mock
pylint>=2.4.3
pylint>=2.4.3,<=2.7.2
pylint-quotes
pytest
pytest-mock
Expand Down
3 changes: 3 additions & 0 deletions test/test_basebackup.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ def test_basebackups_basic_lzma(self, capsys, db, pghoard_lzma, tmpdir):
def test_basebackups_delta(self, capsys, db, pghoard, tmpdir):
self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.delta)

def test_basebackups_local_tar_with_delta_stats(self, capsys, db, pghoard, tmpdir):
self._test_basebackups(capsys, db, pghoard, tmpdir, BaseBackupMode.local_tar_delta_stats)

def test_basebackups_local_tar_nonexclusive(self, capsys, db, pghoard, tmpdir):
if db.pgver < "9.6":
pytest.skip("PostgreSQL 9.6+ required for non-exclusive backups")
Expand Down

0 comments on commit 469e75f

Please sign in to comment.