From 1d604643b53f2e9d3ee8c3f07ad87e2b40f25eed Mon Sep 17 00:00:00 2001 From: Joris Vink Date: Thu, 4 Jun 2015 10:29:22 +0200 Subject: [PATCH] Add task_threads configuration option. Before Kore would spawn a task thread per task started if none were available. This was an obvious bad idiom but never really hit me hard until now. Kore will now only spawn as many task threads as configured by "task_threads" and queue up any newly started tasks ontop of already running threads if the limit was hit. --- conf/kore.conf.example | 3 +++ examples/tasks/conf/tasks.conf | 4 ++++ includes/tasks.h | 6 ++++- src/config.c | 34 ++++++++++++++++++++++++++-- src/tasks.c | 41 ++++++++++++---------------------- 5 files changed, 58 insertions(+), 30 deletions(-) diff --git a/conf/kore.conf.example b/conf/kore.conf.example index 1adcf07..9f8ce14 100644 --- a/conf/kore.conf.example +++ b/conf/kore.conf.example @@ -74,6 +74,9 @@ workers 4 #websocket_maxframe 16384 #websocket_timeout 120 +# Configure the number of available threads for background tasks. +#task_threads 2 + # Load modules (you can load multiple at the same time). # An additional parameter can be specified as the "onload" function # which Kore will call when the module is loaded/reloaded. diff --git a/examples/tasks/conf/tasks.conf b/examples/tasks/conf/tasks.conf index 70d97a4..944dfbc 100644 --- a/examples/tasks/conf/tasks.conf +++ b/examples/tasks/conf/tasks.conf @@ -5,6 +5,10 @@ load ./tasks.so tls_dhparam dh2048.pem +task_threads 4 +worker_max_connections 1000 +http_keepalive_time 0 + validator v_user regex ^[a-z]*$ domain 127.0.0.1 { diff --git a/includes/tasks.h b/includes/tasks.h index cb3bede..134623c 100644 --- a/includes/tasks.h +++ b/includes/tasks.h @@ -24,6 +24,8 @@ #define KORE_TASK_STATE_FINISHED 3 #define KORE_TASK_STATE_ABORT 4 +#define KORE_TASK_THREADS 2 + #if defined(__cplusplus) extern "C" { #endif @@ -76,7 +78,9 @@ void kore_task_set_result(struct kore_task *, int); int kore_task_state(struct kore_task *); int kore_task_result(struct kore_task *); - + +extern u_int16_t kore_task_threads; + #if defined(__cplusplus) } #endif diff --git a/src/config.c b/src/config.c index 1569885..e264730 100644 --- a/src/config.c +++ b/src/config.c @@ -28,6 +28,10 @@ #include "pgsql.h" #endif +#if defined(KORE_USE_TASKS) +#include "tasks.h" +#endif + /* XXX - This is becoming a clusterfuck. Fix it. */ static int configure_include(char **); @@ -72,6 +76,10 @@ static int configure_socket_backlog(char **); static int configure_pgsql_conn_max(char **); #endif +#if defined(KORE_USE_TASKS) +static int configure_task_threads(char **); +#endif + static void domain_sslstart(void); static void kore_parse_config_file(char *); @@ -119,6 +127,9 @@ static struct { { "socket_backlog", configure_socket_backlog }, #if defined(KORE_USE_PGSQL) { "pgsql_conn_max", configure_pgsql_conn_max }, +#endif +#if defined(KORE_USE_TASKS) + { "task_threads", configure_task_threads }, #endif { NULL, NULL }, }; @@ -1031,7 +1042,6 @@ domain_sslstart(void) } #if defined(KORE_USE_PGSQL) - static int configure_pgsql_conn_max(char **argv) { @@ -1050,5 +1060,25 @@ configure_pgsql_conn_max(char **argv) return (KORE_RESULT_OK); } - +#endif + +#if defined(KORE_USE_TASKS) +static int +configure_task_threads(char **argv) +{ + int err; + + if (argv[1] == NULL) { + printf("missing parameter for task_threads\n"); + return (KORE_RESULT_ERROR); + } + + kore_task_threads = kore_strtonum(argv[1], 10, 0, UCHAR_MAX, &err); + if (err != KORE_RESULT_OK) { + printf("bad value for task_threads: %s\n", argv[1]); + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} #endif diff --git a/src/tasks.c b/src/tasks.c index 349d766..01442d8 100644 --- a/src/tasks.c +++ b/src/tasks.c @@ -27,10 +27,10 @@ #include "tasks.h" static u_int8_t threads; -static pthread_mutex_t task_thread_lock; - static TAILQ_HEAD(, kore_task_thread) task_threads; +u_int16_t kore_task_threads = KORE_TASK_THREADS; + static void *task_thread(void *); static void task_channel_read(int, void *, u_int32_t); static void task_channel_write(int, void *, u_int32_t); @@ -48,13 +48,8 @@ static void task_thread_spawn(struct kore_task_thread **); void kore_task_init(void) { - int r; - threads = 0; - TAILQ_INIT(&task_threads); - if ((r = pthread_mutex_init(&task_thread_lock, NULL)) != 0) - fatal("kore_task_init: pthread_mutex_init: %d", r); } void @@ -76,15 +71,17 @@ kore_task_run(struct kore_task *t) struct kore_task_thread *tt; kore_platform_schedule_read(t->fds[0], t); - - pthread_mutex_lock(&task_thread_lock); - if (TAILQ_EMPTY(&task_threads)) + if (threads < kore_task_threads) { + /* task_thread_spawn() will lock tt->lock for us. */ task_thread_spawn(&tt); - else - tt = TAILQ_FIRST(&task_threads); - - pthread_mutex_unlock(&task_thread_lock); - pthread_mutex_lock(&(tt->lock)); + } else { + /* Cycle task around. */ + if ((tt = TAILQ_FIRST(&task_threads)) == NULL) + fatal("no available tasks threads?"); + pthread_mutex_lock(&(tt->lock)); + TAILQ_REMOVE(&task_threads, tt, list); + TAILQ_INSERT_TAIL(&task_threads, tt, list); + } t->thread = tt; TAILQ_INSERT_TAIL(&(tt->tasks), t, list); @@ -293,6 +290,8 @@ task_thread_spawn(struct kore_task_thread **out) TAILQ_INIT(&(tt->tasks)); pthread_cond_init(&(tt->cond), NULL); pthread_mutex_init(&(tt->lock), NULL); + pthread_mutex_lock(&(tt->lock)); + TAILQ_INSERT_TAIL(&task_threads, tt, list); if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0) fatal("pthread_create: %s", errno_s); @@ -310,10 +309,6 @@ task_thread(void *arg) pthread_mutex_lock(&(tt->lock)); - pthread_mutex_lock(&task_thread_lock); - TAILQ_INSERT_TAIL(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - for (;;) { if (TAILQ_EMPTY(&(tt->tasks))) pthread_cond_wait(&(tt->cond), &(tt->lock)); @@ -324,20 +319,12 @@ task_thread(void *arg) TAILQ_REMOVE(&(tt->tasks), t, list); pthread_mutex_unlock(&(tt->lock)); - pthread_mutex_lock(&task_thread_lock); - TAILQ_REMOVE(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - kore_debug("task_thread#%d: executing %p", tt->idx, t); kore_task_set_state(t, KORE_TASK_STATE_RUNNING); kore_task_set_result(t, t->entry(t)); kore_task_finish(t); - pthread_mutex_lock(&task_thread_lock); - TAILQ_INSERT_HEAD(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - pthread_mutex_lock(&(tt->lock)); }