kore/src/python.c

7034 lines
152 KiB
C
Raw Permalink Normal View History

/*
* Copyright (c) 2016 Stanislav Yudin <stan@endlessinsomnia.com>
2022-01-31 22:02:06 +01:00
* Copyright (c) 2017-2022 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
2018-11-29 09:11:31 +01:00
#include <sys/param.h>
2018-07-07 13:23:43 +02:00
#include <sys/types.h>
#include <sys/stat.h>
2018-07-07 13:23:43 +02:00
#include <sys/socket.h>
#include <sys/wait.h>
2019-03-01 20:56:03 +01:00
#include <sys/un.h>
#include <ctype.h>
#include <libgen.h>
#include <inttypes.h>
#include <signal.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdarg.h>
#include <stddef.h>
#include "kore.h"
#include "http.h"
#if defined(KORE_USE_PGSQL)
#include "pgsql.h"
#endif
#if defined(KORE_USE_CURL)
#include "curl.h"
#endif
2019-11-13 23:01:24 +01:00
#if defined(KORE_USE_ACME)
#include "acme.h"
#endif
#include "python_api.h"
#include "python_methods.h"
#if defined(KORE_USE_CURL)
#include "python_curlopt.h"
#endif
#include <frameobject.h>
#if PY_VERSION_HEX >= 0x030b0000
#include <internal/pycore_frame.h>
#endif
#if PY_VERSION_HEX < 0x030A0000
typedef enum {
PYGEN_RETURN = 0,
PYGEN_ERROR = -1,
PYGEN_NEXT = 1,
} PySendResult;
#endif
struct reqcall {
PyObject *f;
2019-06-07 21:33:00 +02:00
TAILQ_ENTRY(reqcall) list;
};
union deconst {
char *p;
const char *cp;
};
2019-06-07 21:33:00 +02:00
TAILQ_HEAD(reqcall_list, reqcall);
2021-07-10 10:02:46 +02:00
PyMODINIT_FUNC python_module_init(void);
static PyObject *python_import(const char *);
static int python_resolve_frame_line(void *);
static PyObject *pyconnection_alloc(struct connection *);
static PyObject *python_callable(PyObject *, const char *);
static void python_split_arguments(char *, char **, size_t);
static void python_kore_recvobj(struct kore_msg *, const void *);
static PyObject *python_cmsg_to_list(struct msghdr *);
static const char *python_string_from_dict(PyObject *, const char *);
static int python_bool_from_dict(PyObject *, const char *, int *);
static int python_long_from_dict(PyObject *, const char *, long *);
static int pyhttp_response_sent(struct netbuf *);
2017-02-01 21:23:56 +01:00
static PyObject *pyhttp_file_alloc(struct http_file *);
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
static PyObject *pyhttp_request_alloc(const struct http_request *);
static struct python_coro *python_coro_create(PyObject *,
struct http_request *);
static struct kore_domain *python_route_domain_resolve(struct pyroute *);
static int python_route_install(struct pyroute *);
static int python_route_params(PyObject *, struct kore_route *,
const char *, int, int);
static int python_route_methods(PyObject *, PyObject *,
struct kore_route *);
static int python_route_auth(PyObject *, struct kore_route *);
static int python_route_hooks(PyObject *, struct kore_route *);
static int python_route_hook_set(PyObject *, const char *,
struct kore_runtime_call **);
static int python_coro_run(struct python_coro *);
static void python_coro_wakeup(struct python_coro *);
static void python_coro_suspend(struct python_coro *);
static void python_coro_trace(const char *, struct python_coro *);
static void pysocket_evt_handle(void *, int);
static void pysocket_op_timeout(void *, u_int64_t);
static PyObject *pysocket_op_create(struct pysocket *,
int, const void *, size_t);
static struct pysocket *pysocket_alloc(void);
static PyObject *pysocket_async_recv(struct pysocket_op *);
static PyObject *pysocket_async_send(struct pysocket_op *);
static PyObject *pysocket_async_accept(struct pysocket_op *);
static PyObject *pysocket_async_connect(struct pysocket_op *);
static void pylock_do_release(struct pylock *);
2018-10-23 20:44:43 +02:00
static void pytimer_run(void *, u_int64_t);
static void pyproc_timeout(void *, u_int64_t);
static void pysuspend_wakeup(void *, u_int64_t);
static void pygather_reap_coro(struct pygather_op *,
struct python_coro *);
2019-06-07 21:06:54 +02:00
static int pyhttp_preprocess(struct http_request *);
static int pyhttp_iterobj_chunk_sent(struct netbuf *);
static int pyhttp_iterobj_next(struct pyhttp_iterobj *);
static void pyhttp_iterobj_disconnect(struct connection *);
static int pyconnection_x509_cb(void *, int, int, const char *,
const void *, size_t, int);
#if defined(KORE_USE_PGSQL)
static int pykore_pgsql_result(struct pykore_pgsql *);
static void pykore_pgsql_callback(struct kore_pgsql *, void *);
static int pykore_pgsql_params(struct pykore_pgsql *, PyObject *);
static int pykore_pgsql_params(struct pykore_pgsql *, PyObject *);
#endif
#if defined(KORE_USE_CURL)
static void python_curl_http_callback(struct kore_curl *, void *);
static void python_curl_handle_callback(struct kore_curl *, void *);
static PyObject *pyhttp_client_request(struct pyhttp_client *, int,
PyObject *);
static PyObject *python_curlopt_set(struct pycurl_data *,
long, PyObject *);
static int python_curlopt_from_dict(struct pycurl_data *,
PyObject *);
#endif
static void python_append_path(const char *);
static void python_push_integer(PyObject *, const char *, long);
static void python_push_type(const char *, PyObject *, PyTypeObject *);
static int python_validator_check(PyObject *);
static int python_runtime_resolve(const char *, const struct stat *);
static int python_runtime_http_request(void *, struct http_request *);
static void python_runtime_http_request_free(void *, struct http_request *);
static void python_runtime_http_body_chunk(void *, struct http_request *,
const void *, size_t);
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
static int python_runtime_validator(void *, struct http_request *,
const void *);
static void python_runtime_wsmessage(void *, struct connection *,
u_int8_t, const void *, size_t);
static void python_runtime_execute(void *);
static int python_runtime_onload(void *, int);
static void python_runtime_signal(void *, int);
static void python_runtime_configure(void *, int, char **);
static void python_runtime_connect(void *, struct connection *);
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
static void python_module_load(struct kore_module *);
static void python_module_free(struct kore_module *);
static void python_module_reload(struct kore_module *);
static void *python_module_getsym(struct kore_module *, const char *);
static void *python_malloc(void *, size_t);
static void *python_calloc(void *, size_t, size_t);
static void *python_realloc(void *, void *, size_t);
static void python_free(void *, void *);
struct kore_module_functions kore_python_module = {
.free = python_module_free,
.load = python_module_load,
.getsym = python_module_getsym,
.reload = python_module_reload
};
struct kore_runtime kore_python_runtime = {
KORE_RUNTIME_PYTHON,
.resolve = python_runtime_resolve,
.http_request = python_runtime_http_request,
.http_body_chunk = python_runtime_http_body_chunk,
.http_request_free = python_runtime_http_request_free,
.validator = python_runtime_validator,
.wsconnect = python_runtime_connect,
.wsmessage = python_runtime_wsmessage,
.wsdisconnect = python_runtime_connect,
.onload = python_runtime_onload,
.signal = python_runtime_signal,
.connect = python_runtime_connect,
.execute = python_runtime_execute,
2018-10-23 20:44:43 +02:00
.configure = python_runtime_configure,
};
static struct {
const char *symbol;
int value;
} python_integers[] = {
{ "LOG_ERR", LOG_ERR },
{ "LOG_INFO", LOG_INFO },
{ "LOG_NOTICE", LOG_NOTICE },
{ "RESULT_OK", KORE_RESULT_OK },
{ "RESULT_RETRY", KORE_RESULT_RETRY },
{ "RESULT_ERROR", KORE_RESULT_ERROR },
{ "MODULE_LOAD", KORE_MODULE_LOAD },
{ "MODULE_UNLOAD", KORE_MODULE_UNLOAD },
2018-10-23 20:44:43 +02:00
{ "TIMER_ONESHOT", KORE_TIMER_ONESHOT },
{ "CONN_PROTO_HTTP", CONN_PROTO_HTTP },
{ "CONN_PROTO_UNKNOWN", CONN_PROTO_UNKNOWN },
{ "CONN_PROTO_WEBSOCKET", CONN_PROTO_WEBSOCKET },
{ "CONN_STATE_ESTABLISHED", CONN_STATE_ESTABLISHED },
{ "HTTP_METHOD_GET", HTTP_METHOD_GET },
{ "HTTP_METHOD_PUT", HTTP_METHOD_PUT },
{ "HTTP_METHOD_HEAD", HTTP_METHOD_HEAD },
{ "HTTP_METHOD_POST", HTTP_METHOD_POST },
{ "HTTP_METHOD_DELETE", HTTP_METHOD_DELETE },
{ "HTTP_METHOD_OPTIONS", HTTP_METHOD_OPTIONS },
{ "HTTP_METHOD_PATCH", HTTP_METHOD_PATCH },
{ "WEBSOCKET_OP_TEXT", WEBSOCKET_OP_TEXT },
{ "WEBSOCKET_OP_BINARY", WEBSOCKET_OP_BINARY },
{ "WEBSOCKET_BROADCAST_LOCAL", WEBSOCKET_BROADCAST_LOCAL },
{ "WEBSOCKET_BROADCAST_GLOBAL", WEBSOCKET_BROADCAST_GLOBAL },
{ NULL, -1 }
};
static PyMemAllocatorEx allocator = {
.ctx = NULL,
.malloc = python_malloc,
.calloc = python_calloc,
.realloc = python_realloc,
.free = python_free
};
#if defined(__linux__)
#include "seccomp.h"
static struct sock_filter filter_python[] = {
/* Required for kore.proc */
#if defined(SYS_dup2)
KORE_SYSCALL_ALLOW(dup2),
#endif
#if defined(SYS_dup3)
KORE_SYSCALL_ALLOW(dup3),
#endif
#if defined(SYS_pipe)
KORE_SYSCALL_ALLOW(pipe),
#endif
#if defined(SYS_pipe2)
KORE_SYSCALL_ALLOW(pipe2),
#endif
KORE_SYSCALL_ALLOW(wait4),
KORE_SYSCALL_ALLOW(execve),
/* Socket related. */
KORE_SYSCALL_ALLOW(bind),
KORE_SYSCALL_ALLOW(listen),
KORE_SYSCALL_ALLOW(sendto),
KORE_SYSCALL_ALLOW(recvfrom),
KORE_SYSCALL_ALLOW(getsockname),
KORE_SYSCALL_ALLOW(getpeername),
KORE_SYSCALL_ALLOW_ARG(socket, 0, AF_INET),
KORE_SYSCALL_ALLOW_ARG(socket, 0, AF_INET6),
KORE_SYSCALL_ALLOW_ARG(socket, 0, AF_UNIX),
};
#define PYSECCOMP_ACTION_ALLOW 1
#define PYSECCOMP_ACTION_DENY 2
#define PYSECCOMP_SYSCALL_FILTER 1
#define PYSECCOMP_SYSCALL_ARG 2
#define PYSECCOMP_SYSCALL_MASK 3
#define PYSECCOMP_SYSCALL_FLAG 4
static int pyseccomp_filter_install(struct pyseccomp *,
const char *, int, int, int, int);
static PyObject *pyseccomp_common_action(struct pyseccomp *, PyObject *,
PyObject *, int, int);
static struct pyseccomp *py_seccomp = NULL;
#endif
static TAILQ_HEAD(, pyproc) procs;
static TAILQ_HEAD(, pyroute) routes;
static struct reqcall_list prereq;
static struct kore_pool coro_pool;
static struct kore_pool iterobj_pool;
static struct kore_pool queue_wait_pool;
static struct kore_pool gather_coro_pool;
static struct kore_pool queue_object_pool;
static struct kore_pool gather_result_pool;
static u_int64_t coro_id;
static int coro_count;
static int coro_tracing;
static struct coro_list coro_runnable;
static struct coro_list coro_suspended;
extern const char *__progname;
static PyObject *pickle = NULL;
static PyObject *kore_app = NULL;
static PyObject *pickle_dumps = NULL;
static PyObject *pickle_loads = NULL;
static PyObject *python_tracer = NULL;
/* XXX */
static struct python_coro *coro_running = NULL;
#if !defined(KORE_SINGLE_BINARY)
static const char *kore_pymodule = NULL;
#endif
void
kore_python_init(void)
{
struct kore_runtime_call *rcall;
coro_id = 0;
coro_count = 0;
coro_tracing = 0;
2019-06-07 21:33:00 +02:00
TAILQ_INIT(&prereq);
TAILQ_INIT(&procs);
TAILQ_INIT(&routes);
TAILQ_INIT(&coro_runnable);
TAILQ_INIT(&coro_suspended);
kore_pool_init(&coro_pool, "coropool", sizeof(struct python_coro), 100);
kore_pool_init(&iterobj_pool, "iterobj_pool",
sizeof(struct pyhttp_iterobj), 100);
kore_pool_init(&queue_wait_pool, "queue_wait_pool",
sizeof(struct pyqueue_waiting), 100);
kore_pool_init(&gather_coro_pool, "gather_coro_pool",
sizeof(struct pygather_coro), 100);
kore_pool_init(&queue_object_pool, "queue_object_pool",
sizeof(struct pyqueue_object), 100);
kore_pool_init(&gather_result_pool, "gather_result_pool",
sizeof(struct pygather_result), 100);
PyMem_SetAllocator(PYMEM_DOMAIN_OBJ, &allocator);
PyMem_SetAllocator(PYMEM_DOMAIN_MEM, &allocator);
PyMem_SetAllocator(PYMEM_DOMAIN_RAW, &allocator);
#if defined(KORE_DEBUG)
PyMem_SetupDebugHooks();
#endif
kore_msg_register(KORE_PYTHON_SEND_OBJ, python_kore_recvobj);
if (PyImport_AppendInittab("kore", &python_module_init) == -1)
fatal("kore_python_init: failed to add new module");
rcall = kore_runtime_getcall("kore_python_preinit");
if (rcall != NULL) {
kore_runtime_execute(rcall);
kore_free(rcall);
}
Py_InitializeEx(0);
if ((pickle = PyImport_ImportModule("pickle")) == NULL)
fatal("failed to import pickle module");
if ((pickle_dumps = PyObject_GetAttrString(pickle, "dumps")) == NULL)
fatal("pickle module has no dumps method");
if ((pickle_loads = PyObject_GetAttrString(pickle, "loads")) == NULL)
fatal("pickle module has no loads method");
#if defined(__linux__)
kore_seccomp_filter("python", filter_python,
KORE_FILTER_LEN(filter_python));
#endif
#if !defined(KORE_SINGLE_BINARY)
if (kore_pymodule) {
if (!kore_configure_setting("deployment", "dev"))
fatal("failed to set initial deployment");
}
#endif
}
void
kore_python_cleanup(void)
{
if (Py_IsInitialized()) {
PyErr_Clear();
Py_Finalize();
}
}
void
kore_python_path(const char *path)
{
python_append_path(path);
}
void
kore_python_coro_run(void)
{
struct pygather_op *op;
struct python_coro *coro;
while ((coro = TAILQ_FIRST(&coro_runnable)) != NULL) {
if (coro->state != CORO_STATE_RUNNABLE)
fatal("non-runnable coro on coro_runnable");
if (python_coro_run(coro) == KORE_RESULT_OK) {
if (coro->gatherop != NULL) {
op = coro->gatherop;
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
pygather_reap_coro(op, coro);
} else {
kore_python_coro_delete(coro);
}
}
}
/*
* Let Kore do HTTP processing so awoken coroutines run asap without
* having to wait for a tick from the event loop.
*
* Maybe it is more beneficial that we track if something related
* to HTTP requests was awoken and only run if true?
*/
http_process();
#if defined(KORE_USE_CURL)
/*
* If a coroutine fired off a curl instance, immediately
* let it make progress.
*/
kore_curl_do_timeout();
#endif
}
void
kore_python_coro_delete(void *obj)
{
struct python_coro *coro;
coro = obj;
coro_count--;
python_coro_trace(coro->killed ? "killed" : "deleted", coro);
2019-09-16 20:21:04 +02:00
coro_running = coro;
if (coro->lockop != NULL) {
coro->lockop->active = 0;
TAILQ_REMOVE(&coro->lockop->lock->ops, coro->lockop, list);
Py_DECREF((PyObject *)coro->lockop);
coro->lockop = NULL;
}
Py_DECREF(coro->obj);
coro_running = NULL;
if (coro->state == CORO_STATE_RUNNABLE)
TAILQ_REMOVE(&coro_runnable, coro, list);
else
TAILQ_REMOVE(&coro_suspended, coro, list);
kore_free(coro->name);
Py_XDECREF(coro->result);
kore_pool_put(&coro_pool, coro);
}
int
kore_python_coro_pending(void)
{
return (!TAILQ_EMPTY(&coro_runnable));
}
void
kore_python_routes_resolve(void)
{
struct pyroute *route;
while ((route = TAILQ_FIRST(&routes)) != NULL) {
TAILQ_REMOVE(&routes, route, list);
if (!python_route_install(route))
fatalx("failed to install route for %s", route->path);
Py_DECREF((PyObject *)route);
}
}
void
kore_python_log_error(const char *function)
{
const char *sval;
2018-11-29 09:51:24 +01:00
PyObject *ret, *repr, *type, *value, *traceback;
if (!PyErr_Occurred() || PyErr_ExceptionMatches(PyExc_StopIteration))
return;
PyErr_Fetch(&type, &value, &traceback);
if (type == NULL || value == NULL) {
2017-01-25 21:17:12 +01:00
kore_log(LOG_ERR, "unknown python exception in '%s'", function);
return;
}
if (value == NULL || !PyObject_IsInstance(value, type))
PyErr_NormalizeException(&type, &value, &traceback);
/*
* If we're in an active coroutine and it was tied to a gather
* operation we have to make sure we can use the Exception that
* was thrown as the result value so we can propagate it via the
* return list of kore.gather().
*/
if (coro_running != NULL && coro_running->gatherop != NULL) {
PyErr_SetObject(PyExc_StopIteration, value);
2018-11-29 09:51:24 +01:00
} else if (python_tracer != NULL) {
/*
* Call the user-supplied tracer callback.
*/
ret = PyObject_CallFunctionObjArgs(python_tracer,
type, value, traceback, NULL);
Py_XDECREF(ret);
} else {
if ((repr = PyObject_Repr(value)) == NULL)
sval = "unknown";
else
sval = PyUnicode_AsUTF8(repr);
kore_log(LOG_ERR,
"uncaught exception %s in '%s'", sval, function);
Py_XDECREF(repr);
}
Py_DECREF(type);
Py_DECREF(value);
Py_XDECREF(traceback);
}
void
kore_python_proc_reap(void)
{
struct pyproc *proc;
struct python_coro *coro;
pid_t child;
int status;
for (;;) {
if ((child = waitpid(-1, &status, WNOHANG)) == -1) {
if (errno == ECHILD)
return;
if (errno == EINTR)
continue;
kore_log(LOG_NOTICE, "waitpid: %s", errno_s);
return;
}
if (child == 0)
return;
proc = NULL;
TAILQ_FOREACH(proc, &procs, list) {
if (proc->pid == child)
break;
}
if (proc == NULL)
continue;
proc->pid = -1;
proc->reaped = 1;
proc->status = status;
if (proc->timer != NULL) {
kore_timer_remove(proc->timer);
proc->timer = NULL;
}
/*
* If someone is waiting on proc.reap() then wakeup that
* coroutine, otherwise wakeup the coroutine that created
* the process.
*/
if (proc->op != NULL)
coro = proc->op->coro;
else
coro = proc->coro;
if (coro->request != NULL)
http_request_wakeup(coro->request);
else
python_coro_wakeup(coro);
}
}
#if defined(__linux__)
void
kore_python_seccomp_hook(const char *method)
{
struct kore_runtime *rt;
PyObject *func, *result;
2019-10-04 11:34:49 +02:00
if ((func = kore_module_getsym(method, &rt)) == NULL)
return;
if (rt->type != KORE_RUNTIME_PYTHON)
return;
py_seccomp = PyObject_New(struct pyseccomp, &pyseccomp_type);
if (py_seccomp == NULL)
fatal("failed to create seccomp object");
py_seccomp->elm = 0;
py_seccomp->filters = NULL;
result = PyObject_CallFunctionObjArgs(func,
(PyObject *)py_seccomp, NULL);
2019-10-04 11:34:49 +02:00
kore_python_log_error(method);
kore_seccomp_filter("koreapp", py_seccomp->filters, py_seccomp->elm);
Py_XDECREF(result);
}
void
kore_python_seccomp_cleanup(void)
{
Py_XDECREF(py_seccomp);
py_seccomp = NULL;
}
static void
pyseccomp_dealloc(struct pyseccomp *seccomp)
{
kore_free(seccomp->filters);
seccomp->elm = 0;
seccomp->filters = NULL;
}
static PyObject *
pyseccomp_bpf_stmt(struct pyseccomp *seccomp, PyObject *args)
{
u_int32_t k;
u_int16_t code;
size_t len, off;
struct sock_filter filter[1];
if (!PyArg_ParseTuple(args, "HI", &code, &k))
return (NULL);
filter[0].k = k;
filter[0].jt = 0;
filter[0].jf = 0;
filter[0].code = code;
len = sizeof(struct sock_filter);
off = seccomp->elm * sizeof(struct sock_filter);
seccomp->filters = kore_realloc(seccomp->filters, off + len);
memcpy(seccomp->filters + off, filter, len);
seccomp->elm += 1;
Py_RETURN_NONE;
}
static PyObject *
pyseccomp_allow(struct pyseccomp *seccomp, PyObject *args)
{
const char *syscall;
if (!PyArg_ParseTuple(args, "s", &syscall))
return (NULL);
if (!pyseccomp_filter_install(seccomp, syscall,
PYSECCOMP_SYSCALL_FILTER, 0, 0, SECCOMP_RET_ALLOW))
return (NULL);
Py_RETURN_NONE;
}
static PyObject *
pyseccomp_allow_arg(struct pyseccomp *seccomp, PyObject *args)
{
return (pyseccomp_common_action(seccomp, args, NULL,
PYSECCOMP_SYSCALL_ARG, PYSECCOMP_ACTION_ALLOW));
}
static PyObject *
pyseccomp_allow_flag(struct pyseccomp *seccomp, PyObject *args)
{
return (pyseccomp_common_action(seccomp, args, NULL,
PYSECCOMP_SYSCALL_FLAG, PYSECCOMP_ACTION_ALLOW));
}
static PyObject *
pyseccomp_allow_mask(struct pyseccomp *seccomp, PyObject *args)
{
return (pyseccomp_common_action(seccomp, args, NULL,
PYSECCOMP_SYSCALL_MASK, PYSECCOMP_ACTION_ALLOW));
}
static PyObject *
pyseccomp_deny(struct pyseccomp *seccomp, PyObject *args, PyObject *kwargs)
{
long err;
const char *syscall;
if (!PyArg_ParseTuple(args, "s", &syscall))
return (NULL);
err = EACCES;
if (kwargs != NULL)
python_long_from_dict(kwargs, "errno", &err);
if (!pyseccomp_filter_install(seccomp, syscall,
PYSECCOMP_SYSCALL_FILTER, 0, 0, SECCOMP_RET_ERRNO | (int)err))
return (NULL);
Py_RETURN_NONE;
}
static PyObject *
pyseccomp_deny_arg(struct pyseccomp *seccomp, PyObject *args, PyObject *kwargs)
{
return (pyseccomp_common_action(seccomp, args, kwargs,
PYSECCOMP_SYSCALL_ARG, PYSECCOMP_ACTION_DENY));
}
static PyObject *
pyseccomp_deny_flag(struct pyseccomp *seccomp, PyObject *args, PyObject *kwargs)
{
return (pyseccomp_common_action(seccomp, args, kwargs,
PYSECCOMP_SYSCALL_FLAG, PYSECCOMP_ACTION_DENY));
}
static PyObject *
pyseccomp_deny_mask(struct pyseccomp *seccomp, PyObject *args, PyObject *kwargs)
{
return (pyseccomp_common_action(seccomp, args, kwargs,
PYSECCOMP_SYSCALL_MASK, PYSECCOMP_ACTION_DENY));
}
static PyObject *
pyseccomp_common_action(struct pyseccomp *sc, PyObject *args,
PyObject *kwargs, int which, int action)
{
long err;
const char *syscall;
int arg, val;
if (!PyArg_ParseTuple(args, "sii", &syscall, &arg, &val))
return (NULL);
switch (action) {
case PYSECCOMP_ACTION_ALLOW:
action = SECCOMP_RET_ALLOW;
break;
case PYSECCOMP_ACTION_DENY:
err = EACCES;
if (kwargs != NULL)
python_long_from_dict(kwargs, "errno", &err);
action = SECCOMP_RET_ERRNO | (int)err;
break;
default:
fatal("%s: bad action %d", __func__, action);
}
if (!pyseccomp_filter_install(sc, syscall, which, arg, val, action))
return (NULL);
Py_RETURN_NONE;
}
static int
pyseccomp_filter_install(struct pyseccomp *seccomp, const char *syscall,
int which, int arg, int val, int action)
{
struct sock_filter *filter;
size_t elm, len, off;
switch (which) {
case PYSECCOMP_SYSCALL_FILTER:
filter = kore_seccomp_syscall_filter(syscall, action);
break;
case PYSECCOMP_SYSCALL_ARG:
filter = kore_seccomp_syscall_arg(syscall, action, arg, val);
break;
case PYSECCOMP_SYSCALL_MASK:
filter = kore_seccomp_syscall_mask(syscall, action, arg, val);
break;
case PYSECCOMP_SYSCALL_FLAG:
filter = kore_seccomp_syscall_flag(syscall, action, arg, val);
break;
default:
fatal("%s: invalid syscall instruction %d", __func__, which);
}
if (filter == NULL) {
PyErr_Format(PyExc_RuntimeError,
"system call '%s' does not exist", syscall);
return (KORE_RESULT_ERROR);
}
elm = 0;
/*
* Find the number of elements in the BPF program, by looking for
* the KORE_BPF_GUARD element.
*/
for (;;) {
if (filter[elm].code == USHRT_MAX &&
filter[elm].jt == UCHAR_MAX &&
filter[elm].jf == UCHAR_MAX &&
filter[elm].k == UINT_MAX)
break;
elm++;
}
len = elm * sizeof(struct sock_filter);
off = seccomp->elm * sizeof(struct sock_filter);
seccomp->filters = kore_realloc(seccomp->filters, off + len);
memcpy(seccomp->filters + off, filter, len);
seccomp->elm += elm;
kore_free(filter);
return (KORE_RESULT_OK);
}
#endif
static int
python_long_from_dict(PyObject *dict, const char *key, long *result)
{
PyObject *obj;
if ((obj = PyDict_GetItemString(dict, key)) == NULL)
return (KORE_RESULT_ERROR);
if (!PyLong_CheckExact(obj))
return (KORE_RESULT_ERROR);
PyErr_Clear();
*result = PyLong_AsLong(obj);
if (*result == -1 && PyErr_Occurred()) {
PyErr_Clear();
return (KORE_RESULT_ERROR);
}
return (KORE_RESULT_OK);
}
static int
python_bool_from_dict(PyObject *dict, const char *key, int *result)
{
PyObject *obj;
if ((obj = PyDict_GetItemString(dict, key)) == NULL)
return (KORE_RESULT_ERROR);
if (!PyBool_Check(obj))
return (KORE_RESULT_ERROR);
*result = (obj == Py_True);
return (KORE_RESULT_OK);
}
static const char *
python_string_from_dict(PyObject *dict, const char *key)
{
PyObject *obj;
if ((obj = PyDict_GetItemString(dict, key)) == NULL)
return (NULL);
if (!PyUnicode_Check(obj))
return (NULL);
return (PyUnicode_AsUTF8AndSize(obj, NULL));
}
static PyObject *
python_cmsg_to_list(struct msghdr *msg)
{
struct cmsghdr *c;
size_t len;
Py_ssize_t idx;
PyObject *list, *tuple;
if ((list = PyList_New(0)) == NULL)
return (NULL);
idx = 0;
for (c = CMSG_FIRSTHDR(msg); c != NULL; c = CMSG_NXTHDR(msg, c)) {
len = c->cmsg_len - sizeof(*c);
tuple = Py_BuildValue("(Iiiy#)", len,
c->cmsg_level, c->cmsg_type, CMSG_DATA(c), len);
if (tuple == NULL) {
Py_DECREF(list);
return (NULL);
}
/* Steals a reference to tuple. */
if (PyList_Insert(list, idx++, tuple) == -1) {
Py_DECREF(tuple);
Py_DECREF(list);
return (NULL);
}
}
return (list);
}
static void *
python_malloc(void *ctx, size_t len)
{
return (kore_malloc(len));
}
static void *
python_calloc(void *ctx, size_t memb, size_t len)
{
return (kore_calloc(memb, len));
}
static void *
python_realloc(void *ctx, void *ptr, size_t len)
{
return (kore_realloc(ptr, len));
}
static void
python_free(void *ctx, void *ptr)
{
kore_free(ptr);
}
static void
python_module_free(struct kore_module *module)
{
kore_free(module->path);
Py_DECREF(module->handle);
kore_free(module);
}
static void
python_split_arguments(char *args, char **argv, size_t elm)
{
size_t idx;
char *p, *line, *end;
if (elm <= 1)
fatal("not enough elements (%zu)", elm);
idx = 0;
line = args;
for (p = line; *p != '\0'; p++) {
if (idx >= elm - 1)
break;
if (*p == ' ') {
*p = '\0';
if (*line != '\0')
argv[idx++] = line;
line = p + 1;
continue;
}
if (*p != '"')
continue;
line = p + 1;
if ((end = strchr(line, '"')) == NULL)
break;
*end = '\0';
argv[idx++] = line;
line = end + 1;
while (isspace(*(unsigned char *)line))
line++;
p = line;
}
if (idx < elm - 1 && *line != '\0')
argv[idx++] = line;
argv[idx] = NULL;
}
static void
python_module_reload(struct kore_module *module)
{
PyObject *handle;
PyErr_Clear();
if ((handle = PyImport_ReloadModule(module->handle)) == NULL) {
kore_python_log_error("python_module_reload");
return;
}
Py_DECREF(module->handle);
module->handle = handle;
}
static void
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
python_module_load(struct kore_module *module)
{
module->handle = python_import(module->path);
if (module->handle == NULL)
fatal("%s: failed to import module", module->path);
}
static void *
python_module_getsym(struct kore_module *module, const char *symbol)
{
return (python_callable(module->handle, symbol));
}
static struct python_coro *
python_coro_create(PyObject *obj, struct http_request *req)
{
struct python_coro *coro;
if (!PyCoro_CheckExact(obj))
fatal("%s: object is not a coroutine", __func__);
coro = kore_pool_get(&coro_pool);
coro_count++;
coro->name = NULL;
coro->result = NULL;
coro->sockop = NULL;
coro->lockop = NULL;
coro->gatherop = NULL;
coro->exception = NULL;
coro->exception_msg = NULL;
coro->obj = obj;
coro->killed = 0;
coro->request = req;
coro->id = coro_id++;
coro->state = CORO_STATE_RUNNABLE;
TAILQ_INSERT_TAIL(&coro_runnable, coro, list);
if (coro->request != NULL)
http_request_sleep(coro->request);
2019-09-16 20:21:04 +02:00
python_coro_trace("created", coro);
return (coro);
2017-02-01 21:23:56 +01:00
}
static int
python_coro_run(struct python_coro *coro)
{
PySendResult res;
PyObject *item;
PyObject *type, *traceback;
if (coro->state != CORO_STATE_RUNNABLE)
fatal("non-runnable coro attempted to run");
coro_running = coro;
for (;;) {
python_coro_trace("running", coro);
PyErr_Clear();
#if PY_VERSION_HEX < 0x030A0000
res = PYGEN_RETURN;
item = _PyGen_Send((PyGenObject *)coro->obj, NULL);
#else
/*
* Python 3.10.x its PyIter_Send() will return a PYGEN_ERROR
* if the coro returned (instead of yielding) and the result
* ends up being Py_None. This means the returned item is
* NULL but no StopIteration exception has occurred.
*/
res = PyIter_Send(coro->obj, NULL, &item);
#endif
if (item == NULL || res == PYGEN_ERROR) {
Py_XDECREF(item);
if (coro->gatherop == NULL && PyErr_Occurred() &&
PyErr_ExceptionMatches(PyExc_StopIteration)) {
PyErr_Fetch(&type, &coro->result, &traceback);
Py_DECREF(type);
Py_XDECREF(traceback);
} else if (PyErr_Occurred()) {
kore_python_log_error("coroutine");
if (coro->request != NULL) {
http_response(coro->request,
HTTP_STATUS_INTERNAL_ERROR,
NULL, 0);
}
}
coro_running = NULL;
return (KORE_RESULT_OK);
}
#if PY_VERSION_HEX >= 0x030A0000
if (res == PYGEN_RETURN) {
coro->result = item;
coro_running = NULL;
return (KORE_RESULT_OK);
}
#endif
if (item == Py_None) {
Py_DECREF(item);
break;
}
Py_DECREF(item);
}
python_coro_suspend(coro);
coro_running = NULL;
if (coro->request != NULL)
http_request_sleep(coro->request);
return (KORE_RESULT_RETRY);
}
static void
python_coro_wakeup(struct python_coro *coro)
{
if (coro->state != CORO_STATE_SUSPENDED)
return;
coro->state = CORO_STATE_RUNNABLE;
TAILQ_REMOVE(&coro_suspended, coro, list);
TAILQ_INSERT_TAIL(&coro_runnable, coro, list);
python_coro_trace("wokeup", coro);
}
static void
python_coro_suspend(struct python_coro *coro)
{
if (coro->state != CORO_STATE_RUNNABLE)
return;
coro->state = CORO_STATE_SUSPENDED;
TAILQ_REMOVE(&coro_runnable, coro, list);
TAILQ_INSERT_TAIL(&coro_suspended, coro, list);
python_coro_trace("suspended", coro);
}
static int
python_resolve_frame_line(void *ptr)
{
int line;
#if PY_VERSION_HEX >= 0x030b0000
int addr;
_PyInterpreterFrame *frame;
frame = ptr;
addr = _PyInterpreterFrame_LASTI(frame) * sizeof(_Py_CODEUNIT);
line = PyCode_Addr2Line(frame->f_code, addr);
#else
line = PyFrame_GetLineNumber(ptr);
#endif
return (line);
}
static void
python_coro_trace(const char *label, struct python_coro *coro)
{
int line;
PyCoroObject *obj;
PyCodeObject *code;
#if PY_VERSION_HEX >= 0x030b0000
_PyInterpreterFrame *frame;
#else
PyFrameObject *frame;
#endif
const char *func, *fname, *file;
if (coro_tracing == 0)
return;
obj = (PyCoroObject *)coro->obj;
#if PY_VERSION_HEX >= 0x030b0000
frame = (_PyInterpreterFrame *)obj->cr_iframe;
#else
frame = obj->cr_frame;
#endif
if (frame != NULL && frame->f_code != NULL) {
code = frame->f_code;
2019-09-16 20:21:04 +02:00
func = PyUnicode_AsUTF8AndSize(code->co_name, NULL);
file = PyUnicode_AsUTF8AndSize(code->co_filename, NULL);
2019-09-16 20:21:04 +02:00
if ((fname = strrchr(file, '/')) == NULL)
fname = file;
else
fname++;
} else {
func = "unknown";
fname = "unknown";
}
if (frame != NULL)
line = python_resolve_frame_line(frame);
else
2019-09-16 20:21:04 +02:00
line = -1;
if (coro->name) {
kore_log(LOG_NOTICE, "coro '%s' %s <%s> @ [%s:%d]",
coro->name, label, func, fname, line);
} else {
kore_log(LOG_NOTICE, "coro %" PRIu64 " %s <%s> @ [%s:%d]",
coro->id, label, func, fname, line);
}
}
static void
pyconnection_dealloc(struct pyconnection *pyc)
{
PyObject_Del((PyObject *)pyc);
}
static void
pyhttp_dealloc(struct pyhttp_request *pyreq)
{
Py_XDECREF(pyreq->dict);
Py_XDECREF(pyreq->data);
PyObject_Del((PyObject *)pyreq);
}
static void
pyhttp_file_dealloc(struct pyhttp_file *pyfile)
{
PyObject_Del((PyObject *)pyfile);
}
static int
python_runtime_resolve(const char *module, const struct stat *st)
{
const char *ext;
if (!S_ISDIR(st->st_mode) && !S_ISREG(st->st_mode))
return (KORE_RESULT_ERROR);
if (S_ISDIR(st->st_mode)) {
kore_module_load(module, NULL, KORE_MODULE_PYTHON);
if (chdir(module) == -1)
fatal("chdir(%s): %s", module, errno_s);
} else {
if ((ext = strrchr(module, '.')) == NULL)
return (KORE_RESULT_ERROR);
if (strcasecmp(ext, ".py"))
return (KORE_RESULT_ERROR);
kore_module_load(module, NULL, KORE_MODULE_PYTHON);
}
#if !defined(KORE_SINGLE_BINARY)
kore_pymodule = module;
#endif
kore_hooks_set(KORE_PYTHON_CONFIG_HOOK,
KORE_PYTHON_TEARDOWN_HOOK, KORE_PYTHON_DAEMONIZED_HOOK);
return (KORE_RESULT_OK);
}
static int
python_runtime_http_request(void *addr, struct http_request *req)
{
int ret, idx, cnt;
2019-06-07 21:06:54 +02:00
PyObject *pyret, *args, *callable;
PyObject *cargs[HTTP_CAPTURE_GROUPS + 1];
if (req->py_coro != NULL) {
python_coro_wakeup(req->py_coro);
if (python_coro_run(req->py_coro) == KORE_RESULT_OK) {
kore_python_coro_delete(req->py_coro);
req->py_coro = NULL;
2019-06-07 21:06:54 +02:00
if (req->fsm_state != PYHTTP_STATE_PREPROCESS)
return (KORE_RESULT_OK);
}
return (KORE_RESULT_RETRY);
}
2019-06-07 21:06:54 +02:00
switch (req->fsm_state) {
case PYHTTP_STATE_INIT:
2019-06-07 21:33:00 +02:00
req->py_rqnext = TAILQ_FIRST(&prereq);
2019-06-07 21:06:54 +02:00
req->fsm_state = PYHTTP_STATE_PREPROCESS;
if (req->py_req == NULL) {
if ((req->py_req = pyhttp_request_alloc(req)) == NULL)
fatal("%s: pyreq alloc failed", __func__);
}
2019-06-07 21:06:54 +02:00
/* fallthrough */
case PYHTTP_STATE_PREPROCESS:
ret = pyhttp_preprocess(req);
switch (ret) {
case KORE_RESULT_OK:
req->fsm_state = PYHTTP_STATE_RUN;
break;
case KORE_RESULT_RETRY:
return (KORE_RESULT_RETRY);
case KORE_RESULT_ERROR:
return (KORE_RESULT_OK);
2019-06-07 21:06:54 +02:00
default:
fatal("invalid state pyhttp state %d", req->fsm_state);
}
2019-06-07 21:06:54 +02:00
/* fallthrough */
case PYHTTP_STATE_RUN:
break;
}
cnt = 0;
2019-06-07 21:06:54 +02:00
callable = (PyObject *)addr;
/* starts at 1 to skip the full path. */
if (req->rt->type == HANDLER_TYPE_DYNAMIC) {
for (idx = 1; idx < HTTP_CAPTURE_GROUPS - 1; idx++) {
if (req->cgroups[idx].rm_so == -1 ||
req->cgroups[idx].rm_eo == -1)
break;
cargs[cnt] = PyUnicode_FromStringAndSize(req->path +
req->cgroups[idx].rm_so,
req->cgroups[idx].rm_eo - req->cgroups[idx].rm_so);
if (cargs[cnt] == NULL) {
while (cnt >= 0)
Py_XDECREF(cargs[cnt--]);
kore_python_log_error("http request");
http_response(req,
HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
return (KORE_RESULT_OK);
}
cnt++;
}
}
cargs[cnt] = NULL;
if ((args = PyTuple_New(cnt + 1)) == NULL)
fatal("%s: PyTuple_New failed", __func__);
2019-06-07 21:06:54 +02:00
Py_INCREF(req->py_req);
if (PyTuple_SetItem(args, 0, req->py_req) != 0)
fatal("python_runtime_http_request: PyTuple_SetItem failed");
for (idx = 0; cargs[idx] != NULL; idx++) {
if (PyTuple_SetItem(args, 1 + idx, cargs[idx]) != 0)
fatal("%s: PyTuple_SetItem failed (%d)", __func__, idx);
}
PyErr_Clear();
pyret = PyObject_Call(callable, args, NULL);
Py_DECREF(args);
if (pyret == NULL) {
kore_python_log_error("python_runtime_http_request");
http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
return (KORE_RESULT_OK);
}
if (PyCoro_CheckExact(pyret)) {
req->py_coro = python_coro_create(pyret, req);
if (python_coro_run(req->py_coro) == KORE_RESULT_OK) {
http_request_wakeup(req);
kore_python_coro_delete(req->py_coro);
req->py_coro = NULL;
return (KORE_RESULT_OK);
}
return (KORE_RESULT_RETRY);
}
if (pyret != Py_None)
fatal("python_runtime_http_request: unexpected return type");
Py_DECREF(pyret);
return (KORE_RESULT_OK);
}
static void
python_runtime_http_request_free(void *addr, struct http_request *req)
{
PyObject *ret;
if (req->py_req == NULL) {
if ((req->py_req = pyhttp_request_alloc(req)) == NULL)
fatal("%s: pyreq alloc failed", __func__);
}
PyErr_Clear();
ret = PyObject_CallFunctionObjArgs(addr, req->py_req, NULL);
if (ret == NULL)
kore_python_log_error("python_runtime_http_request_free");
Py_XDECREF(ret);
}
static void
python_runtime_http_body_chunk(void *addr, struct http_request *req,
const void *data, size_t len)
{
PyObject *args, *ret;
if (req->py_req == NULL) {
if ((req->py_req = pyhttp_request_alloc(req)) == NULL)
fatal("%s: pyreq alloc failed", __func__);
}
if ((args = Py_BuildValue("(Oy#)", req->py_req, data, len)) == NULL) {
kore_python_log_error("python_runtime_http_body_chunk");
return;
}
PyErr_Clear();
ret = PyObject_Call(addr, args, NULL);
if (ret == NULL)
kore_python_log_error("python_runtime_http_body_chunk");
Py_XDECREF(ret);
Py_DECREF(args);
}
static int
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
python_runtime_validator(void *addr, struct http_request *req, const void *data)
{
int ret;
struct python_coro *coro;
2019-06-07 21:06:54 +02:00
PyObject *pyret, *args, *callable, *arg;
if (req->py_req == NULL) {
if ((req->py_req = pyhttp_request_alloc(req)) == NULL)
fatal("%s: pyreq alloc failed", __func__);
}
2017-01-25 21:17:12 +01:00
if (req->py_validator != NULL) {
coro = req->py_validator;
python_coro_wakeup(coro);
if (python_coro_run(coro) == KORE_RESULT_OK) {
ret = python_validator_check(coro->result);
kore_python_coro_delete(coro);
req->py_validator = NULL;
return (ret);
}
2017-01-25 21:17:12 +01:00
return (KORE_RESULT_RETRY);
}
callable = (PyObject *)addr;
2017-01-25 21:17:12 +01:00
if (req->flags & HTTP_VALIDATOR_IS_REQUEST) {
if ((args = PyTuple_New(1)) == NULL)
fatal("%s: PyTuple_New failed", __func__);
2019-06-07 21:06:54 +02:00
Py_INCREF(req->py_req);
if (PyTuple_SetItem(args, 0, req->py_req) != 0)
fatal("%s: PyTuple_SetItem failed", __func__);
2017-01-25 21:17:12 +01:00
} else {
if ((arg = PyUnicode_FromString(data)) == NULL)
fatal("python_runtime_validator: PyUnicode failed");
2017-01-25 21:17:12 +01:00
if ((args = PyTuple_New(2)) == NULL)
fatal("%s: PyTuple_New failed", __func__);
2017-01-25 21:17:12 +01:00
2019-06-07 21:06:54 +02:00
Py_INCREF(req->py_req);
if (PyTuple_SetItem(args, 0, req->py_req) != 0 ||
PyTuple_SetItem(args, 1, arg) != 0)
fatal("%s: PyTuple_SetItem failed", __func__);
}
2017-01-25 21:17:12 +01:00
PyErr_Clear();
pyret = PyObject_Call(callable, args, NULL);
Py_DECREF(args);
if (pyret == NULL) {
kore_python_log_error("python_runtime_validator");
fatal("failed to execute python call");
2017-01-25 21:17:12 +01:00
}
if (PyCoro_CheckExact(pyret)) {
coro = python_coro_create(pyret, req);
req->py_validator = coro;
if (python_coro_run(coro) == KORE_RESULT_OK) {
http_request_wakeup(req);
ret = python_validator_check(coro->result);
kore_python_coro_delete(coro);
req->py_validator = NULL;
return (ret);
}
return (KORE_RESULT_RETRY);
}
2017-01-25 21:17:12 +01:00
ret = python_validator_check(pyret);
2017-01-25 21:17:12 +01:00
Py_DECREF(pyret);
return (ret);
}
static int
python_validator_check(PyObject *obj)
{
int ret;
if (obj == NULL)
return (KORE_RESULT_ERROR);
if (!PyBool_Check(obj)) {
kore_log(LOG_WARNING,
"validator did not return True/False");
ret = KORE_RESULT_ERROR;
}
if (obj == Py_True)
ret = KORE_RESULT_OK;
else
ret = KORE_RESULT_ERROR;
return (ret);
}
static void
python_runtime_wsmessage(void *addr, struct connection *c, u_int8_t op,
const void *data, size_t len)
{
PyObject *callable, *args, *pyret, *pyc, *pyop, *pydata;
callable = (PyObject *)addr;
if ((pyc = pyconnection_alloc(c)) == NULL)
fatal("python_runtime_wsmessage: pyc alloc failed");
if ((pyop = PyLong_FromLong((long)op)) == NULL)
fatal("python_runtime_wsmessage: PyLong_FromLong failed");
switch (op) {
case WEBSOCKET_OP_TEXT:
if ((pydata = PyUnicode_FromStringAndSize(data, len)) == NULL)
fatal("wsmessage: PyUnicode_AsUTF8AndSize failed");
break;
case WEBSOCKET_OP_BINARY:
if ((pydata = PyBytes_FromStringAndSize(data, len)) == NULL)
fatal("wsmessage: PyBytes_FromString failed");
break;
default:
fatal("python_runtime_wsmessage: invalid op");
}
if ((args = PyTuple_New(3)) == NULL)
fatal("python_runtime_wsmessage: PyTuple_New failed");
if (PyTuple_SetItem(args, 0, pyc) != 0 ||
PyTuple_SetItem(args, 1, pyop) != 0 ||
PyTuple_SetItem(args, 2, pydata) != 0)
fatal("python_runtime_wsmessage: PyTuple_SetItem failed");
PyErr_Clear();
pyret = PyObject_Call(callable, args, NULL);
Py_DECREF(args);
if (pyret == NULL) {
kore_python_log_error("python_runtime_wsconnect");
fatal("failed to execute python call");
}
Py_DECREF(pyret);
}
static void
python_runtime_execute(void *addr)
{
PyObject *callable, *args, *pyret;
callable = (PyObject *)addr;
if ((args = PyTuple_New(0)) == NULL)
fatal("python_runtime_execute: PyTuple_New failed");
PyErr_Clear();
pyret = PyObject_Call(callable, args, NULL);
Py_DECREF(args);
if (pyret == NULL) {
kore_python_log_error("python_runtime_execute");
fatal("failed to execute python call");
}
Py_DECREF(pyret);
}
static void
python_runtime_configure(void *addr, int argc, char **argv)
{
int i;
PyObject *callable, *args, *pyret, *pyarg, *list;
callable = (PyObject *)addr;
if ((args = PyTuple_New(1)) == NULL)
fatal("python_runtime_configure: PyTuple_New failed");
if ((list = PyList_New(argc + 1)) == NULL)
fatal("python_runtime_configure: PyList_New failed");
if ((pyarg = PyUnicode_FromString(__progname)) == NULL)
fatal("python_runtime_configure: PyUnicode_FromString");
if (PyList_SetItem(list, 0, pyarg) == -1)
fatal("python_runtime_configure: PyList_SetItem");
for (i = 0; i < argc; i++) {
if ((pyarg = PyUnicode_FromString(argv[i])) == NULL)
fatal("python_runtime_configure: PyUnicode_FromString");
if (PyList_SetItem(list, i + 1, pyarg) == -1)
fatal("python_runtime_configure: PyList_SetItem");
}
if (PyTuple_SetItem(args, 0, list) != 0)
fatal("python_runtime_configure: PyTuple_SetItem");
PyErr_Clear();
pyret = PyObject_Call(callable, args, NULL);
Py_DECREF(args);
if (pyret == NULL) {
kore_python_log_error("python_runtime_configure");
fatal("failed to configure your application");
}
Py_DECREF(pyret);
}
static int
python_runtime_onload(void *addr, int action)
{
int ret;
PyObject *pyret, *args, *pyact, *callable;
callable = (PyObject *)addr;
if ((pyact = PyLong_FromLong(action)) == NULL)
fatal("python_runtime_onload: PyLong_FromLong failed");
if ((args = PyTuple_New(1)) == NULL)
fatal("python_runtime_onload: PyTuple_New failed");
if (PyTuple_SetItem(args, 0, pyact) != 0)
fatal("python_runtime_onload: PyTuple_SetItem failed");
2017-01-25 21:17:12 +01:00
PyErr_Clear();
pyret = PyObject_Call(callable, args, NULL);
Py_DECREF(args);
if (pyret == NULL) {
kore_python_log_error("python_runtime_onload");
return (KORE_RESULT_ERROR);
}
if (!PyLong_Check(pyret))
fatal("python_runtime_onload: unexpected return type");
ret = (int)PyLong_AsLong(pyret);
Py_DECREF(pyret);
return (ret);
}
static void
python_runtime_connect(void *addr, struct connection *c)
{
PyObject *pyc, *pyret, *args, *callable;
callable = (PyObject *)addr;
if ((pyc = pyconnection_alloc(c)) == NULL)
fatal("python_runtime_connect: pyc alloc failed");
if ((args = PyTuple_New(1)) == NULL)
fatal("python_runtime_connect: PyTuple_New failed");
if (PyTuple_SetItem(args, 0, pyc) != 0)
fatal("python_runtime_connect: PyTuple_SetItem failed");
2017-01-25 21:17:12 +01:00
PyErr_Clear();
pyret = PyObject_Call(callable, args, NULL);
Py_DECREF(args);
if (pyret == NULL) {
kore_python_log_error("python_runtime_connect");
kore_connection_disconnect(c);
}
Py_DECREF(pyret);
}
static void
python_runtime_signal(void *addr, int sig)
{
PyObject *obj, *ret;
if ((obj = Py_BuildValue("i", sig)) == NULL) {
kore_python_log_error("python_runtime_signal");
return;
}
ret = PyObject_CallFunctionObjArgs(addr, obj, NULL);
Py_DECREF(obj);
Py_XDECREF(ret);
}
2021-07-10 10:02:46 +02:00
PyMODINIT_FUNC
python_module_init(void)
{
int i;
struct pyconfig *config;
PyObject *pykore;
if ((pykore = PyModule_Create(&pykore_module)) == NULL)
fatal("python_module_init: failed to setup pykore module");
python_push_type("pyproc", pykore, &pyproc_type);
python_push_type("pylock", pykore, &pylock_type);
2018-10-23 20:44:43 +02:00
python_push_type("pytimer", pykore, &pytimer_type);
python_push_type("pyqueue", pykore, &pyqueue_type);
python_push_type("pyroute", pykore, &pyroute_type);
python_push_type("pysocket", pykore, &pysocket_type);
python_push_type("pydomain", pykore, &pydomain_type);
python_push_type("pyconnection", pykore, &pyconnection_type);
#if defined(__linux__)
python_push_type("pyseccomp", pykore, &pyseccomp_type);
#endif
#if defined(KORE_USE_CURL)
python_push_type("pycurlhandle", pykore, &pycurl_handle_type);
python_push_type("pyhttpclient", pykore, &pyhttp_client_type);
for (i = 0; py_curlopt[i].name != NULL; i++) {
python_push_integer(pykore, py_curlopt[i].name,
py_curlopt[i].value);
}
#endif
python_push_type("pyhttp_file", pykore, &pyhttp_file_type);
python_push_type("pyhttp_request", pykore, &pyhttp_request_type);
for (i = 0; python_integers[i].symbol != NULL; i++) {
python_push_integer(pykore, python_integers[i].symbol,
python_integers[i].value);
}
if ((config = PyObject_New(struct pyconfig, &pyconfig_type)) == NULL)
fatal("failed to create config object");
if (PyObject_SetAttrString(pykore, "config", (PyObject *)config) == -1)
fatal("failed to add config object");
return (pykore);
}
static int
pyconfig_setattr(PyObject *self, PyObject *attr, PyObject *val)
{
char *v;
int ret;
PyObject *repr;
const char *name, *value;
ret = -1;
repr = NULL;
if (!PyUnicode_Check(attr))
fatal("setattr: attribute name not a unicode string");
if (PyLong_CheckExact(val)) {
if ((repr = PyObject_Repr(val)) == NULL)
return (-1);
value = PyUnicode_AsUTF8(repr);
} else if (PyUnicode_CheckExact(val)) {
value = PyUnicode_AsUTF8(val);
} else if (PyBool_Check(val)) {
if (val == Py_False)
value = "False";
else
value = "True";
} else {
fatal("invalid object, config expects integer, bool or string");
}
name = PyUnicode_AsUTF8(attr);
v = kore_strdup(value);
if (!kore_configure_setting(name, v)) {
ret = -1;
PyErr_SetString(PyExc_RuntimeError,
"configured cannot be changed at runtime");
} else {
ret = 0;
}
kore_free(v);
Py_XDECREF(repr);
return (ret);
}
static void
python_append_path(const char *path)
{
PyObject *mpath, *spath;
if ((mpath = PyUnicode_FromString(path)) == NULL)
fatal("python_append_path: PyUnicode_FromString failed");
if ((spath = PySys_GetObject("path")) == NULL)
fatal("python_append_path: PySys_GetObject failed");
PyList_Append(spath, mpath);
Py_DECREF(mpath);
}
static void
python_push_type(const char *name, PyObject *module, PyTypeObject *type)
{
if (PyType_Ready(type) == -1)
fatal("python_push_type: failed to ready %s", name);
Py_INCREF(type);
if (PyModule_AddObject(module, name, (PyObject *)type) == -1)
fatal("python_push_type: failed to push %s", name);
}
static void
python_push_integer(PyObject *module, const char *name, long value)
{
if (PyModule_AddIntConstant(module, name, value) == -1)
fatal("python_push_integer: failed to add %s", name);
}
#if defined(KORE_USE_PGSQL)
static PyObject *
python_kore_pgsql_register(PyObject *self, PyObject *args)
{
const char *db, *conninfo;
if (!PyArg_ParseTuple(args, "ss", &db, &conninfo))
return (NULL);
(void)kore_pgsql_register(db, conninfo);
Py_RETURN_TRUE;
}
#endif
static PyObject *
python_kore_app(PyObject *self, PyObject *args)
{
PyObject *obj;
if (!PyArg_ParseTuple(args, "O", &obj)) {
PyErr_Clear();
if (kore_app == NULL)
Py_RETURN_NONE;
Py_INCREF(kore_app);
return (kore_app);
}
Py_XDECREF(kore_app);
kore_app = obj;
Py_INCREF(kore_app);
Py_RETURN_TRUE;
}
static PyObject *
python_kore_log(PyObject *self, PyObject *args)
{
int prio;
const char *message;
if (!PyArg_ParseTuple(args, "is", &prio, &message))
return (NULL);
kore_log(prio, "%s", message);
Py_RETURN_TRUE;
}
static PyObject *
python_kore_time(PyObject *self, PyObject *args)
{
u_int64_t now;
now = kore_time_ms();
return (PyLong_FromUnsignedLongLong(now));
}
static PyObject *
python_kore_server(PyObject *self, PyObject *args, PyObject *kwargs)
{
struct kore_server *srv;
const char *name, *ip, *port, *path;
if (kwargs == NULL) {
PyErr_SetString(PyExc_RuntimeError, "missing keyword args");
return (NULL);
}
ip = python_string_from_dict(kwargs, "ip");
path = python_string_from_dict(kwargs, "path");
if (ip == NULL && path == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"missing ip or path keywords");
return (NULL);
}
if (ip != NULL && path != NULL) {
PyErr_SetString(PyExc_RuntimeError, "ip/path are exclusive");
return (NULL);
}
name = python_string_from_dict(kwargs, "name");
if (name == NULL)
name = "default";
if ((srv = kore_server_lookup(name)) != NULL) {
PyErr_Format(PyExc_RuntimeError,
"server '%s' already exist", name);
return (NULL);
}
srv = kore_server_create(name);
python_bool_from_dict(kwargs, "tls", &srv->tls);
if (srv->tls && !kore_tls_supported()) {
kore_server_free(srv);
PyErr_SetString(PyExc_RuntimeError,
"TLS not supported in this Kore build");
return (NULL);
}
if (ip != NULL) {
if ((port = python_string_from_dict(kwargs, "port")) == NULL) {
kore_server_free(srv);
PyErr_SetString(PyExc_RuntimeError,
"missing or invalid 'port' keyword");
return (NULL);
}
if (!kore_server_bind(srv, ip, port, NULL)) {
PyErr_Format(PyExc_RuntimeError,
"failed to bind to '%s:%s'", ip, port);
return (NULL);
}
} else {
if (!kore_server_bind_unix(srv, path, NULL)) {
PyErr_Format(PyExc_RuntimeError,
"failed to bind to '%s'", path);
return (NULL);
}
}
kore_server_finalize(srv);
Py_RETURN_NONE;
}
static PyObject *
python_kore_privsep(PyObject *self, PyObject *args, PyObject *kwargs)
{
struct kore_privsep *ps;
const char *val;
PyObject *skip, *obj;
Py_ssize_t list_len, idx;
if (!PyArg_ParseTuple(args, "s", &val))
return (NULL);
if (!strcmp(val, "worker")) {
ps = &worker_privsep;
} else if (!strcmp(val, "keymgr")) {
ps = &keymgr_privsep;
#if defined(KORE_USE_ACME)
2021-09-07 22:19:21 +02:00
} else if (!strcmp(val, "acme")) {
ps = &acme_privsep;
#endif
} else {
PyErr_Format(PyExc_RuntimeError,
"unknown privsep process '%s'", val);
return (NULL);
}
if ((val = python_string_from_dict(kwargs, "root")) != NULL) {
kore_free(ps->root);
ps->root = kore_strdup(val);
}
if ((val = python_string_from_dict(kwargs, "runas")) != NULL) {
kore_free(ps->runas);
ps->runas = kore_strdup(val);
}
if ((skip = PyDict_GetItemString(kwargs, "skip")) != NULL) {
if (!PyList_CheckExact(skip)) {
PyErr_Format(PyExc_RuntimeError,
"privsep skip keyword needs to be a list");
return (NULL);
}
list_len = PyList_Size(skip);
for (idx = 0; idx < list_len; idx++) {
if ((obj = PyList_GetItem(skip, idx)) == NULL)
return (NULL);
if (!PyUnicode_Check(obj))
return (NULL);
if ((val = PyUnicode_AsUTF8AndSize(obj, NULL)) == NULL)
return (NULL);
if (!strcmp(val, "chroot")) {
ps->skip_chroot = 1;
} else {
PyErr_Format(PyExc_RuntimeError,
"unknown skip keyword '%s'", val);
return (NULL);
}
}
}
Py_RETURN_NONE;
}
static PyObject *
python_kore_prerequest(PyObject *self, PyObject *args)
{
PyObject *f;
struct reqcall *rq;
if (!PyArg_ParseTuple(args, "O", &f))
return (NULL);
rq = kore_calloc(1, sizeof(*rq));
rq->f = f;
Py_INCREF(f);
2019-06-07 21:33:00 +02:00
TAILQ_INSERT_TAIL(&prereq, rq, list);
return (f);
}
static PyObject *
python_kore_task_create(PyObject *self, PyObject *args)
{
PyObject *obj;
struct python_coro *coro;
if (!PyArg_ParseTuple(args, "O", &obj))
return (NULL);
if (!PyCoro_CheckExact(obj))
fatal("%s: object is not a coroutine", __func__);
coro = python_coro_create(obj, NULL);
Py_INCREF(obj);
return (PyLong_FromUnsignedLongLong(coro->id));
}
static PyObject *
python_kore_task_id(PyObject *self, PyObject *args)
{
if (coro_running == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"no coroutine active");
return (NULL);
}
return (PyLong_FromUnsignedLongLong(coro_running->id));
}
static PyObject *
python_kore_task_kill(PyObject *self, PyObject *args)
{
u_int64_t id;
struct python_coro *coro, *active;
if (!PyArg_ParseTuple(args, "K", &id))
return (NULL);
2020-08-12 13:43:38 +02:00
if (coro_running != NULL && coro_running->id == id) {
PyErr_SetString(PyExc_RuntimeError,
"refusing to kill active coroutine");
return (NULL);
}
/* Remember active coro, as delete sets coro_running to NULL. */
active = coro_running;
TAILQ_FOREACH(coro, &coro_runnable, list) {
if (coro->id == id) {
coro->killed++;
kore_python_coro_delete(coro);
coro_running = active;
Py_RETURN_TRUE;
}
}
TAILQ_FOREACH(coro, &coro_suspended, list) {
if (coro->id == id) {
coro->killed++;
kore_python_coro_delete(coro);
coro_running = active;
Py_RETURN_TRUE;
}
}
Py_RETURN_FALSE;
}
static PyObject *
python_kore_socket_wrap(PyObject *self, PyObject *args)
{
struct pysocket *sock;
PyObject *pysock, *pyfd, *pyfam, *pyproto;
sock = NULL;
pyfd = NULL;
pyfam = NULL;
pyproto = NULL;
if (!PyArg_ParseTuple(args, "O", &pysock))
return (NULL);
if ((pyfd = PyObject_CallMethod(pysock, "fileno", NULL)) == NULL)
return (NULL);
if ((pyfam = PyObject_GetAttrString(pysock, "family")) == NULL)
goto out;
if ((pyproto = PyObject_GetAttrString(pysock, "proto")) == NULL)
goto out;
if ((sock = pysocket_alloc()) == NULL)
goto out;
sock->socket = pysock;
Py_INCREF(sock->socket);
sock->fd = (int)PyLong_AsLong(pyfd);
sock->family = (int)PyLong_AsLong(pyfam);
sock->protocol = (int)PyLong_AsLong(pyproto);
memset(&sock->addr, 0, sizeof(sock->addr));
switch (sock->family) {
case AF_INET:
case AF_UNIX:
break;
default:
PyErr_SetString(PyExc_RuntimeError, "unsupported family");
Py_DECREF((PyObject *)sock);
sock = NULL;
goto out;
}
out:
Py_XDECREF(pyfd);
Py_XDECREF(pyfam);
Py_XDECREF(pyproto);
return ((PyObject *)sock);
}
static PyObject *
python_kore_queue(PyObject *self, PyObject *args)
{
struct pyqueue *queue;
if ((queue = PyObject_New(struct pyqueue, &pyqueue_type)) == NULL)
return (NULL);
TAILQ_INIT(&queue->objects);
TAILQ_INIT(&queue->waiting);
return ((PyObject *)queue);
}
static PyObject *
python_kore_worker(PyObject *self, PyObject *args)
{
if (worker == NULL) {
Py_RETURN_NONE;
}
return (PyLong_FromLong(worker->id));
}
2018-11-29 09:51:24 +01:00
static PyObject *
python_kore_tracer(PyObject *self, PyObject *args)
{
PyObject *obj;
if (python_tracer != NULL) {
PyErr_SetString(PyExc_RuntimeError, "tracer already set");
return (NULL);
}
if (!PyArg_ParseTuple(args, "O", &obj))
return (NULL);
if (!PyCallable_Check(obj)) {
PyErr_SetString(PyExc_RuntimeError, "object not callable");
Py_DECREF(obj);
return (NULL);
}
2018-11-29 09:53:26 +01:00
Py_INCREF(obj);
2018-11-29 09:51:24 +01:00
python_tracer = obj;
Py_RETURN_TRUE;
}
static PyObject *
python_kore_domain(PyObject *self, PyObject *args, PyObject *kwargs)
{
2019-11-13 23:01:24 +01:00
#if defined(KORE_USE_ACME)
int acme;
char *acert, *akey;
#endif
struct kore_server *srv;
long depth;
const char *name;
struct pydomain *domain;
const char *cert, *key, *ca, *attach, *crl;
ca = NULL;
depth = -1;
key = NULL;
crl = NULL;
cert = NULL;
attach = NULL;
2019-11-15 07:49:16 +01:00
#if defined(KORE_USE_ACME)
acme = 0;
#endif
if (!PyArg_ParseTuple(args, "s", &name))
return (NULL);
if (kwargs != NULL)
attach = python_string_from_dict(kwargs, "attach");
if (attach == NULL)
attach = "default";
if ((srv = kore_server_lookup(attach)) == NULL) {
PyErr_Format(PyExc_RuntimeError,
"server '%s' does not exist", attach);
return (NULL);
}
if (srv->tls) {
if (kwargs == NULL) {
PyErr_Format(PyExc_RuntimeError,
"no keywords for TLS enabled domain %s", name);
return (NULL);
}
key = python_string_from_dict(kwargs, "key");
cert = python_string_from_dict(kwargs, "cert");
2019-11-13 23:01:24 +01:00
#if defined(KORE_USE_ACME)
python_bool_from_dict(kwargs, "acme", &acme);
if (acme) {
kore_acme_get_paths(name, &akey, &acert);
acme_domains++;
2019-11-13 23:01:24 +01:00
key = akey;
cert = acert;
}
#endif
if (key == NULL || cert == NULL) {
PyErr_Format(PyExc_RuntimeError,
"missing key or cert keywords for TLS listener");
return (NULL);
}
ca = python_string_from_dict(kwargs, "client_verify");
if (ca != NULL) {
python_long_from_dict(kwargs, "verify_depth", &depth);
if (depth < 0) {
PyErr_Format(PyExc_RuntimeError,
"invalid depth '%d'", depth);
return (NULL);
}
crl = python_string_from_dict(kwargs, "crl");
}
} else if (key != NULL || cert != NULL || ca != NULL) {
kore_log(LOG_INFO, "ignoring tls settings for '%s'", name);
}
if (kore_domain_lookup(srv, name) != NULL) {
PyErr_SetString(PyExc_RuntimeError, "domain exists");
return (NULL);
}
if ((domain = PyObject_New(struct pydomain, &pydomain_type)) == NULL)
return (NULL);
domain->next = NULL;
domain->kwargs = NULL;
if ((domain->config = kore_domain_new(name)) == NULL)
fatal("failed to create new domain configuration");
if (!kore_domain_attach(domain->config, srv))
fatal("failed to attach domain configuration");
if (srv->tls) {
domain->config->certkey = kore_strdup(key);
domain->config->certfile = kore_strdup(cert);
2019-11-13 23:01:24 +01:00
#if defined(KORE_USE_ACME)
domain->config->acme = acme;
if (domain->config->acme) {
kore_free(akey);
kore_free(acert);
}
#endif
if (ca != NULL) {
domain->config->cafile = kore_strdup(ca);
domain->config->x509_verify_depth = depth;
if (crl != NULL)
domain->config->crlfile = kore_strdup(crl);
}
}
return ((PyObject *)domain);
}
static PyObject *
python_kore_route(PyObject *self, PyObject *args, PyObject *kwargs)
{
const char *path;
PyObject *inner;
struct pyroute *route;
if ((route = PyObject_New(struct pyroute, &pyroute_type)) == NULL)
return (NULL);
if (!PyArg_ParseTuple(args, "s", &path))
return (NULL);
route->domain = NULL;
route->kwargs = kwargs;
route->path = kore_strdup(path);
Py_XINCREF(route->kwargs);
inner = PyObject_GetAttrString((PyObject *)route, "inner");
if (inner == NULL) {
Py_DECREF((PyObject *)route);
PyErr_SetString(PyExc_RuntimeError, "failed to find inner");
return (NULL);
}
return (inner);
}
static PyObject *
python_kore_gather(PyObject *self, PyObject *args, PyObject *kwargs)
{
struct pygather_op *op;
PyObject *obj;
struct pygather_coro *coro;
Py_ssize_t sz, idx;
int concurrency;
if (coro_running == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"kore.gather only available in coroutines");
return (NULL);
}
sz = PyTuple_Size(args);
if (sz > INT_MAX) {
PyErr_SetString(PyExc_TypeError, "too many arguments");
return (NULL);
}
if (kwargs != NULL &&
(obj = PyDict_GetItemString(kwargs, "concurrency")) != NULL) {
if (!PyLong_Check(obj)) {
PyErr_SetString(PyExc_TypeError,
"concurrency level must be an integer");
return (NULL);
}
PyErr_Clear();
concurrency = (int)PyLong_AsLong(obj);
if (concurrency == -1 && PyErr_Occurred())
return (NULL);
if (concurrency == 0)
concurrency = sz;
} else {
concurrency = sz;
}
op = PyObject_New(struct pygather_op, &pygather_op_type);
if (op == NULL)
return (NULL);
op->running = 0;
op->count = (int)sz;
op->coro = coro_running;
op->concurrency = concurrency;
TAILQ_INIT(&op->results);
TAILQ_INIT(&op->coroutines);
for (idx = 0; idx < sz; idx++) {
2018-10-29 21:18:38 +01:00
if ((obj = PyTuple_GetItem(args, idx)) == NULL) {
Py_DECREF((PyObject *)op);
return (NULL);
2018-10-29 21:18:38 +01:00
}
if (!PyCoro_CheckExact(obj)) {
2018-10-29 21:18:38 +01:00
Py_DECREF((PyObject *)op);
PyErr_SetString(PyExc_TypeError, "not a coroutine");
return (NULL);
}
Py_INCREF(obj);
coro = kore_pool_get(&gather_coro_pool);
coro->coro = python_coro_create(obj, NULL);
coro->coro->gatherop = op;
TAILQ_INSERT_TAIL(&op->coroutines, coro, list);
if (idx > concurrency - 1)
python_coro_suspend(coro->coro);
else
op->running++;
}
return ((PyObject *)op);
}
static PyObject *
python_kore_lock(PyObject *self, PyObject *args)
{
struct pylock *lock;
if ((lock = PyObject_New(struct pylock, &pylock_type)) == NULL)
return (NULL);
lock->owner = NULL;
TAILQ_INIT(&lock->ops);
return ((PyObject *)lock);
}
static PyObject *
python_kore_fatal(PyObject *self, PyObject *args)
{
const char *reason;
if (!PyArg_ParseTuple(args, "s", &reason))
reason = "python_kore_fatal: PyArg_ParseTuple failed";
fatal("%s", reason);
/* not reached */
Py_RETURN_TRUE;
}
static PyObject *
python_kore_fatalx(PyObject *self, PyObject *args)
{
const char *reason;
if (!PyArg_ParseTuple(args, "s", &reason))
reason = "python_kore_fatalx: PyArg_ParseTuple failed";
fatalx("%s", reason);
/* not reached */
Py_RETURN_TRUE;
}
2019-09-04 20:37:33 +02:00
static PyObject *
python_kore_setname(PyObject *self, PyObject *args)
{
const char *name;
extern char *kore_progname;
if (!PyArg_ParseTuple(args, "s", &name))
return (NULL);
kore_free(kore_progname);
kore_progname = kore_strdup(name);
Py_RETURN_NONE;
}
static PyObject *
python_kore_sigtrap(PyObject *self, PyObject *args)
{
int sig;
if (!PyArg_ParseTuple(args, "i", &sig))
return (NULL);
kore_signal_trap(sig);
Py_RETURN_NONE;
}
static PyObject *
python_kore_sendobj(PyObject *self, PyObject *args, PyObject *kwargs)
{
long val;
u_int16_t dst;
char *ptr;
Py_ssize_t length;
PyObject *object, *bytes;
if (!PyArg_ParseTuple(args, "O", &object))
return (NULL);
bytes = PyObject_CallFunctionObjArgs(pickle_dumps, object, NULL);
if (bytes == NULL)
return (NULL);
if (PyBytes_AsStringAndSize(bytes, &ptr, &length) == -1) {
Py_DECREF(bytes);
return (NULL);
}
dst = KORE_MSG_WORKER_ALL;
if (kwargs != NULL) {
if (python_long_from_dict(kwargs, "worker", &val)) {
if (val <= 0 || val > worker_count ||
val >= KORE_WORKER_MAX) {
PyErr_Format(PyExc_RuntimeError,
"worker %ld invalid", val);
Py_DECREF(bytes);
return (NULL);
}
dst = val;
}
}
kore_msg_send(dst, KORE_PYTHON_SEND_OBJ, ptr, length);
Py_DECREF(bytes);
Py_RETURN_NONE;
}
static void
python_kore_recvobj(struct kore_msg *msg, const void *data)
{
struct kore_runtime *rt;
PyObject *onmsg, *ret, *bytes, *obj;
if ((onmsg = kore_module_getsym("koreapp.onmsg", &rt)) == NULL)
return;
if (rt->type != KORE_RUNTIME_PYTHON)
return;
if ((bytes = PyBytes_FromStringAndSize(data, msg->length)) == NULL) {
Py_DECREF(onmsg);
kore_python_log_error("koreapp.onmsg");
return;
}
obj = PyObject_CallFunctionObjArgs(pickle_loads, bytes, NULL);
Py_DECREF(bytes);
if (obj == NULL) {
Py_DECREF(onmsg);
kore_python_log_error("koreapp.onmsg");
return;
}
ret = PyObject_CallFunctionObjArgs(onmsg, obj, NULL);
kore_python_log_error("koreapp.onmsg");
Py_DECREF(obj);
Py_DECREF(onmsg);
Py_XDECREF(ret);
}
static PyObject *
python_kore_suspend(PyObject *self, PyObject *args)
{
struct pysuspend_op *op;
int delay;
if (!PyArg_ParseTuple(args, "i", &delay))
return (NULL);
op = PyObject_New(struct pysuspend_op, &pysuspend_op_type);
if (op == NULL)
return (NULL);
op->timer = NULL;
op->delay = delay;
op->coro = coro_running;
op->state = PYSUSPEND_OP_INIT;
return ((PyObject *)op);
}
static PyObject *
python_kore_shutdown(PyObject *self, PyObject *args)
{
kore_shutdown();
Py_RETURN_TRUE;
}
static PyObject *
python_kore_coroname(PyObject *self, PyObject *args)
{
const char *name;
if (coro_running == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"kore.coroname() only available in coroutines");
return (NULL);
}
if (!PyArg_ParseTuple(args, "s", &name))
return (NULL);
kore_free(coro_running->name);
coro_running->name = kore_strdup(name);
Py_RETURN_NONE;
}
static PyObject *
python_kore_corotrace(PyObject *self, PyObject *args)
{
if (!PyArg_ParseTuple(args, "b", &coro_tracing))
return (NULL);
Py_RETURN_NONE;
}
2018-10-23 20:44:43 +02:00
static PyObject *
python_kore_timer(PyObject *self, PyObject *args, PyObject *kwargs)
2018-10-23 20:44:43 +02:00
{
u_int64_t ms;
PyObject *obj;
int flags;
struct pytimer *timer;
if (worker == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"kore.timer not supported on parent process");
return (NULL);
}
2018-10-23 20:44:43 +02:00
if (!PyArg_ParseTuple(args, "OKi", &obj, &ms, &flags))
return (NULL);
if (flags & ~(KORE_TIMER_FLAGS)) {
PyErr_SetString(PyExc_RuntimeError, "invalid flags");
return (NULL);
}
if ((timer = PyObject_New(struct pytimer, &pytimer_type)) == NULL)
return (NULL);
timer->udata = NULL;
2018-10-23 20:44:43 +02:00
timer->flags = flags;
timer->callable = obj;
timer->run = kore_timer_add(pytimer_run, ms, timer, flags);
Py_INCREF((PyObject *)timer);
2018-10-23 20:44:43 +02:00
Py_INCREF(timer->callable);
if (kwargs != NULL) {
if ((obj = PyDict_GetItemString(kwargs, "data")) != NULL) {
Py_INCREF(obj);
timer->udata = obj;
}
}
2018-10-23 20:44:43 +02:00
return ((PyObject *)timer);
}
static PyObject *
python_kore_proc(PyObject *self, PyObject *args, PyObject *kwargs)
{
union deconst cp;
const char *cmd;
struct pyproc *proc;
Py_ssize_t idx, len;
PyObject *obj, *item;
int timeo, in_pipe[2], out_pipe[2];
char *copy, *argv[32], *env[PYTHON_PROC_MAX_ENV + 1];
timeo = -1;
if (coro_running == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"kore.proc only available in coroutines");
return (NULL);
}
if (!PyArg_ParseTuple(args, "s|i", &cmd, &timeo))
return (NULL);
if (kwargs != NULL &&
(obj = PyDict_GetItemString(kwargs, "env")) != NULL) {
if (!PyList_CheckExact(obj)) {
PyErr_SetString(PyExc_RuntimeError,
"kore.proc: env is not of type 'list'");
return (NULL);
}
len = PyList_Size(obj);
if (len > PYTHON_PROC_MAX_ENV) {
PyErr_SetString(PyExc_RuntimeError,
"kore.proc: too many entries in 'env' keyword");
return (NULL);
}
for (idx = 0; idx < len; idx++) {
if ((item = PyList_GetItem(obj, idx)) == NULL)
return (NULL);
if (!PyUnicode_CheckExact(item))
return (NULL);
if ((cp.cp = PyUnicode_AsUTF8(item)) == NULL)
return (NULL);
env[idx] = cp.p;
}
env[idx] = NULL;
}
if (pipe(in_pipe) == -1) {
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
if (pipe(out_pipe) == -1) {
close(in_pipe[0]);
close(in_pipe[1]);
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
if ((proc = PyObject_New(struct pyproc, &pyproc_type)) == NULL) {
close(in_pipe[0]);
close(in_pipe[1]);
close(out_pipe[0]);
close(out_pipe[1]);
return (NULL);
}
proc->pid = -1;
proc->op = NULL;
proc->apid = -1;
proc->reaped = 0;
proc->status = 0;
proc->timer = NULL;
proc->coro = coro_running;
proc->in = pysocket_alloc();
proc->out = pysocket_alloc();
if (proc->in == NULL || proc->out == NULL) {
Py_DECREF((PyObject *)proc);
return (NULL);
}
TAILQ_INSERT_TAIL(&procs, proc, list);
proc->pid = fork();
if (proc->pid == -1) {
if (errno == ENOSYS) {
Py_DECREF((PyObject *)proc);
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
fatal("python_kore_proc: fork(): %s", errno_s);
}
if (proc->pid == 0) {
close(in_pipe[1]);
close(out_pipe[0]);
if (dup2(out_pipe[1], STDOUT_FILENO) == -1 ||
dup2(out_pipe[1], STDERR_FILENO) == -1 ||
dup2(in_pipe[0], STDIN_FILENO) == -1)
fatal("dup2: %s", errno_s);
copy = kore_strdup(cmd);
python_split_arguments(copy, argv, 32);
2022-02-20 21:20:14 +01:00
(void)execve(argv[0], argv, env);
kore_log(LOG_ERR, "kore.proc failed to execute %s (%s)",
argv[0], errno_s);
exit(1);
}
close(in_pipe[0]);
close(out_pipe[1]);
if (!kore_connection_nonblock(in_pipe[1], 0) ||
!kore_connection_nonblock(out_pipe[0], 0))
fatal("failed to mark kore.proc pipes are non-blocking");
proc->apid = proc->pid;
proc->in->fd = in_pipe[1];
proc->out->fd = out_pipe[0];
if (timeo != -1) {
proc->timer = kore_timer_add(pyproc_timeout,
timeo, proc, KORE_TIMER_ONESHOT);
}
return ((PyObject *)proc);
}
static PyObject *
python_import(const char *path)
{
struct stat st;
PyObject *module;
char *dir, *file, *copy, *p;
if (stat(path, &st) == -1)
fatal("python_import: stat(%s): %s", path, errno_s);
if (!S_ISDIR(st.st_mode) && !S_ISREG(st.st_mode))
fatal("python_import: '%s' is not a file or directory", path);
copy = kore_strdup(path);
if ((p = dirname(copy)) == NULL)
fatal("dirname: %s: %s", path, errno_s);
dir = kore_strdup(p);
kore_free(copy);
copy = kore_strdup(path);
if ((p = basename(copy)) == NULL)
fatal("basename: %s: %s", path, errno_s);
file = kore_strdup(p);
kore_free(copy);
if ((p = strrchr(file, '.')) != NULL)
*p = '\0';
python_append_path(dir);
if (S_ISDIR(st.st_mode))
python_append_path(path);
module = PyImport_ImportModule(file);
if (module == NULL)
PyErr_Print();
kore_free(dir);
kore_free(file);
return (module);
}
static PyObject *
python_callable(PyObject *module, const char *symbol)
{
char *base, *method;
PyObject *res, *obj, *meth;
res = NULL;
obj = NULL;
base = kore_strdup(symbol);
if ((method = strchr(base, '.')) != NULL)
*(method)++ = '\0';
if ((obj = PyObject_GetAttrString(module, base)) == NULL)
goto out;
if (method != NULL) {
if ((meth = PyObject_GetAttrString(obj, method)) == NULL)
goto out;
Py_DECREF(obj);
obj = meth;
}
if (!PyCallable_Check(obj))
goto out;
res = obj;
obj = NULL;
out:
if (obj != NULL)
Py_DECREF(obj);
PyErr_Clear();
kore_free(base);
return (res);
}
static PyObject *
pyconnection_alloc(struct connection *c)
{
struct pyconnection *pyc;
pyc = PyObject_New(struct pyconnection, &pyconnection_type);
if (pyc == NULL)
return (NULL);
pyc->c = c;
return ((PyObject *)pyc);
}
static PyObject *
pyconnection_disconnect(struct pyconnection *pyc, PyObject *args)
{
kore_connection_disconnect(pyc->c);
Py_RETURN_TRUE;
}
static PyObject *
pyconnection_get_fd(struct pyconnection *pyc, void *closure)
{
PyObject *fd;
if ((fd = PyLong_FromLong(pyc->c->fd)) == NULL)
return (PyErr_NoMemory());
return (fd);
}
2017-03-24 12:42:40 +01:00
static PyObject *
pyconnection_get_addr(struct pyconnection *pyc, void *closure)
{
void *ptr;
PyObject *result;
char addr[INET6_ADDRSTRLEN];
2018-10-08 20:12:25 +02:00
switch (pyc->c->family) {
2017-03-24 12:42:40 +01:00
case AF_INET:
ptr = &pyc->c->addr.ipv4.sin_addr;
break;
case AF_INET6:
ptr = &pyc->c->addr.ipv6.sin6_addr;
break;
default:
2018-10-08 20:12:25 +02:00
PyErr_SetString(PyExc_RuntimeError, "invalid family");
2017-03-24 12:42:40 +01:00
return (NULL);
}
2018-10-08 20:12:25 +02:00
if (inet_ntop(pyc->c->family, ptr, addr, sizeof(addr)) == NULL) {
2017-03-24 12:42:40 +01:00
PyErr_SetString(PyExc_RuntimeError, "inet_ntop failed");
return (NULL);
}
if ((result = PyUnicode_FromString(addr)) == NULL)
return (PyErr_NoMemory());
return (result);
}
static PyObject *
pyconnection_get_peer_x509(struct pyconnection *pyc, void *closure)
{
size_t len;
u_int8_t *der;
PyObject *bytes;
2022-02-18 10:20:28 +01:00
if (pyc->c->tls_cert == NULL) {
Py_RETURN_NONE;
}
if (!kore_tls_x509_data(pyc->c, &der, &len)) {
PyErr_SetString(PyExc_RuntimeError,
"failed to obtain certificate data");
return (NULL);
}
bytes = PyBytes_FromStringAndSize((char *)der, len);
kore_free(der);
return (bytes);
}
static PyObject *
pyconnection_get_peer_x509dict(struct pyconnection *pyc, void *closure)
{
KORE_X509_NAMES *name;
PyObject *dict, *issuer, *subject, *ret;
ret = NULL;
issuer = NULL;
subject = NULL;
2022-02-18 10:20:28 +01:00
if (pyc->c->tls_cert == NULL) {
Py_RETURN_NONE;
}
if ((dict = PyDict_New()) == NULL)
goto out;
if ((issuer = PyDict_New()) == NULL)
goto out;
if (PyDict_SetItemString(dict, "issuer", issuer) == -1)
goto out;
if ((subject = PyDict_New()) == NULL)
goto out;
if (PyDict_SetItemString(dict, "subject", subject) == -1)
goto out;
PyErr_Clear();
2022-02-18 10:20:28 +01:00
if ((name = kore_tls_x509_subject_name(pyc->c)) == NULL) {
PyErr_Format(PyExc_RuntimeError,
"failed to obtain x509 subjectName");
goto out;
}
if (!kore_tls_x509name_foreach(name, 0, subject,
pyconnection_x509_cb)) {
if (PyErr_Occurred() == NULL) {
PyErr_Format(PyExc_RuntimeError,
2022-04-20 11:10:01 +02:00
"failed to add subject name to dictionary");
}
goto out;
}
2022-02-18 10:20:28 +01:00
if ((name = kore_tls_x509_issuer_name(pyc->c)) == NULL) {
PyErr_Format(PyExc_RuntimeError,
"failed to obtain x509 issuerName");
goto out;
}
if (!kore_tls_x509name_foreach(name, 0, issuer, pyconnection_x509_cb)) {
if (PyErr_Occurred() == NULL) {
PyErr_Format(PyExc_RuntimeError,
2022-04-20 11:10:01 +02:00
"failed to add issuer name to dictionary");
}
goto out;
}
ret = dict;
dict = NULL;
out:
Py_XDECREF(dict);
Py_XDECREF(issuer);
Py_XDECREF(subject);
return (ret);
}
static int
pyconnection_x509_cb(void *udata, int islast, int nid, const char *field,
const void *data, size_t len, int flags)
{
PyObject *dict, *obj;
dict = udata;
if ((obj = PyUnicode_FromStringAndSize(data, len)) == NULL)
return (KORE_RESULT_ERROR);
if (PyDict_SetItemString(dict, field, obj) == -1) {
Py_DECREF(obj);
return (KORE_RESULT_ERROR);
}
Py_DECREF(obj);
return (KORE_RESULT_OK);
}
2018-10-23 20:44:43 +02:00
static void
pytimer_run(void *arg, u_int64_t now)
{
PyObject *ret;
struct pytimer *timer = arg;
PyErr_Clear();
ret = PyObject_CallFunctionObjArgs(timer->callable, timer->udata, NULL);
Py_XDECREF(ret);
Py_XDECREF(timer->udata);
2018-10-23 20:44:43 +02:00
timer->udata = NULL;
kore_python_log_error("pytimer_run");
2018-10-23 20:44:43 +02:00
if (timer->flags & KORE_TIMER_ONESHOT) {
timer->run = NULL;
Py_DECREF((PyObject *)timer);
}
}
static void
pytimer_dealloc(struct pytimer *timer)
{
if (timer->run != NULL) {
kore_timer_remove(timer->run);
timer->run = NULL;
}
if (timer->callable != NULL) {
Py_DECREF(timer->callable);
timer->callable = NULL;
}
PyObject_Del((PyObject *)timer);
}
static PyObject *
pytimer_close(struct pytimer *timer, PyObject *args)
{
if (timer->run != NULL) {
kore_timer_remove(timer->run);
timer->run = NULL;
}
if (timer->callable != NULL) {
Py_DECREF(timer->callable);
timer->callable = NULL;
}
if (timer->udata != NULL) {
Py_DECREF(timer->udata);
timer->udata = NULL;
}
Py_INCREF((PyObject *)timer);
2018-10-23 20:44:43 +02:00
Py_RETURN_TRUE;
}
static void
pysuspend_op_dealloc(struct pysuspend_op *op)
{
if (op->timer != NULL) {
kore_timer_remove(op->timer);
op->timer = NULL;
}
PyObject_Del((PyObject *)op);
}
static PyObject *
pysuspend_op_await(PyObject *sop)
{
Py_INCREF(sop);
return (sop);
}
static PyObject *
pysuspend_op_iternext(struct pysuspend_op *op)
{
switch (op->state) {
case PYSUSPEND_OP_INIT:
op->timer = kore_timer_add(pysuspend_wakeup, op->delay,
op, KORE_TIMER_ONESHOT);
op->state = PYSUSPEND_OP_WAIT;
break;
case PYSUSPEND_OP_WAIT:
break;
case PYSUSPEND_OP_CONTINUE:
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
default:
fatal("unknown state %d for pysuspend_op", op->state);
}
Py_RETURN_NONE;
}
static void
pysuspend_wakeup(void *arg, u_int64_t now)
{
struct pysuspend_op *op = arg;
op->timer = NULL;
op->state = PYSUSPEND_OP_CONTINUE;
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
}
static struct pysocket *
pysocket_alloc(void)
{
struct pysocket *sock;
if ((sock = PyObject_New(struct pysocket, &pysocket_type)) == NULL)
return (NULL);
sock->fd = -1;
sock->family = -1;
sock->protocol = -1;
sock->scheduled = 0;
sock->socket = NULL;
sock->recvop = NULL;
sock->sendop = NULL;
sock->event.s = sock;
sock->event.evt.flags = 0;
sock->event.evt.type = KORE_TYPE_PYSOCKET;
sock->event.evt.handle = pysocket_evt_handle;
return (sock);
}
static void
pysocket_dealloc(struct pysocket *sock)
{
2019-03-27 22:15:35 +01:00
if (sock->scheduled && sock->fd != -1) {
kore_platform_disable_read(sock->fd);
#if !defined(__linux__)
kore_platform_disable_write(sock->fd);
#endif
}
if (sock->socket != NULL) {
Py_DECREF(sock->socket);
} else if (sock->fd != -1) {
(void)close(sock->fd);
}
PyObject_Del((PyObject *)sock);
}
static PyObject *
pysocket_send(struct pysocket *sock, PyObject *args)
{
Py_buffer buf;
PyObject *ret;
if (!PyArg_ParseTuple(args, "y*", &buf))
return (NULL);
ret = pysocket_op_create(sock, PYSOCKET_TYPE_SEND, buf.buf, buf.len);
PyBuffer_Release(&buf);
return (ret);
}
static PyObject *
pysocket_sendto(struct pysocket *sock, PyObject *args)
{
Py_buffer buf;
struct pysocket_op *op;
PyObject *ret;
int port;
const char *ip, *sockaddr;
switch (sock->family) {
case AF_INET:
if (!PyArg_ParseTuple(args, "siy*", &ip, &port, &buf))
return (NULL);
if (port <= 0 || port >= USHRT_MAX) {
PyErr_SetString(PyExc_RuntimeError, "invalid port");
return (NULL);
}
break;
case AF_UNIX:
if (!PyArg_ParseTuple(args, "sy*", &sockaddr, &buf))
return (NULL);
break;
default:
PyErr_SetString(PyExc_RuntimeError, "unsupported family");
return (NULL);
}
ret = pysocket_op_create(sock, PYSOCKET_TYPE_SENDTO, buf.buf, buf.len);
PyBuffer_Release(&buf);
op = (struct pysocket_op *)ret;
switch (sock->family) {
case AF_INET:
op->sendaddr.ipv4.sin_family = AF_INET;
op->sendaddr.ipv4.sin_port = htons(port);
op->sendaddr.ipv4.sin_addr.s_addr = inet_addr(ip);
break;
case AF_UNIX:
op->sendaddr.sun.sun_family = AF_UNIX;
if (kore_strlcpy(op->sendaddr.sun.sun_path, sockaddr,
sizeof(op->sendaddr.sun.sun_path)) >=
sizeof(op->sendaddr.sun.sun_path)) {
2019-03-01 20:56:03 +01:00
Py_DECREF(ret);
PyErr_SetString(PyExc_RuntimeError,
"unix socket path too long");
return (NULL);
}
break;
default:
2019-03-01 20:56:03 +01:00
Py_DECREF(ret);
PyErr_SetString(PyExc_RuntimeError, "unsupported family");
return (NULL);
}
return (ret);
}
static PyObject *
pysocket_recv(struct pysocket *sock, PyObject *args)
{
Py_ssize_t len;
struct pysocket_op *op;
PyObject *obj;
int timeo;
timeo = -1;
if (!PyArg_ParseTuple(args, "n|i", &len, &timeo))
return (NULL);
obj = pysocket_op_create(sock, PYSOCKET_TYPE_RECV, NULL, len);
if (obj == NULL)
return (NULL);
op = (struct pysocket_op *)obj;
if (timeo != -1) {
op->timer = kore_timer_add(pysocket_op_timeout,
timeo, op, KORE_TIMER_ONESHOT);
}
return (obj);
}
static PyObject *
pysocket_recvmsg(struct pysocket *sock, PyObject *args)
{
Py_ssize_t len;
if (!PyArg_ParseTuple(args, "n", &len))
return (NULL);
return (pysocket_op_create(sock, PYSOCKET_TYPE_RECVMSG, NULL, len));
}
static PyObject *
pysocket_recvfrom(struct pysocket *sock, PyObject *args)
{
Py_ssize_t len;
if (!PyArg_ParseTuple(args, "n", &len))
return (NULL);
return (pysocket_op_create(sock, PYSOCKET_TYPE_RECVFROM, NULL, len));
}
static PyObject *
pysocket_accept(struct pysocket *sock, PyObject *args)
{
return (pysocket_op_create(sock, PYSOCKET_TYPE_ACCEPT, NULL, 0));
}
static PyObject *
pysocket_connect(struct pysocket *sock, PyObject *args)
{
const char *host;
int port, len;
port = 0;
if (!PyArg_ParseTuple(args, "s|i", &host, &port))
return (NULL);
if (port < 0 || port > USHRT_MAX) {
PyErr_SetString(PyExc_RuntimeError, "invalid port number");
return (NULL);
}
switch (sock->family) {
case AF_INET:
sock->addr.ipv4.sin_family = AF_INET;
sock->addr.ipv4.sin_port = htons(port);
if (inet_pton(sock->family, host,
&sock->addr.ipv4.sin_addr) == -1) {
PyErr_SetString(PyExc_RuntimeError, "invalid host");
return (NULL);
}
sock->addr_len = sizeof(sock->addr.ipv4);
break;
case AF_UNIX:
sock->addr.sun.sun_family = AF_UNIX;
len = snprintf(sock->addr.sun.sun_path,
sizeof(sock->addr.sun.sun_path), "%s", host);
if (len == -1 ||
(size_t)len >= sizeof(sock->addr.sun.sun_path)) {
PyErr_SetString(PyExc_RuntimeError, "path too long");
return (NULL);
}
#if defined(__linux__)
/* Assume abstract socket if prefixed with '@'. */
if (sock->addr.sun.sun_path[0] == '@')
sock->addr.sun.sun_path[0] = '\0';
#endif
sock->addr_len = sizeof(sock->addr.sun.sun_family) + len;
break;
default:
fatal("unsupported socket family %d", sock->family);
}
return (pysocket_op_create(sock, PYSOCKET_TYPE_CONNECT, NULL, 0));
}
static PyObject *
pysocket_close(struct pysocket *sock, PyObject *args)
{
if (sock->scheduled) {
sock->scheduled = 0;
kore_platform_disable_read(sock->fd);
#if !defined(__linux__)
kore_platform_disable_write(sock->fd);
#endif
}
if (sock->socket != NULL) {
Py_DECREF(sock->socket);
sock->socket = NULL;
} else if (sock->fd != -1) {
(void)close(sock->fd);
}
2019-03-21 10:09:24 +01:00
sock->fd = -1;
sock->event.evt.handle(&sock->event, 1);
2019-03-21 10:09:24 +01:00
Py_RETURN_TRUE;
}
static void
pysocket_op_dealloc(struct pysocket_op *op)
{
if (op->type == PYSOCKET_TYPE_RECV ||
op->type == PYSOCKET_TYPE_RECVMSG ||
op->type == PYSOCKET_TYPE_RECVFROM ||
op->type == PYSOCKET_TYPE_SEND ||
op->type == PYSOCKET_TYPE_SENDTO)
kore_buf_cleanup(&op->buffer);
switch (op->type) {
case PYSOCKET_TYPE_RECV:
case PYSOCKET_TYPE_ACCEPT:
case PYSOCKET_TYPE_RECVMSG:
case PYSOCKET_TYPE_RECVFROM:
if (op->socket->recvop != op)
fatal("recvop mismatch");
op->socket->recvop = NULL;
break;
case PYSOCKET_TYPE_SEND:
case PYSOCKET_TYPE_SENDTO:
case PYSOCKET_TYPE_CONNECT:
if (op->socket->sendop != op)
fatal("sendop mismatch");
op->socket->sendop = NULL;
break;
}
if (op->timer != NULL) {
kore_timer_remove(op->timer);
op->timer = NULL;
}
op->coro->sockop = NULL;
Py_DECREF(op->socket);
PyObject_Del((PyObject *)op);
}
static PyObject *
pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len)
{
struct pysocket_op *op;
if (coro_running->sockop != NULL)
fatal("pysocket_op_create: coro has active socketop");
switch (type) {
case PYSOCKET_TYPE_RECV:
case PYSOCKET_TYPE_ACCEPT:
case PYSOCKET_TYPE_RECVMSG:
case PYSOCKET_TYPE_RECVFROM:
if (sock->recvop != NULL) {
PyErr_SetString(PyExc_RuntimeError,
"only one recv operation can be done per socket");
return (NULL);
}
break;
case PYSOCKET_TYPE_SEND:
case PYSOCKET_TYPE_SENDTO:
case PYSOCKET_TYPE_CONNECT:
if (sock->sendop != NULL) {
PyErr_SetString(PyExc_RuntimeError,
"only one send operation can be done per socket");
return (NULL);
}
break;
default:
fatal("unknown pysocket_op type %u", type);
}
op = PyObject_New(struct pysocket_op, &pysocket_op_type);
if (op == NULL)
return (NULL);
op->eof = 0;
op->self = op;
op->type = type;
op->timer = NULL;
op->socket = sock;
op->coro = coro_running;
coro_running->sockop = op;
Py_INCREF(op->socket);
switch (type) {
case PYSOCKET_TYPE_RECV:
case PYSOCKET_TYPE_RECVMSG:
case PYSOCKET_TYPE_RECVFROM:
sock->recvop = op;
kore_buf_init(&op->buffer, len);
break;
case PYSOCKET_TYPE_SEND:
case PYSOCKET_TYPE_SENDTO:
sock->sendop = op;
kore_buf_init(&op->buffer, len);
kore_buf_append(&op->buffer, ptr, len);
kore_buf_reset(&op->buffer);
break;
case PYSOCKET_TYPE_ACCEPT:
sock->recvop = op;
break;
case PYSOCKET_TYPE_CONNECT:
sock->sendop = op;
break;
default:
fatal("unknown pysocket_op type %u", type);
}
if (sock->scheduled == 0) {
sock->scheduled = 1;
kore_platform_event_all(sock->fd, &sock->event);
}
return ((PyObject *)op);
}
static PyObject *
pysocket_op_await(PyObject *obj)
{
Py_INCREF(obj);
return (obj);
}
static PyObject *
pysocket_op_iternext(struct pysocket_op *op)
{
PyObject *ret;
if (op->socket->fd == -1) {
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
}
if (op->eof) {
if (op->coro->exception != NULL) {
PyErr_SetString(op->coro->exception,
op->coro->exception_msg);
op->coro->exception = NULL;
return (NULL);
}
if (op->type != PYSOCKET_TYPE_RECV) {
PyErr_SetString(PyExc_RuntimeError, "socket EOF");
return (NULL);
}
/* Drain the recv socket. */
op->socket->event.evt.flags |= KORE_EVENT_READ;
return (pysocket_async_recv(op));
}
switch (op->type) {
case PYSOCKET_TYPE_CONNECT:
ret = pysocket_async_connect(op);
break;
case PYSOCKET_TYPE_ACCEPT:
ret = pysocket_async_accept(op);
break;
case PYSOCKET_TYPE_RECV:
case PYSOCKET_TYPE_RECVMSG:
case PYSOCKET_TYPE_RECVFROM:
ret = pysocket_async_recv(op);
break;
case PYSOCKET_TYPE_SEND:
case PYSOCKET_TYPE_SENDTO:
ret = pysocket_async_send(op);
break;
default:
PyErr_SetString(PyExc_RuntimeError, "invalid op type");
return (NULL);
}
return (ret);
}
static void
pysocket_op_timeout(void *arg, u_int64_t now)
{
struct pysocket_op *op = arg;
op->eof = 1;
op->timer = NULL;
op->coro->exception = PyExc_TimeoutError;
op->coro->exception_msg = "timeout before operation completed";
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
}
static PyObject *
pysocket_async_connect(struct pysocket_op *op)
{
if (connect(op->socket->fd, (struct sockaddr *)&op->socket->addr,
op->socket->addr_len) == -1) {
if (errno != EALREADY && errno != EINPROGRESS &&
errno != EISCONN && errno != EAGAIN) {
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
if (errno != EISCONN) {
Py_RETURN_NONE;
}
}
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
}
static PyObject *
pysocket_async_accept(struct pysocket_op *op)
{
int fd;
struct pysocket *sock;
if (!(op->socket->event.evt.flags & KORE_EVENT_READ)) {
Py_RETURN_NONE;
}
if ((sock = pysocket_alloc()) == NULL)
return (NULL);
sock->addr_len = sizeof(sock->addr);
if ((fd = accept(op->socket->fd,
(struct sockaddr *)&sock->addr, &sock->addr_len)) == -1) {
Py_DECREF((PyObject *)sock);
if (errno == EAGAIN || errno == EWOULDBLOCK) {
op->socket->event.evt.flags &= ~KORE_EVENT_READ;
Py_RETURN_NONE;
}
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
if (!kore_connection_nonblock(fd, 0)) {
Py_DECREF((PyObject *)sock);
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
sock->fd = fd;
sock->socket = NULL;
sock->family = op->socket->family;
sock->protocol = op->socket->protocol;
PyErr_SetObject(PyExc_StopIteration, (PyObject *)sock);
Py_DECREF((PyObject *)sock);
return (NULL);
}
static PyObject *
pysocket_async_recv(struct pysocket_op *op)
{
ssize_t ret;
size_t len;
u_int16_t port;
struct iovec iov;
struct msghdr msg;
socklen_t socklen;
struct sockaddr *sendaddr;
const char *ptr, *ip;
u_int8_t ancdata[1024];
PyObject *bytes, *result, *tuple, *list;
if (!(op->socket->event.evt.flags & KORE_EVENT_READ)) {
Py_RETURN_NONE;
}
socklen = 0;
for (;;) {
switch (op->type) {
case PYSOCKET_TYPE_RECV:
ret = read(op->socket->fd, op->buffer.data,
op->buffer.length);
break;
case PYSOCKET_TYPE_RECVMSG:
memset(&msg, 0, sizeof(msg));
iov.iov_base = op->buffer.data;
iov.iov_len = op->buffer.length;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = &op->sendaddr;
msg.msg_namelen = sizeof(op->sendaddr);
msg.msg_control = ancdata;
msg.msg_controllen = sizeof(ancdata);
memset(&op->sendaddr, 0, sizeof(op->sendaddr));
ret = recvmsg(op->socket->fd, &msg, 0);
break;
case PYSOCKET_TYPE_RECVFROM:
sendaddr = (struct sockaddr *)&op->sendaddr;
switch (op->socket->family) {
case AF_INET:
socklen = sizeof(op->sendaddr.ipv4);
break;
case AF_UNIX:
socklen = sizeof(op->sendaddr.sun);
break;
default:
fatal("%s: non AF_INET/AF_UNIX", __func__);
}
memset(sendaddr, 0, socklen);
ret = recvfrom(op->socket->fd, op->buffer.data,
op->buffer.length, 0, sendaddr, &socklen);
break;
default:
fatal("%s: unknown type %d", __func__, op->type);
}
if (ret == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
op->socket->event.evt.flags &= ~KORE_EVENT_READ;
Py_RETURN_NONE;
}
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
break;
}
op->coro->exception = NULL;
op->coro->exception_msg = NULL;
if (op->timer != NULL) {
kore_timer_remove(op->timer);
op->timer = NULL;
}
if (op->type == PYSOCKET_TYPE_RECV && ret == 0) {
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
}
ptr = (const char *)op->buffer.data;
if ((bytes = PyBytes_FromStringAndSize(ptr, ret)) == NULL)
return (NULL);
list = NULL;
switch (op->type) {
case PYSOCKET_TYPE_RECV:
PyErr_SetObject(PyExc_StopIteration, bytes);
Py_DECREF(bytes);
return (NULL);
case PYSOCKET_TYPE_RECVMSG:
socklen = msg.msg_namelen;
if ((list = python_cmsg_to_list(&msg)) == NULL) {
Py_DECREF(bytes);
return (NULL);
}
break;
case PYSOCKET_TYPE_RECVFROM:
break;
default:
fatal("%s: unknown type %d", __func__, op->type);
}
switch(op->socket->family) {
case AF_INET:
port = ntohs(op->sendaddr.ipv4.sin_port);
ip = inet_ntoa(op->sendaddr.ipv4.sin_addr);
if (op->type == PYSOCKET_TYPE_RECVFROM)
tuple = Py_BuildValue("(sHN)", ip, port, bytes);
else
tuple = Py_BuildValue("(sHNN)", ip, port, bytes, list);
break;
case AF_UNIX:
len = strlen(op->sendaddr.sun.sun_path);
#if defined(__linux__)
if (len == 0 && socklen > 0) {
len = socklen - sizeof(sa_family_t);
op->sendaddr.sun.sun_path[0] = '@';
op->sendaddr.sun.sun_path[len] = '\0';
}
#endif
if (len == 0) {
if (op->type == PYSOCKET_TYPE_RECVFROM) {
tuple = Py_BuildValue("(ON)", Py_None, bytes);
} else {
tuple = Py_BuildValue("(ONN)",
Py_None, bytes, list);
}
} else {
if (op->type == PYSOCKET_TYPE_RECVFROM) {
tuple = Py_BuildValue("(sN)",
op->sendaddr.sun.sun_path, bytes);
} else {
tuple = Py_BuildValue("(sNN)",
op->sendaddr.sun.sun_path, bytes, list);
}
}
break;
default:
fatal("%s: non AF_INET/AF_UNIX", __func__);
}
if (tuple == NULL) {
Py_XDECREF(list);
Py_DECREF(bytes);
return (NULL);
}
result = PyObject_CallFunctionObjArgs(PyExc_StopIteration, tuple, NULL);
if (result == NULL) {
Py_DECREF(tuple);
return (NULL);
}
Py_DECREF(tuple);
PyErr_SetObject(PyExc_StopIteration, result);
Py_DECREF(result);
return (NULL);
}
static PyObject *
pysocket_async_send(struct pysocket_op *op)
{
2019-03-01 20:56:03 +01:00
ssize_t ret;
socklen_t socklen;
const struct sockaddr *sendaddr;
if (!(op->socket->event.evt.flags & KORE_EVENT_WRITE)) {
Py_RETURN_NONE;
}
for (;;) {
if (op->type == PYSOCKET_TYPE_SEND) {
ret = write(op->socket->fd,
op->buffer.data + op->buffer.offset,
op->buffer.length - op->buffer.offset);
} else {
sendaddr = (const struct sockaddr *)&op->sendaddr;
2019-03-01 20:56:03 +01:00
switch (op->socket->family) {
case AF_INET:
socklen = sizeof(op->sendaddr.ipv4);
break;
case AF_UNIX:
socklen = sizeof(op->sendaddr.sun);
#if defined(__linux__)
if (op->sendaddr.sun.sun_path[0] == '@') {
socklen = sizeof(sa_family_t) +
strlen(op->sendaddr.sun.sun_path);
op->sendaddr.sun.sun_path[0] = '\0';
}
#endif
break;
default:
2019-03-01 20:56:03 +01:00
fatal("non AF_INET/AF_UNIX in %s", __func__);
}
ret = sendto(op->socket->fd,
op->buffer.data + op->buffer.offset,
op->buffer.length - op->buffer.offset,
0, sendaddr, socklen);
}
if (ret == -1) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
op->socket->event.evt.flags &=
~KORE_EVENT_WRITE;
Py_RETURN_NONE;
}
PyErr_SetString(PyExc_RuntimeError, errno_s);
return (NULL);
}
break;
}
op->buffer.offset += (size_t)ret;
if (op->buffer.offset == op->buffer.length) {
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
}
Py_RETURN_NONE;
}
static void
pysocket_evt_handle(void *arg, int eof)
{
struct pysocket_event *event = arg;
struct pysocket *socket = event->s;
2019-03-13 15:45:55 +01:00
if ((eof || (event->evt.flags & KORE_EVENT_READ)) &&
socket->recvop != NULL) {
if (socket->recvop->coro->request != NULL)
http_request_wakeup(socket->recvop->coro->request);
else
python_coro_wakeup(socket->recvop->coro);
socket->recvop->eof = eof;
}
2019-03-13 15:45:55 +01:00
if ((eof || (event->evt.flags & KORE_EVENT_WRITE)) &&
socket->sendop != NULL) {
if (socket->sendop->coro->request != NULL)
http_request_wakeup(socket->sendop->coro->request);
else
python_coro_wakeup(socket->sendop->coro);
socket->sendop->eof = eof;
}
}
static void
pyqueue_dealloc(struct pyqueue *queue)
{
struct pyqueue_object *object;
struct pyqueue_waiting *waiting;
while ((object = TAILQ_FIRST(&queue->objects)) != NULL) {
TAILQ_REMOVE(&queue->objects, object, list);
Py_DECREF(object->obj);
kore_pool_put(&queue_object_pool, object);
}
while ((waiting = TAILQ_FIRST(&queue->waiting)) != NULL) {
TAILQ_REMOVE(&queue->waiting, waiting, list);
if (waiting->op != NULL)
waiting->op->waiting = NULL;
kore_pool_put(&queue_wait_pool, waiting);
}
PyObject_Del((PyObject *)queue);
}
static PyObject *
pyqueue_pop(struct pyqueue *queue, PyObject *args)
{
struct pyqueue_op *op;
if ((op = PyObject_New(struct pyqueue_op, &pyqueue_op_type)) == NULL)
return (NULL);
op->queue = queue;
op->waiting = kore_pool_get(&queue_wait_pool);
op->waiting->op = op;
op->waiting->coro = coro_running;
TAILQ_INSERT_TAIL(&queue->waiting, op->waiting, list);
Py_INCREF((PyObject *)queue);
return ((PyObject *)op);
}
2018-10-22 08:28:03 +02:00
static PyObject *
pyqueue_popnow(struct pyqueue *queue, PyObject *args)
{
PyObject *obj;
struct pyqueue_object *object;
if ((object = TAILQ_FIRST(&queue->objects)) == NULL) {
Py_RETURN_NONE;
}
TAILQ_REMOVE(&queue->objects, object, list);
obj = object->obj;
kore_pool_put(&queue_object_pool, object);
return (obj);
}
static PyObject *
pyqueue_push(struct pyqueue *queue, PyObject *args)
{
PyObject *obj;
struct pyqueue_object *object;
struct pyqueue_waiting *waiting;
if (!PyArg_ParseTuple(args, "O", &obj))
return (NULL);
Py_INCREF(obj);
object = kore_pool_get(&queue_object_pool);
object->obj = obj;
TAILQ_INSERT_TAIL(&queue->objects, object, list);
/* Wakeup first in line if any. */
if ((waiting = TAILQ_FIRST(&queue->waiting)) != NULL) {
TAILQ_REMOVE(&queue->waiting, waiting, list);
/* wakeup HTTP request if one is tied. */
if (waiting->coro->request != NULL)
http_request_wakeup(waiting->coro->request);
else
python_coro_wakeup(waiting->coro);
waiting->op->waiting = NULL;
kore_pool_put(&queue_wait_pool, waiting);
}
Py_RETURN_TRUE;
}
static void
pyqueue_op_dealloc(struct pyqueue_op *op)
{
if (op->waiting != NULL) {
TAILQ_REMOVE(&op->queue->waiting, op->waiting, list);
kore_pool_put(&queue_wait_pool, op->waiting);
op->waiting = NULL;
}
Py_DECREF((PyObject *)op->queue);
PyObject_Del((PyObject *)op);
}
static PyObject *
pyqueue_op_await(PyObject *obj)
{
Py_INCREF(obj);
return (obj);
}
static PyObject *
pyqueue_op_iternext(struct pyqueue_op *op)
{
PyObject *obj;
struct pyqueue_object *object;
struct pyqueue_waiting *waiting;
if ((object = TAILQ_FIRST(&op->queue->objects)) == NULL) {
Py_RETURN_NONE;
}
TAILQ_REMOVE(&op->queue->objects, object, list);
obj = object->obj;
kore_pool_put(&queue_object_pool, object);
TAILQ_FOREACH(waiting, &op->queue->waiting, list) {
if (waiting->coro->id == coro_running->id) {
TAILQ_REMOVE(&op->queue->waiting, waiting, list);
waiting->op->waiting = NULL;
kore_pool_put(&queue_wait_pool, waiting);
break;
}
}
PyErr_SetObject(PyExc_StopIteration, obj);
Py_DECREF(obj);
return (NULL);
}
static void
pylock_dealloc(struct pylock *lock)
{
struct pylock_op *op;
while ((op = TAILQ_FIRST(&lock->ops)) != NULL) {
TAILQ_REMOVE(&lock->ops, op, list);
op->active = 0;
op->coro->lockop = NULL;
Py_DECREF((PyObject *)op);
}
PyObject_Del((PyObject *)lock);
}
static PyObject *
pylock_trylock(struct pylock *lock, PyObject *args)
{
if (lock->owner != NULL)
Py_RETURN_FALSE;
lock->owner = coro_running;
Py_RETURN_TRUE;
}
static PyObject *
pylock_release(struct pylock *lock, PyObject *args)
{
if (lock->owner == NULL) {
PyErr_SetString(PyExc_RuntimeError, "no lock owner set");
return (NULL);
}
if (lock->owner->id != coro_running->id) {
PyErr_SetString(PyExc_RuntimeError, "lock not owned by caller");
return (NULL);
}
pylock_do_release(lock);
Py_RETURN_NONE;
}
static PyObject *
pylock_aenter(struct pylock *lock, PyObject *args)
{
struct pylock_op *op;
if (coro_running->lockop != NULL) {
fatal("%s: lockop not NULL for %" PRIu64,
__func__, coro_running->id);
}
if (lock->owner != NULL && lock->owner->id == coro_running->id) {
PyErr_SetString(PyExc_RuntimeError, "recursive lock detected");
return (NULL);
}
if ((op = PyObject_New(struct pylock_op, &pylock_op_type)) == NULL)
return (NULL);
op->active = 1;
op->lock = lock;
op->locking = 1;
op->coro = coro_running;
coro_running->lockop = op;
Py_INCREF((PyObject *)op);
Py_INCREF((PyObject *)lock);
TAILQ_INSERT_TAIL(&lock->ops, op, list);
return ((PyObject *)op);
}
static PyObject *
pylock_aexit(struct pylock *lock, PyObject *args)
{
struct pylock_op *op;
if (coro_running->lockop != NULL) {
fatal("%s: lockop not NULL for %" PRIu64,
__func__, coro_running->id);
}
if (lock->owner == NULL || lock->owner->id != coro_running->id) {
PyErr_SetString(PyExc_RuntimeError, "invalid lock owner");
return (NULL);
}
if ((op = PyObject_New(struct pylock_op, &pylock_op_type)) == NULL)
return (NULL);
op->active = 1;
op->lock = lock;
op->locking = 0;
op->coro = coro_running;
coro_running->lockop = op;
Py_INCREF((PyObject *)op);
Py_INCREF((PyObject *)lock);
TAILQ_INSERT_TAIL(&lock->ops, op, list);
return ((PyObject *)op);
}
static void
pylock_do_release(struct pylock *lock)
{
struct pylock_op *op;
lock->owner = NULL;
TAILQ_FOREACH(op, &lock->ops, list) {
if (op->locking == 0)
continue;
op->active = 0;
op->coro->lockop = NULL;
TAILQ_REMOVE(&lock->ops, op, list);
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
Py_DECREF((PyObject *)op);
break;
}
}
static void
pylock_op_dealloc(struct pylock_op *op)
{
if (op->active) {
TAILQ_REMOVE(&op->lock->ops, op, list);
op->active = 0;
}
op->coro->lockop = NULL;
Py_DECREF((PyObject *)op->lock);
PyObject_Del((PyObject *)op);
}
static PyObject *
pylock_op_await(PyObject *obj)
{
Py_INCREF(obj);
return (obj);
}
static PyObject *
pylock_op_iternext(struct pylock_op *op)
{
if (op->locking == 0) {
if (op->lock->owner == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"no lock owner set");
return (NULL);
}
if (op->lock->owner->id != coro_running->id) {
PyErr_SetString(PyExc_RuntimeError,
"lock not owned by caller");
return (NULL);
}
pylock_do_release(op->lock);
} else {
if (op->lock->owner != NULL) {
/*
* We could be beat by another coroutine that grabbed
* the lock even if we were the one woken up for it.
*/
if (op->active == 0) {
op->active = 1;
op->coro->lockop = op;
TAILQ_INSERT_HEAD(&op->lock->ops, op, list);
Py_INCREF((PyObject *)op);
}
Py_RETURN_NONE;
}
op->lock->owner = coro_running;
}
if (op->active) {
op->active = 0;
op->coro->lockop = NULL;
TAILQ_REMOVE(&op->lock->ops, op, list);
Py_DECREF((PyObject *)op);
}
PyErr_SetNone(PyExc_StopIteration);
return (NULL);
}
static void
pyproc_timeout(void *arg, u_int64_t now)
{
struct pyproc *proc = arg;
proc->timer = NULL;
if (proc->coro->sockop != NULL)
proc->coro->sockop->eof = 1;
proc->coro->exception = PyExc_TimeoutError;
proc->coro->exception_msg = "timeout before process exited";
if (proc->coro->request != NULL)
http_request_wakeup(proc->coro->request);
else
python_coro_wakeup(proc->coro);
}
static void
pyproc_dealloc(struct pyproc *proc)
{
int status;
TAILQ_REMOVE(&procs, proc, list);
if (proc->timer != NULL) {
kore_timer_remove(proc->timer);
proc->timer = NULL;
}
if (proc->pid != -1) {
if (kill(proc->pid, SIGKILL) == -1) {
kore_log(LOG_NOTICE,
"kore.proc failed to send SIGKILL %d (%s)",
proc->pid, errno_s);
}
for (;;) {
if (waitpid(proc->pid, &status, 0) == -1) {
if (errno == EINTR)
continue;
kore_log(LOG_NOTICE,
"kore.proc failed to wait for %d (%s)",
proc->pid, errno_s);
}
break;
}
}
if (proc->in != NULL) {
Py_DECREF((PyObject *)proc->in);
proc->in = NULL;
}
if (proc->out != NULL) {
Py_DECREF((PyObject *)proc->out);
proc->out = NULL;
}
PyObject_Del((PyObject *)proc);
}
static PyObject *
pyproc_kill(struct pyproc *proc, PyObject *args)
{
if (proc->pid != -1 && kill(proc->pid, SIGKILL) == -1)
kore_log(LOG_NOTICE, "kill(%d): %s", proc->pid, errno_s);
Py_RETURN_TRUE;
}
static PyObject *
pyproc_reap(struct pyproc *proc, PyObject *args)
{
struct pyproc_op *op;
if (proc->op != NULL) {
PyErr_Format(PyExc_RuntimeError,
"process %d already being reaped", proc->apid);
return (NULL);
}
if (proc->timer != NULL) {
kore_timer_remove(proc->timer);
proc->timer = NULL;
}
if ((op = PyObject_New(struct pyproc_op, &pyproc_op_type)) == NULL)
return (NULL);
op->proc = proc;
op->coro = coro_running;
proc->op = op;
Py_INCREF((PyObject *)proc);
return ((PyObject *)op);
}
static PyObject *
pyproc_recv(struct pyproc *proc, PyObject *args)
{
2019-02-26 15:22:55 +01:00
Py_ssize_t len;
struct pysocket_op *op;
PyObject *obj;
int timeo;
timeo = -1;
if (proc->out == NULL) {
PyErr_SetString(PyExc_RuntimeError, "stdout closed");
return (NULL);
}
2019-02-26 15:22:55 +01:00
if (!PyArg_ParseTuple(args, "n|i", &len, &timeo))
return (NULL);
obj = pysocket_op_create(proc->out, PYSOCKET_TYPE_RECV, NULL, len);
if (obj == NULL)
return (NULL);
2019-02-26 15:22:55 +01:00
op = (struct pysocket_op *)obj;
if (timeo != -1) {
op->timer = kore_timer_add(pysocket_op_timeout,
2019-02-26 15:22:55 +01:00
timeo, op, KORE_TIMER_ONESHOT);
}
return (obj);
}
static PyObject *
pyproc_send(struct pyproc *proc, PyObject *args)
{
Py_buffer buf;
PyObject *ret;
if (proc->in == NULL) {
PyErr_SetString(PyExc_RuntimeError, "stdin closed");
return (NULL);
}
if (!PyArg_ParseTuple(args, "y*", &buf))
return (NULL);
ret = pysocket_op_create(proc->in,
PYSOCKET_TYPE_SEND, buf.buf, buf.len);
PyBuffer_Release(&buf);
return (ret);
}
static PyObject *
pyproc_close_stdin(struct pyproc *proc, PyObject *args)
{
if (proc->in != NULL) {
Py_DECREF((PyObject *)proc->in);
proc->in = NULL;
}
Py_RETURN_TRUE;
}
static PyObject *
pyproc_get_pid(struct pyproc *proc, void *closure)
{
return (PyLong_FromLong(proc->apid));
}
static void
pyproc_op_dealloc(struct pyproc_op *op)
{
Py_DECREF((PyObject *)op->proc);
PyObject_Del((PyObject *)op);
}
static PyObject *
pyproc_op_await(PyObject *sop)
{
Py_INCREF(sop);
return (sop);
}
static PyObject *
pyproc_op_iternext(struct pyproc_op *op)
{
int ret;
PyObject *res;
if (op->proc->coro->exception != NULL) {
PyErr_SetString(op->proc->coro->exception,
op->proc->coro->exception_msg);
op->proc->coro->exception = NULL;
return (NULL);
}
if (op->proc->reaped == 0)
Py_RETURN_NONE;
if (WIFSTOPPED(op->proc->status)) {
op->proc->reaped = 0;
Py_RETURN_NONE;
}
if (WIFEXITED(op->proc->status)) {
ret = WEXITSTATUS(op->proc->status);
} else {
ret = op->proc->status;
}
if ((res = PyLong_FromLong(ret)) == NULL)
return (NULL);
PyErr_SetObject(PyExc_StopIteration, res);
Py_DECREF(res);
return (NULL);
}
static void
pygather_reap_coro(struct pygather_op *op, struct python_coro *reap)
{
struct pygather_coro *coro;
struct pygather_result *result;
#if PY_VERSION_HEX >= 0x030A0000
PyObject *type, *traceback;
#endif
TAILQ_FOREACH(coro, &op->coroutines, list) {
if (coro->coro->id == reap->id)
break;
}
if (coro == NULL)
fatal("coroutine %" PRIu64 " not found in gather", reap->id);
op->running--;
if (op->running < 0)
fatal("gatherop: running miscount (%d)", op->running);
result = kore_pool_get(&gather_result_pool);
result->obj = NULL;
#if PY_VERSION_HEX < 0x030A0000
if (_PyGen_FetchStopIterationValue(&result->obj) == -1) {
result->obj = Py_None;
Py_INCREF(Py_None);
}
#else
if (PyErr_Occurred()) {
Py_XDECREF(coro->coro->result);
PyErr_Fetch(&type, &coro->coro->result, &traceback);
Py_DECREF(type);
Py_XDECREF(traceback);
} else {
if (coro->coro->result == NULL) {
coro->coro->result = Py_None;
Py_INCREF(Py_None);
}
}
result->obj = coro->coro->result;
Py_INCREF(result->obj);
#endif
TAILQ_INSERT_TAIL(&op->results, result, list);
TAILQ_REMOVE(&op->coroutines, coro, list);
kore_pool_put(&gather_coro_pool, coro);
kore_python_coro_delete(reap);
}
static void
pygather_op_dealloc(struct pygather_op *op)
{
struct python_coro *old;
struct pygather_coro *coro, *next;
struct pygather_result *res, *rnext;
/*
* Since we are calling kore_python_coro_delete() on all the
* remaining coroutines in this gather op we must remember the
* original coroutine that is running as the removal will end
* up setting coro_running to NULL.
*/
old = coro_running;
for (coro = TAILQ_FIRST(&op->coroutines); coro != NULL; coro = next) {
next = TAILQ_NEXT(coro, list);
TAILQ_REMOVE(&op->coroutines, coro, list);
/* Make sure we don't end up in pygather_reap_coro(). */
coro->coro->gatherop = NULL;
kore_python_coro_delete(coro->coro);
kore_pool_put(&gather_coro_pool, coro);
}
coro_running = old;
for (res = TAILQ_FIRST(&op->results); res != NULL; res = rnext) {
rnext = TAILQ_NEXT(res, list);
TAILQ_REMOVE(&op->results, res, list);
Py_DECREF(res->obj);
kore_pool_put(&gather_result_pool, res);
}
PyObject_Del((PyObject *)op);
}
static PyObject *
pygather_op_await(PyObject *obj)
{
Py_INCREF(obj);
return (obj);
}
static PyObject *
pygather_op_iternext(struct pygather_op *op)
{
int idx;
struct pygather_coro *coro;
struct pygather_result *res, *next;
PyObject *list, *obj;
if (!TAILQ_EMPTY(&op->coroutines)) {
if (op->running > 0)
Py_RETURN_NONE;
TAILQ_FOREACH(coro, &op->coroutines, list) {
if (op->running >= op->concurrency)
break;
python_coro_wakeup(coro->coro);
op->running++;
}
Py_RETURN_NONE;
}
if ((list = PyList_New(op->count)) == NULL)
return (NULL);
idx = 0;
for (res = TAILQ_FIRST(&op->results); res != NULL; res = next) {
next = TAILQ_NEXT(res, list);
TAILQ_REMOVE(&op->results, res, list);
obj = res->obj;
res->obj = NULL;
kore_pool_put(&gather_result_pool, res);
if (PyList_SetItem(list, idx++, obj) != 0) {
Py_DECREF(list);
return (NULL);
}
}
PyErr_SetObject(PyExc_StopIteration, list);
Py_DECREF(list);
return (NULL);
}
static PyObject *
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
pyhttp_request_alloc(const struct http_request *req)
{
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
union { const void *cp; void *p; } ptr;
struct pyhttp_request *pyreq;
pyreq = PyObject_New(struct pyhttp_request, &pyhttp_request_type);
if (pyreq == NULL)
return (NULL);
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
/*
* Hack around all http apis taking a non-const pointer and us having
* a const pointer for the req data structure. This is because we
* could potentially be called from a validator where the argument
* is a http_request pointer.
*/
ptr.cp = req;
pyreq->req = ptr.p;
pyreq->data = NULL;
pyreq->dict = NULL;
return ((PyObject *)pyreq);
}
2017-02-01 21:23:56 +01:00
static PyObject *
pyhttp_file_alloc(struct http_file *file)
{
struct pyhttp_file *pyfile;
pyfile = PyObject_New(struct pyhttp_file, &pyhttp_file_type);
if (pyfile == NULL)
return (NULL);
pyfile->file = file;
return ((PyObject *)pyfile);
}
2019-06-07 21:06:54 +02:00
static int
pyhttp_preprocess(struct http_request *req)
{
struct reqcall *rq;
PyObject *ret;
rq = req->py_rqnext;
while (rq) {
2019-06-07 21:33:00 +02:00
req->py_rqnext = TAILQ_NEXT(rq, list);
2019-06-07 21:06:54 +02:00
PyErr_Clear();
ret = PyObject_CallFunctionObjArgs(rq->f, req->py_req, NULL);
if (ret == NULL) {
kore_python_log_error("preprocess");
http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
return (KORE_RESULT_ERROR);
}
if (ret == Py_False) {
Py_DECREF(ret);
return (KORE_RESULT_ERROR);
}
if (PyCoro_CheckExact(ret)) {
req->py_coro = python_coro_create(ret, req);
if (python_coro_run(req->py_coro) == KORE_RESULT_OK) {
http_request_wakeup(req);
kore_python_coro_delete(req->py_coro);
req->py_coro = NULL;
rq = req->py_rqnext;
continue;
}
return (KORE_RESULT_RETRY);
}
Py_DECREF(ret);
rq = req->py_rqnext;
}
return (KORE_RESULT_OK);
}
static PyObject *
pyhttp_response(struct pyhttp_request *pyreq, PyObject *args)
{
struct connection *c;
char *ptr;
Py_ssize_t length;
int status;
struct pyhttp_iterobj *iterobj;
PyObject *obj, *iterator;
length = -1;
if (!PyArg_ParseTuple(args, "iO", &status, &obj))
return (NULL);
if (PyBytes_CheckExact(obj)) {
if (PyBytes_AsStringAndSize(obj, &ptr, &length) == -1)
return (NULL);
if (length < 0) {
PyErr_SetString(PyExc_TypeError, "invalid length");
return (NULL);
}
Py_INCREF(obj);
http_response_stream(pyreq->req, status, ptr, length,
pyhttp_response_sent, obj);
} else if (obj == Py_None) {
http_response(pyreq->req, status, NULL, 0);
} else {
c = pyreq->req->owner;
if (c->state == CONN_STATE_DISCONNECTING) {
Py_RETURN_FALSE;
}
if ((iterator = PyObject_GetIter(obj)) == NULL)
return (NULL);
iterobj = kore_pool_get(&iterobj_pool);
iterobj->iterator = iterator;
iterobj->connection = c;
iterobj->remove = 0;
kore_buf_init(&iterobj->buf, 4096);
c->hdlr_extra = iterobj;
c->flags |= CONN_IS_BUSY;
c->disconnect = pyhttp_iterobj_disconnect;
pyreq->req->flags |= HTTP_REQUEST_NO_CONTENT_LENGTH;
http_response_header(pyreq->req, "transfer-encoding",
"chunked");
http_response(pyreq->req, status, NULL, 0);
pyhttp_iterobj_next(iterobj);
}
Py_RETURN_TRUE;
}
static int
pyhttp_response_sent(struct netbuf *nb)
{
PyObject *data;
data = nb->extra;
Py_DECREF(data);
return (KORE_RESULT_OK);
}
static int
pyhttp_iterobj_next(struct pyhttp_iterobj *iterobj)
{
struct netbuf *nb;
PyObject *obj;
const char *ptr;
Py_ssize_t length;
PyErr_Clear();
if ((obj = PyIter_Next(iterobj->iterator)) == NULL) {
if (PyErr_Occurred()) {
kore_python_log_error("pyhttp_iterobj_next");
return (KORE_RESULT_ERROR);
}
return (KORE_RESULT_OK);
}
if ((ptr = PyUnicode_AsUTF8AndSize(obj, &length)) == NULL) {
kore_python_log_error("pyhttp_iterobj_next");
return (KORE_RESULT_ERROR);
}
kore_buf_reset(&iterobj->buf);
kore_buf_appendf(&iterobj->buf, "%lx\r\n", length);
kore_buf_append(&iterobj->buf, ptr, length);
kore_buf_appendf(&iterobj->buf, "\r\n");
Py_DECREF(obj);
net_send_stream(iterobj->connection, iterobj->buf.data,
iterobj->buf.offset, pyhttp_iterobj_chunk_sent, &nb);
nb->extra = iterobj;
return (KORE_RESULT_RETRY);
}
static int
pyhttp_iterobj_chunk_sent(struct netbuf *nb)
{
int ret;
struct pyhttp_iterobj *iterobj;
iterobj = nb->extra;
if (iterobj->remove) {
ret = KORE_RESULT_ERROR;
} else {
ret = pyhttp_iterobj_next(iterobj);
}
if (ret != KORE_RESULT_RETRY) {
iterobj->connection->hdlr_extra = NULL;
iterobj->connection->disconnect = NULL;
iterobj->connection->flags &= ~CONN_IS_BUSY;
if (iterobj->remove == 0)
http_start_recv(iterobj->connection);
kore_buf_reset(&iterobj->buf);
kore_buf_appendf(&iterobj->buf, "0\r\n\r\n");
net_send_queue(iterobj->connection,
iterobj->buf.data, iterobj->buf.offset);
Py_DECREF(iterobj->iterator);
kore_buf_cleanup(&iterobj->buf);
kore_pool_put(&iterobj_pool, iterobj);
} else {
ret = KORE_RESULT_OK;
}
return (ret);
}
static void
pyhttp_iterobj_disconnect(struct connection *c)
{
struct pyhttp_iterobj *iterobj;
iterobj = c->hdlr_extra;
iterobj->remove = 1;
c->hdlr_extra = NULL;
}
static PyObject *
pyhttp_response_header(struct pyhttp_request *pyreq, PyObject *args)
{
const char *header, *value;
if (!PyArg_ParseTuple(args, "ss", &header, &value))
return (NULL);
http_response_header(pyreq->req, header, value);
Py_RETURN_TRUE;
}
static PyObject *
pyhttp_request_header(struct pyhttp_request *pyreq, PyObject *args)
{
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
const char *value;
const char *header;
PyObject *result;
if (!PyArg_ParseTuple(args, "s", &header))
return (NULL);
if (!http_request_header(pyreq->req, header, &value)) {
Py_RETURN_NONE;
}
if ((result = PyUnicode_FromString(value)) == NULL)
return (PyErr_NoMemory());
return (result);
}
static PyObject *
pyhttp_body_read(struct pyhttp_request *pyreq, PyObject *args)
{
ssize_t ret;
size_t len;
Py_ssize_t pylen;
PyObject *result;
u_int8_t buf[1024];
if (!PyArg_ParseTuple(args, "n", &pylen) || pylen < 0)
return (NULL);
len = (size_t)pylen;
if (len > sizeof(buf)) {
PyErr_SetString(PyExc_RuntimeError, "len > sizeof(buf)");
return (NULL);
}
ret = http_body_read(pyreq->req, buf, len);
if (ret == -1) {
PyErr_SetString(PyExc_RuntimeError, "http_body_read() failed");
return (NULL);
}
result = Py_BuildValue("ny#", ret, buf, ret);
if (result == NULL)
return (PyErr_NoMemory());
return (result);
}
static PyObject *
pyhttp_populate_get(struct pyhttp_request *pyreq, PyObject *args)
{
http_populate_get(pyreq->req);
Py_RETURN_TRUE;
}
static PyObject *
pyhttp_populate_post(struct pyhttp_request *pyreq, PyObject *args)
{
http_populate_post(pyreq->req);
Py_RETURN_TRUE;
}
2017-02-01 21:23:56 +01:00
static PyObject *
pyhttp_populate_multi(struct pyhttp_request *pyreq, PyObject *args)
{
http_populate_multipart_form(pyreq->req);
Py_RETURN_TRUE;
}
static PyObject *
pyhttp_populate_cookies(struct pyhttp_request *pyreq, PyObject *args)
{
http_populate_cookies(pyreq->req);
Py_RETURN_TRUE;
}
static PyObject *
pyhttp_argument(struct pyhttp_request *pyreq, PyObject *args)
{
const char *name;
PyObject *value;
char *string;
if (!PyArg_ParseTuple(args, "s", &name))
return (NULL);
if (!http_argument_get_string(pyreq->req, name, &string)) {
Py_RETURN_NONE;
}
if ((value = PyUnicode_FromString(string)) == NULL)
return (PyErr_NoMemory());
return (value);
}
static PyObject *
pyhttp_cookie(struct pyhttp_request *pyreq, PyObject *args)
{
const char *name;
PyObject *value;
char *string;
if (!PyArg_ParseTuple(args, "s", &name))
return (NULL);
if (!http_request_cookie(pyreq->req, name, &string)) {
Py_RETURN_NONE;
}
if ((value = PyUnicode_FromString(string)) == NULL)
return (NULL);
return (value);
}
static PyObject *
pyhttp_headers(struct pyhttp_request *pyreq, PyObject *args)
{
struct http_header *hdr;
struct http_request *req;
PyObject *obj, *dict, *ret;
ret = NULL;
obj = NULL;
dict = NULL;
req = pyreq->req;
if ((dict = PyDict_New()) == NULL)
goto cleanup;
if ((obj = PyUnicode_FromString(req->host)) == NULL)
goto cleanup;
if (PyDict_SetItemString(dict, "host", obj) == -1)
goto cleanup;
TAILQ_FOREACH(hdr, &req->req_headers, list) {
if ((obj = PyUnicode_FromString(hdr->value)) == NULL)
goto cleanup;
if (PyDict_SetItemString(dict, hdr->header, obj) == -1)
goto cleanup;
}
ret = dict;
obj = NULL;
dict = NULL;
cleanup:
Py_XDECREF(obj);
Py_XDECREF(dict);
return (ret);
}
2017-02-01 21:23:56 +01:00
static PyObject *
pyhttp_file_lookup(struct pyhttp_request *pyreq, PyObject *args)
{
const char *name;
struct http_file *file;
PyObject *pyfile;
if (!PyArg_ParseTuple(args, "s", &name))
2017-02-01 21:23:56 +01:00
return (NULL);
if ((file = http_file_lookup(pyreq->req, name)) == NULL) {
Py_RETURN_NONE;
}
if ((pyfile = pyhttp_file_alloc(file)) == NULL)
return (PyErr_NoMemory());
return (pyfile);
}
static PyObject *
pyhttp_file_read(struct pyhttp_file *pyfile, PyObject *args)
{
ssize_t ret;
size_t len;
Py_ssize_t pylen;
PyObject *result;
u_int8_t buf[1024];
if (!PyArg_ParseTuple(args, "n", &pylen) || pylen < 0)
2017-02-01 21:23:56 +01:00
return (NULL);
len = (size_t)pylen;
if (len > sizeof(buf)) {
PyErr_SetString(PyExc_RuntimeError, "len > sizeof(buf)");
return (NULL);
}
ret = http_file_read(pyfile->file, buf, len);
if (ret == -1) {
PyErr_SetString(PyExc_RuntimeError, "http_file_read() failed");
return (NULL);
}
result = Py_BuildValue("ny#", ret, buf, ret);
2017-02-01 21:23:56 +01:00
if (result == NULL)
return (PyErr_NoMemory());
return (result);
}
static PyObject *
pyhttp_websocket_handshake(struct pyhttp_request *pyreq, PyObject *args)
{
struct connection *c;
PyObject *onconnect, *onmsg, *ondisconnect;
if (!PyArg_ParseTuple(args, "OOO", &onconnect, &onmsg, &ondisconnect))
return (NULL);
kore_websocket_handshake(pyreq->req, NULL, NULL, NULL);
c = pyreq->req->owner;
Py_INCREF(onconnect);
Py_INCREF(onmsg);
Py_INCREF(ondisconnect);
c->ws_connect = kore_calloc(1, sizeof(struct kore_runtime_call));
c->ws_connect->addr = onconnect;
c->ws_connect->runtime = &kore_python_runtime;
c->ws_message = kore_calloc(1, sizeof(struct kore_runtime_call));
c->ws_message->addr = onmsg;
c->ws_message->runtime = &kore_python_runtime;
c->ws_disconnect = kore_calloc(1, sizeof(struct kore_runtime_call));
c->ws_disconnect->addr = ondisconnect;
c->ws_disconnect->runtime = &kore_python_runtime;
python_runtime_connect(onconnect, c);
Py_RETURN_TRUE;
}
static PyObject *
pyconnection_websocket_send(struct pyconnection *pyc, PyObject *args)
{
int op;
ssize_t len;
const char *data;
if (pyc->c->proto != CONN_PROTO_WEBSOCKET) {
PyErr_SetString(PyExc_TypeError, "not a websocket connection");
return (NULL);
}
len = -1;
if (!PyArg_ParseTuple(args, "iy#", &op, &data, &len))
return (NULL);
if (len < 0) {
PyErr_SetString(PyExc_TypeError, "invalid length");
return (NULL);
}
switch (op) {
case WEBSOCKET_OP_TEXT:
case WEBSOCKET_OP_BINARY:
break;
default:
PyErr_SetString(PyExc_TypeError, "invalid op parameter");
return (NULL);
}
kore_websocket_send(pyc->c, op, data, len);
Py_RETURN_TRUE;
}
static PyObject *
python_websocket_broadcast(PyObject *self, PyObject *args)
{
struct connection *c;
ssize_t len;
2017-01-30 20:30:42 +01:00
struct pyconnection *pyc;
const char *data;
PyObject *pysrc;
int op, broadcast;
len = -1;
2017-01-30 20:30:42 +01:00
if (!PyArg_ParseTuple(args, "Oiy#i", &pysrc, &op, &data, &len,
&broadcast))
2017-01-30 20:30:42 +01:00
return (NULL);
if (len < 0) {
PyErr_SetString(PyExc_TypeError, "invalid length");
return (NULL);
}
switch (op) {
case WEBSOCKET_OP_TEXT:
case WEBSOCKET_OP_BINARY:
break;
default:
PyErr_SetString(PyExc_TypeError, "invalid op parameter");
return (NULL);
}
if (pysrc == Py_None) {
c = NULL;
} else {
if (!PyObject_TypeCheck(pysrc, &pyconnection_type))
return (NULL);
pyc = (struct pyconnection *)pysrc;
c = pyc->c;
}
kore_websocket_broadcast(c, op, data, len, broadcast);
Py_RETURN_TRUE;
}
static PyObject *
pyhttp_get_host(struct pyhttp_request *pyreq, void *closure)
{
PyObject *host;
if ((host = PyUnicode_FromString(pyreq->req->host)) == NULL)
return (PyErr_NoMemory());
return (host);
}
static PyObject *
pyhttp_get_path(struct pyhttp_request *pyreq, void *closure)
{
PyObject *path;
if ((path = PyUnicode_FromString(pyreq->req->path)) == NULL)
return (PyErr_NoMemory());
return (path);
}
static PyObject *
pyhttp_get_body(struct pyhttp_request *pyreq, void *closure)
{
ssize_t ret;
struct kore_buf buf;
PyObject *body;
u_int8_t data[BUFSIZ];
kore_buf_init(&buf, 1024);
2017-03-13 11:19:58 +01:00
if (!http_body_rewind(pyreq->req)) {
PyErr_SetString(PyExc_RuntimeError,
"http_body_rewind() failed");
return (NULL);
}
for (;;) {
ret = http_body_read(pyreq->req, data, sizeof(data));
if (ret == -1) {
kore_buf_cleanup(&buf);
PyErr_SetString(PyExc_RuntimeError,
"http_body_read() failed");
return (NULL);
}
if (ret == 0)
break;
kore_buf_append(&buf, data, (size_t)ret);
}
body = PyBytes_FromStringAndSize((char *)buf.data, buf.offset);
kore_buf_free(&buf);
if (body == NULL)
return (PyErr_NoMemory());
return (body);
}
static PyObject *
pyhttp_get_agent(struct pyhttp_request *pyreq, void *closure)
{
return (PyUnicode_FromString(pyreq->req->path));
}
static PyObject *
pyhttp_get_method(struct pyhttp_request *pyreq, void *closure)
{
return (PyLong_FromUnsignedLong(pyreq->req->method));
}
static PyObject *
pyhttp_get_protocol(struct pyhttp_request *pyreq, void *closure)
{
struct connection *c;
const char *proto;
c = pyreq->req->owner;
if (c->owner->server->tls)
proto = "https";
else
proto = "http";
return (PyUnicode_FromString(proto));
}
2017-02-22 17:52:57 +01:00
static PyObject *
pyhttp_get_body_path(struct pyhttp_request *pyreq, void *closure)
{
if (pyreq->req->http_body_path == NULL) {
Py_RETURN_NONE;
}
return (PyUnicode_FromString(pyreq->req->http_body_path));
}
2017-02-22 17:52:57 +01:00
static PyObject *
pyhttp_get_body_digest(struct pyhttp_request *pyreq, void *closure)
{
PyObject *digest;
digest = PyBytes_FromStringAndSize((char *)pyreq->req->http_body_digest,
sizeof(pyreq->req->http_body_digest));
return (digest);
2017-02-22 17:52:57 +01:00
}
static PyObject *
pyhttp_get_connection(struct pyhttp_request *pyreq, void *closure)
{
PyObject *pyc;
if (pyreq->req->owner == NULL) {
Py_RETURN_NONE;
}
if ((pyc = pyconnection_alloc(pyreq->req->owner)) == NULL)
return (PyErr_NoMemory());
return (pyc);
}
2017-02-01 21:23:56 +01:00
static PyObject *
pyhttp_file_get_name(struct pyhttp_file *pyfile, void *closure)
{
PyObject *name;
if ((name = PyUnicode_FromString(pyfile->file->name)) == NULL)
return (PyErr_NoMemory());
return (name);
}
static PyObject *
pyhttp_file_get_filename(struct pyhttp_file *pyfile, void *closure)
{
PyObject *name;
if ((name = PyUnicode_FromString(pyfile->file->filename)) == NULL)
return (PyErr_NoMemory());
return (name);
}
void
pyroute_dealloc(struct pyroute *route)
{
kore_free(route->path);
Py_XDECREF(route->func);
Py_XDECREF(route->kwargs);
PyObject_Del((PyObject *)route);
}
static PyObject *
pyroute_inner(struct pyroute *route, PyObject *args)
{
PyObject *obj;
if (!PyArg_ParseTuple(args, "O", &obj))
return (NULL);
if (!PyCallable_Check(obj))
return (NULL);
route->func = obj;
Py_INCREF(route->func);
TAILQ_INSERT_TAIL(&routes, route, list);
return (route->func);
}
void
pydomain_dealloc(struct pydomain *domain)
{
PyObject_Del((PyObject *)domain);
}
static int
pydomain_set_accesslog(struct pydomain *domain, PyObject *arg, void *closure)
{
const char *path;
if (!PyUnicode_CheckExact(arg))
return (-1);
if (domain->config->accesslog != -1) {
PyErr_Format(PyExc_RuntimeError,
"domain %s accesslog already set", domain->config->domain);
return (-1);
}
path = PyUnicode_AsUTF8(arg);
domain->config->accesslog = open(path,
O_CREAT | O_APPEND | O_WRONLY,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (domain->config->accesslog == -1) {
PyErr_Format(PyExc_RuntimeError,
"failed to open accesslog for %s (%s:%s)",
domain->config->domain, path, errno_s);
return (-1);
}
return (0);
}
static PyObject *
pydomain_filemaps(struct pydomain *domain, PyObject *args)
{
Py_ssize_t idx;
struct kore_route *rt;
const char *url, *path;
PyObject *dict, *key, *value, *auth;
if (!PyArg_ParseTuple(args, "O", &dict))
return (NULL);
if (!PyDict_CheckExact(dict)) {
PyErr_SetString(PyExc_RuntimeError, "filemaps not a dict");
return (NULL);
}
idx = 0;
while (PyDict_Next(dict, &idx, &key, &value)) {
if (!PyUnicode_CheckExact(key)) {
PyErr_SetString(PyExc_RuntimeError,
"filemap key not a string");
return (NULL);
}
url = PyUnicode_AsUTF8(key);
if (!PyUnicode_CheckExact(value) &&
!PyTuple_CheckExact(value)) {
PyErr_SetString(PyExc_RuntimeError,
"filemap value can be either be a string or tuple");
return (NULL);
}
if (PyTuple_CheckExact(value)) {
auth = PyTuple_GetItem(value, 1);
if (!PyDict_CheckExact(auth)) {
PyErr_SetString(PyExc_RuntimeError,
"filemap value tuple auth is not a dict");
return (NULL);
}
value = PyTuple_GetItem(value, 0);
if (!PyUnicode_CheckExact(value)) {
PyErr_SetString(PyExc_RuntimeError,
"filemap value tuple path is invalid");
return (NULL);
}
} else {
auth = NULL;
}
path = PyUnicode_AsUTF8(value);
rt = kore_filemap_create(domain->config, path, url, NULL);
if (rt == NULL) {
PyErr_Format(PyExc_RuntimeError,
"failed to create filemap %s->%s for %s",
url, path, domain->config->domain);
return (NULL);
}
if (auth != NULL) {
if (!python_route_auth(auth, rt)) {
kore_python_log_error("python_route_auth");
kore_route_free(rt);
return (KORE_RESULT_ERROR);
}
}
}
Py_RETURN_NONE;
}
static PyObject *
pydomain_redirect(struct pydomain *domain, PyObject *args)
{
int status;
const char *src, *dst;
if (!PyArg_ParseTuple(args, "sis", &src, &status, &dst))
return (NULL);
if (!http_redirect_add(domain->config, src, status, dst)) {
fatal("failed to add redirect '%s' on '%s'",
src, domain->config->domain);
}
Py_RETURN_NONE;
}
static PyObject *
pydomain_route(struct pydomain *domain, PyObject *args, PyObject *kwargs)
{
PyObject *obj;
const char *path;
struct pyroute *route;
if (!PyArg_ParseTuple(args, "sO", &path, &obj))
return (NULL);
if (!PyCallable_Check(obj))
return (NULL);
if ((route = PyObject_New(struct pyroute, &pyroute_type)) == NULL)
return (NULL);
route->kwargs = kwargs;
route->domain = domain->config;
route->path = kore_strdup(path);
Py_XINCREF(route->kwargs);
route->func = obj;
Py_INCREF(route->func);
TAILQ_INSERT_TAIL(&routes, route, list);
Py_RETURN_NONE;
}
static int
python_route_install(struct pyroute *route)
{
const char *val;
struct kore_domain *domain;
struct kore_route *rt, *entry;
PyObject *kwargs, *repr, *obj;
if ((repr = PyObject_Repr(route->func)) == NULL) {
kore_python_log_error("python_route_install");
return (KORE_RESULT_ERROR);
}
domain = python_route_domain_resolve(route);
rt = kore_calloc(1, sizeof(*rt));
rt->dom = domain;
rt->methods = HTTP_METHOD_ALL;
rt->path = kore_strdup(route->path);
TAILQ_INIT(&rt->params);
val = PyUnicode_AsUTF8(repr);
rt->func = kore_strdup(val);
kwargs = route->kwargs;
rt->rcall = kore_calloc(1, sizeof(struct kore_runtime_call));
rt->rcall->addr = route->func;
rt->rcall->runtime = &kore_python_runtime;
Py_INCREF(rt->rcall->addr);
if (kwargs != NULL) {
if ((obj = PyDict_GetItemString(kwargs, "methods")) != NULL) {
if (!python_route_methods(obj, kwargs, rt)) {
kore_python_log_error("python_route_install");
kore_route_free(rt);
return (KORE_RESULT_ERROR);
}
}
if ((obj = PyDict_GetItemString(kwargs, "auth")) != NULL) {
if (!python_route_auth(obj, rt)) {
kore_python_log_error("python_route_install");
kore_route_free(rt);
return (KORE_RESULT_ERROR);
}
}
if ((obj = PyDict_GetItemString(kwargs, "hooks")) != NULL) {
if (!python_route_hooks(obj, rt)) {
kore_python_log_error("python_route_install");
kore_route_free(rt);
return (KORE_RESULT_ERROR);
}
}
}
if (rt->path[0] == '/') {
rt->type = HANDLER_TYPE_STATIC;
} else {
rt->type = HANDLER_TYPE_DYNAMIC;
if (regcomp(&rt->rctx, rt->path, REG_EXTENDED))
fatal("failed to compile regex for '%s'", rt->path);
}
TAILQ_FOREACH(entry, &domain->routes, list) {
if (!strcmp(entry->path, rt->path) &&
(entry->methods & rt->methods))
fatal("duplicate route for '%s'", route->path);
}
TAILQ_INSERT_TAIL(&domain->routes, rt, list);
return (KORE_RESULT_OK);
}
static struct kore_domain *
python_route_domain_resolve(struct pyroute *route)
{
struct kore_server *srv;
const char *name;
struct kore_domain *domain;
if (route->domain != NULL)
return (route->domain);
if (route->kwargs != NULL)
name = python_string_from_dict(route->kwargs, "domain");
else
name = NULL;
if (name != NULL) {
domain = NULL;
LIST_FOREACH(srv, &kore_servers, list) {
TAILQ_FOREACH(domain, &srv->domains, list) {
if (!strcmp(domain->domain, name))
break;
}
}
if (domain == NULL)
fatal("domain '%s' does not exist", name);
} else {
if ((domain = kore_domain_byid(1)) != NULL)
fatal("ambiguous domain on route, please specify one");
if ((domain = kore_domain_byid(0)) == NULL)
fatal("no domains configured, please configure one");
}
return (domain);
}
static int
python_route_methods(PyObject *obj, PyObject *kwargs, struct kore_route *rt)
{
const char *val;
PyObject *item;
int method;
Py_ssize_t list_len, idx;
if (!PyList_CheckExact(obj)) {
PyErr_SetString(PyExc_RuntimeError, "methods not a list");
return (KORE_RESULT_ERROR);
}
rt->methods = 0;
list_len = PyList_Size(obj);
for (idx = 0; idx < list_len; idx++) {
if ((item = PyList_GetItem(obj, idx)) == NULL)
return (KORE_RESULT_ERROR);
if ((val = PyUnicode_AsUTF8(item)) == NULL)
return (KORE_RESULT_ERROR);
if ((method = http_method_value(val)) == 0) {
PyErr_Format(PyExc_RuntimeError,
"unknown HTTP method: %s", val);
return (KORE_RESULT_ERROR);
}
rt->methods |= method;
if (method == HTTP_METHOD_GET)
rt->methods |= HTTP_METHOD_HEAD;
if (!python_route_params(kwargs, rt, val, method, 0))
return (KORE_RESULT_ERROR);
if (!python_route_params(kwargs, rt, "qs", method, 1))
return (KORE_RESULT_ERROR);
}
return (KORE_RESULT_OK);
}
static int
python_route_params(PyObject *kwargs, struct kore_route *rt,
const char *method, int type, int qs)
{
Py_ssize_t idx;
const char *val;
int vtype;
struct kore_validator *vldr;
struct kore_route_params *param;
PyObject *obj, *key, *item;
if ((obj = PyDict_GetItemString(kwargs, method)) == NULL)
return (KORE_RESULT_OK);
if (!PyDict_CheckExact(obj))
return (KORE_RESULT_ERROR);
idx = 0;
while (PyDict_Next(obj, &idx, &key, &item)) {
if (!PyUnicode_CheckExact(key))
return (KORE_RESULT_ERROR);
val = PyUnicode_AsUTF8(key);
if (PyUnicode_CheckExact(item)) {
vtype = KORE_VALIDATOR_TYPE_REGEX;
} else if (PyCallable_Check(item)) {
vtype = KORE_VALIDATOR_TYPE_FUNCTION;
} else {
PyErr_Format(PyExc_RuntimeError,
"validator '%s' must be regex or function", val);
return (KORE_RESULT_ERROR);
}
vldr = kore_calloc(1, sizeof(*vldr));
vldr->type = vtype;
if (vtype == KORE_VALIDATOR_TYPE_REGEX) {
val = PyUnicode_AsUTF8(item);
if (regcomp(&(vldr->rctx),
val, REG_EXTENDED | REG_NOSUB)) {
PyErr_Format(PyExc_RuntimeError,
"Invalid regex (%s)", val);
kore_free(vldr);
return (KORE_RESULT_ERROR);
}
} else {
vldr->rcall = kore_calloc(1, sizeof(*vldr->rcall));
vldr->rcall->addr = item;
vldr->rcall->runtime = &kore_python_runtime;
Py_INCREF(item);
}
val = PyUnicode_AsUTF8(key);
vldr->name = kore_strdup(val);
param = kore_calloc(1, sizeof(*param));
param->flags = 0;
param->method = type;
param->validator = vldr;
param->name = kore_strdup(val);
if (type == HTTP_METHOD_GET || qs == 1)
param->flags = KORE_PARAMS_QUERY_STRING;
TAILQ_INSERT_TAIL(&rt->params, param, list);
}
return (KORE_RESULT_OK);
}
static int
python_route_auth(PyObject *dict, struct kore_route *rt)
{
int type;
struct kore_auth *auth;
struct kore_validator *vldr;
PyObject *obj, *repr;
const char *value, *redir;
if (!PyDict_CheckExact(dict))
return (KORE_RESULT_ERROR);
if ((value = python_string_from_dict(dict, "type")) == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"missing or invalid 'type' keyword");
return (KORE_RESULT_ERROR);
}
if (!strcmp(value, "cookie")) {
type = KORE_AUTH_TYPE_COOKIE;
} else if (!strcmp(value, "header")) {
type = KORE_AUTH_TYPE_HEADER;
} else {
PyErr_Format(PyExc_RuntimeError,
"invalid 'type' (%s) in auth dictionary for '%s'",
value, rt->path);
return (KORE_RESULT_ERROR);
}
if ((value = python_string_from_dict(dict, "value")) == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"missing or invalid 'value' keyword");
return (KORE_RESULT_ERROR);
}
redir = python_string_from_dict(dict, "redirect");
if ((obj = PyDict_GetItemString(dict, "verify")) == NULL ||
!PyCallable_Check(obj)) {
PyErr_Format(PyExc_RuntimeError,
"missing 'verify' in auth dictionary for '%s'", rt->path);
return (KORE_RESULT_ERROR);
}
auth = kore_calloc(1, sizeof(*auth));
auth->type = type;
auth->value = kore_strdup(value);
if (redir != NULL)
auth->redirect = kore_strdup(redir);
vldr = kore_calloc(1, sizeof(*vldr));
vldr->type = KORE_VALIDATOR_TYPE_FUNCTION;
vldr->rcall = kore_calloc(1, sizeof(*vldr->rcall));
vldr->rcall->addr = obj;
vldr->rcall->runtime = &kore_python_runtime;
Py_INCREF(obj);
if ((repr = PyObject_Repr(obj)) == NULL) {
kore_free(vldr->rcall);
kore_free(vldr);
kore_free(auth);
return (KORE_RESULT_ERROR);
}
value = PyUnicode_AsUTF8(repr);
vldr->name = kore_strdup(value);
Py_DECREF(repr);
auth->validator = vldr;
rt->auth = auth;
return (KORE_RESULT_OK);
}
static int
python_route_hooks(PyObject *dict, struct kore_route *rt)
{
if (!PyDict_CheckExact(dict))
return (KORE_RESULT_ERROR);
if (!python_route_hook_set(dict, "on_free", &rt->on_free))
return (KORE_RESULT_ERROR);
if (!python_route_hook_set(dict, "on_headers", &rt->on_headers))
return (KORE_RESULT_ERROR);
if (!python_route_hook_set(dict, "on_body_chunk", &rt->on_body_chunk))
return (KORE_RESULT_ERROR);
return (KORE_RESULT_OK);
}
static int
python_route_hook_set(PyObject *dict, const char *name,
struct kore_runtime_call **out)
{
PyObject *obj;
struct kore_runtime_call *rcall;
if ((obj = PyDict_GetItemString(dict, name)) == NULL)
return (KORE_RESULT_OK);
if (!PyCallable_Check(obj)) {
PyErr_Format(PyExc_RuntimeError,
"%s for a route not callable", name);
Py_DECREF(obj);
return (KORE_RESULT_ERROR);
}
rcall = kore_calloc(1, sizeof(struct kore_runtime_call));
rcall->addr = obj;
rcall->runtime = &kore_python_runtime;
Py_INCREF(rcall->addr);
*out = rcall;
return (KORE_RESULT_OK);
}
#if defined(KORE_USE_PGSQL)
static PyObject *
python_kore_pgsql_query(PyObject *self, PyObject *args, PyObject *kwargs)
{
struct pykore_pgsql *op;
PyObject *obj;
const char *db, *query;
if (!PyArg_ParseTuple(args, "ss", &db, &query))
return (NULL);
op = PyObject_New(struct pykore_pgsql, &pykore_pgsql_type);
if (op == NULL)
return (NULL);
op->binary = 0;
op->param.count = 0;
op->param.objs = NULL;
op->param.values = NULL;
op->param.lengths = NULL;
op->param.formats = NULL;
op->result = NULL;
op->coro = coro_running;
op->db = kore_strdup(db);
op->query = kore_strdup(query);
op->state = PYKORE_PGSQL_PREINIT;
memset(&op->sql, 0, sizeof(op->sql));
if (kwargs != NULL) {
if ((obj = PyDict_GetItemString(kwargs, "params")) != NULL) {
if (!pykore_pgsql_params(op, obj)) {
Py_DECREF((PyObject *)op);
return (NULL);
}
}
if ((obj = PyDict_GetItemString(kwargs, "binary")) != NULL) {
if (obj == Py_True) {
op->binary = 1;
} else if (obj == Py_False) {
op->binary = 0;
} else {
Py_DECREF((PyObject *)op);
PyErr_SetString(PyExc_RuntimeError,
"pgsql: binary not True or False");
return (NULL);
}
}
}
return ((PyObject *)op);
}
static int
pykore_pgsql_params(struct pykore_pgsql *op, PyObject *list)
{
union { const char *cp; char *p; } ptr;
PyObject *item;
int format;
Py_ssize_t i, len, vlen;
if (!PyList_CheckExact(list)) {
if (list == Py_None)
return (KORE_RESULT_OK);
PyErr_SetString(PyExc_RuntimeError,
"pgsql: params keyword must be a list");
return (KORE_RESULT_ERROR);
}
len = PyList_Size(list);
if (len == 0)
return (KORE_RESULT_OK);
if (len > INT_MAX) {
PyErr_SetString(PyExc_RuntimeError,
"pgsql: list length too large");
return (KORE_RESULT_ERROR);
}
op->param.count = len;
op->param.lengths = kore_calloc(len, sizeof(int));
op->param.formats = kore_calloc(len, sizeof(int));
op->param.values = kore_calloc(len, sizeof(char *));
op->param.objs = kore_calloc(len, sizeof(PyObject *));
for (i = 0; i < len; i++) {
if ((item = PyList_GetItem(list, i)) == NULL)
return (KORE_RESULT_ERROR);
if (PyUnicode_CheckExact(item)) {
format = 0;
ptr.cp = PyUnicode_AsUTF8AndSize(item, &vlen);
} else if (PyBytes_CheckExact(item)) {
format = 1;
if (PyBytes_AsStringAndSize(item, &ptr.p, &vlen) == -1)
ptr.p = NULL;
} else {
PyErr_Format(PyExc_RuntimeError,
"pgsql: item %zu is not a string or bytes", i);
return (KORE_RESULT_ERROR);
}
if (ptr.cp == NULL)
return (KORE_RESULT_ERROR);
op->param.lengths[i] = vlen;
op->param.values[i] = ptr.cp;
op->param.formats[i] = format;
/* Hold on to it since we are directly referencing its data. */
op->param.objs[i] = item;
Py_INCREF(item);
}
return (KORE_RESULT_OK);
}
static void
pykore_pgsql_dealloc(struct pykore_pgsql *pysql)
{
Py_ssize_t i;
kore_free(pysql->db);
kore_free(pysql->query);
kore_pgsql_cleanup(&pysql->sql);
if (pysql->result != NULL)
Py_DECREF(pysql->result);
for (i = 0; i < pysql->param.count; i++)
Py_XDECREF(pysql->param.objs[i]);
kore_free(pysql->param.objs);
kore_free(pysql->param.values);
kore_free(pysql->param.lengths);
kore_free(pysql->param.formats);
PyObject_Del((PyObject *)pysql);
}
static PyObject *
pykore_pgsql_iternext(struct pykore_pgsql *pysql)
{
switch (pysql->state) {
case PYKORE_PGSQL_PREINIT:
kore_pgsql_init(&pysql->sql);
kore_pgsql_bind_callback(&pysql->sql,
pykore_pgsql_callback, pysql);
pysql->state = PYKORE_PGSQL_INITIALIZE;
/* fallthrough */
case PYKORE_PGSQL_INITIALIZE:
if (!kore_pgsql_setup(&pysql->sql, pysql->db,
KORE_PGSQL_ASYNC)) {
if (pysql->sql.state == KORE_PGSQL_STATE_INIT)
break;
PyErr_Format(PyExc_RuntimeError, "pgsql error: %s",
pysql->sql.error);
return (NULL);
}
/* fallthrough */
case PYKORE_PGSQL_QUERY:
if (pysql->param.count > 0) {
if (!kore_pgsql_query_param_fields(&pysql->sql,
pysql->query, pysql->binary,
pysql->param.count, pysql->param.values,
pysql->param.lengths, pysql->param.formats)) {
PyErr_Format(PyExc_RuntimeError,
"pgsql error: %s", pysql->sql.error);
return (NULL);
}
} else {
if (!kore_pgsql_query(&pysql->sql, pysql->query)) {
PyErr_Format(PyExc_RuntimeError,
"pgsql error: %s", pysql->sql.error);
return (NULL);
}
}
pysql->state = PYKORE_PGSQL_WAIT;
break;
wait_again:
case PYKORE_PGSQL_WAIT:
switch (pysql->sql.state) {
case KORE_PGSQL_STATE_WAIT:
break;
case KORE_PGSQL_STATE_COMPLETE:
PyErr_SetNone(PyExc_StopIteration);
if (pysql->result != NULL) {
PyErr_SetObject(PyExc_StopIteration,
pysql->result);
Py_DECREF(pysql->result);
pysql->result = NULL;
} else {
PyErr_SetObject(PyExc_StopIteration, Py_None);
}
return (NULL);
case KORE_PGSQL_STATE_ERROR:
PyErr_Format(PyExc_RuntimeError,
"failed to perform query: %s", pysql->sql.error);
return (NULL);
case KORE_PGSQL_STATE_RESULT:
if (!pykore_pgsql_result(pysql))
return (NULL);
goto wait_again;
default:
kore_pgsql_continue(&pysql->sql);
goto wait_again;
}
break;
default:
PyErr_SetString(PyExc_RuntimeError, "bad pykore_pgsql state");
return (NULL);
}
/* tell caller to wait. */
Py_RETURN_NONE;
}
static void
pykore_pgsql_callback(struct kore_pgsql *pgsql, void *arg)
{
struct pykore_pgsql *op = arg;
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
}
static PyObject *
pykore_pgsql_await(PyObject *obj)
{
Py_INCREF(obj);
return (obj);
}
static int
pykore_pgsql_result(struct pykore_pgsql *pysql)
{
const char *val;
char key[64];
PyObject *list, *pyrow, *pyval;
int rows, row, field, fields, len;
if ((list = PyList_New(0)) == NULL) {
PyErr_SetNone(PyExc_MemoryError);
return (KORE_RESULT_ERROR);
}
rows = kore_pgsql_ntuples(&pysql->sql);
fields = kore_pgsql_nfields(&pysql->sql);
for (row = 0; row < rows; row++) {
if ((pyrow = PyDict_New()) == NULL) {
Py_DECREF(list);
PyErr_SetNone(PyExc_MemoryError);
return (KORE_RESULT_ERROR);
}
for (field = 0; field < fields; field++) {
val = kore_pgsql_getvalue(&pysql->sql, row, field);
len = kore_pgsql_getlength(&pysql->sql, row, field);
if (kore_pgsql_column_binary(&pysql->sql, field)) {
pyval = PyBytes_FromStringAndSize(val, len);
} else {
pyval = PyUnicode_FromString(val);
}
if (pyval == NULL) {
Py_DECREF(pyrow);
Py_DECREF(list);
PyErr_SetNone(PyExc_MemoryError);
return (KORE_RESULT_ERROR);
}
(void)snprintf(key, sizeof(key), "%s",
kore_pgsql_fieldname(&pysql->sql, field));
if (PyDict_SetItemString(pyrow, key, pyval) == -1) {
Py_DECREF(pyval);
Py_DECREF(pyrow);
Py_DECREF(list);
PyErr_SetString(PyExc_RuntimeError,
"failed to add new value to row");
return (KORE_RESULT_ERROR);
}
Py_DECREF(pyval);
}
if (PyList_Insert(list, row, pyrow) == -1) {
Py_DECREF(pyrow);
Py_DECREF(list);
PyErr_SetString(PyExc_RuntimeError,
"failed to add new row to list");
return (KORE_RESULT_ERROR);
}
Py_DECREF(pyrow);
}
pysql->result = list;
kore_pgsql_continue(&pysql->sql);
return (KORE_RESULT_OK);
}
#endif
#if defined(KORE_USE_CURL)
static PyObject *
python_curlopt_set(struct pycurl_data *data, long opt, PyObject *value)
{
int i;
for (i = 0; py_curlopt[i].name != NULL; i++) {
if (py_curlopt[i].value == opt)
break;
}
if (py_curlopt[i].name == NULL) {
2021-08-27 10:06:45 +02:00
PyErr_Format(PyExc_RuntimeError, "invalid option '%ld'", opt);
return (NULL);
}
if (py_curlopt[i].cb == NULL) {
PyErr_Format(PyExc_RuntimeError, "option '%s' not implemented",
py_curlopt[i].name);
return (NULL);
}
return (py_curlopt[i].cb(data, i, value));
}
static int
python_curlopt_from_dict(struct pycurl_data *data, PyObject *dict)
{
long opt;
Py_ssize_t idx;
PyObject *key, *value, *obj;
idx = 0;
if (!PyDict_CheckExact(dict)) {
PyErr_SetString(PyExc_RuntimeError,
"curlopt must be a dictionary");
return (KORE_RESULT_ERROR);
}
while (PyDict_Next(dict, &idx, &key, &value)) {
if (!PyLong_CheckExact(key)) {
PyErr_Format(PyExc_RuntimeError,
"invalid key in curlopt keyword");
return (KORE_RESULT_ERROR);
}
opt = PyLong_AsLong(key);
if ((obj = python_curlopt_set(data, opt, value)) == NULL)
return (KORE_RESULT_ERROR);
Py_DECREF(obj);
}
return (KORE_RESULT_OK);
}
static PyObject *
python_kore_curl_handle(PyObject *self, PyObject *args)
{
const char *url;
struct pycurl_handle *handle;
if (!PyArg_ParseTuple(args, "s", &url))
return (NULL);
handle = PyObject_New(struct pycurl_handle, &pycurl_handle_type);
if (handle == NULL)
return (NULL);
handle->url = kore_strdup(url);
memset(&handle->data.curl, 0, sizeof(handle->data.curl));
handle->body = NULL;
LIST_INIT(&handle->data.slists);
if (!kore_curl_init(&handle->data.curl, handle->url, KORE_CURL_ASYNC)) {
Py_DECREF((PyObject *)handle);
PyErr_SetString(PyExc_RuntimeError, "failed to setup call");
return (NULL);
}
return ((PyObject *)handle);
}
static void
pycurl_handle_dealloc(struct pycurl_handle *handle)
{
struct pycurl_slist *psl;
while ((psl = LIST_FIRST(&handle->data.slists))) {
LIST_REMOVE(psl, list);
curl_slist_free_all(psl->slist);
kore_free(psl);
}
if (handle->body != NULL)
kore_buf_free(handle->body);
kore_free(handle->url);
kore_curl_cleanup(&handle->data.curl);
PyObject_Del((PyObject *)handle);
}
static PyObject *
pycurl_handle_setbody(struct pycurl_handle *handle, PyObject *args)
{
PyObject *obj;
char *ptr;
Py_ssize_t length;
if (!PyArg_ParseTuple(args, "O", &obj))
return (NULL);
if (handle->body != NULL) {
PyErr_SetString(PyExc_RuntimeError,
"curl handle already has body attached");
return (NULL);
}
if (!PyBytes_CheckExact(obj)) {
PyErr_SetString(PyExc_RuntimeError,
"curl.setbody expects bytes");
return (NULL);
}
if (PyBytes_AsStringAndSize(obj, &ptr, &length) == -1)
return (NULL);
if (length < 0) {
PyErr_SetString(PyExc_TypeError, "invalid length");
return (NULL);
}
handle->body = kore_buf_alloc(length);
kore_buf_append(handle->body, ptr, length);
kore_buf_reset(handle->body);
curl_easy_setopt(handle->data.curl.handle,
CURLOPT_READFUNCTION, kore_curl_frombuf);
curl_easy_setopt(handle->data.curl.handle,
CURLOPT_READDATA, handle->body);
curl_easy_setopt(handle->data.curl.handle, CURLOPT_UPLOAD, 1);
Py_RETURN_TRUE;
}
static PyObject *
pycurl_handle_setopt(struct pycurl_handle *handle, PyObject *args)
{
int opt;
PyObject *value;
if (!PyArg_ParseTuple(args, "iO", &opt, &value))
return (NULL);
return (python_curlopt_set(&handle->data, opt, value));
}
static PyObject *
pycurl_handle_setopt_string(struct pycurl_data *data, int idx, PyObject *obj)
{
const char *str;
CURLoption option;
if (!PyUnicode_Check(obj)) {
PyErr_Format(PyExc_RuntimeError,
"option '%s' requires a string as argument",
py_curlopt[idx].name);
return (NULL);
}
if ((str = PyUnicode_AsUTF8(obj)) == NULL)
return (NULL);
option = CURLOPTTYPE_OBJECTPOINT + py_curlopt[idx].value;
curl_easy_setopt(data->curl.handle, option, str);
Py_RETURN_TRUE;
}
static PyObject *
pycurl_handle_setopt_long(struct pycurl_data *data, int idx, PyObject *obj)
{
long val;
2022-12-28 16:05:14 +01:00
CURLoption option;
if (!PyLong_CheckExact(obj)) {
PyErr_Format(PyExc_RuntimeError,
"option '%s' requires a long as argument",
py_curlopt[idx].name);
return (NULL);
}
PyErr_Clear();
val = PyLong_AsLong(obj);
if (val == -1 && PyErr_Occurred())
return (NULL);
2022-12-28 16:05:14 +01:00
option = CURLOPTTYPE_LONG + py_curlopt[idx].value;
curl_easy_setopt(data->curl.handle, option, val);
Py_RETURN_TRUE;
}
static PyObject *
pycurl_handle_setopt_slist(struct pycurl_data *data, int idx, PyObject *obj)
{
struct pycurl_slist *psl;
PyObject *item;
const char *sval;
struct curl_slist *slist;
2022-12-28 16:05:14 +01:00
CURLoption option;
Py_ssize_t list_len, i;
if (!PyList_CheckExact(obj)) {
PyErr_Format(PyExc_RuntimeError,
"option '%s' requires a list as argument",
py_curlopt[idx].name);
return (NULL);
}
slist = NULL;
list_len = PyList_Size(obj);
for (i = 0; i < list_len; i++) {
if ((item = PyList_GetItem(obj, i)) == NULL)
return (NULL);
if (!PyUnicode_Check(item))
return (NULL);
if ((sval = PyUnicode_AsUTF8AndSize(item, NULL)) == NULL)
return (NULL);
if ((slist = curl_slist_append(slist, sval)) == NULL)
fatal("%s: curl_slist_append failed", __func__);
}
psl = kore_calloc(1, sizeof(*psl));
psl->slist = slist;
LIST_INSERT_HEAD(&data->slists, psl, list);
2022-12-28 16:05:14 +01:00
option = CURLOPTTYPE_OBJECTPOINT + py_curlopt[idx].value;
curl_easy_setopt(data->curl.handle, option, slist);
Py_RETURN_TRUE;
}
static PyObject *
pycurl_handle_run(struct pycurl_handle *handle, PyObject *args)
{
struct pycurl_handle_op *op;
op = PyObject_New(struct pycurl_handle_op, &pycurl_handle_op_type);
if (op == NULL)
return (NULL);
Py_INCREF(handle);
op->handle = handle;
op->coro = coro_running;
op->state = CURL_CLIENT_OP_RUN;
kore_curl_bind_callback(&handle->data.curl,
python_curl_handle_callback, op);
return ((PyObject *)op);
}
static void
pycurl_handle_op_dealloc(struct pycurl_handle_op *op)
{
Py_DECREF(op->handle);
PyObject_Del((PyObject *)op);
}
static PyObject *
pycurl_handle_op_await(PyObject *op)
{
Py_INCREF(op);
return (op);
}
static PyObject *
pycurl_handle_op_iternext(struct pycurl_handle_op *op)
{
size_t len;
PyObject *result;
const u_int8_t *response;
if (op->state == CURL_CLIENT_OP_RUN) {
kore_curl_run(&op->handle->data.curl);
op->state = CURL_CLIENT_OP_RESULT;
Py_RETURN_NONE;
}
if (op->handle->body != NULL) {
kore_buf_free(op->handle->body);
op->handle->body = NULL;
}
if (!kore_curl_success(&op->handle->data.curl)) {
/* Do not log the url here, may contain some sensitive data. */
PyErr_Format(PyExc_RuntimeError, "request failed: %s",
kore_curl_strerror(&op->handle->data.curl));
return (NULL);
}
kore_curl_response_as_bytes(&op->handle->data.curl, &response, &len);
if ((result = PyBytes_FromStringAndSize((const char *)response,
len)) == NULL)
return (NULL);
PyErr_SetObject(PyExc_StopIteration, result);
Py_DECREF(result);
return (NULL);
}
static PyObject *
python_kore_httpclient(PyObject *self, PyObject *args, PyObject *kwargs)
{
struct pyhttp_client *client;
const char *url, *v;
if (!PyArg_ParseTuple(args, "s", &url))
return (NULL);
client = PyObject_New(struct pyhttp_client, &pyhttp_client_type);
if (client == NULL)
return (NULL);
client->unix = NULL;
client->tlskey = NULL;
client->curlopt = NULL;
client->tlscert = NULL;
client->cabundle = NULL;
client->tlsverify = 1;
client->url = kore_strdup(url);
if (kwargs != NULL) {
if ((v = python_string_from_dict(kwargs, "tlscert")) != NULL)
client->tlscert = kore_strdup(v);
if ((v = python_string_from_dict(kwargs, "tlskey")) != NULL)
client->tlskey = kore_strdup(v);
if ((v = python_string_from_dict(kwargs, "cabundle")) != NULL)
client->cabundle = kore_strdup(v);
if ((v = python_string_from_dict(kwargs, "unix")) != NULL)
client->unix = kore_strdup(v);
client->curlopt = PyDict_GetItemString(kwargs, "curlopt");
Py_XINCREF(client->curlopt);
python_bool_from_dict(kwargs, "tlsverify", &client->tlsverify);
}
if ((client->tlscert != NULL && client->tlskey == NULL) ||
(client->tlskey != NULL && client->tlscert == NULL)) {
Py_DECREF((PyObject *)client);
PyErr_SetString(PyExc_RuntimeError,
"invalid TLS client configuration");
return (NULL);
}
return ((PyObject *)client);
}
static void
pyhttp_client_dealloc(struct pyhttp_client *client)
{
kore_free(client->url);
kore_free(client->unix);
kore_free(client->tlskey);
kore_free(client->tlscert);
kore_free(client->cabundle);
Py_XDECREF(client->curlopt);
PyObject_Del((PyObject *)client);
}
static PyObject *
pyhttp_client_get(struct pyhttp_client *client, PyObject *args,
PyObject *kwargs)
{
return (pyhttp_client_request(client, HTTP_METHOD_GET, kwargs));
}
static PyObject *
pyhttp_client_put(struct pyhttp_client *client, PyObject *args,
PyObject *kwargs)
{
return (pyhttp_client_request(client, HTTP_METHOD_PUT, kwargs));
}
static PyObject *
pyhttp_client_post(struct pyhttp_client *client, PyObject *args,
PyObject *kwargs)
{
return (pyhttp_client_request(client, HTTP_METHOD_POST, kwargs));
}
static PyObject *
pyhttp_client_head(struct pyhttp_client *client, PyObject *args,
PyObject *kwargs)
{
return (pyhttp_client_request(client, HTTP_METHOD_HEAD, kwargs));
}
static PyObject *
pyhttp_client_patch(struct pyhttp_client *client, PyObject *args,
PyObject *kwargs)
{
return (pyhttp_client_request(client, HTTP_METHOD_PATCH, kwargs));
}
static PyObject *
pyhttp_client_delete(struct pyhttp_client *client, PyObject *args,
PyObject *kwargs)
{
return (pyhttp_client_request(client, HTTP_METHOD_DELETE, kwargs));
}
static PyObject *
pyhttp_client_options(struct pyhttp_client *client, PyObject *args,
PyObject *kwargs)
{
return (pyhttp_client_request(client, HTTP_METHOD_OPTIONS, kwargs));
}
static PyObject *
pyhttp_client_request(struct pyhttp_client *client, int m, PyObject *kwargs)
{
struct pyhttp_client_op *op;
char *ptr;
const char *k, *v;
Py_ssize_t length, idx;
PyObject *data, *headers, *key, *obj;
ptr = NULL;
length = 0;
headers = NULL;
if (kwargs != NULL &&
((headers = PyDict_GetItemString(kwargs, "headers")) != NULL)) {
if (!PyDict_CheckExact(headers)) {
PyErr_SetString(PyExc_RuntimeError,
"headers keyword must be a dict");
return (NULL);
}
}
switch (m) {
case HTTP_METHOD_GET:
case HTTP_METHOD_HEAD:
case HTTP_METHOD_OPTIONS:
break;
case HTTP_METHOD_PUT:
case HTTP_METHOD_POST:
case HTTP_METHOD_PATCH:
case HTTP_METHOD_DELETE:
length = -1;
if (kwargs == NULL) {
if (m == HTTP_METHOD_DELETE) {
length = 0;
break;
}
PyErr_Format(PyExc_RuntimeError,
"no keyword arguments given, but body expected ",
http_method_text(m));
return (NULL);
}
if ((data = PyDict_GetItemString(kwargs, "body")) == NULL)
return (NULL);
if (PyBytes_AsStringAndSize(data, &ptr, &length) == -1)
return (NULL);
if (length < 0) {
PyErr_SetString(PyExc_TypeError, "invalid length");
return (NULL);
}
break;
default:
fatal("%s: unknown method %d", __func__, m);
}
2021-08-27 10:12:11 +02:00
op = PyObject_New(struct pyhttp_client_op, &pyhttp_client_op_type);
if (op == NULL)
return (NULL);
if (!kore_curl_init(&op->data.curl, client->url, KORE_CURL_ASYNC)) {
Py_DECREF((PyObject *)op);
PyErr_SetString(PyExc_RuntimeError, "failed to setup call");
return (NULL);
}
op->headers = 0;
op->coro = coro_running;
op->state = CURL_CLIENT_OP_RUN;
LIST_INIT(&op->data.slists);
Py_INCREF(client);
op->client = client;
kore_curl_http_setup(&op->data.curl, m, ptr, length);
kore_curl_bind_callback(&op->data.curl, python_curl_http_callback, op);
/* Go in with our own bare hands. */
if (client->unix != NULL) {
#if defined(__linux__)
if (client->unix[0] == '@') {
curl_easy_setopt(op->data.curl.handle,
CURLOPT_ABSTRACT_UNIX_SOCKET, client->unix + 1);
} else {
curl_easy_setopt(op->data.curl.handle,
CURLOPT_UNIX_SOCKET_PATH, client->unix);
}
#else
curl_easy_setopt(op->data.curl.handle, CURLOPT_UNIX_SOCKET_PATH,
client->unix);
#endif
}
if (client->tlskey != NULL && client->tlscert != NULL) {
curl_easy_setopt(op->data.curl.handle, CURLOPT_SSLCERT,
client->tlscert);
curl_easy_setopt(op->data.curl.handle, CURLOPT_SSLKEY,
client->tlskey);
}
if (client->tlsverify == 0) {
curl_easy_setopt(op->data.curl.handle,
CURLOPT_SSL_VERIFYHOST, 0);
curl_easy_setopt(op->data.curl.handle,
CURLOPT_SSL_VERIFYPEER, 0);
}
if (client->curlopt != NULL) {
if (!python_curlopt_from_dict(&op->data, client->curlopt)) {
Py_DECREF((PyObject *)op);
return (NULL);
}
}
if (client->cabundle != NULL) {
curl_easy_setopt(op->data.curl.handle, CURLOPT_CAINFO,
client->cabundle);
}
if (headers != NULL) {
idx = 0;
while (PyDict_Next(headers, &idx, &key, &obj)) {
if ((k = PyUnicode_AsUTF8(key)) == NULL) {
Py_DECREF((PyObject *)op);
return (NULL);
}
if ((v = PyUnicode_AsUTF8(obj)) == NULL) {
Py_DECREF((PyObject *)op);
return (NULL);
}
kore_curl_http_set_header(&op->data.curl, k, v);
}
}
if (kwargs != NULL) {
if ((obj = PyDict_GetItemString(kwargs, "curlopt")) != NULL) {
if (!python_curlopt_from_dict(&op->data, obj)) {
Py_DECREF((PyObject *)op);
return (NULL);
}
}
python_bool_from_dict(kwargs, "return_headers", &op->headers);
}
return ((PyObject *)op);
}
static void
pyhttp_client_op_dealloc(struct pyhttp_client_op *op)
{
struct pycurl_slist *psl;
while ((psl = LIST_FIRST(&op->data.slists))) {
LIST_REMOVE(psl, list);
curl_slist_free_all(psl->slist);
kore_free(psl);
}
Py_DECREF(op->client);
kore_curl_cleanup(&op->data.curl);
PyObject_Del((PyObject *)op);
}
static PyObject *
pyhttp_client_op_await(PyObject *op)
{
Py_INCREF(op);
return (op);
}
static PyObject *
pyhttp_client_op_iternext(struct pyhttp_client_op *op)
{
size_t len;
struct http_header *hdr;
const u_int8_t *response;
PyObject *result, *tuple, *dict, *value;
if (op->state == CURL_CLIENT_OP_RUN) {
kore_curl_run(&op->data.curl);
op->state = CURL_CLIENT_OP_RESULT;
Py_RETURN_NONE;
}
if (!kore_curl_success(&op->data.curl)) {
PyErr_Format(PyExc_RuntimeError, "request to '%s' failed: %s",
op->data.curl.url, kore_curl_strerror(&op->data.curl));
return (NULL);
}
kore_curl_response_as_bytes(&op->data.curl, &response, &len);
if (op->headers) {
kore_curl_http_parse_headers(&op->data.curl);
if ((dict = PyDict_New()) == NULL)
return (NULL);
TAILQ_FOREACH(hdr, &op->data.curl.http.resp_hdrs, list) {
value = PyUnicode_FromString(hdr->value);
if (value == NULL) {
Py_DECREF(dict);
return (NULL);
}
if (PyDict_SetItemString(dict,
hdr->header, value) == -1) {
Py_DECREF(dict);
Py_DECREF(value);
return (NULL);
}
Py_DECREF(value);
}
if ((tuple = Py_BuildValue("(iOy#)", op->data.curl.http.status,
dict, (const char *)response, len)) == NULL)
return (NULL);
Py_DECREF(dict);
} else {
if ((tuple = Py_BuildValue("(iy#)", op->data.curl.http.status,
(const char *)response, len)) == NULL)
return (NULL);
}
result = PyObject_CallFunctionObjArgs(PyExc_StopIteration, tuple, NULL);
if (result == NULL) {
Py_DECREF(tuple);
return (NULL);
}
Py_DECREF(tuple);
PyErr_SetObject(PyExc_StopIteration, result);
Py_DECREF(result);
return (NULL);
}
static void
python_curl_http_callback(struct kore_curl *curl, void *arg)
{
struct pyhttp_client_op *op = arg;
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
}
static void
python_curl_handle_callback(struct kore_curl *curl, void *arg)
{
struct pycurl_handle_op *op = arg;
if (op->coro->request != NULL)
http_request_wakeup(op->coro->request);
else
python_coro_wakeup(op->coro);
}
#endif