diff --git a/includes/http.h b/includes/http.h index 42db119..be39503 100644 --- a/includes/http.h +++ b/includes/http.h @@ -156,6 +156,7 @@ struct http_file { #define HTTP_REQUEST_COMPLETE 0x01 #define HTTP_REQUEST_DELETE 0x02 #define HTTP_REQUEST_SLEEPING 0x04 +#define HTTP_REQUEST_PGSQL_QUEUE 0x10 struct kore_task; diff --git a/includes/pgsql.h b/includes/pgsql.h index 4f28cc1..b104007 100644 --- a/includes/pgsql.h +++ b/includes/pgsql.h @@ -50,6 +50,7 @@ int kore_pgsql_async(struct kore_pgsql *, int kore_pgsql_ntuples(struct kore_pgsql *); void kore_pgsql_logerror(struct kore_pgsql *); +void kore_pgsql_queue_remove(struct http_request *); char *kore_pgsql_getvalue(struct kore_pgsql *, int, int); #define KORE_PGSQL_STATE_INIT 1 diff --git a/src/http.c b/src/http.c index 457793e..1b607b3 100644 --- a/src/http.c +++ b/src/http.c @@ -336,6 +336,9 @@ http_request_free(struct http_request *req) pgsql = LIST_FIRST(&(req->pgsqls)); kore_pgsql_cleanup(pgsql); } + + if (req->flags & HTTP_REQUEST_PGSQL_QUEUE) + kore_pgsql_queue_remove(req); #endif kore_debug("http_request_free: %p->%p", req->owner, req); diff --git a/src/pgsql.c b/src/pgsql.c index 830840f..c9419f2 100644 --- a/src/pgsql.c +++ b/src/pgsql.c @@ -34,6 +34,11 @@ struct pgsql_job { TAILQ_ENTRY(pgsql_job) list; }; +struct pgsql_wait { + struct http_request *req; + TAILQ_ENTRY(pgsql_wait) list; +}; + #define PGSQL_IS_BLOCKING 0 #define PGSQL_IS_ASYNC 1 @@ -44,8 +49,13 @@ static void pgsql_conn_release(struct kore_pgsql *); static void pgsql_conn_cleanup(struct pgsql_conn *); static int pgsql_conn_create(struct kore_pgsql *); static void pgsql_read_result(struct kore_pgsql *, int); +static void pgsql_queue_add(struct http_request *); +static void pgsql_queue_wakeup(void); +static struct kore_pool pgsql_job_pool; +static struct kore_pool pgsql_wait_pool; static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free; +static TAILQ_HEAD(, pgsql_wait) pgsql_wait_queue; static u_int16_t pgsql_conn_count; char *pgsql_conn_string = NULL; u_int16_t pgsql_conn_max = PGSQL_CONN_MAX; @@ -55,6 +65,12 @@ kore_pgsql_init(void) { pgsql_conn_count = 0; TAILQ_INIT(&pgsql_conn_free); + TAILQ_INIT(&pgsql_wait_queue); + + kore_pool_init(&pgsql_job_pool, "pgsql_job_pool", + sizeof(struct pgsql_job), 100); + kore_pool_init(&pgsql_wait_pool, "pgsql_wait_pool", + sizeof(struct pgsql_wait), 100); } int @@ -70,8 +86,12 @@ kore_pgsql_async(struct kore_pgsql *pgsql, struct http_request *req, pgsql->conn = NULL; if (TAILQ_EMPTY(&pgsql_conn_free)) { - if ((pgsql_conn_count >= pgsql_conn_max) || - !pgsql_conn_create(pgsql)) + if (pgsql_conn_count >= pgsql_conn_max) { + pgsql_queue_add(req); + return (KORE_RESULT_ERROR); + } + + if (!pgsql_conn_create(pgsql)) return (KORE_RESULT_ERROR); } @@ -84,7 +104,7 @@ kore_pgsql_async(struct kore_pgsql *pgsql, struct http_request *req, TAILQ_REMOVE(&pgsql_conn_free, conn, list); pgsql->conn = conn; - conn->job = kore_malloc(sizeof(struct pgsql_job)); + conn->job = kore_pool_get(&pgsql_job_pool); conn->job->query = kore_strdup(query); conn->job->start = kore_time_ms(); conn->job->pgsql = pgsql; @@ -182,11 +202,8 @@ kore_pgsql_cleanup(struct kore_pgsql *pgsql) if (pgsql->error != NULL) kore_mem_free(pgsql->error); - if (pgsql->conn != NULL) { - while (PQgetResult(pgsql->conn->db) != NULL) - ; + if (pgsql->conn != NULL) pgsql_conn_release(pgsql); - } pgsql->result = NULL; pgsql->error = NULL; @@ -214,6 +231,55 @@ kore_pgsql_getvalue(struct kore_pgsql *pgsql, int row, int col) return (PQgetvalue(pgsql->result, row, col)); } +void +kore_pgsql_queue_remove(struct http_request *req) +{ + struct pgsql_wait *pgw, *next; + + for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) { + next = TAILQ_NEXT(pgw, list); + if (pgw->req != req) + continue; + + TAILQ_REMOVE(&pgsql_wait_queue, pgw, list); + kore_pool_put(&pgsql_wait_pool, pgw); + return; + } +} + +static void +pgsql_queue_add(struct http_request *req) +{ + struct pgsql_wait *pgw; + + http_request_sleep(req); + + pgw = kore_pool_get(&pgsql_wait_pool); + pgw->req = req; + pgw->req->flags |= HTTP_REQUEST_PGSQL_QUEUE; + + TAILQ_INSERT_TAIL(&pgsql_wait_queue, pgw, list); +} + +static void +pgsql_queue_wakeup(void) +{ + struct pgsql_wait *pgw, *next; + + for (pgw = TAILQ_FIRST(&pgsql_wait_queue); pgw != NULL; pgw = next) { + next = TAILQ_NEXT(pgw, list); + if (pgw->req->flags & HTTP_REQUEST_DELETE) + continue; + + http_request_wakeup(pgw->req); + pgw->req->flags &= ~HTTP_REQUEST_PGSQL_QUEUE; + + TAILQ_REMOVE(&pgsql_wait_queue, pgw, list); + kore_pool_put(&pgsql_wait_pool, pgw); + return; + } +} + static int pgsql_conn_create(struct kore_pgsql *pgsql) { @@ -252,7 +318,7 @@ pgsql_conn_release(struct kore_pgsql *pgsql) return; kore_mem_free(pgsql->conn->job->query); - kore_mem_free(pgsql->conn->job); + kore_pool_put(&pgsql_job_pool, pgsql->conn->job); /* Drain just in case. */ while (PQgetResult(pgsql->conn->db) != NULL) @@ -264,9 +330,11 @@ pgsql_conn_release(struct kore_pgsql *pgsql) fd = PQsocket(pgsql->conn->db); kore_platform_disable_read(fd); - pgsql->state = KORE_PGSQL_STATE_COMPLETE; pgsql->conn = NULL; + pgsql->state = KORE_PGSQL_STATE_COMPLETE; + + pgsql_queue_wakeup(); } static void @@ -290,7 +358,7 @@ pgsql_conn_cleanup(struct pgsql_conn *conn) pgsql->error = kore_strdup(PQerrorMessage(conn->db)); kore_mem_free(conn->job->query); - kore_mem_free(conn->job); + kore_pool_put(&pgsql_job_pool, conn->job); conn->job = NULL; }