Pull request

-----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCAAdFiEE+ber27ys35W+dsvQfe+BBqr8OQ4FAmGAJA0ACgkQfe+BBqr8
 OQ5grBAAqTcjraHtvU6qptGTgpfQC7B4Zg4WaY7Ygcq61gOxE7x5UekJplKjfBld
 nFpZ2sAXMsFspcFxL+iZDWG12glJ/EvriPz12UqvC/OixSY9jlh53jObJxy2zKUq
 s66lSwl4N+W9ITDXBIFbawLo4SXwcXiclQodyo8Dw8czB4+gU7v7sAeVZ8RRYcJ0
 hJPwtOMuuNpZzxReBN7ox9kiF1EbrFLtgGufc2CW+60pIIVrUS55ccIpL/lNGD3k
 EnfhtrUscMu5o+vLfeQ/LPp4jzhwQjjymWhHF9GJIj3I7FGfv/U5AUMfgARpErcB
 AO2ofGVyvahoNoZ3T44MWyeAV8FzDNnPw5qYIsJ6YIVSvQvzLlKcehWY9ximmqw+
 GRXwvJrKDX/31Jc86EinYA+oUbm6HTeI4BCiMY+CHJJTGRhTAaNDviX9jRyh5naw
 B85RaK34aeDuGuyDgMbWEaTg7ZtvXhhtTXfx6EBeCencWQTQuRrmOjYSsruiifjJ
 ++8HDY/15UfQW8ArUnOX146cX6XknySCm+r3Y+2W4YEdKE6C37GC5UTKbJMKwaRv
 3PK7WVI0V4qUhU523yDYM5PhrCB62BUBjq93tUzPF44BTTnT8O7WGoKB6g7V0LJO
 0c8PZPP18/ij6vKfUdWTfVQKrwYD4/f2lJH/Ht1uhe2l571epAc=
 =YD4J
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/jsnow/tags/python-pull-request' into staging

Pull request

# gpg: Signature made Mon 01 Nov 2021 01:29:49 PM EDT
# gpg:                using RSA key F9B7ABDBBCACDF95BE76CBD07DEF8106AAFC390E
# gpg: Good signature from "John Snow (John Huston) <jsnow@redhat.com>" [full]

* remotes/jsnow/tags/python-pull-request: (22 commits)
  python, iotests: replace qmp with aqmp
  python/aqmp: Create sync QMP wrapper for iotests
  iotests/300: avoid abnormal shutdown race condition
  iotests: Conditionally silence certain AQMP errors
  iotests: Accommodate async QMP Exception classes
  python/aqmp: Remove scary message
  python/machine: Handle QMP errors on close more meticulously
  python/machine: remove has_quit argument
  python: Add iotest linters to test suite
  iotests/linters: Add workaround for mypy bug #9852
  iotests/linters: Add entry point for linting via Python CI
  iotests: split linters.py out from 297
  iotests/297: split test into sub-cases
  iotests/297: update tool availability checks
  iotests/297: Change run_linter() to raise an exception on failure
  iotests/297: refactor run_[mypy|pylint] as generic execution shim
  iotests/297: Split run_linters apart into run_pylint and run_mypy
  iotests/297: Don't rely on distro-specific linter binaries
  iotests/297: Create main() function
  iotests/297: Add get_files() function
  ...

Signed-off-by: Richard Henderson <richard.henderson@linaro.org>
This commit is contained in:
Richard Henderson 2021-11-01 14:34:15 -04:00
commit 4b0bf11c5a
16 changed files with 430 additions and 125 deletions

View File

@ -22,7 +22,6 @@ managing QMP events.
# the COPYING file in the top-level directory.
import logging
import warnings
from .error import AQMPError
from .events import EventListener
@ -31,17 +30,6 @@ from .protocol import ConnectError, Runstate, StateError
from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
_WMSG = """
The Asynchronous QMP library is currently in development and its API
should be considered highly fluid and subject to change. It should
not be used by any other scripts checked into the QEMU tree.
Proceed with caution!
"""
warnings.warn(_WMSG, FutureWarning)
# Suppress logging unless an application engages it.
logging.getLogger('qemu.aqmp').addHandler(logging.NullHandler())

138
python/qemu/aqmp/legacy.py Normal file
View File

