mirror of https://github.com/arsenetar/dupeguru.git synced 2025-03-10 05:34:36 +00:00

Catch MemoryError in PE's block matching algo

fixes #264 (hopefully)
This commit is contained in:
Virgil Dupras 2014-10-05 17:13:36 -04:00
parent 44266273bf
commit 372a682610
3 changed files with 95 additions and 62 deletions

View File

@@ -10,7 +10,7 @@ import logging
 import multiprocessing
 from itertools import combinations

-from hscommon.util import extract
+from hscommon.util import extract, iterconsume
 from hscommon.trans import tr
 from hscommon.jobprogress import job
@@ -175,25 +175,34 @@ def getmatches(pictures, cache_path, threshold=75, match_scaled=False, j=job.nul
     comparisons_to_do = list(combinations(chunks + [None], 2))
     comparison_count = 0
     j.start_job(len(comparisons_to_do))
-    for ref_chunk, other_chunk in comparisons_to_do:
-        picinfo = {p.cache_id: get_picinfo(p) for p in ref_chunk}
-        ref_ids = [p.cache_id for p in ref_chunk]
-        if other_chunk is not None:
-            other_ids = [p.cache_id for p in other_chunk]
-            picinfo.update({p.cache_id: get_picinfo(p) for p in other_chunk})
-        else:
-            other_ids = None
-        args = (ref_ids, other_ids, cache_path, threshold, picinfo)
-        async_results.append(pool.apply_async(async_compare, args))
-        collect_results()
-    collect_results(collect_all=True)
+    try:
+        for ref_chunk, other_chunk in comparisons_to_do:
+            picinfo = {p.cache_id: get_picinfo(p) for p in ref_chunk}
+            ref_ids = [p.cache_id for p in ref_chunk]
+            if other_chunk is not None:
+                other_ids = [p.cache_id for p in other_chunk]
+                picinfo.update({p.cache_id: get_picinfo(p) for p in other_chunk})
+            else:
+                other_ids = None
+            args = (ref_ids, other_ids, cache_path, threshold, picinfo)
+            async_results.append(pool.apply_async(async_compare, args))
+            collect_results()
+        collect_results(collect_all=True)
+    except MemoryError:
+        # Rare, but possible, even in 64bit situations (ref #264). What do we do now? We free us
+        # some wiggle room, log about the incident, and stop matching right here. We then process
+        # the matches we have. The rest of the process doesn't allocate much and we should be
+        # alright.
+        del matches[-1000:] # some wiggle room to ensure we don't run out of memory again.
+        logging.warning("Ran out of memory when scanning! We had %d matches.", len(matches) + 1000)
     pool.close()

     result = []
     myiter = j.iter_with_progress(
-        matches,
+        iterconsume(matches),
         tr("Verified %d/%d matches"),
-        every=10
+        every=10,
+        count=len(matches),
     )
     for ref_id, other_id, percentage in myiter:
         ref = id2picture[ref_id]
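The pattern applied above is worth spelling out: wrap the allocation-heavy matching loop in a try/except MemoryError, trim the tail of the results list to regain some headroom, log the incident, and continue with whatever was matched so far. Below is a minimal, self-contained sketch of that pattern; the names build_matches, compare, work_items and TRIM_COUNT are illustrative, not dupeGuru's.

import logging

TRIM_COUNT = 1000  # how many trailing results to sacrifice as wiggle room

def build_matches(work_items, compare):
    """Accumulate comparison results, degrading gracefully on MemoryError."""
    matches = []
    try:
        for a, b in work_items:
            matches.extend(compare(a, b))  # each call may allocate heavily
    except MemoryError:
        # Free memory first: even logging needs a little room to work with.
        del matches[-TRIM_COUNT:]
        logging.warning("Ran out of memory; stopping early with %d matches kept.", len(matches))
    return matches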

View File

@@ -2,14 +2,14 @@
 # Created On: 2004/12/20
 # Copyright 2011 Hardcoded Software (http://www.hardcoded.net)
 #
 # This software is licensed under the "BSD" License as described in the "LICENSE" file,
 # which should be included with this package. The terms are also available at
 # http://www.hardcoded.net/licenses/bsd_license

 class JobCancelled(Exception):
     "The user has cancelled the job"

 class JobInProgressError(Exception):
     "A job is already being performed, you can't perform more than one at the same time."

 class JobCountError(Exception):
@@ -17,7 +17,7 @@ class JobCountError(Exception):
 class Job:
     """Manages a job's progression and return it's progression through a callback.

     Note that this class is not foolproof. For example, you could call
     start_subjob, and then call add_progress from the parent job, and nothing
     would stop you from doing it. However, it would mess your progression
@@ -48,17 +48,17 @@ class Job:
         self._passed_jobs = 0
         self._progress = 0
         self._currmax = 1

     #---Private
     def _subjob_callback(self, progress, desc=''):
         """This is the callback passed to children jobs.
         """
         self.set_progress(progress, desc)
         return True #if JobCancelled has to be raised, it will be at the highest level

     def _do_update(self, desc):
         """Calls the callback function with a % progress as a parameter.

         The parameter is a int in the 0-100 range.
         """
         if self._current_job:
@@ -72,31 +72,37 @@ class Job:
         result = self._callback(progress, desc) if desc else self._callback(progress)
         if not result:
             raise JobCancelled()

     #---Public
     def add_progress(self, progress=1, desc=''):
         self.set_progress(self._progress + progress, desc)

     def check_if_cancelled(self):
         self._do_update('')

-    def iter_with_progress(self, sequence, desc_format=None, every=1):
-        ''' Iterate through sequence while automatically adding progress.
-        '''
+    def iter_with_progress(self, iterable, desc_format=None, every=1, count=None):
+        """Iterate through ``iterable`` while automatically adding progress.
+
+        WARNING: We need our iterable's length. If ``iterable`` is not a sequence (that is,
+        something we can call ``len()`` on), you *have* to specify a count through the ``count``
+        argument. If ``count`` is ``None``, ``len(iterable)`` is used.
+        """
+        if count is None:
+            count = len(iterable)
         desc = ''
         if desc_format:
-            desc = desc_format % (0, len(sequence))
-        self.start_job(len(sequence), desc)
-        for i, element in enumerate(sequence, start=1):
+            desc = desc_format % (0, count)
+        self.start_job(count, desc)
+        for i, element in enumerate(iterable, start=1):
             yield element
             if i % every == 0:
                 if desc_format:
-                    desc = desc_format % (i, len(sequence))
+                    desc = desc_format % (i, count)
                 self.add_progress(progress=every, desc=desc)
         if desc_format:
-            desc = desc_format % (len(sequence), len(sequence))
+            desc = desc_format % (count, count)
         self.set_progress(100, desc)

     def start_job(self, max_progress=100, desc=''):
         """Begin work on the next job. You must not call start_job more than
         'jobcount' (in __init__) times.
@@ -111,7 +117,7 @@ class Job:
         self._progress = 0
         self._currmax = max(1, max_progress)
         self._do_update(desc)

     def start_subjob(self, job_proportions, desc=''):
         """Starts a sub job. Use this when you want to split a job into
         multiple smaller jobs. Pretty handy when starting a process where you
@@ -121,7 +127,7 @@ class Job:
         """
         self.start_job(100, desc)
         return Job(job_proportions, self._subjob_callback)

     def set_progress(self, progress, desc=''):
         """Sets the progress of the current job to 'progress', and call the
         callback
@@ -132,29 +138,29 @@ class Job:
         if self._progress < 0:
             self._progress = 0
         self._do_update(desc)


 class NullJob:
     def __init__(self, *args, **kwargs):
         pass

     def add_progress(self, *args, **kwargs):
         pass

     def check_if_cancelled(self):
         pass

     def iter_with_progress(self, sequence, *args, **kwargs):
         return iter(sequence)

     def start_job(self, *args, **kwargs):
         pass

     def start_subjob(self, *args, **kwargs):
         return NullJob()

     def set_progress(self, *args, **kwargs):
         pass


 nulljob = NullJob()
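The new count argument matters because getmatches now feeds iter_with_progress a generator (iterconsume(matches)), which has no len(). Here is a hedged usage sketch of the new signature; the print_progress callback and the squaring generator are made up for illustration, only the Job/iter_with_progress API comes from the code above.

from hscommon.jobprogress.job import Job

def print_progress(progress, desc=''):
    # The callback receives an int in the 0-100 range; returning a falsy value cancels the job.
    print(progress, desc)
    return True

j = Job(1, print_progress)
items = (n * n for n in range(500))  # a generator: len(items) would raise TypeError
for value in j.iter_with_progress(items, "Processed %d/%d items", every=50, count=500):
    pass  # do the real work with `value` here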

View File

@@ -1,9 +1,9 @@
 # Created By: Virgil Dupras
 # Created On: 2011-01-11
 # Copyright 2014 Hardcoded Software (http://www.hardcoded.net)
 #
 # This software is licensed under the "BSD" License as described in the "LICENSE" file,
 # which should be included with this package. The terms are also available at
 # http://www.hardcoded.net/licenses/bsd_license

 import sys
@@ -42,7 +42,7 @@ def minmax(value, min_value, max_value):
 def dedupe(iterable):
     """Returns a list of elements in ``iterable`` with all dupes removed.

     The order of the elements is preserved.
     """
     result = []
@@ -56,7 +56,7 @@ def dedupe(iterable):
 def flatten(iterables, start_with=None):
     """Takes a list of lists ``iterables`` and returns a list containing elements of every list.

     If ``start_with`` is not ``None``, the result will start with ``start_with`` items, exactly as
     if ``start_with`` would be the first item of lists.
     """
@@ -104,7 +104,7 @@ def allsame(iterable):
 def trailiter(iterable, skipfirst=False):
     """Yields (prev_element, element), starting with (None, first_element).

     If skipfirst is True, there will be no (None, item1) element and we'll start
     directly with (item1, item2).
     """
@@ -117,6 +117,24 @@ def trailiter(iterable, skipfirst=False):
         yield prev, item
         prev = item

+def iterconsume(seq):
+    """Iterate over ``seq`` and discard yielded objects.
+
+    Right after the ``yield``, we replace the element we've just yielded by ``None`` in the
+    sequence.
+
+    This is useful in tight memory situation where you are looping over a sequence of objects that
+    are going to be discarded afterwards. If you're creating other objects during that iteration
+    you might want to use this to avoid ``MemoryError``.
+
+    Note that this only works for sequence (index accessible), not all iterables.
+    """
+    # We don't use ``del``, because it would be disastrous performance-wise as the array would have
+    # to be constantly re-allocated.
+    for index, elem in enumerate(seq):
+        seq[index] = None
+        yield elem
+
 #--- String related
 def escape(s, to_escape, escape_with='\\'):
@@ -144,7 +162,7 @@ def rem_file_ext(filename):
 def pluralize(number, word, decimals=0, plural_word=None):
     """Returns a pluralized string with ``number`` in front of ``word``.

     Adds a 's' to s if ``number`` > 1.
     ``number``: The number to go in front of s
     ``word``: The word to go after number
@@ -162,7 +180,7 @@ def pluralize(number, word, decimals=0, plural_word=None):
 def format_time(seconds, with_hours=True):
     """Transforms seconds in a hh:mm:ss string.

     If ``with_hours`` if false, the format is mm:ss.
     """
     minus = seconds < 0
@@ -202,14 +220,14 @@ SIZE_DESC = ('B','KB','MB','GB','TB','PB','EB','ZB','YB')
 SIZE_VALS = tuple(1024 ** i for i in range(1,9))

 def format_size(size, decimal=0, forcepower=-1, showdesc=True):
     """Transform a byte count in a formatted string (KB, MB etc..).

     ``size`` is the number of bytes to format.
     ``decimal`` is the number digits after the dot.
     ``forcepower`` is the desired suffix. 0 is B, 1 is KB, 2 is MB etc.. if kept at -1, the suffix
     will be automatically chosen (so the resulting number is always below 1024).
     if ``showdesc`` is ``True``, the suffix will be shown after the number.

     Usage example::

     >>> format_size(1234, decimal=2, showdesc=True)
     '1.21 KB'
     """
@@ -283,7 +301,7 @@ def iterdaterange(start, end):
 @pathify
 def modified_after(first_path: Path, second_path: Path):
     """Returns ``True`` if first_path's mtime is higher than second_path's mtime.

     If one of the files doesn't exist or is ``None``, it is considered "never modified".
     """
     try:
@@ -326,11 +344,11 @@ def delete_if_empty(path: Path, files_to_delete=[]):
 def open_if_filename(infile, mode='rb'):
     """If ``infile`` is a string, it opens and returns it. If it's already a file object, it simply returns it.

     This function returns ``(file, should_close_flag)``. The should_close_flag is True is a file has
     effectively been opened (if we already pass a file object, we assume that the responsibility for
     closing the file has already been taken). Example usage::

         fp, shouldclose = open_if_filename(infile)
         dostuff()
         if shouldclose:
@@ -370,9 +388,9 @@ def delete_files_with_pattern(folder_path, pattern, recursive=True):
 class FileOrPath:
     """Does the same as :func:`open_if_filename`, but it can be used with a ``with`` statement.

     Example::

         with FileOrPath(infile):
             dostuff()
     """
@@ -381,12 +399,12 @@ class FileOrPath:
         self.mode = mode
         self.mustclose = False
         self.fp = None

     def __enter__(self):
         self.fp, self.mustclose = open_if_filename(self.file_or_path, self.mode)
         return self.fp

     def __exit__(self, exc_type, exc_value, traceback):
         if self.fp and self.mustclose:
             self.fp.close()
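As a quick illustration of what iterconsume buys in the getmatches change above: once an element has been yielded, the source list no longer holds a reference to it, so large objects can be garbage-collected while the loop is still producing new ones. A small hedged demo, using only the iterconsume function added by this commit:

from hscommon.util import iterconsume

big_items = [bytearray(10 * 1024 * 1024) for _ in range(5)]  # ~50 MB held by the list

sizes = []
for chunk in iterconsume(big_items):
    sizes.append(len(chunk))  # keep only something small derived from the big object
    # At this point `chunk` is the only remaining reference to the bytearray;
    # its slot in big_items is already None, so it can be reclaimed on the next pass.

print(big_items)  # [None, None, None, None, None]
print(sizes)      # [10485760, 10485760, 10485760, 10485760, 10485760]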