Add our messaging framework.

With this framework apps can now send messages between worker processes.

A new API function exists:
	int kore_msg_register(u_int8_t id, void (*cb)(const void *, u_int32_t);

This API call allows your app to register a new message callback for a given ID.

You can then send messages on this ID to other workers using:
	void kore_msg_send(u_int8_t id, void *data, u_int32_t length);

This framework will interally be used for a few things such as allowing
websocket data to broadcasted between all workers, adding unified caching
and hopefully eventually moving the access log to this as well.

Some internals have changed with this commit:
	* worker_clients has been called connections.
	* the parent now initializes the net, and event subsystems.
	* kore_worker_websocket_broadcast() is dead.
This commit is contained in:
Joris Vink 2015-06-22 21:13:32 +02:00
parent 63b0c0a903
commit 49ca95f390
14 changed files with 448 additions and 100 deletions

View File

@ -6,10 +6,11 @@ KORE=kore
INSTALL_DIR=$(PREFIX)/bin
INCLUDE_DIR=$(PREFIX)/include/kore
S_SRC= src/kore.c src/accesslog.c src/auth.c src/buf.c src/cli.c src/config.c \
src/connection.c src/domain.c src/http.c src/mem.c src/module.c \
src/net.c src/pool.c src/spdy.c src/timer.c src/validator.c \
src/utils.c src/websocket.c src/worker.c src/zlib_dict.c
S_SRC= src/kore.c src/accesslog.c src/auth.c src/buf.c src/cli.c \
src/config.c src/connection.c src/domain.c src/http.c src/mem.c \
src/msg.c src/module.c src/net.c src/pool.c src/spdy.c src/timer.c \
src/validator.c src/utils.c src/websocket.c src/worker.c \
src/zlib_dict.c
S_OBJS= $(S_SRC:.c=.o)
CFLAGS+=-Wall -Wstrict-prototypes -Wmissing-prototypes

5
examples/messaging/.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
*.o
.objs
messaging.so
assets.h
cert

View File

@ -0,0 +1,15 @@
Kore message framework example
Run:
```
# kore run
```
Test:
```
Perform a simple GET request against the root page.
This should trigger the example app to send a message
to the other workers which will display it.
# curl -k https://127.0.0.1:8888
```

View File

@ -0,0 +1,12 @@
# Placeholder configuration
bind 127.0.0.1 8888
load ./messaging.so init
tls_dhparam dh2048.pem
workers 4
domain 127.0.0.1 {
certfile cert/server.crt
certkey cert/server.key
static / page
}

View File

@ -0,0 +1,8 @@
-----BEGIN DH PARAMETERS-----
MIIBCAKCAQEAn4f4Qn5SudFjEYPWTbUaOTLUH85YWmmPFW1+b5bRa9ygr+1wfamv
VKVT7jO8c4msSNikUf6eEfoH0H4VTCaj+Habwu+Sj+I416r3mliMD4SjNsUJrBrY
Y0QV3ZUgZz4A8ARk/WwQcRl8+ZXJz34IaLwAcpyNhoV46iHVxW0ty8ND0U4DIku/
PNayKimu4BXWXk4RfwNVP59t8DQKqjshZ4fDnbotskmSZ+e+FHrd+Kvrq/WButvV
Bzy9fYgnUlJ82g/bziCI83R2xAdtH014fR63MpElkqdNeChb94pPbEdFlNUvYIBN
xx2vTUQMqRbB4UdG2zuzzr5j98HDdblQ+wIBAg==
-----END DH PARAMETERS-----

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2015 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <kore/kore.h>
#include <kore/http.h>
/*
* This example demonstrates how to use the messaging framework
* in Kore. This framework allows you to send messages between
* your workers with custom callbacks defined per message ID.
*/
/* Your code shouldn't use IDs < 100. */
#define MY_MESSAGE_ID 100
int init(int);
int page(struct http_request *);
void received_message(const void *, u_int32_t);
/* Initialization callback. */
int
init(int state)
{
if (state == KORE_MODULE_UNLOAD)
return (KORE_RESULT_OK);
/*
* Register our message callback when the module is initialized.
* kore_msg_register() fails if the message ID already exists,
* but in our case that is OK.
*/
(void)kore_msg_register(MY_MESSAGE_ID, received_message);
return (KORE_RESULT_OK);
}
/*
* Callback for receiving a message MY_MESSAGE_ID.
*/
void
received_message(const void *data, u_int32_t len)
{
kore_log(LOG_INFO, "got message (%d bytes): %.*s", len,
len, (const char *)data);
}
/*
* Page request which will send a message to all other workers
* with the ID set to MY_MESSAGE_ID and a payload of "hello".
*/
int
page(struct http_request *req)
{
kore_msg_send(MY_MESSAGE_ID, "hello", 5);
http_response(req, 200, NULL, 0);
return (KORE_RESULT_OK);
}

View File

@ -147,6 +147,7 @@ LIST_HEAD(listener_head, listener);
#define CONN_PROTO_SPDY 1
#define CONN_PROTO_HTTP 2
#define CONN_PROTO_WEBSOCKET 3
#define CONN_PROTO_MSG 4
#define CONN_READ_POSSIBLE 0x01
#define CONN_WRITE_POSSIBLE 0x02
@ -170,6 +171,9 @@ LIST_HEAD(listener_head, listener);
#define KORE_TIMER_ONESHOT 0x01
#define KORE_CONNECTION_PRUNE_DISCONNECT 0
#define KORE_CONNECTION_PRUNE_ALL 1
struct connection {
u_int8_t type;
int fd;
@ -220,7 +224,8 @@ struct connection {
};
TAILQ_HEAD(connection_list, connection);
extern struct connection_list worker_clients;
extern struct connection_list connections;
extern struct connection_list disconnected;
struct kore_handler_params {
char *name;
@ -279,6 +284,8 @@ struct kore_worker {
u_int8_t id;
u_int8_t cpu;
pid_t pid;
int pipe[2];
struct connection *msg[2];
u_int8_t has_lock;
struct kore_module_handle *active_hdlr;
};
@ -358,6 +365,14 @@ struct kore_timer {
TAILQ_ENTRY(kore_timer) list;
};
/* These are reserved message types. */
#define KORE_MSG_WEBSOCKET 1
struct kore_msg {
u_int8_t id;
u_int32_t length;
};
extern pid_t kore_pid;
extern int foreground;
extern int kore_debug;
@ -400,11 +415,8 @@ void kore_worker_shutdown(void);
void kore_worker_dispatch_signal(int);
void kore_worker_spawn(u_int16_t, u_int16_t);
void kore_worker_entry(struct kore_worker *);
void kore_worker_connection_add(struct connection *);
void kore_worker_connection_move(struct connection *);
void kore_worker_connection_remove(struct connection *);
void kore_worker_websocket_broadcast(struct connection *,
void (*cb)(struct connection *, void *), void *);
struct kore_worker *kore_worker_data(u_int8_t);
void kore_platform_init(void);
void kore_platform_event_init(void);
@ -439,7 +451,9 @@ int kore_tls_npn_cb(SSL *, const u_char **, unsigned int *, void *);
void kore_tls_info_callback(const SSL *, int, int);
void kore_connection_init(void);
void kore_connection_prune(int);
struct connection *kore_connection_new(void *);
void kore_connection_check_timeout(void);
int kore_connection_nonblock(int);
int kore_connection_handle(struct connection *);
void kore_connection_remove(struct connection *);
@ -492,6 +506,15 @@ void kore_websocket_send(struct connection *,
void kore_websocket_broadcast(struct connection *,
u_int8_t, void *, size_t, int);
void kore_msg_init(void);
void kore_msg_worker_init(void);
void kore_msg_parent_init(void);
void kore_msg_parent_add(struct kore_worker *);
void kore_msg_parent_remove(struct kore_worker *);
void kore_msg_send(u_int8_t, void *, u_int32_t);
int kore_msg_register(u_int8_t,
void (*cb)(const void *, u_int32_t));
void kore_domain_init(void);
int kore_domain_new(char *);
void kore_module_init(void);

View File

@ -67,7 +67,7 @@ kore_accesslog_wait(void)
pfd[0].events = POLLIN;
pfd[0].revents = 0;
nfds = poll(pfd, 1, 1000);
nfds = poll(pfd, 1, 100);
if (nfds == -1 || (pfd[0].revents & (POLLERR | POLLHUP | POLLNVAL))) {
if (nfds == -1 && errno == EINTR)
return (KORE_RESULT_OK);

View File

@ -82,9 +82,12 @@ kore_platform_event_init(void)
event_count = (worker_max_connections * 2) + nlisteners;
events = kore_calloc(event_count, sizeof(struct kevent));
LIST_FOREACH(l, &listeners, list) {
kore_platform_event_schedule(l->fd,
EVFILT_READ, EV_ADD | EV_DISABLE, l);
/* Hack to check if we're running under the parent or not. */
if (worker != NULL) {
LIST_FOREACH(l, &listeners, list) {
kore_platform_event_schedule(l->fd,
EVFILT_READ, EV_ADD | EV_DISABLE, l);
}
}
}

View File

@ -25,10 +25,15 @@
#include "http.h"
struct kore_pool connection_pool;
struct connection_list connections;
struct connection_list disconnected;
void
kore_connection_init(void)
{
TAILQ_INIT(&connections);
TAILQ_INIT(&disconnected);
kore_pool_init(&connection_pool, "connection_pool",
sizeof(struct connection), worker_max_connections);
}
@ -118,13 +123,54 @@ kore_connection_accept(struct listener *l, struct connection **out)
http_header_recv);
#endif
kore_worker_connection_add(c);
TAILQ_INSERT_TAIL(&connections, c, list);
kore_connection_start_idletimer(c);
*out = c;
return (KORE_RESULT_OK);
}
void
kore_connection_check_timeout(void)
{
struct connection *c;
u_int64_t now;
now = kore_time_ms();
TAILQ_FOREACH(c, &connections, list) {
if (c->proto == CONN_PROTO_MSG)
continue;
if (c->proto == CONN_PROTO_SPDY &&
c->idle_timer.length == 0 &&
!(c->flags & CONN_WRITE_BLOCK) &&
!(c->flags & CONN_READ_BLOCK))
continue;
if (!(c->flags & CONN_IDLE_TIMER_ACT))
continue;
kore_connection_check_idletimer(now, c);
}
}
void
kore_connection_prune(int all)
{
struct connection *c, *cnext;
if (all) {
for (c = TAILQ_FIRST(&connections); c != NULL; c = cnext) {
cnext = TAILQ_NEXT(c, list);
net_send_flush(c);
kore_connection_disconnect(c);
}
}
for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) {
cnext = TAILQ_NEXT(c, list);
TAILQ_REMOVE(&disconnected, c, list);
kore_connection_remove(c);
}
}
void
kore_connection_disconnect(struct connection *c)
{
@ -134,7 +180,8 @@ kore_connection_disconnect(struct connection *c)
if (c->disconnect)
c->disconnect(c);
kore_worker_connection_move(c);
TAILQ_REMOVE(&connections, c, list);
TAILQ_INSERT_TAIL(&disconnected, c, list);
}
}
@ -326,7 +373,6 @@ kore_connection_remove(struct connection *c)
kore_mem_free(s);
}
kore_worker_connection_remove(c);
kore_pool_put(&connection_pool, c);
}

