migration/next for 20170718

-----BEGIN PGP SIGNATURE-----
 
 iQIcBAABCAAGBQJZbiu1AAoJEPSH7xhYctcjIzgP/RvewJbZbofe7kEiQDrueWp2
 Jod3YFPhzoc+UgwzkKM0GFbCeyuyoC+b/mXiJrUcVcnrnKpF+hN/uPK7MjvCBfmb
 wkY1BX6dhYch48YzBWZHXL6y/PkS0rW2Nt7blDl6h0zr0xdIe85MPr1Z9vIkDYbI
 dTf5Mrw9axseoAi3ydZNh5pSkLmbVjNbh1S3y89XS0eZwExliBILKBOqt/liuQxd
 y1FDtliB7GWFq1Ykg2+lZgPcdiPZv1zRkH6n8jbpRLTXGYRx5OyA+FZOQiPV52wf
 leWB38NqyBkFJ9+mdcrA0x6FpVSLJLqKuZcnLVvN7fvvNF6EiB0ozRPVQpa3rSvs
 h339Ouk4GsdmRobhMrptpsPSxM+6c9bhvT2fCBZ6slwbxEdqEYk3+Xm5VGQkiOQ/
 l4a1fHLdwuIKXMGI8zTao+UW/3JZaU/BaOONyO0BJY+I/tde559eJWVHi1wMDJWV
 DOg22PF5Ux1tnr4+MbjXABCr9lUafQxGacVfNFLh0z2/GLe3Vvx7dBgvPZFIMS3W
 YPc/AqOvM2Rlwvi8jlsVM29lTGl9/YjB9EH99M1l30M0RNifcSLxJJoDBhKtB/d2
 CctmTS0b+/jDSwf2BiniqLNpLt33QdY7so9LFnsQWTq4whYxPKLjcivTPHX+mQjy
 Uys+Hyhhps81I2zbBHtx
 =75E9
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/juanquintela/tags/migration/20170718' into staging

migration/next for 20170718

# gpg: Signature made Tue 18 Jul 2017 16:39:33 BST
# gpg:                using RSA key 0xF487EF185872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>"
# gpg:                 aka "Juan Quintela <quintela@trasno.org>"
# gpg: WARNING: This key is not certified with a trusted signature!
# gpg:          There is no indication that the signature belongs to the owner.
# Primary key fingerprint: 1899 FF8E DEBF 58CC EE03  4B82 F487 EF18 5872 D723

* remotes/juanquintela/tags/migration/20170718:
  migration: check global caps for validity
  migration: provide migrate_cap_add()
  migration: provide migrate_caps_check()
  migration: remove check against colo support
  migration: check global params for validity
  migration: provide migrate_params_apply()
  migration: introduce migrate_params_check()
  migration: export capabilities to props
  migration: export parameters to props
  qdev: provide DEFINE_PROP_INT64()
  migration/rdma: Send error during cancelling
  migration/rdma: Safely convert control types
  migration/rdma: Allow cancelling while waiting for wrid
  migration/rdma: fix qemu_rdma_block_for_wrid error paths
  migration: Close file on failed migration load
  migration/rdma: Fix race on source

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2017-07-19 12:30:41 +01:00
commit 988879b66e
6 changed files with 346 additions and 106 deletions

View File

