kore/examples/sse/src/sse.c

183 lines
4.8 KiB
C
Raw Normal View History

2015-05-15 19:23:26 +02:00
/*
* Copyright (c) 2015 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
* Simple example of how SSE (Server Side Events) could be used in Kore.
*
* Upon new arrivals, a join event is broadcast to all clients.
* If a client goes away a leave event is broadcasted.
* Each connection gets its own 10 second ping timer which will emit
* a ping event to the connection endpoint.
*/
#include <kore/kore.h>
#include <kore/http.h>
#include "assets.h"
2015-08-06 08:21:28 +02:00
void sse_ping(void *, u_int64_t);
2015-05-15 19:23:26 +02:00
int page(struct http_request *);
int subscribe(struct http_request *);
void sse_disconnect(struct connection *);
void sse_send(struct connection *, void *, size_t);
void sse_broadcast(struct connection *, void *, size_t);
int check_header(struct http_request *, const char *, const char *);
/*
* Each client subscribed to our SSE gets a state attached
* to their hdlr_extra pointer member.
*/
struct sse_state {
struct kore_timer *timer;
};
int
page(struct http_request *req)
{
if (req->method != HTTP_METHOD_GET) {
http_response_header(req, "allow", "get");
http_response(req, 405, NULL, 0);
return (KORE_RESULT_OK);
}
http_response_header(req, "content-type", "text/html");
http_response(req, 200, asset_index_html, asset_len_index_html);
return (KORE_RESULT_OK);
}
int
subscribe(struct http_request *req)
{
struct sse_state *state;
char *hello = "event:join\ndata: client\n\n";
/* Preventive paranoia. */
if (req->hdlr_extra != NULL) {
kore_log(LOG_ERR, "%p: already subscribed", req->owner);
http_response(req, 500, NULL, 0);
return (KORE_RESULT_OK);
}
/* Only allow GET methods. */
if (req->method != HTTP_METHOD_GET) {
http_response_header(req, "allow", "get");
http_response(req, 405, NULL, 0);
return (KORE_RESULT_OK);
}
/* Only do SSE if the client told us it wanted too. */
if (!check_header(req, "accept", "text/event-stream"))
return (KORE_RESULT_OK);
/* Do not include content-length in our response. */
req->flags |= HTTP_REQUEST_NO_CONTENT_LENGTH;
/* Notify existing clients of our new client now. */
sse_broadcast(req->owner, hello, strlen(hello));
/* Set a disconnection method so we know when this client goes away. */
req->owner->disconnect = sse_disconnect;
/* Allocate a state to be carried by our connection. */
state = kore_malloc(sizeof(*state));
req->owner->hdlr_extra = state;
/* Now start a timer to send a ping back every 10 second. */
state->timer = kore_timer_add(sse_ping, 10000, req->owner, 0);
/* Respond that the SSE channel is now open. */
kore_log(LOG_NOTICE, "%p: connected for SSE", req->owner);
http_response_header(req, "content-type", "text/event-stream");
http_response(req, 200, NULL, 0);
return (KORE_RESULT_OK);
}
void
sse_broadcast(struct connection *src, void *data, size_t len)
{
struct connection *c;
/* Broadcast the message to all other clients. */
2015-08-06 08:21:28 +02:00
TAILQ_FOREACH(c, &connections, list) {
2015-05-15 19:23:26 +02:00
if (c == src)
continue;
sse_send(c, data, len);
}
}
void
sse_send(struct connection *c, void *data, size_t len)
{
struct sse_state *state = c->hdlr_extra;
/* Do not send to clients that do not have a state. */
if (state == NULL)
return;
/* Queue outgoing data now. */
net_send_queue(c, data, len);
2015-05-15 19:23:26 +02:00
net_send_flush(c);
}
void
2015-08-06 08:21:28 +02:00
sse_ping(void *arg, u_int64_t now)
2015-05-15 19:23:26 +02:00
{
struct connection *c = arg;
char *ping = "event:ping\ndata:\n\n";
/* Send our ping to the client. */
sse_send(c, ping, strlen(ping));
}
void
sse_disconnect(struct connection *c)
{
struct sse_state *state = c->hdlr_extra;
char *leaving = "event: leave\ndata: client\n\n";
kore_log(LOG_NOTICE, "%p: disconnecting for SSE", c);
/* Tell others we are leaving. */
sse_broadcast(c, leaving, strlen(leaving));
/* Kill our timer and free/remove the state. */
kore_timer_remove(state->timer);
kore_free(state);
2015-05-15 19:23:26 +02:00
/* Prevent us to be called again. */
c->hdlr_extra = NULL;
c->disconnect = NULL;
}
int
check_header(struct http_request *req, const char *name, const char *value)
{
Rework HTTP and worker processes. The HTTP layer used to make a copy of each incoming header and its value for a request. Stop doing that and make HTTP headers zero-copy all across the board. This change comes with some api function changes, notably the http_request_header() function which now takes a const char ** rather than a char ** out pointer. This commit also constifies several members of http_request, beware. Additional rework how the worker processes deal with the accept lock. Before: if a worker held the accept lock and it accepted a new connection it would release the lock for others and back off for 500ms before attempting to grab the lock again. This approach worked but under high load this starts becoming obvious. Now: - workers not holding the accept lock and not having any connections will wait less long before returning from kore_platform_event_wait(). - workers not holding the accept lock will no longer blindly wait an arbitrary amount in kore_platform_event_wait() but will look at how long until the next lock grab is and base their timeout on that. - if a worker its next_lock timeout is up and failed to grab the lock it will try again in half the time again. - the worker process holding the lock will when releasing the lock double check if it still has space for newer connections, if it does it will keep the lock until it is full. This prevents the lock from bouncing between several non busy worker processes all the time. Additional fixes: - Reduce the number of times we check the timeout list, only do it twice per second rather then every event tick. - Fix solo worker count for TLS (we actually hold two processes, not one). - Make sure we don't accidentally miscalculate the idle time causing new connections under heavy load to instantly drop. - Swap from gettimeofday() to clock_gettime() now that MacOS caught up.
2018-02-14 13:48:49 +01:00
const char *hdr;
2015-05-15 19:23:26 +02:00
if (!http_request_header(req, name, &hdr)) {
http_response(req, 400, NULL, 0);
return (KORE_RESULT_ERROR);
}
if (strcmp(hdr, value)) {
http_response(req, 400, NULL, 0);
return (KORE_RESULT_ERROR);
}
return (KORE_RESULT_OK);
}