qemu-e2k/tests/qemu-iotests/205
Max Reitz ab5d4a30f7 iotests: Fix 205 for concurrent runs
Tests should place their files into the test directory.  This includes
Unix sockets.  205 currently fails to do so, which prevents it from
being run concurrently.

Signed-off-by: Max Reitz <mreitz@redhat.com>
Message-id: 20190618210238.9524-1-mreitz@redhat.com
Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Max Reitz <mreitz@redhat.com>
2019-06-24 16:01:40 +02:00

157 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python
#
# Tests for qmp command nbd-server-remove.
#
# Copyright (c) 2017 Virtuozzo International GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import sys
import iotests
import time
from iotests import qemu_img_create, qemu_io, filter_qemu_io, QemuIoInteractive
nbd_sock = os.path.join(iotests.test_dir, 'nbd_sock')
nbd_uri = 'nbd+unix:///exp?socket=' + nbd_sock
disk = os.path.join(iotests.test_dir, 'disk')
class TestNbdServerRemove(iotests.QMPTestCase):
def setUp(self):
qemu_img_create('-f', iotests.imgfmt, disk, '1M')
self.vm = iotests.VM().add_drive(disk)
self.vm.launch()
address = {
'type': 'unix',
'data': {
'path': nbd_sock
}
}
result = self.vm.qmp('nbd-server-start', addr=address)
self.assert_qmp(result, 'return', {})
result = self.vm.qmp('nbd-server-add', device='drive0', name='exp')
self.assert_qmp(result, 'return', {})
def tearDown(self):
self.vm.shutdown()
os.remove(nbd_sock)
os.remove(disk)
def remove_export(self, name, mode=None):
if mode is None:
return self.vm.qmp('nbd-server-remove', name=name)
else:
return self.vm.qmp('nbd-server-remove', name=name, mode=mode)
def assertExportNotFound(self, name):
result = self.vm.qmp('nbd-server-remove', name=name)
self.assert_qmp(result, 'error/desc', "Export 'exp' is not found")
def assertExistingClients(self, result):
self.assert_qmp(result, 'error/desc', "export 'exp' still in use")
def assertReadOk(self, qemu_io_output):
self.assertEqual(
filter_qemu_io(qemu_io_output).strip(),
'read 512/512 bytes at offset 0\n' +
'512 bytes, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)')
def assertReadFailed(self, qemu_io_output):
self.assertEqual(filter_qemu_io(qemu_io_output).strip(),
'read failed: Input/output error')
def assertConnectFailed(self, qemu_io_output):
self.assertEqual(filter_qemu_io(qemu_io_output).strip(),
"qemu-io: can't open device " + nbd_uri +
": Requested export not available\n"
"server reported: export 'exp' not present")
def do_test_connect_after_remove(self, mode=None):
args = ('-r', '-f', 'raw', '-c', 'read 0 512', nbd_uri)
self.assertReadOk(qemu_io(*args))
result = self.remove_export('exp', mode)
self.assert_qmp(result, 'return', {})
self.assertExportNotFound('exp')
self.assertConnectFailed(qemu_io(*args))
def test_connect_after_remove_default(self):
self.do_test_connect_after_remove()
def test_connect_after_remove_safe(self):
self.do_test_connect_after_remove('safe')
def test_connect_after_remove_force(self):
self.do_test_connect_after_remove('hard')
def do_test_remove_during_connect_safe(self, mode=None):
qio = QemuIoInteractive('-r', '-f', 'raw', nbd_uri)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', mode)
self.assertExistingClients(result)
self.assertReadOk(qio.cmd('read 0 512'))
qio.close()
result = self.remove_export('exp', mode)
self.assert_qmp(result, 'return', {})
self.assertExportNotFound('exp')
def test_remove_during_connect_default(self):
self.do_test_remove_during_connect_safe()
def test_remove_during_connect_safe(self):
self.do_test_remove_during_connect_safe('safe')
def test_remove_during_connect_hard(self):
qio = QemuIoInteractive('-r', '-f', 'raw', nbd_uri)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', 'hard')
self.assert_qmp(result, 'return', {})
self.assertReadFailed(qio.cmd('read 0 512'))
self.assertExportNotFound('exp')
qio.close()
def test_remove_during_connect_safe_hard(self):
qio = QemuIoInteractive('-r', '-f', 'raw', nbd_uri)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', 'safe')
self.assertExistingClients(result)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', 'hard')
self.assert_qmp(result, 'return', {})
self.assertExportNotFound('exp')
self.assertReadFailed(qio.cmd('read 0 512'))
qio.close()
if __name__ == '__main__':
iotests.main(supported_fmts=['generic'])