4c1fe7003c
_bind_hack() was a quick fix to allow async QMP to call bind(2) prior to
calling listen(2) and accept(2). This wasn't sufficient to fully address
the race condition present in synchronous clients.
With the race condition in legacy.py fixed (see the previous commit),
there are no longer any users of _bind_hack(). Drop it.
Fixes: b0b662bb2b
Signed-off-by: John Snow <jsnow@redhat.com>
Acked-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
Message-id: 20220225205948.3693480-11-jsnow@redhat.com
[Expanded commit message. --js]
Signed-off-by: John Snow <jsnow@redhat.com>
178 lines
5.0 KiB
Python
178 lines
5.0 KiB
Python
"""
|
|
Sync QMP Wrapper

This class pretends to be qemu.qmp.QEMUMonitorProtocol.
|
|
"""
|
|
|
|
import asyncio
|
|
from typing import (
|
|
Any,
|
|
Awaitable,
|
|
Dict,
|
|
List,
|
|
Optional,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
|
|
import qemu.qmp
|
|
|
|
from .error import QMPError
|
|
from .protocol import Runstate, SocketAddrT
|
|
from .qmp_client import QMPClient
|
|
|
|
|
|
# Re-exported (temporarily) so that callers can import QMPBadPortError
# from this module.
QMPBadPortError = qemu.qmp.QMPBadPortError

#: A complete QMP message of any kind.
QMPMessage = Dict[str, Any]

#: The value carried under the 'return' key of a command response.
QMPReturnValue = object

#: Any object found inside a QMP message.
QMPObject = Dict[str, object]

# A QMPMessage may be an outgoing command or an incoming event/return.
# A QMPReturnValue is usually a dict/json object, but because of QAPI's
# 'returns-whitelist' it can actually be anything.
#
# Example: {'return': {}} is a QMPMessage,
#          {} is the QMPReturnValue.


# pylint: disable=missing-docstring
|
|
|
|
|
|
class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
    """
    Synchronous wrapper around `QMPClient`.

    Presents the `qemu.qmp.QEMUMonitorProtocol` interface, but carries
    out every operation by running the matching `QMPClient` coroutine
    to completion on an asyncio event loop.
    """

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):
        """
        Create the wrapper and, optionally, start listening.

        :param address: Address later used by `connect()`, or listened
                        on right away when ``server`` is True.
        :param server: When True, `QMPClient.start_server()` is run to
                       completion on ``address`` before returning.
        :param nickname: Passed through to the `QMPClient` constructor.
        """

        # pylint: disable=super-init-not-called
        self._aqmp = QMPClient(nickname)
        self._aloop = asyncio.get_event_loop()
        self._address = address
        # Timeout applied to cmd_obj() and command(); see settimeout().
        self._timeout: Optional[float] = None

        if server:
            self._sync(self._aqmp.start_server(self._address))

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run ``future`` to completion on the stored event loop.

        :param future: The awaitable to run.
        :param timeout: Seconds handed to `asyncio.wait_for()`;
                        None means no time limit.
        :return: Whatever ``future`` evaluates to.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """Return the client's greeting as a dict, or None if unset."""
        if self._aqmp.greeting is not None:
            # pylint: disable=protected-access
            return self._aqmp.greeting._asdict()
        return None

    # __enter__ and __exit__ need no changes
    # parse_address needs no changes

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the address given at construction time.

        :param negotiate: Stored as both ``await_greeting`` and
                          ``negotiate`` on the underlying client.
        :return: The greeting as a dict, or None if there is none.
        """
        self._aqmp.await_greeting = negotiate
        self._aqmp.negotiate = negotiate

        self._sync(
            self._aqmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Run the client's `accept()` to completion.

        ``await_greeting`` and ``negotiate`` are both forced to True.

        :param timeout: Seconds to wait; None means no time limit.
        :return: The greeting as a dict.
        """
        self._aqmp.await_greeting = True
        self._aqmp.negotiate = True

        self._sync(self._aqmp.accept(), timeout)

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a raw QMP message and return the reply as a dict.

        The timeout set via `settimeout()` applies.

        :param qmp_cmd: The message to send.
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._aqmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    # Default impl of cmd() delegates to cmd_obj

    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Execute ``cmd`` with ``kwds`` as arguments; return its result.

        The timeout set via `settimeout()` applies.
        """
        return self._sync(
            self._aqmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Fetch one event from the client's event queue.

        :param wait: False/0: do not wait; return None if the queue is
                     empty. True: wait with no time limit. Any other
                     number (int or float): wait at most that many
                     seconds.
        :return: The event as a dict, or None (only when not waiting).
        """
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._aqmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout: Optional[float] = None
        if wait and not isinstance(wait, bool):
            # Any real number is a timeout in seconds. An int must be
            # honoured too: it is a valid 'float' argument per PEP 484,
            # and would otherwise silently mean "wait forever".
            timeout = float(wait)

        return dict(
            self._sync(
                self._aqmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Return the queued events, emptying the queue.

        If nothing was queued, fall back to `pull_event(wait)` and
        return its result as a list of zero or one events.

        :param wait: Forwarded to `pull_event()` in the fallback case.
        """
        events = [dict(x) for x in self._aqmp.events.clear()]
        if events:
            return events

        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Discard everything in the client's event queue."""
        self._aqmp.events.clear()

    def close(self) -> None:
        """Run the client's `disconnect()` to completion."""
        self._sync(
            self._aqmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the timeout used by `cmd_obj()` and `command()`.

        :param timeout: Seconds to wait; None means no time limit.
        """
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """Forward ``fd`` to the client's `send_fd_scm()`."""
        self._aqmp.send_fd_scm(fd)

    def __del__(self) -> None:
        """
        Close the connection on garbage collection if still needed.

        :raise QMPError: If the client is not idle and the event loop
                         is running, so `close()` cannot be run here.
        """
        if self._aqmp.runstate == Runstate.IDLE:
            return

        if not self._aloop.is_running():
            self.close()
        else:
            # Garbage collection ran while the event loop was running.
            # Nothing we can do about it now, but if we don't raise our
            # own error, the user will be treated to a lot of traceback
            # they might not understand.
            raise QMPError(
                "QEMUMonitorProtocol.close()"
                " was not called before object was garbage collected"
            )
|