From 004b866789fbc94f08109fcb3e290105c79578a9 Mon Sep 17 00:00:00 2001 From: Thomas Nagy Date: Thu, 29 Jan 2015 00:00:01 +0100 Subject: [PATCH] Updated the netcache client/server --- playground/netcache/netcache_server.py | 262 ------------------------- waflib/extras/netcache_client.py | 113 ++++++----- 2 files changed, 67 insertions(+), 308 deletions(-) delete mode 100755 playground/netcache/netcache_server.py diff --git a/playground/netcache/netcache_server.py b/playground/netcache/netcache_server.py deleted file mode 100755 index 3dedd8f3..00000000 --- a/playground/netcache/netcache_server.py +++ /dev/null @@ -1,262 +0,0 @@ -#! /usr/bin/env python -# encoding: utf-8 -# Thomas Nagy 2011 (ita) - -""" -A simple TCP server to cache files over the network. -The client is located in waflib/extras/netcache_client.py - -This server uses a LRU cache policy (remove least recently used files), which means -that there is no risk of filling up the entire filesystem. - -Security: ---------- -+ the LRU cache policy will prevent filesystem saturation -+ invalid queries will be rejected (no risk of reading/writing arbitrary files on the OS) -- attackers might poison the cache - -Performance: ------------- -The server seems to work pretty well (for me) at the moment. cPython is limited to only -one CPU core, but there is a Java version of this server (Netcache.java). Send your -performance results to the Waf mailing-list! - -Future ideas: -------------- -- File transfer integrity -- Use servers on different ports (eg: get->1200, put->51201) to enable firewall filtering -- Use different processes for get/put (performance improvement) -""" - -import os, re, tempfile, socket, threading, shutil -import SocketServer - -CACHEDIR = '/tmp/wafcache' -CONN = (socket.gethostname(), 51200) -HEADER_SIZE = 128 -BUF = 8192*16 -MAX = 50*1024*1024*1024 # in bytes -CLEANRATIO = 0.85 -CHARS = '0123456789abcdef' - -GET = 'GET' -PUT = 'PUT' -LST = 'LST' -BYE = 'BYE' -CLEAN = 'CLN' -RESET = 'RST' - -re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$') - -flist = {} -def init_flist(): - """map the cache folder names to the timestamps and sizes""" - global flist - try: - os.makedirs(CACHEDIR) - except: - pass - flist = {} - for x in os.listdir(CACHEDIR): - if len(x) != 2: - continue - for y in os.listdir(os.path.join(CACHEDIR, x)): - path = os.path.join(CACHEDIR, x, y) - size = 0 - for z in os.listdir(path): - size += os.stat(os.path.join(path, z)).st_size - flist[y] = [os.stat(path).st_mtime, size] - -lock = threading.Lock() -def make_clean(): - global lock - # there is no need to spend a lot of time cleaning - # so one thread cleans and the others return immediately - - if lock.acquire(0): - try: - make_clean_unsafe() - finally: - lock.release() - -def make_clean_unsafe(): - global MAX, flist - # and do some cleanup if necessary - total = sum([x[1] for x in flist.values()]) - - #print("and the total is %d" % total) - if total >= MAX: - print("Trimming the cache since %r > %r" % (total, MAX)) - lst = [(p, v[0], v[1]) for (p, v) in flist.items()] - lst.sort(key=lambda x: x[1]) # sort by timestamp - lst.reverse() - - while total >= MAX * CLEANRATIO: - (k, t, s) = lst.pop() - shutil.rmtree(os.path.join(CACHEDIR, k[:2], k)) - total -= s - del flist[k] - -def reset(): - global MAX, flist - tmp = list(flist.keys()) - lock.acquire() - try: - flist = {} - finally: - lock.release() - for x in CHARS: - for y in CHARS: - try: - os.rename(os.path.join(CACHEDIR, x+y), os.path.join(CACHEDIR, x+y+'_rm')) - except: - pass - for x in CHARS: - for y in CHARS: - try: - shutil.rmtree(os.path.join(CACHEDIR, x+y+'_rm')) - except: - pass - - -def update(ssig): - """update the cache folder and make some space if necessary""" - global flist - # D, T, S : directory, timestamp, size - - # update the contents with the last folder created - cnt = 0 - d = os.path.join(CACHEDIR, ssig[:2], ssig) - for k in os.listdir(d): - cnt += os.stat(os.path.join(d, k)).st_size - - # the same thread will usually push the next files - try: - flist[ssig][1] = cnt - except: - flist[ssig] = [os.stat(d).st_mtime, cnt] - -class req(SocketServer.StreamRequestHandler): - def handle(self): - while 1: - try: - self.process_command() - except Exception as e: - print(e) - break - - def process_command(self): - query = self.rfile.read(HEADER_SIZE).strip() - #print "%r" % query - if not re_valid_query.match(query): - raise ValueError('Invalid query %r' % query) - - query = query.strip().split(',') - - if query[0] == GET: - self.get_file(query[1:]) - elif query[0] == PUT: - self.put_file(query[1:]) - elif query[0] == LST: - self.lst_file(query[1:]) - elif query[0] == CLEAN: - make_clean() - elif query[0] == RESET: - reset() - elif query[0] == BYE: - raise ValueError('Exit') - else: - raise ValueError('Invalid query %r' % query) - - def lst_file(self, query): - response = '\n'.join(flist.keys()) - params = [str(len(response)),''] - self.wfile.write(','.join(params).ljust(HEADER_SIZE)) - self.wfile.write(response) - - def get_file(self, query): - # get a file from the cache if it exists, else return 0 - tmp = os.path.join(CACHEDIR, query[0][:2], query[0], query[1]) - fsize = -1 - try: - fsize = os.stat(tmp).st_size - except Exception: - #print(e) - pass - else: - # cache was useful, update the last access for LRU - d = os.path.join(CACHEDIR, query[0][:2], query[0]) - os.utime(d, None) - flist[query[0]][0] = os.stat(d).st_mtime - params = [str(fsize)] - self.wfile.write(','.join(params).ljust(HEADER_SIZE)) - - if fsize < 0: - #print("file not found in cache %s" % query[0]) - return - f = open(tmp, 'rb') - try: - cnt = 0 - while cnt < fsize: - r = f.read(BUF) - self.wfile.write(r) - cnt += len(r) - finally: - f.close() - - def put_file(self, query): - # add a file to the cache, the third parameter is the file size - (fd, filename) = tempfile.mkstemp(dir=CACHEDIR) - try: - size = int(query[2]) - cnt = 0 - while cnt < size: - r = self.rfile.read(min(BUF, size-cnt)) - if not r: - raise ValueError('Connection closed') - os.write(fd, r) - cnt += len(r) - finally: - os.close(fd) - - - d = os.path.join(CACHEDIR, query[0][:2], query[0]) - try: - os.stat(d) - except: - try: - # obvious race condition here - os.makedirs(d) - except OSError: - pass - try: - os.rename(filename, os.path.join(d, query[1])) - except OSError: - pass # folder removed by the user, or another thread is pushing the same file - try: - update(query[0]) - except OSError: - pass - make_clean() - -class req_only_get(req): - def put_file(self, query): - self.wfile.write('ERROR,'.ljust(HEADER_SIZE)) - raise ValueError('Put is forbidden') - -class req_only_put(req): - def get_file(self, query): - self.wfile.write('ERROR,'.ljust(HEADER_SIZE)) - raise ValueError('Get is forbidden') - -def create_server(conn, cls): - SocketServer.ThreadingTCPServer.allow_reuse_address = True - server = SocketServer.ThreadingTCPServer(CONN, req) - server.timeout = 60 # seconds - server.serve_forever() - -if __name__ == '__main__': - init_flist() - print("ready (%r dirs)" % len(flist.keys())) - create_server(CONN, req) - diff --git a/waflib/extras/netcache_client.py b/waflib/extras/netcache_client.py index cf8e932c..6b16e301 100644 --- a/waflib/extras/netcache_client.py +++ b/waflib/extras/netcache_client.py @@ -10,12 +10,16 @@ A client for the network cache (playground/netcache/). Launch the server with: bld.load('netcache_client') The parameters should be present in the environment in the form: - NETCACHE=host:port@mode waf configure build + NETCACHE=host:port waf configure build + +Or in a more detailed way: + NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build where: - mode: PUSH, PULL, PUSH_PULL - host: host where the server resides, for example 127.0.0.1 - port: by default the java server receives files on 11001 and returns data on 12001 + host: host where the server resides, by default localhost + port: by default push on 11001 and pull on 12001 + +Use the server provided in playground/netcache/Netcache.java """ import os, socket, time, atexit, sys @@ -44,19 +48,29 @@ def put_data(conn, data): raise RuntimeError('connection ended') cnt += sent -active_connections = Runner.Queue(0) -def get_connection(): +push_connections = Runner.Queue(0) +pull_connections = Runner.Queue(0) +def get_connection(push=False): # return a new connection... do not forget to release it! try: - ret = active_connections.get(block=False) + if push: + ret = push_connections.get(block=False) + else: + ret = pull_connections.get(block=False) except Exception: ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ret.connect(Task.net_cache[:2]) + if push: + ret.connect(Task.push_addr) + else: + ret.connect(Task.pull_addr) return ret -def release_connection(conn, msg=''): +def release_connection(conn, msg='', push=False): if conn: - active_connections.put(conn) + if push: + push_connections.put(conn) + else: + pull_connections.put(conn) def close_connection(conn, msg=''): if conn: @@ -71,12 +85,14 @@ def close_connection(conn, msg=''): pass def close_all(): - while active_connections.qsize(): - conn = active_connections.get() - try: - close_connection(conn) - except: - pass + for q in (push_connections, pull_connections): + while q.qsize(): + conn = q.get() + try: + close_connection(conn) + except: + # ignore errors when cleaning up + pass atexit.register(close_all) def read_header(conn): @@ -179,12 +195,10 @@ def sock_send(conn, ssig, cnt, p): r = r[k:] def can_retrieve_cache(self): - if not Task.net_cache: + if not Task.pull_addr: return False if not self.outputs: return False - if Task.net_cache[-1] == 'PUSH': - return self.cached = False cnt = 0 @@ -226,12 +240,10 @@ def can_retrieve_cache(self): @Utils.run_once def put_files_cache(self): - if not Task.net_cache: + if not Task.push_addr: return if not self.outputs: return - if Task.net_cache[-1] == 'PULL': - return if getattr(self, 'cached', None): return @@ -249,7 +261,7 @@ def put_files_cache(self): # this is unnecessary try: if not conn: - conn = get_connection() + conn = get_connection(push=True) sock_send(conn, ssig, cnt, node.abspath()) except Exception as e: Logs.debug("netcache: could not push the files %r" % e) @@ -259,11 +271,12 @@ def put_files_cache(self): conn = None cnt += 1 finally: - release_connection(conn) + release_connection(conn, push=True) bld.task_sigs[self.uid()] = self.cache_sig def hash_env_vars(self, env, vars_lst): + # reimplement so that the resulting hash does not depend on local paths if not env.table: env = env.parent if not env: @@ -293,6 +306,7 @@ def hash_env_vars(self, env, vars_lst): return ret def uid(self): + # reimplement so that the signature does not depend on local paths try: return self.uid_ except AttributeError: @@ -312,7 +326,6 @@ def make_cached(cls): m1 = cls.run def run(self): - bld = self.generator.bld if self.can_retrieve_cache(): return 0 return m1(self) @@ -328,12 +341,12 @@ def make_cached(cls): cls.post_run = post_run @conf -def setup_netcache(ctx, host, port, mode): - Logs.warn('Using the network cache %s, %s, %s' % (host, port, mode)) - Task.net_cache = (host, port, mode) +def setup_netcache(ctx, push_addr, pull_addr): Task.Task.can_retrieve_cache = can_retrieve_cache Task.Task.put_files_cache = put_files_cache Task.Task.uid = uid + Task.push_addr = push_addr + Task.pull_addr = pull_addr Build.BuildContext.hash_env_vars = hash_env_vars ctx.cache_global = True @@ -341,22 +354,30 @@ def setup_netcache(ctx, host, port, mode): make_cached(x) def build(bld): - if not 'NETCACHE' in os.environ: - Logs.warn('The network cache is disabled, set NETCACHE=host:port@mode to enable, eg:') - Logs.warn(' export NETCACHE=localhost:51200@PUSH_PULL') - else: - v = os.environ['NETCACHE'] - if v in MODES: - host = socket.gethostname() - port = 51200 - mode = v - else: - mode = 'PUSH_PULL' - host, port = v.split(':') - if port.find('@'): - port, mode = port.split('@') - port = int(port) - if not mode in MODES: - bld.fatal('Invalid mode %s not in %r' % (mode, MODES)) - setup_netcache(bld, host, port, mode) + if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ: + Logs.warn('Setting NETCACHE_PULL=127.0.0.1:11001 and NETCACHE_PUSH=127.0.0.1:12001') + os.environ['NETCACHE_PULL'] = '127.0.0.1:12001' + os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001' + + if 'NETCACHE' in os.environ: + if not 'NETCACHE_PUSH' in os.environ: + os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE'] + if not 'NETCACHE_PULL' in os.environ: + os.environ['NETCACHE_PULL'] = os.environ['NETCACHE'] + + v = os.environ['NETCACHE_PULL'] + if v: + h, p = v.split(':') + pull_addr = (h, int(p)) + else: + pull_addr = None + + v = os.environ['NETCACHE_PUSH'] + if v: + h, p = v.split(':') + push_addr = (h, int(p)) + else: + push_addr = None + + setup_netcache(bld, push_addr, pull_addr)