Refactoring: Path API compatibility with pathlib

Refactored dupeGuru to make hscommon.path's API a bit close to pathlib's
API. It's not 100% compatible yet, but it's much better than before.

This is more of a hscommon refactoring than a dupeguru one, but since
duepGuru is the main user of Path, it was the driver behind the
refactoring.

This refactoring also see the introduction of @pathify, which ensure
Path arguments. Previously, we were often unsure of whether the caller
of a function was passing a Path or a str. This problem is now solved
and this allows us to remove hscommon.io, an ill-conceived attempt to
solve that same ambiguity problem.

Fixes #235.
This commit is contained in:
Virgil Dupras 2013-11-16 12:06:16 -05:00
parent e8c42740cf
commit 10dbfa9b38
24 changed files with 353 additions and 313 deletions

View File

@ -173,7 +173,7 @@ class DupeGuruME(DupeGuruBase):
DupeGuruBase._do_delete_dupe(self, dupe, *args)
def _create_file(self, path):
if (self.directories.itunes_libpath is not None) and (path in self.directories.itunes_libpath[:-1]):
if (self.directories.itunes_libpath is not None) and (path in self.directories.itunes_libpath.parent()):
if not hasattr(self, 'itunes_songs'):
songs = get_itunes_songs(self.directories.itunes_libpath)
self.itunes_songs = {song.path: song for song in songs}

View File

@ -12,9 +12,8 @@ import re
from appscript import app, its, k, CommandError, ApplicationNotFoundError
from hscommon import io
from hscommon.util import remove_invalid_xml, first
from hscommon.path import Path
from hscommon.path import Path, pathify
from hscommon.trans import trget
from cocoa import proxy
@ -76,11 +75,12 @@ class AperturePhoto(Photo):
def display_folder_path(self):
return APERTURE_PATH
def get_iphoto_or_aperture_pictures(plistpath, photo_class):
@pathify
def get_iphoto_or_aperture_pictures(plistpath: Path, photo_class):
# The structure of iPhoto and Aperture libraries for the base photo list are excactly the same.
if not io.exists(plistpath):
if not plistpath.exists():
return []
s = io.open(plistpath, 'rt', encoding='utf-8').read()
s = plistpath.open('rt', encoding='utf-8').read()
# There was a case where a guy had 0x10 chars in his plist, causing expat errors on loading
s = remove_invalid_xml(s, replace_with='')
# It seems that iPhoto sometimes doesn't properly escape & chars. The regexp below is to find
@ -123,12 +123,12 @@ class Directories(directories.Directories):
directories.Directories.__init__(self, fileclasses=[Photo])
try:
self.iphoto_libpath = get_iphoto_database_path()
self.set_state(self.iphoto_libpath[:-1], directories.DirectoryState.Excluded)
self.set_state(self.iphoto_libpath.parent(), directories.DirectoryState.Excluded)
except directories.InvalidPathError:
self.iphoto_libpath = None
try:
self.aperture_libpath = get_aperture_database_path()
self.set_state(self.aperture_libpath[:-1], directories.DirectoryState.Excluded)
self.set_state(self.aperture_libpath.parent(), directories.DirectoryState.Excluded)
except directories.InvalidPathError:
self.aperture_libpath = None
@ -255,12 +255,12 @@ class DupeGuruPE(DupeGuruBase):
DupeGuruBase._do_delete_dupe(self, dupe, *args)
def _create_file(self, path):
if (self.directories.iphoto_libpath is not None) and (path in self.directories.iphoto_libpath[:-1]):
if (self.directories.iphoto_libpath is not None) and (path in self.directories.iphoto_libpath.parent()):
if not hasattr(self, 'path2iphoto'):
photos = get_iphoto_pictures(self.directories.iphoto_libpath)
self.path2iphoto = {p.path: p for p in photos}
return self.path2iphoto.get(path)
if (self.directories.aperture_libpath is not None) and (path in self.directories.aperture_libpath[:-1]):
if (self.directories.aperture_libpath is not None) and (path in self.directories.aperture_libpath.parent()):
if not hasattr(self, 'path2aperture'):
photos = get_aperture_pictures(self.directories.aperture_libpath)
self.path2aperture = {p.path: p for p in photos}

View File

@ -9,8 +9,7 @@
import logging
import os.path as op
from hscommon import io
from hscommon.path import Path
from hscommon.path import Path, pathify
from cocoa import proxy
from core.scanner import ScanType
@ -27,8 +26,9 @@ def is_bundle(str_path):
class Bundle(fs.Folder):
@classmethod
def can_handle(cls, path):
return not io.islink(path) and io.isdir(path) and is_bundle(str(path))
@pathify
def can_handle(cls, path: Path):
return not path.islink() and path.isdir() and is_bundle(str(path))
class Directories(DirectoriesBase):

View File

