diff --git a/conf/kore.conf.example b/conf/kore.conf.example index 1adcf07..9f8ce14 100644 --- a/conf/kore.conf.example +++ b/conf/kore.conf.example @@ -74,6 +74,9 @@ workers 4 #websocket_maxframe 16384 #websocket_timeout 120 +# Configure the number of available threads for background tasks. +#task_threads 2 + # Load modules (you can load multiple at the same time). # An additional parameter can be specified as the "onload" function # which Kore will call when the module is loaded/reloaded. diff --git a/examples/tasks/conf/tasks.conf b/examples/tasks/conf/tasks.conf index 70d97a4..944dfbc 100644 --- a/examples/tasks/conf/tasks.conf +++ b/examples/tasks/conf/tasks.conf @@ -5,6 +5,10 @@ load ./tasks.so tls_dhparam dh2048.pem +task_threads 4 +worker_max_connections 1000 +http_keepalive_time 0 + validator v_user regex ^[a-z]*$ domain 127.0.0.1 { diff --git a/includes/tasks.h b/includes/tasks.h index cb3bede..134623c 100644 --- a/includes/tasks.h +++ b/includes/tasks.h @@ -24,6 +24,8 @@ #define KORE_TASK_STATE_FINISHED 3 #define KORE_TASK_STATE_ABORT 4 +#define KORE_TASK_THREADS 2 + #if defined(__cplusplus) extern "C" { #endif @@ -76,7 +78,9 @@ void kore_task_set_result(struct kore_task *, int); int kore_task_state(struct kore_task *); int kore_task_result(struct kore_task *); - + +extern u_int16_t kore_task_threads; + #if defined(__cplusplus) } #endif diff --git a/src/config.c b/src/config.c index 1569885..e264730 100644 --- a/src/config.c +++ b/src/config.c @@ -28,6 +28,10 @@ #include "pgsql.h" #endif +#if defined(KORE_USE_TASKS) +#include "tasks.h" +#endif + /* XXX - This is becoming a clusterfuck. Fix it. */ static int configure_include(char **); @@ -72,6 +76,10 @@ static int configure_socket_backlog(char **); static int configure_pgsql_conn_max(char **); #endif +#if defined(KORE_USE_TASKS) +static int configure_task_threads(char **); +#endif + static void domain_sslstart(void); static void kore_parse_config_file(char *); @@ -119,6 +127,9 @@ static struct { { "socket_backlog", configure_socket_backlog }, #if defined(KORE_USE_PGSQL) { "pgsql_conn_max", configure_pgsql_conn_max }, +#endif +#if defined(KORE_USE_TASKS) + { "task_threads", configure_task_threads }, #endif { NULL, NULL }, }; @@ -1031,7 +1042,6 @@ domain_sslstart(void) } #if defined(KORE_USE_PGSQL) - static int configure_pgsql_conn_max(char **argv) { @@ -1050,5 +1060,25 @@ configure_pgsql_conn_max(char **argv) return (KORE_RESULT_OK); } - +#endif + +#if defined(KORE_USE_TASKS) +static int +configure_task_threads(char **argv) +{ + int err; + + if (argv[1] == NULL) { + printf("missing parameter for task_threads\n"); + return (KORE_RESULT_ERROR); + } + + kore_task_threads = kore_strtonum(argv[1], 10, 0, UCHAR_MAX, &err); + if (err != KORE_RESULT_OK) { + printf("bad value for task_threads: %s\n", argv[1]); + return (KORE_RESULT_ERROR); + } + + return (KORE_RESULT_OK); +} #endif diff --git a/src/tasks.c b/src/tasks.c index 349d766..01442d8 100644 --- a/src/tasks.c +++ b/src/tasks.c @@ -27,10 +27,10 @@ #include "tasks.h" static u_int8_t threads; -static pthread_mutex_t task_thread_lock; - static TAILQ_HEAD(, kore_task_thread) task_threads; +u_int16_t kore_task_threads = KORE_TASK_THREADS; + static void *task_thread(void *); static void task_channel_read(int, void *, u_int32_t); static void task_channel_write(int, void *, u_int32_t); @@ -48,13 +48,8 @@ static void task_thread_spawn(struct kore_task_thread **); void kore_task_init(void) { - int r; - threads = 0; - TAILQ_INIT(&task_threads); - if ((r = pthread_mutex_init(&task_thread_lock, NULL)) != 0) - fatal("kore_task_init: pthread_mutex_init: %d", r); } void @@ -76,15 +71,17 @@ kore_task_run(struct kore_task *t) struct kore_task_thread *tt; kore_platform_schedule_read(t->fds[0], t); - - pthread_mutex_lock(&task_thread_lock); - if (TAILQ_EMPTY(&task_threads)) + if (threads < kore_task_threads) { + /* task_thread_spawn() will lock tt->lock for us. */ task_thread_spawn(&tt); - else - tt = TAILQ_FIRST(&task_threads); - - pthread_mutex_unlock(&task_thread_lock); - pthread_mutex_lock(&(tt->lock)); + } else { + /* Cycle task around. */ + if ((tt = TAILQ_FIRST(&task_threads)) == NULL) + fatal("no available tasks threads?"); + pthread_mutex_lock(&(tt->lock)); + TAILQ_REMOVE(&task_threads, tt, list); + TAILQ_INSERT_TAIL(&task_threads, tt, list); + } t->thread = tt; TAILQ_INSERT_TAIL(&(tt->tasks), t, list); @@ -293,6 +290,8 @@ task_thread_spawn(struct kore_task_thread **out) TAILQ_INIT(&(tt->tasks)); pthread_cond_init(&(tt->cond), NULL); pthread_mutex_init(&(tt->lock), NULL); + pthread_mutex_lock(&(tt->lock)); + TAILQ_INSERT_TAIL(&task_threads, tt, list); if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0) fatal("pthread_create: %s", errno_s); @@ -310,10 +309,6 @@ task_thread(void *arg) pthread_mutex_lock(&(tt->lock)); - pthread_mutex_lock(&task_thread_lock); - TAILQ_INSERT_TAIL(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - for (;;) { if (TAILQ_EMPTY(&(tt->tasks))) pthread_cond_wait(&(tt->cond), &(tt->lock)); @@ -324,20 +319,12 @@ task_thread(void *arg) TAILQ_REMOVE(&(tt->tasks), t, list); pthread_mutex_unlock(&(tt->lock)); - pthread_mutex_lock(&task_thread_lock); - TAILQ_REMOVE(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - kore_debug("task_thread#%d: executing %p", tt->idx, t); kore_task_set_state(t, KORE_TASK_STATE_RUNNING); kore_task_set_result(t, t->entry(t)); kore_task_finish(t); - pthread_mutex_lock(&task_thread_lock); - TAILQ_INSERT_HEAD(&task_threads, tt, list); - pthread_mutex_unlock(&task_thread_lock); - pthread_mutex_lock(&(tt->lock)); }