684750ab4f
Use a shorter name. We are going to move in iotests from qmp() to command() where possible. But command() is longer than qmp() and don't look better. Let's rename. You can simply grep for '\.command(' and for 'def command(' to check that everything is updated (command() in tests/docker/docker.py is unrelated). Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> Reviewed-by: Cédric Le Goater <clg@kaod.org> Reviewed-by: Juan Quintela <quintela@redhat.com> Message-id: 20231006154125.1068348-6-vsementsov@yandex-team.ru [vsementsov: also update three occurrences in tests/avocado/machine_aspeed.py and keep r-b] Signed-off-by: John Snow <jsnow@redhat.com>
320 lines
9.7 KiB
Python
320 lines
9.7 KiB
Python
"""
|
|
(Legacy) Sync QMP Wrapper
|
|
|
|
This module provides the `QEMUMonitorProtocol` class, which is a
|
|
synchronous wrapper around `QMPClient`.
|
|
|
|
Its design closely resembles that of the original QEMUMonitorProtocol
|
|
class, originally written by Luiz Capitulino. It is provided here for
|
|
compatibility with scripts inside the QEMU source tree that expect the
|
|
old interface.
|
|
"""
|
|
|
|
#
|
|
# Copyright (C) 2009-2022 Red Hat Inc.
|
|
#
|
|
# Authors:
|
|
# Luiz Capitulino <lcapitulino@redhat.com>
|
|
# John Snow <jsnow@redhat.com>
|
|
#
|
|
# This work is licensed under the terms of the GNU GPL, version 2. See
|
|
# the COPYING file in the top-level directory.
|
|
#
|
|
|
|
import asyncio
|
|
import socket
|
|
from types import TracebackType
|
|
from typing import (
|
|
Any,
|
|
Awaitable,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
|
|
from .error import QMPError
|
|
from .protocol import Runstate, SocketAddrT
|
|
from .qmp_client import QMPClient
|
|
|
|
|
|
#: QMPMessage is an entire QMP message of any kind.
|
|
QMPMessage = Dict[str, Any]
|
|
|
|
#: QMPReturnValue is the 'return' value of a command.
|
|
QMPReturnValue = object
|
|
|
|
#: QMPObject is any object in a QMP message.
|
|
QMPObject = Dict[str, object]
|
|
|
|
# QMPMessage can be outgoing commands or incoming events/returns.
|
|
# QMPReturnValue is usually a dict/json object, but due to QAPI's
|
|
# 'command-returns-exceptions', it can actually be anything.
|
|
#
|
|
# {'return': {}} is a QMPMessage,
|
|
# {} is the QMPReturnValue.
|
|
|
|
|
|
class QMPBadPortError(QMPError):
|
|
"""
|
|
Unable to parse socket address: Port was non-numerical.
|
|
"""
|
|
|
|
|
|
class QEMUMonitorProtocol:
|
|
"""
|
|
Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
|
|
and then allow to handle commands and events.
|
|
|
|
:param address: QEMU address, can be a unix socket path (string), a tuple
|
|
in the form ( address, port ) for a TCP connection, or an
|
|
existing `socket.socket` object.
|
|
:param server: Act as the socket server. (See 'accept')
|
|
Not applicable when passing a socket directly.
|
|
:param nickname: Optional nickname used for logging.
|
|
"""
|
|
|
|
def __init__(self,
|
|
address: Union[SocketAddrT, socket.socket],
|
|
server: bool = False,
|
|
nickname: Optional[str] = None):
|
|
|
|
if server and isinstance(address, socket.socket):
|
|
raise ValueError(
|
|
"server argument should be False when passing a socket")
|
|
|
|
self._qmp = QMPClient(nickname)
|
|
self._aloop = asyncio.get_event_loop()
|
|
self._address = address
|
|
self._timeout: Optional[float] = None
|
|
|
|
if server:
|
|
assert not isinstance(self._address, socket.socket)
|
|
self._sync(self._qmp.start_server(self._address))
|
|
|
|
_T = TypeVar('_T')
|
|
|
|
def _sync(
|
|
self, future: Awaitable[_T], timeout: Optional[float] = None
|
|
) -> _T:
|
|
return self._aloop.run_until_complete(
|
|
asyncio.wait_for(future, timeout=timeout)
|
|
)
|
|
|
|
def _get_greeting(self) -> Optional[QMPMessage]:
|
|
if self._qmp.greeting is not None:
|
|
# pylint: disable=protected-access
|
|
return self._qmp.greeting._asdict()
|
|
return None
|
|
|
|
def __enter__(self: _T) -> _T:
|
|
# Implement context manager enter function.
|
|
return self
|
|
|
|
def __exit__(self,
|
|
exc_type: Optional[Type[BaseException]],
|
|
exc_val: Optional[BaseException],
|
|
exc_tb: Optional[TracebackType]) -> None:
|
|
# Implement context manager exit function.
|
|
self.close()
|
|
|
|
@classmethod
|
|
def parse_address(cls, address: str) -> SocketAddrT:
|
|
"""
|
|
Parse a string into a QMP address.
|
|
|
|
Figure out if the argument is in the port:host form.
|
|
If it's not, it's probably a file path.
|
|
"""
|
|
components = address.split(':')
|
|
if len(components) == 2:
|
|
try:
|
|
port = int(components[1])
|
|
except ValueError:
|
|
msg = f"Bad port: '{components[1]}' in '{address}'."
|
|
raise QMPBadPortError(msg) from None
|
|
return (components[0], port)
|
|
|
|
# Treat as filepath.
|
|
return address
|
|
|
|
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
|
|
"""
|
|
Connect to the QMP Monitor and perform capabilities negotiation.
|
|
|
|
:return: QMP greeting dict, or None if negotiate is false
|
|
:raise ConnectError: on connection errors
|
|
"""
|
|
self._qmp.await_greeting = negotiate
|
|
self._qmp.negotiate = negotiate
|
|
|
|
self._sync(
|
|
self._qmp.connect(self._address)
|
|
)
|
|
return self._get_greeting()
|
|
|
|
def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
|
|
"""
|
|
Await connection from QMP Monitor and perform capabilities negotiation.
|
|
|
|
:param timeout:
|
|
timeout in seconds (nonnegative float number, or None).
|
|
If None, there is no timeout, and this may block forever.
|
|
|
|
:return: QMP greeting dict
|
|
:raise ConnectError: on connection errors
|
|
"""
|
|
self._qmp.await_greeting = True
|
|
self._qmp.negotiate = True
|
|
|
|
self._sync(self._qmp.accept(), timeout)
|
|
|
|
ret = self._get_greeting()
|
|
assert ret is not None
|
|
return ret
|
|
|
|
def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
|
|
"""
|
|
Send a QMP command to the QMP Monitor.
|
|
|
|
:param qmp_cmd: QMP command to be sent as a Python dict
|
|
:return: QMP response as a Python dict
|
|
"""
|
|
return dict(
|
|
self._sync(
|
|
# pylint: disable=protected-access
|
|
|
|
# _raw() isn't a public API, because turning off
|
|
# automatic ID assignment is discouraged. For
|
|
# compatibility with iotests *only*, do it anyway.
|
|
self._qmp._raw(qmp_cmd, assign_id=False),
|
|
self._timeout
|
|
)
|
|
)
|
|
|
|
def cmd_raw(self, name: str,
|
|
args: Optional[Dict[str, object]] = None) -> QMPMessage:
|
|
"""
|
|
Build a QMP command and send it to the QMP Monitor.
|
|
|
|
:param name: command name (string)
|
|
:param args: command arguments (dict)
|
|
"""
|
|
qmp_cmd: QMPMessage = {'execute': name}
|
|
if args:
|
|
qmp_cmd['arguments'] = args
|
|
return self.cmd_obj(qmp_cmd)
|
|
|
|
def cmd(self, cmd: str, **kwds: object) -> QMPReturnValue:
|
|
"""
|
|
Build and send a QMP command to the monitor, report errors if any
|
|
"""
|
|
return self._sync(
|
|
self._qmp.execute(cmd, kwds),
|
|
self._timeout
|
|
)
|
|
|
|
def pull_event(self,
|
|
wait: Union[bool, float] = False) -> Optional[QMPMessage]:
|
|
"""
|
|
Pulls a single event.
|
|
|
|
:param wait:
|
|
If False or 0, do not wait. Return None if no events ready.
|
|
If True, wait forever until the next event.
|
|
Otherwise, wait for the specified number of seconds.
|
|
|
|
:raise asyncio.TimeoutError:
|
|
When a timeout is requested and the timeout period elapses.
|
|
|
|
:return: The first available QMP event, or None.
|
|
"""
|
|
if not wait:
|
|
# wait is False/0: "do not wait, do not except."
|
|
if self._qmp.events.empty():
|
|
return None
|
|
|
|
# If wait is 'True', wait forever. If wait is False/0, the events
|
|
# queue must not be empty; but it still needs some real amount
|
|
# of time to complete.
|
|
timeout = None
|
|
if wait and isinstance(wait, float):
|
|
timeout = wait
|
|
|
|
return dict(
|
|
self._sync(
|
|
self._qmp.events.get(),
|
|
timeout
|
|
)
|
|
)
|
|
|
|
def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
|
|
"""
|
|
Get a list of QMP events and clear all pending events.
|
|
|
|
:param wait:
|
|
If False or 0, do not wait. Return None if no events ready.
|
|
If True, wait until we have at least one event.
|
|
Otherwise, wait for up to the specified number of seconds for at
|
|
least one event.
|
|
|
|
:raise asyncio.TimeoutError:
|
|
When a timeout is requested and the timeout period elapses.
|
|
|
|
:return: A list of QMP events.
|
|
"""
|
|
events = [dict(x) for x in self._qmp.events.clear()]
|
|
if events:
|
|
return events
|
|
|
|
event = self.pull_event(wait)
|
|
return [event] if event is not None else []
|
|
|
|
def clear_events(self) -> None:
|
|
"""Clear current list of pending events."""
|
|
self._qmp.events.clear()
|
|
|
|
def close(self) -> None:
|
|
"""Close the connection."""
|
|
self._sync(
|
|
self._qmp.disconnect()
|
|
)
|
|
|
|
def settimeout(self, timeout: Optional[float]) -> None:
|
|
"""
|
|
Set the timeout for QMP RPC execution.
|
|
|
|
This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
|
|
The `accept`, `pull_event` and `get_event` methods have their
|
|
own configurable timeouts.
|
|
|
|
:param timeout:
|
|
timeout in seconds, or None.
|
|
None will wait indefinitely.
|
|
"""
|
|
self._timeout = timeout
|
|
|
|
def send_fd_scm(self, fd: int) -> None:
|
|
"""
|
|
Send a file descriptor to the remote via SCM_RIGHTS.
|
|
"""
|
|
self._qmp.send_fd_scm(fd)
|
|
|
|
def __del__(self) -> None:
|
|
if self._qmp.runstate == Runstate.IDLE:
|
|
return
|
|
|
|
if not self._aloop.is_running():
|
|
self.close()
|
|
else:
|
|
# Garbage collection ran while the event loop was running.
|
|
# Nothing we can do about it now, but if we don't raise our
|
|
# own error, the user will be treated to a lot of traceback
|
|
# they might not understand.
|
|
raise QMPError(
|
|
"QEMUMonitorProtocol.close()"
|
|
" was not called before object was garbage collected"
|
|
)
|