@ -232,7 +232,7 @@ class DupeGuru(RegistrableApplication, Broadcaster):
ref = group.ref
linkfunc = os.link if use_hardlinks else os.symlink
linkfunc(str(ref.path), str_path)
self.clean_empty_dirs(dupe.path[:-1])
self.clean_empty_dirs(dupe.path.parent())
def _create_file(self, path):
# We add fs.Folder to fileclasses in case the file we're loading contains folder paths.
@ -375,7 +375,7 @@ class DupeGuru(RegistrableApplication, Broadcaster):
def clean_empty_dirs(self, path):
if self.options['clean_empty_dirs']:
while delete_if_empty(path, ['.DS_Store']):
path = path[:-1]
path = path.parent()
def copy_or_move(self, dupe, copy: bool, destination: str, dest_type: DestType):
source_path = dupe.path
@ -383,21 +383,21 @@ class DupeGuru(RegistrableApplication, Broadcaster):
dest_path = Path(destination)
if dest_type in {DestType.Relative, DestType.Absolute}:
# no filename, no windows drive letter
source_base = source_path.remove_drive_letter()[:-1]
source_base = source_path.remove_drive_letter().parent()
if dest_type == DestType.Relative:
source_base = source_base[location_path:]
dest_path = dest_path + source_base
dest_path = dest_path[source_base]
if not dest_path.exists():
dest_path.makedirs()
# Add filename to dest_path. For file move/copy, it's not required, but for folders, yes.
dest_path = dest_path + source_path[-1]
dest_path = dest_path[source_path.name]
logging.debug("Copy/Move operation from '%s' to '%s'", source_path, dest_path)
# Raises an EnvironmentError if there's a problem
if copy:
smart_copy(source_path, dest_path)
else:
smart_move(source_path, dest_path)
self.clean_empty_dirs(source_path[:-1])
self.clean_empty_dirs(source_path.parent())
def copy_or_move_marked(self, copy):
"""Start an async move (or copy) job on marked duplicates.

View File

@ -73,7 +73,7 @@ class Directories:
#---Private
def _default_state_for_path(self, path):
# Override this in subclasses to specify the state of some special folders.
if path[-1].startswith('.'): # hidden
if path.name.startswith('.'): # hidden
return DirectoryState.Excluded
def _get_files(self, from_path, j):
@ -94,9 +94,8 @@ class Directories:
file.is_ref = state == DirectoryState.Reference
filepaths.add(file.path)
yield file
subpaths = [from_path + name for name in from_path.listdir()]
# it's possible that a folder (bundle) gets into the file list. in that case, we don't want to recurse into it
subfolders = [p for p in subpaths if not p.islink() and p.isdir() and p not in filepaths]
subfolders = [p for p in from_path.listdir() if not p.islink() and p.isdir() and p not in filepaths]
for subfolder in subfolders:
for file in self._get_files(subfolder, j):
yield file
@ -143,9 +142,9 @@ class Directories:
:rtype: list of Path
"""
try:
names = [name for name in path.listdir() if (path + name).isdir()]
names.sort(key=lambda x:x.lower())
return [path + name for name in names]
subpaths = [p for p in path.listdir() if p.isdir()]
subpaths.sort(key=lambda x:x.name.lower())
return subpaths
except EnvironmentError:
return []
@ -178,7 +177,7 @@ class Directories:
default_state = self._default_state_for_path(path)
if default_state is not None:
return default_state
parent = path[:-1]
parent = path.parent()
if parent in self:
return self.get_state(parent)
else:

View File

@ -150,9 +150,9 @@ class File:
def rename(self, newname):
if newname == self.name:
return
destpath = self.path[:-1] + newname
destpath = self.path.parent()[newname]
if destpath.exists():
raise AlreadyExistsError(newname, self.path[:-1])
raise AlreadyExistsError(newname, self.path.parent())
try:
self.path.rename(destpath)
except EnvironmentError:
@ -173,11 +173,11 @@ class File:
@property
def name(self):
return self.path[-1]
return self.path.name
@property
def folder_path(self):
return self.path[:-1]
return self.path.parent()
class Folder(File):
@ -219,8 +219,7 @@ class Folder(File):
@property
def subfolders(self):
if self._subfolders is None:
subpaths = [self.path + name for name in self.path.listdir()]
subfolders = [p for p in subpaths if not p.islink() and p.isdir()]
subfolders = [p for p in self.path.listdir() if not p.islink() and p.isdir()]
self._subfolders = [self.__class__(p) for p in subfolders]
return self._subfolders
@ -248,18 +247,9 @@ def get_files(path, fileclasses=[File]):
:param fileclasses: List of candidate :class:`File` classes
"""
assert all(issubclass(fileclass, File) for fileclass in fileclasses)
def combine_paths(p1, p2):
try:
return p1 + p2
except Exception:
# This is temporary debug logging for #84.
logging.warning("Failed to combine %r and %r.", p1, p2)
raise
try:
paths = [combine_paths(path, name) for name in path.listdir()]
result = []
for path in paths:
for path in path.listdir():
file = get_file(path, fileclasses=fileclasses)
if file is not None:
result.append(file)

View File

@ -31,7 +31,7 @@ class DirectoryNode(Node):
self.clear()
subpaths = self._tree.app.directories.get_subfolders(self._directory_path)
for path in subpaths:
self.append(DirectoryNode(self._tree, path, path[-1]))
self.append(DirectoryNode(self._tree, path, path.name))
self._loaded = True
def update_all_states(self):

View File

@ -11,7 +11,6 @@ import os.path as op
import logging
from pytest import mark
from hscommon import io
from hscommon.path import Path
import hscommon.conflict
import hscommon.util
@ -57,7 +56,7 @@ class TestCaseDupeGuru:
# for this unit is pathetic. What's done is done. My approach now is to add tests for
# every change I want to make. The blowup was caused by a missing import.
p = Path(str(tmpdir))
io.open(p + 'foo', 'w').close()
p['foo'].open('w').close()
monkeypatch.setattr(hscommon.conflict, 'smart_copy', log_calls(lambda source_path, dest_path: None))
# XXX This monkeypatch is temporary. will be fixed in a better monkeypatcher.
monkeypatch.setattr(app, 'smart_copy', hscommon.conflict.smart_copy)
@ -73,14 +72,14 @@ class TestCaseDupeGuru:
def test_copy_or_move_clean_empty_dirs(self, tmpdir, monkeypatch):
tmppath = Path(str(tmpdir))
sourcepath = tmppath + 'source'
io.mkdir(sourcepath)
io.open(sourcepath + 'myfile', 'w')
sourcepath = tmppath['source']
sourcepath.mkdir()
sourcepath['myfile'].open('w')
app = TestApp().app
app.directories.add_path(tmppath)
[myfile] = app.directories.get_files()
monkeypatch.setattr(app, 'clean_empty_dirs', log_calls(lambda path: None))
app.copy_or_move(myfile, False, tmppath + 'dest', 0)
app.copy_or_move(myfile, False, tmppath['dest'], 0)
calls = app.clean_empty_dirs.calls
eq_(1, len(calls))
eq_(sourcepath, calls[0]['path'])
@ -104,8 +103,8 @@ class TestCaseDupeGuru:
# If the ignore_hardlink_matches option is set, don't match files hardlinking to the same
# inode.
tmppath = Path(str(tmpdir))
io.open(tmppath + 'myfile', 'w').write('foo')
os.link(str(tmppath + 'myfile'), str(tmppath + 'hardlink'))
tmppath['myfile'].open('w').write('foo')
os.link(str(tmppath['myfile']), str(tmppath['hardlink']))
app = TestApp().app
app.directories.add_path(tmppath)
app.scanner.scan_type = ScanType.Contents
@ -171,8 +170,8 @@ class TestCaseDupeGuruWithResults:
self.rtable.refresh()
tmpdir = request.getfuncargvalue('tmpdir')
tmppath = Path(str(tmpdir))
io.mkdir(tmppath + 'foo')
io.mkdir(tmppath + 'bar')
tmppath['foo'].mkdir()
tmppath['bar'].mkdir()
self.app.directories.add_path(tmppath)
def test_GetObjects(self, do_setup):
@ -417,11 +416,11 @@ class TestCaseDupeGuru_renameSelected:
def pytest_funcarg__do_setup(self, request):
tmpdir = request.getfuncargvalue('tmpdir')
p = Path(str(tmpdir))
fp = open(str(p + 'foo bar 1'),mode='w')
fp = open(str(p['foo bar 1']),mode='w')
fp.close()
fp = open(str(p + 'foo bar 2'),mode='w')
fp = open(str(p['foo bar 2']),mode='w')
fp.close()
fp = open(str(p + 'foo bar 3'),mode='w')
fp = open(str(p['foo bar 3']),mode='w')
fp.close()
files = fs.get_files(p)
for f in files:
@ -444,7 +443,7 @@ class TestCaseDupeGuru_renameSelected:
g = self.groups[0]
self.rtable.select([1])
assert app.rename_selected('renamed')
names = io.listdir(self.p)
names = [p.name for p in self.p.listdir()]
assert 'renamed' in names
assert 'foo bar 2' not in names
eq_(g.dupes[0].name, 'renamed')
@ -457,7 +456,7 @@ class TestCaseDupeGuru_renameSelected:
assert not app.rename_selected('renamed')
msg = logging.warning.calls[0]['msg']
eq_('dupeGuru Warning: list index out of range', msg)
names = io.listdir(self.p)
names = [p.name for p in self.p.listdir()]
assert 'renamed' not in names
assert 'foo bar 2' in names
eq_(g.dupes[0].name, 'foo bar 2')
@ -470,7 +469,7 @@ class TestCaseDupeGuru_renameSelected:
assert not app.rename_selected('foo bar 1')
msg = logging.warning.calls[0]['msg']
assert msg.startswith('dupeGuru Warning: \'foo bar 1\' already exists in')
names = io.listdir(self.p)
names = [p.name for p in self.p.listdir()]
assert 'foo bar 1' in names
assert 'foo bar 2' in names
eq_(g.dupes[0].name, 'foo bar 2')
@ -480,9 +479,9 @@ class TestAppWithDirectoriesInTree:
def pytest_funcarg__do_setup(self, request):
tmpdir = request.getfuncargvalue('tmpdir')
p = Path(str(tmpdir))
io.mkdir(p + 'sub1')
io.mkdir(p + 'sub2')
io.mkdir(p + 'sub3')
p['sub1'].mkdir()
p['sub2'].mkdir()
p['sub3'].mkdir()
app = TestApp()
self.app = app.app
self.dtree = app.dtree

View File

@ -102,11 +102,11 @@ class NamedObject:
@property
def path(self):
return self._folder + self.name
return self._folder[self.name]
@property
def folder_path(self):
return self.path[:-1]
return self.path.parent()
@property
def extension(self):

View File

@ -12,7 +12,6 @@ import tempfile
import shutil
from pytest import raises
from hscommon import io
from hscommon.path import Path
from hscommon.testutil import eq_
@ -20,27 +19,27 @@ from ..directories import *
def create_fake_fs(rootpath):
# We have it as a separate function because other units are using it.
rootpath = rootpath + 'fs'
io.mkdir(rootpath)
io.mkdir(rootpath + 'dir1')
io.mkdir(rootpath + 'dir2')
io.mkdir(rootpath + 'dir3')
fp = io.open(rootpath + 'file1.test', 'w')
rootpath = rootpath['fs']
rootpath.mkdir()
rootpath['dir1'].mkdir()
rootpath['dir2'].mkdir()
rootpath['dir3'].mkdir()
fp = rootpath['file1.test'].open('w')
fp.write('1')
fp.close()
fp = io.open(rootpath + 'file2.test', 'w')
fp = rootpath['file2.test'].open('w')
fp.write('12')
fp.close()
fp = io.open(rootpath + 'file3.test', 'w')
fp = rootpath['file3.test'].open('w')
fp.write('123')
fp.close()
fp = io.open(rootpath + ('dir1', 'file1.test'), 'w')
fp = rootpath['dir1']['file1.test'].open('w')
fp.write('1')
fp.close()
fp = io.open(rootpath + ('dir2', 'file2.test'), 'w')
fp = rootpath['dir2']['file2.test'].open('w')
fp.write('12')
fp.close()
fp = io.open(rootpath + ('dir3', 'file3.test'), 'w')
fp = rootpath['dir3']['file3.test'].open('w')
fp.write('123')
fp.close()
return rootpath
@ -50,9 +49,9 @@ def setup_module(module):
# and another with a more complex structure.
testpath = Path(tempfile.mkdtemp())
module.testpath = testpath
rootpath = testpath + 'onefile'
io.mkdir(rootpath)
fp = io.open(rootpath + 'test.txt', 'w')
rootpath = testpath['onefile']
rootpath.mkdir()
fp = rootpath['test.txt'].open('w')
fp.write('test_data')
fp.close()
create_fake_fs(testpath)
@ -67,30 +66,30 @@ def test_empty():
def test_add_path():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
eq_(1,len(d))
assert p in d
assert (p + 'foobar') in d
assert p[:-1] not in d
p = testpath + 'fs'
assert (p['foobar']) in d
assert p.parent() not in d
p = testpath['fs']
d.add_path(p)
eq_(2,len(d))
assert p in d
def test_AddPath_when_path_is_already_there():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
with raises(AlreadyThereError):
d.add_path(p)
with raises(AlreadyThereError):
d.add_path(p + 'foobar')
d.add_path(p['foobar'])
eq_(1, len(d))
def test_add_path_containing_paths_already_there():
d = Directories()
d.add_path(testpath + 'onefile')
d.add_path(testpath['onefile'])
eq_(1, len(d))
d.add_path(testpath)
eq_(len(d), 1)
@ -98,7 +97,7 @@ def test_add_path_containing_paths_already_there():
def test_AddPath_non_latin(tmpdir):
p = Path(str(tmpdir))
to_add = p + 'unicode\u201a'
to_add = p['unicode\u201a']
os.mkdir(str(to_add))
d = Directories()
try:
@ -108,24 +107,24 @@ def test_AddPath_non_latin(tmpdir):
def test_del():
d = Directories()
d.add_path(testpath + 'onefile')
d.add_path(testpath['onefile'])
try:
del d[1]
assert False
except IndexError:
pass
d.add_path(testpath + 'fs')
d.add_path(testpath['fs'])
del d[1]
eq_(1, len(d))
def test_states():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
eq_(DirectoryState.Normal ,d.get_state(p))
d.set_state(p, DirectoryState.Reference)
eq_(DirectoryState.Reference ,d.get_state(p))
eq_(DirectoryState.Reference ,d.get_state(p + 'dir1'))
eq_(DirectoryState.Reference ,d.get_state(p['dir1']))
eq_(1,len(d.states))
eq_(p,list(d.states.keys())[0])
eq_(DirectoryState.Reference ,d.states[p])
@ -133,67 +132,67 @@ def test_states():
def test_get_state_with_path_not_there():
# When the path's not there, just return DirectoryState.Normal
d = Directories()
d.add_path(testpath + 'onefile')
d.add_path(testpath['onefile'])
eq_(d.get_state(testpath), DirectoryState.Normal)
def test_states_remain_when_larger_directory_eat_smaller_ones():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
d.set_state(p, DirectoryState.Excluded)
d.add_path(testpath)
d.set_state(testpath, DirectoryState.Reference)
eq_(DirectoryState.Excluded ,d.get_state(p))
eq_(DirectoryState.Excluded ,d.get_state(p + 'dir1'))
eq_(DirectoryState.Excluded ,d.get_state(p['dir1']))
eq_(DirectoryState.Reference ,d.get_state(testpath))
def test_set_state_keep_state_dict_size_to_minimum():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
d.set_state(p, DirectoryState.Reference)
d.set_state(p + 'dir1', DirectoryState.Reference)
d.set_state(p['dir1'], DirectoryState.Reference)
eq_(1,len(d.states))
eq_(DirectoryState.Reference ,d.get_state(p + 'dir1'))
d.set_state(p + 'dir1', DirectoryState.Normal)
eq_(DirectoryState.Reference ,d.get_state(p['dir1']))
d.set_state(p['dir1'], DirectoryState.Normal)
eq_(2,len(d.states))
eq_(DirectoryState.Normal ,d.get_state(p + 'dir1'))
d.set_state(p + 'dir1', DirectoryState.Reference)
eq_(DirectoryState.Normal ,d.get_state(p['dir1']))
d.set_state(p['dir1'], DirectoryState.Reference)
eq_(1,len(d.states))
eq_(DirectoryState.Reference ,d.get_state(p + 'dir1'))
eq_(DirectoryState.Reference ,d.get_state(p['dir1']))
def test_get_files():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
d.set_state(p + 'dir1', DirectoryState.Reference)
d.set_state(p + 'dir2', DirectoryState.Excluded)
d.set_state(p['dir1'], DirectoryState.Reference)
d.set_state(p['dir2'], DirectoryState.Excluded)
files = list(d.get_files())
eq_(5, len(files))
for f in files:
if f.path[:-1] == p + 'dir1':
if f.path.parent() == p['dir1']:
assert f.is_ref
else:
assert not f.is_ref
def test_get_folders():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
d.set_state(p + 'dir1', DirectoryState.Reference)
d.set_state(p + 'dir2', DirectoryState.Excluded)
d.set_state(p['dir1'], DirectoryState.Reference)
d.set_state(p['dir2'], DirectoryState.Excluded)
folders = list(d.get_folders())
eq_(len(folders), 3)
ref = [f for f in folders if f.is_ref]
not_ref = [f for f in folders if not f.is_ref]
eq_(len(ref), 1)
eq_(ref[0].path, p + 'dir1')
eq_(ref[0].path, p['dir1'])
eq_(len(not_ref), 2)
eq_(ref[0].size, 1)
def test_get_files_with_inherited_exclusion():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
d.set_state(p, DirectoryState.Excluded)
eq_([], list(d.get_files()))
@ -202,19 +201,19 @@ def test_save_and_load(tmpdir):
d1 = Directories()
d2 = Directories()
p1 = Path(str(tmpdir.join('p1')))
io.mkdir(p1)
p1.mkdir()
p2 = Path(str(tmpdir.join('p2')))
io.mkdir(p2)
p2.mkdir()
d1.add_path(p1)
d1.add_path(p2)
d1.set_state(p1, DirectoryState.Reference)
d1.set_state(p1 + 'dir1', DirectoryState.Excluded)
d1.set_state(p1['dir1'], DirectoryState.Excluded)
tmpxml = str(tmpdir.join('directories_testunit.xml'))
d1.save_to_file(tmpxml)
d2.load_from_file(tmpxml)
eq_(2, len(d2))
eq_(DirectoryState.Reference ,d2.get_state(p1))
eq_(DirectoryState.Excluded ,d2.get_state(p1 + 'dir1'))
eq_(DirectoryState.Excluded ,d2.get_state(p1['dir1']))
def test_invalid_path():
d = Directories()
@ -234,12 +233,12 @@ def test_load_from_file_with_invalid_path(tmpdir):
#This test simulates a load from file resulting in a
#InvalidPath raise. Other directories must be loaded.
d1 = Directories()
d1.add_path(testpath + 'onefile')
d1.add_path(testpath['onefile'])
#Will raise InvalidPath upon loading
p = Path(str(tmpdir.join('toremove')))
io.mkdir(p)
p.mkdir()
d1.add_path(p)
io.rmdir(p)
p.rmdir()
tmpxml = str(tmpdir.join('directories_testunit.xml'))
d1.save_to_file(tmpxml)
d2 = Directories()
@ -248,11 +247,11 @@ def test_load_from_file_with_invalid_path(tmpdir):
def test_unicode_save(tmpdir):
d = Directories()
p1 = Path(str(tmpdir)) + 'hello\xe9'
io.mkdir(p1)
io.mkdir(p1 + 'foo\xe9')
p1 = Path(str(tmpdir))['hello\xe9']
p1.mkdir()
p1['foo\xe9'].mkdir()
d.add_path(p1)
d.set_state(p1 + 'foo\xe9', DirectoryState.Excluded)
d.set_state(p1['foo\xe9'], DirectoryState.Excluded)
tmpxml = str(tmpdir.join('directories_testunit.xml'))
try:
d.save_to_file(tmpxml)
@ -261,12 +260,12 @@ def test_unicode_save(tmpdir):
def test_get_files_refreshes_its_directories():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
files = d.get_files()
eq_(6, len(list(files)))
time.sleep(1)
os.remove(str(p + ('dir1','file1.test')))
os.remove(str(p['dir1']['file1.test']))
files = d.get_files()
eq_(5, len(list(files)))
@ -274,14 +273,14 @@ def test_get_files_does_not_choke_on_non_existing_directories(tmpdir):
d = Directories()
p = Path(str(tmpdir))
d.add_path(p)
io.rmtree(p)
p.rmtree()
eq_([], list(d.get_files()))
def test_get_state_returns_excluded_by_default_for_hidden_directories(tmpdir):
d = Directories()
p = Path(str(tmpdir))
hidden_dir_path = p + '.foo'
io.mkdir(p + '.foo')
hidden_dir_path = p['.foo']
p['.foo'].mkdir()
d.add_path(p)
eq_(d.get_state(hidden_dir_path), DirectoryState.Excluded)
# But it can be overriden
@ -297,16 +296,16 @@ def test_default_path_state_override(tmpdir):
d = MyDirectories()
p1 = Path(str(tmpdir))
io.mkdir(p1 + 'foobar')
io.open(p1 + 'foobar/somefile', 'w').close()
io.mkdir(p1 + 'foobaz')
io.open(p1 + 'foobaz/somefile', 'w').close()
p1['foobar'].mkdir()
p1['foobar/somefile'].open('w').close()
p1['foobaz'].mkdir()
p1['foobaz/somefile'].open('w').close()
d.add_path(p1)
eq_(d.get_state(p1 + 'foobaz'), DirectoryState.Normal)
eq_(d.get_state(p1 + 'foobar'), DirectoryState.Excluded)
eq_(d.get_state(p1['foobaz']), DirectoryState.Normal)
eq_(d.get_state(p1['foobar']), DirectoryState.Excluded)
eq_(len(list(d.get_files())), 1) # only the 'foobaz' file is there
# However, the default state can be changed
d.set_state(p1 + 'foobar', DirectoryState.Normal)
eq_(d.get_state(p1 + 'foobar'), DirectoryState.Normal)
d.set_state(p1['foobar'], DirectoryState.Normal)
eq_(d.get_state(p1['foobar']), DirectoryState.Normal)
eq_(len(list(d.get_files())), 2)

View File

@ -25,12 +25,12 @@ def test_md5_aggregate_subfiles_sorted(tmpdir):
#same order everytime.
p = create_fake_fs(Path(str(tmpdir)))
b = fs.Folder(p)
md51 = fs.File(p + ('dir1', 'file1.test')).md5
md52 = fs.File(p + ('dir2', 'file2.test')).md5
md53 = fs.File(p + ('dir3', 'file3.test')).md5
md54 = fs.File(p + 'file1.test').md5
md55 = fs.File(p + 'file2.test').md5
md56 = fs.File(p + 'file3.test').md5
md51 = fs.File(p['dir1']['file1.test']).md5
md52 = fs.File(p['dir2']['file2.test']).md5
md53 = fs.File(p['dir3']['file3.test']).md5
md54 = fs.File(p['file1.test']).md5
md55 = fs.File(p['file2.test']).md5
md56 = fs.File(p['file3.test']).md5
# The expected md5 is the md5 of md5s for folders and the direct md5 for files
folder_md51 = hashlib.md5(md51).digest()
folder_md52 = hashlib.md5(md52).digest()

View File

@ -7,7 +7,6 @@
# http://www.hardcoded.net/licenses/bsd_license
from jobprogress import job
from hscommon import io
from hscommon.path import Path
from hscommon.testutil import eq_
@ -21,7 +20,7 @@ class NamedObject:
if path is None:
path = Path(name)
else:
path = Path(path) + name
path = Path(path)[name]
self.name = name
self.size = size
self.path = path
@ -37,7 +36,6 @@ def pytest_funcarg__fake_fileexists(request):
# This is a hack to avoid invalidating all previous tests since the scanner started to test
# for file existence before doing the match grouping.
monkeypatch = request.getfuncargvalue('monkeypatch')
monkeypatch.setattr(io, 'exists', lambda _: True)
monkeypatch.setattr(Path, 'exists', lambda _: True)
def test_empty(fake_fileexists):
@ -471,11 +469,11 @@ def test_dont_group_files_that_dont_exist(tmpdir):
s = Scanner()
s.scan_type = ScanType.Contents
p = Path(str(tmpdir))
io.open(p + 'file1', 'w').write('foo')
io.open(p + 'file2', 'w').write('foo')
p['file1'].open('w').write('foo')
p['file2'].open('w').write('foo')
file1, file2 = fs.get_files(p)
def getmatches(*args, **kw):
io.remove(file2.path)
file2.path.remove()
return [Match(file1, file2, 100)]
s._getmatches = getmatches

View File

@ -36,7 +36,7 @@ class MusicFile(fs.File):
def can_handle(cls, path):
if not fs.File.can_handle(path):
return False
return get_file_ext(path[-1]) in auto.EXT2CLASS
return get_file_ext(path.name) in auto.EXT2CLASS
def get_display_info(self, group, delta):
size = self.size

View File

@ -60,7 +60,7 @@ class Photo(fs.File):
@classmethod
def can_handle(cls, path):
return fs.File.can_handle(path) and get_file_ext(path[-1]) in cls.HANDLED_EXTS
return fs.File.can_handle(path) and get_file_ext(path.name) in cls.HANDLED_EXTS
def get_display_info(self, group, delta):
size = self.size

View File

@ -7,7 +7,10 @@
# http://www.hardcoded.net/licenses/bsd_license
import re
from . import io
import os
import shutil
from .path import Path, pathify
#This matches [123], but not [12] (3 digits being the minimum).
#It also matches [1234] [12345] etc..
@ -36,27 +39,28 @@ def get_unconflicted_name(name):
def is_conflicted(name):
return re_conflict.match(name) is not None
def _smart_move_or_copy(operation, source_path, dest_path):
@pathify
def _smart_move_or_copy(operation, source_path: Path, dest_path: Path):
''' Use move() or copy() to move and copy file with the conflict management, but without the
slowness of the fs system.
'''
if io.isdir(dest_path) and not io.isdir(source_path):
dest_path = dest_path + source_path[-1]
if io.exists(dest_path):
filename = dest_path[-1]
dest_dir_path = dest_path[:-1]
newname = get_conflicted_name(io.listdir(dest_dir_path), filename)
dest_path = dest_dir_path + newname
operation(source_path, dest_path)
if dest_path.isdir() and not source_path.isdir():
dest_path = dest_path[source_path.name]
if dest_path.exists():
filename = dest_path.name
dest_dir_path = dest_path.parent()
newname = get_conflicted_name(os.listdir(str(dest_dir_path)), filename)
dest_path = dest_dir_path[newname]
operation(str(source_path), str(dest_path))
def smart_move(source_path, dest_path):
_smart_move_or_copy(io.move, source_path, dest_path)
_smart_move_or_copy(shutil.move, source_path, dest_path)
def smart_copy(source_path, dest_path):
try:
_smart_move_or_copy(io.copy, source_path, dest_path)
_smart_move_or_copy(shutil.copy, source_path, dest_path)
except IOError as e:
if e.errno in {21, 13}: # it's a directory, code is 21 on OS X / Linux and 13 on Windows
_smart_move_or_copy(io.copytree, source_path, dest_path)
_smart_move_or_copy(shutil.copytree, source_path, dest_path)
else:
raise

View File

@ -6,13 +6,13 @@
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import os
from datetime import datetime, date, timedelta
import logging
import sqlite3 as sqlite
import threading
from queue import Queue, Empty
from . import io
from .path import Path
from .util import iterdaterange
@ -271,6 +271,9 @@ EUR = Currency(code='EUR')
class CurrencyNotSupportedException(Exception):
"""The current exchange rate provider doesn't support the requested currency."""
class RateProviderUnavailable(Exception):
"""The rate provider is temporarily unavailable."""
def date2str(date):
return '%d%02d%02d' % (date.year, date.month, date.day)
@ -314,7 +317,7 @@ class RatesDB:
logging.warning("Corrupt currency database at {0}. Starting over.".format(repr(self.db_or_path)))
if isinstance(self.db_or_path, (str, Path)):
self.con.close()
io.remove(Path(self.db_or_path))
os.remove(str(self.db_or_path))
self.con = sqlite.connect(str(self.db_or_path))
else:
logging.warning("Can't re-use the file, using a memory table")
@ -452,11 +455,19 @@ class RatesDB:
values = rate_provider(currency, fetch_start, fetch_end)
except CurrencyNotSupportedException:
continue
except RateProviderUnavailable:
logging.debug("Fetching failed due to temporary problems.")
break
else:
if values:
self._fetched_values.put((values, currency, fetch_start, fetch_end))
logging.debug("Fetching successful!")
break
if not values:
# We didn't get any value from the server, which means that we asked for
# rates that couldn't be delivered. Still, we report empty values so
# that the cache can correctly remember this unavailability so that we
# don't repeatedly fetch those ranges.
values = []
self._fetched_values.put((values, currency, fetch_start, fetch_end))
logging.debug("Fetching successful!")
break
else:
logging.debug("Fetching failed!")

