Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Proof-of-concept fix for missing .partial WALs on recovery #629

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
121 changes: 90 additions & 31 deletions barman/recovery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,9 @@ def _xlog_copy(self, required_xlog_files, wal_dest, remote_command):
"""
# List of required WAL files partitioned by containing directory
xlogs = collections.defaultdict(list)
# Keep a separate list of partial xlogs because we must treat these
# differently to the archived xlogs
partial_xlogs = []
# add '/' suffix to ensure it is a directory
wal_dest = "%s/" % wal_dest
# Map of every compressor used with any WAL file in the archive,
Expand All @@ -782,16 +785,39 @@ def _xlog_copy(self, required_xlog_files, wal_dest, remote_command):
compression_manager = self.backup_manager.compression_manager
# Fill xlogs and compressors maps from required_xlog_files
for wal_info in required_xlog_files:
hashdir = xlog.hash_dir(wal_info.name)
xlogs[hashdir].append(wal_info)
# If a compressor is required, make sure it exists in the cache
if (
wal_info.compression is not None
and wal_info.compression not in compressors
):
compressors[wal_info.compression] = compression_manager.get_compressor(
compression=wal_info.compression
xlog.is_partial_file(wal_info.name)
and hasattr(wal_info, "orig_filename")
and (
os.path.dirname(wal_info.orig_filename)
== self.config.streaming_wals_directory
)
):
# If a .partial file is in the `streaming` directory then add it to a
# separate list so it can be renamed and transferred in a separate
# Rsync operation.
partial_xlogs.append(wal_info)
else:
if xlog.is_partial_file(wal_info.name):
# If a .partial file is in the main WAL archive then either a
# standby was promoted or a primary was recovered from a backup.
# In either case we do not attempt to handle it and instead warn
# the user.
output.warning(
".partial WAL file '%s' found in WAL archive", wal_info.name
)
hashdir = xlog.hash_dir(wal_info.name)
xlogs[hashdir].append(wal_info)
# If a compressor is required, make sure it exists in the cache
if (
wal_info.compression is not None
and wal_info.compression not in compressors
):
compressors[
wal_info.compression
] = compression_manager.get_compressor(
compression=wal_info.compression
)

rsync = RsyncPgData(
path=self.server.path,
Expand Down Expand Up @@ -845,29 +871,12 @@ def _xlog_copy(self, required_xlog_files, wal_dest, remote_command):
else:
shutil.copy2(os.path.join(source_dir, segment.name), dst_file)
if remote_command:
try:
# Transfer the WAL files
rsync.from_file_list(
list(segment.name for segment in xlogs[prefix]),
wal_decompression_dest,
wal_dest,
)
except CommandFailedException as e:
msg = (
"data transfer failure while copying WAL files "
"to directory '%s'"
) % (wal_dest[1:],)
raise DataTransferFailure.from_command_error("rsync", e, msg)

# Cleanup files after the transfer
for segment in xlogs[prefix]:
file_name = os.path.join(wal_decompression_dest, segment.name)
try:
os.unlink(file_name)
except OSError as e:
output.warning(
"Error removing temporary file '%s': %s", file_name, e
)
self._rsync_move_files(
rsync,
list(segment.name for segment in xlogs[prefix]),
wal_decompression_dest,
wal_dest,
)
else:
try:
rsync.from_file_list(
Expand All @@ -882,6 +891,30 @@ def _xlog_copy(self, required_xlog_files, wal_dest, remote_command):
)
raise DataTransferFailure.from_command_error("rsync", e, msg)

# Now copy any .partial files we need to care about
if partial_xlogs:
_logger.info("Copying .partial WAL files from streaming directory.")
partial_staging_dir = tempfile.mkdtemp(prefix="barman_wal-partial-")
# Stage a copy of the .partial file with its final name so that Rsync can
# copy the file to its final destination.
for segment in partial_xlogs:
src_file = os.path.join(
self.config.streaming_wals_directory, segment.name
)
dst_file = os.path.join(
partial_staging_dir, segment.name.rstrip(".partial")
)
shutil.copy2(src_file, dst_file)
# Move the renamed files to the final destination
self._rsync_move_files(
rsync,
list(segment.name.rstrip(".partial") for segment in partial_xlogs),
partial_staging_dir,
wal_dest,
)
# Cleanup staging dir
shutil.rmtree(partial_staging_dir)

_logger.info("Finished copying %s WAL files.", total_wals)

# Remove local decompression target directory if different from the
Expand All @@ -890,6 +923,32 @@ def _xlog_copy(self, required_xlog_files, wal_dest, remote_command):
if wal_decompression_dest and wal_decompression_dest != wal_dest:
shutil.rmtree(wal_decompression_dest)

def _rsync_move_files(self, rsync, file_list, src_dir, dst_dir):
"""
Helper function which copies the given file list to dst_dir and removes them
from the src_dir.
"""
try:
# Transfer the WAL files
rsync.from_file_list(
file_list,
src_dir,
dst_dir,
)
except CommandFailedException as e:
msg = (
"data transfer failure while copying WAL files "
"to directory '%s'" % (dst_dir,)
)
raise DataTransferFailure.from_command_error("rsync", e, msg)
# Cleanup files after the transfer
for basename in file_list:
file_name = os.path.join(src_dir, basename)
try:
os.unlink(file_name)
except OSError as e:
output.warning("Error removing temporary file '%s': %s", file_name, e)

def _generate_archive_status(
self, recovery_info, remote_command, required_xlog_files
):
Expand Down
33 changes: 33 additions & 0 deletions barman/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,12 @@ def get_required_xlog_files(
begin = backup.begin_wal
end = backup.end_wal

# This flag is set to False if we find a WAL in the archive which is more
# recent than the requested time, since in such cases we already have all
# the WALs we need. In any other case we will need the .partial file in
# order to minimise the RPO.
should_find_partial_files = True

# Calculate the integer value of TLI if a keyword is provided
calculated_target_tli = target_tli
if target_tli and type(target_tli) is str:
Expand Down Expand Up @@ -1654,13 +1660,40 @@ def get_required_xlog_files(
if wal_info.name > end:
end = wal_info.name
if target_time and wal_info.time > target_time:
# We found a WAL after the target time so we do not need to
# look for partial files
should_find_partial_files = False
break
# return all the remaining history files
for line in fxlogdb:
wal_info = WalFileInfo.from_xlogdb_line(line)
if xlog.is_history_file(wal_info.name):
yield wal_info

# Finally, check the `streaming` directory to see if there are any
# relevant .partial files
if should_find_partial_files:
file_names = sorted(
glob(os.path.join(self.config.streaming_wals_directory, "*"))
)
required_partial_files = []
for file_name in file_names:
if xlog.is_partial_file(file_name):
# We have a partial file - do we need it?
if os.path.basename(file_name) < begin:
continue
tli, _, _ = xlog.decode_segment_name(file_name)
if tli > calculated_target_tli or tli < calculated_target_tli:
continue
required_partial_files.append(file_name)
if required_partial_files:
# If there is more than one `.partial` file then we use the most recent
# for recovery. The older `.partial` files will be moved into the Barman
# WAL archive by the archiver process next time it runs.
yield WalFileInfo.from_file(
required_partial_files[-1], compression=None
)

# TODO: merge with the previous
def get_wal_until_next_backup(self, backup, include_history=False):
"""
Expand Down