mirror of
https://github.com/arsenetar/dupeguru.git
synced 2024-11-16 20:29:02 +00:00
70e505ad92
Having dupes from ref folders (which makes ref switching impossible) would make the new feature glitchy (selection would be emptied). Now, in cases where the action results in nothing being changed, the selection stays intact. [#222]
367 lines
14 KiB
Python
367 lines
14 KiB
Python
# Created By: Virgil Dupras
|
|
# Created On: 2006/02/23
|
|
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
|
|
#
|
|
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
|
|
# which should be included with this package. The terms are also available at
|
|
# http://www.hardcoded.net/licenses/bsd_license
|
|
|
|
import logging
|
|
import re
|
|
import os
|
|
import os.path as op
|
|
from xml.etree import ElementTree as ET
|
|
|
|
from jobprogress.job import nulljob
|
|
from hscommon.conflict import get_conflicted_name
|
|
from hscommon.util import flatten, nonone, FileOrPath, format_size
|
|
from hscommon.trans import tr
|
|
|
|
from . import engine
|
|
from .markable import Markable
|
|
|
|
class Results(Markable):
    '''Container for duplicate groups, with marking, filtering, sorting and statistics.

    Public attributes:

    * ``app``: object given at construction; the sort methods call its ``_get_dupe_sort_key``
      and ``_get_group_sort_key``.
    * ``problems``: list of ``(dupe, error_msg)`` tuples filled by ``perform_on_marked()``.
    * ``is_modified``: whether the results changed since they were last loaded or saved.
    * ``groups`` (read/write), ``dupes`` and ``stat_line`` (read-only) properties.
    '''
    #---Override
    def __init__(self, app):
        '''Create an empty result set bound to ``app``.'''
        Markable.__init__(self)
        self.__groups = []
        self.__group_of_duplicate = {}  # dupe -> group that contains it
        self.__groups_sort_descriptor = None  # This is a tuple (key, asc)
        self.__dupes = None  # cached flat dupe list; None means "rebuild on next access"
        self.__dupes_sort_descriptor = None  # This is a tuple (key, asc, delta)
        self.__filters = None  # list of applied filter strings, or None when unfiltered
        self.__filtered_dupes = None
        self.__filtered_groups = None
        self.__recalculate_stats()
        self.__marked_size = 0
        self.app = app
        self.problems = []  # (dupe, error_msg)
        self.is_modified = False

    def _did_mark(self, dupe):
        '''Add ``dupe.size`` to the running total of marked bytes.'''
        self.__marked_size += dupe.size

    def _did_unmark(self, dupe):
        '''Subtract ``dupe.size`` from the running total of marked bytes.'''
        self.__marked_size -= dupe.size

    def _get_markable_count(self):
        '''Return the number of markable dupes, as computed by the last stats update.'''
        return self.__total_count

    def _is_markable(self, dupe):
        '''Return whether ``dupe`` can be marked.

        A dupe is not markable when it is flagged ``is_ref``, belongs to no group, is the
        reference of its group, or is excluded by the active filter.
        '''
        if dupe.is_ref:
            return False
        group = self.get_group_of_duplicate(dupe)
        if not group:
            return False
        if dupe is group.ref:
            return False
        # NOTE(review): a filter that matched nothing leaves an empty (falsy) set here, so
        # this check is skipped in that case -- confirm whether that is intended.
        if self.__filtered_dupes and dupe not in self.__filtered_dupes:
            return False
        return True

    def mark_all(self):
        '''Mark every dupe, restricted to the filtered dupes when a filter is active.'''
        if self.__filters:
            self.mark_multiple(self.__filtered_dupes)
        else:
            Markable.mark_all(self)

    def mark_invert(self):
        '''Invert the marking, restricted to the filtered dupes when a filter is active.'''
        if self.__filters:
            self.mark_toggle_multiple(self.__filtered_dupes)
        else:
            Markable.mark_invert(self)

    def mark_none(self):
        '''Unmark every dupe, restricted to the filtered dupes when a filter is active.'''
        if self.__filters:
            self.unmark_multiple(self.__filtered_dupes)
        else:
            Markable.mark_none(self)

    #---Private
    def __get_dupe_list(self):
        '''Return the flat list of dupes of all visible groups, building the cache if needed.

        The rebuilt list honours the active filter and the last dupe sort descriptor.
        '''
        if self.__dupes is None:
            self.__dupes = flatten(group.dupes for group in self.groups)
            if None in self.__dupes:
                # This is debug logging to try to figure out #44
                logging.warning(
                    "There is a None value in the Results' dupe list. dupes: %r groups: %r",
                    self.__dupes, self.groups
                )
            if self.__filtered_dupes:
                self.__dupes = [dupe for dupe in self.__dupes if dupe in self.__filtered_dupes]
            sd = self.__dupes_sort_descriptor
            if sd:
                self.sort_dupes(sd[0], sd[1], sd[2])
        return self.__dupes

    def __get_groups(self):
        '''Return the filtered groups when a filter is active, else all groups.'''
        if self.__filtered_groups is None:
            return self.__groups
        return self.__filtered_groups

    def __get_stat_line(self):
        '''Return the translated "x / y (size / size) duplicates marked." status string.

        When filters are active, counts and sizes are computed over the filtered dupes only
        and the filter chain is appended to the string.
        '''
        if self.__filtered_dupes is None:
            mark_count = self.mark_count
            marked_size = self.__marked_size
            total_count = self.__total_count
            total_size = self.__total_size
        else:
            marked = [dupe for dupe in self.__filtered_dupes if self.is_marked(dupe)]
            markable = [dupe for dupe in self.__filtered_dupes if self.is_markable(dupe)]
            mark_count = len(marked)
            marked_size = sum(dupe.size for dupe in marked)
            total_count = len(markable)
            total_size = sum(dupe.size for dupe in markable)
        # NOTE(review): in the filtered branch marked_size comes from is_marked(); if
        # is_marked() already accounts for inversion, this adjustment inverts twice -- confirm
        # against Markable.
        if self.mark_inverted:
            marked_size = self.__total_size - marked_size
        result = tr("%d / %d (%s / %s) duplicates marked.") % (
            mark_count,
            total_count,
            format_size(marked_size, 2),
            format_size(total_size, 2),
        )
        if self.__filters:
            result += tr(" filter: %s") % ' --> '.join(self.__filters)
        return result

    def __recalculate_stats(self):
        '''Recompute the count and cumulated size of markable dupes in the visible groups.'''
        self.__total_size = 0
        self.__total_count = 0
        for group in self.groups:
            markable = [dupe for dupe in group.dupes if self._is_markable(dupe)]
            self.__total_count += len(markable)
            self.__total_size += sum(dupe.size for dupe in markable)

    def __set_groups(self, new_groups):
        '''Replace the groups, rebuild the dupe -> group map and re-apply existing filters.'''
        self.mark_none()
        self.__groups = new_groups
        self.__group_of_duplicate = {}
        for group in self.__groups:
            for dupe in group:
                self.__group_of_duplicate[dupe] = group
                if not hasattr(dupe, 'is_ref'):
                    dupe.is_ref = False
        self.is_modified = bool(self.__groups)
        # Filters are reset and then replayed, in order, against the new groups.
        old_filters = nonone(self.__filters, [])
        self.apply_filter(None)
        for filter_str in old_filters:
            self.apply_filter(filter_str)

    #---Public
    def apply_filter(self, filter_str):
        '''Apply the filter 'filter_str' to self.groups.

        When a filter is applied, only dupes whose path matches 'filter_str' stay in the
        results. To cancel all filters, call apply_filter with 'filter_str' set to None (or
        any empty value), and the results go back to normal.

        If apply_filter is called on already filtered results, the new filter is applied
        *on the filtered results*.

        'filter_str' is a string containing a regexp (matched case-insensitively) to filter
        dupes with. An invalid regexp is ignored and leaves the results untouched.
        '''
        if not filter_str:
            self.__filtered_dupes = None
            self.__filtered_groups = None
            self.__filters = None
        else:
            # Compile before touching any state so an invalid pattern changes nothing.
            try:
                filter_re = re.compile(filter_str, re.IGNORECASE)
            except re.error:
                return  # don't apply this filter.
            if not self.__filters:
                self.__filters = []
            self.__filters.append(filter_str)
            if self.__filtered_dupes is None:
                self.__filtered_dupes = flatten(g[:] for g in self.groups)
            self.__filtered_dupes = {
                dupe for dupe in self.__filtered_dupes if filter_re.search(str(dupe.path))
            }
            filtered_groups = set()
            for dupe in self.__filtered_dupes:
                filtered_groups.add(self.get_group_of_duplicate(dupe))
            self.__filtered_groups = list(filtered_groups)
        self.__recalculate_stats()
        sd = self.__groups_sort_descriptor
        if sd:
            self.sort_groups(sd[0], sd[1])
        self.__dupes = None

    def get_group_of_duplicate(self, dupe):
        '''Return the group containing ``dupe``, or None if it is unknown or unhashable.'''
        try:
            return self.__group_of_duplicate[dupe]
        except (TypeError, KeyError):
            return None

    is_markable = _is_markable

    def load_from_xml(self, infile, get_file, j=nulljob):
        '''Replace the current groups with those described in the XML document ``infile``.

        ``infile`` is handed to ``ElementTree.parse``. If it can't be parsed, the method
        returns silently (after having cleared the filters) and the groups are left as is.

        ``get_file`` is called with each ``<file>`` element's ``path`` attribute and must
        return a file object, or None to skip that entry. ``j`` is a job object providing
        ``iter_with_progress()`` and ``add_progress()``.

        Dupes that were saved as marked are marked again, and ``is_modified`` ends up False.
        '''
        def match_all_pairs(files, group):
            # Add a match for every pair of files. This is iterative (rather than recursing
            # once per file) so that very large groups can't exhaust the recursion limit.
            for index, ref_file in enumerate(files):
                for other_file in files[index + 1:]:
                    group.add_match(engine.get_match(ref_file, other_file))

        self.apply_filter(None)
        try:
            root = ET.parse(infile).getroot()
        except Exception:
            # Deliberately best-effort: an unreadable or malformed file loads nothing.
            return
        # Element.getiterator() was removed in Python 3.9; iter() is its replacement.
        group_elems = list(root.iter('group'))
        groups = []
        marked = set()
        for group_elem in j.iter_with_progress(group_elems, every=100):
            group = engine.Group()
            dupes = []
            for file_elem in group_elem.iter('file'):
                path = file_elem.get('path')
                words = file_elem.get('words', '')
                if not path:
                    continue
                loaded = get_file(path)
                if loaded is None:
                    continue
                loaded.words = words.split(',')
                loaded.is_ref = file_elem.get('is_ref') == 'y'
                dupes.append(loaded)
                if file_elem.get('marked') == 'y':
                    marked.add(loaded)
            for match_elem in group_elem.iter('match'):
                try:
                    attrs = match_elem.attrib
                    first_file = dupes[int(attrs['first'])]
                    second_file = dupes[int(attrs['second'])]
                    percentage = int(attrs['percentage'])
                    group.add_match(engine.Match(first_file, second_file, percentage))
                except (IndexError, KeyError, ValueError):
                    # Covers missing attr, non-int values and indexes out of bounds
                    pass
            if (not group.matches) and (len(dupes) >= 2):
                # No usable <match> element was stored: match every file with every other.
                match_all_pairs(dupes, group)
            # Keep the files in the order in which they appear in the document.
            group.prioritize(lambda x: dupes.index(x))
            if len(group):
                groups.append(group)
            j.add_progress()
        self.groups = groups
        for dupe_file in marked:
            self.mark(dupe_file)
        self.is_modified = False

    def make_ref(self, dupe):
        '''Make ``dupe`` the reference of its group.

        Returns True if the switch happened. Returns False, leaving everything untouched,
        when ``dupe`` belongs to no group or when the group's ``switch_ref()`` refuses.
        '''
        group = self.get_group_of_duplicate(dupe)
        if group is None:
            return False
        old_ref = group.ref
        if not group.switch_ref(dupe):
            return False
        self._remove_mark_flag(dupe)
        # The former ref now counts in the totals and the new ref no longer does, unless
        # they are flagged is_ref (such files are never markable).
        if not old_ref.is_ref:
            self.__total_count += 1
            self.__total_size += old_ref.size
        if not dupe.is_ref:
            self.__total_count -= 1
            self.__total_size -= dupe.size
        self.__dupes = None
        self.is_modified = True
        return True

    def perform_on_marked(self, func, remove_from_results):
        '''Call ``func(dupe)`` on every marked dupe.

        If an EnvironmentError or UnicodeEncodeError is raised during a call, the dupe is
        added to ``self.problems`` as ``(dupe, error_msg)`` and processing continues.

        When ``remove_from_results`` is true, successfully processed dupes are removed from
        the results, all marks are cleared and the problematic dupes are marked again.
        '''
        self.problems = []
        to_remove = []
        for dupe in self.dupes:
            if not self.is_marked(dupe):
                continue
            try:
                func(dupe)
                to_remove.append(dupe)
            except (EnvironmentError, UnicodeEncodeError) as e:
                self.problems.append((dupe, str(e)))
        if remove_from_results:
            self.remove_duplicates(to_remove)
            self.mark_none()
            for dupe, _ in self.problems:
                self.mark(dupe)

    def remove_duplicates(self, dupes):
        '''Remove 'dupes' from their respective group, and remove the group if it ends up empty.

        Entries that belong to no group, or that are not among their group's ``dupes``
        (e.g. the group's ref), are skipped; the rest of the list is still processed.
        '''
        affected_groups = set()
        for dupe in dupes:
            group = self.get_group_of_duplicate(dupe)
            if group is None or dupe not in group.dupes:
                # Skip rather than abort, so that what was already removed still goes
                # through the match discarding and cache invalidation below.
                continue
            ref = group.ref
            group.remove_dupe(dupe, False)
            del self.__group_of_duplicate[dupe]
            self._remove_mark_flag(dupe)
            self.__total_count -= 1
            self.__total_size -= dupe.size
            if not group:
                del self.__group_of_duplicate[ref]
                self.__groups.remove(group)
                # The group may be absent from the filtered view; list.remove() would raise.
                if self.__filtered_groups and group in self.__filtered_groups:
                    self.__filtered_groups.remove(group)
            else:
                affected_groups.add(group)
        for group in affected_groups:
            group.discard_matches()
        self.__dupes = None
        self.is_modified = bool(self.__groups)

    def save_to_xml(self, outfile):
        '''Write all groups (filters are cleared first) to ``outfile`` as UTF-8 XML.

        ``outfile`` is passed to ``FileOrPath``. If writing fails because the destination is
        a directory, the document is written next to it under a conflict-free name instead.
        Any other IOError propagates. On success ``is_modified`` is reset to False.
        '''
        self.apply_filter(None)
        root = ET.Element('results')
        for group in self.groups:
            group_elem = ET.SubElement(root, 'group')
            dupe2index = {}
            for index, dupe in enumerate(group):
                dupe2index[dupe] = index
                try:
                    words = engine.unpack_fields(dupe.words)
                except AttributeError:
                    words = ()
                file_elem = ET.SubElement(group_elem, 'file')
                try:
                    file_elem.set('path', str(dupe.path))
                    file_elem.set('words', ','.join(words))
                except ValueError:  # If there's an invalid character, just skip the file
                    file_elem.set('path', '')
                file_elem.set('is_ref', ('y' if dupe.is_ref else 'n'))
                file_elem.set('marked', ('y' if self.is_marked(dupe) else 'n'))
            for match in group.matches:
                match_elem = ET.SubElement(group_elem, 'match')
                match_elem.set('first', str(dupe2index[match.first]))
                match_elem.set('second', str(dupe2index[match.second]))
                match_elem.set('percentage', str(int(match.percentage)))
        tree = ET.ElementTree(root)

        def do_write(outfile):
            with FileOrPath(outfile, 'wb') as fp:
                tree.write(fp, encoding='utf-8')

        try:
            do_write(outfile)
        except IOError as e:
            # If our IOError is because dest is already a directory, we want to handle that. 21 is
            # the code we get on OS X and Linux, 13 is what we get on Windows.
            if e.errno in {21, 13}:
                p = str(outfile)
                dirname, basename = op.split(p)
                otherfiles = os.listdir(dirname)
                newname = get_conflicted_name(otherfiles, basename)
                do_write(op.join(dirname, newname))
            else:
                raise
        self.is_modified = False

    def sort_dupes(self, key, asc=True, delta=False):
        '''Sort the flat dupe list in place and remember the criteria for later rebuilds.

        The sort key of each dupe is obtained from ``self.app._get_dupe_sort_key``.
        '''
        if not self.__dupes:
            self.__get_dupe_list()

        def keyfunc(dupe):
            # The group is passed as a callable so the app only looks it up if needed.
            get_group = lambda: self.get_group_of_duplicate(dupe)
            return self.app._get_dupe_sort_key(dupe, get_group, key, delta)

        self.__dupes.sort(key=keyfunc, reverse=not asc)
        self.__dupes_sort_descriptor = (key, asc, delta)

    def sort_groups(self, key, asc=True):
        '''Sort the visible groups in place and remember the criteria for later re-sorts.

        The sort key of each group is obtained from ``self.app._get_group_sort_key``.
        '''
        def keyfunc(group):
            return self.app._get_group_sort_key(group, key)

        self.groups.sort(key=keyfunc, reverse=not asc)
        self.__groups_sort_descriptor = (key, asc)

    #---Properties
    dupes = property(__get_dupe_list)
    groups = property(__get_groups, __set_groups)
    stat_line = property(__get_stat_line)
|