Squashed commit of the following:

commit 8b15fe9a502ebf4841c6529e7098cef03a6a5e6f
Author: Andrew Senetar <arsenetar@gmail.com>
Date:   Sun Mar 27 23:48:15 2022 -0500

    Finish up changes to copy_or_move

commit 21f6a32cf3186a400af8f30e67ad2743dc9a49bd
Author: Andrew Senetar <arsenetar@gmail.com>
Date:   Thu Mar 17 23:56:52 2022 -0500

    Migrate from hscommon.path to pathlib
    - Part one, this gets all hscommon and core tests passing
    - App appears to be able to load directories and complete scans, need further testing
    - app.py copy_or_move needs some additional work
This commit is contained in:
Andrew Senetar 2022-03-27 23:50:03 -05:00
parent 5ed5eddde6
commit da9f8b2b9d
Signed by: arsenetar
GPG Key ID: C63300DCE48AB2F1
17 changed files with 267 additions and 731 deletions

View File

@ -10,11 +10,11 @@ import logging
import subprocess
import re
import shutil
from pathlib import Path
from send2trash import send2trash
from hscommon.jobprogress import job
from hscommon.notify import Broadcaster
from hscommon.path import Path
from hscommon.conflict import smart_move, smart_copy
from hscommon.gui.progress_window import ProgressWindow
from hscommon.util import delete_if_empty, first, escape, nonone, allsame
@ -415,7 +415,7 @@ class DupeGuru(Broadcaster):
def clean_empty_dirs(self, path):
if self.options["clean_empty_dirs"]:
while delete_if_empty(path, [".DS_Store"]):
path = path.parent()
path = path.parent
def clear_picture_cache(self):
try:
@ -428,25 +428,25 @@ class DupeGuru(Broadcaster):
def copy_or_move(self, dupe, copy: bool, destination: str, dest_type: DestType):
source_path = dupe.path
location_path = first(p for p in self.directories if dupe.path in p)
location_path = first(p for p in self.directories if p in dupe.path.parents)
dest_path = Path(destination)
if dest_type in {DestType.RELATIVE, DestType.ABSOLUTE}:
# no filename, no windows drive letter
source_base = source_path.remove_drive_letter().parent()
source_base = source_path.relative_to(source_path.anchor).parent
if dest_type == DestType.RELATIVE:
source_base = source_base[location_path:]
dest_path = dest_path[source_base]
source_base = source_base.relative_to(location_path.relative_to(location_path.anchor))
dest_path = dest_path.joinpath(source_base)
if not dest_path.exists():
dest_path.makedirs()
dest_path.mkdir(parents=True)
# Add filename to dest_path. For file move/copy, it's not required, but for folders, yes.
dest_path = dest_path[source_path.name]
dest_path = dest_path.joinpath(source_path.name)
logging.debug("Copy/Move operation from '%s' to '%s'", source_path, dest_path)
# Raises an EnvironmentError if there's a problem
if copy:
smart_copy(source_path, dest_path)
else:
smart_move(source_path, dest_path)
self.clean_empty_dirs(source_path.parent())
self.clean_empty_dirs(source_path.parent)
def copy_or_move_marked(self, copy):
"""Start an async move (or copy) job on marked duplicates.

View File

@ -7,9 +7,9 @@
import os
from xml.etree import ElementTree as ET
import logging
from pathlib import Path
from hscommon.jobprogress import job
from hscommon.path import Path
from hscommon.util import FileOrPath
from hscommon.trans import tr
@ -63,7 +63,7 @@ class Directories:
def __contains__(self, path):
for p in self._dirs:
if path in p:
if path == p or p in path.parents:
return True
return False
@ -94,7 +94,9 @@ class Directories:
j.check_if_cancelled()
root_path = Path(root)
state = self.get_state(root_path)
if state == DirectoryState.EXCLUDED and not any(p[: len(root_path)] == root_path for p in self.states):
if state == DirectoryState.EXCLUDED and not any(
p.parts[: len(root_path.parts)] == root_path.parts for p in self.states
):
# Recursively get files from folders with lots of subfolder is expensive. However, there
# might be a subfolder in this path that is not excluded. What we want to do is to skim
# through self.states and see if we must continue, or we can stop right here to save time
@ -103,19 +105,19 @@ class Directories:
if state != DirectoryState.EXCLUDED:
# Old logic
if self._exclude_list is None or not self._exclude_list.mark_count:
found_files = [fs.get_file(root_path + f, fileclasses=fileclasses) for f in files]
found_files = [fs.get_file(root_path.joinpath(f), fileclasses=fileclasses) for f in files]
else:
found_files = []
# print(f"len of files: {len(files)} {files}")
for f in files:
if not self._exclude_list.is_excluded(root, f):
found_files.append(fs.get_file(root_path + f, fileclasses=fileclasses))
found_files.append(fs.get_file(root_path.joinpath(f), fileclasses=fileclasses))
found_files = [f for f in found_files if f is not None]
# In some cases, directories can be considered as files by dupeGuru, which is
# why we have this line below. In fact, there only one case: Bundle files under
# OS X... In other situations, this forloop will do nothing.
for d in dirs[:]:
f = fs.get_file(root_path + d, fileclasses=fileclasses)
f = fs.get_file(root_path.joinpath(d), fileclasses=fileclasses)
if f is not None:
found_files.append(f)
dirs.remove(d)
@ -159,7 +161,7 @@ class Directories:
raise AlreadyThereError()
if not path.exists():
raise InvalidPathError()
self._dirs = [p for p in self._dirs if p not in path]
self._dirs = [p for p in self._dirs if path not in p.parents]
self._dirs.append(path)
@staticmethod
@ -170,7 +172,7 @@ class Directories:
:rtype: list of Path
"""
try:
subpaths = [p for p in path.listdir() if p.isdir()]
subpaths = [p for p in path.glob("*") if p.is_dir()]
subpaths.sort(key=lambda x: x.name.lower())
return subpaths
except EnvironmentError:
@ -225,8 +227,8 @@ class Directories:
# we loop through the states to find the longest matching prefix
# if the parent has a state in cache, return that state
for p, s in self.states.items():
if p.is_parent_of(path) and len(p) > prevlen:
prevlen = len(p)
if p in path.parents and len(p.parts) > prevlen:
prevlen = len(p.parts)
state = s
return state
@ -296,6 +298,6 @@ class Directories:
if self.get_state(path) == state:
return
for iter_path in list(self.states.keys()):
if path.is_parent_of(iter_path):
if path in iter_path.parents:
del self.states[iter_path]
self.states[path] = state

View File

@ -28,7 +28,7 @@ import sqlite3
from threading import Lock
from typing import Any, AnyStr, Union
from hscommon.path import Path
from pathlib import Path
from hscommon.util import nonone, get_file_ext
__all__ = [
@ -302,14 +302,14 @@ class File:
@classmethod
def can_handle(cls, path):
"""Returns whether this file wrapper class can handle ``path``."""
return not path.islink() and path.isfile()
return not path.is_symlink() and path.is_file()
def rename(self, newname):
if newname == self.name:
return
destpath = self.path.parent()[newname]
destpath = self.path.parent.joinpath(newname)
if destpath.exists():
raise AlreadyExistsError(newname, self.path.parent())
raise AlreadyExistsError(newname, self.path.parent)
try:
self.path.rename(destpath)
except EnvironmentError:
@ -333,7 +333,7 @@ class File:
@property
def folder_path(self):
return self.path.parent()
return self.path.parent
class Folder(File):
@ -377,13 +377,13 @@ class Folder(File):
@property
def subfolders(self):
if self._subfolders is None:
subfolders = [p for p in self.path.listdir() if not p.islink() and p.isdir()]
subfolders = [p for p in self.path.glob("*") if not p.is_symlink() and p.is_dir()]
self._subfolders = [self.__class__(p) for p in subfolders]
return self._subfolders
@classmethod
def can_handle(cls, path):
return not path.islink() and path.isdir()
return not path.is_symlink() and path.is_dir()
def get_file(path, fileclasses=[File]):
@ -408,7 +408,7 @@ def get_files(path, fileclasses=[File]):
assert all(issubclass(fileclass, File) for fileclass in fileclasses)
try:
result = []
for path in path.listdir():
for path in path.glob("*"):
file = get_file(path, fileclasses=fileclasses)
if file is not None:
result.append(file)

View File

@ -82,7 +82,7 @@ class FolderCategory(ValueListCategory):
def sort_key(self, dupe, crit_value):
value = self.extract_value(dupe)
if value[: len(crit_value)] == crit_value:
if value.is_relative_to(crit_value):
return 0
else:
return 1

View File

@ -134,7 +134,7 @@ class Scanner:
return False
if is_same_with_digit(refname, dupename):
return True
return len(dupe.path) > len(ref.path)
return len(dupe.path.parts) > len(ref.path.parts)
@staticmethod
def get_scan_options():
@ -164,7 +164,7 @@ class Scanner:
toremove = set()
last_parent_path = sortedpaths[0]
for p in sortedpaths[1:]:
if p in last_parent_path:
if last_parent_path in p.parents:
toremove.add(p)
else:
last_parent_path = p

View File

@ -9,7 +9,7 @@ import os.path as op
import logging
import pytest
from hscommon.path import Path
from pathlib import Path
import hscommon.conflict
import hscommon.util
from hscommon.testutil import eq_, log_calls
@ -56,7 +56,7 @@ class TestCaseDupeGuru:
# for this unit is pathetic. What's done is done. My approach now is to add tests for
# every change I want to make. The blowup was caused by a missing import.
p = Path(str(tmpdir))
p["foo"].open("w").close()
p.joinpath("foo").touch()
monkeypatch.setattr(
hscommon.conflict,
"smart_copy",
@ -71,19 +71,19 @@ class TestCaseDupeGuru:
dgapp.copy_or_move(f, True, "some_destination", 0)
eq_(1, len(hscommon.conflict.smart_copy.calls))
call = hscommon.conflict.smart_copy.calls[0]
eq_(call["dest_path"], op.join("some_destination", "foo"))
eq_(call["dest_path"], Path("some_destination", "foo"))
eq_(call["source_path"], f.path)
def test_copy_or_move_clean_empty_dirs(self, tmpdir, monkeypatch):
tmppath = Path(str(tmpdir))
sourcepath = tmppath["source"]
sourcepath = tmppath.joinpath("source")
sourcepath.mkdir()
sourcepath["myfile"].open("w")
sourcepath.joinpath("myfile").touch()
app = TestApp().app
app.directories.add_path(tmppath)
[myfile] = app.directories.get_files()
monkeypatch.setattr(app, "clean_empty_dirs", log_calls(lambda path: None))
app.copy_or_move(myfile, False, tmppath["dest"], 0)
app.copy_or_move(myfile, False, tmppath.joinpath("dest"), 0)
calls = app.clean_empty_dirs.calls
eq_(1, len(calls))
eq_(sourcepath, calls[0]["path"])
@ -106,8 +106,8 @@ class TestCaseDupeGuru:
# If the ignore_hardlink_matches option is set, don't match files hardlinking to the same
# inode.
tmppath = Path(str(tmpdir))
tmppath["myfile"].open("w").write("foo")
os.link(str(tmppath["myfile"]), str(tmppath["hardlink"]))
tmppath.joinpath("myfile").open("wt").write("foo")
os.link(str(tmppath.joinpath("myfile")), str(tmppath.joinpath("hardlink")))
app = TestApp().app
app.directories.add_path(tmppath)
app.options["scan_type"] = ScanType.CONTENTS
@ -153,7 +153,7 @@ class TestCaseDupeGuruCleanEmptyDirs:
# delete_if_empty must be recursively called up in the path until it returns False
@log_calls
def mock_delete_if_empty(path, files_to_delete=[]):
return len(path) > 1
return len(path.parts) > 1
monkeypatch.setattr(hscommon.util, "delete_if_empty", mock_delete_if_empty)
# XXX This monkeypatch is temporary. will be fixed in a better monkeypatcher.
@ -180,8 +180,8 @@ class TestCaseDupeGuruWithResults:
self.rtable.refresh()
tmpdir = request.getfixturevalue("tmpdir")
tmppath = Path(str(tmpdir))
tmppath["foo"].mkdir()
tmppath["bar"].mkdir()
tmppath.joinpath("foo").mkdir()
tmppath.joinpath("bar").mkdir()
self.app.directories.add_path(tmppath)
def test_get_objects(self, do_setup):
@ -424,12 +424,9 @@ class TestCaseDupeGuruRenameSelected:
def do_setup(self, request):
tmpdir = request.getfixturevalue("tmpdir")
p = Path(str(tmpdir))
fp = open(str(p["foo bar 1"]), mode="w")
fp.close()
fp = open(str(p["foo bar 2"]), mode="w")
fp.close()
fp = open(str(p["foo bar 3"]), mode="w")
fp.close()
p.joinpath("foo bar 1").touch()
p.joinpath("foo bar 2").touch()
p.joinpath("foo bar 3").touch()
files = fs.get_files(p)
for f in files:
f.is_ref = False
@ -451,7 +448,7 @@ class TestCaseDupeGuruRenameSelected:
g = self.groups[0]
self.rtable.select([1])
assert app.rename_selected("renamed")
names = [p.name for p in self.p.listdir()]
names = [p.name for p in self.p.glob("*")]
assert "renamed" in names
assert "foo bar 2" not in names
eq_(g.dupes[0].name, "renamed")
@ -464,7 +461,7 @@ class TestCaseDupeGuruRenameSelected:
assert not app.rename_selected("renamed")
msg = logging.warning.calls[0]["msg"]
eq_("dupeGuru Warning: list index out of range", msg)
names = [p.name for p in self.p.listdir()]
names = [p.name for p in self.p.glob("*")]
assert "renamed" not in names
assert "foo bar 2" in names
eq_(g.dupes[0].name, "foo bar 2")
@ -477,7 +474,7 @@ class TestCaseDupeGuruRenameSelected:
assert not app.rename_selected("foo bar 1")
msg = logging.warning.calls[0]["msg"]
assert msg.startswith("dupeGuru Warning: 'foo bar 1' already exists in")
names = [p.name for p in self.p.listdir()]
names = [p.name for p in self.p.glob("*")]
assert "foo bar 1" in names
assert "foo bar 2" in names
eq_(g.dupes[0].name, "foo bar 2")
@ -488,9 +485,9 @@ class TestAppWithDirectoriesInTree:
def do_setup(self, request):
tmpdir = request.getfixturevalue("tmpdir")
p = Path(str(tmpdir))
p["sub1"].mkdir()
p["sub2"].mkdir()
p["sub3"].mkdir()
p.joinpath("sub1").mkdir()
p.joinpath("sub2").mkdir()
p.joinpath("sub3").mkdir()
app = TestApp()
self.app = app.app
self.dtree = app.dtree

View File

@ -5,7 +5,7 @@
# http://www.gnu.org/licenses/gpl-3.0.html
from hscommon.testutil import TestApp as TestAppBase, CallLogger, eq_, with_app # noqa
from hscommon.path import Path
from pathlib import Path
from hscommon.util import get_file_ext, format_size
from hscommon.gui.column import Column
from hscommon.jobprogress.job import nulljob, JobCancelled
@ -111,11 +111,11 @@ class NamedObject:
@property
def path(self):
return self._folder[self.name]
return self._folder.joinpath(self.name)
@property
def folder_path(self):
return self.path.parent()
return self.path.parent
@property
def extension(self):

View File

@ -10,7 +10,7 @@ import tempfile
import shutil
from pytest import raises
from hscommon.path import Path
from pathlib import Path
from hscommon.testutil import eq_
from hscommon.plat import ISWINDOWS
@ -26,29 +26,23 @@ from ..exclude import ExcludeList, ExcludeDict
def create_fake_fs(rootpath):
# We have it as a separate function because other units are using it.
rootpath = rootpath["fs"]
rootpath = rootpath.joinpath("fs")
rootpath.mkdir()
rootpath["dir1"].mkdir()
rootpath["dir2"].mkdir()
rootpath["dir3"].mkdir()
fp = rootpath["file1.test"].open("w")
fp.write("1")
fp.close()
fp = rootpath["file2.test"].open("w")
fp.write("12")
fp.close()
fp = rootpath["file3.test"].open("w")
fp.write("123")
fp.close()
fp = rootpath["dir1"]["file1.test"].open("w")
fp.write("1")
fp.close()
fp = rootpath["dir2"]["file2.test"].open("w")
fp.write("12")
fp.close()
fp = rootpath["dir3"]["file3.test"].open("w")
fp.write("123")
fp.close()
rootpath.joinpath("dir1").mkdir()
rootpath.joinpath("dir2").mkdir()
rootpath.joinpath("dir3").mkdir()
with rootpath.joinpath("file1.test").open("wt") as fp:
fp.write("1")
with rootpath.joinpath("file2.test").open("wt") as fp:
fp.write("12")
with rootpath.joinpath("file3.test").open("wt") as fp:
fp.write("123")
with rootpath.joinpath("dir1", "file1.test").open("wt") as fp:
fp.write("1")
with rootpath.joinpath("dir2", "file2.test").open("wt") as fp:
fp.write("12")
with rootpath.joinpath("dir3", "file3.test").open("wt") as fp:
fp.write("123")
return rootpath
@ -60,11 +54,10 @@ def setup_module(module):
# and another with a more complex structure.
testpath = Path(tempfile.mkdtemp())
module.testpath = testpath
rootpath = testpath["onefile"]
rootpath = testpath.joinpath("onefile")
rootpath.mkdir()
fp = rootpath["test.txt"].open("w")
fp.write("test_data")
fp.close()
with rootpath.joinpath("test.txt").open("wt") as fp:
fp.write("test_data")
create_fake_fs(testpath)
@ -80,13 +73,13 @@ def test_empty():
def test_add_path():
d = Directories()
p = testpath["onefile"]
p = testpath.joinpath("onefile")
d.add_path(p)
eq_(1, len(d))
assert p in d
assert (p["foobar"]) in d
assert p.parent() not in d
p = testpath["fs"]
assert (p.joinpath("foobar")) in d
assert p.parent not in d
p = testpath.joinpath("fs")
d.add_path(p)
eq_(2, len(d))
assert p in d
@ -94,18 +87,18 @@ def test_add_path():
def test_add_path_when_path_is_already_there():
d = Directories()
p = testpath["onefile"]
p = testpath.joinpath("onefile")
d.add_path(p)
with raises(AlreadyThereError):
d.add_path(p)
with raises(AlreadyThereError):
d.add_path(p["foobar"])
d.add_path(p.joinpath("foobar"))
eq_(1, len(d))
def test_add_path_containing_paths_already_there():
d = Directories()
d.add_path(testpath["onefile"])
d.add_path(testpath.joinpath("onefile"))
eq_(1, len(d))
d.add_path(testpath)
eq_(len(d), 1)
@ -114,7 +107,7 @@ def test_add_path_containing_paths_already_there():
def test_add_path_non_latin(tmpdir):
p = Path(str(tmpdir))
to_add = p["unicode\u201a"]
to_add = p.joinpath("unicode\u201a")
os.mkdir(str(to_add))
d = Directories()
try:
@ -125,25 +118,25 @@ def test_add_path_non_latin(tmpdir):
def test_del():
d = Directories()
d.add_path(testpath["onefile"])
d.add_path(testpath.joinpath("onefile"))
try:
del d[1]
assert False
except IndexError:
pass
d.add_path(testpath["fs"])
d.add_path(testpath.joinpath("fs"))
del d[1]
eq_(1, len(d))
def test_states():
d = Directories()
p = testpath["onefile"]
p = testpath.joinpath("onefile")
d.add_path(p)
eq_(DirectoryState.NORMAL, d.get_state(p))
d.set_state(p, DirectoryState.REFERENCE)
eq_(DirectoryState.REFERENCE, d.get_state(p))
eq_(DirectoryState.REFERENCE, d.get_state(p["dir1"]))
eq_(DirectoryState.REFERENCE, d.get_state(p.joinpath("dir1")))
eq_(1, len(d.states))
eq_(p, list(d.states.keys())[0])
eq_(DirectoryState.REFERENCE, d.states[p])
@ -152,7 +145,7 @@ def test_states():
def test_get_state_with_path_not_there():
# When the path's not there, just return DirectoryState.Normal
d = Directories()
d.add_path(testpath["onefile"])
d.add_path(testpath.joinpath("onefile"))
eq_(d.get_state(testpath), DirectoryState.NORMAL)
@ -160,26 +153,26 @@ def test_states_overwritten_when_larger_directory_eat_smaller_ones():
# ref #248
# When setting the state of a folder, we overwrite previously set states for subfolders.
d = Directories()
p = testpath["onefile"]
p = testpath.joinpath("onefile")
d.add_path(p)
d.set_state(p, DirectoryState.EXCLUDED)
d.add_path(testpath)
d.set_state(testpath, DirectoryState.REFERENCE)
eq_(d.get_state(p), DirectoryState.REFERENCE)
eq_(d.get_state(p["dir1"]), DirectoryState.REFERENCE)
eq_(d.get_state(p.joinpath("dir1")), DirectoryState.REFERENCE)
eq_(d.get_state(testpath), DirectoryState.REFERENCE)
def test_get_files():
d = Directories()
p = testpath["fs"]
p = testpath.joinpath("fs")
d.add_path(p)
d.set_state(p["dir1"], DirectoryState.REFERENCE)
d.set_state(p["dir2"], DirectoryState.EXCLUDED)
d.set_state(p.joinpath("dir1"), DirectoryState.REFERENCE)
d.set_state(p.joinpath("dir2"), DirectoryState.EXCLUDED)
files = list(d.get_files())
eq_(5, len(files))
for f in files:
if f.path.parent() == p["dir1"]:
if f.path.parent == p.joinpath("dir1"):
assert f.is_ref
else:
assert not f.is_ref
@ -193,7 +186,7 @@ def test_get_files_with_folders():
return True
d = Directories()
p = testpath["fs"]
p = testpath.joinpath("fs")
d.add_path(p)
files = list(d.get_files(fileclasses=[FakeFile]))
# We have the 3 root files and the 3 root dirs
@ -202,23 +195,23 @@ def test_get_files_with_folders():
def test_get_folders():
d = Directories()
p = testpath["fs"]
p = testpath.joinpath("fs")
d.add_path(p)
d.set_state(p["dir1"], DirectoryState.REFERENCE)
d.set_state(p["dir2"], DirectoryState.EXCLUDED)
d.set_state(p.joinpath("dir1"), DirectoryState.REFERENCE)
d.set_state(p.joinpath("dir2"), DirectoryState.EXCLUDED)
folders = list(d.get_folders())
eq_(len(folders), 3)
ref = [f for f in folders if f.is_ref]
not_ref = [f for f in folders if not f.is_ref]
eq_(len(ref), 1)
eq_(ref[0].path, p["dir1"])
eq_(ref[0].path, p.joinpath("dir1"))
eq_(len(not_ref), 2)
eq_(ref[0].size, 1)
def test_get_files_with_inherited_exclusion():
d = Directories()
p = testpath["onefile"]
p = testpath.joinpath("onefile")
d.add_path(p)
d.set_state(p, DirectoryState.EXCLUDED)
eq_([], list(d.get_files()))
@ -234,13 +227,13 @@ def test_save_and_load(tmpdir):
d1.add_path(p1)
d1.add_path(p2)
d1.set_state(p1, DirectoryState.REFERENCE)
d1.set_state(p1["dir1"], DirectoryState.EXCLUDED)
d1.set_state(p1.joinpath("dir1"), DirectoryState.EXCLUDED)
tmpxml = str(tmpdir.join("directories_testunit.xml"))
d1.save_to_file(tmpxml)
d2.load_from_file(tmpxml)
eq_(2, len(d2))
eq_(DirectoryState.REFERENCE, d2.get_state(p1))
eq_(DirectoryState.EXCLUDED, d2.get_state(p1["dir1"]))
eq_(DirectoryState.EXCLUDED, d2.get_state(p1.joinpath("dir1")))
def test_invalid_path():
@ -268,7 +261,7 @@ def test_load_from_file_with_invalid_path(tmpdir):
# This test simulates a load from file resulting in a
# InvalidPath raise. Other directories must be loaded.
d1 = Directories()
d1.add_path(testpath["onefile"])
d1.add_path(testpath.joinpath("onefile"))
# Will raise InvalidPath upon loading
p = Path(str(tmpdir.join("toremove")))
p.mkdir()
@ -283,11 +276,11 @@ def test_load_from_file_with_invalid_path(tmpdir):
def test_unicode_save(tmpdir):
d = Directories()
p1 = Path(str(tmpdir))["hello\xe9"]
p1 = Path(str(tmpdir), "hello\xe9")
p1.mkdir()
p1["foo\xe9"].mkdir()
p1.joinpath("foo\xe9").mkdir()
d.add_path(p1)
d.set_state(p1["foo\xe9"], DirectoryState.EXCLUDED)
d.set_state(p1.joinpath("foo\xe9"), DirectoryState.EXCLUDED)
tmpxml = str(tmpdir.join("directories_testunit.xml"))
try:
d.save_to_file(tmpxml)
@ -297,12 +290,12 @@ def test_unicode_save(tmpdir):
def test_get_files_refreshes_its_directories():
d = Directories()
p = testpath["fs"]
p = testpath.joinpath("fs")
d.add_path(p)
files = d.get_files()
eq_(6, len(list(files)))
time.sleep(1)
os.remove(str(p["dir1"]["file1.test"]))
os.remove(str(p.joinpath("dir1", "file1.test")))
files = d.get_files()
eq_(5, len(list(files)))
@ -311,15 +304,15 @@ def test_get_files_does_not_choke_on_non_existing_directories(tmpdir):
d = Directories()
p = Path(str(tmpdir))
d.add_path(p)
p.rmtree()
shutil.rmtree(str(p))
eq_([], list(d.get_files()))
def test_get_state_returns_excluded_by_default_for_hidden_directories(tmpdir):
d = Directories()
p = Path(str(tmpdir))
hidden_dir_path = p[".foo"]
p[".foo"].mkdir()
hidden_dir_path = p.joinpath(".foo")
p.joinpath(".foo").mkdir()
d.add_path(p)
eq_(d.get_state(hidden_dir_path), DirectoryState.EXCLUDED)
# But it can be overriden
@ -331,22 +324,22 @@ def test_default_path_state_override(tmpdir):
# It's possible for a subclass to override the default state of a path
class MyDirectories(Directories):
def _default_state_for_path(self, path):
if "foobar" in path:
if "foobar" in path.parts:
return DirectoryState.EXCLUDED
d = MyDirectories()
p1 = Path(str(tmpdir))
p1["foobar"].mkdir()
p1["foobar/somefile"].open("w").close()
p1["foobaz"].mkdir()
p1["foobaz/somefile"].open("w").close()
p1.joinpath("foobar").mkdir()
p1.joinpath("foobar/somefile").touch()
p1.joinpath("foobaz").mkdir()
p1.joinpath("foobaz/somefile").touch()
d.add_path(p1)
eq_(d.get_state(p1["foobaz"]), DirectoryState.NORMAL)
eq_(d.get_state(p1["foobar"]), DirectoryState.EXCLUDED)
eq_(d.get_state(p1.joinpath("foobaz")), DirectoryState.NORMAL)
eq_(d.get_state(p1.joinpath("foobar")), DirectoryState.EXCLUDED)
eq_(len(list(d.get_files())), 1) # only the 'foobaz' file is there
# However, the default state can be changed
d.set_state(p1["foobar"], DirectoryState.NORMAL)
eq_(d.get_state(p1["foobar"]), DirectoryState.NORMAL)
d.set_state(p1.joinpath("foobar"), DirectoryState.NORMAL)
eq_(d.get_state(p1.joinpath("foobar")), DirectoryState.NORMAL)
eq_(len(list(d.get_files())), 2)
@ -372,42 +365,42 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
self.d._exclude_list.add(regex)
self.d._exclude_list.mark(regex)
p1 = Path(str(tmpdir))
p1["$Recycle.Bin"].mkdir()
p1["$Recycle.Bin"]["subdir"].mkdir()
p1.joinpath("$Recycle.Bin").mkdir()
p1.joinpath("$Recycle.Bin", "subdir").mkdir()
self.d.add_path(p1)
eq_(self.d.get_state(p1["$Recycle.Bin"]), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin")), DirectoryState.EXCLUDED)
# By default, subdirs should be excluded too, but this can be overridden separately
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.EXCLUDED)
self.d.set_state(p1["$Recycle.Bin"]["subdir"], DirectoryState.NORMAL)
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.EXCLUDED)
self.d.set_state(p1.joinpath("$Recycle.Bin", "subdir"), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL)
def test_exclude_refined(self, tmpdir):
regex1 = r"^\$Recycle\.Bin$"
self.d._exclude_list.add(regex1)
self.d._exclude_list.mark(regex1)
p1 = Path(str(tmpdir))
p1["$Recycle.Bin"].mkdir()
p1["$Recycle.Bin"]["somefile.png"].open("w").close()
p1["$Recycle.Bin"]["some_unwanted_file.jpg"].open("w").close()
p1["$Recycle.Bin"]["subdir"].mkdir()
p1["$Recycle.Bin"]["subdir"]["somesubdirfile.png"].open("w").close()
p1["$Recycle.Bin"]["subdir"]["unwanted_subdirfile.gif"].open("w").close()
p1["$Recycle.Bin"]["subdar"].mkdir()
p1["$Recycle.Bin"]["subdar"]["somesubdarfile.jpeg"].open("w").close()
p1["$Recycle.Bin"]["subdar"]["unwanted_subdarfile.png"].open("w").close()
self.d.add_path(p1["$Recycle.Bin"])
p1.joinpath("$Recycle.Bin").mkdir()
p1.joinpath("$Recycle.Bin", "somefile.png").touch()
p1.joinpath("$Recycle.Bin", "some_unwanted_file.jpg").touch()
p1.joinpath("$Recycle.Bin", "subdir").mkdir()
p1.joinpath("$Recycle.Bin", "subdir", "somesubdirfile.png").touch()
p1.joinpath("$Recycle.Bin", "subdir", "unwanted_subdirfile.gif").touch()
p1.joinpath("$Recycle.Bin", "subdar").mkdir()
p1.joinpath("$Recycle.Bin", "subdar", "somesubdarfile.jpeg").touch()
p1.joinpath("$Recycle.Bin", "subdar", "unwanted_subdarfile.png").touch()
self.d.add_path(p1.joinpath("$Recycle.Bin"))
# Filter should set the default state to Excluded
eq_(self.d.get_state(p1["$Recycle.Bin"]), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin")), DirectoryState.EXCLUDED)
# The subdir should inherit its parent state
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdar"]), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdar")), DirectoryState.EXCLUDED)
# Override a child path's state
self.d.set_state(p1["$Recycle.Bin"]["subdir"], DirectoryState.NORMAL)
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL)
self.d.set_state(p1.joinpath("$Recycle.Bin", "subdir"), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL)
# Parent should keep its default state, and the other child too
eq_(self.d.get_state(p1["$Recycle.Bin"]), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdar"]), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin")), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdar")), DirectoryState.EXCLUDED)
# print(f"get_folders(): {[x for x in self.d.get_folders()]}")
# only the 2 files directly under the Normal directory
@ -419,8 +412,8 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
assert "somesubdirfile.png" in files
assert "unwanted_subdirfile.gif" in files
# Overriding the parent should enable all children
self.d.set_state(p1["$Recycle.Bin"], DirectoryState.NORMAL)
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdar"]), DirectoryState.NORMAL)
self.d.set_state(p1.joinpath("$Recycle.Bin"), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdar")), DirectoryState.NORMAL)
# all files there
files = self.get_files_and_expect_num_result(6)
assert "somefile.png" in files
@ -444,7 +437,7 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
assert self.d._exclude_list.error(regex3) is None
# print(f"get_folders(): {[x for x in self.d.get_folders()]}")
# Directory shouldn't change its state here, unless explicitely done by user
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL)
files = self.get_files_and_expect_num_result(5)
assert "unwanted_subdirfile.gif" not in files
assert "unwanted_subdarfile.png" in files
@ -453,15 +446,15 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
regex4 = r".*subdir$"
self.d._exclude_list.rename(regex3, regex4)
assert self.d._exclude_list.error(regex4) is None
p1["$Recycle.Bin"]["subdar"]["file_ending_with_subdir"].open("w").close()
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.EXCLUDED)
p1.joinpath("$Recycle.Bin", "subdar", "file_ending_with_subdir").touch()
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.EXCLUDED)
files = self.get_files_and_expect_num_result(4)
assert "file_ending_with_subdir" not in files
assert "somesubdarfile.jpeg" in files
assert "somesubdirfile.png" not in files
assert "unwanted_subdirfile.gif" not in files
self.d.set_state(p1["$Recycle.Bin"]["subdir"], DirectoryState.NORMAL)
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL)
self.d.set_state(p1.joinpath("$Recycle.Bin", "subdir"), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL)
# print(f"get_folders(): {[x for x in self.d.get_folders()]}")
files = self.get_files_and_expect_num_result(6)
assert "file_ending_with_subdir" not in files
@ -471,9 +464,9 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
regex5 = r".*subdir.*"
self.d._exclude_list.rename(regex4, regex5)
# Files containing substring should be filtered
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL)
# The path should not match, only the filename, the "subdir" in the directory name shouldn't matter
p1["$Recycle.Bin"]["subdir"]["file_which_shouldnt_match"].open("w").close()
p1.joinpath("$Recycle.Bin", "subdir", "file_which_shouldnt_match").touch()
files = self.get_files_and_expect_num_result(5)
assert "somesubdirfile.png" not in files
assert "unwanted_subdirfile.gif" not in files
@ -493,7 +486,7 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
assert self.d._exclude_list.error(regex6) is None
assert regex6 in self.d._exclude_list
# This still should not be affected
eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL)
files = self.get_files_and_expect_num_result(5)
# These files are under the "/subdir" directory
assert "somesubdirfile.png" not in files
@ -505,20 +498,20 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
def test_japanese_unicode(self, tmpdir):
p1 = Path(str(tmpdir))
p1["$Recycle.Bin"].mkdir()
p1["$Recycle.Bin"]["somerecycledfile.png"].open("w").close()
p1["$Recycle.Bin"]["some_unwanted_file.jpg"].open("w").close()
p1["$Recycle.Bin"]["subdir"].mkdir()
p1["$Recycle.Bin"]["subdir"]["過去白濁物語~]_カラー.jpg"].open("w").close()
p1["$Recycle.Bin"]["思叫物語"].mkdir()
p1["$Recycle.Bin"]["思叫物語"]["なししろ会う前"].open("w").close()
p1["$Recycle.Bin"]["思叫物語"]["堂~ロ"].open("w").close()
self.d.add_path(p1["$Recycle.Bin"])
p1.joinpath("$Recycle.Bin").mkdir()
p1.joinpath("$Recycle.Bin", "somerecycledfile.png").touch()
p1.joinpath("$Recycle.Bin", "some_unwanted_file.jpg").touch()
p1.joinpath("$Recycle.Bin", "subdir").mkdir()
p1.joinpath("$Recycle.Bin", "subdir", "過去白濁物語~]_カラー.jpg").touch()
p1.joinpath("$Recycle.Bin", "思叫物語").mkdir()
p1.joinpath("$Recycle.Bin", "思叫物語", "なししろ会う前").touch()
p1.joinpath("$Recycle.Bin", "思叫物語", "堂~ロ").touch()
self.d.add_path(p1.joinpath("$Recycle.Bin"))
regex3 = r".*物語.*"
self.d._exclude_list.add(regex3)
self.d._exclude_list.mark(regex3)
# print(f"get_folders(): {[x for x in self.d.get_folders()]}")
eq_(self.d.get_state(p1["$Recycle.Bin"]["思叫物語"]), DirectoryState.EXCLUDED)
eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "思叫物語")), DirectoryState.EXCLUDED)
files = self.get_files_and_expect_num_result(2)
assert "過去白濁物語~]_カラー.jpg" not in files
assert "なししろ会う前" not in files
@ -527,7 +520,7 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
regex4 = r".*物語$"
self.d._exclude_list.rename(regex3, regex4)
assert self.d._exclude_list.error(regex4) is None
self.d.set_state(p1["$Recycle.Bin"]["思叫物語"], DirectoryState.NORMAL)
self.d.set_state(p1.joinpath("$Recycle.Bin", "思叫物語"), DirectoryState.NORMAL)
files = self.get_files_and_expect_num_result(5)
assert "過去白濁物語~]_カラー.jpg" in files
assert "なししろ会う前" in files
@ -539,15 +532,15 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled
self.d._exclude_list.add(regex)
self.d._exclude_list.mark(regex)
p1 = Path(str(tmpdir))
p1["foobar"].mkdir()
p1["foobar"][".hidden_file.txt"].open("w").close()
p1["foobar"][".hidden_dir"].mkdir()
p1["foobar"][".hidden_dir"]["foobar.jpg"].open("w").close()
p1["foobar"][".hidden_dir"][".hidden_subfile.png"].open("w").close()
self.d.add_path(p1["foobar"])
p1.joinpath("foobar").mkdir()
p1.joinpath("foobar", ".hidden_file.txt").touch()
p1.joinpath("foobar", ".hidden_dir").mkdir()
p1.joinpath("foobar", ".hidden_dir", "foobar.jpg").touch()
p1.joinpath("foobar", ".hidden_dir", ".hidden_subfile.png").touch()
self.d.add_path(p1.joinpath("foobar"))
# It should not inherit its parent's state originally
eq_(self.d.get_state(p1["foobar"][".hidden_dir"]), DirectoryState.EXCLUDED)
self.d.set_state(p1["foobar"][".hidden_dir"], DirectoryState.NORMAL)
eq_(self.d.get_state(p1.joinpath("foobar", ".hidden_dir")), DirectoryState.EXCLUDED)
self.d.set_state(p1.joinpath("foobar", ".hidden_dir"), DirectoryState.NORMAL)
# The files should still be filtered
files = self.get_files_and_expect_num_result(1)
eq_(len(self.d._exclude_list.compiled_paths), 0)

View File

@ -17,7 +17,7 @@ except ImportError:
from os import urandom
from hscommon.path import Path
from pathlib import Path
from hscommon.testutil import eq_
from core.tests.directories_test import create_fake_fs
@ -25,32 +25,26 @@ from .. import fs
def create_fake_fs_with_random_data(rootpath):
rootpath = rootpath["fs"]
rootpath = rootpath.joinpath("fs")
rootpath.mkdir()
rootpath["dir1"].mkdir()
rootpath["dir2"].mkdir()
rootpath["dir3"].mkdir()
fp = rootpath["file1.test"].open("wb")
rootpath.joinpath("dir1").mkdir()
rootpath.joinpath("dir2").mkdir()
rootpath.joinpath("dir3").mkdir()
data1 = urandom(200 * 1024) # 200KiB
data2 = urandom(1024 * 1024) # 1MiB
data3 = urandom(10 * 1024 * 1024) # 10MiB
fp.write(data1)
fp.close()
fp = rootpath["file2.test"].open("wb")
fp.write(data2)
fp.close()
fp = rootpath["file3.test"].open("wb")
fp.write(data3)
fp.close()
fp = rootpath["dir1"]["file1.test"].open("wb")
fp.write(data1)
fp.close()
fp = rootpath["dir2"]["file2.test"].open("wb")
fp.write(data2)
fp.close()
fp = rootpath["dir3"]["file3.test"].open("wb")
fp.write(data3)
fp.close()
with rootpath.joinpath("file1.test").open("wb") as fp:
fp.write(data1)
with rootpath.joinpath("file2.test").open("wb") as fp:
fp.write(data2)
with rootpath.joinpath("file3.test").open("wb") as fp:
fp.write(data3)
with rootpath.joinpath("dir1", "file1.test").open("wb") as fp:
fp.write(data1)
with rootpath.joinpath("dir2", "file2.test").open("wb") as fp:
fp.write(data2)
with rootpath.joinpath("dir3", "file3.test").open("wb") as fp:
fp.write(data3)
return rootpath
@ -66,12 +60,12 @@ def test_digest_aggregate_subfiles_sorted(tmpdir):
# same order everytime.
p = create_fake_fs_with_random_data(Path(str(tmpdir)))
b = fs.Folder(p)
digest1 = fs.File(p["dir1"]["file1.test"]).digest
digest2 = fs.File(p["dir2"]["file2.test"]).digest
digest3 = fs.File(p["dir3"]["file3.test"]).digest
digest4 = fs.File(p["file1.test"]).digest
digest5 = fs.File(p["file2.test"]).digest
digest6 = fs.File(p["file3.test"]).digest
digest1 = fs.File(p.joinpath("dir1", "file1.test")).digest
digest2 = fs.File(p.joinpath("dir2", "file2.test")).digest
digest3 = fs.File(p.joinpath("dir3", "file3.test")).digest
digest4 = fs.File(p.joinpath("file1.test")).digest
digest5 = fs.File(p.joinpath("file2.test")).digest
digest6 = fs.File(p.joinpath("file3.test")).digest
# The expected digest is the hash of digests for folders and the direct digest for files
folder_digest1 = hasher(digest1).digest()
folder_digest2 = hasher(digest2).digest()
@ -83,12 +77,12 @@ def test_digest_aggregate_subfiles_sorted(tmpdir):
def test_partial_digest_aggregate_subfile_sorted(tmpdir):
p = create_fake_fs_with_random_data(Path(str(tmpdir)))
b = fs.Folder(p)
digest1 = fs.File(p["dir1"]["file1.test"]).digest_partial
digest2 = fs.File(p["dir2"]["file2.test"]).digest_partial
digest3 = fs.File(p["dir3"]["file3.test"]).digest_partial
digest4 = fs.File(p["file1.test"]).digest_partial
digest5 = fs.File(p["file2.test"]).digest_partial
digest6 = fs.File(p["file3.test"]).digest_partial
digest1 = fs.File(p.joinpath("dir1", "file1.test")).digest_partial
digest2 = fs.File(p.joinpath("dir2", "file2.test")).digest_partial
digest3 = fs.File(p.joinpath("dir3", "file3.test")).digest_partial
digest4 = fs.File(p.joinpath("file1.test")).digest_partial
digest5 = fs.File(p.joinpath("file2.test")).digest_partial
digest6 = fs.File(p.joinpath("file3.test")).digest_partial
# The expected digest is the hash of digests for folders and the direct digest for files
folder_digest1 = hasher(digest1).digest()
folder_digest2 = hasher(digest2).digest()
@ -96,12 +90,12 @@ def test_partial_digest_aggregate_subfile_sorted(tmpdir):
digest = hasher(folder_digest1 + folder_digest2 + folder_digest3 + digest4 + digest5 + digest6).digest()
eq_(b.digest_partial, digest)
digest1 = fs.File(p["dir1"]["file1.test"]).digest_samples
digest2 = fs.File(p["dir2"]["file2.test"]).digest_samples
digest3 = fs.File(p["dir3"]["file3.test"]).digest_samples
digest4 = fs.File(p["file1.test"]).digest_samples
digest5 = fs.File(p["file2.test"]).digest_samples
digest6 = fs.File(p["file3.test"]).digest_samples
digest1 = fs.File(p.joinpath("dir1", "file1.test")).digest_samples
digest2 = fs.File(p.joinpath("dir2", "file2.test")).digest_samples
digest3 = fs.File(p.joinpath("dir3", "file3.test")).digest_samples
digest4 = fs.File(p.joinpath("file1.test")).digest_samples
digest5 = fs.File(p.joinpath("file2.test")).digest_samples
digest6 = fs.File(p.joinpath("file3.test")).digest_samples
# The expected digest is the digest of digests for folders and the direct digest for files
folder_digest1 = hasher(digest1).digest()
folder_digest2 = hasher(digest2).digest()

View File

@ -447,7 +447,7 @@ class TestCaseResultsXML:
self.results.groups = self.groups
def get_file(self, path): # use this as a callback for load_from_xml
return [o for o in self.objects if o.path == path][0]
return [o for o in self.objects if str(o.path) == path][0]
def test_save_to_xml(self):
self.objects[0].is_ref = True

View File

@ -7,7 +7,7 @@
import pytest
from hscommon.jobprogress import job
from hscommon.path import Path
from pathlib import Path
from hscommon.testutil import eq_
from .. import fs
@ -22,7 +22,7 @@ class NamedObject:
if path is None:
path = Path(name)
else:
path = Path(path)[name]
path = Path(path, name)
self.name = name
self.size = size
self.path = path
@ -572,12 +572,14 @@ def test_dont_group_files_that_dont_exist(tmpdir):
s = Scanner()
s.scan_type = ScanType.CONTENTS
p = Path(str(tmpdir))
p["file1"].open("w").write("foo")
p["file2"].open("w").write("foo")
with p.joinpath("file1").open("w") as fp:
fp.write("foo")
with p.joinpath("file2").open("w") as fp:
fp.write("foo")
file1, file2 = fs.get_files(p)
def getmatches(*args, **kw):
file2.path.remove()
file2.path.unlink()
return [Match(file1, file2, 100)]
s._getmatches = getmatches

