waf/waflib/Runner.py

363 lines
9.4 KiB
Python
Raw Normal View History

2011-09-10 11:13:51 +02:00
#!/usr/bin/env python
# encoding: utf-8
2016-06-25 14:49:27 +02:00
# Thomas Nagy, 2005-2016 (ita)
2011-09-10 11:13:51 +02:00
"""
Runner.py: Task scheduling and execution
"""
import random
2011-09-10 11:13:51 +02:00
try:
from queue import Queue
2012-02-11 14:43:07 +01:00
except ImportError:
2011-09-10 11:13:51 +02:00
from Queue import Queue
2011-10-05 20:04:19 +02:00
from waflib import Utils, Task, Errors, Logs
2011-09-10 11:13:51 +02:00
2016-02-28 10:01:43 +01:00
GAP = 20
"""
Back-pressure factor: while more than ``GAP * numjobs`` tasks have been handed
out and not collected yet, :py:meth:`waflib.Runner.Parallel.refill_task_list`
waits for results instead of enqueuing more tasks to run
"""
class Consumer(Utils.threading.Thread):
    """
    Daemon thread object that executes a task. It shares a semaphore with
    the coordinator :py:class:`waflib.Runner.Spawner`. There is one
    instance per task to consume.

    The thread starts itself at the end of the constructor.
    """
    def __init__(self, spawner, task):
        """
        :param spawner: coordinator providing the semaphore ``sem`` and the producer ``master``
        :type spawner: :py:class:`waflib.Runner.Spawner`
        :param task: task instance to execute
        :type task: :py:class:`waflib.Task.Task`
        """
        Utils.threading.Thread.__init__(self)
        self.task = task
        """Task to execute"""
        self.spawner = spawner
        """Coordinator object"""
        # Thread.setDaemon() is deprecated in recent Python versions;
        # assigning the attribute is the equivalent supported form
        self.daemon = True
        self.start()

    def run(self):
        """
        Processes a single task, unless the producer ``stop`` flag is already set.

        Whether the task ran, was skipped or raised an exception, the semaphore
        slot is released and the task is put on the producer ``out`` queue.
        """
        try:
            if not self.spawner.master.stop:
                self.task.process()
        finally:
            self.spawner.sem.release()
            self.spawner.master.out.put(self.task)
            # drop the references held by this thread object
            self.task = None
            self.spawner = None
2011-09-10 11:13:51 +02:00
class Spawner(Utils.threading.Thread):
    """
    Daemon thread that consumes tasks from :py:class:`waflib.Runner.Parallel` producer and
    spawns a consuming thread :py:class:`waflib.Runner.Consumer` for each
    :py:class:`waflib.Task.Task` instance.

    The thread starts itself at the end of the constructor.
    """
    def __init__(self, master):
        """
        :param master: producer providing the ``ready`` queue, the ``stop`` flag and ``numjobs``
        :type master: :py:class:`waflib.Runner.Parallel`
        """
        Utils.threading.Thread.__init__(self)
        self.master = master
        """:py:class:`waflib.Runner.Parallel` producer instance"""
        self.sem = Utils.threading.Semaphore(master.numjobs)
        """Semaphore that prevents spawning more than *n* concurrent consumers"""
        # Thread.setDaemon() is deprecated in recent Python versions;
        # assigning the attribute is the equivalent supported form
        self.daemon = True
        self.start()

    def run(self):
        """
        Spawns new consumers to execute tasks by delegating to :py:meth:`waflib.Runner.Spawner.loop`
        """
        try:
            self.loop()
        except Exception:
            # Python 2 prints unnecessary messages when shutting down
            # we also want to stop the thread properly
            pass

    def loop(self):
        """
        Consumes task objects from the producer; ends when the producer has no more
        task to provide, which it signals by putting ``None`` on its ``ready`` queue.
        """
        master = self.master
        while 1:
            task = master.ready.get()
            if task is None:
                # end-of-build sentinel: stop here instead of relying on an
                # AttributeError, and never hand None over to a Consumer
                break
            # wait for a free slot before spawning one more consumer
            self.sem.acquire()
            if not master.stop:
                task.log_display(task.generator.bld)
            Consumer(self, task)
