Skip to content

Commit

Permalink
Fix apt-get lock held by another process error by retry
Browse files Browse the repository at this point in the history
  • Loading branch information
Umit Kablan committed Oct 10, 2024
1 parent 2666d01 commit 4360058
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 36 deletions.
89 changes: 74 additions & 15 deletions pleskdistup/common/src/dpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import re
import subprocess
import typing
import time

from . import files, util
from . import files, log, util

DPKG_TEMPFAIL_RETRY: typing.List[int] = [30, 60, 90, 120]

APT_CHOOSE_OLD_FILES_OPTIONS = [
"-o", "Dpkg::Options::=--force-confdef",
Expand Down Expand Up @@ -122,9 +125,59 @@ def safely_install_packages(
install_packages(pkgs, repository, force_package_config)


def _exec_retry_when_locked(
apt_get_cmd: typing.List[str],
tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None,
collect_stdout: bool = False,
) -> str:
cant_get_lock = False
stdout = []

if tmpfail_retry_intervals is None:
tmpfail_retry_intervals = DPKG_TEMPFAIL_RETRY

def process_stdout(line: str) -> None:
if collect_stdout:
nonlocal stdout
stdout.append(line)
log.info("stdout: {}".format(line.rstrip('\n')), to_stream=False)

def process_stderr(line: str) -> None:
log.info("stderr: {}".format(line.rstrip('\n')))
nonlocal cant_get_lock
if cant_get_lock:
return
if "E: Could not get lock" in line:
cant_get_lock = True

i = 0
while True:
cant_get_lock = False
stdout.clear()
log.info(f"Executing: {' '.join(apt_get_cmd)}")
exit_code = util.exec_get_output_streamed(
apt_get_cmd, process_stdout, process_stderr,
env={
"PATH": os.environ["PATH"],
"DEBIAN_FRONTEND": "noninteractive",
"LC_ALL": "C",
"LANG": "C",
},
)
if exit_code == 0:
break
if i >= len(tmpfail_retry_intervals) or not cant_get_lock:
raise subprocess.CalledProcessError(returncode=exit_code, cmd=apt_get_cmd)
log.info(f"{apt_get_cmd[0]} failed because lock is already held, will retry in {tmpfail_retry_intervals[i]} seconds..")
time.sleep(tmpfail_retry_intervals[i])
i += 1
return "".join(stdout)


def remove_packages(
pkgs: typing.List[str],
simulate: bool = False,
tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None,
) -> typing.Optional[typing.Dict[str, typing.List[PackageEntry]]]:
if len(pkgs) == 0:
return None
Expand All @@ -133,7 +186,7 @@ def remove_packages(
if simulate:
cmd.append("--simulate")
cmd += pkgs
cmd_out = util.logged_check_call(cmd)
cmd_out = _exec_retry_when_locked(cmd, tmpfail_retry_intervals, collect_stdout=True)
if simulate:
return _parse_apt_get_simulation(cmd_out)
return None
Expand All @@ -142,35 +195,40 @@ def remove_packages(
def safely_remove_packages(
pkgs: typing.List[str],
protected_pkgs: typing.Optional[typing.Iterable[str]] = None,
tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None,
) -> None:
sim_res = remove_packages(pkgs, simulate=True)
if sim_res is not None and protected_pkgs is not None:
protected_set = set(protected_pkgs)
violations = _find_protection_violations(sim_res, protected_set)
if violations:
raise PackageProtectionError(protected_packages=violations)
remove_packages(pkgs)
remove_packages(pkgs, False, tmpfail_retry_intervals)


def find_related_repofiles(repository_file: str) -> typing.List[str]:
return files.find_files_case_insensitive("/etc/apt/sources.list.d", repository_file)


def update_package_list() -> None:
util.logged_check_call(["/usr/bin/apt-get", "update", "-y"])
def update_package_list(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None:
cmd = ["/usr/bin/apt-get", "update", "-y"]
_exec_retry_when_locked(cmd, tmpfail_retry_intervals)


def upgrade_packages(pkgs: typing.Optional[typing.List[str]] = None) -> None:
def upgrade_packages(
pkgs: typing.Optional[typing.List[str]] = None,
tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None,
) -> None:
if pkgs is None:
pkgs = []

cmd = ["/usr/bin/apt-get", "upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS + pkgs
util.logged_check_call(cmd, env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"})
_exec_retry_when_locked(cmd, tmpfail_retry_intervals)


def autoremove_outdated_packages() -> None:
util.logged_check_call(["/usr/bin/apt-get", "autoremove", "-y"],
env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"})
def autoremove_outdated_packages(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None:
cmd = ["/usr/bin/apt-get", "autoremove", "-y"]
_exec_retry_when_locked(cmd, tmpfail_retry_intervals)


def depconfig_parameter_set(parameter: str, value: str) -> None:
Expand All @@ -184,13 +242,14 @@ def depconfig_parameter_get(parameter: str) -> str:
return process.stdout.split(" ")[1].strip()


def restore_installation() -> None:
util.logged_check_call(["/usr/bin/apt-get", "-f", "install", "-y"])
def restore_installation(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None:
cmd = ["/usr/bin/apt-get", "-f", "install", "-y"]
_exec_retry_when_locked(cmd, tmpfail_retry_intervals)


def do_distupgrade() -> None:
util.logged_check_call(["apt-get", "dist-upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS,
env={"PATH": os.environ["PATH"], "DEBIAN_FRONTEND": "noninteractive"})
def do_distupgrade(tmpfail_retry_intervals: typing.Optional[typing.List[int]] = None) -> None:
cmd = ["apt-get", "dist-upgrade", "-y"] + APT_CHOOSE_OLD_FILES_OPTIONS
_exec_retry_when_locked(cmd, tmpfail_retry_intervals)


def get_installed_packages_list(regex: str) -> typing.List[typing.Tuple[str, str]]:
Expand Down
82 changes: 61 additions & 21 deletions pleskdistup/common/src/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,74 @@
from . import log


def log_outputs_check_call(
cmd: typing.Union[typing.Sequence[str], str],
collect_return_stdout: bool = False,
**kwargs,
) -> str:
'''
Runs cmd and raises on nonzero exit code. Returns stdout when collect_return_stdout
'''
log.info(f"Running: {cmd!r}. Output:")
stdout = []

def proc_stdout(line: str) -> None:
log.info("stdout: {}".format(line.rstrip('\n')), to_stream=False)
if collect_return_stdout:
stdout.append(line)

def proc_stderr(line: str) -> None:
log.info("stderr: {}".format(line.rstrip('\n')))

exit_code = exec_get_output_streamed(cmd, proc_stdout, proc_stderr, **kwargs)
if exit_code != 0:
log.err(f"Command {cmd!r} failed with return code {exit_code}")
raise subprocess.CalledProcessError(returncode=exit_code, cmd=cmd)

log.info(f"Command {cmd!r} finished successfully")
return "".join(stdout)


# Returns standard output
def logged_check_call(cmd: typing.Union[typing.Sequence[str], str], **kwargs) -> str:
log.info(f"Running: {cmd!r}. Output:")
return log_outputs_check_call(cmd, collect_return_stdout=True, **kwargs)


# I beleive we should be able pass argument to the subprocess function
# from the caller. So we have to inject stdout/stderr/universal_newlines
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.STDOUT
def exec_get_output_streamed(
cmd: typing.Union[typing.Sequence[str], str],
process_stdout_line: typing.Optional[typing.Callable[[str], None]],
process_stderr_line: typing.Optional[typing.Callable[[str], None]],
**kwargs,
) -> int:
'''
Allows to get stdout/stderr by streaming line by line, by calling callbacks
and returns process exit code
'''
kwargs["stdout"] = (subprocess.DEVNULL if process_stdout_line is None
else subprocess.PIPE)
kwargs["stderr"] = (subprocess.DEVNULL if process_stderr_line is None
else subprocess.PIPE)
kwargs["universal_newlines"] = True

stdout = []
process = subprocess.Popen(cmd, **kwargs)
while None is process.poll():
if not process.stdout:
log.err(f"Can't get process output from {cmd!r}")
raise RuntimeError(f"Can't get process output from {cmd!r}")
line = process.stdout.readline()
if line:
stdout.append(line)
if line.strip():
log.info(line.strip(), to_stream=False)
if process_stdout_line is None and process_stderr_line is None:
process.communicate()
return process.returncode

if process.returncode != 0:
log.err(f"Command {cmd!r} failed with return code {process.returncode}")
raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd, output="\n".join(stdout))

log.info(f"Command {cmd!r} finished successfully")
return "\n".join(stdout)
while process.poll() is None:
if process_stdout_line is not None:
if not process.stdout:
raise RuntimeError(f"Cannot get process stdout of command {cmd!r}")
line = process.stdout.readline()
if line:
process_stdout_line(line)
if process_stderr_line is not None:
if not process.stderr:
raise RuntimeError(f"Cannot get process stderr of command {cmd!r}")
line = process.stderr.readline()
if line:
process_stderr_line(line)
return process.returncode


def merge_dicts_of_lists(
Expand Down

0 comments on commit 4360058

Please sign in to comment.