thread-pool: avoid passing the pool parameter every time
thread_pool_submit_aio() is always called on a pool taken from qemu_get_current_aio_context(), and that is the only intended use: each pool runs only in the same thread that is submitting work to it, it can't run anywhere else. Therefore simplify the thread_pool_submit* API and remove the ThreadPool function parameter. Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com> Message-Id: <20230203131731.851116-5-eesposit@redhat.com> Reviewed-by: Kevin Wolf <kwolf@redhat.com> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
This commit is contained in:
parent
0fdb73112b
commit
aef04fc790
@ -100,8 +100,6 @@ bool tpm_backend_had_startup_error(TPMBackend *s)
|
||||
|
||||
void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
|
||||
{
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
|
||||
if (s->cmd != NULL) {
|
||||
error_report("There is a TPM request pending");
|
||||
return;
|
||||
@ -109,7 +107,7 @@ void tpm_backend_deliver_request(TPMBackend *s, TPMBackendCmd *cmd)
|
||||
|
||||
s->cmd = cmd;
|
||||
object_ref(OBJECT(s));
|
||||
thread_pool_submit_aio(pool, tpm_backend_worker_thread, s,
|
||||
thread_pool_submit_aio(tpm_backend_worker_thread, s,
|
||||
tpm_backend_request_completed, s);
|
||||
}
|
||||
|
||||
|
@ -2042,9 +2042,7 @@ out:
|
||||
|
||||
static int coroutine_fn raw_thread_pool_submit(ThreadPoolFunc func, void *arg)
|
||||
{
|
||||
/* @bs can be NULL, bdrv_get_aio_context() returns the main context then */
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
|
||||
return thread_pool_submit_co(pool, func, arg);
|
||||
return thread_pool_submit_co(func, arg);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -153,7 +153,6 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
|
||||
BlockCompletionFunc *cb, void *opaque, int type)
|
||||
{
|
||||
RawWin32AIOData *acb = g_new(RawWin32AIOData, 1);
|
||||
ThreadPool *pool;
|
||||
|
||||
acb->bs = bs;
|
||||
acb->hfile = hfile;
|
||||
@ -168,8 +167,7 @@ static BlockAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
|
||||
acb->aio_offset = offset;
|
||||
|
||||
trace_file_paio_submit(acb, opaque, offset, count, type);
|
||||
pool = aio_get_thread_pool(qemu_get_current_aio_context());
|
||||
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
|
||||
return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
|
||||
}
|
||||
|
||||
int qemu_ftruncate64(int fd, int64_t length)
|
||||
|
@ -43,7 +43,6 @@ qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
|
||||
{
|
||||
int ret;
|
||||
BDRVQcow2State *s = bs->opaque;
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_current_aio_context());
|
||||
|
||||
qemu_co_mutex_lock(&s->lock);
|
||||
while (s->nb_threads >= QCOW2_MAX_THREADS) {
|
||||
@ -52,7 +51,7 @@ qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
|
||||
s->nb_threads++;
|
||||
qemu_co_mutex_unlock(&s->lock);
|
||||
|
||||
ret = thread_pool_submit_co(pool, func, arg);
|
||||
ret = thread_pool_submit_co(func, arg);
|
||||
|
||||
qemu_co_mutex_lock(&s->lock);
|
||||
s->nb_threads--;
|
||||
|
@ -41,6 +41,5 @@ static int coroutine_enter_func(void *arg)
|
||||
void co_run_in_worker_bh(void *opaque)
|
||||
{
|
||||
Coroutine *co = opaque;
|
||||
thread_pool_submit_aio(aio_get_thread_pool(qemu_get_aio_context()),
|
||||
coroutine_enter_func, co, coroutine_enter_cb, co);
|
||||
thread_pool_submit_aio(coroutine_enter_func, co, coroutine_enter_cb, co);
|
||||
}
|
||||
|
@ -496,7 +496,6 @@ static int spapr_nvdimm_flush_post_load(void *opaque, int version_id)
|
||||
{
|
||||
SpaprNVDIMMDevice *s_nvdimm = (SpaprNVDIMMDevice *)opaque;
|
||||
SpaprNVDIMMDeviceFlushState *state;
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
HostMemoryBackend *backend = MEMORY_BACKEND(PC_DIMM(s_nvdimm)->hostmem);
|
||||
bool is_pmem = object_property_get_bool(OBJECT(backend), "pmem", NULL);
|
||||
bool pmem_override = object_property_get_bool(OBJECT(s_nvdimm),
|
||||
@ -517,7 +516,7 @@ static int spapr_nvdimm_flush_post_load(void *opaque, int version_id)
|
||||
}
|
||||
|
||||
QLIST_FOREACH(state, &s_nvdimm->pending_nvdimm_flush_states, node) {
|
||||
thread_pool_submit_aio(pool, flush_worker_cb, state,
|
||||
thread_pool_submit_aio(flush_worker_cb, state,
|
||||
spapr_nvdimm_flush_completion_cb, state);
|
||||
}
|
||||
|
||||
@ -664,7 +663,6 @@ static target_ulong h_scm_flush(PowerPCCPU *cpu, SpaprMachineState *spapr,
|
||||
PCDIMMDevice *dimm;
|
||||
HostMemoryBackend *backend = NULL;
|
||||
SpaprNVDIMMDeviceFlushState *state;
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
int fd;
|
||||
|
||||
if (!drc || !drc->dev ||
|
||||
@ -699,7 +697,7 @@ static target_ulong h_scm_flush(PowerPCCPU *cpu, SpaprMachineState *spapr,
|
||||
|
||||
state->drcidx = drc_index;
|
||||
|
||||
thread_pool_submit_aio(pool, flush_worker_cb, state,
|
||||
thread_pool_submit_aio(flush_worker_cb, state,
|
||||
spapr_nvdimm_flush_completion_cb, state);
|
||||
|
||||
continue_token = state->continue_token;
|
||||
|
@ -70,7 +70,6 @@ static void virtio_pmem_flush(VirtIODevice *vdev, VirtQueue *vq)
|
||||
VirtIODeviceRequest *req_data;
|
||||
VirtIOPMEM *pmem = VIRTIO_PMEM(vdev);
|
||||
HostMemoryBackend *backend = MEMORY_BACKEND(pmem->memdev);
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
|
||||
trace_virtio_pmem_flush_request();
|
||||
req_data = virtqueue_pop(vq, sizeof(VirtIODeviceRequest));
|
||||
@ -88,7 +87,7 @@ static void virtio_pmem_flush(VirtIODevice *vdev, VirtQueue *vq)
|
||||
req_data->fd = memory_region_get_fd(&backend->mr);
|
||||
req_data->pmem = pmem;
|
||||
req_data->vdev = vdev;
|
||||
thread_pool_submit_aio(pool, worker_cb, req_data, done_cb, req_data);
|
||||
thread_pool_submit_aio(worker_cb, req_data, done_cb, req_data);
|
||||
}
|
||||
|
||||
static void virtio_pmem_get_config(VirtIODevice *vdev, uint8_t *config)
|
||||
|
@ -33,12 +33,10 @@ void thread_pool_free(ThreadPool *pool);
|
||||
* thread_pool_submit* API: submit I/O requests in the thread's
|
||||
* current AioContext.
|
||||
*/
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
|
||||
ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque);
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
|
||||
ThreadPoolFunc *func, void *arg);
|
||||
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque);
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
|
||||
void thread_pool_submit(ThreadPoolFunc *func, void *arg);
|
||||
|
||||
void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
|
||||
|
||||
|
@ -51,7 +51,6 @@ static int pr_manager_worker(void *opaque)
|
||||
int coroutine_fn pr_manager_execute(PRManager *pr_mgr, AioContext *ctx, int fd,
|
||||
struct sg_io_hdr *hdr)
|
||||
{
|
||||
ThreadPool *pool = aio_get_thread_pool(ctx);
|
||||
PRManagerData data = {
|
||||
.pr_mgr = pr_mgr,
|
||||
.fd = fd,
|
||||
@ -62,7 +61,7 @@ int coroutine_fn pr_manager_execute(PRManager *pr_mgr, AioContext *ctx, int fd,
|
||||
|
||||
/* The matching object_unref is in pr_manager_worker. */
|
||||
object_ref(OBJECT(pr_mgr));
|
||||
return thread_pool_submit_co(pool, pr_manager_worker, &data);
|
||||
return thread_pool_submit_co(pr_manager_worker, &data);
|
||||
}
|
||||
|
||||
bool pr_manager_is_connected(PRManager *pr_mgr)
|
||||
|
@ -180,7 +180,6 @@ static int do_sgio_worker(void *opaque)
|
||||
static int do_sgio(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
uint8_t *buf, int *sz, int dir)
|
||||
{
|
||||
ThreadPool *pool = aio_get_thread_pool(qemu_get_aio_context());
|
||||
int r;
|
||||
|
||||
PRHelperSGIOData data = {
|
||||
@ -192,7 +191,7 @@ static int do_sgio(int fd, const uint8_t *cdb, uint8_t *sense,
|
||||
.dir = dir,
|
||||
};
|
||||
|
||||
r = thread_pool_submit_co(pool, do_sgio_worker, &data);
|
||||
r = thread_pool_submit_co(do_sgio_worker, &data);
|
||||
*sz = data.sz;
|
||||
return r;
|
||||
}
|
||||
|
@ -8,7 +8,6 @@
|
||||
#include "qemu/main-loop.h"
|
||||
|
||||
static AioContext *ctx;
|
||||
static ThreadPool *pool;
|
||||
static int active;
|
||||
|
||||
typedef struct {
|
||||
@ -47,7 +46,7 @@ static void done_cb(void *opaque, int ret)
|
||||
static void test_submit(void)
|
||||
{
|
||||
WorkerTestData data = { .n = 0 };
|
||||
thread_pool_submit(pool, worker_cb, &data);
|
||||
thread_pool_submit(worker_cb, &data);
|
||||
while (data.n == 0) {
|
||||
aio_poll(ctx, true);
|
||||
}
|
||||
@ -57,7 +56,7 @@ static void test_submit(void)
|
||||
static void test_submit_aio(void)
|
||||
{
|
||||
WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
|
||||
data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
|
||||
data.aiocb = thread_pool_submit_aio(worker_cb, &data,
|
||||
done_cb, &data);
|
||||
|
||||
/* The callbacks are not called until after the first wait. */
|
||||
@ -78,7 +77,7 @@ static void co_test_cb(void *opaque)
|
||||
active = 1;
|
||||
data->n = 0;
|
||||
data->ret = -EINPROGRESS;
|
||||
thread_pool_submit_co(pool, worker_cb, data);
|
||||
thread_pool_submit_co(worker_cb, data);
|
||||
|
||||
/* The test continues in test_submit_co, after qemu_coroutine_enter... */
|
||||
|
||||
@ -122,7 +121,7 @@ static void test_submit_many(void)
|
||||
for (i = 0; i < 100; i++) {
|
||||
data[i].n = 0;
|
||||
data[i].ret = -EINPROGRESS;
|
||||
thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
|
||||
thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
|
||||
}
|
||||
|
||||
active = 100;
|
||||
@ -150,7 +149,7 @@ static void do_test_cancel(bool sync)
|
||||
for (i = 0; i < 100; i++) {
|
||||
data[i].n = 0;
|
||||
data[i].ret = -EINPROGRESS;
|
||||
data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
|
||||
data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
|
||||
done_cb, &data[i]);
|
||||
}
|
||||
|
||||
@ -235,7 +234,6 @@ int main(int argc, char **argv)
|
||||
{
|
||||
qemu_init_main_loop(&error_abort);
|
||||
ctx = qemu_get_current_aio_context();
|
||||
pool = aio_get_thread_pool(ctx);
|
||||
|
||||
g_test_init(&argc, &argv, NULL);
|
||||
g_test_add_func("/thread-pool/submit", test_submit);
|
||||
|
@ -241,11 +241,12 @@ static const AIOCBInfo thread_pool_aiocb_info = {
|
||||
.get_aio_context = thread_pool_get_aio_context,
|
||||
};
|
||||
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
|
||||
ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque)
|
||||
BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
|
||||
BlockCompletionFunc *cb, void *opaque)
|
||||
{
|
||||
ThreadPoolElement *req;
|
||||
AioContext *ctx = qemu_get_current_aio_context();
|
||||
ThreadPool *pool = aio_get_thread_pool(ctx);
|
||||
|
||||
/* Assert that the thread submitting work is the same running the pool */
|
||||
assert(pool->ctx == qemu_get_current_aio_context());
|
||||
@ -283,19 +284,18 @@ static void thread_pool_co_cb(void *opaque, int ret)
|
||||
aio_co_wake(co->co);
|
||||
}
|
||||
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
|
||||
void *arg)
|
||||
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
|
||||
{
|
||||
ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
|
||||
assert(qemu_in_coroutine());
|
||||
thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
|
||||
thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
|
||||
qemu_coroutine_yield();
|
||||
return tpc.ret;
|
||||
}
|
||||
|
||||
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
|
||||
void thread_pool_submit(ThreadPoolFunc *func, void *arg)
|
||||
{
|
||||
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
|
||||
thread_pool_submit_aio(func, arg, NULL, NULL);
|
||||
}
|
||||
|
||||
void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
|
||||
|
Loading…
Reference in New Issue
Block a user