[#58 state:fixed] Moved the async results collection into the same loop as the async filler phase to avoid memory errors.

--HG--
extra : convert_revision : svn%3Ac306627e-7827-47d3-bdf0-9a457c9553a1/trunk%40174
This commit is contained in:
hsoft 2009-10-03 15:37:53 +00:00
parent b7acc79165
commit 2a6124eacd
1 changed files with 12 additions and 7 deletions

View File

@ -21,6 +21,11 @@ from .cache import Cache
MIN_ITERATIONS = 3
# Large enough that the main thread should not have to block on a result.get() call.
# cpu_count * 2 should also be enough to ensure that the spawned processes are never left idle
# while the main process is collecting results.
RESULTS_QUEUE_LIMIT = multiprocessing.cpu_count() * 2
def get_match(first,second,percentage):
if percentage < 0:
percentage = 0
@ -40,7 +45,7 @@ class MatchFactory(object):
# there is enough memory left to carry on the operation because it is assumed that the
# MemoryError happens when trying to read an image file, which is freed from memory by the
# time that MemoryError is raised.
j = j.start_subjob([2, 8])
j = j.start_subjob([3, 7])
logging.info('Preparing %d files' % len(files))
prepared = self.prepare_files(files, j)
logging.info('Finished preparing %d files' % len(prepared))
@ -94,7 +99,7 @@ class AsyncMatchFactory(MatchFactory):
except Empty:
pass
j = j.start_subjob([1, 8, 1], 'Preparing for matching')
j = j.start_subjob([9, 1], 'Preparing for matching')
cache = self.cached_blocks
id2picture = {}
dimensions2pictures = defaultdict(set)
@ -109,18 +114,18 @@ class AsyncMatchFactory(MatchFactory):
pictures = [p for p in pictures if hasattr(p, 'cache_id')]
pool = multiprocessing.Pool()
async_results = []
matches = []
pictures_copy = set(pictures)
for ref in j.iter_with_progress(pictures):
for ref in j.iter_with_progress(pictures, 'Matched %d/%d pictures'):
others = pictures_copy if self.match_scaled else dimensions2pictures[ref.dimensions]
others.remove(ref)
if others:
cache_ids = [f.cache_id for f in others]
args = (ref.cache_id, cache_ids, self.cached_blocks.dbname, self.threshold)
async_results.append(pool.apply_async(async_compare, args))
matches = []
for result in j.iter_with_progress(async_results, 'Matched %d/%d pictures'):
matches.extend(result.get())
if len(async_results) > RESULTS_QUEUE_LIMIT:
result = async_results.pop(0)
matches.extend(result.get())
result = []
for ref_id, other_id, percentage in j.iter_with_progress(matches, 'Verified %d/%d matches', every=10):