Schedule tasks using a priority queue

This commit is contained in:
Thomas Nagy 2017-06-14 18:59:28 +02:00
parent ddce7d344e
commit 23c0d41aff
2 changed files with 54 additions and 36 deletions

View File

@ -6,7 +6,7 @@
Runner.py: Task scheduling and execution
"""
import random
import heapq
try:
from queue import Queue
except ImportError:
@ -18,6 +18,31 @@ GAP = 20
Wait for at least ``GAP * njobs`` before trying to enqueue more tasks to run
"""
class PriorityTasks(object):
def __init__(self):
self.lst = []
def __len__(self):
return len(self.lst)
def __iter__(self):
return iter(self.lst)
def clear(self):
self.lst = []
def append(self, task):
heapq.heappush(self.lst, task)
def pop(self):
return heapq.heappop(self.lst)
def extend(self, lst):
if self.lst:
# TODO no heapq.merge in Python 2.5
for x in lst:
self.append(x)
else:
if isinstance(lst, list):
self.lst = lst
heapq.heapify(lst)
else:
self.lst = lst.queue
class Consumer(Utils.threading.Thread):
"""
Daemon thread object that executes a task. It shares a semaphore with
@ -102,11 +127,11 @@ class Parallel(object):
Instance of :py:class:`waflib.Build.BuildContext`
"""
self.outstanding = Utils.deque()
"""List of :py:class:`waflib.Task.Task` that may be ready to be executed"""
self.outstanding = PriorityTasks()
"""Heap of :py:class:`waflib.Task.Task` that may be ready to be executed"""
self.postponed = Utils.deque()
"""List of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""
self.postponed = PriorityTasks()
"""Heap of :py:class:`waflib.Task.Task` which are not ready to run for non-DAG reasons"""
self.incomplete = set()
"""List of :py:class:`waflib.Task.Task` waiting for dependent tasks to complete (DAG)"""
@ -155,7 +180,7 @@ class Parallel(object):
"""
if not self.outstanding:
return None
return self.outstanding.popleft()
return self.outstanding.pop()
def postpone(self, tsk):
"""
@ -165,10 +190,7 @@ class Parallel(object):
:param tsk: task instance
:type tsk: :py:class:`waflib.Task.Task`
"""
if random.randint(0, 1):
self.postponed.appendleft(tsk)
else:
self.postponed.append(tsk)
self.postponed.append(tsk)
def refill_task_list(self):
"""
@ -207,38 +229,24 @@ class Parallel(object):
# We cannot use a priority queue because the implementation
# must be able to handle postponed dependencies
self.outstanding.extend(ready)
self.postponed.extend(waiting)
self.incomplete.update(waiting)
self.total = self.bld.total()
break
def insert_with_prio(self, tsk):
# TODO the deque interface has insert in python 3.5 :-/
if self.outstanding:
try:
if tsk.prio >= self.outstanding[0].prio:
self.outstanding.appendleft(tsk)
else:
self.outstanding.append(tsk)
except AttributeError:
self.outstanding.appendleft(tsk)
else:
self.outstanding.append(tsk)
def add_more_tasks(self, tsk):
"""
If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
in that list are added to the current build and will be processed before the next build group.
The priorities for dependent tasks are not calculated globally
The priorities for dependent tasks are not re-calculated globally
:param tsk: task instance
:type tsk: :py:attr:`waflib.Task.Task`
"""
if getattr(tsk, 'more_tasks', None):
# TODO recompute priorities globally?
ready, waiting = self.prio_and_split(tsk.more_tasks)
for k in ready:
# TODO could be better, but we will have 1 task in general?
self.insert_with_prio(k)
self.outstanding.extend(ready)
self.incomplete.update(waiting)
self.total += len(tsk.more_tasks)
@ -253,7 +261,7 @@ class Parallel(object):
break
else:
self.incomplete.remove(x)
self.insert_with_prio(x)
self.outstanding.append(x)
if tsk in self.revdeps:
for x in self.revdeps[tsk]:
@ -468,13 +476,13 @@ class Parallel(object):
n.visited = 1
if n in reverse:
rev = reverse[n]
n.prio = n.priority() + len(rev) + sum(visit(k) for k in rev)
n.__order = n.priority() + len(rev) + sum(visit(k) for k in rev)
else:
n.prio = n.priority()
n.__order = n.priority()
n.visited = 2
elif n.visited == 1:
raise Errors.WafError('Dependency cycle found!')
return n.prio
return n.__order
for x in tasks:
if x.visited != 0:
@ -494,8 +502,6 @@ class Parallel(object):
break
else:
ready.append(x)
ready.sort(key=lambda x: x.prio, reverse=True)
return (ready, waiting)
def debug_cycles(self, tasks, reverse):

View File

@ -153,7 +153,7 @@ class Task(evil):
This may be useful for certain extensions but it can a lot of memory.
"""
__slots__ = ('hasrun', 'generator', 'env', 'inputs', 'outputs', 'dep_nodes', 'run_after')
__slots__ = ('hasrun', 'generator', 'env', 'inputs', 'outputs', 'dep_nodes', 'run_after', '__order')
def __init__(self, *k, **kw):
self.hasrun = NOT_RUN
@ -177,6 +177,18 @@ class Task(evil):
self.run_after = set()
"""Set of tasks that must be executed before this one"""
self.__order = 0
"""Task build order; used internally"""
def __lt__(self, other):
return self.__order > other.__order or id(self) > id(other)
def __le__(self, other):
return self.__order >= other.__order or id(self) >= id(other)
def __gt__(self, other):
return self.__order < other.__order or id(self) < id(other)
def __ge__(self, other):
return self.__order <= other.__order or id(self) <= id(other)
def get_cwd(self):
"""
:return: current working directory
@ -213,7 +225,7 @@ class Task(evil):
"""
The default priority for this task instance
:return: a positive numeric value representing the urgency of running this task early
:return: a numeric value representing the urgency of running this task (the higher, the sooner)
:rtype: int
"""
return getattr(self, 'weight', 0)