Fix multifd with big number of channels

-----BEGIN PGP SIGNATURE-----
 
 iQIzBAABCAAdFiEEGJn/jt6/WMzuA0uC9IfvGFhy1yMFAl1vWY8ACgkQ9IfvGFhy
 1yNmbw/+IqmakAyI7NMkKqIvnMbj2D7j8HOk1y2kbX6gKX7hYIX9/8ggdVar5cBT
 /uFcEk1pDuSY4Utx8YXXc/MZYgfO1j5eY9BU6VjaeUB+mxXPZcbX4b6PSuocL/me
 rfVOhQyYn1ZscFs5x+5lypPbWSWSRDz7pbMuaaNdX1USuZr5qEfxhkumZb9SywTD
 eWKnRFvIXznN/88W2nIgR4peneWiMjyqQ8Se8IYFXJ6ZnQshhldgPFA8XvNCiyE1
 JbUWdWNnYVrlDAOkZOSDjMoJ8ozpyv+6qXyM5AEfmSjN6MQH6xbxxp/ysmcFMEzC
 X/HzkU0LlueEo71A1YsUTuuoeWP2cif34Z+jc9u3+LHi95AWDDW5AIEgjbHrLtVS
 i7mUDjZsYfNIN5mRHmD0aRE8U+Pa0fpsDXuL4AM0yKFuCxFchra/lmnJpPESlbL+
 d7n5XQAfbU+w1aeFJwdgSbvOuQhxoWKc+1i+hNsW2FmWfvOosP+JvISKKfMg4It5
 eWdY2ff1yxWynZbi99dYadAgyPiASRpvmgSeJwVYWeeM3rFLySqFYRJm96FI6Ycs
 uOfE3va9D+GnHV57F5rclM8ErmJZF2LHaG1mbEfiNWBCTnsZ76RZ+q/SAt8caun2
 SagBzpwyRsW5If2v36w+AflXXlN7EHC9oahsR0gV7s5GQ0BA9S8=
 =Bszc
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/juanquintela/tags/fail-pull-request' into staging

Fix multifd with big number of channels

# gpg: Signature made Wed 04 Sep 2019 07:28:31 BST
# gpg:                using RSA key 1899FF8EDEBF58CCEE034B82F487EF185872D723
# gpg: Good signature from "Juan Quintela <quintela@redhat.com>" [full]
# gpg:                 aka "Juan Quintela <quintela@trasno.org>" [full]
# Primary key fingerprint: 1899 FF8E DEBF 58CC EE03  4B82 F487 EF18 5872 D723

* remotes/juanquintela/tags/fail-pull-request:
  multifd: Use number of channels as listen backlog
  socket: Add num connections to qio_net_listener_open_sync()
  socket: Add num connections to qio_channel_socket_async()
  socket: Add num connections to qio_channel_socket_sync()
  socket: Add backlog parameter to socket_listen

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2019-09-04 15:38:27 +01:00
commit da9e0c2721
19 changed files with 87 additions and 43 deletions

View File

