aef633e765
Pylint 2.16 adds a few new checks that cause the optional check-tox CI job to fail. 1. The superfluous-parens check seems to be a bit more aggressive, 2. broad-exception-raised is new; it discourages "raise Exception". Fix these minor issues and turn the lights green. Signed-off-by: John Snow <jsnow@redhat.com> Reviewed-by: Philippe Mathieu-Daudé <philmd@linaro.org> Reviewed-by: Beraldo Leal <bleal@redhat.com> Message-id: 20230210003147.1309376-2-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
656 lines
22 KiB
Python
656 lines
22 KiB
Python
"""
|
|
QMP Protocol Implementation
|
|
|
|
This module provides the `QMPClient` class, which can be used to connect
|
|
and send commands to a QMP server such as QEMU. The QMP class can be
|
|
used to either connect to a listening server, or used to listen and
|
|
accept an incoming connection from that server.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import socket
|
|
import struct
|
|
from typing import (
|
|
Dict,
|
|
List,
|
|
Mapping,
|
|
Optional,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
from .error import ProtocolError, QMPError
|
|
from .events import Events
|
|
from .message import Message
|
|
from .models import ErrorResponse, Greeting
|
|
from .protocol import AsyncProtocol, Runstate, require
|
|
from .util import (
|
|
bottom_half,
|
|
exception_summary,
|
|
pretty_traceback,
|
|
upper_half,
|
|
)
|
|
|
|
|
|
class _WrappedProtocolError(ProtocolError):
|
|
"""
|
|
Abstract exception class for Protocol errors that wrap an Exception.
|
|
|
|
:param error_message: Human-readable string describing the error.
|
|
:param exc: The root-cause exception.
|
|
"""
|
|
def __init__(self, error_message: str, exc: Exception):
|
|
super().__init__(error_message)
|
|
self.exc = exc
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.error_message}: {self.exc!s}"
|
|
|
|
|
|
class GreetingError(_WrappedProtocolError):
|
|
"""
|
|
An exception occurred during the Greeting phase.
|
|
|
|
:param error_message: Human-readable string describing the error.
|
|
:param exc: The root-cause exception.
|
|
"""
|
|
|
|
|
|
class NegotiationError(_WrappedProtocolError):
|
|
"""
|
|
An exception occurred during the Negotiation phase.
|
|
|
|
:param error_message: Human-readable string describing the error.
|
|
:param exc: The root-cause exception.
|
|
"""
|
|
|
|
|
|
class ExecuteError(QMPError):
|
|
"""
|
|
Exception raised by `QMPClient.execute()` on RPC failure.
|
|
|
|
:param error_response: The RPC error response object.
|
|
:param sent: The sent RPC message that caused the failure.
|
|
:param received: The raw RPC error reply received.
|
|
"""
|
|
def __init__(self, error_response: ErrorResponse,
|
|
sent: Message, received: Message):
|
|
super().__init__(error_response.error.desc)
|
|
#: The sent `Message` that caused the failure
|
|
self.sent: Message = sent
|
|
#: The received `Message` that indicated failure
|
|
self.received: Message = received
|
|
#: The parsed error response
|
|
self.error: ErrorResponse = error_response
|
|
#: The QMP error class
|
|
self.error_class: str = error_response.error.class_
|
|
|
|
|
|
class ExecInterruptedError(QMPError):
|
|
"""
|
|
Exception raised by `execute()` (et al) when an RPC is interrupted.
|
|
|
|
This error is raised when an `execute()` statement could not be
|
|
completed. This can occur because the connection itself was
|
|
terminated before a reply was received.
|
|
|
|
The true cause of the interruption will be available via `disconnect()`.
|
|
"""
|
|
|
|
|
|
class _MsgProtocolError(ProtocolError):
|
|
"""
|
|
Abstract error class for protocol errors that have a `Message` object.
|
|
|
|
This Exception class is used for protocol errors where the `Message`
|
|
was mechanically understood, but was found to be inappropriate or
|
|
malformed.
|
|
|
|
:param error_message: Human-readable string describing the error.
|
|
:param msg: The QMP `Message` that caused the error.
|
|
"""
|
|
def __init__(self, error_message: str, msg: Message):
|
|
super().__init__(error_message)
|
|
#: The received `Message` that caused the error.
|
|
self.msg: Message = msg
|
|
|
|
def __str__(self) -> str:
|
|
return "\n".join([
|
|
super().__str__(),
|
|
f" Message was: {str(self.msg)}\n",
|
|
])
|
|
|
|
|
|
class ServerParseError(_MsgProtocolError):
|
|
"""
|
|
The Server sent a `Message` indicating parsing failure.
|
|
|
|
i.e. A reply has arrived from the server, but it is missing the "ID"
|
|
field, indicating a parsing error.
|
|
|
|
:param error_message: Human-readable string describing the error.
|
|
:param msg: The QMP `Message` that caused the error.
|
|
"""
|
|
|
|
|
|
class BadReplyError(_MsgProtocolError):
|
|
"""
|
|
An execution reply was successfully routed, but not understood.
|
|
|
|
If a QMP message is received with an 'id' field to allow it to be
|
|
routed, but is otherwise malformed, this exception will be raised.
|
|
|
|
A reply message is malformed if it is missing either the 'return' or
|
|
'error' keys, or if the 'error' value has missing keys or members of
|
|
the wrong type.
|
|
|
|
:param error_message: Human-readable string describing the error.
|
|
:param msg: The malformed reply that was received.
|
|
:param sent: The message that was sent that prompted the error.
|
|
"""
|
|
def __init__(self, error_message: str, msg: Message, sent: Message):
|
|
super().__init__(error_message, msg)
|
|
#: The sent `Message` that caused the failure
|
|
self.sent = sent
|
|
|
|
|
|
class QMPClient(AsyncProtocol[Message], Events):
|
|
"""
|
|
Implements a QMP client connection.
|
|
|
|
QMP can be used to establish a connection as either the transport
|
|
client or server, though this class always acts as the QMP client.
|
|
|
|
:param name: Optional nickname for the connection, used for logging.
|
|
|
|
Basic script-style usage looks like this::
|
|
|
|
qmp = QMPClient('my_virtual_machine_name')
|
|
await qmp.connect(('127.0.0.1', 1234))
|
|
...
|
|
res = await qmp.execute('block-query')
|
|
...
|
|
await qmp.disconnect()
|
|
|
|
Basic async client-style usage looks like this::
|
|
|
|
class Client:
|
|
def __init__(self, name: str):
|
|
self.qmp = QMPClient(name)
|
|
|
|
async def watch_events(self):
|
|
try:
|
|
async for event in self.qmp.events:
|
|
print(f"Event: {event['event']}")
|
|
except asyncio.CancelledError:
|
|
return
|
|
|
|
async def run(self, address='/tmp/qemu.socket'):
|
|
await self.qmp.connect(address)
|
|
asyncio.create_task(self.watch_events())
|
|
await self.qmp.runstate_changed.wait()
|
|
await self.disconnect()
|
|
|
|
See `qmp.events` for more detail on event handling patterns.
|
|
"""
|
|
#: Logger object used for debugging messages.
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Read buffer limit; 10MB like libvirt default
|
|
_limit = 10 * 1024 * 1024
|
|
|
|
# Type alias for pending execute() result items
|
|
_PendingT = Union[Message, ExecInterruptedError]
|
|
|
|
def __init__(self, name: Optional[str] = None) -> None:
|
|
super().__init__(name)
|
|
Events.__init__(self)
|
|
|
|
#: Whether or not to await a greeting after establishing a connection.
|
|
self.await_greeting: bool = True
|
|
|
|
#: Whether or not to perform capabilities negotiation upon connection.
|
|
#: Implies `await_greeting`.
|
|
self.negotiate: bool = True
|
|
|
|
# Cached Greeting, if one was awaited.
|
|
self._greeting: Optional[Greeting] = None
|
|
|
|
# Command ID counter
|
|
self._execute_id = 0
|
|
|
|
# Incoming RPC reply messages.
|
|
self._pending: Dict[
|
|
Union[str, None],
|
|
'asyncio.Queue[QMPClient._PendingT]'
|
|
] = {}
|
|
|
|
@property
|
|
def greeting(self) -> Optional[Greeting]:
|
|
"""The `Greeting` from the QMP server, if any."""
|
|
return self._greeting
|
|
|
|
@upper_half
|
|
async def _establish_session(self) -> None:
|
|
"""
|
|
Initiate the QMP session.
|
|
|
|
Wait for the QMP greeting and perform capabilities negotiation.
|
|
|
|
:raise GreetingError: When the greeting is not understood.
|
|
:raise NegotiationError: If the negotiation fails.
|
|
:raise EOFError: When the server unexpectedly hangs up.
|
|
:raise OSError: For underlying stream errors.
|
|
"""
|
|
self._greeting = None
|
|
self._pending = {}
|
|
|
|
if self.await_greeting or self.negotiate:
|
|
self._greeting = await self._get_greeting()
|
|
|
|
if self.negotiate:
|
|
await self._negotiate()
|
|
|
|
# This will start the reader/writers:
|
|
await super()._establish_session()
|
|
|
|
@upper_half
|
|
async def _get_greeting(self) -> Greeting:
|
|
"""
|
|
:raise GreetingError: When the greeting is not understood.
|
|
:raise EOFError: When the server unexpectedly hangs up.
|
|
:raise OSError: For underlying stream errors.
|
|
|
|
:return: the Greeting object given by the server.
|
|
"""
|
|
self.logger.debug("Awaiting greeting ...")
|
|
|
|
try:
|
|
msg = await self._recv()
|
|
return Greeting(msg)
|
|
except (ProtocolError, KeyError, TypeError) as err:
|
|
emsg = "Did not understand Greeting"
|
|
self.logger.error("%s: %s", emsg, exception_summary(err))
|
|
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
|
|
raise GreetingError(emsg, err) from err
|
|
except BaseException as err:
|
|
# EOFError, OSError, or something unexpected.
|
|
emsg = "Failed to receive Greeting"
|
|
self.logger.error("%s: %s", emsg, exception_summary(err))
|
|
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
|
|
raise
|
|
|
|
@upper_half
|
|
async def _negotiate(self) -> None:
|
|
"""
|
|
Perform QMP capabilities negotiation.
|
|
|
|
:raise NegotiationError: When negotiation fails.
|
|
:raise EOFError: When the server unexpectedly hangs up.
|
|
:raise OSError: For underlying stream errors.
|
|
"""
|
|
self.logger.debug("Negotiating capabilities ...")
|
|
|
|
arguments: Dict[str, List[str]] = {}
|
|
if self._greeting and 'oob' in self._greeting.QMP.capabilities:
|
|
arguments.setdefault('enable', []).append('oob')
|
|
msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
|
|
|
|
# It's not safe to use execute() here, because the reader/writers
|
|
# aren't running. AsyncProtocol *requires* that a new session
|
|
# does not fail after the reader/writers are running!
|
|
try:
|
|
await self._send(msg)
|
|
reply = await self._recv()
|
|
assert 'return' in reply
|
|
assert 'error' not in reply
|
|
except (ProtocolError, AssertionError) as err:
|
|
emsg = "Negotiation failed"
|
|
self.logger.error("%s: %s", emsg, exception_summary(err))
|
|
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
|
|
raise NegotiationError(emsg, err) from err
|
|
except BaseException as err:
|
|
# EOFError, OSError, or something unexpected.
|
|
emsg = "Negotiation failed"
|
|
self.logger.error("%s: %s", emsg, exception_summary(err))
|
|
self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
|
|
raise
|
|
|
|
@bottom_half
|
|
async def _bh_disconnect(self) -> None:
|
|
try:
|
|
await super()._bh_disconnect()
|
|
finally:
|
|
if self._pending:
|
|
self.logger.debug("Cancelling pending executions")
|
|
keys = self._pending.keys()
|
|
for key in keys:
|
|
self.logger.debug("Cancelling execution '%s'", key)
|
|
self._pending[key].put_nowait(
|
|
ExecInterruptedError("Disconnected")
|
|
)
|
|
|
|
self.logger.debug("QMP Disconnected.")
|
|
|
|
@upper_half
|
|
def _cleanup(self) -> None:
|
|
super()._cleanup()
|
|
assert not self._pending
|
|
|
|
@bottom_half
|
|
async def _on_message(self, msg: Message) -> None:
|
|
"""
|
|
Add an incoming message to the appropriate queue/handler.
|
|
|
|
:raise ServerParseError: When Message indicates server parse failure.
|
|
"""
|
|
# Incoming messages are not fully parsed/validated here;
|
|
# do only light peeking to know how to route the messages.
|
|
|
|
if 'event' in msg:
|
|
await self._event_dispatch(msg)
|
|
return
|
|
|
|
# Below, we assume everything left is an execute/exec-oob response.
|
|
|
|
exec_id = cast(Optional[str], msg.get('id'))
|
|
|
|
if exec_id in self._pending:
|
|
await self._pending[exec_id].put(msg)
|
|
return
|
|
|
|
# We have a message we can't route back to a caller.
|
|
|
|
is_error = 'error' in msg
|
|
has_id = 'id' in msg
|
|
|
|
if is_error and not has_id:
|
|
# This is very likely a server parsing error.
|
|
# It doesn't inherently belong to any pending execution.
|
|
# Instead of performing clever recovery, just terminate.
|
|
# See "NOTE" in qmp-spec.txt, section 2.4.2
|
|
raise ServerParseError(
|
|
("Server sent an error response without an ID, "
|
|
"but there are no ID-less executions pending. "
|
|
"Assuming this is a server parser failure."),
|
|
msg
|
|
)
|
|
|
|
# qmp-spec.txt, section 2.4:
|
|
# 'Clients should drop all the responses
|
|
# that have an unknown "id" field.'
|
|
self.logger.log(
|
|
logging.ERROR if is_error else logging.WARNING,
|
|
"Unknown ID '%s', message dropped.",
|
|
exec_id,
|
|
)
|
|
self.logger.debug("Unroutable message: %s", str(msg))
|
|
|
|
@upper_half
|
|
@bottom_half
|
|
async def _do_recv(self) -> Message:
|
|
"""
|
|
:raise OSError: When a stream error is encountered.
|
|
:raise EOFError: When the stream is at EOF.
|
|
:raise ProtocolError:
|
|
When the Message is not understood.
|
|
See also `Message._deserialize`.
|
|
|
|
:return: A single QMP `Message`.
|
|
"""
|
|
msg_bytes = await self._readline()
|
|
msg = Message(msg_bytes, eager=True)
|
|
return msg
|
|
|
|
@upper_half
|
|
@bottom_half
|
|
def _do_send(self, msg: Message) -> None:
|
|
"""
|
|
:raise ValueError: JSON serialization failure
|
|
:raise TypeError: JSON serialization failure
|
|
:raise OSError: When a stream error is encountered.
|
|
"""
|
|
assert self._writer is not None
|
|
self._writer.write(bytes(msg))
|
|
|
|
@upper_half
|
|
def _get_exec_id(self) -> str:
|
|
exec_id = f"__qmp#{self._execute_id:05d}"
|
|
self._execute_id += 1
|
|
return exec_id
|
|
|
|
@upper_half
|
|
async def _issue(self, msg: Message) -> Union[None, str]:
|
|
"""
|
|
Issue a QMP `Message` and do not wait for a reply.
|
|
|
|
:param msg: The QMP `Message` to send to the server.
|
|
|
|
:return: The ID of the `Message` sent.
|
|
"""
|
|
msg_id: Optional[str] = None
|
|
if 'id' in msg:
|
|
assert isinstance(msg['id'], str)
|
|
msg_id = msg['id']
|
|
|
|
self._pending[msg_id] = asyncio.Queue(maxsize=1)
|
|
try:
|
|
await self._outgoing.put(msg)
|
|
except:
|
|
del self._pending[msg_id]
|
|
raise
|
|
|
|
return msg_id
|
|
|
|
@upper_half
|
|
async def _reply(self, msg_id: Union[str, None]) -> Message:
|
|
"""
|
|
Await a reply to a previously issued QMP message.
|
|
|
|
:param msg_id: The ID of the previously issued message.
|
|
|
|
:return: The reply from the server.
|
|
:raise ExecInterruptedError:
|
|
When the reply could not be retrieved because the connection
|
|
was lost, or some other problem.
|
|
"""
|
|
queue = self._pending[msg_id]
|
|
|
|
try:
|
|
result = await queue.get()
|
|
if isinstance(result, ExecInterruptedError):
|
|
raise result
|
|
return result
|
|
finally:
|
|
del self._pending[msg_id]
|
|
|
|
@upper_half
|
|
async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
|
|
"""
|
|
Send a QMP `Message` to the server and await a reply.
|
|
|
|
This method *assumes* you are sending some kind of an execute
|
|
statement that *will* receive a reply.
|
|
|
|
An execution ID will be assigned if assign_id is `True`. It can be
|
|
disabled, but this requires that an ID is manually assigned
|
|
instead. For manually assigned IDs, you must not use the string
|
|
'__qmp#' anywhere in the ID.
|
|
|
|
:param msg: The QMP `Message` to execute.
|
|
:param assign_id: If True, assign a new execution ID.
|
|
|
|
:return: Execution reply from the server.
|
|
:raise ExecInterruptedError:
|
|
When the reply could not be retrieved because the connection
|
|
was lost, or some other problem.
|
|
"""
|
|
if assign_id:
|
|
msg['id'] = self._get_exec_id()
|
|
elif 'id' in msg:
|
|
assert isinstance(msg['id'], str)
|
|
assert '__qmp#' not in msg['id']
|
|
|
|
exec_id = await self._issue(msg)
|
|
return await self._reply(exec_id)
|
|
|
|
@upper_half
|
|
@require(Runstate.RUNNING)
|
|
async def _raw(
|
|
self,
|
|
msg: Union[Message, Mapping[str, object], bytes],
|
|
assign_id: bool = True,
|
|
) -> Message:
|
|
"""
|
|
Issue a raw `Message` to the QMP server and await a reply.
|
|
|
|
:param msg:
|
|
A Message to send to the server. It may be a `Message`, any
|
|
Mapping (including Dict), or raw bytes.
|
|
:param assign_id:
|
|
Assign an arbitrary execution ID to this message. If
|
|
`False`, the existing id must either be absent (and no other
|
|
such pending execution may omit an ID) or a string. If it is
|
|
a string, it must not start with '__qmp#' and no other such
|
|
pending execution may currently be using that ID.
|
|
|
|
:return: Execution reply from the server.
|
|
|
|
:raise ExecInterruptedError:
|
|
When the reply could not be retrieved because the connection
|
|
was lost, or some other problem.
|
|
:raise TypeError:
|
|
When assign_id is `False`, an ID is given, and it is not a string.
|
|
:raise ValueError:
|
|
When assign_id is `False`, but the ID is not usable;
|
|
Either because it starts with '__qmp#' or it is already in-use.
|
|
"""
|
|
# 1. convert generic Mapping or bytes to a QMP Message
|
|
# 2. copy Message objects so that we assign an ID only to the copy.
|
|
msg = Message(msg)
|
|
|
|
exec_id = msg.get('id')
|
|
if not assign_id and 'id' in msg:
|
|
if not isinstance(exec_id, str):
|
|
raise TypeError(f"ID ('{exec_id}') must be a string.")
|
|
if exec_id.startswith('__qmp#'):
|
|
raise ValueError(
|
|
f"ID ('{exec_id}') must not start with '__qmp#'."
|
|
)
|
|
|
|
if not assign_id and exec_id in self._pending:
|
|
raise ValueError(
|
|
f"ID '{exec_id}' is in-use and cannot be used."
|
|
)
|
|
|
|
return await self._execute(msg, assign_id=assign_id)
|
|
|
|
@upper_half
|
|
@require(Runstate.RUNNING)
|
|
async def execute_msg(self, msg: Message) -> object:
|
|
"""
|
|
Execute a QMP command and return its value.
|
|
|
|
:param msg: The QMP `Message` to execute.
|
|
|
|
:return:
|
|
The command execution return value from the server. The type of
|
|
object returned depends on the command that was issued,
|
|
though most in QEMU return a `dict`.
|
|
:raise ValueError:
|
|
If the QMP `Message` does not have either the 'execute' or
|
|
'exec-oob' fields set.
|
|
:raise ExecuteError: When the server returns an error response.
|
|
:raise ExecInterruptedError: if the connection was terminated early.
|
|
"""
|
|
if not ('execute' in msg or 'exec-oob' in msg):
|
|
raise ValueError("Requires 'execute' or 'exec-oob' message")
|
|
|
|
# Copy the Message so that the ID assigned by _execute() is
|
|
# local to this method; allowing the ID to be seen in raised
|
|
# Exceptions but without modifying the caller's held copy.
|
|
msg = Message(msg)
|
|
reply = await self._execute(msg)
|
|
|
|
if 'error' in reply:
|
|
try:
|
|
error_response = ErrorResponse(reply)
|
|
except (KeyError, TypeError) as err:
|
|
# Error response was malformed.
|
|
raise BadReplyError(
|
|
"QMP error reply is malformed", reply, msg,
|
|
) from err
|
|
|
|
raise ExecuteError(error_response, msg, reply)
|
|
|
|
if 'return' not in reply:
|
|
raise BadReplyError(
|
|
"QMP reply is missing a 'error' or 'return' member",
|
|
reply, msg,
|
|
)
|
|
|
|
return reply['return']
|
|
|
|
@classmethod
|
|
def make_execute_msg(cls, cmd: str,
|
|
arguments: Optional[Mapping[str, object]] = None,
|
|
oob: bool = False) -> Message:
|
|
"""
|
|
Create an executable message to be sent by `execute_msg` later.
|
|
|
|
:param cmd: QMP command name.
|
|
:param arguments: Arguments (if any). Must be JSON-serializable.
|
|
:param oob: If `True`, execute "out of band".
|
|
|
|
:return: An executable QMP `Message`.
|
|
"""
|
|
msg = Message({'exec-oob' if oob else 'execute': cmd})
|
|
if arguments is not None:
|
|
msg['arguments'] = arguments
|
|
return msg
|
|
|
|
@upper_half
|
|
async def execute(self, cmd: str,
|
|
arguments: Optional[Mapping[str, object]] = None,
|
|
oob: bool = False) -> object:
|
|
"""
|
|
Execute a QMP command and return its value.
|
|
|
|
:param cmd: QMP command name.
|
|
:param arguments: Arguments (if any). Must be JSON-serializable.
|
|
:param oob: If `True`, execute "out of band".
|
|
|
|
:return:
|
|
The command execution return value from the server. The type of
|
|
object returned depends on the command that was issued,
|
|
though most in QEMU return a `dict`.
|
|
:raise ExecuteError: When the server returns an error response.
|
|
:raise ExecInterruptedError: if the connection was terminated early.
|
|
"""
|
|
msg = self.make_execute_msg(cmd, arguments, oob=oob)
|
|
return await self.execute_msg(msg)
|
|
|
|
@upper_half
|
|
@require(Runstate.RUNNING)
|
|
def send_fd_scm(self, fd: int) -> None:
|
|
"""
|
|
Send a file descriptor to the remote via SCM_RIGHTS.
|
|
"""
|
|
assert self._writer is not None
|
|
sock = self._writer.transport.get_extra_info('socket')
|
|
|
|
if sock.family != socket.AF_UNIX:
|
|
raise QMPError("Sending file descriptors requires a UNIX socket.")
|
|
|
|
if not hasattr(sock, 'sendmsg'):
|
|
# We need to void the warranty sticker.
|
|
# Access to sendmsg is scheduled for removal in Python 3.11.
|
|
# Find the real backing socket to use it anyway.
|
|
sock = sock._sock # pylint: disable=protected-access
|
|
|
|
sock.sendmsg(
|
|
[b' '],
|
|
[(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
|
|
)
|