// -*- C++ -*- // Copyright (C) 2007, 2008, 2009 Free Software Foundation, Inc. // // This file is part of the GNU ISO C++ Library. This library is free // software; you can redistribute it and/or modify it under the terms // of the GNU General Public License as published by the Free Software // Foundation; either version 3, or (at your option) any later // version. // This library is distributed in the hope that it will be useful, but // WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // General Public License for more details. // Under Section 7 of GPL version 3, you are granted additional // permissions described in the GCC Runtime Library Exception, version // 3.1, as published by the Free Software Foundation. // You should have received a copy of the GNU General Public License and // a copy of the GCC Runtime Library Exception along with this program; // see the files COPYING3 and COPYING.RUNTIME respectively. If not, see // . /** @file parallel/workstealing.h * @brief Parallelization of embarrassingly parallel execution by * means of work-stealing. * * Work stealing is described in * * R. D. Blumofe and C. E. Leiserson. * Scheduling multithreaded computations by work stealing. * Journal of the ACM, 46(5):720–748, 1999. * * This file is a GNU parallel extension to the Standard C++ Library. */ // Written by Felix Putze. #ifndef _GLIBCXX_PARALLEL_WORKSTEALING_H #define _GLIBCXX_PARALLEL_WORKSTEALING_H 1 #include #include #include namespace __gnu_parallel { #define _GLIBCXX_JOB_VOLATILE volatile /** @brief One __job for a certain thread. */ template struct _Job { typedef _DifferenceTp _DifferenceType; /** @brief First element. * * Changed by owning and stealing thread. By stealing thread, * always incremented. */ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_first; /** @brief Last element. * * Changed by owning thread only. */ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_last; /** @brief Number of elements, i.e. @c _M_last-_M_first+1. * * Changed by owning thread only. */ _GLIBCXX_JOB_VOLATILE _DifferenceType _M_load; }; /** @brief Work stealing algorithm for random access iterators. * * Uses O(1) additional memory. Synchronization at job lists is * done with atomic operations. * @param __begin Begin iterator of element sequence. * @param __end End iterator of element sequence. * @param __op User-supplied functor (comparator, predicate, adding * functor, ...). * @param __f Functor to "process" an element with __op (depends on * desired functionality, e. g. for std::for_each(), ...). * @param __r Functor to "add" a single __result to the already * processed elements (depends on functionality). * @param __base Base value for reduction. * @param __output Pointer to position where final result is written to * @param __bound Maximum number of elements processed (e. g. for * std::count_n()). * @return User-supplied functor (that may contain a part of the result). */ template _Op __for_each_template_random_access_workstealing(_RAIter __begin, _RAIter __end, _Op __op, _Fu& __f, _Red __r, _Result __base, _Result& __output, typename std::iterator_traits<_RAIter>::difference_type __bound) { _GLIBCXX_CALL(__end - __begin) typedef std::iterator_traits<_RAIter> _TraitsType; typedef typename _TraitsType::difference_type _DifferenceType; const _Settings& __s = _Settings::get(); _DifferenceType __chunk_size = static_cast<_DifferenceType>(__s.workstealing_chunk_size); // How many jobs? _DifferenceType __length = (__bound < 0) ? (__end - __begin) : __bound; // To avoid false sharing in a cache line. const int __stride = (__s.cache_line_size * 10 / sizeof(_Job<_DifferenceType>) + 1); // Total number of threads currently working. _ThreadIndex __busy = 0; _Job<_DifferenceType> *__job; omp_lock_t __output_lock; omp_init_lock(&__output_lock); // Write base value to output. __output = __base; // No more threads than jobs, at least one thread. _ThreadIndex __num_threads = __gnu_parallel::max<_ThreadIndex> (1, __gnu_parallel::min<_DifferenceType>(__length, __get_max_threads())); # pragma omp parallel shared(__busy) num_threads(__num_threads) { # pragma omp single { __num_threads = omp_get_num_threads(); // Create job description array. __job = new _Job<_DifferenceType>[__num_threads * __stride]; } // Initialization phase. // Flags for every thread if it is doing productive work. bool __iam_working = false; // Thread id. _ThreadIndex __iam = omp_get_thread_num(); // This job. _Job<_DifferenceType>& __my_job = __job[__iam * __stride]; // Random number (for work stealing). _ThreadIndex __victim; // Local value for reduction. _Result __result = _Result(); // Number of elements to steal in one attempt. _DifferenceType __steal; // Every thread has its own random number generator // (modulo __num_threads). _RandomNumber __rand_gen(__iam, __num_threads); // This thread is currently working. # pragma omp atomic ++__busy; __iam_working = true; // How many jobs per thread? last thread gets the rest. __my_job._M_first = static_cast<_DifferenceType> (__iam * (__length / __num_threads)); __my_job._M_last = (__iam == (__num_threads - 1) ? (__length - 1) : ((__iam + 1) * (__length / __num_threads) - 1)); __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; // Init result with _M_first value (to have a base value for reduction) if (__my_job._M_first <= __my_job._M_last) { // Cannot use volatile variable directly. _DifferenceType __my_first = __my_job._M_first; __result = __f(__op, __begin + __my_first); ++__my_job._M_first; --__my_job._M_load; } _RAIter __current; # pragma omp barrier // Actual work phase // Work on own or stolen current start while (__busy > 0) { // Work until no productive thread left. # pragma omp flush(__busy) // Thread has own work to do while (__my_job._M_first <= __my_job._M_last) { // fetch-and-add call // Reserve current job block (size __chunk_size) in my queue. _DifferenceType __current_job = __fetch_and_add<_DifferenceType>(&(__my_job._M_first), __chunk_size); // Update _M_load, to make the three values consistent, // _M_first might have been changed in the meantime __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; for (_DifferenceType __job_counter = 0; __job_counter < __chunk_size && __current_job <= __my_job._M_last; ++__job_counter) { // Yes: process it! __current = __begin + __current_job; ++__current_job; // Do actual work. __result = __r(__result, __f(__op, __current)); } # pragma omp flush(__busy) } // After reaching this point, a thread's __job list is empty. if (__iam_working) { // This thread no longer has work. # pragma omp atomic --__busy; __iam_working = false; } _DifferenceType __supposed_first, __supposed_last, __supposed_load; do { // Find random nonempty deque (not own), do consistency check. __yield(); # pragma omp flush(__busy) __victim = __rand_gen(); __supposed_first = __job[__victim * __stride]._M_first; __supposed_last = __job[__victim * __stride]._M_last; __supposed_load = __job[__victim * __stride]._M_load; } while (__busy > 0 && ((__supposed_load <= 0) || ((__supposed_first + __supposed_load - 1) != __supposed_last))); if (__busy == 0) break; if (__supposed_load > 0) { // Has work and work to do. // Number of elements to steal (at least one). __steal = (__supposed_load < 2) ? 1 : __supposed_load / 2; // Push __victim's current start forward. _DifferenceType __stolen_first = __fetch_and_add<_DifferenceType> (&(__job[__victim * __stride]._M_first), __steal); _DifferenceType __stolen_try = (__stolen_first + __steal - _DifferenceType(1)); __my_job._M_first = __stolen_first; __my_job._M_last = __gnu_parallel::min(__stolen_try, __supposed_last); __my_job._M_load = __my_job._M_last - __my_job._M_first + 1; // Has potential work again. # pragma omp atomic ++__busy; __iam_working = true; # pragma omp flush(__busy) } # pragma omp flush(__busy) } // end while __busy > 0 // Add accumulated result to output. omp_set_lock(&__output_lock); __output = __r(__output, __result); omp_unset_lock(&__output_lock); } delete[] __job; // Points to last element processed (needed as return value for // some algorithms like transform) __f._M_finish_iterator = __begin + __length; omp_destroy_lock(&__output_lock); return __op; } } // end namespace #endif /* _GLIBCXX_PARALLEL_WORKSTEALING_H */