Python Pull request

Moves QMP-related tools not used for build or automatic testing from
 scripts/ to python/qemu/qmp/ where they will be protected from bitrot by
 the check-python-* CI jobs.
 
 stub forwarders are left in the old locations for now.
 -----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCAAdFiEE+ber27ys35W+dsvQfe+BBqr8OQ4FAmDNJhAACgkQfe+BBqr8
 OQ5uZg/8CxwwDVskXya2BWa0WHrqQGmods2XMVupp9qI5HQAPE6AzlypUzvoA+6o
 jr5uenG/CGSNZgWUfGN3UxCw85knEGLekU9u3Rx/N+CmlHvNPZz8UUg0oOy9+QIV
 owFYXPq4Uc9siUjvFhFNmC6ZIUGf2y5vXOECqjTZL4SM9Qq0F3mdrFUdT78AR2jw
 HTUipCiaZQLFArlxqIFvIv/wR4i+zh9cXs4A31THjoLBz6kkgX89D+yb9cUDwcjp
 fDBYWuAN7dJzt1jqxuD1yUj3rqwKj0MZa6QXd0pKBQt3fHJCE4geY+mGzmGd9nTx
 ZZ8RI64CEjH946eNpRFCl+DPqLNE88pTbRpUHXy3YOvNj1ZWGjPtA6aDqcpQdySL
 UcQxc5K0zKROh9vUIMmmXW4oGlv9xgilO0BnMq5oMTvolaflaF1JdPpmjd+0pHJ2
 u9/I/+Bc5tJEmSYPu4ASh0UVllAVSSjnopLcjLJv9g8yApDxuodfyNafdctbsVmL
 YjiSedsPI2Hawm7O54cE1D6fkjopxwFaEuv1CyTqiDcULk6qWSDj3KtnrDOp0BNV
 qpN6ReK75UHBzhkdgoKWLm3WuZZk9s2r58e9GAPb7UxR3EiaPOBLOYrKawooqiI7
 +LDTqXNbVQJBpe1IgkygZiy2GuQ8unQCsxqhjy7nn6M01/SrPEc=
 =YTLz
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/jsnow-gitlab/tags/python-pull-request' into staging

Python Pull request

Moves QMP-related tools not used for build or automatic testing from
scripts/ to python/qemu/qmp/ where they will be protected from bitrot by
the check-python-* CI jobs.

stub forwarders are left in the old locations for now.

# gpg: Signature made Sat 19 Jun 2021 00:02:40 BST
# gpg:                using RSA key F9B7ABDBBCACDF95BE76CBD07DEF8106AAFC390E
# gpg: Good signature from "John Snow (John Huston) <jsnow@redhat.com>" [full]
# Primary key fingerprint: FAEB 9711 A12C F475 812F  18F2 88A9 064D 1835 61EB
#      Subkey fingerprint: F9B7 ABDB BCAC DF95 BE76  CBD0 7DEF 8106 AAFC 390E

* remotes/jsnow-gitlab/tags/python-pull-request: (72 commits)
  scripts/qmp-shell: add redirection shim
  python: add qmp-shell entry point
  scripts/qmp-shell: move to python/qemu/qmp/qmp_shell.py
  scripts/qmp-shell: add docstrings
  scripts/qmp-shell: make QMPShellError inherit QMPError
  scripts/qmp-shell: remove double-underscores
  scripts/qmp-shell: convert usage comment to docstring
  scripts/qmp-shell: Remove too-broad-exception
  scripts/qmp-shell: Fix empty-transaction invocation
  scripts/qmp-shell: remove TODO
  scripts/qmp-shell: use logging to show warnings
  scripts/qmp-shell: Use context manager instead of atexit
  python/qmp: return generic type from context manager
  scripts/qmp-shell: unprivatize 'pretty' property
  scripts/qmp-shell: Accept SocketAddrT instead of string
  scripts/qmp-shell: add mypy types
  python/qmp: add QMPObject type alias
  scripts/qmp-shell: initialize completer early
  scripts/qmp-shell: refactor QMPCompleter
  scripts/qmp-shell: Fix "FuzzyJSON" parser
  ...

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2021-06-21 16:11:33 +01:00
commit 0add99ea3e
15 changed files with 1713 additions and 1155 deletions

97
python/Pipfile.lock generated
View File

