qemu-e2k/python/qemu/aqmp/qmp_client.py
John Snow 41f4f92260 python/aqmp: add _raw() execution interface
This is added in anticipation of wanting it for a synchronous wrapper
for the iotest interface. Normally, execute() and execute_msg() both
raise QMP errors in the form of Python exceptions.

Many iotests expect the entire reply as-is. To reduce churn there, add a
private execution interface that will ease transition churn. However, I
do not wish to encourage its use, so it will remain a private interface.

Signed-off-by: John Snow <jsnow@redhat.com>
Message-id: 20210915162955.333025-22-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
2021-09-27 12:10:29 -04:00

622 lines
21 KiB
Python

"""
QMP Protocol Implementation
This module provides the `QMPClient` class, which can be used to connect
and send commands to a QMP server such as QEMU. The `QMPClient` class
can be used either to connect to a listening server, or to listen for
and accept an incoming connection from that server.
"""
import asyncio
import logging
from typing import (
Dict,
List,
Mapping,
Optional,
Union,
cast,
)
from .error import AQMPError, ProtocolError
from .events import Events
from .message import Message
from .models import ErrorResponse, Greeting
from .protocol import AsyncProtocol, Runstate, require
from .util import (
bottom_half,
exception_summary,
pretty_traceback,
upper_half,
)
class _WrappedProtocolError(ProtocolError):
    """
    Base class for protocol errors that carry their root-cause exception.

    The string form of the error is the error message followed by the
    string form of the wrapped exception.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """

    def __init__(self, error_message: str, exc: Exception) -> None:
        super().__init__(error_message)
        #: The exception that triggered this protocol error.
        self.exc: Exception = exc

    def __str__(self) -> str:
        # `error_message` is not assigned in this class; presumably the
        # ProtocolError base class stores it -- verify against .error.
        return "%s: %s" % (self.error_message, self.exc)
class GreetingError(_WrappedProtocolError):
    """
    Raised when something goes wrong during the Greeting phase.

    `QMPClient._get_greeting()` raises this when the greeting message
    received from the server could not be understood.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
class NegotiationError(_WrappedProtocolError):
    """
    Raised when something goes wrong during the Negotiation phase.

    `QMPClient._negotiate()` raises this when the capabilities
    negotiation exchange with the server fails.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
class ExecuteError(AQMPError):
    """
    Raised by `QMPClient.execute()` when the server reports RPC failure.

    The error description from the response is handed to the base class
    constructor; the messages involved are kept as attributes.

    :param error_response: The RPC error response object.
    :param sent: The sent RPC message that caused the failure.
    :param received: The raw RPC error reply received.
    """

    def __init__(self, error_response: ErrorResponse,
                 sent: Message, received: Message) -> None:
        details = error_response.error
        super().__init__(details.desc)

        #: The sent `Message` that caused the failure
        self.sent: Message = sent
        #: The received `Message` that indicated failure
        self.received: Message = received
        #: The parsed error response
        self.error: ErrorResponse = error_response
        #: The QMP error class
        self.error_class: str = details.class_
class ExecInterruptedError(AQMPError):
    """
    Raised by `execute()` (et al) when an RPC is interrupted.

    An `execute()` call that cannot be completed raises this error; for
    example, when the connection is terminated before a reply has been
    received.

    The true cause of the interruption will be available via
    `disconnect()`.
    """
class _MsgProtocolError(ProtocolError):
    """
    Base class for protocol errors that are tied to a `Message` object.

    Used for protocol errors where the `Message` was mechanically
    understood, but was found to be inappropriate or malformed.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """

    def __init__(self, error_message: str, msg: Message) -> None:
        super().__init__(error_message)
        #: The received `Message` that caused the error.
        self.msg: Message = msg

    def __str__(self) -> str:
        # Base-class text first, then the offending message on a line
        # of its own (with a trailing newline).
        headline = super().__str__()
        detail = f" Message was: {self.msg!s}\n"
        return headline + "\n" + detail
