Migration pull for 2.13

Alexey Perevalov postcopy blocktime statistics
 Xiao Guangrong's compression performance improvements
 -----BEGIN PGP SIGNATURE-----
 
 iQIcBAABAgAGBQJa4NUpAAoJEAUWMx68W/3njR4P/2eGZOBNU8W5e8MLr1hi95yS
 3yQiwNAWQ4rfvroyUAIVwvjbRUgXCMUaqOTu+kbwt/bXFbjrjlBAXBY61yEeWq3M
 gSCCnD/JUpEZfkieqhdulRzJDh/6VeQg4AbdIL8MyfxV0jODeVm2JfyMeHCHOEaP
 LaEiiMhteONu+hsTFms2oeTbyBVgVu/ceq5pFClr8dadYQtOEltSRWPwxHOpnm3P
 athGslx480AXDB3FqVucBpIq16qCqb8/jjXY1W2iJtCfrdtDExmmQA3KKSOe7WJJ
 AewsBPv1aFo9cqL7eefNXtMirulXhr38jMJ658bZyf2756TH4hE9eB6R3oz4FpTT
 j9iiTeNIfaYqgqOisW5fPng4gzxOYECmQZroYP2VSXSLfGeMQ5a5s0cJdNas6dpR
 Us0iedgJyp17p5g/68fZExCVlvEcPNPlfE6pd5T+DSEqdp5Dp0Qv/TBh5p/nG/gm
 /IBoEXcK9zqelHPOywVrTkEvkV+xzP+ORs9Ly1DQKWajzxCaLaClAP1INAj7ayFf
 yqJi8OvW66S487lQwdz3wvUGGTI1MCpYyoOplWYk22ohYft2BzYPtt/lX3pZanud
 YutxMSKfZMyFV81MkyRgoui5tG3jSXnv/IbGHaK42CIMuEcJsSseq2Ea3FmhznQV
 E90XGkay5Wi2alBKNEmX
 =3MTi
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/dgilbert/tags/pull-migration-20180425a' into staging

Migration pull for 2.13

Alexey Perevalov postcopy blocktime statistics
Xiao Guangrong's compression performance improvements

# gpg: Signature made Wed 25 Apr 2018 20:21:13 BST
# gpg:                using RSA key 0516331EBC5BFDE7
# gpg: Good signature from "Dr. David Alan Gilbert (RH2) <dgilbert@redhat.com>"
# Primary key fingerprint: 45F5 C71B 4A0C B7FB 977A  9FA9 0516 331E BC5B FDE7

* remotes/dgilbert/tags/pull-migration-20180425a:
  migration: remove ram_save_compressed_page()
  migration: introduce save_normal_page()
  migration: move calling save_zero_page to the common place
  migration: move calling control_save_page to the common place
  migration: move some code to ram_save_host_page
  migration: introduce control_save_page()
  migration: detect compression and decompression errors
  migration: stop decompression to allocate and free memory frequently
  migration: stop compression to allocate and free memory frequently
  migration: stop compressing page in migration thread
  migration: add postcopy total blocktime into query-migrate
  migration: add blocktime calculation into migration-test
  migration: postcopy_blocktime documentation
  migration: calculate vCPU blocktime on dst side
  migration: add postcopy blocktime ctx into MigrationIncomingState
  migration: introduce postcopy-blocktime capability

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
Committed by Peter Maydell, 2018-04-26 09:12:31 +01:00
commit 8e383d19b4
11 changed files with 719 additions and 218 deletions

docs/devel/migration.rst

@ -401,6 +401,20 @@ will now cause the transition from precopy to postcopy.
It can be issued immediately after migration is started or any
time later on. Issuing it after the end of a migration is harmless.
Blocktime is a postcopy live migration metric, intended to show how
long the vCPU was in state of interruptable sleep due to pagefault.
That metric is calculated both for all vCPUs as overlapped value, and
separately for each vCPU. These values are calculated on destination
side. To enable postcopy blocktime calculation, enter following
command on destination monitor:
``migrate_set_capability postcopy-blocktime on``
Postcopy blocktime can be retrieved by query-migrate qmp command.
postcopy-blocktime value of qmp command will show overlapped blocking
time for all vCPU, postcopy-vcpu-blocktime will show list of blocking
time per vCPU.
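For illustration only (the numbers below are hypothetical), once such a
migration has finished the destination's ``info migrate`` output gains a
line such as ``postcopy blocktime: 1704`` (milliseconds), together with a
per-vCPU list under ``postcopy vcpu blocktime``; ``query-migrate`` returns
the same figures.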
.. note::
During the postcopy phase, the bandwidth limits set using
``migrate_set_speed`` is ignored (to avoid delaying requested pages that

hmp.c

@ -274,6 +274,21 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict)
info->cpu_throttle_percentage);
}
if (info->has_postcopy_blocktime) {
monitor_printf(mon, "postcopy blocktime: %u\n",
info->postcopy_blocktime);
}
if (info->has_postcopy_vcpu_blocktime) {
Visitor *v;
char *str;
v = string_output_visitor_new(false, &str);
visit_type_uint32List(v, NULL, &info->postcopy_vcpu_blocktime, NULL);
visit_complete(v, &str);
monitor_printf(mon, "postcopy vcpu blocktime: %s\n", str);
g_free(str);
visit_free(v);
}
qapi_free_MigrationInfo(info);
qapi_free_MigrationCapabilityStatusList(caps);
}

migration/migration.c

@ -630,14 +630,15 @@ static void populate_disk_info(MigrationInfo *info)
}
}
MigrationInfo *qmp_query_migrate(Error **errp)
static void fill_source_migration_info(MigrationInfo *info)
{
MigrationInfo *info = g_malloc0(sizeof(*info));
MigrationState *s = migrate_get_current();
switch (s->state) {
case MIGRATION_STATUS_NONE:
/* no migration has happened ever */
/* do not overwrite destination migration status */
return;
break;
case MIGRATION_STATUS_SETUP:
info->has_status = true;
@ -688,8 +689,6 @@ MigrationInfo *qmp_query_migrate(Error **errp)
break;
}
info->status = s->state;
return info;
}
/**
@ -753,6 +752,41 @@ static bool migrate_caps_check(bool *cap_list,
return true;
}
static void fill_destination_migration_info(MigrationInfo *info)
{
MigrationIncomingState *mis = migration_incoming_get_current();
switch (mis->state) {
case MIGRATION_STATUS_NONE:
return;
break;
case MIGRATION_STATUS_SETUP:
case MIGRATION_STATUS_CANCELLING:
case MIGRATION_STATUS_CANCELLED:
case MIGRATION_STATUS_ACTIVE:
case MIGRATION_STATUS_POSTCOPY_ACTIVE:
case MIGRATION_STATUS_FAILED:
case MIGRATION_STATUS_COLO:
info->has_status = true;
break;
case MIGRATION_STATUS_COMPLETED:
info->has_status = true;
fill_destination_postcopy_migration_info(info);
break;
}
info->status = mis->state;
}
MigrationInfo *qmp_query_migrate(Error **errp)
{
MigrationInfo *info = g_malloc0(sizeof(*info));
fill_destination_migration_info(info);
fill_source_migration_info(info);
return info;
}
void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
Error **errp)
{
@ -1541,6 +1575,15 @@ bool migrate_zero_blocks(void)
return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
}
bool migrate_postcopy_blocktime(void)
{
MigrationState *s;
s = migrate_get_current();
return s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_BLOCKTIME];
}
bool migrate_use_compression(void)
{
MigrationState *s;

migration/migration.h

@ -22,6 +22,8 @@
#include "hw/qdev.h" #include "hw/qdev.h"
#include "io/channel.h" #include "io/channel.h"
struct PostcopyBlocktimeContext;
/* State for the incoming migration */
struct MigrationIncomingState {
QEMUFile *from_src_file;
@ -65,10 +67,20 @@ struct MigrationIncomingState {
/* The coroutine we should enter (back) after failover */
Coroutine *migration_incoming_co;
QemuSemaphore colo_incoming_sem;
/*
* PostcopyBlocktimeContext to keep information for postcopy
* live migration, to calculate vCPU block time
* */
struct PostcopyBlocktimeContext *blocktime_ctx;
};
MigrationIncomingState *migration_incoming_get_current(void);
void migration_incoming_state_destroy(void);
/*
* Functions to work with blocktime context
*/
void fill_destination_postcopy_migration_info(MigrationInfo *info);
#define TYPE_MIGRATION "migration"
@ -230,6 +242,7 @@ int migrate_compress_level(void);
int migrate_compress_threads(void);
int migrate_decompress_threads(void);
bool migrate_use_events(void);
bool migrate_postcopy_blocktime(void);
/* Sending on the return path - generic and then for each message type */
void migrate_send_rp_shut(MigrationIncomingState *mis,

migration/postcopy-ram.c

@ -90,6 +90,103 @@ int postcopy_notify(enum PostcopyNotifyReason reason, Error **errp)
#include <sys/eventfd.h>
#include <linux/userfaultfd.h>
typedef struct PostcopyBlocktimeContext {
/* time when page fault initiated per vCPU */
uint32_t *page_fault_vcpu_time;
/* page address per vCPU */
uintptr_t *vcpu_addr;
uint32_t total_blocktime;
/* blocktime per vCPU */
uint32_t *vcpu_blocktime;
/* point in time when last page fault was initiated */
uint32_t last_begin;
/* number of vCPU are suspended */
int smp_cpus_down;
uint64_t start_time;
/*
* Handler for exit event, necessary for
* releasing whole blocktime_ctx
*/
Notifier exit_notifier;
} PostcopyBlocktimeContext;
static void destroy_blocktime_context(struct PostcopyBlocktimeContext *ctx)
{
g_free(ctx->page_fault_vcpu_time);
g_free(ctx->vcpu_addr);
g_free(ctx->vcpu_blocktime);
g_free(ctx);
}
static void migration_exit_cb(Notifier *n, void *data)
{
PostcopyBlocktimeContext *ctx = container_of(n, PostcopyBlocktimeContext,
exit_notifier);
destroy_blocktime_context(ctx);
}
static struct PostcopyBlocktimeContext *blocktime_context_new(void)
{
PostcopyBlocktimeContext *ctx = g_new0(PostcopyBlocktimeContext, 1);
ctx->page_fault_vcpu_time = g_new0(uint32_t, smp_cpus);
ctx->vcpu_addr = g_new0(uintptr_t, smp_cpus);
ctx->vcpu_blocktime = g_new0(uint32_t, smp_cpus);
ctx->exit_notifier.notify = migration_exit_cb;
ctx->start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
qemu_add_exit_notifier(&ctx->exit_notifier);
return ctx;
}
static uint32List *get_vcpu_blocktime_list(PostcopyBlocktimeContext *ctx)
{
uint32List *list = NULL, *entry = NULL;
int i;
for (i = smp_cpus - 1; i >= 0; i--) {
entry = g_new0(uint32List, 1);
entry->value = ctx->vcpu_blocktime[i];
entry->next = list;
list = entry;
}
return list;
}
/*
* This function just populates MigrationInfo from postcopy's
* blocktime context. It will not populate MigrationInfo,
* unless postcopy-blocktime capability was set.
*
* @info: pointer to MigrationInfo to populate
*/
void fill_destination_postcopy_migration_info(MigrationInfo *info)
{
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyBlocktimeContext *bc = mis->blocktime_ctx;
if (!bc) {
return;
}
info->has_postcopy_blocktime = true;
info->postcopy_blocktime = bc->total_blocktime;
info->has_postcopy_vcpu_blocktime = true;
info->postcopy_vcpu_blocktime = get_vcpu_blocktime_list(bc);
}
static uint32_t get_postcopy_total_blocktime(void)
{
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyBlocktimeContext *bc = mis->blocktime_ctx;
if (!bc) {
return 0;
}
return bc->total_blocktime;
}
/**
* receive_ufd_features: check userfault fd features, to request only supported
@ -182,6 +279,19 @@ static bool ufd_check_and_apply(int ufd, MigrationIncomingState *mis)
}
}
#ifdef UFFD_FEATURE_THREAD_ID
if (migrate_postcopy_blocktime() && mis &&
UFFD_FEATURE_THREAD_ID & supported_features) {
/* kernel supports that feature */
/* don't create blocktime_context if it exists */
if (!mis->blocktime_ctx) {
mis->blocktime_ctx = blocktime_context_new();
}
asked_features |= UFFD_FEATURE_THREAD_ID;
}
#endif
/*
* request features, even if asked_features is 0, due to
* kernel expects UFFD_API before UFFDIO_REGISTER, per
@ -451,6 +561,9 @@ int postcopy_ram_incoming_cleanup(MigrationIncomingState *mis)
munmap(mis->postcopy_tmp_zero_page, mis->largest_page_size);
mis->postcopy_tmp_zero_page = NULL;
}
trace_postcopy_ram_incoming_cleanup_blocktime(
get_postcopy_total_blocktime());
trace_postcopy_ram_incoming_cleanup_exit();
return 0;
}
@ -575,6 +688,148 @@ int postcopy_request_shared_page(struct PostCopyFD *pcfd, RAMBlock *rb,
return 0;
}
static int get_mem_fault_cpu_index(uint32_t pid)
{
CPUState *cpu_iter;
CPU_FOREACH(cpu_iter) {
if (cpu_iter->thread_id == pid) {
trace_get_mem_fault_cpu_index(cpu_iter->cpu_index, pid);
return cpu_iter->cpu_index;
}
}
trace_get_mem_fault_cpu_index(-1, pid);
return -1;
}
static uint32_t get_low_time_offset(PostcopyBlocktimeContext *dc)
{
int64_t start_time_offset = qemu_clock_get_ms(QEMU_CLOCK_REALTIME) -
dc->start_time;
return start_time_offset < 1 ? 1 : start_time_offset & UINT32_MAX;
}
/*
* This function is being called when pagefault occurs. It
* tracks down vCPU blocking time.
*
* @addr: faulted host virtual address
* @ptid: faulted process thread id
* @rb: ramblock appropriate to addr
*/
static void mark_postcopy_blocktime_begin(uintptr_t addr, uint32_t ptid,
RAMBlock *rb)
{
int cpu, already_received;
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyBlocktimeContext *dc = mis->blocktime_ctx;
uint32_t low_time_offset;
if (!dc || ptid == 0) {
return;
}
cpu = get_mem_fault_cpu_index(ptid);
if (cpu < 0) {
return;
}
low_time_offset = get_low_time_offset(dc);
if (dc->vcpu_addr[cpu] == 0) {
atomic_inc(&dc->smp_cpus_down);
}
atomic_xchg(&dc->last_begin, low_time_offset);
atomic_xchg(&dc->page_fault_vcpu_time[cpu], low_time_offset);
atomic_xchg(&dc->vcpu_addr[cpu], addr);
/* check it here, not at the begining of the function,
* due to, check could accur early than bitmap_set in
* qemu_ufd_copy_ioctl */
already_received = ramblock_recv_bitmap_test(rb, (void *)addr);
if (already_received) {
atomic_xchg(&dc->vcpu_addr[cpu], 0);
atomic_xchg(&dc->page_fault_vcpu_time[cpu], 0);
atomic_dec(&dc->smp_cpus_down);
}
trace_mark_postcopy_blocktime_begin(addr, dc, dc->page_fault_vcpu_time[cpu],
cpu, already_received);
}
/*
* This function just provide calculated blocktime per cpu and trace it.
* Total blocktime is calculated in mark_postcopy_blocktime_end.
*
*
* Assume we have 3 CPU
*
* S1 E1 S1 E1
* -----***********------------xxx***************------------------------> CPU1
*
* S2 E2
* ------------****************xxx---------------------------------------> CPU2
*
* S3 E3
* ------------------------****xxx********-------------------------------> CPU3
*
* We have sequence S1,S2,E1,S3,S1,E2,E3,E1
* S2,E1 - doesn't match condition due to sequence S1,S2,E1 doesn't include CPU3
* S3,S1,E2 - sequence includes all CPUs, in this case overlap will be S1,E2 -
* it's a part of total blocktime.
* S1 - here is last_begin
* Legend of the picture is following:
* * - means blocktime per vCPU
* x - means overlapped blocktime (total blocktime)
*
* @addr: host virtual address
*/
static void mark_postcopy_blocktime_end(uintptr_t addr)
{
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyBlocktimeContext *dc = mis->blocktime_ctx;
int i, affected_cpu = 0;
bool vcpu_total_blocktime = false;
uint32_t read_vcpu_time, low_time_offset;
if (!dc) {
return;
}
low_time_offset = get_low_time_offset(dc);
/* lookup cpu, to clear it,
* that algorithm looks straighforward, but it's not
* optimal, more optimal algorithm is keeping tree or hash
* where key is address value is a list of */
for (i = 0; i < smp_cpus; i++) {
uint32_t vcpu_blocktime = 0;
read_vcpu_time = atomic_fetch_add(&dc->page_fault_vcpu_time[i], 0);
if (atomic_fetch_add(&dc->vcpu_addr[i], 0) != addr ||
read_vcpu_time == 0) {
continue;
}
atomic_xchg(&dc->vcpu_addr[i], 0);
vcpu_blocktime = low_time_offset - read_vcpu_time;
affected_cpu += 1;
/* we need to know is that mark_postcopy_end was due to
* faulted page, another possible case it's prefetched
* page and in that case we shouldn't be here */
if (!vcpu_total_blocktime &&
atomic_fetch_add(&dc->smp_cpus_down, 0) == smp_cpus) {
vcpu_total_blocktime = true;
}
/* continue cycle, due to one page could affect several vCPUs */
dc->vcpu_blocktime[i] += vcpu_blocktime;
}
atomic_sub(&dc->smp_cpus_down, affected_cpu);
if (vcpu_total_blocktime) {
dc->total_blocktime += low_time_offset - atomic_fetch_add(
&dc->last_begin, 0);
}
trace_mark_postcopy_blocktime_end(addr, dc, dc->total_blocktime,
affected_cpu);
}
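/*
 * Illustrative aside, not part of this patch: a standalone toy model of the
 * overlap accounting described in the diagram above, with made-up timestamps
 * and none of the atomics.  Per-vCPU blocktime grows for the whole fault, the
 * "total" (overlapped) blocktime only for intervals where every vCPU is down.
 */
#include <stdio.h>

#define NCPUS 3

static unsigned fault_start[NCPUS];    /* 0 means "not blocked" */
static unsigned vcpu_blocktime[NCPUS];
static unsigned total_blocktime, last_begin;
static int cpus_down;

static void begin_fault(int cpu, unsigned now)
{
    if (fault_start[cpu] == 0) {
        cpus_down++;
    }
    last_begin = now;
    fault_start[cpu] = now;
}

static void end_fault(int cpu, unsigned now)
{
    if (cpus_down == NCPUS) {
        /* every vCPU was blocked: the overlap ends with this fault */
        total_blocktime += now - last_begin;
    }
    vcpu_blocktime[cpu] += now - fault_start[cpu];
    fault_start[cpu] = 0;
    cpus_down--;
}

int main(void)
{
    /* the S1,S2,E1,S3,S1,E2,E3,E1 sequence from the diagram */
    begin_fault(0, 10); begin_fault(1, 20); end_fault(0, 40);
    begin_fault(2, 50); begin_fault(0, 60); end_fault(1, 70);
    end_fault(2, 90); end_fault(0, 100);

    for (int i = 0; i < NCPUS; i++) {
        printf("vcpu %d blocktime: %u\n", i, vcpu_blocktime[i]);
    }
    printf("total (overlapped) blocktime: %u\n", total_blocktime);
    return 0;
}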
/*
* Handle faults detected by the USERFAULT markings
*/
@ -681,7 +936,12 @@ static void *postcopy_ram_fault_thread(void *opaque)
rb_offset &= ~(qemu_ram_pagesize(rb) - 1);
trace_postcopy_ram_fault_thread_request(msg.arg.pagefault.address,
qemu_ram_get_idstr(rb),
rb_offset);
rb_offset,
msg.arg.pagefault.feat.ptid);
mark_postcopy_blocktime_begin(
(uintptr_t)(msg.arg.pagefault.address),
msg.arg.pagefault.feat.ptid, rb);
/*
* Send the request to the source - we want to request one
* of our host page sizes (which is >= TPS)
@ -829,6 +1089,8 @@ static int qemu_ufd_copy_ioctl(int userfault_fd, void *host_addr,
if (!ret) {
ramblock_recv_bitmap_set_range(rb, host_addr,
pagesize / qemu_target_page_size());
mark_postcopy_blocktime_end((uintptr_t)host_addr);
}
return ret;
}
@ -947,6 +1209,10 @@ void *postcopy_get_tmp_page(MigrationIncomingState *mis)
#else
/* No target OS support, stubs just fail */
void fill_destination_postcopy_migration_info(MigrationInfo *info)
{
}
bool postcopy_ram_supported_by_host(MigrationIncomingState *mis)
{
error_report("%s: No OS support", __func__);

migration/qemu-file.c

@ -658,8 +658,32 @@ uint64_t qemu_get_be64(QEMUFile *f)
return v;
}
/* Compress size bytes of data start at p with specific compression
* level and store the compressed data to the buffer of f.
/* return the size after compression, or negative value on error */
static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
const uint8_t *source, size_t source_len)
{
int err;
err = deflateReset(stream);
if (err != Z_OK) {
return -1;
}
stream->avail_in = source_len;
stream->next_in = (uint8_t *)source;
stream->avail_out = dest_len;
stream->next_out = dest;
err = deflate(stream, Z_FINISH);
if (err != Z_STREAM_END) {
return -1;
}
return stream->next_out - dest;
}
/* Compress size bytes of data start at p and store the compressed
* data to the buffer of f.
*
* When f is not writable, return -1 if f has no space to save the
* compressed data.
@ -667,9 +691,8 @@ uint64_t qemu_get_be64(QEMUFile *f)
* do fflush first, if f still has no space to save the compressed
* data, return -1.
*/
ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
int level)
ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
const uint8_t *p, size_t size)
{
ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
@ -683,11 +706,13 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
return -1;
}
}
if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen,
(Bytef *)p, size, level) != Z_OK) {
error_report("Compress Failed!");
return 0;
}
blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
blen, p, size);
if (blen < 0) {
return -1;
}
qemu_put_be32(f, blen);
if (f->ops->writev_buffer) {
add_to_iovec(f, f->buf + f->buf_index, blen, false);
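/*
 * Illustrative aside, not part of this patch: the lifecycle the series moves
 * to.  compress2() builds and tears down a z_stream internally on every call,
 * so compressing each RAM page that way allocates and frees zlib state per
 * page; keeping one z_stream per worker and calling deflateReset() between
 * pages does the allocation exactly once.  Names below are invented.
 */
#include <string.h>
#include <sys/types.h>
#include <stdint.h>
#include <zlib.h>

static z_stream page_stream;               /* lives for the whole migration */

static int page_compress_setup(int level)
{
    memset(&page_stream, 0, sizeof(page_stream));
    return deflateInit(&page_stream, level) == Z_OK ? 0 : -1;
}

/* returns the compressed size, or -1 on error */
static ssize_t page_compress_one(uint8_t *dst, size_t dst_len,
                                 const uint8_t *src, size_t src_len)
{
    if (deflateReset(&page_stream) != Z_OK) {   /* cheap: reuses the state */
        return -1;
    }
    page_stream.next_in = (uint8_t *)src;
    page_stream.avail_in = src_len;
    page_stream.next_out = dst;
    page_stream.avail_out = dst_len;
    if (deflate(&page_stream, Z_FINISH) != Z_STREAM_END) {
        return -1;
    }
    return page_stream.next_out - dst;
}

static void page_compress_teardown(void)
{
    deflateEnd(&page_stream);                   /* free the state once */
}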

migration/qemu-file.h

@ -25,6 +25,8 @@
#ifndef MIGRATION_QEMU_FILE_H
#define MIGRATION_QEMU_FILE_H
#include <zlib.h>
/* Read a chunk of data from a file at the given position. The pos argument
* can be ignored if the file is only be used for streaming. The number of
* bytes actually read should be returned.
@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f);
size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset);
size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size,
int level);
ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
const uint8_t *p, size_t size);
int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
/*

migration/ram.c

@ -269,6 +269,10 @@ struct CompressParam {
QemuCond cond;
RAMBlock *block;
ram_addr_t offset;
/* internally used fields */
z_stream stream;
uint8_t *originbuf;
};
typedef struct CompressParam CompressParam;
@ -280,6 +284,7 @@ struct DecompressParam {
void *des;
uint8_t *compbuf;
int len;
z_stream stream;
};
typedef struct DecompressParam DecompressParam;
@ -294,13 +299,14 @@ static QemuCond comp_done_cond;
/* The empty QEMUFileOps will be used by file in CompressParam */
static const QEMUFileOps empty_ops = { };
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
ram_addr_t offset);
static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
ram_addr_t offset, uint8_t *source_buf);
static void *do_data_compress(void *opaque)
{
@ -316,7 +322,8 @@ static void *do_data_compress(void *opaque)
param->block = NULL;
qemu_mutex_unlock(&param->mutex);
do_compress_ram_page(param->file, block, offset);
do_compress_ram_page(param->file, &param->stream, block, offset,
param->originbuf);
qemu_mutex_lock(&comp_done_lock);
param->done = true;
@ -357,10 +364,20 @@ static void compress_threads_save_cleanup(void)
terminate_compression_threads();
thread_count = migrate_compress_threads();
for (i = 0; i < thread_count; i++) {
/*
* we use it as a indicator which shows if the thread is
* properly init'd or not
*/
if (!comp_param[i].file) {
break;
}
qemu_thread_join(compress_threads + i);
qemu_fclose(comp_param[i].file);
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
deflateEnd(&comp_param[i].stream);
g_free(comp_param[i].originbuf);
qemu_fclose(comp_param[i].file);
comp_param[i].file = NULL;
}
qemu_mutex_destroy(&comp_done_lock);
qemu_cond_destroy(&comp_done_cond);
@ -370,12 +387,12 @@ static void compress_threads_save_cleanup(void)
comp_param = NULL;
}
static void compress_threads_save_setup(void)
static int compress_threads_save_setup(void)
{
int i, thread_count;
if (!migrate_use_compression()) {
return;
return 0;
}
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
@ -383,6 +400,17 @@ static void compress_threads_save_setup(void)
qemu_cond_init(&comp_done_cond);
qemu_mutex_init(&comp_done_lock);
for (i = 0; i < thread_count; i++) {
comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
if (!comp_param[i].originbuf) {
goto exit;
}
if (deflateInit(&comp_param[i].stream,
migrate_compress_level()) != Z_OK) {
g_free(comp_param[i].originbuf);
goto exit;
}
/* comp_param[i].file is just used as a dummy buffer to save data,
* set its ops to empty.
*/
@ -395,6 +423,11 @@ static void compress_threads_save_setup(void)
do_data_compress, comp_param + i,
QEMU_THREAD_JOINABLE);
}
return 0;
exit:
compress_threads_save_cleanup();
return -1;
}
/* Multiple fd's */
@ -941,6 +974,72 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
}
/*
* @pages: the number of pages written by the control path,
* < 0 - error
* > 0 - number of pages written
*
* Return true if the pages has been saved, otherwise false is returned.
*/
static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
int *pages)
{
uint64_t bytes_xmit = 0;
int ret;
*pages = -1;
ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
&bytes_xmit);
if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
return false;
}
if (bytes_xmit) {
ram_counters.transferred += bytes_xmit;
*pages = 1;
}
if (ret == RAM_SAVE_CONTROL_DELAYED) {
return true;
}
if (bytes_xmit > 0) {
ram_counters.normal++;
} else if (bytes_xmit == 0) {
ram_counters.duplicate++;
}
return true;
}
/*
* directly send the page to the stream
*
* Returns the number of pages written.
*
* @rs: current RAM state
* @block: block that contains the page we want to send
* @offset: offset inside the block for the page
* @buf: the page to be sent
* @async: send to page asyncly
*/
static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
uint8_t *buf, bool async)
{
ram_counters.transferred += save_page_header(rs, rs->f, block,
offset | RAM_SAVE_FLAG_PAGE);
if (async) {
qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
migrate_release_ram() &
migration_in_postcopy());
} else {
qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
}
ram_counters.transferred += TARGET_PAGE_SIZE;
ram_counters.normal++;
return 1;
}
/**
* ram_save_page: send the given page to the stream
*
@ -957,73 +1056,31 @@ static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
{
int pages = -1;
uint64_t bytes_xmit;
ram_addr_t current_addr;
uint8_t *p;
int ret;
bool send_async = true;
RAMBlock *block = pss->block;
ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
ram_addr_t current_addr = block->offset + offset;
p = block->host + offset;
trace_ram_save_page(block->idstr, (uint64_t)offset, p);
/* In doubt sent page as normal */
bytes_xmit = 0;
ret = ram_control_save_page(rs->f, block->offset,
offset, TARGET_PAGE_SIZE, &bytes_xmit);
if (bytes_xmit) {
ram_counters.transferred += bytes_xmit;
pages = 1;
}
XBZRLE_cache_lock();
current_addr = block->offset + offset;
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (ret != RAM_SAVE_CONTROL_DELAYED) {
if (bytes_xmit > 0) {
ram_counters.normal++;
} else if (bytes_xmit == 0) {
ram_counters.duplicate++;
}
}
} else {
pages = save_zero_page(rs, block, offset);
if (pages > 0) {
/* Must let xbzrle know, otherwise a previous (now 0'd) cached
* page would be stale
*/
xbzrle_cache_zero_page(rs, current_addr);
ram_release_pages(block->idstr, offset, pages);
} else if (!rs->ram_bulk_stage &&
!migration_in_postcopy() && migrate_use_xbzrle()) {
pages = save_xbzrle_page(rs, &p, current_addr, block,
offset, last_stage);
if (!last_stage) {
/* Can't send this cached data async, since the cache page
* might get updated before it gets to the wire
*/
send_async = false;
}
}
}
if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
migrate_use_xbzrle()) {
pages = save_xbzrle_page(rs, &p, current_addr, block,
offset, last_stage);
if (!last_stage) {
/* Can't send this cached data async, since the cache page
* might get updated before it gets to the wire
*/
send_async = false;
}
}
/* XBZRLE overflow or normal page */
if (pages == -1) {
ram_counters.transferred +=
save_page_header(rs, rs->f, block, offset | RAM_SAVE_FLAG_PAGE);
if (send_async) {
qemu_put_buffer_async(rs->f, p, TARGET_PAGE_SIZE,
migrate_release_ram() &
migration_in_postcopy());
} else {
qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE);
}
ram_counters.transferred += TARGET_PAGE_SIZE;
pages = 1;
ram_counters.normal++;
pages = save_normal_page(rs, block, offset, p, send_async);
}
XBZRLE_cache_unlock();
@ -1031,8 +1088,8 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
return pages;
}
static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
ram_addr_t offset)
static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
ram_addr_t offset, uint8_t *source_buf)
{
RAMState *rs = ram_state;
int bytes_sent, blen;
@ -1040,8 +1097,14 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block,
bytes_sent = save_page_header(rs, f, block, offset |
RAM_SAVE_FLAG_COMPRESS_PAGE);
blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE,
migrate_compress_level());
/*
* copy it to a internal buffer to avoid it being modified by VM
* so that we can catch up the error during compression and
* decompression
*/
memcpy(source_buf, p, TARGET_PAGE_SIZE);
blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
if (blen < 0) {
bytes_sent = 0;
qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@ -1121,83 +1184,6 @@ static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
return pages;
}
/**
* ram_save_compressed_page: compress the given page and send it to the stream
*
* Returns the number of pages written.
*
* @rs: current RAM state
* @block: block that contains the page we want to send
* @offset: offset inside the block for the page
* @last_stage: if we are at the completion stage
*/
static int ram_save_compressed_page(RAMState *rs, PageSearchStatus *pss,
bool last_stage)
{
int pages = -1;
uint64_t bytes_xmit = 0;
uint8_t *p;
int ret, blen;
RAMBlock *block = pss->block;
ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
p = block->host + offset;
ret = ram_control_save_page(rs->f, block->offset,
offset, TARGET_PAGE_SIZE, &bytes_xmit);
if (bytes_xmit) {
ram_counters.transferred += bytes_xmit;
pages = 1;
}
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
if (ret != RAM_SAVE_CONTROL_DELAYED) {
if (bytes_xmit > 0) {
ram_counters.normal++;
} else if (bytes_xmit == 0) {
ram_counters.duplicate++;
}
}
} else {
/* When starting the process of a new block, the first page of
* the block should be sent out before other pages in the same
* block, and all the pages in last block should have been sent
* out, keeping this order is important, because the 'cont' flag
* is used to avoid resending the block name.
*/
if (block != rs->last_sent_block) {
flush_compressed_data(rs);
pages = save_zero_page(rs, block, offset);
if (pages == -1) {
/* Make sure the first page is sent out before other pages */
bytes_xmit = save_page_header(rs, rs->f, block, offset |
RAM_SAVE_FLAG_COMPRESS_PAGE);
blen = qemu_put_compression_data(rs->f, p, TARGET_PAGE_SIZE,
migrate_compress_level());
if (blen > 0) {
ram_counters.transferred += bytes_xmit + blen;
ram_counters.normal++;
pages = 1;
} else {
qemu_file_set_error(rs->f, blen);
error_report("compressed data failed!");
}
}
if (pages > 0) {
ram_release_pages(block->idstr, offset, pages);
}
} else {
pages = save_zero_page(rs, block, offset);
if (pages == -1) {
pages = compress_page_with_multi_thread(rs, block, offset);
} else {
ram_release_pages(block->idstr, offset, pages);
}
}
}
return pages;
}
/**
* find_dirty_block: find the next dirty page and update any state
* associated with the search process.
@ -1434,44 +1420,80 @@ err:
return -1;
}
static bool save_page_use_compression(RAMState *rs)
{
if (!migrate_use_compression()) {
return false;
}
/*
* If xbzrle is on, stop using the data compression after first
* round of migration even if compression is enabled. In theory,
* xbzrle can do better than compression.
*/
if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
return true;
}
return false;
}
/**
* ram_save_target_page: save one target page
*
* Returns the number of pages written
*
* @rs: current RAM state
* @ms: current migration state
* @pss: data about the page we want to send
* @last_stage: if we are at the completion stage
*/
static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
bool last_stage)
{
int res = 0;
/* Check the pages is dirty and if it is send it */
if (migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
/*
* If xbzrle is on, stop using the data compression after first
* round of migration even if compression is enabled. In theory,
* xbzrle can do better than compression.
*/
if (migrate_use_compression() &&
(rs->ram_bulk_stage || !migrate_use_xbzrle())) {
res = ram_save_compressed_page(rs, pss, last_stage);
} else {
res = ram_save_page(rs, pss, last_stage);
}
if (res < 0) {
return res;
}
if (pss->block->unsentmap) {
clear_bit(pss->page, pss->block->unsentmap);
}
}
return res;
RAMBlock *block = pss->block;
ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
int res;
if (control_save_page(rs, block, offset, &res)) {
return res;
}
/*
* When starting the process of a new block, the first page of
* the block should be sent out before other pages in the same
* block, and all the pages in last block should have been sent
* out, keeping this order is important, because the 'cont' flag
* is used to avoid resending the block name.
*/
if (block != rs->last_sent_block && save_page_use_compression(rs)) {
flush_compressed_data(rs);
}
res = save_zero_page(rs, block, offset);
if (res > 0) {
/* Must let xbzrle know, otherwise a previous (now 0'd) cached
* page would be stale
*/
if (!save_page_use_compression(rs)) {
XBZRLE_cache_lock();
xbzrle_cache_zero_page(rs, block->offset + offset);
XBZRLE_cache_unlock();
}
ram_release_pages(block->idstr, offset, res);
return res;
}
/*
* Make sure the first page is sent out before other pages.
*
* we post it as normal page as compression will take much
* CPU resource.
*/
if (block == rs->last_sent_block && save_page_use_compression(rs)) {
res = compress_page_with_multi_thread(rs, block, offset);
}
return ram_save_page(rs, pss, last_stage);
} }
/**
@ -1500,12 +1522,22 @@ static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
do {
/* Check the pages is dirty and if it is send it */
if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
pss->page++;
continue;
}
tmppages = ram_save_target_page(rs, pss, last_stage);
if (tmppages < 0) {
return tmppages;
}
pages += tmppages;
if (pss->block->unsentmap) {
clear_bit(pss->page, pss->block->unsentmap);
}
pss->page++;
} while ((pss->page & (pagesize_bits - 1)) &&
offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
@ -2214,9 +2246,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
RAMState **rsp = opaque;
RAMBlock *block;
if (compress_threads_save_setup()) {
return -1;
}
/* migration has already setup the bitmap, reuse it. */
if (!migration_in_colo_state()) {
if (ram_init_all(rsp) != 0) {
compress_threads_save_cleanup();
return -1;
}
}
@ -2236,7 +2273,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque)
}
rcu_read_unlock();
compress_threads_save_setup();
ram_control_before_iterate(f, RAM_CONTROL_SETUP);
ram_control_after_iterate(f, RAM_CONTROL_SETUP);
@ -2501,12 +2537,37 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
}
}
/* return the size after decompression, or negative value on error */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
const uint8_t *source, size_t source_len)
{
int err;
err = inflateReset(stream);
if (err != Z_OK) {
return -1;
}
stream->avail_in = source_len;
stream->next_in = (uint8_t *)source;
stream->avail_out = dest_len;
stream->next_out = dest;
err = inflate(stream, Z_NO_FLUSH);
if (err != Z_STREAM_END) {
return -1;
}
return stream->total_out;
}
static void *do_data_decompress(void *opaque)
{
DecompressParam *param = opaque;
unsigned long pagesize;
uint8_t *des;
int len;
int len, ret;
qemu_mutex_lock(&param->mutex);
while (!param->quit) {
@ -2517,13 +2578,13 @@ static void *do_data_decompress(void *opaque)
qemu_mutex_unlock(&param->mutex);
pagesize = TARGET_PAGE_SIZE;
/* uncompress() will return failed in some case, especially
* when the page is dirted when doing the compression, it's
* not a problem because the dirty page will be retransferred
* and uncompress() won't break the data in other pages.
*/
uncompress((Bytef *)des, &pagesize,
(const Bytef *)param->compbuf, len);
ret = qemu_uncompress_data(&param->stream, des, pagesize,
param->compbuf, len);
if (ret < 0) {
error_report("decompress data failed");
qemu_file_set_error(decomp_file, ret);
}
qemu_mutex_lock(&decomp_done_lock);
param->done = true;
@ -2540,12 +2601,12 @@ static void *do_data_decompress(void *opaque)
return NULL;
}
static void wait_for_decompress_done(void)
static int wait_for_decompress_done(void)
{
int idx, thread_count;
if (!migrate_use_compression()) {
return;
return 0;
}
thread_count = migrate_decompress_threads();
@ -2556,30 +2617,7 @@ static void wait_for_decompress_done(void)
}
}
qemu_mutex_unlock(&decomp_done_lock);
}
return qemu_file_get_error(decomp_file);
static void compress_threads_load_setup(void)
{
int i, thread_count;
if (!migrate_use_compression()) {
return;
}
thread_count = migrate_decompress_threads();
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
qemu_mutex_init(&decomp_done_lock);
qemu_cond_init(&decomp_done_cond);
for (i = 0; i < thread_count; i++) {
qemu_mutex_init(&decomp_param[i].mutex);
qemu_cond_init(&decomp_param[i].cond);
decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
decomp_param[i].done = true;
decomp_param[i].quit = false;
qemu_thread_create(decompress_threads + i, "decompress",
do_data_decompress, decomp_param + i,
QEMU_THREAD_JOINABLE);
}
} }
static void compress_threads_load_cleanup(void)
@ -2591,21 +2629,70 @@ static void compress_threads_load_cleanup(void)
}
thread_count = migrate_decompress_threads();
for (i = 0; i < thread_count; i++) {
/*
* we use it as a indicator which shows if the thread is
* properly init'd or not
*/
if (!decomp_param[i].compbuf) {
break;
}
qemu_mutex_lock(&decomp_param[i].mutex);
decomp_param[i].quit = true;
qemu_cond_signal(&decomp_param[i].cond);
qemu_mutex_unlock(&decomp_param[i].mutex);
}
for (i = 0; i < thread_count; i++) {
if (!decomp_param[i].compbuf) {
break;
}
qemu_thread_join(decompress_threads + i);
qemu_mutex_destroy(&decomp_param[i].mutex);
qemu_cond_destroy(&decomp_param[i].cond);
inflateEnd(&decomp_param[i].stream);
g_free(decomp_param[i].compbuf);
decomp_param[i].compbuf = NULL;
}
g_free(decompress_threads);
g_free(decomp_param);
decompress_threads = NULL;
decomp_param = NULL;
decomp_file = NULL;
}
static int compress_threads_load_setup(QEMUFile *f)
{
int i, thread_count;
if (!migrate_use_compression()) {
return 0;
}
thread_count = migrate_decompress_threads();
decompress_threads = g_new0(QemuThread, thread_count);
decomp_param = g_new0(DecompressParam, thread_count);
qemu_mutex_init(&decomp_done_lock);
qemu_cond_init(&decomp_done_cond);
decomp_file = f;
for (i = 0; i < thread_count; i++) {
if (inflateInit(&decomp_param[i].stream) != Z_OK) {
goto exit;
}
decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
qemu_mutex_init(&decomp_param[i].mutex);
qemu_cond_init(&decomp_param[i].cond);
decomp_param[i].done = true;
decomp_param[i].quit = false;
qemu_thread_create(decompress_threads + i, "decompress",
do_data_decompress, decomp_param + i,
QEMU_THREAD_JOINABLE);
}
return 0;
exit:
compress_threads_load_cleanup();
return -1;
} }
static void decompress_data_with_multi_threads(QEMUFile *f,
@ -2647,8 +2734,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
*/
static int ram_load_setup(QEMUFile *f, void *opaque)
{
if (compress_threads_load_setup(f)) {
return -1;
}
xbzrle_load_setup();
compress_threads_load_setup();
ramblock_recv_map_init();
return 0;
}
@ -2999,7 +3089,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
}
}
wait_for_decompress_done();
ret |= wait_for_decompress_done();
rcu_read_unlock();
trace_ram_load_complete(ret, seq_iter);
return ret;

migration/trace-events

@ -115,6 +115,8 @@ process_incoming_migration_co_end(int ret, int ps) "ret=%d postcopy-state=%d"
process_incoming_migration_co_postcopy_end_main(void) ""
migration_set_incoming_channel(void *ioc, const char *ioctype) "ioc=%p ioctype=%s"
migration_set_outgoing_channel(void *ioc, const char *ioctype, const char *hostname, void *err) "ioc=%p ioctype=%s hostname=%s err=%p"
mark_postcopy_blocktime_begin(uint64_t addr, void *dd, uint32_t time, int cpu, int received) "addr: 0x%" PRIx64 ", dd: %p, time: %u, cpu: %d, already_received: %d"
mark_postcopy_blocktime_end(uint64_t addr, void *dd, uint32_t time, int affected_cpu) "addr: 0x%" PRIx64 ", dd: %p, time: %u, affected_cpu: %d"
# migration/rdma.c
qemu_rdma_accept_incoming_migration(void) ""
@ -193,11 +195,12 @@ postcopy_ram_fault_thread_exit(void) ""
postcopy_ram_fault_thread_fds_core(int baseufd, int quitfd) "ufd: %d quitfd: %d"
postcopy_ram_fault_thread_fds_extra(size_t index, const char *name, int fd) "%zd/%s: %d"
postcopy_ram_fault_thread_quit(void) ""
postcopy_ram_fault_thread_request(uint64_t hostaddr, const char *ramblock, size_t offset) "Request for HVA=0x%" PRIx64 " rb=%s offset=0x%zx"
postcopy_ram_fault_thread_request(uint64_t hostaddr, const char *ramblock, size_t offset, uint32_t pid) "Request for HVA=0x%" PRIx64 " rb=%s offset=0x%zx pid=%u"
postcopy_ram_incoming_cleanup_closeuf(void) ""
postcopy_ram_incoming_cleanup_entry(void) ""
postcopy_ram_incoming_cleanup_exit(void) ""
postcopy_ram_incoming_cleanup_join(void) ""
postcopy_ram_incoming_cleanup_blocktime(uint64_t total) "total blocktime %" PRIu64
postcopy_request_shared_page(const char *sharer, const char *rb, uint64_t rb_offset) "for %s in %s offset 0x%"PRIx64
postcopy_request_shared_page_present(const char *sharer, const char *rb, uint64_t rb_offset) "%s already %s offset 0x%"PRIx64
postcopy_wake_shared(uint64_t client_addr, const char *rb) "at 0x%"PRIx64" in %s"
@ -206,6 +209,7 @@ save_xbzrle_page_skipping(void) ""
save_xbzrle_page_overflow(void) ""
ram_save_iterate_big_wait(uint64_t milliconds, int iterations) "big wait: %" PRIu64 " milliseconds, %d iterations"
ram_load_complete(int ret, uint64_t seq_iter) "exit_code %d seq iteration %" PRIu64
get_mem_fault_cpu_index(int cpu, uint32_t pid) "cpu: %d, pid: %u"
# migration/exec.c
migration_exec_outgoing(const char *cmd) "cmd=%s"

qapi/migration.json

@ -155,6 +155,13 @@
# @error-desc: the human readable error description string, when
# @status is 'failed'. Clients should not attempt to parse the
# error strings. (Since 2.7)
#
# @postcopy-blocktime: total time when all vCPU were blocked during postcopy
# live migration (Since 2.13)
#
# @postcopy-vcpu-blocktime: list of the postcopy blocktime per vCPU (Since 2.13)
#
#
# Since: 0.14.0
##
@ -167,7 +174,9 @@
'*downtime': 'int',
'*setup-time': 'int',
'*cpu-throttle-percentage': 'int',
'*error-desc': 'str'} }
'*error-desc': 'str',
'*postcopy-blocktime' : 'uint32',
'*postcopy-vcpu-blocktime': ['uint32']} }
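# For illustration only (the values below are hypothetical), a destination
# started with the postcopy-blocktime capability enabled would return
# something like:
#
# -> { "execute": "query-migrate" }
# <- { "return": { "status": "completed",
#                  "postcopy-blocktime": 1704,
#                  "postcopy-vcpu-blocktime": [ 312, 512, 880, 96 ] } }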
##
# @query-migrate:
@ -354,16 +363,20 @@
#
# @x-multifd: Use more than one fd for migration (since 2.11)
#
#
# @dirty-bitmaps: If enabled, QEMU will migrate named dirty bitmaps.
# (since 2.12)
#
# @postcopy-blocktime: Calculate downtime for postcopy live migration
# (since 2.13)
#
# Since: 1.2
##
{ 'enum': 'MigrationCapability',
'data': ['xbzrle', 'rdma-pin-all', 'auto-converge', 'zero-blocks',
'compress', 'events', 'postcopy-ram', 'x-colo', 'release-ram',
'block', 'return-path', 'pause-before-switchover', 'x-multifd',
'dirty-bitmaps' ] }
'dirty-bitmaps', 'postcopy-blocktime' ] }
##
# @MigrationCapabilityStatus:

tests/migration-test.c

@ -26,6 +26,7 @@
const unsigned start_address = 1024 * 1024;
const unsigned end_address = 100 * 1024 * 1024;
bool got_stop;
static bool uffd_feature_thread_id;
#if defined(__linux__)
#include <sys/syscall.h>
@ -55,6 +56,7 @@ static bool ufd_version_check(void)
g_test_message("Skipping test: UFFDIO_API failed");
return false;
}
uffd_feature_thread_id = api_struct.features & UFFD_FEATURE_THREAD_ID;
ioctl_mask = (__u64)1 << _UFFDIO_REGISTER |
(__u64)1 << _UFFDIO_UNREGISTER;
@ -223,6 +225,16 @@ static uint64_t get_migration_pass(QTestState *who)
return result;
}
static void read_blocktime(QTestState *who)
{
QDict *rsp, *rsp_return;
rsp = wait_command(who, "{ 'execute': 'query-migrate' }");
rsp_return = qdict_get_qdict(rsp, "return");
g_assert(qdict_haskey(rsp_return, "postcopy-blocktime"));
QDECREF(rsp);
}
static void wait_for_migration_complete(QTestState *who)
{
while (true) {
@ -533,6 +545,7 @@ static void test_migrate(void)
migrate_set_capability(from, "postcopy-ram", "true");
migrate_set_capability(to, "postcopy-ram", "true");
migrate_set_capability(to, "postcopy-blocktime", "true");
/* We want to pick a speed slow enough that the test completes
* quickly, but that it doesn't complete precopy even on a slow
@ -559,6 +572,9 @@ static void test_migrate(void)
wait_for_serial("dest_serial");
wait_for_migration_complete(from);
if (uffd_feature_thread_id) {
read_blocktime(to);
}
g_free(uri);
test_migrate_end(from, to, true);