View File

@ -7,6 +7,7 @@
# http://www.hardcoded.net/licenses/bsd_license
import os.path as op
import logging
class SpecialFolder:
AppData = 1
@ -38,7 +39,13 @@ def special_folder_path(special_folder, appname=None):
return _special_folder_path(special_folder, appname)
try:
from cocoa import proxy
# Normally, we would simply do "from cocoa import proxy", but due to a bug in pytest (currently
# at v2.4.2), our test suite is broken when we do that. This below is a workaround until that
# bug is fixed.
import cocoa
if not hasattr(cocoa, 'proxy'):
raise ImportError()
proxy = cocoa.proxy
_open_url = proxy.openURL_
_open_path = proxy.openPath_
_reveal_path = proxy.revealPath_
@ -71,4 +78,14 @@ except ImportError:
return str(QDesktopServices.storageLocation(qtfolder))
except ImportError:
raise Exception("Can't setup desktop functions!")
# We're either running tests, and these functions don't matter much or we're in a really
# weird situation. Let's just have dummy fallbacks.
logging.warning("Can't setup desktop functions!")
def _open_path(path):
pass
def _reveal_path(path):
pass
def _special_folder_path(special_folder, appname=None):
return '/tmp'

View File

@ -1,79 +0,0 @@
# Created By: Virgil Dupras
# Created On: 2007-10-23
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
# HS code should only deal with Path instances, not string paths. One of the annoyances of this
# is to always have to convert Path instances with unicode() when calling open() or listdir() etc..
# this unit takes care of this
import builtins
import os
import os.path
import shutil
import logging
def log_io_error(func):
""" Catches OSError, IOError and WindowsError and log them
"""
def wrapper(path, *args, **kwargs):
try:
return func(path, *args, **kwargs)
except (IOError, OSError) as e:
msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"'
classname = e.__class__.__name__
funcname = func.__name__
logging.warn(msg.format(classname, funcname, str(path), str(e)))
return wrapper
def copy(source_path, dest_path):
return shutil.copy(str(source_path), str(dest_path))
def copytree(source_path, dest_path, *args, **kwargs):
return shutil.copytree(str(source_path), str(dest_path), *args, **kwargs)
def exists(path):
return os.path.exists(str(path))
def isdir(path):
return os.path.isdir(str(path))
def isfile(path):
return os.path.isfile(str(path))
def islink(path):
return os.path.islink(str(path))
def listdir(path):
return os.listdir(str(path))
def mkdir(path, *args, **kwargs):
return os.mkdir(str(path), *args, **kwargs)
def makedirs(path, *args, **kwargs):
return os.makedirs(str(path), *args, **kwargs)
def move(source_path, dest_path):
return shutil.move(str(source_path), str(dest_path))
def open(path, *args, **kwargs):
return builtins.open(str(path), *args, **kwargs)
def remove(path):
return os.remove(str(path))
def rename(source_path, dest_path):
return os.rename(str(source_path), str(dest_path))
def rmdir(path):
return os.rmdir(str(path))
def rmtree(path):
return shutil.rmtree(str(path))
def stat(path):
return os.stat(str(path))

