Rework the way worker processes give each other the accept lock.

Instead of waiting until one worker is filled up on connections,
the workers find the next lowest-loaded worker and hand the lock
over to it instead. This gives a nicer spread of load.

Instead of running one accept per event loop iteration, we attempt to
accept as many connections as worker_max_connections allows.

Refactor net sending/recv code a bit.
This commit is contained in:
Joris Vink 2013-06-27 00:22:48 +02:00
parent f707749cf4
commit 2fc5233358
7 changed files with 230 additions and 182 deletions

View File

@ -113,6 +113,7 @@ struct kore_module_handle {
struct kore_worker {
u_int16_t id;
u_int16_t cpu;
u_int16_t load;
pid_t pid;
TAILQ_ENTRY(kore_worker) list;
};
@ -157,6 +158,7 @@ extern char kore_version_string[];
extern u_int16_t cpu_count;
extern u_int8_t worker_count;
extern u_int32_t worker_max_connections;
extern u_int32_t worker_active_connections;
extern struct listener server;
extern struct kore_worker *worker;
@ -174,7 +176,7 @@ void kore_worker_connection_move(struct connection *);
void kore_worker_connection_remove(struct connection *);
void kore_platform_event_init(void);
void kore_platform_event_wait(void);
int kore_platform_event_wait(void);
void kore_platform_proctitle(char *);
void kore_platform_enable_accept(void);
void kore_platform_disable_accept(void);
@ -184,7 +186,7 @@ void kore_platform_worker_setcpu(struct kore_worker *);
void kore_platform_init(void);
void kore_accesslog_init(void);
int kore_accesslog_wait(void);
void kore_worker_spawn(u_int16_t);
void kore_worker_spawn(u_int16_t, u_int16_t);
void kore_accesslog_worker_init(void);
void kore_worker_entry(struct kore_worker *);
int kore_ssl_sni_cb(SSL *, int *, void *);

View File

@ -4,8 +4,7 @@
bind 10.211.55.3 443
# The path worker processes will chroot to after starting.
#chroot /home/joris/src/kore
chroot /tmp
chroot /home/joris/src/kore
# Worker processes will run as the specified user.
runas joris
@ -44,7 +43,7 @@ load modules/example/example.module
# Syntax:
# handler path module_callback
# Example domain that responds to 10.211.55.33.
# Example domain that responds to 10.211.55.3.
domain 10.211.55.3 {
certfile cert/server.crt
certkey cert/server.key

View File

@ -74,19 +74,19 @@ kore_platform_event_init(void)
EVFILT_READ, EV_ADD | EV_DISABLE, &server);
}
void
int
kore_platform_event_wait(void)
{
struct connection *c;
struct timespec timeo;
int n, i, *fd;
int n, i, *fd, count;
timeo.tv_sec = 0;
timeo.tv_nsec = 100000000;
n = kevent(kfd, changelist, nchanges, events, KQUEUE_EVENTS, &timeo);
if (n == -1) {
if (errno == EINTR)
return;
return (0);
fatal("kevent(): %s", errno_s);
}
@ -94,6 +94,7 @@ kore_platform_event_wait(void)
if (n > 0)
kore_debug("main(): %d sockets available", n);
count = 0;
for (i = 0; i < n; i++) {
fd = (int *)events[i].udata;
@ -108,14 +109,18 @@ kore_platform_event_wait(void)
}
if (*fd == server.fd) {
kore_connection_accept(&server, &c);
if (c == NULL)
continue;
while (worker_active_connections <
worker_max_connections) {
kore_connection_accept(&server, &c);
if (c == NULL)
continue;
kore_platform_event_schedule(c->fd,
EVFILT_READ, EV_ADD, c);
kore_platform_event_schedule(c->fd,
EVFILT_WRITE, EV_ADD | EV_ONESHOT, c);
count++;
kore_platform_event_schedule(c->fd,
EVFILT_READ, EV_ADD, c);
kore_platform_event_schedule(c->fd,
EVFILT_WRITE, EV_ADD | EV_ONESHOT, c);
}
} else {
c = (struct connection *)events[i].udata;
if (events[i].filter == EVFILT_READ)
@ -134,6 +139,8 @@ kore_platform_event_wait(void)
}
}
}
return (count);
}
void

View File

