From da9f8b2b9df1935c6f8285aa4427d8971512c6b4 Mon Sep 17 00:00:00 2001 From: Andrew Senetar Date: Sun, 27 Mar 2022 23:50:03 -0500 Subject: [PATCH] Squashed commit of the following: commit 8b15fe9a502ebf4841c6529e7098cef03a6a5e6f Author: Andrew Senetar Date: Sun Mar 27 23:48:15 2022 -0500 Finish up changes to copy_or_move commit 21f6a32cf3186a400af8f30e67ad2743dc9a49bd Author: Andrew Senetar Date: Thu Mar 17 23:56:52 2022 -0500 Migrate from hscommon.path to pathlib - Part one, this gets all hscommon and core tests passing - App appears to be able to load directories and complete scans, need further testing - app.py copy_or_move needs some additional work --- core/app.py | 18 +-- core/directories.py | 24 +-- core/fs.py | 16 +- core/prioritize.py | 2 +- core/scanner.py | 4 +- core/tests/app_test.py | 43 +++--- core/tests/base.py | 6 +- core/tests/directories_test.py | 241 +++++++++++++++--------------- core/tests/fs_test.py | 76 +++++----- core/tests/results_test.py | 2 +- core/tests/scanner_test.py | 12 +- hscommon/conflict.py | 11 +- hscommon/path.py | 201 +------------------------ hscommon/tests/conflict_test.py | 54 +++---- hscommon/tests/path_test.py | 257 +------------------------------- hscommon/tests/util_test.py | 20 +-- hscommon/util.py | 11 +- 17 files changed, 267 insertions(+), 731 deletions(-) diff --git a/core/app.py b/core/app.py index 774e1dcc..6869496c 100644 --- a/core/app.py +++ b/core/app.py @@ -10,11 +10,11 @@ import logging import subprocess import re import shutil +from pathlib import Path from send2trash import send2trash from hscommon.jobprogress import job from hscommon.notify import Broadcaster -from hscommon.path import Path from hscommon.conflict import smart_move, smart_copy from hscommon.gui.progress_window import ProgressWindow from hscommon.util import delete_if_empty, first, escape, nonone, allsame @@ -415,7 +415,7 @@ class DupeGuru(Broadcaster): def clean_empty_dirs(self, path): if self.options["clean_empty_dirs"]: while delete_if_empty(path, [".DS_Store"]): - path = path.parent() + path = path.parent def clear_picture_cache(self): try: @@ -428,25 +428,25 @@ class DupeGuru(Broadcaster): def copy_or_move(self, dupe, copy: bool, destination: str, dest_type: DestType): source_path = dupe.path - location_path = first(p for p in self.directories if dupe.path in p) + location_path = first(p for p in self.directories if p in dupe.path.parents) dest_path = Path(destination) if dest_type in {DestType.RELATIVE, DestType.ABSOLUTE}: # no filename, no windows drive letter - source_base = source_path.remove_drive_letter().parent() + source_base = source_path.relative_to(source_path.anchor).parent if dest_type == DestType.RELATIVE: - source_base = source_base[location_path:] - dest_path = dest_path[source_base] + source_base = source_base.relative_to(location_path.relative_to(location_path.anchor)) + dest_path = dest_path.joinpath(source_base) if not dest_path.exists(): - dest_path.makedirs() + dest_path.mkdir(parents=True) # Add filename to dest_path. For file move/copy, it's not required, but for folders, yes. 
- dest_path = dest_path[source_path.name] + dest_path = dest_path.joinpath(source_path.name) logging.debug("Copy/Move operation from '%s' to '%s'", source_path, dest_path) # Raises an EnvironmentError if there's a problem if copy: smart_copy(source_path, dest_path) else: smart_move(source_path, dest_path) - self.clean_empty_dirs(source_path.parent()) + self.clean_empty_dirs(source_path.parent) def copy_or_move_marked(self, copy): """Start an async move (or copy) job on marked duplicates. diff --git a/core/directories.py b/core/directories.py index 2a381c14..c0cf3a6e 100644 --- a/core/directories.py +++ b/core/directories.py @@ -7,9 +7,9 @@ import os from xml.etree import ElementTree as ET import logging +from pathlib import Path from hscommon.jobprogress import job -from hscommon.path import Path from hscommon.util import FileOrPath from hscommon.trans import tr @@ -63,7 +63,7 @@ class Directories: def __contains__(self, path): for p in self._dirs: - if path in p: + if path == p or p in path.parents: return True return False @@ -94,7 +94,9 @@ class Directories: j.check_if_cancelled() root_path = Path(root) state = self.get_state(root_path) - if state == DirectoryState.EXCLUDED and not any(p[: len(root_path)] == root_path for p in self.states): + if state == DirectoryState.EXCLUDED and not any( + p.parts[: len(root_path.parts)] == root_path.parts for p in self.states + ): # Recursively get files from folders with lots of subfolder is expensive. However, there # might be a subfolder in this path that is not excluded. What we want to do is to skim # through self.states and see if we must continue, or we can stop right here to save time @@ -103,19 +105,19 @@ class Directories: if state != DirectoryState.EXCLUDED: # Old logic if self._exclude_list is None or not self._exclude_list.mark_count: - found_files = [fs.get_file(root_path + f, fileclasses=fileclasses) for f in files] + found_files = [fs.get_file(root_path.joinpath(f), fileclasses=fileclasses) for f in files] else: found_files = [] # print(f"len of files: {len(files)} {files}") for f in files: if not self._exclude_list.is_excluded(root, f): - found_files.append(fs.get_file(root_path + f, fileclasses=fileclasses)) + found_files.append(fs.get_file(root_path.joinpath(f), fileclasses=fileclasses)) found_files = [f for f in found_files if f is not None] # In some cases, directories can be considered as files by dupeGuru, which is # why we have this line below. In fact, there only one case: Bundle files under # OS X... In other situations, this forloop will do nothing. 
for d in dirs[:]: - f = fs.get_file(root_path + d, fileclasses=fileclasses) + f = fs.get_file(root_path.joinpath(d), fileclasses=fileclasses) if f is not None: found_files.append(f) dirs.remove(d) @@ -159,7 +161,7 @@ class Directories: raise AlreadyThereError() if not path.exists(): raise InvalidPathError() - self._dirs = [p for p in self._dirs if p not in path] + self._dirs = [p for p in self._dirs if path not in p.parents] self._dirs.append(path) @staticmethod @@ -170,7 +172,7 @@ class Directories: :rtype: list of Path """ try: - subpaths = [p for p in path.listdir() if p.isdir()] + subpaths = [p for p in path.glob("*") if p.is_dir()] subpaths.sort(key=lambda x: x.name.lower()) return subpaths except EnvironmentError: @@ -225,8 +227,8 @@ class Directories: # we loop through the states to find the longest matching prefix # if the parent has a state in cache, return that state for p, s in self.states.items(): - if p.is_parent_of(path) and len(p) > prevlen: - prevlen = len(p) + if p in path.parents and len(p.parts) > prevlen: + prevlen = len(p.parts) state = s return state @@ -296,6 +298,6 @@ class Directories: if self.get_state(path) == state: return for iter_path in list(self.states.keys()): - if path.is_parent_of(iter_path): + if path in iter_path.parents: del self.states[iter_path] self.states[path] = state diff --git a/core/fs.py b/core/fs.py index 3710d9ac..bbb133b1 100644 --- a/core/fs.py +++ b/core/fs.py @@ -28,7 +28,7 @@ import sqlite3 from threading import Lock from typing import Any, AnyStr, Union -from hscommon.path import Path +from pathlib import Path from hscommon.util import nonone, get_file_ext __all__ = [ @@ -302,14 +302,14 @@ class File: @classmethod def can_handle(cls, path): """Returns whether this file wrapper class can handle ``path``.""" - return not path.islink() and path.isfile() + return not path.is_symlink() and path.is_file() def rename(self, newname): if newname == self.name: return - destpath = self.path.parent()[newname] + destpath = self.path.parent.joinpath(newname) if destpath.exists(): - raise AlreadyExistsError(newname, self.path.parent()) + raise AlreadyExistsError(newname, self.path.parent) try: self.path.rename(destpath) except EnvironmentError: @@ -333,7 +333,7 @@ class File: @property def folder_path(self): - return self.path.parent() + return self.path.parent class Folder(File): @@ -377,13 +377,13 @@ class Folder(File): @property def subfolders(self): if self._subfolders is None: - subfolders = [p for p in self.path.listdir() if not p.islink() and p.isdir()] + subfolders = [p for p in self.path.glob("*") if not p.is_symlink() and p.is_dir()] self._subfolders = [self.__class__(p) for p in subfolders] return self._subfolders @classmethod def can_handle(cls, path): - return not path.islink() and path.isdir() + return not path.is_symlink() and path.is_dir() def get_file(path, fileclasses=[File]): @@ -408,7 +408,7 @@ def get_files(path, fileclasses=[File]): assert all(issubclass(fileclass, File) for fileclass in fileclasses) try: result = [] - for path in path.listdir(): + for path in path.glob("*"): file = get_file(path, fileclasses=fileclasses) if file is not None: result.append(file) diff --git a/core/prioritize.py b/core/prioritize.py index 1b7826bb..181b5b2a 100644 --- a/core/prioritize.py +++ b/core/prioritize.py @@ -82,7 +82,7 @@ class FolderCategory(ValueListCategory): def sort_key(self, dupe, crit_value): value = self.extract_value(dupe) - if value[: len(crit_value)] == crit_value: + if value.is_relative_to(crit_value): return 0 else: return 1 
diff --git a/core/scanner.py b/core/scanner.py index 8791c8e3..b03e1bf0 100644 --- a/core/scanner.py +++ b/core/scanner.py @@ -134,7 +134,7 @@ class Scanner: return False if is_same_with_digit(refname, dupename): return True - return len(dupe.path) > len(ref.path) + return len(dupe.path.parts) > len(ref.path.parts) @staticmethod def get_scan_options(): @@ -164,7 +164,7 @@ class Scanner: toremove = set() last_parent_path = sortedpaths[0] for p in sortedpaths[1:]: - if p in last_parent_path: + if last_parent_path in p.parents: toremove.add(p) else: last_parent_path = p diff --git a/core/tests/app_test.py b/core/tests/app_test.py index 9601486a..bc0104ba 100644 --- a/core/tests/app_test.py +++ b/core/tests/app_test.py @@ -9,7 +9,7 @@ import os.path as op import logging import pytest -from hscommon.path import Path +from pathlib import Path import hscommon.conflict import hscommon.util from hscommon.testutil import eq_, log_calls @@ -56,7 +56,7 @@ class TestCaseDupeGuru: # for this unit is pathetic. What's done is done. My approach now is to add tests for # every change I want to make. The blowup was caused by a missing import. p = Path(str(tmpdir)) - p["foo"].open("w").close() + p.joinpath("foo").touch() monkeypatch.setattr( hscommon.conflict, "smart_copy", @@ -71,19 +71,19 @@ class TestCaseDupeGuru: dgapp.copy_or_move(f, True, "some_destination", 0) eq_(1, len(hscommon.conflict.smart_copy.calls)) call = hscommon.conflict.smart_copy.calls[0] - eq_(call["dest_path"], op.join("some_destination", "foo")) + eq_(call["dest_path"], Path("some_destination", "foo")) eq_(call["source_path"], f.path) def test_copy_or_move_clean_empty_dirs(self, tmpdir, monkeypatch): tmppath = Path(str(tmpdir)) - sourcepath = tmppath["source"] + sourcepath = tmppath.joinpath("source") sourcepath.mkdir() - sourcepath["myfile"].open("w") + sourcepath.joinpath("myfile").touch() app = TestApp().app app.directories.add_path(tmppath) [myfile] = app.directories.get_files() monkeypatch.setattr(app, "clean_empty_dirs", log_calls(lambda path: None)) - app.copy_or_move(myfile, False, tmppath["dest"], 0) + app.copy_or_move(myfile, False, tmppath.joinpath("dest"), 0) calls = app.clean_empty_dirs.calls eq_(1, len(calls)) eq_(sourcepath, calls[0]["path"]) @@ -106,8 +106,8 @@ class TestCaseDupeGuru: # If the ignore_hardlink_matches option is set, don't match files hardlinking to the same # inode. tmppath = Path(str(tmpdir)) - tmppath["myfile"].open("w").write("foo") - os.link(str(tmppath["myfile"]), str(tmppath["hardlink"])) + tmppath.joinpath("myfile").open("wt").write("foo") + os.link(str(tmppath.joinpath("myfile")), str(tmppath.joinpath("hardlink"))) app = TestApp().app app.directories.add_path(tmppath) app.options["scan_type"] = ScanType.CONTENTS @@ -153,7 +153,7 @@ class TestCaseDupeGuruCleanEmptyDirs: # delete_if_empty must be recursively called up in the path until it returns False @log_calls def mock_delete_if_empty(path, files_to_delete=[]): - return len(path) > 1 + return len(path.parts) > 1 monkeypatch.setattr(hscommon.util, "delete_if_empty", mock_delete_if_empty) # XXX This monkeypatch is temporary. will be fixed in a better monkeypatcher. 
@@ -180,8 +180,8 @@ class TestCaseDupeGuruWithResults: self.rtable.refresh() tmpdir = request.getfixturevalue("tmpdir") tmppath = Path(str(tmpdir)) - tmppath["foo"].mkdir() - tmppath["bar"].mkdir() + tmppath.joinpath("foo").mkdir() + tmppath.joinpath("bar").mkdir() self.app.directories.add_path(tmppath) def test_get_objects(self, do_setup): @@ -424,12 +424,9 @@ class TestCaseDupeGuruRenameSelected: def do_setup(self, request): tmpdir = request.getfixturevalue("tmpdir") p = Path(str(tmpdir)) - fp = open(str(p["foo bar 1"]), mode="w") - fp.close() - fp = open(str(p["foo bar 2"]), mode="w") - fp.close() - fp = open(str(p["foo bar 3"]), mode="w") - fp.close() + p.joinpath("foo bar 1").touch() + p.joinpath("foo bar 2").touch() + p.joinpath("foo bar 3").touch() files = fs.get_files(p) for f in files: f.is_ref = False @@ -451,7 +448,7 @@ class TestCaseDupeGuruRenameSelected: g = self.groups[0] self.rtable.select([1]) assert app.rename_selected("renamed") - names = [p.name for p in self.p.listdir()] + names = [p.name for p in self.p.glob("*")] assert "renamed" in names assert "foo bar 2" not in names eq_(g.dupes[0].name, "renamed") @@ -464,7 +461,7 @@ class TestCaseDupeGuruRenameSelected: assert not app.rename_selected("renamed") msg = logging.warning.calls[0]["msg"] eq_("dupeGuru Warning: list index out of range", msg) - names = [p.name for p in self.p.listdir()] + names = [p.name for p in self.p.glob("*")] assert "renamed" not in names assert "foo bar 2" in names eq_(g.dupes[0].name, "foo bar 2") @@ -477,7 +474,7 @@ class TestCaseDupeGuruRenameSelected: assert not app.rename_selected("foo bar 1") msg = logging.warning.calls[0]["msg"] assert msg.startswith("dupeGuru Warning: 'foo bar 1' already exists in") - names = [p.name for p in self.p.listdir()] + names = [p.name for p in self.p.glob("*")] assert "foo bar 1" in names assert "foo bar 2" in names eq_(g.dupes[0].name, "foo bar 2") @@ -488,9 +485,9 @@ class TestAppWithDirectoriesInTree: def do_setup(self, request): tmpdir = request.getfixturevalue("tmpdir") p = Path(str(tmpdir)) - p["sub1"].mkdir() - p["sub2"].mkdir() - p["sub3"].mkdir() + p.joinpath("sub1").mkdir() + p.joinpath("sub2").mkdir() + p.joinpath("sub3").mkdir() app = TestApp() self.app = app.app self.dtree = app.dtree diff --git a/core/tests/base.py b/core/tests/base.py index 9f6e68d4..5ab2387e 100644 --- a/core/tests/base.py +++ b/core/tests/base.py @@ -5,7 +5,7 @@ # http://www.gnu.org/licenses/gpl-3.0.html from hscommon.testutil import TestApp as TestAppBase, CallLogger, eq_, with_app # noqa -from hscommon.path import Path +from pathlib import Path from hscommon.util import get_file_ext, format_size from hscommon.gui.column import Column from hscommon.jobprogress.job import nulljob, JobCancelled @@ -111,11 +111,11 @@ class NamedObject: @property def path(self): - return self._folder[self.name] + return self._folder.joinpath(self.name) @property def folder_path(self): - return self.path.parent() + return self.path.parent @property def extension(self): diff --git a/core/tests/directories_test.py b/core/tests/directories_test.py index 3622afb6..9f57a154 100644 --- a/core/tests/directories_test.py +++ b/core/tests/directories_test.py @@ -10,7 +10,7 @@ import tempfile import shutil from pytest import raises -from hscommon.path import Path +from pathlib import Path from hscommon.testutil import eq_ from hscommon.plat import ISWINDOWS @@ -26,29 +26,23 @@ from ..exclude import ExcludeList, ExcludeDict def create_fake_fs(rootpath): # We have it as a separate function because other units are 
using it. - rootpath = rootpath["fs"] + rootpath = rootpath.joinpath("fs") rootpath.mkdir() - rootpath["dir1"].mkdir() - rootpath["dir2"].mkdir() - rootpath["dir3"].mkdir() - fp = rootpath["file1.test"].open("w") - fp.write("1") - fp.close() - fp = rootpath["file2.test"].open("w") - fp.write("12") - fp.close() - fp = rootpath["file3.test"].open("w") - fp.write("123") - fp.close() - fp = rootpath["dir1"]["file1.test"].open("w") - fp.write("1") - fp.close() - fp = rootpath["dir2"]["file2.test"].open("w") - fp.write("12") - fp.close() - fp = rootpath["dir3"]["file3.test"].open("w") - fp.write("123") - fp.close() + rootpath.joinpath("dir1").mkdir() + rootpath.joinpath("dir2").mkdir() + rootpath.joinpath("dir3").mkdir() + with rootpath.joinpath("file1.test").open("wt") as fp: + fp.write("1") + with rootpath.joinpath("file2.test").open("wt") as fp: + fp.write("12") + with rootpath.joinpath("file3.test").open("wt") as fp: + fp.write("123") + with rootpath.joinpath("dir1", "file1.test").open("wt") as fp: + fp.write("1") + with rootpath.joinpath("dir2", "file2.test").open("wt") as fp: + fp.write("12") + with rootpath.joinpath("dir3", "file3.test").open("wt") as fp: + fp.write("123") return rootpath @@ -60,11 +54,10 @@ def setup_module(module): # and another with a more complex structure. testpath = Path(tempfile.mkdtemp()) module.testpath = testpath - rootpath = testpath["onefile"] + rootpath = testpath.joinpath("onefile") rootpath.mkdir() - fp = rootpath["test.txt"].open("w") - fp.write("test_data") - fp.close() + with rootpath.joinpath("test.txt").open("wt") as fp: + fp.write("test_data") create_fake_fs(testpath) @@ -80,13 +73,13 @@ def test_empty(): def test_add_path(): d = Directories() - p = testpath["onefile"] + p = testpath.joinpath("onefile") d.add_path(p) eq_(1, len(d)) assert p in d - assert (p["foobar"]) in d - assert p.parent() not in d - p = testpath["fs"] + assert (p.joinpath("foobar")) in d + assert p.parent not in d + p = testpath.joinpath("fs") d.add_path(p) eq_(2, len(d)) assert p in d @@ -94,18 +87,18 @@ def test_add_path(): def test_add_path_when_path_is_already_there(): d = Directories() - p = testpath["onefile"] + p = testpath.joinpath("onefile") d.add_path(p) with raises(AlreadyThereError): d.add_path(p) with raises(AlreadyThereError): - d.add_path(p["foobar"]) + d.add_path(p.joinpath("foobar")) eq_(1, len(d)) def test_add_path_containing_paths_already_there(): d = Directories() - d.add_path(testpath["onefile"]) + d.add_path(testpath.joinpath("onefile")) eq_(1, len(d)) d.add_path(testpath) eq_(len(d), 1) @@ -114,7 +107,7 @@ def test_add_path_containing_paths_already_there(): def test_add_path_non_latin(tmpdir): p = Path(str(tmpdir)) - to_add = p["unicode\u201a"] + to_add = p.joinpath("unicode\u201a") os.mkdir(str(to_add)) d = Directories() try: @@ -125,25 +118,25 @@ def test_add_path_non_latin(tmpdir): def test_del(): d = Directories() - d.add_path(testpath["onefile"]) + d.add_path(testpath.joinpath("onefile")) try: del d[1] assert False except IndexError: pass - d.add_path(testpath["fs"]) + d.add_path(testpath.joinpath("fs")) del d[1] eq_(1, len(d)) def test_states(): d = Directories() - p = testpath["onefile"] + p = testpath.joinpath("onefile") d.add_path(p) eq_(DirectoryState.NORMAL, d.get_state(p)) d.set_state(p, DirectoryState.REFERENCE) eq_(DirectoryState.REFERENCE, d.get_state(p)) - eq_(DirectoryState.REFERENCE, d.get_state(p["dir1"])) + eq_(DirectoryState.REFERENCE, d.get_state(p.joinpath("dir1"))) eq_(1, len(d.states)) eq_(p, list(d.states.keys())[0]) 
eq_(DirectoryState.REFERENCE, d.states[p]) @@ -152,7 +145,7 @@ def test_states(): def test_get_state_with_path_not_there(): # When the path's not there, just return DirectoryState.Normal d = Directories() - d.add_path(testpath["onefile"]) + d.add_path(testpath.joinpath("onefile")) eq_(d.get_state(testpath), DirectoryState.NORMAL) @@ -160,26 +153,26 @@ def test_states_overwritten_when_larger_directory_eat_smaller_ones(): # ref #248 # When setting the state of a folder, we overwrite previously set states for subfolders. d = Directories() - p = testpath["onefile"] + p = testpath.joinpath("onefile") d.add_path(p) d.set_state(p, DirectoryState.EXCLUDED) d.add_path(testpath) d.set_state(testpath, DirectoryState.REFERENCE) eq_(d.get_state(p), DirectoryState.REFERENCE) - eq_(d.get_state(p["dir1"]), DirectoryState.REFERENCE) + eq_(d.get_state(p.joinpath("dir1")), DirectoryState.REFERENCE) eq_(d.get_state(testpath), DirectoryState.REFERENCE) def test_get_files(): d = Directories() - p = testpath["fs"] + p = testpath.joinpath("fs") d.add_path(p) - d.set_state(p["dir1"], DirectoryState.REFERENCE) - d.set_state(p["dir2"], DirectoryState.EXCLUDED) + d.set_state(p.joinpath("dir1"), DirectoryState.REFERENCE) + d.set_state(p.joinpath("dir2"), DirectoryState.EXCLUDED) files = list(d.get_files()) eq_(5, len(files)) for f in files: - if f.path.parent() == p["dir1"]: + if f.path.parent == p.joinpath("dir1"): assert f.is_ref else: assert not f.is_ref @@ -193,7 +186,7 @@ def test_get_files_with_folders(): return True d = Directories() - p = testpath["fs"] + p = testpath.joinpath("fs") d.add_path(p) files = list(d.get_files(fileclasses=[FakeFile])) # We have the 3 root files and the 3 root dirs @@ -202,23 +195,23 @@ def test_get_files_with_folders(): def test_get_folders(): d = Directories() - p = testpath["fs"] + p = testpath.joinpath("fs") d.add_path(p) - d.set_state(p["dir1"], DirectoryState.REFERENCE) - d.set_state(p["dir2"], DirectoryState.EXCLUDED) + d.set_state(p.joinpath("dir1"), DirectoryState.REFERENCE) + d.set_state(p.joinpath("dir2"), DirectoryState.EXCLUDED) folders = list(d.get_folders()) eq_(len(folders), 3) ref = [f for f in folders if f.is_ref] not_ref = [f for f in folders if not f.is_ref] eq_(len(ref), 1) - eq_(ref[0].path, p["dir1"]) + eq_(ref[0].path, p.joinpath("dir1")) eq_(len(not_ref), 2) eq_(ref[0].size, 1) def test_get_files_with_inherited_exclusion(): d = Directories() - p = testpath["onefile"] + p = testpath.joinpath("onefile") d.add_path(p) d.set_state(p, DirectoryState.EXCLUDED) eq_([], list(d.get_files())) @@ -234,13 +227,13 @@ def test_save_and_load(tmpdir): d1.add_path(p1) d1.add_path(p2) d1.set_state(p1, DirectoryState.REFERENCE) - d1.set_state(p1["dir1"], DirectoryState.EXCLUDED) + d1.set_state(p1.joinpath("dir1"), DirectoryState.EXCLUDED) tmpxml = str(tmpdir.join("directories_testunit.xml")) d1.save_to_file(tmpxml) d2.load_from_file(tmpxml) eq_(2, len(d2)) eq_(DirectoryState.REFERENCE, d2.get_state(p1)) - eq_(DirectoryState.EXCLUDED, d2.get_state(p1["dir1"])) + eq_(DirectoryState.EXCLUDED, d2.get_state(p1.joinpath("dir1"))) def test_invalid_path(): @@ -268,7 +261,7 @@ def test_load_from_file_with_invalid_path(tmpdir): # This test simulates a load from file resulting in a # InvalidPath raise. Other directories must be loaded. 
d1 = Directories() - d1.add_path(testpath["onefile"]) + d1.add_path(testpath.joinpath("onefile")) # Will raise InvalidPath upon loading p = Path(str(tmpdir.join("toremove"))) p.mkdir() @@ -283,11 +276,11 @@ def test_load_from_file_with_invalid_path(tmpdir): def test_unicode_save(tmpdir): d = Directories() - p1 = Path(str(tmpdir))["hello\xe9"] + p1 = Path(str(tmpdir), "hello\xe9") p1.mkdir() - p1["foo\xe9"].mkdir() + p1.joinpath("foo\xe9").mkdir() d.add_path(p1) - d.set_state(p1["foo\xe9"], DirectoryState.EXCLUDED) + d.set_state(p1.joinpath("foo\xe9"), DirectoryState.EXCLUDED) tmpxml = str(tmpdir.join("directories_testunit.xml")) try: d.save_to_file(tmpxml) @@ -297,12 +290,12 @@ def test_unicode_save(tmpdir): def test_get_files_refreshes_its_directories(): d = Directories() - p = testpath["fs"] + p = testpath.joinpath("fs") d.add_path(p) files = d.get_files() eq_(6, len(list(files))) time.sleep(1) - os.remove(str(p["dir1"]["file1.test"])) + os.remove(str(p.joinpath("dir1", "file1.test"))) files = d.get_files() eq_(5, len(list(files))) @@ -311,15 +304,15 @@ def test_get_files_does_not_choke_on_non_existing_directories(tmpdir): d = Directories() p = Path(str(tmpdir)) d.add_path(p) - p.rmtree() + shutil.rmtree(str(p)) eq_([], list(d.get_files())) def test_get_state_returns_excluded_by_default_for_hidden_directories(tmpdir): d = Directories() p = Path(str(tmpdir)) - hidden_dir_path = p[".foo"] - p[".foo"].mkdir() + hidden_dir_path = p.joinpath(".foo") + p.joinpath(".foo").mkdir() d.add_path(p) eq_(d.get_state(hidden_dir_path), DirectoryState.EXCLUDED) # But it can be overriden @@ -331,22 +324,22 @@ def test_default_path_state_override(tmpdir): # It's possible for a subclass to override the default state of a path class MyDirectories(Directories): def _default_state_for_path(self, path): - if "foobar" in path: + if "foobar" in path.parts: return DirectoryState.EXCLUDED d = MyDirectories() p1 = Path(str(tmpdir)) - p1["foobar"].mkdir() - p1["foobar/somefile"].open("w").close() - p1["foobaz"].mkdir() - p1["foobaz/somefile"].open("w").close() + p1.joinpath("foobar").mkdir() + p1.joinpath("foobar/somefile").touch() + p1.joinpath("foobaz").mkdir() + p1.joinpath("foobaz/somefile").touch() d.add_path(p1) - eq_(d.get_state(p1["foobaz"]), DirectoryState.NORMAL) - eq_(d.get_state(p1["foobar"]), DirectoryState.EXCLUDED) + eq_(d.get_state(p1.joinpath("foobaz")), DirectoryState.NORMAL) + eq_(d.get_state(p1.joinpath("foobar")), DirectoryState.EXCLUDED) eq_(len(list(d.get_files())), 1) # only the 'foobaz' file is there # However, the default state can be changed - d.set_state(p1["foobar"], DirectoryState.NORMAL) - eq_(d.get_state(p1["foobar"]), DirectoryState.NORMAL) + d.set_state(p1.joinpath("foobar"), DirectoryState.NORMAL) + eq_(d.get_state(p1.joinpath("foobar")), DirectoryState.NORMAL) eq_(len(list(d.get_files())), 2) @@ -372,42 +365,42 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled self.d._exclude_list.add(regex) self.d._exclude_list.mark(regex) p1 = Path(str(tmpdir)) - p1["$Recycle.Bin"].mkdir() - p1["$Recycle.Bin"]["subdir"].mkdir() + p1.joinpath("$Recycle.Bin").mkdir() + p1.joinpath("$Recycle.Bin", "subdir").mkdir() self.d.add_path(p1) - eq_(self.d.get_state(p1["$Recycle.Bin"]), DirectoryState.EXCLUDED) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin")), DirectoryState.EXCLUDED) # By default, subdirs should be excluded too, but this can be overridden separately - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.EXCLUDED) - 
self.d.set_state(p1["$Recycle.Bin"]["subdir"], DirectoryState.NORMAL) - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.EXCLUDED) + self.d.set_state(p1.joinpath("$Recycle.Bin", "subdir"), DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL) def test_exclude_refined(self, tmpdir): regex1 = r"^\$Recycle\.Bin$" self.d._exclude_list.add(regex1) self.d._exclude_list.mark(regex1) p1 = Path(str(tmpdir)) - p1["$Recycle.Bin"].mkdir() - p1["$Recycle.Bin"]["somefile.png"].open("w").close() - p1["$Recycle.Bin"]["some_unwanted_file.jpg"].open("w").close() - p1["$Recycle.Bin"]["subdir"].mkdir() - p1["$Recycle.Bin"]["subdir"]["somesubdirfile.png"].open("w").close() - p1["$Recycle.Bin"]["subdir"]["unwanted_subdirfile.gif"].open("w").close() - p1["$Recycle.Bin"]["subdar"].mkdir() - p1["$Recycle.Bin"]["subdar"]["somesubdarfile.jpeg"].open("w").close() - p1["$Recycle.Bin"]["subdar"]["unwanted_subdarfile.png"].open("w").close() - self.d.add_path(p1["$Recycle.Bin"]) + p1.joinpath("$Recycle.Bin").mkdir() + p1.joinpath("$Recycle.Bin", "somefile.png").touch() + p1.joinpath("$Recycle.Bin", "some_unwanted_file.jpg").touch() + p1.joinpath("$Recycle.Bin", "subdir").mkdir() + p1.joinpath("$Recycle.Bin", "subdir", "somesubdirfile.png").touch() + p1.joinpath("$Recycle.Bin", "subdir", "unwanted_subdirfile.gif").touch() + p1.joinpath("$Recycle.Bin", "subdar").mkdir() + p1.joinpath("$Recycle.Bin", "subdar", "somesubdarfile.jpeg").touch() + p1.joinpath("$Recycle.Bin", "subdar", "unwanted_subdarfile.png").touch() + self.d.add_path(p1.joinpath("$Recycle.Bin")) # Filter should set the default state to Excluded - eq_(self.d.get_state(p1["$Recycle.Bin"]), DirectoryState.EXCLUDED) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin")), DirectoryState.EXCLUDED) # The subdir should inherit its parent state - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.EXCLUDED) - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdar"]), DirectoryState.EXCLUDED) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.EXCLUDED) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdar")), DirectoryState.EXCLUDED) # Override a child path's state - self.d.set_state(p1["$Recycle.Bin"]["subdir"], DirectoryState.NORMAL) - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL) + self.d.set_state(p1.joinpath("$Recycle.Bin", "subdir"), DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL) # Parent should keep its default state, and the other child too - eq_(self.d.get_state(p1["$Recycle.Bin"]), DirectoryState.EXCLUDED) - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdar"]), DirectoryState.EXCLUDED) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin")), DirectoryState.EXCLUDED) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdar")), DirectoryState.EXCLUDED) # print(f"get_folders(): {[x for x in self.d.get_folders()]}") # only the 2 files directly under the Normal directory @@ -419,8 +412,8 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled assert "somesubdirfile.png" in files assert "unwanted_subdirfile.gif" in files # Overriding the parent should enable all children - self.d.set_state(p1["$Recycle.Bin"], DirectoryState.NORMAL) - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdar"]), DirectoryState.NORMAL) + self.d.set_state(p1.joinpath("$Recycle.Bin"), 
DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdar")), DirectoryState.NORMAL) # all files there files = self.get_files_and_expect_num_result(6) assert "somefile.png" in files @@ -444,7 +437,7 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled assert self.d._exclude_list.error(regex3) is None # print(f"get_folders(): {[x for x in self.d.get_folders()]}") # Directory shouldn't change its state here, unless explicitely done by user - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL) files = self.get_files_and_expect_num_result(5) assert "unwanted_subdirfile.gif" not in files assert "unwanted_subdarfile.png" in files @@ -453,15 +446,15 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled regex4 = r".*subdir$" self.d._exclude_list.rename(regex3, regex4) assert self.d._exclude_list.error(regex4) is None - p1["$Recycle.Bin"]["subdar"]["file_ending_with_subdir"].open("w").close() - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.EXCLUDED) + p1.joinpath("$Recycle.Bin", "subdar", "file_ending_with_subdir").touch() + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.EXCLUDED) files = self.get_files_and_expect_num_result(4) assert "file_ending_with_subdir" not in files assert "somesubdarfile.jpeg" in files assert "somesubdirfile.png" not in files assert "unwanted_subdirfile.gif" not in files - self.d.set_state(p1["$Recycle.Bin"]["subdir"], DirectoryState.NORMAL) - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL) + self.d.set_state(p1.joinpath("$Recycle.Bin", "subdir"), DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL) # print(f"get_folders(): {[x for x in self.d.get_folders()]}") files = self.get_files_and_expect_num_result(6) assert "file_ending_with_subdir" not in files @@ -471,9 +464,9 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled regex5 = r".*subdir.*" self.d._exclude_list.rename(regex4, regex5) # Files containing substring should be filtered - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL) # The path should not match, only the filename, the "subdir" in the directory name shouldn't matter - p1["$Recycle.Bin"]["subdir"]["file_which_shouldnt_match"].open("w").close() + p1.joinpath("$Recycle.Bin", "subdir", "file_which_shouldnt_match").touch() files = self.get_files_and_expect_num_result(5) assert "somesubdirfile.png" not in files assert "unwanted_subdirfile.gif" not in files @@ -493,7 +486,7 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled assert self.d._exclude_list.error(regex6) is None assert regex6 in self.d._exclude_list # This still should not be affected - eq_(self.d.get_state(p1["$Recycle.Bin"]["subdir"]), DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "subdir")), DirectoryState.NORMAL) files = self.get_files_and_expect_num_result(5) # These files are under the "/subdir" directory assert "somesubdirfile.png" not in files @@ -505,20 +498,20 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled def test_japanese_unicode(self, tmpdir): p1 = Path(str(tmpdir)) - p1["$Recycle.Bin"].mkdir() - 
p1["$Recycle.Bin"]["somerecycledfile.png"].open("w").close() - p1["$Recycle.Bin"]["some_unwanted_file.jpg"].open("w").close() - p1["$Recycle.Bin"]["subdir"].mkdir() - p1["$Recycle.Bin"]["subdir"]["過去白濁物語~]_カラー.jpg"].open("w").close() - p1["$Recycle.Bin"]["思叫物語"].mkdir() - p1["$Recycle.Bin"]["思叫物語"]["なししろ会う前"].open("w").close() - p1["$Recycle.Bin"]["思叫物語"]["堂~ロ"].open("w").close() - self.d.add_path(p1["$Recycle.Bin"]) + p1.joinpath("$Recycle.Bin").mkdir() + p1.joinpath("$Recycle.Bin", "somerecycledfile.png").touch() + p1.joinpath("$Recycle.Bin", "some_unwanted_file.jpg").touch() + p1.joinpath("$Recycle.Bin", "subdir").mkdir() + p1.joinpath("$Recycle.Bin", "subdir", "過去白濁物語~]_カラー.jpg").touch() + p1.joinpath("$Recycle.Bin", "思叫物語").mkdir() + p1.joinpath("$Recycle.Bin", "思叫物語", "なししろ会う前").touch() + p1.joinpath("$Recycle.Bin", "思叫物語", "堂~ロ").touch() + self.d.add_path(p1.joinpath("$Recycle.Bin")) regex3 = r".*物語.*" self.d._exclude_list.add(regex3) self.d._exclude_list.mark(regex3) # print(f"get_folders(): {[x for x in self.d.get_folders()]}") - eq_(self.d.get_state(p1["$Recycle.Bin"]["思叫物語"]), DirectoryState.EXCLUDED) + eq_(self.d.get_state(p1.joinpath("$Recycle.Bin", "思叫物語")), DirectoryState.EXCLUDED) files = self.get_files_and_expect_num_result(2) assert "過去白濁物語~]_カラー.jpg" not in files assert "なししろ会う前" not in files @@ -527,7 +520,7 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled regex4 = r".*物語$" self.d._exclude_list.rename(regex3, regex4) assert self.d._exclude_list.error(regex4) is None - self.d.set_state(p1["$Recycle.Bin"]["思叫物語"], DirectoryState.NORMAL) + self.d.set_state(p1.joinpath("$Recycle.Bin", "思叫物語"), DirectoryState.NORMAL) files = self.get_files_and_expect_num_result(5) assert "過去白濁物語~]_カラー.jpg" in files assert "なししろ会う前" in files @@ -539,15 +532,15 @@ files: {self.d._exclude_list.compiled_files} all: {self.d._exclude_list.compiled self.d._exclude_list.add(regex) self.d._exclude_list.mark(regex) p1 = Path(str(tmpdir)) - p1["foobar"].mkdir() - p1["foobar"][".hidden_file.txt"].open("w").close() - p1["foobar"][".hidden_dir"].mkdir() - p1["foobar"][".hidden_dir"]["foobar.jpg"].open("w").close() - p1["foobar"][".hidden_dir"][".hidden_subfile.png"].open("w").close() - self.d.add_path(p1["foobar"]) + p1.joinpath("foobar").mkdir() + p1.joinpath("foobar", ".hidden_file.txt").touch() + p1.joinpath("foobar", ".hidden_dir").mkdir() + p1.joinpath("foobar", ".hidden_dir", "foobar.jpg").touch() + p1.joinpath("foobar", ".hidden_dir", ".hidden_subfile.png").touch() + self.d.add_path(p1.joinpath("foobar")) # It should not inherit its parent's state originally - eq_(self.d.get_state(p1["foobar"][".hidden_dir"]), DirectoryState.EXCLUDED) - self.d.set_state(p1["foobar"][".hidden_dir"], DirectoryState.NORMAL) + eq_(self.d.get_state(p1.joinpath("foobar", ".hidden_dir")), DirectoryState.EXCLUDED) + self.d.set_state(p1.joinpath("foobar", ".hidden_dir"), DirectoryState.NORMAL) # The files should still be filtered files = self.get_files_and_expect_num_result(1) eq_(len(self.d._exclude_list.compiled_paths), 0) diff --git a/core/tests/fs_test.py b/core/tests/fs_test.py index 529ca03a..47fb9b3c 100644 --- a/core/tests/fs_test.py +++ b/core/tests/fs_test.py @@ -17,7 +17,7 @@ except ImportError: from os import urandom -from hscommon.path import Path +from pathlib import Path from hscommon.testutil import eq_ from core.tests.directories_test import create_fake_fs @@ -25,32 +25,26 @@ from .. 
import fs def create_fake_fs_with_random_data(rootpath): - rootpath = rootpath["fs"] + rootpath = rootpath.joinpath("fs") rootpath.mkdir() - rootpath["dir1"].mkdir() - rootpath["dir2"].mkdir() - rootpath["dir3"].mkdir() - fp = rootpath["file1.test"].open("wb") + rootpath.joinpath("dir1").mkdir() + rootpath.joinpath("dir2").mkdir() + rootpath.joinpath("dir3").mkdir() data1 = urandom(200 * 1024) # 200KiB data2 = urandom(1024 * 1024) # 1MiB data3 = urandom(10 * 1024 * 1024) # 10MiB - fp.write(data1) - fp.close() - fp = rootpath["file2.test"].open("wb") - fp.write(data2) - fp.close() - fp = rootpath["file3.test"].open("wb") - fp.write(data3) - fp.close() - fp = rootpath["dir1"]["file1.test"].open("wb") - fp.write(data1) - fp.close() - fp = rootpath["dir2"]["file2.test"].open("wb") - fp.write(data2) - fp.close() - fp = rootpath["dir3"]["file3.test"].open("wb") - fp.write(data3) - fp.close() + with rootpath.joinpath("file1.test").open("wb") as fp: + fp.write(data1) + with rootpath.joinpath("file2.test").open("wb") as fp: + fp.write(data2) + with rootpath.joinpath("file3.test").open("wb") as fp: + fp.write(data3) + with rootpath.joinpath("dir1", "file1.test").open("wb") as fp: + fp.write(data1) + with rootpath.joinpath("dir2", "file2.test").open("wb") as fp: + fp.write(data2) + with rootpath.joinpath("dir3", "file3.test").open("wb") as fp: + fp.write(data3) return rootpath @@ -66,12 +60,12 @@ def test_digest_aggregate_subfiles_sorted(tmpdir): # same order everytime. p = create_fake_fs_with_random_data(Path(str(tmpdir))) b = fs.Folder(p) - digest1 = fs.File(p["dir1"]["file1.test"]).digest - digest2 = fs.File(p["dir2"]["file2.test"]).digest - digest3 = fs.File(p["dir3"]["file3.test"]).digest - digest4 = fs.File(p["file1.test"]).digest - digest5 = fs.File(p["file2.test"]).digest - digest6 = fs.File(p["file3.test"]).digest + digest1 = fs.File(p.joinpath("dir1", "file1.test")).digest + digest2 = fs.File(p.joinpath("dir2", "file2.test")).digest + digest3 = fs.File(p.joinpath("dir3", "file3.test")).digest + digest4 = fs.File(p.joinpath("file1.test")).digest + digest5 = fs.File(p.joinpath("file2.test")).digest + digest6 = fs.File(p.joinpath("file3.test")).digest # The expected digest is the hash of digests for folders and the direct digest for files folder_digest1 = hasher(digest1).digest() folder_digest2 = hasher(digest2).digest() @@ -83,12 +77,12 @@ def test_digest_aggregate_subfiles_sorted(tmpdir): def test_partial_digest_aggregate_subfile_sorted(tmpdir): p = create_fake_fs_with_random_data(Path(str(tmpdir))) b = fs.Folder(p) - digest1 = fs.File(p["dir1"]["file1.test"]).digest_partial - digest2 = fs.File(p["dir2"]["file2.test"]).digest_partial - digest3 = fs.File(p["dir3"]["file3.test"]).digest_partial - digest4 = fs.File(p["file1.test"]).digest_partial - digest5 = fs.File(p["file2.test"]).digest_partial - digest6 = fs.File(p["file3.test"]).digest_partial + digest1 = fs.File(p.joinpath("dir1", "file1.test")).digest_partial + digest2 = fs.File(p.joinpath("dir2", "file2.test")).digest_partial + digest3 = fs.File(p.joinpath("dir3", "file3.test")).digest_partial + digest4 = fs.File(p.joinpath("file1.test")).digest_partial + digest5 = fs.File(p.joinpath("file2.test")).digest_partial + digest6 = fs.File(p.joinpath("file3.test")).digest_partial # The expected digest is the hash of digests for folders and the direct digest for files folder_digest1 = hasher(digest1).digest() folder_digest2 = hasher(digest2).digest() @@ -96,12 +90,12 @@ def test_partial_digest_aggregate_subfile_sorted(tmpdir): digest = 
hasher(folder_digest1 + folder_digest2 + folder_digest3 + digest4 + digest5 + digest6).digest() eq_(b.digest_partial, digest) - digest1 = fs.File(p["dir1"]["file1.test"]).digest_samples - digest2 = fs.File(p["dir2"]["file2.test"]).digest_samples - digest3 = fs.File(p["dir3"]["file3.test"]).digest_samples - digest4 = fs.File(p["file1.test"]).digest_samples - digest5 = fs.File(p["file2.test"]).digest_samples - digest6 = fs.File(p["file3.test"]).digest_samples + digest1 = fs.File(p.joinpath("dir1", "file1.test")).digest_samples + digest2 = fs.File(p.joinpath("dir2", "file2.test")).digest_samples + digest3 = fs.File(p.joinpath("dir3", "file3.test")).digest_samples + digest4 = fs.File(p.joinpath("file1.test")).digest_samples + digest5 = fs.File(p.joinpath("file2.test")).digest_samples + digest6 = fs.File(p.joinpath("file3.test")).digest_samples # The expected digest is the digest of digests for folders and the direct digest for files folder_digest1 = hasher(digest1).digest() folder_digest2 = hasher(digest2).digest() diff --git a/core/tests/results_test.py b/core/tests/results_test.py index 30674633..363f3cdc 100644 --- a/core/tests/results_test.py +++ b/core/tests/results_test.py @@ -447,7 +447,7 @@ class TestCaseResultsXML: self.results.groups = self.groups def get_file(self, path): # use this as a callback for load_from_xml - return [o for o in self.objects if o.path == path][0] + return [o for o in self.objects if str(o.path) == path][0] def test_save_to_xml(self): self.objects[0].is_ref = True diff --git a/core/tests/scanner_test.py b/core/tests/scanner_test.py index 909ee3d4..bfdbdc57 100644 --- a/core/tests/scanner_test.py +++ b/core/tests/scanner_test.py @@ -7,7 +7,7 @@ import pytest from hscommon.jobprogress import job -from hscommon.path import Path +from pathlib import Path from hscommon.testutil import eq_ from .. import fs @@ -22,7 +22,7 @@ class NamedObject: if path is None: path = Path(name) else: - path = Path(path)[name] + path = Path(path, name) self.name = name self.size = size self.path = path @@ -572,12 +572,14 @@ def test_dont_group_files_that_dont_exist(tmpdir): s = Scanner() s.scan_type = ScanType.CONTENTS p = Path(str(tmpdir)) - p["file1"].open("w").write("foo") - p["file2"].open("w").write("foo") + with p.joinpath("file1").open("w") as fp: + fp.write("foo") + with p.joinpath("file2").open("w") as fp: + fp.write("foo") file1, file2 = fs.get_files(p) def getmatches(*args, **kw): - file2.path.remove() + file2.path.unlink() return [Match(file1, file2, 100)] s._getmatches = getmatches diff --git a/hscommon/conflict.py b/hscommon/conflict.py index a661e7b9..42acf46d 100644 --- a/hscommon/conflict.py +++ b/hscommon/conflict.py @@ -14,7 +14,7 @@ import re import os import shutil -from .path import Path, pathify +from pathlib import Path # This matches [123], but not [12] (3 digits being the minimum). # It also matches [1234] [12345] etc.. 
@@ -52,16 +52,15 @@ def is_conflicted(name): return re_conflict.match(name) is not None -@pathify def _smart_move_or_copy(operation, source_path: Path, dest_path: Path): """Use move() or copy() to move and copy file with the conflict management.""" - if dest_path.isdir() and not source_path.isdir(): - dest_path = dest_path[source_path.name] + if dest_path.is_dir() and not source_path.is_dir(): + dest_path = dest_path.joinpath(source_path.name) if dest_path.exists(): filename = dest_path.name - dest_dir_path = dest_path.parent() + dest_dir_path = dest_path.parent newname = get_conflicted_name(os.listdir(str(dest_dir_path)), filename) - dest_path = dest_dir_path[newname] + dest_path = dest_dir_path.joinpath(newname) operation(str(source_path), str(dest_path)) diff --git a/hscommon/path.py b/hscommon/path.py index 23d41f1e..08acce80 100644 --- a/hscommon/path.py +++ b/hscommon/path.py @@ -7,208 +7,9 @@ # http://www.gnu.org/licenses/gpl-3.0.html import logging -import os -import os.path as op -import shutil -import sys -from itertools import takewhile from functools import wraps from inspect import signature - - -class Path(tuple): - """A handy class to work with paths. - - We subclass ``tuple``, each element of the tuple represents an element of the path. - - * ``Path('/foo/bar/baz')[1]`` --> ``'bar'`` - * ``Path('/foo/bar/baz')[1:2]`` --> ``Path('bar/baz')`` - * ``Path('/foo/bar')['baz']`` --> ``Path('/foo/bar/baz')`` - * ``str(Path('/foo/bar/baz'))`` --> ``'/foo/bar/baz'`` - """ - - # Saves a little bit of memory usage - __slots__ = () - - def __new__(cls, value, separator=None): - def unicode_if_needed(s): - if isinstance(s, str): - return s - else: - try: - return str(s, sys.getfilesystemencoding()) - except UnicodeDecodeError: - logging.warning("Could not decode %r", s) - raise - - if isinstance(value, Path): - return value - if not separator: - separator = os.sep - if isinstance(value, bytes): - value = unicode_if_needed(value) - if isinstance(value, str): - if value: - if (separator not in value) and ("/" in value): - separator = "/" - value = value.split(separator) - else: - value = () - else: - if any(isinstance(x, bytes) for x in value): - value = [unicode_if_needed(x) for x in value] - # value is a tuple/list - if any(separator in x for x in value): - # We have a component with a separator in it. Let's rejoin it, and generate another path. 
- return Path(separator.join(value), separator) - if (len(value) > 1) and (not value[-1]): - value = value[ - :-1 - ] # We never want a path to end with a '' (because Path() can be called with a trailing slash ending path) - return tuple.__new__(cls, value) - - def __add__(self, other): - other = Path(other) - if other and (not other[0]): - other = other[1:] - return Path(tuple.__add__(self, other)) - - def __contains__(self, item): - if isinstance(item, Path): - return item[: len(self)] == self - else: - return tuple.__contains__(self, item) - - def __eq__(self, other): - return tuple.__eq__(self, Path(other)) - - def __getitem__(self, key): - if isinstance(key, slice): - if isinstance(key.start, Path): - equal_elems = list(takewhile(lambda pair: pair[0] == pair[1], zip(self, key.start))) - key = slice(len(equal_elems), key.stop, key.step) - if isinstance(key.stop, Path): - equal_elems = list( - takewhile( - lambda pair: pair[0] == pair[1], - zip(reversed(self), reversed(key.stop)), - ) - ) - stop = -len(equal_elems) if equal_elems else None - key = slice(key.start, stop, key.step) - return Path(tuple.__getitem__(self, key)) - elif isinstance(key, (str, Path)): - return self + key - else: - return tuple.__getitem__(self, key) - - def __hash__(self): - return tuple.__hash__(self) - - def __ne__(self, other): - return not self.__eq__(other) - - def __radd__(self, other): - return Path(other) + self - - def __str__(self): - if len(self) == 1: - first = self[0] - if (len(first) == 2) and (first[1] == ":"): # Windows drive letter - return first + "\\" - elif not len(first): # root directory - return "/" - return os.sep.join(self) - - def has_drive_letter(self): - if not self: - return False - first = self[0] - return (len(first) == 2) and (first[1] == ":") - - def is_parent_of(self, other): - """Whether ``other`` is a subpath of ``self``. - - Almost the same as ``other in self``, but it's a bit more self-explicative and when - ``other == self``, returns False. - """ - if other == self: - return False - else: - return other in self - - def remove_drive_letter(self): - if self.has_drive_letter(): - return self[1:] - else: - return self - - def tobytes(self): - return str(self).encode(sys.getfilesystemencoding()) - - def parent(self): - """Returns the parent path. - - ``Path('/foo/bar/baz').parent()`` --> ``Path('/foo/bar')`` - """ - return self[:-1] - - @property - def name(self): - """Last element of the path (filename), with extension. 
- - ``Path('/foo/bar/baz').name`` --> ``'baz'`` - """ - return self[-1] - - # OS method wrappers - def exists(self): - return op.exists(str(self)) - - def copy(self, dest_path): - return shutil.copy(str(self), str(dest_path)) - - def copytree(self, dest_path, *args, **kwargs): - return shutil.copytree(str(self), str(dest_path), *args, **kwargs) - - def isdir(self): - return op.isdir(str(self)) - - def isfile(self): - return op.isfile(str(self)) - - def islink(self): - return op.islink(str(self)) - - def listdir(self): - return [self[name] for name in os.listdir(str(self))] - - def mkdir(self, *args, **kwargs): - return os.mkdir(str(self), *args, **kwargs) - - def makedirs(self, *args, **kwargs): - return os.makedirs(str(self), *args, **kwargs) - - def move(self, dest_path): - return shutil.move(str(self), str(dest_path)) - - def open(self, *args, **kwargs): - return open(str(self), *args, **kwargs) - - def remove(self): - return os.remove(str(self)) - - def rename(self, dest_path): - return os.rename(str(self), str(dest_path)) - - def rmdir(self): - return os.rmdir(str(self)) - - def rmtree(self): - return shutil.rmtree(str(self)) - - def stat(self): - return os.stat(str(self)) +from pathlib import Path def pathify(f): diff --git a/hscommon/tests/conflict_test.py b/hscommon/tests/conflict_test.py index 20fd90a2..244fba0f 100644 --- a/hscommon/tests/conflict_test.py +++ b/hscommon/tests/conflict_test.py @@ -15,7 +15,7 @@ from ..conflict import ( smart_copy, smart_move, ) -from ..path import Path +from pathlib import Path from ..testutil import eq_ @@ -71,43 +71,43 @@ class TestCaseMoveCopy: def do_setup(self, request): tmpdir = request.getfixturevalue("tmpdir") self.path = Path(str(tmpdir)) - self.path["foo"].open("w").close() - self.path["bar"].open("w").close() - self.path["dir"].mkdir() + self.path.joinpath("foo").touch() + self.path.joinpath("bar").touch() + self.path.joinpath("dir").mkdir() def test_move_no_conflict(self, do_setup): - smart_move(self.path + "foo", self.path + "baz") - assert self.path["baz"].exists() - assert not self.path["foo"].exists() + smart_move(self.path.joinpath("foo"), self.path.joinpath("baz")) + assert self.path.joinpath("baz").exists() + assert not self.path.joinpath("foo").exists() def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... 
Let's just test on move - smart_copy(self.path + "foo", self.path + "baz") - assert self.path["baz"].exists() - assert self.path["foo"].exists() + smart_copy(self.path.joinpath("foo"), self.path.joinpath("baz")) + assert self.path.joinpath("baz").exists() + assert self.path.joinpath("foo").exists() def test_move_no_conflict_dest_is_dir(self, do_setup): - smart_move(self.path + "foo", self.path + "dir") - assert self.path["dir"]["foo"].exists() - assert not self.path["foo"].exists() + smart_move(self.path.joinpath("foo"), self.path.joinpath("dir")) + assert self.path.joinpath("dir", "foo").exists() + assert not self.path.joinpath("foo").exists() def test_move_conflict(self, do_setup): - smart_move(self.path + "foo", self.path + "bar") - assert self.path["[000] bar"].exists() - assert not self.path["foo"].exists() + smart_move(self.path.joinpath("foo"), self.path.joinpath("bar")) + assert self.path.joinpath("[000] bar").exists() + assert not self.path.joinpath("foo").exists() def test_move_conflict_dest_is_dir(self, do_setup): - smart_move(self.path["foo"], self.path["dir"]) - smart_move(self.path["bar"], self.path["foo"]) - smart_move(self.path["foo"], self.path["dir"]) - assert self.path["dir"]["foo"].exists() - assert self.path["dir"]["[000] foo"].exists() - assert not self.path["foo"].exists() - assert not self.path["bar"].exists() + smart_move(self.path.joinpath("foo"), self.path.joinpath("dir")) + smart_move(self.path.joinpath("bar"), self.path.joinpath("foo")) + smart_move(self.path.joinpath("foo"), self.path.joinpath("dir")) + assert self.path.joinpath("dir", "foo").exists() + assert self.path.joinpath("dir", "[000] foo").exists() + assert not self.path.joinpath("foo").exists() + assert not self.path.joinpath("bar").exists() def test_copy_folder(self, tmpdir): # smart_copy also works on folders path = Path(str(tmpdir)) - path["foo"].mkdir() - path["bar"].mkdir() - smart_copy(path["foo"], path["bar"]) # no crash - assert path["[000] bar"].exists() + path.joinpath("foo").mkdir() + path.joinpath("bar").mkdir() + smart_copy(path.joinpath("foo"), path.joinpath("bar")) # no crash + assert path.joinpath("[000] bar").exists() diff --git a/hscommon/tests/path_test.py b/hscommon/tests/path_test.py index 806b85c5..bb89f3d0 100644 --- a/hscommon/tests/path_test.py +++ b/hscommon/tests/path_test.py @@ -6,261 +6,8 @@ # which should be included with this package. 
The terms are also available at # http://www.gnu.org/licenses/gpl-3.0.html -import sys -import os - -import pytest - -from ..path import Path, pathify -from ..testutil import eq_ - - -@pytest.fixture -def force_ossep(request): - monkeypatch = request.getfixturevalue("monkeypatch") - monkeypatch.setattr(os, "sep", "/") - - -def test_empty(force_ossep): - path = Path("") - eq_("", str(path)) - eq_(0, len(path)) - path = Path(()) - eq_("", str(path)) - eq_(0, len(path)) - - -def test_single(force_ossep): - path = Path("foobar") - eq_("foobar", path) - eq_(1, len(path)) - - -def test_multiple(force_ossep): - path = Path("foo/bar") - eq_("foo/bar", path) - eq_(2, len(path)) - - -def test_init_with_tuple_and_list(force_ossep): - path = Path(("foo", "bar")) - eq_("foo/bar", path) - path = Path(["foo", "bar"]) - eq_("foo/bar", path) - - -def test_init_with_invalid_value(force_ossep): - try: - Path(42) - assert False - except TypeError: - pass - - -def test_access(force_ossep): - path = Path("foo/bar/bleh") - eq_("foo", path[0]) - eq_("foo", path[-3]) - eq_("bar", path[1]) - eq_("bar", path[-2]) - eq_("bleh", path[2]) - eq_("bleh", path[-1]) - - -def test_slicing(force_ossep): - path = Path("foo/bar/bleh") - subpath = path[:2] - eq_("foo/bar", subpath) - assert isinstance(subpath, Path) - - -def test_parent(force_ossep): - path = Path("foo/bar/bleh") - subpath = path.parent() - eq_("foo/bar", subpath) - assert isinstance(subpath, Path) - - -def test_filename(force_ossep): - path = Path("foo/bar/bleh.ext") - eq_(path.name, "bleh.ext") - - -def test_deal_with_empty_components(force_ossep): - """Keep ONLY a leading space, which means we want a leading slash.""" - eq_("foo//bar", str(Path(("foo", "", "bar")))) - eq_("/foo/bar", str(Path(("", "foo", "bar")))) - eq_("foo/bar", str(Path("foo/bar/"))) - - -def test_old_compare_paths(force_ossep): - eq_(Path("foobar"), Path("foobar")) - eq_(Path("foobar/"), Path("foobar\\", "\\")) - eq_(Path("/foobar/"), Path("\\foobar\\", "\\")) - eq_(Path("/foo/bar"), Path("\\foo\\bar", "\\")) - eq_(Path("/foo/bar"), Path("\\foo\\bar\\", "\\")) - assert Path("/foo/bar") != Path("\\foo\\foo", "\\") - # We also have to test __ne__ - assert not (Path("foobar") != Path("foobar")) - assert Path("/a/b/c.x") != Path("/a/b/c.y") - - -def test_old_split_path(force_ossep): - eq_(Path("foobar"), ("foobar",)) - eq_(Path("foo/bar"), ("foo", "bar")) - eq_(Path("/foo/bar/"), ("", "foo", "bar")) - eq_(Path("\\foo\\bar", "\\"), ("", "foo", "bar")) - - -def test_representation(force_ossep): - eq_("('foo', 'bar')", repr(Path(("foo", "bar")))) - - -def test_add(force_ossep): - eq_("foo/bar/bar/foo", Path(("foo", "bar")) + Path("bar/foo")) - eq_("foo/bar/bar/foo", Path("foo/bar") + "bar/foo") - eq_("foo/bar/bar/foo", Path("foo/bar") + ("bar", "foo")) - eq_("foo/bar/bar/foo", ("foo", "bar") + Path("bar/foo")) - eq_("foo/bar/bar/foo", "foo/bar" + Path("bar/foo")) - # Invalid concatenation - try: - Path(("foo", "bar")) + 1 - assert False - except TypeError: - pass - - -def test_path_slice(force_ossep): - foo = Path("foo") - bar = Path("bar") - foobar = Path("foo/bar") - eq_("bar", foobar[foo:]) - eq_("foo", foobar[:bar]) - eq_("foo/bar", foobar[bar:]) - eq_("foo/bar", foobar[:foo]) - eq_((), foobar[foobar:]) - eq_((), foobar[:foobar]) - abcd = Path("a/b/c/d") - a = Path("a") - d = Path("d") - z = Path("z") - eq_("b/c", abcd[a:d]) - eq_("b/c/d", abcd[a : d + z]) - eq_("b/c", abcd[a : z + d]) - eq_("a/b/c/d", abcd[:z]) - - -def test_add_with_root_path(force_ossep): - """if I perform /a/b/c + 
/d/e/f, I want /a/b/c/d/e/f, not /a/b/c//d/e/f""" - eq_("/foo/bar", str(Path("/foo") + Path("/bar"))) - - -def test_create_with_tuple_that_have_slash_inside(force_ossep, monkeypatch): - eq_(("", "foo", "bar"), Path(("/foo", "bar"))) - monkeypatch.setattr(os, "sep", "\\") - eq_(("", "foo", "bar"), Path(("\\foo", "bar"))) - - -def test_auto_decode_os_sep(force_ossep, monkeypatch): - """Path should decode any either / or os.sep, but always encode in os.sep.""" - eq_(("foo\\bar", "bleh"), Path("foo\\bar/bleh")) - monkeypatch.setattr(os, "sep", "\\") - eq_(("foo", "bar/bleh"), Path("foo\\bar/bleh")) - path = Path("foo/bar") - eq_(("foo", "bar"), path) - eq_("foo\\bar", str(path)) - - -def test_contains(force_ossep): - p = Path(("foo", "bar")) - assert Path(("foo", "bar", "bleh")) in p - assert Path(("foo", "bar")) in p - assert "foo" in p - assert "bleh" not in p - assert Path("foo") not in p - - -def test_is_parent_of(force_ossep): - assert Path(("foo", "bar")).is_parent_of(Path(("foo", "bar", "bleh"))) - assert not Path(("foo", "bar")).is_parent_of(Path(("foo", "baz"))) - assert not Path(("foo", "bar")).is_parent_of(Path(("foo", "bar"))) - - -def test_windows_drive_letter(force_ossep): - p = Path(("c:",)) - eq_("c:\\", str(p)) - - -def test_root_path(force_ossep): - p = Path("/") - eq_("/", str(p)) - - -def test_str_encodes_unicode_to_getfilesystemencoding(force_ossep): - p = Path(("foo", "bar\u00e9")) - eq_("foo/bar\u00e9".encode(sys.getfilesystemencoding()), p.tobytes()) - - -def test_unicode(force_ossep): - p = Path(("foo", "bar\u00e9")) - eq_("foo/bar\u00e9", str(p)) - - -def test_str_repr_of_mix_between_non_ascii_str_and_unicode(force_ossep): - u = "foo\u00e9" - encoded = u.encode(sys.getfilesystemencoding()) - p = Path((encoded, "bar")) - print(repr(tuple(p))) - eq_("foo\u00e9/bar".encode(sys.getfilesystemencoding()), p.tobytes()) - - -def test_path_of_a_path_returns_self(force_ossep): - # if Path() is called with a path as value, just return value. - p = Path("foo/bar") - assert Path(p) is p - - -def test_getitem_str(force_ossep): - # path['something'] returns the child path corresponding to the name - p = Path("/foo/bar") - eq_(p["baz"], Path("/foo/bar/baz")) - - -def test_getitem_path(force_ossep): - # path[Path('something')] returns the child path corresponding to the name (or subpath) - p = Path("/foo/bar") - eq_(p[Path("baz/bleh")], Path("/foo/bar/baz/bleh")) - - -@pytest.mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate") -def test_log_unicode_errors(force_ossep, monkeypatch, capsys): - # When an there's a UnicodeDecodeError on path creation, log it so it can be possible - # to debug the cause of it. 
- monkeypatch.setattr(sys, "getfilesystemencoding", lambda: "ascii") - with pytest.raises(UnicodeDecodeError): - Path(["", b"foo\xe9"]) - out, err = capsys.readouterr() - assert repr(b"foo\xe9") in err - - -def test_has_drive_letter(monkeypatch): - monkeypatch.setattr(os, "sep", "\\") - p = Path("foo\\bar") - assert not p.has_drive_letter() - p = Path("C:\\") - assert p.has_drive_letter() - p = Path("z:\\foo") - assert p.has_drive_letter() - - -def test_remove_drive_letter(monkeypatch): - monkeypatch.setattr(os, "sep", "\\") - p = Path("foo\\bar") - eq_(p.remove_drive_letter(), Path("foo\\bar")) - p = Path("C:\\") - eq_(p.remove_drive_letter(), Path("")) - p = Path("z:\\foo") - eq_(p.remove_drive_letter(), Path("foo")) +from ..path import pathify +from pathlib import Path def test_pathify(): diff --git a/hscommon/tests/util_test.py b/hscommon/tests/util_test.py index c9f5fc07..86e4616e 100644 --- a/hscommon/tests/util_test.py +++ b/hscommon/tests/util_test.py @@ -11,7 +11,7 @@ from io import StringIO from pytest import raises from ..testutil import eq_ -from ..path import Path +from pathlib import Path from ..util import ( nonone, tryint, @@ -245,30 +245,30 @@ class TestCaseDeleteIfEmpty: def test_not_empty(self, tmpdir): testpath = Path(str(tmpdir)) - testpath["foo"].mkdir() + testpath.joinpath("foo").mkdir() assert not delete_if_empty(testpath) assert testpath.exists() def test_with_files_to_delete(self, tmpdir): testpath = Path(str(tmpdir)) - testpath["foo"].open("w") - testpath["bar"].open("w") + testpath.joinpath("foo").touch() + testpath.joinpath("bar").touch() assert delete_if_empty(testpath, ["foo", "bar"]) assert not testpath.exists() def test_directory_in_files_to_delete(self, tmpdir): testpath = Path(str(tmpdir)) - testpath["foo"].mkdir() + testpath.joinpath("foo").mkdir() assert not delete_if_empty(testpath, ["foo"]) assert testpath.exists() def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir): testpath = Path(str(tmpdir)) - testpath["foo"].open("w") - testpath["bar"].open("w") + testpath.joinpath("foo").touch() + testpath.joinpath("bar").touch() assert not delete_if_empty(testpath, ["foo"]) assert testpath.exists() - assert testpath["foo"].exists() + assert testpath.joinpath("foo").exists() def test_doesnt_exist(self): # When the 'path' doesn't exist, just do nothing. @@ -276,8 +276,8 @@ class TestCaseDeleteIfEmpty: def test_is_file(self, tmpdir): # When 'path' is a file, do nothing. 
- p = Path(str(tmpdir)) + "filename" - p.open("w").close() + p = Path(str(tmpdir)).joinpath("filename") + p.touch() delete_if_empty(p) # no crash def test_ioerror(self, tmpdir, monkeypatch): diff --git a/hscommon/util.py b/hscommon/util.py index 059470b7..22b023c3 100644 --- a/hscommon/util.py +++ b/hscommon/util.py @@ -15,7 +15,8 @@ import glob import shutil from datetime import timedelta -from .path import Path, pathify, log_io_error +from pathlib import Path +from .path import pathify, log_io_error def nonone(value, replace_value): @@ -354,13 +355,13 @@ def find_in_path(name, paths=None): @pathify def delete_if_empty(path: Path, files_to_delete=[]): """Deletes the directory at 'path' if it is empty or if it only contains files_to_delete.""" - if not path.exists() or not path.isdir(): + if not path.exists() or not path.is_dir(): return - contents = path.listdir() - if any(p for p in contents if (p.name not in files_to_delete) or p.isdir()): + contents = list(path.glob("*")) + if any(p for p in contents if (p.name not in files_to_delete) or p.is_dir()): return False for p in contents: - p.remove() + p.unlink() path.rmdir() return True
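
For reference, the substitutions this patch applies when moving from hscommon.path.Path to pathlib.Path are collected below in a short, illustrative Python sketch. The example path and variable names are hypothetical; only the mapping between old and new calls is taken from the hunks above.

    # Illustrative summary of the hscommon.path -> pathlib substitutions in this patch.
    from pathlib import Path

    p = Path("/tmp/example/dir")          # hypothetical path

    # Child paths: p["name"] / p + "name"        -> p.joinpath("name")
    child = p.joinpath("file.txt")

    # Parent: p.parent() (method)                -> p.parent (property)
    parent = child.parent

    # Listing: p.listdir()                       -> p.glob("*")
    entries = list(p.glob("*")) if p.is_dir() else []

    # Type checks: isdir()/isfile()/islink()     -> is_dir()/is_file()/is_symlink()
    # Directory creation: p.makedirs()           -> p.mkdir(parents=True)
    # Empty file creation: p.open("w").close()   -> p.touch()
    # File removal: p.remove()                   -> p.unlink()
    # Tree removal: p.rmtree()                   -> shutil.rmtree(str(p))
    # Path length: len(p)                        -> len(p.parts)
    # Drive letter stripping: remove_drive_letter() -> p.relative_to(p.anchor)

    # Containment/ancestry: "child in p" and p.is_parent_of(child) become
    # explicit checks against the parents sequence:
    is_inside = p == child or p in child.parents   # old: child in p
    is_strict_parent = p in child.parents           # old: p.is_parent_of(child)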