2011-09-10 11:13:51 +02:00
class Parallel(object):
    """
    Schedule the tasks obtained from the build context for execution.
    """
    def __init__(self, bld, j=2):
        """
        The initialization requires a build context reference
        for computing the total number of jobs.

        :param bld: build context
        :type bld: :py:class:`waflib.Build.BuildContext`
        :param j: amount of parallel consumers to use
        :type j: integer
        """

        self.numjobs = j
        """
        Amount of parallel consumers to use
        """

        self.bld = bld
        """
        Instance of :py:class:`waflib.Build.BuildContext`
        """

        self.outstanding = Utils.deque()
        """List of :py:class:`waflib.Task.Task` that may be ready to be executed"""

        self.frozen = Utils.deque()
        """List of :py:class:`waflib.Task.Task` that are not ready yet"""

        self.ready = Queue(0)
        """List of :py:class:`waflib.Task.Task` ready to be executed by consumers"""

        self.out = Queue(0)
        """List of :py:class:`waflib.Task.Task` returned by the task consumers"""

        self.count = 0
        """Amount of tasks handed over for execution and not collected yet through :py:meth:`waflib.Runner.Parallel.get_out`"""

        self.processed = 1
        """Amount of tasks processed"""

        self.total = 0
        """
        Amount of tasks in the build; refreshed from ``bld.total()`` in
        :py:meth:`waflib.Runner.Parallel.start` and :py:meth:`waflib.Runner.Parallel.refill_task_list`
        """

        self.stop = False
        """Error flag to stop the build"""

        self.error = []
        """Tasks that could not be executed"""

        self.biter = None
        """Task iterator which must give groups of parallelizable tasks when calling ``next()``"""

        self.dirty = False
        """
        Flag that indicates that the build cache must be saved when a task was executed
        (calls :py:meth:`waflib.Build.BuildContext.store`)"""

        self.spawner = Spawner(self)
        """
        Coordinating daemon thread that spawns thread consumers
        """

    def get_next_task(self):
        """
        Obtains the next Task instance to run

        :return: the leftmost task of :py:attr:`waflib.Runner.Parallel.outstanding`, or None if it is empty
        :rtype: :py:class:`waflib.Task.Task`
        """
        if not self.outstanding:
            return None
        return self.outstanding.popleft()

    def postpone(self, tsk):
        """
        Adds the task to the list :py:attr:`waflib.Runner.Parallel.frozen`.
        The order is scrambled so as to consume as many tasks in parallel as possible.

        :param tsk: task instance
        :type tsk: :py:class:`waflib.Task.Task`
        """
        # insert at a random end of the deque
        if random.randint(0, 1):
            self.frozen.appendleft(tsk)
        else:
            self.frozen.append(tsk)

    def refill_task_list(self):
        """
        Adds the next group of tasks to execute in :py:attr:`waflib.Runner.Parallel.outstanding`.

        :raises: :py:class:`waflib.Errors.WafError` when postponed tasks remain and no
            task was processed since the previous time the postponed tasks were considered
        """
        # too many tasks handed out already: collect results first
        while self.count > self.numjobs * GAP:
            self.get_out()

        while not self.outstanding:
            if self.count:
                self.get_out()
            elif self.frozen:
                try:
                    cond = self.deadlock == self.processed
                except AttributeError:
                    # first time the postponed tasks are considered
                    pass
                else:
                    if cond:
                        # nothing was processed since the last attempt
                        if any(not tsk.run_after for tsk in self.frozen):
                            msg = 'check the methods runnable_status'
                        else:
                            msg = 'check the build order for the tasks'
                        lst = ['%s\t-> %r' % (repr(tsk), [id(x) for x in tsk.run_after]) for tsk in self.frozen]
                        raise Errors.WafError('Deadlock detected: %s%s' % (msg, ''.join(lst)))
                self.deadlock = self.processed

            if self.frozen:
                # give the postponed tasks another chance
                self.outstanding.extend(self.frozen)
                self.frozen.clear()
            elif not self.count:
                # nothing pending and nothing running: move on to the next group
                self.outstanding.extend(next(self.biter))
                self.total = self.bld.total()
                break

    def add_more_tasks(self, tsk):
        """
        If a task provides :py:attr:`waflib.Task.Task.more_tasks`, then the tasks contained
        in that list are added to the current build and will be processed before the next build group.

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        if getattr(tsk, 'more_tasks', None):
            self.outstanding.extend(tsk.more_tasks)
            self.total += len(tsk.more_tasks)

    def get_out(self):
        """
        Waits for a Task that task consumers add to :py:attr:`waflib.Runner.Parallel.out` after execution.
        Adds more Tasks if necessary through :py:attr:`waflib.Runner.Parallel.add_more_tasks`,
        unless the ``stop`` flag is set.

        :return: the task obtained from the queue
        :rtype: :py:attr:`waflib.Task.Task`
        """
        tsk = self.out.get()
        if not self.stop:
            self.add_more_tasks(tsk)
        self.count -= 1
        self.dirty = True
        return tsk

    def add_task(self, tsk):
        """
        Enqueue a Task to :py:attr:`waflib.Runner.Parallel.ready` so that consumers can run them.

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        self.ready.put(tsk)

    def skip(self, tsk):
        """
        Mark a task as skipped/up-to-date

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        tsk.hasrun = Task.SKIPPED

    def cancel(self, tsk):
        """
        Mark a task as failed because of unsatisfiable dependencies

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        tsk.hasrun = Task.CANCELED

    def error_handler(self, tsk):
        """
        Called when a task cannot be executed. The flag :py:attr:`waflib.Runner.Parallel.stop` is set, unless
        the build is executed with::

            $ waf build -k

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        """
        if hasattr(tsk, 'scan') and hasattr(tsk, 'uid'):
            # TODO waf 2.0 - this breaks encapsulation
            try:
                del self.bld.imp_sigs[tsk.uid()]
            except KeyError:
                pass
        if not self.bld.keep:
            self.stop = True
        self.error.append(tsk)

    def task_status(self, tsk):
        """
        Obtains the task status to decide whether to run it immediately or not.
        If ``runnable_status`` raises an exception, the stack trace is stored on the
        task as ``err_msg`` and :py:attr:`waflib.Task.EXCEPTION` is returned.

        :param tsk: task instance
        :type tsk: :py:attr:`waflib.Task.Task`
        :return: the exit status, for example :py:attr:`waflib.Task.ASK_LATER`
        :rtype: integer
        """
        try:
            return tsk.runnable_status()
        except Exception:
            self.processed += 1
            tsk.err_msg = Utils.ex_stack()
            if not self.stop and self.bld.keep:
                self.skip(tsk)
                if self.bld.keep == 1:
                    # if -k stop on the first exception, if -kk try to go as far as possible
                    if Logs.verbose > 1 or not self.error:
                        self.error.append(tsk)
                    self.stop = True
                else:
                    if Logs.verbose > 1:
                        self.error.append(tsk)
                return Task.EXCEPTION
            tsk.hasrun = Task.EXCEPTION

            self.error_handler(tsk)
            return Task.EXCEPTION

    def start(self):
        """
        Obtains Task instances from the BuildContext instance and adds the ones that need to be executed to
        :py:class:`waflib.Runner.Parallel.ready` so that the :py:class:`waflib.Runner.Spawner` consumer thread
        has them executed. Obtains the executed Tasks back from :py:class:`waflib.Runner.Parallel.out`
        and marks the build as failed by setting the ``stop`` flag.
        If only one job is used, then executes the tasks one by one, without consumers.
        """
        self.total = self.bld.total()

        while not self.stop:

            self.refill_task_list()

            # consider the next task
            tsk = self.get_next_task()
            if not tsk:
                if self.count:
                    # tasks may add new ones after they are run
                    continue
                else:
                    # no tasks to run, no tasks running, time to exit
                    break

            if tsk.hasrun:
                # if the task is marked as "run", just skip it
                self.processed += 1
                continue

            if self.stop: # stop immediately after a failure is detected
                break

            st = self.task_status(tsk)
            if st == Task.RUN_ME:
                self.count += 1
                self.processed += 1

                if self.numjobs == 1:
                    # execute in the current thread; the result still goes through the out queue
                    tsk.log_display(tsk.generator.bld)
                    try:
                        tsk.process()
                    finally:
                        self.out.put(tsk)
                else:
                    self.add_task(tsk)
            elif st == Task.ASK_LATER:
                self.postpone(tsk)
            elif st == Task.SKIP_ME:
                self.processed += 1
                self.skip(tsk)
                self.add_more_tasks(tsk)
            elif st == Task.CANCEL_ME:
                # A dependency problem has occurred, and the
                # build is most likely run with `waf -k`
                if Logs.verbose > 1:
                    self.error.append(tsk)
                self.processed += 1
                self.cancel(tsk)

        # self.count represents the tasks that have been made available to the consumer threads
        # collect all the tasks after an error else the message may be incomplete
        while self.error and self.count:
            self.get_out()

        # tell the spawner thread that there is nothing more to consume
        self.ready.put(None)
        assert (self.count == 0 or self.stop)