c3d479aab9
Due to a typo, in this case the SOCK_DIR was not being created. Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> Tested-by: Emanuele Giuseppe Esposito <eesposit@redhat.com> Message-Id: <20210323181928.311862-6-pbonzini@redhat.com> Message-Id: <20210503110110.476887-6-pbonzini@redhat.com> Signed-off-by: Max Reitz <mreitz@redhat.com>
299 lines
10 KiB
Python
299 lines
10 KiB
Python
# TestEnv class to manage test environment variables.
|
|
#
|
|
# Copyright (c) 2020-2021 Virtuozzo International GmbH
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
import shutil
|
|
import collections
|
|
import random
|
|
import subprocess
|
|
import glob
|
|
from typing import List, Dict, Any, Optional, ContextManager
|
|
|
|
|
|
def isxfile(path: str) -> bool:
|
|
return os.path.isfile(path) and os.access(path, os.X_OK)
|
|
|
|
|
|
def get_default_machine(qemu_prog: str) -> str:
|
|
outp = subprocess.run([qemu_prog, '-machine', 'help'], check=True,
|
|
universal_newlines=True,
|
|
stdout=subprocess.PIPE).stdout
|
|
|
|
machines = outp.split('\n')
|
|
try:
|
|
default_machine = next(m for m in machines if m.endswith(' (default)'))
|
|
except StopIteration:
|
|
return ''
|
|
default_machine = default_machine.split(' ', 1)[0]
|
|
|
|
alias_suf = ' (alias of {})'.format(default_machine)
|
|
alias = next((m for m in machines if m.endswith(alias_suf)), None)
|
|
if alias is not None:
|
|
default_machine = alias.split(' ', 1)[0]
|
|
|
|
return default_machine
|
|
|
|
|
|
class TestEnv(ContextManager['TestEnv']):
|
|
"""
|
|
Manage system environment for running tests
|
|
|
|
The following variables are supported/provided. They are represented by
|
|
lower-cased TestEnv attributes.
|
|
"""
|
|
|
|
# We store environment variables as instance attributes, and there are a
|
|
# lot of them. Silence pylint:
|
|
# pylint: disable=too-many-instance-attributes
|
|
|
|
env_variables = ['PYTHONPATH', 'TEST_DIR', 'SOCK_DIR', 'SAMPLE_IMG_DIR',
|
|
'OUTPUT_DIR', 'PYTHON', 'QEMU_PROG', 'QEMU_IMG_PROG',
|
|
'QEMU_IO_PROG', 'QEMU_NBD_PROG', 'QSD_PROG',
|
|
'SOCKET_SCM_HELPER', 'QEMU_OPTIONS', 'QEMU_IMG_OPTIONS',
|
|
'QEMU_IO_OPTIONS', 'QEMU_IO_OPTIONS_NO_FMT',
|
|
'QEMU_NBD_OPTIONS', 'IMGOPTS', 'IMGFMT', 'IMGPROTO',
|
|
'AIOMODE', 'CACHEMODE', 'VALGRIND_QEMU',
|
|
'CACHEMODE_IS_DEFAULT', 'IMGFMT_GENERIC', 'IMGOPTSSYNTAX',
|
|
'IMGKEYSECRET', 'QEMU_DEFAULT_MACHINE', 'MALLOC_PERTURB_']
|
|
|
|
def prepare_subprocess(self, args: List[str]) -> Dict[str, str]:
|
|
if self.debug:
|
|
args.append('-d')
|
|
|
|
with open(args[0], encoding="utf-8") as f:
|
|
try:
|
|
if f.readline().rstrip() == '#!/usr/bin/env python3':
|
|
args.insert(0, self.python)
|
|
except UnicodeDecodeError: # binary test? for future.
|
|
pass
|
|
|
|
os_env = os.environ.copy()
|
|
os_env.update(self.get_env())
|
|
return os_env
|
|
|
|
def get_env(self) -> Dict[str, str]:
|
|
env = {}
|
|
for v in self.env_variables:
|
|
val = getattr(self, v.lower(), None)
|
|
if val is not None:
|
|
env[v] = val
|
|
|
|
return env
|
|
|
|
def init_directories(self) -> None:
|
|
"""Init directory variables:
|
|
PYTHONPATH
|
|
TEST_DIR
|
|
SOCK_DIR
|
|
SAMPLE_IMG_DIR
|
|
OUTPUT_DIR
|
|
"""
|
|
self.pythonpath = os.getenv('PYTHONPATH')
|
|
if self.pythonpath:
|
|
self.pythonpath = self.source_iotests + os.pathsep + \
|
|
self.pythonpath
|
|
else:
|
|
self.pythonpath = self.source_iotests
|
|
|
|
self.test_dir = os.getenv('TEST_DIR',
|
|
os.path.join(os.getcwd(), 'scratch'))
|
|
Path(self.test_dir).mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
self.sock_dir = os.environ['SOCK_DIR']
|
|
self.tmp_sock_dir = False
|
|
Path(self.sock_dir).mkdir(parents=True, exist_ok=True)
|
|
except KeyError:
|
|
self.sock_dir = tempfile.mkdtemp()
|
|
self.tmp_sock_dir = True
|
|
|
|
self.sample_img_dir = os.getenv('SAMPLE_IMG_DIR',
|
|
os.path.join(self.source_iotests,
|
|
'sample_images'))
|
|
|
|
self.output_dir = os.getcwd() # OUTPUT_DIR
|
|
|
|
def init_binaries(self) -> None:
|
|
"""Init binary path variables:
|
|
PYTHON (for bash tests)
|
|
QEMU_PROG, QEMU_IMG_PROG, QEMU_IO_PROG, QEMU_NBD_PROG, QSD_PROG
|
|
SOCKET_SCM_HELPER
|
|
"""
|
|
self.python = sys.executable
|
|
|
|
def root(*names: str) -> str:
|
|
return os.path.join(self.build_root, *names)
|
|
|
|
arch = os.uname().machine
|
|
if 'ppc64' in arch:
|
|
arch = 'ppc64'
|
|
|
|
self.qemu_prog = os.getenv('QEMU_PROG', root(f'qemu-system-{arch}'))
|
|
if not os.path.exists(self.qemu_prog):
|
|
pattern = root('qemu-system-*')
|
|
try:
|
|
progs = sorted(glob.iglob(pattern))
|
|
self.qemu_prog = next(p for p in progs if isxfile(p))
|
|
except StopIteration:
|
|
sys.exit("Not found any Qemu executable binary by pattern "
|
|
f"'{pattern}'")
|
|
|
|
self.qemu_img_prog = os.getenv('QEMU_IMG_PROG', root('qemu-img'))
|
|
self.qemu_io_prog = os.getenv('QEMU_IO_PROG', root('qemu-io'))
|
|
self.qemu_nbd_prog = os.getenv('QEMU_NBD_PROG', root('qemu-nbd'))
|
|
self.qsd_prog = os.getenv('QSD_PROG', root('storage-daemon',
|
|
'qemu-storage-daemon'))
|
|
|
|
for b in [self.qemu_img_prog, self.qemu_io_prog, self.qemu_nbd_prog,
|
|
self.qemu_prog, self.qsd_prog]:
|
|
if not os.path.exists(b):
|
|
sys.exit('No such file: ' + b)
|
|
if not isxfile(b):
|
|
sys.exit('Not executable: ' + b)
|
|
|
|
helper_path = os.path.join(self.build_iotests, 'socket_scm_helper')
|
|
if isxfile(helper_path):
|
|
self.socket_scm_helper = helper_path # SOCKET_SCM_HELPER
|
|
|
|
def __init__(self, imgfmt: str, imgproto: str, aiomode: str,
|
|
cachemode: Optional[str] = None,
|
|
imgopts: Optional[str] = None,
|
|
misalign: bool = False,
|
|
debug: bool = False,
|
|
valgrind: bool = False) -> None:
|
|
self.imgfmt = imgfmt
|
|
self.imgproto = imgproto
|
|
self.aiomode = aiomode
|
|
self.imgopts = imgopts
|
|
self.misalign = misalign
|
|
self.debug = debug
|
|
|
|
if valgrind:
|
|
self.valgrind_qemu = 'y'
|
|
|
|
if cachemode is None:
|
|
self.cachemode_is_default = 'true'
|
|
self.cachemode = 'writeback'
|
|
else:
|
|
self.cachemode_is_default = 'false'
|
|
self.cachemode = cachemode
|
|
|
|
# Initialize generic paths: build_root, build_iotests, source_iotests,
|
|
# which are needed to initialize some environment variables. They are
|
|
# used by init_*() functions as well.
|
|
|
|
if os.path.islink(sys.argv[0]):
|
|
# called from the build tree
|
|
self.source_iotests = os.path.dirname(os.readlink(sys.argv[0]))
|
|
self.build_iotests = os.path.dirname(os.path.abspath(sys.argv[0]))
|
|
else:
|
|
# called from the source tree
|
|
self.source_iotests = os.getcwd()
|
|
self.build_iotests = self.source_iotests
|
|
|
|
self.build_root = os.path.join(self.build_iotests, '..', '..')
|
|
|
|
self.init_directories()
|
|
self.init_binaries()
|
|
|
|
self.malloc_perturb_ = os.getenv('MALLOC_PERTURB_',
|
|
str(random.randrange(1, 255)))
|
|
|
|
# QEMU_OPTIONS
|
|
self.qemu_options = '-nodefaults -display none -accel qtest'
|
|
machine_map = (
|
|
('arm', 'virt'),
|
|
('aarch64', 'virt'),
|
|
('avr', 'mega2560'),
|
|
('m68k', 'virt'),
|
|
('rx', 'gdbsim-r5f562n8'),
|
|
('tricore', 'tricore_testboard')
|
|
)
|
|
for suffix, machine in machine_map:
|
|
if self.qemu_prog.endswith(f'qemu-system-{suffix}'):
|
|
self.qemu_options += f' -machine {machine}'
|
|
|
|
# QEMU_DEFAULT_MACHINE
|
|
self.qemu_default_machine = get_default_machine(self.qemu_prog)
|
|
|
|
self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
|
|
self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')
|
|
|
|
is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
|
|
self.imgfmt_generic = 'true' if is_generic else 'false'
|
|
|
|
self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
|
|
if self.misalign:
|
|
self.qemu_io_options += ' --misalign'
|
|
|
|
self.qemu_io_options_no_fmt = self.qemu_io_options
|
|
|
|
if self.imgfmt == 'luks':
|
|
self.imgoptssyntax = 'true'
|
|
self.imgkeysecret = '123456'
|
|
if not self.imgopts:
|
|
self.imgopts = 'iter-time=10'
|
|
elif 'iter-time=' not in self.imgopts:
|
|
self.imgopts += ',iter-time=10'
|
|
else:
|
|
self.imgoptssyntax = 'false'
|
|
self.qemu_io_options += ' -f ' + self.imgfmt
|
|
|
|
if self.imgfmt == 'vmdk':
|
|
if not self.imgopts:
|
|
self.imgopts = 'zeroed_grain=on'
|
|
elif 'zeroed_grain=' not in self.imgopts:
|
|
self.imgopts += ',zeroed_grain=on'
|
|
|
|
def close(self) -> None:
|
|
if self.tmp_sock_dir:
|
|
shutil.rmtree(self.sock_dir)
|
|
|
|
def __enter__(self) -> 'TestEnv':
|
|
return self
|
|
|
|
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
|
|
self.close()
|
|
|
|
def print_env(self) -> None:
|
|
template = """\
|
|
QEMU -- "{QEMU_PROG}" {QEMU_OPTIONS}
|
|
QEMU_IMG -- "{QEMU_IMG_PROG}" {QEMU_IMG_OPTIONS}
|
|
QEMU_IO -- "{QEMU_IO_PROG}" {QEMU_IO_OPTIONS}
|
|
QEMU_NBD -- "{QEMU_NBD_PROG}" {QEMU_NBD_OPTIONS}
|
|
IMGFMT -- {IMGFMT}{imgopts}
|
|
IMGPROTO -- {IMGPROTO}
|
|
PLATFORM -- {platform}
|
|
TEST_DIR -- {TEST_DIR}
|
|
SOCK_DIR -- {SOCK_DIR}
|
|
SOCKET_SCM_HELPER -- {SOCKET_SCM_HELPER}
|
|
"""
|
|
|
|
args = collections.defaultdict(str, self.get_env())
|
|
|
|
if 'IMGOPTS' in args:
|
|
args['imgopts'] = f" ({args['IMGOPTS']})"
|
|
|
|
u = os.uname()
|
|
args['platform'] = f'{u.sysname}/{u.machine} {u.nodename} {u.release}'
|
|
|
|
print(template.format_map(args))
|