@ -22,6 +22,13 @@
}
},
"develop": {
"appdirs": {
"hashes": [
"sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41",
"sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"
],
"version": "==1.4.4"
},
"astroid": {
"hashes": [
"sha256:4db03ab5fc3340cf619dbc25e42c2cc3755154ce6009469766d7143d1fc2ee4e",
@ -38,6 +45,20 @@
"markers": "python_version >= '3.6'",
"version": "==88.1"
},
"distlib": {
"hashes": [
"sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736",
"sha256:23e223426b28491b1ced97dc3bbe183027419dfc7982b4fa2f05d5f3ff10711c"
],
"version": "==0.3.2"
},
"filelock": {
"hashes": [
"sha256:18d82244ee114f543149c66a6e0c14e9c4f8a1044b5cdaadd0f82159d6a6ff59",
"sha256:929b7d63ec5b7d6b71b0fa5ac14e030b3f70b75747cef1b10da9b879fef15836"
],
"version": "==3.0.12"
},
"flake8": {
"hashes": [
"sha256:07528381786f2a6237b061f6e96610a4167b226cb926e2aa2b6b1d78057c576b",
@ -46,6 +67,12 @@
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==3.9.2"
},
"fusepy": {
"hashes": [
"sha256:72ff783ec2f43de3ab394e3f7457605bf04c8cf288a2f4068b4cde141d4ee6bd"
],
"version": "==3.0.1"
},
"importlib-metadata": {
"hashes": [
"sha256:8c501196e49fb9df5df43833bdb1e4328f64847763ec8a50703148b73784d581",
@ -54,6 +81,14 @@
"markers": "python_version < '3.8'",
"version": "==4.0.1"
},
"importlib-resources": {
"hashes": [
"sha256:54161657e8ffc76596c4ede7080ca68cb02962a2e074a2586b695a93a925d36e",
"sha256:e962bff7440364183203d179d7ae9ad90cb1f2b74dcb84300e88ecc42dca3351"
],
"markers": "python_version < '3.7'",
"version": "==5.1.4"
},
"isort": {
"hashes": [
"sha256:0a943902919f65c5684ac4e0154b1ad4fac6dcaa5d9f3426b732f1c8b5419be6",
@ -132,6 +167,30 @@
],
"version": "==0.4.3"
},
"packaging": {
"hashes": [
"sha256:5b327ac1320dc863dca72f4514ecc086f31186744b84a230374cc1fd776feae5",
"sha256:67714da7f7bc052e064859c05c595155bd1ee9f69f76557e21f051443c20947a"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==20.9"
},
"pluggy": {
"hashes": [
"sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0",
"sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==0.13.1"
},
"py": {
"hashes": [
"sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3",
"sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.10.0"
},
"pycodestyle": {
"hashes": [
"sha256:514f76d918fcc0b55c6680472f0a37970994e07bbb80725808c17089be302068",
@ -156,18 +215,42 @@
"markers": "python_version ~= '3.6'",
"version": "==2.8.2"
},
"pyparsing": {
"hashes": [
"sha256:c203ec8783bf771a155b207279b9bccb8dea02d8f0c9e5f8ead507bc3246ecc1",
"sha256:ef9d7589ef3c200abe66653d3f1ab1033c3c419ae9b9bdb1240a85b024efc88b"
],
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==2.4.7"
},
"qemu": {
"editable": true,
"path": "."
},
"six": {
"hashes": [
"sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926",
"sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==1.16.0"
},
"toml": {
"hashes": [
"sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b",
"sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"
],
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2'",
"markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==0.10.2"
},
"tox": {
"hashes": [
"sha256:307a81ddb82bd463971a273f33e9533a24ed22185f27db8ce3386bff27d324e3",
"sha256:b0b5818049a1c1997599d42012a637a33f24c62ab8187223fdd318fa8522637b"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==3.23.1"
},
"typed-ast": {
"hashes": [
"sha256:01ae5f73431d21eead5015997ab41afa53aa1fbe252f9da060be5dad2c730ace",
@ -201,7 +284,7 @@
"sha256:f8afcf15cc511ada719a88e013cec87c11aff7b91f019295eb4530f96fe5ef2f",
"sha256:fb1bbeac803adea29cedd70781399c99138358c26d05fcbd23c13016b7f5ec65"
],
"markers": "implementation_name == 'cpython' and python_version < '3.8'",
"markers": "python_version < '3.8' and implementation_name == 'cpython'",
"version": "==1.4.3"
},
"typing-extensions": {
@ -213,6 +296,14 @@
"markers": "python_version < '3.8'",
"version": "==3.10.0.0"
},
"virtualenv": {
"hashes": [
"sha256:14fdf849f80dbb29a4eb6caa9875d476ee2a5cf76a5f5415fa2f1606010ab467",
"sha256:2b0126166ea7c9c3661f5b8e06773d28f83322de7a3ff7d06f0aed18c9de6a76"
],
"markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'",
"version": "==20.4.7"
},
"wrapt": {
"hashes": [
"sha256:b62ffa81fb85f4332a4f609cab4ac40709470da05643a082ec1eb88e6d9b97d7"
@ -224,7 +315,7 @@
"sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76",
"sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098"
],
"markers": "python_version >= '3.6'",
"markers": "python_version < '3.10'",
"version": "==3.4.1"
}
}

View File

@ -30,21 +30,30 @@ from typing import (
TextIO,
Tuple,
Type,
TypeVar,
Union,
cast,
)
# QMPMessage is a QMP Message of any kind.
# e.g. {'yee': 'haw'}
#
# QMPReturnValue is the inner value of return values only.
# {'return': {}} is the QMPMessage,
# {} is the QMPReturnValue.
#: QMPMessage is an entire QMP message of any kind.
QMPMessage = Dict[str, Any]
QMPReturnValue = Dict[str, Any]
InternetAddrT = Tuple[str, str]
#: QMPReturnValue is the 'return' value of a command.
QMPReturnValue = object
#: QMPObject is any object in a QMP message.
QMPObject = Dict[str, object]
# QMPMessage can be outgoing commands or incoming events/returns.
# QMPReturnValue is usually a dict/json object, but due to QAPI's
# 'returns-whitelist', it can actually be anything.
#
# {'return': {}} is a QMPMessage,
# {} is the QMPReturnValue.
InternetAddrT = Tuple[str, int]
UnixAddrT = str
SocketAddrT = Union[InternetAddrT, UnixAddrT]
@ -92,6 +101,12 @@ class QMPResponseError(QMPError):
self.reply = reply
class QMPBadPortError(QMPError):
"""
Unable to parse socket address: Port was non-numerical.
"""
class QEMUMonitorProtocol:
"""
Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
@ -206,7 +221,9 @@ class QEMUMonitorProtocol:
if ret is None:
raise QMPConnectError("Error while reading from socket")
def __enter__(self) -> 'QEMUMonitorProtocol':
T = TypeVar('T')
def __enter__(self: T) -> T:
# Implement context manager enter function.
return self
@ -219,6 +236,26 @@ class QEMUMonitorProtocol:
# Implement context manager exit function.
self.close()
@classmethod
def parse_address(cls, address: str) -> SocketAddrT:
"""
Parse a string into a QMP address.
Figure out if the argument is in the port:host form.
If it's not, it's probably a file path.
"""
components = address.split(':')
if len(components) == 2:
try:
port = int(components[1])
except ValueError:
msg = f"Bad port: '{components[1]}' in '{address}'."
raise QMPBadPortError(msg) from None
return (components[0], port)
# Treat as filepath.
return address
def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
"""
Connect to the QMP Monitor and perform capabilities negotiation.
@ -271,8 +308,8 @@ class QEMUMonitorProtocol:
return resp
def cmd(self, name: str,
args: Optional[Dict[str, Any]] = None,
cmd_id: Optional[Any] = None) -> QMPMessage:
args: Optional[Dict[str, object]] = None,
cmd_id: Optional[object] = None) -> QMPMessage:
"""
Build a QMP command and send it to the QMP Monitor.
@ -287,7 +324,7 @@ class QEMUMonitorProtocol:
qmp_cmd['id'] = cmd_id
return self.cmd_obj(qmp_cmd)
def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
"""
Build and send a QMP command to the monitor, report errors if any
"""

View File

@ -0,0 +1,323 @@
"""
QEMU Guest Agent Client
Usage:
Start QEMU with:
# qemu [...] -chardev socket,path=/tmp/qga.sock,server,wait=off,id=qga0 \
-device virtio-serial \
-device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0
Run the script:
$ qemu-ga-client --address=/tmp/qga.sock <command> [args...]
or
$ export QGA_CLIENT_ADDRESS=/tmp/qga.sock
$ qemu-ga-client <command> [args...]
For example:
$ qemu-ga-client cat /etc/resolv.conf
# Generated by NetworkManager
nameserver 10.0.2.3
$ qemu-ga-client fsfreeze status
thawed
$ qemu-ga-client fsfreeze freeze
2 filesystems frozen
See also: https://wiki.qemu.org/Features/QAPI/GuestAgent
"""
# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com>
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
import argparse
import base64
import errno
import os
import random
import sys
from typing import (
Any,
Callable,
Dict,
Optional,
Sequence,
)
from qemu import qmp
from qemu.qmp import SocketAddrT
# This script has not seen many patches or careful attention in quite
# some time. If you would like to improve it, please review the design
# carefully and add docstrings at that point in time. Until then:
# pylint: disable=missing-docstring
class QemuGuestAgent(qmp.QEMUMonitorProtocol):
def __getattr__(self, name: str) -> Callable[..., Any]:
def wrapper(**kwds: object) -> object:
return self.command('guest-' + name.replace('_', '-'), **kwds)
return wrapper
class QemuGuestAgentClient:
def __init__(self, address: SocketAddrT):
self.qga = QemuGuestAgent(address)
self.qga.connect(negotiate=False)
def sync(self, timeout: Optional[float] = 3) -> None:
# Avoid being blocked forever
if not self.ping(timeout):
raise EnvironmentError('Agent seems not alive')
uid = random.randint(0, (1 << 32) - 1)
while True:
ret = self.qga.sync(id=uid)
if isinstance(ret, int) and int(ret) == uid:
break
def __file_read_all(self, handle: int) -> bytes:
eof = False
data = b''
while not eof:
ret = self.qga.file_read(handle=handle, count=1024)
_data = base64.b64decode(ret['buf-b64'])
data += _data
eof = ret['eof']
return data
def read(self, path: str) -> bytes:
handle = self.qga.file_open(path=path)
try:
data = self.__file_read_all(handle)
finally:
self.qga.file_close(handle=handle)
return data
def info(self) -> str:
info = self.qga.info()
msgs = []
msgs.append('version: ' + info['version'])
msgs.append('supported_commands:')
enabled = [c['name'] for c in info['supported_commands']
if c['enabled']]
msgs.append('\tenabled: ' + ', '.join(enabled))
disabled = [c['name'] for c in info['supported_commands']
if not c['enabled']]
msgs.append('\tdisabled: ' + ', '.join(disabled))
return '\n'.join(msgs)
@classmethod
def __gen_ipv4_netmask(cls, prefixlen: int) -> str:
mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2)
return '.'.join([str(mask >> 24),
str((mask >> 16) & 0xff),
str((mask >> 8) & 0xff),
str(mask & 0xff)])
def ifconfig(self) -> str:
nifs = self.qga.network_get_interfaces()
msgs = []
for nif in nifs:
msgs.append(nif['name'] + ':')
if 'ip-addresses' in nif:
for ipaddr in nif['ip-addresses']:
if ipaddr['ip-address-type'] == 'ipv4':
addr = ipaddr['ip-address']
mask = self.__gen_ipv4_netmask(int(ipaddr['prefix']))
msgs.append(f"\tinet {addr} netmask {mask}")
elif ipaddr['ip-address-type'] == 'ipv6':
addr = ipaddr['ip-address']
prefix = ipaddr['prefix']
msgs.append(f"\tinet6 {addr} prefixlen {prefix}")
if nif['hardware-address'] != '00:00:00:00:00:00':
msgs.append("\tether " + nif['hardware-address'])
return '\n'.join(msgs)
def ping(self, timeout: Optional[float]) -> bool:
self.qga.settimeout(timeout)
try:
self.qga.ping()
except TimeoutError:
return False
return True
def fsfreeze(self, cmd: str) -> object:
if cmd not in ['status', 'freeze', 'thaw']:
raise Exception('Invalid command: ' + cmd)
# Can be int (freeze, thaw) or GuestFsfreezeStatus (status)
return getattr(self.qga, 'fsfreeze' + '_' + cmd)()
def fstrim(self, minimum: int) -> Dict[str, object]:
# returns GuestFilesystemTrimResponse
ret = getattr(self.qga, 'fstrim')(minimum=minimum)
assert isinstance(ret, dict)
return ret
def suspend(self, mode: str) -> None:
if mode not in ['disk', 'ram', 'hybrid']:
raise Exception('Invalid mode: ' + mode)
try:
getattr(self.qga, 'suspend' + '_' + mode)()
# On error exception will raise
except TimeoutError:
# On success command will timed out
return
def shutdown(self, mode: str = 'powerdown') -> None:
if mode not in ['powerdown', 'halt', 'reboot']:
raise Exception('Invalid mode: ' + mode)
try:
self.qga.shutdown(mode=mode)
except TimeoutError:
pass
def _cmd_cat(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
if len(args) != 1:
print('Invalid argument')
print('Usage: cat <file>')
sys.exit(1)
print(client.read(args[0]))
def _cmd_fsfreeze(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
usage = 'Usage: fsfreeze status|freeze|thaw'
if len(args) != 1:
print('Invalid argument')
print(usage)
sys.exit(1)
if args[0] not in ['status', 'freeze', 'thaw']:
print('Invalid command: ' + args[0])
print(usage)
sys.exit(1)
cmd = args[0]
ret = client.fsfreeze(cmd)
if cmd == 'status':
print(ret)
return
assert isinstance(ret, int)
verb = 'frozen' if cmd == 'freeze' else 'thawed'
print(f"{ret:d} filesystems {verb}")
def _cmd_fstrim(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
if len(args) == 0:
minimum = 0
else:
minimum = int(args[0])
print(client.fstrim(minimum))
def _cmd_ifconfig(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
assert not args
print(client.ifconfig())
def _cmd_info(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
assert not args
print(client.info())
def _cmd_ping(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
timeout = 3.0 if len(args) == 0 else float(args[0])
alive = client.ping(timeout)
if not alive:
print("Not responded in %s sec" % args[0])
sys.exit(1)
def _cmd_suspend(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
usage = 'Usage: suspend disk|ram|hybrid'
if len(args) != 1:
print('Less argument')
print(usage)
sys.exit(1)
if args[0] not in ['disk', 'ram', 'hybrid']:
print('Invalid command: ' + args[0])
print(usage)
sys.exit(1)
client.suspend(args[0])
def _cmd_shutdown(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
assert not args
client.shutdown()
_cmd_powerdown = _cmd_shutdown
def _cmd_halt(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
assert not args
client.shutdown('halt')
def _cmd_reboot(client: QemuGuestAgentClient, args: Sequence[str]) -> None:
assert not args
client.shutdown('reboot')
commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m]
def send_command(address: str, cmd: str, args: Sequence[str]) -> None:
if not os.path.exists(address):
print('%s not found' % address)
sys.exit(1)
if cmd not in commands:
print('Invalid command: ' + cmd)
print('Available commands: ' + ', '.join(commands))
sys.exit(1)
try:
client = QemuGuestAgentClient(address)
except OSError as err:
print(err)
if err.errno == errno.ECONNREFUSED:
print('Hint: qemu is not running?')
sys.exit(1)
if cmd == 'fsfreeze' and args[0] == 'freeze':
client.sync(60)
elif cmd != 'ping':
client.sync()
globals()['_cmd_' + cmd](client, args)
def main() -> None:
address = os.environ.get('QGA_CLIENT_ADDRESS')
parser = argparse.ArgumentParser()
parser.add_argument('--address', action='store',
default=address,
help='Specify a ip:port pair or a unix socket path')
parser.add_argument('command', choices=commands)
parser.add_argument('args', nargs='*')
args = parser.parse_args()
if args.address is None:
parser.error('address is not specified')
sys.exit(1)
send_command(args.address, args.command, args.args)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,535 @@
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
# Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
#
"""
Low-level QEMU shell on top of QMP.
usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
positional arguments:
qmp_server < UNIX socket path | TCP address:port >
optional arguments:
-h, --help show this help message and exit
-H, --hmp Use HMP interface
-N, --skip-negotiation
Skip negotiate (for qemu-ga)
-v, --verbose Verbose (echo commands sent and received)
-p, --pretty Pretty-print JSON
Start QEMU with:
# qemu [...] -qmp unix:./qmp-sock,server
Run the shell:
$ qmp-shell ./qmp-sock
Commands have the following format:
< command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
For example:
(QEMU) device_add driver=e1000 id=net1
{'return': {}}
(QEMU)
key=value pairs also support Python or JSON object literal subset notations,
without spaces. Dictionaries/objects {} are supported as are arrays [].
example-command arg-name1={'key':'value','obj'={'prop':"value"}}
Both JSON and Python formatting should work, including both styles of
string literal quotes. Both paradigms of literal values should work,
including null/true/false for JSON and None/True/False for Python.
Transactions have the following multi-line format:
transaction(
action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
...
action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
)
One line transactions are also supported:
transaction( action-name1 ... )
For example:
(QEMU) transaction(
TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
TRANS> )
{"return": {}}
(QEMU)
Use the -v and -p options to activate the verbose and pretty-print options,
which will echo back the properly formatted JSON-compliant QMP that is being
sent to QEMU, which is useful for debugging and documentation generation.
"""
import argparse
import ast
import json
import logging
import os
import re
import readline
import sys
from typing import (
Iterator,
List,
NoReturn,
Optional,
Sequence,
)
from qemu import qmp
from qemu.qmp import QMPMessage
LOG = logging.getLogger(__name__)
class QMPCompleter:
"""
QMPCompleter provides a readline library tab-complete behavior.
"""
# NB: Python 3.9+ will probably allow us to subclass list[str] directly,
# but pylint as of today does not know that List[str] is simply 'list'.
def __init__(self) -> None:
self._matches: List[str] = []
def append(self, value: str) -> None:
"""Append a new valid completion to the list of possibilities."""
return self._matches.append(value)
def complete(self, text: str, state: int) -> Optional[str]:
"""readline.set_completer() callback implementation."""
for cmd in self._matches:
if cmd.startswith(text):
if state == 0:
return cmd
state -= 1
return None
class QMPShellError(qmp.QMPError):
"""
QMP Shell Base error class.
"""
class FuzzyJSON(ast.NodeTransformer):
"""
This extension of ast.NodeTransformer filters literal "true/false/null"
values in a Python AST and replaces them by proper "True/False/None" values
that Python can properly evaluate.
"""
@classmethod
def visit_Name(cls, # pylint: disable=invalid-name
node: ast.Name) -> ast.AST:
"""
Transform Name nodes with certain values into Constant (keyword) nodes.
"""
if node.id == 'true':
return ast.Constant(value=True)
if node.id == 'false':
return ast.Constant(value=False)
if node.id == 'null':
return ast.Constant(value=None)
return node
class QMPShell(qmp.QEMUMonitorProtocol):
"""
QMPShell provides a basic readline-based QMP shell.
:param address: Address of the QMP server.
:param pretty: Pretty-print QMP messages.
:param verbose: Echo outgoing QMP messages to console.
"""
def __init__(self, address: qmp.SocketAddrT,
pretty: bool = False, verbose: bool = False):
super().__init__(address)
self._greeting: Optional[QMPMessage] = None
self._completer = QMPCompleter()
self._transmode = False
self._actions: List[QMPMessage] = []
self._histfile = os.path.join(os.path.expanduser('~'),
'.qmp-shell_history')
self.pretty = pretty
self.verbose = verbose
def close(self) -> None:
# Hook into context manager of parent to save shell history.
self._save_history()
super().close()
def _fill_completion(self) -> None:
cmds = self.cmd('query-commands')
if 'error' in cmds:
return
for cmd in cmds['return']:
self._completer.append(cmd['name'])
def _completer_setup(self) -> None:
self._completer = QMPCompleter()
self._fill_completion()
readline.set_history_length(1024)
readline.set_completer(self._completer.complete)
readline.parse_and_bind("tab: complete")
# NB: default delimiters conflict with some command names
# (eg. query-), clearing everything as it doesn't seem to matter
readline.set_completer_delims('')
try:
readline.read_history_file(self._histfile)
except FileNotFoundError:
pass
except IOError as err:
msg = f"Failed to read history '{self._histfile}': {err!s}"
LOG.warning(msg)
def _save_history(self) -> None:
try:
readline.write_history_file(self._histfile)
except IOError as err:
msg = f"Failed to save history file '{self._histfile}': {err!s}"
LOG.warning(msg)
@classmethod
def _parse_value(cls, val: str) -> object:
try:
return int(val)
except ValueError:
pass
if val.lower() == 'true':
return True
if val.lower() == 'false':
return False
if val.startswith(('{', '[')):
# Try first as pure JSON:
try:
return json.loads(val)
except ValueError:
pass
# Try once again as FuzzyJSON:
try:
tree = ast.parse(val, mode='eval')
transformed = FuzzyJSON().visit(tree)
return ast.literal_eval(transformed)
except (SyntaxError, ValueError):
pass
return val
def _cli_expr(self,
tokens: Sequence[str],
parent: qmp.QMPObject) -> None:
for arg in tokens:
(key, sep, val) = arg.partition('=')
if sep != '=':
raise QMPShellError(
f"Expected a key=value pair, got '{arg!s}'"
)
value = self._parse_value(val)
optpath = key.split('.')
curpath = []
for path in optpath[:-1]:
curpath.append(path)
obj = parent.get(path, {})
if not isinstance(obj, dict):
msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
raise QMPShellError(msg.format('.'.join(curpath)))
parent[path] = obj
parent = obj
if optpath[-1] in parent:
if isinstance(parent[optpath[-1]], dict):
msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
raise QMPShellError(msg.format('.'.join(curpath)))
raise QMPShellError(f'Cannot set "{key}" multiple times')
parent[optpath[-1]] = value
def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
"""
Build a QMP input object from a user provided command-line in the
following format:
< command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
"""
argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
cmdargs = re.findall(argument_regex, cmdline)
qmpcmd: QMPMessage
# Transactional CLI entry:
if cmdargs and cmdargs[0] == 'transaction(':
self._transmode = True
self._actions = []
cmdargs.pop(0)
# Transactional CLI exit:
if cmdargs and cmdargs[0] == ')' and self._transmode:
self._transmode = False
if len(cmdargs) > 1:
msg = 'Unexpected input after close of Transaction sub-shell'
raise QMPShellError(msg)
qmpcmd = {
'execute': 'transaction',
'arguments': {'actions': self._actions}
}
return qmpcmd
# No args, or no args remaining
if not cmdargs:
return None
if self._transmode:
# Parse and cache this Transactional Action
finalize = False
action = {'type': cmdargs[0], 'data': {}}
if cmdargs[-1] == ')':
cmdargs.pop(-1)
finalize = True
self._cli_expr(cmdargs[1:], action['data'])
self._actions.append(action)
return self._build_cmd(')') if finalize else None
# Standard command: parse and return it to be executed.
qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
return qmpcmd
def _print(self, qmp_message: object) -> None:
jsobj = json.dumps(qmp_message,
indent=4 if self.pretty else None,
sort_keys=self.pretty)
print(str(jsobj))
def _execute_cmd(self, cmdline: str) -> bool:
try:
qmpcmd = self._build_cmd(cmdline)
except QMPShellError as err:
print(
f"Error while parsing command line: {err!s}\n"
"command format: <command-name> "
"[arg-name1=arg1] ... [arg-nameN=argN",
file=sys.stderr
)
return True
# For transaction mode, we may have just cached the action:
if qmpcmd is None:
return True
if self.verbose:
self._print(qmpcmd)
resp = self.cmd_obj(qmpcmd)
if resp is None:
print('Disconnected')
return False
self._print(resp)
return True
def connect(self, negotiate: bool = True) -> None:
self._greeting = super().connect(negotiate)
self._completer_setup()
def show_banner(self,
msg: str = 'Welcome to the QMP low-level shell!') -> None:
"""
Print to stdio a greeting, and the QEMU version if available.
"""
print(msg)
if not self._greeting:
print('Connected')
return
version = self._greeting['QMP']['version']['qemu']
print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
@property
def prompt(self) -> str:
"""
Return the current shell prompt, including a trailing space.
"""
if self._transmode:
return 'TRANS> '
return '(QEMU) '
def read_exec_command(self) -> bool:
"""
Read and execute a command.
@return True if execution was ok, return False if disconnected.
"""
try:
cmdline = input(self.prompt)
except EOFError:
print()
return False
if cmdline == '':
for event in self.get_events():
print(event)
self.clear_events()
return True
return self._execute_cmd(cmdline)
def repl(self) -> Iterator[None]:
"""
Return an iterator that implements the REPL.
"""
self.show_banner()
while self.read_exec_command():
yield
self.close()
class HMPShell(QMPShell):
"""
HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
:param address: Address of the QMP server.
:param pretty: Pretty-print QMP messages.
:param verbose: Echo outgoing QMP messages to console.
"""
def __init__(self, address: qmp.SocketAddrT,
pretty: bool = False, verbose: bool = False):
super().__init__(address, pretty, verbose)
self._cpu_index = 0
def _cmd_completion(self) -> None:
for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
if cmd and cmd[0] != '[' and cmd[0] != '\t':
name = cmd.split()[0] # drop help text
if name == 'info':
continue
if name.find('|') != -1:
# Command in the form 'foobar|f' or 'f|foobar', take the
# full name
opt = name.split('|')
if len(opt[0]) == 1:
name = opt[1]
else:
name = opt[0]
self._completer.append(name)
self._completer.append('help ' + name) # help completion
def _info_completion(self) -> None:
for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
if cmd:
self._completer.append('info ' + cmd.split()[1])
def _other_completion(self) -> None:
# special cases
self._completer.append('help info')
def _fill_completion(self) -> None:
self._cmd_completion()
self._info_completion()
self._other_completion()
def _cmd_passthrough(self, cmdline: str,
cpu_index: int = 0) -> QMPMessage:
return self.cmd_obj({
'execute': 'human-monitor-command',
'arguments': {
'command-line': cmdline,
'cpu-index': cpu_index
}
})
def _execute_cmd(self, cmdline: str) -> bool:
if cmdline.split()[0] == "cpu":
# trap the cpu command, it requires special setting
try:
idx = int(cmdline.split()[1])
if 'return' not in self._cmd_passthrough('info version', idx):
print('bad CPU index')
return True
self._cpu_index = idx
except ValueError:
print('cpu command takes an integer argument')
return True
resp = self._cmd_passthrough(cmdline, self._cpu_index)
if resp is None:
print('Disconnected')
return False
assert 'return' in resp or 'error' in resp
if 'return' in resp:
# Success
if len(resp['return']) > 0:
print(resp['return'], end=' ')
else:
# Error
print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
return True
def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
QMPShell.show_banner(self, msg)
def die(msg: str) -> NoReturn:
"""Write an error to stderr, then exit with a return code of 1."""
sys.stderr.write('ERROR: %s\n' % msg)
sys.exit(1)
def main() -> None:
"""
qmp-shell entry point: parse command line arguments and start the REPL.
"""
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--hmp', action='store_true',
help='Use HMP interface')
parser.add_argument('-N', '--skip-negotiation', action='store_true',
help='Skip negotiate (for qemu-ga)')
parser.add_argument('-v', '--verbose', action='store_true',
help='Verbose (echo commands sent and received)')
parser.add_argument('-p', '--pretty', action='store_true',
help='Pretty-print JSON')
default_server = os.environ.get('QMP_SOCKET')
parser.add_argument('qmp_server', action='store',
default=default_server,
help='< UNIX socket path | TCP address:port >')
args = parser.parse_args()
if args.qmp_server is None:
parser.error("QMP socket or TCP address must be specified")
shell_class = HMPShell if args.hmp else QMPShell
try:
address = shell_class.parse_address(args.qmp_server)
except qmp.QMPBadPortError:
parser.error(f"Bad port number: {args.qmp_server}")
return # pycharm doesn't know error() is noreturn
with shell_class(address, args.pretty, args.verbose) as qemu:
try:
qemu.connect(negotiate=not args.skip_negotiation)
except qmp.QMPConnectError:
die("Didn't get QMP greeting message")
except qmp.QMPCapabilitiesError:
die("Couldn't negotiate capabilities")
except OSError as err:
die(f"Couldn't connect to {args.qmp_server}: {err!s}")
for _ in qemu.repl():
pass
if __name__ == '__main__':
main()

272
python/qemu/qmp/qom.py Normal file
View File

@ -0,0 +1,272 @@
"""
QEMU Object Model testing tools.
usage: qom [-h] {set,get,list,tree,fuse} ...
Query and manipulate QOM data
optional arguments:
-h, --help show this help message and exit
QOM commands:
{set,get,list,tree,fuse}
set Set a QOM property value
get Get a QOM property value
list List QOM properties at a given path
tree Show QOM tree from a given path
fuse Mount a QOM tree as a FUSE filesystem
"""
##
# Copyright John Snow 2020, for Red Hat, Inc.
# Copyright IBM, Corp. 2011
#
# Authors:
# John Snow <jsnow@redhat.com>
# Anthony Liguori <aliguori@amazon.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.
#
# Based on ./scripts/qmp/qom-[set|get|tree|list]
##
import argparse
from . import QMPResponseError
from .qom_common import QOMCommand
try:
from .qom_fuse import QOMFuse
except ModuleNotFoundError as err:
if err.name != 'fuse':
raise
else:
assert issubclass(QOMFuse, QOMCommand)
class QOMSet(QOMCommand):
"""
QOM Command - Set a property to a given value.
usage: qom-set [-h] [--socket SOCKET] <path>.<property> <value>
Set a QOM property value
positional arguments:
<path>.<property> QOM path and property, separated by a period '.'
<value> new QOM property value
optional arguments:
-h, --help show this help message and exit
--socket SOCKET, -s SOCKET
QMP socket path or address (addr:port). May also be
set via QMP_SOCKET environment variable.
"""
name = 'set'
help = 'Set a QOM property value'
@classmethod
def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
super().configure_parser(parser)
cls.add_path_prop_arg(parser)
parser.add_argument(
'value',
metavar='<value>',
action='store',
help='new QOM property value'
)
def __init__(self, args: argparse.Namespace):
super().__init__(args)
self.path, self.prop = args.path_prop.rsplit('.', 1)
self.value = args.value
def run(self) -> int:
rsp = self.qmp.command(
'qom-set',
path=self.path,
property=self.prop,
value=self.value
)
print(rsp)
return 0
class QOMGet(QOMCommand):
"""
QOM Command - Get a property's current value.
usage: qom-get [-h] [--socket SOCKET] <path>.<property>
Get a QOM property value
positional arguments:
<path>.<property> QOM path and property, separated by a period '.'
optional arguments:
-h, --help show this help message and exit
--socket SOCKET, -s SOCKET
QMP socket path or address (addr:port). May also be
set via QMP_SOCKET environment variable.
"""
name = 'get'
help = 'Get a QOM property value'
@classmethod
def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
super().configure_parser(parser)
cls.add_path_prop_arg(parser)
def __init__(self, args: argparse.Namespace):
super().__init__(args)
try:
tmp = args.path_prop.rsplit('.', 1)
except ValueError as err:
raise ValueError('Invalid format for <path>.<property>') from err
self.path = tmp[0]
self.prop = tmp[1]
def run(self) -> int:
rsp = self.qmp.command(
'qom-get',
path=self.path,
property=self.prop
)
if isinstance(rsp, dict):
for key, value in rsp.items():
print(f"{key}: {value}")
else:
print(rsp)
return 0
class QOMList(QOMCommand):
"""
QOM Command - List the properties at a given path.
usage: qom-list [-h] [--socket SOCKET] <path>
List QOM properties at a given path
positional arguments:
<path> QOM path
optional arguments:
-h, --help show this help message and exit
--socket SOCKET, -s SOCKET
QMP socket path or address (addr:port). May also be
set via QMP_SOCKET environment variable.
"""
name = 'list'
help = 'List QOM properties at a given path'
@classmethod
def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
super().configure_parser(parser)
parser.add_argument(
'path',
metavar='<path>',
action='store',
help='QOM path',
)
def __init__(self, args: argparse.Namespace):
super().__init__(args)
self.path = args.path
def run(self) -> int:
rsp = self.qom_list(self.path)
for item in rsp:
if item.child:
print(f"{item.name}/")
elif item.link:
print(f"@{item.name}/")
else:
print(item.name)
return 0
class QOMTree(QOMCommand):
"""
QOM Command - Show the full tree below a given path.
usage: qom-tree [-h] [--socket SOCKET] [<path>]
Show QOM tree from a given path
positional arguments:
<path> QOM path
optional arguments:
-h, --help show this help message and exit
--socket SOCKET, -s SOCKET
QMP socket path or address (addr:port). May also be
set via QMP_SOCKET environment variable.
"""
name = 'tree'
help = 'Show QOM tree from a given path'
@classmethod
def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
super().configure_parser(parser)
parser.add_argument(
'path',
metavar='<path>',
action='store',
help='QOM path',
nargs='?',
default='/'
)
def __init__(self, args: argparse.Namespace):
super().__init__(args)
self.path = args.path
def _list_node(self, path: str) -> None:
print(path)
items = self.qom_list(path)
for item in items:
if item.child:
continue
try:
rsp = self.qmp.command('qom-get', path=path,
property=item.name)
print(f" {item.name}: {rsp} ({item.type})")
except QMPResponseError as err:
print(f" {item.name}: <EXCEPTION: {err!s}> ({item.type})")
print('')
for item in items:
if not item.child:
continue
if path == '/':
path = ''
self._list_node(f"{path}/{item.name}")
def run(self) -> int:
self._list_node(self.path)
return 0
def main() -> int:
"""QOM script main entry point."""
parser = argparse.ArgumentParser(
description='Query and manipulate QOM data'
)
subparsers = parser.add_subparsers(
title='QOM commands',
dest='command'
)
for command in QOMCommand.__subclasses__():
command.register(subparsers)
args = parser.parse_args()
if args.command is None:
parser.error('Command not specified.')
return 1
cmd_class = args.cmd_class
assert isinstance(cmd_class, type(QOMCommand))
return cmd_class.command_runner(args)

View File

@ -0,0 +1,178 @@
"""
QOM Command abstractions.
"""
##
# Copyright John Snow 2020, for Red Hat, Inc.
# Copyright IBM, Corp. 2011
#
# Authors:
# John Snow <jsnow@redhat.com>
# Anthony Liguori <aliguori@amazon.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.
#
# Based on ./scripts/qmp/qom-[set|get|tree|list]
##
import argparse
import os
import sys
from typing import (
Any,
Dict,
List,
Optional,
Type,
TypeVar,
)
from . import QEMUMonitorProtocol, QMPError
# The following is needed only for a type alias.
Subparsers = argparse._SubParsersAction # pylint: disable=protected-access
class ObjectPropertyInfo:
"""
Represents the return type from e.g. qom-list.
"""
def __init__(self, name: str, type_: str,
description: Optional[str] = None,
default_value: Optional[object] = None):
self.name = name
self.type = type_
self.description = description
self.default_value = default_value
@classmethod
def make(cls, value: Dict[str, Any]) -> 'ObjectPropertyInfo':
"""
Build an ObjectPropertyInfo from a Dict with an unknown shape.
"""
assert value.keys() >= {'name', 'type'}
assert value.keys() <= {'name', 'type', 'description', 'default-value'}
return cls(value['name'], value['type'],
value.get('description'),
value.get('default-value'))
@property
def child(self) -> bool:
"""Is this property a child property?"""
return self.type.startswith('child<')
@property
def link(self) -> bool:
"""Is this property a link property?"""
return self.type.startswith('link<')
CommandT = TypeVar('CommandT', bound='QOMCommand')
class QOMCommand:
"""
Represents a QOM sub-command.
:param args: Parsed arguments, as returned from parser.parse_args.
"""
name: str
help: str
def __init__(self, args: argparse.Namespace):
if args.socket is None:
raise QMPError("No QMP socket path or address given")
self.qmp = QEMUMonitorProtocol(
QEMUMonitorProtocol.parse_address(args.socket)
)
self.qmp.connect()
@classmethod
def register(cls, subparsers: Subparsers) -> None:
"""
Register this command with the argument parser.
:param subparsers: argparse subparsers object, from "add_subparsers".
"""
subparser = subparsers.add_parser(cls.name, help=cls.help,
description=cls.help)
cls.configure_parser(subparser)
@classmethod
def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
"""
Configure a parser with this command's arguments.
:param parser: argparse parser or subparser object.
"""
default_path = os.environ.get('QMP_SOCKET')
parser.add_argument(
'--socket', '-s',
dest='socket',
action='store',
help='QMP socket path or address (addr:port).'
' May also be set via QMP_SOCKET environment variable.',
default=default_path
)
parser.set_defaults(cmd_class=cls)
@classmethod
def add_path_prop_arg(cls, parser: argparse.ArgumentParser) -> None:
"""
Add the <path>.<proptery> positional argument to this command.
:param parser: The parser to add the argument to.
"""
parser.add_argument(
'path_prop',
metavar='<path>.<property>',
action='store',
help="QOM path and property, separated by a period '.'"
)
def run(self) -> int:
"""
Run this command.
:return: 0 on success, 1 otherwise.
"""
raise NotImplementedError
def qom_list(self, path: str) -> List[ObjectPropertyInfo]:
"""
:return: a strongly typed list from the 'qom-list' command.
"""
rsp = self.qmp.command('qom-list', path=path)
# qom-list returns List[ObjectPropertyInfo]
assert isinstance(rsp, list)
return [ObjectPropertyInfo.make(x) for x in rsp]
@classmethod
def command_runner(
cls: Type[CommandT],
args: argparse.Namespace
) -> int:
"""
Run a fully-parsed subcommand, with error-handling for the CLI.
:return: The return code from `.run()`.
"""
try:
cmd = cls(args)
return cmd.run()
except QMPError as err:
print(f"{type(err).__name__}: {err!s}", file=sys.stderr)
return -1
@classmethod
def entry_point(cls) -> int:
"""
Build this command's parser, parse arguments, and run the command.
:return: `run`'s return code.
"""
parser = argparse.ArgumentParser(description=cls.help)
cls.configure_parser(parser)
args = parser.parse_args()
return cls.command_runner(args)

206
python/qemu/qmp/qom_fuse.py Normal file
View File

@ -0,0 +1,206 @@
"""
QEMU Object Model FUSE filesystem tool
This script offers a simple FUSE filesystem within which the QOM tree
may be browsed, queried and edited using traditional shell tooling.
This script requires the 'fusepy' python package.
usage: qom-fuse [-h] [--socket SOCKET] <mount>
Mount a QOM tree as a FUSE filesystem
positional arguments:
<mount> Mount point
optional arguments:
-h, --help show this help message and exit
--socket SOCKET, -s SOCKET
QMP socket path or address (addr:port). May also be
set via QMP_SOCKET environment variable.
"""
##
# Copyright IBM, Corp. 2012
# Copyright (C) 2020 Red Hat, Inc.
#
# Authors:
# Anthony Liguori <aliguori@us.ibm.com>
# Markus Armbruster <armbru@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.
##
import argparse
from errno import ENOENT, EPERM
import stat
import sys
from typing import (
IO,
Dict,
Iterator,
Mapping,
Optional,
Union,
)
import fuse
from fuse import FUSE, FuseOSError, Operations
from . import QMPResponseError
from .qom_common import QOMCommand
fuse.fuse_python_api = (0, 2)
class QOMFuse(QOMCommand, Operations):
"""
QOMFuse implements both fuse.Operations and QOMCommand.
Operations implements the FS, and QOMCommand implements the CLI command.
"""
name = 'fuse'
help = 'Mount a QOM tree as a FUSE filesystem'
fuse: FUSE
@classmethod
def configure_parser(cls, parser: argparse.ArgumentParser) -> None:
super().configure_parser(parser)
parser.add_argument(
'mount',
metavar='<mount>',
action='store',
help="Mount point",
)
def __init__(self, args: argparse.Namespace):
super().__init__(args)
self.mount = args.mount
self.ino_map: Dict[str, int] = {}
self.ino_count = 1
def run(self) -> int:
print(f"Mounting QOMFS to '{self.mount}'", file=sys.stderr)
self.fuse = FUSE(self, self.mount, foreground=True)
return 0
def get_ino(self, path: str) -> int:
"""Get an inode number for a given QOM path."""
if path in self.ino_map:
return self.ino_map[path]
self.ino_map[path] = self.ino_count
self.ino_count += 1
return self.ino_map[path]
def is_object(self, path: str) -> bool:
"""Is the given QOM path an object?"""
try:
self.qom_list(path)
return True
except QMPResponseError:
return False
def is_property(self, path: str) -> bool:
"""Is the given QOM path a property?"""
path, prop = path.rsplit('/', 1)
if path == '':
path = '/'
try:
for item in self.qom_list(path):
if item.name == prop:
return True
return False
except QMPResponseError:
return False
def is_link(self, path: str) -> bool:
"""Is the given QOM path a link?"""
path, prop = path.rsplit('/', 1)
if path == '':
path = '/'
try:
for item in self.qom_list(path):
if item.name == prop and item.link:
return True
return False
except QMPResponseError:
return False
def read(self, path: str, size: int, offset: int, fh: IO[bytes]) -> bytes:
if not self.is_property(path):
raise FuseOSError(ENOENT)
path, prop = path.rsplit('/', 1)
if path == '':
path = '/'
try:
data = str(self.qmp.command('qom-get', path=path, property=prop))
data += '\n' # make values shell friendly
except QMPResponseError as err:
raise FuseOSError(EPERM) from err
if offset > len(data):
return b''
return bytes(data[offset:][:size], encoding='utf-8')
def readlink(self, path: str) -> Union[bool, str]:
if not self.is_link(path):
return False
path, prop = path.rsplit('/', 1)
prefix = '/'.join(['..'] * (len(path.split('/')) - 1))
return prefix + str(self.qmp.command('qom-get', path=path,
property=prop))
def getattr(self, path: str,
fh: Optional[IO[bytes]] = None) -> Mapping[str, object]:
if self.is_link(path):
value = {
'st_mode': 0o755 | stat.S_IFLNK,
'st_ino': self.get_ino(path),
'st_dev': 0,
'st_nlink': 2,
'st_uid': 1000,
'st_gid': 1000,
'st_size': 4096,
'st_atime': 0,
'st_mtime': 0,
'st_ctime': 0
}
elif self.is_object(path):
value = {
'st_mode': 0o755 | stat.S_IFDIR,
'st_ino': self.get_ino(path),
'st_dev': 0,
'st_nlink': 2,
'st_uid': 1000,
'st_gid': 1000,
'st_size': 4096,
'st_atime': 0,
'st_mtime': 0,
'st_ctime': 0
}
elif self.is_property(path):
value = {
'st_mode': 0o644 | stat.S_IFREG,
'st_ino': self.get_ino(path),
'st_dev': 0,
'st_nlink': 1,
'st_uid': 1000,
'st_gid': 1000,
'st_size': 4096,
'st_atime': 0,
'st_mtime': 0,
'st_ctime': 0
}
else:
raise FuseOSError(ENOENT)
return value
def readdir(self, path: str, fh: IO[bytes]) -> Iterator[str]:
yield '.'
yield '..'
for item in self.qom_list(path):
yield item.name

View File

@ -32,11 +32,27 @@ packages =
devel =
avocado-framework >= 87.0
flake8 >= 3.6.0
fusepy >= 2.0.4
isort >= 5.1.2
mypy >= 0.770
pylint >= 2.8.0
tox >= 3.18.0
# Provides qom-fuse functionality
fuse =
fusepy >= 2.0.4
[options.entry_points]
console_scripts =
qom = qemu.qmp.qom:main
qom-set = qemu.qmp.qom:QOMSet.entry_point
qom-get = qemu.qmp.qom:QOMGet.entry_point
qom-list = qemu.qmp.qom:QOMList.entry_point
qom-tree = qemu.qmp.qom:QOMTree.entry_point
qom-fuse = qemu.qmp.qom_fuse:QOMFuse.entry_point [fuse]
qemu-ga-client = qemu.qmp.qemu_ga_client:main
qmp-shell = qemu.qmp.qmp_shell:main
[flake8]
extend-ignore = E722 # Prefer pylint's bare-except checks to flake8's
exclude = __pycache__,
@ -49,6 +65,14 @@ python_version = 3.6
warn_unused_configs = True
namespace_packages = True
[mypy-qemu.qmp.qom_fuse]
# fusepy has no type stubs:
allow_subclassing_any = True
[mypy-fuse]
# fusepy has no type stubs:
ignore_missing_imports = True
[pylint.messages control]
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
@ -70,9 +94,10 @@ good-names=i,
k,
ex,
Run,
_,
fd,
c,
_, # By convention: Unused variable
fh, # fh = open(...)
fd, # fd = os.open(...)
c, # for c in string: ...
[pylint.similarities]
# Ignore imports when computing similarities.
@ -97,6 +122,8 @@ envlist = py36, py37, py38, py39, py310
[testenv]
allowlist_externals = make
deps = .[devel]
deps =
.[devel]
.[fuse] # Workaround to trigger tox venv rebuild
commands =
make check

View File

@ -1,304 +1,11 @@
#!/usr/bin/env python3
# QEMU Guest Agent Client
#
# Copyright (C) 2012 Ryota Ozaki <ozaki.ryota@gmail.com>
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -chardev socket,path=/tmp/qga.sock,server=on,wait=off,id=qga0 \
# -device virtio-serial -device virtserialport,chardev=qga0,name=org.qemu.guest_agent.0
#
# Run the script:
#
# $ qemu-ga-client --address=/tmp/qga.sock <command> [args...]
#
# or
#
# $ export QGA_CLIENT_ADDRESS=/tmp/qga.sock
# $ qemu-ga-client <command> [args...]
#
# For example:
#
# $ qemu-ga-client cat /etc/resolv.conf
# # Generated by NetworkManager
# nameserver 10.0.2.3
# $ qemu-ga-client fsfreeze status
# thawed
# $ qemu-ga-client fsfreeze freeze
# 2 filesystems frozen
#
# See also: https://wiki.qemu.org/Features/QAPI/GuestAgent
#
import os
import sys
import base64
import random
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
class QemuGuestAgent(qmp.QEMUMonitorProtocol):
def __getattr__(self, name):
def wrapper(**kwds):
return self.command('guest-' + name.replace('_', '-'), **kwds)
return wrapper
class QemuGuestAgentClient:
error = QemuGuestAgent.error
def __init__(self, address):
self.qga = QemuGuestAgent(address)
self.qga.connect(negotiate=False)
def sync(self, timeout=3):
# Avoid being blocked forever
if not self.ping(timeout):
raise EnvironmentError('Agent seems not alive')
uid = random.randint(0, (1 << 32) - 1)
while True:
ret = self.qga.sync(id=uid)
if isinstance(ret, int) and int(ret) == uid:
break
def __file_read_all(self, handle):
eof = False
data = ''
while not eof:
ret = self.qga.file_read(handle=handle, count=1024)
_data = base64.b64decode(ret['buf-b64'])
data += _data
eof = ret['eof']
return data
def read(self, path):
handle = self.qga.file_open(path=path)
try:
data = self.__file_read_all(handle)
finally:
self.qga.file_close(handle=handle)
return data
def info(self):
info = self.qga.info()
msgs = []
msgs.append('version: ' + info['version'])
msgs.append('supported_commands:')
enabled = [c['name'] for c in info['supported_commands'] if c['enabled']]
msgs.append('\tenabled: ' + ', '.join(enabled))
disabled = [c['name'] for c in info['supported_commands'] if not c['enabled']]
msgs.append('\tdisabled: ' + ', '.join(disabled))
return '\n'.join(msgs)
def __gen_ipv4_netmask(self, prefixlen):
mask = int('1' * prefixlen + '0' * (32 - prefixlen), 2)
return '.'.join([str(mask >> 24),
str((mask >> 16) & 0xff),
str((mask >> 8) & 0xff),
str(mask & 0xff)])
def ifconfig(self):
nifs = self.qga.network_get_interfaces()
msgs = []
for nif in nifs:
msgs.append(nif['name'] + ':')
if 'ip-addresses' in nif:
for ipaddr in nif['ip-addresses']:
if ipaddr['ip-address-type'] == 'ipv4':
addr = ipaddr['ip-address']
mask = self.__gen_ipv4_netmask(int(ipaddr['prefix']))
msgs.append("\tinet %s netmask %s" % (addr, mask))
elif ipaddr['ip-address-type'] == 'ipv6':
addr = ipaddr['ip-address']
prefix = ipaddr['prefix']
msgs.append("\tinet6 %s prefixlen %s" % (addr, prefix))
if nif['hardware-address'] != '00:00:00:00:00:00':
msgs.append("\tether " + nif['hardware-address'])
return '\n'.join(msgs)
def ping(self, timeout):
self.qga.settimeout(timeout)
try:
self.qga.ping()
except self.qga.timeout:
return False
return True
def fsfreeze(self, cmd):
if cmd not in ['status', 'freeze', 'thaw']:
raise Exception('Invalid command: ' + cmd)
return getattr(self.qga, 'fsfreeze' + '_' + cmd)()
def fstrim(self, minimum=0):
return getattr(self.qga, 'fstrim')(minimum=minimum)
def suspend(self, mode):
if mode not in ['disk', 'ram', 'hybrid']:
raise Exception('Invalid mode: ' + mode)
try:
getattr(self.qga, 'suspend' + '_' + mode)()
# On error exception will raise
except self.qga.timeout:
# On success command will timed out
return
def shutdown(self, mode='powerdown'):
if mode not in ['powerdown', 'halt', 'reboot']:
raise Exception('Invalid mode: ' + mode)
try:
self.qga.shutdown(mode=mode)
except self.qga.timeout:
return
def _cmd_cat(client, args):
if len(args) != 1:
print('Invalid argument')
print('Usage: cat <file>')
sys.exit(1)
print(client.read(args[0]))
def _cmd_fsfreeze(client, args):
usage = 'Usage: fsfreeze status|freeze|thaw'
if len(args) != 1:
print('Invalid argument')
print(usage)
sys.exit(1)
if args[0] not in ['status', 'freeze', 'thaw']:
print('Invalid command: ' + args[0])
print(usage)
sys.exit(1)
cmd = args[0]
ret = client.fsfreeze(cmd)
if cmd == 'status':
print(ret)
elif cmd == 'freeze':
print("%d filesystems frozen" % ret)
else:
print("%d filesystems thawed" % ret)
def _cmd_fstrim(client, args):
if len(args) == 0:
minimum = 0
else:
minimum = int(args[0])
print(client.fstrim(minimum))
def _cmd_ifconfig(client, args):
print(client.ifconfig())
def _cmd_info(client, args):
print(client.info())
def _cmd_ping(client, args):
if len(args) == 0:
timeout = 3
else:
timeout = float(args[0])
alive = client.ping(timeout)
if not alive:
print("Not responded in %s sec" % args[0])
sys.exit(1)
def _cmd_suspend(client, args):
usage = 'Usage: suspend disk|ram|hybrid'
if len(args) != 1:
print('Less argument')
print(usage)
sys.exit(1)
if args[0] not in ['disk', 'ram', 'hybrid']:
print('Invalid command: ' + args[0])
print(usage)
sys.exit(1)
client.suspend(args[0])
def _cmd_shutdown(client, args):
client.shutdown()
_cmd_powerdown = _cmd_shutdown
def _cmd_halt(client, args):
client.shutdown('halt')
def _cmd_reboot(client, args):
client.shutdown('reboot')
commands = [m.replace('_cmd_', '') for m in dir() if '_cmd_' in m]
def main(address, cmd, args):
if not os.path.exists(address):
print('%s not found' % address)
sys.exit(1)
if cmd not in commands:
print('Invalid command: ' + cmd)
print('Available commands: ' + ', '.join(commands))
sys.exit(1)
try:
client = QemuGuestAgentClient(address)
except QemuGuestAgent.error as e:
import errno
print(e)
if e.errno == errno.ECONNREFUSED:
print('Hint: qemu is not running?')
sys.exit(1)
if cmd == 'fsfreeze' and args[0] == 'freeze':
client.sync(60)
elif cmd != 'ping':
client.sync()
globals()['_cmd_' + cmd](client, args)
from qemu.qmp import qemu_ga_client
if __name__ == '__main__':
import sys
import os
import optparse
address = os.environ['QGA_CLIENT_ADDRESS'] if 'QGA_CLIENT_ADDRESS' in os.environ else None
usage = "%prog [--address=<unix_path>|<ipv4_address>] <command> [args...]\n"
usage += '<command>: ' + ', '.join(commands)
parser = optparse.OptionParser(usage=usage)
parser.add_option('--address', action='store', type='string',
default=address, help='Specify a ip:port pair or a unix socket path')
options, args = parser.parse_args()
address = options.address
if address is None:
parser.error('address is not specified')
sys.exit(1)
if len(args) == 0:
parser.error('Less argument')
sys.exit(1)
main(address, args[0], args[1:])
sys.exit(qemu_ga_client.main())

View File

@ -1,459 +1,11 @@
#!/usr/bin/env python3
#
# Low-level QEMU shell on top of QMP.
#
# Copyright (C) 2009, 2010 Red Hat Inc.
#
# Authors:
# Luiz Capitulino <lcapitulino@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2. See
# the COPYING file in the top-level directory.
#
# Usage:
#
# Start QEMU with:
#
# # qemu [...] -qmp unix:./qmp-sock,server
#
# Run the shell:
#
# $ qmp-shell ./qmp-sock
#
# Commands have the following format:
#
# < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
#
# For example:
#
# (QEMU) device_add driver=e1000 id=net1
# {u'return': {}}
# (QEMU)
#
# key=value pairs also support Python or JSON object literal subset notations,
# without spaces. Dictionaries/objects {} are supported as are arrays [].
#
# example-command arg-name1={'key':'value','obj'={'prop':"value"}}
#
# Both JSON and Python formatting should work, including both styles of
# string literal quotes. Both paradigms of literal values should work,
# including null/true/false for JSON and None/True/False for Python.
#
#
# Transactions have the following multi-line format:
#
# transaction(
# action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
# ...
# action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
# )
#
# One line transactions are also supported:
#
# transaction( action-name1 ... )
#
# For example:
#
# (QEMU) transaction(
# TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
# TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
# TRANS> )
# {"return": {}}
# (QEMU)
#
# Use the -v and -p options to activate the verbose and pretty-print options,
# which will echo back the properly formatted JSON-compliant QMP that is being
# sent to QEMU, which is useful for debugging and documentation generation.
import json
import ast
import readline
import sys
import os
import errno
import atexit
import re
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qmp
from qemu.qmp import qmp_shell
class QMPCompleter(list):
def complete(self, text, state):
for cmd in self:
if cmd.startswith(text):
if not state:
return cmd
else:
state -= 1
class QMPShellError(Exception):
pass
class QMPShellBadPort(QMPShellError):
pass
class FuzzyJSON(ast.NodeTransformer):
'''This extension of ast.NodeTransformer filters literal "true/false/null"
values in an AST and replaces them by proper "True/False/None" values that
Python can properly evaluate.'''
def visit_Name(self, node):
if node.id == 'true':
node.id = 'True'
if node.id == 'false':
node.id = 'False'
if node.id == 'null':
node.id = 'None'
return node
# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
# _execute_cmd()). Let's design a better one.
class QMPShell(qmp.QEMUMonitorProtocol):
def __init__(self, address, pretty=False):
super(QMPShell, self).__init__(self.__get_address(address))
self._greeting = None
self._completer = None
self._pretty = pretty
self._transmode = False
self._actions = list()
self._histfile = os.path.join(os.path.expanduser('~'),
'.qmp-shell_history')
def __get_address(self, arg):
"""
Figure out if the argument is in the port:host form, if it's not it's
probably a file path.
"""
addr = arg.split(':')
if len(addr) == 2:
try:
port = int(addr[1])
except ValueError:
raise QMPShellBadPort
return ( addr[0], port )
# socket path
return arg
def _fill_completion(self):
cmds = self.cmd('query-commands')
if 'error' in cmds:
return
for cmd in cmds['return']:
self._completer.append(cmd['name'])
def __completer_setup(self):
self._completer = QMPCompleter()
self._fill_completion()
readline.set_history_length(1024)
readline.set_completer(self._completer.complete)
readline.parse_and_bind("tab: complete")
# XXX: default delimiters conflict with some command names (eg. query-),
# clearing everything as it doesn't seem to matter
readline.set_completer_delims('')
try:
readline.read_history_file(self._histfile)
except Exception as e:
if isinstance(e, IOError) and e.errno == errno.ENOENT:
# File not found. No problem.
pass
else:
print("Failed to read history '%s'; %s" % (self._histfile, e))
atexit.register(self.__save_history)
def __save_history(self):
try:
readline.write_history_file(self._histfile)
except Exception as e:
print("Failed to save history file '%s'; %s" % (self._histfile, e))
def __parse_value(self, val):
try:
return int(val)
except ValueError:
pass
if val.lower() == 'true':
return True
if val.lower() == 'false':
return False
if val.startswith(('{', '[')):
# Try first as pure JSON:
try:
return json.loads(val)
except ValueError:
pass
# Try once again as FuzzyJSON:
try:
st = ast.parse(val, mode='eval')
return ast.literal_eval(FuzzyJSON().visit(st))
except SyntaxError:
pass
except ValueError:
pass
return val
def __cli_expr(self, tokens, parent):
for arg in tokens:
(key, sep, val) = arg.partition('=')
if sep != '=':
raise QMPShellError("Expected a key=value pair, got '%s'" % arg)
value = self.__parse_value(val)
optpath = key.split('.')
curpath = []
for p in optpath[:-1]:
curpath.append(p)
d = parent.get(p, {})
if type(d) is not dict:
raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
parent[p] = d
parent = d
if optpath[-1] in parent:
if type(parent[optpath[-1]]) is dict:
raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
else:
raise QMPShellError('Cannot set "%s" multiple times' % key)
parent[optpath[-1]] = value
def __build_cmd(self, cmdline):
"""
Build a QMP input object from a user provided command-line in the
following format:
< command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
"""
cmdargs = re.findall(r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+''', cmdline)
# Transactional CLI entry/exit:
if cmdargs[0] == 'transaction(':
self._transmode = True
cmdargs.pop(0)
elif cmdargs[0] == ')' and self._transmode:
self._transmode = False
if len(cmdargs) > 1:
raise QMPShellError("Unexpected input after close of Transaction sub-shell")
qmpcmd = { 'execute': 'transaction',
'arguments': { 'actions': self._actions } }
self._actions = list()
return qmpcmd
# Nothing to process?
if not cmdargs:
return None
# Parse and then cache this Transactional Action
if self._transmode:
finalize = False
action = { 'type': cmdargs[0], 'data': {} }
if cmdargs[-1] == ')':
cmdargs.pop(-1)
finalize = True
self.__cli_expr(cmdargs[1:], action['data'])
self._actions.append(action)
return self.__build_cmd(')') if finalize else None
# Standard command: parse and return it to be executed.
qmpcmd = { 'execute': cmdargs[0], 'arguments': {} }
self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
return qmpcmd
def _print(self, qmp):
indent = None
if self._pretty:
indent = 4
jsobj = json.dumps(qmp, indent=indent, sort_keys=self._pretty)
print(str(jsobj))
def _execute_cmd(self, cmdline):
try:
qmpcmd = self.__build_cmd(cmdline)
except Exception as e:
print('Error while parsing command line: %s' % e)
print('command format: <command-name> ', end=' ')
print('[arg-name1=arg1] ... [arg-nameN=argN]')
return True
# For transaction mode, we may have just cached the action:
if qmpcmd is None:
return True
if self._verbose:
self._print(qmpcmd)
resp = self.cmd_obj(qmpcmd)
if resp is None:
print('Disconnected')
return False
self._print(resp)
return True
def connect(self, negotiate):
self._greeting = super(QMPShell, self).connect(negotiate)
self.__completer_setup()
def show_banner(self, msg='Welcome to the QMP low-level shell!'):
print(msg)
if not self._greeting:
print('Connected')
return
version = self._greeting['QMP']['version']['qemu']
print('Connected to QEMU %d.%d.%d\n' % (version['major'],version['minor'],version['micro']))
def get_prompt(self):
if self._transmode:
return "TRANS> "
return "(QEMU) "
def read_exec_command(self, prompt):
"""
Read and execute a command.
@return True if execution was ok, return False if disconnected.
"""
try:
cmdline = input(prompt)
except EOFError:
print()
return False
if cmdline == '':
for ev in self.get_events():
print(ev)
self.clear_events()
return True
else:
return self._execute_cmd(cmdline)
def set_verbosity(self, verbose):
self._verbose = verbose
class HMPShell(QMPShell):
def __init__(self, address):
QMPShell.__init__(self, address)
self.__cpu_index = 0
def __cmd_completion(self):
for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
if cmd and cmd[0] != '[' and cmd[0] != '\t':
name = cmd.split()[0] # drop help text
if name == 'info':
continue
if name.find('|') != -1:
# Command in the form 'foobar|f' or 'f|foobar', take the
# full name
opt = name.split('|')
if len(opt[0]) == 1:
name = opt[1]
else:
name = opt[0]
self._completer.append(name)
self._completer.append('help ' + name) # help completion
def __info_completion(self):
for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
if cmd:
self._completer.append('info ' + cmd.split()[1])
def __other_completion(self):
# special cases
self._completer.append('help info')
def _fill_completion(self):
self.__cmd_completion()
self.__info_completion()
self.__other_completion()
def __cmd_passthrough(self, cmdline, cpu_index = 0):
return self.cmd_obj({ 'execute': 'human-monitor-command', 'arguments':
{ 'command-line': cmdline,
'cpu-index': cpu_index } })
def _execute_cmd(self, cmdline):
if cmdline.split()[0] == "cpu":
# trap the cpu command, it requires special setting
try:
idx = int(cmdline.split()[1])
if not 'return' in self.__cmd_passthrough('info version', idx):
print('bad CPU index')
return True
self.__cpu_index = idx
except ValueError:
print('cpu command takes an integer argument')
return True
resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
if resp is None:
print('Disconnected')
return False
assert 'return' in resp or 'error' in resp
if 'return' in resp:
# Success
if len(resp['return']) > 0:
print(resp['return'], end=' ')
else:
# Error
print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
return True
def show_banner(self):
QMPShell.show_banner(self, msg='Welcome to the HMP shell!')
def die(msg):
sys.stderr.write('ERROR: %s\n' % msg)
sys.exit(1)
def fail_cmdline(option=None):
if option:
sys.stderr.write('ERROR: bad command-line option \'%s\'\n' % option)
sys.stderr.write('qmp-shell [ -v ] [ -p ] [ -H ] [ -N ] < UNIX socket path> | < TCP address:port >\n')
sys.stderr.write(' -v Verbose (echo command sent and received)\n')
sys.stderr.write(' -p Pretty-print JSON\n')
sys.stderr.write(' -H Use HMP interface\n')
sys.stderr.write(' -N Skip negotiate (for qemu-ga)\n')
sys.exit(1)
def main():
addr = ''
qemu = None
hmp = False
pretty = False
verbose = False
negotiate = True
try:
for arg in sys.argv[1:]:
if arg == "-H":
if qemu is not None:
fail_cmdline(arg)
hmp = True
elif arg == "-p":
pretty = True
elif arg == "-N":
negotiate = False
elif arg == "-v":
verbose = True
else:
if qemu is not None:
fail_cmdline(arg)
if hmp:
qemu = HMPShell(arg)
else:
qemu = QMPShell(arg, pretty)
addr = arg
if qemu is None:
fail_cmdline()
except QMPShellBadPort:
die('bad port number in command-line')
try:
qemu.connect(negotiate)
except qmp.QMPConnectError:
die('Didn\'t get QMP greeting message')
except qmp.QMPCapabilitiesError:
die('Could not negotiate capabilities')
except qemu.error:
die('Could not connect to %s' % addr)
qemu.show_banner()
qemu.set_verbosity(verbose)
while qemu.read_exec_command(qemu.get_prompt()):
pass
qemu.close()
if __name__ == '__main__':
main()
qmp_shell.main()

View File

@ -1,147 +1,11 @@
#!/usr/bin/env python3
##
# QEMU Object Model test tools
#
# Copyright IBM, Corp. 2012
# Copyright (C) 2020 Red Hat, Inc.
#
# Authors:
# Anthony Liguori <aliguori@us.ibm.com>
# Markus Armbruster <armbru@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later. See
# the COPYING file in the top-level directory.
##
import fuse, stat
from fuse import FUSE, FuseOSError, Operations
import os, posix, sys
from errno import *
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol
from qemu.qmp.qom_fuse import QOMFuse
fuse.fuse_python_api = (0, 2)
class QOMFS(Operations):
def __init__(self, qmp):
self.qmp = qmp
self.qmp.connect()
self.ino_map = {}
self.ino_count = 1
def get_ino(self, path):
if path in self.ino_map:
return self.ino_map[path]
self.ino_map[path] = self.ino_count
self.ino_count += 1
return self.ino_map[path]
def is_object(self, path):
try:
items = self.qmp.command('qom-list', path=path)
return True
except:
return False
def is_property(self, path):
path, prop = path.rsplit('/', 1)
if path == '':
path = '/'
try:
for item in self.qmp.command('qom-list', path=path):
if item['name'] == prop:
return True
return False
except:
return False
def is_link(self, path):
path, prop = path.rsplit('/', 1)
if path == '':
path = '/'
try:
for item in self.qmp.command('qom-list', path=path):
if item['name'] == prop:
if item['type'].startswith('link<'):
return True
return False
return False
except:
return False
def read(self, path, length, offset, fh):
if not self.is_property(path):
return -ENOENT
path, prop = path.rsplit('/', 1)
if path == '':
path = '/'
try:
data = self.qmp.command('qom-get', path=path, property=prop)
data += '\n' # make values shell friendly
except:
raise FuseOSError(EPERM)
if offset > len(data):
return ''
return bytes(data[offset:][:length], encoding='utf-8')
def readlink(self, path):
if not self.is_link(path):
return False
path, prop = path.rsplit('/', 1)
prefix = '/'.join(['..'] * (len(path.split('/')) - 1))
return prefix + str(self.qmp.command('qom-get', path=path,
property=prop))
def getattr(self, path, fh=None):
if self.is_link(path):
value = { 'st_mode': 0o755 | stat.S_IFLNK,
'st_ino': self.get_ino(path),
'st_dev': 0,
'st_nlink': 2,
'st_uid': 1000,
'st_gid': 1000,
'st_size': 4096,
'st_atime': 0,
'st_mtime': 0,
'st_ctime': 0 }
elif self.is_object(path):
value = { 'st_mode': 0o755 | stat.S_IFDIR,
'st_ino': self.get_ino(path),
'st_dev': 0,
'st_nlink': 2,
'st_uid': 1000,
'st_gid': 1000,
'st_size': 4096,
'st_atime': 0,
'st_mtime': 0,
'st_ctime': 0 }
elif self.is_property(path):
value = { 'st_mode': 0o644 | stat.S_IFREG,
'st_ino': self.get_ino(path),
'st_dev': 0,
'st_nlink': 1,
'st_uid': 1000,
'st_gid': 1000,
'st_size': 4096,
'st_atime': 0,
'st_mtime': 0,
'st_ctime': 0 }
else:
raise FuseOSError(ENOENT)
return value
def readdir(self, path, fh):
yield '.'
yield '..'
for item in self.qmp.command('qom-list', path=path):
yield str(item['name'])
if __name__ == '__main__':
import os
fuse = FUSE(QOMFS(QEMUMonitorProtocol(os.environ['QMP_SOCKET'])),
sys.argv[1], foreground=True)
sys.exit(QOMFuse.entry_point())

View File

@ -1,69 +1,11 @@
#!/usr/bin/env python3
##
# QEMU Object Model test tools
#
# Copyright IBM, Corp. 2011
#
# Authors:
# Anthony Liguori <aliguori@us.ibm.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later. See
# the COPYING file in the top-level directory.
##
import sys
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol
from qemu.qmp.qom import QOMGet
cmd, args = sys.argv[0], sys.argv[1:]
socket_path = None
path = None
prop = None
def usage():
return '''environment variables:
QMP_SOCKET=<path | addr:port>
usage:
%s [-h] [-s <QMP socket path | addr:port>] <path>.<property>
''' % cmd
def usage_error(error_msg = "unspecified error"):
sys.stderr.write('%s\nERROR: %s\n' % (usage(), error_msg))
exit(1)
if len(args) > 0:
if args[0] == "-h":
print(usage())
exit(0);
elif args[0] == "-s":
try:
socket_path = args[1]
except:
usage_error("missing argument: QMP socket path or address");
args = args[2:]
if not socket_path:
if 'QMP_SOCKET' in os.environ:
socket_path = os.environ['QMP_SOCKET']
else:
usage_error("no QMP socket path or address given");
if len(args) > 0:
try:
path, prop = args[0].rsplit('.', 1)
except:
usage_error("invalid format for path/property/value")
else:
usage_error("not enough arguments")
srv = QEMUMonitorProtocol(socket_path)
srv.connect()
rsp = srv.command('qom-get', path=path, property=prop)
if type(rsp) == dict:
for i in rsp.keys():
print('%s: %s' % (i, rsp[i]))
else:
print(rsp)
if __name__ == '__main__':
sys.exit(QOMGet.entry_point())

View File

@ -1,66 +1,11 @@
#!/usr/bin/env python3
##
# QEMU Object Model test tools
#
# Copyright IBM, Corp. 2011
#
# Authors:
# Anthony Liguori <aliguori@us.ibm.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later. See
# the COPYING file in the top-level directory.
##
import sys
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol
from qemu.qmp.qom import QOMList
cmd, args = sys.argv[0], sys.argv[1:]
socket_path = None
path = None
prop = None
def usage():
return '''environment variables:
QMP_SOCKET=<path | addr:port>
usage:
%s [-h] [-s <QMP socket path | addr:port>] [<path>]
''' % cmd
def usage_error(error_msg = "unspecified error"):
sys.stderr.write('%s\nERROR: %s\n' % (usage(), error_msg))
exit(1)
if len(args) > 0:
if args[0] == "-h":
print(usage())
exit(0);
elif args[0] == "-s":
try:
socket_path = args[1]
except:
usage_error("missing argument: QMP socket path or address");
args = args[2:]
if not socket_path:
if 'QMP_SOCKET' in os.environ:
socket_path = os.environ['QMP_SOCKET']
else:
usage_error("no QMP socket path or address given");
srv = QEMUMonitorProtocol(socket_path)
srv.connect()
if len(args) == 0:
print('/')
sys.exit(0)
for item in srv.command('qom-list', path=args[0]):
if item['type'].startswith('child<'):
print('%s/' % item['name'])
elif item['type'].startswith('link<'):
print('@%s/' % item['name'])
else:
print('%s' % item['name'])
if __name__ == '__main__':
sys.exit(QOMList.entry_point())

View File

@ -1,66 +1,11 @@
#!/usr/bin/env python3
##
# QEMU Object Model test tools
#
# Copyright IBM, Corp. 2011
#
# Authors:
# Anthony Liguori <aliguori@us.ibm.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later. See
# the COPYING file in the top-level directory.
##
import sys
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol
from qemu.qmp.qom import QOMSet
cmd, args = sys.argv[0], sys.argv[1:]
socket_path = None
path = None
prop = None
value = None
def usage():
return '''environment variables:
QMP_SOCKET=<path | addr:port>
usage:
%s [-h] [-s <QMP socket path | addr:port>] <path>.<property> <value>
''' % cmd
def usage_error(error_msg = "unspecified error"):
sys.stderr.write('%s\nERROR: %s\n' % (usage(), error_msg))
exit(1)
if len(args) > 0:
if args[0] == "-h":
print(usage())
exit(0);
elif args[0] == "-s":
try:
socket_path = args[1]
except:
usage_error("missing argument: QMP socket path or address");
args = args[2:]
if not socket_path:
if 'QMP_SOCKET' in os.environ:
socket_path = os.environ['QMP_SOCKET']
else:
usage_error("no QMP socket path or address given");
if len(args) > 1:
try:
path, prop = args[0].rsplit('.', 1)
except:
usage_error("invalid format for path/property/value")
value = args[1]
else:
usage_error("not enough arguments")
srv = QEMUMonitorProtocol(socket_path)
srv.connect()
print(srv.command('qom-set', path=path, property=prop, value=value))
if __name__ == '__main__':
sys.exit(QOMSet.entry_point())

View File

@ -1,77 +1,11 @@
#!/usr/bin/env python3
##
# QEMU Object Model test tools
#
# Copyright IBM, Corp. 2011
# Copyright (c) 2013 SUSE LINUX Products GmbH
#
# Authors:
# Anthony Liguori <aliguori@amazon.com>
# Andreas Faerber <afaerber@suse.de>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later. See
# the COPYING file in the top-level directory.
##
import sys
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu.qmp import QEMUMonitorProtocol
from qemu.qmp.qom import QOMTree
cmd, args = sys.argv[0], sys.argv[1:]
socket_path = None
path = None
prop = None
def usage():
return '''environment variables:
QMP_SOCKET=<path | addr:port>
usage:
%s [-h] [-s <QMP socket path | addr:port>] [<path>]
''' % cmd
def usage_error(error_msg = "unspecified error"):
sys.stderr.write('%s\nERROR: %s\n' % (usage(), error_msg))
exit(1)
if len(args) > 0:
if args[0] == "-h":
print(usage())
exit(0);
elif args[0] == "-s":
try:
socket_path = args[1]
except:
usage_error("missing argument: QMP socket path or address");
args = args[2:]
if not socket_path:
if 'QMP_SOCKET' in os.environ:
socket_path = os.environ['QMP_SOCKET']
else:
usage_error("no QMP socket path or address given");
srv = QEMUMonitorProtocol(socket_path)
srv.connect()
def list_node(path):
print('%s' % path)
items = srv.command('qom-list', path=path)
for item in items:
if not item['type'].startswith('child<'):
try:
print(' %s: %s (%s)' % (item['name'], srv.command('qom-get', path=path, property=item['name']), item['type']))
except:
print(' %s: <EXCEPTION> (%s)' % (item['name'], item['type']))
print('')
for item in items:
if item['type'].startswith('child<'):
list_node((path if (path != '/') else '') + '/' + item['name'])
if len(args) == 0:
path = '/'
else:
path = args[0]
list_node(path)
if __name__ == '__main__':
sys.exit(QOMTree.entry_point())