#!/usr/bin/env python3
# group: rw
#
# Test ssh image creation
#
# Copyright (C) 2018 Red Hat, Inc.
#
# Creator/Owner: Kevin Wolf
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import re
import shlex
import subprocess

import iotests

iotests.script_initialize(
    supported_fmts=['raw'],
    supported_protocols=['ssh'],
)


def filter_hash(qmsg):
    """Replace host key hashes in a log message with the token HASH.

    For a string message, the word preceding "key fingerprint" (the key
    type) and the hexadecimal fingerprint are rewritten so that only the
    digest type remains.  Any other message is passed through
    iotests.filter_qmp(), replacing every value stored under the key
    'hash' that starts with hexadecimal digits.
    """
    def _filter(key, value):
        # re.match() only anchors at the start, so a value merely has to
        # begin with hex digits; the deliberately wrong hash 'wrong' used
        # below is therefore left visible in the log.
        if (key == 'hash' and isinstance(value, str)
                and re.match('[0-9a-f]+', value)):
            return 'HASH'
        return value

    if isinstance(qmsg, str):
        # Strip key type and fingerprint
        p = r"\S+ (key fingerprint) '(md5|sha1|sha256):[0-9a-f]+'"
        return re.sub(p, r"\1 '\2:HASH'", qmsg)
    else:
        return iotests.filter_qmp(qmsg, _filter)


def blockdev_create(vm, options):
    """Run vm.blockdev_create() with test file names and hashes filtered."""
    vm.blockdev_create(options,
                       filters=[iotests.filter_qmp_testfiles, filter_hash])


def key_digest(tool, key):
    """Return the hex digest of a base64-encoded host key.

    tool is the checksum program to run ('md5sum', 'sha1sum' or
    'sha256sum'); key is the base64 text of the host key.
    """
    # The key text comes from ssh-keyscan output, so quote it before it
    # reaches the shell.  shlex.quote() leaves well-formed base64 as is.
    command = 'echo %s | base64 -d | %s -b | cut -d" " -f1' % (
        shlex.quote(key), tool)
    return subprocess.check_output(
        command, shell=True).rstrip().decode('ascii')


with iotests.FilePath('t.img') as disk_path, \
     iotests.VM() as vm:

    remote_path = iotests.remote_filename(disk_path)

    #
    # Successful image creation (defaults)
    #
    iotests.log("=== Successful image creation (defaults) ===")
    iotests.log("")

    vm.launch()
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            }
        },
        'size': 4194304
    })
    vm.shutdown()

    iotests.img_info_log(remote_path)
    iotests.log("")
    iotests.img_info_log(disk_path)

    #
    # Test host-key-check options
    #
    iotests.log("=== Test host-key-check options ===")
    iotests.log("")

    iotests.log("--- no host key checking --")
    iotests.log("")

    vm.launch()
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'none'
            }
        },
        'size': 8388608
    })
    vm.shutdown()

    iotests.img_info_log(remote_path)

    iotests.log("--- known_hosts key checking --")
    iotests.log("")

    vm.launch()
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'known_hosts'
            }
        },
        'size': 4194304
    })
    vm.shutdown()

    iotests.img_info_log(remote_path)

    # Collect the base64 host keys offered by the local ssh server.
    # grep -v "^#" drops any comment lines from the ssh-keyscan output;
    # cut then keeps the third space-separated field of each line.
    keys = subprocess.check_output(
        'ssh-keyscan 127.0.0.1 2>/dev/null | grep -v "^#" | ' +
        'cut -d" " -f3',
        shell=True).rstrip().decode('ascii').split('\n')

    # Mappings of base64 representations to digests
    md5_keys = {key: key_digest('md5sum', key) for key in keys}
    sha1_keys = {key: key_digest('sha1sum', key) for key in keys}
    sha256_keys = {key: key_digest('sha256sum', key) for key in keys}

    vm.launch()

    # Find correct key first
    matching_key = None
    for key in keys:
        result = vm.qmp('blockdev-add',
                        driver='ssh', node_name='node0', path=disk_path,
                        server={
                            'host': '127.0.0.1',
                            'port': '22',
                        }, host_key_check={
                            'mode': 'hash',
                            'type': 'md5',
                            'hash': md5_keys[key],
                        })

        if 'error' not in result:
            # This key was accepted; remove the probe node again.
            vm.qmp('blockdev-del', node_name='node0')
            matching_key = key
            break

    if matching_key is None:
        vm.shutdown()
        iotests.notrun('Did not find a key that fits 127.0.0.1')

    iotests.log("--- explicit md5 key checking --")
    iotests.log("")

    # The VM launched for the key search above is still running here.
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'hash',
                'type': 'md5',
                'hash': 'wrong',
            }
        },
        'size': 2097152
    })
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'hash',
                'type': 'md5',
                'hash': md5_keys[matching_key],
            }
        },
        'size': 8388608
    })
    vm.shutdown()

    iotests.img_info_log(remote_path)

    iotests.log("--- explicit sha1 key checking --")
    iotests.log("")

    vm.launch()
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'hash',
                'type': 'sha1',
                'hash': 'wrong',
            }
        },
        'size': 2097152
    })
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'hash',
                'type': 'sha1',
                'hash': sha1_keys[matching_key],
            }
        },
        'size': 4194304
    })
    vm.shutdown()

    iotests.img_info_log(remote_path)

    iotests.log("--- explicit sha256 key checking --")
    iotests.log("")

    vm.launch()
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'hash',
                'type': 'sha256',
                'hash': 'wrong',
            }
        },
        'size': 2097152
    })
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'hash',
                'type': 'sha256',
                'hash': sha256_keys[matching_key],
            }
        },
        'size': 4194304
    })
    vm.shutdown()

    iotests.img_info_log(remote_path)

    #
    # Invalid path and user
    #
    iotests.log("=== Invalid path and user ===")
    iotests.log("")

    vm.launch()
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': '/this/is/not/an/existing/path',
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'none'
            }
        },
        'size': 4194304
    })
    blockdev_create(vm, {
        'driver': 'ssh',
        'location': {
            'path': disk_path,
            'user': 'invalid user',
            'server': {
                'host': '127.0.0.1',
                'port': '22'
            },
            'host-key-check': {
                'mode': 'none'
            }
        },
        'size': 4194304
    })
    vm.shutdown()