From 49ca95f390cbb762282da287c3611746baaa57c4 Mon Sep 17 00:00:00 2001 From: Joris Vink Date: Mon, 22 Jun 2015 21:13:32 +0200 Subject: [PATCH] Add our messaging framework. With this framework apps can now send messages between worker processes. A new API function exists: int kore_msg_register(u_int8_t id, void (*cb)(const void *, u_int32_t); This API call allows your app to register a new message callback for a given ID. You can then send messages on this ID to other workers using: void kore_msg_send(u_int8_t id, void *data, u_int32_t length); This framework will interally be used for a few things such as allowing websocket data to broadcasted between all workers, adding unified caching and hopefully eventually moving the access log to this as well. Some internals have changed with this commit: * worker_clients has been called connections. * the parent now initializes the net, and event subsystems. * kore_worker_websocket_broadcast() is dead. --- Makefile | 9 +- examples/messaging/.gitignore | 5 + examples/messaging/README.md | 15 ++ examples/messaging/conf/messaging.conf | 12 ++ examples/messaging/dh2048.pem | 8 + examples/messaging/src/messaging.c | 71 +++++++++ includes/kore.h | 35 ++++- src/accesslog.c | 2 +- src/bsd.c | 9 +- src/connection.c | 52 ++++++- src/kore.c | 19 ++- src/msg.c | 196 +++++++++++++++++++++++++ src/websocket.c | 26 ++-- src/worker.c | 89 +++-------- 14 files changed, 448 insertions(+), 100 deletions(-) create mode 100644 examples/messaging/.gitignore create mode 100644 examples/messaging/README.md create mode 100644 examples/messaging/conf/messaging.conf create mode 100644 examples/messaging/dh2048.pem create mode 100644 examples/messaging/src/messaging.c create mode 100644 src/msg.c mode change 100755 => 100644 src/websocket.c diff --git a/Makefile b/Makefile index 502f8f0..df7ae91 100644 --- a/Makefile +++ b/Makefile @@ -6,10 +6,11 @@ KORE=kore INSTALL_DIR=$(PREFIX)/bin INCLUDE_DIR=$(PREFIX)/include/kore -S_SRC= src/kore.c src/accesslog.c src/auth.c src/buf.c src/cli.c src/config.c \ - src/connection.c src/domain.c src/http.c src/mem.c src/module.c \ - src/net.c src/pool.c src/spdy.c src/timer.c src/validator.c \ - src/utils.c src/websocket.c src/worker.c src/zlib_dict.c +S_SRC= src/kore.c src/accesslog.c src/auth.c src/buf.c src/cli.c \ + src/config.c src/connection.c src/domain.c src/http.c src/mem.c \ + src/msg.c src/module.c src/net.c src/pool.c src/spdy.c src/timer.c \ + src/validator.c src/utils.c src/websocket.c src/worker.c \ + src/zlib_dict.c S_OBJS= $(S_SRC:.c=.o) CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes diff --git a/examples/messaging/.gitignore b/examples/messaging/.gitignore new file mode 100644 index 0000000..cb9d927 --- /dev/null +++ b/examples/messaging/.gitignore @@ -0,0 +1,5 @@ +*.o +.objs +messaging.so +assets.h +cert diff --git a/examples/messaging/README.md b/examples/messaging/README.md new file mode 100644 index 0000000..7a35679 --- /dev/null +++ b/examples/messaging/README.md @@ -0,0 +1,15 @@ +Kore message framework example + +Run: +``` + # kore run +``` + +Test: +``` + Perform a simple GET request against the root page. + This should trigger the example app to send a message + to the other workers which will display it. + + # curl -k https://127.0.0.1:8888 +``` diff --git a/examples/messaging/conf/messaging.conf b/examples/messaging/conf/messaging.conf new file mode 100644 index 0000000..e0558e9 --- /dev/null +++ b/examples/messaging/conf/messaging.conf @@ -0,0 +1,12 @@ +# Placeholder configuration + +bind 127.0.0.1 8888 +load ./messaging.so init +tls_dhparam dh2048.pem +workers 4 + +domain 127.0.0.1 { + certfile cert/server.crt + certkey cert/server.key + static / page +} diff --git a/examples/messaging/dh2048.pem b/examples/messaging/dh2048.pem new file mode 100644 index 0000000..511de69 --- /dev/null +++ b/examples/messaging/dh2048.pem @@ -0,0 +1,8 @@ +-----BEGIN DH PARAMETERS----- +MIIBCAKCAQEAn4f4Qn5SudFjEYPWTbUaOTLUH85YWmmPFW1+b5bRa9ygr+1wfamv +VKVT7jO8c4msSNikUf6eEfoH0H4VTCaj+Habwu+Sj+I416r3mliMD4SjNsUJrBrY +Y0QV3ZUgZz4A8ARk/WwQcRl8+ZXJz34IaLwAcpyNhoV46iHVxW0ty8ND0U4DIku/ +PNayKimu4BXWXk4RfwNVP59t8DQKqjshZ4fDnbotskmSZ+e+FHrd+Kvrq/WButvV +Bzy9fYgnUlJ82g/bziCI83R2xAdtH014fR63MpElkqdNeChb94pPbEdFlNUvYIBN +xx2vTUQMqRbB4UdG2zuzzr5j98HDdblQ+wIBAg== +-----END DH PARAMETERS----- \ No newline at end of file diff --git a/examples/messaging/src/messaging.c b/examples/messaging/src/messaging.c new file mode 100644 index 0000000..4e18621 --- /dev/null +++ b/examples/messaging/src/messaging.c @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2015 Joris Vink + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include + +/* + * This example demonstrates how to use the messaging framework + * in Kore. This framework allows you to send messages between + * your workers with custom callbacks defined per message ID. + */ + +/* Your code shouldn't use IDs < 100. */ +#define MY_MESSAGE_ID 100 + +int init(int); +int page(struct http_request *); +void received_message(const void *, u_int32_t); + +/* Initialization callback. */ +int +init(int state) +{ + if (state == KORE_MODULE_UNLOAD) + return (KORE_RESULT_OK); + + /* + * Register our message callback when the module is initialized. + * kore_msg_register() fails if the message ID already exists, + * but in our case that is OK. + */ + (void)kore_msg_register(MY_MESSAGE_ID, received_message); + + return (KORE_RESULT_OK); +} + +/* + * Callback for receiving a message MY_MESSAGE_ID. + */ +void +received_message(const void *data, u_int32_t len) +{ + kore_log(LOG_INFO, "got message (%d bytes): %.*s", len, + len, (const char *)data); +} + +/* + * Page request which will send a message to all other workers + * with the ID set to MY_MESSAGE_ID and a payload of "hello". + */ +int +page(struct http_request *req) +{ + kore_msg_send(MY_MESSAGE_ID, "hello", 5); + http_response(req, 200, NULL, 0); + + return (KORE_RESULT_OK); +} diff --git a/includes/kore.h b/includes/kore.h index 56559ec..33d8a1c 100644 --- a/includes/kore.h +++ b/includes/kore.h @@ -147,6 +147,7 @@ LIST_HEAD(listener_head, listener); #define CONN_PROTO_SPDY 1 #define CONN_PROTO_HTTP 2 #define CONN_PROTO_WEBSOCKET 3 +#define CONN_PROTO_MSG 4 #define CONN_READ_POSSIBLE 0x01 #define CONN_WRITE_POSSIBLE 0x02 @@ -170,6 +171,9 @@ LIST_HEAD(listener_head, listener); #define KORE_TIMER_ONESHOT 0x01 +#define KORE_CONNECTION_PRUNE_DISCONNECT 0 +#define KORE_CONNECTION_PRUNE_ALL 1 + struct connection { u_int8_t type; int fd; @@ -220,7 +224,8 @@ struct connection { }; TAILQ_HEAD(connection_list, connection); -extern struct connection_list worker_clients; +extern struct connection_list connections; +extern struct connection_list disconnected; struct kore_handler_params { char *name; @@ -279,6 +284,8 @@ struct kore_worker { u_int8_t id; u_int8_t cpu; pid_t pid; + int pipe[2]; + struct connection *msg[2]; u_int8_t has_lock; struct kore_module_handle *active_hdlr; }; @@ -358,6 +365,14 @@ struct kore_timer { TAILQ_ENTRY(kore_timer) list; }; +/* These are reserved message types. */ +#define KORE_MSG_WEBSOCKET 1 + +struct kore_msg { + u_int8_t id; + u_int32_t length; +}; + extern pid_t kore_pid; extern int foreground; extern int kore_debug; @@ -400,11 +415,8 @@ void kore_worker_shutdown(void); void kore_worker_dispatch_signal(int); void kore_worker_spawn(u_int16_t, u_int16_t); void kore_worker_entry(struct kore_worker *); -void kore_worker_connection_add(struct connection *); -void kore_worker_connection_move(struct connection *); -void kore_worker_connection_remove(struct connection *); -void kore_worker_websocket_broadcast(struct connection *, - void (*cb)(struct connection *, void *), void *); + +struct kore_worker *kore_worker_data(u_int8_t); void kore_platform_init(void); void kore_platform_event_init(void); @@ -439,7 +451,9 @@ int kore_tls_npn_cb(SSL *, const u_char **, unsigned int *, void *); void kore_tls_info_callback(const SSL *, int, int); void kore_connection_init(void); +void kore_connection_prune(int); struct connection *kore_connection_new(void *); +void kore_connection_check_timeout(void); int kore_connection_nonblock(int); int kore_connection_handle(struct connection *); void kore_connection_remove(struct connection *); @@ -492,6 +506,15 @@ void kore_websocket_send(struct connection *, void kore_websocket_broadcast(struct connection *, u_int8_t, void *, size_t, int); +void kore_msg_init(void); +void kore_msg_worker_init(void); +void kore_msg_parent_init(void); +void kore_msg_parent_add(struct kore_worker *); +void kore_msg_parent_remove(struct kore_worker *); +void kore_msg_send(u_int8_t, void *, u_int32_t); +int kore_msg_register(u_int8_t, + void (*cb)(const void *, u_int32_t)); + void kore_domain_init(void); int kore_domain_new(char *); void kore_module_init(void); diff --git a/src/accesslog.c b/src/accesslog.c index 644e7dc..4c48fd3 100644 --- a/src/accesslog.c +++ b/src/accesslog.c @@ -67,7 +67,7 @@ kore_accesslog_wait(void) pfd[0].events = POLLIN; pfd[0].revents = 0; - nfds = poll(pfd, 1, 1000); + nfds = poll(pfd, 1, 100); if (nfds == -1 || (pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))) { if (nfds == -1 && errno == EINTR) return (KORE_RESULT_OK); diff --git a/src/bsd.c b/src/bsd.c index f20687a..109aaff 100644 --- a/src/bsd.c +++ b/src/bsd.c @@ -82,9 +82,12 @@ kore_platform_event_init(void) event_count = (worker_max_connections * 2) + nlisteners; events = kore_calloc(event_count, sizeof(struct kevent)); - LIST_FOREACH(l, &listeners, list) { - kore_platform_event_schedule(l->fd, - EVFILT_READ, EV_ADD | EV_DISABLE, l); + /* Hack to check if we're running under the parent or not. */ + if (worker != NULL) { + LIST_FOREACH(l, &listeners, list) { + kore_platform_event_schedule(l->fd, + EVFILT_READ, EV_ADD | EV_DISABLE, l); + } } } diff --git a/src/connection.c b/src/connection.c index a8f7b98..9ca0082 100644 --- a/src/connection.c +++ b/src/connection.c @@ -25,10 +25,15 @@ #include "http.h" struct kore_pool connection_pool; +struct connection_list connections; +struct connection_list disconnected; void kore_connection_init(void) { + TAILQ_INIT(&connections); + TAILQ_INIT(&disconnected); + kore_pool_init(&connection_pool, "connection_pool", sizeof(struct connection), worker_max_connections); } @@ -118,13 +123,54 @@ kore_connection_accept(struct listener *l, struct connection **out) http_header_recv); #endif - kore_worker_connection_add(c); + TAILQ_INSERT_TAIL(&connections, c, list); kore_connection_start_idletimer(c); *out = c; return (KORE_RESULT_OK); } +void +kore_connection_check_timeout(void) +{ + struct connection *c; + u_int64_t now; + + now = kore_time_ms(); + TAILQ_FOREACH(c, &connections, list) { + if (c->proto == CONN_PROTO_MSG) + continue; + if (c->proto == CONN_PROTO_SPDY && + c->idle_timer.length == 0 && + !(c->flags & CONN_WRITE_BLOCK) && + !(c->flags & CONN_READ_BLOCK)) + continue; + if (!(c->flags & CONN_IDLE_TIMER_ACT)) + continue; + kore_connection_check_idletimer(now, c); + } +} + +void +kore_connection_prune(int all) +{ + struct connection *c, *cnext; + + if (all) { + for (c = TAILQ_FIRST(&connections); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + net_send_flush(c); + kore_connection_disconnect(c); + } + } + + for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { + cnext = TAILQ_NEXT(c, list); + TAILQ_REMOVE(&disconnected, c, list); + kore_connection_remove(c); + } +} + void kore_connection_disconnect(struct connection *c) { @@ -134,7 +180,8 @@ kore_connection_disconnect(struct connection *c) if (c->disconnect) c->disconnect(c); - kore_worker_connection_move(c); + TAILQ_REMOVE(&connections, c, list); + TAILQ_INSERT_TAIL(&disconnected, c, list); } } @@ -326,7 +373,6 @@ kore_connection_remove(struct connection *c) kore_mem_free(s); } - kore_worker_connection_remove(c); kore_pool_put(&connection_pool, c); } diff --git a/src/kore.c b/src/kore.c index 598ebcb..5309d31 100644 --- a/src/kore.c +++ b/src/kore.c @@ -329,6 +329,7 @@ kore_server_sslstart(void) static void kore_server_start(void) { + u_int32_t tmp; int quit; if (foreground == 0 && daemon(1, 1) == -1) @@ -347,9 +348,20 @@ kore_server_start(void) #endif kore_platform_proctitle("kore [parent]"); + kore_msg_init(); kore_worker_init(); + /* Set worker_max_connections for kore_connection_init(). */ + tmp = worker_max_connections; + worker_max_connections = worker_count; + + net_init(); + kore_connection_init(); + kore_platform_event_init(); + kore_msg_parent_init(); + quit = 0; + worker_max_connections = tmp; while (quit != 1) { if (sig_recv != 0) { switch (sig_recv) { @@ -371,10 +383,15 @@ kore_server_start(void) sig_recv = 0; } - if (!kore_accesslog_wait()) + /* XXX - The accesslog should move to our msg framework. */ + if (!kore_accesslog_wait()) { + kore_worker_dispatch_signal(SIGQUIT); break; + } kore_worker_wait(0); + kore_platform_event_wait(100); + kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); } } diff --git a/src/msg.c b/src/msg.c new file mode 100644 index 0000000..f662e44 --- /dev/null +++ b/src/msg.c @@ -0,0 +1,196 @@ +/* + * Copyright (c) 2015 Joris Vink + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include + +#include + +#include "kore.h" +#include "http.h" + +struct msg_type { + u_int8_t id; + void (*cb)(const void *, u_int32_t); + TAILQ_ENTRY(msg_type) list; +}; + +TAILQ_HEAD(, msg_type) msg_types; + +static struct msg_type *msg_type_lookup(u_int8_t); +static int msg_recv_worker(struct netbuf *); +static int msg_recv_parent(struct netbuf *); +static int msg_recv_worker_data(struct netbuf *); +static void msg_type_websocket(const void *, u_int32_t); +static void msg_disconnected_parent(struct connection *); + +void +kore_msg_init(void) +{ + TAILQ_INIT(&msg_types); +} + +void +kore_msg_parent_init(void) +{ + u_int8_t i; + struct kore_worker *kw; + + for (i = 0; i < worker_count; i++) { + kw = kore_worker_data(i); + kore_msg_parent_add(kw); + } +} + +void +kore_msg_parent_add(struct kore_worker *kw) +{ + kw->msg[0] = kore_connection_new(NULL); + kw->msg[0]->fd = kw->pipe[0]; + kw->msg[0]->read = net_read; + kw->msg[0]->write = net_write; + kw->msg[0]->proto = CONN_PROTO_MSG; + kw->msg[0]->state = CONN_STATE_ESTABLISHED; + + TAILQ_INSERT_TAIL(&connections, kw->msg[0], list); + kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]); + + net_recv_queue(kw->msg[0], NETBUF_SEND_PAYLOAD_MAX, + NETBUF_CALL_CB_ALWAYS, msg_recv_parent); +} + +void +kore_msg_parent_remove(struct kore_worker *kw) +{ + kore_connection_disconnect(kw->msg[0]); + kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); + (void)close(kw->pipe[1]); +} + +void +kore_msg_worker_init(void) +{ + kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket); + + worker->msg[1] = kore_connection_new(NULL); + worker->msg[1]->fd = worker->pipe[1]; + worker->msg[1]->read = net_read; + worker->msg[1]->write = net_write; + worker->msg[1]->proto = CONN_PROTO_MSG; + worker->msg[1]->state = CONN_STATE_ESTABLISHED; + worker->msg[1]->disconnect = msg_disconnected_parent; + + TAILQ_INSERT_TAIL(&connections, worker->msg[1], list); + kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]); + + net_recv_queue(worker->msg[1], + sizeof(struct kore_msg), 0, msg_recv_worker); +} + +int +kore_msg_register(u_int8_t id, void (*cb)(const void *, u_int32_t)) +{ + struct msg_type *type; + + if ((type = msg_type_lookup(id)) != NULL) + return (KORE_RESULT_ERROR); + + type = kore_malloc(sizeof(*type)); + type->id = id; + type->cb = cb; + TAILQ_INSERT_TAIL(&msg_types, type, list); + + return (KORE_RESULT_OK); +} + +void +kore_msg_send(u_int8_t id, void *data, u_int32_t len) +{ + struct kore_msg m; + + m.id = id; + m.length = len; + + net_send_queue(worker->msg[1], &m, sizeof(m), NULL, NETBUF_LAST_CHAIN); + net_send_queue(worker->msg[1], data, len, NULL, NETBUF_LAST_CHAIN); + net_send_flush(worker->msg[1]); +} + +static int +msg_recv_worker(struct netbuf *nb) +{ + struct kore_msg *msg = (struct kore_msg *)nb->buf; + + net_recv_expand(nb->owner, msg->length, msg_recv_worker_data); + return (KORE_RESULT_OK); +} + +static int +msg_recv_worker_data(struct netbuf *nb) +{ + struct msg_type *type; + struct kore_msg *msg = (struct kore_msg *)nb->buf; + + if ((type = msg_type_lookup(msg->id)) != NULL) + type->cb(nb->buf + sizeof(*msg), nb->s_off - sizeof(*msg)); + + net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_worker); + return (KORE_RESULT_OK); +} + +static int +msg_recv_parent(struct netbuf *nb) +{ + struct connection *c; + + kore_log(LOG_NOTICE, "got %d bytes", nb->s_off); + + TAILQ_FOREACH(c, &connections, list) { + if (c == nb->owner) + continue; + net_send_queue(c, nb->buf, nb->s_off, NULL, NETBUF_LAST_CHAIN); + net_send_flush(c); + } + + net_recv_reset(nb->owner, NETBUF_SEND_PAYLOAD_MAX, msg_recv_parent); + + return (KORE_RESULT_OK); +} + +static void +msg_disconnected_parent(struct connection *c) +{ + kore_log(LOG_ERR, "parent gone, shutting down"); + if (kill(worker->pid, SIGQUIT) == -1) + kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s); +} + +static void +msg_type_websocket(const void *data, u_int32_t len) +{ +} + +static struct msg_type * +msg_type_lookup(u_int8_t id) +{ + struct msg_type *type; + + TAILQ_FOREACH(type, &msg_types, list) { + if (type->id == id) + return (type); + } + + return (NULL); +} diff --git a/src/websocket.c b/src/websocket.c old mode 100755 new mode 100644 index 1c7e120..d8ecb5a --- a/src/websocket.c +++ b/src/websocket.c @@ -35,11 +35,6 @@ #define WEBSOCKET_SERVER_RESPONSE "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" -struct websocket_data { - u_int8_t op; - void *data; - size_t len; -}; u_int64_t kore_websocket_timeout = 120000; u_int64_t kore_websocket_maxframe = 16384; @@ -47,7 +42,8 @@ u_int64_t kore_websocket_maxframe = 16384; static int websocket_recv_frame(struct netbuf *); static int websocket_recv_opcode(struct netbuf *); static void websocket_disconnect(struct connection *); -static void websocket_send_single(struct connection *, void *); +static void websocket_send_single(struct connection *, + u_int8_t, void *, size_t); void websocket_send(struct connection *, u_int8_t, void *, size_t); void @@ -167,26 +163,24 @@ kore_websocket_send(struct connection *c, u_int8_t op, void *data, size_t len) } void -kore_websocket_broadcast(struct connection *c, u_int8_t op, void *data, +kore_websocket_broadcast(struct connection *src, u_int8_t op, void *data, size_t len, int scope) { - struct websocket_data arg; + struct connection *c; - arg.op = op; - arg.len = len; - arg.data = data; - kore_worker_websocket_broadcast(c, websocket_send_single, &arg); + TAILQ_FOREACH(c, &connections, list) { + if (c != src && c->proto == CONN_PROTO_WEBSOCKET) + websocket_send_single(c, op, data, len); + } if (scope == WEBSOCKET_BROADCAST_GLOBAL) fatal("kore_websocket_broadcast: no global scope yet"); } static void -websocket_send_single(struct connection *c, void *args) +websocket_send_single(struct connection *c, u_int8_t op, void *data, size_t len) { - struct websocket_data *arg = args; - - kore_websocket_send(c, arg->op, arg->data, arg->len); + kore_websocket_send(c, op, data, len); net_send_flush(c); } diff --git a/src/worker.c b/src/worker.c index 8242715..8e5bc6c 100644 --- a/src/worker.c +++ b/src/worker.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -60,14 +61,12 @@ static void worker_unlock(void); static inline int kore_worker_acceptlock_obtain(void); static inline void kore_worker_acceptlock_release(void); -static struct connection_list disconnected; static struct kore_worker *kore_workers; static int shm_accept_key; static struct wlock *accept_lock; extern volatile sig_atomic_t sig_recv; struct kore_worker *worker = NULL; -struct connection_list worker_clients; u_int8_t worker_set_affinity = 1; u_int32_t worker_accept_threshold = 0; u_int32_t worker_rlimit_nofiles = 1024; @@ -125,6 +124,13 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu) kw->has_lock = 0; kw->active_hdlr = NULL; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, kw->pipe) == -1) + fatal("socketpair(): %s", errno_s); + + if (!kore_connection_nonblock(kw->pipe[0]) || + !kore_connection_nonblock(kw->pipe[1])) + fatal("could not set pipe fds to nonblocking: %s", errno_s); + kw->pid = fork(); if (kw->pid == -1) fatal("could not spawn worker child: %s", errno_s); @@ -136,6 +142,15 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu) } } +struct kore_worker * +kore_worker_data(u_int8_t id) +{ + if (id >= worker_count) + fatal("id %u too large for worker count", id); + + return (WORKER(id)); +} + void kore_worker_shutdown(void) { @@ -183,7 +198,6 @@ kore_worker_entry(struct kore_worker *kw) size_t fd; struct rlimit rl; char buf[16]; - struct connection *c, *cnext; int quit, had_lock, r; u_int64_t now, idle_check, next_lock, netwait; struct passwd *pw = NULL; @@ -261,8 +275,6 @@ kore_worker_entry(struct kore_worker *kw) kore_timer_init(); kore_connection_init(); kore_domain_load_crl(); - TAILQ_INIT(&disconnected); - TAILQ_INIT(&worker_clients); quit = 0; had_lock = 0; @@ -270,6 +282,7 @@ kore_worker_entry(struct kore_worker *kw) idle_check = 0; kore_platform_event_init(); kore_accesslog_worker_init(); + kore_msg_worker_init(); #if defined(KORE_USE_PGSQL) kore_pgsql_init(); @@ -321,78 +334,20 @@ kore_worker_entry(struct kore_worker *kw) if ((now - idle_check) >= 10000) { idle_check = now; - now = kore_time_ms(); - TAILQ_FOREACH(c, &worker_clients, list) { - if (c->proto == CONN_PROTO_SPDY && - c->idle_timer.length == 0 && - !(c->flags & CONN_WRITE_BLOCK) && - !(c->flags & CONN_READ_BLOCK)) - continue; - if (!(c->flags & CONN_IDLE_TIMER_ACT)) - continue; - kore_connection_check_idletimer(now, c); - } + kore_connection_check_timeout(); } - for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - TAILQ_REMOVE(&disconnected, c, list); - kore_connection_remove(c); - } + kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT); if (quit && http_request_count == 0) break; } - for (c = TAILQ_FIRST(&worker_clients); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - net_send_flush(c); - kore_connection_disconnect(c); - } - - for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) { - cnext = TAILQ_NEXT(c, list); - net_send_flush(c); - TAILQ_REMOVE(&disconnected, c, list); - kore_connection_remove(c); - } - + kore_connection_prune(KORE_CONNECTION_PRUNE_ALL); kore_debug("worker %d shutting down", kw->id); exit(0); } -void -kore_worker_connection_add(struct connection *c) -{ - TAILQ_INSERT_TAIL(&worker_clients, c, list); - worker_active_connections++; -} - -void -kore_worker_connection_move(struct connection *c) -{ - TAILQ_REMOVE(&worker_clients, c, list); - TAILQ_INSERT_TAIL(&disconnected, c, list); -} - -void -kore_worker_connection_remove(struct connection *c) -{ - worker_active_connections--; -} - -void -kore_worker_websocket_broadcast(struct connection *src, - void (*cb)(struct connection *, void *), void *args) -{ - struct connection *c; - - TAILQ_FOREACH(c, &worker_clients, list) { - if (c != src && c->proto == CONN_PROTO_WEBSOCKET) - cb(c, args); - } -} - void kore_worker_wait(int final) { @@ -447,7 +402,9 @@ kore_worker_wait(int final) } kore_log(LOG_NOTICE, "restarting worker %d", kw->id); + kore_msg_parent_remove(kw); kore_worker_spawn(kw->id, kw->cpu); + kore_msg_parent_add(kw); } else { kore_log(LOG_NOTICE, "worker %d (pid: %d) signaled us (%d)",