View File

@ -14,7 +14,7 @@ import re
import os
import shutil
from .path import Path, pathify
from pathlib import Path
# This matches [123], but not [12] (3 digits being the minimum).
# It also matches [1234] [12345] etc..
@ -52,16 +52,15 @@ def is_conflicted(name):
return re_conflict.match(name) is not None
@pathify
def _smart_move_or_copy(operation, source_path: Path, dest_path: Path):
"""Use move() or copy() to move and copy file with the conflict management."""
if dest_path.isdir() and not source_path.isdir():
dest_path = dest_path[source_path.name]
if dest_path.is_dir() and not source_path.is_dir():
dest_path = dest_path.joinpath(source_path.name)
if dest_path.exists():
filename = dest_path.name
dest_dir_path = dest_path.parent()
dest_dir_path = dest_path.parent
newname = get_conflicted_name(os.listdir(str(dest_dir_path)), filename)
dest_path = dest_dir_path[newname]
dest_path = dest_dir_path.joinpath(newname)
operation(str(source_path), str(dest_path))

View File

@ -7,208 +7,9 @@
# http://www.gnu.org/licenses/gpl-3.0.html
import logging
import os
import os.path as op
import shutil
import sys
from itertools import takewhile
from functools import wraps
from inspect import signature
class Path(tuple):
"""A handy class to work with paths.
We subclass ``tuple``, each element of the tuple represents an element of the path.
* ``Path('/foo/bar/baz')[1]`` --> ``'bar'``
* ``Path('/foo/bar/baz')[1:2]`` --> ``Path('bar/baz')``
* ``Path('/foo/bar')['baz']`` --> ``Path('/foo/bar/baz')``
* ``str(Path('/foo/bar/baz'))`` --> ``'/foo/bar/baz'``
"""
# Saves a little bit of memory usage
__slots__ = ()
def __new__(cls, value, separator=None):
def unicode_if_needed(s):
if isinstance(s, str):
return s
else:
try:
return str(s, sys.getfilesystemencoding())
except UnicodeDecodeError:
logging.warning("Could not decode %r", s)
raise
if isinstance(value, Path):
return value
if not separator:
separator = os.sep
if isinstance(value, bytes):
value = unicode_if_needed(value)
if isinstance(value, str):
if value:
if (separator not in value) and ("/" in value):
separator = "/"
value = value.split(separator)
else:
value = ()
else:
if any(isinstance(x, bytes) for x in value):
value = [unicode_if_needed(x) for x in value]
# value is a tuple/list
if any(separator in x for x in value):
# We have a component with a separator in it. Let's rejoin it, and generate another path.
return Path(separator.join(value), separator)
if (len(value) > 1) and (not value[-1]):
value = value[
:-1
] # We never want a path to end with a '' (because Path() can be called with a trailing slash ending path)
return tuple.__new__(cls, value)
def __add__(self, other):
other = Path(other)
if other and (not other[0]):
other = other[1:]
return Path(tuple.__add__(self, other))
def __contains__(self, item):
if isinstance(item, Path):
return item[: len(self)] == self
else:
return tuple.__contains__(self, item)
def __eq__(self, other):
return tuple.__eq__(self, Path(other))
def __getitem__(self, key):
if isinstance(key, slice):
if isinstance(key.start, Path):
equal_elems = list(takewhile(lambda pair: pair[0] == pair[1], zip(self, key.start)))
key = slice(len(equal_elems), key.stop, key.step)
if isinstance(key.stop, Path):
equal_elems = list(
takewhile(
lambda pair: pair[0] == pair[1],
zip(reversed(self), reversed(key.stop)),
)
)
stop = -len(equal_elems) if equal_elems else None
key = slice(key.start, stop, key.step)
return Path(tuple.__getitem__(self, key))
elif isinstance(key, (str, Path)):
return self + key
else:
return tuple.__getitem__(self, key)
def __hash__(self):
return tuple.__hash__(self)
def __ne__(self, other):
return not self.__eq__(other)
def __radd__(self, other):
return Path(other) + self
def __str__(self):
if len(self) == 1:
first = self[0]
if (len(first) == 2) and (first[1] == ":"): # Windows drive letter
return first + "\\"
elif not len(first): # root directory
return "/"
return os.sep.join(self)
def has_drive_letter(self):
if not self:
return False
first = self[0]
return (len(first) == 2) and (first[1] == ":")
def is_parent_of(self, other):
"""Whether ``other`` is a subpath of ``self``.
Almost the same as ``other in self``, but it's a bit more self-explicative and when
``other == self``, returns False.
"""
if other == self:
return False
else:
return other in self
def remove_drive_letter(self):
if self.has_drive_letter():
return self[1:]
else:
return self
def tobytes(self):
return str(self).encode(sys.getfilesystemencoding())
def parent(self):
"""Returns the parent path.
``Path('/foo/bar/baz').parent()`` --> ``Path('/foo/bar')``
"""
return self[:-1]
@property
def name(self):
"""Last element of the path (filename), with extension.
``Path('/foo/bar/baz').name`` --> ``'baz'``
"""
return self[-1]
# OS method wrappers
def exists(self):
return op.exists(str(self))
def copy(self, dest_path):
return shutil.copy(str(self), str(dest_path))
def copytree(self, dest_path, *args, **kwargs):
return shutil.copytree(str(self), str(dest_path), *args, **kwargs)
def isdir(self):
return op.isdir(str(self))
def isfile(self):
return op.isfile(str(self))
def islink(self):
return op.islink(str(self))
def listdir(self):
return [self[name] for name in os.listdir(str(self))]
def mkdir(self, *args, **kwargs):
return os.mkdir(str(self), *args, **kwargs)
def makedirs(self, *args, **kwargs):
return os.makedirs(str(self), *args, **kwargs)
def move(self, dest_path):
return shutil.move(str(self), str(dest_path))
def open(self, *args, **kwargs):
return open(str(self), *args, **kwargs)
def remove(self):
return os.remove(str(self))
def rename(self, dest_path):
return os.rename(str(self), str(dest_path))
def rmdir(self):
return os.rmdir(str(self))
def rmtree(self):
return shutil.rmtree(str(self))
def stat(self):
return os.stat(str(self))
from pathlib import Path
def pathify(f):

View File

@ -15,7 +15,7 @@ from ..conflict import (
smart_copy,
smart_move,
)
from ..path import Path
from pathlib import Path
from ..testutil import eq_
@ -71,43 +71,43 @@ class TestCaseMoveCopy:
def do_setup(self, request):
tmpdir = request.getfixturevalue("tmpdir")
self.path = Path(str(tmpdir))
self.path["foo"].open("w").close()
self.path["bar"].open("w").close()
self.path["dir"].mkdir()
self.path.joinpath("foo").touch()
self.path.joinpath("bar").touch()
self.path.joinpath("dir").mkdir()
def test_move_no_conflict(self, do_setup):
smart_move(self.path + "foo", self.path + "baz")
assert self.path["baz"].exists()
assert not self.path["foo"].exists()
smart_move(self.path.joinpath("foo"), self.path.joinpath("baz"))
assert self.path.joinpath("baz").exists()
assert not self.path.joinpath("foo").exists()
def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move
smart_copy(self.path + "foo", self.path + "baz")
assert self.path["baz"].exists()
assert self.path["foo"].exists()
smart_copy(self.path.joinpath("foo"), self.path.joinpath("baz"))
assert self.path.joinpath("baz").exists()
assert self.path.joinpath("foo").exists()
def test_move_no_conflict_dest_is_dir(self, do_setup):
smart_move(self.path + "foo", self.path + "dir")
assert self.path["dir"]["foo"].exists()
assert not self.path["foo"].exists()
smart_move(self.path.joinpath("foo"), self.path.joinpath("dir"))
assert self.path.joinpath("dir", "foo").exists()
assert not self.path.joinpath("foo").exists()
def test_move_conflict(self, do_setup):
smart_move(self.path + "foo", self.path + "bar")
assert self.path["[000] bar"].exists()
assert not self.path["foo"].exists()
smart_move(self.path.joinpath("foo"), self.path.joinpath("bar"))
assert self.path.joinpath("[000] bar").exists()
assert not self.path.joinpath("foo").exists()
def test_move_conflict_dest_is_dir(self, do_setup):
smart_move(self.path["foo"], self.path["dir"])
smart_move(self.path["bar"], self.path["foo"])
smart_move(self.path["foo"], self.path["dir"])
assert self.path["dir"]["foo"].exists()
assert self.path["dir"]["[000] foo"].exists()
assert not self.path["foo"].exists()
assert not self.path["bar"].exists()
smart_move(self.path.joinpath("foo"), self.path.joinpath("dir"))
smart_move(self.path.joinpath("bar"), self.path.joinpath("foo"))
smart_move(self.path.joinpath("foo"), self.path.joinpath("dir"))
assert self.path.joinpath("dir", "foo").exists()
assert self.path.joinpath("dir", "[000] foo").exists()
assert not self.path.joinpath("foo").exists()
assert not self.path.joinpath("bar").exists()
def test_copy_folder(self, tmpdir):
# smart_copy also works on folders
path = Path(str(tmpdir))
path["foo"].mkdir()
path["bar"].mkdir()
smart_copy(path["foo"], path["bar"]) # no crash
assert path["[000] bar"].exists()
path.joinpath("foo").mkdir()
path.joinpath("bar").mkdir()
smart_copy(path.joinpath("foo"), path.joinpath("bar")) # no crash
assert path.joinpath("[000] bar").exists()

View File

@ -6,261 +6,8 @@
# which should be included with this package. The terms are also available at
# http://www.gnu.org/licenses/gpl-3.0.html
import sys
import os
import pytest
from ..path import Path, pathify
from ..testutil import eq_
@pytest.fixture
def force_ossep(request):
monkeypatch = request.getfixturevalue("monkeypatch")
monkeypatch.setattr(os, "sep", "/")
def test_empty(force_ossep):
path = Path("")
eq_("", str(path))
eq_(0, len(path))
path = Path(())
eq_("", str(path))
eq_(0, len(path))
def test_single(force_ossep):
path = Path("foobar")
eq_("foobar", path)
eq_(1, len(path))
def test_multiple(force_ossep):
path = Path("foo/bar")
eq_("foo/bar", path)
eq_(2, len(path))
def test_init_with_tuple_and_list(force_ossep):
path = Path(("foo", "bar"))
eq_("foo/bar", path)
path = Path(["foo", "bar"])
eq_("foo/bar", path)
def test_init_with_invalid_value(force_ossep):
try:
Path(42)
assert False
except TypeError:
pass
def test_access(force_ossep):
path = Path("foo/bar/bleh")
eq_("foo", path[0])
eq_("foo", path[-3])
eq_("bar", path[1])
eq_("bar", path[-2])
eq_("bleh", path[2])
eq_("bleh", path[-1])
def test_slicing(force_ossep):
path = Path("foo/bar/bleh")
subpath = path[:2]
eq_("foo/bar", subpath)
assert isinstance(subpath, Path)
def test_parent(force_ossep):
path = Path("foo/bar/bleh")
subpath = path.parent()
eq_("foo/bar", subpath)
assert isinstance(subpath, Path)
def test_filename(force_ossep):
path = Path("foo/bar/bleh.ext")
eq_(path.name, "bleh.ext")
def test_deal_with_empty_components(force_ossep):
"""Keep ONLY a leading space, which means we want a leading slash."""
eq_("foo//bar", str(Path(("foo", "", "bar"))))
eq_("/foo/bar", str(Path(("", "foo", "bar"))))
eq_("foo/bar", str(Path("foo/bar/")))
def test_old_compare_paths(force_ossep):
eq_(Path("foobar"), Path("foobar"))
eq_(Path("foobar/"), Path("foobar\\", "\\"))
eq_(Path("/foobar/"), Path("\\foobar\\", "\\"))
eq_(Path("/foo/bar"), Path("\\foo\\bar", "\\"))
eq_(Path("/foo/bar"), Path("\\foo\\bar\\", "\\"))
assert Path("/foo/bar") != Path("\\foo\\foo", "\\")
# We also have to test __ne__
assert not (Path("foobar") != Path("foobar"))
assert Path("/a/b/c.x") != Path("/a/b/c.y")
def test_old_split_path(force_ossep):
eq_(Path("foobar"), ("foobar",))
eq_(Path("foo/bar"), ("foo", "bar"))
eq_(Path("/foo/bar/"), ("", "foo", "bar"))
eq_(Path("\\foo\\bar", "\\"), ("", "foo", "bar"))
def test_representation(force_ossep):
eq_("('foo', 'bar')", repr(Path(("foo", "bar"))))
def test_add(force_ossep):
eq_("foo/bar/bar/foo", Path(("foo", "bar")) + Path("bar/foo"))
eq_("foo/bar/bar/foo", Path("foo/bar") + "bar/foo")
eq_("foo/bar/bar/foo", Path("foo/bar") + ("bar", "foo"))
eq_("foo/bar/bar/foo", ("foo", "bar") + Path("bar/foo"))
eq_("foo/bar/bar/foo", "foo/bar" + Path("bar/foo"))
# Invalid concatenation
try:
Path(("foo", "bar")) + 1
assert False
except TypeError:
pass
def test_path_slice(force_ossep):
foo = Path("foo")
bar = Path("bar")
foobar = Path("foo/bar")
eq_("bar", foobar[foo:])
eq_("foo", foobar[:bar])
eq_("foo/bar", foobar[bar:])
eq_("foo/bar", foobar[:foo])
eq_((), foobar[foobar:])
eq_((), foobar[:foobar])
abcd = Path("a/b/c/d")
a = Path("a")
d = Path("d")
z = Path("z")
eq_("b/c", abcd[a:d])
eq_("b/c/d", abcd[a : d + z])
eq_("b/c", abcd[a : z + d])
eq_("a/b/c/d", abcd[:z])
def test_add_with_root_path(force_ossep):
"""if I perform /a/b/c + /d/e/f, I want /a/b/c/d/e/f, not /a/b/c//d/e/f"""
eq_("/foo/bar", str(Path("/foo") + Path("/bar")))
def test_create_with_tuple_that_have_slash_inside(force_ossep, monkeypatch):
eq_(("", "foo", "bar"), Path(("/foo", "bar")))
monkeypatch.setattr(os, "sep", "\\")
eq_(("", "foo", "bar"), Path(("\\foo", "bar")))
def test_auto_decode_os_sep(force_ossep, monkeypatch):
"""Path should decode any either / or os.sep, but always encode in os.sep."""
eq_(("foo\\bar", "bleh"), Path("foo\\bar/bleh"))
monkeypatch.setattr(os, "sep", "\\")
eq_(("foo", "bar/bleh"), Path("foo\\bar/bleh"))
path = Path("foo/bar")
eq_(("foo", "bar"), path)
eq_("foo\\bar", str(path))
def test_contains(force_ossep):
p = Path(("foo", "bar"))
assert Path(("foo", "bar", "bleh")) in p
assert Path(("foo", "bar")) in p
assert "foo" in p
assert "bleh" not in p
assert Path("foo") not in p
def test_is_parent_of(force_ossep):
assert Path(("foo", "bar")).is_parent_of(Path(("foo", "bar", "bleh")))
assert not Path(("foo", "bar")).is_parent_of(Path(("foo", "baz")))
assert not Path(("foo", "bar")).is_parent_of(Path(("foo", "bar")))
def test_windows_drive_letter(force_ossep):
p = Path(("c:",))
eq_("c:\\", str(p))
def test_root_path(force_ossep):
p = Path("/")
eq_("/", str(p))
def test_str_encodes_unicode_to_getfilesystemencoding(force_ossep):
p = Path(("foo", "bar\u00e9"))
eq_("foo/bar\u00e9".encode(sys.getfilesystemencoding()), p.tobytes())
def test_unicode(force_ossep):
p = Path(("foo", "bar\u00e9"))
eq_("foo/bar\u00e9", str(p))
def test_str_repr_of_mix_between_non_ascii_str_and_unicode(force_ossep):
u = "foo\u00e9"
encoded = u.encode(sys.getfilesystemencoding())
p = Path((encoded, "bar"))
print(repr(tuple(p)))
eq_("foo\u00e9/bar".encode(sys.getfilesystemencoding()), p.tobytes())
def test_path_of_a_path_returns_self(force_ossep):
# if Path() is called with a path as value, just return value.
p = Path("foo/bar")
assert Path(p) is p
def test_getitem_str(force_ossep):
# path['something'] returns the child path corresponding to the name
p = Path("/foo/bar")
eq_(p["baz"], Path("/foo/bar/baz"))
def test_getitem_path(force_ossep):
# path[Path('something')] returns the child path corresponding to the name (or subpath)
p = Path("/foo/bar")
eq_(p[Path("baz/bleh")], Path("/foo/bar/baz/bleh"))
@pytest.mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate")
def test_log_unicode_errors(force_ossep, monkeypatch, capsys):
# When an there's a UnicodeDecodeError on path creation, log it so it can be possible
# to debug the cause of it.
monkeypatch.setattr(sys, "getfilesystemencoding", lambda: "ascii")
with pytest.raises(UnicodeDecodeError):
Path(["", b"foo\xe9"])
out, err = capsys.readouterr()
assert repr(b"foo\xe9") in err
def test_has_drive_letter(monkeypatch):
monkeypatch.setattr(os, "sep", "\\")
p = Path("foo\\bar")
assert not p.has_drive_letter()
p = Path("C:\\")
assert p.has_drive_letter()
p = Path("z:\\foo")
assert p.has_drive_letter()
def test_remove_drive_letter(monkeypatch):
monkeypatch.setattr(os, "sep", "\\")
p = Path("foo\\bar")
eq_(p.remove_drive_letter(), Path("foo\\bar"))
p = Path("C:\\")
eq_(p.remove_drive_letter(), Path(""))
p = Path("z:\\foo")
eq_(p.remove_drive_letter(), Path("foo"))
from ..path import pathify
from pathlib import Path
def test_pathify():

