Rework HTTP and worker processes.

The HTTP layer used to make a copy of each incoming header and its
value for a request. Stop doing that and make HTTP headers zero-copy
all across the board.

This change comes with some api function changes, notably the
http_request_header() function which now takes a const char ** rather
than a char ** out pointer.

This commit also constifies several members of http_request, beware.

Additional rework how the worker processes deal with the accept lock.

Before:
	if a worker held the accept lock and it accepted a new connection
	it would release the lock for others and back off for 500ms before
	attempting to grab the lock again.

	This approach worked but under high load this starts becoming obvious.

Now:
	- workers not holding the accept lock and not having any connections
	  will wait less long before returning from kore_platform_event_wait().

	- workers not holding the accept lock will no longer blindly wait
	  an arbitrary amount in kore_platform_event_wait() but will look
	  at how long until the next lock grab is and base their timeout
	  on that.

	- if a worker its next_lock timeout is up and failed to grab the
	  lock it will try again in half the time again.

	- the worker process holding the lock will when releasing the lock
	  double check if it still has space for newer connections, if it does
	  it will keep the lock until it is full. This prevents the lock from
	  bouncing between several non busy worker processes all the time.

Additional fixes:

- Reduce the number of times we check the timeout list, only do it twice
  per second rather then every event tick.
- Fix solo worker count for TLS (we actually hold two processes, not one).
- Make sure we don't accidentally miscalculate the idle time causing new
  connections under heavy load to instantly drop.
- Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
This commit is contained in:
Joris Vink 2018-02-14 13:48:49 +01:00
parent f348feef82
commit dd2dff2318
32 changed files with 411 additions and 376 deletions

View File

@ -20,7 +20,7 @@ FEATURES_INC=
CFLAGS+=-Wall -Werror -Wstrict-prototypes -Wmissing-prototypes
CFLAGS+=-Wmissing-declarations -Wshadow -Wpointer-arith -Wcast-qual
CFLAGS+=-Wsign-compare -Iincludes -std=c99 -pedantic
CFLAGS+=-DPREFIX='"$(PREFIX)"'
CFLAGS+=-DPREFIX='"$(PREFIX)"' -fstack-protector-all
LDFLAGS=-rdynamic -lssl -lcrypto
ifneq ("$(KORE_SINGLE_BINARY)", "")
@ -37,7 +37,7 @@ endif
ifneq ("$(NOOPT)", "")
CFLAGS+=-O0
else
CFLAGS+=-O2
CFLAGS+=-O3
endif
ifneq ("$(NOHTTP)", "")
@ -95,7 +95,7 @@ ifeq ("$(OSNAME)", "darwin")
LDFLAGS+=-L/opt/local/lib -L/usr/local/opt/openssl/lib
S_SRC+=src/bsd.c
else ifeq ("$(OSNAME)", "linux")
CFLAGS+=-D_GNU_SOURCE=1
CFLAGS+=-D_GNU_SOURCE=1 -D_FORTIFY_SOURCE=2
LDFLAGS+=-ldl
S_SRC+=src/linux.c
else

View File

