b0f904950c
There are many tests that need Quorum support in order to run. At the moment each test implements its own check to see if Quorum is enabled. This patch centralizes all those checks in a new function called iotests.supports_quorum(). Signed-off-by: Alberto Garcia <berto@igalia.com> Reviewed-by: Kevin Wolf <kwolf@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
354 lines
15 KiB
Python
354 lines
15 KiB
Python
#!/usr/bin/env python
#
# Test cases for the QMP 'x-blockdev-del' command
#
# Copyright (C) 2015 Igalia, S.L.
# Author: Alberto Garcia <berto@igalia.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
|
|
|
|
import os
|
|
import iotests
|
|
import time
|
|
|
|
# Scratch images used by the tests; both live in the iotests test directory.
# 'base.img' is created for every test in setUp(); 'new.img' only exists
# when a test creates an overlay, snapshot, mirror target or second child.
base_img, new_img = (os.path.join(iotests.test_dir, name)
                     for name in ('base.img', 'new.img'))
|
|
|
|
class TestBlockdevDel(iotests.QMPTestCase):
    """Test cases for the QMP 'x-blockdev-del' command.

    Each test builds a small block graph through QMP and then checks which
    nodes 'x-blockdev-del' accepts or refuses to delete.
    """

    # Create a fresh 1M base image and launch a VM that has a virtio-scsi
    # controller (presumably needed by the 'scsi-cd' devices some tests add).
    def setUp(self):
        iotests.qemu_img('create', '-f', iotests.imgfmt, base_img, '1M')
        self.vm = iotests.VM()
        self.vm.add_device("virtio-scsi-pci,id=virtio-scsi")
        self.vm.launch()

    # Shut the VM down and remove the images. new_img is only created by
    # some of the tests, so it is removed only if it exists.
    def tearDown(self):
        self.vm.shutdown()
        os.remove(base_img)
        if os.path.isfile(new_img):
            os.remove(new_img)

    # Check whether a BlockDriverState exists
    #
    # Asserts that 'query-named-block-nodes' reports at most one node named
    # 'node', and that such a node is present exactly when 'must_exist' is
    # true.
    def checkBlockDriverState(self, node, must_exist=True):
        result = self.vm.qmp('query-named-block-nodes')
        # Build a real list: on Python 3 filter() returns a lazy iterator,
        # which does not support len().
        nodes = [x for x in result['return'] if x['node-name'] == node]
        self.assertLessEqual(len(nodes), 1)
        self.assertEqual(must_exist, len(nodes) == 1)

    # Add a BlockDriverState without a BlockBackend
    #
    # The format node is called 'node' and its protocol (file) child is
    # called '<node>_file'. Neither may exist beforehand; both must exist
    # afterwards.
    def addBlockDriverState(self, node):
        file_node = '%s_file' % node
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(file_node, False)
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                'file': {'driver': 'file',
                         'node-name': file_node,
                         'filename': base_img}}
        # NOTE(review): conv_keys=False presumably passes the option keys
        # to QMP verbatim - confirm against iotests.VM.qmp().
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(file_node)

    # Add a BlockDriverState that will be used as overlay for the base_img BDS
    #
    # new_img is created with base_img as its backing file, but the node is
    # opened with an empty 'backing' option.
    def addBlockDriverStateOverlay(self, node):
        self.checkBlockDriverState(node, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt,
                         '-b', base_img, new_img, '1M')
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                'backing': '',
                'file': {'driver': 'file',
                         'filename': new_img}}
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)

    # Delete a BlockDriverState
    #
    # With expect_error the command must fail with a GenericError and the
    # node must still exist; otherwise it must succeed and the node must be
    # gone.
    def delBlockDriverState(self, node, expect_error=False):
        self.checkBlockDriverState(node)
        result = self.vm.qmp('x-blockdev-del', node_name=node)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})
        # The node survives exactly when the deletion was refused
        self.checkBlockDriverState(node, expect_error)

    # Add a device model
    #
    # 'device' is the id of the new device, 'backend' the value of its
    # 'drive' property and 'driver' the device model to instantiate.
    def addDeviceModel(self, device, backend, driver='virtio-blk-pci'):
        result = self.vm.qmp('device_add', id=device,
                             driver=driver, drive=backend)
        self.assert_qmp(result, 'return', {})

    # Delete a device model
    #
    # Issues 'device_del' followed by 'system_reset', then waits for the
    # DEVICE_DELETED event(s). When is_virtio_blk is true an additional
    # event for the device's virtio-backend path is awaited first.
    def delDeviceModel(self, device, is_virtio_blk=True):
        result = self.vm.qmp('device_del', id=device)
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('system_reset')
        self.assert_qmp(result, 'return', {})

        if is_virtio_blk:
            device_path = '/machine/peripheral/%s/virtio-backend' % device
            event = self.vm.event_wait(name="DEVICE_DELETED",
                                       match={'data': {'path': device_path}})
            self.assertIsNotNone(event)

        event = self.vm.event_wait(name="DEVICE_DELETED",
                                   match={'data': {'device': device}})
        self.assertIsNotNone(event)

    # Remove a BlockDriverState
    #
    # Ejects the medium of 'device'. With expect_error the eject must fail
    # with a GenericError and 'node' must still exist. Otherwise it must
    # succeed, and 'node' must be gone exactly when destroys_media is true.
    def ejectDrive(self, device, node, expect_error=False,
                   destroys_media=True):
        self.checkBlockDriverState(node)
        result = self.vm.qmp('eject', id=device)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
            self.checkBlockDriverState(node)
        else:
            self.assert_qmp(result, 'return', {})
            self.checkBlockDriverState(node, not destroys_media)

    # Insert a BlockDriverState
    #
    # 'node' must exist both before and after it is inserted into 'device'.
    def insertDrive(self, device, node):
        self.checkBlockDriverState(node)
        result = self.vm.qmp('x-blockdev-insert-medium',
                             id=device, node_name=node)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)

    # Create a snapshot using 'blockdev-snapshot-sync'
    #
    # The snapshot is written to new_img and exposed as a new node called
    # 'overlay', which must not exist beforehand.
    def createSnapshotSync(self, node, overlay):
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay, False)
        opts = {'node-name': node,
                'snapshot-file': new_img,
                'snapshot-node-name': overlay,
                'format': iotests.imgfmt}
        result = self.vm.qmp('blockdev-snapshot-sync', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    # Create a snapshot using 'blockdev-snapshot'
    #
    # Unlike createSnapshotSync(), the 'overlay' node must already exist.
    def createSnapshot(self, node, overlay):
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)
        result = self.vm.qmp('blockdev-snapshot',
                             node=node, overlay=overlay)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    # Create a mirror
    #
    # Starts a 'drive-mirror' job from 'node' to new_img. The job id is the
    # name of the source node and the target node is called 'new_node'.
    def createMirror(self, node, new_node):
        self.checkBlockDriverState(new_node, False)
        opts = {'device': node,
                'job-id': node,
                'target': new_img,
                'node-name': new_node,
                'sync': 'top',
                'format': iotests.imgfmt}
        result = self.vm.qmp('drive-mirror', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(new_node)

    # Complete an existing block job
    #
    # 'id' identifies the job. 'node_before' and 'node_after' are accepted
    # but not used by this helper.
    def completeBlockJob(self, id, node_before, node_after):
        result = self.vm.qmp('block-job-complete', device=id)
        self.assert_qmp(result, 'return', {})
        self.wait_until_completed(id)

    # Add a BlkDebug node
    # Note that the purpose of this is to test the x-blockdev-del
    # sanity checks, not to create a usable blkdebug drive
    #
    # 'debug' is the name of the blkdebug node and 'node' the name of the
    # image node it wraps.
    def addBlkDebug(self, debug, node):
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(debug, False)
        image = {'driver': iotests.imgfmt,
                 'node-name': node,
                 'file': {'driver': 'file',
                          'filename': base_img}}
        opts = {'driver': 'blkdebug',
                'node-name': debug,
                'image': image}
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(debug)

    # Add a BlkVerify node
    # Note that the purpose of this is to test the x-blockdev-del
    # sanity checks, not to create a usable blkverify drive
    #
    # The 'test' child is backed by base_img and the 'raw' child by a newly
    # created new_img.
    def addBlkVerify(self, blkverify, test, raw):
        self.checkBlockDriverState(test, False)
        self.checkBlockDriverState(raw, False)
        self.checkBlockDriverState(blkverify, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        node_0 = {'driver': iotests.imgfmt,
                  'node-name': test,
                  'file': {'driver': 'file',
                           'filename': base_img}}
        node_1 = {'driver': iotests.imgfmt,
                  'node-name': raw,
                  'file': {'driver': 'file',
                           'filename': new_img}}
        opts = {'driver': 'blkverify',
                'node-name': blkverify,
                'test': node_0,
                'raw': node_1}
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(test)
        self.checkBlockDriverState(raw)
        self.checkBlockDriverState(blkverify)

    # Add a Quorum node
    #
    # The quorum has two children: 'child0' backed by base_img and 'child1'
    # backed by a newly created new_img. The vote threshold is 1.
    def addQuorum(self, quorum, child0, child1):
        self.checkBlockDriverState(child0, False)
        self.checkBlockDriverState(child1, False)
        self.checkBlockDriverState(quorum, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        child_0 = {'driver': iotests.imgfmt,
                   'node-name': child0,
                   'file': {'driver': 'file',
                            'filename': base_img}}
        child_1 = {'driver': iotests.imgfmt,
                   'node-name': child1,
                   'file': {'driver': 'file',
                            'filename': new_img}}
        opts = {'driver': 'quorum',
                'node-name': quorum,
                'vote-threshold': 1,
                'children': [child_0, child_1]}
        result = self.vm.qmp('blockdev-add', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(child0)
        self.checkBlockDriverState(child1)
        self.checkBlockDriverState(quorum)

    ########################
    # The tests start here #
    ########################

    # A standalone format node can be deleted, its file child cannot.
    def testBlockDriverState(self):
        self.addBlockDriverState('node0')
        # You cannot delete a file BDS directly
        self.delBlockDriverState('node0_file', expect_error=True)
        self.delBlockDriverState('node0')

    # A node attached to a device model can be neither ejected nor deleted
    # until the device model itself has been removed.
    def testDeviceModel(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        self.ejectDrive('device0', 'node0', expect_error=True)
        self.delBlockDriverState('node0', expect_error=True)
        self.delDeviceModel('device0')
        self.delBlockDriverState('node0')

    # Media that has been ejected from a scsi-cd device can be deleted;
    # media that is currently inserted cannot.
    def testAttachMedia(self):
        # This creates a BlockBackend and removes its media
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.ejectDrive('device0', 'node0', destroys_media=False)
        self.delBlockDriverState('node0')

        # This creates a new BlockDriverState and inserts it into the device
        self.addBlockDriverState('node1')
        self.insertDrive('device0', 'node1')
        # The node can't be removed: the new device has an extra reference
        self.delBlockDriverState('node1', expect_error=True)
        # The BDS still exists after being ejected, but now it can be removed
        self.ejectDrive('device0', 'node1', destroys_media=False)
        self.delBlockDriverState('node1')
        self.delDeviceModel('device0', False)

    # Neither the backing node nor the overlay created by
    # 'blockdev-snapshot-sync' can be deleted while the device exists.
    def testSnapshotSync(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0')
        self.createSnapshotSync('node0', 'overlay0')
        # This fails because node0 is now being used as a backing image
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        # This succeeds because device0 only has the backend reference
        self.delDeviceModel('device0')
        # FIXME Would still be there if blockdev-snapshot-sync took a ref
        self.checkBlockDriverState('overlay0', False)
        self.delBlockDriverState('node0')

    # With 'blockdev-snapshot' the overlay must be ejected and deleted
    # before the backing node can be deleted.
    def testSnapshot(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.addBlockDriverStateOverlay('overlay0')
        self.createSnapshot('node0', 'overlay0')
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0', expect_error=True)
        self.ejectDrive('device0', 'overlay0', destroys_media=False)
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('overlay0')
        self.delBlockDriverState('node0')

    # A running mirror job blocks deletion of both source and target; once
    # the job has completed the source can be deleted.
    def testMirror(self):
        self.addBlockDriverState('node0')
        self.addDeviceModel('device0', 'node0', 'scsi-cd')
        self.createMirror('node0', 'mirror0')
        # The block job prevents removing the device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('mirror0', expect_error=True)
        self.wait_ready('node0')
        self.completeBlockJob('node0', 'node0', 'mirror0')
        self.assert_no_active_block_jobs()
        # This succeeds because the device now points to mirror0
        self.delBlockDriverState('node0')
        self.delBlockDriverState('mirror0', expect_error=True)
        self.delDeviceModel('device0', False)
        # FIXME mirror0 disappears, drive-mirror doesn't take a reference
        #self.delBlockDriverState('mirror0')

    # The child of a blkdebug node cannot be deleted on its own; deleting
    # the blkdebug node removes the child as well.
    def testBlkDebug(self):
        self.addBlkDebug('debug0', 'node0')
        # 'node0' is used by the blkdebug node
        self.delBlockDriverState('node0', expect_error=True)
        # But we can remove the blkdebug node directly
        self.delBlockDriverState('debug0')
        self.checkBlockDriverState('node0', False)

    # The children of a blkverify node cannot be deleted on their own;
    # deleting the blkverify node removes both children as well.
    def testBlkVerify(self):
        self.addBlkVerify('verify0', 'node0', 'node1')
        # We cannot remove the children of a blkverify device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the blkverify node directly
        self.delBlockDriverState('verify0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)

    # The children of a Quorum node cannot be deleted on their own;
    # deleting the Quorum node removes both children as well.
    def testQuorum(self):
        # Without Quorum support the test returns early and is therefore
        # reported as passed rather than skipped.
        if not iotests.supports_quorum():
            return

        self.addQuorum('quorum0', 'node0', 'node1')
        # We cannot remove the children of a Quorum device
        self.delBlockDriverState('node0', expect_error=True)
        self.delBlockDriverState('node1', expect_error=True)
        # But we can remove the Quorum node directly
        self.delBlockDriverState('quorum0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)
|
|
|
|
|
|
if __name__ == '__main__':
    # Hand control to the iotests runner; qcow2 is the only image format
    # this test declares as supported.
    iotests.main(supported_fmts=['qcow2'])
|