From 73f1bb696842444c9d7aa5339616025f3507c2a8 Mon Sep 17 00:00:00 2001 From: Virgil Dupras Date: Thu, 15 Apr 2010 10:38:30 +0200 Subject: [PATCH] Tweaked dgpe's matching to work better with huge scans. --- core_pe/matchbase.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/core_pe/matchbase.py b/core_pe/matchbase.py index 7d7c7c38..3bc32d1b 100644 --- a/core_pe/matchbase.py +++ b/core_pe/matchbase.py @@ -8,11 +8,9 @@ import logging import multiprocessing -from Queue import Empty -from collections import defaultdict +from collections import defaultdict, deque from hsutil import job -from hsutil.misc import dedupe from core.engine import Match from .block import avgdiff, DifferentBlockCountError, NoBlocksError @@ -76,13 +74,6 @@ def async_compare(ref_id, other_ids, dbname, threshold): return results def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nulljob): - def empty_out_queue(queue, into): - try: - while True: - into.append(queue.get(block=False)) - except Empty: - pass - j = j.start_subjob([3, 7]) pictures = prepare_pictures(pictures, cache_path, j) j = j.start_subjob([9, 1], 'Preparing for matching') @@ -100,7 +91,7 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nul cache.close() pictures = [p for p in pictures if hasattr(p, 'cache_id')] pool = multiprocessing.Pool() - async_results = [] + async_results = deque() matches = [] pictures_copy = set(pictures) for ref in j.iter_with_progress(pictures, 'Matched %d/%d pictures'): @@ -118,8 +109,10 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nul args = (ref.cache_id, cache_ids[:ARG_LIMIT], cache_path, threshold) async_results.append(pool.apply_async(async_compare, args)) cache_ids = cache_ids[ARG_LIMIT:] - if len(async_results) > RESULTS_QUEUE_LIMIT: - result = async_results.pop(0) + # We use a while here because it's possible that more than one result has been added if + # ARG_LIMIT has been reached. + while len(async_results) > RESULTS_QUEUE_LIMIT: + result = async_results.popleft() matches.extend(result.get()) for result in async_results: # process the rest of the results matches.extend(result.get())