Python is getting a bit better at threading; we do not need to recycle threads anymore

Thomas Nagy 2012-04-09 23:17:31 +02:00
parent 062c5cae1a
commit 5448e23e40
1 changed file with 9 additions and 59 deletions
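
In short, the commit removes the module-level recycling machinery (pool, get_pool, put_pool and the atexit cleanup hook) and instead has each Parallel run create its own TaskConsumer threads bound to a shared ready queue, then stop them with a None sentinel and join(). Below is a condensed, standalone sketch of that pattern, not the waflib code itself: the names Consumer and Runner and the plain-callable work items are illustrative stand-ins, and details such as the daemon flag are assumptions rather than something this diff shows.

import threading
from queue import Queue

class Consumer(threading.Thread):
	# Stand-in for waflib's TaskConsumer: a worker bound to a queue owned by the runner.
	def __init__(self, ready):
		threading.Thread.__init__(self)
		self.ready = ready      # shared queue passed in by the runner
		self.daemon = True      # assumption, not taken from the diff
		self.start()

	def run(self):
		while True:
			tsk = self.ready.get()   # block until a work item or a sentinel arrives
			if tsk is None:
				break                # None is the shutdown signal
			tsk()                    # stand-in for task.process()

class Runner(object):
	# Stand-in for waflib's Parallel: owns the queue and the per-run thread pool.
	def __init__(self, numjobs):
		self.ready = Queue(0)
		self.pool = [Consumer(self.ready) for _ in range(numjobs)]

	def free_task_pool(self):
		for x in self.pool:
			self.ready.put(None)     # one sentinel per consumer
		for x in self.pool:
			x.join()                 # each thread exits after consuming its sentinel
		self.pool = []

if __name__ == '__main__':
	r = Runner(numjobs=4)
	for i in range(10):
		r.ready.put(lambda i=i: print('task', i))
	r.free_task_pool()   # sentinels queue up behind the work, so everything runs first

Queueing one None per consumer works because each thread removes exactly one sentinel before exiting, so every worker is guaranteed to see one and join() cannot hang.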


@@ -7,7 +7,7 @@ Runner.py: Task scheduling and execution
 """
-import random, atexit
+import random
 try:
 	from queue import Queue
 except ImportError:
@@ -25,9 +25,9 @@ class TaskConsumer(Utils.threading.Thread):
 	They wait for tasks in the queue and then use ``task.process(...)``
 	"""
-	def __init__(self):
+	def __init__(self, q=None):
 		Utils.threading.Thread.__init__(self)
-		self.ready = Queue()
+		self.ready = q or Queue()
 		"""
 		Obtain :py:class:`waflib.Task.TaskBase` instances from this queue.
 		"""
@@ -54,46 +54,6 @@ class TaskConsumer(Utils.threading.Thread):
 				tsk(self)
 			else:
 				tsk.process()
-pool = Queue()
-"""
-Pool of task consumer objects
-"""
-def get_pool():
-	"""
-	Obtain a task consumer from :py:attr:`waflib.Runner.pool`.
-	Do not forget to put it back by using :py:func:`waflib.Runner.put_pool`
-	and reset properly (original waiting queue).
-	:rtype: :py:class:`waflib.Runner.TaskConsumer`
-	"""
-	try:
-		return pool.get(False)
-	except Exception:
-		return TaskConsumer()
-def put_pool(x):
-	"""
-	Return a task consumer to the thread pool :py:attr:`waflib.Runner.pool`
-	:param x: task consumer object
-	:type x: :py:class:`waflib.Runner.TaskConsumer`
-	"""
-	pool.put(x)
-def _free_resources():
-	global pool
-	lst = []
-	while pool.qsize():
-		lst.append(pool.get())
-	for x in lst:
-		x.ready.put(None)
-	for x in lst:
-		x.join()
-	pool = None
-atexit.register(_free_resources)
 
 class Parallel(object):
 	"""
 	Schedule the tasks obtained from the build context for execution.
@@ -253,30 +213,21 @@ class Parallel(object):
 	def init_task_pool(self):
 		# lazy creation, and set a common pool for all task consumers
-		pool = self.pool = [get_pool() for i in range(self.numjobs)]
 		self.ready = Queue(0)
-		def setq(consumer):
-			consumer.ready = self.ready
-		for x in pool:
-			x.ready.put(setq)
+		pool = self.pool = [TaskConsumer(self.ready) for i in range(self.numjobs)]
 		return pool
 
 	def free_task_pool(self):
-		# return the consumers, setting a different queue for each of them
-		def setq(consumer):
-			consumer.ready = Queue(0)
-			self.out.put(self)
+		# free the task pool, if any
 		try:
 			pool = self.pool
 		except AttributeError:
 			pass
 		else:
-			for x in pool:
-				self.ready.put(setq)
-			for x in pool:
-				self.get_out()
-			for x in pool:
-				put_pool(x)
+			for x in self.pool:
+				self.ready.put(None)
+			for x in self.pool:
+				x.join()
 			self.pool = []
 
 	def start(self):
@@ -356,6 +307,5 @@ class Parallel(object):
 		#print loop
 		assert (self.count == 0 or self.stop)
 
-		# free the task pool, if any
 		self.free_task_pool()