python pgsql changes.

- decouple pgsql from the HTTP request allowing it to be used in other
  contexts as well (such as a task, etc).

- change names to dbsetup() and dbquery().

eg:

result = kore.dbquery("db", "select foo from bar")
This commit is contained in:
Joris Vink 2019-09-04 19:57:28 +02:00
parent f3b7cba58c
commit 8e858983bf
5 changed files with 90 additions and 91 deletions

View File

@ -19,13 +19,13 @@
import json
import kore
# Register the path to our database when the worker starts.
def kore_worker_configure():
kore.register_database("db", "host=/tmp dbname=kore")
# Register the path to our database when Kore starts.
def kore_parent_configure(args):
kore.dbsetup("db", "host=/tmp dbname=kore")
# A handler that returns 200 OK with hello as body.
def hello(req):
req.response(200, b'hello\n')
req.response(200, b'hello\n')
#
# The query handler that fires of the query and returns a coroutine.
@ -33,19 +33,19 @@ def hello(req):
# Kore will resume this handler when the query returns a result or
# is succesfull.
#
# The req.pgsql() method can throw exceptions, most notably a
# The kore.pgsql() method can throw exceptions, most notably a
# GeneratorExit in case the client connection went away before
# the query was able to be completed.
#
# In this example we're not doing any exception handling.
#
async def query(req):
result = await req.pgsql("db", "SELECT * FROM coders")
req.response(200, json.dumps(result).encode("utf-8"))
result = await kore.dbquery("db", "SELECT * FROM coders")
req.response(200, json.dumps(result).encode("utf-8"))
#
# A slow query that returns after 10 seconds.
#
async def slow(req):
result = await req.pgsql("db", "SELECT * FROM pg_sleep(10)")
req.response(200, json.dumps(result).encode("utf-8"))
result = await kore.dbquery("db", "SELECT * FROM pg_sleep(10)")
req.response(200, json.dumps(result).encode("utf-8"))

View File

