iotests: add QMP event waiting queue
A filter is added to allow callers to request very specific events to be pulled from the event queue, while leaving undesired events still in the stream. This allows us to poll for completion data for multiple asynchronous events in any arbitrary order. A new timeout context is added to the qmp pull_event method's wait parameter to allow tests to fail if they do not complete within some expected period of time. Also fixed is a bug in qmp.pull_event where we try to retrieve an event from an empty list if we attempt to retrieve an event with wait=False but no events have occurred. Signed-off-by: John Snow <jsnow@redhat.com> Reviewed-by: Max Reitz <mreitz@redhat.com> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com> Message-id: 1429314609-29776-19-git-send-email-jsnow@redhat.com Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
This commit is contained in:
parent
9f7264f57c
commit
7898f74e78
@ -21,6 +21,9 @@ class QMPConnectError(QMPError):
|
|||||||
class QMPCapabilitiesError(QMPError):
|
class QMPCapabilitiesError(QMPError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class QMPTimeoutError(QMPError):
|
||||||
|
pass
|
||||||
|
|
||||||
class QEMUMonitorProtocol:
|
class QEMUMonitorProtocol:
|
||||||
def __init__(self, address, server=False):
|
def __init__(self, address, server=False):
|
||||||
"""
|
"""
|
||||||
@ -72,6 +75,44 @@ class QEMUMonitorProtocol:
|
|||||||
|
|
||||||
error = socket.error
|
error = socket.error
|
||||||
|
|
||||||
|
def __get_events(self, wait=False):
|
||||||
|
"""
|
||||||
|
Check for new events in the stream and cache them in __events.
|
||||||
|
|
||||||
|
@param wait (bool): block until an event is available.
|
||||||
|
@param wait (float): If wait is a float, treat it as a timeout value.
|
||||||
|
|
||||||
|
@raise QMPTimeoutError: If a timeout float is provided and the timeout
|
||||||
|
period elapses.
|
||||||
|
@raise QMPConnectError: If wait is True but no events could be retrieved
|
||||||
|
or if some other error occurred.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Check for new events regardless and pull them into the cache:
|
||||||
|
self.__sock.setblocking(0)
|
||||||
|
try:
|
||||||
|
self.__json_read()
|
||||||
|
except socket.error, err:
|
||||||
|
if err[0] == errno.EAGAIN:
|
||||||
|
# No data available
|
||||||
|
pass
|
||||||
|
self.__sock.setblocking(1)
|
||||||
|
|
||||||
|
# Wait for new events, if needed.
|
||||||
|
# if wait is 0.0, this means "no wait" and is also implicitly false.
|
||||||
|
if not self.__events and wait:
|
||||||
|
if isinstance(wait, float):
|
||||||
|
self.__sock.settimeout(wait)
|
||||||
|
try:
|
||||||
|
ret = self.__json_read(only_event=True)
|
||||||
|
except socket.timeout:
|
||||||
|
raise QMPTimeoutError("Timeout waiting for event")
|
||||||
|
except:
|
||||||
|
raise QMPConnectError("Error while reading from socket")
|
||||||
|
if ret is None:
|
||||||
|
raise QMPConnectError("Error while reading from socket")
|
||||||
|
self.__sock.settimeout(None)
|
||||||
|
|
||||||
def connect(self, negotiate=True):
|
def connect(self, negotiate=True):
|
||||||
"""
|
"""
|
||||||
Connect to the QMP Monitor and perform capabilities negotiation.
|
Connect to the QMP Monitor and perform capabilities negotiation.
|
||||||
@ -140,43 +181,37 @@ class QEMUMonitorProtocol:
|
|||||||
"""
|
"""
|
||||||
Get and delete the first available QMP event.
|
Get and delete the first available QMP event.
|
||||||
|
|
||||||
@param wait: block until an event is available (bool)
|
@param wait (bool): block until an event is available.
|
||||||
|
@param wait (float): If wait is a float, treat it as a timeout value.
|
||||||
|
|
||||||
|
@raise QMPTimeoutError: If a timeout float is provided and the timeout
|
||||||
|
period elapses.
|
||||||
|
@raise QMPConnectError: If wait is True but no events could be retrieved
|
||||||
|
or if some other error occurred.
|
||||||
|
|
||||||
|
@return The first available QMP event, or None.
|
||||||
"""
|
"""
|
||||||
self.__sock.setblocking(0)
|
self.__get_events(wait)
|
||||||
try:
|
|
||||||
self.__json_read()
|
if self.__events:
|
||||||
except socket.error, err:
|
return self.__events.pop(0)
|
||||||
if err[0] == errno.EAGAIN:
|
return None
|
||||||
# No data available
|
|
||||||
pass
|
|
||||||
self.__sock.setblocking(1)
|
|
||||||
if not self.__events and wait:
|
|
||||||
self.__json_read(only_event=True)
|
|
||||||
event = self.__events[0]
|
|
||||||
del self.__events[0]
|
|
||||||
return event
|
|
||||||
|
|
||||||
def get_events(self, wait=False):
|
def get_events(self, wait=False):
|
||||||
"""
|
"""
|
||||||
Get a list of available QMP events.
|
Get a list of available QMP events.
|
||||||
|
|
||||||
@param wait: block until an event is available (bool)
|
@param wait (bool): block until an event is available.
|
||||||
"""
|
@param wait (float): If wait is a float, treat it as a timeout value.
|
||||||
self.__sock.setblocking(0)
|
|
||||||
try:
|
|
||||||
self.__json_read()
|
|
||||||
except socket.error, err:
|
|
||||||
if err[0] == errno.EAGAIN:
|
|
||||||
# No data available
|
|
||||||
pass
|
|
||||||
self.__sock.setblocking(1)
|
|
||||||
if not self.__events and wait:
|
|
||||||
ret = self.__json_read(only_event=True)
|
|
||||||
if ret == None:
|
|
||||||
# We are in blocking mode, if don't get anything, something
|
|
||||||
# went wrong
|
|
||||||
raise QMPConnectError("Error while reading from socket")
|
|
||||||
|
|
||||||
|
@raise QMPTimeoutError: If a timeout float is provided and the timeout
|
||||||
|
period elapses.
|
||||||
|
@raise QMPConnectError: If wait is True but no events could be retrieved
|
||||||
|
or if some other error occurred.
|
||||||
|
|
||||||
|
@return The list of available QMP events.
|
||||||
|
"""
|
||||||
|
self.__get_events(wait)
|
||||||
return self.__events
|
return self.__events
|
||||||
|
|
||||||
def clear_events(self):
|
def clear_events(self):
|
||||||
|
@ -78,6 +78,23 @@ def create_image(name, size):
|
|||||||
i = i + 512
|
i = i + 512
|
||||||
file.close()
|
file.close()
|
||||||
|
|
||||||
|
# Test if 'match' is a recursive subset of 'event'
|
||||||
|
def event_match(event, match=None):
|
||||||
|
if match is None:
|
||||||
|
return True
|
||||||
|
|
||||||
|
for key in match:
|
||||||
|
if key in event:
|
||||||
|
if isinstance(event[key], dict):
|
||||||
|
if not event_match(event[key], match[key]):
|
||||||
|
return False
|
||||||
|
elif event[key] != match[key]:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
class VM(object):
|
class VM(object):
|
||||||
'''A QEMU VM'''
|
'''A QEMU VM'''
|
||||||
|
|
||||||
@ -92,6 +109,7 @@ class VM(object):
|
|||||||
'-machine', 'accel=qtest',
|
'-machine', 'accel=qtest',
|
||||||
'-display', 'none', '-vga', 'none']
|
'-display', 'none', '-vga', 'none']
|
||||||
self._num_drives = 0
|
self._num_drives = 0
|
||||||
|
self._events = []
|
||||||
|
|
||||||
# This can be used to add an unused monitor instance.
|
# This can be used to add an unused monitor instance.
|
||||||
def add_monitor_telnet(self, ip, port):
|
def add_monitor_telnet(self, ip, port):
|
||||||
@ -202,14 +220,34 @@ class VM(object):
|
|||||||
|
|
||||||
def get_qmp_event(self, wait=False):
|
def get_qmp_event(self, wait=False):
|
||||||
'''Poll for one queued QMP events and return it'''
|
'''Poll for one queued QMP events and return it'''
|
||||||
|
if len(self._events) > 0:
|
||||||
|
return self._events.pop(0)
|
||||||
return self._qmp.pull_event(wait=wait)
|
return self._qmp.pull_event(wait=wait)
|
||||||
|
|
||||||
def get_qmp_events(self, wait=False):
|
def get_qmp_events(self, wait=False):
|
||||||
'''Poll for queued QMP events and return a list of dicts'''
|
'''Poll for queued QMP events and return a list of dicts'''
|
||||||
events = self._qmp.get_events(wait=wait)
|
events = self._qmp.get_events(wait=wait)
|
||||||
|
events.extend(self._events)
|
||||||
|
del self._events[:]
|
||||||
self._qmp.clear_events()
|
self._qmp.clear_events()
|
||||||
return events
|
return events
|
||||||
|
|
||||||
|
def event_wait(self, name='BLOCK_JOB_COMPLETED', timeout=60.0, match=None):
|
||||||
|
# Search cached events
|
||||||
|
for event in self._events:
|
||||||
|
if (event['event'] == name) and event_match(event, match):
|
||||||
|
self._events.remove(event)
|
||||||
|
return event
|
||||||
|
|
||||||
|
# Poll for new events
|
||||||
|
while True:
|
||||||
|
event = self._qmp.pull_event(wait=timeout)
|
||||||
|
if (event['event'] == name) and event_match(event, match):
|
||||||
|
return event
|
||||||
|
self._events.append(event)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
|
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
|
||||||
|
|
||||||
class QMPTestCase(unittest.TestCase):
|
class QMPTestCase(unittest.TestCase):
|
||||||
|
Loading…
Reference in New Issue
Block a user