diff --git a/arch_init.c b/arch_init.c index 4c8fceed95..179c58c852 100644 --- a/arch_init.c +++ b/arch_init.c @@ -316,6 +316,69 @@ static uint64_t migration_dirty_pages; static uint32_t last_version; static bool ram_bulk_stage; +struct CompressParam { + /* To be done */ +}; +typedef struct CompressParam CompressParam; + +static CompressParam *comp_param; +static QemuThread *compress_threads; +static bool quit_comp_thread; + +static void *do_data_compress(void *opaque) +{ + while (!quit_comp_thread) { + + /* To be done */ + + } + + return NULL; +} + +static inline void terminate_compression_threads(void) +{ + quit_comp_thread = true; + + /* To be done */ +} + +void migrate_compress_threads_join(void) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + terminate_compression_threads(); + thread_count = migrate_compress_threads(); + for (i = 0; i < thread_count; i++) { + qemu_thread_join(compress_threads + i); + } + g_free(compress_threads); + g_free(comp_param); + compress_threads = NULL; + comp_param = NULL; +} + +void migrate_compress_threads_create(void) +{ + int i, thread_count; + + if (!migrate_use_compression()) { + return; + } + quit_comp_thread = false; + thread_count = migrate_compress_threads(); + compress_threads = g_new0(QemuThread, thread_count); + comp_param = g_new0(CompressParam, thread_count); + for (i = 0; i < thread_count; i++) { + qemu_thread_create(compress_threads + i, "compress", + do_data_compress, comp_param + i, + QEMU_THREAD_JOINABLE); + } +} + /** * save_page_header: Write page header to wire * @@ -692,6 +755,28 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset, return pages; } +/** + * ram_save_compressed_page: compress the given page and send it to the stream + * + * Returns: Number of pages written. + * + * @f: QEMUFile where to send the data + * @block: block that contains the page we want to send + * @offset: offset inside the block for the page + * @last_stage: if we are at the completion stage + * @bytes_transferred: increase it with the number of transferred bytes + */ +static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block, + ram_addr_t offset, bool last_stage, + uint64_t *bytes_transferred) +{ + int pages = -1; + + /* To be done*/ + + return pages; +} + /** * ram_find_and_save_block: Finds a dirty page and sends it to f * @@ -733,8 +818,13 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage, ram_bulk_stage = false; } } else { - pages = ram_save_page(f, block, offset, last_stage, - bytes_transferred); + if (migrate_use_compression()) { + pages = ram_save_compressed_page(f, block, offset, last_stage, + bytes_transferred); + } else { + pages = ram_save_page(f, block, offset, last_stage, + bytes_transferred); + } /* if page is unmodified, continue to the next */ if (pages > 0) { diff --git a/include/migration/migration.h b/include/migration/migration.h index bf09968d76..a3ebbf6c6e 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -50,6 +50,8 @@ struct MigrationState QemuThread thread; QEMUBH *cleanup_bh; QEMUFile *file; + int compress_thread_count; + int compress_level; int state; MigrationParams params; @@ -104,6 +106,8 @@ bool migration_has_finished(MigrationState *); bool migration_has_failed(MigrationState *); MigrationState *migrate_get_current(void); +void migrate_compress_threads_create(void); +void migrate_compress_threads_join(void); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_transferred(void); uint64_t ram_bytes_total(void); @@ -152,6 +156,10 @@ int64_t migrate_xbzrle_cache_size(void); int64_t xbzrle_cache_resize(int64_t new_size); +bool migrate_use_compression(void); +int migrate_compress_level(void); +int migrate_compress_threads(void); + void ram_control_before_iterate(QEMUFile *f, uint64_t flags); void ram_control_after_iterate(QEMUFile *f, uint64_t flags); void ram_control_load_hook(QEMUFile *f, uint64_t flags); diff --git a/migration/migration.c b/migration/migration.c index bc424907f3..5a8b5a7c74 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -33,6 +33,11 @@ #define BUFFER_DELAY 100 #define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY) +/* Default compression thread count */ +#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 +/*0: means nocompress, 1: best speed, ... 9: best compress ratio */ +#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 + /* Migration XBZRLE default cache size */ #define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024) @@ -52,6 +57,8 @@ MigrationState *migrate_get_current(void) .bandwidth_limit = MAX_THROTTLE, .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, .mbps = -1, + .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, + .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL, }; return ¤t_migration; @@ -305,6 +312,7 @@ static void migrate_fd_cleanup(void *opaque) qemu_thread_join(&s->thread); qemu_mutex_lock_iothread(); + migrate_compress_threads_join(); qemu_fclose(s->file); s->file = NULL; } @@ -390,6 +398,8 @@ static MigrationState *migrate_init(const MigrationParams *params) int64_t bandwidth_limit = s->bandwidth_limit; bool enabled_capabilities[MIGRATION_CAPABILITY_MAX]; int64_t xbzrle_cache_size = s->xbzrle_cache_size; + int compress_level = s->compress_level; + int compress_thread_count = s->compress_thread_count; memcpy(enabled_capabilities, s->enabled_capabilities, sizeof(enabled_capabilities)); @@ -400,6 +410,8 @@ static MigrationState *migrate_init(const MigrationParams *params) sizeof(enabled_capabilities)); s->xbzrle_cache_size = xbzrle_cache_size; + s->compress_level = compress_level; + s->compress_thread_count = compress_thread_count; s->bandwidth_limit = bandwidth_limit; s->state = MIGRATION_STATUS_SETUP; trace_migrate_set_state(MIGRATION_STATUS_SETUP); @@ -587,6 +599,30 @@ bool migrate_zero_blocks(void) return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS]; } +bool migrate_use_compression(void) +{ + /* Disable compression before the patch series are applied */ + return false; +} + +int migrate_compress_level(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->compress_level; +} + +int migrate_compress_threads(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->compress_thread_count; +} + int migrate_use_xbzrle(void) { MigrationState *s; @@ -730,6 +766,7 @@ void migrate_fd_connect(MigrationState *s) /* Notify before starting migration thread */ notifier_list_notify(&migration_state_notifiers, s); + migrate_compress_threads_create(); qemu_thread_create(&s->thread, "migration", migration_thread, s, QEMU_THREAD_JOINABLE); }