diff --git a/examples/messaging/src/messaging.c b/examples/messaging/src/messaging.c index 1d8b725..cc3b480 100644 --- a/examples/messaging/src/messaging.c +++ b/examples/messaging/src/messaging.c @@ -53,8 +53,8 @@ init(int state) void received_message(struct kore_msg *msg, const void *data) { - kore_log(LOG_INFO, "got message (%d bytes): %.*s", msg->length, - msg->length, (const char *)data); + kore_log(LOG_INFO, "got message from %u (%d bytes): %.*s", msg->src, + msg->length, msg->length, (const char *)data); } /* @@ -64,8 +64,12 @@ received_message(struct kore_msg *msg, const void *data) int page(struct http_request *req) { - kore_msg_send(MY_MESSAGE_ID, "hello", 5); - http_response(req, 200, NULL, 0); + /* Send to all workers first. */ + kore_msg_send(KORE_MSG_WORKER_ALL, MY_MESSAGE_ID, "hello", 5); + /* Now send something to worker number #2 only. */ + kore_msg_send(2, MY_MESSAGE_ID, "hello number 2", 14); + + http_response(req, 200, NULL, 0); return (KORE_RESULT_OK); } diff --git a/includes/kore.h b/includes/kore.h index a415ccf..b139c28 100644 --- a/includes/kore.h +++ b/includes/kore.h @@ -369,8 +369,14 @@ struct kore_timer { #define KORE_MSG_ACCESSLOG 1 #define KORE_MSG_WEBSOCKET 2 +/* Predefined message targets. */ +#define KORE_MSG_PARENT 1000 +#define KORE_MSG_WORKER_ALL 1001 + struct kore_msg { u_int8_t id; + u_int16_t src; + u_int16_t dst; u_int32_t length; }; @@ -512,7 +518,7 @@ void kore_msg_worker_init(void); void kore_msg_parent_init(void); void kore_msg_parent_add(struct kore_worker *); void kore_msg_parent_remove(struct kore_worker *); -void kore_msg_send(u_int8_t, void *, u_int32_t); +void kore_msg_send(u_int16_t, u_int8_t, void *, u_int32_t); int kore_msg_register(u_int8_t, void (*cb)(struct kore_msg *, const void *)); diff --git a/src/accesslog.c b/src/accesslog.c index f4a1552..9360a0a 100644 --- a/src/accesslog.c +++ b/src/accesslog.c @@ -167,5 +167,6 @@ kore_accesslog(struct http_request *req) } #endif - kore_msg_send(KORE_MSG_ACCESSLOG, &logpacket, sizeof(logpacket)); + kore_msg_send(KORE_MSG_PARENT, + KORE_MSG_ACCESSLOG, &logpacket, sizeof(logpacket)); } diff --git a/src/msg.c b/src/msg.c index e1706bc..edfb640 100644 --- a/src/msg.c +++ b/src/msg.c @@ -33,6 +33,7 @@ static struct msg_type *msg_type_lookup(u_int8_t); static int msg_recv_packet(struct netbuf *); static int msg_recv_data(struct netbuf *); static void msg_disconnected_parent(struct connection *); +static void msg_disconnected_worker(struct connection *); static void msg_type_accesslog(struct kore_msg *, const void *); static void msg_type_websocket(struct kore_msg *, const void *); @@ -59,12 +60,19 @@ kore_msg_parent_init(void) void kore_msg_parent_add(struct kore_worker *kw) { + u_int8_t *worker_id; + + worker_id = kore_malloc(sizeof(*worker_id)); + *worker_id = kw->id; + kw->msg[0] = kore_connection_new(NULL); kw->msg[0]->fd = kw->pipe[0]; kw->msg[0]->read = net_read; kw->msg[0]->write = net_write; kw->msg[0]->proto = CONN_PROTO_MSG; kw->msg[0]->state = CONN_STATE_ESTABLISHED; + kw->msg[0]->hdlr_extra = worker_id; + kw->msg[0]->disconnect = msg_disconnected_worker; TAILQ_INSERT_TAIL(&connections, kw->msg[0], list); kore_platform_event_all(kw->msg[0]->fd, kw->msg[0]); @@ -117,12 +125,14 @@ kore_msg_register(u_int8_t id, void (*cb)(struct kore_msg *, const void *)) } void -kore_msg_send(u_int8_t id, void *data, u_int32_t len) +kore_msg_send(u_int16_t dst, u_int8_t id, void *data, u_int32_t len) { struct kore_msg m; m.id = id; + m.dst = dst; m.length = len; + m.src = worker->id; net_send_queue(worker->msg[1], &m, sizeof(m), NULL, NETBUF_LAST_CHAIN); net_send_queue(worker->msg[1], data, len, NULL, NETBUF_LAST_CHAIN); @@ -145,13 +155,23 @@ msg_recv_data(struct netbuf *nb) struct msg_type *type; struct kore_msg *msg = (struct kore_msg *)nb->buf; - if ((type = msg_type_lookup(msg->id)) != NULL) + if ((type = msg_type_lookup(msg->id)) != NULL) { + if (worker == NULL && msg->dst != KORE_MSG_PARENT) + fatal("received parent msg for non parent dst"); type->cb(msg, nb->buf + sizeof(*msg)); + } if (worker == NULL && type == NULL) { TAILQ_FOREACH(c, &connections, list) { if (c == nb->owner) continue; + if (c->proto != CONN_PROTO_MSG || c->hdlr_extra == NULL) + continue; + + if (msg->dst != KORE_MSG_WORKER_ALL && + *(u_int8_t *)c->hdlr_extra != msg->dst) + continue; + net_send_queue(c, nb->buf, nb->s_off, NULL, NETBUF_LAST_CHAIN); net_send_flush(c); @@ -170,6 +190,13 @@ msg_disconnected_parent(struct connection *c) kore_log(LOG_ERR, "failed to send SIGQUIT: %s", errno_s); } +static void +msg_disconnected_worker(struct connection *c) +{ + kore_mem_free(c->hdlr_extra); + c->hdlr_extra = NULL; +} + static void msg_type_accesslog(struct kore_msg *msg, const void *data) { diff --git a/src/websocket.c b/src/websocket.c index e3dad6b..0696320 100644 --- a/src/websocket.c +++ b/src/websocket.c @@ -143,8 +143,10 @@ kore_websocket_broadcast(struct connection *src, u_int8_t op, const void *data, } } - if (scope == WEBSOCKET_BROADCAST_GLOBAL) - kore_msg_send(KORE_MSG_WEBSOCKET, frame->data, frame->offset); + if (scope == WEBSOCKET_BROADCAST_GLOBAL) { + kore_msg_send(KORE_MSG_WORKER_ALL, + KORE_MSG_WEBSOCKET, frame->data, frame->offset); + } kore_buf_free(frame); }