John Snow 6dede6a493 iotests: rebase qemu_io() on top of qemu_tool()
Rework qemu_io() to be analogous to qemu_img(); a function that requires
a return code of zero by default unless disabled explicitly.

Tests that use qemu_io():
030 040 041 044 055 056 093 124 129 132 136 148 149 151 152 163 165 205
209 219 236 245 248 254 255 257 260 264 280 298 300 302 304
image-fleecing migrate-bitmaps-postcopy-test migrate-bitmaps-test
migrate-during-backup migration-permissions

Test that use qemu_io_log():
242 245 255 274 303 307 nbd-reconnect-on-open

Copy-pastables for testing/verification:

./check -qcow2 030 040 041 044 055 056 124 129 132 151 152 163 165 209 \
               219 236 242 245 248 254 255 257 260 264 274 \
               280 298 300 302 303 304 307 image-fleecing \
               migrate-bitmaps-postcopy-test migrate-bitmaps-test \
               migrate-during-backup nbd-reconnect-on-open
./check -raw 093 136 148 migration-permissions
./check -nbd 205

# ./configure configure --disable-gnutls --enable-gcrypt
# this ALSO requires passwordless sudo.
./check -luks 149

# Just the tests that were edited in this commit:
./check -qcow2 030 040 242 245
./check -raw migration-permissions
./check -nbd 205
./check -luks 149

Signed-off-by: John Snow <jsnow@redhat.com>
Message-Id: <20220418211504.943969-8-jsnow@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Hanna Reitz <hreitz@redhat.com>
2022-04-25 14:30:27 +02:00

159 lines
5.2 KiB
Python
Executable File

#!/usr/bin/env python3
# group: rw quick
#
# Tests for qmp command nbd-server-remove.
#
# Copyright (c) 2017 Virtuozzo International GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import sys
import iotests
import time
from iotests import qemu_img_create, qemu_io, filter_qemu_io, QemuIoInteractive
nbd_sock = os.path.join(iotests.sock_dir, 'nbd_sock')
nbd_uri = 'nbd+unix:///exp?socket=' + nbd_sock
disk = os.path.join(iotests.test_dir, 'disk')
class TestNbdServerRemove(iotests.QMPTestCase):
def setUp(self):
qemu_img_create('-f', iotests.imgfmt, disk, '1M')
self.vm = iotests.VM().add_drive(disk)
self.vm.launch()
address = {
'type': 'unix',
'data': {
'path': nbd_sock
}
}
result = self.vm.qmp('nbd-server-start', addr=address)
self.assert_qmp(result, 'return', {})
result = self.vm.qmp('nbd-server-add', device='drive0', name='exp')
self.assert_qmp(result, 'return', {})
def tearDown(self):
self.vm.shutdown()
os.remove(nbd_sock)
os.remove(disk)
def remove_export(self, name, mode=None):
if mode is None:
return self.vm.qmp('nbd-server-remove', name=name)
else:
return self.vm.qmp('nbd-server-remove', name=name, mode=mode)
def assertExportNotFound(self, name):
result = self.vm.qmp('nbd-server-remove', name=name)
self.assert_qmp(result, 'error/desc', "Export 'exp' is not found")
def assertExistingClients(self, result):
self.assert_qmp(result, 'error/desc', "export 'exp' still in use")
def assertReadOk(self, qemu_io_output):
self.assertEqual(
filter_qemu_io(qemu_io_output).strip(),
'read 512/512 bytes at offset 0\n' +
'512 bytes, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)')
def assertReadFailed(self, qemu_io_output):
self.assertEqual(filter_qemu_io(qemu_io_output).strip(),
'read failed: Input/output error')
def assertConnectFailed(self, qemu_io_output):
self.assertEqual(filter_qemu_io(qemu_io_output).strip(),
"qemu-io: can't open device " + nbd_uri +
": Requested export not available\n"
"server reported: export 'exp' not present")
def do_test_connect_after_remove(self, mode=None):
args = ('-r', '-f', 'raw', '-c', 'read 0 512', nbd_uri)
self.assertReadOk(qemu_io(*args).stdout)
result = self.remove_export('exp', mode)
self.assert_qmp(result, 'return', {})
self.assertExportNotFound('exp')
self.assertConnectFailed(qemu_io(*args, check=False).stdout)
def test_connect_after_remove_default(self):
self.do_test_connect_after_remove()
def test_connect_after_remove_safe(self):
self.do_test_connect_after_remove('safe')
def test_connect_after_remove_force(self):
self.do_test_connect_after_remove('hard')
def do_test_remove_during_connect_safe(self, mode=None):
qio = QemuIoInteractive('-r', '-f', 'raw', nbd_uri)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', mode)
self.assertExistingClients(result)
self.assertReadOk(qio.cmd('read 0 512'))
qio.close()
result = self.remove_export('exp', mode)
self.assert_qmp(result, 'return', {})
self.assertExportNotFound('exp')
def test_remove_during_connect_default(self):
self.do_test_remove_during_connect_safe()
def test_remove_during_connect_safe(self):
self.do_test_remove_during_connect_safe('safe')
def test_remove_during_connect_hard(self):
qio = QemuIoInteractive('-r', '-f', 'raw', nbd_uri)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', 'hard')
self.assert_qmp(result, 'return', {})
self.assertReadFailed(qio.cmd('read 0 512'))
self.assertExportNotFound('exp')
qio.close()
def test_remove_during_connect_safe_hard(self):
qio = QemuIoInteractive('-r', '-f', 'raw', nbd_uri)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', 'safe')
self.assertExistingClients(result)
self.assertReadOk(qio.cmd('read 0 512'))
result = self.remove_export('exp', 'hard')
self.assert_qmp(result, 'return', {})
self.assertExportNotFound('exp')
self.assertReadFailed(qio.cmd('read 0 512'))
qio.close()
if __name__ == '__main__':
iotests.main(supported_fmts=['raw'],
supported_protocols=['nbd'])