From b8f19bfea32a631ee4c07a84e7cf88204198148e Mon Sep 17 00:00:00 2001 From: Thomas Nagy Date: Sun, 27 Oct 2019 10:30:09 +0100 Subject: [PATCH] Improve cache activity verbosity --- waflib/Tools/wafcache.py | 143 ++++++++++++++++++++++++++++----------- 1 file changed, 105 insertions(+), 38 deletions(-) diff --git a/waflib/Tools/wafcache.py b/waflib/Tools/wafcache.py index 58ef270b..8b9567fa 100644 --- a/waflib/Tools/wafcache.py +++ b/waflib/Tools/wafcache.py @@ -5,45 +5,59 @@ """ Filesystem-based cache system to share and re-use build artifacts +Cache access operations (copy to and from) are delegated to +independent pre-forked worker subprocesses. + The following environment variables may be set: -* WAFCACHE: absolute path of the waf cache (/tmp/wafcache_user, - where `user` represents the currently logged-in user) - or a URL to a cache server, for example: - export WAFCACHE=http://localhost:8080/files/ - in that case, GET/POST requests are made to urls of the form - http://localhost:8080/files/000000000/0 (cache - management is then up to the server) +* WAFCACHE: several possibilities: + - File cache: + absolute path of the waf cache (~/.cache/wafcache_user, + where `user` represents the currently logged-in user) + - URL to a cache server, for example: + export WAFCACHE=http://localhost:8080/files/ + in that case, GET/POST requests are made to urls of the form + http://localhost:8080/files/000000000/0 (cache management is then up to the server) + - GCS or S3 bucket + gs://my-bucket/ + s3://my-bucket/ +* WAFCACHE_NO_PUSH: if set, disables pushing to the cache +* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations + +File cache specific options: + Files are copied using hard links by default; if the cache is located + onto another partition, the system switches to file copies instead. * WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M) * WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB) * WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try and trim the cache (3 minutess) -* WAFCACHE_NO_PUSH: if set, disables pushing to the cache - -Cache access operations (copy to and from) are delegated to pre-forked -subprocesses. Though these processes perform atomic copies, they -are unaware of other processes running on the system - -The files are copied using hard links by default; if the cache is located -onto another partition, the system switches to file copies instead. - Usage:: def build(bld): bld.load('wafcache') ... + +To troubleshoot:: + + waf clean build --zones=wafcache """ -import atexit, base64, errno, fcntl, getpass, os, shutil, sys, threading, time, urllib3 +import atexit, base64, errno, fcntl, getpass, os, shutil, sys, time, traceback, urllib3 try: import subprocess32 as subprocess except ImportError: import subprocess -CACHE_DIR = os.environ.get('WAFCACHE', '/tmp/wafcache_' + getpass.getuser()) +base_cache = os.path.expanduser('~/.cache/') +if not os.path.isdir(base_cache): + base_cache = '/tmp/' +default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser()) + +CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir) TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000)) EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3)) EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10)) WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0 +WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0 OK = "ok" try: @@ -68,9 +82,16 @@ def can_retrieve_cache(self): files_to = [node.abspath() for node in self.outputs] err = cache_command(ssig, [], files_to) - if not err.startswith(OK): - if Logs.verbose: - Logs.debug('wafcache: error getting from cache %s', err) + if err.startswith(OK): + if WAFCACHE_VERBOSITY: + Logs.pprint('CYAN', ' Fetched %r from cache' % files_to) + else: + Logs.debug('wafcache: fetched %r from cache', files_to) + else: + if WAFCACHE_VERBOSITY: + Logs.pprint('YELLOW', ' No cache entry %s' % files_to) + else: + Logs.debug('wafcache: No cache entry %s: %s', files_to, err) return False self.cached = True @@ -90,9 +111,16 @@ def put_files_cache(self): files_from = [node.abspath() for node in self.outputs] err = cache_command(ssig, files_from, []) - if not err.startswith(OK): - if Logs.verbose: - Logs.debug('wafcache: error caching %s', err) + if err.startswith(OK): + if WAFCACHE_VERBOSITY: + Logs.pprint('CYAN', ' Successfully uploaded %s to cache' % files_from) + else: + Logs.debug('wafcache: Successfully uploaded %r to cache', files_from) + else: + if WAFCACHE_VERBOSITY: + Logs.pprint('RED', ' Error caching step results %s: %s' % (files_from, err)) + else: + Logs.debug('wafcache: Error caching results %s: %s', files_from, err) bld.task_sigs[self.uid()] = self.cache_sig @@ -212,7 +240,7 @@ def build(bld): Task.Task.put_files_cache = put_files_cache Task.Task.uid = uid Build.BuildContext.hash_env_vars = hash_env_vars - for x in Task.classes.values(): + for x in reversed(list(Task.classes.values())): make_cached(x) def cache_command(sig, files_from, files_to): @@ -316,7 +344,7 @@ def lru_evict(): except EnvironmentError as e: if e.errno == errno.ENOENT: with open(lockfile, 'w') as f: - f.write(''.encode()) + f.write('') return else: raise @@ -367,18 +395,25 @@ class netcache(object): for i, x in enumerate(files_from): if not os.path.islink(x): self.upload(x, sig, i) - except EnvironmentError: - pass + except Exception: + return traceback.format_exc() + return OK def copy_from_cache(self, sig, files_from, files_to): try: for i, x in enumerate(files_to): self.download(x, sig, i) - return OK - except EnvironmentError as e: - return "Failed to download %r to %r: %s" % (files_from, files_to, e) + except Exception: + return traceback.format_exc() + return OK class fcache(object): + def __init__(self): + if not os.path.exists(CACHE_DIR): + os.makedirs(CACHE_DIR) + if not os.path.exists(CACHE_DIR): + raise ValueError('Could not initialize the cache directory') + def copy_to_cache(self, sig, files_from, files_to): """ Copy files to the cache, existing files are overwritten, @@ -389,13 +424,13 @@ class fcache(object): for i, x in enumerate(files_from): dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i)) atomic_copy(x, dest) - except EnvironmentError: - # no errors should be raised - pass + except Exception: + return traceback.format_exc() else: # attempt trimming if caching was successful: # we may have things to trim! lru_evict() + return OK def copy_from_cache(self, sig, files_from, files_to): """ @@ -408,8 +443,38 @@ class fcache(object): # success! update the cache time os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None) - except EnvironmentError as e: - return "Failed to copy %r to %r: %s" % (files_from, files_to, e) + except Exception: + return traceback.format_exc() + return OK + +class bucket_cache(object): + def bucket_copy(self, source, target): + if CACHE_DIR.startswith('s3://'): + cmd = ['aws', 's3', 'cp', source, target] + else: + cmd = ['gsutil', 'cp', source, target] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = proc.communicate() + if proc.returncode: + raise OSError('Error copy %r to %r using: %r (exit %r):\n out:%s\n err:%s' % ( + source, target, cmd, proc.returncode, out.decode(), err.decode())) + + def copy_to_cache(self, sig, files_from, files_to): + try: + for i, x in enumerate(files_from): + dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i)) + self.bucket_copy(x, dest) + except Exception: + return traceback.format_exc() + return OK + + def copy_from_cache(self, sig, files_from, files_to): + try: + for i, x in enumerate(files_to): + orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i)) + self.bucket_copy(orig, x) + except EnvironmentError: + return traceback.format_exc() return OK def loop(service): @@ -432,7 +497,7 @@ def loop(service): [sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt)) if files_from: # TODO return early when pushing files upstream - ret = service.copy_from_cache(sig, files_from, files_to) + ret = service.copy_to_cache(sig, files_from, files_to) elif files_to: # the build process waits for workers to (possibly) obtain files from the cache ret = service.copy_from_cache(sig, files_from, files_to) @@ -445,7 +510,9 @@ def loop(service): sys.stdout.flush() if __name__ == '__main__': - if CACHE_DIR.startswith('http'): + if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://'): + service = bucket_cache() + elif CACHE_DIR.startswith('http'): service = netcache() else: service = fcache()