mirror of https://gitlab.com/ita1024/waf.git
New tool nobuild for profiling, and a unix-only version of prefork.py
This commit is contained in:
parent
760b34e264
commit
8d9e7fda30
|
@ -0,0 +1,24 @@
|
|||
#! /usr/bin/env python
|
||||
# encoding: utf-8
|
||||
# Thomas Nagy, 2015 (ita)
|
||||
|
||||
"""
|
||||
Override the build commands to write empty files.
|
||||
This is useful for profiling and evaluating the Python overhead.
|
||||
|
||||
To use::
|
||||
|
||||
def build(bld):
|
||||
...
|
||||
bld.load('nobuild')
|
||||
|
||||
"""
|
||||
|
||||
from waflib import Task
|
||||
def build(bld):
	"""Neutralize all build commands: every registered task class is patched
	so that running a task merely creates empty output files.

	Useful for profiling the pure-Python overhead of a build.
	"""
	def fake_run(task):
		# produce each declared output as an empty file instead of building it
		for node in task.outputs:
			node.write('')
	for cls in Task.classes.values():
		cls.run = fake_run
|
||||
|
|
@ -13,8 +13,12 @@ On a benchmark executed on Linux Kubuntu 14, 8 virtual cores and SSD drive::
|
|||
|
||||
To use::
|
||||
|
||||
def options(opt):
|
||||
# optional, will spawn 40 servers early
|
||||
opt.load('prefork')
|
||||
|
||||
def build(bld):
|
||||
bld.load('serverprocess_client')
|
||||
bld.load('prefork')
|
||||
...
|
||||
more code
|
||||
|
||||
|
@ -168,8 +172,6 @@ else:
|
|||
|
||||
from waflib import Logs, Utils, Runner, Errors
|
||||
|
||||
SERVERS = []
|
||||
|
||||
def init_task_pool(self):
|
||||
# lazy creation, and set a common pool for all task consumers
|
||||
pool = self.pool = []
|
||||
|
@ -209,6 +211,7 @@ else:
|
|||
SERVERS = []
|
||||
CONNS = []
|
||||
def close_all():
|
||||
global SERVERS, CONNS
|
||||
while CONNS:
|
||||
conn = CONNS.pop()
|
||||
try:
|
||||
|
@ -309,31 +312,44 @@ else:
|
|||
|
||||
return ret
|
||||
|
||||
def init_key(ctx):
	"""Return the shared authentication key, creating it on first use.

	The key is stored in the SHARED_KEY environment variable so that the
	forked server processes inherit it; it is also cached on *ctx*.
	"""
	if 'SHARED_KEY' in os.environ:
		key = os.environ['SHARED_KEY']
	else:
		rng = random.SystemRandom()
		# 20 printable ascii characters (codes 40..126)
		key = ''.join(chr(rng.randint(40, 126)) for _ in range(20))
		os.environ['SHARED_KEY'] = key
	ctx.SHARED_KEY = key
	return key
|
||||
|
||||
def build(bld):
|
||||
if bld.cmd == 'clean':
|
||||
return
|
||||
|
||||
key = "".join([chr(random.SystemRandom().randint(40, 126)) for x in range(20)])
|
||||
os.environ['SHARED_KEY'] = bld.SHARED_KEY = key
|
||||
|
||||
while len(SERVERS) < bld.jobs:
|
||||
def init_servers(ctx, maxval):
|
||||
while len(SERVERS) < maxval:
|
||||
i = len(SERVERS)
|
||||
srv = make_server(bld, i)
|
||||
srv = make_server(ctx, i)
|
||||
SERVERS.append(srv)
|
||||
while len(CONNS) < bld.jobs:
|
||||
while len(CONNS) < maxval:
|
||||
i = len(CONNS)
|
||||
srv = SERVERS[i]
|
||||
conn = None
|
||||
for x in range(30):
|
||||
try:
|
||||
conn = make_conn(bld, srv)
|
||||
conn = make_conn(ctx, srv)
|
||||
break
|
||||
except socket.error:
|
||||
time.sleep(0.01)
|
||||
if not conn:
|
||||
raise ValueError('Could not start the server!')
|
||||
CONNS.append(conn)
|
||||
|
||||
def options(opt):
	# Pre-spawn a fixed pool of servers while waf is still parsing options:
	# forking this early keeps the copied process image small.
	# NOTE(review): 40 is an arbitrary upper bound — presumably chosen to
	# cover typical -j values; confirm against expected job counts.
	init_key(opt)
	init_servers(opt, 40)
