diff --git a/includes/kore.h b/includes/kore.h index a5c5286..55c5d2d 100644 --- a/includes/kore.h +++ b/includes/kore.h @@ -113,6 +113,7 @@ struct kore_module_handle { struct kore_worker { u_int16_t id; u_int16_t cpu; + u_int16_t load; pid_t pid; TAILQ_ENTRY(kore_worker) list; }; @@ -157,6 +158,7 @@ extern char kore_version_string[]; extern u_int16_t cpu_count; extern u_int8_t worker_count; extern u_int32_t worker_max_connections; +extern u_int32_t worker_active_connections; extern struct listener server; extern struct kore_worker *worker; @@ -174,7 +176,7 @@ void kore_worker_connection_move(struct connection *); void kore_worker_connection_remove(struct connection *); void kore_platform_event_init(void); -void kore_platform_event_wait(void); +int kore_platform_event_wait(void); void kore_platform_proctitle(char *); void kore_platform_enable_accept(void); void kore_platform_disable_accept(void); @@ -184,7 +186,7 @@ void kore_platform_worker_setcpu(struct kore_worker *); void kore_platform_init(void); void kore_accesslog_init(void); int kore_accesslog_wait(void); -void kore_worker_spawn(u_int16_t); +void kore_worker_spawn(u_int16_t, u_int16_t); void kore_accesslog_worker_init(void); void kore_worker_entry(struct kore_worker *); int kore_ssl_sni_cb(SSL *, int *, void *); diff --git a/modules/example/module.conf b/modules/example/module.conf index a3724a4..d7ccbeb 100644 --- a/modules/example/module.conf +++ b/modules/example/module.conf @@ -4,8 +4,7 @@ bind 10.211.55.3 443 # The path worker processes will chroot too after starting. -#chroot /home/joris/src/kore -chroot /tmp +chroot /home/joris/src/kore # Worker processes will run as the specified user. runas joris @@ -44,7 +43,7 @@ load modules/example/example.module # Syntax: # handler path module_callback -# Example domain that responds to 10.211.55.33. +# Example domain that responds to 10.211.55.3. domain 10.211.55.3 { certfile cert/server.crt certkey cert/server.key diff --git a/src/bsd.c b/src/bsd.c index 26c3a56..52b7686 100644 --- a/src/bsd.c +++ b/src/bsd.c @@ -74,19 +74,19 @@ kore_platform_event_init(void) EVFILT_READ, EV_ADD | EV_DISABLE, &server); } -void +int kore_platform_event_wait(void) { struct connection *c; struct timespec timeo; - int n, i, *fd; + int n, i, *fd, count; timeo.tv_sec = 0; timeo.tv_nsec = 100000000; n = kevent(kfd, changelist, nchanges, events, KQUEUE_EVENTS, &timeo); if (n == -1) { if (errno == EINTR) - return; + return (0); fatal("kevent(): %s", errno_s); } @@ -94,6 +94,7 @@ kore_platform_event_wait(void) if (n > 0) kore_debug("main(): %d sockets available", n); + count = 0; for (i = 0; i < n; i++) { fd = (int *)events[i].udata; @@ -108,14 +109,18 @@ kore_platform_event_wait(void) } if (*fd == server.fd) { - kore_connection_accept(&server, &c); - if (c == NULL) - continue; + while (worker_active_connections < + worker_max_connections) { + kore_connection_accept(&server, &c); + if (c == NULL) + continue; - kore_platform_event_schedule(c->fd, - EVFILT_READ, EV_ADD, c); - kore_platform_event_schedule(c->fd, - EVFILT_WRITE, EV_ADD | EV_ONESHOT, c); + count++; + kore_platform_event_schedule(c->fd, + EVFILT_READ, EV_ADD, c); + kore_platform_event_schedule(c->fd, + EVFILT_WRITE, EV_ADD | EV_ONESHOT, c); + } } else { c = (struct connection *)events[i].udata; if (events[i].filter == EVFILT_READ) @@ -134,6 +139,8 @@ kore_platform_event_wait(void) } } } + + return (count); } void diff --git a/src/kore.c b/src/kore.c index 898c424..d1eafea 100644 --- a/src/kore.c +++ b/src/kore.c @@ -80,7 +80,6 @@ main(int argc, char *argv[]) if (getuid() != 0) fatal("kore must be started as root"); - kore_debug = 0; while ((ch = getopt(argc, argv, "c:d")) != -1) { switch (ch) { case 'c': diff --git a/src/linux.c b/src/linux.c index f4d6858..045a324 100644 --- a/src/linux.c +++ b/src/linux.c @@ -83,22 +83,23 @@ kore_platform_event_init(void) events = kore_calloc(EPOLL_EVENTS, sizeof(struct epoll_event)); } -void +int kore_platform_event_wait(void) { struct connection *c; - int n, i, a, *fd; + int n, i, *fd, count; - n = epoll_wait(efd, events, EPOLL_EVENTS, 10); + n = epoll_wait(efd, events, EPOLL_EVENTS, 100); if (n == -1) { if (errno == EINTR) - return; + return (0); fatal("epoll_wait(): %s", errno_s); } if (n > 0) kore_debug("main(): %d sockets available", n); + count = 0; for (i = 0; i < n; i++) { fd = (int *)events[i].data.ptr; @@ -113,11 +114,13 @@ kore_platform_event_wait(void) } if (*fd == server.fd) { - for (a = 0; a < 10; a++) { + while (worker_active_connections < + worker_max_connections) { kore_connection_accept(&server, &c); if (c == NULL) break; + count++; kore_platform_event_schedule(c->fd, EPOLLIN | EPOLLOUT | EPOLLET, 0, c); } @@ -132,6 +135,8 @@ kore_platform_event_wait(void) kore_connection_disconnect(c); } } + + return (count); } void diff --git a/src/net.c b/src/net.c index 0b2c294..1ec1566 100644 --- a/src/net.c +++ b/src/net.c @@ -112,53 +112,53 @@ net_send(struct connection *c) int r; struct netbuf *nb; - if (TAILQ_EMPTY(&(c->send_queue))) - return (KORE_RESULT_OK); - - nb = TAILQ_FIRST(&(c->send_queue)); - if (nb->len == 0) { - kore_debug("net_send(): len is 0"); - return (KORE_RESULT_ERROR); - } - - r = SSL_write(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); - - kore_debug("net_send(%ld/%ld bytes), progress with %d", - nb->offset, nb->len, r); - - if (r <= 0) { - r = SSL_get_error(c->ssl, r); - switch (r) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - c->flags &= ~CONN_WRITE_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("SSL_write(): %s", ssl_errno_s); + while (!TAILQ_EMPTY(&(c->send_queue))) { + nb = TAILQ_FIRST(&(c->send_queue)); + if (nb->len == 0) { + kore_debug("net_send(): len is 0"); return (KORE_RESULT_ERROR); } - } - nb->offset += (size_t)r; - if (nb->offset == nb->len) { - if (nb->offset == nb->len) + r = SSL_write(c->ssl, + (nb->buf + nb->offset), (nb->len - nb->offset)); + + kore_debug("net_send(%ld/%ld bytes), progress with %d", + nb->offset, nb->len, r); + + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + case SSL_ERROR_WANT_WRITE: + c->flags &= ~CONN_WRITE_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("SSL_write(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); + } + } + + nb->offset += (size_t)r; + if (nb->offset == nb->len) { TAILQ_REMOVE(&(c->send_queue), nb, list); - if (nb->cb != NULL) - r = nb->cb(nb); - else - r = KORE_RESULT_OK; + if (nb->cb != NULL) + r = nb->cb(nb); + else + r = KORE_RESULT_OK; - if (nb->offset == nb->len) { - if (nb->buf != NULL) - free(nb->buf); - free(nb); + if (nb->offset == nb->len) { + if (nb->buf != NULL) + free(nb->buf); + free(nb); + } + + if (r != KORE_RESULT_OK) + return (r); } - } else { - r = KORE_RESULT_OK; } - return (r); + return (KORE_RESULT_OK); } int @@ -181,50 +181,56 @@ net_recv(struct connection *c) int r; struct netbuf *nb; - if (TAILQ_EMPTY(&(c->recv_queue))) - return (KORE_RESULT_ERROR); - - nb = TAILQ_FIRST(&(c->recv_queue)); - r = SSL_read(c->ssl, (nb->buf + nb->offset), (nb->len - nb->offset)); - - kore_debug("net_recv(%ld/%ld bytes), progress with %d", - nb->offset, nb->len, r); - - if (r <= 0) { - r = SSL_get_error(c->ssl, r); - switch (r) { - case SSL_ERROR_WANT_READ: - case SSL_ERROR_WANT_WRITE: - c->flags &= ~CONN_READ_POSSIBLE; - return (KORE_RESULT_OK); - default: - kore_debug("SSL_read(): %s", ssl_errno_s); - return (KORE_RESULT_ERROR); - } - } - - nb->offset += (size_t)r; - if (nb->offset == nb->len || (nb->flags & NETBUF_CALL_CB_ALWAYS)) { + while (!TAILQ_EMPTY(&(c->recv_queue))) { + nb = TAILQ_FIRST(&(c->recv_queue)); if (nb->cb == NULL) { kore_debug("kore_read_client(): nb->cb == NULL"); return (KORE_RESULT_ERROR); } - r = nb->cb(nb); - if (nb->offset == nb->len || - (nb->flags & NETBUF_FORCE_REMOVE)) { - TAILQ_REMOVE(&(c->recv_queue), nb, list); + r = SSL_read(c->ssl, + (nb->buf + nb->offset), (nb->len - nb->offset)); - if (!(nb->flags & NETBUF_RETAIN)) { - free(nb->buf); - free(nb); + kore_debug("net_recv(%ld/%ld bytes), progress with %d", + nb->offset, nb->len, r); + + if (r <= 0) { + r = SSL_get_error(c->ssl, r); + switch (r) { + case SSL_ERROR_WANT_READ: + c->flags &= ~CONN_READ_POSSIBLE; + if (nb->flags & NETBUF_CALL_CB_ALWAYS) + goto handle; + return (KORE_RESULT_OK); + case SSL_ERROR_WANT_WRITE: + c->flags &= ~CONN_READ_POSSIBLE; + return (KORE_RESULT_OK); + default: + kore_debug("SSL_read(): %s", ssl_errno_s); + return (KORE_RESULT_ERROR); } } - } else { - r = KORE_RESULT_OK; + + nb->offset += (size_t)r; + if (nb->offset == nb->len) { +handle: + r = nb->cb(nb); + if (nb->offset == nb->len || + (nb->flags & NETBUF_FORCE_REMOVE)) { + TAILQ_REMOVE(&(c->recv_queue), nb, list); + + if (!(nb->flags & NETBUF_RETAIN)) { + free(nb->buf); + free(nb); + } + } + + if (r != KORE_RESULT_OK) + return (r); + } } - return (r); + return (KORE_RESULT_OK); } int diff --git a/src/worker.c b/src/worker.c index 81158bb..e254f9c 100644 --- a/src/worker.c +++ b/src/worker.c @@ -47,57 +47,66 @@ #include "kore.h" #include "http.h" -#define KORE_SHM_KEY 15000 - -#if defined(KORE_USE_SEMAPHORE) -#define kore_trylock sem_trywait -#define kore_unlock sem_post -static sem_t *kore_accept_lock; +#if defined(WORKER_DEBUG) +#define worker_debug(fmt, ...) printf(fmt, ##__VA_ARGS__) #else -#define kore_trylock kore_internal_trylock -#define kore_unlock kore_internal_unlock -static int *kore_accept_lock; -static int kore_internal_trylock(int *); -static int kore_internal_unlock(int *); +#define worker_debug(fmt, ...) #endif +#define KORE_SHM_KEY 15000 +#define WORKER(id) \ + (struct kore_worker *)kore_workers + (sizeof(struct kore_worker) * id) + +struct wlock { + pid_t lock; + pid_t next; + pid_t current; + u_int16_t workerid; +}; + +static int worker_trylock(void); +static int worker_unlock(void); +static void worker_decide_next(void); + static void kore_worker_acceptlock_obtain(void); static void kore_worker_acceptlock_release(void); -static u_int16_t workerid = 0; static TAILQ_HEAD(, connection) disconnected; static TAILQ_HEAD(, connection) worker_clients; -static TAILQ_HEAD(, kore_worker) kore_workers; +static struct kore_worker *kore_workers; static int shm_accept_key; -static u_int32_t worker_active_connections = 0; +static struct wlock *accept_lock; static u_int8_t worker_has_acceptlock = 0; extern volatile sig_atomic_t sig_recv; struct kore_worker *worker = NULL; u_int32_t worker_max_connections = 250; +u_int32_t worker_active_connections = 0; void kore_worker_init(void) { + size_t len; u_int16_t i, cpu; if (worker_count == 0) fatal("no workers specified"); - shm_accept_key = shmget(KORE_SHM_KEY, - sizeof(*kore_accept_lock), IPC_CREAT | IPC_EXCL | 0700); + len = sizeof(*accept_lock) + + (sizeof(struct kore_worker) * worker_count); + shm_accept_key = shmget(KORE_SHM_KEY, len, IPC_CREAT | IPC_EXCL | 0700); if (shm_accept_key == -1) fatal("kore_worker_init(): shmget() %s", errno_s); - if ((kore_accept_lock = shmat(shm_accept_key, NULL, 0)) == NULL) + if ((accept_lock = shmat(shm_accept_key, NULL, 0)) == NULL) fatal("kore_worker_init(): shmat() %s", errno_s); -#if defined(KORE_USE_SEMAPHORE) - if (sem_init(kore_accept_lock, 1, 1) == -1) - fatal("kore_worker_init(): sem_init() %s", errno_s); -#else - *kore_accept_lock = 0; -#endif + accept_lock->lock = 0; + accept_lock->current = 0; + accept_lock->workerid = 1; + + kore_workers = (struct kore_worker *)accept_lock + sizeof(*accept_lock); + memset(kore_workers, 0, sizeof(struct kore_worker) * worker_count); kore_debug("kore_worker_init(): system has %d cpu's", cpu_count); kore_debug("kore_worker_init(): starting %d workers", worker_count); @@ -105,22 +114,22 @@ kore_worker_init(void) kore_debug("kore_worker_init(): more workers then cpu's"); cpu = 0; - TAILQ_INIT(&kore_workers); for (i = 0; i < worker_count; i++) { - kore_worker_spawn(cpu++); + kore_worker_spawn(i, cpu++); if (cpu == cpu_count) cpu = 0; } } void -kore_worker_spawn(u_int16_t cpu) +kore_worker_spawn(u_int16_t id, u_int16_t cpu) { struct kore_worker *kw; - kw = (struct kore_worker *)kore_malloc(sizeof(*kw)); - kw->id = workerid++; + kw = WORKER(id); + kw->id = id; kw->cpu = cpu; + kw->load = 0; kw->pid = fork(); if (kw->pid == -1) fatal("could not spawn worker child: %s", errno_s); @@ -130,16 +139,28 @@ kore_worker_spawn(u_int16_t cpu) kore_worker_entry(kw); /* NOTREACHED */ } - - TAILQ_INSERT_TAIL(&kore_workers, kw, list); } void kore_worker_shutdown(void) { + struct kore_worker *kw; + u_int16_t id, done; + kore_log(LOG_NOTICE, "waiting for workers to drain and shutdown"); - while (!TAILQ_EMPTY(&kore_workers)) - kore_worker_wait(1); + for (;;) { + done = 0; + for (id = 0; id < worker_count; id++) { + kw = WORKER(id); + if (kw->pid != 0) + kore_worker_wait(1); + else + done++; + } + + if (done == worker_count) + break; + } if (shmctl(shm_accept_key, IPC_RMID, NULL) == -1) { kore_log(LOG_NOTICE, @@ -150,9 +171,11 @@ kore_worker_shutdown(void) void kore_worker_dispatch_signal(int sig) { + u_int16_t id; struct kore_worker *kw; - TAILQ_FOREACH(kw, &kore_workers, list) { + for (id = 0; id < worker_count; id++) { + kw = WORKER(id); if (kill(kw->pid, sig) == -1) kore_debug("kill(%d, %d): %s", kw->pid, sig, errno_s); } @@ -162,10 +185,8 @@ void kore_worker_entry(struct kore_worker *kw) { int quit; - u_int32_t lowat; char buf[16]; struct connection *c, *cnext; - struct kore_worker *k, *next; worker = kw; @@ -181,15 +202,6 @@ kore_worker_entry(struct kore_worker *kw) kore_platform_proctitle(buf); kore_platform_worker_setcpu(kw); - for (k = TAILQ_FIRST(&kore_workers); k != NULL; k = next) { - next = TAILQ_NEXT(k, list); - if (k == worker) - continue; - - TAILQ_REMOVE(&kore_workers, k, list); - free(k); - } - kore_pid = kw->pid; sig_recv = 0; @@ -205,7 +217,6 @@ kore_worker_entry(struct kore_worker *kw) kore_platform_event_init(); kore_accesslog_worker_init(); - lowat = worker_max_connections / 10; kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu); for (;;) { if (sig_recv != 0) { @@ -216,15 +227,10 @@ kore_worker_entry(struct kore_worker *kw) sig_recv = 0; } - if (!quit && !worker_has_acceptlock && - worker_active_connections < lowat) + if (!worker_has_acceptlock && + worker_active_connections < worker_max_connections) kore_worker_acceptlock_obtain(); - - kore_platform_event_wait(); - - if (worker_has_acceptlock && - (worker_active_connections >= worker_max_connections || - quit == 1)) + if (kore_platform_event_wait() && worker_has_acceptlock) kore_worker_acceptlock_release(); http_process(); @@ -262,6 +268,7 @@ kore_worker_connection_add(struct connection *c) { TAILQ_INSERT_TAIL(&worker_clients, c, list); worker_active_connections++; + worker->load++; } void @@ -275,14 +282,16 @@ void kore_worker_connection_remove(struct connection *c) { worker_active_connections--; + worker->load--; } void kore_worker_wait(int final) { + u_int16_t id; pid_t pid; + struct kore_worker *kw; int status; - struct kore_worker k, *kw, *next; if (final) pid = waitpid(WAIT_ANY, &status, 0); @@ -297,50 +306,44 @@ kore_worker_wait(int final) if (pid == 0) return; - for (kw = TAILQ_FIRST(&kore_workers); kw != NULL; kw = next) { - next = TAILQ_NEXT(kw, list); + for (id = 0; id < worker_count; id++) { + kw = WORKER(id); if (kw->pid != pid) continue; - k = *kw; - TAILQ_REMOVE(&kore_workers, kw, list); kore_log(LOG_NOTICE, "worker %d (%d)-> status %d", kw->id, pid, status); - free(kw); - if (final) - continue; + if (final) { + kw->pid = 0; + break; + } if (WEXITSTATUS(status) || WTERMSIG(status) || WCOREDUMP(status)) { kore_log(LOG_NOTICE, "worker %d (pid: %d) gone, respawning new one", - k.id, k.pid); - kore_worker_spawn(k.cpu); + kw->id, kw->pid); + kore_worker_spawn(kw->id, kw->cpu); } + + break; } } static void kore_worker_acceptlock_obtain(void) { - int ret; - if (worker_count == 1 && !worker_has_acceptlock) { worker_has_acceptlock = 1; kore_platform_enable_accept(); return; } - ret = kore_trylock(kore_accept_lock); - if (ret == -1) { - if (errno == EAGAIN) - return; - kore_log(LOG_WARNING, "kore_worker_acceptlock(): %s", errno_s); - } else { + if (worker_trylock()) { worker_has_acceptlock = 1; kore_platform_enable_accept(); - kore_debug("obtained accept lock (%d/%d)", + worker_debug("%d: obtained accept lock (%d/%d)\n", worker->id, worker_active_connections, worker_max_connections); } } @@ -357,37 +360,64 @@ kore_worker_acceptlock_release(void) return; } - if (kore_unlock(kore_accept_lock) == -1) { - kore_log(LOG_NOTICE, - "kore_worker_acceptlock_release(): %s", errno_s); - } else { + if (worker_unlock()) { worker_has_acceptlock = 0; kore_platform_disable_accept(); - kore_debug("released %d/%d", + worker_debug("%d: released %d/%d\n", worker->id, worker_active_connections, worker_max_connections); } } -#if !defined(KORE_USE_SEMAPHORE) - static int -kore_internal_trylock(int *lock) +worker_trylock(void) { - errno = EAGAIN; - if (__sync_val_compare_and_swap(lock, 0, 1) == 1) - return (-1); + if (__sync_val_compare_and_swap(&(accept_lock->lock), + worker->id, worker->pid) != worker->id) + return (0); - errno = 0; - return (0); + worker_decide_next(); + + return (1); } static int -kore_internal_unlock(int *lock) +worker_unlock(void) { - if (__sync_val_compare_and_swap(lock, 1, 0) != 1) + if (accept_lock->next == worker->id) { + worker_debug("%d: retaining lock\n", worker->id); + worker_decide_next(); + return (0); + } + + if (__sync_val_compare_and_swap(&(accept_lock->lock), + accept_lock->current, accept_lock->next) != accept_lock->current) kore_log(LOG_NOTICE, "kore_internal_unlock(): wasnt locked"); - return (0); + return (1); } -#endif +static void +worker_decide_next(void) +{ + u_int16_t id, load; + struct kore_worker *kw, *low; + + low = NULL; + load = worker_max_connections; + for (id = 0; id < worker_count; id++) { + kw = WORKER(id); + if (kw->load < load) { + load = kw->load; + low = kw; + } + } + + if (low == NULL) { + low = WORKER(accept_lock->workerid++); + if (accept_lock->workerid == worker_count) + accept_lock->workerid = 0; + } + + accept_lock->next = low->id; + accept_lock->current = worker->pid; +}