From 99bdb12a60d5b95ef3ffec99c304c045717fc75c Mon Sep 17 00:00:00 2001 From: Thomas Nagy Date: Sat, 31 Dec 2022 21:37:12 +0100 Subject: [PATCH] Add asynchronous wafcache uploads --- waflib/Tools/wafcache.py | 96 ++++++++++++++++++++++++++++++---------- 1 file changed, 73 insertions(+), 23 deletions(-) diff --git a/waflib/Tools/wafcache.py b/waflib/Tools/wafcache.py index 05ae8fbf..30ac3ef5 100644 --- a/waflib/Tools/wafcache.py +++ b/waflib/Tools/wafcache.py @@ -39,7 +39,14 @@ File cache specific options: * WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M) * WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB) * WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try - and trim the cache (3 minutess) + and trim the cache (3 minutes) + +Upload specific options: +* WAFCACHE_ASYNC_WORKERS: define a number of workers to upload results asynchronously + this may improve build performance with many/long file uploads + the default is unset (synchronous uploads) +* WAFCACHE_ASYNC_NOWAIT: do not wait for uploads to complete (default: False) + this requires asynchonous uploads to have an effect Usage:: @@ -49,10 +56,10 @@ Usage:: To troubleshoot:: - waf clean build --zones=wafcache + waf clean build --zone=wafcache """ -import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, traceback, urllib3, shlex +import atexit, base64, errno, fcntl, getpass, os, re, shutil, sys, time, threading, traceback, urllib3, shlex try: import subprocess32 as subprocess except ImportError: @@ -71,6 +78,8 @@ EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10)) WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0 WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0 WAFCACHE_STATS = 1 if os.environ.get('WAFCACHE_STATS') else 0 +WAFCACHE_ASYNC_WORKERS = os.environ.get('WAFCACHE_ASYNC_WORKERS') +WAFCACHE_ASYNC_NOWAIT = os.environ.get('WAFCACHE_ASYNC_NOWAIT') OK = "ok" re_waf_cmd = re.compile('(?P%{SRC})|(?P%{TGT})') @@ -99,7 +108,9 @@ def can_retrieve_cache(self): self.generator.bld.cache_reqs += 1 files_to = [node.abspath() for node in self.outputs] - err = cache_command(ssig, [], files_to) + proc = get_process() + err = cache_command(proc, ssig, [], files_to) + process_pool.append(proc) if err.startswith(OK): if WAFCACHE_VERBOSITY: Logs.pprint('CYAN', ' Fetched %r from cache' % files_to) @@ -143,26 +154,40 @@ def put_files_cache(self): delattr(self, 'cache_sig') sig = self.signature() - if old_sig == sig: - ssig = Utils.to_hex(self.uid() + sig) - err = cache_command(ssig, files_from, []) + def _async_put_files_cache(bld, ssig, files_from): + proc = get_process() + if WAFCACHE_ASYNC_WORKERS: + with bld.wafcache_lock: + if bld.wafcache_stop: + process_pool.append(proc) + return + bld.wafcache_procs.add(proc) + err = cache_command(proc, ssig, files_from, []) + process_pool.append(proc) if err.startswith(OK): if WAFCACHE_VERBOSITY: Logs.pprint('CYAN', ' Successfully uploaded %s to cache' % files_from) else: Logs.debug('wafcache: Successfully uploaded %r to cache', files_from) if WAFCACHE_STATS: - self.generator.bld.cache_puts += 1 + bld.cache_puts += 1 else: if WAFCACHE_VERBOSITY: Logs.pprint('RED', ' Error caching step results %s: %s' % (files_from, err)) else: Logs.debug('wafcache: Error caching results %s: %s', files_from, err) + + if old_sig == sig: + ssig = Utils.to_hex(self.uid() + sig) + if WAFCACHE_ASYNC_WORKERS: + fut = bld.wafcache_executor.submit(_async_put_files_cache, bld, ssig, files_from) + bld.wafcache_uploads.append(fut) + else: + _async_put_files_cache(bld, ssig, files_from) else: Logs.debug('wafcache: skipped %r upload due to late input modifications %r', self.outputs, self.inputs) - bld.task_sigs[self.uid()] = self.cache_sig def hash_env_vars(self, env, vars_lst): @@ -258,19 +283,45 @@ def get_process(): return subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, bufsize=0) def atexit_pool(): - for k in process_pool: - try: - os.kill(k.pid, 9) - except OSError: - pass - else: - k.wait() + for proc in process_pool: + proc.kill() atexit.register(atexit_pool) def build(bld): """ Called during the build process to enable file caching """ + + if WAFCACHE_ASYNC_WORKERS: + try: + num_workers = int(WAFCACHE_ASYNC_WORKERS) + except ValueError: + Logs.warn('Invalid WAFCACHE_ASYNC_WORKERS specified: %r' % WAFCACHE_ASYNC_WORKERS) + else: + from concurrent.futures import ThreadPoolExecutor + bld.wafcache_executor = ThreadPoolExecutor(max_workers=num_workers) + bld.wafcache_uploads = [] + bld.wafcache_procs = set([]) + bld.wafcache_stop = False + bld.wafcache_lock = threading.Lock() + + def finalize_upload_async(bld): + if WAFCACHE_ASYNC_NOWAIT: + with bld.wafcache_lock: + bld.wafcache_stop = True + + for fut in reversed(bld.wafcache_uploads): + fut.cancel() + + for proc in bld.wafcache_procs: + proc.kill() + + bld.wafcache_procs.clear() + else: + Logs.pprint('CYAN', '... waiting for wafcache uploads to complete (%s uploads)' % len(bld.wafcache_uploads)) + bld.wafcache_executor.shutdown(wait=True) + bld.add_post_fun(finalize_upload_async) + if WAFCACHE_STATS: # Init counter for statistics and hook to print results at the end bld.cache_reqs = bld.cache_hits = bld.cache_puts = 0 @@ -279,9 +330,8 @@ def build(bld): hit_ratio = 0 if bld.cache_reqs > 0: hit_ratio = (bld.cache_hits / bld.cache_reqs) * 100 - Logs.pprint('CYAN', ' wafcache stats: requests: %s, hits, %s, ratio: %.2f%%, writes %s' % + Logs.pprint('CYAN', ' wafcache stats: %s requests, %s hits (ratio: %.2f%%), %s writes' % (bld.cache_reqs, bld.cache_hits, hit_ratio, bld.cache_puts) ) - bld.add_post_fun(printstats) if process_pool: @@ -299,15 +349,13 @@ def build(bld): for x in reversed(list(Task.classes.values())): make_cached(x) -def cache_command(sig, files_from, files_to): +def cache_command(proc, sig, files_from, files_to): """ Create a command for cache worker processes, returns a pickled base64-encoded tuple containing the task signature, a list of files to cache and a list of files files to get from cache (one of the lists is assumed to be empty) """ - proc = get_process() - obj = base64.b64encode(cPickle.dumps([sig, files_from, files_to])) proc.stdin.write(obj) proc.stdin.write('\n'.encode()) @@ -315,7 +363,6 @@ def cache_command(sig, files_from, files_to): obj = proc.stdout.readline() if not obj: raise OSError('Preforked sub-process %r died' % proc.pid) - process_pool.append(proc) return cPickle.loads(base64.b64decode(obj)) try: @@ -469,7 +516,10 @@ class netcache(object): class fcache(object): def __init__(self): if not os.path.exists(CACHE_DIR): - os.makedirs(CACHE_DIR) + try: + os.makedirs(CACHE_DIR) + except OSError: + pass if not os.path.exists(CACHE_DIR): raise ValueError('Could not initialize the cache directory')