python/qemu: Change ConsoleSocket to optionally drain socket.

The primary purpose of this change is to clean up
machine.py's console_socket property to return a single type,
a ConsoleSocket.

ConsoleSocket now derives from a socket, which means that
in the default case (of not draining), machine.py
will see the same behavior as it did prior to ConsoleSocket.

Signed-off-by: Robert Foley <robert.foley@linaro.org>
Signed-off-by: Alex Bennée <alex.bennee@linaro.org>
Message-Id: <20200717203041.9867-3-robert.foley@linaro.org>
Message-Id: <20200724064509.331-16-alex.bennee@linaro.org>
This commit is contained in:
Robert Foley 2020-07-24 07:45:08 +01:00 committed by Alex Bennée
parent 4b84d87449
commit 80ded8e99d
2 changed files with 59 additions and 46 deletions

View File

@ -13,68 +13,75 @@ which can drain a socket and optionally dump the bytes to file.
# the COPYING file in the top-level directory. # the COPYING file in the top-level directory.
# #
import asyncore
import socket import socket
import threading import threading
from collections import deque from collections import deque
import time import time
class ConsoleSocket(asyncore.dispatcher): class ConsoleSocket(socket.socket):
""" """
ConsoleSocket represents a socket attached to a char device. ConsoleSocket represents a socket attached to a char device.
Drains the socket and places the bytes into an in memory buffer Optionally (if drain==True), drains the socket and places the bytes
for later processing. into an in memory buffer for later processing.
Optionally a file path can be passed in and we will also Optionally a file path can be passed in and we will also
dump the characters to this file for debugging purposes. dump the characters to this file for debugging purposes.
""" """
def __init__(self, address, file=None): def __init__(self, address, file=None, drain=False):
self._recv_timeout_sec = 300 self._recv_timeout_sec = 300
self._sleep_time = 0.5 self._sleep_time = 0.5
self._buffer = deque() self._buffer = deque()
self._asyncore_thread = None socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.connect(address)
self._sock.connect(address)
self._logfile = None self._logfile = None
if file: if file:
self._logfile = open(file, "w") self._logfile = open(file, "w")
asyncore.dispatcher.__init__(self, sock=self._sock)
self._open = True self._open = True
self._thread_start() if drain:
self._drain_thread = self._thread_start()
else:
self._drain_thread = None
def _drain_fn(self):
"""Drains the socket and runs while the socket is open."""
while self._open:
try:
self._drain_socket()
except socket.timeout:
# The socket is expected to timeout since we set a
# short timeout to allow the thread to exit when
# self._open is set to False.
time.sleep(self._sleep_time)
def _thread_start(self): def _thread_start(self):
"""Kick off a thread to wait on the asyncore.loop""" """Kick off a thread to drain the socket."""
if self._asyncore_thread is not None: # Configure socket to not block and timeout.
return # This allows our drain thread to not block
self._asyncore_thread = threading.Thread(target=asyncore.loop, # on recieve and exit smoothly.
kwargs={'timeout':1}) socket.socket.setblocking(self, False)
self._asyncore_thread.daemon = True socket.socket.settimeout(self, 1)
self._asyncore_thread.start() drain_thread = threading.Thread(target=self._drain_fn)
drain_thread.daemon = True
def handle_close(self): drain_thread.start()
"""redirect close to base class""" return drain_thread
# Call the base class close, but not self.close() since
# handle_close() occurs in the context of the thread which
# self.close() attempts to join.
asyncore.dispatcher.close(self)
def close(self): def close(self):
"""Close the base object and wait for the thread to terminate""" """Close the base object and wait for the thread to terminate"""
if self._open: if self._open:
self._open = False self._open = False
asyncore.dispatcher.close(self) if self._drain_thread is not None:
if self._asyncore_thread is not None: thread, self._drain_thread = self._drain_thread, None
thread, self._asyncore_thread = self._asyncore_thread, None
thread.join() thread.join()
socket.socket.close(self)
if self._logfile: if self._logfile:
self._logfile.close() self._logfile.close()
self._logfile = None self._logfile = None
def handle_read(self): def _drain_socket(self):
"""process arriving characters into in memory _buffer""" """process arriving characters into in memory _buffer"""
data = asyncore.dispatcher.recv(self, 1) data = socket.socket.recv(self, 1)
# latin1 is needed since there are some chars # latin1 is needed since there are some chars
# we are receiving that cannot be encoded to utf-8 # we are receiving that cannot be encoded to utf-8
# such as 0xe2, 0x80, 0xA6. # such as 0xe2, 0x80, 0xA6.
@ -85,27 +92,38 @@ class ConsoleSocket(asyncore.dispatcher):
for c in string: for c in string:
self._buffer.extend(c) self._buffer.extend(c)
def recv(self, buffer_size=1): def recv(self, bufsize=1):
"""Return chars from in memory buffer. """Return chars from in memory buffer.
Maintains the same API as socket.socket.recv. Maintains the same API as socket.socket.recv.
""" """
if self._drain_thread is None:
# Not buffering the socket, pass thru to socket.
return socket.socket.recv(self, bufsize)
start_time = time.time() start_time = time.time()
while len(self._buffer) < buffer_size: while len(self._buffer) < bufsize:
time.sleep(self._sleep_time) time.sleep(self._sleep_time)
elapsed_sec = time.time() - start_time elapsed_sec = time.time() - start_time
if elapsed_sec > self._recv_timeout_sec: if elapsed_sec > self._recv_timeout_sec:
raise socket.timeout raise socket.timeout
chars = ''.join([self._buffer.popleft() for i in range(buffer_size)]) chars = ''.join([self._buffer.popleft() for i in range(bufsize)])
# We choose to use latin1 to remain consistent with # We choose to use latin1 to remain consistent with
# handle_read() and give back the same data as the user would # handle_read() and give back the same data as the user would
# receive if they were reading directly from the # receive if they were reading directly from the
# socket w/o our intervention. # socket w/o our intervention.
return chars.encode("latin1") return chars.encode("latin1")
def set_blocking(self): def setblocking(self, value):
"""Maintain compatibility with socket API""" """When not draining we pass thru to the socket,
pass since when draining we control socket blocking.
"""
if self._drain_thread is None:
socket.socket.setblocking(self, value)
def settimeout(self, seconds): def settimeout(self, seconds):
"""Set current timeout on recv""" """When not draining we pass thru to the socket,
self._recv_timeout_sec = seconds since when draining we control the timeout.
"""
if seconds is not None:
self._recv_timeout_sec = seconds
if self._drain_thread is None:
socket.socket.settimeout(self, seconds)

View File

@ -23,7 +23,6 @@ import os
import subprocess import subprocess
import shutil import shutil
import signal import signal
import socket
import tempfile import tempfile
from typing import Optional, Type from typing import Optional, Type
from types import TracebackType from types import TracebackType
@ -673,12 +672,8 @@ class QEMUMachine:
Returns a socket connected to the console Returns a socket connected to the console
""" """
if self._console_socket is None: if self._console_socket is None:
if self._drain_console: self._console_socket = console_socket.ConsoleSocket(
self._console_socket = console_socket.ConsoleSocket( self._console_address,
self._console_address, file=self._console_log_path,
file=self._console_log_path) drain=self._drain_console)
else:
self._console_socket = socket.socket(socket.AF_UNIX,
socket.SOCK_STREAM)
self._console_socket.connect(self._console_address)
return self._console_socket return self._console_socket