iotests: Add @error to wait_until_completed

Callers can use this new parameter to expect failure during the
completion process.

Signed-off-by: Max Reitz <mreitz@redhat.com>
Reviewed-by: John Snow <jsnow@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Message-id: 20191108123455.39445-5-mreitz@redhat.com
Signed-off-by: Max Reitz <mreitz@redhat.com>
This commit is contained in:
Max Reitz 2019-11-08 13:34:54 +01:00
parent 69c6449ff1
commit 216656f5f9
1 changed files with 12 additions and 6 deletions

View File

@ -811,15 +811,20 @@ class QMPTestCase(unittest.TestCase):
self.assert_no_active_block_jobs()
return result
def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0):
def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0,
error=None):
'''Wait for a block job to finish, returning the event'''
while True:
for event in self.vm.get_qmp_events(wait=wait):
if event['event'] == 'BLOCK_JOB_COMPLETED':
self.assert_qmp(event, 'data/device', drive)
self.assert_qmp_absent(event, 'data/error')
if check_offset:
self.assert_qmp(event, 'data/offset', event['data']['len'])
if error is None:
self.assert_qmp_absent(event, 'data/error')
if check_offset:
self.assert_qmp(event, 'data/offset',
event['data']['len'])
else:
self.assert_qmp(event, 'data/error', error)
self.assert_no_active_block_jobs()
return event
elif event['event'] == 'JOB_STATUS_CHANGE':
@ -837,7 +842,8 @@ class QMPTestCase(unittest.TestCase):
self.assert_qmp(event, 'data/type', 'mirror')
self.assert_qmp(event, 'data/offset', event['data']['len'])
def complete_and_wait(self, drive='drive0', wait_ready=True):
def complete_and_wait(self, drive='drive0', wait_ready=True,
completion_error=None):
'''Complete a block job and wait for it to finish'''
if wait_ready:
self.wait_ready(drive=drive)
@ -845,7 +851,7 @@ class QMPTestCase(unittest.TestCase):
result = self.vm.qmp('block-job-complete', device=drive)
self.assert_qmp(result, 'return', {})
event = self.wait_until_completed(drive=drive)
event = self.wait_until_completed(drive=drive, error=completion_error)
self.assert_qmp(event, 'data/type', 'mirror')
def pause_wait(self, job_id='job0'):