@ -0,0 +1,138 @@
"""
Sync QMP Wrapper
This class pretends to be qemu.qmp.QEMUMonitorProtocol.
"""
import asyncio
from typing import (
Awaitable,
List,
Optional,
TypeVar,
Union,
)
import qemu.qmp
from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
from .qmp_client import QMPClient
# pylint: disable=missing-docstring
class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
def __init__(self, address: SocketAddrT,
server: bool = False,
nickname: Optional[str] = None):
# pylint: disable=super-init-not-called
self._aqmp = QMPClient(nickname)
self._aloop = asyncio.get_event_loop()
self._address = address
self._timeout: Optional[float] = None
_T = TypeVar('_T')
def _sync(
self, future: Awaitable[_T], timeout: Optional[float] = None
) -> _T:
return self._aloop.run_until_complete(
asyncio.wait_for(future, timeout=timeout)
)
def _get_greeting(self) -> Optional[QMPMessage]:
if self._aqmp.greeting is not None:
# pylint: disable=protected-access
return self._aqmp.greeting._asdict()
return None
# __enter__ and __exit__ need no changes
# parse_address needs no changes
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
self._aqmp.await_greeting = negotiate
self._aqmp.negotiate = negotiate
self._sync(
self._aqmp.connect(self._address)
)
return self._get_greeting()
def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
self._aqmp.await_greeting = True
self._aqmp.negotiate = True
self._sync(
self._aqmp.accept(self._address),
timeout
)
ret = self._get_greeting()
assert ret is not None
return ret
def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
return dict(
self._sync(
# pylint: disable=protected-access
# _raw() isn't a public API, because turning off
# automatic ID assignment is discouraged. For
# compatibility with iotests *only*, do it anyway.
self._aqmp._raw(qmp_cmd, assign_id=False),
self._timeout
)
)
# Default impl of cmd() delegates to cmd_obj
def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
return self._sync(
self._aqmp.execute(cmd, kwds),
self._timeout
)
def pull_event(self,
wait: Union[bool, float] = False) -> Optional[QMPMessage]:
if not wait:
# wait is False/0: "do not wait, do not except."
if self._aqmp.events.empty():
return None
# If wait is 'True', wait forever. If wait is False/0, the events
# queue must not be empty; but it still needs some real amount
# of time to complete.
timeout = None
if wait and isinstance(wait, float):
timeout = wait
return dict(
self._sync(
self._aqmp.events.get(),
timeout
)
)
def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
events = [dict(x) for x in self._aqmp.events.clear()]
if events:
return events
event = self.pull_event(wait)
return [event] if event is not None else []
def clear_events(self) -> None:
self._aqmp.events.clear()
def close(self) -> None:
self._sync(
self._aqmp.disconnect()
)
def settimeout(self, timeout: Optional[float]) -> None:
self._timeout = timeout
def send_fd_scm(self, fd: int) -> None:
self._aqmp.send_fd_scm(fd)

View File

