Tweaked dgpe's matching to work better with huge scans.

Virgil Dupras 2010-04-15 10:38:30 +02:00
parent d1a7f51859
commit 73f1bb6968
1 changed file with 6 additions and 13 deletions


@@ -8,11 +8,9 @@
 import logging
 import multiprocessing
-from Queue import Empty
-from collections import defaultdict
+from collections import defaultdict, deque
 
 from hsutil import job
-from hsutil.misc import dedupe
 
 from core.engine import Match
 
 from .block import avgdiff, DifferentBlockCountError, NoBlocksError
@@ -76,13 +74,6 @@ def async_compare(ref_id, other_ids, dbname, threshold):
     return results
 
 def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nulljob):
-    def empty_out_queue(queue, into):
-        try:
-            while True:
-                into.append(queue.get(block=False))
-        except Empty:
-            pass
-    
     j = j.start_subjob([3, 7])
     pictures = prepare_pictures(pictures, cache_path, j)
     j = j.start_subjob([9, 1], 'Preparing for matching')
@@ -100,7 +91,7 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nulljob):
     cache.close()
     pictures = [p for p in pictures if hasattr(p, 'cache_id')]
     pool = multiprocessing.Pool()
-    async_results = []
+    async_results = deque()
     matches = []
     pictures_copy = set(pictures)
     for ref in j.iter_with_progress(pictures, 'Matched %d/%d pictures'):
@@ -118,8 +109,10 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nulljob):
             args = (ref.cache_id, cache_ids[:ARG_LIMIT], cache_path, threshold)
             async_results.append(pool.apply_async(async_compare, args))
             cache_ids = cache_ids[ARG_LIMIT:]
-        if len(async_results) > RESULTS_QUEUE_LIMIT:
-            result = async_results.pop(0)
+        # We use a while here because it's possible that more than one result has been added if
+        # ARG_LIMIT has been reached.
+        while len(async_results) > RESULTS_QUEUE_LIMIT:
+            result = async_results.popleft()
             matches.extend(result.get())
     for result in async_results: # process the rest of the results
         matches.extend(result.get())
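
For context, below is a minimal standalone sketch of the pattern this diff adopts: pending apply_async() results are held in a collections.deque and drained with a while-loop whenever the backlog grows past a fixed limit, so memory stays bounded on huge scans. This is not dupeGuru's actual code; compare_chunk, getmatches, ARG_LIMIT and RESULTS_QUEUE_LIMIT here are illustrative stand-ins with made-up values.

import multiprocessing
from collections import deque

ARG_LIMIT = 50            # max items handed to one worker call (assumed value)
RESULTS_QUEUE_LIMIT = 8   # max pending results before draining (assumed value)

def compare_chunk(ref, chunk):
    # Placeholder for the real comparison; returns (ref, other, score) tuples.
    return [(ref, other, abs(ref - other)) for other in chunk]

def getmatches(items):
    pool = multiprocessing.Pool()
    async_results = deque()   # deque: popleft() is O(1), unlike list.pop(0)
    matches = []
    for ref in items:
        others = [item for item in items if item != ref]
        # Submit work in ARG_LIMIT-sized chunks; each chunk adds one pending result.
        while others:
            async_results.append(pool.apply_async(compare_chunk, (ref, others[:ARG_LIMIT])))
            others = others[ARG_LIMIT:]
        # A while-loop (not an if), because the chunking above may have queued
        # several results since the last drain.
        while len(async_results) > RESULTS_QUEUE_LIMIT:
            matches.extend(async_results.popleft().get())
    for result in async_results:  # drain whatever is still pending
        matches.extend(result.get())
    pool.close()
    pool.join()
    return matches

if __name__ == '__main__':
    print(len(getmatches(list(range(20)))))

The two points the sketch illustrates are the ones the diff changes: deque.popleft() removes the oldest pending result in constant time where list.pop(0) has to shift the whole list, and the while-loop keeps draining until the backlog is back under the limit even when a single ref queued several chunks.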