/** * The condition module provides a primitive for synchronized condition * checking. * * Copyright: Copyright Sean Kelly 2005 - 2009. * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) * Authors: Sean Kelly * Source: $(DRUNTIMESRC core/sync/_condition.d) */ /* Copyright Sean Kelly 2005 - 2009. * Distributed under the Boost Software License, Version 1.0. * (See accompanying file LICENSE or copy at * http://www.boost.org/LICENSE_1_0.txt) */ module core.sync.condition; public import core.sync.exception; public import core.sync.mutex; public import core.time; version (Windows) { private import core.sync.semaphore; private import core.sys.windows.basetsd /+: HANDLE+/; private import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION, DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection, LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; private import core.sys.windows.windef /+: BOOL, DWORD+/; private import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; } else version (Posix) { private import core.sync.config; private import core.stdc.errno; private import core.sys.posix.pthread; private import core.sys.posix.time; } else { static assert(false, "Platform not supported"); } //////////////////////////////////////////////////////////////////////////////// // Condition // // void wait(); // void notify(); // void notifyAll(); //////////////////////////////////////////////////////////////////////////////// /** * This class represents a condition variable as conceived by C.A.R. Hoare. As * per Mesa type monitors however, "signal" has been replaced with "notify" to * indicate that control is not transferred to the waiter when a notification * is sent. */ class Condition { //////////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////////// /** * Initializes a condition object which is associated with the supplied * mutex object. * * Params: * m = The mutex with which this condition will be associated. * * Throws: * SyncError on error. */ this( Mutex m ) nothrow @safe { version (Windows) { m_blockLock = CreateSemaphoreA( null, 1, 1, null ); if ( m_blockLock == m_blockLock.init ) throw new SyncError( "Unable to initialize condition" ); scope(failure) CloseHandle( m_blockLock ); m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); if ( m_blockQueue == m_blockQueue.init ) throw new SyncError( "Unable to initialize condition" ); scope(failure) CloseHandle( m_blockQueue ); InitializeCriticalSection( &m_unblockLock ); m_assocMutex = m; } else version (Posix) { m_assocMutex = m; int rc = pthread_cond_init( &m_hndl, null ); if ( rc ) throw new SyncError( "Unable to initialize condition" ); } } ~this() { version (Windows) { BOOL rc = CloseHandle( m_blockLock ); assert( rc, "Unable to destroy condition" ); rc = CloseHandle( m_blockQueue ); assert( rc, "Unable to destroy condition" ); DeleteCriticalSection( &m_unblockLock ); } else version (Posix) { int rc = pthread_cond_destroy( &m_hndl ); assert( !rc, "Unable to destroy condition" ); } } //////////////////////////////////////////////////////////////////////////// // General Properties //////////////////////////////////////////////////////////////////////////// /** * Gets the mutex associated with this condition. * * Returns: * The mutex associated with this condition. */ @property Mutex mutex() { return m_assocMutex; } // undocumented function for internal use final @property Mutex mutex_nothrow() pure nothrow @safe @nogc { return m_assocMutex; } //////////////////////////////////////////////////////////////////////////// // General Actions //////////////////////////////////////////////////////////////////////////// /** * Wait until notified. * * Throws: * SyncError on error. */ void wait() { version (Windows) { timedWait( INFINITE ); } else version (Posix) { int rc = pthread_cond_wait( &m_hndl, m_assocMutex.handleAddr() ); if ( rc ) throw new SyncError( "Unable to wait for condition" ); } } /** * Suspends the calling thread until a notification occurs or until the * supplied time period has elapsed. * * Params: * val = The time to wait. * * In: * val must be non-negative. * * Throws: * SyncError on error. * * Returns: * true if notified before the timeout and false if not. */ bool wait( Duration val ) in { assert( !val.isNegative ); } body { version (Windows) { auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); while ( val > maxWaitMillis ) { if ( timedWait( cast(uint) maxWaitMillis.total!"msecs" ) ) return true; val -= maxWaitMillis; } return timedWait( cast(uint) val.total!"msecs" ); } else version (Posix) { timespec t = void; mktspec( t, val ); int rc = pthread_cond_timedwait( &m_hndl, m_assocMutex.handleAddr(), &t ); if ( !rc ) return true; if ( rc == ETIMEDOUT ) return false; throw new SyncError( "Unable to wait for condition" ); } } /** * Notifies one waiter. * * Throws: * SyncError on error. */ void notify() { version (Windows) { notify( false ); } else version (Posix) { int rc = pthread_cond_signal( &m_hndl ); if ( rc ) throw new SyncError( "Unable to notify condition" ); } } /** * Notifies all waiters. * * Throws: * SyncError on error. */ void notifyAll() { version (Windows) { notify( true ); } else version (Posix) { int rc = pthread_cond_broadcast( &m_hndl ); if ( rc ) throw new SyncError( "Unable to notify condition" ); } } private: version (Windows) { bool timedWait( DWORD timeout ) { int numSignalsLeft; int numWaitersGone; DWORD rc; rc = WaitForSingleObject( m_blockLock, INFINITE ); assert( rc == WAIT_OBJECT_0 ); m_numWaitersBlocked++; rc = ReleaseSemaphore( m_blockLock, 1, null ); assert( rc ); m_assocMutex.unlock(); scope(failure) m_assocMutex.lock(); rc = WaitForSingleObject( m_blockQueue, timeout ); assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); bool timedOut = (rc == WAIT_TIMEOUT); EnterCriticalSection( &m_unblockLock ); scope(failure) LeaveCriticalSection( &m_unblockLock ); if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) { if ( timedOut ) { // timeout (or canceled) if ( m_numWaitersBlocked != 0 ) { m_numWaitersBlocked--; // do not unblock next waiter below (already unblocked) numSignalsLeft = 0; } else { // spurious wakeup pending!! m_numWaitersGone = 1; } } if ( --m_numWaitersToUnblock == 0 ) { if ( m_numWaitersBlocked != 0 ) { // open the gate rc = ReleaseSemaphore( m_blockLock, 1, null ); assert( rc ); // do not open the gate below again numSignalsLeft = 0; } else if ( (numWaitersGone = m_numWaitersGone) != 0 ) { m_numWaitersGone = 0; } } } else if ( ++m_numWaitersGone == int.max / 2 ) { // timeout/canceled or spurious event :-) rc = WaitForSingleObject( m_blockLock, INFINITE ); assert( rc == WAIT_OBJECT_0 ); // something is going on here - test of timeouts? m_numWaitersBlocked -= m_numWaitersGone; rc = ReleaseSemaphore( m_blockLock, 1, null ); assert( rc == WAIT_OBJECT_0 ); m_numWaitersGone = 0; } LeaveCriticalSection( &m_unblockLock ); if ( numSignalsLeft == 1 ) { // better now than spurious later (same as ResetEvent) for ( ; numWaitersGone > 0; --numWaitersGone ) { rc = WaitForSingleObject( m_blockQueue, INFINITE ); assert( rc == WAIT_OBJECT_0 ); } // open the gate rc = ReleaseSemaphore( m_blockLock, 1, null ); assert( rc ); } else if ( numSignalsLeft != 0 ) { // unblock next waiter rc = ReleaseSemaphore( m_blockQueue, 1, null ); assert( rc ); } m_assocMutex.lock(); return !timedOut; } void notify( bool all ) { DWORD rc; EnterCriticalSection( &m_unblockLock ); scope(failure) LeaveCriticalSection( &m_unblockLock ); if ( m_numWaitersToUnblock != 0 ) { if ( m_numWaitersBlocked == 0 ) { LeaveCriticalSection( &m_unblockLock ); return; } if ( all ) { m_numWaitersToUnblock += m_numWaitersBlocked; m_numWaitersBlocked = 0; } else { m_numWaitersToUnblock++; m_numWaitersBlocked--; } LeaveCriticalSection( &m_unblockLock ); } else if ( m_numWaitersBlocked > m_numWaitersGone ) { rc = WaitForSingleObject( m_blockLock, INFINITE ); assert( rc == WAIT_OBJECT_0 ); if ( 0 != m_numWaitersGone ) { m_numWaitersBlocked -= m_numWaitersGone; m_numWaitersGone = 0; } if ( all ) { m_numWaitersToUnblock = m_numWaitersBlocked; m_numWaitersBlocked = 0; } else { m_numWaitersToUnblock = 1; m_numWaitersBlocked--; } LeaveCriticalSection( &m_unblockLock ); rc = ReleaseSemaphore( m_blockQueue, 1, null ); assert( rc ); } else { LeaveCriticalSection( &m_unblockLock ); } } // NOTE: This implementation uses Algorithm 8c as described here: // http://groups.google.com/group/comp.programming.threads/ // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a HANDLE m_blockLock; // auto-reset event (now semaphore) HANDLE m_blockQueue; // auto-reset event (now semaphore) Mutex m_assocMutex; // external mutex/CS CRITICAL_SECTION m_unblockLock; // internal mutex/CS int m_numWaitersGone = 0; int m_numWaitersBlocked = 0; int m_numWaitersToUnblock = 0; } else version (Posix) { Mutex m_assocMutex; pthread_cond_t m_hndl; } } //////////////////////////////////////////////////////////////////////////////// // Unit Tests //////////////////////////////////////////////////////////////////////////////// version (unittest) { private import core.thread; private import core.sync.mutex; private import core.sync.semaphore; void testNotify() { auto mutex = new Mutex; auto condReady = new Condition( mutex ); auto semDone = new Semaphore; auto synLoop = new Object; int numWaiters = 10; int numTries = 10; int numReady = 0; int numTotal = 0; int numDone = 0; int numPost = 0; void waiter() { for ( int i = 0; i < numTries; ++i ) { synchronized( mutex ) { while ( numReady < 1 ) { condReady.wait(); } --numReady; ++numTotal; } synchronized( synLoop ) { ++numDone; } semDone.wait(); } } auto group = new ThreadGroup; for ( int i = 0; i < numWaiters; ++i ) group.create( &waiter ); for ( int i = 0; i < numTries; ++i ) { for ( int j = 0; j < numWaiters; ++j ) { synchronized( mutex ) { ++numReady; condReady.notify(); } } while ( true ) { synchronized( synLoop ) { if ( numDone >= numWaiters ) break; } Thread.yield(); } for ( int j = 0; j < numWaiters; ++j ) { semDone.notify(); } } group.joinAll(); assert( numTotal == numWaiters * numTries ); } void testNotifyAll() { auto mutex = new Mutex; auto condReady = new Condition( mutex ); int numWaiters = 10; int numReady = 0; int numDone = 0; bool alert = false; void waiter() { synchronized( mutex ) { ++numReady; while ( !alert ) condReady.wait(); ++numDone; } } auto group = new ThreadGroup; for ( int i = 0; i < numWaiters; ++i ) group.create( &waiter ); while ( true ) { synchronized( mutex ) { if ( numReady >= numWaiters ) { alert = true; condReady.notifyAll(); break; } } Thread.yield(); } group.joinAll(); assert( numReady == numWaiters && numDone == numWaiters ); } void testWaitTimeout() { auto mutex = new Mutex; auto condReady = new Condition( mutex ); bool waiting = false; bool alertedOne = true; bool alertedTwo = true; void waiter() { synchronized( mutex ) { waiting = true; // we never want to miss the notification (30s) alertedOne = condReady.wait( dur!"seconds"(30) ); // but we don't want to wait long for the timeout (10ms) alertedTwo = condReady.wait( dur!"msecs"(10) ); } } auto thread = new Thread( &waiter ); thread.start(); while ( true ) { synchronized( mutex ) { if ( waiting ) { condReady.notify(); break; } } Thread.yield(); } thread.join(); assert( waiting ); assert( alertedOne ); assert( !alertedTwo ); } unittest { testNotify(); testNotifyAll(); testWaitTimeout(); } }