1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2026-01-22 14:41:39 +00:00

Format files with black

- Format all files with black
- Update tox.ini flake8 arguments to be compatible
- Add black to requirements-extra.txt
- Reduce ignored flake8 rules and fix a few violations
This commit is contained in:
2019-12-31 20:16:27 -06:00
parent 359d6498f7
commit 7ba8aa3514
141 changed files with 5241 additions and 3648 deletions

View File

@@ -34,8 +34,8 @@ from .gui.ignore_list_dialog import IgnoreListDialog
from .gui.problem_dialog import ProblemDialog
from .gui.stats_label import StatsLabel
HAD_FIRST_LAUNCH_PREFERENCE = 'HadFirstLaunch'
DEBUG_MODE_PREFERENCE = 'DebugMode'
HAD_FIRST_LAUNCH_PREFERENCE = "HadFirstLaunch"
DEBUG_MODE_PREFERENCE = "DebugMode"
MSG_NO_MARKED_DUPES = tr("There are no marked duplicates. Nothing has been done.")
MSG_NO_SELECTED_DUPES = tr("There are no selected duplicates. Nothing has been done.")
@@ -44,23 +44,27 @@ MSG_MANY_FILES_TO_OPEN = tr(
"files are opened with, doing so can create quite a mess. Continue?"
)
class DestType:
Direct = 0
Relative = 1
Absolute = 2
class JobType:
Scan = 'job_scan'
Load = 'job_load'
Move = 'job_move'
Copy = 'job_copy'
Delete = 'job_delete'
Scan = "job_scan"
Load = "job_load"
Move = "job_move"
Copy = "job_copy"
Delete = "job_delete"
class AppMode:
Standard = 0
Music = 1
Picture = 2
JOBID2TITLE = {
JobType.Scan: tr("Scanning for duplicates"),
JobType.Load: tr("Loading"),
@@ -69,6 +73,7 @@ JOBID2TITLE = {
JobType.Delete: tr("Sending to Trash"),
}
class DupeGuru(Broadcaster):
"""Holds everything together.
@@ -100,7 +105,8 @@ class DupeGuru(Broadcaster):
Instance of :mod:`meta-gui <core.gui>` table listing the results from :attr:`results`
"""
#--- View interface
# --- View interface
# get_default(key_name)
# set_default(key_name, value)
# show_message(msg)
@@ -116,7 +122,7 @@ class DupeGuru(Broadcaster):
NAME = PROMPT_NAME = "dupeGuru"
PICTURE_CACHE_TYPE = 'sqlite' # set to 'shelve' for a ShelveCache
PICTURE_CACHE_TYPE = "sqlite" # set to 'shelve' for a ShelveCache
def __init__(self, view):
if view.get_default(DEBUG_MODE_PREFERENCE):
@@ -124,7 +130,9 @@ class DupeGuru(Broadcaster):
logging.debug("Debug mode enabled")
Broadcaster.__init__(self)
self.view = view
self.appdata = desktop.special_folder_path(desktop.SpecialFolder.AppData, appname=self.NAME)
self.appdata = desktop.special_folder_path(
desktop.SpecialFolder.AppData, appname=self.NAME
)
if not op.exists(self.appdata):
os.makedirs(self.appdata)
self.app_mode = AppMode.Standard
@@ -136,11 +144,11 @@ class DupeGuru(Broadcaster):
# sent to the scanner. They don't have default values because those defaults values are
# defined in the scanner class.
self.options = {
'escape_filter_regexp': True,
'clean_empty_dirs': False,
'ignore_hardlink_matches': False,
'copymove_dest_type': DestType.Relative,
'picture_cache_type': self.PICTURE_CACHE_TYPE
"escape_filter_regexp": True,
"clean_empty_dirs": False,
"ignore_hardlink_matches": False,
"copymove_dest_type": DestType.Relative,
"picture_cache_type": self.PICTURE_CACHE_TYPE,
}
self.selected_dupes = []
self.details_panel = DetailsPanel(self)
@@ -155,7 +163,7 @@ class DupeGuru(Broadcaster):
for child in children:
child.connect()
#--- Private
# --- Private
def _recreate_result_table(self):
if self.result_table is not None:
self.result_table.disconnect()
@@ -169,26 +177,30 @@ class DupeGuru(Broadcaster):
self.view.create_results_window()
def _get_picture_cache_path(self):
cache_type = self.options['picture_cache_type']
cache_name = 'cached_pictures.shelve' if cache_type == 'shelve' else 'cached_pictures.db'
cache_type = self.options["picture_cache_type"]
cache_name = (
"cached_pictures.shelve" if cache_type == "shelve" else "cached_pictures.db"
)
return op.join(self.appdata, cache_name)
def _get_dupe_sort_key(self, dupe, get_group, key, delta):
if self.app_mode in (AppMode.Music, AppMode.Picture):
if key == 'folder_path':
dupe_folder_path = getattr(dupe, 'display_folder_path', dupe.folder_path)
if key == "folder_path":
dupe_folder_path = getattr(
dupe, "display_folder_path", dupe.folder_path
)
return str(dupe_folder_path).lower()
if self.app_mode == AppMode.Picture:
if delta and key == 'dimensions':
if delta and key == "dimensions":
r = cmp_value(dupe, key)
ref_value = cmp_value(get_group().ref, key)
return get_delta_dimensions(r, ref_value)
if key == 'marked':
if key == "marked":
return self.results.is_marked(dupe)
if key == 'percentage':
if key == "percentage":
m = get_group().get_match_of(dupe)
return m.percentage
elif key == 'dupe_count':
elif key == "dupe_count":
return 0
else:
result = cmp_value(dupe, key)
@@ -203,21 +215,25 @@ class DupeGuru(Broadcaster):
def _get_group_sort_key(self, group, key):
if self.app_mode in (AppMode.Music, AppMode.Picture):
if key == 'folder_path':
dupe_folder_path = getattr(group.ref, 'display_folder_path', group.ref.folder_path)
if key == "folder_path":
dupe_folder_path = getattr(
group.ref, "display_folder_path", group.ref.folder_path
)
return str(dupe_folder_path).lower()
if key == 'percentage':
if key == "percentage":
return group.percentage
if key == 'dupe_count':
if key == "dupe_count":
return len(group)
if key == 'marked':
if key == "marked":
return len([dupe for dupe in group.dupes if self.results.is_marked(dupe)])
return cmp_value(group.ref, key)
def _do_delete(self, j, link_deleted, use_hardlinks, direct_deletion):
def op(dupe):
j.add_progress()
return self._do_delete_dupe(dupe, link_deleted, use_hardlinks, direct_deletion)
return self._do_delete_dupe(
dupe, link_deleted, use_hardlinks, direct_deletion
)
j.start_job(self.results.mark_count)
self.results.perform_on_marked(op, True)
@@ -233,7 +249,7 @@ class DupeGuru(Broadcaster):
else:
os.remove(str_path)
else:
send2trash(str_path) # Raises OSError when there's a problem
send2trash(str_path) # Raises OSError when there's a problem
if link_deleted:
group = self.results.get_group_of_duplicate(dupe)
ref = group.ref
@@ -258,8 +274,9 @@ class DupeGuru(Broadcaster):
def _get_export_data(self):
columns = [
col for col in self.result_table.columns.ordered_columns
if col.visible and col.name != 'marked'
col
for col in self.result_table.columns.ordered_columns
if col.visible and col.name != "marked"
]
colnames = [col.display for col in columns]
rows = []
@@ -273,10 +290,11 @@ class DupeGuru(Broadcaster):
def _results_changed(self):
self.selected_dupes = [
d for d in self.selected_dupes
d
for d in self.selected_dupes
if self.results.get_group_of_duplicate(d) is not None
]
self.notify('results_changed')
self.notify("results_changed")
def _start_job(self, jobid, func, args=()):
title = JOBID2TITLE[jobid]
@@ -310,7 +328,9 @@ class DupeGuru(Broadcaster):
msg = {
JobType.Copy: tr("All marked files were copied successfully."),
JobType.Move: tr("All marked files were moved successfully."),
JobType.Delete: tr("All marked files were successfully sent to Trash."),
JobType.Delete: tr(
"All marked files were successfully sent to Trash."
),
}[jobid]
self.view.show_message(msg)
@@ -341,9 +361,9 @@ class DupeGuru(Broadcaster):
if dupes == self.selected_dupes:
return
self.selected_dupes = dupes
self.notify('dupes_selected')
self.notify("dupes_selected")
#--- Protected
# --- Protected
def _get_fileclasses(self):
if self.app_mode == AppMode.Picture:
return [pe.photo.PLAT_SPECIFIC_PHOTO_CLASS]
@@ -360,7 +380,7 @@ class DupeGuru(Broadcaster):
else:
return prioritize.all_categories()
#--- Public
# --- Public
def add_directory(self, d):
"""Adds folder ``d`` to :attr:`directories`.
@@ -370,7 +390,7 @@ class DupeGuru(Broadcaster):
"""
try:
self.directories.add_path(Path(d))
self.notify('directories_changed')
self.notify("directories_changed")
except directories.AlreadyThereError:
self.view.show_message(tr("'{}' already is in the list.").format(d))
except directories.InvalidPathError:
@@ -383,7 +403,9 @@ class DupeGuru(Broadcaster):
if not dupes:
self.view.show_message(MSG_NO_SELECTED_DUPES)
return
msg = tr("All selected %d matches are going to be ignored in all subsequent scans. Continue?")
msg = tr(
"All selected %d matches are going to be ignored in all subsequent scans. Continue?"
)
if not self.view.ask_yes_no(msg % len(dupes)):
return
for dupe in dupes:
@@ -400,22 +422,22 @@ class DupeGuru(Broadcaster):
:param str filter: filter to apply
"""
self.results.apply_filter(None)
if self.options['escape_filter_regexp']:
filter = escape(filter, set('()[]\\.|+?^'))
filter = escape(filter, '*', '.')
if self.options["escape_filter_regexp"]:
filter = escape(filter, set("()[]\\.|+?^"))
filter = escape(filter, "*", ".")
self.results.apply_filter(filter)
self._results_changed()
def clean_empty_dirs(self, path):
if self.options['clean_empty_dirs']:
while delete_if_empty(path, ['.DS_Store']):
if self.options["clean_empty_dirs"]:
while delete_if_empty(path, [".DS_Store"]):
path = path.parent()
def clear_picture_cache(self):
try:
os.remove(self._get_picture_cache_path())
except FileNotFoundError:
pass # we don't care
pass # we don't care
def copy_or_move(self, dupe, copy: bool, destination: str, dest_type: DestType):
source_path = dupe.path
@@ -444,6 +466,7 @@ class DupeGuru(Broadcaster):
:param bool copy: If True, duplicates will be copied instead of moved
"""
def do(j):
def op(dupe):
j.add_progress()
@@ -459,7 +482,7 @@ class DupeGuru(Broadcaster):
prompt = tr("Select a directory to {} marked files to").format(opname)
destination = self.view.select_dest_folder(prompt)
if destination:
desttype = self.options['copymove_dest_type']
desttype = self.options["copymove_dest_type"]
jobid = JobType.Copy if copy else JobType.Move
self._start_job(jobid, do)
@@ -472,8 +495,9 @@ class DupeGuru(Broadcaster):
if not self.deletion_options.show(self.results.mark_count):
return
args = [
self.deletion_options.link_deleted, self.deletion_options.use_hardlinks,
self.deletion_options.direct
self.deletion_options.link_deleted,
self.deletion_options.use_hardlinks,
self.deletion_options.direct,
]
logging.debug("Starting deletion job with args %r", args)
self._start_job(JobType.Delete, self._do_delete, args=args)
@@ -495,7 +519,9 @@ class DupeGuru(Broadcaster):
The columns and their order in the resulting CSV file is determined in the same way as in
:meth:`export_to_xhtml`.
"""
dest_file = self.view.select_dest_file(tr("Select a destination for your exported CSV"), 'csv')
dest_file = self.view.select_dest_file(
tr("Select a destination for your exported CSV"), "csv"
)
if dest_file:
colnames, rows = self._get_export_data()
try:
@@ -505,13 +531,16 @@ class DupeGuru(Broadcaster):
def get_display_info(self, dupe, group, delta=False):
def empty_data():
return {c.name: '---' for c in self.result_table.COLUMNS[1:]}
return {c.name: "---" for c in self.result_table.COLUMNS[1:]}
if (dupe is None) or (group is None):
return empty_data()
try:
return dupe.get_display_info(group, delta)
except Exception as e:
logging.warning("Exception on GetDisplayInfo for %s: %s", str(dupe.path), str(e))
logging.warning(
"Exception on GetDisplayInfo for %s: %s", str(dupe.path), str(e)
)
return empty_data()
def invoke_custom_command(self):
@@ -521,9 +550,11 @@ class DupeGuru(Broadcaster):
is replaced with that dupe's ref file. If there's no selection, the command is not invoked.
If the dupe is a ref, ``%d`` and ``%r`` will be the same.
"""
cmd = self.view.get_default('CustomCommand')
cmd = self.view.get_default("CustomCommand")
if not cmd:
msg = tr("You have no custom command set up. Set it up in your preferences.")
msg = tr(
"You have no custom command set up. Set it up in your preferences."
)
self.view.show_message(msg)
return
if not self.selected_dupes:
@@ -531,8 +562,8 @@ class DupeGuru(Broadcaster):
dupe = self.selected_dupes[0]
group = self.results.get_group_of_duplicate(dupe)
ref = group.ref
cmd = cmd.replace('%d', str(dupe.path))
cmd = cmd.replace('%r', str(ref.path))
cmd = cmd.replace("%d", str(dupe.path))
cmd = cmd.replace("%r", str(ref.path))
match = re.match(r'"([^"]+)"(.*)', cmd)
if match is not None:
# This code here is because subprocess. Popen doesn't seem to accept, under Windows,
@@ -551,9 +582,9 @@ class DupeGuru(Broadcaster):
is persistent data, is the same as when the last session was closed (when :meth:`save` was
called).
"""
self.directories.load_from_file(op.join(self.appdata, 'last_directories.xml'))
self.notify('directories_changed')
p = op.join(self.appdata, 'ignore_list.xml')
self.directories.load_from_file(op.join(self.appdata, "last_directories.xml"))
self.notify("directories_changed")
p = op.join(self.appdata, "ignore_list.xml")
self.ignore_list.load_from_xml(p)
self.ignore_list_dialog.refresh()
@@ -562,8 +593,10 @@ class DupeGuru(Broadcaster):
:param str filename: path of the XML file (created with :meth:`save_as`) to load
"""
def do(j):
self.results.load_from_xml(filename, self._get_file, j)
self._start_job(JobType.Load, do)
def make_selected_reference(self):
@@ -588,35 +621,36 @@ class DupeGuru(Broadcaster):
if not self.result_table.power_marker:
if changed_groups:
self.selected_dupes = [
d for d in self.selected_dupes
d
for d in self.selected_dupes
if self.results.get_group_of_duplicate(d).ref is d
]
self.notify('results_changed')
self.notify("results_changed")
else:
# If we're in "Dupes Only" mode (previously called Power Marker), things are a bit
# different. The refs are not shown in the table, and if our operation is successful,
# this means that there's no way to follow our dupe selection. Then, the best thing to
# do is to keep our selection index-wise (different dupe selection, but same index
# selection).
self.notify('results_changed_but_keep_selection')
self.notify("results_changed_but_keep_selection")
def mark_all(self):
"""Set all dupes in the results as marked.
"""
self.results.mark_all()
self.notify('marking_changed')
self.notify("marking_changed")
def mark_none(self):
"""Set all dupes in the results as unmarked.
"""
self.results.mark_none()
self.notify('marking_changed')
self.notify("marking_changed")
def mark_invert(self):
"""Invert the marked state of all dupes in the results.
"""
self.results.mark_invert()
self.notify('marking_changed')
self.notify("marking_changed")
def mark_dupe(self, dupe, marked):
"""Change marked status of ``dupe``.
@@ -629,7 +663,7 @@ class DupeGuru(Broadcaster):
self.results.mark(dupe)
else:
self.results.unmark(dupe)
self.notify('marking_changed')
self.notify("marking_changed")
def open_selected(self):
"""Open :attr:`selected_dupes` with their associated application.
@@ -656,7 +690,7 @@ class DupeGuru(Broadcaster):
indexes = sorted(indexes, reverse=True)
for index in indexes:
del self.directories[index]
self.notify('directories_changed')
self.notify("directories_changed")
except IndexError:
pass
@@ -669,7 +703,7 @@ class DupeGuru(Broadcaster):
:type duplicates: list of :class:`~core.fs.File`
"""
self.results.remove_duplicates(self.without_ref(duplicates))
self.notify('results_changed_but_keep_selection')
self.notify("results_changed_but_keep_selection")
def remove_marked(self):
"""Removed marked duplicates from the results (without touching the files themselves).
@@ -724,7 +758,9 @@ class DupeGuru(Broadcaster):
if group.prioritize(key_func=sort_key):
count += 1
self._results_changed()
msg = tr("{} duplicate groups were changed by the re-prioritization.").format(count)
msg = tr("{} duplicate groups were changed by the re-prioritization.").format(
count
)
self.view.show_message(msg)
def reveal_selected(self):
@@ -734,10 +770,10 @@ class DupeGuru(Broadcaster):
def save(self):
if not op.exists(self.appdata):
os.makedirs(self.appdata)
self.directories.save_to_file(op.join(self.appdata, 'last_directories.xml'))
p = op.join(self.appdata, 'ignore_list.xml')
self.directories.save_to_file(op.join(self.appdata, "last_directories.xml"))
p = op.join(self.appdata, "ignore_list.xml")
self.ignore_list.save_to_xml(p)
self.notify('save_session')
self.notify("save_session")
def save_as(self, filename):
"""Save results in ``filename``.
@@ -756,7 +792,9 @@ class DupeGuru(Broadcaster):
"""
scanner = self.SCANNER_CLASS()
if not self.directories.has_any_file():
self.view.show_message(tr("The selected directories contain no scannable file."))
self.view.show_message(
tr("The selected directories contain no scannable file.")
)
return
# Send relevant options down to the scanner instance
for k, v in self.options.items():
@@ -771,12 +809,16 @@ class DupeGuru(Broadcaster):
def do(j):
j.set_progress(0, tr("Collecting files to scan"))
if scanner.scan_type == ScanType.Folders:
files = list(self.directories.get_folders(folderclass=se.fs.Folder, j=j))
files = list(
self.directories.get_folders(folderclass=se.fs.Folder, j=j)
)
else:
files = list(self.directories.get_files(fileclasses=self.fileclasses, j=j))
if self.options['ignore_hardlink_matches']:
files = list(
self.directories.get_files(fileclasses=self.fileclasses, j=j)
)
if self.options["ignore_hardlink_matches"]:
files = self._remove_hardlink_dupes(files)
logging.info('Scanning %d files' % len(files))
logging.info("Scanning %d files" % len(files))
self.results.groups = scanner.get_dupe_groups(files, self.ignore_list, j)
self.discarded_file_count = scanner.discarded_file_count
@@ -792,12 +834,16 @@ class DupeGuru(Broadcaster):
markfunc = self.results.mark
for dupe in selected:
markfunc(dupe)
self.notify('marking_changed')
self.notify("marking_changed")
def without_ref(self, dupes):
"""Returns ``dupes`` with all reference elements removed.
"""
return [dupe for dupe in dupes if self.results.get_group_of_duplicate(dupe).ref is not dupe]
return [
dupe
for dupe in dupes
if self.results.get_group_of_duplicate(dupe).ref is not dupe
]
def get_default(self, key, fallback_value=None):
result = nonone(self.view.get_default(key), fallback_value)
@@ -812,7 +858,7 @@ class DupeGuru(Broadcaster):
def set_default(self, key, value):
self.view.set_default(key, value)
#--- Properties
# --- Properties
@property
def stat_line(self):
result = self.results.stat_line
@@ -836,12 +882,21 @@ class DupeGuru(Broadcaster):
@property
def METADATA_TO_READ(self):
if self.app_mode == AppMode.Picture:
return ['size', 'mtime', 'dimensions', 'exif_timestamp']
return ["size", "mtime", "dimensions", "exif_timestamp"]
elif self.app_mode == AppMode.Music:
return [
'size', 'mtime', 'duration', 'bitrate', 'samplerate', 'title', 'artist',
'album', 'genre', 'year', 'track', 'comment'
"size",
"mtime",
"duration",
"bitrate",
"samplerate",
"title",
"artist",
"album",
"genre",
"year",
"track",
"comment",
]
else:
return ['size', 'mtime']
return ["size", "mtime"]