@ -52,6 +52,8 @@ static PyObject *python_kore_socket_wrap(PyObject *, PyObject *);
static PyObject *python_kore_gather(PyObject *, PyObject *, PyObject *);
#if defined(KORE_USE_PGSQL)
static PyObject *python_kore_pgsql_query(PyObject *, PyObject *,
PyObject *);
static PyObject *python_kore_pgsql_register(PyObject *, PyObject *);
#endif
@ -88,7 +90,9 @@ static struct PyMethodDef pykore_methods[] = {
METHOD("socket_wrap", python_kore_socket_wrap, METH_VARARGS),
METHOD("websocket_broadcast", python_websocket_broadcast, METH_VARARGS),
#if defined(KORE_USE_PGSQL)
METHOD("register_database", python_kore_pgsql_register, METH_VARARGS),
METHOD("dbsetup", python_kore_pgsql_register, METH_VARARGS),
METHOD("dbquery", python_kore_pgsql_query,
METH_VARARGS | METH_KEYWORDS),
#endif
#if defined(KORE_USE_CURL)
METHOD("httpclient", python_kore_httpclient,
@ -571,9 +575,6 @@ struct pyhttp_file {
static void pyhttp_dealloc(struct pyhttp_request *);
static void pyhttp_file_dealloc(struct pyhttp_file *);
#if defined(KORE_USE_PGSQL)
static PyObject *pyhttp_pgsql(struct pyhttp_request *, PyObject *, PyObject *);
#endif
static PyObject *pyhttp_cookie(struct pyhttp_request *, PyObject *);
static PyObject *pyhttp_response(struct pyhttp_request *, PyObject *);
static PyObject *pyhttp_argument(struct pyhttp_request *, PyObject *);
@ -589,9 +590,6 @@ static PyObject *pyhttp_websocket_handshake(struct pyhttp_request *,
PyObject *);
static PyMethodDef pyhttp_request_methods[] = {
#if defined(KORE_USE_PGSQL)
METHOD("pgsql", pyhttp_pgsql, METH_VARARGS | METH_KEYWORDS),
#endif
METHOD("cookie", pyhttp_cookie, METH_VARARGS),
METHOD("response", pyhttp_response, METH_VARARGS),
METHOD("argument", pyhttp_argument, METH_VARARGS),
@ -759,14 +757,15 @@ static PyTypeObject pyhttp_client_op_type = {
struct pykore_pgsql {
PyObject_HEAD
struct kore_pgsql sql;
int state;
int binary;
struct kore_pgsql sql;
char *db;
struct http_request *req;
struct python_coro *coro;
char *query;
PyObject *result;
struct {
int count;
const char **values;
@ -777,8 +776,6 @@ struct pykore_pgsql {
};
static void pykore_pgsql_dealloc(struct pykore_pgsql *);
static int pykore_pgsql_result(struct pykore_pgsql *);
static int pykore_pgsql_params(struct pykore_pgsql *, PyObject *);
static PyObject *pykore_pgsql_await(PyObject *);
static PyObject *pykore_pgsql_iternext(struct pykore_pgsql *);

View File

@ -37,6 +37,10 @@
#include "curl.h"
#endif
#if defined(KORE_USE_PGSQL)
#include "pgsql.h"
#endif
#if defined(KORE_USE_PYTHON)
#include "python_api.h"
#endif
@ -227,6 +231,9 @@ main(int argc, char *argv[])
http_parent_init();
#if defined(KORE_USE_CURL)
kore_curl_sysinit();
#endif
#if defined(KORE_USE_PGSQL)
kore_pgsql_sys_init();
#endif
kore_auth_init();
kore_validator_init();

View File

@ -89,8 +89,9 @@ static int pyhttp_iterobj_next(struct pyhttp_iterobj *);
static void pyhttp_iterobj_disconnect(struct connection *);
#if defined(KORE_USE_PGSQL)
static PyObject *pykore_pgsql_alloc(struct http_request *,
const char *, const char *, PyObject *);
static int pykore_pgsql_result(struct pykore_pgsql *);
static void pykore_pgsql_callback(struct kore_pgsql *, void *);
static int pykore_pgsql_params(struct pykore_pgsql *, PyObject *);
static int pykore_pgsql_params(struct pykore_pgsql *, PyObject *);
#endif
@ -3829,70 +3830,50 @@ pyhttp_file_get_filename(struct pyhttp_file *pyfile, void *closure)
}
#if defined(KORE_USE_PGSQL)
static void
pykore_pgsql_dealloc(struct pykore_pgsql *pysql)
{
Py_ssize_t i;
kore_free(pysql->db);
kore_free(pysql->query);
kore_pgsql_cleanup(&pysql->sql);
if (pysql->result != NULL)
Py_DECREF(pysql->result);
for (i = 0; i < pysql->param.count; i++)
Py_XDECREF(pysql->param.objs[i]);
kore_free(pysql->param.objs);
kore_free(pysql->param.values);
kore_free(pysql->param.lengths);
kore_free(pysql->param.formats);
PyObject_Del((PyObject *)pysql);
}
static PyObject *
pykore_pgsql_alloc(struct http_request *req, const char *db, const char *query,
PyObject *kwargs)
python_kore_pgsql_query(PyObject *self, PyObject *args, PyObject *kwargs)
{
struct pykore_pgsql *op;
PyObject *obj;
struct pykore_pgsql *pysql;
const char *db, *query;
pysql = PyObject_New(struct pykore_pgsql, &pykore_pgsql_type);
if (pysql == NULL)
if (!PyArg_ParseTuple(args, "ss", &db, &query))
return (NULL);
pysql->req = req;
pysql->result = NULL;
pysql->db = kore_strdup(db);
pysql->query = kore_strdup(query);
pysql->state = PYKORE_PGSQL_PREINIT;
op = PyObject_New(struct pykore_pgsql, &pykore_pgsql_type);
if (op == NULL)
return (NULL);
pysql->binary = 0;
pysql->param.count = 0;
pysql->param.objs = NULL;
pysql->param.values = NULL;
pysql->param.lengths = NULL;
pysql->param.formats = NULL;
op->binary = 0;
op->param.count = 0;
op->param.objs = NULL;
op->param.values = NULL;
op->param.lengths = NULL;
op->param.formats = NULL;
memset(&pysql->sql, 0, sizeof(pysql->sql));
op->result = NULL;
op->coro = coro_running;
op->db = kore_strdup(db);
op->query = kore_strdup(query);
op->state = PYKORE_PGSQL_PREINIT;
memset(&op->sql, 0, sizeof(op->sql));
if (kwargs != NULL) {
if ((obj = PyDict_GetItemString(kwargs, "params")) != NULL) {
if (!pykore_pgsql_params(pysql, obj)) {
Py_DECREF((PyObject *)pysql);
if (!pykore_pgsql_params(op, obj)) {
Py_DECREF((PyObject *)op);
return (NULL);
}
}
if ((obj = PyDict_GetItemString(kwargs, "binary")) != NULL) {
if (obj == Py_True) {
pysql->binary = 1;
op->binary = 1;
} else if (obj == Py_False) {
pysql->binary = 0;
op->binary = 0;
} else {
Py_DECREF((PyObject *)pysql);
Py_DECREF((PyObject *)op);
PyErr_SetString(PyExc_RuntimeError,
"pgsql: binary not True or False");
return (NULL);
@ -3900,7 +3881,7 @@ pykore_pgsql_alloc(struct http_request *req, const char *db, const char *query,
}
}
return ((PyObject *)pysql);
return ((PyObject *)op);
}
static int
@ -3968,13 +3949,37 @@ pykore_pgsql_params(struct pykore_pgsql *op, PyObject *list)
return (KORE_RESULT_OK);
}
static void
pykore_pgsql_dealloc(struct pykore_pgsql *pysql)
{
Py_ssize_t i;
kore_free(pysql->db);
kore_free(pysql->query);
kore_pgsql_cleanup(&pysql->sql);
if (pysql->result != NULL)
Py_DECREF(pysql->result);
for (i = 0; i < pysql->param.count; i++)
Py_XDECREF(pysql->param.objs[i]);
kore_free(pysql->param.objs);
kore_free(pysql->param.values);
kore_free(pysql->param.lengths);
kore_free(pysql->param.formats);
PyObject_Del((PyObject *)pysql);
}
static PyObject *
pykore_pgsql_iternext(struct pykore_pgsql *pysql)
{
switch (pysql->state) {
case PYKORE_PGSQL_PREINIT:
kore_pgsql_init(&pysql->sql);
kore_pgsql_bind_request(&pysql->sql, pysql->req);
kore_pgsql_bind_callback(&pysql->sql,
pykore_pgsql_callback, pysql);
pysql->state = PYKORE_PGSQL_INITIALIZE;
/* fallthrough */
case PYKORE_PGSQL_INITIALIZE:
@ -4009,6 +4014,7 @@ wait_again:
PyErr_SetObject(PyExc_StopIteration,
pysql->result);
Py_DECREF(pysql->result);
pysql->result = NULL;
} else {
PyErr_SetObject(PyExc_StopIteration, Py_None);
}
@ -4035,6 +4041,17 @@ wait_again:
Py_RETURN_NONE;
}
static void
pykore_pgsql_callback(struct kore_pgsql *pgsql, void *arg)
{
struct pykore_pgsql *op = arg;
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
}
static PyObject *
pykore_pgsql_await(PyObject *obj)
{
@ -4113,24 +4130,6 @@ pykore_pgsql_result(struct pykore_pgsql *pysql)
return (KORE_RESULT_OK);
}
static PyObject *
pyhttp_pgsql(struct pyhttp_request *pyreq, PyObject *args, PyObject *kwargs)
{
PyObject *obj;
const char *db, *query;
if (!PyArg_ParseTuple(args, "ss", &db, &query))
return (NULL);
if ((obj = pykore_pgsql_alloc(pyreq->req, db, query, kwargs)) == NULL)
return (PyErr_NoMemory());
Py_INCREF(obj);
pyreq->data = obj;
return ((PyObject *)obj);
}
#endif
#if defined(KORE_USE_CURL)

View File

@ -371,10 +371,6 @@ kore_worker_entry(struct kore_worker *kw)
accept_avail = 1;
worker_active_connections = 0;
#if defined(KORE_USE_PGSQL)
kore_pgsql_sys_init();
#endif
#if defined(KORE_USE_TASKS)
kore_task_init();
#endif