@ -2,22 +2,22 @@ About
-----
[![Build Status](https://travis-ci.org/jorisvink/kore.svg?branch=master)](https://travis-ci.org/jorisvink/kore)
Kore (https://kore.io) is an easy to use web application framework for
Kore (https://kore.io) is an easy to use web application platform for
writing scalable web APIs in C. Its main goals are security, scalability
and allowing rapid development and deployment of such APIs.
Because of this Kore is an ideal candidate for building robust, scalable and secure web things.
Features
--------
Key Features
------------
* Supports SNI
* Supports HTTP/1.1
* Websocket support
* Privseps by default
* TLS enabled by default
* Lightweight background tasks
* Optional background tasks
* Built-in parameter validation
* Built-in asynchronous PostgreSQL support
* Optional asynchronous PostgreSQL support
* Optional support for page handlers in Python
* Private keys isolated in separate process (RSA and ECDSA)
* Default sane TLS ciphersuites (PFS in all major browsers)
@ -25,6 +25,8 @@ Features
* Event driven (epoll/kqueue) architecture with per CPU worker processes
* Build your web application as a precompiled dynamic library or single binary
And loads more.
License
-------
* Kore is licensed under the ISC license
@ -32,20 +34,21 @@ License
Documentation
--------------
[Read the documentation](https://jorisvink.gitbooks.io/kore-doc/content/)
This documentation is severly outdated at this time.
Platforms supported
-------------------
* Linux
* OpenBSD
* FreeBSD
* OSX
* MacOS
Building Kore
-------------
Grab the [latest release](https://github.com/jorisvink/kore/releases/tag/2.0.0-release) tarball or clone the repository.
Requirements
* openssl (1.0.2k or 1.1.0e)
* openssl (1.0.2k+ or 1.1.0e+)
(note: this requirement drops away when building with NOTLS=1 NOHTTP=1)
(note: libressl should work as a replacement)

View File

@ -6,7 +6,7 @@ int page(struct http_request *);
int
page(struct http_request *req)
{
char *custom;
const char *custom;
/*
* We'll lookup if the X-Custom-Header is given in the request.

View File

@ -169,7 +169,7 @@ sse_disconnect(struct connection *c)
int
check_header(struct http_request *req, const char *name, const char *value)
{
char *hdr;
const char *hdr;
if (!http_request_header(req, name, &hdr)) {
http_response(req, 400, NULL, 0);

View File

@ -172,7 +172,7 @@ int
run_curl(struct kore_task *t)
{
struct kore_buf *b;
u_int32_t len;
size_t len;
CURLcode res;
u_int8_t *data;
CURL *curl;

View File

@ -6,6 +6,8 @@ cflags=-Wall -Wmissing-declarations -Wshadow
cflags=-Wstrict-prototypes -Wmissing-prototypes
cflags=-Wpointer-arith -Wcast-qual -Wsign-compare
mime_add=html:text/html; charset=utf-8
dev {
# These cflags are added to the shared ones when
# you build the "dev" flavor.

View File

@ -12,6 +12,6 @@ domain * {
certkey cert/key.pem
accesslog access.log
static / serve_page
static / assert_serve_video_html
dynamic ^/[a-z]*.[a-z0-9]{3}$ video_stream
}

View File

@ -38,7 +38,6 @@ struct video {
};
int init(int);
int serve_page(struct http_request *);
int video_stream(struct http_request *);
static void video_unmap(struct video *);
@ -60,23 +59,14 @@ init(int state)
return (KORE_RESULT_OK);
}
int
serve_page(struct http_request *req)
{
http_response_header(req, "content-type", "text/html");
http_response_stream(req, 200, asset_video_html,
asset_len_video_html, NULL, NULL);
return (KORE_RESULT_OK);
}
int
video_stream(struct http_request *req)
{
struct video *v;
const char *header;
off_t start, end;
int n, err, status;
char *header, *bytes, *range[3], rb[128], *ext, ctype[32];
char *bytes, *range[3], rb[128], *ext, ctype[32];
if (!video_open(req, &v))
return (KORE_RESULT_OK);

View File

@ -197,10 +197,11 @@ struct http_request {
u_int64_t start;
u_int64_t end;
u_int64_t total;
char *host;
char *path;
char *agent;
const char *path;
const char *host;
const char *agent;
struct connection *owner;
u_int8_t *headers;
struct kore_buf *http_body;
int http_body_fd;
char *http_body_path;
@ -264,12 +265,9 @@ void http_serveable(struct http_request *, const void *,
void http_response_stream(struct http_request *, int, void *,
size_t, int (*cb)(struct netbuf *), void *);
int http_request_header(struct http_request *,
const char *, char **);
const char *, const char **);
void http_response_header(struct http_request *,
const char *, const char *);
int http_request_new(struct connection *, const char *,
const char *, const char *, const char *,
struct http_request **);
int http_state_run(struct http_state *, u_int8_t,
struct http_request *);
int http_request_cookie(struct http_request *,

View File

@ -147,7 +147,7 @@ TAILQ_HEAD(netbuf_head, netbuf);
#define CONN_CLOSE_EMPTY 0x40
#define CONN_WS_CLOSE_SENT 0x80
#define KORE_IDLE_TIMER_MAX 20000
#define KORE_IDLE_TIMER_MAX 5000
#define WEBSOCKET_OP_CONT 0x00
#define WEBSOCKET_OP_TEXT 0x01
@ -219,7 +219,7 @@ struct kore_runtime {
int type;
#if !defined(KORE_NO_HTTP)
int (*http_request)(void *, struct http_request *);
int (*validator)(void *, struct http_request *, void *);
int (*validator)(void *, struct http_request *, const void *);
void (*wsconnect)(void *, struct connection *);
void (*wsdisconnect)(void *, struct connection *);
void (*wsmessage)(void *, struct connection *,
@ -297,7 +297,7 @@ struct kore_module_functions {
void (*free)(struct kore_module *);
void (*reload)(struct kore_module *);
int (*callback)(struct kore_module *, int);
void (*load)(struct kore_module *, const char *);
void (*load)(struct kore_module *);
void *(*getsym)(struct kore_module *, const char *);
};
@ -337,6 +337,7 @@ struct kore_worker {
int pipe[2];
struct connection *msg[2];
u_int8_t has_lock;
u_int64_t time_locked;
struct kore_module_handle *active_hdlr;
};
@ -539,8 +540,8 @@ void kore_connection_init(void);
void kore_connection_cleanup(void);
void kore_connection_prune(int);
struct connection *kore_connection_new(void *);
void kore_connection_check_timeout(void);
int kore_connection_nonblock(int, int);
void kore_connection_check_timeout(u_int64_t);
int kore_connection_handle(struct connection *);
void kore_connection_remove(struct connection *);
void kore_connection_disconnect(struct connection *);
@ -637,7 +638,7 @@ void kore_runtime_connect(struct kore_runtime_call *, struct connection *);
int kore_runtime_http_request(struct kore_runtime_call *,
struct http_request *);
int kore_runtime_validator(struct kore_runtime_call *,
struct http_request *, void *);
struct http_request *, const void *);
void kore_runtime_wsconnect(struct kore_runtime_call *, struct connection *);
void kore_runtime_wsdisconnect(struct kore_runtime_call *,
struct connection *);
@ -655,7 +656,7 @@ void kore_validator_reload(void);
int kore_validator_add(const char *, u_int8_t, const char *);
int kore_validator_run(struct http_request *, const char *, char *);
int kore_validator_check(struct http_request *,
struct kore_validator *, void *);
struct kore_validator *, const void *);
struct kore_validator *kore_validator_lookup(const char *);
#endif

View File

@ -106,14 +106,15 @@ kore_auth_run(struct http_request *req, struct kore_auth *auth)
static int
kore_auth_cookie(struct http_request *req, struct kore_auth *auth)
{
const char *hdr;
int i, v;
size_t len, slen;
char *value, *c, *cookie, *cookies[HTTP_MAX_COOKIES];
if (!http_request_header(req, "cookie", &c))
if (!http_request_header(req, "cookie", &hdr))
return (KORE_RESULT_ERROR);
cookie = kore_strdup(c);
cookie = kore_strdup(hdr);
slen = strlen(auth->value);
v = kore_split_string(cookie, ";", cookies, HTTP_MAX_COOKIES);
@ -146,7 +147,7 @@ kore_auth_cookie(struct http_request *req, struct kore_auth *auth)
static int
kore_auth_header(struct http_request *req, struct kore_auth *auth)
{
char *header;
const char *header;
if (!http_request_header(req, auth->value, &header))
return (KORE_RESULT_ERROR);

View File

@ -36,7 +36,7 @@
#endif
static int kfd = -1;
static struct kevent *events;
static struct kevent *events = NULL;
static u_int32_t event_count = 0;
void
@ -70,7 +70,6 @@ kore_platform_worker_setcpu(struct kore_worker *kw)
-1, sizeof(cpuset), &cpuset) == -1) {
fatal("failed: %s", errno_s);
}
#endif /* __FreeBSD_version */
}
@ -79,6 +78,11 @@ kore_platform_event_init(void)
{
struct listener *l;
if (kfd != -1)
close(kfd);
if (events != NULL)
kore_free(events);
if ((kfd = kqueue()) == -1)
fatal("kqueue(): %s", errno_s);
@ -172,10 +176,8 @@ kore_platform_event_wait(u_int64_t timer)
r >= worker_accept_threshold)
break;
if (!kore_connection_accept(l, &c)) {
r = 1;
if (!kore_connection_accept(l, &c))
break;
}
if (c == NULL)
break;

View File

@ -64,7 +64,7 @@ kore_buf_free(struct kore_buf *buf)
}
void
kore_buf_append(struct kore_buf *buf, const void *d, size_t len)
kore_buf_append(struct kore_buf *buf, const void *data, size_t len)
{
if ((buf->offset + len) < len)
fatal("overflow in kore_buf_append");
@ -74,7 +74,7 @@ kore_buf_append(struct kore_buf *buf, const void *d, size_t len)
buf->data = kore_realloc(buf->data, buf->length);
}
memcpy((buf->data + buf->offset), d, len);
memcpy((buf->data + buf->offset), data, len);
buf->offset += len;
}

View File

@ -228,6 +228,7 @@ kore_parse_config_file(const char *fpath)
if ((fp = fopen(fpath, "r")) == NULL)
fatal("configuration given cannot be opened: %s", fpath);
#else
(void)fpath;
fp = config_file_write();
#endif
@ -305,6 +306,7 @@ static int
configure_include(char *path)
{
kore_parse_config_file(path);
return (KORE_RESULT_OK);
}

View File

@ -19,6 +19,7 @@
#include <netinet/tcp.h>
#include <inttypes.h>
#include <fcntl.h>
#include "kore.h"
@ -31,11 +32,16 @@ struct connection_list disconnected;
void
kore_connection_init(void)
{
u_int32_t elm;
TAILQ_INIT(&connections);
TAILQ_INIT(&disconnected);
/* Add some overhead so we don't rollover for internal items. */
elm = worker_max_connections + 10;
kore_pool_init(&connection_pool, "connection_pool",
sizeof(struct connection), worker_max_connections);
sizeof(struct connection), elm);
}
void
@ -150,13 +156,12 @@ kore_connection_accept(struct listener *listener, struct connection **out)
}
void
kore_connection_check_timeout(void)
kore_connection_check_timeout(u_int64_t now)
{
struct connection *c;
u_int64_t now;
struct connection *c, *next;
now = kore_time_ms();
TAILQ_FOREACH(c, &connections, list) {
for (c = TAILQ_FIRST(&connections); c != NULL; c = next) {
next = TAILQ_NEXT(c, list);
if (c->proto == CONN_PROTO_MSG)
continue;
if (!(c->flags & CONN_IDLE_TIMER_ACT))
@ -374,9 +379,13 @@ kore_connection_check_idletimer(u_int64_t now, struct connection *c)
{
u_int64_t d;
d = now - c->idle_timer.start;
if (now > c->idle_timer.start)
d = now - c->idle_timer.start;
else
d = 0;
if (d >= c->idle_timer.length) {
kore_debug("%p idle for %d ms, expiring", c, d);
kore_debug("%p idle for %" PRIu64 " ms, expiring", c, d);
kore_connection_disconnect(c);
}
}

View File

@ -51,10 +51,9 @@ DH *tls_dhparam = NULL;
int tls_version = KORE_TLS_VERSION_1_2;
#endif
static void domain_load_crl(struct kore_domain *);
#if !defined(KORE_NO_TLS)
static int domain_x509_verify(int, X509_STORE_CTX *);
static void domain_load_crl(struct kore_domain *);
static void keymgr_init(void);
static void keymgr_await_data(void);
@ -458,10 +457,12 @@ kore_domain_closelogs(void)
void
kore_domain_load_crl(void)
{
#if !defined(KORE_NO_TLS)
struct kore_domain *dom;
TAILQ_FOREACH(dom, &domains, list)
domain_load_crl(dom);
#endif
}
void
@ -473,14 +474,12 @@ kore_domain_keymgr_init(void)
#endif
}
#if !defined(KORE_NO_TLS)
static void
domain_load_crl(struct kore_domain *dom)
{
#if !defined(KORE_NO_TLS)
X509_STORE *store;
ERR_clear_error();
if (dom->cafile == NULL)
return;
@ -489,6 +488,7 @@ domain_load_crl(struct kore_domain *dom)
return;
}
ERR_clear_error();
if ((store = SSL_CTX_get_cert_store(dom->ssl_ctx)) == NULL) {
kore_log(LOG_ERR, "SSL_CTX_get_cert_store(): %s", ssl_errno_s);
return;
@ -502,8 +502,8 @@ domain_load_crl(struct kore_domain *dom)
X509_STORE_set_flags(store,
X509_V_FLAG_CRL_CHECK | X509_V_FLAG_CRL_CHECK_ALL);
#endif
}
#endif
#if !defined(KORE_NO_TLS)
static void

View File

@ -57,6 +57,10 @@ static int multipart_parse_headers(struct http_request *,
struct kore_buf *, struct kore_buf *,
const char *, const int);
static struct http_request *http_request_new(struct connection *,
const char *, const char *, char *,
const char *);
static struct kore_buf *header_buf;
static struct kore_buf *ckhdr_buf;
static char http_version[32];
@ -129,8 +133,6 @@ http_cleanup(void)
kore_pool_cleanup(&http_request_pool);
kore_pool_cleanup(&http_header_pool);
kore_pool_cleanup(&http_host_pool);
kore_pool_cleanup(&http_path_pool);
kore_pool_cleanup(&http_body_path);
}
@ -147,169 +149,6 @@ http_server_version(const char *version)
http_version_len = l;
}
int
http_request_new(struct connection *c, const char *host,
const char *method, const char *path, const char *version,
struct http_request **out)
{
struct http_request *req;
struct kore_module_handle *hdlr;
char *p, *hp;
int m, flags;
size_t hostlen, pathlen, qsoff;
kore_debug("http_request_new(%p, %s, %s, %s, %s)", c, host,
method, path, version);
if (http_request_count >= http_request_limit) {
http_error_response(c, 503);
return (KORE_RESULT_ERROR);
}
if ((hostlen = strlen(host)) >= KORE_DOMAINNAME_LEN - 1) {
http_error_response(c, 400);
return (KORE_RESULT_ERROR);
}
if ((pathlen = strlen(path)) >= HTTP_URI_LEN - 1) {
http_error_response(c, 414);
return (KORE_RESULT_ERROR);
}
if (strcasecmp(version, "http/1.1")) {
http_error_response(c, 505);
return (KORE_RESULT_ERROR);
}
if ((p = strchr(path, '?')) != NULL) {
*p = '\0';
qsoff = p - path;
} else {
qsoff = 0;
}
hp = NULL;
switch (c->addrtype) {
case AF_INET6:
if (*host == '[') {
if ((hp = strrchr(host, ']')) == NULL) {
http_error_response(c, 400);
return (KORE_RESULT_ERROR);
}
hp++;
if (*hp == ':')
*hp = '\0';
else
hp = NULL;
}
break;
default:
if ((hp = strrchr(host, ':')) != NULL)
*hp = '\0';
break;
}
if ((hdlr = kore_module_handler_find(host, path)) == NULL) {
http_error_response(c, 404);
return (KORE_RESULT_ERROR);
}
if (hp != NULL)
*hp = ':';
if (p != NULL)
*p = '?';
if (!strcasecmp(method, "get")) {
m = HTTP_METHOD_GET;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "delete")) {
m = HTTP_METHOD_DELETE;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "post")) {
m = HTTP_METHOD_POST;
flags = HTTP_REQUEST_EXPECT_BODY;
} else if (!strcasecmp(method, "put")) {
m = HTTP_METHOD_PUT;
flags = HTTP_REQUEST_EXPECT_BODY;
} else if (!strcasecmp(method, "head")) {
m = HTTP_METHOD_HEAD;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "options")) {
m = HTTP_METHOD_OPTIONS;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "patch")) {
m = HTTP_METHOD_PATCH;
flags = HTTP_REQUEST_EXPECT_BODY;
} else {
http_error_response(c, 400);
return (KORE_RESULT_ERROR);
}
req = kore_pool_get(&http_request_pool);
req->end = 0;
req->total = 0;
req->start = 0;
req->owner = c;
req->status = 0;
req->method = m;
req->hdlr = hdlr;
req->agent = NULL;
req->flags = flags;
req->fsm_state = 0;
req->http_body = NULL;
req->http_body_fd = -1;
req->hdlr_extra = NULL;
req->query_string = NULL;
req->http_body_length = 0;
req->http_body_offset = 0;
req->http_body_path = NULL;
#if defined(KORE_USE_PYTHON)
req->py_coro = NULL;
#endif
req->host = kore_pool_get(&http_host_pool);
memcpy(req->host, host, hostlen);
req->host[hostlen] = '\0';
req->path = kore_pool_get(&http_path_pool);
memcpy(req->path, path, pathlen);
req->path[pathlen] = '\0';
if (qsoff > 0) {
req->query_string = req->path + qsoff;
*(req->query_string)++ = '\0';
} else {
req->query_string = NULL;
}
TAILQ_INIT(&(req->resp_headers));
TAILQ_INIT(&(req->req_headers));
TAILQ_INIT(&(req->resp_cookies));
TAILQ_INIT(&(req->req_cookies));
TAILQ_INIT(&(req->arguments));
TAILQ_INIT(&(req->files));
#if defined(KORE_USE_TASKS)
LIST_INIT(&(req->tasks));
#endif
#if defined(KORE_USE_PGSQL)
LIST_INIT(&(req->pgsqls));
#endif
http_request_count++;
TAILQ_INSERT_HEAD(&http_requests, req, list);
TAILQ_INSERT_TAIL(&(c->http_requests), req, olist);
if (out != NULL)
*out = req;
return (KORE_RESULT_OK);
}
void
http_request_sleep(struct http_request *req)
{
@ -477,12 +316,11 @@ http_request_free(struct http_request *req)
#endif
kore_debug("http_request_free: %p->%p", req->owner, req);
kore_pool_put(&http_host_pool, req->host);
kore_pool_put(&http_path_pool, req->path);
kore_free(req->headers);
req->host = NULL;
req->path = NULL;
req->headers = NULL;
TAILQ_REMOVE(&http_requests, req, list);
if (req->owner != NULL)
@ -490,7 +328,6 @@ http_request_free(struct http_request *req)
for (hdr = TAILQ_FIRST(&(req->resp_headers)); hdr != NULL; hdr = next) {
next = TAILQ_NEXT(hdr, list);
TAILQ_REMOVE(&(req->resp_headers), hdr, list);
kore_free(hdr->header);
kore_free(hdr->value);
@ -499,16 +336,12 @@ http_request_free(struct http_request *req)
for (hdr = TAILQ_FIRST(&(req->req_headers)); hdr != NULL; hdr = next) {
next = TAILQ_NEXT(hdr, list);
TAILQ_REMOVE(&(req->req_headers), hdr, list);
kore_free(hdr->header);
kore_free(hdr->value);
kore_pool_put(&http_header_pool, hdr);
}
for (ck = TAILQ_FIRST(&(req->resp_cookies)); ck != NULL; ck = cknext) {
cknext = TAILQ_NEXT(ck, list);
TAILQ_REMOVE(&(req->resp_cookies), ck, list);
kore_free(ck->name);
kore_free(ck->value);
@ -519,7 +352,6 @@ http_request_free(struct http_request *req)
for (ck = TAILQ_FIRST(&(req->req_cookies)); ck != NULL; ck = cknext) {
cknext = TAILQ_NEXT(ck, list);
TAILQ_REMOVE(&(req->req_cookies), ck, list);
kore_free(ck->name);
kore_free(ck->value);
@ -528,7 +360,6 @@ http_request_free(struct http_request *req)
for (q = TAILQ_FIRST(&(req->arguments)); q != NULL; q = qnext) {
qnext = TAILQ_NEXT(q, list);
TAILQ_REMOVE(&(req->arguments), q, list);
kore_free(q->name);
kore_free(q->s_value);
@ -538,7 +369,6 @@ http_request_free(struct http_request *req)
for (f = TAILQ_FIRST(&(req->files)); f != NULL; f = fnext) {
fnext = TAILQ_NEXT(f, list);
TAILQ_REMOVE(&(req->files), f, list);
kore_free(f->filename);
kore_free(f->name);
kore_free(f);
@ -570,7 +400,7 @@ void
http_serveable(struct http_request *req, const void *data, size_t len,
const char *etag, const char *type)
{
char *match;
const char *match;
if (req->method != HTTP_METHOD_GET) {
http_response_header(req, "allow", "get");
@ -593,13 +423,12 @@ http_serveable(struct http_request *req, const void *data, size_t len,
void
http_response(struct http_request *req, int status, const void *d, size_t l)
{
kore_debug("http_response(%p, %d, %p, %zu)", req, status, d, l);
if (req->owner == NULL)
return;
req->status = status;
kore_debug("http_response(%p, %d, %p, %zu)", req, status, d, l);
req->status = status;
switch (req->owner->proto) {
case CONN_PROTO_HTTP:
case CONN_PROTO_WEBSOCKET:
@ -638,7 +467,8 @@ http_response_stream(struct http_request *req, int status, void *base,
}
int
http_request_header(struct http_request *req, const char *header, char **out)
http_request_header(struct http_request *req, const char *header,
const char **out)
{
struct http_header *hdr;
@ -679,6 +509,7 @@ http_header_recv(struct netbuf *nb)
ssize_t ret;
struct http_header *hdr;
struct http_request *req;
const char *clp;
u_int64_t bytes_left;
u_int8_t *end_headers;
int h, i, v, skip, l;
@ -745,10 +576,15 @@ http_header_recv(struct netbuf *nb)
return (KORE_RESULT_OK);
}
if (!http_request_new(c, host,
request[0], request[1], request[2], &req))
req = http_request_new(c, host, request[0], request[1], request[2]);
if (req == NULL)
return (KORE_RESULT_OK);
/* take full ownership of the buffer. */
req->headers = nb->buf;
nb->buf = NULL;
nb->m_len = 0;
for (i = 1; i < h; i++) {
if (i == skip)
continue;
@ -763,8 +599,8 @@ http_header_recv(struct netbuf *nb)
if (*p == ' ')
p++;
hdr = kore_pool_get(&http_header_pool);
hdr->header = kore_strdup(headers[i]);
hdr->value = kore_strdup(p);
hdr->header = headers[i];
hdr->value = p;
TAILQ_INSERT_TAIL(&(req->req_headers), hdr, list);
if (req->agent == NULL &&
@ -779,16 +615,16 @@ http_header_recv(struct netbuf *nb)
return (KORE_RESULT_OK);
}
if (!http_request_header(req, "content-length", &p)) {
if (!http_request_header(req, "content-length", &clp)) {
kore_debug("expected body but no content-length");
req->flags |= HTTP_REQUEST_DELETE;
http_error_response(req->owner, 411);
return (KORE_RESULT_OK);
}
req->content_length = kore_strtonum(p, 10, 0, LONG_MAX, &v);
req->content_length = kore_strtonum(clp, 10, 0, LONG_MAX, &v);
if (v == KORE_RESULT_ERROR) {
kore_debug("content-length invalid: %s", p);
kore_debug("content-length invalid: %s", clp);
req->flags |= HTTP_REQUEST_DELETE;
http_error_response(req->owner, 411);
return (KORE_RESULT_OK);
@ -1070,14 +906,15 @@ void
http_populate_cookies(struct http_request *req)
{
struct http_cookie *ck;
const char *hdr;
int i, v, n;
char *c, *header, *pair[3];
char *cookies[HTTP_MAX_COOKIES];
if (!http_request_header(req, "cookie", &c))
if (!http_request_header(req, "cookie", &hdr))
return;
header = kore_strdup(c);
header = kore_strdup(hdr);
v = kore_split_string(header, ";", cookies, HTTP_MAX_COOKIES);
for (i = 0; i < v; i++) {
for (c = cookies[i]; isspace(*(unsigned char *)c); c++)
@ -1160,6 +997,7 @@ http_populate_qs(struct http_request *req)
void
http_populate_multipart_form(struct http_request *req)
{
const char *hdr;
int h, blen;
struct kore_buf in, out;
char *type, *val, *args[3];
@ -1168,27 +1006,28 @@ http_populate_multipart_form(struct http_request *req)
if (req->method != HTTP_METHOD_POST)
return;
if (!http_request_header(req, "content-type", &type))
return;
h = kore_split_string(type, ";", args, 3);
if (h != 2)
return;
if (strcasecmp(args[0], "multipart/form-data"))
return;
if ((val = strchr(args[1], '=')) == NULL)
return;
val++;
blen = snprintf(boundary, sizeof(boundary), "--%s", val);
if (blen == -1 || (size_t)blen >= sizeof(boundary))
if (!http_request_header(req, "content-type", &hdr))
return;
kore_buf_init(&in, 128);
kore_buf_init(&out, 128);
type = kore_strdup(hdr);
h = kore_split_string(type, ";", args, 3);
if (h != 2)
goto cleanup;
if (strcasecmp(args[0], "multipart/form-data"))
goto cleanup;
if ((val = strchr(args[1], '=')) == NULL)
goto cleanup;
val++;
blen = snprintf(boundary, sizeof(boundary), "--%s", val);
if (blen == -1 || (size_t)blen >= sizeof(boundary))
goto cleanup;
if (!multipart_find_data(&in, NULL, NULL, req, boundary, blen))
goto cleanup;
@ -1206,6 +1045,7 @@ http_populate_multipart_form(struct http_request *req)
}
cleanup:
kore_free(type);
kore_buf_cleanup(&in);
kore_buf_cleanup(&out);
}
@ -1317,13 +1157,11 @@ http_state_exists(struct http_request *req)
void *
http_state_create(struct http_request *req, size_t len)
{
if (req->hdlr_extra != NULL) {
if (req->state_len != len)
fatal("http_state_create: state already set");
} else {
req->state_len = len;
req->hdlr_extra = kore_calloc(1, len);
}
if (req->hdlr_extra != NULL)
fatal("http_state_create: state already exists");
req->state_len = len;
req->hdlr_extra = kore_calloc(1, len);
return (req->hdlr_extra);
}
@ -1341,6 +1179,160 @@ http_state_cleanup(struct http_request *req)
req->hdlr_extra = NULL;
}
static struct http_request *
http_request_new(struct connection *c, const char *host,
const char *method, char *path, const char *version)
{
struct http_request *req;
struct kore_module_handle *hdlr;
char *p, *hp;
int m, flags;
size_t hostlen, pathlen, qsoff;
if (http_request_count >= http_request_limit) {
http_error_response(c, 503);
return (NULL);
}
kore_debug("http_request_new(%p, %s, %s, %s, %s)", c, host,
method, path, version);
if ((hostlen = strlen(host)) >= KORE_DOMAINNAME_LEN - 1) {
http_error_response(c, 400);
return (NULL);
}
if ((pathlen = strlen(path)) >= HTTP_URI_LEN - 1) {
http_error_response(c, 414);
return (NULL);
}
if (strcasecmp(version, "http/1.1")) {
http_error_response(c, 505);
return (NULL);
}
if ((p = strchr(path, '?')) != NULL) {
*p = '\0';
qsoff = p - path;
} else {
qsoff = 0;
}
hp = NULL;
switch (c->addrtype) {
case AF_INET6:
if (*host == '[') {
if ((hp = strrchr(host, ']')) == NULL) {
http_error_response(c, 400);
return (NULL);
}
hp++;
if (*hp == ':')
*hp = '\0';
else
hp = NULL;
}
break;
default:
if ((hp = strrchr(host, ':')) != NULL)
*hp = '\0';
break;
}
if ((hdlr = kore_module_handler_find(host, path)) == NULL) {
http_error_response(c, 404);
return (NULL);
}
if (hp != NULL)
*hp = ':';
if (p != NULL)
*p = '?';
if (!strcasecmp(method, "get")) {
m = HTTP_METHOD_GET;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "delete")) {
m = HTTP_METHOD_DELETE;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "post")) {
m = HTTP_METHOD_POST;
flags = HTTP_REQUEST_EXPECT_BODY;
} else if (!strcasecmp(method, "put")) {
m = HTTP_METHOD_PUT;
flags = HTTP_REQUEST_EXPECT_BODY;
} else if (!strcasecmp(method, "head")) {
m = HTTP_METHOD_HEAD;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "options")) {
m = HTTP_METHOD_OPTIONS;
flags = HTTP_REQUEST_COMPLETE;
} else if (!strcasecmp(method, "patch")) {
m = HTTP_METHOD_PATCH;
flags = HTTP_REQUEST_EXPECT_BODY;
} else {
http_error_response(c, 400);
return (NULL);
}
req = kore_pool_get(&http_request_pool);
req->end = 0;
req->total = 0;
req->start = 0;
req->owner = c;
req->status = 0;
req->method = m;
req->hdlr = hdlr;
req->agent = NULL;
req->flags = flags;
req->fsm_state = 0;
req->http_body = NULL;
req->http_body_fd = -1;
req->hdlr_extra = NULL;
req->query_string = NULL;
req->http_body_length = 0;
req->http_body_offset = 0;
req->http_body_path = NULL;
#if defined(KORE_USE_PYTHON)
req->py_coro = NULL;
#endif
req->host = host;
req->path = path;
if (qsoff > 0) {
req->query_string = path + qsoff;
*(req->query_string)++ = '\0';
} else {
req->query_string = NULL;
}
TAILQ_INIT(&(req->resp_headers));
TAILQ_INIT(&(req->req_headers));
TAILQ_INIT(&(req->resp_cookies));
TAILQ_INIT(&(req->req_cookies));
TAILQ_INIT(&(req->arguments));
TAILQ_INIT(&(req->files));
#if defined(KORE_USE_TASKS)
LIST_INIT(&(req->tasks));
#endif
#if defined(KORE_USE_PGSQL)
LIST_INIT(&(req->pgsqls));
#endif
http_request_count++;
TAILQ_INSERT_HEAD(&http_requests, req, list);
TAILQ_INSERT_TAIL(&(c->http_requests), req, olist);
return (req);
}
static int
multipart_find_data(struct kore_buf *in, struct kore_buf *out,
size_t *olen, struct http_request *req, const void *needle, size_t len)
@ -1613,7 +1605,6 @@ static void
http_error_response(struct connection *c, int status)
{
kore_debug("http_error_response(%p, %d)", c, status);
c->flags |= CONN_CLOSE_EMPTY;
switch (c->proto) {
@ -1635,7 +1626,7 @@ http_response_normal(struct http_request *req, struct connection *c,
{
struct http_cookie *ck;
struct http_header *hdr;
char *conn;
const char *conn;
int connection_close;
kore_buf_reset(header_buf);
@ -1652,8 +1643,9 @@ http_response_normal(struct http_request *req, struct connection *c,
if (connection_close == 0 && req != NULL) {
if (http_request_header(req, "connection", &conn)) {
if ((*conn == 'c' || *conn == 'C') &&
!strcasecmp(conn, "close"))
!strcasecmp(conn, "close")) {
connection_close = 1;
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2018 Joris Vink <joris@coders.se>
* Copyright (c) 2017-2018 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above

View File

@ -64,6 +64,11 @@ kore_platform_worker_setcpu(struct kore_worker *kw)
void
kore_platform_event_init(void)
{
if (efd != -1)
close(efd);
if (events != NULL)
kore_free(events);
if ((efd = epoll_create(10000)) == -1)
fatal("epoll_create(): %s", errno_s);
@ -147,10 +152,8 @@ kore_platform_event_wait(u_int64_t timer)
r >= worker_accept_threshold)
break;
if (!kore_connection_accept(l, &c)) {
r = 1;
if (!kore_connection_accept(l, &c))
break;
}
if (c == NULL)
break;

View File

@ -31,8 +31,8 @@
static TAILQ_HEAD(, kore_module) modules;
static void native_free(struct kore_module *);
static void native_load(struct kore_module *);
static void native_reload(struct kore_module *);
static void native_load(struct kore_module *, const char *);
static void *native_getsym(struct kore_module *, const char *);
struct kore_module_functions kore_native_module = {
@ -100,7 +100,7 @@ kore_module_load(const char *path, const char *onload, int type)
fatal("kore_module_load: unknown type %d", type);
}
module->fun->load(module, onload);
module->fun->load(module);
TAILQ_INSERT_TAIL(&modules, module, list);
if (onload != NULL) {
@ -346,11 +346,11 @@ native_reload(struct kore_module *module)
{
if (dlclose(module->handle))
fatal("cannot close existing module: %s", dlerror());
module->fun->load(module, module->onload);
module->fun->load(module);
}
static void
native_load(struct kore_module *module, const char *onload)
native_load(struct kore_module *module)
{
module->handle = dlopen(module->path, RTLD_NOW | RTLD_GLOBAL);
if (module->handle == NULL)

View File

@ -34,7 +34,6 @@ static int msg_recv_packet(struct netbuf *);
static int msg_recv_data(struct netbuf *);
static void msg_disconnected_parent(struct connection *);
static void msg_disconnected_worker(struct connection *);
static void msg_type_shutdown(struct kore_msg *msg, const void *data);
#if !defined(KORE_NO_HTTP)
static void msg_type_accesslog(struct kore_msg *, const void *);
@ -58,8 +57,6 @@ kore_msg_parent_init(void)
kore_msg_parent_add(kw);
}
kore_msg_register(KORE_MSG_SHUTDOWN, msg_type_shutdown);
#if !defined(KORE_NO_HTTP)
kore_msg_register(KORE_MSG_ACCESSLOG, msg_type_accesslog);
#endif
@ -219,13 +216,6 @@ msg_disconnected_worker(struct connection *c)
c->hdlr_extra = NULL;
}
static void
msg_type_shutdown(struct kore_msg *msg, const void *data)
{
kore_log(LOG_NOTICE, "worker requested shutdown");
kore_signal(SIGQUIT);
}
#if !defined(KORE_NO_HTTP)
static void
msg_type_accesslog(struct kore_msg *msg, const void *data)

View File

@ -33,7 +33,11 @@ struct kore_pool nb_pool;
void
net_init(void)
{
kore_pool_init(&nb_pool, "nb_pool", sizeof(struct netbuf), 1000);
u_int32_t elm;
/* Add some overhead so we don't roll over for internal items. */
elm = worker_max_connections + 10;
kore_pool_init(&nb_pool, "nb_pool", sizeof(struct netbuf), elm);
}
void
@ -86,8 +90,7 @@ net_send_queue(struct connection *c, const void *data, size_t len)
nb->m_len = nb->b_len;
nb->buf = kore_malloc(nb->m_len);
if (len > 0)
memcpy(nb->buf, d, nb->b_len);
memcpy(nb->buf, d, nb->b_len);
TAILQ_INSERT_TAIL(&(c->send_queue), nb, list);
}
@ -120,14 +123,11 @@ net_recv_reset(struct connection *c, size_t len, int (*cb)(struct netbuf *))
{
kore_debug("net_recv_reset(): %p %zu", c, len);
if (c->rnb->type != NETBUF_RECV)
fatal("net_recv_reset(): wrong netbuf type");
c->rnb->cb = cb;
c->rnb->s_off = 0;
c->rnb->b_len = len;
if (c->rnb->b_len <= c->rnb->m_len &&
if (c->rnb->buf != NULL && c->rnb->b_len <= c->rnb->m_len &&
c->rnb->m_len < (NETBUF_SEND_PAYLOAD_MAX / 2))
return;
@ -143,7 +143,7 @@ net_recv_queue(struct connection *c, size_t len, int flags,
kore_debug("net_recv_queue(): %p %zu %d", c, len, flags);
if (c->rnb != NULL)
fatal("net_recv_queue(): called incorrectly for %p", c);
fatal("net_recv_queue(): called incorrectly");
c->rnb = kore_pool_get(&nb_pool);
c->rnb->cb = cb;
@ -162,9 +162,6 @@ net_recv_expand(struct connection *c, size_t len, int (*cb)(struct netbuf *))
{
kore_debug("net_recv_expand(): %p %d", c, len);
if (c->rnb->type != NETBUF_RECV)
fatal("net_recv_expand(): wrong netbuf type");
c->rnb->cb = cb;
c->rnb->b_len += len;
c->rnb->m_len = c->rnb->b_len;
@ -210,8 +207,9 @@ net_send_flush(struct connection *c)
return (KORE_RESULT_ERROR);
}
if ((c->flags & CONN_CLOSE_EMPTY) && TAILQ_EMPTY(&(c->send_queue)))
if ((c->flags & CONN_CLOSE_EMPTY) && TAILQ_EMPTY(&(c->send_queue))) {
kore_connection_disconnect(c);
}
return (KORE_RESULT_OK);
}

View File

@ -272,9 +272,7 @@ kore_pgsql_query_params(struct kore_pgsql *pgsql,
va_list args;
va_start(args, count);
ret = kore_pgsql_v_query_params(pgsql, query, result, count, args);
va_end(args);
return (ret);
@ -374,7 +372,6 @@ void
kore_pgsql_cleanup(struct kore_pgsql *pgsql)
{
kore_debug("kore_pgsql_cleanup(%p)", pgsql);
pgsql_queue_remove(pgsql);
if (pgsql->result != NULL)
@ -547,6 +544,7 @@ pgsql_queue_add(struct kore_pgsql *pgsql)
pgsql_queue_count++;
TAILQ_INSERT_TAIL(&pgsql_wait_queue, pgw, list);
pgsql_queue_count++;
}
static void
@ -590,6 +588,7 @@ pgsql_queue_wakeup(void)
TAILQ_REMOVE(&pgsql_wait_queue, pgw, list);
kore_pool_put(&pgsql_wait_pool, pgw);
pgsql_queue_count--;
return;
}
}
@ -668,7 +667,6 @@ pgsql_conn_cleanup(struct pgsql_conn *conn)
struct pgsql_db *pgsqldb;
kore_debug("pgsql_conn_cleanup(): %p", conn);
if (conn->flags & PGSQL_CONN_FREE)
TAILQ_REMOVE(&pgsql_conn_free, conn, list);

View File

@ -62,10 +62,8 @@ kore_pool_cleanup(struct kore_pool *pool)
pool->elen = 0;
pool->slen = 0;
if (pool->name != NULL) {
free(pool->name);
pool->name = NULL;
}
free(pool->name);
pool->name = NULL;
pool_region_destroy(pool);
}

View File

@ -36,7 +36,7 @@ static PyObject *pyconnection_alloc(struct connection *);
static PyObject *python_callable(PyObject *, const char *);
static PyObject *pyhttp_file_alloc(struct http_file *);
static PyObject *pyhttp_request_alloc(struct http_request *);
static PyObject *pyhttp_request_alloc(const struct http_request *);
#if defined(KORE_USE_PGSQL)
static PyObject *pykore_pgsql_alloc(struct http_request *,
@ -49,16 +49,17 @@ static void python_push_integer(PyObject *, const char *, long);
static void python_push_type(const char *, PyObject *, PyTypeObject *);
static int python_runtime_http_request(void *, struct http_request *);
static int python_runtime_validator(void *, struct http_request *, void *);
static int python_runtime_validator(void *, struct http_request *,
const void *);
static void python_runtime_wsmessage(void *, struct connection *,
u_int8_t, const void *, size_t);
static void python_runtime_execute(void *);
static int python_runtime_onload(void *, int);
static void python_runtime_connect(void *, struct connection *);
static void python_module_load(struct kore_module *);
static void python_module_free(struct kore_module *);
static void python_module_reload(struct kore_module *);
static void python_module_load(struct kore_module *, const char *);
static void *python_module_getsym(struct kore_module *, const char *);
static void *python_malloc(void *, size_t);
@ -211,7 +212,6 @@ python_module_reload(struct kore_module *module)
PyObject *handle;
PyErr_Clear();
if ((handle = PyImport_ReloadModule(module->handle)) == NULL) {
python_log_error("python_module_reload");
return;
@ -222,7 +222,7 @@ python_module_reload(struct kore_module *module)
}
static void
python_module_load(struct kore_module *module, const char *onload)
python_module_load(struct kore_module *module)
{
module->handle = python_import(module->path);
if (module->handle == NULL)
@ -322,7 +322,7 @@ python_runtime_http_request(void *addr, struct http_request *req)
}
static int
python_runtime_validator(void *addr, struct http_request *req, void *data)
python_runtime_validator(void *addr, struct http_request *req, const void *data)
{
int ret;
PyObject *pyret, *pyreq, *args, *callable, *arg;
@ -729,15 +729,23 @@ pyconnection_get_addr(struct pyconnection *pyc, void *closure)
}
static PyObject *
pyhttp_request_alloc(struct http_request *req)
pyhttp_request_alloc(const struct http_request *req)
{
struct pyhttp_request *pyreq;
union { const void *cp; void *p; } ptr;
struct pyhttp_request *pyreq;
pyreq = PyObject_New(struct pyhttp_request, &pyhttp_request_type);
if (pyreq == NULL)
return (NULL);
pyreq->req = req;
/*
* Hack around all http apis taking a non-const pointer and us having
* a const pointer for the req data structure. This is because we
* could potentially be called from a validator where the argument
* is a http_request pointer.
*/
ptr.cp = req;
pyreq->req = ptr.p;
pyreq->data = NULL;
return ((PyObject *)pyreq);
@ -798,7 +806,7 @@ pyhttp_response_header(struct pyhttp_request *pyreq, PyObject *args)
static PyObject *
pyhttp_request_header(struct pyhttp_request *pyreq, PyObject *args)
{
char *value;
const char *value;
const char *header;
PyObject *result;

View File

@ -31,7 +31,8 @@ static int native_runtime_onload(void *, int);
static void native_runtime_connect(void *, struct connection *);
#if !defined(KORE_NO_HTTP)
static int native_runtime_http_request(void *, struct http_request *);
static int native_runtime_validator(void *, struct http_request *, void *);
static int native_runtime_validator(void *, struct http_request *,
const void *);
static void native_runtime_wsmessage(void *, struct connection *, u_int8_t,
const void *, size_t);
@ -97,7 +98,7 @@ kore_runtime_http_request(struct kore_runtime_call *rcall,
int
kore_runtime_validator(struct kore_runtime_call *rcall,
struct http_request *req, void *data)
struct http_request *req, const void *data)
{
return (rcall->runtime->validator(rcall->addr, req, data));
}
@ -160,9 +161,9 @@ native_runtime_http_request(void *addr, struct http_request *req)
}
static int
native_runtime_validator(void *addr, struct http_request *req, void *data)
native_runtime_validator(void *addr, struct http_request *req, const void *data)
{
int (*cb)(struct http_request *, void *);
int (*cb)(struct http_request *, const void *);
*(void **)&(cb) = addr;
return (cb(req, data));

View File

@ -157,7 +157,6 @@ void
kore_task_finish(struct kore_task *t)
{
kore_debug("kore_task_finished: %p (%d)", t, t->result);
pthread_rwlock_wrlock(&(t->lock));
if (t->fds[1] != -1) {
@ -174,6 +173,7 @@ kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
int fd;
kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len);
THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
task_channel_write(fd, &len, sizeof(len));
task_channel_write(fd, data, len);

View File

@ -65,7 +65,7 @@ kore_timer_run(u_int64_t now)
struct kore_timer *timer, *t;
u_int64_t next_timer;
next_timer = 100;
next_timer = 1000;
while ((timer = TAILQ_FIRST(&kore_timers)) != NULL) {
if (timer->nextrun > now) {

View File

@ -47,6 +47,7 @@ static struct {
static char b64table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
#if defined(KORE_DEBUG)
void
kore_debug_internal(char *file, int line, const char *fmt, ...)
{
@ -59,6 +60,7 @@ kore_debug_internal(char *file, int line, const char *fmt, ...)
printf("[%d] %s:%d - %s\n", kore_pid, file, line, buf);
}
#endif
void
kore_log_init(void)
@ -368,12 +370,11 @@ kore_time_to_date(time_t now)
u_int64_t
kore_time_ms(void)
{
struct timeval tv;
struct timespec ts;
if (gettimeofday(&tv, NULL) == -1)
return (0);
(void)clock_gettime(CLOCK_MONOTONIC, &ts);
return (tv.tv_sec * 1000 + (tv.tv_usec / 1000));
return ((u_int64_t)(ts.tv_sec * 1000 + (ts.tv_nsec / 1000000)));
}
int

View File

@ -80,7 +80,7 @@ kore_validator_run(struct http_request *req, const char *name, char *data)
int
kore_validator_check(struct http_request *req, struct kore_validator *val,
void *data)
const void *data)
{
int r;

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2014 Joris Vink <joris@coders.se>
* Copyright (c) 2014-2018 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@ -54,7 +54,8 @@ kore_websocket_handshake(struct http_request *req, const char *onconnect,
{
SHA_CTX sctx;
struct kore_buf *buf;
char *key, *base64, *version;
char *base64;
const char *key, *version;
u_int8_t digest[SHA_DIGEST_LENGTH];
if (!http_request_header(req, "sec-websocket-key", &key)) {
@ -141,7 +142,6 @@ kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
struct kore_buf frame;
kore_buf_init(&frame, len);
websocket_frame_build(&frame, op, data, len);
net_send_stream(c, frame.data, frame.offset, NULL, NULL);
@ -213,7 +213,8 @@ websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
}
}
kore_buf_append(frame, data, len);
if (data != NULL && len > 0)
kore_buf_append(frame, data, len);
}
static int
@ -279,7 +280,6 @@ websocket_recv_frame(struct netbuf *nb)
u_int8_t op, moff, extra;
c = nb->owner;
op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);

View File

@ -48,17 +48,17 @@
#include "python_api.h"
#endif
#if defined(WORKER_DEBUG)
#define worker_debug(fmt, ...) printf(fmt, ##__VA_ARGS__)
#else
#define worker_debug(fmt, ...)
#endif
#if !defined(WAIT_ANY)
#define WAIT_ANY (-1)
#endif
#define WORKER_LOCK_TIMEOUT 100
#define WORKER_LOCK_TIMEOUT 500
#if !defined(KORE_NO_TLS)
#define WORKER_SOLO_COUNT 2
#else
#define WORKER_SOLO_COUNT 1
#endif
#define WORKER(id) \
(struct kore_worker *)((u_int8_t *)kore_workers + \
@ -72,8 +72,8 @@ struct wlock {
static int worker_trylock(void);
static void worker_unlock(void);
static inline int kore_worker_acceptlock_obtain(void);
static inline void kore_worker_acceptlock_release(void);
static inline int kore_worker_acceptlock_obtain(u_int64_t);
static inline int kore_worker_acceptlock_release(u_int64_t);
#if !defined(KORE_NO_TLS)
static void worker_entropy_recv(struct kore_msg *, const void *);
@ -280,7 +280,8 @@ kore_worker_entry(struct kore_worker *kw)
struct kore_runtime_call *rcall;
char buf[16];
int quit, had_lock, r;
u_int64_t now, next_lock, netwait;
u_int64_t timerwait, netwait;
u_int64_t now, next_lock, next_prune;
#if !defined(KORE_NO_TLS)
u_int64_t last_seed;
#endif
@ -332,6 +333,7 @@ kore_worker_entry(struct kore_worker *kw)
quit = 0;
had_lock = 0;
next_lock = 0;
next_prune = 0;
worker_active_connections = 0;
kore_platform_event_init();
@ -381,10 +383,8 @@ kore_worker_entry(struct kore_worker *kw)
sig_recv = 0;
}
netwait = 100;
now = kore_time_ms();
netwait = kore_timer_run(now);
if (netwait > 10)
netwait = 10;
#if !defined(KORE_NO_TLS)
if ((now - last_seed) > KORE_RESEED_TIME) {
@ -394,15 +394,38 @@ kore_worker_entry(struct kore_worker *kw)
}
#endif
if (now > next_lock) {
if (kore_worker_acceptlock_obtain()) {
if (!worker->has_lock && next_lock <= now) {
if (kore_worker_acceptlock_obtain(now)) {
if (had_lock == 0) {
kore_platform_enable_accept();
had_lock = 1;
}
} else {
next_lock = now + WORKER_LOCK_TIMEOUT / 2;
}
}
if (!worker->has_lock) {
if (worker_active_connections > 0) {
if (next_lock > now)
netwait = next_lock - now;
} else {
netwait = 10;
}
}
timerwait = kore_timer_run(now);
if (timerwait < netwait)
netwait = timerwait;
r = kore_platform_event_wait(netwait);
if (worker->has_lock && r > 0) {
if (netwait > 10)
now = kore_time_ms();
if (kore_worker_acceptlock_release(now))
next_lock = now + WORKER_LOCK_TIMEOUT;
}
if (!worker->has_lock) {
if (had_lock == 1) {
had_lock = 0;
@ -410,18 +433,15 @@ kore_worker_entry(struct kore_worker *kw)
}
}
r = kore_platform_event_wait(netwait);
if (worker->has_lock && r > 0) {
kore_worker_acceptlock_release();
next_lock = now + WORKER_LOCK_TIMEOUT;
}
#if !defined(KORE_NO_HTTP)
http_process();
#endif
kore_connection_check_timeout();
kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
if (next_prune <= now) {
kore_connection_check_timeout(now);
kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
next_prune = now + 500;
}
if (quit)
break;
@ -530,28 +550,44 @@ kore_worker_wait(int final)
}
}
static inline void
kore_worker_acceptlock_release(void)
static inline int
kore_worker_acceptlock_release(u_int64_t now)
{
if (worker_count == 1 || worker_no_lock == 1)
return;
if (worker_count == WORKER_SOLO_COUNT || worker_no_lock == 1)
return (0);
if (worker->has_lock != 1)
return;
return (0);
if (worker_active_connections < worker_max_connections) {
#if !defined(KORE_NO_HTTP)
if (http_request_count < http_request_limit)
return (0);
#else
return (0);
#endif
}
#if defined(WORKER_DEBUG)
kore_log(LOG_DEBUG, "worker busy, releasing lock");
kore_log(LOG_DEBUG, "had lock for %lu ms", now - worker->time_locked);
#endif
worker_unlock();
worker->has_lock = 0;
return (1);
}
static inline int
kore_worker_acceptlock_obtain(void)
kore_worker_acceptlock_obtain(u_int64_t now)
{
int r;
if (worker->has_lock == 1)
return (1);
if (worker_count == 1 || worker_no_lock == 1) {
if (worker_count == WORKER_SOLO_COUNT || worker_no_lock == 1) {
worker->has_lock = 1;
return (1);
}
@ -568,6 +604,10 @@ kore_worker_acceptlock_obtain(void)
if (worker_trylock()) {
r = 1;
worker->has_lock = 1;
worker->time_locked = now;
#if defined(WORKER_DEBUG)
kore_log(LOG_DEBUG, "got lock");
#endif
}
return (r);
@ -579,8 +619,6 @@ worker_trylock(void)
if (!__sync_bool_compare_and_swap(&(accept_lock->lock), 0, 1))
return (0);
worker_debug("wrk#%d grabbed lock (%d/%d)\n", worker->id,
worker_active_connections, worker_max_connections);
accept_lock->current = worker->pid;
return (1);
@ -600,7 +638,7 @@ worker_entropy_recv(struct kore_msg *msg, const void *data)
{
if (msg->length != 1024) {
kore_log(LOG_WARNING,
"short entropy response (got:%u - wanted:1024)",
"invalid entropy response (got:%u - wanted:1024)",
msg->length);
}