1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2024-10-18 08:43:08 +00:00
dupeguru/core/scanner.py
Virgil Dupras b1ef3dc8fe Simplify progress report during scanning
We now get less progress feedback, but in exchange, our progress job is
simpler. Previously, our progress bar would often get wonky towards the
end of the scan and I didn't have the energy to debug that.

Besides, people don't care about that level of progress feedback.
2016-06-08 12:29:28 -04:00

197 lines
8.3 KiB
Python

# Copyright 2016 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "GPLv3" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.gnu.org/licenses/gpl-3.0.html
import logging
import re
import os.path as op
from collections import namedtuple
from hscommon.jobprogress import job
from hscommon.util import dedupe, rem_file_ext, get_file_ext
from hscommon.trans import tr
from . import engine
# It's quite ugly to have scan types from all editions all put in the same class, but because there's
# there will be some nasty bugs popping up (ScanType is used in core when in should exclusively be
# used in core_*). One day I'll clean this up.
class ScanType:
Filename = 0
Fields = 1
FieldsNoOrder = 2
Tag = 3
Folders = 4
Contents = 5
#PE
FuzzyBlock = 10
ExifTimestamp = 11
ScanOption = namedtuple('ScanOption', 'scan_type label')
SCANNABLE_TAGS = ['track', 'artist', 'album', 'title', 'genre', 'year']
RE_DIGIT_ENDING = re.compile(r'\d+|\(\d+\)|\[\d+\]|{\d+}')
def is_same_with_digit(name, refname):
# Returns True if name is the same as refname, but with digits (with brackets or not) at the end
if not name.startswith(refname):
return False
end = name[len(refname):].strip()
return RE_DIGIT_ENDING.match(end) is not None
def remove_dupe_paths(files):
# Returns files with duplicates-by-path removed. Files with the exact same path are considered
# duplicates and only the first file to have a path is kept. In certain cases, we have files
# that have the same path, but not with the same case, that's why we normalize. However, we also
# have case-sensitive filesystems, and in those, we don't want to falsely remove duplicates,
# that's why we have a `samefile` mechanism.
result = []
path2file = {}
for f in files:
normalized = str(f.path).lower()
if normalized in path2file:
try:
if op.samefile(normalized, str(path2file[normalized].path)):
continue # same file, it's a dupe
else:
pass # We don't treat them as dupes
except OSError:
continue # File doesn't exist? Well, treat them as dupes
else:
path2file[normalized] = f
result.append(f)
return result
class Scanner:
def __init__(self):
self.discarded_file_count = 0
def _getmatches(self, files, j):
if self.size_threshold or self.scan_type in {ScanType.Contents, ScanType.Folders}:
j = j.start_subjob([2, 8])
for f in j.iter_with_progress(files, tr("Read size of %d/%d files")):
f.size # pre-read, makes a smoother progress if read here (especially for bundles)
if self.size_threshold:
files = [f for f in files if f.size >= self.size_threshold]
if self.scan_type in {ScanType.Contents, ScanType.Folders}:
return engine.getmatches_by_contents(files, j=j)
else:
j = j.start_subjob([2, 8])
kw = {}
kw['match_similar_words'] = self.match_similar_words
kw['weight_words'] = self.word_weighting
kw['min_match_percentage'] = self.min_match_percentage
if self.scan_type == ScanType.FieldsNoOrder:
self.scan_type = ScanType.Fields
kw['no_field_order'] = True
func = {
ScanType.Filename: lambda f: engine.getwords(rem_file_ext(f.name)),
ScanType.Fields: lambda f: engine.getfields(rem_file_ext(f.name)),
ScanType.Tag: lambda f: [
engine.getwords(str(getattr(f, attrname)))
for attrname in SCANNABLE_TAGS
if attrname in self.scanned_tags
],
}[self.scan_type]
for f in j.iter_with_progress(files, tr("Read metadata of %d/%d files")):
logging.debug("Reading metadata of %s", f.path)
f.words = func(f)
return engine.getmatches(files, j=j, **kw)
@staticmethod
def _key_func(dupe):
return -dupe.size
@staticmethod
def _tie_breaker(ref, dupe):
refname = rem_file_ext(ref.name).lower()
dupename = rem_file_ext(dupe.name).lower()
if 'copy' in dupename:
return False
if 'copy' in refname:
return True
if is_same_with_digit(dupename, refname):
return False
if is_same_with_digit(refname, dupename):
return True
return len(dupe.path) > len(ref.path)
@staticmethod
def get_scan_options():
"""Returns a list of scanning options for this scanner.
Returns a list of ``ScanOption``.
"""
raise NotImplementedError()
def get_dupe_groups(self, files, ignore_list=None, j=job.nulljob):
for f in (f for f in files if not hasattr(f, 'is_ref')):
f.is_ref = False
files = remove_dupe_paths(files)
logging.info("Getting matches. Scan type: %d", self.scan_type)
matches = self._getmatches(files, j)
logging.info('Found %d matches' % len(matches))
j.set_progress(100, tr("Almost done! Fiddling with results..."))
# In removing what we call here "false matches", we first want to remove, if we scan by
# folders, we want to remove folder matches for which the parent is also in a match (they're
# "duplicated duplicates if you will). Then, we also don't want mixed file kinds if the
# option isn't enabled, we want matches for which both files exist and, lastly, we don't
# want matches with both files as ref.
if self.scan_type == ScanType.Folders and matches:
allpath = {m.first.path for m in matches}
allpath |= {m.second.path for m in matches}
sortedpaths = sorted(allpath)
toremove = set()
last_parent_path = sortedpaths[0]
for p in sortedpaths[1:]:
if p in last_parent_path:
toremove.add(p)
else:
last_parent_path = p
matches = [m for m in matches if m.first.path not in toremove or m.second.path not in toremove]
if not self.mix_file_kind:
matches = [m for m in matches if get_file_ext(m.first.name) == get_file_ext(m.second.name)]
matches = [m for m in matches if m.first.path.exists() and m.second.path.exists()]
matches = [m for m in matches if not (m.first.is_ref and m.second.is_ref)]
if ignore_list:
matches = [
m for m in matches
if not ignore_list.AreIgnored(str(m.first.path), str(m.second.path))
]
logging.info('Grouping matches')
groups = engine.get_groups(matches)
if self.scan_type in {ScanType.Filename, ScanType.Fields, ScanType.FieldsNoOrder, ScanType.Tag}:
matched_files = dedupe([m.first for m in matches] + [m.second for m in matches])
self.discarded_file_count = len(matched_files) - sum(len(g) for g in groups)
else:
# Ticket #195
# To speed up the scan, we don't bother comparing contents of files that are both ref
# files. However, this messes up "discarded" counting because there's a missing match
# in cases where we end up with a dupe group anyway (with a non-ref file). Because it's
# impossible to have discarded matches in exact dupe scans, we simply set it at 0, thus
# bypassing our tricky problem.
# Also, although ScanType.FuzzyBlock is not always doing exact comparisons, we also
# bypass ref comparison, thus messing up with our "discarded" count. So we're
# effectively disabling the "discarded" feature in PE, but it's better than falsely
# reporting discarded matches.
self.discarded_file_count = 0
groups = [g for g in groups if any(not f.is_ref for f in g)]
logging.info('Created %d groups' % len(groups))
for g in groups:
g.prioritize(self._key_func, self._tie_breaker)
return groups
match_similar_words = False
min_match_percentage = 80
mix_file_kind = True
scan_type = ScanType.Filename
scanned_tags = {'artist', 'title'}
size_threshold = 0
word_weighting = False