Enable wafcache remote servers

This commit is contained in:
Thomas Nagy 2019-08-25 22:31:12 +02:00
parent 397432f81e
commit aece9b5e4b
1 changed files with 85 additions and 35 deletions

View File

@ -8,6 +8,11 @@ Filesystem-based cache system to share and re-use build artifacts
The following environment variables may be set:
* WAFCACHE: absolute path of the waf cache (/tmp/wafcache_user,
where `user` represents the currently logged-in user)
or a URL to a cache server, for example:
export WAFCACHE=http://localhost:8080/files/
in that case, files are fetched with GET requests and stored with POST
requests to urls of the form http://localhost:8080/files/000000000/0,
that is <server url>/<task signature>/<file index> (cache
management is then up to the server)
* WAFCACHE_TRIM_MAX_FOLDER: maximum amount of tasks to cache (1M)
* WAFCACHE_EVICT_MAX_BYTES: maximum amount of cache size in bytes (10GB)
* WAFCACHE_EVICT_INTERVAL_MINUTES: minimum time interval to try
@ -27,7 +32,7 @@ Usage::
...
"""
import atexit, base64, errno, fcntl, getpass, os, shutil, sys, threading, time
import atexit, base64, errno, fcntl, getpass, os, shutil, sys, threading, time, urllib3
try:
import subprocess32 as subprocess
except ImportError:
@ -334,40 +339,81 @@ def lru_evict():
finally:
os.close(fd)
def copy_to_cache(sig, files_from, files_to):
    """
    Store the files of one task in the cache folder.

    The file at position N of files_from is written to
    CACHE_DIR/<first two characters of sig>/<sig>/<N>; files already
    present in the cache are overwritten. Each file is copied atomically,
    but the set of files belonging to a task is not.

    files_to is accepted for symmetry with copy_from_cache and is not used.
    """
    try:
        for index, source in enumerate(files_from):
            atomic_copy(source, os.path.join(CACHE_DIR, sig[:2], sig, str(index)))
    except EnvironmentError:
        # caching is best-effort: no errors should be raised
        return
    # all files were stored, so there may be things to trim
    lru_evict()
# Cache service that talks to a remote server over HTTP instead of
# reading and writing a local cache folder
class netcache(object):
def __init__(self):
# urllib3 pool manager used by upload() and download() to issue requests
self.http = urllib3.PoolManager()
def copy_from_cache(sig, files_from, files_to):
"""
Copy files from the cache
"""
try:
for i, x in enumerate(files_to):
orig = os.path.join(CACHE_DIR, sig[:2], sig, str(i))
atomic_copy(orig, x)
def url_of(self, sig, i):
    """
    Return the URL of file number i of the task identified by sig.

    CACHE_DIR holds the base URL of the cache server. Trailing slashes
    are removed from it so that a base such as
    http://localhost:8080/files/ yields http://localhost:8080/files/<sig>/<i>
    rather than a URL containing an empty path segment (files//<sig>/<i>).
    """
    return "%s/%s/%s" % (CACHE_DIR.rstrip('/'), sig, i)
# success! update the cache time
os.utime(os.path.join(CACHE_DIR, sig[:2], sig), None)
except EnvironmentError as e:
return "Failed to copy %r to %r: %s" % (orig, x, e)
return OK
def upload(self, file_path, sig, i):
    """
    Send one file to the cache server.

    The contents of file_path are read in full and sent as the 'file'
    field of a POST request to the URL given by url_of(sig, i).

    Raises OSError when the server answers with a status of 400 or more.
    """
    target = self.url_of(sig, i)
    with open(file_path, 'rb') as handle:
        payload = handle.read()
    form = {'file': ('%s/%s' % (sig, i), payload)}
    response = self.http.request('POST', target, timeout=60, fields=form)
    if response.status < 400:
        return
    raise OSError("Invalid status %r %r" % (target, response.status))
def loop():
def download(self, file_path, sig, i):
    """
    Fetch one file from the cache server and write it to file_path.

    The body of the GET response for url_of(sig, i) is streamed to the
    destination file rather than being loaded in memory first.

    Raises OSError when the server answers with a status of 400 or more;
    in that case file_path is not opened.
    """
    source = self.url_of(sig, i)
    with self.http.request('GET', source, preload_content=False, timeout=60) as response:
        code = response.status
        if code >= 400:
            raise OSError("Invalid status %r %r" % (source, code))
        with open(file_path, 'wb') as sink:
            shutil.copyfileobj(response, sink)
def copy_to_cache(self, sig, files_from, files_to):
    """
    Upload the files of one task to the cache server, on a best-effort basis.

    The file at position N of files_from is uploaded as file number N of
    the task signature sig. Symbolic links are skipped, but the numbering
    of the other files is unaffected. The first failure stops the
    remaining uploads.

    files_to is accepted for symmetry with copy_from_cache and is not used.
    No error is raised to the caller.
    """
    try:
        for i, x in enumerate(files_from):
            if not os.path.islink(x):
                self.upload(x, sig, i)
    except Exception:
        # Populating the cache must never fail the caller. The HTTP layer
        # reports connection problems and timeouts with its own exception
        # classes, which do not derive from EnvironmentError, so catching
        # only EnvironmentError would let them escape.
        pass
def copy_from_cache(self, sig, files_from, files_to):
    """
    Download the files of one task from the cache server.

    File number N of the task signature sig is written to the path at
    position N of files_to. files_from is only used in the error message.

    Returns OK when every file was downloaded, otherwise a string
    describing the failure. The first failure stops the remaining downloads.
    """
    try:
        for i, x in enumerate(files_to):
            self.download(x, sig, i)
        return OK
    except Exception as e:
        # Any failure must be reported as a message instead of being raised.
        # The HTTP layer uses exception classes that do not derive from
        # EnvironmentError (connection errors, timeouts), and letting them
        # escape would abort the caller instead of producing an answer.
        return "Failed to download %r to %r: %s" % (files_from, files_to, e)
class fcache(object):
    """Cache service that keeps the cached files under the CACHE_DIR folder."""

    def copy_to_cache(self, sig, files_from, files_to):
        """
        Store the files of one task in the cache folder.

        The file at position N of files_from is written to
        CACHE_DIR/<first two characters of sig>/<sig>/<N>; files already
        present in the cache are overwritten. Each file is copied
        atomically, but the set of files belonging to a task is not.

        files_to is not used.
        """
        try:
            for index, source in enumerate(files_from):
                atomic_copy(source, os.path.join(CACHE_DIR, sig[:2], sig, str(index)))
        except EnvironmentError:
            # caching is best-effort: no errors should be raised
            return
        # all files were stored, so there may be things to trim
        lru_evict()

    def copy_from_cache(self, sig, files_from, files_to):
        """
        Fetch the files of one task from the cache folder.

        Cached file number N of the task signature sig is copied to the
        path at position N of files_to. Returns OK on success, otherwise a
        string describing the failure.
        """
        folder = os.path.join(CACHE_DIR, sig[:2], sig)
        try:
            for index, target in enumerate(files_to):
                atomic_copy(os.path.join(folder, str(index)), target)
            # every file was found: refresh the timestamp of the task folder
            os.utime(folder, None)
        except EnvironmentError as exc:
            return "Failed to copy %r to %r: %s" % (files_from, files_to, exc)
        else:
            return OK
def loop(service):
"""
This function is run when this file is run as a standalone python script,
it assumes a parent process that will communicate the commands to it
@ -387,12 +433,12 @@ def loop():
[sig, files_from, files_to] = cPickle.loads(base64.b64decode(txt))
if files_from:
# pushing to cache is done without any wait
th = threading.Thread(target=copy_to_cache, args=(sig, files_from, files_to))
th = threading.Thread(target=service.copy_to_cache, args=(sig, files_from, files_to))
th.setDaemon(True)
th.start()
elif files_to:
# the build process waits for workers to (possibly) obtain files from the cache
ret = copy_from_cache(sig, files_from, files_to)
ret = service.copy_from_cache(sig, files_from, files_to)
else:
ret = "Invalid command"
@ -402,9 +448,13 @@ def loop():
sys.stdout.flush()
if __name__ == '__main__':
if CACHE_DIR.startswith('http'):
service = netcache()
else:
service = fcache()
while 1:
try:
loop()
loop(service)
except KeyboardInterrupt:
break