Skip to content

Commit

Permalink
guest agent: revamp synchronisation and assist thawing
Browse files Browse the repository at this point in the history
We think that this is the cause of PL-132809
(VMs unavailable due to hung tasks / guest freeze).

The previous way to establish a connection was broken as it
misunderstood the synchronisation mechanism and the way the guest agent
tries to send pending messages to the socket.

The new approach first tries to send a gratuitous thaw, flushes out
any pending messages and then tries once to ping and synchronize -
erroring out immediately if that fails.

Also, when freezing fails we now try harder to thaw, and we increased
the freezing timeout to 5 minutes.

Fixes PL-132809
  • Loading branch information
ctheune committed Oct 11, 2024
1 parent e50127e commit 5d6ce18
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 196 deletions.
6 changes: 4 additions & 2 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
Release notes
=============

1.4.7 (unreleased)
1.5.0 (unreleased)
------------------

- Nothing changed yet.
- Revamp guest agent connection handling and try harder to thaw a VM
that didn't manage to freeze in time. Increase freeze timeout, too.
(PL-132809)


1.4.6 (2024-09-26)
Expand Down
41 changes: 23 additions & 18 deletions src/fc/qemu/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,10 @@ def ensure_offline(self):
self.stop()
else:
self.log.info(
"ensure-state", wanted="offline", found="offline", action="none"
"ensure-state",
wanted="offline",
found="offline",
action="none",
)

