-----BEGIN PGP SIGNATURE-----

iQIcBAABCAAGBQJan8v5AAoJEL6G67QVEE/ffDwP/1iXeZZXTvyztiuQaP495JQl
 1e9FChjhB+2eeLuBTZHZrxIM89aoBCHtQ8sPsWDShrdSy5REROr6CX696vyaI/wK
 R72LQ+FuyOmeIASOdAqC7xETSP8f88VvfI4YOtOYHTmIMvMh+Uz1DJAgqI9E5Zbp
 sX2BIg43qqJRaRimyihRpEkmvuV/djd4FpBC1G8r8+tv3jH2tJJ9dhhLwAUkJf34
 j3IIEqw3jhl13K1RDCgAqYovcD5sRZMgoaoD+JXxU+r0IBw5Dz9a9sQ19RzAZNaK
 sbjdgTCG7yQ0JOMOKoRDcHdCQvXLE2XhyPHvbrZ/d4nTcpZN4H9EpuDBL7PrR6kp
 c3q5Uff1UC2EBc3frJ+si27B3b/RKdDXXy6Ocb/BL2aivcVwjZlhuHPrfpR97NEL
 ONICf9OW3j1y6vHRTUMmpkEG71MHD26S6oZKljuGAgc8mjwTI7XwiU66R1+0R7b8
 YuXoHthPlWrKQgnVO7V6c/1YzV7mxjRqvPxvR6Flox/P0wxKPzC1K7UH7qxlhJEW
 xydPiBoaXZySa5M1eScwxaU2bap+zeBQTkjmJ2wpqH6OPwzKwBNLYHus78bIrQFI
 vVq8ukZeJYvLlvu4lhoQoSZM1Im6GFI2Pj65sovDuJqZ+clzqJdmI/1OzZimqSgi
 bIX+8GhilfZR+Gji3Sb1
 =dBv4
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/berrange/tags/qio-next-pull-request' into staging

# gpg: Signature made Wed 07 Mar 2018 11:24:41 GMT
# gpg:                using RSA key BE86EBB415104FDF
# gpg: Good signature from "Daniel P. Berrange <dan@berrange.com>"
# gpg:                 aka "Daniel P. Berrange <berrange@redhat.com>"
# Primary key fingerprint: DAF3 A6FD B26B 6291 2D0E  8E3F BE86 EBB4 1510 4FDF

* remotes/berrange/tags/qio-next-pull-request:
  qio: non-default context for TLS handshake
  qio: non-default context for async conn
  qio: non-default context for threaded qtask
  qio: store gsources for net listeners
  qio: introduce qio_channel_add_watch_{full|source}
  qio: rename qio_task_thread_result

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2018-03-08 11:26:13 +00:00
commit 3ef91576b9
21 changed files with 242 additions and 65 deletions

View File

