e7874a50ff
typeshed (included in mypy) recently updated to improve the typing for WriteTransport objects. I was working around this, but now there's a version where I shouldn't work around it. Unfortunately this creates some minor ugliness if I want to support both pre- and post-0.950 versions. For now, for my sanity, just disable the unused-ignores warning. Signed-off-by: John Snow <jsnow@redhat.com> Reviewed-by: Paolo Bonzini <pbonzini@redhat.com> Message-Id: <20220526000921.1581503-2-jsnow@redhat.com> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
220 lines
6.1 KiB
Python
220 lines
6.1 KiB
Python
"""
|
||
Miscellaneous Utilities
|
||
|
||
This module provides asyncio utilities and compatibility wrappers for
|
||
Python 3.6 to provide some features that otherwise become available in
|
||
Python 3.7+.
|
||
|
||
Various logging and debugging utilities are also provided, such as
|
||
`exception_summary()` and `pretty_traceback()`, used primarily for
|
||
adding information into the logging stream.
|
||
"""
|
||
|
||
import asyncio
|
||
import sys
|
||
import traceback
|
||
from typing import (
|
||
Any,
|
||
Coroutine,
|
||
Optional,
|
||
TypeVar,
|
||
cast,
|
||
)
|
||
|
||
|
||
T = TypeVar('T')
|
||
|
||
|
||
# --------------------------
|
||
# Section: Utility Functions
|
||
# --------------------------
|
||
|
||
|
||
async def flush(writer: asyncio.StreamWriter) -> None:
|
||
"""
|
||
Utility function to ensure a StreamWriter is *fully* drained.
|
||
|
||
`asyncio.StreamWriter.drain` only promises we will return to below
|
||
the "high-water mark". This function ensures we flush the entire
|
||
buffer -- by setting the high water mark to 0 and then calling
|
||
drain. The flow control limits are restored after the call is
|
||
completed.
|
||
"""
|
||
transport = cast( # type: ignore[redundant-cast]
|
||
asyncio.WriteTransport, writer.transport
|
||
)
|
||
|
||
# https://github.com/python/typeshed/issues/5779
|
||
low, high = transport.get_write_buffer_limits() # type: ignore
|
||
transport.set_write_buffer_limits(0, 0)
|
||
try:
|
||
await writer.drain()
|
||
finally:
|
||
transport.set_write_buffer_limits(high, low)
|
||
|
||
|
||
def upper_half(func: T) -> T:
|
||
"""
|
||
Do-nothing decorator that annotates a method as an "upper-half" method.
|
||
|
||
These methods must not call bottom-half functions directly, but can
|
||
schedule them to run.
|
||
"""
|
||
return func
|
||
|
||
|
||
def bottom_half(func: T) -> T:
|
||
"""
|
||
Do-nothing decorator that annotates a method as a "bottom-half" method.
|
||
|
||
These methods must take great care to handle their own exceptions whenever
|
||
possible. If they go unhandled, they will cause termination of the loop.
|
||
|
||
These methods do not, in general, have the ability to directly
|
||
report information to a caller’s context and will usually be
|
||
collected as a Task result instead.
|
||
|
||
They must not call upper-half functions directly.
|
||
"""
|
||
return func
|
||
|
||
|
||
# -------------------------------
|
||
# Section: Compatibility Wrappers
|
||
# -------------------------------
|
||
|
||
|
||
def create_task(coro: Coroutine[Any, Any, T],
|
||
loop: Optional[asyncio.AbstractEventLoop] = None
|
||
) -> 'asyncio.Future[T]':
|
||
"""
|
||
Python 3.6-compatible `asyncio.create_task` wrapper.
|
||
|
||
:param coro: The coroutine to execute in a task.
|
||
:param loop: Optionally, the loop to create the task in.
|
||
|
||
:return: An `asyncio.Future` object.
|
||
"""
|
||
if sys.version_info >= (3, 7):
|
||
if loop is not None:
|
||
return loop.create_task(coro)
|
||
return asyncio.create_task(coro) # pylint: disable=no-member
|
||
|
||
# Python 3.6:
|
||
return asyncio.ensure_future(coro, loop=loop)
|
||
|
||
|
||
def is_closing(writer: asyncio.StreamWriter) -> bool:
|
||
"""
|
||
Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
|
||
|
||
:param writer: The `asyncio.StreamWriter` object.
|
||
:return: `True` if the writer is closing, or closed.
|
||
"""
|
||
if sys.version_info >= (3, 7):
|
||
return writer.is_closing()
|
||
|
||
# Python 3.6:
|
||
transport = writer.transport
|
||
assert isinstance(transport, asyncio.WriteTransport)
|
||
return transport.is_closing()
|
||
|
||
|
||
async def wait_closed(writer: asyncio.StreamWriter) -> None:
|
||
"""
|
||
Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
|
||
|
||
:param writer: The `asyncio.StreamWriter` to wait on.
|
||
"""
|
||
if sys.version_info >= (3, 7):
|
||
await writer.wait_closed()
|
||
return
|
||
|
||
# Python 3.6
|
||
transport = writer.transport
|
||
assert isinstance(transport, asyncio.WriteTransport)
|
||
|
||
while not transport.is_closing():
|
||
await asyncio.sleep(0)
|
||
|
||
# This is an ugly workaround, but it's the best I can come up with.
|
||
sock = transport.get_extra_info('socket')
|
||
|
||
if sock is None:
|
||
# Our transport doesn't have a socket? ...
|
||
# Nothing we can reasonably do.
|
||
return
|
||
|
||
while sock.fileno() != -1:
|
||
await asyncio.sleep(0)
|
||
|
||
|
||
def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
|
||
"""
|
||
Python 3.6-compatible `asyncio.run` wrapper.
|
||
|
||
:param coro: A coroutine to execute now.
|
||
:return: The return value from the coroutine.
|
||
"""
|
||
if sys.version_info >= (3, 7):
|
||
return asyncio.run(coro, debug=debug)
|
||
|
||
# Python 3.6
|
||
loop = asyncio.get_event_loop()
|
||
loop.set_debug(debug)
|
||
ret = loop.run_until_complete(coro)
|
||
loop.close()
|
||
|
||
return ret
|
||
|
||
|
||
# ----------------------------
|
||
# Section: Logging & Debugging
|
||
# ----------------------------
|
||
|
||
|
||
def exception_summary(exc: BaseException) -> str:
|
||
"""
|
||
Return a summary string of an arbitrary exception.
|
||
|
||
It will be of the form "ExceptionType: Error Message", if the error
|
||
string is non-empty, and just "ExceptionType" otherwise.
|
||
"""
|
||
name = type(exc).__qualname__
|
||
smod = type(exc).__module__
|
||
if smod not in ("__main__", "builtins"):
|
||
name = smod + '.' + name
|
||
|
||
error = str(exc)
|
||
if error:
|
||
return f"{name}: {error}"
|
||
return name
|
||
|
||
|
||
def pretty_traceback(prefix: str = " | ") -> str:
|
||
"""
|
||
Formats the current traceback, indented to provide visual distinction.
|
||
|
||
This is useful for printing a traceback within a traceback for
|
||
debugging purposes when encapsulating errors to deliver them up the
|
||
stack; when those errors are printed, this helps provide a nice
|
||
visual grouping to quickly identify the parts of the error that
|
||
belong to the inner exception.
|
||
|
||
:param prefix: The prefix to append to each line of the traceback.
|
||
:return: A string, formatted something like the following::
|
||
|
||
| Traceback (most recent call last):
|
||
| File "foobar.py", line 42, in arbitrary_example
|
||
| foo.baz()
|
||
| ArbitraryError: [Errno 42] Something bad happened!
|
||
"""
|
||
output = "".join(traceback.format_exception(*sys.exc_info()))
|
||
|
||
exc_lines = []
|
||
for line in output.split('\n'):
|
||
exc_lines.append(prefix + line)
|
||
|
||
# The last line is always empty, omit it
|
||
return "\n".join(exc_lines[:-1])
|