def ensure_online_remote(self):
Expand Down Expand Up @@ -1225,23 +1228,25 @@ def frozen_vm(self):
"""
frozen = False
try:
if self.qemu.is_running():
self.log.info("freeze", volume="root")
try:
self.qemu.freeze()
frozen = True
except Exception as e:
self.log.error(
"freeze-failed",
reason=str(e),
action="continue",
machine=self.name,
)
yield frozen
finally:
if self.qemu.is_running():
self.ensure_thawed()
if self.qemu.is_running():
self.log.info("freeze", volume="root")
try:
self.qemu.freeze()
frozen = True
except Exception as e:
self.log.error(
"freeze-failed",
reason=str(e),
action="continue",
machine=self.name,
)
yield frozen
# We're not doing this with a finally because if qemu.freeze failed
# initially then we can't reliably communicate with the agent and
# there are measures already in place to assist in the unreliable
# case in `freeze()`
if frozen and self.qemu.is_running():
self.ensure_thawed()

@locked()
@running(True)
Expand Down
5 changes: 5 additions & 0 deletions src/fc/qemu/hazmat/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ class ClientStub(object):
messages_sent: typing.List[bytes]
responses: typing.List[str]

receive_buffer = ""

def __init__(self):
self.messages_sent = []
self.responses = []
Expand All @@ -278,6 +280,9 @@ def close(self):
def send(self, msg: bytes):
self.messages_sent.append(msg)

def recv(self, buffersize):
return self.receive_buffer

def makefile(self):
pseudo_socket_filename = tempfile.mktemp(dir=tmpdir)
with open(pseudo_socket_filename, "w") as f:
Expand Down
143 changes: 95 additions & 48 deletions src/fc/qemu/hazmat/guestagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,129 @@ class GuestAgent(object):
def __init__(self, machine, timeout, client_factory=socket.socket):
self.machine = machine
self.timeout = timeout
self.log = log.bind(machine=machine)
self.log = log.bind(machine=machine, subsystem="qemu/guestagent")
self.file = None

self.client_factory = client_factory
self.client = None

def read(self):
def read(self, unwrap=True):
"""Reads single response from the GA and returns the result.
Blocks and runs into a timeout if no response is available.
"""
result = json.loads(self.file.readline())
try:
result = self.file.readline()
except socket.timeout:
self.log.debug("read-timeout")
self.disconnect()
raise
result = json.loads(result)
self.log.debug("read", result=result)
if "error" in result:
raise ClientError(result)
return result["return"]
if unwrap:
# Some commands, like guest-ping do not return a result. But we do

This comment has been minimized.

Copy link
@osnyx

osnyx Oct 11, 2024

Member

I do not understand how guest-ping could have worked before this change without having raised a key error. Or did we just not use it until now?

This comment has been minimized.

Copy link
@osnyx

osnyx Oct 11, 2024

Member

Ok, I've seen that we've just introduced the usage of guest-ping in these code changes.

# not want to silently swallow errors if a return value is missing
# but expected.
return result["return"]
# However, to ensure that things like the guest-ping did receive a
# proper result structure (e.g. {}) we do return it, so the command
# can detect whether everything is as expected. We explicitly do not
# just silently return `None` here.
return result

def cmd(self, cmd, flush_ga_parser=False, timeout=None, **args):
def cmd(self, cmd, timeout=None, fire_and_forget=False, **args):
"""Issues GA command and returns the result.
All **args need to be serialisable to JSON, that implies that `bytes` are *not*
valid.
All **args need to be serialisable to JSON, that implies that `bytes`
are *not* valid.
"""
self.connect()
message = json.dumps({"execute": cmd, "arguments": args})
message = message.encode("ascii")
if flush_ga_parser:
# \xff is an invalid utf-8 character and recommended to safely
# ensure that the parser of the guest agent at the other end
# is reset to a known state. This is recommended for sync.
# http://wiki.qemu-project.org/index.php/Features/GuestAgent#guest-sync
message = b"\xff" + message
timeout = timeout or self.timeout
# Allow setting temporary timeouts for operations that are known to be
# slow.
self.client.settimeout(timeout)
message = message.encode("utf-8")
self.client.send(message)
return self.read()
if not fire_and_forget:
self.client.settimeout(timeout or self.timeout)
return self.read(unwrap=(cmd != "guest-ping"))

def sync(self):
"""Ensures that request and response are in order."""
sync_id = random.randint(0, 0xFFFF)
n = 0

# Phase 1: ensure a low-level thaw command. This is an emergency safety
# belt. We really do not want the VM to be accidentally stuck in a
# frozen state.
self.log.debug("sync-gratuitous-thaw")
self.client.send(
json.dumps({"execute": "guest-fsfreeze-thaw"}).encode("utf-8")
)

# Phase 2: clear the connection buffer from previous connections.
# We set a very short timeout (nonblocking does not help, the kernel
# needs a little time to give access to the data already buffered).
# However, we want to keep it as short as possible because this timeout
# will always happen in the happy case which is most of the time.
self.client.settimeout(1)
self.log.debug("clear-buffer")
try:
result = self.cmd("guest-sync", id=sync_id, flush_ga_parser=True)
except ClientError:
# we tripped a client error as we caused the guest agent to notice
# invalid json, which in turn triggers an error response
result = self.read()
except socket.error as e:
# Maybe a timeout. Keep trying a little bit harder.
result = str(e)
while n < 10:
if result == sync_id:
return
self.log.error(
"incorrect-sync-id", expected=sync_id, got=result, tries=n
while buffer := self.client.recv(4096):

This comment has been minimized.

Copy link
@osnyx

osnyx Oct 11, 2024

Member

How did you arrive at a buffer read size of 4096?

This comment has been minimized.

Copy link
@ctheune

ctheune Oct 12, 2024

Author Collaborator

On Unix, buffers should generally be sized along powers of 2 and typical page sizes. As we want to clear out the buffer here as quickly as possible, I typically use the general page size (4k) to balance memory requirements and performance. Sizes smaller than the page size lead to inefficient CPU usage, since a page needs to be allocated anyway and more cycles are needed. Larger buffer sizes like 8/16/64k would be fine too, but I'm not expecting reads that large to actually happen. That would mostly be the case if the guest agent is stuck sending file content from a previous read-file command, which we don't use for any large sizes at all.

self.log.debug("found-buffer-garbage", buffer=buffer)
except (BlockingIOError, socket.timeout):
self.log.debug("cleared-buffer")

# Phase 3: ensure we see proper agent interactions. To be sure we
# test this with two diagnostic calls. The timeout can be higher now
# as we expect the agent to actually have to respond to us.

# \xff is an invalid utf-8 character and recommended to safely
# ensure that the parser of the guest agent at the other end
# is reset to a known state. This is recommended for sync.
# http://wiki.qemu-project.org/index.php/Features/GuestAgent#guest-sync
self.log.debug("clear-guest-parser")
self.client.send(b"\xff")

ping_result = self.cmd("guest-ping")
if ping_result != {}:
raise ClientError(
f"Agent did not respond properly to ping command: {ping_result}"
)
n += 1
try:
result = self.read()
except (ClientError, socket.error):
# we tripped a client error later than right now. There may
# have been a response still in the queue.
pass

sync_id = random.randint(0, 0xFFFF)
result = self.cmd("guest-sync", timeout=30, id=sync_id)

self.log.debug("sync-response", expected=sync_id, got=result)
if result == sync_id:
return

raise ClientError(
"Unable to sync with guest agent after {} tries.".format(n)
f"Unable to sync with guest agent. Got invalid sync_id {sync_id}"
)

def __enter__(self):
def connect(self):
if self.client and self.file:
return
self.disconnect()
self.client = self.client_factory(socket.AF_UNIX, socket.SOCK_STREAM)
self.client.settimeout(self.timeout)
self.client.connect("/run/qemu.{}.gqa.sock".format(self.machine))
self.file = self.client.makefile()
fcntl.flock(self.file.fileno(), fcntl.LOCK_EX)
self.sync()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.file.close()
self.client.close()
def disconnect(self):
if self.file or self.client:
self.log.debug("disconnect")
try:
if self.file:
self.file.close()
except Exception:
pass
finally:
self.file = None
try:
if self.client:
self.client.close()
except Exception:
pass
finally:
self.client = None
78 changes: 38 additions & 40 deletions src/fc/qemu/hazmat/qemu.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
# timeout of 3 seconds will cause everything to explode when
# the guest takes too long. We've seen 16 seconds as a regular
# period in some busy and large machines. I'm being _very_
# generous using a 120s timeout here.
# generous using a 5 minute timeout here. We've seen it get stuck longer
# than 2 minutes and the agent is very stubborn in those cases and really
# doesn't like if the client goes away ...
# This is a global variable so we can instrument it during testing.
FREEZE_TIMEOUT = 120
FREEZE_TIMEOUT = 300


class InvalidMigrationStatus(Exception):
Expand Down Expand Up @@ -75,7 +77,9 @@ def locked(self, *args, **kw):
finally:
self._global_lock_count -= 1
self.log.debug(
"global-lock-status", target=LOCK, count=self._global_lock_count
"global-lock-status",
target=LOCK,
count=self._global_lock_count,
)
if self._global_lock_count == 0:
self.log.debug("global-lock-release", target=LOCK)
Expand Down Expand Up @@ -186,6 +190,7 @@ def __enter__(self):
pass

def __exit__(self, exc_value, exc_type, exc_tb):
self.guestagent.disconnect()
if self.__qmp:
self.__qmp.close()
self.__qmp = None
Expand Down Expand Up @@ -361,52 +366,44 @@ def start(self):
return

def freeze(self):
with self.guestagent as guest:
try:
# This request may take a _long_ _long_ time and the default
# timeout of 3 seconds will cause everything to explode when
# the guest takes too long. We've seen 16 seconds as a regular
# period in some busy and large machines. I'm being _very_
# generous using a 120s timeout here.
guest.cmd("guest-fsfreeze-freeze", timeout=FREEZE_TIMEOUT)
except ClientError:
self.log.debug("guest-fsfreeze-freeze-failed", exc_info=True)
assert guest.cmd("guest-fsfreeze-status") == "frozen"

def _thaw_via_guest_agent(self):
with self.guestagent as guest:
try:
guest.cmd("guest-fsfreeze-thaw")
except ClientError:
self.log.debug("guest-fsfreeze-freeze-thaw", exc_info=True)
raise
result = guest.cmd("guest-fsfreeze-status")
if result != "thawed":
raise RuntimeError("Unexpected thaw result: {}".format(result))
try:
# This request may take a _long_ _long_ time and the default
# timeout of 3 seconds will cause everything to explode when
# the guest takes too long. We've seen 16 seconds as a regular
# period in some busy and large machines. So we increase this
# to a lot more and also perform a gratuitous thaw in case
# we error out.
self.guestagent.cmd("guest-fsfreeze-freeze", timeout=FREEZE_TIMEOUT)
except ClientError:
self.log.debug("guest-fsfreeze-freeze-failed", exc_info=True)
self.guestagent.cmd("guest-fsfreeze-thaw", fire_and_forget=True)
assert self.guestagent.cmd("guest-fsfreeze-status") == "frozen"

def thaw(self):
try:
self._thaw_via_guest_agent()
self.guestagent.cmd("guest-fsfreeze-thaw")
result = self.guestagent.cmd("guest-fsfreeze-status")
if result != "thawed":
raise RuntimeError("Unexpected thaw result: {}".format(result))
except Exception:
self.log.warning("guest-fsfreeze-thaw-failed", exc_info=True)
raise

def write_file(self, path, content: bytes):
if not isinstance(content, bytes):
raise TypeError("Expected bytes, got string.")
with self.guestagent as guest:
try:
handle = guest.cmd("guest-file-open", path=path, mode="w")
guest.cmd(
"guest-file-write",
handle=handle,
# The ASCII armour needs to be turned into text again, because the
# JSON encoder doesn't handle bytes-like objects.
**{"buf-b64": encode(content, "base64").decode("ascii")},
)
guest.cmd("guest-file-close", handle=handle)
except ClientError:
self.log.error("guest-write-file", exc_info=True)
try:
handle = self.guestagent.cmd("guest-file-open", path=path, mode="w")
self.guestagent.cmd(
"guest-file-write",
handle=handle,
# The ASCII armour needs to be turned into text again, because the
# JSON encoder doesn't handle bytes-like objects.
**{"buf-b64": encode(content, "base64").decode("ascii")},
)
self.guestagent.cmd("guest-file-close", handle=handle)
except ClientError:
self.log.error("guest-write-file", exc_info=True)

def inmigrate(self):
self._start(["-incoming {}".format(self.migration_address)])
Expand Down Expand Up @@ -436,7 +433,8 @@ def migrate(self, address):
)
self.qmp.command("migrate", uri=address)
self.log.debug(
"migrate-parameters", **self.qmp.command("query-migrate-parameters")
"migrate-parameters",
**self.qmp.command("query-migrate-parameters"),
)

def poll_migration_status(self, timeout=30):
Expand Down
Loading

0 comments on commit 5d6ce18

Please sign in to comment.