libstdc++: Improve std::lock algorithm

The current std::lock algorithm is the one called "persistent" in Howard
Hinnant's https://howardhinnant.github.io/dining_philosophers.html post.
While it tends to perform acceptably fast, it wastes a lot of CPU cycles
by continuously locking and unlocking the uncontended mutexes.
Effectively, it's a spin lock with no back-off.

This replaces it with the one Howard calls "smart and polite". It's
smart, because when a Mi.try_lock() call fails because mutex Mi is
contended, the algorithm reorders the mutexes until Mi is first, then
calls Mi.lock(), to block until Mi is no longer contended.  It's
polite because it uses std::this_thread::yield() between the failed
Mi.try_lock() call and the Mi.lock() call. (In reality it uses
__gthread_yield() directly, because using this_thread::yield() would
require shuffling code around to avoid a circular dependency.)

This version of the algorithm is inspired by some hints from Howard, so
that it has strictly bounded stack usage. As the comment in the code
says:

// This function can recurse up to N levels deep, for N = 1+sizeof...(L1).
// On each recursion the lockables are rotated left one position,
// e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1.
// When a call to l_i.try_lock() fails it recurses/returns to depth=i
// so that l_i is the first argument, and then blocks until l_i is locked.

The 'i' parameter is the desired permuation of the lockables, and the
'depth' parameter is the depth in the call stack of the current
instantiation of the function template. If i == depth then the function
calls l0.lock() and then l1.try_lock()... for each lockable in the
parameter pack l1.  If i > depth then the function rotates the lockables
to the left one place, and calls itself again to go one level deeper.
Finally, if i < depth then the function returns to a shallower depth,
equivalent to a right rotate of the lockables.  When a call to
try_lock() fails, i is set to the index of the contended lockable, so
that the next call to l0.lock() will use the contended lockable as l0.

This commit also replaces the std::try_lock implementation details. The
new code is identical in behaviour, but uses a pair of constrained
function templates. This avoids instantiating a class template, and is a
litle simpler to call where used in std::__detail::__lock_impl and
std::try_lock.

Signed-off-by: Jonathan Wakely <jwakely@redhat.com>

libstdc++-v3/ChangeLog:

	* include/std/mutex (__try_to_lock): Move to __detail namespace.
	(struct __try_lock_impl): Replace with ...
	(__detail::__try_lock_impl<Idx>(tuple<Lockables...>&)): New
	function templates to implement std::try_lock.
	(try_lock): Use new __try_lock_impl.
	(__detail::__lock_impl(int, int&, L0&, L1&...)): New function
	template to implement std::lock.
	(lock): Use __lock_impl.
This commit is contained in:
Jonathan Wakely 2021-06-21 13:35:18 +01:00
parent 7232f7c4c2
commit 6cf0040fff

View File