@ -101,7 +101,7 @@ void nbd_server_start(SocketAddress *addr, const char *tls_creds,
qio_net_listener_set_name(nbd_server->listener,
"nbd-listener");
if (qio_net_listener_open_sync(nbd_server->listener, addr, errp) < 0) {
if (qio_net_listener_open_sync(nbd_server->listener, addr, 1, errp) < 0) {
goto error;
}

View File

@ -1170,7 +1170,7 @@ static int qmp_chardev_open_socket_server(Chardev *chr,
qio_net_listener_set_name(s->listener, name);
g_free(name);
if (qio_net_listener_open_sync(s->listener, s->addr, errp) < 0) {
if (qio_net_listener_open_sync(s->listener, s->addr, 1, errp) < 0) {
object_unref(OBJECT(s->listener));
s->listener = NULL;
return -1;

View File

@ -123,6 +123,7 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
* qio_channel_socket_listen_sync:
* @ioc: the socket channel object
* @addr: the address to listen to
* @num: the expected ammount of connections
* @errp: pointer to a NULL-initialized error object
*
* Attempt to listen to the address @addr. This method
@ -132,12 +133,14 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
*/
int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
SocketAddress *addr,
int num,
Error **errp);
/**
* qio_channel_socket_listen_async:
* @ioc: the socket channel object
* @addr: the address to listen to
* @num: the expected ammount of connections
* @callback: the function to invoke on completion
* @opaque: user data to pass to @callback
* @destroy: the function to free @opaque
@ -153,6 +156,7 @@ int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
*/
void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
SocketAddress *addr,
int num,
QIOTaskFunc callback,
gpointer opaque,
GDestroyNotify destroy,

View File

@ -95,6 +95,7 @@ void qio_net_listener_set_name(QIONetListener *listener,
* qio_net_listener_open_sync:
* @listener: the network listener object
* @addr: the address to listen on
* @num: the amount of expected connections
* @errp: pointer to a NULL initialized error object
*
* Synchronously open a listening connection on all
@ -104,6 +105,7 @@ void qio_net_listener_set_name(QIONetListener *listener,
*/
int qio_net_listener_open_sync(QIONetListener *listener,
SocketAddress *addr,
int num,
Error **errp);
/**

View File

@ -41,7 +41,7 @@ int unix_connect(const char *path, Error **errp);
SocketAddress *socket_parse(const char *str, Error **errp);
int socket_connect(SocketAddress *addr, Error **errp);
int socket_listen(SocketAddress *addr, Error **errp);
int socket_listen(SocketAddress *addr, int num, Error **errp);
void socket_listen_cleanup(int fd, Error **errp);
int socket_dgram(SocketAddress *remote, SocketAddress *local, Error **errp);

View File

@ -197,12 +197,13 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
SocketAddress *addr,
int num,
Error **errp)
{
int fd;
trace_qio_channel_socket_listen_sync(ioc, addr);
fd = socket_listen(addr, errp);
trace_qio_channel_socket_listen_sync(ioc, addr, num);
fd = socket_listen(addr, num, errp);
if (fd < 0) {
trace_qio_channel_socket_listen_fail(ioc);
return -1;
@ -219,14 +220,27 @@ int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
}
struct QIOChannelListenWorkerData {
SocketAddress *addr;
int num; /* amount of expected connections */
};
static void qio_channel_listen_worker_free(gpointer opaque)
{
struct QIOChannelListenWorkerData *data = opaque;
qapi_free_SocketAddress(data->addr);
g_free(data);
}
static void qio_channel_socket_listen_worker(QIOTask *task,
gpointer opaque)
{
QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
SocketAddress *addr = opaque;
struct QIOChannelListenWorkerData *data = opaque;
Error *err = NULL;
qio_channel_socket_listen_sync(ioc, addr, &err);
qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
qio_task_set_error(task, err);
}
@ -234,6 +248,7 @@ static void qio_channel_socket_listen_worker(QIOTask *task,
void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
SocketAddress *addr,
int num,
QIOTaskFunc callback,
gpointer opaque,
GDestroyNotify destroy,
@ -241,16 +256,18 @@ void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
{
QIOTask *task = qio_task_new(
OBJECT(ioc), callback, opaque, destroy);
SocketAddress *addrCopy;
struct QIOChannelListenWorkerData *data;
addrCopy = QAPI_CLONE(SocketAddress, addr);
data = g_new0(struct QIOChannelListenWorkerData, 1);
data->addr = QAPI_CLONE(SocketAddress, addr);
data->num = num;
/* socket_listen() blocks in DNS lookups, so we must use a thread */
trace_qio_channel_socket_listen_async(ioc, addr);
trace_qio_channel_socket_listen_async(ioc, addr, num);
qio_task_run_in_thread(task,
qio_channel_socket_listen_worker,
addrCopy,
(GDestroyNotify)qapi_free_SocketAddress,
data,
qio_channel_listen_worker_free,
context);
}

View File

@ -62,6 +62,7 @@ static gboolean qio_net_listener_channel_func(QIOChannel *ioc,
int qio_net_listener_open_sync(QIONetListener *listener,
SocketAddress *addr,
int num,
Error **errp)
{
QIODNSResolver *resolver = qio_dns_resolver_get_instance();
@ -82,7 +83,7 @@ int qio_net_listener_open_sync(QIONetListener *listener,
for (i = 0; i < nresaddrs; i++) {
QIOChannelSocket *sioc = qio_channel_socket_new();
if (qio_channel_socket_listen_sync(sioc, resaddrs[i],
if (qio_channel_socket_listen_sync(sioc, resaddrs[i], num,
err ? NULL : &err) == 0) {
success = true;

View File

@ -17,8 +17,8 @@ qio_channel_socket_connect_sync(void *ioc, void *addr) "Socket connect sync ioc=
qio_channel_socket_connect_async(void *ioc, void *addr) "Socket connect async ioc=%p addr=%p"
qio_channel_socket_connect_fail(void *ioc) "Socket connect fail ioc=%p"
qio_channel_socket_connect_complete(void *ioc, int fd) "Socket connect complete ioc=%p fd=%d"
qio_channel_socket_listen_sync(void *ioc, void *addr) "Socket listen sync ioc=%p addr=%p"
qio_channel_socket_listen_async(void *ioc, void *addr) "Socket listen async ioc=%p addr=%p"
qio_channel_socket_listen_sync(void *ioc, void *addr, int num) "Socket listen sync ioc=%p addr=%p num=%d"
qio_channel_socket_listen_async(void *ioc, void *addr, int num) "Socket listen async ioc=%p addr=%p num=%d"
qio_channel_socket_listen_fail(void *ioc) "Socket listen fail ioc=%p"
qio_channel_socket_listen_complete(void *ioc, int fd) "Socket listen complete ioc=%p fd=%d"
qio_channel_socket_dgram_sync(void *ioc, void *localAddr, void *remoteAddr) "Socket dgram sync ioc=%p localAddr=%p remoteAddr=%p"

View File

@ -178,10 +178,15 @@ static void socket_start_incoming_migration(SocketAddress *saddr,
{
QIONetListener *listener = qio_net_listener_new();
size_t i;
int num = 1;
qio_net_listener_set_name(listener, "migration-socket-listener");
if (qio_net_listener_open_sync(listener, saddr, errp) < 0) {
if (migrate_use_multifd()) {
num = migrate_multifd_channels();
}
if (qio_net_listener_open_sync(listener, saddr, num, errp) < 0) {
object_unref(OBJECT(listener));
return;
}

View File

@ -1054,7 +1054,7 @@ int main(int argc, char **argv)
server = qio_net_listener_new();
if (socket_activation == 0) {
saddr = nbd_build_socket_address(sockpath, bindto, port);
if (qio_net_listener_open_sync(server, saddr, &local_err) < 0) {
if (qio_net_listener_open_sync(server, saddr, 1, &local_err) < 0) {
object_unref(OBJECT(server));
error_report_err(local_err);
exit(EXIT_FAILURE);

View File

@ -215,7 +215,7 @@ static gboolean ga_channel_open(GAChannel *c, const gchar *path,
return false;
}
fd = socket_listen(addr, &local_err);
fd = socket_listen(addr, 1, &local_err);
qapi_free_SocketAddress(addr);
if (local_err != NULL) {
g_critical("%s", error_get_pretty(local_err));

View File

@ -1005,7 +1005,8 @@ int main(int argc, char **argv)
.u.q_unix.path = socket_path,
};
server_ioc = qio_channel_socket_new();
if (qio_channel_socket_listen_sync(server_ioc, &saddr, &local_err) < 0) {
if (qio_channel_socket_listen_sync(server_ioc, &saddr,
1, &local_err) < 0) {
object_unref(OBJECT(server_ioc));
error_report_err(local_err);
return 1;

View File

@ -667,7 +667,7 @@ char_socket_addr_to_opt_str(SocketAddress *addr, bool fd_pass,
char *optstr;
g_assert(!reconnect);
if (is_listen) {
qio_channel_socket_listen_sync(ioc, addr, &error_abort);
qio_channel_socket_listen_sync(ioc, addr, 1, &error_abort);
} else {
qio_channel_socket_connect_sync(ioc, addr, &error_abort);
}
@ -892,7 +892,7 @@ static void char_socket_client_test(gconstpointer opaque)
*/
ioc = qio_channel_socket_new();
g_assert_nonnull(ioc);
qio_channel_socket_listen_sync(ioc, config->addr, &error_abort);
qio_channel_socket_listen_sync(ioc, config->addr, 1, &error_abort);
addr = qio_channel_socket_get_local_address(ioc, &error_abort);
g_assert_nonnull(addr);

View File

@ -57,7 +57,7 @@ static void test_io_channel_setup_sync(SocketAddress *listen_addr,
QIOChannelSocket *lioc;
lioc = qio_channel_socket_new();
qio_channel_socket_listen_sync(lioc, listen_addr, &error_abort);
qio_channel_socket_listen_sync(lioc, listen_addr, 1, &error_abort);
if (listen_addr->type == SOCKET_ADDRESS_TYPE_INET) {
SocketAddress *laddr = qio_channel_socket_get_local_address(
@ -113,7 +113,7 @@ static void test_io_channel_setup_async(SocketAddress *listen_addr,
lioc = qio_channel_socket_new();
qio_channel_socket_listen_async(
lioc, listen_addr,
lioc, listen_addr, 1,
test_io_channel_complete, &data, NULL, NULL);
g_main_loop_run(data.loop);

View File

@ -93,7 +93,7 @@ static void test_socket_fd_pass_name_good(void)
g_assert_cmpint(fd, !=, mon_fd);
close(fd);
fd = socket_listen(&addr, &error_abort);
fd = socket_listen(&addr, 1, &error_abort);
g_assert_cmpint(fd, !=, -1);
g_assert_cmpint(fd, !=, mon_fd);
close(fd);
@ -124,7 +124,7 @@ static void test_socket_fd_pass_name_bad(void)
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);
fd = socket_listen(&addr, &err);
fd = socket_listen(&addr, 1, &err);
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);
@ -151,7 +151,7 @@ static void test_socket_fd_pass_name_nomon(void)
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);
fd = socket_listen(&addr, &err);
fd = socket_listen(&addr, 1, &err);
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);
@ -174,7 +174,7 @@ static void test_socket_fd_pass_num_good(void)
fd = socket_connect(&addr, &error_abort);
g_assert_cmpint(fd, ==, sfd);
fd = socket_listen(&addr, &error_abort);
fd = socket_listen(&addr, 1, &error_abort);
g_assert_cmpint(fd, ==, sfd);
g_free(addr.u.fd.str);
@ -197,7 +197,7 @@ static void test_socket_fd_pass_num_bad(void)
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);
fd = socket_listen(&addr, &err);
fd = socket_listen(&addr, 1, &err);
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);
@ -220,7 +220,7 @@ static void test_socket_fd_pass_num_nocli(void)
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);
fd = socket_listen(&addr, &err);
fd = socket_listen(&addr, 1, &err);
g_assert_cmpint(fd, ==, -1);
error_free_or_abort(&err);

View File

@ -76,7 +76,7 @@ void *tpm_emu_ctrl_thread(void *data)
QIOChannelSocket *lioc = qio_channel_socket_new();
QIOChannel *ioc;
qio_channel_socket_listen_sync(lioc, s->addr, &error_abort);
qio_channel_socket_listen_sync(lioc, s->addr, 1, &error_abort);
g_mutex_lock(&s->data_mutex);
s->data_cond_signal = true;

View File

@ -3765,7 +3765,7 @@ static int vnc_display_listen(VncDisplay *vd,
qio_net_listener_set_name(vd->listener, "vnc-listen");
for (i = 0; i < nsaddr; i++) {
if (qio_net_listener_open_sync(vd->listener,
saddr[i],
saddr[i], 1,
errp) < 0) {
return -1;
}
@ -3780,7 +3780,7 @@ static int vnc_display_listen(VncDisplay *vd,
qio_net_listener_set_name(vd->wslistener, "vnc-ws-listen");
for (i = 0; i < nwsaddr; i++) {
if (qio_net_listener_open_sync(vd->wslistener,
wsaddr[i],
wsaddr[i], 1,
errp) < 0) {
return -1;
}

View File

@ -31,6 +31,7 @@
#include "qapi/qobject-input-visitor.h"
#include "qapi/qobject-output-visitor.h"
#include "qemu/cutils.h"
#include "trace.h"
#ifndef AI_ADDRCONFIG
# define AI_ADDRCONFIG 0
@ -207,6 +208,7 @@ static int try_bind(int socket, InetSocketAddress *saddr, struct addrinfo *e)
static int inet_listen_saddr(InetSocketAddress *saddr,
int port_offset,
int num,
Error **errp)
{
struct addrinfo ai,*res,*e;
@ -309,7 +311,7 @@ static int inet_listen_saddr(InetSocketAddress *saddr,
goto listen_failed;
}
} else {
if (!listen(slisten, 1)) {
if (!listen(slisten, num)) {
goto listen_ok;
}
if (errno != EADDRINUSE) {
@ -774,6 +776,7 @@ static int vsock_connect_saddr(VsockSocketAddress *vaddr, Error **errp)
}
static int vsock_listen_saddr(VsockSocketAddress *vaddr,
int num,
Error **errp)
{
struct sockaddr_vm svm;
@ -795,7 +798,7 @@ static int vsock_listen_saddr(VsockSocketAddress *vaddr,
return -1;
}
if (listen(slisten, 1) != 0) {
if (listen(slisten, num) != 0) {
error_setg_errno(errp, errno, "Failed to listen on socket");
closesocket(slisten);
return -1;
@ -836,6 +839,7 @@ static int vsock_connect_saddr(VsockSocketAddress *vaddr, Error **errp)
}
static int vsock_listen_saddr(VsockSocketAddress *vaddr,
int num,
Error **errp)
{
vsock_unsupported(errp);
@ -853,6 +857,7 @@ static int vsock_parse(VsockSocketAddress *addr, const char *str,
#ifndef _WIN32
static int unix_listen_saddr(UnixSocketAddress *saddr,
int num,
Error **errp)
{
struct sockaddr_un un;
@ -914,7 +919,7 @@ static int unix_listen_saddr(UnixSocketAddress *saddr,
error_setg_errno(errp, errno, "Failed to bind socket to %s", path);
goto err;
}
if (listen(sock, 1) < 0) {
if (listen(sock, num) < 0) {
error_setg_errno(errp, errno, "Failed to listen on socket");
goto err;
}
@ -981,6 +986,7 @@ static int unix_connect_saddr(UnixSocketAddress *saddr, Error **errp)
#else
static int unix_listen_saddr(UnixSocketAddress *saddr,
int num,
Error **errp)
{
error_setg(errp, "unix sockets are not available on windows");
@ -1004,7 +1010,7 @@ int unix_listen(const char *str, Error **errp)
saddr = g_new0(UnixSocketAddress, 1);
saddr->path = g_strdup(str);
sock = unix_listen_saddr(saddr, errp);
sock = unix_listen_saddr(saddr, 1, errp);
qapi_free_UnixSocketAddress(saddr);
return sock;
}
@ -1061,9 +1067,13 @@ fail:
return NULL;
}
static int socket_get_fd(const char *fdstr, Error **errp)
static int socket_get_fd(const char *fdstr, int num, Error **errp)
{
int fd;
if (num != 1) {
error_setg_errno(errp, EINVAL, "socket_get_fd: too many connections");
return -1;
}
if (cur_mon) {
fd = monitor_get_fd(cur_mon, fdstr, errp);
if (fd < 0) {
@ -1099,7 +1109,7 @@ int socket_connect(SocketAddress *addr, Error **errp)
break;
case SOCKET_ADDRESS_TYPE_FD:
fd = socket_get_fd(addr->u.fd.str, errp);
fd = socket_get_fd(addr->u.fd.str, 1, errp);
break;
case SOCKET_ADDRESS_TYPE_VSOCK:
@ -1112,25 +1122,26 @@ int socket_connect(SocketAddress *addr, Error **errp)
return fd;
}
int socket_listen(SocketAddress *addr, Error **errp)
int socket_listen(SocketAddress *addr, int num, Error **errp)
{
int fd;
trace_socket_listen(num);
switch (addr->type) {
case SOCKET_ADDRESS_TYPE_INET:
fd = inet_listen_saddr(&addr->u.inet, 0, errp);
fd = inet_listen_saddr(&addr->u.inet, 0, num, errp);
break;
case SOCKET_ADDRESS_TYPE_UNIX:
fd = unix_listen_saddr(&addr->u.q_unix, errp);
fd = unix_listen_saddr(&addr->u.q_unix, num, errp);
break;
case SOCKET_ADDRESS_TYPE_FD:
fd = socket_get_fd(addr->u.fd.str, errp);
fd = socket_get_fd(addr->u.fd.str, num, errp);
break;
case SOCKET_ADDRESS_TYPE_VSOCK:
fd = vsock_listen_saddr(&addr->u.vsock, errp);
fd = vsock_listen_saddr(&addr->u.vsock, num, errp);
break;
default:

View File

@ -64,6 +64,9 @@ lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d"
lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d"
lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter"
# qemu-sockets.c
socket_listen(int num) "backlog: %d"
# qemu-thread-common.h
qemu_mutex_lock(void *mutex, const char *file, const int line) "waiting on mutex %p (%s:%d)"
qemu_mutex_locked(void *mutex, const char *file, const int line) "taken mutex %p (%s:%d)"