diff --git a/cocoa/inter/app_me.py b/cocoa/inter/app_me.py index 1988f82f..c6f89428 100644 --- a/cocoa/inter/app_me.py +++ b/cocoa/inter/app_me.py @@ -173,7 +173,7 @@ class DupeGuruME(DupeGuruBase): DupeGuruBase._do_delete_dupe(self, dupe, *args) def _create_file(self, path): - if (self.directories.itunes_libpath is not None) and (path in self.directories.itunes_libpath[:-1]): + if (self.directories.itunes_libpath is not None) and (path in self.directories.itunes_libpath.parent()): if not hasattr(self, 'itunes_songs'): songs = get_itunes_songs(self.directories.itunes_libpath) self.itunes_songs = {song.path: song for song in songs} diff --git a/cocoa/inter/app_pe.py b/cocoa/inter/app_pe.py index 18ea41e4..6a7326b6 100644 --- a/cocoa/inter/app_pe.py +++ b/cocoa/inter/app_pe.py @@ -12,9 +12,8 @@ import re from appscript import app, its, k, CommandError, ApplicationNotFoundError -from hscommon import io from hscommon.util import remove_invalid_xml, first -from hscommon.path import Path +from hscommon.path import Path, pathify from hscommon.trans import trget from cocoa import proxy @@ -76,11 +75,12 @@ class AperturePhoto(Photo): def display_folder_path(self): return APERTURE_PATH -def get_iphoto_or_aperture_pictures(plistpath, photo_class): +@pathify +def get_iphoto_or_aperture_pictures(plistpath: Path, photo_class): # The structure of iPhoto and Aperture libraries for the base photo list are excactly the same. - if not io.exists(plistpath): + if not plistpath.exists(): return [] - s = io.open(plistpath, 'rt', encoding='utf-8').read() + s = plistpath.open('rt', encoding='utf-8').read() # There was a case where a guy had 0x10 chars in his plist, causing expat errors on loading s = remove_invalid_xml(s, replace_with='') # It seems that iPhoto sometimes doesn't properly escape & chars. The regexp below is to find @@ -123,12 +123,12 @@ class Directories(directories.Directories): directories.Directories.__init__(self, fileclasses=[Photo]) try: self.iphoto_libpath = get_iphoto_database_path() - self.set_state(self.iphoto_libpath[:-1], directories.DirectoryState.Excluded) + self.set_state(self.iphoto_libpath.parent(), directories.DirectoryState.Excluded) except directories.InvalidPathError: self.iphoto_libpath = None try: self.aperture_libpath = get_aperture_database_path() - self.set_state(self.aperture_libpath[:-1], directories.DirectoryState.Excluded) + self.set_state(self.aperture_libpath.parent(), directories.DirectoryState.Excluded) except directories.InvalidPathError: self.aperture_libpath = None @@ -255,12 +255,12 @@ class DupeGuruPE(DupeGuruBase): DupeGuruBase._do_delete_dupe(self, dupe, *args) def _create_file(self, path): - if (self.directories.iphoto_libpath is not None) and (path in self.directories.iphoto_libpath[:-1]): + if (self.directories.iphoto_libpath is not None) and (path in self.directories.iphoto_libpath.parent()): if not hasattr(self, 'path2iphoto'): photos = get_iphoto_pictures(self.directories.iphoto_libpath) self.path2iphoto = {p.path: p for p in photos} return self.path2iphoto.get(path) - if (self.directories.aperture_libpath is not None) and (path in self.directories.aperture_libpath[:-1]): + if (self.directories.aperture_libpath is not None) and (path in self.directories.aperture_libpath.parent()): if not hasattr(self, 'path2aperture'): photos = get_aperture_pictures(self.directories.aperture_libpath) self.path2aperture = {p.path: p for p in photos} diff --git a/cocoa/inter/app_se.py b/cocoa/inter/app_se.py index 7b08bc08..9a22b6b7 100644 --- a/cocoa/inter/app_se.py +++ b/cocoa/inter/app_se.py @@ -9,8 +9,7 @@ import logging import os.path as op -from hscommon import io -from hscommon.path import Path +from hscommon.path import Path, pathify from cocoa import proxy from core.scanner import ScanType @@ -27,8 +26,9 @@ def is_bundle(str_path): class Bundle(fs.Folder): @classmethod - def can_handle(cls, path): - return not io.islink(path) and io.isdir(path) and is_bundle(str(path)) + @pathify + def can_handle(cls, path: Path): + return not path.islink() and path.isdir() and is_bundle(str(path)) class Directories(DirectoriesBase): diff --git a/core/app.py b/core/app.py index d7f9bc85..4ccd4c86 100644 --- a/core/app.py +++ b/core/app.py @@ -232,7 +232,7 @@ class DupeGuru(RegistrableApplication, Broadcaster): ref = group.ref linkfunc = os.link if use_hardlinks else os.symlink linkfunc(str(ref.path), str_path) - self.clean_empty_dirs(dupe.path[:-1]) + self.clean_empty_dirs(dupe.path.parent()) def _create_file(self, path): # We add fs.Folder to fileclasses in case the file we're loading contains folder paths. @@ -375,7 +375,7 @@ class DupeGuru(RegistrableApplication, Broadcaster): def clean_empty_dirs(self, path): if self.options['clean_empty_dirs']: while delete_if_empty(path, ['.DS_Store']): - path = path[:-1] + path = path.parent() def copy_or_move(self, dupe, copy: bool, destination: str, dest_type: DestType): source_path = dupe.path @@ -383,21 +383,21 @@ class DupeGuru(RegistrableApplication, Broadcaster): dest_path = Path(destination) if dest_type in {DestType.Relative, DestType.Absolute}: # no filename, no windows drive letter - source_base = source_path.remove_drive_letter()[:-1] + source_base = source_path.remove_drive_letter().parent() if dest_type == DestType.Relative: source_base = source_base[location_path:] - dest_path = dest_path + source_base + dest_path = dest_path[source_base] if not dest_path.exists(): dest_path.makedirs() # Add filename to dest_path. For file move/copy, it's not required, but for folders, yes. - dest_path = dest_path + source_path[-1] + dest_path = dest_path[source_path.name] logging.debug("Copy/Move operation from '%s' to '%s'", source_path, dest_path) # Raises an EnvironmentError if there's a problem if copy: smart_copy(source_path, dest_path) else: smart_move(source_path, dest_path) - self.clean_empty_dirs(source_path[:-1]) + self.clean_empty_dirs(source_path.parent()) def copy_or_move_marked(self, copy): """Start an async move (or copy) job on marked duplicates. diff --git a/core/directories.py b/core/directories.py index fa605dec..6b84295b 100644 --- a/core/directories.py +++ b/core/directories.py @@ -73,7 +73,7 @@ class Directories: #---Private def _default_state_for_path(self, path): # Override this in subclasses to specify the state of some special folders. - if path[-1].startswith('.'): # hidden + if path.name.startswith('.'): # hidden return DirectoryState.Excluded def _get_files(self, from_path, j): @@ -94,9 +94,8 @@ class Directories: file.is_ref = state == DirectoryState.Reference filepaths.add(file.path) yield file - subpaths = [from_path + name for name in from_path.listdir()] # it's possible that a folder (bundle) gets into the file list. in that case, we don't want to recurse into it - subfolders = [p for p in subpaths if not p.islink() and p.isdir() and p not in filepaths] + subfolders = [p for p in from_path.listdir() if not p.islink() and p.isdir() and p not in filepaths] for subfolder in subfolders: for file in self._get_files(subfolder, j): yield file @@ -143,9 +142,9 @@ class Directories: :rtype: list of Path """ try: - names = [name for name in path.listdir() if (path + name).isdir()] - names.sort(key=lambda x:x.lower()) - return [path + name for name in names] + subpaths = [p for p in path.listdir() if p.isdir()] + subpaths.sort(key=lambda x:x.name.lower()) + return subpaths except EnvironmentError: return [] @@ -178,7 +177,7 @@ class Directories: default_state = self._default_state_for_path(path) if default_state is not None: return default_state - parent = path[:-1] + parent = path.parent() if parent in self: return self.get_state(parent) else: diff --git a/core/fs.py b/core/fs.py index a0330e32..fddcd411 100644 --- a/core/fs.py +++ b/core/fs.py @@ -150,9 +150,9 @@ class File: def rename(self, newname): if newname == self.name: return - destpath = self.path[:-1] + newname + destpath = self.path.parent()[newname] if destpath.exists(): - raise AlreadyExistsError(newname, self.path[:-1]) + raise AlreadyExistsError(newname, self.path.parent()) try: self.path.rename(destpath) except EnvironmentError: @@ -173,11 +173,11 @@ class File: @property def name(self): - return self.path[-1] + return self.path.name @property def folder_path(self): - return self.path[:-1] + return self.path.parent() class Folder(File): @@ -219,8 +219,7 @@ class Folder(File): @property def subfolders(self): if self._subfolders is None: - subpaths = [self.path + name for name in self.path.listdir()] - subfolders = [p for p in subpaths if not p.islink() and p.isdir()] + subfolders = [p for p in self.path.listdir() if not p.islink() and p.isdir()] self._subfolders = [self.__class__(p) for p in subfolders] return self._subfolders @@ -248,18 +247,9 @@ def get_files(path, fileclasses=[File]): :param fileclasses: List of candidate :class:`File` classes """ assert all(issubclass(fileclass, File) for fileclass in fileclasses) - def combine_paths(p1, p2): - try: - return p1 + p2 - except Exception: - # This is temporary debug logging for #84. - logging.warning("Failed to combine %r and %r.", p1, p2) - raise - try: - paths = [combine_paths(path, name) for name in path.listdir()] result = [] - for path in paths: + for path in path.listdir(): file = get_file(path, fileclasses=fileclasses) if file is not None: result.append(file) diff --git a/core/gui/directory_tree.py b/core/gui/directory_tree.py index 4d7c142f..5e0d281a 100644 --- a/core/gui/directory_tree.py +++ b/core/gui/directory_tree.py @@ -31,7 +31,7 @@ class DirectoryNode(Node): self.clear() subpaths = self._tree.app.directories.get_subfolders(self._directory_path) for path in subpaths: - self.append(DirectoryNode(self._tree, path, path[-1])) + self.append(DirectoryNode(self._tree, path, path.name)) self._loaded = True def update_all_states(self): diff --git a/core/tests/app_test.py b/core/tests/app_test.py index ee90ba15..0d07e225 100644 --- a/core/tests/app_test.py +++ b/core/tests/app_test.py @@ -11,7 +11,6 @@ import os.path as op import logging from pytest import mark -from hscommon import io from hscommon.path import Path import hscommon.conflict import hscommon.util @@ -57,7 +56,7 @@ class TestCaseDupeGuru: # for this unit is pathetic. What's done is done. My approach now is to add tests for # every change I want to make. The blowup was caused by a missing import. p = Path(str(tmpdir)) - io.open(p + 'foo', 'w').close() + p['foo'].open('w').close() monkeypatch.setattr(hscommon.conflict, 'smart_copy', log_calls(lambda source_path, dest_path: None)) # XXX This monkeypatch is temporary. will be fixed in a better monkeypatcher. monkeypatch.setattr(app, 'smart_copy', hscommon.conflict.smart_copy) @@ -73,14 +72,14 @@ class TestCaseDupeGuru: def test_copy_or_move_clean_empty_dirs(self, tmpdir, monkeypatch): tmppath = Path(str(tmpdir)) - sourcepath = tmppath + 'source' - io.mkdir(sourcepath) - io.open(sourcepath + 'myfile', 'w') + sourcepath = tmppath['source'] + sourcepath.mkdir() + sourcepath['myfile'].open('w') app = TestApp().app app.directories.add_path(tmppath) [myfile] = app.directories.get_files() monkeypatch.setattr(app, 'clean_empty_dirs', log_calls(lambda path: None)) - app.copy_or_move(myfile, False, tmppath + 'dest', 0) + app.copy_or_move(myfile, False, tmppath['dest'], 0) calls = app.clean_empty_dirs.calls eq_(1, len(calls)) eq_(sourcepath, calls[0]['path']) @@ -104,8 +103,8 @@ class TestCaseDupeGuru: # If the ignore_hardlink_matches option is set, don't match files hardlinking to the same # inode. tmppath = Path(str(tmpdir)) - io.open(tmppath + 'myfile', 'w').write('foo') - os.link(str(tmppath + 'myfile'), str(tmppath + 'hardlink')) + tmppath['myfile'].open('w').write('foo') + os.link(str(tmppath['myfile']), str(tmppath['hardlink'])) app = TestApp().app app.directories.add_path(tmppath) app.scanner.scan_type = ScanType.Contents @@ -171,8 +170,8 @@ class TestCaseDupeGuruWithResults: self.rtable.refresh() tmpdir = request.getfuncargvalue('tmpdir') tmppath = Path(str(tmpdir)) - io.mkdir(tmppath + 'foo') - io.mkdir(tmppath + 'bar') + tmppath['foo'].mkdir() + tmppath['bar'].mkdir() self.app.directories.add_path(tmppath) def test_GetObjects(self, do_setup): @@ -417,11 +416,11 @@ class TestCaseDupeGuru_renameSelected: def pytest_funcarg__do_setup(self, request): tmpdir = request.getfuncargvalue('tmpdir') p = Path(str(tmpdir)) - fp = open(str(p + 'foo bar 1'),mode='w') + fp = open(str(p['foo bar 1']),mode='w') fp.close() - fp = open(str(p + 'foo bar 2'),mode='w') + fp = open(str(p['foo bar 2']),mode='w') fp.close() - fp = open(str(p + 'foo bar 3'),mode='w') + fp = open(str(p['foo bar 3']),mode='w') fp.close() files = fs.get_files(p) for f in files: @@ -444,7 +443,7 @@ class TestCaseDupeGuru_renameSelected: g = self.groups[0] self.rtable.select([1]) assert app.rename_selected('renamed') - names = io.listdir(self.p) + names = [p.name for p in self.p.listdir()] assert 'renamed' in names assert 'foo bar 2' not in names eq_(g.dupes[0].name, 'renamed') @@ -457,7 +456,7 @@ class TestCaseDupeGuru_renameSelected: assert not app.rename_selected('renamed') msg = logging.warning.calls[0]['msg'] eq_('dupeGuru Warning: list index out of range', msg) - names = io.listdir(self.p) + names = [p.name for p in self.p.listdir()] assert 'renamed' not in names assert 'foo bar 2' in names eq_(g.dupes[0].name, 'foo bar 2') @@ -470,7 +469,7 @@ class TestCaseDupeGuru_renameSelected: assert not app.rename_selected('foo bar 1') msg = logging.warning.calls[0]['msg'] assert msg.startswith('dupeGuru Warning: \'foo bar 1\' already exists in') - names = io.listdir(self.p) + names = [p.name for p in self.p.listdir()] assert 'foo bar 1' in names assert 'foo bar 2' in names eq_(g.dupes[0].name, 'foo bar 2') @@ -480,9 +479,9 @@ class TestAppWithDirectoriesInTree: def pytest_funcarg__do_setup(self, request): tmpdir = request.getfuncargvalue('tmpdir') p = Path(str(tmpdir)) - io.mkdir(p + 'sub1') - io.mkdir(p + 'sub2') - io.mkdir(p + 'sub3') + p['sub1'].mkdir() + p['sub2'].mkdir() + p['sub3'].mkdir() app = TestApp() self.app = app.app self.dtree = app.dtree diff --git a/core/tests/base.py b/core/tests/base.py index 9874cd5a..0b45e29f 100644 --- a/core/tests/base.py +++ b/core/tests/base.py @@ -102,11 +102,11 @@ class NamedObject: @property def path(self): - return self._folder + self.name + return self._folder[self.name] @property def folder_path(self): - return self.path[:-1] + return self.path.parent() @property def extension(self): diff --git a/core/tests/directories_test.py b/core/tests/directories_test.py index d5e05796..0c789d46 100644 --- a/core/tests/directories_test.py +++ b/core/tests/directories_test.py @@ -12,7 +12,6 @@ import tempfile import shutil from pytest import raises -from hscommon import io from hscommon.path import Path from hscommon.testutil import eq_ @@ -20,27 +19,27 @@ from ..directories import * def create_fake_fs(rootpath): # We have it as a separate function because other units are using it. - rootpath = rootpath + 'fs' - io.mkdir(rootpath) - io.mkdir(rootpath + 'dir1') - io.mkdir(rootpath + 'dir2') - io.mkdir(rootpath + 'dir3') - fp = io.open(rootpath + 'file1.test', 'w') + rootpath = rootpath['fs'] + rootpath.mkdir() + rootpath['dir1'].mkdir() + rootpath['dir2'].mkdir() + rootpath['dir3'].mkdir() + fp = rootpath['file1.test'].open('w') fp.write('1') fp.close() - fp = io.open(rootpath + 'file2.test', 'w') + fp = rootpath['file2.test'].open('w') fp.write('12') fp.close() - fp = io.open(rootpath + 'file3.test', 'w') + fp = rootpath['file3.test'].open('w') fp.write('123') fp.close() - fp = io.open(rootpath + ('dir1', 'file1.test'), 'w') + fp = rootpath['dir1']['file1.test'].open('w') fp.write('1') fp.close() - fp = io.open(rootpath + ('dir2', 'file2.test'), 'w') + fp = rootpath['dir2']['file2.test'].open('w') fp.write('12') fp.close() - fp = io.open(rootpath + ('dir3', 'file3.test'), 'w') + fp = rootpath['dir3']['file3.test'].open('w') fp.write('123') fp.close() return rootpath @@ -50,9 +49,9 @@ def setup_module(module): # and another with a more complex structure. testpath = Path(tempfile.mkdtemp()) module.testpath = testpath - rootpath = testpath + 'onefile' - io.mkdir(rootpath) - fp = io.open(rootpath + 'test.txt', 'w') + rootpath = testpath['onefile'] + rootpath.mkdir() + fp = rootpath['test.txt'].open('w') fp.write('test_data') fp.close() create_fake_fs(testpath) @@ -67,30 +66,30 @@ def test_empty(): def test_add_path(): d = Directories() - p = testpath + 'onefile' + p = testpath['onefile'] d.add_path(p) eq_(1,len(d)) assert p in d - assert (p + 'foobar') in d - assert p[:-1] not in d - p = testpath + 'fs' + assert (p['foobar']) in d + assert p.parent() not in d + p = testpath['fs'] d.add_path(p) eq_(2,len(d)) assert p in d def test_AddPath_when_path_is_already_there(): d = Directories() - p = testpath + 'onefile' + p = testpath['onefile'] d.add_path(p) with raises(AlreadyThereError): d.add_path(p) with raises(AlreadyThereError): - d.add_path(p + 'foobar') + d.add_path(p['foobar']) eq_(1, len(d)) def test_add_path_containing_paths_already_there(): d = Directories() - d.add_path(testpath + 'onefile') + d.add_path(testpath['onefile']) eq_(1, len(d)) d.add_path(testpath) eq_(len(d), 1) @@ -98,7 +97,7 @@ def test_add_path_containing_paths_already_there(): def test_AddPath_non_latin(tmpdir): p = Path(str(tmpdir)) - to_add = p + 'unicode\u201a' + to_add = p['unicode\u201a'] os.mkdir(str(to_add)) d = Directories() try: @@ -108,24 +107,24 @@ def test_AddPath_non_latin(tmpdir): def test_del(): d = Directories() - d.add_path(testpath + 'onefile') + d.add_path(testpath['onefile']) try: del d[1] assert False except IndexError: pass - d.add_path(testpath + 'fs') + d.add_path(testpath['fs']) del d[1] eq_(1, len(d)) def test_states(): d = Directories() - p = testpath + 'onefile' + p = testpath['onefile'] d.add_path(p) eq_(DirectoryState.Normal ,d.get_state(p)) d.set_state(p, DirectoryState.Reference) eq_(DirectoryState.Reference ,d.get_state(p)) - eq_(DirectoryState.Reference ,d.get_state(p + 'dir1')) + eq_(DirectoryState.Reference ,d.get_state(p['dir1'])) eq_(1,len(d.states)) eq_(p,list(d.states.keys())[0]) eq_(DirectoryState.Reference ,d.states[p]) @@ -133,67 +132,67 @@ def test_states(): def test_get_state_with_path_not_there(): # When the path's not there, just return DirectoryState.Normal d = Directories() - d.add_path(testpath + 'onefile') + d.add_path(testpath['onefile']) eq_(d.get_state(testpath), DirectoryState.Normal) def test_states_remain_when_larger_directory_eat_smaller_ones(): d = Directories() - p = testpath + 'onefile' + p = testpath['onefile'] d.add_path(p) d.set_state(p, DirectoryState.Excluded) d.add_path(testpath) d.set_state(testpath, DirectoryState.Reference) eq_(DirectoryState.Excluded ,d.get_state(p)) - eq_(DirectoryState.Excluded ,d.get_state(p + 'dir1')) + eq_(DirectoryState.Excluded ,d.get_state(p['dir1'])) eq_(DirectoryState.Reference ,d.get_state(testpath)) def test_set_state_keep_state_dict_size_to_minimum(): d = Directories() - p = testpath + 'fs' + p = testpath['fs'] d.add_path(p) d.set_state(p, DirectoryState.Reference) - d.set_state(p + 'dir1', DirectoryState.Reference) + d.set_state(p['dir1'], DirectoryState.Reference) eq_(1,len(d.states)) - eq_(DirectoryState.Reference ,d.get_state(p + 'dir1')) - d.set_state(p + 'dir1', DirectoryState.Normal) + eq_(DirectoryState.Reference ,d.get_state(p['dir1'])) + d.set_state(p['dir1'], DirectoryState.Normal) eq_(2,len(d.states)) - eq_(DirectoryState.Normal ,d.get_state(p + 'dir1')) - d.set_state(p + 'dir1', DirectoryState.Reference) + eq_(DirectoryState.Normal ,d.get_state(p['dir1'])) + d.set_state(p['dir1'], DirectoryState.Reference) eq_(1,len(d.states)) - eq_(DirectoryState.Reference ,d.get_state(p + 'dir1')) + eq_(DirectoryState.Reference ,d.get_state(p['dir1'])) def test_get_files(): d = Directories() - p = testpath + 'fs' + p = testpath['fs'] d.add_path(p) - d.set_state(p + 'dir1', DirectoryState.Reference) - d.set_state(p + 'dir2', DirectoryState.Excluded) + d.set_state(p['dir1'], DirectoryState.Reference) + d.set_state(p['dir2'], DirectoryState.Excluded) files = list(d.get_files()) eq_(5, len(files)) for f in files: - if f.path[:-1] == p + 'dir1': + if f.path.parent() == p['dir1']: assert f.is_ref else: assert not f.is_ref def test_get_folders(): d = Directories() - p = testpath + 'fs' + p = testpath['fs'] d.add_path(p) - d.set_state(p + 'dir1', DirectoryState.Reference) - d.set_state(p + 'dir2', DirectoryState.Excluded) + d.set_state(p['dir1'], DirectoryState.Reference) + d.set_state(p['dir2'], DirectoryState.Excluded) folders = list(d.get_folders()) eq_(len(folders), 3) ref = [f for f in folders if f.is_ref] not_ref = [f for f in folders if not f.is_ref] eq_(len(ref), 1) - eq_(ref[0].path, p + 'dir1') + eq_(ref[0].path, p['dir1']) eq_(len(not_ref), 2) eq_(ref[0].size, 1) def test_get_files_with_inherited_exclusion(): d = Directories() - p = testpath + 'onefile' + p = testpath['onefile'] d.add_path(p) d.set_state(p, DirectoryState.Excluded) eq_([], list(d.get_files())) @@ -202,19 +201,19 @@ def test_save_and_load(tmpdir): d1 = Directories() d2 = Directories() p1 = Path(str(tmpdir.join('p1'))) - io.mkdir(p1) + p1.mkdir() p2 = Path(str(tmpdir.join('p2'))) - io.mkdir(p2) + p2.mkdir() d1.add_path(p1) d1.add_path(p2) d1.set_state(p1, DirectoryState.Reference) - d1.set_state(p1 + 'dir1', DirectoryState.Excluded) + d1.set_state(p1['dir1'], DirectoryState.Excluded) tmpxml = str(tmpdir.join('directories_testunit.xml')) d1.save_to_file(tmpxml) d2.load_from_file(tmpxml) eq_(2, len(d2)) eq_(DirectoryState.Reference ,d2.get_state(p1)) - eq_(DirectoryState.Excluded ,d2.get_state(p1 + 'dir1')) + eq_(DirectoryState.Excluded ,d2.get_state(p1['dir1'])) def test_invalid_path(): d = Directories() @@ -234,12 +233,12 @@ def test_load_from_file_with_invalid_path(tmpdir): #This test simulates a load from file resulting in a #InvalidPath raise. Other directories must be loaded. d1 = Directories() - d1.add_path(testpath + 'onefile') + d1.add_path(testpath['onefile']) #Will raise InvalidPath upon loading p = Path(str(tmpdir.join('toremove'))) - io.mkdir(p) + p.mkdir() d1.add_path(p) - io.rmdir(p) + p.rmdir() tmpxml = str(tmpdir.join('directories_testunit.xml')) d1.save_to_file(tmpxml) d2 = Directories() @@ -248,11 +247,11 @@ def test_load_from_file_with_invalid_path(tmpdir): def test_unicode_save(tmpdir): d = Directories() - p1 = Path(str(tmpdir)) + 'hello\xe9' - io.mkdir(p1) - io.mkdir(p1 + 'foo\xe9') + p1 = Path(str(tmpdir))['hello\xe9'] + p1.mkdir() + p1['foo\xe9'].mkdir() d.add_path(p1) - d.set_state(p1 + 'foo\xe9', DirectoryState.Excluded) + d.set_state(p1['foo\xe9'], DirectoryState.Excluded) tmpxml = str(tmpdir.join('directories_testunit.xml')) try: d.save_to_file(tmpxml) @@ -261,12 +260,12 @@ def test_unicode_save(tmpdir): def test_get_files_refreshes_its_directories(): d = Directories() - p = testpath + 'fs' + p = testpath['fs'] d.add_path(p) files = d.get_files() eq_(6, len(list(files))) time.sleep(1) - os.remove(str(p + ('dir1','file1.test'))) + os.remove(str(p['dir1']['file1.test'])) files = d.get_files() eq_(5, len(list(files))) @@ -274,14 +273,14 @@ def test_get_files_does_not_choke_on_non_existing_directories(tmpdir): d = Directories() p = Path(str(tmpdir)) d.add_path(p) - io.rmtree(p) + p.rmtree() eq_([], list(d.get_files())) def test_get_state_returns_excluded_by_default_for_hidden_directories(tmpdir): d = Directories() p = Path(str(tmpdir)) - hidden_dir_path = p + '.foo' - io.mkdir(p + '.foo') + hidden_dir_path = p['.foo'] + p['.foo'].mkdir() d.add_path(p) eq_(d.get_state(hidden_dir_path), DirectoryState.Excluded) # But it can be overriden @@ -297,16 +296,16 @@ def test_default_path_state_override(tmpdir): d = MyDirectories() p1 = Path(str(tmpdir)) - io.mkdir(p1 + 'foobar') - io.open(p1 + 'foobar/somefile', 'w').close() - io.mkdir(p1 + 'foobaz') - io.open(p1 + 'foobaz/somefile', 'w').close() + p1['foobar'].mkdir() + p1['foobar/somefile'].open('w').close() + p1['foobaz'].mkdir() + p1['foobaz/somefile'].open('w').close() d.add_path(p1) - eq_(d.get_state(p1 + 'foobaz'), DirectoryState.Normal) - eq_(d.get_state(p1 + 'foobar'), DirectoryState.Excluded) + eq_(d.get_state(p1['foobaz']), DirectoryState.Normal) + eq_(d.get_state(p1['foobar']), DirectoryState.Excluded) eq_(len(list(d.get_files())), 1) # only the 'foobaz' file is there # However, the default state can be changed - d.set_state(p1 + 'foobar', DirectoryState.Normal) - eq_(d.get_state(p1 + 'foobar'), DirectoryState.Normal) + d.set_state(p1['foobar'], DirectoryState.Normal) + eq_(d.get_state(p1['foobar']), DirectoryState.Normal) eq_(len(list(d.get_files())), 2) diff --git a/core/tests/fs_test.py b/core/tests/fs_test.py index f4186cec..059e2020 100644 --- a/core/tests/fs_test.py +++ b/core/tests/fs_test.py @@ -25,12 +25,12 @@ def test_md5_aggregate_subfiles_sorted(tmpdir): #same order everytime. p = create_fake_fs(Path(str(tmpdir))) b = fs.Folder(p) - md51 = fs.File(p + ('dir1', 'file1.test')).md5 - md52 = fs.File(p + ('dir2', 'file2.test')).md5 - md53 = fs.File(p + ('dir3', 'file3.test')).md5 - md54 = fs.File(p + 'file1.test').md5 - md55 = fs.File(p + 'file2.test').md5 - md56 = fs.File(p + 'file3.test').md5 + md51 = fs.File(p['dir1']['file1.test']).md5 + md52 = fs.File(p['dir2']['file2.test']).md5 + md53 = fs.File(p['dir3']['file3.test']).md5 + md54 = fs.File(p['file1.test']).md5 + md55 = fs.File(p['file2.test']).md5 + md56 = fs.File(p['file3.test']).md5 # The expected md5 is the md5 of md5s for folders and the direct md5 for files folder_md51 = hashlib.md5(md51).digest() folder_md52 = hashlib.md5(md52).digest() diff --git a/core/tests/scanner_test.py b/core/tests/scanner_test.py index 08594282..a2c322a1 100644 --- a/core/tests/scanner_test.py +++ b/core/tests/scanner_test.py @@ -7,7 +7,6 @@ # http://www.hardcoded.net/licenses/bsd_license from jobprogress import job -from hscommon import io from hscommon.path import Path from hscommon.testutil import eq_ @@ -21,7 +20,7 @@ class NamedObject: if path is None: path = Path(name) else: - path = Path(path) + name + path = Path(path)[name] self.name = name self.size = size self.path = path @@ -37,7 +36,6 @@ def pytest_funcarg__fake_fileexists(request): # This is a hack to avoid invalidating all previous tests since the scanner started to test # for file existence before doing the match grouping. monkeypatch = request.getfuncargvalue('monkeypatch') - monkeypatch.setattr(io, 'exists', lambda _: True) monkeypatch.setattr(Path, 'exists', lambda _: True) def test_empty(fake_fileexists): @@ -471,11 +469,11 @@ def test_dont_group_files_that_dont_exist(tmpdir): s = Scanner() s.scan_type = ScanType.Contents p = Path(str(tmpdir)) - io.open(p + 'file1', 'w').write('foo') - io.open(p + 'file2', 'w').write('foo') + p['file1'].open('w').write('foo') + p['file2'].open('w').write('foo') file1, file2 = fs.get_files(p) def getmatches(*args, **kw): - io.remove(file2.path) + file2.path.remove() return [Match(file1, file2, 100)] s._getmatches = getmatches diff --git a/core_me/fs.py b/core_me/fs.py index 528f060d..27314628 100644 --- a/core_me/fs.py +++ b/core_me/fs.py @@ -36,7 +36,7 @@ class MusicFile(fs.File): def can_handle(cls, path): if not fs.File.can_handle(path): return False - return get_file_ext(path[-1]) in auto.EXT2CLASS + return get_file_ext(path.name) in auto.EXT2CLASS def get_display_info(self, group, delta): size = self.size diff --git a/core_pe/photo.py b/core_pe/photo.py index 277ad535..09185c92 100644 --- a/core_pe/photo.py +++ b/core_pe/photo.py @@ -60,7 +60,7 @@ class Photo(fs.File): @classmethod def can_handle(cls, path): - return fs.File.can_handle(path) and get_file_ext(path[-1]) in cls.HANDLED_EXTS + return fs.File.can_handle(path) and get_file_ext(path.name) in cls.HANDLED_EXTS def get_display_info(self, group, delta): size = self.size diff --git a/hscommon/conflict.py b/hscommon/conflict.py index 7eeb76cb..019f0c81 100644 --- a/hscommon/conflict.py +++ b/hscommon/conflict.py @@ -7,7 +7,10 @@ # http://www.hardcoded.net/licenses/bsd_license import re -from . import io +import os +import shutil + +from .path import Path, pathify #This matches [123], but not [12] (3 digits being the minimum). #It also matches [1234] [12345] etc.. @@ -36,27 +39,28 @@ def get_unconflicted_name(name): def is_conflicted(name): return re_conflict.match(name) is not None -def _smart_move_or_copy(operation, source_path, dest_path): +@pathify +def _smart_move_or_copy(operation, source_path: Path, dest_path: Path): ''' Use move() or copy() to move and copy file with the conflict management, but without the slowness of the fs system. ''' - if io.isdir(dest_path) and not io.isdir(source_path): - dest_path = dest_path + source_path[-1] - if io.exists(dest_path): - filename = dest_path[-1] - dest_dir_path = dest_path[:-1] - newname = get_conflicted_name(io.listdir(dest_dir_path), filename) - dest_path = dest_dir_path + newname - operation(source_path, dest_path) + if dest_path.isdir() and not source_path.isdir(): + dest_path = dest_path[source_path.name] + if dest_path.exists(): + filename = dest_path.name + dest_dir_path = dest_path.parent() + newname = get_conflicted_name(os.listdir(str(dest_dir_path)), filename) + dest_path = dest_dir_path[newname] + operation(str(source_path), str(dest_path)) def smart_move(source_path, dest_path): - _smart_move_or_copy(io.move, source_path, dest_path) + _smart_move_or_copy(shutil.move, source_path, dest_path) def smart_copy(source_path, dest_path): try: - _smart_move_or_copy(io.copy, source_path, dest_path) + _smart_move_or_copy(shutil.copy, source_path, dest_path) except IOError as e: if e.errno in {21, 13}: # it's a directory, code is 21 on OS X / Linux and 13 on Windows - _smart_move_or_copy(io.copytree, source_path, dest_path) + _smart_move_or_copy(shutil.copytree, source_path, dest_path) else: raise \ No newline at end of file diff --git a/hscommon/currency.py b/hscommon/currency.py index e0b7a354..a4644505 100644 --- a/hscommon/currency.py +++ b/hscommon/currency.py @@ -6,13 +6,13 @@ # which should be included with this package. The terms are also available at # http://www.hardcoded.net/licenses/bsd_license +import os from datetime import datetime, date, timedelta import logging import sqlite3 as sqlite import threading from queue import Queue, Empty -from . import io from .path import Path from .util import iterdaterange @@ -271,6 +271,9 @@ EUR = Currency(code='EUR') class CurrencyNotSupportedException(Exception): """The current exchange rate provider doesn't support the requested currency.""" +class RateProviderUnavailable(Exception): + """The rate provider is temporarily unavailable.""" + def date2str(date): return '%d%02d%02d' % (date.year, date.month, date.day) @@ -314,7 +317,7 @@ class RatesDB: logging.warning("Corrupt currency database at {0}. Starting over.".format(repr(self.db_or_path))) if isinstance(self.db_or_path, (str, Path)): self.con.close() - io.remove(Path(self.db_or_path)) + os.remove(str(self.db_or_path)) self.con = sqlite.connect(str(self.db_or_path)) else: logging.warning("Can't re-use the file, using a memory table") @@ -452,11 +455,19 @@ class RatesDB: values = rate_provider(currency, fetch_start, fetch_end) except CurrencyNotSupportedException: continue + except RateProviderUnavailable: + logging.debug("Fetching failed due to temporary problems.") + break else: - if values: - self._fetched_values.put((values, currency, fetch_start, fetch_end)) - logging.debug("Fetching successful!") - break + if not values: + # We didn't get any value from the server, which means that we asked for + # rates that couldn't be delivered. Still, we report empty values so + # that the cache can correctly remember this unavailability so that we + # don't repeatedly fetch those ranges. + values = [] + self._fetched_values.put((values, currency, fetch_start, fetch_end)) + logging.debug("Fetching successful!") + break else: logging.debug("Fetching failed!") diff --git a/hscommon/desktop.py b/hscommon/desktop.py index 581a5963..745e115b 100644 --- a/hscommon/desktop.py +++ b/hscommon/desktop.py @@ -7,6 +7,7 @@ # http://www.hardcoded.net/licenses/bsd_license import os.path as op +import logging class SpecialFolder: AppData = 1 @@ -38,7 +39,13 @@ def special_folder_path(special_folder, appname=None): return _special_folder_path(special_folder, appname) try: - from cocoa import proxy + # Normally, we would simply do "from cocoa import proxy", but due to a bug in pytest (currently + # at v2.4.2), our test suite is broken when we do that. This below is a workaround until that + # bug is fixed. + import cocoa + if not hasattr(cocoa, 'proxy'): + raise ImportError() + proxy = cocoa.proxy _open_url = proxy.openURL_ _open_path = proxy.openPath_ _reveal_path = proxy.revealPath_ @@ -71,4 +78,14 @@ except ImportError: return str(QDesktopServices.storageLocation(qtfolder)) except ImportError: - raise Exception("Can't setup desktop functions!") + # We're either running tests, and these functions don't matter much or we're in a really + # weird situation. Let's just have dummy fallbacks. + logging.warning("Can't setup desktop functions!") + def _open_path(path): + pass + + def _reveal_path(path): + pass + + def _special_folder_path(special_folder, appname=None): + return '/tmp' diff --git a/hscommon/io.py b/hscommon/io.py deleted file mode 100644 index dd0fb4f4..00000000 --- a/hscommon/io.py +++ /dev/null @@ -1,79 +0,0 @@ -# Created By: Virgil Dupras -# Created On: 2007-10-23 -# Copyright 2013 Hardcoded Software (http://www.hardcoded.net) - -# This software is licensed under the "BSD" License as described in the "LICENSE" file, -# which should be included with this package. The terms are also available at -# http://www.hardcoded.net/licenses/bsd_license - -# HS code should only deal with Path instances, not string paths. One of the annoyances of this -# is to always have to convert Path instances with unicode() when calling open() or listdir() etc.. -# this unit takes care of this - -import builtins -import os -import os.path -import shutil -import logging - -def log_io_error(func): - """ Catches OSError, IOError and WindowsError and log them - """ - def wrapper(path, *args, **kwargs): - try: - return func(path, *args, **kwargs) - except (IOError, OSError) as e: - msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"' - classname = e.__class__.__name__ - funcname = func.__name__ - logging.warn(msg.format(classname, funcname, str(path), str(e))) - - return wrapper - -def copy(source_path, dest_path): - return shutil.copy(str(source_path), str(dest_path)) - -def copytree(source_path, dest_path, *args, **kwargs): - return shutil.copytree(str(source_path), str(dest_path), *args, **kwargs) - -def exists(path): - return os.path.exists(str(path)) - -def isdir(path): - return os.path.isdir(str(path)) - -def isfile(path): - return os.path.isfile(str(path)) - -def islink(path): - return os.path.islink(str(path)) - -def listdir(path): - return os.listdir(str(path)) - -def mkdir(path, *args, **kwargs): - return os.mkdir(str(path), *args, **kwargs) - -def makedirs(path, *args, **kwargs): - return os.makedirs(str(path), *args, **kwargs) - -def move(source_path, dest_path): - return shutil.move(str(source_path), str(dest_path)) - -def open(path, *args, **kwargs): - return builtins.open(str(path), *args, **kwargs) - -def remove(path): - return os.remove(str(path)) - -def rename(source_path, dest_path): - return os.rename(str(source_path), str(dest_path)) - -def rmdir(path): - return os.rmdir(str(path)) - -def rmtree(path): - return shutil.rmtree(str(path)) - -def stat(path): - return os.stat(str(path)) diff --git a/hscommon/path.py b/hscommon/path.py index a6f1675a..9ac8d197 100755 --- a/hscommon/path.py +++ b/hscommon/path.py @@ -12,6 +12,8 @@ import os.path as op import shutil import sys from itertools import takewhile +from functools import wraps +from inspect import signature class Path(tuple): """A handy class to work with paths. @@ -94,12 +96,11 @@ class Path(tuple): stop = -len(equal_elems) if equal_elems else None key = slice(key.start, stop, key.step) return Path(tuple.__getitem__(self, key)) + elif isinstance(key, (str, Path)): + return self + key else: return tuple.__getitem__(self, key) - def __getslice__(self, i, j): #I have to override it because tuple uses it. - return Path(tuple.__getslice__(self, i, j)) - def __hash__(self): return tuple.__hash__(self) @@ -133,6 +134,13 @@ class Path(tuple): def tobytes(self): return str(self).encode(sys.getfilesystemencoding()) + def parent(self): + return self[:-1] + + @property + def name(self): + return self[-1] + # OS method wrappers def exists(self): return op.exists(str(self)) @@ -153,7 +161,7 @@ class Path(tuple): return op.islink(str(self)) def listdir(self): - return os.listdir(str(self)) + return [self[name] for name in os.listdir(str(self))] def mkdir(self, *args, **kwargs): return os.mkdir(str(self), *args, **kwargs) @@ -182,3 +190,43 @@ class Path(tuple): def stat(self): return os.stat(str(self)) +def pathify(f): + """Ensure that every annotated :class:`Path` arguments are actually paths. + + When a function is decorated with ``@pathify``, every argument with annotated as Path will be + converted to a Path if it wasn't already. Example:: + + @pathify + def foo(path: Path, otherarg): + return path.listdir() + + Calling ``foo('/bar', 0)`` will convert ``'/bar'`` to ``Path('/bar')``. + """ + sig = signature(f) + pindexes = {i for i, p in enumerate(sig.parameters.values()) if p.annotation is Path} + pkeys = {k: v for k, v in sig.parameters.items() if v.annotation is Path} + def path_or_none(p): + return None if p is None else Path(p) + + @wraps(f) + def wrapped(*args, **kwargs): + args = tuple((path_or_none(a) if i in pindexes else a) for i, a in enumerate(args)) + kwargs = {k: (path_or_none(v) if k in pkeys else v) for k, v in kwargs.items()} + return f(*args, **kwargs) + + return wrapped + +def log_io_error(func): + """ Catches OSError, IOError and WindowsError and log them + """ + @wraps(func) + def wrapper(path, *args, **kwargs): + try: + return func(path, *args, **kwargs) + except (IOError, OSError) as e: + msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"' + classname = e.__class__.__name__ + funcname = func.__name__ + logging.warn(msg.format(classname, funcname, str(path), str(e))) + + return wrapper diff --git a/hscommon/tests/conflict_test.py b/hscommon/tests/conflict_test.py index 825c48a2..cce294e8 100644 --- a/hscommon/tests/conflict_test.py +++ b/hscommon/tests/conflict_test.py @@ -61,44 +61,44 @@ class TestCase_move_copy: def pytest_funcarg__do_setup(self, request): tmpdir = request.getfuncargvalue('tmpdir') self.path = Path(str(tmpdir)) - io.open(self.path + 'foo', 'w').close() - io.open(self.path + 'bar', 'w').close() - io.mkdir(self.path + 'dir') + self.path['foo'].open('w').close() + self.path['bar'].open('w').close() + self.path['dir'].mkdir() def test_move_no_conflict(self, do_setup): smart_move(self.path + 'foo', self.path + 'baz') - assert io.exists(self.path + 'baz') - assert not io.exists(self.path + 'foo') + assert self.path['baz'].exists() + assert not self.path['foo'].exists() def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move smart_copy(self.path + 'foo', self.path + 'baz') - assert io.exists(self.path + 'baz') - assert io.exists(self.path + 'foo') + assert self.path['baz'].exists() + assert self.path['foo'].exists() def test_move_no_conflict_dest_is_dir(self, do_setup): smart_move(self.path + 'foo', self.path + 'dir') - assert io.exists(self.path + ('dir', 'foo')) - assert not io.exists(self.path + 'foo') + assert self.path['dir']['foo'].exists() + assert not self.path['foo'].exists() def test_move_conflict(self, do_setup): smart_move(self.path + 'foo', self.path + 'bar') - assert io.exists(self.path + '[000] bar') - assert not io.exists(self.path + 'foo') + assert self.path['[000] bar'].exists() + assert not self.path['foo'].exists() def test_move_conflict_dest_is_dir(self, do_setup): - smart_move(self.path + 'foo', self.path + 'dir') - smart_move(self.path + 'bar', self.path + 'foo') - smart_move(self.path + 'foo', self.path + 'dir') - assert io.exists(self.path + ('dir', 'foo')) - assert io.exists(self.path + ('dir', '[000] foo')) - assert not io.exists(self.path + 'foo') - assert not io.exists(self.path + 'bar') + smart_move(self.path['foo'], self.path['dir']) + smart_move(self.path['bar'], self.path['foo']) + smart_move(self.path['foo'], self.path['dir']) + assert self.path['dir']['foo'].exists() + assert self.path['dir']['[000] foo'].exists() + assert not self.path['foo'].exists() + assert not self.path['bar'].exists() def test_copy_folder(self, tmpdir): # smart_copy also works on folders path = Path(str(tmpdir)) - io.mkdir(path + 'foo') - io.mkdir(path + 'bar') - smart_copy(path + 'foo', path + 'bar') # no crash - assert io.exists(path + '[000] bar') + path['foo'].mkdir() + path['bar'].mkdir() + smart_copy(path['foo'], path['bar']) # no crash + assert path['[000] bar'].exists() diff --git a/hscommon/tests/currency_test.py b/hscommon/tests/currency_test.py index 5c940a01..7b1c5fb3 100644 --- a/hscommon/tests/currency_test.py +++ b/hscommon/tests/currency_test.py @@ -9,7 +9,6 @@ from datetime import date import sqlite3 as sqlite -from .. import io from ..testutil import eq_, assert_almost_equal from ..currency import Currency, RatesDB, CAD, EUR, USD @@ -64,7 +63,7 @@ def test_db_with_connection(): def test_corrupt_db(tmpdir): dbpath = str(tmpdir.join('foo.db')) - fh = io.open(dbpath, 'w') + fh = open(dbpath, 'w') fh.write('corrupted') fh.close() db = RatesDB(dbpath) # no crash. deletes the old file and start a new db diff --git a/hscommon/tests/path_test.py b/hscommon/tests/path_test.py index 75c41665..8181d318 100644 --- a/hscommon/tests/path_test.py +++ b/hscommon/tests/path_test.py @@ -7,10 +7,11 @@ # http://www.hardcoded.net/licenses/bsd_license import sys +import os from pytest import raises, mark -from ..path import * +from ..path import Path, pathify from ..testutil import eq_ def pytest_funcarg__force_ossep(request): @@ -44,7 +45,7 @@ def test_init_with_tuple_and_list(force_ossep): def test_init_with_invalid_value(force_ossep): try: path = Path(42) - self.fail() + assert False except TypeError: pass @@ -63,6 +64,16 @@ def test_slicing(force_ossep): eq_('foo/bar',subpath) assert isinstance(subpath,Path) +def test_parent(force_ossep): + path = Path('foo/bar/bleh') + subpath = path.parent() + eq_('foo/bar', subpath) + assert isinstance(subpath, Path) + +def test_filename(force_ossep): + path = Path('foo/bar/bleh.ext') + eq_(path.name, 'bleh.ext') + def test_deal_with_empty_components(force_ossep): """Keep ONLY a leading space, which means we want a leading slash. """ @@ -99,7 +110,7 @@ def test_add(force_ossep): #Invalid concatenation try: Path(('foo','bar')) + 1 - self.fail() + assert False except TypeError: pass @@ -180,6 +191,16 @@ def test_Path_of_a_Path_returns_self(force_ossep): p = Path('foo/bar') assert Path(p) is p +def test_getitem_str(force_ossep): + # path['something'] returns the child path corresponding to the name + p = Path('/foo/bar') + eq_(p['baz'], Path('/foo/bar/baz')) + +def test_getitem_path(force_ossep): + # path[Path('something')] returns the child path corresponding to the name (or subpath) + p = Path('/foo/bar') + eq_(p[Path('baz/bleh')], Path('/foo/bar/baz/bleh')) + @mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate") def test_log_unicode_errors(force_ossep, monkeypatch, capsys): # When an there's a UnicodeDecodeError on path creation, log it so it can be possible @@ -206,4 +227,25 @@ def test_remove_drive_letter(monkeypatch): p = Path('C:\\') eq_(p.remove_drive_letter(), Path('')) p = Path('z:\\foo') - eq_(p.remove_drive_letter(), Path('foo')) \ No newline at end of file + eq_(p.remove_drive_letter(), Path('foo')) + +def test_pathify(): + @pathify + def foo(a: Path, b, c:Path): + return a, b, c + + a, b, c = foo('foo', 0, c=Path('bar')) + assert isinstance(a, Path) + assert a == Path('foo') + assert b == 0 + assert isinstance(c, Path) + assert c == Path('bar') + +def test_pathify_preserve_none(): + # @pathify preserves None value and doesn't try to return a Path + @pathify + def foo(a: Path): + return a + + a = foo(None) + assert a is None diff --git a/hscommon/tests/util_test.py b/hscommon/tests/util_test.py index 4d9967fd..d9fd1d7f 100644 --- a/hscommon/tests/util_test.py +++ b/hscommon/tests/util_test.py @@ -11,7 +11,6 @@ from io import StringIO from pytest import raises from ..testutil import eq_ -from .. import io from ..path import Path from ..util import * @@ -210,39 +209,49 @@ class TestCase_modified_after: monkeyplus.patch_osstat('first', st_mtime=42) assert modified_after('first', 'does_not_exist') # no crash + def test_first_file_is_none(self, monkeyplus): + # when the first file is None, we return False + monkeyplus.patch_osstat('second', st_mtime=42) + assert not modified_after(None, 'second') # no crash + + def test_second_file_is_none(self, monkeyplus): + # when the second file is None, we return True + monkeyplus.patch_osstat('first', st_mtime=42) + assert modified_after('first', None) # no crash + class TestCase_delete_if_empty: def test_is_empty(self, tmpdir): testpath = Path(str(tmpdir)) assert delete_if_empty(testpath) - assert not io.exists(testpath) + assert not testpath.exists() def test_not_empty(self, tmpdir): testpath = Path(str(tmpdir)) - io.mkdir(testpath + 'foo') + testpath['foo'].mkdir() assert not delete_if_empty(testpath) - assert io.exists(testpath) + assert testpath.exists() def test_with_files_to_delete(self, tmpdir): testpath = Path(str(tmpdir)) - io.open(testpath + 'foo', 'w') - io.open(testpath + 'bar', 'w') + testpath['foo'].open('w') + testpath['bar'].open('w') assert delete_if_empty(testpath, ['foo', 'bar']) - assert not io.exists(testpath) + assert not testpath.exists() def test_directory_in_files_to_delete(self, tmpdir): testpath = Path(str(tmpdir)) - io.mkdir(testpath + 'foo') + testpath['foo'].mkdir() assert not delete_if_empty(testpath, ['foo']) - assert io.exists(testpath) + assert testpath.exists() def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir): testpath = Path(str(tmpdir)) - io.open(testpath + 'foo', 'w') - io.open(testpath + 'bar', 'w') + testpath['foo'].open('w') + testpath['bar'].open('w') assert not delete_if_empty(testpath, ['foo']) - assert io.exists(testpath) - assert io.exists(testpath + 'foo') + assert testpath.exists() + assert testpath['foo'].exists() def test_doesnt_exist(self): # When the 'path' doesn't exist, just do nothing. @@ -251,7 +260,7 @@ class TestCase_delete_if_empty: def test_is_file(self, tmpdir): # When 'path' is a file, do nothing. p = Path(str(tmpdir)) + 'filename' - io.open(p, 'w').close() + p.open('w').close() delete_if_empty(p) # no crash def test_ioerror(self, tmpdir, monkeypatch): @@ -259,7 +268,7 @@ class TestCase_delete_if_empty: def do_raise(*args, **kw): raise OSError() - monkeypatch.setattr(io, 'rmdir', do_raise) + monkeypatch.setattr(Path, 'rmdir', do_raise) delete_if_empty(Path(str(tmpdir))) # no crash diff --git a/hscommon/util.py b/hscommon/util.py index 55858b1e..790d352c 100644 --- a/hscommon/util.py +++ b/hscommon/util.py @@ -15,8 +15,7 @@ import glob import shutil from datetime import timedelta -from . import io -from .path import Path +from .path import Path, pathify, log_io_error def nonone(value, replace_value): ''' Returns value if value is not None. Returns replace_value otherwise. @@ -267,15 +266,19 @@ def iterdaterange(start, end): #--- Files related -def modified_after(first_path, second_path): - """Returns True if first_path's mtime is higher than second_path's mtime.""" +@pathify +def modified_after(first_path: Path, second_path: Path): + """Returns ``True`` if first_path's mtime is higher than second_path's mtime. + + If one of the files doesn't exist or is ``None``, it is considered "never modified". + """ try: - first_mtime = io.stat(first_path).st_mtime - except EnvironmentError: + first_mtime = first_path.stat().st_mtime + except (EnvironmentError, AttributeError): return False try: - second_mtime = io.stat(second_path).st_mtime - except EnvironmentError: + second_mtime = second_path.stat().st_mtime + except (EnvironmentError, AttributeError): return True return first_mtime > second_mtime @@ -292,18 +295,19 @@ def find_in_path(name, paths=None): return op.join(path, name) return None -@io.log_io_error -def delete_if_empty(path, files_to_delete=[]): +@log_io_error +@pathify +def delete_if_empty(path: Path, files_to_delete=[]): ''' Deletes the directory at 'path' if it is empty or if it only contains files_to_delete. ''' - if not io.exists(path) or not io.isdir(path): + if not path.exists() or not path.isdir(): return - contents = io.listdir(path) - if any(name for name in contents if (name not in files_to_delete) or io.isdir(path + name)): + contents = path.listdir() + if any(p for p in contents if (p.name not in files_to_delete) or p.isdir()): return False - for name in contents: - io.remove(path + name) - io.rmdir(path) + for p in contents: + p.remove() + path.rmdir() return True def open_if_filename(infile, mode='rb'): @@ -313,7 +317,7 @@ def open_if_filename(infile, mode='rb'): Returns a tuple (shouldbeclosed,infile) infile is a file object """ if isinstance(infile, Path): - return (io.open(infile, mode), True) + return (infile.open(mode), True) if isinstance(infile, str): return (open(infile, mode), True) else: