mirror of
https://github.com/arsenetar/dupeguru.git
synced 2025-03-11 06:04:36 +00:00
Ignore path and filename based on regex
* Added initial draft for test suite * Fixed small logging bug
This commit is contained in:
parent
089f00adb8
commit
470307aa3c
@ -5,6 +5,7 @@
|
|||||||
# http://www.gnu.org/licenses/gpl-3.0.html
|
# http://www.gnu.org/licenses/gpl-3.0.html
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
from xml.etree import ElementTree as ET
|
from xml.etree import ElementTree as ET
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@ -52,12 +53,34 @@ class Directories:
|
|||||||
Then, when the user starts the scan, :meth:`get_files` is called to retrieve all files (wrapped
|
Then, when the user starts the scan, :meth:`get_files` is called to retrieve all files (wrapped
|
||||||
in :mod:`core.fs`) that have to be scanned according to the chosen folders/states.
|
in :mod:`core.fs`) that have to be scanned according to the chosen folders/states.
|
||||||
"""
|
"""
|
||||||
|
deny_list_str = set()
|
||||||
|
deny_list_re = set()
|
||||||
|
deny_list_re_files = set()
|
||||||
|
|
||||||
# ---Override
|
# ---Override
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._dirs = []
|
self._dirs = []
|
||||||
# {path: state}
|
# {path: state}
|
||||||
self.states = {}
|
self.states = {}
|
||||||
|
self.deny_list_str.add(r".*Recycle\.Bin$")
|
||||||
|
self.deny_list_str.add(r"denyme.*")
|
||||||
|
self.deny_list_str.add(r".*denyme")
|
||||||
|
self.deny_list_str.add(r".*/test/denyme*")
|
||||||
|
self.deny_list_str.add(r".*/test/*denyme")
|
||||||
|
self.deny_list_str.add(r"denyme")
|
||||||
|
self.deny_list_str.add(r".*\/\..*")
|
||||||
|
self.deny_list_str.add(r"^\..*")
|
||||||
|
self.compile_re()
|
||||||
|
|
||||||
|
def compile_re(self):
    """Compile the patterns of ``deny_list_str`` into the regex deny lists.

    Every pattern that compiles is added to ``deny_list_re``. Patterns that
    do not contain ``os.sep`` are also added to ``deny_list_re_files``.
    A pattern that is not a valid regular expression is logged and skipped,
    so one bad entry does not prevent the others from being used.
    """
    for expr in self.deny_list_str:
        try:
            # Compile once and share the pattern object between both sets.
            compiled = re.compile(expr)
        except re.error as e:
            logging.debug('Invalid regular expression "%s" in exclude list: %s', expr, e)
            continue
        self.deny_list_re.add(compiled)
        if os.sep not in expr:
            self.deny_list_re_files.add(compiled)
    # Diagnostic dump goes through logging rather than stdout.
    logging.debug("re_all: %s\nre_files: %s", self.deny_list_re, self.deny_list_re_files)
