diff --git a/Makefile b/Makefile index 502f8f0..df7ae91 100644 --- a/Makefile +++ b/Makefile @@ -6,10 +6,11 @@ KORE=kore INSTALL_DIR=$(PREFIX)/bin INCLUDE_DIR=$(PREFIX)/include/kore -S_SRC= src/kore.c src/accesslog.c src/auth.c src/buf.c src/cli.c src/config.c \ - src/connection.c src/domain.c src/http.c src/mem.c src/module.c \ - src/net.c src/pool.c src/spdy.c src/timer.c src/validator.c \ - src/utils.c src/websocket.c src/worker.c src/zlib_dict.c +S_SRC= src/kore.c src/accesslog.c src/auth.c src/buf.c src/cli.c \ + src/config.c src/connection.c src/domain.c src/http.c src/mem.c \ + src/msg.c src/module.c src/net.c src/pool.c src/spdy.c src/timer.c \ + src/validator.c src/utils.c src/websocket.c src/worker.c \ + src/zlib_dict.c S_OBJS= $(S_SRC:.c=.o) CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes diff --git a/examples/messaging/.gitignore b/examples/messaging/.gitignore new file mode 100644 index 0000000..cb9d927 --- /dev/null +++ b/examples/messaging/.gitignore @@ -0,0 +1,5 @@ +*.o +.objs +messaging.so +assets.h +cert diff --git a/examples/messaging/README.md b/examples/messaging/README.md new file mode 100644 index 0000000..7a35679 --- /dev/null +++ b/examples/messaging/README.md @@ -0,0 +1,15 @@ +Kore message framework example + +Run: +``` + # kore run +``` + +Test: +``` + Perform a simple GET request against the root page. + This should trigger the example app to send a message + to the other workers which will display it. + + # curl -k https://127.0.0.1:8888 +``` diff --git a/examples/messaging/conf/messaging.conf b/examples/messaging/conf/messaging.conf new file mode 100644 index 0000000..e0558e9 --- /dev/null +++ b/examples/messaging/conf/messaging.conf @@ -0,0 +1,12 @@ +# Placeholder configuration + +bind 127.0.0.1 8888 +load ./messaging.so init +tls_dhparam dh2048.pem +workers 4 + +domain 127.0.0.1 { + certfile cert/server.crt + certkey cert/server.key + static / page +} diff --git a/examples/messaging/dh2048.pem b/examples/messaging/dh2048.pem new file mode 100644 index 0000000..511de69 --- /dev/null +++ b/examples/messaging/dh2048.pem @@ -0,0 +1,8 @@ +-----BEGIN DH PARAMETERS----- +MIIBCAKCAQEAn4f4Qn5SudFjEYPWTbUaOTLUH85YWmmPFW1+b5bRa9ygr+1wfamv +VKVT7jO8c4msSNikUf6eEfoH0H4VTCaj+Habwu+Sj+I416r3mliMD4SjNsUJrBrY +Y0QV3ZUgZz4A8ARk/WwQcRl8+ZXJz34IaLwAcpyNhoV46iHVxW0ty8ND0U4DIku/ +PNayKimu4BXWXk4RfwNVP59t8DQKqjshZ4fDnbotskmSZ+e+FHrd+Kvrq/WButvV +Bzy9fYgnUlJ82g/bziCI83R2xAdtH014fR63MpElkqdNeChb94pPbEdFlNUvYIBN +xx2vTUQMqRbB4UdG2zuzzr5j98HDdblQ+wIBAg== +-----END DH PARAMETERS----- \ No newline at end of file diff --git a/examples/messaging/src/messaging.c b/examples/messaging/src/messaging.c new file mode 100644 index 0000000..4e18621 --- /dev/null +++ b/examples/messaging/src/messaging.c @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2015 Joris Vink + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include + +/* + * This example demonstrates how to use the messaging framework + * in Kore. This framework allows you to send messages between + * your workers with custom callbacks defined per message ID. + */ + +/* Your code shouldn't use IDs < 100. */ +#define MY_MESSAGE_ID 100 + +int init(int); +int page(struct http_request *); +void received_message(const void *, u_int32_t); + +/* Initialization callback. */ +int +init(int state) +{ + if (state == KORE_MODULE_UNLOAD) + return (KORE_RESULT_OK); + + /* + * Register our message callback when the module is initialized. + * kore_msg_register() fails if the message ID already exists, + * but in our case that is OK. + */ + (void)kore_msg_register(MY_MESSAGE_ID, received_message); + + return (KORE_RESULT_OK); +} + +/* + * Callback for receiving a message MY_MESSAGE_ID. + */ +void +received_message(const void *data, u_int32_t len) +{ + kore_log(LOG_INFO, "got message (%d bytes): %.*s", len, + len, (const char *)data); +} + +/* + * Page request which will send a message to all other workers + * with the ID set to MY_MESSAGE_ID and a payload of "hello". + */ +int +page(struct http_request *req) +{ + kore_msg_send(MY_MESSAGE_ID, "hello", 5); + http_response(req, 200, NULL, 0); + + return (KORE_RESULT_OK); +} diff --git a/includes/kore.h b/includes/kore.h index 56559ec..33d8a1c 100644 --- a/includes/kore.h +++ b/includes/kore.h @@ -147,6 +147,7 @@ LIST_HEAD(listener_head, listener); #define CONN_PROTO_SPDY 1 #define CONN_PROTO_HTTP 2 #define CONN_PROTO_WEBSOCKET 3 +#define CONN_PROTO_MSG 4 #define CONN_READ_POSSIBLE 0x01 #define CONN_WRITE_POSSIBLE 0x02 @@ -170,6 +171,9 @@ LIST_HEAD(listener_head, listener); #define KORE_TIMER_ONESHOT 0x01 +#define KORE_CONNECTION_PRUNE_DISCONNECT 0 +#define KORE_CONNECTION_PRUNE_ALL 1 + struct connection { u_int8_t type; int fd; @@ -220,7 +224,8 @@ struct connection { }; TAILQ_HEAD(connection_list, connection); -extern struct connection_list worker_clients; +extern struct connection_list connections; +extern struct connection_list disconnected; struct kore_handler_params { char *name; @@ -279,6 +284,8 @@ struct kore_worker { u_int8_t id; u_int8_t cpu; pid_t pid; + int pipe[2]; + struct connection *msg[2]; u_int8_t has_lock; struct kore_module_handle *active_hdlr; }; @@ -358,6 +365,14 @@ struct kore_timer { TAILQ_ENTRY(kore_timer) list; }; +/* These are reserved message types. */ +#define KORE_MSG_WEBSOCKET 1 + +struct kore_msg { + u_int8_t id; + u_int32_t length; +}; + extern pid_t kore_pid; extern int foreground; extern int kore_debug; @@ -400,11 +415,8 @@ void kore_worker_shutdown(void); void kore_worker_dispatch_signal(int); void kore_worker_spawn(u_int16_t, u_int16_t); void kore_worker_entry(struct kore_worker *); -void kore_worker_connection_add(struct connection *); -void kore_worker_connection_move(struct connection *); -void kore_worker_connection_remove(struct connection *); -void kore_worker_websocket_broadcast(struct connection *, - void (*cb)(struct connection *, void *), void *); + +struct kore_worker *kore_worker_data(u_int8_t); void kore_platform_init(void); void kore_platform_event_init(void); @@ -439,7 +451,9 @@ int kore_tls_npn_cb(SSL *, const u_char **, unsigned int *, void *); void kore_tls_info_callback(const SSL *, int, int); void kore_connection_init(void); +void kore_connection_prune(int); struct connection *kore_connection_new(void *); +void kore_connection_check_timeout(void); int kore_connection_nonblock(int); int kore_connection_handle(struct connection *); void kore_connection_remove(struct connection *); @@ -492,6 +506,15 @@ void kore_websocket_send(struct connection *, void kore_websocket_broadcast(struct connection *, u_int8_t, void *, size_t, int); +void kore_msg_init(void); +void kore_msg_worker_init(void); +void kore_msg_parent_init(void); +void kore_msg_parent_add(struct kore_worker *); +void kore_msg_parent_remove(struct kore_worker *); +void kore_msg_send(u_int8_t, void *, u_int32_t); +int kore_msg_register(u_int8_t, + void (*cb)(const void *, u_int32_t)); + void kore_domain_init(void); int kore_domain_new(char *); void kore_module_init(void); diff --git a/src/accesslog.c b/src/accesslog.c index 644e7dc..4c48fd3 100644 --- a/src/accesslog.c +++ b/src/accesslog.c @@ -67,7 +67,7 @@ kore_accesslog_wait(void) pfd[0].events = POLLIN; pfd[0].revents = 0; - nfds = poll(pfd, 1, 1000); + nfds = poll(pfd, 1, 100); if (nfds == -1 || (pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))) { if (nfds == -1 && errno == EINTR) return (KORE_RESULT_OK); diff --git a/src/bsd.c b/src/bsd.c index f20687a..109aaff 100644 --- a/src/bsd.c +++ b/src/bsd.c @@ -82,9 +82,12 @@ kore_platform_event_init(void) event_count = (worker_max_connections * 2) + nlisteners; events = kore_calloc(event_count, sizeof(struct kevent)); - LIST_FOREACH(l, &listeners, list) { - kore_platform_event_schedule(l->fd, - EVFILT_READ, EV_ADD | EV_DISABLE, l); + /* Hack to check if we're running under the parent or not. */ + if (worker != NULL) { + LIST_FOREACH(l, &listeners, list) { + kore_platform_event_schedule(l->fd, + EVFILT_READ, EV_ADD | EV_DISABLE, l); + } } } diff --git a/src/connection.c b/src/connection.c index a8f7b98..9ca0082 100644 --- a/src/connection.c +++ b/src/connection.c @@ -25,10 +25,15 @@ #include "http.h" struct kore_pool connection_pool; +struct connection_list connections; +struct connection_list disconnected; void kore_connection_init(void) { + TAILQ_INIT(&connections); + TAILQ_INIT(&disconnected); + kore_pool_init(&connection_pool, "connection_pool", sizeof(struct connection), worker_max_connections); } @@ -118,13 +123,54 @@ kore_connection_accept(struct listener *l, struct connection **out) http_header_recv); #endif - kore_worker_connection_add(c); + TAILQ_INSERT_TAIL(&connections, c, list); kore_connection_start_idletimer(c); *out = c; return (KORE_RESULT_OK); } +void +kore_connection_check_timeout(void) +{ + struct connection *c; + u_int64_t now; + + now = kore_time_ms(); + TAILQ_FOREACH(c, &connections, list) { + if (c->proto == CONN_PROTO_MSG) + continue; + if (c->proto == CONN_PROTO_SPDY && + c->idle_timer.length == 0 && + !(c->flags & CONN_WRITE_BLOCK) && + !(c->flags & CONN_READ_BLOCK)) + continue; + if (!(c->flags & CONN_IDLE_TIMER_ACT)) + continue; + kore_connection_check_idletimer(now, c); + } +} + +void +kore_connection_prune(int all) +{ + struct connection *c, *cnext; + + if (all) { + for (c = TAILQ_FIRST(&connections); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + net_send_flush(c); + kore_connection_disconnect(c); + } + } + + for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + TAILQ_REMOVE(&disconnected, c, list); + kore_connection_remove(c); + } +} + void kore_connection_disconnect(struct connection *c) { @@ -134,7 +180,8 @@ kore_connection_disconnect(struct connection *c) if (c->disconnect) c->disconnect(c); - kore_worker_connection_move(c); + TAILQ_REMOVE(&connections, c, list); + TAILQ_INSERT_TAIL(&disconnected, c, list); } } @@ -326,7 +373,6 @@ kore_connection_remove(struct connection *c) kore_mem_free(s); } - kore_worker_connection_remove(c); kore_pool_put(&connection_pool, c); } diff --git a/src/kore.c b/src/kore.c index 598ebcb..5309d31 100644 --- a/src/kore.c +++ b/src/kore.c @@ -329,6 +329,7 @@ kore_server_sslstart(void) static void kore_server_start(void) { + u_int32_t tmp; int quit; if (foreground == 0 && daemon(1, 1) == -1) @@ -347,9 +348,20 @@ kore_server_start(void) #endif kore_platform_proctitle("kore [parent]"); + kore_msg_init(); kore_worker_init(); + /* Set worker_max_connections for kore_connection_init(). */ + tmp = worker_max_connections; + worker_max_connections = worker_count; + + net_init(); + kore_connection_init(); + kore_platform_event_init(); + kore_msg_parent_init(); + quit = 0; + worker_max_connections = tmp; while (quit != 1) { if (sig_recv != 0) { switch (sig_recv) { @@ -371,10 +383,15 @@ kore_server_start(void) sig_recv = 0; } - if (!kore_accesslog_wait()) + /* XXX - The accesslog should move to our msg framework. */ + if (!kore_accesslog_wait()) { + kore_worker_dispatch_signal(SIGQUIT); break; + } kore_worker_wait(0); + kore_platform_event_wait(100); + kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); } } diff --git a/src/msg.c b/src/msg.c new file mode 100644 index 0000000..f662e44 --- /dev/null +++ b/src/msg.c @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2015 Joris Vink + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include + +#include + +#include "kore.h" +#include "http.h" + +struct msg_type { + u_int8_t id; + void (*cb)(const void *, u_int32_t); + TAILQ_ENTRY(msg_type) list; +}; + +TAILQ_HEAD(, msg_type) msg_types; + +static struct msg_type *msg_type_lookup(u_int8_t); +static int msg_recv_worker(struct netbuf *); +static int msg_recv_parent(struct netbuf *); +static int msg_recv_worker_data(struct netbuf *); +static void msg_type_websocket(const void *, u_int32_t); +static void msg_disconnected_parent(struct connection *); + +void +kore_msg_init(void) +{ + TAILQ_INIT(&msg_types); +} + +void +kore_msg_parent_init(void) +{ + u_int8_t i; + struct kore_worker *kw; + + for (i = 0; i < worker_count; i++) { + kw = kore_worker_data(i); + kore_msg_parent_add(kw); + } +} + +void +kore_msg_parent_add(struct kore_worker *kw) +{ + kw->msg[0] = kore_connection_new(NULL); + kw->msg[0]->fd = kw->pipe[0]; + kw->msg[0]->read = net_read; + kw->msg[0]->write = net_write; + kw->msg[0]->proto = CONN_PROTO_MSG; + kw->msg[0]->state = CONN_STATE_ESTABLISHED; + + TAILQ_INSERT_TAIL(&connections, kw->msg[0], list); + kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]); + + net_recv_queue(kw->msg[0], NETBUF_SEND_PAYLOAD_MAX, + NETBUF_CALL_CB_ALWAYS, msg_recv_parent); +} + +void +kore_msg_parent_remove(struct kore_worker *kw) +{ + kore_connection_disconnect(kw->msg[0]); + kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); + (void)close(kw->pipe[1]); +} + +void +kore_msg_worker_init(void) +{ + kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket); + + worker->msg[1] = kore_connection_new(NULL); + worker->msg[1]->fd = worker->pipe[1]; + worker->msg[1]->read = net_read; + worker->msg[1]->write = net_write; + worker->msg[1]->proto = CONN_PROTO_MSG; + worker->msg[1]->state = CONN_STATE_ESTABLISHED; + worker->msg[1]->disconnect = msg_disconnected_parent; + + TAILQ_INSERT_TAIL(&connections, worker->msg[1], list); + kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]); + + net_recv_queue(worker->msg[1], + sizeof(struct kore_msg), 0, msg_recv_worker); +} + +int +kore_msg_register(u_int8_t id, void (*cb)(const void *, u_int32_t)) +{ + struct msg_type *type; + + if ((type = msg_type_lookup(id)) != NULL) + return (KORE_RESULT_ERROR); + + type = kore_malloc(sizeof(*type)); + type->id = id; + type->cb = cb; + TAILQ_INSERT_TAIL(&msg_types, type, list); + + return (KORE_RESULT_OK); +} + +void +kore_msg_send(u_int8_t id, void *data, u_int32_t len) +{ + struct kore_msg m; + + m.id = id; + m.length = len; + + net_send_queue(worker->msg[1], &m, sizeof(m), NULL, NETBUF_LAST_CHAIN); + net_send_queue(worker->msg[1], data, len, NULL, NETBUF_LAST_CHAIN); + net_send_flush(worker->msg[1]); +} + +static int +msg_recv_worker(struct netbuf *nb) +{ + struct kore_msg *msg = (struct kore_msg *)nb->buf; + + net_recv_expand(nb->owner, msg->length, msg_recv_worker_data); + return (KORE_RESULT_OK); +} + +static int +msg_recv_worker_data(struct netbuf *nb) +{ + struct msg_type *type; + struct kore_msg *msg = (struct kore_msg *)nb->buf; + + if ((type = msg_type_lookup(msg->id)) != NULL) + type->cb(nb->buf + sizeof(*msg), nb->s_off - sizeof(*msg)); + + net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_worker); + return (KORE_RESULT_OK); +} + +static int +msg_recv_parent(struct netbuf *nb) +{ + struct connection *c; + + kore_log(LOG_NOTICE, "got %d bytes", nb->s_off); + + TAILQ_FOREACH(c, &connections, list) { + if (c == nb->owner) + continue; + net_send_queue(c, nb->buf, nb->s_off, NULL, NETBUF_LAST_CHAIN); + net_send_flush(c); + } + + net_recv_reset(nb->owner, NETBUF_SEND_PAYLOAD_MAX, msg_recv_parent); + + return (KORE_RESULT_OK); +} + +static void +msg_disconnected_parent(struct connection *c) +{ + kore_log(LOG_ERR, "parent gone, shutting down"); + if (kill(worker->pid, SIGQUIT) == -1) + kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s); +} + +static void +msg_type_websocket(const void *data, u_int32_t len) +{ +} + +static struct msg_type * +msg_type_lookup(u_int8_t id) +{ + struct msg_type *type; + + TAILQ_FOREACH(type, &msg_types, list) { + if (type->id == id) + return (type); + } + + return (NULL); +} diff --git a/src/websocket.c b/src/websocket.c old mode 100755 new mode 100644 index 1c7e120..d8ecb5a --- a/src/websocket.c +++ b/src/websocket.c @@ -35,11 +35,6 @@ #define WEBSOCKET_SERVER_RESPONSE "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" -struct websocket_data { - u_int8_t op; - void *data; - size_t len; -}; u_int64_t kore_websocket_timeout = 120000; u_int64_t kore_websocket_maxframe = 16384; @@ -47,7 +42,8 @@ u_int64_t kore_websocket_maxframe = 16384; static int websocket_recv_frame(struct netbuf *); static int websocket_recv_opcode(struct netbuf *); static void websocket_disconnect(struct connection *); -static void websocket_send_single(struct connection *, void *); +static void websocket_send_single(struct connection *, + u_int8_t, void *, size_t); void websocket_send(struct connection *, u_int8_t, void *, size_t); void @@ -167,26 +163,24 @@ kore_websocket_send(struct connection *c, u_int8_t op, void *data, size_t len) } void -kore_websocket_broadcast(struct connection *c, u_int8_t op, void *data, +kore_websocket_broadcast(struct connection *src, u_int8_t op, void *data, size_t len, int scope) { - struct websocket_data arg; + struct connection *c; - arg.op = op; - arg.len = len; - arg.data = data; - kore_worker_websocket_broadcast(c, websocket_send_single, &arg); + TAILQ_FOREACH(c, &connections, list) { + if (c != src && c->proto == CONN_PROTO_WEBSOCKET) + websocket_send_single(c, op, data, len); + } if (scope == WEBSOCKET_BROADCAST_GLOBAL) fatal("kore_websocket_broadcast: no global scope yet"); } static void -websocket_send_single(struct connection *c, void *args) +websocket_send_single(struct connection *c, u_int8_t op, void *data, size_t len) { - struct websocket_data *arg = args; - - kore_websocket_send(c, arg->op, arg->data, arg->len); + kore_websocket_send(c, op, data, len); net_send_flush(c); } diff --git a/src/worker.c b/src/worker.c index 8242715..8e5bc6c 100644 --- a/src/worker.c +++ b/src/worker.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -60,14 +61,12 @@ static void worker_unlock(void); static inline int kore_worker_acceptlock_obtain(void); static inline void kore_worker_acceptlock_release(void); -static struct connection_list disconnected; static struct kore_worker *kore_workers; static int shm_accept_key; static struct wlock *accept_lock; extern volatile sig_atomic_t sig_recv; struct kore_worker *worker = NULL; -struct connection_list worker_clients; u_int8_t worker_set_affinity = 1; u_int32_t worker_accept_threshold = 0; u_int32_t worker_rlimit_nofiles = 1024; @@ -125,6 +124,13 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu) kw->has_lock = 0; kw->active_hdlr = NULL; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, kw->pipe) == -1) + fatal("socketpair(): %s", errno_s); + + if (!kore_connection_nonblock(kw->pipe[0]) || + !kore_connection_nonblock(kw->pipe[1])) + fatal("could not set pipe fds to nonblocking: %s", errno_s); + kw->pid = fork(); if (kw->pid == -1) fatal("could not spawn worker child: %s", errno_s); @@ -136,6 +142,15 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu) } } +struct kore_worker * +kore_worker_data(u_int8_t id) +{ + if (id >= worker_count) + fatal("id %u too large for worker count", id); + + return (WORKER(id)); +} + void kore_worker_shutdown(void) { @@ -183,7 +198,6 @@ kore_worker_entry(struct kore_worker *kw) size_t fd; struct rlimit rl; char buf[16]; - struct connection *c, *cnext; int quit, had_lock, r; u_int64_t now, idle_check, next_lock, netwait; struct passwd *pw = NULL; @@ -261,8 +275,6 @@ kore_worker_entry(struct kore_worker *kw) kore_timer_init(); kore_connection_init(); kore_domain_load_crl(); - TAILQ_INIT(&disconnected); - TAILQ_INIT(&worker_clients); quit = 0; had_lock = 0; @@ -270,6 +282,7 @@ kore_worker_entry(struct kore_worker *kw) idle_check = 0; kore_platform_event_init(); kore_accesslog_worker_init(); + kore_msg_worker_init(); #if defined(KORE_USE_PGSQL) kore_pgsql_init(); @@ -321,78 +334,20 @@ kore_worker_entry(struct kore_worker *kw) if ((now - idle_check) >= 10000) { idle_check = now; - now = kore_time_ms(); - TAILQ_FOREACH(c, &worker_clients, list) { - if (c->proto == CONN_PROTO_SPDY && - c->idle_timer.length == 0 && - !(c->flags & CONN_WRITE_BLOCK) && - !(c->flags & CONN_READ_BLOCK)) - continue; - if (!(c->flags & CONN_IDLE_TIMER_ACT)) - continue; - kore_connection_check_idletimer(now, c); - } + kore_connection_check_timeout(); } - for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - TAILQ_REMOVE(&disconnected, c, list); - kore_connection_remove(c); - } + kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); if (quit && http_request_count == 0) break; } - for (c = TAILQ_FIRST(&worker_clients); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - net_send_flush(c); - kore_connection_disconnect(c); - } - - for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - net_send_flush(c); - TAILQ_REMOVE(&disconnected, c, list); - kore_connection_remove(c); - } - + kore_connection_prune(KORE_CONNECTION_PRUNE_ALL); kore_debug("worker %d shutting down", kw->id); exit(0); } -void -kore_worker_connection_add(struct connection *c) -{ - TAILQ_INSERT_TAIL(&worker_clients, c, list); - worker_active_connections++; -} - -void -kore_worker_connection_move(struct connection *c) -{ - TAILQ_REMOVE(&worker_clients, c, list); - TAILQ_INSERT_TAIL(&disconnected, c, list); -} - -void -kore_worker_connection_remove(struct connection *c) -{ - worker_active_connections--; -} - -void -kore_worker_websocket_broadcast(struct connection *src, - void (*cb)(struct connection *, void *), void *args) -{ - struct connection *c; - - TAILQ_FOREACH(c, &worker_clients, list) { - if (c != src && c->proto == CONN_PROTO_WEBSOCKET) - cb(c, args); - } -} - void kore_worker_wait(int final) { @@ -447,7 +402,9 @@ kore_worker_wait(int final) } kore_log(LOG_NOTICE, "restarting worker %d", kw->id); + kore_msg_parent_remove(kw); kore_worker_spawn(kw->id, kw->cpu); + kore_msg_parent_add(kw); } else { kore_log(LOG_NOTICE, "worker %d (pid: %d) signaled us (%d)",