2
0
mirror of https://gitlab.com/ita1024/waf.git synced 2024-11-22 01:46:15 +01:00

Add asynchronous wafcache uploads

This commit is contained in:
Thomas Nagy 2022-12-31 21:37:12 +01:00
parent 412c9e6b22
commit 99bdb12a60

View File

@ -39,7 +39,14 @@ File cache specific options:
* WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M) * WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB) * WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try * WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
and trim the cache (3 minutess) and trim the cache (3 minutes)
Upload specific options:
* WAFCACHE_ASYNC_WORKERS: define a number of workers to upload results asynchronously
this may improve build performance with many/long file uploads
the default is unset (synchronous uploads)
* WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False)
this requires asynchronous uploads to have an effect
Usage:: Usage::
@ -49,10 +56,10 @@ Usage::
To troubleshoot:: To troubleshoot::
waf clean build --zones=wafcache waf clean build --zone=wafcache
""" """
import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, traceback, urllib3, shlex import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, threading, traceback, urllib3, shlex
try: try:
import subprocess32 as subprocess import subprocess32 as subprocess
except ImportError: except ImportError:
@ -71,6 +78,8 @@ EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0 WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0 WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0 WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0
WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS')
WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT')
OK = "ok" OK = "ok"
re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})') re_waf_cmd = re.compile('(?P<src>%{SRC})|(?P<tgt>%{TGT})')
@ -99,7 +108,9 @@ def can_retrieve_cache(self):
self.generator.bld.cache_reqs += 1 self.generator.bld.cache_reqs += 1
files_to = [node.abspath() for node in self.outputs] files_to = [node.abspath() for node in self.outputs]
err = cache_command(ssig, [], files_to) proc = get_process()
err = cache_command(proc, ssig, [], files_to)
process_pool.append(proc)
if err.startswith(OK): if err.startswith(OK):
if WAFCACHE_VERBOSITY: if WAFCACHE_VERBOSITY:
Logs.pprint('CYAN', ' Fetched %r from cache' % files_to) Logs.pprint('CYAN', ' Fetched %r from cache' % files_to)
@ -143,26 +154,40 @@ def put_files_cache(self):
delattr(self, 'cache_sig') delattr(self, 'cache_sig')
sig = self.signature() sig = self.signature()
if old_sig == sig: def _async_put_files_cache(bld, ssig, files_from):
ssig = Utils.to_hex(self.uid() + sig) proc = get_process()
err = cache_command(ssig, files_from, []) if WAFCACHE_ASYNC_WORKERS:
with bld.wafcache_lock:
if bld.wafcache_stop:
process_pool.append(proc)
return
bld.wafcache_procs.add(proc)
err = cache_command(proc, ssig, files_from, [])
process_pool.append(proc)
if err.startswith(OK): if err.startswith(OK):
if WAFCACHE_VERBOSITY: if WAFCACHE_VERBOSITY:
Logs.pprint('CYAN', ' Successfully uploaded %s to cache' % files_from) Logs.pprint('CYAN', ' Successfully uploaded %s to cache' % files_from)
else: else:
Logs.debug('wafcache: Successfully uploaded %r to cache', files_from) Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
if WAFCACHE_STATS: if WAFCACHE_STATS:
self.generator.bld.cache_puts += 1 bld.cache_puts += 1
else: else:
if WAFCACHE_VERBOSITY: if WAFCACHE_VERBOSITY:
Logs.pprint('RED', ' Error caching step results %s: %s' % (files_from, err)) Logs.pprint('RED', ' Error caching step results %s: %s' % (files_from, err))
else: else:
Logs.debug('wafcache: Error caching results %s: %s', files_from, err) Logs.debug('wafcache: Error caching results %s: %s', files_from, err)
if old_sig == sig:
ssig = Utils.to_hex(self.uid() + sig)
if WAFCACHE_ASYNC_WORKERS:
fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from)
bld.wafcache_uploads.append(fut)
else:
_async_put_files_cache(bld, ssig, files_from)
else: else:
Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs) Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs)
bld.task_sigs[self.uid()] = self.cache_sig bld.task_sigs[self.uid()] = self.cache_sig
def hash_env_vars(self, env, vars_lst): def hash_env_vars(self, env, vars_lst):
@ -258,19 +283,45 @@ def get_process():
return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0) return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0)
def atexit_pool(): def atexit_pool():
for k in process_pool: for proc in process_pool:
try: proc.kill()
os.kill(k.pid, 9)
except OSError:
pass
else:
k.wait()
atexit.register(atexit_pool) atexit.register(atexit_pool)
def build(bld): def build(bld):
""" """
Called during the build process to enable file caching Called during the build process to enable file caching
""" """
if WAFCACHE_ASYNC_WORKERS:
try:
num_workers = int(WAFCACHE_ASYNC_WORKERS)
except ValueError:
Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS)
else:
from concurrent.futures import ThreadPoolExecutor
bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers)
bld.wafcache_uploads = []
bld.wafcache_procs = set([])
bld.wafcache_stop = False
bld.wafcache_lock = threading.Lock()
def finalize_upload_async(bld):
if WAFCACHE_ASYNC_NOWAIT:
with bld.wafcache_lock:
bld.wafcache_stop = True
for fut in reversed(bld.wafcache_uploads):
fut.cancel()
for proc in bld.wafcache_procs:
proc.kill()
bld.wafcache_procs.clear()
else:
Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads))
bld.wafcache_executor.shutdown(wait=True)
bld.add_post_fun(finalize_upload_async)
if WAFCACHE_STATS: if WAFCACHE_STATS:
# Init counter for statistics and hook to print results at the end # Init counter for statistics and hook to print results at the end
bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0 bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0
@ -279,9 +330,8 @@ def build(bld):
hit_ratio = 0 hit_ratio = 0
if bld.cache_reqs > 0: if bld.cache_reqs > 0:
hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100 hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100
Logs.pprint('CYAN', ' wafcache stats: requests: %s, hits, %s, ratio: %.2f%%, writes %s' % Logs.pprint('CYAN', ' wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' %
(bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) ) (bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) )
bld.add_post_fun(printstats) bld.add_post_fun(printstats)
if process_pool: if process_pool:
@ -299,15 +349,13 @@ def build(bld):
for x in reversed(list(Task.classes.values())): for x in reversed(list(Task.classes.values())):
make_cached(x) make_cached(x)
def cache_command(sig, files_from, files_to): def cache_command(proc, sig, files_from, files_to):
""" """
Create a command for cache worker processes, returns a pickled Create a command for cache worker processes, returns a pickled
base64-encoded tuple containing the task signature, a list of files to base64-encoded tuple containing the task signature, a list of files to
cache and a list of files files to get from cache (one of the lists cache and a list of files files to get from cache (one of the lists
is assumed to be empty) is assumed to be empty)
""" """
proc = get_process()
obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to])) obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to]))
proc.stdin.write(obj) proc.stdin.write(obj)
proc.stdin.write('\n'.encode()) proc.stdin.write('\n'.encode())
@ -315,7 +363,6 @@ def cache_command(sig, files_from, files_to):
obj = proc.stdout.readline() obj = proc.stdout.readline()
if not obj: if not obj:
raise OSError('Preforked sub-process %r died' % proc.pid) raise OSError('Preforked sub-process %r died' % proc.pid)
process_pool.append(proc)
return cPickle.loads(base64.b64decode(obj)) return cPickle.loads(base64.b64decode(obj))
try: try:
@ -469,7 +516,10 @@ class netcache(object):
class fcache(object): class fcache(object):
def __init__(self): def __init__(self):
if not os.path.exists(CACHE_DIR): if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR) try:
os.makedirs(CACHE_DIR)
except OSError:
pass
if not os.path.exists(CACHE_DIR): if not os.path.exists(CACHE_DIR):
raise ValueError('Could not initialize the cache directory') raise ValueError('Could not initialize the cache directory')