/*
 * kore/src/websocket.c
 */
/*
 * Copyright (c) 2014-2022 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <sys/param.h>
#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include "kore.h"
#include "http.h"
#include "sha1.h"
/* Fixed part of every frame header: FIN/RSV/opcode byte + MASK/length byte. */
#define WEBSOCKET_FRAME_HDR		2
/* Size of the masking key that follows the (extended) length field. */
#define WEBSOCKET_MASK_LEN		4
/* Largest payload length that can be expressed in the 7-bit length field. */
#define WEBSOCKET_PAYLOAD_SINGLE	125
/* 7-bit length values announcing a 16-bit / 64-bit extended length. */
#define WEBSOCKET_PAYLOAD_EXTEND_1	126
#define WEBSOCKET_PAYLOAD_EXTEND_2	127
/* Low nibble of the first header byte carries the opcode. */
#define WEBSOCKET_OPCODE_MASK		0x0f

/* Accessors for the two base header bytes (RFC 6455, section 5.2). */
#define WEBSOCKET_FRAME_LENGTH(x)	((x) & ~(1 << 7))
#define WEBSOCKET_HAS_MASK(x)		((x) & (1 << 7))
#define WEBSOCKET_HAS_FINFLAG(x)	((x) & (1 << 7))
/*
 * Test RSV bit i (1..3) of the first header byte. The argument i is
 * parenthesized so that expressions such as WEBSOCKET_RSV(x, 1 + 2)
 * select the intended bit.
 */
#define WEBSOCKET_RSV(x, i)		((x) & (1 << (7 - (i))))

/* GUID appended to Sec-WebSocket-Key before hashing in the handshake. */
#define WEBSOCKET_SERVER_RESPONSE	"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
/*
 * Idle timeout given to a connection after the websocket upgrade; it is
 * copied into idle_timer.length by kore_websocket_handshake(). Presumably
 * milliseconds, since idle_timer.start is stamped with kore_time_ms().
 */
u_int64_t kore_websocket_timeout = 120000;
/* Largest payload length websocket_recv_frame() accepts; larger frames fail. */
u_int64_t kore_websocket_maxframe = 16384;
/*
 * Frame parser stages: websocket_recv_opcode() validates the 2-byte base
 * header, websocket_recv_frame() handles the rest of the frame.
 */
