Rework the accept lock.

Kore no longer passes the accept lock to the "next in line"
worker but instead all workers will attempt to grab the lock
if they can.

Also remember whether we had the lock in the previous iteration of the
event loop so we don't constantly disable/enable the accepting sockets.

Makes Kore scale even better across multiple CPUs.
Joris Vink 2014-07-30 15:20:09 +02:00
parent 83f890e89e
commit 26d4d5d63b
4 changed files with 38 additions and 94 deletions
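
The heart of the change: the accept lock becomes a single lock word that
every worker races for with an atomic compare-and-swap. A minimal,
self-contained sketch of that idea (accept_lock_word, accept_trylock() and
accept_unlock() are illustrative names, not Kore's; the real code is in the
worker_trylock()/worker_unlock() hunks below):

/* One int living in memory that all worker processes share. */
static volatile int	*accept_lock_word;

static int
accept_trylock(void)
{
	/* Atomically flip 0 -> 1; only one worker wins per release. */
	return (__sync_bool_compare_and_swap(accept_lock_word, 0, 1));
}

static void
accept_unlock(void)
{
	/* Flip it back; only the current holder should call this. */
	(void)__sync_bool_compare_and_swap(accept_lock_word, 1, 0);
}

Workers that lose the race simply skip accepting for that pass of the event
loop and retry on the next one, which is what replaces the old "next in
line" hand-off.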


@@ -238,12 +238,8 @@ struct kore_module_handle {
 struct kore_worker {
 	u_int8_t			id;
 	u_int8_t			cpu;
-	u_int16_t			load;
 	pid_t				pid;
 	u_int8_t			has_lock;
-	u_int8_t			busy_warn;
-	u_int16_t			accepted;
-	u_int16_t			accept_treshold;
 	struct kore_module_handle	*active_hdlr;
 };


@@ -139,14 +139,12 @@ kore_platform_event_wait(u_int64_t timer)
 		case KORE_TYPE_LISTENER:
 			l = (struct listener *)events[i].udata;
-			while ((worker->accepted < worker->accept_treshold) &&
-			    (worker_active_connections <
-			    worker_max_connections)) {
+			while (worker_active_connections <
+			    worker_max_connections) {
 				kore_connection_accept(l, &c);
 				if (c == NULL)
 					break;
-				worker->accepted++;
 				kore_platform_event_schedule(c->fd,
 				    EVFILT_READ, EV_ADD, c);
 				kore_platform_event_schedule(c->fd,


@@ -124,14 +124,12 @@ kore_platform_event_wait(u_int64_t timer)
 		case KORE_TYPE_LISTENER:
 			l = (struct listener *)events[i].data.ptr;
-			while ((worker->accepted < worker->accept_treshold) &&
-			    (worker_active_connections <
-			    worker_max_connections)) {
+			while (worker_active_connections <
+			    worker_max_connections) {
 				kore_connection_accept(l, &c);
 				if (c == NULL)
 					break;
-				worker->accepted++;
 				kore_platform_event_schedule(c->fd,
 				    EPOLLIN | EPOLLOUT | EPOLLET, 0, c);
 			}
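
With the per-iteration accept budget (worker->accepted versus
accept_treshold) gone, both event backends above keep accepting until the
worker's connection limit is reached or the listener runs dry. A generic,
self-contained sketch of that pattern using plain accept(2) on a
non-blocking listener (drain_listener() and its parameters are made-up
names; in Kore the actual work happens in kore_connection_accept()):

#include <sys/socket.h>
#include <stddef.h>

static void
drain_listener(int lfd, size_t *active, size_t max_connections)
{
	int	fd;

	while (*active < max_connections) {
		/* Non-blocking listener: returns -1 (EAGAIN) once drained. */
		fd = accept(lfd, NULL, NULL);
		if (fd == -1)
			break;

		(*active)++;
		/* Register fd with epoll/kqueue here, as the hunks above do. */
	}
}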


@@ -45,15 +45,12 @@
 	    (sizeof(struct kore_worker) * id))
 struct wlock {
-	pid_t			lock;
-	pid_t			next;
-	pid_t			current;
-	u_int16_t		workerid;
+	volatile int		lock;
+	pid_t			current;
 };
 static int	worker_trylock(void);
-static int	worker_unlock(void);
-static void	worker_decide_next(void);
+static void	worker_unlock(void);
 static int	kore_worker_acceptlock_obtain(void);
@@ -93,7 +90,6 @@ kore_worker_init(void)
 	accept_lock->lock = 0;
 	accept_lock->current = 0;
-	accept_lock->workerid = 1;
 	kore_workers = (struct kore_worker *)((u_int8_t *)accept_lock +
 	    sizeof(*accept_lock));
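
Since workers are separate processes, accept_lock and the kore_worker slots
must live in memory that survives fork(); the hunk above carves the worker
array out of the same region that holds the lock. A hedged, self-contained
illustration of that layout (shared_region_init(), struct worker_slot and
the mmap() call are assumptions made for the example; Kore's real
allocation code is outside this diff):

#include <sys/mman.h>
#include <sys/types.h>
#include <stdlib.h>

struct wlock {
	volatile int	lock;
	pid_t		current;
};

struct worker_slot {			/* stand-in for struct kore_worker */
	u_int8_t	id;
	pid_t		pid;
};

static struct wlock		*accept_lock;
static struct worker_slot	*workers;

static void
shared_region_init(size_t worker_count)
{
	size_t		len;
	u_int8_t	*base;

	len = sizeof(*accept_lock) + worker_count * sizeof(*workers);
	base = mmap(NULL, len, PROT_READ | PROT_WRITE,
	    MAP_ANON | MAP_SHARED, -1, 0);	/* MAP_ANONYMOUS on some systems */
	if (base == MAP_FAILED)
		abort();

	/* The lock sits first, the worker slots follow it. */
	accept_lock = (struct wlock *)base;
	accept_lock->lock = 0;
	accept_lock->current = 0;
	workers = (struct worker_slot *)(base + sizeof(*accept_lock));

	/* fork() the workers after this point; they inherit the mapping. */
}
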
@@ -120,10 +116,7 @@ kore_worker_spawn(u_int16_t id, u_int16_t cpu)
 	kw = WORKER(id);
 	kw->id = id;
 	kw->cpu = cpu;
-	kw->load = 0;
-	kw->accepted = 0;
 	kw->has_lock = 0;
-	kw->busy_warn = 0;
 	kw->active_hdlr = NULL;
 	kw->pid = fork();
@@ -180,9 +173,9 @@ kore_worker_dispatch_signal(int sig)
 void
 kore_worker_entry(struct kore_worker *kw)
 {
-	int			quit;
 	char			buf[16];
 	struct connection	*c, *cnext;
+	int			quit, had_lock;
 	u_int64_t		now, idle_check, last_cb_run, timer;
 	worker = kw;
@@ -224,6 +217,7 @@ kore_worker_entry(struct kore_worker *kw)
 	TAILQ_INIT(&worker_clients);
 	quit = 0;
+	had_lock = 0;
 	now = idle_check = 0;
 	kore_platform_event_init();
 	kore_accesslog_worker_init();
@@ -237,7 +231,6 @@ kore_worker_entry(struct kore_worker *kw)
 	kore_task_init();
 #endif
-	worker->accept_treshold = worker_max_connections / 10;
 	kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu);
 	kore_module_onload();
@@ -250,21 +243,19 @@ kore_worker_entry(struct kore_worker *kw)
 			sig_recv = 0;
 		}
-		/*
-		 * If we do not own the accept lock we want to reduce the time
-		 * spent waiting in kore_platform_event_wait() in case we get
-		 * handed the lock so we can grab it as fast as possible.
-		 *
-		 * Perhaps not the ideal thing to do, but I can't come up with
-		 * anything better right now.
-		 */
-		if (!worker->has_lock) {
-			if (kore_worker_acceptlock_obtain())
-				timer = 100;
-			else
-				timer = 1;
-		} else {
+		if (kore_worker_acceptlock_obtain()) {
+			if (had_lock == 0)
+				kore_platform_enable_accept();
 			timer = 100;
+			had_lock = 1;
+		} else {
+			if (had_lock == 1) {
+				had_lock = 0;
+				kore_platform_disable_accept();
+			}
+			timer = 1;
 		}
 		kore_platform_event_wait(timer);
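
The second part of the commit message is the had_lock bookkeeping in this
hunk: the accepting sockets are enabled or disabled only when lock
ownership actually changes between iterations, never on every pass. A
self-contained sketch of just that edge detection (lock_step(),
try_accept_lock(), enable_accept() and disable_accept() are stand-ins for
the Kore functions used above):

#include <sys/types.h>

static int	accepting;			/* stand-in listener state */

static void	enable_accept(void) { accepting = 1; }
static void	disable_accept(void) { accepting = 0; }
static int	try_accept_lock(void) { return (1); }	/* stub: always wins */

static u_int64_t
lock_step(int *had_lock)
{
	if (try_accept_lock()) {
		if (*had_lock == 0)		/* only on a 0 -> 1 transition */
			enable_accept();
		*had_lock = 1;
		return (100);			/* holder: longer event-wait timeout */
	}

	if (*had_lock == 1) {			/* only on a 1 -> 0 transition */
		*had_lock = 0;
		disable_accept();
	}
	return (1);				/* non-holder: poll quickly, retry soon */
}

As in the hunk, the holder hands the longer timeout (100) to the event wait
while non-holders use the short one (1) so they can retry the lock quickly.
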
@@ -330,7 +321,6 @@ kore_worker_connection_add(struct connection *c)
 {
 	TAILQ_INSERT_TAIL(&worker_clients, c, list);
 	worker_active_connections++;
-	worker->load++;
 }
 void
@@ -344,7 +334,6 @@ void
 kore_worker_connection_remove(struct connection *c)
 {
 	worker_active_connections--;
-	worker->load--;
 }
 void
@@ -389,8 +378,8 @@ kore_worker_wait(int final)
 		    (kw->active_hdlr != NULL) ? kw->active_hdlr->func :
 		    "none");
-		if (kw->pid == accept_lock->lock)
-			accept_lock->lock = accept_lock->next;
+		if (kw->pid == accept_lock->current)
+			worker_unlock();
 		if (kw->active_hdlr != NULL) {
 			kw->active_hdlr->errors++;
@@ -415,19 +404,14 @@
 void
 kore_worker_acceptlock_release(void)
 {
-	if (worker_count == 1) {
-		worker->accepted = 0;
+	if (worker_count == 1)
 		return;
-	}
 	if (worker->has_lock != 1)
 		return;
-	if (worker_unlock()) {
-		worker->accepted = 0;
-		worker->has_lock = 0;
-		kore_platform_disable_accept();
-	}
+	worker_unlock();
+	worker->has_lock = 0;
 }
 static int
@@ -435,30 +419,18 @@ kore_worker_acceptlock_obtain(void)
 {
 	int		r;
-	if (worker_count == 1 && !worker->has_lock) {
+	if (worker_count == 1) {
 		worker->has_lock = 1;
-		kore_platform_enable_accept();
 		return (1);
 	}
+	if (worker_active_connections >= worker_max_connections)
+		return (0);
 	r = 0;
 	if (worker_trylock()) {
-		if (worker_active_connections >= worker_max_connections) {
-			worker->accepted = 0;
-			worker_unlock();
-			if (worker->busy_warn == 0) {
-				kore_log(LOG_NOTICE,
-				    "skipping accept lock, too busy");
-				worker->busy_warn = 1;
-			}
-		} else {
-			r = 1;
-			worker->has_lock = 1;
-			worker->busy_warn = 0;
-			kore_platform_enable_accept();
-		}
+		r = 1;
+		worker->has_lock = 1;
 	}
 	return (r);
@@ -467,40 +439,20 @@ kore_worker_acceptlock_obtain(void)
 static int
 worker_trylock(void)
 {
-	if (__sync_val_compare_and_swap(&(accept_lock->lock),
-	    worker->id, worker->pid) != worker->id)
+	if (!__sync_bool_compare_and_swap(&(accept_lock->lock), 0, 1))
 		return (0);
 	worker_debug("wrk#%d grabbed lock (%d/%d)\n", worker->id,
 	    worker_active_connections, worker_max_connections);
-	worker_decide_next();
-	return (1);
-}
-static int
-worker_unlock(void)
-{
-	worker_debug("%d: wrk#%d releasing (%d/%d)\n", worker->id, worker->id,
-	    worker_active_connections, worker_max_connections);
-	if (__sync_val_compare_and_swap(&(accept_lock->lock),
-	    accept_lock->current, accept_lock->next) != accept_lock->current)
-		kore_log(LOG_NOTICE, "kore_internal_unlock(): wasnt locked");
 	accept_lock->current = worker->pid;
 	return (1);
 }
 static void
-worker_decide_next(void)
+worker_unlock(void)
 {
-	struct kore_worker	*kw;
-	kw = WORKER(accept_lock->workerid++);
-	worker_debug("%d: next wrk#%d (%d, %p)\n",
-	    worker->id, kw->id, kw->pid, kw);
-	if (accept_lock->workerid == worker_count)
-		accept_lock->workerid = 0;
-	accept_lock->next = kw->id;
-	accept_lock->current = worker->pid;
+	accept_lock->current = 0;
+	if (!__sync_bool_compare_and_swap(&(accept_lock->lock), 1, 0))
+		kore_log(LOG_NOTICE, "worker_unlock(): wasnt locked");
 }