|
||||||
|
|
||||||
def __contains__(self, path):
|
def __contains__(self, path):
|
||||||
for p in self._dirs:
|
for p in self._dirs:
|
||||||
@ -75,12 +98,15 @@ class Directories:
|
|||||||
return len(self._dirs)
|
return len(self._dirs)
|
||||||
|
|
||||||
# ---Private
|
# ---Private
|
||||||
def _default_state_for_path(self, path):
|
def _default_state_for_path(self, path, deny_list_re=deny_list_re):
|
||||||
# Override this in subclasses to specify the state of some special folders.
|
# Override this in subclasses to specify the state of some special folders.
|
||||||
if path.name.startswith("."): # hidden
|
# if path.name.startswith("."): # hidden
|
||||||
return DirectoryState.Excluded
|
# return DirectoryState.Excluded
|
||||||
|
for denied_path_re in deny_list_re:
|
||||||
|
if denied_path_re.match(str(path)):
|
||||||
|
return DirectoryState.Excluded
|
||||||
|
|
||||||
def _get_files(self, from_path, fileclasses, j):
|
def _get_files(self, from_path, fileclasses, j, deny_list_re=deny_list_re_files):
|
||||||
for root, dirs, files in os.walk(str(from_path)):
|
for root, dirs, files in os.walk(str(from_path)):
|
||||||
j.check_if_cancelled()
|
j.check_if_cancelled()
|
||||||
root = Path(root)
|
root = Path(root)
|
||||||
@ -93,9 +119,15 @@ class Directories:
|
|||||||
del dirs[:]
|
del dirs[:]
|
||||||
try:
|
try:
|
||||||
if state != DirectoryState.Excluded:
|
if state != DirectoryState.Excluded:
|
||||||
found_files = [
|
found_files = []
|
||||||
fs.get_file(root + f, fileclasses=fileclasses) for f in files
|
for f in files:
|
||||||
]
|
found = False
|
||||||
|
for expr in deny_list_re:
|
||||||
|
found = expr.match(f)
|
||||||
|
if found:
|
||||||
|
break
|
||||||
|
if not found:
|
||||||
|
found_files.append(fs.get_file(root + f, fileclasses=fileclasses))
|
||||||
found_files = [f for f in found_files if f is not None]
|
found_files = [f for f in found_files if f is not None]
|
||||||
# In some cases, directories can be considered as files by dupeGuru, which is
|
# In some cases, directories can be considered as files by dupeGuru, which is
|
||||||
# why we have this line below. In fact, there only one case: Bundle files under
|
# why we have this line below. In fact, there only one case: Bundle files under
|
||||||
@ -108,7 +140,7 @@ class Directories:
|
|||||||
logging.debug(
|
logging.debug(
|
||||||
"Collected %d files in folder %s",
|
"Collected %d files in folder %s",
|
||||||
len(found_files),
|
len(found_files),
|
||||||
str(from_path),
|
str(root),
|
||||||
)
|
)
|
||||||
for file in found_files:
|
for file in found_files:
|
||||||
file.is_ref = state == DirectoryState.Reference
|
file.is_ref = state == DirectoryState.Reference
|
||||||
@ -116,7 +148,7 @@ class Directories:
|
|||||||
except (EnvironmentError, fs.InvalidPath):
|
except (EnvironmentError, fs.InvalidPath):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _get_folders(self, from_folder, j):
|
def _get_folders(self, from_folder, j, deny_list_re=deny_list_re):
|
||||||
j.check_if_cancelled()
|
j.check_if_cancelled()
|
||||||
try:
|
try:
|
||||||
for subfolder in from_folder.subfolders:
|
for subfolder in from_folder.subfolders:
|
||||||
@ -162,7 +194,7 @@ class Directories:
|
|||||||
except EnvironmentError:
|
except EnvironmentError:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def get_files(self, fileclasses=None, j=job.nulljob):
|
def get_files(self, fileclasses=None, j=job.nulljob, deny_list_re=deny_list_re_files):
|
||||||
"""Returns a list of all files that are not excluded.
|
"""Returns a list of all files that are not excluded.
|
||||||
|
|
||||||
Returned files also have their ``is_ref`` attr set if applicable.
|
Returned files also have their ``is_ref`` attr set if applicable.
|
||||||
@ -170,7 +202,7 @@ class Directories:
|
|||||||
if fileclasses is None:
|
if fileclasses is None:
|
||||||
fileclasses = [fs.File]
|
fileclasses = [fs.File]
|
||||||
for path in self._dirs:
|
for path in self._dirs:
|
||||||
for file in self._get_files(path, fileclasses=fileclasses, j=j):
|
for file in self._get_files(path, fileclasses=fileclasses, j=j, deny_list_re=deny_list_re):
|
||||||
yield file
|
yield file
|
||||||
|
|
||||||
def get_folders(self, folderclass=None, j=job.nulljob):
|
def get_folders(self, folderclass=None, j=job.nulljob):
|
||||||
@ -185,7 +217,7 @@ class Directories:
|
|||||||
for folder in self._get_folders(from_folder, j):
|
for folder in self._get_folders(from_folder, j):
|
||||||
yield folder
|
yield folder
|
||||||
|
|
||||||
def get_state(self, path):
|
def get_state(self, path, denylist=deny_list_re):
|
||||||
"""Returns the state of ``path``.
|
"""Returns the state of ``path``.
|
||||||
|
|
||||||
:rtype: :class:`DirectoryState`
|
:rtype: :class:`DirectoryState`
|
||||||
@ -193,7 +225,7 @@ class Directories:
|
|||||||
# direct match? easy result.
|
# direct match? easy result.
|
||||||
if path in self.states:
|
if path in self.states:
|
||||||
return self.states[path]
|
return self.states[path]
|
||||||
state = self._default_state_for_path(path) or DirectoryState.Normal
|
state = self._default_state_for_path(path, denylist) or DirectoryState.Normal
|
||||||
prevlen = 0
|
prevlen = 0
|
||||||
# we loop through the states to find the longest matching prefix
|
# we loop through the states to find the longest matching prefix
|
||||||
for p, s in self.states.items():
|
for p, s in self.states.items():
|
||||||
|
11
core/fs.py
11
core/fs.py
@ -245,7 +245,7 @@ class Folder(File):
|
|||||||
return not path.islink() and path.isdir()
|
return not path.islink() and path.isdir()
|
||||||
|
|
||||||
|
|
||||||
def get_file(path, fileclasses=[File]):
|
def get_file(path, fileclasses=[File], deny_list_re=set()):
|
||||||
"""Wraps ``path`` around its appropriate :class:`File` class.
|
"""Wraps ``path`` around its appropriate :class:`File` class.
|
||||||
|
|
||||||
Whether a class is "appropriate" is decided by :meth:`File.can_handle`
|
Whether a class is "appropriate" is decided by :meth:`File.can_handle`
|
||||||
@ -255,10 +255,15 @@ def get_file(path, fileclasses=[File]):
|
|||||||
"""
|
"""
|
||||||
for fileclass in fileclasses:
|
for fileclass in fileclasses:
|
||||||
if fileclass.can_handle(path):
|
if fileclass.can_handle(path):
|
||||||
|
# print(f"returning {path}")
|
||||||
|
# for expr in deny_list_re:
|
||||||
|
# if expr.match(str(path.name)):
|
||||||
|
# print(f"FOUND {repr(expr)} in {str(path.name)}")
|
||||||
|
# return
|
||||||
return fileclass(path)
|
return fileclass(path)
|
||||||
|
|
||||||
|
|
||||||
def get_files(path, fileclasses=[File]):
|
def get_files(path, fileclasses=[File], deny_list_re=set()):
|
||||||
"""Returns a list of :class:`File` for each file contained in ``path``.
|
"""Returns a list of :class:`File` for each file contained in ``path``.
|
||||||
|
|
||||||
:param Path path: path to scan
|
:param Path path: path to scan
|
||||||
@ -268,7 +273,7 @@ def get_files(path, fileclasses=[File]):
|
|||||||
try:
|
try:
|
||||||
result = []
|
result = []
|
||||||
for path in path.listdir():
|
for path in path.listdir():
|
||||||
file = get_file(path, fileclasses=fileclasses)
|
file = get_file(path, fileclasses=fileclasses, deny_list_re=deny_list_re)
|
||||||
if file is not None:
|
if file is not None:
|
||||||
result.append(file)
|
result.append(file)
|
||||||
return result
|
return result
|
||||||
|
@ -323,7 +323,7 @@ def test_get_state_returns_excluded_by_default_for_hidden_directories(tmpdir):
|
|||||||
def test_default_path_state_override(tmpdir):
|
def test_default_path_state_override(tmpdir):
|
||||||
# It's possible for a subclass to override the default state of a path
|
# It's possible for a subclass to override the default state of a path
|
||||||
class MyDirectories(Directories):
|
class MyDirectories(Directories):
|
||||||
def _default_state_for_path(self, path):
|
def _default_state_for_path(self, path, denylist):
|
||||||
if "foobar" in path:
|
if "foobar" in path:
|
||||||
return DirectoryState.Excluded
|
return DirectoryState.Excluded
|
||||||
|
|
||||||
@ -341,3 +341,54 @@ def test_default_path_state_override(tmpdir):
|
|||||||
d.set_state(p1["foobar"], DirectoryState.Normal)
|
d.set_state(p1["foobar"], DirectoryState.Normal)
|
||||||
eq_(d.get_state(p1["foobar"]), DirectoryState.Normal)
|
eq_(d.get_state(p1["foobar"]), DirectoryState.Normal)
|
||||||
eq_(len(list(d.get_files())), 2)
|
eq_(len(list(d.get_files())), 2)
|
||||||
|
|
||||||
|
|
||||||
|
def test_exclude_list_regular_expressions(tmpdir):
    # Paths matching the deny-list regular expressions are left out of the scan,
    # until the state of an excluded folder is explicitly set back to Normal.
    d = Directories()
    # Empty the deny lists first so that only the patterns added below are active.
    d.deny_list_str.clear()
    d.deny_list_re.clear()
    d.deny_list_re_files.clear()
    # This should only exclude the directory, but not the contained files if
    # its status is set to normal after loading it in the directory tree
    d.deny_list_str.add(r".*Recycle\.Bin$")
    d.deny_list_str.add(r"denyme.*")
    # d.deny_list_str.add(r".*denymetoo")
    # d.deny_list_str.add(r"denyme")
    d.deny_list_str.add(r".*\/\..*")
    d.deny_list_str.add(r"^\..*")
    d.compile_re()
    p1 = Path(str(tmpdir))
    # Should be ignored on Windows only (by default)
    p1["Recycle.Bin"].mkdir()
    p1["Recycle.Bin/somerecycledfile"].open("w").close()

    p1["denyme_blah.txt"].open("w").close()
    p1["blah_denymetoo"].open("w").close()
    p1["blah_denyme"].open("w").close()

    p1[".hidden_file"].open("w").close()
    p1[".hidden_dir"].mkdir()
    p1[".hidden_dir/somenormalfile1"].open("w").close()
    p1[".hidden_dir/somenormalfile2_denyme"].open("w").close()

    p1["foobar"].mkdir()
    p1["foobar/somefile"].open("w").close()
    d.add_path(p1)
    eq_(d.get_state(p1["Recycle.Bin"]), DirectoryState.Excluded)
    eq_(d.get_state(p1["foobar"]), DirectoryState.Normal)
    names = [file.name for file in d.get_files()]
    assert "somerecycledfile" not in names
    assert "denyme_blah.txt" not in names
    assert ".hidden_file" not in names
    # Check the names of the files actually created inside .hidden_dir; asserting
    # on names that were never created would pass no matter what the scan returns.
    assert "somenormalfile1" not in names
    assert "somenormalfile2_denyme" not in names
    # Overriding the default state from the Directory Tree
    d.set_state(p1["Recycle.Bin"], DirectoryState.Normal)
    d.set_state(p1[".hidden_dir"], DirectoryState.Normal)
    names = [file.name for file in d.get_files()]
    assert "somerecycledfile" in names
    assert "somenormalfile1" in names
|
||||||
|
2
tox.ini
2
tox.ini
@ -10,7 +10,7 @@ setenv =
|
|||||||
PYTHON="{envpython}"
|
PYTHON="{envpython}"
|
||||||
commands =
|
commands =
|
||||||
make modules
|
make modules
|
||||||
py.test core hscommon
|
{posargs:py.test} core hscommon
|
||||||
deps =
|
deps =
|
||||||
-r{toxinidir}/requirements.txt
|
-r{toxinidir}/requirements.txt
|
||||||
-r{toxinidir}/requirements-extra.txt
|
-r{toxinidir}/requirements-extra.txt
|
||||||
|
Loading…
x
Reference in New Issue
Block a user