@ -707,6 +707,7 @@ static void tcp_chr_tls_init(Chardev *chr)
qio_channel_tls_handshake(tioc, qio_channel_tls_handshake(tioc,
tcp_chr_tls_handshake, tcp_chr_tls_handshake,
chr, chr,
NULL,
NULL); NULL);
} }
@ -871,7 +872,7 @@ static gboolean socket_reconnect_timeout(gpointer opaque)
tcp_chr_set_client_ioc_name(chr, sioc); tcp_chr_set_client_ioc_name(chr, sioc);
qio_channel_socket_connect_async(sioc, s->addr, qio_channel_socket_connect_async(sioc, s->addr,
qemu_chr_socket_connected, qemu_chr_socket_connected,
chr, NULL); chr, NULL, NULL);
return false; return false;
} }
@ -955,7 +956,7 @@ static void qmp_chardev_open_socket(Chardev *chr,
tcp_chr_set_client_ioc_name(chr, sioc); tcp_chr_set_client_ioc_name(chr, sioc);
qio_channel_socket_connect_async(sioc, s->addr, qio_channel_socket_connect_async(sioc, s->addr,
qemu_chr_socket_connected, qemu_chr_socket_connected,
chr, NULL); chr, NULL, NULL);
} else { } else {
if (s->is_listen) { if (s->is_listen) {
char *name; char *name;

View File

@ -101,6 +101,8 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
* @callback: the function to invoke on completion * @callback: the function to invoke on completion
* @opaque: user data to pass to @callback * @opaque: user data to pass to @callback
* @destroy: the function to free @opaque * @destroy: the function to free @opaque
* @context: the context to run the async task. If %NULL, the default
* context will be used.
* *
* Attempt to connect to the address @addr. This method * Attempt to connect to the address @addr. This method
* will run in the background so the caller will regain * will run in the background so the caller will regain
@ -113,7 +115,8 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
SocketAddress *addr, SocketAddress *addr,
QIOTaskFunc callback, QIOTaskFunc callback,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy); GDestroyNotify destroy,
GMainContext *context);
/** /**
@ -138,6 +141,8 @@ int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
* @callback: the function to invoke on completion * @callback: the function to invoke on completion
* @opaque: user data to pass to @callback * @opaque: user data to pass to @callback
* @destroy: the function to free @opaque * @destroy: the function to free @opaque
* @context: the context to run the async task. If %NULL, the default
* context will be used.
* *
* Attempt to listen to the address @addr. This method * Attempt to listen to the address @addr. This method
* will run in the background so the caller will regain * will run in the background so the caller will regain
@ -150,7 +155,8 @@ void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
SocketAddress *addr, SocketAddress *addr,
QIOTaskFunc callback, QIOTaskFunc callback,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy); GDestroyNotify destroy,
GMainContext *context);
/** /**
@ -179,6 +185,8 @@ int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
* @callback: the function to invoke on completion * @callback: the function to invoke on completion
* @opaque: user data to pass to @callback * @opaque: user data to pass to @callback
* @destroy: the function to free @opaque * @destroy: the function to free @opaque
* @context: the context to run the async task. If %NULL, the default
* context will be used.
* *
* Attempt to initialize a datagram socket bound to * Attempt to initialize a datagram socket bound to
* @localAddr and communicating with peer @remoteAddr. * @localAddr and communicating with peer @remoteAddr.
@ -194,7 +202,8 @@ void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
SocketAddress *remoteAddr, SocketAddress *remoteAddr,
QIOTaskFunc callback, QIOTaskFunc callback,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy); GDestroyNotify destroy,
GMainContext *context);
/** /**

View File

@ -116,6 +116,8 @@ qio_channel_tls_new_client(QIOChannel *master,
* @func: the callback to invoke when completed * @func: the callback to invoke when completed
* @opaque: opaque data to pass to @func * @opaque: opaque data to pass to @func
* @destroy: optional callback to free @opaque * @destroy: optional callback to free @opaque
* @context: the context that TLS handshake will run with. If %NULL,
* the default context will be used
* *
* Perform the TLS session handshake. This method * Perform the TLS session handshake. This method
* will return immediately and the handshake will * will return immediately and the handshake will
@ -126,7 +128,8 @@ qio_channel_tls_new_client(QIOChannel *master,
void qio_channel_tls_handshake(QIOChannelTLS *ioc, void qio_channel_tls_handshake(QIOChannelTLS *ioc,
QIOTaskFunc func, QIOTaskFunc func,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy); GDestroyNotify destroy,
GMainContext *context);
/** /**
* qio_channel_tls_get_session: * qio_channel_tls_get_session:

View File

@ -648,6 +648,50 @@ guint qio_channel_add_watch(QIOChannel *ioc,
gpointer user_data, gpointer user_data,
GDestroyNotify notify); GDestroyNotify notify);
/**
* qio_channel_add_watch_full:
* @ioc: the channel object
* @condition: the I/O condition to monitor
* @func: callback to invoke when the source becomes ready
* @user_data: opaque data to pass to @func
* @notify: callback to free @user_data
* @context: the context to run the watch source
*
* Similar as qio_channel_add_watch(), but allows to specify context
* to run the watch source.
*
* Returns: the source ID
*/
guint qio_channel_add_watch_full(QIOChannel *ioc,
GIOCondition condition,
QIOChannelFunc func,
gpointer user_data,
GDestroyNotify notify,
GMainContext *context);
/**
* qio_channel_add_watch_source:
* @ioc: the channel object
* @condition: the I/O condition to monitor
* @func: callback to invoke when the source becomes ready
* @user_data: opaque data to pass to @func
* @notify: callback to free @user_data
* @context: gcontext to bind the source to
*
* Similar as qio_channel_add_watch(), but allows to specify context
* to run the watch source, meanwhile return the GSource object
* instead of tag ID, with the GSource referenced already.
*
* Note: callers is responsible to unref the source when not needed.
*
* Returns: the source pointer
*/
GSource *qio_channel_add_watch_source(QIOChannel *ioc,
GIOCondition condition,
QIOChannelFunc func,
gpointer user_data,
GDestroyNotify notify,
GMainContext *context);
/** /**
* qio_channel_attach_aio_context: * qio_channel_attach_aio_context:

View File

@ -53,7 +53,7 @@ struct QIONetListener {
char *name; char *name;
QIOChannelSocket **sioc; QIOChannelSocket **sioc;
gulong *io_tag; GSource **io_source;
size_t nsioc; size_t nsioc;
bool connected; bool connected;
@ -119,6 +119,26 @@ int qio_net_listener_open_sync(QIONetListener *listener,
void qio_net_listener_add(QIONetListener *listener, void qio_net_listener_add(QIONetListener *listener,
QIOChannelSocket *sioc); QIOChannelSocket *sioc);
/**
* qio_net_listener_set_client_func_full:
* @listener: the network listener object
* @func: the callback function
* @data: opaque data to pass to @func
* @notify: callback to free @data
* @context: the context that the sources will be bound to. If %NULL,
* the default context will be used.
*
* Register @func to be invoked whenever a new client
* connects to the listener. @func will be invoked
* passing in the QIOChannelSocket instance for the
* client.
*/
void qio_net_listener_set_client_func_full(QIONetListener *listener,
QIONetListenerClientFunc func,
gpointer data,
GDestroyNotify notify,
GMainContext *context);
/** /**
* qio_net_listener_set_client_func: * qio_net_listener_set_client_func:
* @listener: the network listener object * @listener: the network listener object
@ -126,10 +146,8 @@ void qio_net_listener_add(QIONetListener *listener,
* @data: opaque data to pass to @func * @data: opaque data to pass to @func
* @notify: callback to free @data * @notify: callback to free @data
* *
* Register @func to be invoked whenever a new client * Wrapper of qio_net_listener_set_client_func_full(), only that the
* connects to the listener. @func will be invoked * sources will always be bound to default main context.
* passing in the QIOChannelSocket instance for the
* client.
*/ */
void qio_net_listener_set_client_func(QIONetListener *listener, void qio_net_listener_set_client_func(QIONetListener *listener,
QIONetListenerClientFunc func, QIONetListenerClientFunc func,

View File

@ -227,15 +227,18 @@ QIOTask *qio_task_new(Object *source,
* @worker: the function to invoke in a thread * @worker: the function to invoke in a thread
* @opaque: opaque data to pass to @worker * @opaque: opaque data to pass to @worker
* @destroy: function to free @opaque * @destroy: function to free @opaque
* @context: the context to run the complete hook. If %NULL, the
* default context will be used.
* *
* Run a task in a background thread. When @worker * Run a task in a background thread. When @worker
* returns it will call qio_task_complete() in * returns it will call qio_task_complete() in
* the main event thread context. * the event thread context that provided.
*/ */
void qio_task_run_in_thread(QIOTask *task, void qio_task_run_in_thread(QIOTask *task,
QIOTaskWorker worker, QIOTaskWorker worker,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy); GDestroyNotify destroy,
GMainContext *context);
/** /**
* qio_task_complete: * qio_task_complete:

View File

@ -174,7 +174,8 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
SocketAddress *addr, SocketAddress *addr,
QIOTaskFunc callback, QIOTaskFunc callback,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy) GDestroyNotify destroy,
GMainContext *context)
{ {
QIOTask *task = qio_task_new( QIOTask *task = qio_task_new(
OBJECT(ioc), callback, opaque, destroy); OBJECT(ioc), callback, opaque, destroy);
@ -188,7 +189,8 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
qio_task_run_in_thread(task, qio_task_run_in_thread(task,
qio_channel_socket_connect_worker, qio_channel_socket_connect_worker,
addrCopy, addrCopy,
(GDestroyNotify)qapi_free_SocketAddress); (GDestroyNotify)qapi_free_SocketAddress,
context);
} }
@ -233,7 +235,8 @@ void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
SocketAddress *addr, SocketAddress *addr,
QIOTaskFunc callback, QIOTaskFunc callback,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy) GDestroyNotify destroy,
GMainContext *context)
{ {
QIOTask *task = qio_task_new( QIOTask *task = qio_task_new(
OBJECT(ioc), callback, opaque, destroy); OBJECT(ioc), callback, opaque, destroy);
@ -246,7 +249,8 @@ void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
qio_task_run_in_thread(task, qio_task_run_in_thread(task,
qio_channel_socket_listen_worker, qio_channel_socket_listen_worker,
addrCopy, addrCopy,
(GDestroyNotify)qapi_free_SocketAddress); (GDestroyNotify)qapi_free_SocketAddress,
context);
} }
@ -308,7 +312,8 @@ void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
SocketAddress *remoteAddr, SocketAddress *remoteAddr,
QIOTaskFunc callback, QIOTaskFunc callback,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy) GDestroyNotify destroy,
GMainContext *context)
{ {
QIOTask *task = qio_task_new( QIOTask *task = qio_task_new(
OBJECT(ioc), callback, opaque, destroy); OBJECT(ioc), callback, opaque, destroy);
@ -322,7 +327,8 @@ void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
qio_task_run_in_thread(task, qio_task_run_in_thread(task,
qio_channel_socket_dgram_worker, qio_channel_socket_dgram_worker,
data, data,
qio_channel_socket_dgram_worker_free); qio_channel_socket_dgram_worker_free,
context);
} }

View File

@ -140,13 +140,19 @@ qio_channel_tls_new_client(QIOChannel *master,
return NULL; return NULL;
} }
struct QIOChannelTLSData {
QIOTask *task;
GMainContext *context;
};
typedef struct QIOChannelTLSData QIOChannelTLSData;
static gboolean qio_channel_tls_handshake_io(QIOChannel *ioc, static gboolean qio_channel_tls_handshake_io(QIOChannel *ioc,
GIOCondition condition, GIOCondition condition,
gpointer user_data); gpointer user_data);
static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc, static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
QIOTask *task) QIOTask *task,
GMainContext *context)
{ {
Error *err = NULL; Error *err = NULL;
QCryptoTLSSessionHandshakeStatus status; QCryptoTLSSessionHandshakeStatus status;
@ -171,6 +177,15 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
qio_task_complete(task); qio_task_complete(task);
} else { } else {
GIOCondition condition; GIOCondition condition;
QIOChannelTLSData *data = g_new0(typeof(*data), 1);
data->task = task;
data->context = context;
if (context) {
g_main_context_ref(context);
}
if (status == QCRYPTO_TLS_HANDSHAKE_SENDING) { if (status == QCRYPTO_TLS_HANDSHAKE_SENDING) {
condition = G_IO_OUT; condition = G_IO_OUT;
} else { } else {
@ -178,11 +193,12 @@ static void qio_channel_tls_handshake_task(QIOChannelTLS *ioc,
} }
trace_qio_channel_tls_handshake_pending(ioc, status); trace_qio_channel_tls_handshake_pending(ioc, status);
qio_channel_add_watch(ioc->master, qio_channel_add_watch_full(ioc->master,
condition, condition,
qio_channel_tls_handshake_io, qio_channel_tls_handshake_io,
task, data,
NULL); NULL,
context);
} }
} }
@ -191,12 +207,18 @@ static gboolean qio_channel_tls_handshake_io(QIOChannel *ioc,
GIOCondition condition, GIOCondition condition,
gpointer user_data) gpointer user_data)
{ {
QIOTask *task = user_data; QIOChannelTLSData *data = user_data;
QIOTask *task = data->task;
GMainContext *context = data->context;
QIOChannelTLS *tioc = QIO_CHANNEL_TLS( QIOChannelTLS *tioc = QIO_CHANNEL_TLS(
qio_task_get_source(task)); qio_task_get_source(task));
qio_channel_tls_handshake_task( g_free(data);
tioc, task); qio_channel_tls_handshake_task(tioc, task, context);
if (context) {
g_main_context_unref(context);
}
return FALSE; return FALSE;
} }
@ -204,7 +226,8 @@ static gboolean qio_channel_tls_handshake_io(QIOChannel *ioc,
void qio_channel_tls_handshake(QIOChannelTLS *ioc, void qio_channel_tls_handshake(QIOChannelTLS *ioc,
QIOTaskFunc func, QIOTaskFunc func,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy) GDestroyNotify destroy,
GMainContext *context)
{ {
QIOTask *task; QIOTask *task;
@ -212,7 +235,7 @@ void qio_channel_tls_handshake(QIOChannelTLS *ioc,
func, opaque, destroy); func, opaque, destroy);
trace_qio_channel_tls_handshake_start(ioc); trace_qio_channel_tls_handshake_start(ioc);
qio_channel_tls_handshake_task(ioc, task); qio_channel_tls_handshake_task(ioc, task, context);
} }

View File

@ -299,11 +299,12 @@ void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque); klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
} }
guint qio_channel_add_watch(QIOChannel *ioc, guint qio_channel_add_watch_full(QIOChannel *ioc,
GIOCondition condition, GIOCondition condition,
QIOChannelFunc func, QIOChannelFunc func,
gpointer user_data, gpointer user_data,
GDestroyNotify notify) GDestroyNotify notify,
GMainContext *context)
{ {
GSource *source; GSource *source;
guint id; guint id;
@ -312,12 +313,39 @@ guint qio_channel_add_watch(QIOChannel *ioc,
g_source_set_callback(source, (GSourceFunc)func, user_data, notify); g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
id = g_source_attach(source, NULL); id = g_source_attach(source, context);
g_source_unref(source); g_source_unref(source);
return id; return id;
} }
guint qio_channel_add_watch(QIOChannel *ioc,
GIOCondition condition,
QIOChannelFunc func,
gpointer user_data,
GDestroyNotify notify)
{
return qio_channel_add_watch_full(ioc, condition, func,
user_data, notify, NULL);
}
GSource *qio_channel_add_watch_source(QIOChannel *ioc,
GIOCondition condition,
QIOChannelFunc func,
gpointer user_data,
GDestroyNotify notify,
GMainContext *context)
{
GSource *source;
guint id;
id = qio_channel_add_watch_full(ioc, condition, func,
user_data, notify, context);
source = g_main_context_find_source_by_id(context, id);
g_source_ref(source);
return source;
}
int qio_channel_shutdown(QIOChannel *ioc, int qio_channel_shutdown(QIOChannel *ioc,
QIOChannelShutdown how, QIOChannelShutdown how,

View File

@ -234,7 +234,8 @@ void qio_dns_resolver_lookup_async(QIODNSResolver *resolver,
qio_task_run_in_thread(task, qio_task_run_in_thread(task,
qio_dns_resolver_lookup_worker, qio_dns_resolver_lookup_worker,
data, data,
qio_dns_resolver_lookup_data_free); qio_dns_resolver_lookup_data_free,
NULL);
} }

View File

@ -118,29 +118,32 @@ void qio_net_listener_add(QIONetListener *listener,
listener->sioc = g_renew(QIOChannelSocket *, listener->sioc, listener->sioc = g_renew(QIOChannelSocket *, listener->sioc,
listener->nsioc + 1); listener->nsioc + 1);
listener->io_tag = g_renew(gulong, listener->io_tag, listener->nsioc + 1); listener->io_source = g_renew(typeof(listener->io_source[0]),
listener->io_source,
listener->nsioc + 1);
listener->sioc[listener->nsioc] = sioc; listener->sioc[listener->nsioc] = sioc;
listener->io_tag[listener->nsioc] = 0; listener->io_source[listener->nsioc] = NULL;
object_ref(OBJECT(sioc)); object_ref(OBJECT(sioc));
listener->connected = true; listener->connected = true;
if (listener->io_func != NULL) { if (listener->io_func != NULL) {
object_ref(OBJECT(listener)); object_ref(OBJECT(listener));
listener->io_tag[listener->nsioc] = qio_channel_add_watch( listener->io_source[listener->nsioc] = qio_channel_add_watch_source(
QIO_CHANNEL(listener->sioc[listener->nsioc]), G_IO_IN, QIO_CHANNEL(listener->sioc[listener->nsioc]), G_IO_IN,
qio_net_listener_channel_func, qio_net_listener_channel_func,
listener, (GDestroyNotify)object_unref); listener, (GDestroyNotify)object_unref, NULL);
} }
listener->nsioc++; listener->nsioc++;
} }
void qio_net_listener_set_client_func(QIONetListener *listener, void qio_net_listener_set_client_func_full(QIONetListener *listener,
QIONetListenerClientFunc func, QIONetListenerClientFunc func,
gpointer data, gpointer data,
GDestroyNotify notify) GDestroyNotify notify,
GMainContext *context)
{ {
size_t i; size_t i;
@ -152,23 +155,32 @@ void qio_net_listener_set_client_func(QIONetListener *listener,
listener->io_notify = notify; listener->io_notify = notify;
for (i = 0; i < listener->nsioc; i++) { for (i = 0; i < listener->nsioc; i++) {
if (listener->io_tag[i]) { if (listener->io_source[i]) {
g_source_remove(listener->io_tag[i]); g_source_destroy(listener->io_source[i]);
listener->io_tag[i] = 0; g_source_unref(listener->io_source[i]);
listener->io_source[i] = NULL;
} }
} }
if (listener->io_func != NULL) { if (listener->io_func != NULL) {
for (i = 0; i < listener->nsioc; i++) { for (i = 0; i < listener->nsioc; i++) {
object_ref(OBJECT(listener)); object_ref(OBJECT(listener));
listener->io_tag[i] = qio_channel_add_watch( listener->io_source[i] = qio_channel_add_watch_source(
QIO_CHANNEL(listener->sioc[i]), G_IO_IN, QIO_CHANNEL(listener->sioc[i]), G_IO_IN,
qio_net_listener_channel_func, qio_net_listener_channel_func,
listener, (GDestroyNotify)object_unref); listener, (GDestroyNotify)object_unref, context);
} }
} }
} }
void qio_net_listener_set_client_func(QIONetListener *listener,
QIONetListenerClientFunc func,
gpointer data,
GDestroyNotify notify)
{
qio_net_listener_set_client_func_full(listener, func, data,
notify, NULL);
}
struct QIONetListenerClientWaitData { struct QIONetListenerClientWaitData {
QIOChannelSocket *sioc; QIOChannelSocket *sioc;
@ -211,9 +223,10 @@ QIOChannelSocket *qio_net_listener_wait_client(QIONetListener *listener)
size_t i; size_t i;
for (i = 0; i < listener->nsioc; i++) { for (i = 0; i < listener->nsioc; i++) {
if (listener->io_tag[i]) { if (listener->io_source[i]) {
g_source_remove(listener->io_tag[i]); g_source_destroy(listener->io_source[i]);
listener->io_tag[i] = 0; g_source_unref(listener->io_source[i]);
listener->io_source[i] = NULL;
} }
} }
@ -241,10 +254,10 @@ QIOChannelSocket *qio_net_listener_wait_client(QIONetListener *listener)
if (listener->io_func != NULL) { if (listener->io_func != NULL) {
for (i = 0; i < listener->nsioc; i++) { for (i = 0; i < listener->nsioc; i++) {
object_ref(OBJECT(listener)); object_ref(OBJECT(listener));
listener->io_tag[i] = qio_channel_add_watch( listener->io_source[i] = qio_channel_add_watch_source(
QIO_CHANNEL(listener->sioc[i]), G_IO_IN, QIO_CHANNEL(listener->sioc[i]), G_IO_IN,
qio_net_listener_channel_func, qio_net_listener_channel_func,
listener, (GDestroyNotify)object_unref); listener, (GDestroyNotify)object_unref, NULL);
} }
} }
@ -260,9 +273,10 @@ void qio_net_listener_disconnect(QIONetListener *listener)
} }
for (i = 0; i < listener->nsioc; i++) { for (i = 0; i < listener->nsioc; i++) {
if (listener->io_tag[i]) { if (listener->io_source[i]) {
g_source_remove(listener->io_tag[i]); g_source_destroy(listener->io_source[i]);
listener->io_tag[i] = 0; g_source_unref(listener->io_source[i]);
listener->io_source[i] = NULL;
} }
qio_channel_close(QIO_CHANNEL(listener->sioc[i]), NULL); qio_channel_close(QIO_CHANNEL(listener->sioc[i]), NULL);
} }
@ -285,7 +299,7 @@ static void qio_net_listener_finalize(Object *obj)
for (i = 0; i < listener->nsioc; i++) { for (i = 0; i < listener->nsioc; i++) {
object_unref(OBJECT(listener->sioc[i])); object_unref(OBJECT(listener->sioc[i]));
} }
g_free(listener->io_tag); g_free(listener->io_source);
g_free(listener->sioc); g_free(listener->sioc);
g_free(listener->name); g_free(listener->name);
} }

View File

@ -77,10 +77,11 @@ struct QIOTaskThreadData {
QIOTaskWorker worker; QIOTaskWorker worker;
gpointer opaque; gpointer opaque;
GDestroyNotify destroy; GDestroyNotify destroy;
GMainContext *context;
}; };
static gboolean gio_task_thread_result(gpointer opaque) static gboolean qio_task_thread_result(gpointer opaque)
{ {
struct QIOTaskThreadData *data = opaque; struct QIOTaskThreadData *data = opaque;
@ -91,6 +92,10 @@ static gboolean gio_task_thread_result(gpointer opaque)
data->destroy(data->opaque); data->destroy(data->opaque);
} }
if (data->context) {
g_main_context_unref(data->context);
}
g_free(data); g_free(data);
return FALSE; return FALSE;
@ -100,6 +105,7 @@ static gboolean gio_task_thread_result(gpointer opaque)
static gpointer qio_task_thread_worker(gpointer opaque) static gpointer qio_task_thread_worker(gpointer opaque)
{ {
struct QIOTaskThreadData *data = opaque; struct QIOTaskThreadData *data = opaque;
GSource *idle;
trace_qio_task_thread_run(data->task); trace_qio_task_thread_run(data->task);
data->worker(data->task, data->opaque); data->worker(data->task, data->opaque);
@ -110,7 +116,11 @@ static gpointer qio_task_thread_worker(gpointer opaque)
* the worker results * the worker results
*/ */
trace_qio_task_thread_exit(data->task); trace_qio_task_thread_exit(data->task);
g_idle_add(gio_task_thread_result, data);
idle = g_idle_source_new();
g_source_set_callback(idle, qio_task_thread_result, data, NULL);
g_source_attach(idle, data->context);
return NULL; return NULL;
} }
@ -118,15 +128,21 @@ static gpointer qio_task_thread_worker(gpointer opaque)
void qio_task_run_in_thread(QIOTask *task, void qio_task_run_in_thread(QIOTask *task,
QIOTaskWorker worker, QIOTaskWorker worker,
gpointer opaque, gpointer opaque,
GDestroyNotify destroy) GDestroyNotify destroy,
GMainContext *context)
{ {
struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1); struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
QemuThread thread; QemuThread thread;
if (context) {
g_main_context_ref(context);
}
data->task = task; data->task = task;
data->worker = worker; data->worker = worker;
data->opaque = opaque; data->opaque = opaque;
data->destroy = destroy; data->destroy = destroy;
data->context = context;
trace_qio_task_thread_start(task, worker, opaque); trace_qio_task_thread_start(task, worker, opaque);
qemu_thread_create(&thread, qemu_thread_create(&thread,

View File

@ -103,7 +103,8 @@ static void socket_start_outgoing_migration(MigrationState *s,
saddr, saddr,
socket_outgoing_migration, socket_outgoing_migration,
data, data,
socket_connect_data_free); socket_connect_data_free,
NULL);
qapi_free_SocketAddress(saddr); qapi_free_SocketAddress(saddr);
} }

View File

@ -105,6 +105,7 @@ void migration_tls_channel_process_incoming(MigrationState *s,
qio_channel_tls_handshake(tioc, qio_channel_tls_handshake(tioc,
migration_tls_incoming_handshake, migration_tls_incoming_handshake,
NULL, NULL,
NULL,
NULL); NULL);
} }
@ -159,5 +160,6 @@ void migration_tls_channel_connect(MigrationState *s,
qio_channel_tls_handshake(tioc, qio_channel_tls_handshake(tioc,
migration_tls_outgoing_handshake, migration_tls_outgoing_handshake,
s, s,
NULL,
NULL); NULL);
} }

