python/aqmp: add start_server() and accept() methods
Add start_server() and accept() methods that can be used instead of start_server_and_accept() to allow more fine-grained control over the incoming connection process. (Eagle-eyed reviewers will surely notice that it's a bit weird that "CONNECTING" is a state that's shared between both the start_server() and connect() states. That's absolutely true, and it's very true that checking on the presence of _accepted as an indicator of state is a hack. That's also very certainly true. But ... this keeps client code an awful lot simpler, as it doesn't have to care exactly *how* the connection is being made, just that it *is*. Is it worth disrupting that simplicity in order to provide a better state guard on `accept()`? Hm.) Signed-off-by: John Snow <jsnow@redhat.com> Acked-by: Kevin Wolf <kwolf@redhat.com> Reviewed-by: Daniel P. Berrangé <berrange@redhat.com> Message-id: 20220225205948.3693480-9-jsnow@redhat.com Signed-off-by: John Snow <jsnow@redhat.com>
This commit is contained in:
parent
32c5abf051
commit
481607c7d3
|
@ -280,6 +280,8 @@ class AsyncProtocol(Generic[T]):
|
||||||
Accept a connection and begin processing message queues.
|
Accept a connection and begin processing message queues.
|
||||||
|
|
||||||
If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
|
If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
|
||||||
|
This method is precisely equivalent to calling `start_server()`
|
||||||
|
followed by `accept()`.
|
||||||
|
|
||||||
:param address:
|
:param address:
|
||||||
Address to listen on; UNIX socket path or TCP address/port.
|
Address to listen on; UNIX socket path or TCP address/port.
|
||||||
|
@ -294,9 +296,62 @@ class AsyncProtocol(Generic[T]):
|
||||||
protocol-level failure occurs while establishing a new
|
protocol-level failure occurs while establishing a new
|
||||||
session, the wrapped error may also be an `QMPError`.
|
session, the wrapped error may also be an `QMPError`.
|
||||||
"""
|
"""
|
||||||
|
await self.start_server(address, ssl)
|
||||||
|
await self.accept()
|
||||||
|
assert self.runstate == Runstate.RUNNING
|
||||||
|
|
||||||
|
@upper_half
|
||||||
|
@require(Runstate.IDLE)
|
||||||
|
async def start_server(self, address: SocketAddrT,
|
||||||
|
ssl: Optional[SSLContext] = None) -> None:
|
||||||
|
"""
|
||||||
|
Start listening for an incoming connection, but do not wait for a peer.
|
||||||
|
|
||||||
|
This method starts listening for an incoming connection, but
|
||||||
|
does not block waiting for a peer. This call will return
|
||||||
|
immediately after binding and listening on a socket. A later
|
||||||
|
call to `accept()` must be made in order to finalize the
|
||||||
|
incoming connection.
|
||||||
|
|
||||||
|
:param address:
|
||||||
|
Address to listen on; UNIX socket path or TCP address/port.
|
||||||
|
:param ssl: SSL context to use, if any.
|
||||||
|
|
||||||
|
:raise StateError: When the `Runstate` is not `IDLE`.
|
||||||
|
:raise ConnectError:
|
||||||
|
When the server could not start listening on this address.
|
||||||
|
|
||||||
|
This exception will wrap a more concrete one. In most cases,
|
||||||
|
the wrapped exception will be `OSError`.
|
||||||
|
"""
|
||||||
await self._session_guard(
|
await self._session_guard(
|
||||||
self._do_start_server(address, ssl),
|
self._do_start_server(address, ssl),
|
||||||
'Failed to establish connection')
|
'Failed to establish connection')
|
||||||
|
assert self.runstate == Runstate.CONNECTING
|
||||||
|
|
||||||
|
@upper_half
|
||||||
|
@require(Runstate.CONNECTING)
|
||||||
|
async def accept(self) -> None:
|
||||||
|
"""
|
||||||
|
Accept an incoming connection and begin processing message queues.
|
||||||
|
|
||||||
|
If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
|
||||||
|
|
||||||
|
:raise StateError: When the `Runstate` is not `CONNECTING`.
|
||||||
|
:raise QMPError: When `start_server()` was not called yet.
|
||||||
|
:raise ConnectError:
|
||||||
|
When a connection or session cannot be established.
|
||||||
|
|
||||||
|
This exception will wrap a more concrete one. In most cases,
|
||||||
|
the wrapped exception will be `OSError` or `EOFError`. If a
|
||||||
|
protocol-level failure occurs while establishing a new
|
||||||
|
session, the wrapped error may also be an `QMPError`.
|
||||||
|
"""
|
||||||
|
if self._accepted is None:
|
||||||
|
raise QMPError("Cannot call accept() before start_server().")
|
||||||
|
await self._session_guard(
|
||||||
|
self._do_accept(),
|
||||||
|
'Failed to establish connection')
|
||||||
await self._session_guard(
|
await self._session_guard(
|
||||||
self._establish_session(),
|
self._establish_session(),
|
||||||
'Failed to establish session')
|
'Failed to establish session')
|
||||||
|
@ -512,7 +567,12 @@ class AsyncProtocol(Generic[T]):
|
||||||
async def _do_start_server(self, address: SocketAddrT,
|
async def _do_start_server(self, address: SocketAddrT,
|
||||||
ssl: Optional[SSLContext] = None) -> None:
|
ssl: Optional[SSLContext] = None) -> None:
|
||||||
"""
|
"""
|
||||||
Acting as the transport server, accept a single connection.
|
Start listening for an incoming connection, but do not wait for a peer.
|
||||||
|
|
||||||
|
This method starts listening for an incoming connection, but does not
|
||||||
|
block waiting for a peer. This call will return immediately after
|
||||||
|
binding and listening to a socket. A later call to accept() must be
|
||||||
|
made in order to finalize the incoming connection.
|
||||||
|
|
||||||
:param address:
|
:param address:
|
||||||
Address to listen on; UNIX socket path or TCP address/port.
|
Address to listen on; UNIX socket path or TCP address/port.
|
||||||
|
@ -554,10 +614,7 @@ class AsyncProtocol(Generic[T]):
|
||||||
# This will start the server (bind(2), listen(2)). It will also
|
# This will start the server (bind(2), listen(2)). It will also
|
||||||
# call accept(2) if we yield, but we don't block on that here.
|
# call accept(2) if we yield, but we don't block on that here.
|
||||||
self._server = await coro
|
self._server = await coro
|
||||||
|
self.logger.debug("Server listening on %s", address)
|
||||||
# Just for this one commit, wait for a peer.
|
|
||||||
# This gets split out in the next patch.
|
|
||||||
await self._do_accept()
|
|
||||||
|
|
||||||
@upper_half
|
@upper_half
|
||||||
async def _do_accept(self) -> None:
|
async def _do_accept(self) -> None:
|
||||||
|
|
|
@ -43,11 +43,18 @@ class NullProtocol(AsyncProtocol[None]):
|
||||||
|
|
||||||
async def _do_start_server(self, address, ssl=None):
|
async def _do_start_server(self, address, ssl=None):
|
||||||
if self.fake_session:
|
if self.fake_session:
|
||||||
|
self._accepted = asyncio.Event()
|
||||||
self._set_state(Runstate.CONNECTING)
|
self._set_state(Runstate.CONNECTING)
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
else:
|
else:
|
||||||
await super()._do_start_server(address, ssl)
|
await super()._do_start_server(address, ssl)
|
||||||
|
|
||||||
|
async def _do_accept(self):
|
||||||
|
if self.fake_session:
|
||||||
|
self._accepted = None
|
||||||
|
else:
|
||||||
|
await super()._do_accept()
|
||||||
|
|
||||||
async def _do_connect(self, address, ssl=None):
|
async def _do_connect(self, address, ssl=None):
|
||||||
if self.fake_session:
|
if self.fake_session:
|
||||||
self._set_state(Runstate.CONNECTING)
|
self._set_state(Runstate.CONNECTING)
|
||||||
|
|
Loading…
Reference in New Issue