@ -41,7 +41,6 @@ from typing import (
)
from qemu.qmp import ( # pylint: disable=import-error
QEMUMonitorProtocol,
QMPMessage,
QMPReturnValue,
SocketAddrT,
@ -50,6 +49,12 @@ from qemu.qmp import ( # pylint: disable=import-error
from . import console_socket
if os.environ.get('QEMU_PYTHON_LEGACY_QMP'):
from qemu.qmp import QEMUMonitorProtocol
else:
from qemu.aqmp.legacy import QEMUMonitorProtocol
LOG = logging.getLogger(__name__)
@ -170,6 +175,7 @@ class QEMUMachine:
self._console_socket: Optional[socket.socket] = None
self._remove_files: List[str] = []
self._user_killed = False
self._quit_issued = False
def __enter__(self: _T) -> _T:
return self
@ -341,9 +347,15 @@ class QEMUMachine:
# Comprehensive reset for the failed launch case:
self._early_cleanup()
if self._qmp_connection:
self._qmp.close()
self._qmp_connection = None
try:
self._close_qmp_connection()
except Exception as err: # pylint: disable=broad-except
LOG.warning(
"Exception closing QMP connection: %s",
str(err) if str(err) else type(err).__name__
)
finally:
assert self._qmp_connection is None
self._close_qemu_log_file()
@ -368,6 +380,7 @@ class QEMUMachine:
command = ''
LOG.warning(msg, -int(exitcode), command)
self._quit_issued = False
self._user_killed = False
self._launched = False
@ -418,6 +431,31 @@ class QEMUMachine:
close_fds=False)
self._post_launch()
def _close_qmp_connection(self) -> None:
"""
Close the underlying QMP connection, if any.
Dutifully report errors that occurred while closing, but assume
that any error encountered indicates an abnormal termination
process and not a failure to close.
"""
if self._qmp_connection is None:
return
try:
self._qmp.close()
except EOFError:
# EOF can occur as an Exception here when using the Async
# QMP backend. It indicates that the server closed the
# stream. If we successfully issued 'quit' at any point,
# then this was expected. If the remote went away without
# our permission, it's worth reporting that as an abnormal
# shutdown case.
if not (self._user_killed or self._quit_issued):
raise
finally:
self._qmp_connection = None
def _early_cleanup(self) -> None:
"""
Perform any cleanup that needs to happen before the VM exits.
@ -443,15 +481,13 @@ class QEMUMachine:
self._subp.kill()
self._subp.wait(timeout=60)
def _soft_shutdown(self, timeout: Optional[int],
has_quit: bool = False) -> None:
def _soft_shutdown(self, timeout: Optional[int]) -> None:
"""
Perform early cleanup, attempt to gracefully shut down the VM, and wait
for it to terminate.
:param timeout: Timeout in seconds for graceful shutdown.
A value of None is an infinite wait.
:param has_quit: When True, don't attempt to issue 'quit' QMP command
:raise ConnectionReset: On QMP communication errors
:raise subprocess.TimeoutExpired: When timeout is exceeded waiting for
@ -460,21 +496,24 @@ class QEMUMachine:
self._early_cleanup()
if self._qmp_connection:
if not has_quit:
# Might raise ConnectionReset
self._qmp.cmd('quit')
try:
if not self._quit_issued:
# May raise ExecInterruptedError or StateError if the
# connection dies or has *already* died.
self.qmp('quit')
finally:
# Regardless, we want to quiesce the connection.
self._close_qmp_connection()
# May raise subprocess.TimeoutExpired
self._subp.wait(timeout=timeout)
def _do_shutdown(self, timeout: Optional[int],
has_quit: bool = False) -> None:
def _do_shutdown(self, timeout: Optional[int]) -> None:
"""
Attempt to shutdown the VM gracefully; fallback to a hard shutdown.
:param timeout: Timeout in seconds for graceful shutdown.
A value of None is an infinite wait.
:param has_quit: When True, don't attempt to issue 'quit' QMP command
:raise AbnormalShutdown: When the VM could not be shut down gracefully.
The inner exception will likely be ConnectionReset or
@ -482,13 +521,13 @@ class QEMUMachine:
may result in its own exceptions, likely subprocess.TimeoutExpired.
"""
try:
self._soft_shutdown(timeout, has_quit)
self._soft_shutdown(timeout)
except Exception as exc:
self._hard_shutdown()
raise AbnormalShutdown("Could not perform graceful shutdown") \
from exc
def shutdown(self, has_quit: bool = False,
def shutdown(self,
hard: bool = False,
timeout: Optional[int] = 30) -> None:
"""
@ -498,7 +537,6 @@ class QEMUMachine:
If the VM has not yet been launched, or shutdown(), wait(), or kill()
have already been called, this method does nothing.
:param has_quit: When true, do not attempt to issue 'quit' QMP command.
:param hard: When true, do not attempt graceful shutdown, and
suppress the SIGKILL warning log message.
:param timeout: Optional timeout in seconds for graceful shutdown.
@ -512,7 +550,7 @@ class QEMUMachine:
self._user_killed = True
self._hard_shutdown()
else:
self._do_shutdown(timeout, has_quit)
self._do_shutdown(timeout)
finally:
self._post_shutdown()
@ -529,7 +567,8 @@ class QEMUMachine:
:param timeout: Optional timeout in seconds. Default 30 seconds.
A value of `None` is an infinite wait.
"""
self.shutdown(has_quit=True, timeout=timeout)
self._quit_issued = True
self.shutdown(timeout=timeout)
def set_qmp_monitor(self, enabled: bool = True) -> None:
"""
@ -574,7 +613,10 @@ class QEMUMachine:
conv_keys = True
qmp_args = self._qmp_args(conv_keys, args)
return self._qmp.cmd(cmd, args=qmp_args)
ret = self._qmp.cmd(cmd, args=qmp_args)
if cmd == 'quit' and 'error' not in ret and 'return' in ret:
self._quit_issued = True
return ret
def command(self, cmd: str,
conv_keys: bool = True,
@ -585,7 +627,10 @@ class QEMUMachine:
On failure raise an exception.
"""
qmp_args = self._qmp_args(conv_keys, args)
return self._qmp.command(cmd, **qmp_args)
ret = self._qmp.command(cmd, **qmp_args)
if cmd == 'quit':
self._quit_issued = True
return ret
def get_qmp_event(self, wait: bool = False) -> Optional[QMPMessage]:
"""

4
python/tests/iotests-mypy.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh -e
cd ../tests/qemu-iotests/
python3 -m linters --mypy

4
python/tests/iotests-pylint.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh -e
cd ../tests/qemu-iotests/
python3 -m linters --pylint

View File

@ -28,6 +28,7 @@ import json
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.machine import QEMUMachine
from qemu.qmp import QMPConnectError
from qemu.aqmp import ConnectError
def bench_block_job(cmd, cmd_args, qemu_args):
@ -49,7 +50,7 @@ def bench_block_job(cmd, cmd_args, qemu_args):
vm.launch()
except OSError as e:
return {'error': 'popen failed: ' + str(e)}
except (QMPConnectError, socket.timeout):
except (QMPConnectError, ConnectError, socket.timeout):
return {'error': 'qemu failed: ' + str(vm.get_log())}
try:

View File

@ -92,10 +92,9 @@ class TestSingleDrive(ImageCommitTestCase):
self.vm.add_device('virtio-scsi')
self.vm.add_device("scsi-hd,id=scsi0,drive=drive0")
self.vm.launch()
self.has_quit = False
def tearDown(self):
self.vm.shutdown(has_quit=self.has_quit)
self.vm.shutdown()
os.remove(test_img)
os.remove(mid_img)
os.remove(backing_img)
@ -127,8 +126,6 @@ class TestSingleDrive(ImageCommitTestCase):
result = self.vm.qmp('quit')
self.assert_qmp(result, 'return', {})
self.has_quit = True
# Same as above, but this time we add the filter after starting the job
@iotests.skip_if_unsupported(['throttle'])
def test_commit_plus_filter_and_quit(self):
@ -147,8 +144,6 @@ class TestSingleDrive(ImageCommitTestCase):
result = self.vm.qmp('quit')
self.assert_qmp(result, 'return', {})
self.has_quit = True
def test_device_not_found(self):
result = self.vm.qmp('block-commit', device='nonexistent', top='%s' % mid_img)
self.assert_qmp(result, 'error/class', 'DeviceNotFound')

View File

@ -187,4 +187,4 @@ with iotests.VM() as vm, \
log(vm.qmp('quit'))
with iotests.Timeout(5, 'Timeout waiting for VM to quit'):
vm.shutdown(has_quit=True)
vm.shutdown()

View File

@ -123,4 +123,4 @@ with iotests.FilePath('src.qcow2') as src_path, \
vm.qmp_log('block-job-cancel', device='job0')
vm.qmp_log('quit')
vm.shutdown(has_quit=True)
vm.shutdown()

View File

@ -17,89 +17,66 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import shutil
import subprocess
import sys
from typing import List
import iotests
import linters
# TODO: Empty this list!
SKIP_FILES = (
'030', '040', '041', '044', '045', '055', '056', '057', '065', '093',
'096', '118', '124', '132', '136', '139', '147', '148', '149',
'151', '152', '155', '163', '165', '194', '196', '202',
'203', '205', '206', '207', '208', '210', '211', '212', '213', '216',
'218', '219', '224', '228', '234', '235', '236', '237', '238',
'240', '242', '245', '246', '248', '255', '256', '257', '258', '260',
'262', '264', '266', '274', '277', '280', '281', '295', '296', '298',
'299', '302', '303', '304', '307',
'nbd-fault-injector.py', 'qcow2.py', 'qcow2_format.py', 'qed.py'
)
# Looking for something?
#
# List of files to exclude from linting: linters.py
# mypy configuration: mypy.ini
# pylint configuration: pylintrc
def is_python_file(filename):
if not os.path.isfile(filename):
def check_linter(linter: str) -> bool:
try:
linters.run_linter(linter, ['--version'], suppress_output=True)
except subprocess.CalledProcessError:
iotests.case_notrun(f"'{linter}' not found")
return False
if filename.endswith('.py'):
return True
with open(filename, encoding='utf-8') as f:
try:
first_line = f.readline()
return re.match('^#!.*python', first_line) is not None
except UnicodeDecodeError: # Ignore binary files
return False
return True
def run_linters():
named_tests = [f'tests/{entry}' for entry in os.listdir('tests')]
check_tests = set(os.listdir('.') + named_tests) - set(SKIP_FILES)
files = [filename for filename in check_tests if is_python_file(filename)]
def test_pylint(files: List[str]) -> None:
print('=== pylint ===')
sys.stdout.flush()
if not check_linter('pylint'):
return
linters.run_linter('pylint', files)
def test_mypy(files: List[str]) -> None:
print('=== mypy ===')
sys.stdout.flush()
if not check_linter('mypy'):
return
env = os.environ.copy()
env['MYPYPATH'] = env['PYTHONPATH']
linters.run_linter('mypy', files, env=env, suppress_output=True)
def main() -> None:
files = linters.get_test_files()
iotests.logger.debug('Files to be checked:')
iotests.logger.debug(', '.join(sorted(files)))
print('=== pylint ===')
sys.stdout.flush()
# Todo notes are fine, but fixme's or xxx's should probably just be
# fixed (in tests, at least)
env = os.environ.copy()
subprocess.run(('pylint-3', '--score=n', '--notes=FIXME,XXX', *files),
env=env, check=False)
print('=== mypy ===')
sys.stdout.flush()
env['MYPYPATH'] = env['PYTHONPATH']
p = subprocess.run(('mypy',
'--warn-unused-configs',
'--disallow-subclassing-any',
'--disallow-any-generics',
'--disallow-incomplete-defs',
'--disallow-untyped-decorators',
'--no-implicit-optional',
'--warn-redundant-casts',
'--warn-unused-ignores',
'--no-implicit-reexport',
'--namespace-packages',
'--scripts-are-modules',
*files),
env=env,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True)
if p.returncode != 0:
print(p.stdout)
for test in (test_pylint, test_mypy):
try:
test(files)
except subprocess.CalledProcessError as exc:
# Linter failure will be caught by diffing the IO.
if exc.output:
print(exc.output)
for linter in ('pylint-3', 'mypy'):
if shutil.which(linter) is None:
iotests.notrun(f'{linter} not found')
iotests.script_main(run_linters)
iotests.script_main(main)

View File

@ -24,8 +24,6 @@ import random
import re
from typing import Dict, List, Optional
from qemu.machine import machine
import iotests
@ -461,12 +459,11 @@ class TestBlockBitmapMappingErrors(TestDirtyBitmapMigration):
f"'{self.src_node_name}': Name is longer than 255 bytes",
log)
# Expect abnormal shutdown of the destination VM because of
# the failed migration
try:
self.vm_b.shutdown()
except machine.AbnormalShutdown:
pass
# Destination VM will terminate w/ error of its own accord
# due to the failed migration.
self.vm_b.wait()
rc = self.vm_b.exitcode()
assert rc is not None and rc > 0
def test_aliased_bitmap_name_too_long(self) -> None:
# Longer than the maximum for bitmap names

View File

@ -30,7 +30,7 @@ import struct
import subprocess
import sys
import time
from typing import (Any, Callable, Dict, Iterable,
from typing import (Any, Callable, Dict, Iterable, Iterator,
List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
import unittest
@ -114,6 +114,24 @@ luks_default_key_secret_opt = 'key-secret=keysec0'
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
@contextmanager
def change_log_level(
logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
"""
Utility function for temporarily changing the log level of a logger.
This can be used to silence errors that are expected or uninteresting.
"""
_logger = logging.getLogger(logger_name)
current_level = _logger.level
_logger.setLevel(level)
try:
yield
finally:
_logger.setLevel(current_level)
def unarchive_sample_image(sample, fname):
sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:

View File

@ -0,0 +1,105 @@
# Copyright (C) 2020 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import re
import subprocess
import sys
from typing import List, Mapping, Optional
# TODO: Empty this list!
SKIP_FILES = (
'030', '040', '041', '044', '045', '055', '056', '057', '065', '093',
'096', '118', '124', '132', '136', '139', '147', '148', '149',
'151', '152', '155', '163', '165', '194', '196', '202',
'203', '205', '206', '207', '208', '210', '211', '212', '213', '216',
'218', '219', '224', '228', '234', '235', '236', '237', '238',
'240', '242', '245', '246', '248', '255', '256', '257', '258', '260',
'262', '264', '266', '274', '277', '280', '281', '295', '296', '298',
'299', '302', '303', '304', '307',
'nbd-fault-injector.py', 'qcow2.py', 'qcow2_format.py', 'qed.py'
)
def is_python_file(filename):
if not os.path.isfile(filename):
return False
if filename.endswith('.py'):
return True
with open(filename, encoding='utf-8') as f:
try:
first_line = f.readline()
return re.match('^#!.*python', first_line) is not None
except UnicodeDecodeError: # Ignore binary files
return False
def get_test_files() -> List[str]:
named_tests = [f'tests/{entry}' for entry in os.listdir('tests')]
check_tests = set(os.listdir('.') + named_tests) - set(SKIP_FILES)
return list(filter(is_python_file, check_tests))
def run_linter(
tool: str,
args: List[str],
env: Optional[Mapping[str, str]] = None,
suppress_output: bool = False,
) -> None:
"""
Run a python-based linting tool.
:param suppress_output: If True, suppress all stdout/stderr output.
:raise CalledProcessError: If the linter process exits with failure.
"""
subprocess.run(
('python3', '-m', tool, *args),
env=env,
check=True,
stdout=subprocess.PIPE if suppress_output else None,
stderr=subprocess.STDOUT if suppress_output else None,
universal_newlines=True,
)
def main() -> None:
"""
Used by the Python CI system as an entry point to run these linters.
"""
def show_usage() -> None:
print(f"Usage: {sys.argv[0]} < --mypy | --pylint >", file=sys.stderr)
sys.exit(1)
if len(sys.argv) != 2:
show_usage()
files = get_test_files()
if sys.argv[1] == '--pylint':
run_linter('pylint', files)
elif sys.argv[1] == '--mypy':
# mypy bug #9852; disable incremental checking as a workaround.
args = ['--no-incremental'] + files
run_linter('mypy', args)
else:
print(f"Unrecognized argument: '{sys.argv[1]}'", file=sys.stderr)
show_usage()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,12 @@
[mypy]
disallow_any_generics = True
disallow_incomplete_defs = True
disallow_subclassing_any = True
disallow_untyped_decorators = True
implicit_reexport = False
namespace_packages = True
no_implicit_optional = True
scripts_are_modules = True
warn_redundant_casts = True
warn_unused_configs = True
warn_unused_ignores = True

View File

@ -31,6 +31,22 @@ disable=invalid-name,
too-many-statements,
consider-using-f-string,
[REPORTS]
# Activate the evaluation score.
score=no
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
# TODO notes are fine, but FIXMEs or XXXs should probably just be
# fixed (in tests, at least).
notes=FIXME,
XXX,
[FORMAT]
# Maximum number of characters on a single line.

View File

@ -21,11 +21,12 @@
import os
from qemu import qmp
from qemu.aqmp import ConnectError
from qemu.machine import machine
from qemu.qmp import QMPConnectError
import iotests
from iotests import qemu_img
from iotests import change_log_level, qemu_img
image_size = 1 * 1024 * 1024
@ -99,10 +100,14 @@ class TestMirrorTopPerms(iotests.QMPTestCase):
self.vm_b.add_blockdev(f'file,node-name=drive0,filename={source}')
self.vm_b.add_device('virtio-blk,drive=drive0,share-rw=on')
try:
self.vm_b.launch()
print('ERROR: VM B launched successfully, this should not have '
'happened')
except qmp.QMPConnectError:
# Silence AQMP errors temporarily.
# TODO: Remove this and just allow the errors to be logged when
# AQMP fully replaces QMP.
with change_log_level('qemu.aqmp'):
self.vm_b.launch()
print('ERROR: VM B launched successfully, '
'this should not have happened')
except (QMPConnectError, ConnectError):
assert 'Is another process using the image' in self.vm_b.get_log()
result = self.vm.qmp('block-job-cancel',