diff --git a/waflib/Runner.py b/waflib/Runner.py index 70b804df..b171ed42 100644 --- a/waflib/Runner.py +++ b/waflib/Runner.py @@ -6,7 +6,7 @@ Runner.py: Task scheduling and execution """ -import random +import heapq try: from queue import Queue except ImportError: @@ -18,6 +18,31 @@ GAP = 20 Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run """ +class PriorityTasks(object): + def __init__(self): + self.lst = [] + def __len__(self): + return len(self.lst) + def __iter__(self): + return iter(self.lst) + def clear(self): + self.lst = [] + def append(self, task): + heapq.heappush(self.lst, task) + def pop(self): + return heapq.heappop(self.lst) + def extend(self, lst): + if self.lst: + # TODO no heapq.merge in Python 2.5 + for x in lst: + self.append(x) + else: + if isinstance(lst, list): + self.lst = lst + heapq.heapify(lst) + else: + self.lst = lst.queue + class Consumer(Utils.threading.Thread): """ Daemon thread object that executes a task. It shares a semaphore with @@ -102,11 +127,11 @@ class Parallel(object): Instance of :py:class:`waflib.Build.BuildContext` """ - self.outstanding = Utils.deque() - """List of :py:class:`waflib.Task.Task` that may be ready to be executed""" + self.outstanding = PriorityTasks() + """Heap of :py:class:`waflib.Task.Task` that may be ready to be executed""" - self.postponed = Utils.deque() - """List of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons""" + self.postponed = PriorityTasks() + """Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons""" self.incomplete = set() """List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)""" @@ -155,7 +180,7 @@ class Parallel(object): """ if not self.outstanding: return None - return self.outstanding.popleft() + return self.outstanding.pop() def postpone(self, tsk): """ @@ -165,10 +190,7 @@ class Parallel(object): :param tsk: task instance :type tsk: :py:class:`waflib.Task.Task` """ - if random.randint(0, 1): - self.postponed.appendleft(tsk) - else: - self.postponed.append(tsk) + self.postponed.append(tsk) def refill_task_list(self): """ @@ -207,38 +229,24 @@ class Parallel(object): # We cannot use a priority queue because the implementation # must be able to handle postponed dependencies self.outstanding.extend(ready) - self.postponed.extend(waiting) + self.incomplete.update(waiting) self.total = self.bld.total() break - def insert_with_prio(self, tsk): - # TODO the deque interface has insert in python 3.5 :-/ - if self.outstanding: - try: - if tsk.prio >= self.outstanding[0].prio: - self.outstanding.appendleft(tsk) - else: - self.outstanding.append(tsk) - except AttributeError: - self.outstanding.appendleft(tsk) - else: - self.outstanding.append(tsk) - def add_more_tasks(self, tsk): """ If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained in that list are added to the current build and will be processed before the next build group. - The priorities for dependent tasks are not calculated globally + The priorities for dependent tasks are not re-calculated globally :param tsk: task instance :type tsk: :py:attr:`waflib.Task.Task` """ if getattr(tsk, 'more_tasks', None): + # TODO recompute priorities globally? ready, waiting = self.prio_and_split(tsk.more_tasks) - for k in ready: - # TODO could be better, but we will have 1 task in general? - self.insert_with_prio(k) + self.outstanding.extend(ready) self.incomplete.update(waiting) self.total += len(tsk.more_tasks) @@ -253,7 +261,7 @@ class Parallel(object): break else: self.incomplete.remove(x) - self.insert_with_prio(x) + self.outstanding.append(x) if tsk in self.revdeps: for x in self.revdeps[tsk]: @@ -468,13 +476,13 @@ class Parallel(object): n.visited = 1 if n in reverse: rev = reverse[n] - n.prio = n.priority() + len(rev) + sum(visit(k) for k in rev) + n.__order = n.priority() + len(rev) + sum(visit(k) for k in rev) else: - n.prio = n.priority() + n.__order = n.priority() n.visited = 2 elif n.visited == 1: raise Errors.WafError('Dependency cycle found!') - return n.prio + return n.__order for x in tasks: if x.visited != 0: @@ -494,8 +502,6 @@ class Parallel(object): break else: ready.append(x) - - ready.sort(key=lambda x: x.prio, reverse=True) return (ready, waiting) def debug_cycles(self, tasks, reverse): diff --git a/waflib/Task.py b/waflib/Task.py index 37bee4f2..3628b7dd 100644 --- a/waflib/Task.py +++ b/waflib/Task.py @@ -153,7 +153,7 @@ class Task(evil): This may be useful for certain extensions but it can a lot of memory. """ - __slots__ = ('hasrun', 'generator', 'env', 'inputs', 'outputs', 'dep_nodes', 'run_after') + __slots__ = ('hasrun', 'generator', 'env', 'inputs', 'outputs', 'dep_nodes', 'run_after', '__order') def __init__(self, *k, **kw): self.hasrun = NOT_RUN @@ -177,6 +177,18 @@ class Task(evil): self.run_after = set() """Set of tasks that must be executed before this one""" + self.__order = 0 + """Task build order; used internally""" + + def __lt__(self, other): + return self.__order > other.__order or id(self) > id(other) + def __le__(self, other): + return self.__order >= other.__order or id(self) >= id(other) + def __gt__(self, other): + return self.__order < other.__order or id(self) < id(other) + def __ge__(self, other): + return self.__order <= other.__order or id(self) <= id(other) + def get_cwd(self): """ :return: current working directory @@ -213,7 +225,7 @@ class Task(evil): """ The default priority for this task instance - :return: a positive numeric value representing the urgency of running this task early + :return: a numeric value representing the urgency of running this task (the higher, the sooner) :rtype: int """ return getattr(self, 'weight', 0)