diff --git a/pe/py/matchbase.py b/pe/py/matchbase.py index d1f208d9..34966a50 100644 --- a/pe/py/matchbase.py +++ b/pe/py/matchbase.py @@ -21,6 +21,11 @@ from .cache import Cache MIN_ITERATIONS = 3 +# Maximum number of pending async results. It must be high enough that the main thread rarely blocks +# on a result.get() call; cpu_count * 2 should also be enough that the spawned processes never sit +# idle waiting for the main process to collect results. +RESULTS_QUEUE_LIMIT = multiprocessing.cpu_count() * 2 + def get_match(first,second,percentage): if percentage < 0: percentage = 0 @@ -40,7 +45,7 @@ class MatchFactory(object): # there is enough memory left to carry on the operation because it is assumed that the # MemoryError happens when trying to read an image file, which is freed from memory by the # time that MemoryError is raised. - j = j.start_subjob([2, 8]) + j = j.start_subjob([3, 7]) logging.info('Preparing %d files' % len(files)) prepared = self.prepare_files(files, j) logging.info('Finished preparing %d files' % len(prepared)) @@ -94,7 +99,7 @@ class AsyncMatchFactory(MatchFactory): except Empty: pass - j = j.start_subjob([1, 8, 1], 'Preparing for matching') + j = j.start_subjob([9, 1], 'Preparing for matching') cache = self.cached_blocks id2picture = {} dimensions2pictures = defaultdict(set) @@ -109,18 +114,18 @@ class AsyncMatchFactory(MatchFactory): pictures = [p for p in pictures if hasattr(p, 'cache_id')] pool = multiprocessing.Pool() async_results = [] + matches = [] pictures_copy = set(pictures) - for ref in j.iter_with_progress(pictures): + for ref in j.iter_with_progress(pictures, 'Matched %d/%d pictures'): others = pictures_copy if self.match_scaled else dimensions2pictures[ref.dimensions] others.remove(ref) if others: cache_ids = [f.cache_id for f in others] args = (ref.cache_id, cache_ids, self.cached_blocks.dbname, self.threshold) async_results.append(pool.apply_async(async_compare, args)) - - matches = [] - for result in j.iter_with_progress(async_results, 'Matched %d/%d 
pictures'): - matches.extend(result.get()) + if len(async_results) > RESULTS_QUEUE_LIMIT: + result = async_results.pop(0) + matches.extend(result.get()) result = [] for ref_id, other_id, percentage in j.iter_with_progress(matches, 'Verified %d/%d matches', every=10):