View File

@ -329,6 +329,7 @@ kore_server_sslstart(void)
static void
kore_server_start(void)
{
u_int32_t tmp;
int quit;
if (foreground == 0 && daemon(1, 1) == -1)
@ -347,9 +348,20 @@ kore_server_start(void)
#endif
kore_platform_proctitle("kore [parent]");
kore_msg_init();
kore_worker_init();
/* Set worker_max_connections for kore_connection_init(). */
tmp = worker_max_connections;
worker_max_connections = worker_count;
net_init();
kore_connection_init();
kore_platform_event_init();
kore_msg_parent_init();
quit = 0;
worker_max_connections = tmp;
while (quit != 1) {
if (sig_recv != 0) {
switch (sig_recv) {
@ -371,10 +383,15 @@ kore_server_start(void)
sig_recv = 0;
}
if (!kore_accesslog_wait())
/* XXX - The accesslog should move to our msg framework. */
if (!kore_accesslog_wait()) {
kore_worker_dispatch_signal(SIGQUIT);
break;
}
kore_worker_wait(0);
kore_platform_event_wait(100);
kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
}
}

196
src/msg.c Normal file
View File

@ -0,0 +1,196 @@
/*
* Copyright (c) 2015 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <sys/socket.h>
#include <signal.h>
#include "kore.h"
#include "http.h"
struct msg_type {
u_int8_t id;
void (*cb)(const void *, u_int32_t);
TAILQ_ENTRY(msg_type) list;
};
TAILQ_HEAD(, msg_type) msg_types;
static struct msg_type *msg_type_lookup(u_int8_t);
static int msg_recv_worker(struct netbuf *);
static int msg_recv_parent(struct netbuf *);
static int msg_recv_worker_data(struct netbuf *);
static void msg_type_websocket(const void *, u_int32_t);
static void msg_disconnected_parent(struct connection *);
void
kore_msg_init(void)
{
TAILQ_INIT(&msg_types);
}
void
kore_msg_parent_init(void)
{
u_int8_t i;
struct kore_worker *kw;
for (i = 0; i < worker_count; i++) {
kw = kore_worker_data(i);
kore_msg_parent_add(kw);
}
}
void
kore_msg_parent_add(struct kore_worker *kw)
{
kw->msg[0] = kore_connection_new(NULL);
kw->msg[0]->fd = kw->pipe[0];
kw->msg[0]->read = net_read;
kw->msg[0]->write = net_write;
kw->msg[0]->proto = CONN_PROTO_MSG;
kw->msg[0]->state = CONN_STATE_ESTABLISHED;
TAILQ_INSERT_TAIL(&connections, kw->msg[0], list);
kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]);
net_recv_queue(kw->msg[0], NETBUF_SEND_PAYLOAD_MAX,
NETBUF_CALL_CB_ALWAYS, msg_recv_parent);
}
void
kore_msg_parent_remove(struct kore_worker *kw)
{
kore_connection_disconnect(kw->msg[0]);
kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
(void)close(kw->pipe[1]);
}
void
kore_msg_worker_init(void)
{
kore_msg_register(KORE_MSG_WEBSOCKET, msg_type_websocket);
worker->msg[1] = kore_connection_new(NULL);
worker->msg[1]->fd = worker->pipe[1];
worker->msg[1]->read = net_read;
worker->msg[1]->write = net_write;
worker->msg[1]->proto = CONN_PROTO_MSG;
worker->msg[1]->state = CONN_STATE_ESTABLISHED;
worker->msg[1]->disconnect = msg_disconnected_parent;
TAILQ_INSERT_TAIL(&connections, worker->msg[1], list);
kore_platform_event_all(worker->msg[1]->fd, worker->msg[1]);
net_recv_queue(worker->msg[1],
sizeof(struct kore_msg), 0, msg_recv_worker);
}
int
kore_msg_register(u_int8_t id, void (*cb)(const void *, u_int32_t))
{
struct msg_type *type;
if ((type = msg_type_lookup(id)) != NULL)
return (KORE_RESULT_ERROR);
type = kore_malloc(sizeof(*type));
type->id = id;
type->cb = cb;
TAILQ_INSERT_TAIL(&msg_types, type, list);
return (KORE_RESULT_OK);
}
void
kore_msg_send(u_int8_t id, void *data, u_int32_t len)
{
struct kore_msg m;
m.id = id;
m.length = len;
net_send_queue(worker->msg[1], &m, sizeof(m), NULL, NETBUF_LAST_CHAIN);
net_send_queue(worker->msg[1], data, len, NULL, NETBUF_LAST_CHAIN);
net_send_flush(worker->msg[1]);
}
static int
msg_recv_worker(struct netbuf *nb)
{
struct kore_msg *msg = (struct kore_msg *)nb->buf;
net_recv_expand(nb->owner, msg->length, msg_recv_worker_data);
return (KORE_RESULT_OK);
}
static int
msg_recv_worker_data(struct netbuf *nb)
{
struct msg_type *type;
struct kore_msg *msg = (struct kore_msg *)nb->buf;
if ((type = msg_type_lookup(msg->id)) != NULL)
type->cb(nb->buf + sizeof(*msg), nb->s_off - sizeof(*msg));
net_recv_reset(nb->owner, sizeof(struct kore_msg), msg_recv_worker);
return (KORE_RESULT_OK);
}
static int
msg_recv_parent(struct netbuf *nb)
{
struct connection *c;
kore_log(LOG_NOTICE, "got %d bytes", nb->s_off);
TAILQ_FOREACH(c, &connections, list) {
if (c == nb->owner)
continue;
net_send_queue(c, nb->buf, nb->s_off, NULL, NETBUF_LAST_CHAIN);
net_send_flush(c);
}
net_recv_reset(nb->owner, NETBUF_SEND_PAYLOAD_MAX, msg_recv_parent);
return (KORE_RESULT_OK);
}
static void
msg_disconnected_parent(struct connection *c)
{
kore_log(LOG_ERR, "parent gone, shutting down");
if (kill(worker->pid, SIGQUIT) == -1)
kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s);
}
static void
msg_type_websocket(const void *data, u_int32_t len)
{
}
static struct msg_type *
msg_type_lookup(u_int8_t id)
{
struct msg_type *type;
TAILQ_FOREACH(type, &msg_types, list) {
if (type->id == id)
return (type);
}
return (NULL);
}

26
src/websocket.c Executable file → Normal file
View File

@ -35,11 +35,6 @@
#define WEBSOCKET_SERVER_RESPONSE "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
struct websocket_data {
u_int8_t op;
void *data;
size_t len;
};
u_int64_t kore_websocket_timeout = 120000;
u_int64_t kore_websocket_maxframe = 16384;
@ -47,7 +42,8 @@ u_int64_t kore_websocket_maxframe = 16384;
static int websocket_recv_frame(struct netbuf *);
static int websocket_recv_opcode(struct netbuf *);
static void websocket_disconnect(struct connection *);
static void websocket_send_single(struct connection *, void *);
static void websocket_send_single(struct connection *,
u_int8_t, void *, size_t);
void websocket_send(struct connection *, u_int8_t, void *, size_t);
void
@ -167,26 +163,24 @@ kore_websocket_send(struct connection *c, u_int8_t op, void *data, size_t len)
}
void
kore_websocket_broadcast(struct connection *c, u_int8_t op, void *data,
kore_websocket_broadcast(struct connection *src, u_int8_t op, void *data,
size_t len, int scope)
{
struct websocket_data arg;
struct connection *c;
arg.op = op;
arg.len = len;
arg.data = data;
kore_worker_websocket_broadcast(c, websocket_send_single, &arg);
TAILQ_FOREACH(c, &connections, list) {
if (c != src && c->proto == CONN_PROTO_WEBSOCKET)
websocket_send_single(c, op, data, len);
}
if (scope == WEBSOCKET_BROADCAST_GLOBAL)
fatal("kore_websocket_broadcast: no global scope yet");
}
static void
websocket_send_single(struct connection *c, void *args)
websocket_send_single(struct connection *c, u_int8_t op, void *data, size_t len)
{
struct websocket_data *arg = args;
kore_websocket_send(c, arg->op, arg->data, arg->len);
kore_websocket_send(c, op, data, len);
net_send_flush(c);
}

View File

@ -19,6 +19,7 @@
#include <sys/wait.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <grp.h>
@ -60,14 +61,12 @@ static void worker_unlock(void);
static inline int kore_worker_acceptlock_obtain(void);
static inline void kore_worker_acceptlock_release(void);
static struct connection_list disconnected;
static struct kore_worker *kore_workers;
static int shm_accept_key;
static struct wlock *accept_lock;
extern volatile sig_atomic_t sig_recv;
struct kore_worker *worker = NULL;
struct connection_list worker_clients;
u_int8_t worker_set_affinity = 1;
u_int32_t worker_accept_threshold = 0;
u_int32_t worker_rlimit_nofiles = 1024;
@ -125,6 +124,13 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu)
kw->has_lock = 0;
kw->active_hdlr = NULL;
if (socketpair(AF_UNIX, SOCK_STREAM, 0, kw->pipe) == -1)
fatal("socketpair(): %s", errno_s);
if (!kore_connection_nonblock(kw->pipe[0]) ||
!kore_connection_nonblock(kw->pipe[1]))
fatal("could not set pipe fds to nonblocking: %s", errno_s);
kw->pid = fork();
if (kw->pid == -1)
fatal("could not spawn worker child: %s", errno_s);
@ -136,6 +142,15 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu)
}
}
struct kore_worker *
kore_worker_data(u_int8_t id)
{
if (id >= worker_count)
fatal("id %u too large for worker count", id);
return (WORKER(id));
}
void
kore_worker_shutdown(void)
{
@ -183,7 +198,6 @@ kore_worker_entry(struct kore_worker *kw)
size_t fd;
struct rlimit rl;
char buf[16];
struct connection *c, *cnext;
int quit, had_lock, r;
u_int64_t now, idle_check, next_lock, netwait;
struct passwd *pw = NULL;
@ -261,8 +275,6 @@ kore_worker_entry(struct kore_worker *kw)
kore_timer_init();
kore_connection_init();
kore_domain_load_crl();
TAILQ_INIT(&disconnected);
TAILQ_INIT(&worker_clients);
quit = 0;
had_lock = 0;
@ -270,6 +282,7 @@ kore_worker_entry(struct kore_worker *kw)
idle_check = 0;
kore_platform_event_init();
kore_accesslog_worker_init();
kore_msg_worker_init();
#if defined(KORE_USE_PGSQL)
kore_pgsql_init();
@ -321,78 +334,20 @@ kore_worker_entry(struct kore_worker *kw)
if ((now - idle_check) >= 10000) {
idle_check = now;
now = kore_time_ms();
TAILQ_FOREACH(c, &worker_clients, list) {
if (c->proto == CONN_PROTO_SPDY &&
c->idle_timer.length == 0 &&
!(c->flags & CONN_WRITE_BLOCK) &&
!(c->flags & CONN_READ_BLOCK))
continue;
if (!(c->flags & CONN_IDLE_TIMER_ACT))
continue;
kore_connection_check_idletimer(now, c);
}
kore_connection_check_timeout();
}
for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) {
cnext = TAILQ_NEXT(c, list);
TAILQ_REMOVE(&disconnected, c, list);
kore_connection_remove(c);
}
kore_connection_prune(KORE_CONNECTION_PRUNE_DISCONNECT);
if (quit && http_request_count == 0)
break;
}
for (c = TAILQ_FIRST(&worker_clients); c != NULL; c = cnext) {
cnext = TAILQ_NEXT(c, list);
net_send_flush(c);
kore_connection_disconnect(c);
}
for (c = TAILQ_FIRST(&disconnected); c != NULL; c = cnext) {
cnext = TAILQ_NEXT(c, list);
net_send_flush(c);
TAILQ_REMOVE(&disconnected, c, list);
kore_connection_remove(c);
}
kore_connection_prune(KORE_CONNECTION_PRUNE_ALL);
kore_debug("worker %d shutting down", kw->id);
exit(0);
}
void
kore_worker_connection_add(struct connection *c)
{
TAILQ_INSERT_TAIL(&worker_clients, c, list);
worker_active_connections++;
}
void
kore_worker_connection_move(struct connection *c)
{
TAILQ_REMOVE(&worker_clients, c, list);
TAILQ_INSERT_TAIL(&disconnected, c, list);
}
void
kore_worker_connection_remove(struct connection *c)
{
worker_active_connections--;
}
void
kore_worker_websocket_broadcast(struct connection *src,
void (*cb)(struct connection *, void *), void *args)
{
struct connection *c;
TAILQ_FOREACH(c, &worker_clients, list) {
if (c != src && c->proto == CONN_PROTO_WEBSOCKET)
cb(c, args);
}
}
void
kore_worker_wait(int final)
{
@ -447,7 +402,9 @@ kore_worker_wait(int final)
}
kore_log(LOG_NOTICE, "restarting worker %d", kw->id);
kore_msg_parent_remove(kw);
kore_worker_spawn(kw->id, kw->cpu);
kore_msg_parent_add(kw);
} else {
kore_log(LOG_NOTICE,
"worker %d (pid: %d) signaled us (%d)",