From 01e2648e93e718f84c77aed16326a14f2b498a2e Mon Sep 17 00:00:00 2001 From: Alexandre Flament Date: Sat, 5 Nov 2016 13:45:20 +0100 Subject: [PATCH] Simplify search.py, basically updated PR #518 The timeouts in settings.yml is about the total time (not only the HTTP request but also the prepare the request and parsing the response) It was more or less the case before since the threaded_requests function ignores the thread after the timeout even the HTTP request is ended. New / changed stats : * page_load_time : record the HTTP request time * page_load_count: the number of HTTP request * engine_time : the execution total time of an engine * engine_time_count : the number of "engine_time" measure The avg response times in the preferences are the engine response time (engine_load_time / engine_load_count) To sum up : * Search.search() filters the engines that can't process the request * Search.search() call search_multiple_requests function * search_multiple_requests creates one thread per engine, each thread runs the search_one_request function * search_one_request calls the request function, make the HTTP request, calls the response function, extends the result_container * search_multiple_requests waits for the the thread to finish (or timeout) --- searx/engines/__init__.py | 73 +++++++------ searx/search.py | 209 ++++++++++++++++++++------------------ searx/webapp.py | 2 + 3 files changed, 153 insertions(+), 131 deletions(-) diff --git a/searx/engines/__init__.py b/searx/engines/__init__.py index 14376c31..74c66e35 100644 --- a/searx/engines/__init__.py +++ b/searx/engines/__init__.py @@ -99,6 +99,9 @@ def load_engine(engine_data): 'result_count': 0, 'search_count': 0, 'page_load_time': 0, + 'page_load_count': 0, + 'engine_time': 0, + 'engine_time_count': 0, 'score_count': 0, 'errors': 0 } @@ -115,32 +118,56 @@ def load_engine(engine_data): return engine +def to_percentage(stats, maxvalue): + for engine_stat in stats: + if maxvalue: + engine_stat['percentage'] = int(engine_stat['avg'] / maxvalue * 100) + else: + engine_stat['percentage'] = 0 + return stats + + def get_engines_stats(): # TODO refactor pageloads = [] + engine_times = [] results = [] scores = [] errors = [] scores_per_result = [] - max_pageload = max_results = max_score = max_errors = max_score_per_result = 0 # noqa + max_pageload = max_engine_times = max_results = max_score = max_errors = max_score_per_result = 0 # noqa for engine in engines.values(): if engine.stats['search_count'] == 0: continue results_num = \ engine.stats['result_count'] / float(engine.stats['search_count']) - load_times = engine.stats['page_load_time'] / float(engine.stats['search_count']) # noqa + + if engine.stats['page_load_count'] != 0: + load_times = engine.stats['page_load_time'] / float(engine.stats['page_load_count']) # noqa + else: + load_times = 0 + + if engine.stats['engine_time_count'] != 0: + this_engine_time = engine.stats['engine_time'] / float(engine.stats['engine_time_count']) # noqa + else: + this_engine_time = 0 + if results_num: score = engine.stats['score_count'] / float(engine.stats['search_count']) # noqa score_per_result = score / results_num else: score = score_per_result = 0.0 - max_results = max(results_num, max_results) + max_pageload = max(load_times, max_pageload) + max_engine_times = max(this_engine_time, max_engine_times) + max_results = max(results_num, max_results) max_score = max(score, max_score) max_score_per_result = max(score_per_result, max_score_per_result) max_errors = max(max_errors, engine.stats['errors']) + pageloads.append({'avg': load_times, 'name': engine.name}) + engine_times.append({'avg': this_engine_time, 'name': engine.name}) results.append({'avg': results_num, 'name': engine.name}) scores.append({'avg': score, 'name': engine.name}) errors.append({'avg': engine.stats['errors'], 'name': engine.name}) @@ -149,38 +176,18 @@ def get_engines_stats(): 'name': engine.name }) - for engine in pageloads: - if max_pageload: - engine['percentage'] = int(engine['avg'] / max_pageload * 100) - else: - engine['percentage'] = 0 - - for engine in results: - if max_results: - engine['percentage'] = int(engine['avg'] / max_results * 100) - else: - engine['percentage'] = 0 - - for engine in scores: - if max_score: - engine['percentage'] = int(engine['avg'] / max_score * 100) - else: - engine['percentage'] = 0 - - for engine in scores_per_result: - if max_score_per_result: - engine['percentage'] = int(engine['avg'] - / max_score_per_result * 100) - else: - engine['percentage'] = 0 - - for engine in errors: - if max_errors: - engine['percentage'] = int(float(engine['avg']) / max_errors * 100) - else: - engine['percentage'] = 0 + pageloads = to_percentage(pageloads, max_pageload) + engine_times = to_percentage(engine_times, max_engine_times) + results = to_percentage(results, max_results) + scores = to_percentage(scores, max_score) + scores_per_result = to_percentage(scores_per_result, max_score_per_result) + erros = to_percentage(errors, max_errors) return [ + ( + gettext('Engine time (sec)'), + sorted(engine_times, key=itemgetter('avg')) + ), ( gettext('Page loads (sec)'), sorted(pageloads, key=itemgetter('avg')) diff --git a/searx/search.py b/searx/search.py index 4c4a6a2f..360e0fe9 100644 --- a/searx/search.py +++ b/searx/search.py @@ -35,14 +35,53 @@ logger = logger.getChild('search') number_of_searches = 0 -def search_request_wrapper(fn, url, engine_name, **kwargs): - ret = None - engine = engines[engine_name] +def send_http_request(engine, request_params, timeout_limit): + response = None try: - ret = fn(url, **kwargs) + # create dictionary which contain all + # informations about the request + request_args = dict( + headers=request_params['headers'], + cookies=request_params['cookies'], + timeout=timeout_limit, + verify=request_params['verify'] + ) + # specific type of request (GET or POST) + if request_params['method'] == 'GET': + req = requests_lib.get + else: + req = requests_lib.post + request_args['data'] = request_params['data'] + + # for page_load_time stats + time_before_request = time() + + # send the request + response = req(request_params['url'], **request_args) + with threading.RLock(): + # no error : reset the suspend variables engine.continuous_errors = 0 engine.suspend_end_time = 0 + # update stats with current page-load-time + # only the HTTP request + engine.stats['page_load_time'] += time() - time_before_request + engine.stats['page_load_count'] += 1 + + # is there a timeout (no parsing in this case) + timeout_overhead = 0.2 # seconds + search_duration = time() - request_params['started'] + if search_duration > timeout_limit + timeout_overhead: + logger.exception('engine timeout on HTTP request:' + '{0} (search duration : {1} ms, time-out: {2} )' + .format(engine.name, search_duration, timeout_limit)) + with threading.RLock(): + engine.stats['errors'] += 1 + return False + + # everything is ok : return the response + return response + except: # increase errors stats with threading.RLock(): @@ -51,20 +90,62 @@ def search_request_wrapper(fn, url, engine_name, **kwargs): engine.suspend_end_time = time() + min(60, engine.continuous_errors) # print engine name and specific error message - logger.exception('engine crash: {0}'.format(engine_name)) - return ret + logger.exception('engine crash: {0}'.format(engine.name)) + return False -def threaded_requests(requests): - timeout_limit = max(r[2]['timeout'] for r in requests) - search_start = time() +def search_one_request(engine_name, query, request_params, result_container, timeout_limit): + engine = engines[engine_name] + + # update request parameters dependent on + # search-engine (contained in engines folder) + engine.request(query, request_params) + + # TODO add support of offline engines + if request_params['url'] is None: + return False + + # ignoring empty urls + if not request_params['url']: + return False + + # send request + response = send_http_request(engine, request_params, timeout_limit) + + # parse response + success = None + if response: + # parse the response + response.search_params = request_params + search_results = engine.response(response) + + # add results + for result in search_results: + result['engine'] = engine.name + + result_container.extend(engine.name, search_results) + + success = True + else: + success = False + + with threading.RLock(): + # update stats : total time + engine.stats['engine_time'] += time() - request_params['started'] + engine.stats['engine_time_count'] += 1 + + # + return success + + +def search_multiple_requests(requests, result_container, timeout_limit): + start_time = time() search_id = uuid4().__str__() - for fn, url, request_args, engine_name in requests: - request_args['timeout'] = timeout_limit + + for engine_name, query, request_params in requests: th = threading.Thread( - target=search_request_wrapper, - args=(fn, url, engine_name), - kwargs=request_args, + target=search_one_request, + args=(engine_name, query, request_params, result_container, timeout_limit), name=search_id, ) th._engine_name = engine_name @@ -72,7 +153,7 @@ def threaded_requests(requests): for th in threading.enumerate(): if th.name == search_id: - remaining_time = max(0.0, timeout_limit - (time() - search_start)) + remaining_time = max(0.0, timeout_limit - (time() - start_time)) th.join(remaining_time) if th.isAlive(): logger.warning('engine timeout: {0}'.format(th._engine_name)) @@ -90,44 +171,6 @@ def default_request_params(): } -# create a callback wrapper for the search engine results -def make_callback(engine_name, callback, params, result_container): - - # creating a callback wrapper for the search engine results - def process_callback(response, **kwargs): - # check if redirect comparing to the True value, - # because resp can be a Mock object, and any attribut name returns something. - if response.is_redirect is True: - logger.debug('{0} redirect on: {1}'.format(engine_name, response)) - return - - response.search_params = params - - search_duration = time() - params['started'] - # update stats with current page-load-time - with threading.RLock(): - engines[engine_name].stats['page_load_time'] += search_duration - - timeout_overhead = 0.2 # seconds - timeout_limit = engines[engine_name].timeout + timeout_overhead - - if search_duration > timeout_limit: - with threading.RLock(): - engines[engine_name].stats['errors'] += 1 - return - - # callback - search_results = callback(response) - - # add results - for result in search_results: - result['engine'] = engine_name - - result_container.extend(engine_name, search_results) - - return process_callback - - def get_search_query_from_webapp(preferences, form): query = None query_engines = [] @@ -254,6 +297,9 @@ class Search(object): def search(self): global number_of_searches + # start time + start_time = time() + # init vars requests = [] @@ -266,6 +312,9 @@ class Search(object): search_query = self.search_query + # max of all selected engine timeout + timeout_limit = 0 + # start search-reqest for all selected engines for selected_engine in search_query.engines: if selected_engine['name'] not in engines: @@ -294,7 +343,7 @@ class Search(object): request_params = default_request_params() request_params['headers']['User-Agent'] = user_agent request_params['category'] = selected_engine['category'] - request_params['started'] = time() + request_params['started'] = start_time request_params['pageno'] = search_query.pageno if hasattr(engine, 'language') and engine.language: @@ -306,52 +355,16 @@ class Search(object): request_params['safesearch'] = search_query.safesearch request_params['time_range'] = search_query.time_range - # update request parameters dependent on - # search-engine (contained in engines folder) - engine.request(search_query.query.encode('utf-8'), request_params) - - if request_params['url'] is None: - # TODO add support of offline engines - pass - - # create a callback wrapper for the search engine results - callback = make_callback( - selected_engine['name'], - engine.response, - request_params, - self.result_container) - - # create dictionary which contain all - # informations about the request - request_args = dict( - headers=request_params['headers'], - hooks=dict(response=callback), - cookies=request_params['cookies'], - timeout=engine.timeout, - verify=request_params['verify'] - ) - - # specific type of request (GET or POST) - if request_params['method'] == 'GET': - req = requests_lib.get - else: - req = requests_lib.post - request_args['data'] = request_params['data'] - - # ignoring empty urls - if not request_params['url']: - continue - # append request to list - requests.append((req, request_params['url'], - request_args, - selected_engine['name'])) + requests.append((selected_engine['name'], search_query.query.encode('utf-8'), request_params)) - if not requests: - return self.result_container - # send all search-request - threaded_requests(requests) - start_new_thread(gc.collect, tuple()) + # update timeout_limit + timeout_limit = max(timeout_limit, engine.timeout) + + if requests: + # send all search-request + search_multiple_requests(requests, self.result_container, timeout_limit - (time() - start_time)) + start_new_thread(gc.collect, tuple()) # return results, suggestions, answers and infoboxes return self.result_container diff --git a/searx/webapp.py b/searx/webapp.py index d3d5bb51..bd61a1a6 100644 --- a/searx/webapp.py +++ b/searx/webapp.py @@ -593,6 +593,8 @@ def preferences(): if e.timeout > settings['outgoing']['request_timeout']: stats[e.name]['warn_timeout'] = True + # get first element [0], the engine time, + # and then the second element [1] : the time (the first one is the label) for engine_stat in get_engines_stats()[0][1]: stats[engine_stat.get('name')]['time'] = round(engine_stat.get('avg'), 3) if engine_stat.get('avg') > settings['outgoing']['request_timeout']: