Add a new task semaphore system

This commit is contained in:
Thomas Nagy 2018-07-29 11:16:16 +02:00
parent a10822f688
commit 3f4acd6ff3
No known key found for this signature in database
GPG Key ID: 49B4C67C05277AAA
3 changed files with 131 additions and 15 deletions

View File

@ -0,0 +1,44 @@
#! /usr/bin/env python
"""
Task semaphore demo. Compare the runtimes:
waf configure build --fast # 0m08
waf configure build # 1m15
"""
import random, time
from waflib import Task, TaskGen, Utils
def options(opt):
opt.add_option('--fast', action='store_true', default=False, help='Disable the semaphore to compare the runtime', dest='fast')
def configure(conf):
pass
def build(bld):
# max 20 jobs globally
bld.jobs = 20
bld(features='make_100')
class Foo(Task.Task):
always_run = True
if not bld.options.fast:
semaphore = Task.TaskSemaphore(2) # 2 jobs maximum
def uid(self):
# unique id for each object
return Utils.h_list(self.num)
def run(self):
time.sleep(random.randint(1000, 2000) / 1000.)
print("Task %r" % self.num)
@TaskGen.feature('make_100')
def make_100_bound_tasks(self):
for x in range(100):
tsk = self.create_task('Foo')
tsk.num = x

View File

@ -329,6 +329,14 @@ class Parallel(object):
try_unfreeze(x)
del self.revdeps[tsk]
if hasattr(tsk, 'semaphore'):
sem = tsk.semaphore
sem.release(tsk)
while sem.waiting and not sem.is_locked():
# take a frozen task, make it ready to run
x = sem.waiting.pop()
self._add_task(x)
def get_out(self):
"""
Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
@ -352,8 +360,29 @@ class Parallel(object):
:param tsk: task instance
:type tsk: :py:attr:`waflib.Task.Task`
"""
# TODO change in waf 2.1
self.ready.put(tsk)
def _add_task(self, tsk):
if hasattr(tsk, 'semaphore'):
sem = tsk.semaphore
try:
sem.acquire(tsk)
except IndexError:
sem.waiting.add(tsk)
return
self.count += 1
self.processed += 1
if self.numjobs == 1:
tsk.log_display(tsk.generator.bld)
try:
self.process_task(tsk)
finally:
self.out.put(tsk)
else:
self.add_task(tsk)
def process_task(self, tsk):
"""
Processes a task and attempts to stop the build in case of errors
@ -453,17 +482,7 @@ class Parallel(object):
st = self.task_status(tsk)
if st == Task.RUN_ME:
self.count += 1
self.processed += 1
if self.numjobs == 1:
tsk.log_display(tsk.generator.bld)
try:
self.process_task(tsk)
finally:
self.out.put(tsk)
else:
self.add_task(tsk)
self._add_task(tsk)
elif st == Task.ASK_LATER:
self.postpone(tsk)
elif st == Task.SKIP_ME:

View File

@ -133,10 +133,12 @@ evil = store_task_type('evil', (object,), {})
class Task(evil):
"""
This class deals with the filesystem (:py:class:`waflib.Node.Node`). The method :py:class:`waflib.Task.Task.runnable_status`
uses a hash value (from :py:class:`waflib.Task.Task.signature`) which is persistent from build to build. When the value changes,
the task has to be executed. The method :py:class:`waflib.Task.Task.post_run` will assign the task signature to the output
nodes (if present).
Task objects represents actions to perform such as commands to execute by calling the `run` method.
Detecting when to execute a task occurs in the method :py:meth:`waflib.Task.Task.runnable_status`.
Detecting which tasks to execute is performed through a hash value returned by
:py:meth:`waflib.Task.Task.signature`. The task signature is persistent from build to build.
"""
vars = []
"""ConfigSet variables that should trigger a rebuild (class attribute used for :py:meth:`waflib.Task.Task.sig_vars`)"""
@ -1339,3 +1341,54 @@ def deep_inputs(cls):
TaskBase = Task
"Provided for compatibility reasons, TaskBase should not be used"
class TaskSemaphore(object):
"""
Task semaphores provide a simple and efficient way of throttling the amount of
a particular task to run concurrently. The throttling value is capped
by the amount of maximum jobs, so for example, a `TaskSemaphore(10)`
has no effect in a `-j2` build.
Task semaphores are typically specified on the task class level::
class compile(waflib.Task.Task):
semaphore = waflib.Task.TaskSemaphore(2)
run_str = 'touch ${TGT}'
Task semaphores are meant to be used by the build scheduler in the main
thread, so there are no guarantees of thread safety.
"""
def __init__(self, num):
"""
:param num: maximum value of concurrent tasks
:type num: int
"""
self.num = num
self.locking = set()
self.waiting = set()
def is_locked(self):
"""Returns True if this semaphore cannot be acquired by more tasks"""
return len(self.locking) >= self.num
def acquire(self, tsk):
"""
Mark the semaphore as used by the given task (not re-entrant).
:param tsk: task object
:type tsk: :py:class:`waflib.Task.Task`
:raises: :py:class:`IndexError` in case the resource is already acquired
"""
if self.is_locked():
raise IndexError('Cannot lock more %r' % self.locking)
self.locking.add(tsk)
def release(self, tsk):
"""
Mark the semaphore as unused by the given task.
:param tsk: task object
:type tsk: :py:class:`waflib.Task.Task`
:raises: :py:class:`KeyError` in case the resource is not acquired by the task
"""
self.locking.remove(tsk)