@ -80,7 +80,6 @@ main(int argc, char *argv[])
if (getuid() != 0)
fatal("kore must be started as root");
kore_debug = 0;
while ((ch = getopt(argc, argv, "c:d")) != -1) {
switch (ch) {
case 'c':

View File

@ -83,22 +83,23 @@ kore_platform_event_init(void)
events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event));
}
void
int
kore_platform_event_wait(void)
{
struct connection *c;
int n, i, a, *fd;
int n, i, *fd, count;
n = epoll_wait(efd, events, EPOLL_EVENTS, 10);
n = epoll_wait(efd, events, EPOLL_EVENTS, 100);
if (n == -1) {
if (errno == EINTR)
return;
return (0);
fatal("epoll_wait(): %s", errno_s);
}
if (n > 0)
kore_debug("main(): %d sockets available", n);
count = 0;
for (i = 0; i < n; i++) {
fd = (int *)events[i].data.ptr;
@ -113,11 +114,13 @@ kore_platform_event_wait(void)
}
if (*fd == server.fd) {
for (a = 0; a < 10; a++) {
while (worker_active_connections <
worker_max_connections) {
kore_connection_accept(&server, &c);
if (c == NULL)
break;
count++;
kore_platform_event_schedule(c->fd,
EPOLLIN | EPOLLOUT | EPOLLET, 0, c);
}
@ -132,6 +135,8 @@ kore_platform_event_wait(void)
kore_connection_disconnect(c);
}
}
return (count);
}
void

150
src/net.c
View File

