1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2024-10-18 08:43:08 +00:00
dupeguru/core/results.py
Virgil Dupras 70e505ad92 Tweaked Make Selected into Reference.
Having dupes from ref folders (which makes ref switching impossible) would make
the new feature glitchy (selection would be emptied). Now, in cases where the action
results in nothing being changed, the selection stays intact. [#222]
2013-04-28 14:12:08 -04:00

367 lines
14 KiB
Python

# Created By: Virgil Dupras
# Created On: 2006/02/23
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import logging
import re
import os
import os.path as op
from xml.etree import ElementTree as ET
from jobprogress.job import nulljob
from hscommon.conflict import get_conflicted_name
from hscommon.util import flatten, nonone, FileOrPath, format_size
from hscommon.trans import tr
from . import engine
from .markable import Markable
class Results(Markable):
    '''Holds the duplicate groups of a scan, together with mark, filter and sort state.

    The instance keeps the list of groups, a reverse map from each dupe to its group, an
    optional stack of filename filters, cached statistics about markable dupes and the list
    of problems encountered by the last `perform_on_marked()` call.
    '''
    #---Override
    def __init__(self, app):
        Markable.__init__(self)
        self.__groups = []
        self.__group_of_duplicate = {}
        self.__groups_sort_descriptor = None # This is a tuple (key, asc)
        self.__dupes = None
        self.__dupes_sort_descriptor = None # This is a tuple (key, asc, delta)
        self.__filters = None
        self.__filtered_dupes = None
        self.__filtered_groups = None
        self.__recalculate_stats()
        self.__marked_size = 0
        self.app = app
        self.problems = [] # (dupe, error_msg)
        self.is_modified = False

    def _did_mark(self, dupe):
        # Keep the running size of explicitly marked dupes in sync.
        self.__marked_size += dupe.size

    def _did_unmark(self, dupe):
        self.__marked_size -= dupe.size

    def _get_markable_count(self):
        return self.__total_count

    def _is_markable(self, dupe):
        '''Return whether `dupe` can be marked.

        A dupe is not markable when it is flagged `is_ref`, when it belongs to no group, when
        it is the reference of its group, or when a filter is active and excludes it.
        '''
        if dupe.is_ref:
            return False
        g = self.get_group_of_duplicate(dupe)
        if not g:
            return False
        if dupe is g.ref:
            return False
        # Compare against None rather than truthiness: an active filter that matches nothing
        # yields an empty set, which must hide every dupe instead of being read as "no filter".
        if self.__filtered_dupes is not None and dupe not in self.__filtered_dupes:
            return False
        return True

    def mark_all(self):
        '''Mark every dupe, restricted to the filtered dupes when a filter is active.'''
        if self.__filters:
            self.mark_multiple(self.__filtered_dupes)
        else:
            Markable.mark_all(self)

    def mark_invert(self):
        '''Invert marks, restricted to the filtered dupes when a filter is active.'''
        if self.__filters:
            self.mark_toggle_multiple(self.__filtered_dupes)
        else:
            Markable.mark_invert(self)

    def mark_none(self):
        '''Remove marks, restricted to the filtered dupes when a filter is active.'''
        if self.__filters:
            self.unmark_multiple(self.__filtered_dupes)
        else:
            Markable.mark_none(self)

    #---Private
    def __get_dupe_list(self):
        # The flat dupe list is built lazily and cached until something resets it to None.
        if self.__dupes is None:
            self.__dupes = flatten(group.dupes for group in self.groups)
            if None in self.__dupes:
                # This is debug logging to try to figure out #44
                logging.warning("There is a None value in the Results' dupe list. dupes: %r groups: %r", self.__dupes, self.groups)
            if self.__filtered_dupes:
                self.__dupes = [dupe for dupe in self.__dupes if dupe in self.__filtered_dupes]
            sd = self.__dupes_sort_descriptor
            if sd:
                # Re-apply the last requested ordering to the freshly built list.
                self.sort_dupes(sd[0], sd[1], sd[2])
        return self.__dupes

    def __get_groups(self):
        if self.__filtered_groups is None:
            return self.__groups
        return self.__filtered_groups

    def __get_stat_line(self):
        if self.__filtered_dupes is None:
            mark_count = self.mark_count
            marked_size = self.__marked_size
            total_count = self.__total_count
            total_size = self.__total_size
            if self.mark_inverted:
                # __marked_size only tracks dupes passed to _did_mark/_did_unmark, so under
                # inversion the effective marked size is the complement.
                marked_size = total_size - marked_size
        else:
            # Count and size are both derived from is_marked() so they always agree; the
            # inversion correction above is therefore not applied here.
            # NOTE(review): assumes Markable.is_marked() already accounts for inversion -- confirm.
            visible = self.__filtered_dupes
            marked = [dupe for dupe in visible if self.is_marked(dupe)]
            markable = [dupe for dupe in visible if self.is_markable(dupe)]
            mark_count = len(marked)
            marked_size = sum(dupe.size for dupe in marked)
            total_count = len(markable)
            total_size = sum(dupe.size for dupe in markable)
        result = tr("%d / %d (%s / %s) duplicates marked.") % (
            mark_count,
            total_count,
            format_size(marked_size, 2),
            format_size(total_size, 2),
        )
        if self.__filters:
            result += tr(" filter: %s") % ' --> '.join(self.__filters)
        return result

    def __recalculate_stats(self):
        # Recompute the count and cumulated size of markable dupes in the visible groups.
        self.__total_size = 0
        self.__total_count = 0
        for group in self.groups:
            markable = [dupe for dupe in group.dupes if self._is_markable(dupe)]
            self.__total_count += len(markable)
            self.__total_size += sum(dupe.size for dupe in markable)

    def __set_groups(self, new_groups):
        self.mark_none()
        self.__groups = new_groups
        self.__group_of_duplicate = {}
        for g in self.__groups:
            for dupe in g:
                self.__group_of_duplicate[dupe] = g
                if not hasattr(dupe, 'is_ref'):
                    dupe.is_ref = False
        self.is_modified = bool(self.__groups)
        # Replay the active filters, in order, against the new groups.
        old_filters = nonone(self.__filters, [])
        self.apply_filter(None)
        for filter_str in old_filters:
            self.apply_filter(filter_str)

    #---Public
    def apply_filter(self, filter_str):
        '''Applies a filter 'filter_str' to self.groups

        When you apply the filter, only dupes whose path matches 'filter_str' will be in the
        results. To cancel the filter, just call apply_filter with 'filter_str' set to None,
        and the results will go back to normal.

        If you call apply_filter on already filtered results, the filter will be applied
        *on the filtered results*.

        'filter_str' is a string containing a regexp (matched case-insensitively) to filter
        dupes with. An invalid regexp is ignored and leaves the results untouched.
        '''
        if not filter_str:
            self.__filtered_dupes = None
            self.__filtered_groups = None
            self.__filters = None
        else:
            # Validate the expression before touching any state.
            try:
                filter_re = re.compile(filter_str, re.IGNORECASE)
            except re.error:
                return # don't apply this filter.
            if not self.__filters:
                self.__filters = []
            self.__filters.append(filter_str)
            if self.__filtered_dupes is None:
                self.__filtered_dupes = flatten(g[:] for g in self.groups)
            self.__filtered_dupes = set(dupe for dupe in self.__filtered_dupes if filter_re.search(str(dupe.path)))
            matching_groups = set(self.get_group_of_duplicate(dupe) for dupe in self.__filtered_dupes)
            # Keep the current group order (a bare set would shuffle it) and drop anything
            # that is not a currently visible group, such as None for a stale dupe.
            self.__filtered_groups = [g for g in self.groups if g in matching_groups]
        self.__recalculate_stats()
        sd = self.__groups_sort_descriptor
        if sd:
            self.sort_groups(sd[0], sd[1])
        self.__dupes = None

    def get_group_of_duplicate(self, dupe):
        '''Return the group `dupe` belongs to, or None if it is unknown or unhashable.'''
        try:
            return self.__group_of_duplicate[dupe]
        except (TypeError, KeyError):
            return None

    is_markable = _is_markable

    def load_from_xml(self, infile, get_file, j=nulljob):
        '''Replace the current groups with the ones described in the XML file `infile`.

        `get_file` is called with each stored path and must return a file object, or None to
        skip that entry. `j` is the job object used to report progress. If `infile` cannot be
        parsed, the method returns without changing the groups.
        '''
        def do_match(files, group):
            # Match every pair of files: (0,1), (0,2), ..., (1,2), ... Done iteratively so
            # that very large groups cannot exhaust the recursion limit.
            for index, ref_file in enumerate(files):
                for other_file in files[index + 1:]:
                    group.add_match(engine.get_match(ref_file, other_file))

        self.apply_filter(None)
        try:
            root = ET.parse(infile).getroot()
        except Exception:
            # Best effort: an unreadable or malformed file leaves the results as they are.
            return
        # Element.iter() replaces Element.getiterator(), which was removed in Python 3.9.
        group_elems = list(root.iter('group'))
        groups = []
        marked = set()
        for group_elem in j.iter_with_progress(group_elems, every=100):
            group = engine.Group()
            dupes = []
            for file_elem in group_elem.iter('file'):
                path = file_elem.get('path')
                words = file_elem.get('words', '')
                if not path:
                    continue
                file = get_file(path)
                if file is None:
                    continue
                file.words = words.split(',')
                file.is_ref = file_elem.get('is_ref') == 'y'
                dupes.append(file)
                if file_elem.get('marked') == 'y':
                    marked.add(file)
            for match_elem in group_elem.iter('match'):
                try:
                    attrs = match_elem.attrib
                    first_file = dupes[int(attrs['first'])]
                    second_file = dupes[int(attrs['second'])]
                    percentage = int(attrs['percentage'])
                    group.add_match(engine.Match(first_file, second_file, percentage))
                except (IndexError, KeyError, ValueError): # Covers missing attr, non-int values and indexes out of bounds
                    pass
            if (not group.matches) and (len(dupes) >= 2):
                # No usable match element was stored: rebuild matches between all files.
                do_match(dupes, group)
            # Restore the order in which the files were saved.
            group.prioritize(lambda x: dupes.index(x))
            if len(group):
                groups.append(group)
            j.add_progress()
        self.groups = groups
        for dupe_file in marked:
            self.mark(dupe_file)
        self.is_modified = False

    def make_ref(self, dupe):
        '''Make `dupe` the reference of its group.

        Returns True if the reference was switched, False if nothing changed (the dupe belongs
        to no group or the group refused the switch).
        '''
        g = self.get_group_of_duplicate(dupe)
        if g is None:
            return False
        r = g.ref
        if not g.switch_ref(dupe):
            return False
        self._remove_mark_flag(dupe)
        # The former reference becomes a regular dupe and the new one stops being one; only
        # files not flagged is_ref take part in the markable statistics.
        if not r.is_ref:
            self.__total_count += 1
            self.__total_size += r.size
        if not dupe.is_ref:
            self.__total_count -= 1
            self.__total_size -= dupe.size
        self.__dupes = None
        self.is_modified = True
        return True

    def perform_on_marked(self, func, remove_from_results):
        '''Call `func` on every marked dupe.

        If an EnvironmentError or UnicodeEncodeError is raised during a call, the problematic
        dupe is added to self.problems as a (dupe, error_msg) pair. When `remove_from_results`
        is true, dupes processed without error are removed from the results, all marks are
        cleared and only the problematic dupes are marked again.
        '''
        self.problems = []
        to_remove = []
        # Snapshot the marked dupes first so that `func` works on a stable list.
        marked = [dupe for dupe in self.dupes if self.is_marked(dupe)]
        for dupe in marked:
            try:
                func(dupe)
                to_remove.append(dupe)
            except (EnvironmentError, UnicodeEncodeError) as e:
                self.problems.append((dupe, str(e)))
        if remove_from_results:
            self.remove_duplicates(to_remove)
            self.mark_none()
            for dupe, _ in self.problems:
                self.mark(dupe)

    def remove_duplicates(self, dupes):
        '''Remove 'dupes' from their respective group, and remove the group if it ends up empty.

        Dupes that belong to no group, or that are not among their group's dupes (e.g. the
        group's reference), are skipped; the remaining dupes are still processed.
        '''
        affected_groups = set()
        for dupe in dupes:
            group = self.get_group_of_duplicate(dupe)
            if group is None or dupe not in group.dupes:
                # Skip just this one: returning here would drop the rest of the batch and
                # leave the match and dupe caches stale.
                continue
            ref = group.ref
            group.remove_dupe(dupe, False)
            del self.__group_of_duplicate[dupe]
            self._remove_mark_flag(dupe)
            self.__total_count -= 1
            self.__total_size -= dupe.size
            if not group:
                del self.__group_of_duplicate[ref]
                self.__groups.remove(group)
                if self.__filtered_groups:
                    try:
                        self.__filtered_groups.remove(group)
                    except ValueError:
                        # The group was hidden by the active filter; nothing to remove.
                        pass
            else:
                affected_groups.add(group)
        for group in affected_groups:
            group.discard_matches()
        self.__dupes = None
        self.is_modified = bool(self.__groups)

    def save_to_xml(self, outfile):
        '''Write all groups, with their files, marks and matches, to `outfile` as XML.

        Any active filter is cancelled first. If writing fails because `outfile` is an
        existing directory, the file is written under a conflict-free name next to it.
        '''
        self.apply_filter(None)
        root = ET.Element('results')
        for g in self.groups:
            group_elem = ET.SubElement(root, 'group')
            dupe2index = {}
            for index, d in enumerate(g):
                dupe2index[d] = index
                try:
                    words = engine.unpack_fields(d.words)
                except AttributeError:
                    words = ()
                file_elem = ET.SubElement(group_elem, 'file')
                try:
                    file_elem.set('path', str(d.path))
                    file_elem.set('words', ','.join(words))
                except ValueError: # If there's an invalid character, just skip the file
                    file_elem.set('path', '')
                file_elem.set('is_ref', ('y' if d.is_ref else 'n'))
                file_elem.set('marked', ('y' if self.is_marked(d) else 'n'))
            for match in g.matches:
                # Matches refer to files by their position inside the group element.
                match_elem = ET.SubElement(group_elem, 'match')
                match_elem.set('first', str(dupe2index[match.first]))
                match_elem.set('second', str(dupe2index[match.second]))
                match_elem.set('percentage', str(int(match.percentage)))
        tree = ET.ElementTree(root)

        def do_write(outfile):
            with FileOrPath(outfile, 'wb') as fp:
                tree.write(fp, encoding='utf-8')

        try:
            do_write(outfile)
        except IOError as e:
            # If our IOError is because dest is already a directory, we want to handle that. 21 is
            # the code we get on OS X and Linux, 13 is what we get on Windows.
            if e.errno in {21, 13}:
                p = str(outfile)
                dirname, basename = op.split(p)
                # A bare name has an empty dirname, which os.listdir() rejects.
                otherfiles = os.listdir(dirname or os.curdir)
                newname = get_conflicted_name(otherfiles, basename)
                do_write(op.join(dirname, newname))
            else:
                raise
        self.is_modified = False

    def sort_dupes(self, key, asc=True, delta=False):
        '''Sort the flat dupe list by `key` and remember the ordering for later rebuilds.'''
        if not self.__dupes:
            self.__get_dupe_list()

        def keyfunc(d):
            # The group is handed over as a callable so it is only looked up when needed.
            return self.app._get_dupe_sort_key(d, lambda: self.get_group_of_duplicate(d), key, delta)

        self.__dupes.sort(key=keyfunc, reverse=not asc)
        self.__dupes_sort_descriptor = (key, asc, delta)

    def sort_groups(self, key, asc=True):
        '''Sort the visible groups by `key` and remember the ordering for later rebuilds.'''
        def keyfunc(g):
            return self.app._get_group_sort_key(g, key)

        self.groups.sort(key=keyfunc, reverse=not asc)
        self.__groups_sort_descriptor = (key, asc)

    #---Properties
    dupes = property(__get_dupe_list)
    groups = property(__get_groups, __set_groups)
    stat_line = property(__get_stat_line)