Add kore_pgsql_query_params().

This function uses PQsendQueryParams() instead of the normal PQsendQuery()
allowing you to pass binary data in a cleaner fashion.

A basic call would look something like:

char *mydata = "Hello";
size_t mydata_len = strlen(mydata);

kore_pgsql_query_params(&pgsql, req,
    "INSERT INTO foo VALUES($1::text)", KORE_PGSQL_FORMAT_TEXT, 1
    mydata, mydata_len, KORE_PGSQL_FORMAT_TEXT);

kore_pgsql_query_params() is variadic, allowing you to pass any
count of parameters where each parameter has the following:
	data pointer, data length, type of parameter.
This commit is contained in:
Joris Vink 2014-09-28 21:39:16 +02:00
parent 0909e6bac1
commit 3b09683f5c
3 changed files with 112 additions and 48 deletions

View File

@ -96,7 +96,7 @@ request_perform_query(struct http_request *req)
req->fsm_state = REQ_STATE_DB_WAIT;
/* Fire off the query. */
if (!kore_pgsql_async(&state->sql, req, "SELECT * FROM coders")) {
if (!kore_pgsql_query(&state->sql, req, "SELECT * FROM coders")) {
/* If the state was still INIT, we'll try again later. */
if (state->sql.state == KORE_PGSQL_STATE_INIT) {
req->fsm_state = REQ_STATE_QUERY;

View File

@ -19,6 +19,9 @@
#include <libpq-fe.h>
#define KORE_PGSQL_FORMAT_TEXT 0
#define KORE_PGSQL_FORMAT_BINARY 1
struct pgsql_conn {
u_int8_t type;
u_int8_t flags;
@ -44,8 +47,10 @@ void kore_pgsql_init(void);
void kore_pgsql_handle(void *, int);
void kore_pgsql_cleanup(struct kore_pgsql *);
void kore_pgsql_continue(struct http_request *, struct kore_pgsql *);
int kore_pgsql_async(struct kore_pgsql *,
struct http_request *, const char *);
int kore_pgsql_query(struct kore_pgsql *, struct http_request *,
const char *);
int kore_pgsql_query_params(struct kore_pgsql *, struct http_request *,
const char *, int, u_int8_t, ...);
int kore_pgsql_ntuples(struct kore_pgsql *);
void kore_pgsql_logerror(struct kore_pgsql *);

View File

@ -43,12 +43,15 @@ struct pgsql_wait {
#define PGSQL_CONN_MAX 2
#define PGSQL_CONN_FREE 0x01
static void pgsql_queue_wakeup(void);
static int pgsql_conn_create(struct kore_pgsql *);
static void pgsql_queue_add(struct http_request *);
static void pgsql_conn_release(struct kore_pgsql *);
static void pgsql_conn_cleanup(struct pgsql_conn *);
static int pgsql_conn_create(struct kore_pgsql *);
static void pgsql_read_result(struct kore_pgsql *, int);
static void pgsql_queue_add(struct http_request *);
static void pgsql_queue_wakeup(void);
static void pgsql_schedule(struct kore_pgsql *, struct http_request *);
static int pgsql_prepare(struct kore_pgsql *, struct http_request *,
const char *);
static struct kore_pool pgsql_job_pool;
static struct kore_pool pgsql_wait_pool;
@ -72,56 +75,59 @@ kore_pgsql_init(void)
}
int
kore_pgsql_async(struct kore_pgsql *pgsql, struct http_request *req,
kore_pgsql_query(struct kore_pgsql *pgsql, struct http_request *req,
const char *query)
{
int fd;
struct pgsql_conn *conn;
if (!pgsql_prepare(pgsql, req, query))
return (KORE_RESULT_ERROR);
pgsql->state = KORE_PGSQL_STATE_INIT;
pgsql->result = NULL;
pgsql->error = NULL;
pgsql->conn = NULL;
if (TAILQ_EMPTY(&pgsql_conn_free)) {
if (pgsql_conn_count >= pgsql_conn_max) {
pgsql_queue_add(req);
return (KORE_RESULT_ERROR);
}
if (!pgsql_conn_create(pgsql))
return (KORE_RESULT_ERROR);
}
http_request_sleep(req);
conn = TAILQ_FIRST(&pgsql_conn_free);
if (!(conn->flags & PGSQL_CONN_FREE))
fatal("received a pgsql conn that was not free?");
conn->flags &= ~PGSQL_CONN_FREE;
TAILQ_REMOVE(&pgsql_conn_free, conn, list);
pgsql->conn = conn;
conn->job = kore_pool_get(&pgsql_job_pool);
conn->job->query = kore_strdup(query);
conn->job->pgsql = pgsql;
conn->job->req = req;
LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist);
if (!PQsendQuery(conn->db, query)) {
pgsql_conn_cleanup(conn);
if (!PQsendQuery(pgsql->conn->db, query)) {
pgsql_conn_cleanup(pgsql->conn);
return (KORE_RESULT_ERROR);
}
fd = PQsocket(conn->db);
if (fd < 0)
fatal("PQsocket returned < 0 fd on open connection");
pgsql_schedule(pgsql, req);
return (KORE_RESULT_OK);
}
kore_platform_schedule_read(fd, conn);
pgsql->state = KORE_PGSQL_STATE_WAIT;
kore_debug("query '%s' for %p sent on %p", query, req, conn);
int
kore_pgsql_query_params(struct kore_pgsql *pgsql, struct http_request *req,
const char *query, int result, u_int8_t count, ...)
{
u_int8_t i;
va_list args;
char **values;
int *lengths, *formats;
if (!pgsql_prepare(pgsql, req, query))
return (KORE_RESULT_ERROR);
va_start(args, count);
lengths = kore_calloc(count, sizeof(int));
formats = kore_calloc(count, sizeof(int));
values = kore_calloc(count, sizeof(char *));
for (i = 0; i < count; i++) {
values[i] = va_arg(args, void *);
lengths[i] = va_arg(args, u_int32_t);
formats[i] = va_arg(args, int);
}
if (!PQsendQueryParams(pgsql->conn->db, query, count, NULL,
(const char * const *)values, lengths, formats, result)) {
kore_mem_free(values);
kore_mem_free(lengths);
kore_mem_free(formats);
pgsql_conn_cleanup(pgsql->conn);
return (KORE_RESULT_ERROR);
}
kore_mem_free(values);
kore_mem_free(lengths);
kore_mem_free(formats);
pgsql_schedule(pgsql, req);
return (KORE_RESULT_OK);
}
@ -244,6 +250,59 @@ kore_pgsql_queue_remove(struct http_request *req)
}
}
static int
pgsql_prepare(struct kore_pgsql *pgsql, struct http_request *req,
const char *query)
{
struct pgsql_conn *conn;
pgsql->state = KORE_PGSQL_STATE_INIT;
pgsql->result = NULL;
pgsql->error = NULL;
pgsql->conn = NULL;
if (TAILQ_EMPTY(&pgsql_conn_free)) {
if (pgsql_conn_count >= pgsql_conn_max) {
pgsql_queue_add(req);
return (KORE_RESULT_ERROR);
}
if (!pgsql_conn_create(pgsql))
return (KORE_RESULT_ERROR);
}
http_request_sleep(req);
conn = TAILQ_FIRST(&pgsql_conn_free);
if (!(conn->flags & PGSQL_CONN_FREE))
fatal("received a pgsql conn that was not free?");
conn->flags &= ~PGSQL_CONN_FREE;
TAILQ_REMOVE(&pgsql_conn_free, conn, list);
pgsql->conn = conn;
conn->job = kore_pool_get(&pgsql_job_pool);
conn->job->query = kore_strdup(query);
conn->job->pgsql = pgsql;
conn->job->req = req;
LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist);
return (KORE_RESULT_OK);
}
static void
pgsql_schedule(struct kore_pgsql *pgsql, struct http_request *req)
{
int fd;
fd = PQsocket(pgsql->conn->db);
if (fd < 0)
fatal("PQsocket returned < 0 fd on open connection");
kore_platform_schedule_read(fd, pgsql->conn);
pgsql->state = KORE_PGSQL_STATE_WAIT;
kore_debug("query '%s' for %p sent on %p", query, req, pgsql->conn);
}
static void
pgsql_queue_add(struct http_request *req)
{