From 3f4acd6ff3fea3b8342505225659ae37fb8a6f3a Mon Sep 17 00:00:00 2001 From: Thomas Nagy Date: Sun, 29 Jul 2018 11:16:16 +0200 Subject: [PATCH] Add a new task semaphore system --- playground/task_semaphore/wscript | 44 ++++++++++++++++++++++ waflib/Runner.py | 41 +++++++++++++++------ waflib/Task.py | 61 +++++++++++++++++++++++++++++-- 3 files changed, 131 insertions(+), 15 deletions(-) create mode 100644 playground/task_semaphore/wscript diff --git a/playground/task_semaphore/wscript b/playground/task_semaphore/wscript new file mode 100644 index 00000000..cd11486e --- /dev/null +++ b/playground/task_semaphore/wscript @@ -0,0 +1,44 @@ +#! /usr/bin/env python + +""" +Task semaphore demo. Compare the runtimes: + + waf configure build --fast # 0m08 + waf configure build # 1m15 +""" + +import random, time +from waflib import Task, TaskGen, Utils + +def options(opt): + opt.add_option('--fast', action='store_true', default=False, help='Disable the semaphore to compare the runtime', dest='fast') + +def configure(conf): + pass + +def build(bld): + # max 20 jobs globally + bld.jobs = 20 + + bld(features='make_100') + + class Foo(Task.Task): + always_run = True + + if not bld.options.fast: + semaphore = Task.TaskSemaphore(2) # 2 jobs maximum + + def uid(self): + # unique id for each object + return Utils.h_list(self.num) + + def run(self): + time.sleep(random.randint(1000, 2000) / 1000.) 
+ print("Task %r" % self.num) + + @TaskGen.feature('make_100') + def make_100_bound_tasks(self): + for x in range(100): + tsk = self.create_task('Foo') + tsk.num = x + diff --git a/waflib/Runner.py b/waflib/Runner.py index 975c3292..261084d2 100644 --- a/waflib/Runner.py +++ b/waflib/Runner.py @@ -329,6 +329,14 @@ class Parallel(object): try_unfreeze(x) del self.revdeps[tsk] + if hasattr(tsk, 'semaphore'): + sem = tsk.semaphore + sem.release(tsk) + while sem.waiting and not sem.is_locked(): + # take a frozen task, make it ready to run + x = sem.waiting.pop() + self._add_task(x) + def get_out(self): """ Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution. @@ -352,8 +360,29 @@ class Parallel(object): :param tsk: task instance :type tsk: :py:attr:`waflib.Task.Task` """ + # TODO change in waf 2.1 self.ready.put(tsk) + def _add_task(self, tsk): + if hasattr(tsk, 'semaphore'): + sem = tsk.semaphore + try: + sem.acquire(tsk) + except IndexError: + sem.waiting.add(tsk) + return + + self.count += 1 + self.processed += 1 + if self.numjobs == 1: + tsk.log_display(tsk.generator.bld) + try: + self.process_task(tsk) + finally: + self.out.put(tsk) + else: + self.add_task(tsk) + def process_task(self, tsk): """ Processes a task and attempts to stop the build in case of errors @@ -453,17 +482,7 @@ class Parallel(object): st = self.task_status(tsk) if st == Task.RUN_ME: - self.count += 1 - self.processed += 1 - - if self.numjobs == 1: - tsk.log_display(tsk.generator.bld) - try: - self.process_task(tsk) - finally: - self.out.put(tsk) - else: - self.add_task(tsk) + self._add_task(tsk) elif st == Task.ASK_LATER: self.postpone(tsk) elif st == Task.SKIP_ME: diff --git a/waflib/Task.py b/waflib/Task.py index fd7a453f..0fc449d4 100644 --- a/waflib/Task.py +++ b/waflib/Task.py @@ -133,10 +133,12 @@ evil = store_task_type('evil', (object,), {}) class Task(evil): """ - This class deals with the filesystem (:py:class:`waflib.Node.Node`). 
The method :py:class:`waflib.Task.Task.runnable_status` - uses a hash value (from :py:class:`waflib.Task.Task.signature`) which is persistent from build to build. When the value changes, - the task has to be executed. The method :py:class:`waflib.Task.Task.post_run` will assign the task signature to the output - nodes (if present). + Task objects represent actions to perform such as commands to execute by calling the `run` method. + + Detecting when to execute a task occurs in the method :py:meth:`waflib.Task.Task.runnable_status`. + + Detecting which tasks to execute is performed through a hash value returned by + :py:meth:`waflib.Task.Task.signature`. The task signature is persistent from build to build. """ vars = [] """ConfigSet variables that should trigger a rebuild (class attribute used for :py:meth:`waflib.Task.Task.sig_vars`)""" @@ -1339,3 +1341,54 @@ def deep_inputs(cls): TaskBase = Task "Provided for compatibility reasons, TaskBase should not be used" +class TaskSemaphore(object): + """ + Task semaphores provide a simple and efficient way of throttling the number of + instances of a particular task that run concurrently. The throttling value is capped + by the maximum number of jobs, so for example, a `TaskSemaphore(10)` + has no effect in a `-j2` build. + + Task semaphores are typically specified on the task class level:: + + class compile(waflib.Task.Task): + semaphore = waflib.Task.TaskSemaphore(2) + run_str = 'touch ${TGT}' + + Task semaphores are meant to be used by the build scheduler in the main + thread, so there are no guarantees of thread safety. + """ + def __init__(self, num): + """ + :param num: maximum number of concurrent tasks + :type num: int + """ + self.num = num + self.locking = set() + self.waiting = set() + + def is_locked(self): + """Returns True if this semaphore cannot be acquired by more tasks""" + return len(self.locking) >= self.num + + def acquire(self, tsk): + """ + Mark the semaphore as used by the given task (not re-entrant). 
+ + :param tsk: task object + :type tsk: :py:class:`waflib.Task.Task` + :raises: :py:class:`IndexError` in case the resource is already acquired + """ + if self.is_locked(): + raise IndexError('Cannot lock more %r' % self.locking) + self.locking.add(tsk) + + def release(self, tsk): + """ + Mark the semaphore as unused by the given task. + + :param tsk: task object + :type tsk: :py:class:`waflib.Task.Task` + :raises: :py:class:`KeyError` in case the resource is not acquired by the task + """ + self.locking.remove(tsk) +