View File

@ -579,6 +579,7 @@ static QIOChannel *nbd_receive_starttls(QIOChannel *ioc,
qio_channel_tls_handshake(tioc, qio_channel_tls_handshake(tioc,
nbd_tls_handshake, nbd_tls_handshake,
&data, &data,
NULL,
NULL); NULL);
if (!data.complete) { if (!data.complete) {

View File

@ -599,6 +599,7 @@ static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
qio_channel_tls_handshake(tioc, qio_channel_tls_handshake(tioc,
nbd_tls_handshake, nbd_tls_handshake,
&data, &data,
NULL,
NULL); NULL);
if (!data.complete) { if (!data.complete) {

View File

@ -179,7 +179,7 @@ static void test_io_channel_setup_async(SocketAddress *listen_addr,
lioc = qio_channel_socket_new(); lioc = qio_channel_socket_new();
qio_channel_socket_listen_async( qio_channel_socket_listen_async(
lioc, listen_addr, lioc, listen_addr,
test_io_channel_complete, &data, NULL); test_io_channel_complete, &data, NULL, NULL);
g_main_loop_run(data.loop); g_main_loop_run(data.loop);
g_main_context_iteration(g_main_context_default(), FALSE); g_main_context_iteration(g_main_context_default(), FALSE);
@ -200,7 +200,7 @@ static void test_io_channel_setup_async(SocketAddress *listen_addr,
qio_channel_socket_connect_async( qio_channel_socket_connect_async(
QIO_CHANNEL_SOCKET(*src), connect_addr, QIO_CHANNEL_SOCKET(*src), connect_addr,
test_io_channel_complete, &data, NULL); test_io_channel_complete, &data, NULL, NULL);
g_main_loop_run(data.loop); g_main_loop_run(data.loop);
g_main_context_iteration(g_main_context_default(), FALSE); g_main_context_iteration(g_main_context_default(), FALSE);

View File

@ -203,10 +203,12 @@ static void test_io_channel_tls(const void *opaque)
qio_channel_tls_handshake(clientChanTLS, qio_channel_tls_handshake(clientChanTLS,
test_tls_handshake_done, test_tls_handshake_done,
&clientHandshake, &clientHandshake,
NULL,
NULL); NULL);
qio_channel_tls_handshake(serverChanTLS, qio_channel_tls_handshake(serverChanTLS,
test_tls_handshake_done, test_tls_handshake_done,
&serverHandshake, &serverHandshake,
NULL,
NULL); NULL);
/* /*

View File

@ -187,6 +187,7 @@ static void test_task_thread_complete(void)
qio_task_run_in_thread(task, qio_task_run_in_thread(task,
test_task_thread_worker, test_task_thread_worker,
&data, &data,
NULL,
NULL); NULL);
g_main_loop_run(data.loop); g_main_loop_run(data.loop);
@ -228,6 +229,7 @@ static void test_task_thread_failure(void)
qio_task_run_in_thread(task, qio_task_run_in_thread(task,
test_task_thread_worker, test_task_thread_worker,
&data, &data,
NULL,
NULL); NULL);
g_main_loop_run(data.loop); g_main_loop_run(data.loop);

View File

@ -128,6 +128,7 @@ static int protocol_client_vencrypt_auth(VncState *vs, uint8_t *data, size_t len
qio_channel_tls_handshake(tls, qio_channel_tls_handshake(tls,
vnc_tls_handshake_done, vnc_tls_handshake_done,
vs, vs,
NULL,
NULL); NULL);
} }
return 0; return 0;

View File

@ -81,6 +81,7 @@ gboolean vncws_tls_handshake_io(QIOChannel *ioc G_GNUC_UNUSED,
qio_channel_tls_handshake(tls, qio_channel_tls_handshake(tls,
vncws_tls_handshake_done, vncws_tls_handshake_done,
vs, vs,
NULL,
NULL); NULL);
return TRUE; return TRUE;