View File

@ -12,6 +12,8 @@ import os.path as op
import shutil
import sys
from itertools import takewhile
from functools import wraps
from inspect import signature
class Path(tuple):
"""A handy class to work with paths.
@ -94,12 +96,11 @@ class Path(tuple):
stop = -len(equal_elems) if equal_elems else None
key = slice(key.start, stop, key.step)
return Path(tuple.__getitem__(self, key))
elif isinstance(key, (str, Path)):
return self + key
else:
return tuple.__getitem__(self, key)
def __getslice__(self, i, j): #I have to override it because tuple uses it.
return Path(tuple.__getslice__(self, i, j))
def __hash__(self):
return tuple.__hash__(self)
@ -133,6 +134,13 @@ class Path(tuple):
def tobytes(self):
return str(self).encode(sys.getfilesystemencoding())
def parent(self):
return self[:-1]
@property
def name(self):
return self[-1]
# OS method wrappers
def exists(self):
return op.exists(str(self))
@ -153,7 +161,7 @@ class Path(tuple):
return op.islink(str(self))
def listdir(self):
return os.listdir(str(self))
return [self[name] for name in os.listdir(str(self))]
def mkdir(self, *args, **kwargs):
return os.mkdir(str(self), *args, **kwargs)
@ -182,3 +190,43 @@ class Path(tuple):
def stat(self):
return os.stat(str(self))
def pathify(f):
"""Ensure that every annotated :class:`Path` arguments are actually paths.
When a function is decorated with ``@pathify``, every argument with annotated as Path will be
converted to a Path if it wasn't already. Example::
@pathify
def foo(path: Path, otherarg):
return path.listdir()
Calling ``foo('/bar', 0)`` will convert ``'/bar'`` to ``Path('/bar')``.
"""
sig = signature(f)
pindexes = {i for i, p in enumerate(sig.parameters.values()) if p.annotation is Path}
pkeys = {k: v for k, v in sig.parameters.items() if v.annotation is Path}
def path_or_none(p):
return None if p is None else Path(p)
@wraps(f)
def wrapped(*args, **kwargs):
args = tuple((path_or_none(a) if i in pindexes else a) for i, a in enumerate(args))
kwargs = {k: (path_or_none(v) if k in pkeys else v) for k, v in kwargs.items()}
return f(*args, **kwargs)
return wrapped
def log_io_error(func):
""" Catches OSError, IOError and WindowsError and log them
"""
@wraps(func)
def wrapper(path, *args, **kwargs):
try:
return func(path, *args, **kwargs)
except (IOError, OSError) as e:
msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"'
classname = e.__class__.__name__
funcname = func.__name__
logging.warn(msg.format(classname, funcname, str(path), str(e)))
return wrapper

View File

@ -61,44 +61,44 @@ class TestCase_move_copy:
def pytest_funcarg__do_setup(self, request):
tmpdir = request.getfuncargvalue('tmpdir')
self.path = Path(str(tmpdir))
io.open(self.path + 'foo', 'w').close()
io.open(self.path + 'bar', 'w').close()
io.mkdir(self.path + 'dir')
self.path['foo'].open('w').close()
self.path['bar'].open('w').close()
self.path['dir'].mkdir()
def test_move_no_conflict(self, do_setup):
smart_move(self.path + 'foo', self.path + 'baz')
assert io.exists(self.path + 'baz')
assert not io.exists(self.path + 'foo')
assert self.path['baz'].exists()
assert not self.path['foo'].exists()
def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move
smart_copy(self.path + 'foo', self.path + 'baz')
assert io.exists(self.path + 'baz')
assert io.exists(self.path + 'foo')
assert self.path['baz'].exists()
assert self.path['foo'].exists()
def test_move_no_conflict_dest_is_dir(self, do_setup):
smart_move(self.path + 'foo', self.path + 'dir')
assert io.exists(self.path + ('dir', 'foo'))
assert not io.exists(self.path + 'foo')
assert self.path['dir']['foo'].exists()
assert not self.path['foo'].exists()
def test_move_conflict(self, do_setup):
smart_move(self.path + 'foo', self.path + 'bar')
assert io.exists(self.path + '[000] bar')
assert not io.exists(self.path + 'foo')
assert self.path['[000] bar'].exists()
assert not self.path['foo'].exists()
def test_move_conflict_dest_is_dir(self, do_setup):
smart_move(self.path + 'foo', self.path + 'dir')
smart_move(self.path + 'bar', self.path + 'foo')
smart_move(self.path + 'foo', self.path + 'dir')
assert io.exists(self.path + ('dir', 'foo'))
assert io.exists(self.path + ('dir', '[000] foo'))
assert not io.exists(self.path + 'foo')
assert not io.exists(self.path + 'bar')
smart_move(self.path['foo'], self.path['dir'])
smart_move(self.path['bar'], self.path['foo'])
smart_move(self.path['foo'], self.path['dir'])
assert self.path['dir']['foo'].exists()
assert self.path['dir']['[000] foo'].exists()
assert not self.path['foo'].exists()
assert not self.path['bar'].exists()
def test_copy_folder(self, tmpdir):
# smart_copy also works on folders
path = Path(str(tmpdir))
io.mkdir(path + 'foo')
io.mkdir(path + 'bar')
smart_copy(path + 'foo', path + 'bar') # no crash
assert io.exists(path + '[000] bar')
path['foo'].mkdir()
path['bar'].mkdir()
smart_copy(path['foo'], path['bar']) # no crash
assert path['[000] bar'].exists()

View File

@ -9,7 +9,6 @@
from datetime import date
import sqlite3 as sqlite
from .. import io
from ..testutil import eq_, assert_almost_equal
from ..currency import Currency, RatesDB, CAD, EUR, USD
@ -64,7 +63,7 @@ def test_db_with_connection():
def test_corrupt_db(tmpdir):
dbpath = str(tmpdir.join('foo.db'))
fh = io.open(dbpath, 'w')
fh = open(dbpath, 'w')
fh.write('corrupted')
fh.close()
db = RatesDB(dbpath) # no crash. deletes the old file and start a new db

View File

@ -7,10 +7,11 @@
# http://www.hardcoded.net/licenses/bsd_license
import sys
import os
from pytest import raises, mark
from ..path import *
from ..path import Path, pathify
from ..testutil import eq_
def pytest_funcarg__force_ossep(request):
@ -44,7 +45,7 @@ def test_init_with_tuple_and_list(force_ossep):
def test_init_with_invalid_value(force_ossep):
try:
path = Path(42)
self.fail()
assert False
except TypeError:
pass
@ -63,6 +64,16 @@ def test_slicing(force_ossep):
eq_('foo/bar',subpath)
assert isinstance(subpath,Path)
def test_parent(force_ossep):
path = Path('foo/bar/bleh')
subpath = path.parent()
eq_('foo/bar', subpath)
assert isinstance(subpath, Path)
def test_filename(force_ossep):
path = Path('foo/bar/bleh.ext')
eq_(path.name, 'bleh.ext')
def test_deal_with_empty_components(force_ossep):
"""Keep ONLY a leading space, which means we want a leading slash.
"""
@ -99,7 +110,7 @@ def test_add(force_ossep):
#Invalid concatenation
try:
Path(('foo','bar')) + 1
self.fail()
assert False
except TypeError:
pass
@ -180,6 +191,16 @@ def test_Path_of_a_Path_returns_self(force_ossep):
p = Path('foo/bar')
assert Path(p) is p
def test_getitem_str(force_ossep):
# path['something'] returns the child path corresponding to the name
p = Path('/foo/bar')
eq_(p['baz'], Path('/foo/bar/baz'))
def test_getitem_path(force_ossep):
# path[Path('something')] returns the child path corresponding to the name (or subpath)
p = Path('/foo/bar')
eq_(p[Path('baz/bleh')], Path('/foo/bar/baz/bleh'))
@mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate")
def test_log_unicode_errors(force_ossep, monkeypatch, capsys):
# When an there's a UnicodeDecodeError on path creation, log it so it can be possible
@ -206,4 +227,25 @@ def test_remove_drive_letter(monkeypatch):
p = Path('C:\\')
eq_(p.remove_drive_letter(), Path(''))
p = Path('z:\\foo')
eq_(p.remove_drive_letter(), Path('foo'))
eq_(p.remove_drive_letter(), Path('foo'))
def test_pathify():
@pathify
def foo(a: Path, b, c:Path):
return a, b, c
a, b, c = foo('foo', 0, c=Path('bar'))
assert isinstance(a, Path)
assert a == Path('foo')
assert b == 0
assert isinstance(c, Path)
assert c == Path('bar')
def test_pathify_preserve_none():
# @pathify preserves None value and doesn't try to return a Path
@pathify
def foo(a: Path):
return a
a = foo(None)
assert a is None

View File

@ -11,7 +11,6 @@ from io import StringIO
from pytest import raises
from ..testutil import eq_
from .. import io
from ..path import Path
from ..util import *
@ -210,39 +209,49 @@ class TestCase_modified_after:
monkeyplus.patch_osstat('first', st_mtime=42)
assert modified_after('first', 'does_not_exist') # no crash
def test_first_file_is_none(self, monkeyplus):
# when the first file is None, we return False
monkeyplus.patch_osstat('second', st_mtime=42)
assert not modified_after(None, 'second') # no crash
def test_second_file_is_none(self, monkeyplus):
# when the second file is None, we return True
monkeyplus.patch_osstat('first', st_mtime=42)
assert modified_after('first', None) # no crash
class TestCase_delete_if_empty:
def test_is_empty(self, tmpdir):
testpath = Path(str(tmpdir))
assert delete_if_empty(testpath)
assert not io.exists(testpath)
assert not testpath.exists()
def test_not_empty(self, tmpdir):
testpath = Path(str(tmpdir))
io.mkdir(testpath + 'foo')
testpath['foo'].mkdir()
assert not delete_if_empty(testpath)
assert io.exists(testpath)
assert testpath.exists()
def test_with_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir))
io.open(testpath + 'foo', 'w')
io.open(testpath + 'bar', 'w')
testpath['foo'].open('w')
testpath['bar'].open('w')
assert delete_if_empty(testpath, ['foo', 'bar'])
assert not io.exists(testpath)
assert not testpath.exists()
def test_directory_in_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir))
io.mkdir(testpath + 'foo')
testpath['foo'].mkdir()
assert not delete_if_empty(testpath, ['foo'])
assert io.exists(testpath)
assert testpath.exists()
def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir):
testpath = Path(str(tmpdir))
io.open(testpath + 'foo', 'w')
io.open(testpath + 'bar', 'w')
testpath['foo'].open('w')
testpath['bar'].open('w')
assert not delete_if_empty(testpath, ['foo'])
assert io.exists(testpath)
assert io.exists(testpath + 'foo')
assert testpath.exists()
assert testpath['foo'].exists()
def test_doesnt_exist(self):
# When the 'path' doesn't exist, just do nothing.
@ -251,7 +260,7 @@ class TestCase_delete_if_empty:
def test_is_file(self, tmpdir):
# When 'path' is a file, do nothing.
p = Path(str(tmpdir)) + 'filename'
io.open(p, 'w').close()
p.open('w').close()
delete_if_empty(p) # no crash
def test_ioerror(self, tmpdir, monkeypatch):
@ -259,7 +268,7 @@ class TestCase_delete_if_empty:
def do_raise(*args, **kw):
raise OSError()
monkeypatch.setattr(io, 'rmdir', do_raise)
monkeypatch.setattr(Path, 'rmdir', do_raise)
delete_if_empty(Path(str(tmpdir))) # no crash

View File

@ -15,8 +15,7 @@ import glob
import shutil
from datetime import timedelta
from . import io
from .path import Path
from .path import Path, pathify, log_io_error
def nonone(value, replace_value):
''' Returns value if value is not None. Returns replace_value otherwise.
@ -267,15 +266,19 @@ def iterdaterange(start, end):
#--- Files related
def modified_after(first_path, second_path):
"""Returns True if first_path's mtime is higher than second_path's mtime."""
@pathify
def modified_after(first_path: Path, second_path: Path):
"""Returns ``True`` if first_path's mtime is higher than second_path's mtime.
If one of the files doesn't exist or is ``None``, it is considered "never modified".
"""
try:
first_mtime = io.stat(first_path).st_mtime
except EnvironmentError:
first_mtime = first_path.stat().st_mtime
except (EnvironmentError, AttributeError):
return False
try:
second_mtime = io.stat(second_path).st_mtime
except EnvironmentError:
second_mtime = second_path.stat().st_mtime
except (EnvironmentError, AttributeError):
return True
return first_mtime > second_mtime
@ -292,18 +295,19 @@ def find_in_path(name, paths=None):
return op.join(path, name)
return None
@io.log_io_error
def delete_if_empty(path, files_to_delete=[]):
@log_io_error
@pathify
def delete_if_empty(path: Path, files_to_delete=[]):
''' Deletes the directory at 'path' if it is empty or if it only contains files_to_delete.
'''
if not io.exists(path) or not io.isdir(path):
if not path.exists() or not path.isdir():
return
contents = io.listdir(path)
if any(name for name in contents if (name not in files_to_delete) or io.isdir(path + name)):
contents = path.listdir()
if any(p for p in contents if (p.name not in files_to_delete) or p.isdir()):
return False
for name in contents:
io.remove(path + name)
io.rmdir(path)
for p in contents:
p.remove()
path.rmdir()
return True
def open_if_filename(infile, mode='rb'):
@ -313,7 +317,7 @@ def open_if_filename(infile, mode='rb'):
Returns a tuple (shouldbeclosed,infile) infile is a file object
"""
if isinstance(infile, Path):
return (io.open(infile, mode), True)
return (infile.open(mode), True)
if isinstance(infile, str):
return (open(infile, mode), True)
else: