Initial stab at entering postgresql contrib code.

Has support for full async pgsql queries. Most of the logic
is hidden behind a KORE_PGSQL() macro allowing you to insert
these pgsql calls in your page handlers without blocking the
kore worker while the query is going off.

There is place for improvement here, and perhaps KORE_PGSQL won't
stay as I feel this might overcomplicate things instead of making
them simpler as I thought it would.
This commit is contained in:
Joris Vink 2014-03-30 23:54:35 +02:00
parent 8003bad094
commit 2f044cc7eb
10 changed files with 454 additions and 11 deletions

View File

@ -18,6 +18,12 @@ ifneq ("$(DEBUG)", "")
CFLAGS+=-DKORE_DEBUG
endif
ifneq ("$(PGSQL)", "")
S_SRC+=contrib/postgres/kore_pgsql.c
LDFLAGS+=-lpq
CFLAGS+=-DKORE_USE_PGSQL
endif
OSNAME=$(shell uname -s | sed -e 's/[-_].*//g' | tr A-Z a-z)
ifeq ("$(OSNAME)", "darwin")
CFLAGS+=-I/opt/local/include/
@ -38,6 +44,7 @@ all: $(S_OBJS)
$(CC) $(CFLAGS) -c $< -o $@
clean:
rm -f src/*.o $(BIN)
find . -type f -name \*.o -exec rm {} \;
rm -f $(BIN)
.PHONY: clean

View File

@ -0,0 +1,285 @@
/*
* Copyright (c) 2014 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include <sys/param.h>
#include <sys/queue.h>
#include <libpq-fe.h>
#include "kore.h"
#include "http.h"
#include "contrib/postgres/kore_pgsql.h"
struct pgsql_job {
u_int8_t idx;
struct http_request *req;
u_int64_t start;
char *query;
TAILQ_ENTRY(pgsql_job) list;
};
#define PGSQL_CONN_MAX 2
#define PGSQL_CONN_FREE 0x01
struct pgsql_conn {
u_int8_t type;
u_int8_t flags;
PGconn *db;
struct pgsql_job *job;
TAILQ_ENTRY(pgsql_conn) list;
};
static void pgsql_conn_cleanup(struct pgsql_conn *);
static int pgsql_conn_create(struct http_request *, int);
static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free;
static u_int16_t pgsql_conn_count;
void
kore_pgsql_init(void)
{
pgsql_conn_count = 0;
TAILQ_INIT(&pgsql_conn_free);
}
int
kore_pgsql_query(struct http_request *req, char *query, int idx)
{
int fd;
struct pgsql_conn *conn;
if (idx >= HTTP_PGSQL_MAX)
fatal("kore_pgsql_query: %d > %d", idx, HTTP_PGSQL_MAX);
if (req->pgsql[idx] != NULL)
fatal("kore_pgsql_query: %d already exists", idx);
if (TAILQ_EMPTY(&pgsql_conn_free)) {
if (pgsql_conn_count >= PGSQL_CONN_MAX)
return (KORE_RESULT_ERROR);
}
req->pgsql[idx] = kore_malloc(sizeof(struct kore_pgsql));
req->pgsql[idx]->state = KORE_PGSQL_STATE_INIT;
req->pgsql[idx]->result = NULL;
req->pgsql[idx]->error = NULL;
if (TAILQ_EMPTY(&pgsql_conn_free)) {
if (pgsql_conn_create(req, idx) == KORE_RESULT_ERROR)
return (KORE_RESULT_ERROR);
}
req->flags |= HTTP_REQUEST_SLEEPING;
conn = TAILQ_FIRST(&pgsql_conn_free);
if (!(conn->flags & PGSQL_CONN_FREE))
fatal("received a pgsql conn that was not free?");
conn->flags &= ~PGSQL_CONN_FREE;
TAILQ_REMOVE(&pgsql_conn_free, conn, list);
conn->job = kore_malloc(sizeof(struct pgsql_job));
conn->job->query = kore_strdup(query);
conn->job->start = kore_time_ms();
conn->job->req = req;
conn->job->idx = idx;
if (!PQsendQuery(conn->db, query)) {
pgsql_conn_cleanup(conn);
return (KORE_RESULT_ERROR);
}
fd = PQsocket(conn->db);
if (fd < 0)
fatal("PQsocket returned < 0 fd on open connection");
kore_platform_schedule_read(fd, conn);
kore_debug("query '%s' for %p sent on %p", query, req, conn);
req->pgsql[idx]->state = KORE_PGSQL_STATE_WAIT;
return (KORE_RESULT_OK);
}
void
kore_pgsql_handle(void *c, int err)
{
struct http_request *req;
struct pgsql_conn *conn = (struct pgsql_conn *)c;
int fd, i, (*cb)(struct http_request *);
i = conn->job->idx;
req = conn->job->req;
kore_debug("kore_pgsql_handle(): %p (%d)", req, i);
if (!PQconsumeInput(conn->db)) {
req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR;
req->pgsql[i]->error = PQerrorMessage(conn->db);
} else {
if (PQisBusy(conn->db)) {
req->pgsql[i]->state = KORE_PGSQL_STATE_WAIT;
} else {
req->pgsql[i]->result = PQgetResult(conn->db);
if (req->pgsql[i]->result == NULL) {
req->pgsql[i]->state = KORE_PGSQL_STATE_DONE;
} else {
switch (PQresultStatus(req->pgsql[i]->result)) {
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_NONFATAL_ERROR:
case PGRES_COPY_BOTH:
case PGRES_SINGLE_TUPLE:
req->pgsql[i]->state =
KORE_PGSQL_STATE_RESULT;
break;
case PGRES_EMPTY_QUERY:
case PGRES_BAD_RESPONSE:
case PGRES_FATAL_ERROR:
req->pgsql[i]->state =
KORE_PGSQL_STATE_ERROR;
req->pgsql[i]->error =
PQresultErrorMessage(req->pgsql[i]->result);
break;
}
}
}
}
if (req->pgsql[i]->state == KORE_PGSQL_STATE_ERROR ||
req->pgsql[i]->state == KORE_PGSQL_STATE_RESULT) {
cb = req->hdlr->addr;
cb(req);
}
req->pgsql[i]->error = NULL;
if (req->pgsql[i]->result)
PQclear(req->pgsql[i]->result);
switch (req->pgsql[i]->state) {
case KORE_PGSQL_STATE_INIT:
case KORE_PGSQL_STATE_WAIT:
break;
case KORE_PGSQL_STATE_DONE:
req->pgsql[i]->state = KORE_PGSQL_STATE_COMPLETE;
req->flags &= ~HTTP_REQUEST_SLEEPING;
kore_mem_free(conn->job->query);
kore_mem_free(conn->job);
conn->job = NULL;
conn->flags |= PGSQL_CONN_FREE;
TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list);
fd = PQsocket(conn->db);
kore_platform_disable_read(fd);
break;
case KORE_PGSQL_STATE_ERROR:
case KORE_PGSQL_STATE_RESULT:
kore_pgsql_handle(conn, 0);
break;
default:
fatal("unknown pgsql state");
}
}
void
kore_pgsql_cleanup(struct http_request *req)
{
int i;
for (i = 0; i < HTTP_PGSQL_MAX; i++) {
if (req->pgsql[i] == NULL)
continue;
kore_debug("cleaning up pgsql result %d for %p", i, req);
if (req->pgsql[i]->result != NULL) {
kore_log(LOG_NOTICE, "cleaning up leaked pgsql result");
PQclear(req->pgsql[i]->result);
}
kore_mem_free(req->pgsql[i]);
req->pgsql[i] = NULL;
}
}
int
kore_pgsql_ntuples(struct http_request *req, int idx)
{
return (PQntuples(req->pgsql[idx]->result));
}
static int
pgsql_conn_create(struct http_request *req, int idx)
{
struct pgsql_conn *conn;
pgsql_conn_count++;
conn = kore_malloc(sizeof(*conn));
kore_debug("pgsql_conn_create(): %p", conn);
memset(conn, 0, sizeof(*conn));
conn->db = PQconnectdb("host=/tmp/ user=joris");
if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) {
pgsql_conn_cleanup(conn);
return (KORE_RESULT_ERROR);
}
conn->job = NULL;
pgsql_conn_count++;
conn->flags = PGSQL_CONN_FREE;
conn->type = KORE_TYPE_PGSQL_CONN;
TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list);
return (KORE_RESULT_OK);
}
static void
pgsql_conn_cleanup(struct pgsql_conn *conn)
{
struct http_request *req;
int i, (*cb)(struct http_request *);
kore_debug("pgsql_conn_cleanup(): %p", conn);
if (conn->flags & PGSQL_CONN_FREE)
TAILQ_REMOVE(&pgsql_conn_free, conn, list);
if (conn->job) {
i = conn->job->idx;
req = conn->job->req;
req->pgsql[i]->state = KORE_PGSQL_STATE_ERROR;
req->pgsql[i]->error = PQerrorMessage(conn->db);
cb = req->hdlr->addr;
cb(req);
req->pgsql[i]->state = KORE_PGSQL_STATE_COMPLETE;
req->flags &= ~HTTP_REQUEST_SLEEPING;
kore_mem_free(conn->job->query);
kore_mem_free(conn->job);
conn->job = NULL;
}
if (conn->db != NULL)
PQfinish(conn->db);
pgsql_conn_count--;
kore_mem_free(conn);
}

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2014 Joris Vink <joris@coders.se>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#ifndef _H_KORE_PGSQL
#define _H_KORE_PGSQL
#include <libpq-fe.h>
void kore_pgsql_init(void);
void kore_pgsql_handle(void *, int);
void kore_pgsql_cleanup(struct http_request *);
int kore_pgsql_query(struct http_request *, char *, int);
int kore_pgsql_ntuples(struct http_request *, int);
struct kore_pgsql {
u_int8_t state;
char *error;
PGresult *result;
};
#define KORE_PGSQL_STATE_INIT 1
#define KORE_PGSQL_STATE_WAIT 2
#define KORE_PGSQL_STATE_RESULT 3
#define KORE_PGSQL_STATE_ERROR 4
#define KORE_PGSQL_STATE_DONE 5
#define KORE_PGSQL_STATE_COMPLETE 6
#define KORE_PGSQL(r, q, i, s) \
do { \
if (r->pgsql[i] == NULL) \
kore_pgsql_query(r, q, i); \
if (r->pgsql[i] == NULL) \
return (KORE_RESULT_RETRY); \
switch (r->pgsql[i]->state) { \
case KORE_PGSQL_STATE_ERROR: \
case KORE_PGSQL_STATE_RESULT: \
s; \
return (KORE_RESULT_RETRY); \
case KORE_PGSQL_STATE_COMPLETE: \
break; \
default: \
return (KORE_RESULT_RETRY); \
} \
} while (0);
#endif

View File

@ -118,6 +118,10 @@ struct http_file {
#define HTTP_REQUEST_COMPLETE 0x01
#define HTTP_REQUEST_DELETE 0x02
#define HTTP_REQUEST_SLEEPING 0x04
#define HTTP_PGSQL_MAX 20
struct kore_pgsql;
struct http_request {
u_int8_t method;
@ -136,7 +140,7 @@ struct http_request {
u_int8_t *multipart_body;
struct kore_module_handle *hdlr;
struct kore_pgsql *pgsql[HTTP_PGSQL_MAX];
TAILQ_HEAD(, http_header) req_headers;
TAILQ_HEAD(, http_header) resp_headers;

View File

@ -97,6 +97,7 @@ TAILQ_HEAD(netbuf_head, netbuf);
#define KORE_TYPE_LISTENER 1
#define KORE_TYPE_CONNECTION 2
#define KORE_TYPE_PGSQL_CONN 3
struct listener {
u_int8_t type;
@ -343,8 +344,10 @@ void kore_platform_init(void);
void kore_platform_event_init(void);
void kore_platform_event_wait(void);
void kore_platform_proctitle(char *);
void kore_platform_disable_read(int);
void kore_platform_enable_accept(void);
void kore_platform_disable_accept(void);
void kore_platform_schedule_read(int, void *);
void kore_platform_event_schedule(int, int, int, void *);
void kore_platform_worker_setcpu(struct kore_worker *);

View File

@ -1,8 +1,7 @@
# Example Kore configuration
# Server configuration.
bind 127.0.0.1 443
bind ::1 443
bind 10.211.55.6 443
# The path worker processes will chroot into after starting.
chroot /home/joris/src/kore
@ -44,7 +43,7 @@ workers 4
# (Set to 0 to disable sending this header).
#http_header_max 4096
#http_postbody_max 10240000
#http_keepalive_time 20
http_keepalive_time 0
#http_hsts_enable 31536000
# Load modules (you can load multiple at the same time).
@ -143,11 +142,13 @@ authentication auth_example {
# before allowing access to the page.
# Example domain that responds to localhost.
domain localhost {
domain kore.dev {
certfile cert/server.crt
certkey cert/server.key
accesslog /var/log/kore_access.log
static /pgsql serve_pgsql_test
# Page handlers with no authentication required.
static /css/style.css serve_style_css
static / serve_index

View File

@ -16,6 +16,7 @@
#include "kore.h"
#include "http.h"
#include "contrib/postgres/kore_pgsql.h"
#include "static.h"
@ -32,6 +33,7 @@ int serve_validator(struct http_request *);
int serve_params_test(struct http_request *);
int serve_private(struct http_request *);
int serve_private_test(struct http_request *);
int serve_pgsql_test(struct http_request *);
void my_callback(void);
int v_example_func(struct http_request *, char *);
@ -329,6 +331,27 @@ serve_private_test(struct http_request *req)
return (KORE_RESULT_OK);
}
int
serve_pgsql_test(struct http_request *req)
{
//int r;
KORE_PGSQL(req, "SELECT * FROM test", 0, {
if (req->pgsql[0]->state == KORE_PGSQL_STATE_ERROR) {
kore_log(LOG_NOTICE, "pgsql: %s",
(req->pgsql[0]->error) ?
req->pgsql[0]->error : "unknown");
} else {
//r = kore_pgsql_ntuples(req, 0);
}
});
/* Query successfully completed */
http_response(req, 200, "ok", 2);
return (KORE_RESULT_OK);
}
void
my_callback(void)
{

View File

@ -23,6 +23,10 @@
#include "kore.h"
#include "http.h"
#if defined(KORE_USE_PGSQL)
#include "contrib/postgres/kore_pgsql.h"
#endif
static char *http_status_text(int);
static int http_post_data_recv(struct netbuf *);
static char *http_post_data_text(struct http_request *);
@ -82,10 +86,12 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host,
return (KORE_RESULT_ERROR);
}
#if 0
if (strcasecmp(version, "http/1.1")) {
http_error_response(c, 505);
return (KORE_RESULT_ERROR);
}
#endif
if (!strcasecmp(method, "get")) {
m = HTTP_METHOD_GET;
@ -105,6 +111,7 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host,
req->status = 0;
req->stream = s;
req->method = m;
req->hdlr = NULL;
req->agent = NULL;
req->flags = flags;
req->post_data = NULL;
@ -132,7 +139,7 @@ http_request_new(struct connection *c, struct spdy_stream *s, char *host,
}
http_request_count++;
TAILQ_INSERT_TAIL(&http_requests, req, list);
TAILQ_INSERT_HEAD(&http_requests, req, list);
TAILQ_INSERT_TAIL(&(c->http_requests), req, olist);
if (out != NULL)
@ -158,15 +165,22 @@ http_process(void)
continue;
}
if (req->flags & HTTP_REQUEST_SLEEPING)
continue;
if (!(req->flags & HTTP_REQUEST_COMPLETE))
continue;
hdlr = kore_module_handler_find(req->host, req->path);
if (req->hdlr != NULL)
hdlr = req->hdlr;
else
hdlr = kore_module_handler_find(req->host, req->path);
req->start = kore_time_ms();
if (hdlr == NULL) {
r = http_generic_404(req);
} else {
if (hdlr->auth != NULL)
if (req->hdlr != hdlr && hdlr->auth != NULL)
r = kore_auth(req, hdlr->auth);
else
r = KORE_RESULT_OK;
@ -273,6 +287,10 @@ http_request_free(struct http_request *req)
kore_mem_free(f);
}
#if defined(KORE_USE_PGSQL)
kore_pgsql_cleanup(req);
#endif
if (req->method == HTTP_METHOD_POST && req->post_data != NULL)
kore_buf_free(req->post_data);
if (req->method == HTTP_METHOD_POST && req->multipart_body != NULL)