|
||||
|
||||
def build(bld):
	"""Prepare the shared key and the server pool, then reroute all
	command execution through the pre-forked servers."""
	if bld.cmd == 'clean':
		# nothing is executed during a clean, no servers are needed
		return

	init_key(bld)
	init_servers(bld, bld.jobs)

	# keep the original implementation reachable, then monkey-patch the
	# build context class so every command goes through the servers
	bld.__class__.exec_command_old = bld.__class__.exec_command
	bld.__class__.exec_command = exec_command
|
||||
|
||||
|
|
|
@ -0,0 +1,277 @@
|
|||
#! /usr/bin/env python
|
||||
# encoding: utf-8
|
||||
# Thomas Nagy, 2015 (ita)
|
||||
|
||||
"""
|
||||
A version of prefork.py that uses unix sockets. The advantage is that it does not expose
|
||||
connections to the outside. However, it only works on unix-like systems,
and performance can be slightly worse.
|
||||
|
||||
To use::
|
||||
|
||||
def options(opt):
|
||||
# recommended, fork new processes before using more memory
|
||||
opt.load('preforkunix')
|
||||
|
||||
def build(bld):
|
||||
bld.load('preforkunix')
|
||||
...
|
||||
more code
|
||||
"""
|
||||
|
||||
import os, re, socket, threading, sys, subprocess, time, atexit, traceback, random
|
||||
try:
|
||||
import SocketServer
|
||||
except ImportError:
|
||||
import socketserver as SocketServer
|
||||
try:
|
||||
from queue import Queue
|
||||
except ImportError:
|
||||
from Queue import Queue
|
||||
try:
|
||||
import cPickle
|
||||
except ImportError:
|
||||
import pickle as cPickle
|
||||
|
||||
# every protocol message starts with a header of exactly this many bytes
HEADER_SIZE = 20

# protocol verbs
REQ = 'REQ'
RES = 'RES'
BYE = 'BYE'

def make_header(params, cookie=''):
	"""Build a fixed-size protocol header from the strings in *params*,
	optionally terminated by *cookie*.

	The comma-joined fields are space-padded so that the total length is
	always HEADER_SIZE; on Python 3 the result is returned as bytes.
	"""
	width = HEADER_SIZE - len(cookie)
	text = ','.join(params).ljust(width)
	# ljust never truncates: over-long parameter lists must fail loudly
	assert len(text) == width
	text += cookie
	if sys.hexversion > 0x3000000:
		return text.encode('iso8859-1')
	return text
|
||||
|
||||
def safe_compare(x, y):
	"""Compare two strings character by character without short-circuiting
	on the first mismatch; return True when they are equal.

	Fix: the original relied on zip() alone, which truncates to the
	shorter input — so a string that merely matched a *prefix* of the
	other (e.g. a shortened cookie) compared as equal. Reject different
	lengths explicitly first.
	"""
	if len(x) != len(y):
		return False
	vec = [abs(ord(a) - ord(b)) for (a, b) in zip(x, y)]
	return not sum(vec)
|
||||
|
||||
re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')
|
||||
if 1:
|
||||
def send_response(conn, ret, out, err, exc):
	"""Send a RES header, followed by a pickled (out, err, exc) payload
	when there is anything to report, back over *conn*."""
	payload = ''
	if out or err or exc:
		# only pickle when there is actual output or an error to carry
		payload = cPickle.dumps((out, err, exc), -1)

	# no need for the cookie in the response
	conn.send(make_header([RES, str(ret), str(len(payload))]))
	if payload:
		conn.send(payload)
|
||||
|
||||
def process_command(conn):
	"""Read one header from *conn* and dispatch it.

	REQ runs a command via run_command(); BYE (or any malformed header)
	raises ValueError, which terminates the caller's serving loop.
	Returns silently when the peer has closed the connection.
	"""
	header = conn.recv(HEADER_SIZE)
	if not header:
		# peer closed the connection
		return
	assert len(header) == HEADER_SIZE
	if sys.hexversion > 0x3000000:
		header = header.decode('iso8859-1')

	if not re_valid_query.match(header):
		send_response(conn, -1, '', '', 'Invalid query %r' % header)
		raise ValueError('Invalid query %r' % header)

	fields = header.strip().split(',')
	verb = fields[0]
	if verb == REQ:
		run_command(conn, fields[1:])
	elif verb == BYE:
		raise ValueError('Exit')
	else:
		raise ValueError('Invalid query %r' % fields)