@ -112,53 +112,53 @@ net_send(struct connection *c)
int r;
struct netbuf *nb;
if (TAILQ_EMPTY(&(c->send_queue)))
return (KORE_RESULT_OK);
nb = TAILQ_FIRST(&(c->send_queue));
if (nb->len == 0) {
kore_debug("net_send(): len is 0");
return (KORE_RESULT_ERROR);
}
r = SSL_write(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset));
kore_debug("net_send(%ld/%ld bytes), progress with %d",
nb->offset, nb->len, r);
if (r <= 0) {
r = SSL_get_error(c->ssl, r);
switch (r) {
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
c->flags &= ~CONN_WRITE_POSSIBLE;
return (KORE_RESULT_OK);
default:
kore_debug("SSL_write(): %s", ssl_errno_s);
while (!TAILQ_EMPTY(&(c->send_queue))) {
nb = TAILQ_FIRST(&(c->send_queue));
if (nb->len == 0) {
kore_debug("net_send(): len is 0");
return (KORE_RESULT_ERROR);
}
}
nb->offset += (size_t)r;
if (nb->offset == nb->len) {
if (nb->offset == nb->len)
r = SSL_write(c->ssl,
(nb->buf + nb->offset), (nb->len - nb->offset));
kore_debug("net_send(%ld/%ld bytes), progress with %d",
nb->offset, nb->len, r);
if (r <= 0) {
r = SSL_get_error(c->ssl, r);
switch (r) {
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
c->flags &= ~CONN_WRITE_POSSIBLE;
return (KORE_RESULT_OK);
default:
kore_debug("SSL_write(): %s", ssl_errno_s);
return (KORE_RESULT_ERROR);
}
}
nb->offset += (size_t)r;
if (nb->offset == nb->len) {
TAILQ_REMOVE(&(c->send_queue), nb, list);
if (nb->cb != NULL)
r = nb->cb(nb);
else
r = KORE_RESULT_OK;
if (nb->cb != NULL)
r = nb->cb(nb);
else
r = KORE_RESULT_OK;
if (nb->offset == nb->len) {
if (nb->buf != NULL)
free(nb->buf);
free(nb);
if (nb->offset == nb->len) {
if (nb->buf != NULL)
free(nb->buf);
free(nb);
}
if (r != KORE_RESULT_OK)
return (r);
}
} else {
r = KORE_RESULT_OK;
}
return (r);
return (KORE_RESULT_OK);
}
int
@ -181,50 +181,56 @@ net_recv(struct connection *c)
int r;
struct netbuf *nb;
if (TAILQ_EMPTY(&(c->recv_queue)))
return (KORE_RESULT_ERROR);
nb = TAILQ_FIRST(&(c->recv_queue));
r = SSL_read(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset));
kore_debug("net_recv(%ld/%ld bytes), progress with %d",
nb->offset, nb->len, r);
if (r <= 0) {
r = SSL_get_error(c->ssl, r);
switch (r) {
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
c->flags &= ~CONN_READ_POSSIBLE;
return (KORE_RESULT_OK);
default:
kore_debug("SSL_read(): %s", ssl_errno_s);
return (KORE_RESULT_ERROR);
}
}
nb->offset += (size_t)r;
if (nb->offset == nb->len || (nb->flags & NETBUF_CALL_CB_ALWAYS)) {
while (!TAILQ_EMPTY(&(c->recv_queue))) {
nb = TAILQ_FIRST(&(c->recv_queue));
if (nb->cb == NULL) {
kore_debug("kore_read_client(): nb->cb == NULL");
return (KORE_RESULT_ERROR);
}
r = nb->cb(nb);
if (nb->offset == nb->len ||
(nb->flags & NETBUF_FORCE_REMOVE)) {
TAILQ_REMOVE(&(c->recv_queue), nb, list);
r = SSL_read(c->ssl,
(nb->buf + nb->offset), (nb->len - nb->offset));
if (!(nb->flags & NETBUF_RETAIN)) {
free(nb->buf);
free(nb);
kore_debug("net_recv(%ld/%ld bytes), progress with %d",
nb->offset, nb->len, r);
if (r <= 0) {
r = SSL_get_error(c->ssl, r);
switch (r) {
case SSL_ERROR_WANT_READ:
c->flags &= ~CONN_READ_POSSIBLE;
if (nb->flags & NETBUF_CALL_CB_ALWAYS)
goto handle;
return (KORE_RESULT_OK);
case SSL_ERROR_WANT_WRITE:
c->flags &= ~CONN_READ_POSSIBLE;
return (KORE_RESULT_OK);
default:
kore_debug("SSL_read(): %s", ssl_errno_s);
return (KORE_RESULT_ERROR);
}
}
} else {
r = KORE_RESULT_OK;
nb->offset += (size_t)r;
if (nb->offset == nb->len) {
handle:
r = nb->cb(nb);
if (nb->offset == nb->len ||
(nb->flags & NETBUF_FORCE_REMOVE)) {
TAILQ_REMOVE(&(c->recv_queue), nb, list);
if (!(nb->flags & NETBUF_RETAIN)) {
free(nb->buf);
free(nb);
}
}
if (r != KORE_RESULT_OK)
return (r);
}
}
return (r);
return (KORE_RESULT_OK);
}
int

View File

@ -47,57 +47,66 @@
#include "kore.h"
#include "http.h"
#define KORE_SHM_KEY 15000
#if defined(KORE_USE_SEMAPHORE)
#define kore_trylock sem_trywait
#define kore_unlock sem_post
static sem_t *kore_accept_lock;
#if defined(WORKER_DEBUG)
#define worker_debug(fmt, ...) printf(fmt, ##__VA_ARGS__)
#else
#define kore_trylock kore_internal_trylock
#define kore_unlock kore_internal_unlock
static int *kore_accept_lock;
static int kore_internal_trylock(int *);
static int kore_internal_unlock(int *);
#define worker_debug(fmt, ...)
#endif
#define KORE_SHM_KEY 15000
#define WORKER(id) \
(struct kore_worker *)kore_workers + (sizeof(struct kore_worker) * id)
struct wlock {
pid_t lock;
pid_t next;
pid_t current;
u_int16_t workerid;
};
static int worker_trylock(void);
static int worker_unlock(void);
static void worker_decide_next(void);
static void kore_worker_acceptlock_obtain(void);
static void kore_worker_acceptlock_release(void);
static u_int16_t workerid = 0;
static TAILQ_HEAD(, connection) disconnected;
static TAILQ_HEAD(, connection) worker_clients;
static TAILQ_HEAD(, kore_worker) kore_workers;
static struct kore_worker *kore_workers;
static int shm_accept_key;
static u_int32_t worker_active_connections = 0;
static struct wlock *accept_lock;
static u_int8_t worker_has_acceptlock = 0;
extern volatile sig_atomic_t sig_recv;
struct kore_worker *worker = NULL;
u_int32_t worker_max_connections = 250;
u_int32_t worker_active_connections = 0;
void
kore_worker_init(void)
{
size_t len;
u_int16_t i, cpu;
if (worker_count == 0)
fatal("no workers specified");
shm_accept_key = shmget(KORE_SHM_KEY,
sizeof(*kore_accept_lock), IPC_CREAT | IPC_EXCL | 0700);
len = sizeof(*accept_lock) +
(sizeof(struct kore_worker) * worker_count);
shm_accept_key = shmget(KORE_SHM_KEY, len, IPC_CREAT | IPC_EXCL | 0700);
if (shm_accept_key == -1)
fatal("kore_worker_init(): shmget() %s", errno_s);
if ((kore_accept_lock = shmat(shm_accept_key, NULL, 0)) == NULL)
if ((accept_lock = shmat(shm_accept_key, NULL, 0)) == NULL)
fatal("kore_worker_init(): shmat() %s", errno_s);
#if defined(KORE_USE_SEMAPHORE)
if (sem_init(kore_accept_lock, 1, 1) == -1)
fatal("kore_worker_init(): sem_init() %s", errno_s);
#else
*kore_accept_lock = 0;
#endif
accept_lock->lock = 0;
accept_lock->current = 0;
accept_lock->workerid = 1;
kore_workers = (struct kore_worker *)accept_lock + sizeof(*accept_lock);
memset(kore_workers, 0, sizeof(struct kore_worker) * worker_count);
kore_debug("kore_worker_init(): system has %d cpu's", cpu_count);
kore_debug("kore_worker_init(): starting %d workers", worker_count);
@ -105,22 +114,22 @@ kore_worker_init(void)
kore_debug("kore_worker_init(): more workers then cpu's");
cpu = 0;
TAILQ_INIT(&kore_workers);
for (i = 0; i < worker_count; i++) {
kore_worker_spawn(cpu++);
kore_worker_spawn(i, cpu++);
if (cpu == cpu_count)
cpu = 0;
}
}
void
kore_worker_spawn(u_int16_t cpu)
kore_worker_spawn(u_int16_t id, u_int16_t cpu)
{
struct kore_worker *kw;
kw = (struct kore_worker *)kore_malloc(sizeof(*kw));
kw->id = workerid++;
kw = WORKER(id);
kw->id = id;
kw->cpu = cpu;
kw->load = 0;
kw->pid = fork();
if (kw->pid == -1)
fatal("could not spawn worker child: %s", errno_s);
@ -130,16 +139,28 @@ kore_worker_spawn(u_int16_t cpu)
kore_worker_entry(kw);
/* NOTREACHED */
}
TAILQ_INSERT_TAIL(&kore_workers, kw, list);
}
void
kore_worker_shutdown(void)
{
struct kore_worker *kw;
u_int16_t id, done;
kore_log(LOG_NOTICE, "waiting for workers to drain and shutdown");
while (!TAILQ_EMPTY(&kore_workers))
kore_worker_wait(1);
for (;;) {
done = 0;
for (id = 0; id < worker_count; id++) {
kw = WORKER(id);
if (kw->pid != 0)
kore_worker_wait(1);
else
done++;
}
if (done == worker_count)
break;
}
if (shmctl(shm_accept_key, IPC_RMID, NULL) == -1) {
kore_log(LOG_NOTICE,
@ -150,9 +171,11 @@ kore_worker_shutdown(void)
void
kore_worker_dispatch_signal(int sig)
{
u_int16_t id;
struct kore_worker *kw;
TAILQ_FOREACH(kw, &kore_workers, list) {
for (id = 0; id < worker_count; id++) {
kw = WORKER(id);
if (kill(kw->pid, sig) == -1)
kore_debug("kill(%d, %d): %s", kw->pid, sig, errno_s);
}
@ -162,10 +185,8 @@ void
kore_worker_entry(struct kore_worker *kw)
{
int quit;
u_int32_t lowat;
char buf[16];
struct connection *c, *cnext;
struct kore_worker *k, *next;
worker = kw;
@ -181,15 +202,6 @@ kore_worker_entry(struct kore_worker *kw)
kore_platform_proctitle(buf);
kore_platform_worker_setcpu(kw);
for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) {
next = TAILQ_NEXT(k, list);
if (k == worker)
continue;
TAILQ_REMOVE(&kore_workers, k, list);
free(k);
}
kore_pid = kw->pid;
sig_recv = 0;
@ -205,7 +217,6 @@ kore_worker_entry(struct kore_worker *kw)
kore_platform_event_init();
kore_accesslog_worker_init();
lowat = worker_max_connections / 10;
kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu);
for (;;) {
if (sig_recv != 0) {
@ -216,15 +227,10 @@ kore_worker_entry(struct kore_worker *kw)
sig_recv = 0;
}
if (!quit && !worker_has_acceptlock &&
worker_active_connections < lowat)
if (!worker_has_acceptlock &&
worker_active_connections < worker_max_connections)
kore_worker_acceptlock_obtain();
kore_platform_event_wait();
if (worker_has_acceptlock &&
(worker_active_connections >= worker_max_connections ||
quit == 1))
if (kore_platform_event_wait() && worker_has_acceptlock)
kore_worker_acceptlock_release();
http_process();
@ -262,6 +268,7 @@ kore_worker_connection_add(struct connection *c)
{
TAILQ_INSERT_TAIL(&worker_clients, c, list);
worker_active_connections++;
worker->load++;
}
void
@ -275,14 +282,16 @@ void
kore_worker_connection_remove(struct connection *c)
{
worker_active_connections--;
worker->load--;
}
void
kore_worker_wait(int final)
{
u_int16_t id;
pid_t pid;
struct kore_worker *kw;
int status;
struct kore_worker k, *kw, *next;
if (final)
pid = waitpid(WAIT_ANY, &status, 0);
@ -297,50 +306,44 @@ kore_worker_wait(int final)
if (pid == 0)
return;
for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) {
next = TAILQ_NEXT(kw, list);
for (id = 0; id < worker_count; id++) {
kw = WORKER(id);
if (kw->pid != pid)
continue;
k = *kw;
TAILQ_REMOVE(&kore_workers, kw, list);
kore_log(LOG_NOTICE, "worker %d (%d)-> status %d",
kw->id, pid, status);
free(kw);
if (final)
continue;
if (final) {
kw->pid = 0;
break;
}
if (WEXITSTATUS(status) || WTERMSIG(status) ||
WCOREDUMP(status)) {
kore_log(LOG_NOTICE,
"worker %d (pid: %d) gone, respawning new one",
k.id, k.pid);
kore_worker_spawn(k.cpu);
kw->id, kw->pid);
kore_worker_spawn(kw->id, kw->cpu);
}
break;
}
}
static void
kore_worker_acceptlock_obtain(void)
{
int ret;
if (worker_count == 1 && !worker_has_acceptlock) {
worker_has_acceptlock = 1;
kore_platform_enable_accept();
return;
}
ret = kore_trylock(kore_accept_lock);
if (ret == -1) {
if (errno == EAGAIN)
return;
kore_log(LOG_WARNING, "kore_worker_acceptlock(): %s", errno_s);
} else {
if (worker_trylock()) {
worker_has_acceptlock = 1;
kore_platform_enable_accept();
kore_debug("obtained accept lock (%d/%d)",
worker_debug("%d: obtained accept lock (%d/%d)\n", worker->id,
worker_active_connections, worker_max_connections);
}
}
@ -357,37 +360,64 @@ kore_worker_acceptlock_release(void)
return;
}
if (kore_unlock(kore_accept_lock) == -1) {
kore_log(LOG_NOTICE,
"kore_worker_acceptlock_release(): %s", errno_s);
} else {
if (worker_unlock()) {
worker_has_acceptlock = 0;
kore_platform_disable_accept();
kore_debug("released %d/%d",
worker_debug("%d: released %d/%d\n", worker->id,
worker_active_connections, worker_max_connections);
}
}
#if !defined(KORE_USE_SEMAPHORE)
static int
kore_internal_trylock(int *lock)
worker_trylock(void)
{
errno = EAGAIN;
if (__sync_val_compare_and_swap(lock, 0, 1) == 1)
return (-1);
if (__sync_val_compare_and_swap(&(accept_lock->lock),
worker->id, worker->pid) != worker->id)
return (0);
errno = 0;
return (0);
worker_decide_next();
return (1);
}
static int
kore_internal_unlock(int *lock)
worker_unlock(void)
{
if (__sync_val_compare_and_swap(lock, 1, 0) != 1)
if (accept_lock->next == worker->id) {
worker_debug("%d: retaining lock\n", worker->id);
worker_decide_next();
return (0);
}
if (__sync_val_compare_and_swap(&(accept_lock->lock),
accept_lock->current, accept_lock->next) != accept_lock->current)
kore_log(LOG_NOTICE, "kore_internal_unlock(): wasnt locked");
return (0);
return (1);
}
#endif
static void
worker_decide_next(void)
{
u_int16_t id, load;
struct kore_worker *kw, *low;
low = NULL;
load = worker_max_connections;
for (id = 0; id < worker_count; id++) {
kw = WORKER(id);
if (kw->load < load) {
load = kw->load;
low = kw;
}
}
if (low == NULL) {
low = WORKER(accept_lock->workerid++);
if (accept_lock->workerid == worker_count)
accept_lock->workerid = 0;
}
accept_lock->next = low->id;
accept_lock->current = worker->pid;
}