diff --git a/core_pe/matchblock.py b/core_pe/matchblock.py index b2ec7625..efb3b306 100644 --- a/core_pe/matchblock.py +++ b/core_pe/matchblock.py @@ -10,7 +10,7 @@ import logging import multiprocessing from itertools import combinations -from hscommon.util import extract +from hscommon.util import extract, iterconsume from hscommon.trans import tr from hscommon.jobprogress import job @@ -175,25 +175,34 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nul comparisons_to_do = list(combinations(chunks + [None], 2)) comparison_count = 0 j.start_job(len(comparisons_to_do)) - for ref_chunk, other_chunk in comparisons_to_do: - picinfo = {p.cache_id: get_picinfo(p) for p in ref_chunk} - ref_ids = [p.cache_id for p in ref_chunk] - if other_chunk is not None: - other_ids = [p.cache_id for p in other_chunk] - picinfo.update({p.cache_id: get_picinfo(p) for p in other_chunk}) - else: - other_ids = None - args = (ref_ids, other_ids, cache_path, threshold, picinfo) - async_results.append(pool.apply_async(async_compare, args)) - collect_results() - collect_results(collect_all=True) + try: + for ref_chunk, other_chunk in comparisons_to_do: + picinfo = {p.cache_id: get_picinfo(p) for p in ref_chunk} + ref_ids = [p.cache_id for p in ref_chunk] + if other_chunk is not None: + other_ids = [p.cache_id for p in other_chunk] + picinfo.update({p.cache_id: get_picinfo(p) for p in other_chunk}) + else: + other_ids = None + args = (ref_ids, other_ids, cache_path, threshold, picinfo) + async_results.append(pool.apply_async(async_compare, args)) + collect_results() + collect_results(collect_all=True) + except MemoryError: + # Rare, but possible, even in 64bit situations (ref #264). What do we do now? We free up + # some wiggle room, log about the incident, and stop matching right here. We then process + # the matches we have. The rest of the process doesn't allocate much and we should be + # alright. 
+ del matches[-1000:] # some wiggle room to ensure we don't run out of memory again. + logging.warning("Ran out of memory when scanning! We had %d matches.", len(matches) + 1000) pool.close() result = [] myiter = j.iter_with_progress( - matches, + iterconsume(matches), tr("Verified %d/%d matches"), - every=10 + every=10, + count=len(matches), ) for ref_id, other_id, percentage in myiter: ref = id2picture[ref_id] diff --git a/hscommon/jobprogress/job.py b/hscommon/jobprogress/job.py index 6bf5a41e..7ff49313 100644 --- a/hscommon/jobprogress/job.py +++ b/hscommon/jobprogress/job.py @@ -2,14 +2,14 @@ # Created On: 2004/12/20 # Copyright 2011 Hardcoded Software (http://www.hardcoded.net) -# This software is licensed under the "BSD" License as described in the "LICENSE" file, -# which should be included with this package. The terms are also available at +# This software is licensed under the "BSD" License as described in the "LICENSE" file, +# which should be included with this package. The terms are also available at # http://www.hardcoded.net/licenses/bsd_license class JobCancelled(Exception): "The user has cancelled the job" -class JobInProgressError(Exception): +class JobInProgressError(Exception): "A job is already being performed, you can't perform more than one at the same time." class JobCountError(Exception): @@ -17,7 +17,7 @@ class JobCountError(Exception): class Job: """Manages a job's progression and return it's progression through a callback. - + Note that this class is not foolproof. For example, you could call start_subjob, and then call add_progress from the parent job, and nothing would stop you from doing it. However, it would mess your progression @@ -48,17 +48,17 @@ class Job: self._passed_jobs = 0 self._progress = 0 self._currmax = 1 - + #---Private def _subjob_callback(self, progress, desc=''): """This is the callback passed to children jobs. 
""" self.set_progress(progress, desc) return True #if JobCancelled has to be raised, it will be at the highest level - + def _do_update(self, desc): """Calls the callback function with a % progress as a parameter. - + The parameter is a int in the 0-100 range. """ if self._current_job: @@ -72,31 +72,37 @@ class Job: result = self._callback(progress, desc) if desc else self._callback(progress) if not result: raise JobCancelled() - + #---Public def add_progress(self, progress=1, desc=''): self.set_progress(self._progress + progress, desc) - + def check_if_cancelled(self): self._do_update('') - - def iter_with_progress(self, sequence, desc_format=None, every=1): - ''' Iterate through sequence while automatically adding progress. - ''' + + def iter_with_progress(self, iterable, desc_format=None, every=1, count=None): + """Iterate through ``iterable`` while automatically adding progress. + + WARNING: We need our iterable's length. If ``iterable`` is not a sequence (that is, + something we can call ``len()`` on), you *have* to specify a count through the ``count`` + argument. If ``count`` is ``None``, ``len(iterable)`` is used. + """ + if count is None: + count = len(iterable) desc = '' if desc_format: - desc = desc_format % (0, len(sequence)) - self.start_job(len(sequence), desc) - for i, element in enumerate(sequence, start=1): + desc = desc_format % (0, count) + self.start_job(count, desc) + for i, element in enumerate(iterable, start=1): yield element if i % every == 0: if desc_format: - desc = desc_format % (i, len(sequence)) + desc = desc_format % (i, count) self.add_progress(progress=every, desc=desc) if desc_format: - desc = desc_format % (len(sequence), len(sequence)) + desc = desc_format % (count, count) self.set_progress(100, desc) - + def start_job(self, max_progress=100, desc=''): """Begin work on the next job. You must not call start_job more than 'jobcount' (in __init__) times. 
@@ -111,7 +117,7 @@ class Job: self._progress = 0 self._currmax = max(1, max_progress) self._do_update(desc) - + def start_subjob(self, job_proportions, desc=''): """Starts a sub job. Use this when you want to split a job into multiple smaller jobs. Pretty handy when starting a process where you @@ -121,7 +127,7 @@ class Job: """ self.start_job(100, desc) return Job(job_proportions, self._subjob_callback) - + def set_progress(self, progress, desc=''): """Sets the progress of the current job to 'progress', and call the callback @@ -132,29 +138,29 @@ class Job: if self._progress < 0: self._progress = 0 self._do_update(desc) - + class NullJob: def __init__(self, *args, **kwargs): pass - + def add_progress(self, *args, **kwargs): pass - + def check_if_cancelled(self): pass - + def iter_with_progress(self, sequence, *args, **kwargs): return iter(sequence) - + def start_job(self, *args, **kwargs): pass - + def start_subjob(self, *args, **kwargs): return NullJob() - + def set_progress(self, *args, **kwargs): pass - + nulljob = NullJob() diff --git a/hscommon/util.py b/hscommon/util.py index 987f9c31..f3769146 100644 --- a/hscommon/util.py +++ b/hscommon/util.py @@ -1,9 +1,9 @@ # Created By: Virgil Dupras # Created On: 2011-01-11 # Copyright 2014 Hardcoded Software (http://www.hardcoded.net) -# -# This software is licensed under the "BSD" License as described in the "LICENSE" file, -# which should be included with this package. The terms are also available at +# +# This software is licensed under the "BSD" License as described in the "LICENSE" file, +# which should be included with this package. The terms are also available at # http://www.hardcoded.net/licenses/bsd_license import sys @@ -42,7 +42,7 @@ def minmax(value, min_value, max_value): def dedupe(iterable): """Returns a list of elements in ``iterable`` with all dupes removed. - + The order of the elements is preserved. 
""" result = [] @@ -56,7 +56,7 @@ def dedupe(iterable): def flatten(iterables, start_with=None): """Takes a list of lists ``iterables`` and returns a list containing elements of every list. - + If ``start_with`` is not ``None``, the result will start with ``start_with`` items, exactly as if ``start_with`` would be the first item of lists. """ @@ -104,7 +104,7 @@ def allsame(iterable): def trailiter(iterable, skipfirst=False): """Yields (prev_element, element), starting with (None, first_element). - + If skipfirst is True, there will be no (None, item1) element and we'll start directly with (item1, item2). """ @@ -117,6 +117,24 @@ def trailiter(iterable, skipfirst=False): yield prev, item prev = item +def iterconsume(seq): + """Iterate over ``seq`` and discard yielded objects. + + Right after the ``yield``, we replace the element we've just yielded by ``None`` in the + sequence. + + This is useful in tight memory situation where you are looping over a sequence of objects that + are going to be discarded afterwards. If you're creating other objects during that iteration + you might want to use this to avoid ``MemoryError``. + + Note that this only works for sequence (index accessible), not all iterables. + """ + # We don't use ``del``, because it would be disastrous performance-wise as the array would have + # to be constantly re-allocated. + for index, elem in enumerate(seq): + seq[index] = None + yield elem + #--- String related def escape(s, to_escape, escape_with='\\'): @@ -144,7 +162,7 @@ def rem_file_ext(filename): def pluralize(number, word, decimals=0, plural_word=None): """Returns a pluralized string with ``number`` in front of ``word``. - + Adds a 's' to s if ``number`` > 1. ``number``: The number to go in front of s ``word``: The word to go after number @@ -162,7 +180,7 @@ def pluralize(number, word, decimals=0, plural_word=None): def format_time(seconds, with_hours=True): """Transforms seconds in a hh:mm:ss string. 
- + If ``with_hours`` if false, the format is mm:ss. """ minus = seconds < 0 @@ -202,14 +220,14 @@ SIZE_DESC = ('B','KB','MB','GB','TB','PB','EB','ZB','YB') SIZE_VALS = tuple(1024 ** i for i in range(1,9)) def format_size(size, decimal=0, forcepower=-1, showdesc=True): """Transform a byte count in a formatted string (KB, MB etc..). - + ``size`` is the number of bytes to format. ``decimal`` is the number digits after the dot. ``forcepower`` is the desired suffix. 0 is B, 1 is KB, 2 is MB etc.. if kept at -1, the suffix will be automatically chosen (so the resulting number is always below 1024). if ``showdesc`` is ``True``, the suffix will be shown after the number. Usage example:: - + >>> format_size(1234, decimal=2, showdesc=True) '1.21 KB' """ @@ -283,7 +301,7 @@ def iterdaterange(start, end): @pathify def modified_after(first_path: Path, second_path: Path): """Returns ``True`` if first_path's mtime is higher than second_path's mtime. - + If one of the files doesn't exist or is ``None``, it is considered "never modified". """ try: @@ -326,11 +344,11 @@ def delete_if_empty(path: Path, files_to_delete=[]): def open_if_filename(infile, mode='rb'): """If ``infile`` is a string, it opens and returns it. If it's already a file object, it simply returns it. - + This function returns ``(file, should_close_flag)``. The should_close_flag is True is a file has effectively been opened (if we already pass a file object, we assume that the responsibility for closing the file has already been taken). Example usage:: - + fp, shouldclose = open_if_filename(infile) dostuff() if shouldclose: @@ -370,9 +388,9 @@ def delete_files_with_pattern(folder_path, pattern, recursive=True): class FileOrPath: """Does the same as :func:`open_if_filename`, but it can be used with a ``with`` statement. 
- + Example:: - + with FileOrPath(infile): dostuff() """ @@ -381,12 +399,12 @@ class FileOrPath: self.mode = mode self.mustclose = False self.fp = None - + def __enter__(self): self.fp, self.mustclose = open_if_filename(self.file_or_path, self.mode) return self.fp - + def __exit__(self, exc_type, exc_value, traceback): if self.fp and self.mustclose: self.fp.close() - +