@ -404,6 +404,31 @@ static void set_uint64(Object *obj, Visitor *v, const char *name,
visit_type_uint64(v, name, ptr, errp);
}
static void get_int64(Object *obj, Visitor *v, const char *name,
void *opaque, Error **errp)
{
DeviceState *dev = DEVICE(obj);
Property *prop = opaque;
int64_t *ptr = qdev_get_prop_ptr(dev, prop);
visit_type_int64(v, name, ptr, errp);
}
static void set_int64(Object *obj, Visitor *v, const char *name,
void *opaque, Error **errp)
{
DeviceState *dev = DEVICE(obj);
Property *prop = opaque;
int64_t *ptr = qdev_get_prop_ptr(dev, prop);
if (dev->realized) {
qdev_prop_set_after_realize(dev, name, errp);
return;
}
visit_type_int64(v, name, ptr, errp);
}
const PropertyInfo qdev_prop_uint64 = {
.name = "uint64",
.get = get_uint64,
@ -411,6 +436,13 @@ const PropertyInfo qdev_prop_uint64 = {
.set_default_value = set_default_value_uint,
};
const PropertyInfo qdev_prop_int64 = {
.name = "int64",
.get = get_int64,
.set = set_int64,
.set_default_value = set_default_value_int,
};
/* --- string --- */
static void release_string(Object *obj, const char *name, void *opaque)

View File

@ -13,6 +13,7 @@ extern const PropertyInfo qdev_prop_uint16;
extern const PropertyInfo qdev_prop_uint32;
extern const PropertyInfo qdev_prop_int32;
extern const PropertyInfo qdev_prop_uint64;
extern const PropertyInfo qdev_prop_int64;
extern const PropertyInfo qdev_prop_size;
extern const PropertyInfo qdev_prop_string;
extern const PropertyInfo qdev_prop_chr;
@ -157,6 +158,8 @@ extern const PropertyInfo qdev_prop_link;
DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_int32, int32_t)
#define DEFINE_PROP_UINT64(_n, _s, _f, _d) \
DEFINE_PROP_UNSIGNED(_n, _s, _f, _d, qdev_prop_uint64, uint64_t)
#define DEFINE_PROP_INT64(_n, _s, _f, _d) \
DEFINE_PROP_SIGNED(_n, _s, _f, _d, qdev_prop_int64, int64_t)
#define DEFINE_PROP_SIZE(_n, _s, _f, _d) \
DEFINE_PROP_UNSIGNED(_n, _s, _f, _d, qdev_prop_size, uint64_t)
#define DEFINE_PROP_PCI_DEVFN(_n, _s, _f, _d) \

View File

@ -15,7 +15,6 @@
#include "qemu-common.h"
bool colo_supported(void);
void colo_info_init(void);
void migrate_start_colo_process(MigrationState *s);

View File