|
||||
|
||||
def run_command(conn, query):
	"""Receive a pickled subprocess keyword dict of the size given in
	*query*, execute the command it describes, and send the result back.

	NOTE(review): cPickle.loads on socket data — acceptable here only
	because the peer is our own forked child over a private socketpair.
	"""
	size = int(query[0])
	data = conn.recv(size)
	assert len(data) == size
	kw = cPickle.loads(data)

	ret = out = err = exc = None
	cmd = kw.pop('cmd')

	try:
		if kw['stdout'] or kw['stderr']:
			# the caller wants the output captured
			proc = subprocess.Popen(cmd, **kw)
			out, err = proc.communicate()
			ret = proc.returncode
		else:
			ret = subprocess.Popen(cmd, **kw).wait()
	except KeyboardInterrupt:
		return
	except Exception as e:
		ret = -1
		exc = str(e) + traceback.format_exc()

	send_response(conn, ret, out, err, exc)
|
||||
|
||||
if 1:
|
||||
|
||||
from waflib import Logs, Utils, Runner, Errors
|
||||
|
||||
def init_task_pool(self):
	"""Create the consumer thread pool lazily and tag every consumer
	thread with an index; exec_command() later uses that index to pick
	the matching child connection from CONNS."""
	# lazy creation, and set a common pool for all task consumers
	pool = self.pool = []
	for i in range(self.numjobs):
		consumer = Runner.get_pool()
		pool.append(consumer)
		consumer.idx = i
	self.ready = Queue(0)
	def setq(consumer):
		# executed once inside each consumer thread: share the ready queue
		# and copy the consumer index onto the thread object itself
		consumer.ready = self.ready
		try:
			threading.current_thread().idx = consumer.idx
		except Exception as e:
			# NOTE(review): failure here only loses the index tagging;
			# printed rather than raised so the build can continue
			print(e)
	for x in pool:
		x.ready.put(setq)
	return pool
# replace the default scheduler implementation with the one above
Runner.Parallel.init_task_pool = init_task_pool
|
||||
|
||||
def make_conn(bld):
	"""Fork one worker process connected through a unix socket pair.

	Returns (pid, parent_socket) in the parent. The child never returns:
	it closes the parent end and serves process_command() requests until
	process_command raises (e.g. on a BYE message).

	NOTE(review): an uncaught exception ends the child with a traceback
	instead of os._exit(); presumably acceptable for this tool — confirm.
	"""
	child_socket, parent_socket = socket.socketpair(socket.AF_UNIX)
	pid = os.fork()
	if pid == 0:
		# child process: keep only its own end of the pair
		parent_socket.close()
		# write to child_socket only
		while 1:
			process_command(child_socket)
	else:
		# parent process: keep only the parent end
		child_socket.close()
		return (pid, parent_socket)
|
||||
|
||||
# child process ids and the corresponding parent-side sockets
SERVERS = []
CONNS = []

def close_all():
	"""Close every connection and kill every child process.

	Registered as an atexit hook; cleanup is best-effort, so failures on
	one connection/process must not prevent reaping the others.

	Fix: the original declared ``global SERVERS, CONS`` — a typo for
	CONNS. Also narrowed the bare ``except:`` clauses to ``Exception`` so
	KeyboardInterrupt/SystemExit are not swallowed during shutdown.
	"""
	global SERVERS, CONNS
	while CONNS:
		conn = CONNS.pop()
		try:
			conn.close()
		except Exception:
			pass
	while SERVERS:
		pid = SERVERS.pop()
		try:
			os.kill(pid, 9)
		except Exception:
			pass
atexit.register(close_all)
|
||||
|
||||
def put_data(conn, data):
	"""Write all of *data* to the socket.

	Fix: socket.send() may perform a partial write, so the original
	single send() call could silently drop the tail of a large pickled
	payload. Loop until every byte has gone out.
	"""
	cnt = 0
	while cnt < len(data):
		sent = conn.send(data[cnt:])
		if sent == 0:
			raise RuntimeError('connection ended')
		cnt += sent
|
||||
|
||||
def read_data(conn, siz):
	"""Read exactly *siz* bytes from the socket.

	Fix: socket.recv() may return fewer bytes than requested, so the
	original single recv() + assert would fail on any partial read.
	Loop and accumulate instead; raise ValueError if the peer closes
	the connection before *siz* bytes arrived.
	"""
	ret = conn.recv(siz)
	while len(ret) < siz:
		buf = conn.recv(siz - len(ret))
		if not buf:
			raise ValueError('connection ended prematurely')
		ret = ret + buf
	return ret