View File

@ -21,6 +21,10 @@
#include "kore.h"
#if defined(KORE_USE_PGSQL)
#include "contrib/postgres/kore_pgsql.h"
#endif
static int efd = -1;
static u_int32_t event_count = 0;
static struct epoll_event *events = NULL;
@ -92,12 +96,20 @@ kore_platform_event_wait(void)
if (type == KORE_TYPE_LISTENER)
fatal("failed on listener socket");
#if defined(KORE_USE_PGSQL)
if (type == KORE_TYPE_PGSQL_CONN) {
kore_pgsql_handle(events[i].data.ptr, 1);
continue;
}
#endif
c = (struct connection *)events[i].data.ptr;
kore_connection_disconnect(c);
continue;
}
if (type == KORE_TYPE_LISTENER) {
switch (type) {
case KORE_TYPE_LISTENER:
l = (struct listener *)events[i].data.ptr;
while ((worker->accepted < worker->accept_treshold) &&
@ -111,7 +123,8 @@ kore_platform_event_wait(void)
kore_platform_event_schedule(c->fd,
EPOLLIN | EPOLLOUT | EPOLLET, 0, c);
}
} else {
break;
case KORE_TYPE_CONNECTION:
c = (struct connection *)events[i].data.ptr;
if (events[i].events & EPOLLIN &&
!(c->flags & CONN_READ_BLOCK))
@ -122,6 +135,14 @@ kore_platform_event_wait(void)
if (!kore_connection_handle(c))
kore_connection_disconnect(c);
break;
#if defined(KORE_USE_PGSQL)
case KORE_TYPE_PGSQL_CONN:
kore_pgsql_handle(events[i].data.ptr, 0);
break;
#endif
default:
fatal("wrong type in event %d", type);
}
}
}
@ -146,6 +167,19 @@ kore_platform_event_schedule(int fd, int type, int flags, void *udata)
}
}
void
kore_platform_schedule_read(int fd, void *data)
{
kore_platform_event_schedule(fd, EPOLLIN | EPOLLET, 0, data);
}
void
kore_platform_disable_read(int fd)
{
if (epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL) == -1)
fatal("kore_platform_disable_read: %s", errno_s);
}
void
kore_platform_enable_accept(void)
{

View File

@ -25,6 +25,10 @@
#include "kore.h"
#include "http.h"
#if defined(KORE_USE_PGSQL)
#include "contrib/postgres/kore_pgsql.h"
#endif
#if defined(WORKER_DEBUG)
#define worker_debug(fmt, ...) printf(fmt, ##__VA_ARGS__)
#else
@ -221,6 +225,10 @@ kore_worker_entry(struct kore_worker *kw)
kore_accesslog_worker_init();
last_cb_run = kore_time_ms();
#if defined(KORE_USE_PGSQL)
kore_pgsql_init();
#endif
worker->accept_treshold = worker_max_connections / 10;
kore_log(LOG_NOTICE, "worker %d started (cpu#%d)", kw->id, kw->cpu);