View File

@ -11,7 +11,7 @@ from io import StringIO
from pytest import raises
from ..testutil import eq_
from ..path import Path
from pathlib import Path
from ..util import (
nonone,
tryint,
@ -245,30 +245,30 @@ class TestCaseDeleteIfEmpty:
def test_not_empty(self, tmpdir):
testpath = Path(str(tmpdir))
testpath["foo"].mkdir()
testpath.joinpath("foo").mkdir()
assert not delete_if_empty(testpath)
assert testpath.exists()
def test_with_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir))
testpath["foo"].open("w")
testpath["bar"].open("w")
testpath.joinpath("foo").touch()
testpath.joinpath("bar").touch()
assert delete_if_empty(testpath, ["foo", "bar"])
assert not testpath.exists()
def test_directory_in_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir))
testpath["foo"].mkdir()
testpath.joinpath("foo").mkdir()
assert not delete_if_empty(testpath, ["foo"])
assert testpath.exists()
def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir):
testpath = Path(str(tmpdir))
testpath["foo"].open("w")
testpath["bar"].open("w")
testpath.joinpath("foo").touch()
testpath.joinpath("bar").touch()
assert not delete_if_empty(testpath, ["foo"])
assert testpath.exists()
assert testpath["foo"].exists()
assert testpath.joinpath("foo").exists()
def test_doesnt_exist(self):
# When the 'path' doesn't exist, just do nothing.
@ -276,8 +276,8 @@ class TestCaseDeleteIfEmpty:
def test_is_file(self, tmpdir):
# When 'path' is a file, do nothing.
p = Path(str(tmpdir)) + "filename"
p.open("w").close()
p = Path(str(tmpdir)).joinpath("filename")
p.touch()
delete_if_empty(p) # no crash
def test_ioerror(self, tmpdir, monkeypatch):

View File

@ -15,7 +15,8 @@ import glob
import shutil
from datetime import timedelta
from .path import Path, pathify, log_io_error
from pathlib import Path
from .path import pathify, log_io_error
def nonone(value, replace_value):
@ -354,13 +355,13 @@ def find_in_path(name, paths=None):
@pathify
def delete_if_empty(path: Path, files_to_delete=[]):
"""Deletes the directory at 'path' if it is empty or if it only contains files_to_delete."""
if not path.exists() or not path.isdir():
if not path.exists() or not path.is_dir():
return
contents = path.listdir()
if any(p for p in contents if (p.name not in files_to_delete) or p.isdir()):
contents = list(path.glob("*"))
if any(p for p in contents if (p.name not in files_to_delete) or p.is_dir()):
return False
for p in contents:
p.remove()
p.unlink()
path.rmdir()
return True