static int websocket_recv_frame(struct netbuf *);
static int websocket_recv_opcode(struct netbuf *);
/* Installed as the connection's disconnect hook after the upgrade. */
static void websocket_disconnect(struct connection *);
/* Serializes a single unmasked, final (FIN set) frame into a kore_buf. */
static void websocket_frame_build(struct kore_buf *, u_int8_t,
const void *, size_t);
void
kore_websocket_handshake(struct http_request *req, const char *onconnect,
const char *onmessage, const char *ondisconnect)
{
SHA1_CTX sctx;
struct kore_buf *buf;
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
char *base64;
const char *key, *version;
u_int8_t digest[SHA1_DIGEST_LENGTH];
if (!http_request_header(req, "sec-websocket-key", &key)) {
http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
return;
}
if (!http_request_header(req, "sec-websocket-version", &version)) {
http_response_header(req, "sec-websocket-version", "13");
http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
return;
}
if (strcmp(version, "13")) {
http_response_header(req, "sec-websocket-version", "13");
http_response(req, HTTP_STATUS_BAD_REQUEST, NULL, 0);
return;
}
2016-07-14 12:34:29 +02:00
buf = kore_buf_alloc(128);
kore_buf_appendf(buf, "%s%s", key, WEBSOCKET_SERVER_RESPONSE);
SHA1Init(&sctx);
SHA1Update(&sctx, buf->data, buf->offset);
SHA1Final(digest, &sctx);
kore_buf_free(buf);
if (!kore_base64_encode(digest, sizeof(digest), &base64)) {
http_response(req, HTTP_STATUS_INTERNAL_ERROR, NULL, 0);
return;
}
http_response_header(req, "upgrade", "websocket");
http_response_header(req, "connection", "upgrade");
http_response_header(req, "sec-websocket-accept", base64);
kore_free(base64);
req->owner->proto = CONN_PROTO_WEBSOCKET;
http_response(req, HTTP_STATUS_SWITCHING_PROTOCOLS, NULL, 0);
net_recv_reset(req->owner, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
req->owner->disconnect = websocket_disconnect;
req->owner->rnb->flags &= ~NETBUF_CALL_CB_ALWAYS;
req->owner->http_timeout = 0;
req->owner->idle_timer.start = kore_time_ms();
req->owner->idle_timer.length = kore_websocket_timeout;
if (onconnect != NULL) {
req->owner->ws_connect = kore_runtime_getcall(onconnect);
if (req->owner->ws_connect == NULL)
fatal("no symbol '%s' for ws_connect", onconnect);
} else {
req->owner->ws_connect = NULL;
}
if (onmessage != NULL) {
req->owner->ws_message = kore_runtime_getcall(onmessage);
if (req->owner->ws_message == NULL)
fatal("no symbol '%s' for ws_message", onmessage);
} else {
req->owner->ws_message = NULL;
}
if (ondisconnect != NULL) {
req->owner->ws_disconnect = kore_runtime_getcall(ondisconnect);
if (req->owner->ws_disconnect == NULL)
fatal("no symbol '%s' for ws_disconnect", ondisconnect);
} else {
req->owner->ws_disconnect = NULL;
}
if (req->owner->ws_connect != NULL)
kore_runtime_wsconnect(req->owner->ws_connect, req->owner);
}
int
kore_websocket_send_clean(struct netbuf *nb)
{
2018-04-13 07:41:22 +02:00
kore_free(nb->buf);
2018-08-30 09:13:11 +02:00
return (0);
}
/*
 * Send one websocket frame with opcode op and payload data/len (data may
 * be NULL when len is 0) to connection c, and flush the send queue.
 *
 * The frame buffer is handed to the network layer with net_send_stream()
 * and later released by kore_websocket_send_clean().
 */
void
kore_websocket_send(struct connection *c, u_int8_t op, const void *data,
    size_t len)
{
	struct kore_buf		frame;

	/*
	 * Reserve room for the largest header a server frame can have
	 * (2-byte base header + 8-byte extended length, no mask). The whole
	 * frame then fits the initial allocation, and the buffer does not
	 * have to grow when the payload is appended after the header.
	 */
	kore_buf_init(&frame, len + WEBSOCKET_FRAME_HDR + sizeof(u_int64_t));
	websocket_frame_build(&frame, op, data, len);

	net_send_stream(c, frame.data, frame.offset,
	    kore_websocket_send_clean, NULL);

	/* net_send_stream() takes over the buffer data pointer. */
	frame.data = NULL;
	kore_buf_cleanup(&frame);

	net_send_flush(c);
}
/*
 * Broadcast one websocket frame (opcode op, payload data/len) to every
 * websocket connection in this worker except src. When scope is
 * WEBSOCKET_BROADCAST_GLOBAL, the serialized frame is also sent to all
 * other workers via kore_msg_send(KORE_MSG_WORKER_ALL, KORE_MSG_WEBSOCKET).
 */
void
kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data,
    size_t len, int scope)
{
	struct connection	*c, *next;
	struct kore_buf		*frame;

	/* Pre-size for the payload plus the largest possible server header. */
	frame = kore_buf_alloc(len + WEBSOCKET_FRAME_HDR + sizeof(u_int64_t));
	websocket_frame_build(frame, op, data, len);

	/*
	 * Remember the successor before sending. If net_send_flush() ends
	 * up disconnecting c and moving it off the connections list, a
	 * plain TAILQ_FOREACH would follow c's links into the other list.
	 * NOTE(review): confirm against net_send_flush().
	 */
	for (c = TAILQ_FIRST(&connections); c != NULL; c = next) {
		next = TAILQ_NEXT(c, list);

		if (c == src || c->proto != CONN_PROTO_WEBSOCKET)
			continue;

		/* net_send_queue() is given the shared frame for each peer. */
		net_send_queue(c, frame->data, frame->offset);
		net_send_flush(c);
	}

	if (scope == WEBSOCKET_BROADCAST_GLOBAL) {
		kore_msg_send(KORE_MSG_WORKER_ALL,
		    KORE_MSG_WEBSOCKET, frame->data, frame->offset);
	}

	kore_buf_free(frame);
}
/*
 * Append a single server-to-client websocket frame to frame.
 *
 * The frame always has FIN set and is never masked. The payload length
 * uses the 7-bit form up to WEBSOCKET_PAYLOAD_SINGLE bytes, a 16-bit
 * extended length up to USHRT_MAX, and a 64-bit extended length beyond
 * that. Extended lengths are written with net_write16()/net_write64().
 * The payload is copied only when data is non-NULL and len > 0.
 */