@ -512,47 +512,44 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
#endif // _GLIBCXX_HAS_GTHREADS
/// @cond undocumented
template<typename _Lock>
inline unique_lock<_Lock>
__try_to_lock(_Lock& __l)
{ return unique_lock<_Lock>{__l, try_to_lock}; }
namespace __detail
{
template<typename _Lockable>
inline unique_lock<_Lockable>
__try_to_lock(_Lockable& __l)
{ return unique_lock<_Lockable>{__l, try_to_lock}; }
template<int _Idx, bool _Continue = true>
struct __try_lock_impl
{
template<typename... _Lock>
static void
__do_try_lock(tuple<_Lock&...>& __locks, int& __idx)
{
__idx = _Idx;
auto __lock = std::__try_to_lock(std::get<_Idx>(__locks));
if (__lock.owns_lock())
{
constexpr bool __cont = _Idx + 2 < sizeof...(_Lock);
using __try_locker = __try_lock_impl<_Idx + 1, __cont>;
__try_locker::__do_try_lock(__locks, __idx);
if (__idx == -1)
__lock.release();
}
}
};
// Lock the last element of the tuple, after all previous ones are locked.
template<int _Idx, typename... _Lockables>
inline __enable_if_t<_Idx + 1 == sizeof...(_Lockables), int>
__try_lock_impl(tuple<_Lockables&...>& __lockables)
{
if (auto __lock = __detail::__try_to_lock(std::get<_Idx>(__lockables)))
{
__lock.release();
return -1;
}
else
return _Idx;
}
template<int _Idx>
struct __try_lock_impl<_Idx, false>
{
template<typename... _Lock>
static void
__do_try_lock(tuple<_Lock&...>& __locks, int& __idx)
{
__idx = _Idx;
auto __lock = std::__try_to_lock(std::get<_Idx>(__locks));
if (__lock.owns_lock())
{
__idx = -1;
__lock.release();
}
}
};
// Lock tuple elements starting from _Idx.
template<int _Idx, typename... _Lockables>
inline __enable_if_t<_Idx + 1 != sizeof...(_Lockables), int>
__try_lock_impl(tuple<_Lockables&...>& __lockables)
{
if (auto __lock = __detail::__try_to_lock(std::get<_Idx>(__lockables)))
{
int __idx = __detail::__try_lock_impl<_Idx + 1>(__lockables);
if (__idx == -1)
__lock.release();
return __idx;
}
else
return _Idx;
}
} // namespace __detail
/// @endcond
/** @brief Generic try_lock.
@ -569,12 +566,52 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
int
try_lock(_Lock1& __l1, _Lock2& __l2, _Lock3&... __l3)
{
int __idx;
auto __locks = std::tie(__l1, __l2, __l3...);
__try_lock_impl<0>::__do_try_lock(__locks, __idx);
return __idx;
auto __lockables = std::tie(__l1, __l2, __l3...);
return __detail::__try_lock_impl<0>(__lockables);
}
/// @cond undocumented
namespace __detail
{
// This function can recurse up to N levels deep, for N = 1+sizeof...(L1).
// On each recursion the lockables are rotated left one position,
// e.g. depth 0: l0, l1, l2; depth 1: l1, l2, l0; depth 2: l2, l0, l1.
// When a call to l_i.try_lock() fails it recurses/returns to depth=i
// so that l_i is the first argument, and then blocks until l_i is locked.
template<typename _L0, typename... _L1>
void
__lock_impl(int& __i, int __depth, _L0& __l0, _L1&... __l1)
{
while (__i >= __depth)
{
if (__i == __depth)
{
int __failed = 1; // index that couldn't be locked
{
unique_lock<_L0> __first(__l0);
auto __rest = std::tie(__l1...);
__failed += __detail::__try_lock_impl<0>(__rest);
if (!__failed)
{
__i = -1; // finished
__first.release();
return;
}
}
#ifdef _GLIBCXX_USE_SCHED_YIELD
__gthread_yield();
#endif
constexpr auto __n = 1 + sizeof...(_L1);
__i = (__depth + __failed) % __n;
}
else // rotate left until l_i is first.
__detail::__lock_impl(__i, __depth + 1, __l1..., __l0);
}
}
} // namespace __detail
/// @endcond
/** @brief Generic lock.
* @param __l1 Meets Lockable requirements (try_lock() may throw).
* @param __l2 Meets Lockable requirements (try_lock() may throw).
@ -590,19 +627,8 @@ _GLIBCXX_BEGIN_NAMESPACE_VERSION
void
lock(_L1& __l1, _L2& __l2, _L3&... __l3)
{
while (true)
{
using __try_locker = __try_lock_impl<0, sizeof...(_L3) != 0>;
unique_lock<_L1> __first(__l1);
int __idx;
auto __locks = std::tie(__l2, __l3...);
__try_locker::__do_try_lock(__locks, __idx);
if (__idx == -1)
{
__first.release();
return;
}
}
int __i = 0;
__detail::__lock_impl(__i, 0, __l1, __l2, __l3...);
}
#if __cplusplus >= 201703L