diff --git a/waflib/Runner.py b/waflib/Runner.py
index 3f183eda..12cbb931 100644
--- a/waflib/Runner.py
+++ b/waflib/Runner.py
@@ -105,8 +105,11 @@ class Parallel(object):
 		self.outstanding = Utils.deque()
 		"""List of :py:class:`waflib.Task.Task` that may be ready to be executed"""
 
-		self.frozen = Utils.deque()
-		"""List of :py:class:`waflib.Task.Task` that are not ready yet"""
+		self.frozen = set()
+		"""Set of :py:class:`waflib.Task.Task` that need other tasks to complete first"""
+
+		self.incomplete = Utils.deque()
+		"""List of :py:class:`waflib.Task.Task` with incomplete dependencies"""
 
 		self.ready = Queue(0)
 		"""List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""
@@ -134,6 +137,11 @@ class Parallel(object):
 		Flag that indicates that the build cache must be saved when a task was executed
 		(calls :py:meth:`waflib.Build.BuildContext.store`)"""
 
+		self.revdeps = Utils.defaultdict(list)
+		"""
+		The reverse dependency graph of dependencies obtained from Task.run_after
+		"""
+
 		self.spawner = Spawner(self)
 		"""
 		Coordinating daemon thread that spawns thread consumers
@@ -151,16 +159,16 @@ class Parallel(object):
 
 	def postpone(self, tsk):
 		"""
-		Adds the task to the list :py:attr:`waflib.Runner.Parallel.frozen`.
+		Adds the task to the list :py:attr:`waflib.Runner.Parallel.incomplete`.
 		The order is scrambled so as to consume as many tasks in parallel as possible.
 
 		:param tsk: task instance
 		:type tsk: :py:class:`waflib.Task.Task`
 		"""
 		if random.randint(0, 1):
-			self.frozen.appendleft(tsk)
+			self.incomplete.appendleft(tsk)
 		else:
-			self.frozen.append(tsk)
+			self.incomplete.append(tsk)
 
 	def refill_task_list(self):
 		"""
@@ -172,7 +180,7 @@ class Parallel(object):
 		while not self.outstanding:
 			if self.count:
 				self.get_out()
-			elif self.frozen:
+			elif self.incomplete:
 				try:
 					cond = self.deadlock == self.processed
 				except AttributeError:
@@ -180,36 +188,71 @@ class Parallel(object):
 				else:
 					if cond:
 						msg = 'check the build order for the tasks'
-						for tsk in self.frozen:
+						for tsk in self.incomplete:
 							if not tsk.run_after:
 								msg = 'check the methods runnable_status'
 								break
 						lst = []
-						for tsk in self.frozen:
+						for tsk in self.incomplete:
 							lst.append('%s\t-> %r' % (repr(tsk), [id(x) for x in tsk.run_after]))
 						raise Errors.WafError('Deadlock detected: %s%s' % (msg, ''.join(lst)))
 				self.deadlock = self.processed
 
-			if self.frozen:
-				self.outstanding.extend(self.frozen)
-				self.frozen.clear()
+			if self.incomplete:
+				self.outstanding.extend(self.incomplete)
+				self.incomplete.clear()
 			elif not self.count:
-				self.outstanding.extend(next(self.biter))
+				tasks = next(self.biter)
+				ready, waiting = self.prio_and_split(tasks)
+				# We cannot use a priority queue because the implementation
+				# must be able to handle incomplete dependencies
+				self.outstanding.extend(ready)
+				self.frozen.update(waiting)
 				self.total = self.bld.total()
 				break
 
+	def insert_with_prio(self, tsk):
+		# TODO the deque interface has insert in python 3.5 :-/
+		if self.outstanding and tsk.prio >= self.outstanding[0].prio:
+			self.outstanding.appendleft(tsk)
+		else:
+			self.outstanding.append(tsk)
+
 	def add_more_tasks(self, tsk):
 		"""
 		If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
 		in that list are added to the current build and will be processed before the next build group.
 
+		Assume that the task is done, so that task priorities do not need
+		to be re-calculated
+
 		:param tsk: task instance
 		:type tsk: :py:attr:`waflib.Task.Task`
 		"""
 		if getattr(tsk, 'more_tasks', None):
-			self.outstanding.extend(tsk.more_tasks)
+			ready, waiting = self.prio_and_split(tsk.more_tasks)
+			for k in ready:
+				# TODO could be better, but we will have 1 task in general?
+				self.insert_with_prio(k)
+			self.frozen.update(waiting)
 			self.total += len(tsk.more_tasks)
 
+	def mark_finished(self, tsk):
+		# we assume that frozen tasks will be consumed as the build goes
+		if tsk in self.revdeps:
+			for x in self.revdeps[tsk]:
+				# ancestors are likely to be frozen
+				if x in self.frozen:
+					# TODO remove dependencies to free some memory?
+					# x.run_after.remove(tsk)
+					for k in x.run_after:
+						if not k.hasrun:
+							break
+					else:
+						self.frozen.remove(x)
+						self.insert_with_prio(x)
+			del self.revdeps[tsk]
+
 	def get_out(self):
 		"""
 		Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
@@ -220,6 +263,8 @@ class Parallel(object):
 		tsk = self.out.get()
 		if not self.stop:
 			self.add_more_tasks(tsk)
+			self.mark_finished(tsk)
+
 		self.count -= 1
 		self.dirty = True
 		return tsk
@@ -238,12 +283,14 @@ class Parallel(object):
 		Mark a task as skipped/up-to-date
 		"""
 		tsk.hasrun = Task.SKIPPED
+		self.mark_finished(tsk)
 
 	def cancel(self, tsk):
 		"""
 		Mark a task as failed because of unsatisfiable dependencies
 		"""
 		tsk.hasrun = Task.CANCELED
+		self.mark_finished(tsk)
 
 	def error_handler(self, tsk):
 		"""
@@ -360,3 +407,85 @@ class Parallel(object):
 
 		self.ready.put(None)
 		assert (self.count == 0 or self.stop)
+
+	def prio_and_split(self, tasks):
+		"""
+		Label input tasks with priority values, and return a pair containing
+		the tasks that are ready to run and the tasks that are necessarily
+		waiting for other tasks to complete.
+
+		The priority system is really meant as an optional layer for optimization:
+		dependency cycles are found more quickly, and builds should be more efficient.
+
+		:return: A pair of task lists
+		:rtype: tuple
+		"""
+		# to disable:
+		#return tasks, []
+		for x in tasks:
+			x.visited = 0
+
+		reverse = self.revdeps
+
+		for x in tasks:
+			for k in x.run_after:
+				reverse[k].append(x)
+
+		# the priority number is not the tree size
+		def visit(n):
+			if n.visited == 0:
+				n.visited = 1
+				if n in reverse:
+					rev = reverse[n]
+					n.prio = n.priority() + len(rev) + sum(visit(k) for k in rev)
+				else:
+					n.prio = n.priority()
+				n.visited = 2
+			elif n.visited == 1:
+				raise Errors.WafError('Dependency cycle found!')
+			return n.prio
+
+		for x in tasks:
+			if x.visited != 0:
+				# must visit all to detect cycles
+				continue
+			try:
+				visit(x)
+			except Errors.WafError:
+				self.debug_cycles(tasks, reverse)
+
+		ready = []
+		waiting = []
+		for x in tasks:
+			for k in x.run_after:
+				if not k.hasrun:
+					waiting.append(x)
+					break
+			else:
+				ready.append(x)
+		ready.sort(key=lambda x: x.prio, reverse=True)
+		return (ready, waiting)
+
+	def debug_cycles(self, tasks, reverse):
+		# TODO display more than one cycle?
+		tmp = {}
+		for x in tasks:
+			tmp[x] = 0
+
+		def visit(n, acc):
+			if tmp[n] == 0:
+				tmp[n] = 1
+				for k in reverse.get(n, []):
+					visit(k, [n] + acc)
+				tmp[n] = 2
+			elif tmp[n] == 1:
+				lst = []
+				for tsk in acc:
+					lst.append(repr(tsk))
+					if tsk is n:
+						# exclude prior nodes, we want the minimum cycle
+						break
+				raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
+		for x in tasks:
+			visit(x, [])
+
diff --git a/waflib/Task.py b/waflib/Task.py
index e42e09b9..9df9e0fd 100644
--- a/waflib/Task.py
+++ b/waflib/Task.py
@@ -209,6 +209,15 @@ class Task(evil):
 			x = '"%s"' % x
 		return x
 
+	def priority(self):
+		"""
+		The default priority for this task instance
+
+		:return: a positive numeric value representing the urgency of running this task early
+		:rtype: int
+		"""
+		return getattr(self, 'weight', 0)
+
 	def split_argfile(self, cmd):
 		"""
 		Splits a list of process commands into the executable part and its list of arguments
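
The following standalone sketch (not part of the patch; FakeTask and the sample task names are invented for illustration) reproduces the core idea of prio_and_split(): each task is labelled with a priority equal to its own weight plus the number and priorities of the tasks that must run after it, so tasks at the head of long dependency chains reach the consumer threads first, while tasks whose dependencies are unfinished are split off into the waiting set.

# toy_prio.py -- standalone illustration of the priority labelling, not waf code
from collections import defaultdict

class FakeTask(object):
	def __init__(self, name, weight=0):
		self.name = name
		self.weight = weight    # read by priority(), like Task.weight in the patch
		self.run_after = set()  # tasks that must complete before this one
		self.hasrun = False
		self.visited = 0
		self.prio = 0
	def priority(self):
		# mirrors the Task.priority() method added in waflib/Task.py
		return self.weight
	def __repr__(self):
		return self.name

def prio_and_split(tasks):
	# reverse dependency graph: k -> tasks that can only run after k
	reverse = defaultdict(list)
	for x in tasks:
		x.visited = 0
	for x in tasks:
		for k in x.run_after:
			reverse[k].append(x)

	def visit(n):
		# a task absorbs the count and priorities of everything waiting on it
		if n.visited == 0:
			n.visited = 1
			rev = reverse.get(n, [])
			n.prio = n.priority() + len(rev) + sum(visit(k) for k in rev)
			n.visited = 2
		elif n.visited == 1:
			raise ValueError('Dependency cycle found!')
		return n.prio

	for x in tasks:
		if x.visited == 0:
			visit(x)

	ready = [x for x in tasks if all(k.hasrun for k in x.run_after)]
	waiting = [x for x in tasks if x not in ready]
	ready.sort(key=lambda x: x.prio, reverse=True)
	return ready, waiting

# a.o is needed by both b.o and link, so it ends up with the highest priority
a, b, link = FakeTask('a.o'), FakeTask('b.o'), FakeTask('link')
b.run_after.add(a)
link.run_after.update([a, b])
print(prio_and_split([a, b, link]))  # ([a.o], [b.o, link]); a.prio == 3, b.prio == 1

Since the new Task.priority() falls back to getattr(self, 'weight', 0), a build script could presumably bias scheduling by assigning a larger weight to task instances known to be long-running; the extra weight then propagates to the tasks they must run after through the same reverse-dependency traversal.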