class ServerParseError(_MsgProtocolError):
    """
    The server sent a `Message` that indicates a parsing failure.

    That is: an error reply arrived from the server without an "id"
    field, so it cannot be attributed to any pending execution and is
    assumed to report a server-side parse error.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
class BadReplyError(_MsgProtocolError):
    """
    An execution reply was routed to its caller, but not understood.

    Raised when a QMP message carries an 'id' field (so it could be
    routed) but is otherwise malformed: it lacks both the 'return' and
    'error' keys, or its 'error' value has missing keys or members of
    the wrong type.

    :param error_message: Human-readable string describing the error.
    :param msg: The malformed reply that was received.
    :param sent: The message that was sent that prompted the error.
    """

    def __init__(self, error_message: str,
                 msg: Message, sent: Message) -> None:
        super().__init__(error_message, msg)
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
class QMPClient(AsyncProtocol[Message], Events):
    """
    Implements a QMP client connection.

    QMP can be used to establish a connection as either the transport
    client or server, though this class always acts as the QMP client.

    :param name: Optional nickname for the connection, used for logging.

    Basic script-style usage looks like this::

      qmp = QMPClient('my_virtual_machine_name')
      await qmp.connect(('127.0.0.1', 1234))
      ...
      res = await qmp.execute('query-block')
      ...
      await qmp.disconnect()

    Basic async client-style usage looks like this::

      class Client:
          def __init__(self, name: str):
              self.qmp = QMPClient(name)

          async def watch_events(self):
              try:
                  async for event in self.qmp.events:
                      print(f"Event: {event['event']}")
              except asyncio.CancelledError:
                  return

          async def run(self, address='/tmp/qemu.socket'):
              await self.qmp.connect(address)
              asyncio.create_task(self.watch_events())
              await self.qmp.runstate_changed.wait()
              await self.qmp.disconnect()

    See `aqmp.events` for more detail on event handling patterns.
    """
    #: Logger object used for debugging messages.
    logger = logging.getLogger(__name__)

    # Read buffer limit; large enough to accept query-qmp-schema
    _limit = (256 * 1024)

    # Type alias for pending execute() result items
    _PendingT = Union[Message, ExecInterruptedError]

    def __init__(self, name: Optional[str] = None) -> None:
        """
        Create a new, unconnected QMP client.

        :param name: Optional nickname for the connection, used for logging.
        """
        super().__init__(name)
        Events.__init__(self)

        #: Whether or not to await a greeting after establishing a connection.
        self.await_greeting: bool = True

        #: Whether or not to perform capabilities negotiation upon connection.
        #: Implies `await_greeting`.
        self.negotiate: bool = True

        # Cached Greeting, if one was awaited.
        self._greeting: Optional[Greeting] = None

        # Command ID counter
        self._execute_id = 0

        # Incoming RPC reply messages, keyed by execution ID. The key
        # `None` is used for an execution that was sent without an ID.
        self._pending: Dict[
            Union[str, None],
            'asyncio.Queue[QMPClient._PendingT]'
        ] = {}

    @upper_half
    async def _establish_session(self) -> None:
        """
        Initiate the QMP session.

        Wait for the QMP greeting and perform capabilities negotiation.

        :raise GreetingError: When the greeting is not understood.
        :raise NegotiationError: If the negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        # Reset per-session state left over from any previous session.
        self._greeting = None
        self._pending = {}

        # Negotiation implies awaiting the greeting first.
        if self.await_greeting or self.negotiate:
            self._greeting = await self._get_greeting()

        if self.negotiate:
            await self._negotiate()

        # This will start the reader/writers:
        await super()._establish_session()

    @upper_half
    async def _get_greeting(self) -> Greeting:
        """
        Receive and parse the server's greeting message.

        :raise GreetingError: When the greeting is not understood.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.

        :return: the Greeting object given by the server.
        """
        self.logger.debug("Awaiting greeting ...")

        try:
            msg = await self._recv()
            return Greeting(msg)
        except (ProtocolError, KeyError, TypeError) as err:
            emsg = "Did not understand Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise GreetingError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Log it, then let it propagate unchanged.
            emsg = "Failed to receive Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @upper_half
    async def _negotiate(self) -> None:
        """
        Perform QMP capabilities negotiation.

        :raise NegotiationError: When negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        self.logger.debug("Negotiating capabilities ...")

        # Only send 'enable' when there is a capability to enable: QEMU
        # versions that predate the 'oob' capability do not accept the
        # 'enable' argument to qmp_capabilities at all.
        arguments: Dict[str, List[str]] = {}
        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
            arguments.setdefault('enable', []).append('oob')
        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)

        # It's not safe to use execute() here, because the reader/writers
        # aren't running. AsyncProtocol *requires* that a new session
        # does not fail after the reader/writers are running!
        try:
            await self._send(msg)
            reply = await self._recv()
            # NOTE(review): these checks are assert statements and are
            # therefore skipped when Python runs with -O.
            assert 'return' in reply
            assert 'error' not in reply
        except (ProtocolError, AssertionError) as err:
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise NegotiationError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Log it, then let it propagate unchanged.
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @bottom_half
    async def _bh_disconnect(self) -> None:
        """
        Disconnect, then interrupt every execution still awaiting a reply.

        Each pending reply queue receives an `ExecInterruptedError`, which
        `_reply()` re-raises in the waiting caller.
        """
        try:
            await super()._bh_disconnect()
        finally:
            if self._pending:
                self.logger.debug("Cancelling pending executions")
            for key, queue in self._pending.items():
                self.logger.debug("Cancelling execution '%s'", key)
                queue.put_nowait(ExecInterruptedError("Disconnected"))

            self.logger.debug("QMP Disconnected.")

    @upper_half
    def _cleanup(self) -> None:
        """
        Run the parent cleanup and check that no executions are pending.
        """
        super()._cleanup()
        # Every waiter must have removed its own entry in _reply().
        assert not self._pending

    @bottom_half
    async def _on_message(self, msg: Message) -> None:
        """
        Add an incoming message to the appropriate queue/handler.

        :raise ServerParseError: When Message indicates server parse failure.
        """
        # Incoming messages are not fully parsed/validated here;
        # do only light peeking to know how to route the messages.

        if 'event' in msg:
            await self._event_dispatch(msg)
            return

        # Below, we assume everything left is an execute/exec-oob response.

        exec_id = cast(Optional[str], msg.get('id'))

        if exec_id in self._pending:
            await self._pending[exec_id].put(msg)
            return

        # We have a message we can't route back to a caller.

        is_error = 'error' in msg
        has_id = 'id' in msg

        if is_error and not has_id:
            # This is very likely a server parsing error.
            # It doesn't inherently belong to any pending execution.
            # Instead of performing clever recovery, just terminate.
            # See "NOTE" in qmp-spec.txt, section 2.4.2
            raise ServerParseError(
                ("Server sent an error response without an ID, "
                 "but there are no ID-less executions pending. "
                 "Assuming this is a server parser failure."),
                msg
            )

        # qmp-spec.txt, section 2.4:
        # 'Clients should drop all the responses
        # that have an unknown "id" field.'
        self.logger.log(
            logging.ERROR if is_error else logging.WARNING,
            "Unknown ID '%s', message dropped.",
            exec_id,
        )
        self.logger.debug("Unroutable message: %s", str(msg))

    @upper_half
    @bottom_half
    async def _do_recv(self) -> Message:
        """
        Read one line from the stream and parse it as a `Message`.

        :raise OSError: When a stream error is encountered.
        :raise EOFError: When the stream is at EOF.
        :raise ProtocolError:
            When the Message is not understood.
            See also `Message._deserialize`.

        :return: A single QMP `Message`.
        """
        msg_bytes = await self._readline()
        msg = Message(msg_bytes, eager=True)
        return msg

    @upper_half
    @bottom_half
    def _do_send(self, msg: Message) -> None:
        """
        Serialize a `Message` and write it to the stream writer.

        :raise ValueError: JSON serialization failure
        :raise TypeError: JSON serialization failure
        :raise OSError: When a stream error is encountered.
        """
        assert self._writer is not None
        self._writer.write(bytes(msg))

    @upper_half
    def _get_exec_id(self) -> str:
        """
        Return a fresh execution ID and advance the ID counter.

        :return: An ID of the form '__aqmp#NNNNN'.
        """
        exec_id = f"__aqmp#{self._execute_id:05d}"
        self._execute_id += 1
        return exec_id

    @upper_half
    async def _issue(self, msg: Message) -> Union[None, str]:
        """
        Issue a QMP `Message` and do not wait for a reply.

        :param msg: The QMP `Message` to send to the server.

        :return: The ID of the `Message` sent.
        """
        msg_id: Optional[str] = None
        if 'id' in msg:
            assert isinstance(msg['id'], str)
            msg_id = msg['id']

        # Register the reply queue before queueing the message to send.
        self._pending[msg_id] = asyncio.Queue(maxsize=1)
        await self._outgoing.put(msg)

        return msg_id

    @upper_half
    async def _reply(self, msg_id: Union[str, None]) -> Message:
        """
        Await a reply to a previously issued QMP message.

        :param msg_id: The ID of the previously issued message.

        :return: The reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        queue = self._pending[msg_id]

        # The wait itself must be inside the try block: if this coroutine
        # is cancelled while waiting (for example by a timeout wrapped
        # around the call), the pending record still has to be removed.
        # Otherwise the stale entry would fail the check in _cleanup()
        # and keep the ID marked as in-use.
        try:
            result = await queue.get()
            if isinstance(result, ExecInterruptedError):
                raise result
            return result
        finally:
            del self._pending[msg_id]

    @upper_half
    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
        """
        Send a QMP `Message` to the server and await a reply.

        This method *assumes* you are sending some kind of an execute
        statement that *will* receive a reply.

        An execution ID will be assigned if assign_id is `True`. It can be
        disabled, but this requires that an ID is manually assigned
        instead. For manually assigned IDs, you must not use the string
        '__aqmp#' anywhere in the ID.

        :param msg: The QMP `Message` to execute.
        :param assign_id: If True, assign a new execution ID.

        :return: Execution reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        if assign_id:
            msg['id'] = self._get_exec_id()
        elif 'id' in msg:
            assert isinstance(msg['id'], str)
            assert '__aqmp#' not in msg['id']

        exec_id = await self._issue(msg)
        return await self._reply(exec_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def _raw(
            self,
            msg: Union[Message, Mapping[str, object], bytes],
            assign_id: bool = True,
    ) -> Message:
        """
        Issue a raw `Message` to the QMP server and await a reply.

        :param msg:
            A Message to send to the server. It may be a `Message`, any
            Mapping (including Dict), or raw bytes.
        :param assign_id:
            Assign an arbitrary execution ID to this message. If
            `False`, the existing id must either be absent (and no other
            such pending execution may omit an ID) or a string. If it is
            a string, it must not start with '__aqmp#' and no other such
            pending execution may currently be using that ID.

        :return: Execution reply from the server.

        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        :raise TypeError:
            When assign_id is `False`, an ID is given, and it is not a string.
        :raise ValueError:
            When assign_id is `False`, but the ID is not usable;
            Either because it starts with '__aqmp#' or it is already in-use.
        """
        # 1. convert generic Mapping or bytes to a QMP Message
        # 2. copy Message objects so that we assign an ID only to the copy.
        msg = Message(msg)

        exec_id = msg.get('id')
        if not assign_id and 'id' in msg:
            if not isinstance(exec_id, str):
                raise TypeError(f"ID ('{exec_id}') must be a string.")
            if exec_id.startswith('__aqmp#'):
                raise ValueError(
                    f"ID ('{exec_id}') must not start with '__aqmp#'."
                )

        # exec_id is None here when the message carries no ID; that key
        # is occupied while another ID-less execution is pending.
        if not assign_id and exec_id in self._pending:
            raise ValueError(
                f"ID '{exec_id}' is in-use and cannot be used."
            )

        return await self._execute(msg, assign_id=assign_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def execute_msg(self, msg: Message) -> object:
        """
        Execute a QMP command and return its value.

        :param msg: The QMP `Message` to execute.

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ValueError:
            If the QMP `Message` does not have either the 'execute' or
            'exec-oob' fields set.
        :raise ExecuteError: When the server returns an error response.
        :raise BadReplyError: When the server's reply is malformed.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        if not ('execute' in msg or 'exec-oob' in msg):
            raise ValueError("Requires 'execute' or 'exec-oob' message")

        # Copy the Message so that the ID assigned by _execute() is
        # local to this method; allowing the ID to be seen in raised
        # Exceptions but without modifying the caller's held copy.
        msg = Message(msg)
        reply = await self._execute(msg)

        if 'error' in reply:
            try:
                error_response = ErrorResponse(reply)
            except (KeyError, TypeError) as err:
                # Error response was malformed.
                raise BadReplyError(
                    "QMP error reply is malformed", reply, msg,
                ) from err

            raise ExecuteError(error_response, msg, reply)

        if 'return' not in reply:
            raise BadReplyError(
                "QMP reply is missing a 'error' or 'return' member",
                reply, msg,
            )

        return reply['return']

    @classmethod
    def make_execute_msg(cls, cmd: str,
                         arguments: Optional[Mapping[str, object]] = None,
                         oob: bool = False) -> Message:
        """
        Create an executable message to be sent by `execute_msg` later.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return: An executable QMP `Message`.
        """
        msg = Message({'exec-oob' if oob else 'execute': cmd})
        # An empty mapping is still attached; only `None` omits the key.
        if arguments is not None:
            msg['arguments'] = arguments
        return msg

    @upper_half
    async def execute(self, cmd: str,
                      arguments: Optional[Mapping[str, object]] = None,
                      oob: bool = False) -> object:
        """
        Execute a QMP command and return its value.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ExecuteError: When the server returns an error response.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        msg = self.make_execute_msg(cmd, arguments, oob=oob)
        return await self.execute_msg(msg)