Rework pysocket async/await.

Attach the events directly to the pysocket data structure instead of
one event per pysocket_op.

This simplifies the code, gives us a good performance boost and reduces
the number of system calls required when awaiting a socket (on Linux,
each pysocket_op no longer needs its own dup()'d descriptor).
This commit is contained in:
Joris Vink 2019-03-13 11:07:15 +01:00
parent 01f9b4fcde
commit 3b4574d791
3 changed files with 180 additions and 151 deletions

View File

@ -22,6 +22,7 @@ class EchoServer:
def __init__(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("127.0.0.1", 6969))
sock.listen()

View File

@ -147,13 +147,28 @@ static PyTypeObject pytimer_type = {
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
};
/* XXX */
struct pysocket;
struct pysocket_op;
/*
 * Event state embedded in every struct pysocket (its `event` member).
 *
 * pysocket_alloc() points `s` back at the socket that embeds this event,
 * and pysocket_evt_handle() follows `s` to reach that socket's pending
 * recvop/sendop.
 */
struct pysocket_event {
/* Event record; its flags are tested for KORE_EVENT_READ / KORE_EVENT_WRITE. */
struct kore_event evt;
/* Back-pointer to the owning pysocket. */
struct pysocket *s;
};
struct pysocket {
PyObject_HEAD
int fd;
int family;
int protocol;
int scheduled;
PyObject *socket;
socklen_t addr_len;
struct pysocket_event event;
struct pysocket_op *recvop;
struct pysocket_op *sendop;
union {
struct sockaddr_in ipv4;
struct sockaddr_un sun;
@ -198,9 +213,8 @@ static PyTypeObject pysocket_type = {
#define PYSOCKET_TYPE_RECVFROM 5
#define PYSOCKET_TYPE_SENDTO 6
struct pysocket_data {
struct kore_event evt;
int fd;
struct pysocket_op {
PyObject_HEAD
int eof;
int type;
void *self;
@ -217,11 +231,6 @@ struct pysocket_data {
} sendaddr;
};
struct pysocket_op {
PyObject_HEAD
struct pysocket_data data;
};
static void pysocket_op_dealloc(struct pysocket_op *);
static PyObject *pysocket_op_await(PyObject *);

View File

@ -1640,7 +1640,16 @@ pysocket_alloc(void)
sock->fd = -1;
sock->family = -1;
sock->protocol = -1;
sock->scheduled = 0;
sock->socket = NULL;
sock->recvop = NULL;
sock->sendop = NULL;
sock->event.s = sock;
sock->event.evt.flags = 0;
sock->event.evt.type = KORE_TYPE_PYSOCKET;
sock->event.evt.handle = pysocket_evt_handle;
return (sock);
}
@ -1701,15 +1710,15 @@ pysocket_sendto(struct pysocket *sock, PyObject *args)
switch (sock->family) {
case AF_INET:
op->data.sendaddr.ipv4.sin_family = AF_INET;
op->data.sendaddr.ipv4.sin_port = htons(port);
op->data.sendaddr.ipv4.sin_addr.s_addr = inet_addr(ip);
op->sendaddr.ipv4.sin_family = AF_INET;
op->sendaddr.ipv4.sin_port = htons(port);
op->sendaddr.ipv4.sin_addr.s_addr = inet_addr(ip);
break;
case AF_UNIX:
op->data.sendaddr.sun.sun_family = AF_UNIX;
if (kore_strlcpy(op->data.sendaddr.sun.sun_path, sockaddr,
sizeof(op->data.sendaddr.sun.sun_path)) >=
sizeof(op->data.sendaddr.sun.sun_path)) {
op->sendaddr.sun.sun_family = AF_UNIX;
if (kore_strlcpy(op->sendaddr.sun.sun_path, sockaddr,
sizeof(op->sendaddr.sun.sun_path)) >=
sizeof(op->sendaddr.sun.sun_path)) {
Py_DECREF(ret);
PyErr_SetString(PyExc_RuntimeError,
"unix socket path too long");
@ -1745,7 +1754,7 @@ pysocket_recv(struct pysocket *sock, PyObject *args)
op = (struct pysocket_op *)obj;
if (timeo != -1) {
op->data.timer = kore_timer_add(pysocket_op_timeout,
op->timer = kore_timer_add(pysocket_op_timeout,
timeo, op, KORE_TIMER_ONESHOT);
}
@ -1836,38 +1845,35 @@ pysocket_close(struct pysocket *sock, PyObject *args)
static void
pysocket_op_dealloc(struct pysocket_op *op)
{
#if defined(__linux__)
kore_platform_disable_read(op->data.fd);
close(op->data.fd);
#else
switch (op->data.type) {
if (op->type == PYSOCKET_TYPE_RECV ||
op->type == PYSOCKET_TYPE_RECVFROM ||
op->type == PYSOCKET_TYPE_SEND)
kore_buf_cleanup(&op->buffer);
switch (op->type) {
case PYSOCKET_TYPE_RECV:
case PYSOCKET_TYPE_ACCEPT:
case PYSOCKET_TYPE_RECVFROM:
kore_platform_disable_read(op->data.fd);
if (op->socket->recvop != op)
fatal("recvop mismatch");
op->socket->recvop = NULL;
break;
case PYSOCKET_TYPE_SEND:
case PYSOCKET_TYPE_SENDTO:
case PYSOCKET_TYPE_CONNECT:
kore_platform_disable_write(op->data.fd);
if (op->socket->sendop != op)
fatal("sendop mismatch");
op->socket->sendop = NULL;
break;
default:
fatal("unknown pysocket_op type %u", op->data.type);
}
#endif
if (op->data.type == PYSOCKET_TYPE_RECV ||
op->data.type == PYSOCKET_TYPE_RECVFROM ||
op->data.type == PYSOCKET_TYPE_SEND)
kore_buf_cleanup(&op->data.buffer);
if (op->data.timer != NULL) {
kore_timer_remove(op->data.timer);
op->data.timer = NULL;
}
op->data.coro->sockop = NULL;
Py_DECREF(op->data.socket);
if (op->timer != NULL) {
kore_timer_remove(op->timer);
op->timer = NULL;
}
op->coro->sockop = NULL;
Py_DECREF(op->socket);
PyObject_Del((PyObject *)op);
}
@ -1880,63 +1886,71 @@ pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len)
if (coro_running->sockop != NULL)
fatal("pysocket_op_create: coro has active socketop");
switch (type) {
case PYSOCKET_TYPE_RECV:
case PYSOCKET_TYPE_ACCEPT:
case PYSOCKET_TYPE_RECVFROM:
if (sock->recvop != NULL) {
PyErr_SetString(PyExc_RuntimeError,
"only one recv operation can be done per socket");
return (NULL);
}
break;
case PYSOCKET_TYPE_SEND:
case PYSOCKET_TYPE_SENDTO:
case PYSOCKET_TYPE_CONNECT:
if (sock->sendop != NULL) {
PyErr_SetString(PyExc_RuntimeError,
"only one send operation can be done per socket");
return (NULL);
}
break;
default:
fatal("unknown pysocket_op type %u", type);
}
op = PyObject_New(struct pysocket_op, &pysocket_op_type);
if (op == NULL)
return (NULL);
#if defined(__linux__)
/*
* Duplicate the socket so each pysocket_op gets its own unique
* descriptor for epoll. This is so we can easily call EPOLL_CTL_DEL
* on the fd when the pysocket_op is finished as our event code
* does not track queued events.
*/
if ((op->data.fd = dup(sock->fd)) == -1)
fatal("dup: %s", errno_s);
#else
op->data.fd = sock->fd;
#endif
op->data.eof = 0;
op->data.self = op;
op->data.type = type;
op->data.timer = NULL;
op->data.socket = sock;
op->data.evt.flags = 0;
op->data.coro = coro_running;
op->data.evt.type = KORE_TYPE_PYSOCKET;
op->data.evt.handle = pysocket_evt_handle;
op->eof = 0;
op->self = op;
op->type = type;
op->timer = NULL;
op->socket = sock;
op->coro = coro_running;
coro_running->sockop = op;
Py_INCREF(op->data.socket);
Py_INCREF(op->socket);
switch (type) {
case PYSOCKET_TYPE_RECV:
case PYSOCKET_TYPE_RECVFROM:
op->data.evt.flags |= KORE_EVENT_READ;
kore_buf_init(&op->data.buffer, len);
kore_platform_schedule_read(op->data.fd, &op->data);
sock->recvop = op;
kore_buf_init(&op->buffer, len);
break;
case PYSOCKET_TYPE_SEND:
case PYSOCKET_TYPE_SENDTO:
op->data.evt.flags |= KORE_EVENT_WRITE;
kore_buf_init(&op->data.buffer, len);
kore_buf_append(&op->data.buffer, ptr, len);
kore_buf_reset(&op->data.buffer);
kore_platform_schedule_write(op->data.fd, &op->data);
sock->sendop = op;
kore_buf_init(&op->buffer, len);
kore_buf_append(&op->buffer, ptr, len);
kore_buf_reset(&op->buffer);
break;
case PYSOCKET_TYPE_ACCEPT:
op->data.evt.flags |= KORE_EVENT_READ;
kore_platform_schedule_read(op->data.fd, &op->data);
sock->recvop = op;
break;
case PYSOCKET_TYPE_CONNECT:
op->data.evt.flags |= KORE_EVENT_WRITE;
kore_platform_schedule_write(op->data.fd, &op->data);
sock->sendop = op;
break;
default:
fatal("unknown pysocket_op type %u", type);
}
if (sock->scheduled == 0) {
sock->scheduled = 1;
kore_platform_event_all(sock->fd, &sock->event);
}
return ((PyObject *)op);
}
@ -1952,25 +1966,25 @@ pysocket_op_iternext(struct pysocket_op *op)
{
PyObject *ret;
if (op->data.eof) {
if (op->data.coro->exception != NULL) {
PyErr_SetString(op->data.coro->exception,
op->data.coro->exception_msg);
op->data.coro->exception = NULL;
if (op->eof) {
if (op->coro->exception != NULL) {
PyErr_SetString(op->coro->exception,
op->coro->exception_msg);
op->coro->exception = NULL;
return (NULL);
}
if (op->data.type != PYSOCKET_TYPE_RECV) {
if (op->type != PYSOCKET_TYPE_RECV) {
PyErr_SetString(PyExc_RuntimeError, "socket EOF");
return (NULL);
}
/* Drain the recv socket. */
op->data.evt.flags |= KORE_EVENT_READ;
op->socket->event.evt.flags |= KORE_EVENT_READ;
return (pysocket_async_recv(op));
}
switch (op->data.type) {
switch (op->type) {
case PYSOCKET_TYPE_CONNECT:
ret = pysocket_async_connect(op);
break;
@ -1998,23 +2012,23 @@ pysocket_op_timeout(void *arg, u_int64_t now)
{
struct pysocket_op *op = arg;
op->data.eof = 1;
op->data.timer = NULL;
op->eof = 1;
op->timer = NULL;
op->data.coro->exception = PyExc_TimeoutError;
op->data.coro->exception_msg = "timeout before operation completed";
op->coro->exception = PyExc_TimeoutError;
op->coro->exception_msg = "timeout before operation completed";
if (op->data.coro->request != NULL)
http_request_wakeup(op->data.coro->request);
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->data.coro);
python_coro_wakeup(op->coro);
}
static PyObject *
pysocket_async_connect(struct pysocket_op *op)
{
if (connect(op->data.fd, (struct sockaddr *)&op->data.socket->addr,
op->data.socket->addr_len) == -1) {
if (connect(op->socket->fd, (struct sockaddr *)&op->socket->addr,
op->socket->addr_len) == -1) {
if (errno != EALREADY && errno != EINPROGRESS &&
errno != EISCONN && errno != EAGAIN) {
PyErr_SetString(PyExc_RuntimeError, errno_s);
@ -2036,15 +2050,20 @@ pysocket_async_accept(struct pysocket_op *op)
int fd;
struct pysocket *sock;
if ((sock = pysocket_alloc() ) == NULL)
if (!(op->socket->event.evt.flags & KORE_EVENT_READ)) {
Py_RETURN_NONE;
}
if ((sock = pysocket_alloc()) == NULL)
return (NULL);
sock->addr_len = sizeof(sock->addr);
if ((fd = accept(op->data.fd,
if ((fd = accept(op->socket->fd,
(struct sockaddr *)&sock->addr, &sock->addr_len)) == -1) {
Py_DECREF((PyObject *)sock);
if (errno == EAGAIN || errno == EWOULDBLOCK) {
op->socket->event.evt.flags &= ~KORE_EVENT_READ;
Py_RETURN_NONE;
}
PyErr_SetString(PyExc_RuntimeError, errno_s);
@ -2059,8 +2078,8 @@ pysocket_async_accept(struct pysocket_op *op)
sock->fd = fd;
sock->socket = NULL;
sock->family = op->data.socket->family;
sock->protocol = op->data.socket->protocol;
sock->family = op->socket->family;
sock->protocol = op->socket->protocol;
PyErr_SetObject(PyExc_StopIteration, (PyObject *)sock);
Py_DECREF((PyObject *)sock);
@ -2078,36 +2097,36 @@ pysocket_async_recv(struct pysocket_op *op)
const char *ptr, *ip;
PyObject *bytes, *result, *tuple;
if (!(op->data.evt.flags & KORE_EVENT_READ)) {
if (!(op->socket->event.evt.flags & KORE_EVENT_READ)) {
Py_RETURN_NONE;
}
for (;;) {
if (op->data.type == PYSOCKET_TYPE_RECV) {
ret = read(op->data.fd, op->data.buffer.data,
op->data.buffer.length);
if (op->type == PYSOCKET_TYPE_RECV) {
ret = read(op->socket->fd, op->buffer.data,
op->buffer.length);
} else {
sendaddr = (struct sockaddr *)&op->data.sendaddr;
switch (op->data.socket->family) {
sendaddr = (struct sockaddr *)&op->sendaddr;
switch (op->socket->family) {
case AF_INET:
socklen = sizeof(op->data.sendaddr.ipv4);
socklen = sizeof(op->sendaddr.ipv4);
break;
case AF_UNIX:
socklen = sizeof(op->data.sendaddr.sun);
socklen = sizeof(op->sendaddr.sun);
break;
default:
fatal("non AF_INET/AF_UNIX in %s", __func__);
}
ret = recvfrom(op->data.fd, op->data.buffer.data,
op->data.buffer.length, 0, sendaddr, &socklen);
ret = recvfrom(op->socket->fd, op->buffer.data,
op->buffer.length, 0, sendaddr, &socklen);
}
if (ret == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
op->data.evt.flags &= ~KORE_EVENT_READ;
op->socket->event.evt.flags &= ~KORE_EVENT_READ;
Py_RETURN_NONE;
}
PyErr_SetString(PyExc_RuntimeError, errno_s);
@ -2117,40 +2136,40 @@ pysocket_async_recv(struct pysocket_op *op)
break;
}
op->data.coro->exception = NULL;
op->data.coro->exception_msg = NULL;
op->coro->exception = NULL;
op->coro->exception_msg = NULL;
if (op->data.timer != NULL) {
kore_timer_remove(op->data.timer);
op->data.timer = NULL;
if (op->timer != NULL) {
kore_timer_remove(op->timer);
op->timer = NULL;
}
if (op->data.type == PYSOCKET_TYPE_RECV && ret == 0) {
if (op->type == PYSOCKET_TYPE_RECV && ret == 0) {
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
}
ptr = (const char *)op->data.buffer.data;
ptr = (const char *)op->buffer.data;
if ((bytes = PyBytes_FromStringAndSize(ptr, ret)) == NULL)
return (NULL);
if (op->data.type == PYSOCKET_TYPE_RECV) {
if (op->type == PYSOCKET_TYPE_RECV) {
PyErr_SetObject(PyExc_StopIteration, bytes);
Py_DECREF(bytes);
return (NULL);
}
switch(op->data.socket->family) {
switch(op->socket->family) {
case AF_INET:
port = ntohs(op->data.sendaddr.ipv4.sin_port);
ip = inet_ntoa(op->data.sendaddr.ipv4.sin_addr);
port = ntohs(op->sendaddr.ipv4.sin_port);
ip = inet_ntoa(op->sendaddr.ipv4.sin_addr);
if ((tuple = Py_BuildValue("(sHN)", ip, port, bytes)) == NULL)
return (NULL);
break;
case AF_UNIX:
if ((tuple = Py_BuildValue("(sN)",
op->data.sendaddr.sun.sun_path, bytes)) == NULL)
op->sendaddr.sun.sun_path, bytes)) == NULL)
return (NULL);
break;
default:
@ -2177,32 +2196,32 @@ pysocket_async_send(struct pysocket_op *op)
socklen_t socklen;
const struct sockaddr *sendaddr;
if (!(op->data.evt.flags & KORE_EVENT_WRITE)) {
if (!(op->socket->event.evt.flags & KORE_EVENT_WRITE)) {
Py_RETURN_NONE;
}
for (;;) {
if (op->data.type == PYSOCKET_TYPE_SEND) {
ret = write(op->data.fd,
op->data.buffer.data + op->data.buffer.offset,
op->data.buffer.length - op->data.buffer.offset);
if (op->type == PYSOCKET_TYPE_SEND) {
ret = write(op->socket->fd,
op->buffer.data + op->buffer.offset,
op->buffer.length - op->buffer.offset);
} else {
sendaddr = (const struct sockaddr *)&op->data.sendaddr;
sendaddr = (const struct sockaddr *)&op->sendaddr;
switch (op->data.socket->family) {
switch (op->socket->family) {
case AF_INET:
socklen = sizeof(op->data.sendaddr.ipv4);
socklen = sizeof(op->sendaddr.ipv4);
break;
case AF_UNIX:
socklen = sizeof(op->data.sendaddr.sun);
socklen = sizeof(op->sendaddr.sun);
break;
default:
fatal("non AF_INET/AF_UNIX in %s", __func__);
}
ret = sendto(op->data.fd,
op->data.buffer.data + op->data.buffer.offset,
op->data.buffer.length - op->data.buffer.offset,
ret = sendto(op->socket->fd,
op->buffer.data + op->buffer.offset,
op->buffer.length - op->buffer.offset,
0, sendaddr, socklen);
}
@ -2210,7 +2229,8 @@ pysocket_async_send(struct pysocket_op *op)
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
op->data.evt.flags &= ~KORE_EVENT_WRITE;
op->socket->event.evt.flags &=
~KORE_EVENT_WRITE;
Py_RETURN_NONE;
}
PyErr_SetString(PyExc_RuntimeError, errno_s);
@ -2219,9 +2239,9 @@ pysocket_async_send(struct pysocket_op *op)
break;
}
op->data.buffer.offset += (size_t)ret;
op->buffer.offset += (size_t)ret;
if (op->data.buffer.offset == op->data.buffer.length) {
if (op->buffer.offset == op->buffer.length) {
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
}
@ -2232,25 +2252,24 @@ pysocket_async_send(struct pysocket_op *op)
static void
pysocket_evt_handle(void *arg, int eof)
{
struct pysocket_data *data = arg;
struct python_coro *coro = data->coro;
struct pysocket_event *event = arg;
struct pysocket *socket = event->s;
if (coro->sockop == NULL)
fatal("pysocket_evt_handle: sockop == NULL");
if ((event->evt.flags & KORE_EVENT_READ) && socket->recvop != NULL) {
if (socket->recvop->coro->request != NULL)
http_request_wakeup(socket->recvop->coro->request);
else
python_coro_wakeup(socket->recvop->coro);
socket->recvop->eof = eof;
}
/*
* If we are a coroutine tied to an HTTP request wake-up the
* HTTP request instead. That in turn will wakeup the coro and
* continue it.
*
* Otherwise just wakeup the coroutine so it will run next tick.
*/
if (coro->request != NULL)
http_request_wakeup(coro->request);
else
python_coro_wakeup(coro);
coro->sockop->data.eof = eof;
if ((event->evt.flags & KORE_EVENT_WRITE) && socket->sendop != NULL) {
if (socket->sendop->coro->request != NULL)
http_request_wakeup(socket->sendop->coro->request);
else
python_coro_wakeup(socket->sendop->coro);
socket->sendop->eof = eof;
}
}
static void
@ -2549,7 +2568,7 @@ pyproc_timeout(void *arg, u_int64_t now)
proc->timer = NULL;
if (proc->coro->sockop != NULL)
proc->coro->sockop->data.eof = 1;
proc->coro->sockop->eof = 1;
proc->coro->exception = PyExc_TimeoutError;
proc->coro->exception_msg = "timeout before process exited";
@ -2658,7 +2677,7 @@ pyproc_recv(struct pyproc *proc, PyObject *args)
op = (struct pysocket_op *)obj;
if (timeo != -1) {
op->data.timer = kore_timer_add(pysocket_op_timeout,
op->timer = kore_timer_add(pysocket_op_timeout,
timeo, op, KORE_TIMER_ONESHOT);
}