io: simplify websocket ping reply handling

We must ensure we don't get flooded with ping replies if the outbound
channel is slow. Currently we do this by keeping the ping reply in a
separate temporary buffer and only writing it if the encoutput buffer
is completely empty. This is overly pessimistic, as it is reasonable
to add a ping reply to the encoutput buffer even if it has previous
data in it, as long as that previous data doesn't include a ping
reply.

To track this better, put the ping reply directly into the encoutput
buffer, and then record the size of encoutput at this time in
pong_remain. As we write encoutput to the underlying channel, we
can decrement the pong_remain counter. Once it hits zero, we can
accept further ping replies for transmission.

Reviewed-by: Eric Blake <eblake@redhat.com>
Signed-off-by: Daniel P. Berrange <berrange@redhat.com>
This commit is contained in:
Daniel P. Berrange 2017-10-09 15:34:06 +01:00
parent a7b20a8efa
commit 57b0cdf152
2 changed files with 16 additions and 14 deletions

View File

@ -60,8 +60,8 @@ struct QIOChannelWebsock {
Buffer encoutput;
Buffer rawinput;
Buffer rawoutput;
Buffer ping_reply;
size_t payload_remain;
size_t pong_remain;
QIOChannelWebsockMask mask;
guint io_tag;
Error *io_err;

View File

@ -825,11 +825,14 @@ static int qio_channel_websock_decode_payload(QIOChannelWebsock *ioc,
}
return -1;
} else if (ioc->opcode == QIO_CHANNEL_WEBSOCK_OPCODE_PING) {
/* ping frames produce an immediate reply */
buffer_reset(&ioc->ping_reply);
qio_channel_websock_encode_buffer(
ioc, &ioc->ping_reply, QIO_CHANNEL_WEBSOCK_OPCODE_PONG,
&ioc->encinput);
/* ping frames produce an immediate reply, as long as we've not still
* got a previous pong queued, in which case we drop the new pong */
if (ioc->pong_remain == 0) {
qio_channel_websock_encode_buffer(
ioc, &ioc->encoutput, QIO_CHANNEL_WEBSOCK_OPCODE_PONG,
&ioc->encinput);
ioc->pong_remain = ioc->encoutput.offset;
}
} /* pong frames are ignored */
if (payload_len) {
@ -888,7 +891,6 @@ static void qio_channel_websock_finalize(Object *obj)
buffer_free(&ioc->encoutput);
buffer_free(&ioc->rawinput);
buffer_free(&ioc->rawoutput);
buffer_free(&ioc->ping_reply);
object_unref(OBJECT(ioc->master));
if (ioc->io_tag) {
g_source_remove(ioc->io_tag);
@ -946,12 +948,7 @@ static ssize_t qio_channel_websock_write_wire(QIOChannelWebsock *ioc,
ssize_t ret;
ssize_t done = 0;
/* ping replies take priority over binary data */
if (!ioc->ping_reply.offset) {
qio_channel_websock_encode(ioc);
} else if (!ioc->encoutput.offset) {
buffer_move_empty(&ioc->encoutput, &ioc->ping_reply);
}
qio_channel_websock_encode(ioc);
while (ioc->encoutput.offset > 0) {
ret = qio_channel_write(ioc->master,
@ -968,6 +965,11 @@ static ssize_t qio_channel_websock_write_wire(QIOChannelWebsock *ioc,
}
buffer_advance(&ioc->encoutput, ret);
done += ret;
if (ioc->pong_remain < ret) {
ioc->pong_remain = 0;
} else {
ioc->pong_remain -= ret;
}
}
return done;
}
@ -1026,7 +1028,7 @@ static void qio_channel_websock_set_watch(QIOChannelWebsock *ioc)
return;
}
if (ioc->encoutput.offset || ioc->ping_reply.offset) {
if (ioc->encoutput.offset) {
cond |= G_IO_OUT;
}
if (ioc->encinput.offset < QIO_CHANNEL_WEBSOCK_MAX_BUFFER &&