migration/multifd: Drop MultiFDSendParams.quit, cleanup error paths
Multifd send side has two fields to indicate error quits: - MultiFDSendParams.quit - &multifd_send_state->exiting Merge them into the global one. The replacement is done by changing all p->quit checks into the global var check. The global check doesn't need any lock. A few more things done on top of this altogether: - multifd_send_terminate_threads() Moving the xchg() of &multifd_send_state->exiting upper, so as to cover the tracepoint, migrate_set_error() and migrate_set_state(). - multifd_send_sync_main() In the 2nd loop, add one more check over the global var to make sure we don't keep the looping if QEMU already decided to quit. - multifd_tls_outgoing_handshake() Use multifd_send_terminate_threads() to set the error state. That has a benefit of updating MigrationState.error to that error too, so we can persist that 1st error we hit in that specific channel. - multifd_new_send_channel_async() Take similar approach like above, drop the migrate_set_error() because multifd_send_terminate_threads() already covers that. Unwrap the helper multifd_new_send_channel_cleanup() along the way; not really needed. Reviewed-by: Fabiano Rosas <farosas@suse.de> Link: https://lore.kernel.org/r/20240202102857.110210-4-peterx@redhat.com Signed-off-by: Peter Xu <peterx@redhat.com>
This commit is contained in:
parent
48c0f5d56f
commit
15f3f21d59
|
@ -372,6 +372,11 @@ struct {
|
||||||
MultiFDMethods *ops;
|
MultiFDMethods *ops;
|
||||||
} *multifd_send_state;
|
} *multifd_send_state;
|
||||||
|
|
||||||
|
static bool multifd_send_should_exit(void)
|
||||||
|
{
|
||||||
|
return qatomic_read(&multifd_send_state->exiting);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The migration thread can wait on either of the two semaphores. This
|
* The migration thread can wait on either of the two semaphores. This
|
||||||
* function can be used to kick the main thread out of waiting on either of
|
* function can be used to kick the main thread out of waiting on either of
|
||||||
|
@ -409,7 +414,7 @@ static int multifd_send_pages(void)
|
||||||
MultiFDSendParams *p = NULL; /* make happy gcc */
|
MultiFDSendParams *p = NULL; /* make happy gcc */
|
||||||
MultiFDPages_t *pages = multifd_send_state->pages;
|
MultiFDPages_t *pages = multifd_send_state->pages;
|
||||||
|
|
||||||
if (qatomic_read(&multifd_send_state->exiting)) {
|
if (multifd_send_should_exit()) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -421,14 +426,11 @@ static int multifd_send_pages(void)
|
||||||
*/
|
*/
|
||||||
next_channel %= migrate_multifd_channels();
|
next_channel %= migrate_multifd_channels();
|
||||||
for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
|
for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
|
||||||
p = &multifd_send_state->params[i];
|
if (multifd_send_should_exit()) {
|
||||||
|
|
||||||
qemu_mutex_lock(&p->mutex);
|
|
||||||
if (p->quit) {
|
|
||||||
error_report("%s: channel %d has already quit!", __func__, i);
|
|
||||||
qemu_mutex_unlock(&p->mutex);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
p = &multifd_send_state->params[i];
|
||||||
|
qemu_mutex_lock(&p->mutex);
|
||||||
if (!p->pending_job) {
|
if (!p->pending_job) {
|
||||||
p->pending_job++;
|
p->pending_job++;
|
||||||
next_channel = (i + 1) % migrate_multifd_channels();
|
next_channel = (i + 1) % migrate_multifd_channels();
|
||||||
|
@ -483,6 +485,16 @@ static void multifd_send_terminate_threads(Error *err)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We don't want to exit each threads twice. Depending on where
|
||||||
|
* we get the error, or if there are two independent errors in two
|
||||||
|
* threads at the same time, we can end calling this function
|
||||||
|
* twice.
|
||||||
|
*/
|
||||||
|
if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
trace_multifd_send_terminate_threads(err != NULL);
|
trace_multifd_send_terminate_threads(err != NULL);
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -497,26 +509,13 @@ static void multifd_send_terminate_threads(Error *err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* We don't want to exit each threads twice. Depending on where
|
|
||||||
* we get the error, or if there are two independent errors in two
|
|
||||||
* threads at the same time, we can end calling this function
|
|
||||||
* twice.
|
|
||||||
*/
|
|
||||||
if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (i = 0; i < migrate_multifd_channels(); i++) {
|
for (i = 0; i < migrate_multifd_channels(); i++) {
|
||||||
MultiFDSendParams *p = &multifd_send_state->params[i];
|
MultiFDSendParams *p = &multifd_send_state->params[i];
|
||||||
|
|
||||||
qemu_mutex_lock(&p->mutex);
|
|
||||||
p->quit = true;
|
|
||||||
qemu_sem_post(&p->sem);
|
qemu_sem_post(&p->sem);
|
||||||
if (p->c) {
|
if (p->c) {
|
||||||
qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
|
qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
|
||||||
}
|
}
|
||||||
qemu_mutex_unlock(&p->mutex);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -615,16 +614,13 @@ int multifd_send_sync_main(void)
|
||||||
for (i = 0; i < migrate_multifd_channels(); i++) {
|
for (i = 0; i < migrate_multifd_channels(); i++) {
|
||||||
MultiFDSendParams *p = &multifd_send_state->params[i];
|
MultiFDSendParams *p = &multifd_send_state->params[i];
|
||||||
|
|
||||||
trace_multifd_send_sync_main_signal(p->id);
|
if (multifd_send_should_exit()) {
|
||||||
|
|
||||||
qemu_mutex_lock(&p->mutex);
|
|
||||||
|
|
||||||
if (p->quit) {
|
|
||||||
error_report("%s: channel %d has already quit", __func__, i);
|
|
||||||
qemu_mutex_unlock(&p->mutex);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trace_multifd_send_sync_main_signal(p->id);
|
||||||
|
|
||||||
|
qemu_mutex_lock(&p->mutex);
|
||||||
p->packet_num = multifd_send_state->packet_num++;
|
p->packet_num = multifd_send_state->packet_num++;
|
||||||
p->flags |= MULTIFD_FLAG_SYNC;
|
p->flags |= MULTIFD_FLAG_SYNC;
|
||||||
p->pending_job++;
|
p->pending_job++;
|
||||||
|
@ -634,6 +630,10 @@ int multifd_send_sync_main(void)
|
||||||
for (i = 0; i < migrate_multifd_channels(); i++) {
|
for (i = 0; i < migrate_multifd_channels(); i++) {
|
||||||
MultiFDSendParams *p = &multifd_send_state->params[i];
|
MultiFDSendParams *p = &multifd_send_state->params[i];
|
||||||
|
|
||||||
|
if (multifd_send_should_exit()) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
qemu_sem_wait(&multifd_send_state->channels_ready);
|
qemu_sem_wait(&multifd_send_state->channels_ready);
|
||||||
trace_multifd_send_sync_main_wait(p->id);
|
trace_multifd_send_sync_main_wait(p->id);
|
||||||
qemu_sem_wait(&p->sem_sync);
|
qemu_sem_wait(&p->sem_sync);
|
||||||
|
@ -671,7 +671,7 @@ static void *multifd_send_thread(void *opaque)
|
||||||
qemu_sem_post(&multifd_send_state->channels_ready);
|
qemu_sem_post(&multifd_send_state->channels_ready);
|
||||||
qemu_sem_wait(&p->sem);
|
qemu_sem_wait(&p->sem);
|
||||||
|
|
||||||
if (qatomic_read(&multifd_send_state->exiting)) {
|
if (multifd_send_should_exit()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
qemu_mutex_lock(&p->mutex);
|
qemu_mutex_lock(&p->mutex);
|
||||||
|
@ -786,12 +786,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
|
||||||
|
|
||||||
trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
|
trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
|
||||||
|
|
||||||
migrate_set_error(migrate_get_current(), err);
|
multifd_send_terminate_threads(err);
|
||||||
/*
|
|
||||||
* Error happen, mark multifd_send_thread status as 'quit' although it
|
|
||||||
* is not created, and then tell who pay attention to me.
|
|
||||||
*/
|
|
||||||
p->quit = true;
|
|
||||||
multifd_send_kick_main(p);
|
multifd_send_kick_main(p);
|
||||||
error_free(err);
|
error_free(err);
|
||||||
}
|
}
|
||||||
|
@ -857,22 +852,6 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
|
|
||||||
QIOChannel *ioc, Error *err)
|
|
||||||
{
|
|
||||||
migrate_set_error(migrate_get_current(), err);
|
|
||||||
/* Error happen, we need to tell who pay attention to me */
|
|
||||||
multifd_send_kick_main(p);
|
|
||||||
/*
|
|
||||||
* Although multifd_send_thread is not created, but main migration
|
|
||||||
* thread need to judge whether it is running, so we need to mark
|
|
||||||
* its status.
|
|
||||||
*/
|
|
||||||
p->quit = true;
|
|
||||||
object_unref(OBJECT(ioc));
|
|
||||||
error_free(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
|
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
|
||||||
{
|
{
|
||||||
MultiFDSendParams *p = opaque;
|
MultiFDSendParams *p = opaque;
|
||||||
|
@ -889,7 +868,10 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
|
||||||
}
|
}
|
||||||
|
|
||||||
trace_multifd_new_send_channel_async_error(p->id, local_err);
|
trace_multifd_new_send_channel_async_error(p->id, local_err);
|
||||||
multifd_new_send_channel_cleanup(p, ioc, local_err);
|
multifd_send_terminate_threads(local_err);
|
||||||
|
multifd_send_kick_main(p);
|
||||||
|
object_unref(OBJECT(ioc));
|
||||||
|
error_free(local_err);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void multifd_new_send_channel_create(gpointer opaque)
|
static void multifd_new_send_channel_create(gpointer opaque)
|
||||||
|
@ -921,7 +903,6 @@ int multifd_save_setup(Error **errp)
|
||||||
qemu_mutex_init(&p->mutex);
|
qemu_mutex_init(&p->mutex);
|
||||||
qemu_sem_init(&p->sem, 0);
|
qemu_sem_init(&p->sem, 0);
|
||||||
qemu_sem_init(&p->sem_sync, 0);
|
qemu_sem_init(&p->sem_sync, 0);
|
||||||
p->quit = false;
|
|
||||||
p->pending_job = 0;
|
p->pending_job = 0;
|
||||||
p->id = i;
|
p->id = i;
|
||||||
p->pages = multifd_pages_init(page_count);
|
p->pages = multifd_pages_init(page_count);
|
||||||
|
|
|
@ -95,8 +95,6 @@ typedef struct {
|
||||||
QemuMutex mutex;
|
QemuMutex mutex;
|
||||||
/* is this channel thread running */
|
/* is this channel thread running */
|
||||||
bool running;
|
bool running;
|
||||||
/* should this thread finish */
|
|
||||||
bool quit;
|
|
||||||
/* multifd flags for each packet */
|
/* multifd flags for each packet */
|
||||||
uint32_t flags;
|
uint32_t flags;
|
||||||
/* global number of generated multifd packets */
|
/* global number of generated multifd packets */
|
||||||
|
|
Loading…
Reference in New Issue