Add task_threads configuration option.

Before Kore would spawn a task thread per task started
if none were available. This was an obvious bad idiom
but never really hit me hard until now.

Kore will now only spawn as many task threads as configured
by "task_threads" and queue up any newly started tasks ontop
of already running threads if the limit was hit.
This commit is contained in:
Joris Vink 2015-06-04 10:29:22 +02:00
parent 854b916597
commit 1d604643b5
5 changed files with 58 additions and 30 deletions

View File

@ -74,6 +74,9 @@ workers 4
#websocket_maxframe 16384
#websocket_timeout 120
# Configure the number of available threads for background tasks.
#task_threads 2
# Load modules (you can load multiple at the same time).
# An additional parameter can be specified as the "onload" function
# which Kore will call when the module is loaded/reloaded.

View File

@ -5,6 +5,10 @@ load ./tasks.so
tls_dhparam dh2048.pem
task_threads 4
worker_max_connections 1000
http_keepalive_time 0
validator v_user regex ^[a-z]*$
domain 127.0.0.1 {

View File

@ -24,6 +24,8 @@
#define KORE_TASK_STATE_FINISHED 3
#define KORE_TASK_STATE_ABORT 4
#define KORE_TASK_THREADS 2
#if defined(__cplusplus)
extern "C" {
#endif
@ -76,7 +78,9 @@ void kore_task_set_result(struct kore_task *, int);
int kore_task_state(struct kore_task *);
int kore_task_result(struct kore_task *);
extern u_int16_t kore_task_threads;
#if defined(__cplusplus)
}
#endif

View File

@ -28,6 +28,10 @@
#include "pgsql.h"
#endif
#if defined(KORE_USE_TASKS)
#include "tasks.h"
#endif
/* XXX - This is becoming a clusterfuck. Fix it. */
static int configure_include(char **);
@ -72,6 +76,10 @@ static int configure_socket_backlog(char **);
static int configure_pgsql_conn_max(char **);
#endif
#if defined(KORE_USE_TASKS)
static int configure_task_threads(char **);
#endif
static void domain_sslstart(void);
static void kore_parse_config_file(char *);
@ -119,6 +127,9 @@ static struct {
{ "socket_backlog", configure_socket_backlog },
#if defined(KORE_USE_PGSQL)
{ "pgsql_conn_max", configure_pgsql_conn_max },
#endif
#if defined(KORE_USE_TASKS)
{ "task_threads", configure_task_threads },
#endif
{ NULL, NULL },
};
@ -1031,7 +1042,6 @@ domain_sslstart(void)
}
#if defined(KORE_USE_PGSQL)
static int
configure_pgsql_conn_max(char **argv)
{
@ -1050,5 +1060,25 @@ configure_pgsql_conn_max(char **argv)
return (KORE_RESULT_OK);
}
#endif
#if defined(KORE_USE_TASKS)
static int
configure_task_threads(char **argv)
{
int err;
if (argv[1] == NULL) {
printf("missing parameter for task_threads\n");
return (KORE_RESULT_ERROR);
}
kore_task_threads = kore_strtonum(argv[1], 10, 0, UCHAR_MAX, &err);
if (err != KORE_RESULT_OK) {
printf("bad value for task_threads: %s\n", argv[1]);
return (KORE_RESULT_ERROR);
}
return (KORE_RESULT_OK);
}
#endif

View File

@ -27,10 +27,10 @@
#include "tasks.h"
static u_int8_t threads;
static pthread_mutex_t task_thread_lock;
static TAILQ_HEAD(, kore_task_thread) task_threads;
u_int16_t kore_task_threads = KORE_TASK_THREADS;
static void *task_thread(void *);
static void task_channel_read(int, void *, u_int32_t);
static void task_channel_write(int, void *, u_int32_t);
@ -48,13 +48,8 @@ static void task_thread_spawn(struct kore_task_thread **);
void
kore_task_init(void)
{
int r;
threads = 0;
TAILQ_INIT(&task_threads);
if ((r = pthread_mutex_init(&task_thread_lock, NULL)) != 0)
fatal("kore_task_init: pthread_mutex_init: %d", r);
}
void
@ -76,15 +71,17 @@ kore_task_run(struct kore_task *t)
struct kore_task_thread *tt;
kore_platform_schedule_read(t->fds[0], t);
pthread_mutex_lock(&task_thread_lock);
if (TAILQ_EMPTY(&task_threads))
if (threads < kore_task_threads) {
/* task_thread_spawn() will lock tt->lock for us. */
task_thread_spawn(&tt);
else
tt = TAILQ_FIRST(&task_threads);
pthread_mutex_unlock(&task_thread_lock);
pthread_mutex_lock(&(tt->lock));
} else {
/* Cycle task around. */
if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
fatal("no available tasks threads?");
pthread_mutex_lock(&(tt->lock));
TAILQ_REMOVE(&task_threads, tt, list);
TAILQ_INSERT_TAIL(&task_threads, tt, list);
}
t->thread = tt;
TAILQ_INSERT_TAIL(&(tt->tasks), t, list);
@ -293,6 +290,8 @@ task_thread_spawn(struct kore_task_thread **out)
TAILQ_INIT(&(tt->tasks));
pthread_cond_init(&(tt->cond), NULL);
pthread_mutex_init(&(tt->lock), NULL);
pthread_mutex_lock(&(tt->lock));
TAILQ_INSERT_TAIL(&task_threads, tt, list);
if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
fatal("pthread_create: %s", errno_s);
@ -310,10 +309,6 @@ task_thread(void *arg)
pthread_mutex_lock(&(tt->lock));
pthread_mutex_lock(&task_thread_lock);
TAILQ_INSERT_TAIL(&task_threads, tt, list);
pthread_mutex_unlock(&task_thread_lock);
for (;;) {
if (TAILQ_EMPTY(&(tt->tasks)))
pthread_cond_wait(&(tt->cond), &(tt->lock));
@ -324,20 +319,12 @@ task_thread(void *arg)
TAILQ_REMOVE(&(tt->tasks), t, list);
pthread_mutex_unlock(&(tt->lock));
pthread_mutex_lock(&task_thread_lock);
TAILQ_REMOVE(&task_threads, tt, list);
pthread_mutex_unlock(&task_thread_lock);
kore_debug("task_thread#%d: executing %p", tt->idx, t);
kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
kore_task_set_result(t, t->entry(t));
kore_task_finish(t);
pthread_mutex_lock(&task_thread_lock);
TAILQ_INSERT_HEAD(&task_threads, tt, list);
pthread_mutex_unlock(&task_thread_lock);
pthread_mutex_lock(&(tt->lock));
}