summaryrefslogtreecommitdiff
path: root/python/qemu/qmp/qmp_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qemu/qmp/qmp_client.py')
-rw-r--r--python/qemu/qmp/qmp_client.py655
1 files changed, 655 insertions, 0 deletions
diff --git a/python/qemu/qmp/qmp_client.py b/python/qemu/qmp/qmp_client.py
new file mode 100644
index 0000000000..5dcda04a75
--- /dev/null
+++ b/python/qemu/qmp/qmp_client.py
@@ -0,0 +1,655 @@
+"""
+QMP Protocol Implementation
+
+This module provides the `QMPClient` class, which can be used to connect
+and send commands to a QMP server such as QEMU. The QMP class can be
+used to either connect to a listening server, or used to listen and
+accept an incoming connection from that server.
+"""
+
+import asyncio
+import logging
+import socket
+import struct
+from typing import (
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Union,
+ cast,
+)
+
+from .error import ProtocolError, QMPError
+from .events import Events
+from .message import Message
+from .models import ErrorResponse, Greeting
+from .protocol import AsyncProtocol, Runstate, require
+from .util import (
+ bottom_half,
+ exception_summary,
+ pretty_traceback,
+ upper_half,
+)
+
+
+class _WrappedProtocolError(ProtocolError):
+ """
+ Abstract exception class for Protocol errors that wrap an Exception.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+ def __init__(self, error_message: str, exc: Exception):
+ super().__init__(error_message)
+ self.exc = exc
+
+ def __str__(self) -> str:
+ return f"{self.error_message}: {self.exc!s}"
+
+
+class GreetingError(_WrappedProtocolError):
+ """
+ An exception occurred during the Greeting phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class NegotiationError(_WrappedProtocolError):
+ """
+ An exception occurred during the Negotiation phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class ExecuteError(QMPError):
+ """
+ Exception raised by `QMPClient.execute()` on RPC failure.
+
+ :param error_response: The RPC error response object.
+ :param sent: The sent RPC message that caused the failure.
+ :param received: The raw RPC error reply received.
+ """
+ def __init__(self, error_response: ErrorResponse,
+ sent: Message, received: Message):
+ super().__init__(error_response.error.desc)
+ #: The sent `Message` that caused the failure
+ self.sent: Message = sent
+ #: The received `Message` that indicated failure
+ self.received: Message = received
+ #: The parsed error response
+ self.error: ErrorResponse = error_response
+ #: The QMP error class
+ self.error_class: str = error_response.error.class_
+
+
+class ExecInterruptedError(QMPError):
+ """
+ Exception raised by `execute()` (et al) when an RPC is interrupted.
+
+ This error is raised when an `execute()` statement could not be
+ completed. This can occur because the connection itself was
+ terminated before a reply was received.
+
+ The true cause of the interruption will be available via `disconnect()`.
+ """
+
+
+class _MsgProtocolError(ProtocolError):
+ """
+ Abstract error class for protocol errors that have a `Message` object.
+
+ This Exception class is used for protocol errors where the `Message`
+ was mechanically understood, but was found to be inappropriate or
+ malformed.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+ def __init__(self, error_message: str, msg: Message):
+ super().__init__(error_message)
+ #: The received `Message` that caused the error.
+ self.msg: Message = msg
+
+ def __str__(self) -> str:
+ return "\n".join([
+ super().__str__(),
+ f" Message was: {str(self.msg)}\n",
+ ])
+
+
+class ServerParseError(_MsgProtocolError):
+ """
+ The Server sent a `Message` indicating parsing failure.
+
+ i.e. A reply has arrived from the server, but it is missing the "ID"
+ field, indicating a parsing error.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+
+
+class BadReplyError(_MsgProtocolError):
+ """
+ An execution reply was successfully routed, but not understood.
+
+ If a QMP message is received with an 'id' field to allow it to be
+ routed, but is otherwise malformed, this exception will be raised.
+
+ A reply message is malformed if it is missing either the 'return' or
+ 'error' keys, or if the 'error' value has missing keys or members of
+ the wrong type.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The malformed reply that was received.
+ :param sent: The message that was sent that prompted the error.
+ """
+ def __init__(self, error_message: str, msg: Message, sent: Message):
+ super().__init__(error_message, msg)
+ #: The sent `Message` that caused the failure
+ self.sent = sent
+
+
+class QMPClient(AsyncProtocol[Message], Events):
+ """
+ Implements a QMP client connection.
+
+ QMP can be used to establish a connection as either the transport
+ client or server, though this class always acts as the QMP client.
+
+ :param name: Optional nickname for the connection, used for logging.
+
+ Basic script-style usage looks like this::
+
+ qmp = QMPClient('my_virtual_machine_name')
+ await qmp.connect(('127.0.0.1', 1234))
+ ...
+ res = await qmp.execute('block-query')
+ ...
+ await qmp.disconnect()
+
+ Basic async client-style usage looks like this::
+
+ class Client:
+ def __init__(self, name: str):
+ self.qmp = QMPClient(name)
+
+ async def watch_events(self):
+ try:
+ async for event in self.qmp.events:
+ print(f"Event: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self, address='/tmp/qemu.socket'):
+ await self.qmp.connect(address)
+ asyncio.create_task(self.watch_events())
+ await self.qmp.runstate_changed.wait()
+ await self.disconnect()
+
+ See `qmp.events` for more detail on event handling patterns.
+ """
+ #: Logger object used for debugging messages.
+ logger = logging.getLogger(__name__)
+
+ # Read buffer limit; large enough to accept query-qmp-schema
+ _limit = (256 * 1024)
+
+ # Type alias for pending execute() result items
+ _PendingT = Union[Message, ExecInterruptedError]
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ super().__init__(name)
+ Events.__init__(self)
+
+ #: Whether or not to await a greeting after establishing a connection.
+ self.await_greeting: bool = True
+
+ #: Whether or not to perform capabilities negotiation upon connection.
+ #: Implies `await_greeting`.
+ self.negotiate: bool = True
+
+ # Cached Greeting, if one was awaited.
+ self._greeting: Optional[Greeting] = None
+
+ # Command ID counter
+ self._execute_id = 0
+
+ # Incoming RPC reply messages.
+ self._pending: Dict[
+ Union[str, None],
+ 'asyncio.Queue[QMPClient._PendingT]'
+ ] = {}
+
+ @property
+ def greeting(self) -> Optional[Greeting]:
+ """The `Greeting` from the QMP server, if any."""
+ return self._greeting
+
+ @upper_half
+ async def _establish_session(self) -> None:
+ """
+ Initiate the QMP session.
+
+ Wait for the QMP greeting and perform capabilities negotiation.
+
+ :raise GreetingError: When the greeting is not understood.
+ :raise NegotiationError: If the negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ self._greeting = None
+ self._pending = {}
+
+ if self.await_greeting or self.negotiate:
+ self._greeting = await self._get_greeting()
+
+ if self.negotiate:
+ await self._negotiate()
+
+ # This will start the reader/writers:
+ await super()._establish_session()
+
+ @upper_half
+ async def _get_greeting(self) -> Greeting:
+ """
+ :raise GreetingError: When the greeting is not understood.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+
+ :return: the Greeting object given by the server.
+ """
+ self.logger.debug("Awaiting greeting ...")
+
+ try:
+ msg = await self._recv()
+ return Greeting(msg)
+ except (ProtocolError, KeyError, TypeError) as err:
+ emsg = "Did not understand Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise GreetingError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Failed to receive Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @upper_half
+ async def _negotiate(self) -> None:
+ """
+ Perform QMP capabilities negotiation.
+
+ :raise NegotiationError: When negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ self.logger.debug("Negotiating capabilities ...")
+
+ arguments: Dict[str, List[str]] = {}
+ if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+ arguments.setdefault('enable', []).append('oob')
+ msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
+
+ # It's not safe to use execute() here, because the reader/writers
+ # aren't running. AsyncProtocol *requires* that a new session
+ # does not fail after the reader/writers are running!
+ try:
+ await self._send(msg)
+ reply = await self._recv()
+ assert 'return' in reply
+ assert 'error' not in reply
+ except (ProtocolError, AssertionError) as err:
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise NegotiationError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @bottom_half
+ async def _bh_disconnect(self) -> None:
+ try:
+ await super()._bh_disconnect()
+ finally:
+ if self._pending:
+ self.logger.debug("Cancelling pending executions")
+ keys = self._pending.keys()
+ for key in keys:
+ self.logger.debug("Cancelling execution '%s'", key)
+ self._pending[key].put_nowait(
+ ExecInterruptedError("Disconnected")
+ )
+
+ self.logger.debug("QMP Disconnected.")
+
+ @upper_half
+ def _cleanup(self) -> None:
+ super()._cleanup()
+ assert not self._pending
+
+ @bottom_half
+ async def _on_message(self, msg: Message) -> None:
+ """
+ Add an incoming message to the appropriate queue/handler.
+
+ :raise ServerParseError: When Message indicates server parse failure.
+ """
+ # Incoming messages are not fully parsed/validated here;
+ # do only light peeking to know how to route the messages.
+
+ if 'event' in msg:
+ await self._event_dispatch(msg)
+ return
+
+ # Below, we assume everything left is an execute/exec-oob response.
+
+ exec_id = cast(Optional[str], msg.get('id'))
+
+ if exec_id in self._pending:
+ await self._pending[exec_id].put(msg)
+ return
+
+ # We have a message we can't route back to a caller.
+
+ is_error = 'error' in msg
+ has_id = 'id' in msg
+
+ if is_error and not has_id:
+ # This is very likely a server parsing error.
+ # It doesn't inherently belong to any pending execution.
+ # Instead of performing clever recovery, just terminate.
+ # See "NOTE" in qmp-spec.txt, section 2.4.2
+ raise ServerParseError(
+ ("Server sent an error response without an ID, "
+ "but there are no ID-less executions pending. "
+ "Assuming this is a server parser failure."),
+ msg
+ )
+
+ # qmp-spec.txt, section 2.4:
+ # 'Clients should drop all the responses
+ # that have an unknown "id" field.'
+ self.logger.log(
+ logging.ERROR if is_error else logging.WARNING,
+ "Unknown ID '%s', message dropped.",
+ exec_id,
+ )
+ self.logger.debug("Unroutable message: %s", str(msg))
+
+ @upper_half
+ @bottom_half
+ async def _do_recv(self) -> Message:
+ """
+ :raise OSError: When a stream error is encountered.
+ :raise EOFError: When the stream is at EOF.
+ :raise ProtocolError:
+ When the Message is not understood.
+ See also `Message._deserialize`.
+
+ :return: A single QMP `Message`.
+ """
+ msg_bytes = await self._readline()
+ msg = Message(msg_bytes, eager=True)
+ return msg
+
+ @upper_half
+ @bottom_half
+ def _do_send(self, msg: Message) -> None:
+ """
+ :raise ValueError: JSON serialization failure
+ :raise TypeError: JSON serialization failure
+ :raise OSError: When a stream error is encountered.
+ """
+ assert self._writer is not None
+ self._writer.write(bytes(msg))
+
+ @upper_half
+ def _get_exec_id(self) -> str:
+ exec_id = f"__qmp#{self._execute_id:05d}"
+ self._execute_id += 1
+ return exec_id
+
+ @upper_half
+ async def _issue(self, msg: Message) -> Union[None, str]:
+ """
+ Issue a QMP `Message` and do not wait for a reply.
+
+ :param msg: The QMP `Message` to send to the server.
+
+ :return: The ID of the `Message` sent.
+ """
+ msg_id: Optional[str] = None
+ if 'id' in msg:
+ assert isinstance(msg['id'], str)
+ msg_id = msg['id']
+
+ self._pending[msg_id] = asyncio.Queue(maxsize=1)
+ try:
+ await self._outgoing.put(msg)
+ except:
+ del self._pending[msg_id]
+ raise
+
+ return msg_id
+
+ @upper_half
+ async def _reply(self, msg_id: Union[str, None]) -> Message:
+ """
+ Await a reply to a previously issued QMP message.
+
+ :param msg_id: The ID of the previously issued message.
+
+ :return: The reply from the server.
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ """
+ queue = self._pending[msg_id]
+
+ try:
+ result = await queue.get()
+ if isinstance(result, ExecInterruptedError):
+ raise result
+ return result
+ finally:
+ del self._pending[msg_id]
+
+ @upper_half
+ async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
+ """
+ Send a QMP `Message` to the server and await a reply.
+
+ This method *assumes* you are sending some kind of an execute
+ statement that *will* receive a reply.
+
+ An execution ID will be assigned if assign_id is `True`. It can be
+ disabled, but this requires that an ID is manually assigned
+ instead. For manually assigned IDs, you must not use the string
+ '__qmp#' anywhere in the ID.
+
+ :param msg: The QMP `Message` to execute.
+ :param assign_id: If True, assign a new execution ID.
+
+ :return: Execution reply from the server.
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ """
+ if assign_id:
+ msg['id'] = self._get_exec_id()
+ elif 'id' in msg:
+ assert isinstance(msg['id'], str)
+ assert '__qmp#' not in msg['id']
+
+ exec_id = await self._issue(msg)
+ return await self._reply(exec_id)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ async def _raw(
+ self,
+ msg: Union[Message, Mapping[str, object], bytes],
+ assign_id: bool = True,
+ ) -> Message:
+ """
+ Issue a raw `Message` to the QMP server and await a reply.
+
+ :param msg:
+ A Message to send to the server. It may be a `Message`, any
+ Mapping (including Dict), or raw bytes.
+ :param assign_id:
+ Assign an arbitrary execution ID to this message. If
+ `False`, the existing id must either be absent (and no other
+ such pending execution may omit an ID) or a string. If it is
+ a string, it must not start with '__qmp#' and no other such
+ pending execution may currently be using that ID.
+
+ :return: Execution reply from the server.
+
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ :raise TypeError:
+ When assign_id is `False`, an ID is given, and it is not a string.
+ :raise ValueError:
+ When assign_id is `False`, but the ID is not usable;
+ Either because it starts with '__qmp#' or it is already in-use.
+ """
+ # 1. convert generic Mapping or bytes to a QMP Message
+ # 2. copy Message objects so that we assign an ID only to the copy.
+ msg = Message(msg)
+
+ exec_id = msg.get('id')
+ if not assign_id and 'id' in msg:
+ if not isinstance(exec_id, str):
+ raise TypeError(f"ID ('{exec_id}') must be a string.")
+ if exec_id.startswith('__qmp#'):
+ raise ValueError(
+ f"ID ('{exec_id}') must not start with '__qmp#'."
+ )
+
+ if not assign_id and exec_id in self._pending:
+ raise ValueError(
+ f"ID '{exec_id}' is in-use and cannot be used."
+ )
+
+ return await self._execute(msg, assign_id=assign_id)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ async def execute_msg(self, msg: Message) -> object:
+ """
+ Execute a QMP command and return its value.
+
+ :param msg: The QMP `Message` to execute.
+
+ :return:
+ The command execution return value from the server. The type of
+ object returned depends on the command that was issued,
+ though most in QEMU return a `dict`.
+ :raise ValueError:
+ If the QMP `Message` does not have either the 'execute' or
+ 'exec-oob' fields set.
+ :raise ExecuteError: When the server returns an error response.
+ :raise ExecInterruptedError: if the connection was terminated early.
+ """
+ if not ('execute' in msg or 'exec-oob' in msg):
+ raise ValueError("Requires 'execute' or 'exec-oob' message")
+
+ # Copy the Message so that the ID assigned by _execute() is
+ # local to this method; allowing the ID to be seen in raised
+ # Exceptions but without modifying the caller's held copy.
+ msg = Message(msg)
+ reply = await self._execute(msg)
+
+ if 'error' in reply:
+ try:
+ error_response = ErrorResponse(reply)
+ except (KeyError, TypeError) as err:
+ # Error response was malformed.
+ raise BadReplyError(
+ "QMP error reply is malformed", reply, msg,
+ ) from err
+
+ raise ExecuteError(error_response, msg, reply)
+
+ if 'return' not in reply:
+ raise BadReplyError(
+ "QMP reply is missing a 'error' or 'return' member",
+ reply, msg,
+ )
+
+ return reply['return']
+
+ @classmethod
+ def make_execute_msg(cls, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> Message:
+ """
+ Create an executable message to be sent by `execute_msg` later.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return: An executable QMP `Message`.
+ """
+ msg = Message({'exec-oob' if oob else 'execute': cmd})
+ if arguments is not None:
+ msg['arguments'] = arguments
+ return msg
+
+ @upper_half
+ async def execute(self, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> object:
+ """
+ Execute a QMP command and return its value.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return:
+ The command execution return value from the server. The type of
+ object returned depends on the command that was issued,
+ though most in QEMU return a `dict`.
+ :raise ExecuteError: When the server returns an error response.
+ :raise ExecInterruptedError: if the connection was terminated early.
+ """
+ msg = self.make_execute_msg(cmd, arguments, oob=oob)
+ return await self.execute_msg(msg)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ def send_fd_scm(self, fd: int) -> None:
+ """
+ Send a file descriptor to the remote via SCM_RIGHTS.
+ """
+ assert self._writer is not None
+ sock = self._writer.transport.get_extra_info('socket')
+
+ if sock.family != socket.AF_UNIX:
+ raise QMPError("Sending file descriptors requires a UNIX socket.")
+
+ if not hasattr(sock, 'sendmsg'):
+ # We need to void the warranty sticker.
+ # Access to sendmsg is scheduled for removal in Python 3.11.
+ # Find the real backing socket to use it anyway.
+ sock = sock._sock # pylint: disable=protected-access
+
+ sock.sendmsg(
+ [b' '],
+ [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
+ )