From 4ca7f29649bbd7dbfae8e93a66a7696f95fdb51d Mon Sep 17 00:00:00 2001 From: Joris Vink Date: Mon, 25 Mar 2019 10:13:52 +0100 Subject: [PATCH] Add a concurrency parameter to kore.gather() --- include/kore/python_methods.h | 6 ++- src/python.c | 69 +++++++++++++++++++++++++++++------ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/include/kore/python_methods.h b/include/kore/python_methods.h index 9c4872b..ec6633b 100644 --- a/include/kore/python_methods.h +++ b/include/kore/python_methods.h @@ -41,13 +41,13 @@ static PyObject *python_kore_timer(PyObject *, PyObject *); static PyObject *python_kore_fatal(PyObject *, PyObject *); static PyObject *python_kore_queue(PyObject *, PyObject *); static PyObject *python_kore_tracer(PyObject *, PyObject *); -static PyObject *python_kore_gather(PyObject *, PyObject *); static PyObject *python_kore_fatalx(PyObject *, PyObject *); static PyObject *python_kore_suspend(PyObject *, PyObject *); static PyObject *python_kore_shutdown(PyObject *, PyObject *); static PyObject *python_kore_bind_unix(PyObject *, PyObject *); static PyObject *python_kore_task_create(PyObject *, PyObject *); static PyObject *python_kore_socket_wrap(PyObject *, PyObject *); +static PyObject *python_kore_gather(PyObject *, PyObject *, PyObject *); #if defined(KORE_USE_PGSQL) static PyObject *python_kore_pgsql_register(PyObject *, PyObject *); @@ -69,7 +69,7 @@ static struct PyMethodDef pykore_methods[] = { METHOD("timer", python_kore_timer, METH_VARARGS), METHOD("queue", python_kore_queue, METH_VARARGS), METHOD("tracer", python_kore_tracer, METH_VARARGS), - METHOD("gather", python_kore_gather, METH_VARARGS), + METHOD("gather", python_kore_gather, METH_VARARGS | METH_KEYWORDS), METHOD("fatal", python_kore_fatal, METH_VARARGS), METHOD("fatalx", python_kore_fatalx, METH_VARARGS), METHOD("suspend", python_kore_suspend, METH_VARARGS), @@ -460,6 +460,8 @@ struct pygather_result { struct pygather_op { PyObject_HEAD int count; + int running; + int concurrency; struct python_coro *coro; TAILQ_HEAD(, pygather_result) results; TAILQ_HEAD(, pygather_coro) coroutines; diff --git a/src/python.c b/src/python.c index 4f98bd5..3bc633f 100644 --- a/src/python.c +++ b/src/python.c @@ -50,6 +50,7 @@ static struct python_coro *python_coro_create(PyObject *, struct http_request *); static int python_coro_run(struct python_coro *); static void python_coro_wakeup(struct python_coro *); +static void python_coro_suspend(struct python_coro *); static void pysocket_evt_handle(void *, int); static void pysocket_op_timeout(void *, u_int64_t); @@ -238,11 +239,9 @@ void kore_python_coro_run(void) { struct pygather_op *op; - struct python_coro *coro, *next; - - for (coro = TAILQ_FIRST(&coro_runnable); coro != NULL; coro = next) { - next = TAILQ_NEXT(coro, list); + struct python_coro *coro; + while ((coro = TAILQ_FIRST(&coro_runnable)) != NULL) { if (coro->state != CORO_STATE_RUNNABLE) fatal("non-runnable coro on coro_runnable"); @@ -563,10 +562,7 @@ python_coro_run(struct python_coro *coro) Py_DECREF(item); } - coro->state = CORO_STATE_SUSPENDED; - TAILQ_REMOVE(&coro_runnable, coro, list); - TAILQ_INSERT_HEAD(&coro_suspended, coro, list); - + python_coro_suspend(coro); coro_running = NULL; if (coro->request != NULL) @@ -586,6 +582,17 @@ python_coro_wakeup(struct python_coro *coro) TAILQ_INSERT_HEAD(&coro_runnable, coro, list); } +static void +python_coro_suspend(struct python_coro *coro) +{ + if (coro->state != CORO_STATE_RUNNABLE) + return; + + coro->state = CORO_STATE_SUSPENDED; + TAILQ_REMOVE(&coro_runnable, coro, list); + TAILQ_INSERT_HEAD(&coro_suspended, coro, list); +} + static void pyconnection_dealloc(struct pyconnection *pyc) { @@ -1185,12 +1192,13 @@ python_kore_tracer(PyObject *self, PyObject *args) } static PyObject * -python_kore_gather(PyObject *self, PyObject *args) +python_kore_gather(PyObject *self, PyObject *args, PyObject *kwargs) { struct pygather_op *op; PyObject *obj; struct pygather_coro *coro; Py_ssize_t sz, idx; + int concurrency; if (coro_running == NULL) { PyErr_SetString(PyExc_RuntimeError, @@ -1205,12 +1213,33 @@ python_kore_gather(PyObject *self, PyObject *args) return (NULL); } + if (kwargs != NULL && + (obj = PyDict_GetItemString(kwargs, "concurrency")) != NULL) { + if (!PyLong_Check(obj)) { + PyErr_SetString(PyExc_TypeError, + "concurrency level must be an integer"); + return (NULL); + } + + PyErr_Clear(); + concurrency = (int)PyLong_AsLong(obj); + if (concurrency == -1 && PyErr_Occurred()) + return (NULL); + + if (concurrency == 0) + concurrency = sz; + } else { + concurrency = sz; + } + op = PyObject_New(struct pygather_op, &pygather_op_type); if (op == NULL) return (NULL); + op->running = 0; op->count = (int)sz; op->coro = coro_running; + op->concurrency = concurrency; TAILQ_INIT(&op->results); TAILQ_INIT(&op->coroutines); @@ -1230,11 +1259,14 @@ python_kore_gather(PyObject *self, PyObject *args) Py_INCREF(obj); coro = kore_pool_get(&gather_coro_pool); - coro->coro = python_coro_create(obj, NULL); coro->coro->gatherop = op; - TAILQ_INSERT_TAIL(&op->coroutines, coro, list); + + if (idx > concurrency - 1) + python_coro_suspend(coro->coro); + else + op->running++; } return ((PyObject *)op); @@ -2908,6 +2940,10 @@ pygather_reap_coro(struct pygather_op *op, struct python_coro *reap) if (coro == NULL) fatal("coroutine %" PRIu64 " not found in gather", reap->id); + op->running--; + if (op->running < 0) + fatal("gatherop: running miscount (%d)", op->running); + result = kore_pool_get(&gather_result_pool); result->obj = NULL; @@ -2974,10 +3010,21 @@ static PyObject * pygather_op_iternext(struct pygather_op *op) { int idx; + struct pygather_coro *coro; struct pygather_result *res, *next; PyObject *list, *obj; if (!TAILQ_EMPTY(&op->coroutines)) { + if (op->running > 0) + Py_RETURN_NONE; + + TAILQ_FOREACH(coro, &op->coroutines, list) { + if (op->running >= op->concurrency) + break; + python_coro_wakeup(coro->coro); + op->running++; + } + Py_RETURN_NONE; }