|
||||
|
||||
def exec_command(self, cmd, **kw):
	"""Run *cmd* through the forked worker assigned to the current thread.

	Replaces BuildContext.exec_command (see build() below). The keyword
	dict is pickled and framed with a fixed-size header; the worker sends
	back a RES header plus an optional pickled (out, err, exc) payload.
	Returns the command's exit status; raises Errors.WafError when the
	worker reports an execution failure.
	"""
	# redirections other than capture/inherit cannot be forwarded over the
	# wire: fall back to the original implementation
	if 'stdout' in kw:
		if kw['stdout'] not in (None, subprocess.PIPE):
			return self.exec_command_old(cmd, **kw)
	elif 'stderr' in kw:
		if kw['stderr'] not in (None, subprocess.PIPE):
			return self.exec_command_old(cmd, **kw)

	# a plain string command is executed through the shell, a list directly
	kw['shell'] = isinstance(cmd, str)
	Logs.debug('runner: %r' % cmd)
	Logs.debug('runner_env: kw=%s' % kw)

	if self.logger:
		self.logger.info(cmd)

	# default to capturing both streams in the worker
	if 'stdout' not in kw:
		kw['stdout'] = subprocess.PIPE
	if 'stderr' not in kw:
		kw['stderr'] = subprocess.PIPE

	if Logs.verbose and not kw['shell'] and not Utils.check_exe(cmd[0]):
		raise Errors.WafError("Program %s not found!" % cmd[0])

	# each consumer thread was tagged with an index by init_task_pool();
	# it selects this thread's dedicated connection below
	idx = threading.current_thread().idx
	kw['cmd'] = cmd

	# serialization..
	#print("sub %r %r" % (idx, cmd))
	#print("write to %r %r" % (idx, cmd))

	data = cPickle.dumps(kw, -1)
	params = [REQ, str(len(data))]
	header = make_header(params)

	conn = CONNS[idx]

	# request: header first, then the pickled keyword dict
	put_data(conn, header)
	put_data(conn, data)

	#print("running %r %r" % (idx, cmd))
	#print("read from %r %r" % (idx, cmd))

	# response header: 'RES,<ret>,<payload length>'
	data = read_data(conn, HEADER_SIZE)
	if sys.hexversion > 0x3000000:
		data = data.decode('iso8859-1')

	#print("received %r" % data)
	lst = data.split(',')
	ret = int(lst[1])
	dlen = int(lst[2])

	out = err = None
	if dlen:
		# optional payload: pickled (out, err, exc) triple
		data = read_data(conn, dlen)
		(out, err, exc) = cPickle.loads(data)
		if exc:
			raise Errors.WafError('Execution failure: %s' % exc)

	if out:
		if not isinstance(out, str):
			out = out.decode(sys.stdout.encoding or 'iso8859-1')
		if self.logger:
			self.logger.debug('out: %s' % out)
		else:
			Logs.info(out, extra={'stream':sys.stdout, 'c1': ''})
	if err:
		if not isinstance(err, str):
			# NOTE(review): stderr is decoded with sys.stdout.encoding,
			# not sys.stderr.encoding — presumably intentional; confirm
			err = err.decode(sys.stdout.encoding or 'iso8859-1')
		if self.logger:
			self.logger.error('err: %s' % err)
		else:
			Logs.info(err, extra={'stream':sys.stderr, 'c1': ''})

	return ret
|
||||
|
||||
def options(opt):
	"""Fork the worker processes early, during option parsing.

	Memory consumption might be at its lowest point at this stage, which
	keeps the forked process images small.
	"""
	target = 30
	while len(CONNS) < target:
		pid, conn = make_conn(opt)
		SERVERS.append(pid)
		CONNS.append(conn)
|
||||
|
||||
def build(bld):
	"""Ensure one worker per job exists, then reroute command execution
	through the forked workers."""
	if bld.cmd == 'clean':
		# nothing runs during a clean, no workers needed
		return

	# top up the pool in case options() spawned fewer workers than jobs
	while len(CONNS) < bld.jobs:
		pid, conn = make_conn(bld)
		SERVERS.append(pid)
		CONNS.append(conn)

	# keep the original reachable, then monkey-patch the context class
	cls = bld.__class__
	cls.exec_command_old = cls.exec_command
	cls.exec_command = exec_command
|
||||
|
Loading…
Reference in New Issue