Improve cache activity verbosity

This commit is contained in:
Thomas Nagy 2019-10-27 10:30:09 +01:00
parent a6ff2fc274
commit b8f19bfea3
1 changed files with 105 additions and 38 deletions

View File

@ -5,45 +5,59 @@
"""
Filesystem-based cache system to share and re-use build artifacts
Cache access operations (copy to and from) are delegated to
independent pre-forked worker subprocesses.
The following environment variables may be set:
* WAFCACHE: absolute path of the waf cache (/tmp/wafcache_user,
where `user` represents the currently logged-in user)
or a URL to a cache server, for example:
export WAFCACHE=http://localhost:8080/files/
in that case, GET/POST requests are made to urls of the form
http://localhost:8080/files/000000000/0 (cache
management is then up to the server)
* WAFCACHE: several possibilities:
- File cache:
absolute path of the waf cache (~/.cache/wafcache_user,
where `user` represents the currently logged-in user)
- URL to a cache server, for example:
export WAFCACHE=http://localhost:8080/files/
in that case, GET/POST requests are made to urls of the form
http://localhost:8080/files/000000000/0 (cache management is then up to the server)
- GCS or S3 bucket
gs://my-bucket/
s3://my-bucket/
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
* WAFCACHE_VERBOSITY: if set, displays more detailed cache operations
File cache specific options:
Files are copied using hard links by default; if the cache is located
onto another partition, the system switches to file copies instead.
* WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
and trim the cache (3 minutess)
* WAFCACHE_NO_PUSH: if set, disables pushing to the cache
Cache access operations (copy to and from) are delegated to pre-forked
subprocesses. Though these processes perform atomic copies, they
are unaware of other processes running on the system
The files are copied using hard links by default; if the cache is located
onto another partition, the system switches to file copies instead.
Usage::
def build(bld):
bld.load('wafcache')
...
To troubleshoot::
waf clean build --zones=wafcache
"""
import atexit, base64, errno, fcntl, getpass, os, shutil, sys, threading, time, urllib3
import atexit, base64, errno, fcntl, getpass, os, shutil, sys, time, traceback, urllib3
try:
import subprocess32 as subprocess
except ImportError:
import subprocess
CACHE_DIR = os.environ.get('WAFCACHE', '/tmp/wafcache_' + getpass.getuser())
base_cache = os.path.expanduser('~/.cache/')
if not os.path.isdir(base_cache):
base_cache = '/tmp/'
default_wafcache_dir = os.path.join(base_cache, 'wafcache_' + getpass.getuser())
CACHE_DIR = os.environ.get('WAFCACHE', default_wafcache_dir)
TRIM_MAX_FOLDERS = int(os.environ.get('WAFCACHE_TRIM_MAX_FOLDER', 1000000))
EVICT_INTERVAL_MINUTES = int(os.environ.get('WAFCACHE_EVICT_INTERVAL_MINUTES', 3))
EVICT_MAX_BYTES = int(os.environ.get('WAFCACHE_EVICT_MAX_BYTES', 10**10))
WAFCACHE_NO_PUSH = 1 if os.environ.get('WAFCACHE_NO_PUSH') else 0
WAFCACHE_VERBOSITY = 1 if os.environ.get('WAFCACHE_VERBOSITY') else 0
OK = "ok"
try:
@ -68,9 +82,16 @@ def can_retrieve_cache(self):
files_to = [node.abspath() for node in self.outputs]
err = cache_command(ssig, [], files_to)
if not err.startswith(OK):
if Logs.verbose:
Logs.debug('wafcache: error getting from cache %s', err)
if err.startswith(OK):
if WAFCACHE_VERBOSITY:
Logs.pprint('CYAN', ' Fetched %r from cache' % files_to)
else:
Logs.debug('wafcache: fetched %r from cache', files_to)
else:
if WAFCACHE_VERBOSITY:
Logs.pprint('YELLOW', ' No cache entry %s' % files_to)
else:
Logs.debug('wafcache: No cache entry %s: %s', files_to, err)
return False
self.cached = True
@ -90,9 +111,16 @@ def put_files_cache(self):
files_from = [node.abspath() for node in self.outputs]
err = cache_command(ssig, files_from, [])
if not err.startswith(OK):
if Logs.verbose:
Logs.debug('wafcache: error caching %s', err)
if err.startswith(OK):
if WAFCACHE_VERBOSITY:
Logs.pprint('CYAN', ' Successfully uploaded %s to cache' % files_from)
else:
Logs.debug('wafcache: Successfully uploaded %r to cache', files_from)
else:
if WAFCACHE_VERBOSITY:
Logs.pprint('RED', ' Error caching step results %s: %s' % (files_from, err))
else:
Logs.debug('wafcache: Error caching results %s: %s', files_from, err)
bld.task_sigs[self.uid()] = self.cache_sig
@ -212,7 +240,7 @@ def build(bld):
Task.Task.put_files_cache = put_files_cache
Task.Task.uid = uid
Build.BuildContext.hash_env_vars = hash_env_vars
for x in Task.classes.values():
for x in reversed(list(Task.classes.values())):
make_cached(x)
def cache_command(sig, files_from, files_to):
@ -316,7 +344,7 @@ def lru_evict():
except EnvironmentError as e:
if e.errno == errno.ENOENT:
with open(lockfile, 'w') as f:
f.write(''.encode())
f.write('')
return
else:
raise
@ -367,18 +395,25 @@ class netcache(object):
for i, x in enumerate(files_from):
if not os.path.islink(x):
self.upload(x, sig, i)
except EnvironmentError:
pass
except Exception:
return traceback.format_exc()
return OK
def copy_from_cache(self, sig, files_from, files_to):
try:
for i, x in enumerate(files_to):
self.download(x, sig, i)
return OK
except EnvironmentError as e:
return "Failed to download %r to %r: %s" % (files_from, files_to, e)
except Exception:
return traceback.format_exc()
return OK
class fcache(object):
def __init__(self):
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR)
if not os.path.exists(CACHE_DIR):
raise ValueError('Could not initialize the cache directory')
def copy_to_cache(self, sig, files_from, files_to):
"""
Copy files to the cache, existing files are overwritten,
@ -389,13 +424,13 @@ class fcache(object):
for i, x in enumerate(files_from):
dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
atomic_copy(x, dest)
except EnvironmentError:
# no errors should be raised
pass
except Exception:
return traceback.format_exc()
else:
# attempt trimming if caching was successful:
# we may have things to trim!
lru_evict()
return OK
def copy_from_cache(self, sig, files_from, files_to):
"""
@ -408,8 +443,38 @@ class fcache(object):
# success! update the cache time
os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
except EnvironmentError as e:
return "Failed to copy %r to %r: %s" % (files_from, files_to, e)
except Exception:
return traceback.format_exc()
return OK
class bucket_cache(object):
def bucket_copy(self, source, target):
if CACHE_DIR.startswith('s3://'):
cmd = ['aws', 's3', 'cp', source, target]
else:
cmd = ['gsutil', 'cp', source, target]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
if proc.returncode:
raise OSError('Error copy %r to %r using: %r (exit %r):\n out:%s\n err:%s' % (
source, target, cmd, proc.returncode, out.decode(), err.decode()))
def copy_to_cache(self, sig, files_from, files_to):
try:
for i, x in enumerate(files_from):
dest = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
self.bucket_copy(x, dest)
except Exception:
return traceback.format_exc()
return OK
def copy_from_cache(self, sig, files_from, files_to):
try:
for i, x in enumerate(files_to):
orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
self.bucket_copy(orig, x)
except EnvironmentError:
return traceback.format_exc()
return OK
def loop(service):
@ -432,7 +497,7 @@ def loop(service):
[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
if files_from:
# TODO return early when pushing files upstream
ret = service.copy_from_cache(sig, files_from, files_to)
ret = service.copy_to_cache(sig, files_from, files_to)
elif files_to:
# the build process waits for workers to (possibly) obtain files from the cache
ret = service.copy_from_cache(sig, files_from, files_to)
@ -445,7 +510,9 @@ def loop(service):
sys.stdout.flush()
if __name__ == '__main__':
if CACHE_DIR.startswith('http'):
if CACHE_DIR.startswith('s3://') or CACHE_DIR.startswith('gs://'):
service = bucket_cache()
elif CACHE_DIR.startswith('http'):
service = netcache()
else:
service = fcache()