b66ff2c298
There are many existing qcow2 images that specify a backing file but no format. This has been the source of CVEs in the past, but has become more prominent of a problem now that libvirt has switched to -blockdev. With older -drive, at least the probing was always done by qemu (so the only risk of a changed format between successive boots of a guest was if qemu was upgraded and probed differently). But with newer -blockdev, libvirt must specify a format; if libvirt guesses raw where the image was formatted, this results in data corruption visible to the guest; conversely, if libvirt guesses qcow2 where qemu was using raw, this can result in potential security holes, so modern libvirt instead refuses to use images without explicit backing format. The change in libvirt to reject images without explicit backing format has pointed out that a number of tools have been far too reliant on probing in the past. It's time to set a better example in our own iotests of properly setting this parameter. iotest calls to create, rebase, and convert are all impacted to some degree. It's a bit annoying that we are inconsistent on command line - while all of those accept -o backing_file=...,backing_fmt=..., the shortcuts are different: create and rebase have -b and -F, while convert has -B but no -F. (amend has no shortcuts, but the previous patch just deprecated the use of amend to change backing chains). Signed-off-by: Eric Blake <eblake@redhat.com> Message-Id: <20200706203954.341758-9-eblake@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
360 lines
15 KiB
Python
Executable File
360 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
#
|
|
# Test cases for the QMP 'blockdev-del' command
|
|
#
|
|
# Copyright (C) 2015 Igalia, S.L.
|
|
# Author: Alberto Garcia <berto@igalia.com>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import os
|
|
import iotests
|
|
import time
|
|
|
|
base_img = os.path.join(iotests.test_dir, 'base.img')
|
|
new_img = os.path.join(iotests.test_dir, 'new.img')
|
|
if iotests.qemu_default_machine == 's390-ccw-virtio':
|
|
default_virtio_blk = 'virtio-blk-ccw'
|
|
else:
|
|
default_virtio_blk = 'virtio-blk-pci'
|
|
|
|
class TestBlockdevDel(iotests.QMPTestCase):
|
|
|
|
def setUp(self):
|
|
iotests.qemu_img('create', '-f', iotests.imgfmt, base_img, '1M')
|
|
self.vm = iotests.VM()
|
|
self.vm.add_device("{},id=virtio-scsi".format(
|
|
iotests.get_virtio_scsi_device()))
|
|
self.vm.launch()
|
|
|
|
def tearDown(self):
|
|
self.vm.shutdown()
|
|
os.remove(base_img)
|
|
if os.path.isfile(new_img):
|
|
os.remove(new_img)
|
|
|
|
# Check whether a BlockDriverState exists
|
|
def checkBlockDriverState(self, node, must_exist = True):
|
|
result = self.vm.qmp('query-named-block-nodes')
|
|
nodes = [x for x in result['return'] if x['node-name'] == node]
|
|
self.assertLessEqual(len(nodes), 1)
|
|
self.assertEqual(must_exist, len(nodes) == 1)
|
|
|
|
# Add a BlockDriverState without a BlockBackend
|
|
def addBlockDriverState(self, node):
|
|
file_node = '%s_file' % node
|
|
self.checkBlockDriverState(node, False)
|
|
self.checkBlockDriverState(file_node, False)
|
|
opts = {'driver': iotests.imgfmt,
|
|
'node-name': node,
|
|
'file': {'driver': 'file',
|
|
'node-name': file_node,
|
|
'filename': base_img}}
|
|
result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node)
|
|
self.checkBlockDriverState(file_node)
|
|
|
|
# Add a BlockDriverState that will be used as overlay for the base_img BDS
|
|
def addBlockDriverStateOverlay(self, node):
|
|
self.checkBlockDriverState(node, False)
|
|
iotests.qemu_img('create', '-u', '-f', iotests.imgfmt,
|
|
'-b', base_img, '-F', iotests.imgfmt, new_img, '1M')
|
|
opts = {'driver': iotests.imgfmt,
|
|
'node-name': node,
|
|
'backing': None,
|
|
'file': {'driver': 'file',
|
|
'filename': new_img}}
|
|
result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node)
|
|
|
|
# Delete a BlockDriverState
|
|
def delBlockDriverState(self, node, expect_error = False):
|
|
self.checkBlockDriverState(node)
|
|
result = self.vm.qmp('blockdev-del', node_name = node)
|
|
if expect_error:
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
else:
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node, expect_error)
|
|
|
|
# Add a device model
|
|
def addDeviceModel(self, device, backend, driver = default_virtio_blk):
|
|
result = self.vm.qmp('device_add', id = device,
|
|
driver = driver, drive = backend)
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
# Delete a device model
|
|
def delDeviceModel(self, device, is_virtio_blk = True):
|
|
result = self.vm.qmp('device_del', id = device)
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
result = self.vm.qmp('system_reset')
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
if is_virtio_blk:
|
|
device_path = '/machine/peripheral/%s/virtio-backend' % device
|
|
event = self.vm.event_wait(name="DEVICE_DELETED",
|
|
match={'data': {'path': device_path}})
|
|
self.assertNotEqual(event, None)
|
|
|
|
event = self.vm.event_wait(name="DEVICE_DELETED",
|
|
match={'data': {'device': device}})
|
|
self.assertNotEqual(event, None)
|
|
|
|
# Remove a BlockDriverState
|
|
def ejectDrive(self, device, node, expect_error = False,
|
|
destroys_media = True):
|
|
self.checkBlockDriverState(node)
|
|
result = self.vm.qmp('eject', id = device)
|
|
if expect_error:
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
self.checkBlockDriverState(node)
|
|
else:
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node, not destroys_media)
|
|
|
|
# Insert a BlockDriverState
|
|
def insertDrive(self, device, node):
|
|
self.checkBlockDriverState(node)
|
|
result = self.vm.qmp('blockdev-insert-medium',
|
|
id = device, node_name = node)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node)
|
|
|
|
# Create a snapshot using 'blockdev-snapshot-sync'
|
|
def createSnapshotSync(self, node, overlay):
|
|
self.checkBlockDriverState(node)
|
|
self.checkBlockDriverState(overlay, False)
|
|
opts = {'node-name': node,
|
|
'snapshot-file': new_img,
|
|
'snapshot-node-name': overlay,
|
|
'format': iotests.imgfmt}
|
|
result = self.vm.qmp('blockdev-snapshot-sync', conv_keys=False, **opts)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node)
|
|
self.checkBlockDriverState(overlay)
|
|
|
|
# Create a snapshot using 'blockdev-snapshot'
|
|
def createSnapshot(self, node, overlay):
|
|
self.checkBlockDriverState(node)
|
|
self.checkBlockDriverState(overlay)
|
|
result = self.vm.qmp('blockdev-snapshot',
|
|
node = node, overlay = overlay)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node)
|
|
self.checkBlockDriverState(overlay)
|
|
|
|
# Create a mirror
|
|
def createMirror(self, node, new_node):
|
|
self.checkBlockDriverState(new_node, False)
|
|
opts = {'device': node,
|
|
'job-id': node,
|
|
'target': new_img,
|
|
'node-name': new_node,
|
|
'sync': 'top',
|
|
'format': iotests.imgfmt}
|
|
result = self.vm.qmp('drive-mirror', conv_keys=False, **opts)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(new_node)
|
|
|
|
# Complete an existing block job
|
|
def completeBlockJob(self, id, node_before, node_after):
|
|
result = self.vm.qmp('block-job-complete', device=id)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.wait_until_completed(id)
|
|
|
|
# Add a BlkDebug node
|
|
# Note that the purpose of this is to test the blockdev-del
|
|
# sanity checks, not to create a usable blkdebug drive
|
|
def addBlkDebug(self, debug, node):
|
|
self.checkBlockDriverState(node, False)
|
|
self.checkBlockDriverState(debug, False)
|
|
image = {'driver': iotests.imgfmt,
|
|
'node-name': node,
|
|
'file': {'driver': 'file',
|
|
'filename': base_img}}
|
|
opts = {'driver': 'blkdebug',
|
|
'node-name': debug,
|
|
'image': image}
|
|
result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(node)
|
|
self.checkBlockDriverState(debug)
|
|
|
|
# Add a BlkVerify node
|
|
# Note that the purpose of this is to test the blockdev-del
|
|
# sanity checks, not to create a usable blkverify drive
|
|
def addBlkVerify(self, blkverify, test, raw):
|
|
self.checkBlockDriverState(test, False)
|
|
self.checkBlockDriverState(raw, False)
|
|
self.checkBlockDriverState(blkverify, False)
|
|
iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
|
|
node_0 = {'driver': iotests.imgfmt,
|
|
'node-name': test,
|
|
'file': {'driver': 'file',
|
|
'filename': base_img}}
|
|
node_1 = {'driver': iotests.imgfmt,
|
|
'node-name': raw,
|
|
'file': {'driver': 'file',
|
|
'filename': new_img}}
|
|
opts = {'driver': 'blkverify',
|
|
'node-name': blkverify,
|
|
'test': node_0,
|
|
'raw': node_1}
|
|
result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(test)
|
|
self.checkBlockDriverState(raw)
|
|
self.checkBlockDriverState(blkverify)
|
|
|
|
# Add a Quorum node
|
|
def addQuorum(self, quorum, child0, child1):
|
|
self.checkBlockDriverState(child0, False)
|
|
self.checkBlockDriverState(child1, False)
|
|
self.checkBlockDriverState(quorum, False)
|
|
iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
|
|
child_0 = {'driver': iotests.imgfmt,
|
|
'node-name': child0,
|
|
'file': {'driver': 'file',
|
|
'filename': base_img}}
|
|
child_1 = {'driver': iotests.imgfmt,
|
|
'node-name': child1,
|
|
'file': {'driver': 'file',
|
|
'filename': new_img}}
|
|
opts = {'driver': 'quorum',
|
|
'node-name': quorum,
|
|
'vote-threshold': 1,
|
|
'children': [ child_0, child_1 ]}
|
|
result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
|
|
self.assert_qmp(result, 'return', {})
|
|
self.checkBlockDriverState(child0)
|
|
self.checkBlockDriverState(child1)
|
|
self.checkBlockDriverState(quorum)
|
|
|
|
########################
|
|
# The tests start here #
|
|
########################
|
|
|
|
def testBlockDriverState(self):
|
|
self.addBlockDriverState('node0')
|
|
# You cannot delete a file BDS directly
|
|
self.delBlockDriverState('node0_file', expect_error = True)
|
|
self.delBlockDriverState('node0')
|
|
|
|
def testDeviceModel(self):
|
|
self.addBlockDriverState('node0')
|
|
self.addDeviceModel('device0', 'node0')
|
|
self.ejectDrive('device0', 'node0', expect_error = True)
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
self.delDeviceModel('device0')
|
|
self.delBlockDriverState('node0')
|
|
|
|
def testAttachMedia(self):
|
|
# This creates a BlockBackend and removes its media
|
|
self.addBlockDriverState('node0')
|
|
self.addDeviceModel('device0', 'node0', 'scsi-cd')
|
|
self.ejectDrive('device0', 'node0', destroys_media = False)
|
|
self.delBlockDriverState('node0')
|
|
|
|
# This creates a new BlockDriverState and inserts it into the device
|
|
self.addBlockDriverState('node1')
|
|
self.insertDrive('device0', 'node1')
|
|
# The node can't be removed: the new device has an extra reference
|
|
self.delBlockDriverState('node1', expect_error = True)
|
|
# The BDS still exists after being ejected, but now it can be removed
|
|
self.ejectDrive('device0', 'node1', destroys_media = False)
|
|
self.delBlockDriverState('node1')
|
|
self.delDeviceModel('device0', False)
|
|
|
|
def testSnapshotSync(self):
|
|
self.addBlockDriverState('node0')
|
|
self.addDeviceModel('device0', 'node0')
|
|
self.createSnapshotSync('node0', 'overlay0')
|
|
# This fails because node0 is now being used as a backing image
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
self.delBlockDriverState('overlay0', expect_error = True)
|
|
# This succeeds because device0 only has the backend reference
|
|
self.delDeviceModel('device0')
|
|
# FIXME Would still be there if blockdev-snapshot-sync took a ref
|
|
self.checkBlockDriverState('overlay0', False)
|
|
self.delBlockDriverState('node0')
|
|
|
|
def testSnapshot(self):
|
|
self.addBlockDriverState('node0')
|
|
self.addDeviceModel('device0', 'node0', 'scsi-cd')
|
|
self.addBlockDriverStateOverlay('overlay0')
|
|
self.createSnapshot('node0', 'overlay0')
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
self.delBlockDriverState('overlay0', expect_error = True)
|
|
self.ejectDrive('device0', 'overlay0', destroys_media = False)
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
self.delBlockDriverState('overlay0')
|
|
self.delBlockDriverState('node0')
|
|
|
|
def testMirror(self):
|
|
self.addBlockDriverState('node0')
|
|
self.addDeviceModel('device0', 'node0', 'scsi-cd')
|
|
self.createMirror('node0', 'mirror0')
|
|
# The block job prevents removing the device
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
self.delBlockDriverState('mirror0', expect_error = True)
|
|
self.wait_ready('node0')
|
|
self.completeBlockJob('node0', 'node0', 'mirror0')
|
|
self.assert_no_active_block_jobs()
|
|
# This succeeds because the device now points to mirror0
|
|
self.delBlockDriverState('node0')
|
|
self.delBlockDriverState('mirror0', expect_error = True)
|
|
self.delDeviceModel('device0', False)
|
|
# FIXME mirror0 disappears, drive-mirror doesn't take a reference
|
|
#self.delBlockDriverState('mirror0')
|
|
|
|
@iotests.skip_if_unsupported(['blkdebug'])
|
|
def testBlkDebug(self):
|
|
self.addBlkDebug('debug0', 'node0')
|
|
# 'node0' is used by the blkdebug node
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
# But we can remove the blkdebug node directly
|
|
self.delBlockDriverState('debug0')
|
|
self.checkBlockDriverState('node0', False)
|
|
|
|
@iotests.skip_if_unsupported(['blkverify'])
|
|
def testBlkVerify(self):
|
|
self.addBlkVerify('verify0', 'node0', 'node1')
|
|
# We cannot remove the children of a blkverify device
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
self.delBlockDriverState('node1', expect_error = True)
|
|
# But we can remove the blkverify node directly
|
|
self.delBlockDriverState('verify0')
|
|
self.checkBlockDriverState('node0', False)
|
|
self.checkBlockDriverState('node1', False)
|
|
|
|
@iotests.skip_if_unsupported(['quorum'])
|
|
def testQuorum(self):
|
|
self.addQuorum('quorum0', 'node0', 'node1')
|
|
# We cannot remove the children of a Quorum device
|
|
self.delBlockDriverState('node0', expect_error = True)
|
|
self.delBlockDriverState('node1', expect_error = True)
|
|
# But we can remove the Quorum node directly
|
|
self.delBlockDriverState('quorum0')
|
|
self.checkBlockDriverState('node0', False)
|
|
self.checkBlockDriverState('node1', False)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
iotests.main(supported_fmts=["qcow2"],
|
|
supported_protocols=["file"])
|