Implement a new priority system

This commit is contained in:
Thomas Nagy 2017-02-12 15:04:48 +01:00
parent ee98328c2f
commit 497f028a95
No known key found for this signature in database
GPG Key ID: 49B4C67C05277AAA
2 changed files with 151 additions and 13 deletions

View File

@ -105,8 +105,11 @@ class Parallel(object):
self.outstanding = Utils.deque()
"""List of :py:class:`waflib.Task.Task` that may be ready to be executed"""
self.frozen = Utils.deque()
"""List of :py:class:`waflib.Task.Task` that are not ready yet"""
self.frozen = set()
"""Set of :py:class:`waflib.Task.Task` that need other tasks to complete first"""
self.incomplete = Utils.deque()
"""List of :py:class:`waflib.Task.Task` with incomplete dependencies"""
self.ready = Queue(0)
"""List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""
@ -134,6 +137,11 @@ class Parallel(object):
Flag that indicates that the build cache must be saved when a task was executed
(calls :py:meth:`waflib.Build.BuildContext.store`)"""
self.revdeps = Utils.defaultdict(list)
"""
The reverse dependency graph of dependencies obtained from Task.run_after
"""
self.spawner = Spawner(self)
"""
Coordinating daemon thread that spawns thread consumers
@ -151,16 +159,16 @@ class Parallel(object):
def postpone(self, tsk):
"""
Adds the task to the list :py:attr:`waflib.Runner.Parallel.frozen`.
Adds the task to the list :py:attr:`waflib.Runner.Parallel.incomplete`.
The order is scrambled so as to consume as many tasks in parallel as possible.
:param tsk: task instance
:type tsk: :py:class:`waflib.Task.Task`
"""
if random.randint(0, 1):
self.frozen.appendleft(tsk)
self.incomplete.appendleft(tsk)
else:
self.frozen.append(tsk)
self.incomplete.append(tsk)
def refill_task_list(self):
"""
@ -172,7 +180,7 @@ class Parallel(object):
while not self.outstanding:
if self.count:
self.get_out()
elif self.frozen:
elif self.incomplete:
try:
cond = self.deadlock == self.processed
except AttributeError:
@ -180,36 +188,71 @@ class Parallel(object):
else:
if cond:
msg = 'check the build order for the tasks'
for tsk in self.frozen:
for tsk in self.incomplete:
if not tsk.run_after:
msg = 'check the methods runnable_status'
break
lst = []
for tsk in self.frozen:
for tsk in self.incomplete:
lst.append('%s\t-> %r' % (repr(tsk), [id(x) for x in tsk.run_after]))
raise Errors.WafError('Deadlock detected: %s%s' % (msg, ''.join(lst)))
self.deadlock = self.processed
if self.frozen:
self.outstanding.extend(self.frozen)
self.frozen.clear()
if self.incomplete:
self.outstanding.extend(self.incomplete)
self.incomplete.clear()
elif not self.count:
self.outstanding.extend(next(self.biter))
tasks = next(self.biter)
ready, waiting = self.prio_and_split(tasks)
# We cannot use a priority queue because the implementation
# must be able to handle incomplete dependencies
self.outstanding.extend(ready)
self.frozen.update(waiting)
self.total = self.bld.total()
break
def insert_with_prio(self, tsk):
# TODO the deque interface has insert in python 3.5 :-/
if self.outstanding and tsk.prio >= self.outstanding[0].prio:
self.outstanding.appendleft(tsk)
else:
self.outstanding.append(tsk)
def add_more_tasks(self, tsk):
"""
If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
in that list are added to the current build and will be processed before the next build group.
Assume that the task is done, so that task priorities do not need
to be re-calculated
:param tsk: task instance
:type tsk: :py:attr:`waflib.Task.Task`
"""
if getattr(tsk, 'more_tasks', None):
self.outstanding.extend(tsk.more_tasks)
ready, waiting = self.prio_and_split(tsk.tasks)
for k in ready:
# TODO could be better, but we will have 1 task in general?
self.insert_with_prio(k)
self.frozen.update(waiting)
self.total += len(tsk.more_tasks)
def mark_finished(self, tsk):
# we assume that frozen tasks will be consumed as the build goes
if tsk in self.revdeps:
for x in self.revdeps[tsk]:
# ancestors are likely to be frozen
if x in self.frozen:
# TODO remove dependencies to free some memory?
# x.run_after.remove(tsk)
for k in x.run_after:
if not k.hasrun:
break
else:
self.frozen.remove(x)
self.insert_with_prio(x)
del self.revdeps[tsk]
def get_out(self):
"""
Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
@ -220,6 +263,8 @@ class Parallel(object):
tsk = self.out.get()
if not self.stop:
self.add_more_tasks(tsk)
self.mark_finished(tsk)
self.count -= 1
self.dirty = True
return tsk
@ -238,12 +283,14 @@ class Parallel(object):
Mark a task as skipped/up-to-date
"""
tsk.hasrun = Task.SKIPPED
self.mark_finished(tsk)
def cancel(self, tsk):
"""
Mark a task as failed because of unsatisfiable dependencies
"""
tsk.hasrun = Task.CANCELED
self.mark_finished(tsk)
def error_handler(self, tsk):
"""
@ -360,3 +407,85 @@ class Parallel(object):
self.ready.put(None)
assert (self.count == 0 or self.stop)
def prio_and_split(self, tasks):
"""
Label input tasks with priority values, and return a pair containing
the tasks that are ready to run and the tasks that are necessarily
waiting for other tasks to complete.
The priority system is really meant as an optional layer for optimization:
dependency cycles are found more quickly, and build should be more efficient
:return: A pair of task lists
:rtype: tuple
"""
# to disable:
#return tasks, []
for x in tasks:
x.visited = 0
reverse = self.revdeps
for x in tasks:
for k in x.run_after:
reverse[k].append(x)
# the priority number is not the tree size
def visit(n):
if n.visited == 0:
n.visited = 1
if n in reverse:
rev = reverse[n]
n.prio = n.priority() + len(rev) + sum(visit(k) for k in rev)
else:
n.prio = n.priority()
n.visited = 2
elif n.visited == 1:
raise Errors.WafError('Dependency cycle found!')
return n.prio
for x in tasks:
if x.visited != 0:
# must visit all to detect cycles
continue
try:
visit(x)
except Errors.WafError:
self.debug_cycles(tasks, reverse)
ready = []
waiting = []
for x in tasks:
for k in x.run_after:
if not k.hasrun:
waiting.append(x)
break
else:
ready.append(x)
ready.sort(key=lambda x: x.prio, reverse=True)
return (ready, waiting)
def debug_cycles(self, tasks, reverse):
# TODO display more than one cycle?
tmp = {}
for x in tasks:
tmp[x] = 0
def visit(n, acc):
if tmp[n] == 0:
tmp[n] = 1
for k in reverse.get(n, []):
visit(k, [n] + acc)
tmp[n] = 2
elif tmp[n] == 1:
lst = []
for tsk in acc:
lst.append(repr(tsk))
if tsk is n:
# exclude prior nodes, we want the minimum cycle
break
raise Errors.WafError('Task dependency cycle in "run_after" constraints: %s' % ''.join(lst))
for x in tasks:
visit(x, [])

View File

@ -209,6 +209,15 @@ class Task(evil):
x = '"%s"' % x
return x
def priority(self):
"""
The default priority for this task instance
:return: a positive numeric value representing the urgency of running this task early
:rtype: int
"""
return getattr(self, 'weight', 0)
def split_argfile(self, cmd):
"""
Splits a list of process commands into the executable part and its list of arguments