""" Sync QMP Wrapper This class pretends to be qemu.qmp.QEMUMonitorProtocol. """ import asyncio from typing import ( Any, Awaitable, Dict, List, Optional, TypeVar, Union, ) import qemu.qmp from .error import QMPError from .protocol import Runstate, SocketAddrT from .qmp_client import QMPClient # (Temporarily) Re-export QMPBadPortError QMPBadPortError = qemu.qmp.QMPBadPortError #: QMPMessage is an entire QMP message of any kind. QMPMessage = Dict[str, Any] #: QMPReturnValue is the 'return' value of a command. QMPReturnValue = object #: QMPObject is any object in a QMP message. QMPObject = Dict[str, object] # QMPMessage can be outgoing commands or incoming events/returns. # QMPReturnValue is usually a dict/json object, but due to QAPI's # 'returns-whitelist', it can actually be anything. # # {'return': {}} is a QMPMessage, # {} is the QMPReturnValue. # pylint: disable=missing-docstring class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol): def __init__(self, address: SocketAddrT, server: bool = False, nickname: Optional[str] = None): # pylint: disable=super-init-not-called self._aqmp = QMPClient(nickname) self._aloop = asyncio.get_event_loop() self._address = address self._timeout: Optional[float] = None if server: self._aqmp._bind_hack(address) # pylint: disable=protected-access _T = TypeVar('_T') def _sync( self, future: Awaitable[_T], timeout: Optional[float] = None ) -> _T: return self._aloop.run_until_complete( asyncio.wait_for(future, timeout=timeout) ) def _get_greeting(self) -> Optional[QMPMessage]: if self._aqmp.greeting is not None: # pylint: disable=protected-access return self._aqmp.greeting._asdict() return None # __enter__ and __exit__ need no changes # parse_address needs no changes def connect(self, negotiate: bool = True) -> Optional[QMPMessage]: self._aqmp.await_greeting = negotiate self._aqmp.negotiate = negotiate self._sync( self._aqmp.connect(self._address) ) return self._get_greeting() def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage: self._aqmp.await_greeting = True self._aqmp.negotiate = True self._sync( self._aqmp.accept(self._address), timeout ) ret = self._get_greeting() assert ret is not None return ret def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage: return dict( self._sync( # pylint: disable=protected-access # _raw() isn't a public API, because turning off # automatic ID assignment is discouraged. For # compatibility with iotests *only*, do it anyway. self._aqmp._raw(qmp_cmd, assign_id=False), self._timeout ) ) # Default impl of cmd() delegates to cmd_obj def command(self, cmd: str, **kwds: object) -> QMPReturnValue: return self._sync( self._aqmp.execute(cmd, kwds), self._timeout ) def pull_event(self, wait: Union[bool, float] = False) -> Optional[QMPMessage]: if not wait: # wait is False/0: "do not wait, do not except." if self._aqmp.events.empty(): return None # If wait is 'True', wait forever. If wait is False/0, the events # queue must not be empty; but it still needs some real amount # of time to complete. timeout = None if wait and isinstance(wait, float): timeout = wait return dict( self._sync( self._aqmp.events.get(), timeout ) ) def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]: events = [dict(x) for x in self._aqmp.events.clear()] if events: return events event = self.pull_event(wait) return [event] if event is not None else [] def clear_events(self) -> None: self._aqmp.events.clear() def close(self) -> None: self._sync( self._aqmp.disconnect() ) def settimeout(self, timeout: Optional[float]) -> None: self._timeout = timeout def send_fd_scm(self, fd: int) -> None: self._aqmp.send_fd_scm(fd) def __del__(self) -> None: if self._aqmp.runstate == Runstate.IDLE: return if not self._aloop.is_running(): self.close() else: # Garbage collection ran while the event loop was running. # Nothing we can do about it now, but if we don't raise our # own error, the user will be treated to a lot of traceback # they might not understand. raise QMPError( "QEMUMonitorProtocol.close()" " was not called before object was garbage collected" )