python/aqmp: add execute() interfaces

Add execute() and execute_msg().

_execute() is split into _issue() and _reply() halves so that
hypothetical subclasses of QMP that want to support different execution
paradigms can do so.

I anticipate a synchronous interface may have need of separating the
send/reply phases. However, I do not wish to expose that interface here
and want to actively discourage it, so they remain private interfaces.

Signed-off-by: John Snow <jsnow@redhat.com>
Message-id: 20210915162955.333025-21-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
John Snow 2021-09-15 12:29:48 -04:00
parent 577737be55
commit e0fea0b3ac
2 changed files with 198 additions and 8 deletions

View File

@ -25,7 +25,7 @@ from .error import AQMPError
from .events import EventListener from .events import EventListener
from .message import Message from .message import Message
from .protocol import ConnectError, Runstate, StateError from .protocol import ConnectError, Runstate, StateError
from .qmp_client import QMPClient from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
# The order of these fields impact the Sphinx documentation order. # The order of these fields impact the Sphinx documentation order.
@ -40,4 +40,6 @@ __all__ = (
'AQMPError', 'AQMPError',
'StateError', 'StateError',
'ConnectError', 'ConnectError',
'ExecuteError',
'ExecInterruptedError',
) )

View File

@ -7,8 +7,7 @@ used to either connect to a listening server, or used to listen and
accept an incoming connection from that server. accept an incoming connection from that server.
""" """
# The import workarounds here are fixed in the next commit. import asyncio
import asyncio # pylint: disable=unused-import # noqa
import logging import logging
from typing import ( from typing import (
Dict, Dict,
@ -22,8 +21,8 @@ from typing import (
from .error import AQMPError, ProtocolError from .error import AQMPError, ProtocolError
from .events import Events from .events import Events
from .message import Message from .message import Message
from .models import Greeting from .models import ErrorResponse, Greeting
from .protocol import AsyncProtocol from .protocol import AsyncProtocol, Runstate, require
from .util import ( from .util import (
bottom_half, bottom_half,
exception_summary, exception_summary,
@ -65,11 +64,32 @@ class NegotiationError(_WrappedProtocolError):
""" """
class ExecuteError(AQMPError):
"""
Exception raised by `QMPClient.execute()` on RPC failure.
:param error_response: The RPC error response object.
:param sent: The sent RPC message that caused the failure.
:param received: The raw RPC error reply received.
"""
def __init__(self, error_response: ErrorResponse,
sent: Message, received: Message):
super().__init__(error_response.error.desc)
#: The sent `Message` that caused the failure
self.sent: Message = sent
#: The received `Message` that indicated failure
self.received: Message = received
#: The parsed error response
self.error: ErrorResponse = error_response
#: The QMP error class
self.error_class: str = error_response.error.class_
class ExecInterruptedError(AQMPError): class ExecInterruptedError(AQMPError):
""" """
Exception raised when an RPC is interrupted. Exception raised by `execute()` (et al) when an RPC is interrupted.
This error is raised when an execute() statement could not be This error is raised when an `execute()` statement could not be
completed. This can occur because the connection itself was completed. This can occur because the connection itself was
terminated before a reply was received. terminated before a reply was received.
@ -112,6 +132,27 @@ class ServerParseError(_MsgProtocolError):
""" """
class BadReplyError(_MsgProtocolError):
"""
An execution reply was successfully routed, but not understood.
If a QMP message is received with an 'id' field to allow it to be
routed, but is otherwise malformed, this exception will be raised.
A reply message is malformed if it is missing either the 'return' or
'error' keys, or if the 'error' value has missing keys or members of
the wrong type.
:param error_message: Human-readable string describing the error.
:param msg: The malformed reply that was received.
:param sent: The message that was sent that prompted the error.
"""
def __init__(self, error_message: str, msg: Message, sent: Message):
super().__init__(error_message, msg)
#: The sent `Message` that caused the failure
self.sent = sent
class QMPClient(AsyncProtocol[Message], Events): class QMPClient(AsyncProtocol[Message], Events):
""" """
Implements a QMP client connection. Implements a QMP client connection.
@ -174,6 +215,9 @@ class QMPClient(AsyncProtocol[Message], Events):
# Cached Greeting, if one was awaited. # Cached Greeting, if one was awaited.
self._greeting: Optional[Greeting] = None self._greeting: Optional[Greeting] = None
# Command ID counter
self._execute_id = 0
# Incoming RPC reply messages. # Incoming RPC reply messages.
self._pending: Dict[ self._pending: Dict[
Union[str, None], Union[str, None],
@ -363,12 +407,135 @@ class QMPClient(AsyncProtocol[Message], Events):
assert self._writer is not None assert self._writer is not None
self._writer.write(bytes(msg)) self._writer.write(bytes(msg))
@upper_half
def _get_exec_id(self) -> str:
exec_id = f"__aqmp#{self._execute_id:05d}"
self._execute_id += 1
return exec_id
@upper_half
async def _issue(self, msg: Message) -> Union[None, str]:
"""
Issue a QMP `Message` and do not wait for a reply.
:param msg: The QMP `Message` to send to the server.
:return: The ID of the `Message` sent.
"""
msg_id: Optional[str] = None
if 'id' in msg:
assert isinstance(msg['id'], str)
msg_id = msg['id']
self._pending[msg_id] = asyncio.Queue(maxsize=1)
await self._outgoing.put(msg)
return msg_id
@upper_half
async def _reply(self, msg_id: Union[str, None]) -> Message:
"""
Await a reply to a previously issued QMP message.
:param msg_id: The ID of the previously issued message.
:return: The reply from the server.
:raise ExecInterruptedError:
When the reply could not be retrieved because the connection
was lost, or some other problem.
"""
queue = self._pending[msg_id]
result = await queue.get()
try:
if isinstance(result, ExecInterruptedError):
raise result
return result
finally:
del self._pending[msg_id]
@upper_half
async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
"""
Send a QMP `Message` to the server and await a reply.
This method *assumes* you are sending some kind of an execute
statement that *will* receive a reply.
An execution ID will be assigned if assign_id is `True`. It can be
disabled, but this requires that an ID is manually assigned
instead. For manually assigned IDs, you must not use the string
'__aqmp#' anywhere in the ID.
:param msg: The QMP `Message` to execute.
:param assign_id: If True, assign a new execution ID.
:return: Execution reply from the server.
:raise ExecInterruptedError:
When the reply could not be retrieved because the connection
was lost, or some other problem.
"""
if assign_id:
msg['id'] = self._get_exec_id()
elif 'id' in msg:
assert isinstance(msg['id'], str)
assert '__aqmp#' not in msg['id']
exec_id = await self._issue(msg)
return await self._reply(exec_id)
@upper_half
@require(Runstate.RUNNING)
async def execute_msg(self, msg: Message) -> object:
"""
Execute a QMP command and return its value.
:param msg: The QMP `Message` to execute.
:return:
The command execution return value from the server. The type of
object returned depends on the command that was issued,
though most in QEMU return a `dict`.
:raise ValueError:
If the QMP `Message` does not have either the 'execute' or
'exec-oob' fields set.
:raise ExecuteError: When the server returns an error response.
:raise ExecInterruptedError: if the connection was terminated early.
"""
if not ('execute' in msg or 'exec-oob' in msg):
raise ValueError("Requires 'execute' or 'exec-oob' message")
# Copy the Message so that the ID assigned by _execute() is
# local to this method; allowing the ID to be seen in raised
# Exceptions but without modifying the caller's held copy.
msg = Message(msg)
reply = await self._execute(msg)
if 'error' in reply:
try:
error_response = ErrorResponse(reply)
except (KeyError, TypeError) as err:
# Error response was malformed.
raise BadReplyError(
"QMP error reply is malformed", reply, msg,
) from err
raise ExecuteError(error_response, msg, reply)
if 'return' not in reply:
raise BadReplyError(
"QMP reply is missing a 'error' or 'return' member",
reply, msg,
)
return reply['return']
@classmethod @classmethod
def make_execute_msg(cls, cmd: str, def make_execute_msg(cls, cmd: str,
arguments: Optional[Mapping[str, object]] = None, arguments: Optional[Mapping[str, object]] = None,
oob: bool = False) -> Message: oob: bool = False) -> Message:
""" """
Create an executable message to be sent later. Create an executable message to be sent by `execute_msg` later.
:param cmd: QMP command name. :param cmd: QMP command name.
:param arguments: Arguments (if any). Must be JSON-serializable. :param arguments: Arguments (if any). Must be JSON-serializable.
@ -380,3 +547,24 @@ class QMPClient(AsyncProtocol[Message], Events):
if arguments is not None: if arguments is not None:
msg['arguments'] = arguments msg['arguments'] = arguments
return msg return msg
@upper_half
async def execute(self, cmd: str,
arguments: Optional[Mapping[str, object]] = None,
oob: bool = False) -> object:
"""
Execute a QMP command and return its value.
:param cmd: QMP command name.
:param arguments: Arguments (if any). Must be JSON-serializable.
:param oob: If `True`, execute "out of band".
:return:
The command execution return value from the server. The type of
object returned depends on the command that was issued,
though most in QEMU return a `dict`.
:raise ExecuteError: When the server returns an error response.
:raise ExecInterruptedError: if the connection was terminated early.
"""
msg = self.make_execute_msg(cmd, arguments, oob=oob)
return await self.execute_msg(msg)