603a3bad4b
Teach QEMUMonitorProtocol to accept an exisiting socket. Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com> Message-id: 20230111080101.969151-3-marcandre.lureau@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
328 lines
10 KiB
Python
328 lines
10 KiB
Python
"""
|
|
(Legacy) Sync QMP Wrapper
|
|
|
|
This module provides the `QEMUMonitorProtocol` class, which is a
|
|
synchronous wrapper around `QMPClient`.
|
|
|
|
Its design closely resembles that of the original QEMUMonitorProtocol
|
|
class, originally written by Luiz Capitulino. It is provided here for
|
|
compatibility with scripts inside the QEMU source tree that expect the
|
|
old interface.
|
|
"""
|
|
|
|
#
|
|
# Copyright (C) 2009-2022 Red Hat Inc.
|
|
#
|
|
# Authors:
|
|
# Luiz Capitulino <lcapitulino@redhat.com>
|
|
# John Snow <jsnow@redhat.com>
|
|
#
|
|
# This work is licensed under the terms of the GNU GPL, version 2. See
|
|
# the COPYING file in the top-level directory.
|
|
#
|
|
|
|
import asyncio
|
|
import socket
|
|
from types import TracebackType
|
|
from typing import (
|
|
Any,
|
|
Awaitable,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
|
|
from .error import QMPError
|
|
from .protocol import Runstate, SocketAddrT
|
|
from .qmp_client import QMPClient
|
|
|
|
|
|
#: QMPMessage is an entire QMP message of any kind.
|
|
QMPMessage = Dict[str, Any]
|
|
|
|
#: QMPReturnValue is the 'return' value of a command.
|
|
QMPReturnValue = object
|
|
|
|
#: QMPObject is any object in a QMP message.
|
|
QMPObject = Dict[str, object]
|
|
|
|
# QMPMessage can be outgoing commands or incoming events/returns.
|
|
# QMPReturnValue is usually a dict/json object, but due to QAPI's
|
|
# 'command-returns-exceptions', it can actually be anything.
|
|
#
|
|
# {'return': {}} is a QMPMessage,
|
|
# {} is the QMPReturnValue.
|
|
|
|
|
|
class QMPBadPortError(QMPError):
|
|
"""
|
|
Unable to parse socket address: Port was non-numerical.
|
|
"""
|
|
|
|
|
|
class QEMUMonitorProtocol:
|
|
"""
|
|
Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
|
|
and then allow to handle commands and events.
|
|
|
|
:param address: QEMU address, can be either a unix socket path (string)
|
|
or a tuple in the form ( address, port ) for a TCP
|
|
connection or None
|
|
:param sock: a socket or None
|
|
:param server: Act as the socket server. (See 'accept')
|
|
:param nickname: Optional nickname used for logging.
|
|
"""
|
|
|
|
def __init__(self,
|
|
address: Optional[SocketAddrT] = None,
|
|
sock: Optional[socket.socket] = None,
|
|
server: bool = False,
|
|
nickname: Optional[str] = None):
|
|
|
|
assert address or sock
|
|
self._qmp = QMPClient(nickname)
|
|
self._aloop = asyncio.get_event_loop()
|
|
self._address = address
|
|
self._sock = sock
|
|
self._timeout: Optional[float] = None
|
|
|
|
if server:
|
|
if sock:
|
|
assert self._sock is not None
|
|
self._sync(self._qmp.open_with_socket(self._sock))
|
|
else:
|
|
assert self._address is not None
|
|
self._sync(self._qmp.start_server(self._address))
|
|
|
|
_T = TypeVar('_T')
|
|
|
|
def _sync(
|
|
self, future: Awaitable[_T], timeout: Optional[float] = None
|
|
) -> _T:
|
|
return self._aloop.run_until_complete(
|
|
asyncio.wait_for(future, timeout=timeout)
|
|
)
|
|
|
|
def _get_greeting(self) -> Optional[QMPMessage]:
|
|
if self._qmp.greeting is not None:
|
|
# pylint: disable=protected-access
|
|
return self._qmp.greeting._asdict()
|
|
return None
|
|
|
|
def __enter__(self: _T) -> _T:
|
|
# Implement context manager enter function.
|
|
return self
|
|
|
|
def __exit__(self,
|
|
exc_type: Optional[Type[BaseException]],
|
|
exc_val: Optional[BaseException],
|
|
exc_tb: Optional[TracebackType]) -> None:
|
|
# Implement context manager exit function.
|
|
self.close()
|
|
|
|
@classmethod
|
|
def parse_address(cls, address: str) -> SocketAddrT:
|
|
"""
|
|
Parse a string into a QMP address.
|
|
|
|
Figure out if the argument is in the port:host form.
|
|
If it's not, it's probably a file path.
|
|
"""
|
|
components = address.split(':')
|
|
if len(components) == 2:
|
|
try:
|
|
port = int(components[1])
|
|
except ValueError:
|
|
msg = f"Bad port: '{components[1]}' in '{address}'."
|
|
raise QMPBadPortError(msg) from None
|
|
return (components[0], port)
|
|
|
|
# Treat as filepath.
|
|
return address
|
|
|
|
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
|
|
"""
|
|
Connect to the QMP Monitor and perform capabilities negotiation.
|
|
|
|
:return: QMP greeting dict, or None if negotiate is false
|
|
:raise ConnectError: on connection errors
|
|
"""
|
|
assert self._address is not None
|
|
self._qmp.await_greeting = negotiate
|
|
self._qmp.negotiate = negotiate
|
|
|
|
self._sync(
|
|
self._qmp.connect(self._address)
|
|
)
|
|
return self._get_greeting()
|
|
|
|
def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
|
|
"""
|
|
Await connection from QMP Monitor and perform capabilities negotiation.
|
|
|
|
:param timeout:
|
|
timeout in seconds (nonnegative float number, or None).
|
|
If None, there is no timeout, and this may block forever.
|
|
|
|
:return: QMP greeting dict
|
|
:raise ConnectError: on connection errors
|
|
"""
|
|
self._qmp.await_greeting = True
|
|
self._qmp.negotiate = True
|
|
|
|
self._sync(self._qmp.accept(), timeout)
|
|
|
|
ret = self._get_greeting()
|
|
assert ret is not None
|
|
return ret
|
|
|
|
def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
|
|
"""
|
|
Send a QMP command to the QMP Monitor.
|
|
|
|
:param qmp_cmd: QMP command to be sent as a Python dict
|
|
:return: QMP response as a Python dict
|
|
"""
|
|
return dict(
|
|
self._sync(
|
|
# pylint: disable=protected-access
|
|
|
|
# _raw() isn't a public API, because turning off
|
|
# automatic ID assignment is discouraged. For
|
|
# compatibility with iotests *only*, do it anyway.
|
|
self._qmp._raw(qmp_cmd, assign_id=False),
|
|
self._timeout
|
|
)
|
|
)
|
|
|
|
def cmd(self, name: str,
|
|
args: Optional[Dict[str, object]] = None,
|
|
cmd_id: Optional[object] = None) -> QMPMessage:
|
|
"""
|
|
Build a QMP command and send it to the QMP Monitor.
|
|
|
|
:param name: command name (string)
|
|
:param args: command arguments (dict)
|
|
:param cmd_id: command id (dict, list, string or int)
|
|
"""
|
|
qmp_cmd: QMPMessage = {'execute': name}
|
|
if args:
|
|
qmp_cmd['arguments'] = args
|
|
if cmd_id:
|
|
qmp_cmd['id'] = cmd_id
|
|
return self.cmd_obj(qmp_cmd)
|
|
|
|
def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
|
|
"""
|
|
Build and send a QMP command to the monitor, report errors if any
|
|
"""
|
|
return self._sync(
|
|
self._qmp.execute(cmd, kwds),
|
|
self._timeout
|
|
)
|
|
|
|
def pull_event(self,
|
|
wait: Union[bool, float] = False) -> Optional[QMPMessage]:
|
|
"""
|
|
Pulls a single event.
|
|
|
|
:param wait:
|
|
If False or 0, do not wait. Return None if no events ready.
|
|
If True, wait forever until the next event.
|
|
Otherwise, wait for the specified number of seconds.
|
|
|
|
:raise asyncio.TimeoutError:
|
|
When a timeout is requested and the timeout period elapses.
|
|
|
|
:return: The first available QMP event, or None.
|
|
"""
|
|
if not wait:
|
|
# wait is False/0: "do not wait, do not except."
|
|
if self._qmp.events.empty():
|
|
return None
|
|
|
|
# If wait is 'True', wait forever. If wait is False/0, the events
|
|
# queue must not be empty; but it still needs some real amount
|
|
# of time to complete.
|
|
timeout = None
|
|
if wait and isinstance(wait, float):
|
|
timeout = wait
|
|
|
|
return dict(
|
|
self._sync(
|
|
self._qmp.events.get(),
|
|
timeout
|
|
)
|
|
)
|
|
|
|
def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
|
|
"""
|
|
Get a list of QMP events and clear all pending events.
|
|
|
|
:param wait:
|
|
If False or 0, do not wait. Return None if no events ready.
|
|
If True, wait until we have at least one event.
|
|
Otherwise, wait for up to the specified number of seconds for at
|
|
least one event.
|
|
|
|
:raise asyncio.TimeoutError:
|
|
When a timeout is requested and the timeout period elapses.
|
|
|
|
:return: A list of QMP events.
|
|
"""
|
|
events = [dict(x) for x in self._qmp.events.clear()]
|
|
if events:
|
|
return events
|
|
|
|
event = self.pull_event(wait)
|
|
return [event] if event is not None else []
|
|
|
|
def clear_events(self) -> None:
|
|
"""Clear current list of pending events."""
|
|
self._qmp.events.clear()
|
|
|
|
def close(self) -> None:
|
|
"""Close the connection."""
|
|
self._sync(
|
|
self._qmp.disconnect()
|
|
)
|
|
|
|
def settimeout(self, timeout: Optional[float]) -> None:
|
|
"""
|
|
Set the timeout for QMP RPC execution.
|
|
|
|
This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
|
|
The `accept`, `pull_event` and `get_event` methods have their
|
|
own configurable timeouts.
|
|
|
|
:param timeout:
|
|
timeout in seconds, or None.
|
|
None will wait indefinitely.
|
|
"""
|
|
self._timeout = timeout
|
|
|
|
def send_fd_scm(self, fd: int) -> None:
|
|
"""
|
|
Send a file descriptor to the remote via SCM_RIGHTS.
|
|
"""
|
|
self._qmp.send_fd_scm(fd)
|
|
|
|
def __del__(self) -> None:
|
|
if self._qmp.runstate == Runstate.IDLE:
|
|
return
|
|
|
|
if not self._aloop.is_running():
|
|
self.close()
|
|
else:
|
|
# Garbage collection ran while the event loop was running.
|
|
# Nothing we can do about it now, but if we don't raise our
|
|
# own error, the user will be treated to a lot of traceback
|
|
# they might not understand.
|
|
raise QMPError(
|
|
"QEMUMonitorProtocol.close()"
|
|
" was not called before object was garbage collected"
|
|
)
|