@ -29,11 +29,6 @@ static bool vmstate_loading;
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
bool colo_supported(void)
{
return true;
}
bool migration_in_colo_state(void)
{
MigrationState *s = migrate_get_current();

View File

@ -102,14 +102,22 @@ enum mig_rp_message_type {
static MigrationState *current_migration;
static bool migration_object_check(MigrationState *ms, Error **errp);
void migration_object_init(void)
{
MachineState *ms = MACHINE(qdev_get_machine());
Error *err = NULL;
/* This can only be called once. */
assert(!current_migration);
current_migration = MIGRATION_OBJ(object_new(TYPE_MIGRATION));
if (!migration_object_check(current_migration, &err)) {
error_report_err(err);
exit(1);
}
/*
* We cannot really do this in migration_instance_init() since at
* that time global properties are not yet applied, then this
@ -348,6 +356,7 @@ static void process_incoming_migration_co(void *opaque)
migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
MIGRATION_STATUS_FAILED);
error_report("load of migration failed: %s", strerror(-ret));
qemu_fclose(mis->from_src_file);
exit(EXIT_FAILURE);
}
mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
@ -403,9 +412,6 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
continue;
}
#endif
if (i == MIGRATION_CAPABILITY_X_COLO && !colo_supported()) {
continue;
}
if (head == NULL) {
head = g_malloc0(sizeof(*caps));
caps = head;
@ -582,51 +588,49 @@ MigrationInfo *qmp_query_migrate(Error **errp)
return info;
}
void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
Error **errp)
/**
* @migration_caps_check - check capability validity
*
* @cap_list: old capability list, array of bool
* @params: new capabilities to be applied soon
* @errp: set *errp if the check failed, with reason
*
* Returns true if check passed, otherwise false.
*/
static bool migrate_caps_check(bool *cap_list,
MigrationCapabilityStatusList *params,
Error **errp)
{
MigrationState *s = migrate_get_current();
MigrationCapabilityStatusList *cap;
bool old_postcopy_cap = migrate_postcopy_ram();
bool old_postcopy_cap;
if (migration_is_setup_or_active(s->state)) {
error_setg(errp, QERR_MIGRATION_ACTIVE);
return;
}
old_postcopy_cap = cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM];
for (cap = params; cap; cap = cap->next) {
#ifndef CONFIG_LIVE_BLOCK_MIGRATION
if (cap->value->capability == MIGRATION_CAPABILITY_BLOCK
&& cap->value->state) {
error_setg(errp, "QEMU compiled without old-style (blk/-b, inc/-i) "
"block migration");
error_append_hint(errp, "Use drive_mirror+NBD instead.\n");
continue;
}
#endif
if (cap->value->capability == MIGRATION_CAPABILITY_X_COLO) {
if (!colo_supported()) {
error_setg(errp, "COLO is not currently supported, please"
" configure with --enable-colo option in order to"
" support COLO feature");
continue;
}
}
s->enabled_capabilities[cap->value->capability] = cap->value->state;
cap_list[cap->value->capability] = cap->value->state;
}
if (migrate_postcopy_ram()) {
if (migrate_use_compression()) {
#ifndef CONFIG_LIVE_BLOCK_MIGRATION
if (cap_list[MIGRATION_CAPABILITY_BLOCK]) {
error_setg(errp, "QEMU compiled without old-style (blk/-b, inc/-i) "
"block migration");
error_append_hint(errp, "Use drive_mirror+NBD instead.\n");
return false;
}
#endif
if (cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM]) {
if (cap_list[MIGRATION_CAPABILITY_COMPRESS]) {
/* The decompression threads asynchronously write into RAM
* rather than use the atomic copies needed to avoid
* userfaulting. It should be possible to fix the decompression
* threads for compatibility in future.
*/
error_report("Postcopy is not currently compatible with "
"compression");
s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_RAM] =
false;
error_setg(errp, "Postcopy is not currently compatible "
"with compression");
return false;
}
/* This check is reasonably expensive, so only when it's being
* set the first time, also it's only the destination that needs
* special support.
@ -636,96 +640,141 @@ void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
/* postcopy_ram_supported_by_host will have emitted a more
* detailed message
*/
error_report("Postcopy is not supported");
s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_RAM] =
false;
error_setg(errp, "Postcopy is not supported");
return false;
}
}
return true;
}
void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
Error **errp)
{
MigrationState *s = migrate_get_current();
MigrationCapabilityStatusList *cap;
if (migration_is_setup_or_active(s->state)) {
error_setg(errp, QERR_MIGRATION_ACTIVE);
return;
}
if (!migrate_caps_check(s->enabled_capabilities, params, errp)) {
return;
}
for (cap = params; cap; cap = cap->next) {
s->enabled_capabilities[cap->value->capability] = cap->value->state;
}
}
void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
/*
* Check whether the parameters are valid. Error will be put into errp
* (if provided). Return true if valid, otherwise false.
*/
static bool migrate_params_check(MigrationParameters *params, Error **errp)
{
MigrationState *s = migrate_get_current();
if (params->has_compress_level &&
(params->compress_level < 0 || params->compress_level > 9)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
"is invalid, it should be in the range of 0 to 9");
return;
return false;
}
if (params->has_compress_threads &&
(params->compress_threads < 1 || params->compress_threads > 255)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
"compress_threads",
"is invalid, it should be in the range of 1 to 255");
return;
return false;
}
if (params->has_decompress_threads &&
(params->decompress_threads < 1 || params->decompress_threads > 255)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
"decompress_threads",
"is invalid, it should be in the range of 1 to 255");
return;
return false;
}
if (params->has_cpu_throttle_initial &&
(params->cpu_throttle_initial < 1 ||
params->cpu_throttle_initial > 99)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
"cpu_throttle_initial",
"an integer in the range of 1 to 99");
return;
return false;
}
if (params->has_cpu_throttle_increment &&
(params->cpu_throttle_increment < 1 ||
params->cpu_throttle_increment > 99)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
"cpu_throttle_increment",
"an integer in the range of 1 to 99");
return;
return false;
}
if (params->has_max_bandwidth &&
(params->max_bandwidth < 0 || params->max_bandwidth > SIZE_MAX)) {
error_setg(errp, "Parameter 'max_bandwidth' expects an integer in the"
" range of 0 to %zu bytes/second", SIZE_MAX);
return;
return false;
}
if (params->has_downtime_limit &&
(params->downtime_limit < 0 ||
params->downtime_limit > MAX_MIGRATE_DOWNTIME)) {
error_setg(errp, "Parameter 'downtime_limit' expects an integer in "
"the range of 0 to %d milliseconds",
MAX_MIGRATE_DOWNTIME);
return;
return false;
}
if (params->has_x_checkpoint_delay && (params->x_checkpoint_delay < 0)) {
error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
"x_checkpoint_delay",
"is invalid, it should be positive");
return false;
}
return true;
}
static void migrate_params_apply(MigrationParameters *params)
{
MigrationState *s = migrate_get_current();
if (params->has_compress_level) {
s->parameters.compress_level = params->compress_level;
}
if (params->has_compress_threads) {
s->parameters.compress_threads = params->compress_threads;
}
if (params->has_decompress_threads) {
s->parameters.decompress_threads = params->decompress_threads;
}
if (params->has_cpu_throttle_initial) {
s->parameters.cpu_throttle_initial = params->cpu_throttle_initial;
}
if (params->has_cpu_throttle_increment) {
s->parameters.cpu_throttle_increment = params->cpu_throttle_increment;
}
if (params->has_tls_creds) {
g_free(s->parameters.tls_creds);
s->parameters.tls_creds = g_strdup(params->tls_creds);
}
if (params->has_tls_hostname) {
g_free(s->parameters.tls_hostname);
s->parameters.tls_hostname = g_strdup(params->tls_hostname);
}
if (params->has_max_bandwidth) {
s->parameters.max_bandwidth = params->max_bandwidth;
if (s->to_dst_file) {
@ -733,6 +782,7 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
s->parameters.max_bandwidth / XFER_LIMIT_RATIO);
}
}
if (params->has_downtime_limit) {
s->parameters.downtime_limit = params->downtime_limit;
}
@ -743,11 +793,22 @@ void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
colo_checkpoint_notify(s);
}
}
if (params->has_block_incremental) {
s->parameters.block_incremental = params->block_incremental;
}
}
void qmp_migrate_set_parameters(MigrationParameters *params, Error **errp)
{
if (!migrate_params_check(params, errp)) {
/* Invalid parameter */
return;
}
migrate_params_apply(params);
}
void qmp_migrate_start_postcopy(Error **errp)
{
@ -781,14 +842,27 @@ void migrate_set_state(int *state, int old_state, int new_state)
}
}
void migrate_set_block_enabled(bool value, Error **errp)
static MigrationCapabilityStatusList *migrate_cap_add(
MigrationCapabilityStatusList *list,
MigrationCapability index,
bool state)
{
MigrationCapabilityStatusList *cap;
cap = g_new0(MigrationCapabilityStatusList, 1);
cap->value = g_new0(MigrationCapabilityStatus, 1);
cap->value->capability = MIGRATION_CAPABILITY_BLOCK;
cap->value->state = value;
cap->value->capability = index;
cap->value->state = state;
cap->next = list;
return cap;
}
void migrate_set_block_enabled(bool value, Error **errp)
{
MigrationCapabilityStatusList *cap;
cap = migrate_cap_add(NULL, MIGRATION_CAPABILITY_BLOCK, value);
qmp_migrate_set_capabilities(cap, errp);
qapi_free_MigrationCapabilityStatusList(cap);
}
@ -2001,6 +2075,9 @@ void migration_global_dump(Monitor *mon)
ms->send_configuration, ms->send_section_footer);
}
#define DEFINE_PROP_MIG_CAP(name, x) \
DEFINE_PROP_BOOL(name, MigrationState, enabled_capabilities[x], false)
static Property migration_properties[] = {
DEFINE_PROP_BOOL("store-global-state", MigrationState,
store_global_state, true),
@ -2009,6 +2086,45 @@ static Property migration_properties[] = {
send_configuration, true),
DEFINE_PROP_BOOL("send-section-footer", MigrationState,
send_section_footer, true),
/* Migration parameters */
DEFINE_PROP_INT64("x-compress-level", MigrationState,
parameters.compress_level,
DEFAULT_MIGRATE_COMPRESS_LEVEL),
DEFINE_PROP_INT64("x-compress-threads", MigrationState,
parameters.compress_threads,
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
DEFINE_PROP_INT64("x-decompress-threads", MigrationState,
parameters.decompress_threads,
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
DEFINE_PROP_INT64("x-cpu-throttle-initial", MigrationState,
parameters.cpu_throttle_initial,
DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL),
DEFINE_PROP_INT64("x-cpu-throttle-increment", MigrationState,
parameters.cpu_throttle_increment,
DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT),
DEFINE_PROP_INT64("x-max-bandwidth", MigrationState,
parameters.max_bandwidth, MAX_THROTTLE),
DEFINE_PROP_INT64("x-downtime-limit", MigrationState,
parameters.downtime_limit,
DEFAULT_MIGRATE_SET_DOWNTIME),
DEFINE_PROP_INT64("x-checkpoint-delay", MigrationState,
parameters.x_checkpoint_delay,
DEFAULT_MIGRATE_X_CHECKPOINT_DELAY),
/* Migration capabilities */
DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
DEFINE_PROP_MIG_CAP("x-rdma-pin-all", MIGRATION_CAPABILITY_RDMA_PIN_ALL),
DEFINE_PROP_MIG_CAP("x-auto-converge", MIGRATION_CAPABILITY_AUTO_CONVERGE),
DEFINE_PROP_MIG_CAP("x-zero-blocks", MIGRATION_CAPABILITY_ZERO_BLOCKS),
DEFINE_PROP_MIG_CAP("x-compress", MIGRATION_CAPABILITY_COMPRESS),
DEFINE_PROP_MIG_CAP("x-events", MIGRATION_CAPABILITY_EVENTS),
DEFINE_PROP_MIG_CAP("x-postcopy-ram", MIGRATION_CAPABILITY_POSTCOPY_RAM),
DEFINE_PROP_MIG_CAP("x-colo", MIGRATION_CAPABILITY_X_COLO),
DEFINE_PROP_MIG_CAP("x-release-ram", MIGRATION_CAPABILITY_RELEASE_RAM),
DEFINE_PROP_MIG_CAP("x-block", MIGRATION_CAPABILITY_BLOCK),
DEFINE_PROP_MIG_CAP("x-return-path", MIGRATION_CAPABILITY_RETURN_PATH),
DEFINE_PROP_END_OF_LIST(),
};
@ -2023,22 +2139,54 @@ static void migration_class_init(ObjectClass *klass, void *data)
static void migration_instance_init(Object *obj)
{
MigrationState *ms = MIGRATION_OBJ(obj);
MigrationParameters *params = &ms->parameters;
ms->state = MIGRATION_STATUS_NONE;
ms->xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE;
ms->mbps = -1;
ms->parameters = (MigrationParameters) {
.compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
.compress_threads = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
.decompress_threads = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
.cpu_throttle_initial = DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL,
.cpu_throttle_increment = DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT,
.max_bandwidth = MAX_THROTTLE,
.downtime_limit = DEFAULT_MIGRATE_SET_DOWNTIME,
.x_checkpoint_delay = DEFAULT_MIGRATE_X_CHECKPOINT_DELAY,
};
ms->parameters.tls_creds = g_strdup("");
ms->parameters.tls_hostname = g_strdup("");
params->tls_hostname = g_strdup("");
params->tls_creds = g_strdup("");
/* Set has_* up only for parameter checks */
params->has_compress_level = true;
params->has_compress_threads = true;
params->has_decompress_threads = true;
params->has_cpu_throttle_initial = true;
params->has_cpu_throttle_increment = true;
params->has_max_bandwidth = true;
params->has_downtime_limit = true;
params->has_x_checkpoint_delay = true;
params->has_block_incremental = true;
}
/*
* Return true if check pass, false otherwise. Error will be put
* inside errp if provided.
*/
static bool migration_object_check(MigrationState *ms, Error **errp)
{
MigrationCapabilityStatusList *head = NULL;
/* Assuming all off */
bool cap_list[MIGRATION_CAPABILITY__MAX] = { 0 }, ret;
int i;
if (!migrate_params_check(&ms->parameters, errp)) {
return false;
}
for (i = 0; i < MIGRATION_CAPABILITY__MAX; i++) {
if (ms->enabled_capabilities[i]) {
head = migrate_cap_add(head, i, true);
}
}
ret = migrate_caps_check(cap_list, head, errp);
/* It works with head == NULL */
qapi_free_MigrationCapabilityStatusList(head);
return ret;
}
static const TypeInfo migration_type = {

View File

@ -165,20 +165,6 @@ enum {
RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
};
static const char *control_desc[] = {
[RDMA_CONTROL_NONE] = "NONE",
[RDMA_CONTROL_ERROR] = "ERROR",
[RDMA_CONTROL_READY] = "READY",
[RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
[RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
[RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
[RDMA_CONTROL_COMPRESS] = "COMPRESS",
[RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
[RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
[RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
[RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
[RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};
/*
* Memory and MR structures used to represent an IB Send/Recv work request.
@ -251,6 +237,30 @@ typedef struct QEMU_PACKED RDMADestBlock {
uint32_t padding;
} RDMADestBlock;
static const char *control_desc(unsigned int rdma_control)
{
static const char *strs[] = {
[RDMA_CONTROL_NONE] = "NONE",
[RDMA_CONTROL_ERROR] = "ERROR",
[RDMA_CONTROL_READY] = "READY",
[RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
[RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
[RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
[RDMA_CONTROL_COMPRESS] = "COMPRESS",
[RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
[RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
[RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
[RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
[RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
};
if (rdma_control > RDMA_CONTROL_UNREGISTER_FINISHED) {
return "??BAD CONTROL VALUE??";
}
return strs[rdma_control];
}
static uint64_t htonll(uint64_t v)
{
union { uint32_t lv[2]; uint64_t llv; } u;
@ -1466,6 +1476,56 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
return 0;
}
/* Wait for activity on the completion channel.
* Returns 0 on success, none-0 on error.
*/
static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
{
/*
* Coroutine doesn't start until migration_fd_process_incoming()
* so don't yield unless we know we're running inside of a coroutine.
*/
if (rdma->migration_started_on_destination) {
yield_until_fd_readable(rdma->comp_channel->fd);
} else {
/* This is the source side, we're in a separate thread
* or destination prior to migration_fd_process_incoming()
* we can't yield; so we have to poll the fd.
* But we need to be able to handle 'cancel' or an error
* without hanging forever.
*/
while (!rdma->error_state && !rdma->received_error) {
GPollFD pfds[1];
pfds[0].fd = rdma->comp_channel->fd;
pfds[0].events = G_IO_IN | G_IO_HUP | G_IO_ERR;
/* 0.1s timeout, should be fine for a 'cancel' */
switch (qemu_poll_ns(pfds, 1, 100 * 1000 * 1000)) {
case 1: /* fd active */
return 0;
case 0: /* Timeout, go around again */
break;
default: /* Error of some type -
* I don't trust errno from qemu_poll_ns
*/
error_report("%s: poll failed", __func__);
return -EPIPE;
}
if (migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) {
/* Bail out and let the cancellation happen */
return -EPIPE;
}
}
}
if (rdma->received_error) {
return -EPIPE;
}
return rdma->error_state;
}
/*
* Block until the next work request has completed.
*
@ -1513,22 +1573,21 @@ static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
}
while (1) {
/*
* Coroutine doesn't start until migration_fd_process_incoming()
* so don't yield unless we know we're running inside of a coroutine.
*/
if (rdma->migration_started_on_destination) {
yield_until_fd_readable(rdma->comp_channel->fd);
ret = qemu_rdma_wait_comp_channel(rdma);
if (ret) {
goto err_block_for_wrid;
}
if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
ret = ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx);
if (ret) {
perror("ibv_get_cq_event");
goto err_block_for_wrid;
}
num_cq_events++;
if (ibv_req_notify_cq(cq, 0)) {
ret = -ibv_req_notify_cq(cq, 0);
if (ret) {
goto err_block_for_wrid;
}
@ -1564,6 +1623,8 @@ err_block_for_wrid:
if (num_cq_events) {
ibv_ack_cq_events(cq, num_cq_events);
}
rdma->error_state = ret;
return ret;
}
@ -1590,7 +1651,7 @@ static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
.num_sge = 1,
};
trace_qemu_rdma_post_send_control(control_desc[head->type]);
trace_qemu_rdma_post_send_control(control_desc(head->type));
/*
* We don't actually need to do a memcpy() in here if we used
@ -1669,16 +1730,16 @@ static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
network_to_control((void *) rdma->wr_data[idx].control);
memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
trace_qemu_rdma_exchange_get_response_start(control_desc[expecting]);
trace_qemu_rdma_exchange_get_response_start(control_desc(expecting));
if (expecting == RDMA_CONTROL_NONE) {
trace_qemu_rdma_exchange_get_response_none(control_desc[head->type],
trace_qemu_rdma_exchange_get_response_none(control_desc(head->type),
head->type);
} else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
error_report("Was expecting a %s (%d) control message"
", but got: %s (%d), length: %d",
control_desc[expecting], expecting,
control_desc[head->type], head->type, head->len);
control_desc(expecting), expecting,
control_desc(head->type), head->type, head->len);
if (head->type == RDMA_CONTROL_ERROR) {
rdma->received_error = true;
}
@ -1788,7 +1849,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
}
}
trace_qemu_rdma_exchange_send_waiting(control_desc[resp->type]);
trace_qemu_rdma_exchange_send_waiting(control_desc(resp->type));
ret = qemu_rdma_exchange_get_response(rdma, resp,
resp->type, RDMA_WRID_DATA);
@ -1800,7 +1861,7 @@ static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
if (resp_idx) {
*resp_idx = RDMA_WRID_DATA;
}
trace_qemu_rdma_exchange_send_received(control_desc[resp->type]);
trace_qemu_rdma_exchange_send_received(control_desc(resp->type));
}
rdma->control_ready_expected = 1;
@ -2208,7 +2269,9 @@ static void qemu_rdma_cleanup(RDMAContext *rdma)
int ret, idx;
if (rdma->cm_id && rdma->connected) {
if (rdma->error_state && !rdma->received_error) {
if ((rdma->error_state ||
migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
!rdma->received_error) {
RDMAControlHeader head = { .len = 0,
.type = RDMA_CONTROL_ERROR,
.repeat = 1,
@ -2365,6 +2428,12 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
caps_to_network(&cap);
ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
if (ret) {
ERROR(errp, "posting second control recv");
goto err_rdma_source_connect;
}
ret = rdma_connect(rdma->cm_id, &conn_param);
if (ret) {
perror("rdma_connect");
@ -2405,12 +2474,6 @@ static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
rdma_ack_cm_event(cm_event);
ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
if (ret) {
ERROR(errp, "posting second control recv!");
goto err_rdma_source_connect;
}
rdma->control_ready_expected = 1;
rdma->nb_sent = 0;
return 0;
@ -3350,7 +3413,7 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
ret = -EIO;
goto out;
default:
error_report("Unknown control message %s", control_desc[head.type]);
error_report("Unknown control message %s", control_desc(head.type));
ret = -EIO;
goto out;
}