Implemented an lock free queue based on this paper http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf, the "lock free queue" we had before wasn't lock free at all.
This commit is contained in:
parent
d9fe885ba5
commit
64ff82ecf9
15
src/Makefile
15
src/Makefile
@ -34,7 +34,7 @@ DSYMUTIL := true
|
||||
ifeq ($(CFG_OSTYPE), Linux)
|
||||
CFG_RUNTIME := librustrt.so
|
||||
CFG_STDLIB := libstd.so
|
||||
CFG_GCC_CFLAGS += -fPIC
|
||||
CFG_GCC_CFLAGS += -fPIC -march=i686
|
||||
CFG_GCC_LINK_FLAGS += -shared -fPIC -ldl -lpthread -lrt
|
||||
ifeq ($(CFG_CPUTYPE), x86_64)
|
||||
CFG_GCC_CFLAGS += -m32
|
||||
@ -107,7 +107,7 @@ ifdef CFG_UNIXY
|
||||
endif
|
||||
CFG_OBJ_SUFFIX := .o
|
||||
CFG_EXE_SUFFIX := .exe
|
||||
CFG_GCC_CFLAGS :=
|
||||
CFG_GCC_CFLAGS := -march=i686
|
||||
CFG_GCC_LINK_FLAGS := -shared
|
||||
ifeq ($(CFG_CPUTYPE), x86_64)
|
||||
CFG_GCC_CFLAGS += -m32
|
||||
@ -248,7 +248,6 @@ BOOT_CMIS := $(BOOT_MLS:.ml=.cmi)
|
||||
RUNTIME_CS := rt/sync/timer.cpp \
|
||||
rt/sync/sync.cpp \
|
||||
rt/sync/spin_lock.cpp \
|
||||
rt/sync/lock_free_queue.cpp \
|
||||
rt/sync/condition_variable.cpp \
|
||||
rt/rust.cpp \
|
||||
rt/rust_builtin.cpp \
|
||||
@ -286,6 +285,7 @@ RUNTIME_HDR := rt/globals.h \
|
||||
rt/util/hash_map.h \
|
||||
rt/sync/sync.h \
|
||||
rt/sync/timer.h \
|
||||
rt/sync/lock_free_queue.h \
|
||||
rt/rust_srv.h \
|
||||
rt/memory_region.h \
|
||||
rt/memory.h
|
||||
@ -394,7 +394,9 @@ TASK_XFAILS := test/run-pass/acyclic-unwind.rs \
|
||||
test/run-pass/task-life-0.rs \
|
||||
test/run-pass/task-comm.rs \
|
||||
test/run-pass/threads.rs \
|
||||
test/run-pass/yield.rs
|
||||
test/run-pass/yield.rs \
|
||||
test/run-pass/task-comm-15.rs \
|
||||
test/run-pass/task-life-0.rs
|
||||
|
||||
TEST_XFAILS_X86 := $(TASK_XFAILS) \
|
||||
test/run-pass/child-outlives-parent.rs \
|
||||
@ -537,6 +539,11 @@ TEST_XFAILS_LLVM := $(TASK_XFAILS) \
|
||||
task-comm-9.rs \
|
||||
task-comm-10.rs \
|
||||
task-comm-11.rs \
|
||||
task-comm-12.rs \
|
||||
task-comm-13.rs \
|
||||
task-comm-13-thread.rs \
|
||||
task-comm-14.rs \
|
||||
task-comm-15.rs \
|
||||
task-life-0.rs \
|
||||
threads.rs \
|
||||
type-sizes.rs \
|
||||
|
@ -301,12 +301,14 @@ void rust_dom::send_message(rust_message *message) {
|
||||
/**
|
||||
* Drains and processes incoming pending messages.
|
||||
*/
|
||||
void rust_dom::drain_incoming_message_queue() {
|
||||
void rust_dom::drain_incoming_message_queue(bool process) {
|
||||
rust_message *message;
|
||||
while ((message = (rust_message *) _incoming_message_queue.dequeue())) {
|
||||
while (_incoming_message_queue.dequeue(&message)) {
|
||||
log(rust_log::COMM, "<== processing incoming message \"%s\" 0x%"
|
||||
PRIxPTR, message->label, message);
|
||||
message->process();
|
||||
if (process) {
|
||||
message->process();
|
||||
}
|
||||
message->~rust_message();
|
||||
this->synchronized_region.free(message);
|
||||
}
|
||||
@ -454,7 +456,7 @@ rust_dom::start_main_loop()
|
||||
while (n_live_tasks() > 0) {
|
||||
A(this, is_deadlocked() == false, "deadlock");
|
||||
|
||||
drain_incoming_message_queue();
|
||||
drain_incoming_message_queue(true);
|
||||
|
||||
rust_task *scheduled_task = schedule_task();
|
||||
|
||||
@ -519,7 +521,7 @@ rust_dom::start_main_loop()
|
||||
}
|
||||
sync::yield();
|
||||
} else {
|
||||
drain_incoming_message_queue();
|
||||
drain_incoming_message_queue(true);
|
||||
}
|
||||
reap_dead_tasks();
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ struct rust_dom
|
||||
hash_map<rust_port *, rust_proxy<rust_port> *> _port_proxies;
|
||||
|
||||
// Incoming messages from other domains.
|
||||
lock_free_queue _incoming_message_queue;
|
||||
lock_free_queue<rust_message*> _incoming_message_queue;
|
||||
|
||||
#ifndef __WIN32__
|
||||
pthread_attr_t attr;
|
||||
@ -71,7 +71,7 @@ struct rust_dom
|
||||
void free(void *mem, memory_region::memory_region_type type);
|
||||
|
||||
void send_message(rust_message *message);
|
||||
void drain_incoming_message_queue();
|
||||
void drain_incoming_message_queue(bool process);
|
||||
rust_proxy<rust_task> *get_task_proxy(rust_task *task);
|
||||
void delete_proxies();
|
||||
rust_proxy<rust_port> *get_port_proxy_synchronized(rust_port *port);
|
||||
|
@ -9,7 +9,7 @@
|
||||
/**
|
||||
* Abstract base class for all message types.
|
||||
*/
|
||||
class rust_message : public lock_free_queue_node {
|
||||
class rust_message {
|
||||
public:
|
||||
const char* label;
|
||||
private:
|
||||
|
56
src/rt/sync/interrupt_transparent_queue.cpp
Normal file
56
src/rt/sync/interrupt_transparent_queue.cpp
Normal file
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Interrupt transparent queue, Schoen et. al, "On Interrupt-Transparent
|
||||
* Synchronization in an Embedded Object-Oriented Operating System", 2000.
|
||||
* enqueue() is allowed to interrupt enqueue() and dequeue(), however,
|
||||
* dequeue() is not allowed to interrupt itself.
|
||||
*/
|
||||
|
||||
#include "../globals.h"
|
||||
#include "interrupt_transparent_queue.h"
|
||||
|
||||
interrupt_transparent_queue_node::interrupt_transparent_queue_node() :
|
||||
next(NULL) {
|
||||
|
||||
}
|
||||
|
||||
interrupt_transparent_queue::interrupt_transparent_queue() : _tail(this) {
|
||||
|
||||
}
|
||||
|
||||
void
|
||||
interrupt_transparent_queue::enqueue(interrupt_transparent_queue_node *item) {
|
||||
lock.lock();
|
||||
item->next = (interrupt_transparent_queue_node *) NULL;
|
||||
interrupt_transparent_queue_node *last = _tail;
|
||||
_tail = item;
|
||||
while (last->next) {
|
||||
last = last->next;
|
||||
}
|
||||
last->next = item;
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
interrupt_transparent_queue_node *
|
||||
interrupt_transparent_queue::dequeue() {
|
||||
lock.lock();
|
||||
interrupt_transparent_queue_node *item = next;
|
||||
if (item && !(next = item->next)) {
|
||||
_tail = (interrupt_transparent_queue_node *) this;
|
||||
if (item->next) {
|
||||
interrupt_transparent_queue_node *lost = item->next;
|
||||
interrupt_transparent_queue_node *help;
|
||||
do {
|
||||
help = lost->next;
|
||||
enqueue(lost);
|
||||
} while ((lost = help) !=
|
||||
(interrupt_transparent_queue_node *) NULL);
|
||||
}
|
||||
}
|
||||
lock.unlock();
|
||||
return item;
|
||||
}
|
||||
|
||||
bool
|
||||
interrupt_transparent_queue::is_empty() {
|
||||
return next == NULL;
|
||||
}
|
22
src/rt/sync/interrupt_transparent_queue.h
Normal file
22
src/rt/sync/interrupt_transparent_queue.h
Normal file
@ -0,0 +1,22 @@
|
||||
#ifndef INTERRUPT_TRANSPARENT_QUEUE_H
|
||||
#define INTERRUPT_TRANSPARENT_QUEUE_H
|
||||
|
||||
#include "spin_lock.h"
|
||||
|
||||
class interrupt_transparent_queue_node {
|
||||
public:
|
||||
interrupt_transparent_queue_node *next;
|
||||
interrupt_transparent_queue_node();
|
||||
};
|
||||
|
||||
class interrupt_transparent_queue : interrupt_transparent_queue_node {
|
||||
spin_lock lock;
|
||||
interrupt_transparent_queue_node *_tail;
|
||||
public:
|
||||
interrupt_transparent_queue();
|
||||
void enqueue(interrupt_transparent_queue_node *item);
|
||||
interrupt_transparent_queue_node *dequeue();
|
||||
bool is_empty();
|
||||
};
|
||||
|
||||
#endif /* INTERRUPT_TRANSPARENT_QUEUE_H */
|
@ -1,22 +1,210 @@
|
||||
#ifndef LOCK_FREE_QUEUE_H
|
||||
#define LOCK_FREE_QUEUE_H
|
||||
|
||||
#include "spin_lock.h"
|
||||
/**
|
||||
* How and why this lock free queue works:
|
||||
*
|
||||
* Adapted from the paper titled "Simple, Fast, and Practical Non-Blocking
|
||||
* and Blocking Concurrent Queue Algorithms" by Maged M. Michael,
|
||||
* Michael L. Scott.
|
||||
*
|
||||
* Safety Properties:
|
||||
*
|
||||
* 1. The linked list is always connected.
|
||||
* 2. Nodes are only inserted after the last node in the linked list.
|
||||
* 3. Nodes are only deleted from the beginning of the linked list.
|
||||
* 4. Head always points to the first node in the linked list.
|
||||
* 5. Tail always points to a node in the linked list.
|
||||
*
|
||||
*
|
||||
* 1. The linked list is always connected because the next pointer is not set
|
||||
* to null before the node is freed, and no node is freed until deleted
|
||||
* from the linked list.
|
||||
*
|
||||
* 2. Nodes are only inserted at the end of the linked list because they are
|
||||
* linked through the tail pointer which always points to a node in the
|
||||
* linked list (5) and an inserted node is only linked to a node that has
|
||||
* a null next pointer, and the only such node is the last one (1).
|
||||
*
|
||||
* 3. Nodes are deleted from the beginning of the list because they are
|
||||
* deleted only when they are pointed to by head which always points to the
|
||||
* first node (4).
|
||||
*
|
||||
* 4. Head always points to the first node in the list because it only changes
|
||||
* its value to the next node atomically. The new value of head cannot be
|
||||
* null because if there is only one node in the list the dequeue operation
|
||||
* returns without deleting any nodes.
|
||||
*
|
||||
* 5. Tail always points to a node in the linked list because it never lags
|
||||
* behind head, so it can never point to a deleted node. Also, when tail
|
||||
* changes its value it always swings to the next node in the list and it
|
||||
* never tires to change its value if the next pointer is NULL.
|
||||
*/
|
||||
|
||||
class lock_free_queue_node {
|
||||
public:
|
||||
lock_free_queue_node *next;
|
||||
lock_free_queue_node();
|
||||
};
|
||||
#include <assert.h>
|
||||
template <class T>
|
||||
class lock_free_queue {
|
||||
|
||||
struct node_t;
|
||||
struct pointer_t {
|
||||
node_t *node;
|
||||
uint32_t count;
|
||||
pointer_t() : node(NULL), count(0) {
|
||||
// Nop.
|
||||
}
|
||||
pointer_t(node_t *node, uint32_t count) {
|
||||
this->node = node;
|
||||
this->count = count;
|
||||
}
|
||||
bool equals(pointer_t &other) {
|
||||
return node == other.node && count == other.count;
|
||||
}
|
||||
};
|
||||
|
||||
struct node_t {
|
||||
T value;
|
||||
pointer_t next;
|
||||
|
||||
node_t() {
|
||||
next.node = NULL;
|
||||
next.count = 0;
|
||||
}
|
||||
|
||||
node_t(pointer_t next, T value) {
|
||||
this->next = next;
|
||||
this->value = value;
|
||||
}
|
||||
};
|
||||
|
||||
// Always points to the first node in the list.
|
||||
pointer_t head;
|
||||
|
||||
// Always points to a node in the list, (not necessarily the last).
|
||||
pointer_t tail;
|
||||
|
||||
// Compare and swap counted pointers, we can only do this if pointr_t is
|
||||
// 8 bytes or less since that the maximum size CAS can handle.
|
||||
bool compare_and_swap(pointer_t *address,
|
||||
pointer_t *oldValue,
|
||||
pointer_t newValue) {
|
||||
|
||||
if (sync::compare_and_swap(
|
||||
(uint64_t*) address,
|
||||
*(uint64_t*) oldValue,
|
||||
*(uint64_t*) &newValue)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
class lock_free_queue : lock_free_queue_node {
|
||||
spin_lock lock;
|
||||
lock_free_queue_node *_tail;
|
||||
public:
|
||||
lock_free_queue();
|
||||
void enqueue(lock_free_queue_node *item);
|
||||
lock_free_queue_node *dequeue();
|
||||
bool is_empty();
|
||||
lock_free_queue() {
|
||||
// We can only handle 64bit CAS for counted pointers, so this will
|
||||
// not work with 64bit pointers.
|
||||
assert (sizeof(pointer_t) == sizeof(uint64_t));
|
||||
|
||||
// Allocate a dummy node to be used as the first node in the list.
|
||||
node_t *node = new node_t();
|
||||
|
||||
// Head and tail both start out pointing to the dummy node.
|
||||
head.node = node;
|
||||
tail.node = node;
|
||||
}
|
||||
|
||||
virtual ~lock_free_queue() {
|
||||
// Delete dummy node.
|
||||
delete head.node;
|
||||
}
|
||||
|
||||
bool is_empty() {
|
||||
return head.node == tail.node;
|
||||
}
|
||||
|
||||
void enqueue(T value) {
|
||||
|
||||
// Create a new node to be inserted in the linked list, and set the
|
||||
// next node to NULL.
|
||||
node_t *node = new node_t();
|
||||
node->value = value;
|
||||
node->next.node = NULL;
|
||||
pointer_t tail;
|
||||
|
||||
// Keep trying until enqueue is done.
|
||||
while (true) {
|
||||
// Read the current tail which may either point to the last node
|
||||
// or to the second to last node (not sure why second to last,
|
||||
// and not any other node).
|
||||
tail = this->tail;
|
||||
|
||||
// Reads the next node after the tail which will be the last node
|
||||
// if null.
|
||||
pointer_t next;
|
||||
if (tail.node != NULL) {
|
||||
next = tail.node->next;
|
||||
}
|
||||
|
||||
// Loop if another thread changed the tail since we last read it.
|
||||
if (tail.equals(this->tail) == false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If next is not pointing to the last node try to swing tail to
|
||||
// the last node and loop.
|
||||
if (next.node != NULL) {
|
||||
compare_and_swap(&this->tail, &tail,
|
||||
pointer_t(next.node, tail.count + 1));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Try to link node at the end of the linked list.
|
||||
if (compare_and_swap(&tail.node->next, &next,
|
||||
pointer_t(node, next.count + 1))) {
|
||||
// Enqueueing is done.
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue is done, try to swing tail to the inserted node.
|
||||
compare_and_swap(&this->tail, &tail,
|
||||
pointer_t(node, tail.count + 1));
|
||||
}
|
||||
|
||||
bool dequeue(T *value) {
|
||||
pointer_t head;
|
||||
|
||||
// Keep trying until dequeue is done.
|
||||
while(true) {
|
||||
head = this->head;
|
||||
pointer_t tail = this->tail;
|
||||
pointer_t next = head.node->next;
|
||||
|
||||
if (head.equals(this->head) == false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If queue is empty, or if tail is falling behind.
|
||||
if (head.node == tail.node) {
|
||||
// If queue is empty.
|
||||
if (next.node == NULL) {
|
||||
return false;
|
||||
}
|
||||
// Tail is falling behind, advance it.
|
||||
compare_and_swap(&this->tail,
|
||||
&tail,
|
||||
pointer_t(next.node, tail.count + 1));
|
||||
} else {
|
||||
// Read value before CAS, otherwise another
|
||||
// dequeue might advance it.
|
||||
*value = next.node->value;
|
||||
if (compare_and_swap(&this->head, &head,
|
||||
pointer_t(next.node, head.count + 1))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
delete head.node;
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* LOCK_FREE_QUEUE_H */
|
||||
|
@ -4,6 +4,11 @@
|
||||
class sync {
|
||||
public:
|
||||
static void yield();
|
||||
template <class T>
|
||||
static bool compare_and_swap(T *address,
|
||||
T oldValue, T newValue) {
|
||||
return __sync_bool_compare_and_swap(address, oldValue, newValue);
|
||||
}
|
||||
};
|
||||
|
||||
#endif /* SYNC_H */
|
||||
|
26
src/test/run-fail/task-comm-14.rs
Normal file
26
src/test/run-fail/task-comm-14.rs
Normal file
@ -0,0 +1,26 @@
|
||||
io fn main() {
|
||||
let port[int] po = port();
|
||||
|
||||
// Spawn 10 tasks each sending us back one int.
|
||||
let int i = 10;
|
||||
while (i > 0) {
|
||||
log i;
|
||||
spawn "child" child(i, chan(po));
|
||||
i = i - 1;
|
||||
}
|
||||
|
||||
i = 10;
|
||||
let int value = 0;
|
||||
while (i > 0) {
|
||||
log i;
|
||||
value <- po;
|
||||
i = i - 1;
|
||||
}
|
||||
|
||||
log "main thread exiting";
|
||||
}
|
||||
|
||||
io fn child(int x, chan[int] ch) {
|
||||
log x;
|
||||
ch <| x;
|
||||
}
|
23
src/test/run-pass/task-comm-12.rs
Normal file
23
src/test/run-pass/task-comm-12.rs
Normal file
@ -0,0 +1,23 @@
|
||||
use std;
|
||||
import std._task;
|
||||
|
||||
fn main() -> () {
|
||||
test00();
|
||||
}
|
||||
|
||||
fn start(int task_number) {
|
||||
log "Started / Finished Task.";
|
||||
}
|
||||
|
||||
fn test00() {
|
||||
let int i = 0;
|
||||
let task t = spawn thread start(i);
|
||||
|
||||
// Sleep long enough for the task to finish.
|
||||
_task.sleep(10000u);
|
||||
|
||||
// Try joining tasks that have already finished.
|
||||
join t;
|
||||
|
||||
log "Joined Task.";
|
||||
}
|
18
src/test/run-pass/task-comm-13-thread.rs
Normal file
18
src/test/run-pass/task-comm-13-thread.rs
Normal file
@ -0,0 +1,18 @@
|
||||
use std;
|
||||
import std._task;
|
||||
|
||||
io fn start(chan[int] c, int start, int number_of_messages) {
|
||||
let int i = 0;
|
||||
while (i < number_of_messages) {
|
||||
c <| start + i;
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> () {
|
||||
log "Check that we don't deadlock.";
|
||||
let port[int] p = port();
|
||||
let task a = spawn thread "start" start(chan(p), 0, 10);
|
||||
join a;
|
||||
log "Joined Task";
|
||||
}
|
18
src/test/run-pass/task-comm-13.rs
Normal file
18
src/test/run-pass/task-comm-13.rs
Normal file
@ -0,0 +1,18 @@
|
||||
use std;
|
||||
import std._task;
|
||||
|
||||
io fn start(chan[int] c, int start, int number_of_messages) {
|
||||
let int i = 0;
|
||||
while (i < number_of_messages) {
|
||||
c <| start + i;
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> () {
|
||||
log "Check that we don't deadlock.";
|
||||
let port[int] p = port();
|
||||
let task a = spawn "start" start(chan(p), 0, 10);
|
||||
join a;
|
||||
log "Joined Task";
|
||||
}
|
14
src/test/run-pass/task-comm-15.rs
Normal file
14
src/test/run-pass/task-comm-15.rs
Normal file
@ -0,0 +1,14 @@
|
||||
io fn start(chan[int] c, int n) {
|
||||
let int i = n;
|
||||
|
||||
while(i > 0) {
|
||||
c <| 0;
|
||||
i = i - 1;
|
||||
}
|
||||
}
|
||||
|
||||
io fn main() {
|
||||
let port[int] p = port();
|
||||
auto child = spawn thread "child" start(chan(p), 10);
|
||||
auto c <- p;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user