diff --git a/python/qemu/aqmp/legacy.py b/python/qemu/aqmp/legacy.py index 0890f95b16..6baa5f3409 100644 --- a/python/qemu/aqmp/legacy.py +++ b/python/qemu/aqmp/legacy.py @@ -56,6 +56,9 @@ class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol): self._address = address self._timeout: Optional[float] = None + if server: + self._aqmp._bind_hack(address) # pylint: disable=protected-access + _T = TypeVar('_T') def _sync( diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 50e973c2f2..33358f5cd7 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -15,6 +15,7 @@ from asyncio import StreamReader, StreamWriter from enum import Enum from functools import wraps import logging +import socket from ssl import SSLContext from typing import ( Any, @@ -238,6 +239,9 @@ class AsyncProtocol(Generic[T]): self._runstate = Runstate.IDLE self._runstate_changed: Optional[asyncio.Event] = None + # Workaround for bind() + self._sock: Optional[socket.socket] = None + def __repr__(self) -> str: cls_name = type(self).__name__ tokens = [] @@ -427,6 +431,34 @@ class AsyncProtocol(Generic[T]): else: await self._do_connect(address, ssl) + def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None: + """ + Used to create a socket in advance of accept(). + + This is a workaround to ensure that we can guarantee timing of + precisely when a socket exists to avoid a connection attempt + bouncing off of nothing. + + Python 3.7+ adds a feature to separate the server creation and + listening phases instead, and should be used instead of this + hack. + """ + if isinstance(address, tuple): + family = socket.AF_INET + else: + family = socket.AF_UNIX + + sock = socket.socket(family, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + try: + sock.bind(address) + except: + sock.close() + raise + + self._sock = sock + @upper_half async def _do_accept(self, address: SocketAddrT, ssl: Optional[SSLContext] = None) -> None: @@ -464,24 +496,27 @@ class AsyncProtocol(Generic[T]): if isinstance(address, tuple): coro = asyncio.start_server( _client_connected_cb, - host=address[0], - port=address[1], + host=None if self._sock else address[0], + port=None if self._sock else address[1], ssl=ssl, backlog=1, limit=self._limit, + sock=self._sock, ) else: coro = asyncio.start_unix_server( _client_connected_cb, - path=address, + path=None if self._sock else address, ssl=ssl, backlog=1, limit=self._limit, + sock=self._sock, ) server = await coro # Starts listening await connected.wait() # Waits for the callback to fire (and finish) assert server is None + self._sock = None self.logger.debug("Connection accepted.")