Mirror of https://gitlab.com/ita1024/waf.git (synced 2024-11-22 01:46:15 +01:00)

Commit 004b866789 (parent 8bb6b1d299): Updated the netcache client/server
(The following file, the standalone Python netcache server, is removed entirely by this commit.)

@@ -1,262 +0,0 @@
#! /usr/bin/env python
# encoding: utf-8
# Thomas Nagy 2011 (ita)

"""
A simple TCP server to cache files over the network.
The client is located in waflib/extras/netcache_client.py

This server uses a LRU cache policy (remove least recently used files), which means
that there is no risk of filling up the entire filesystem.

Security:
---------
+ the LRU cache policy will prevent filesystem saturation
+ invalid queries will be rejected (no risk of reading/writing arbitrary files on the OS)
- attackers might poison the cache

Performance:
------------
The server seems to work pretty well (for me) at the moment. cPython is limited to only
one CPU core, but there is a Java version of this server (Netcache.java). Send your
performance results to the Waf mailing-list!

Future ideas:
-------------
- File transfer integrity
- Use servers on different ports (eg: get->51200, put->51201) to enable firewall filtering
- Use different processes for get/put (performance improvement)
"""

import os, re, tempfile, socket, threading, shutil
import SocketServer

CACHEDIR = '/tmp/wafcache'
CONN = (socket.gethostname(), 51200)
HEADER_SIZE = 128
BUF = 8192*16
MAX = 50*1024*1024*1024 # in bytes
CLEANRATIO = 0.85
CHARS = '0123456789abcdef'

GET = 'GET'
PUT = 'PUT'
LST = 'LST'
BYE = 'BYE'
CLEAN = 'CLN'
RESET = 'RST'

re_valid_query = re.compile('^[a-zA-Z0-9_, ]+$')

flist = {}
def init_flist():
	"""map the cache folder names to the timestamps and sizes"""
	global flist
	try:
		os.makedirs(CACHEDIR)
	except:
		pass
	flist = {}
	for x in os.listdir(CACHEDIR):
		if len(x) != 2:
			continue
		for y in os.listdir(os.path.join(CACHEDIR, x)):
			path = os.path.join(CACHEDIR, x, y)
			size = 0
			for z in os.listdir(path):
				size += os.stat(os.path.join(path, z)).st_size
			flist[y] = [os.stat(path).st_mtime, size]

lock = threading.Lock()
def make_clean():
	global lock
	# there is no need to spend a lot of time cleaning
	# so one thread cleans and the others return immediately

	if lock.acquire(0):
		try:
			make_clean_unsafe()
		finally:
			lock.release()

def make_clean_unsafe():
	global MAX, flist
	# and do some cleanup if necessary
	total = sum([x[1] for x in flist.values()])

	#print("and the total is %d" % total)
	if total >= MAX:
		print("Trimming the cache since %r > %r" % (total, MAX))
		lst = [(p, v[0], v[1]) for (p, v) in flist.items()]
		lst.sort(key=lambda x: x[1]) # sort by timestamp
		lst.reverse()

		while total >= MAX * CLEANRATIO:
			(k, t, s) = lst.pop()
			shutil.rmtree(os.path.join(CACHEDIR, k[:2], k))
			total -= s
			del flist[k]

def reset():
	global MAX, flist
	tmp = list(flist.keys())
	lock.acquire()
	try:
		flist = {}
	finally:
		lock.release()
	for x in CHARS:
		for y in CHARS:
			try:
				os.rename(os.path.join(CACHEDIR, x+y), os.path.join(CACHEDIR, x+y+'_rm'))
			except:
				pass
	for x in CHARS:
		for y in CHARS:
			try:
				shutil.rmtree(os.path.join(CACHEDIR, x+y+'_rm'))
			except:
				pass


def update(ssig):
	"""update the cache folder and make some space if necessary"""
	global flist
	# D, T, S : directory, timestamp, size

	# update the contents with the last folder created
	cnt = 0
	d = os.path.join(CACHEDIR, ssig[:2], ssig)
	for k in os.listdir(d):
		cnt += os.stat(os.path.join(d, k)).st_size

	# the same thread will usually push the next files
	try:
		flist[ssig][1] = cnt
	except:
		flist[ssig] = [os.stat(d).st_mtime, cnt]

class req(SocketServer.StreamRequestHandler):
	def handle(self):
		while 1:
			try:
				self.process_command()
			except Exception as e:
				print(e)
				break

	def process_command(self):
		query = self.rfile.read(HEADER_SIZE).strip()
		#print "%r" % query
		if not re_valid_query.match(query):
			raise ValueError('Invalid query %r' % query)

		query = query.strip().split(',')

		if query[0] == GET:
			self.get_file(query[1:])
		elif query[0] == PUT:
			self.put_file(query[1:])
		elif query[0] == LST:
			self.lst_file(query[1:])
		elif query[0] == CLEAN:
			make_clean()
		elif query[0] == RESET:
			reset()
		elif query[0] == BYE:
			raise ValueError('Exit')
		else:
			raise ValueError('Invalid query %r' % query)

	def lst_file(self, query):
		response = '\n'.join(flist.keys())
		params = [str(len(response)),'']
		self.wfile.write(','.join(params).ljust(HEADER_SIZE))
		self.wfile.write(response)

	def get_file(self, query):
		# get a file from the cache if it exists, else return 0
		tmp = os.path.join(CACHEDIR, query[0][:2], query[0], query[1])
		fsize = -1
		try:
			fsize = os.stat(tmp).st_size
		except Exception:
			#print(e)
			pass
		else:
			# cache was useful, update the last access for LRU
			d = os.path.join(CACHEDIR, query[0][:2], query[0])
			os.utime(d, None)
			flist[query[0]][0] = os.stat(d).st_mtime
		params = [str(fsize)]
		self.wfile.write(','.join(params).ljust(HEADER_SIZE))

		if fsize < 0:
			#print("file not found in cache %s" % query[0])
			return
		f = open(tmp, 'rb')
		try:
			cnt = 0
			while cnt < fsize:
				r = f.read(BUF)
				self.wfile.write(r)
				cnt += len(r)
		finally:
			f.close()

	def put_file(self, query):
		# add a file to the cache, the third parameter is the file size
		(fd, filename) = tempfile.mkstemp(dir=CACHEDIR)
		try:
			size = int(query[2])
			cnt = 0
			while cnt < size:
				r = self.rfile.read(min(BUF, size-cnt))
				if not r:
					raise ValueError('Connection closed')
				os.write(fd, r)
				cnt += len(r)
		finally:
			os.close(fd)


		d = os.path.join(CACHEDIR, query[0][:2], query[0])
		try:
			os.stat(d)
		except:
			try:
				# obvious race condition here
				os.makedirs(d)
			except OSError:
				pass
		try:
			os.rename(filename, os.path.join(d, query[1]))
		except OSError:
			pass # folder removed by the user, or another thread is pushing the same file
		try:
			update(query[0])
		except OSError:
			pass
		make_clean()

class req_only_get(req):
	def put_file(self, query):
		self.wfile.write('ERROR,'.ljust(HEADER_SIZE))
		raise ValueError('Put is forbidden')

class req_only_put(req):
	def get_file(self, query):
		self.wfile.write('ERROR,'.ljust(HEADER_SIZE))
		raise ValueError('Get is forbidden')

def create_server(conn, cls):
	SocketServer.ThreadingTCPServer.allow_reuse_address = True
	server = SocketServer.ThreadingTCPServer(CONN, req)
	server.timeout = 60 # seconds
	server.serve_forever()

if __name__ == '__main__':
	init_flist()
	print("ready (%r dirs)" % len(flist.keys()))
	create_server(CONN, req)
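For reference, a minimal client-side sketch of the wire protocol implemented by the removed server above (an illustration, not code from the repository): queries are plain-text, comma-separated commands padded to HEADER_SIZE bytes, and every reply starts with a HEADER_SIZE-byte header whose first comma-separated field gives the payload size. The helper names and the default localhost:51200 address are assumptions taken from the constants in the listing.

import socket

HEADER_SIZE = 128  # must match the server constant

def read_exact(sock, size):
	# read exactly `size` bytes from the socket
	data = b''
	while len(data) < size:
		chunk = sock.recv(size - len(data))
		if not chunk:
			raise RuntimeError('connection closed')
		data += chunk
	return data

def list_cache(host='localhost', port=51200):
	# hypothetical helper: ask the server which signatures it currently holds (LST command)
	s = socket.create_connection((host, port))
	try:
		s.sendall('LST,'.ljust(HEADER_SIZE).encode())   # queries are padded to HEADER_SIZE bytes
		header = read_exact(s, HEADER_SIZE).decode()
		size = int(header.split(',')[0])                # first field: length of the payload
		body = read_exact(s, size)
		s.sendall('BYE,'.ljust(HEADER_SIZE).encode())   # ends the per-connection handler loop
		return body.decode().splitlines()
	finally:
		s.close()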
(The following hunks modify the client, waflib/extras/netcache_client.py.)

@@ -10,12 +10,16 @@ A client for the network cache (playground/netcache/). Launch the server with:
 	bld.load('netcache_client')
 
 The parameters should be present in the environment in the form:
-	NETCACHE=host:port@mode waf configure build
+	NETCACHE=host:port waf configure build
+
+Or in a more detailed way:
+	NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build
 
 where:
-	mode: PUSH, PULL, PUSH_PULL
-	host: host where the server resides, for example 127.0.0.1
-	port: by default the java server receives files on 11001 and returns data on 12001
+	host: host where the server resides, by default localhost
+	port: by default push on 11001 and pull on 12001
+
+Use the server provided in playground/netcache/Netcache.java
 """
 
 import os, socket, time, atexit, sys
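As a usage illustration (a sketch, not taken from this commit): a project only needs to load the tool in its wscript and export the addresses in the environment, for example NETCACHE_PUSH=localhost:11001 NETCACHE_PULL=localhost:12001 waf configure build. The C program below is a placeholder.

# hypothetical wscript for a project that uses the network cache
def options(opt):
	opt.load('compiler_c')

def configure(conf):
	conf.load('compiler_c')

def build(bld):
	bld.load('netcache_client')   # enables the cache; push/pull addresses come from the environment
	bld.program(source='main.c', target='app')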
@@ -44,19 +48,29 @@ def put_data(conn, data):
 			raise RuntimeError('connection ended')
 		cnt += sent
 
-active_connections = Runner.Queue(0)
-def get_connection():
+push_connections = Runner.Queue(0)
+pull_connections = Runner.Queue(0)
+def get_connection(push=False):
 	# return a new connection... do not forget to release it!
 	try:
-		ret = active_connections.get(block=False)
+		if push:
+			ret = push_connections.get(block=False)
+		else:
+			ret = pull_connections.get(block=False)
 	except Exception:
 		ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-		ret.connect(Task.net_cache[:2])
+		if push:
+			ret.connect(Task.push_addr)
+		else:
+			ret.connect(Task.pull_addr)
 	return ret
 
-def release_connection(conn, msg=''):
+def release_connection(conn, msg='', push=False):
 	if conn:
-		active_connections.put(conn)
+		if push:
+			push_connections.put(conn)
+		else:
+			pull_connections.put(conn)
 
 def close_connection(conn, msg=''):
 	if conn:
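The get_connection/release_connection pair above acts as a small per-direction socket pool. A rough sketch of the intended borrow/return pattern (ssig, cnt and path are placeholders, and the error handling only loosely mirrors put_files_cache further below):

conn = None
try:
	conn = get_connection(push=True)        # borrow a socket from push_connections
	sock_send(conn, ssig, cnt, path)        # reuse it for the upload
except Exception:
	close_connection(conn)                  # a broken socket is closed, not returned to the pool
	conn = None
finally:
	release_connection(conn, push=True)     # a healthy socket goes back for the next task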
@@ -71,12 +85,14 @@ def close_connection(conn, msg=''):
 		pass
 
 def close_all():
-	while active_connections.qsize():
-		conn = active_connections.get()
-		try:
-			close_connection(conn)
-		except:
-			pass
+	for q in (push_connections, pull_connections):
+		while q.qsize():
+			conn = q.get()
+			try:
+				close_connection(conn)
+			except:
+				# ignore errors when cleaning up
+				pass
 atexit.register(close_all)
 
 def read_header(conn):
@@ -179,12 +195,10 @@ def sock_send(conn, ssig, cnt, p):
 		r = r[k:]
 
 def can_retrieve_cache(self):
-	if not Task.net_cache:
+	if not Task.pull_addr:
 		return False
 	if not self.outputs:
 		return False
-	if Task.net_cache[-1] == 'PUSH':
-		return
 	self.cached = False
 
 	cnt = 0
@@ -226,12 +240,10 @@ def can_retrieve_cache(self):
 
 @Utils.run_once
 def put_files_cache(self):
-	if not Task.net_cache:
+	if not Task.push_addr:
 		return
 	if not self.outputs:
 		return
-	if Task.net_cache[-1] == 'PULL':
-		return
 	if getattr(self, 'cached', None):
 		return
 
@@ -249,7 +261,7 @@ def put_files_cache(self):
 			# this is unnecessary
 			try:
 				if not conn:
-					conn = get_connection()
+					conn = get_connection(push=True)
 				sock_send(conn, ssig, cnt, node.abspath())
 			except Exception as e:
 				Logs.debug("netcache: could not push the files %r" % e)
@@ -259,11 +271,12 @@ def put_files_cache(self):
 				conn = None
 			cnt += 1
 	finally:
-		release_connection(conn)
+		release_connection(conn, push=True)
 
 	bld.task_sigs[self.uid()] = self.cache_sig
 
 def hash_env_vars(self, env, vars_lst):
+	# reimplement so that the resulting hash does not depend on local paths
 	if not env.table:
 		env = env.parent
 		if not env:
@@ -293,6 +306,7 @@ def hash_env_vars(self, env, vars_lst):
 	return ret
 
 def uid(self):
+	# reimplement so that the signature does not depend on local paths
 	try:
 		return self.uid_
 	except AttributeError:
@@ -312,7 +326,6 @@ def make_cached(cls):
 
 	m1 = cls.run
 	def run(self):
-		bld = self.generator.bld
 		if self.can_retrieve_cache():
 			return 0
 		return m1(self)
@@ -328,12 +341,12 @@ def make_cached(cls):
 	cls.post_run = post_run
 
 @conf
-def setup_netcache(ctx, host, port, mode):
-	Logs.warn('Using the network cache %s, %s, %s' % (host, port, mode))
-	Task.net_cache = (host, port, mode)
+def setup_netcache(ctx, push_addr, pull_addr):
 	Task.Task.can_retrieve_cache = can_retrieve_cache
 	Task.Task.put_files_cache = put_files_cache
 	Task.Task.uid = uid
+	Task.push_addr = push_addr
+	Task.pull_addr = pull_addr
 	Build.BuildContext.hash_env_vars = hash_env_vars
 	ctx.cache_global = True
 
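The new signature takes pre-parsed (host, port) tuples instead of a host/port/mode triple, and either tuple may be None to disable that direction (see the Task.push_addr and Task.pull_addr checks above). With the defaults installed by build() below, the call amounts to roughly the following (an illustration, not code from the commit):

setup_netcache(bld, ('127.0.0.1', 11001), ('127.0.0.1', 12001))  # (push_addr, pull_addr)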
@@ -341,22 +354,30 @@ def setup_netcache(ctx, host, port, mode):
 		make_cached(x)
 
 def build(bld):
-	if not 'NETCACHE' in os.environ:
-		Logs.warn('The network cache is disabled, set NETCACHE=host:port@mode to enable, eg:')
-		Logs.warn(' export NETCACHE=localhost:51200@PUSH_PULL')
-	else:
-		v = os.environ['NETCACHE']
-		if v in MODES:
-			host = socket.gethostname()
-			port = 51200
-			mode = v
-		else:
-			mode = 'PUSH_PULL'
-			host, port = v.split(':')
-			if port.find('@'):
-				port, mode = port.split('@')
-			port = int(port)
-			if not mode in MODES:
-				bld.fatal('Invalid mode %s not in %r' % (mode, MODES))
-		setup_netcache(bld, host, port, mode)
+	if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ:
+		Logs.warn('Setting NETCACHE_PULL=127.0.0.1:11001 and NETCACHE_PUSH=127.0.0.1:12001')
+		os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
+		os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'
+
+	if 'NETCACHE' in os.environ:
+		if not 'NETCACHE_PUSH' in os.environ:
+			os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
+		if not 'NETCACHE_PULL' in os.environ:
+			os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']
+
+	v = os.environ['NETCACHE_PULL']
+	if v:
+		h, p = v.split(':')
+		pull_addr = (h, int(p))
+	else:
+		pull_addr = None
+
+	v = os.environ['NETCACHE_PUSH']
+	if v:
+		h, p = v.split(':')
+		push_addr = (h, int(p))
+	else:
+		push_addr = None
+
+	setup_netcache(bld, push_addr, pull_addr)
 