static void
websocket_frame_build(struct kore_buf *frame, u_int8_t op, const void *data,
    size_t len)
{
	u_int8_t	hdr0, hdr1;
	u_int16_t	ext16;
	u_int64_t	ext64;

	/* Byte 0: FIN bit plus the caller's opcode. */
	hdr0 = op | (1 << 7);

	/* Byte 1: 7-bit length or an extended-length marker. */
	if (len <= WEBSOCKET_PAYLOAD_SINGLE)
		hdr1 = len;
	else if (len <= USHRT_MAX)
		hdr1 = WEBSOCKET_PAYLOAD_EXTEND_1;
	else
		hdr1 = WEBSOCKET_PAYLOAD_EXTEND_2;

	/* Server frames are sent unmasked: keep the MASK bit clear. */
	hdr1 &= ~(1 << 7);

	kore_buf_append(frame, &hdr0, sizeof(hdr0));
	kore_buf_append(frame, &hdr1, sizeof(hdr1));

	if (hdr1 == WEBSOCKET_PAYLOAD_EXTEND_1) {
		net_write16((u_int8_t *)&ext16, len);
		kore_buf_append(frame, &ext16, sizeof(ext16));
	} else if (hdr1 == WEBSOCKET_PAYLOAD_EXTEND_2) {
		net_write64((u_int8_t *)&ext64, len);
		kore_buf_append(frame, &ext64, sizeof(ext64));
	}

	if (data != NULL && len > 0)
		kore_buf_append(frame, data, len);
}
/*
 * First parser stage, run once the 2-byte base header has been received.
 *
 * Rejects the frame (KORE_RESULT_ERROR) when:
 *   - the MASK bit is clear;
 *   - any RSV bit is set;
 *   - the opcode is unknown;
 *   - a close, ping or pong frame is not final or announces more than
 *     WEBSOCKET_PAYLOAD_SINGLE bytes.
 *
 * Otherwise it asks the network layer for the rest of the header and
 * continues in websocket_recv_frame(). The rest of the header is the
 * extended length, if any, plus the masking key; for 7-bit lengths the
 * payload is requested as well.
 */
static int
websocket_recv_opcode(struct netbuf *nb)
{
	struct connection	*conn = nb->owner;
	u_int8_t		 opcode, want;

	if (!WEBSOCKET_HAS_MASK(nb->buf[1]))
		return (KORE_RESULT_ERROR);

	if (WEBSOCKET_RSV(nb->buf[0], 1) ||
	    WEBSOCKET_RSV(nb->buf[0], 2) ||
	    WEBSOCKET_RSV(nb->buf[0], 3))
		return (KORE_RESULT_ERROR);

	opcode = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
	want = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);

	switch (opcode) {
	case WEBSOCKET_OP_CLOSE:
	case WEBSOCKET_OP_PING:
	case WEBSOCKET_OP_PONG:
		/* Control frames: single, final, and small. */
		if (!WEBSOCKET_HAS_FINFLAG(nb->buf[0]) ||
		    want > WEBSOCKET_PAYLOAD_SINGLE)
			return (KORE_RESULT_ERROR);
		break;
	case WEBSOCKET_OP_CONT:
	case WEBSOCKET_OP_TEXT:
	case WEBSOCKET_OP_BINARY:
		break;
	default:
		return (KORE_RESULT_ERROR);
	}

	/* Extended length markers are followed by 2 or 8 length bytes. */
	if (want == WEBSOCKET_PAYLOAD_EXTEND_1)
		want += sizeof(u_int16_t);
	else if (want == WEBSOCKET_PAYLOAD_EXTEND_2)
		want += sizeof(u_int64_t);

	want += WEBSOCKET_MASK_LEN;
	net_recv_expand(conn, want, websocket_recv_frame);

	return (KORE_RESULT_OK);
}
static int
websocket_recv_frame(struct netbuf *nb)
{
struct connection *c;
int ret;
u_int64_t len, i, total;
u_int8_t op, moff, extra;
c = nb->owner;
op = nb->buf[0] & WEBSOCKET_OPCODE_MASK;
len = WEBSOCKET_FRAME_LENGTH(nb->buf[1]);
switch (len) {
case WEBSOCKET_PAYLOAD_EXTEND_1:
moff = 4;
extra = sizeof(u_int16_t);
len = net_read16(&nb->buf[2]);
break;
case WEBSOCKET_PAYLOAD_EXTEND_2:
moff = 10;
extra = sizeof(u_int64_t);
len = net_read64(&nb->buf[2]);
break;
default:
extra = 0;
moff = 2;
break;
}
if (len > kore_websocket_maxframe)
return (KORE_RESULT_ERROR);
extra += WEBSOCKET_FRAME_HDR;
total = len + extra + WEBSOCKET_MASK_LEN;
if (total > nb->b_len) {
total -= nb->b_len;
net_recv_expand(c, total, websocket_recv_frame);
return (KORE_RESULT_OK);
}
if (total != nb->b_len)
return (KORE_RESULT_ERROR);
for (i = 0; i < len; i++)
nb->buf[moff + 4 + i] ^= nb->buf[moff + (i % 4)];
ret = KORE_RESULT_OK;
switch (op) {
case WEBSOCKET_OP_PONG:
break;
case WEBSOCKET_OP_CONT:
ret = KORE_RESULT_ERROR;
2017-02-06 11:39:50 +01:00
kore_log(LOG_ERR,
"%p: we do not support op 0x%02x yet", (void *)c, op);
break;
case WEBSOCKET_OP_TEXT:
case WEBSOCKET_OP_BINARY:
if (c->ws_message != NULL) {
kore_runtime_wsmessage(c->ws_message,
c, op, &nb->buf[moff + 4], len);
}
break;
case WEBSOCKET_OP_CLOSE:
c->evt.flags &= ~KORE_EVENT_READ;
if (!(c->flags & CONN_WS_CLOSE_SENT)) {
c->flags |= CONN_WS_CLOSE_SENT;
kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
}
kore_connection_disconnect(c);
break;
case WEBSOCKET_OP_PING:
kore_websocket_send(c, WEBSOCKET_OP_PONG,
&nb->buf[moff + 4], len);
break;
default:
return (KORE_RESULT_ERROR);
}
net_recv_reset(c, WEBSOCKET_FRAME_HDR, websocket_recv_opcode);
2017-01-30 22:35:34 +01:00
return (ret);
}
/*
 * Disconnect hook installed by kore_websocket_handshake().
 *
 * Runs the user's ws_disconnect callback, if any. Then, unless a close
 * frame was already sent (CONN_WS_CLOSE_SENT), it marks one as sent,
 * stops reading from the connection and sends an empty close frame.
 */
static void
websocket_disconnect(struct connection *c)
{
	if (c->ws_disconnect != NULL)
		kore_runtime_wsdisconnect(c->ws_disconnect, c);

	if (c->flags & CONN_WS_CLOSE_SENT)
		return;

	c->flags |= CONN_WS_CLOSE_SENT;
	c->evt.flags &= ~KORE_EVENT_READ;
	kore_websocket_send(c, WEBSOCKET_OP_CLOSE, NULL, 0);
}