1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2025-07-05 15:03:20 +00:00

Replaced hscommon.path with pathlib

Removed hscommon.path and hscommon.io and adapted the
rest of hscommon's code to make tests pass.
This commit is contained in:
Virgil Dupras 2013-08-04 18:32:53 -04:00
parent d0a3f081da
commit 160aaaf880
9 changed files with 86 additions and 548 deletions

View File

@ -7,7 +7,7 @@
# http://www.hardcoded.net/licenses/bsd_license # http://www.hardcoded.net/licenses/bsd_license
import re import re
from . import io import shutil
#This matches [123], but not [12] (3 digits being the minimum). #This matches [123], but not [12] (3 digits being the minimum).
#It also matches [1234] [12345] etc.. #It also matches [1234] [12345] etc..
@ -40,23 +40,24 @@ def _smart_move_or_copy(operation, source_path, dest_path):
''' Use move() or copy() to move and copy file with the conflict management, but without the ''' Use move() or copy() to move and copy file with the conflict management, but without the
slowness of the fs system. slowness of the fs system.
''' '''
if io.isdir(dest_path) and not io.isdir(source_path): if dest_path.exists() and dest_path.is_dir() and not source_path.is_dir():
dest_path = dest_path + source_path[-1] dest_path = dest_path[source_path.name]
if io.exists(dest_path): if dest_path.exists():
filename = dest_path[-1] filename = dest_path.name
dest_dir_path = dest_path[:-1] dest_dir_path = dest_path.parent()
newname = get_conflicted_name(io.listdir(dest_dir_path), filename) existing_names = [p.name for p in dest_dir_path.glob('*')]
dest_path = dest_dir_path + newname newname = get_conflicted_name(existing_names, filename)
operation(source_path, dest_path) dest_path = dest_dir_path[newname]
operation(str(source_path), str(dest_path))
def smart_move(source_path, dest_path): def smart_move(source_path, dest_path):
_smart_move_or_copy(io.move, source_path, dest_path) _smart_move_or_copy(shutil.move, source_path, dest_path)
def smart_copy(source_path, dest_path): def smart_copy(source_path, dest_path):
try: try:
_smart_move_or_copy(io.copy, source_path, dest_path) _smart_move_or_copy(shutil.copy, source_path, dest_path)
except IOError as e: except IOError as e:
if e.errno in {21, 13}: # it's a directory, code is 21 on OS X / Linux and 13 on Windows if e.errno in {21, 13}: # it's a directory, code is 21 on OS X / Linux and 13 on Windows
_smart_move_or_copy(io.copytree, source_path, dest_path) _smart_move_or_copy(shutil.copytree, source_path, dest_path)
else: else:
raise raise

View File

@ -11,9 +11,7 @@ import logging
import sqlite3 as sqlite import sqlite3 as sqlite
import threading import threading
from queue import Queue, Empty from queue import Queue, Empty
from pathlib import Path
from . import io
from .path import Path
class Currency: class Currency:
all = [] all = []
@ -310,7 +308,7 @@ class RatesDB:
logging.warning("Corrupt currency database at {0}. Starting over.".format(repr(self.db_or_path))) logging.warning("Corrupt currency database at {0}. Starting over.".format(repr(self.db_or_path)))
if isinstance(self.db_or_path, (str, Path)): if isinstance(self.db_or_path, (str, Path)):
self.con.close() self.con.close()
io.remove(Path(self.db_or_path)) Path(self.db_or_path).unlink()
self.con = sqlite.connect(str(self.db_or_path)) self.con = sqlite.connect(str(self.db_or_path))
else: else:
logging.warning("Can't re-use the file, using a memory table") logging.warning("Can't re-use the file, using a memory table")

View File

@ -1,79 +0,0 @@
# Created By: Virgil Dupras
# Created On: 2007-10-23
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
# HS code should only deal with Path instances, not string paths. One of the annoyances of this
# is to always have to convert Path instances with unicode() when calling open() or listdir() etc..
# this unit takes care of this
import builtins
import os
import os.path
import shutil
import logging
def log_io_error(func):
""" Catches OSError, IOError and WindowsError and log them
"""
def wrapper(path, *args, **kwargs):
try:
return func(path, *args, **kwargs)
except (IOError, OSError) as e:
msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"'
classname = e.__class__.__name__
funcname = func.__name__
logging.warn(msg.format(classname, funcname, str(path), str(e)))
return wrapper
def copy(source_path, dest_path):
return shutil.copy(str(source_path), str(dest_path))
def copytree(source_path, dest_path, *args, **kwargs):
return shutil.copytree(str(source_path), str(dest_path), *args, **kwargs)
def exists(path):
return os.path.exists(str(path))
def isdir(path):
return os.path.isdir(str(path))
def isfile(path):
return os.path.isfile(str(path))
def islink(path):
return os.path.islink(str(path))
def listdir(path):
return os.listdir(str(path))
def mkdir(path, *args, **kwargs):
return os.mkdir(str(path), *args, **kwargs)
def makedirs(path, *args, **kwargs):
return os.makedirs(str(path), *args, **kwargs)
def move(source_path, dest_path):
return shutil.move(str(source_path), str(dest_path))
def open(path, *args, **kwargs):
return builtins.open(str(path), *args, **kwargs)
def remove(path):
return os.remove(str(path))
def rename(source_path, dest_path):
return os.rename(str(source_path), str(dest_path))
def rmdir(path):
return os.rmdir(str(path))
def rmtree(path):
return shutil.rmtree(str(path))
def stat(path):
return os.stat(str(path))

View File

@ -1,184 +0,0 @@
# Created By: Virgil Dupras
# Created On: 2006/02/21
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import logging
import os
import os.path as op
import shutil
import sys
from itertools import takewhile
class Path(tuple):
"""A handy class to work with paths.
path[index] returns a string
path[start:stop] returns a Path
start and stop can be int, but the can also be path instances. When start
or stop are Path like in refpath[p1:p2], it is the same thing as typing
refpath[len(p1):-len(p2)], except that it will only slice out stuff that are
equal. For example, 'a/b/c/d'['a/z':'z/d'] returns 'b/c', not ''.
See the test units for more details.
You can use the + operator, which is the same thing as with tuples, but
returns a Path.
In HS applications, all paths variable should be Path instances. These Path instances should
be converted to str only at the last moment (when it is needed in an external function, such
as os.rename)
"""
# Saves a little bit of memory usage
__slots__ = ()
def __new__(cls, value, separator=None):
def unicode_if_needed(s):
if isinstance(s, str):
return s
else:
try:
return str(s, sys.getfilesystemencoding())
except UnicodeDecodeError:
logging.warning("Could not decode %r", s)
raise
if isinstance(value, Path):
return value
if not separator:
separator = os.sep
if isinstance(value, bytes):
value = unicode_if_needed(value)
if isinstance(value, str):
if value:
if (separator not in value) and ('/' in value):
separator = '/'
value = value.split(separator)
else:
value = ()
else:
if any(isinstance(x, bytes) for x in value):
value = [unicode_if_needed(x) for x in value]
#value is a tuple/list
if any(separator in x for x in value):
#We have a component with a separator in it. Let's rejoin it, and generate another path.
return Path(separator.join(value), separator)
if (len(value) > 1) and (not value[-1]):
value = value[:-1] #We never want a path to end with a '' (because Path() can be called with a trailing slash ending path)
return tuple.__new__(cls, value)
def __add__(self, other):
other = Path(other)
if other and (not other[0]):
other = other[1:]
return Path(tuple.__add__(self, other))
def __contains__(self, item):
if isinstance(item, Path):
return item[:len(self)] == self
else:
return tuple.__contains__(self, item)
def __eq__(self, other):
return tuple.__eq__(self, Path(other))
def __getitem__(self, key):
if isinstance(key, slice):
if isinstance(key.start, Path):
equal_elems = list(takewhile(lambda pair: pair[0] == pair[1], zip(self, key.start)))
key = slice(len(equal_elems), key.stop, key.step)
if isinstance(key.stop, Path):
equal_elems = list(takewhile(lambda pair: pair[0] == pair[1], zip(reversed(self), reversed(key.stop))))
stop = -len(equal_elems) if equal_elems else None
key = slice(key.start, stop, key.step)
return Path(tuple.__getitem__(self, key))
else:
return tuple.__getitem__(self, key)
def __getslice__(self, i, j): #I have to override it because tuple uses it.
return Path(tuple.__getslice__(self, i, j))
def __hash__(self):
return tuple.__hash__(self)
def __ne__(self, other):
return not self.__eq__(other)
def __radd__(self, other):
return Path(other) + self
def __str__(self):
if len(self) == 1:
first = self[0]
if (len(first) == 2) and (first[1] == ':'): #Windows drive letter
return first + '\\'
elif not len(first): #root directory
return '/'
return os.sep.join(self)
def has_drive_letter(self):
if not self:
return False
first = self[0]
return (len(first) == 2) and (first[1] == ':')
def remove_drive_letter(self):
if self.has_drive_letter():
return self[1:]
else:
return self
def tobytes(self):
return str(self).encode(sys.getfilesystemencoding())
# OS method wrappers
def exists(self):
return op.exists(str(self))
def copy(self, dest_path):
return shutil.copy(str(self), str(dest_path))
def copytree(self, dest_path, *args, **kwargs):
return shutil.copytree(str(self), str(dest_path), *args, **kwargs)
def isdir(self):
return op.isdir(str(self))
def isfile(self):
return op.isfile(str(self))
def islink(self):
return op.islink(str(self))
def listdir(self):
return os.listdir(str(self))
def mkdir(self, *args, **kwargs):
return os.mkdir(str(self), *args, **kwargs)
def makedirs(self, *args, **kwargs):
return os.makedirs(str(self), *args, **kwargs)
def move(self, dest_path):
return shutil.move(str(self), str(dest_path))
def open(self, *args, **kwargs):
return open(str(self), *args, **kwargs)
def remove(self):
return os.remove(str(self))
def rename(self, dest_path):
return os.rename(str(self), str(dest_path))
def rmdir(self):
return os.rmdir(str(self))
def rmtree(self):
return shutil.rmtree(str(self))
def stat(self):
return os.stat(str(self))

View File

@ -6,8 +6,8 @@
# which should be included with this package. The terms are also available at # which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license # http://www.hardcoded.net/licenses/bsd_license
from pathlib import Path
from ..conflict import * from ..conflict import *
from ..path import Path
from ..testutil import eq_ from ..testutil import eq_
class TestCase_GetConflictedName: class TestCase_GetConflictedName:
@ -61,44 +61,44 @@ class TestCase_move_copy:
def pytest_funcarg__do_setup(self, request): def pytest_funcarg__do_setup(self, request):
tmpdir = request.getfuncargvalue('tmpdir') tmpdir = request.getfuncargvalue('tmpdir')
self.path = Path(str(tmpdir)) self.path = Path(str(tmpdir))
io.open(self.path + 'foo', 'w').close() self.path['foo'].touch()
io.open(self.path + 'bar', 'w').close() self.path['bar'].touch()
io.mkdir(self.path + 'dir') self.path['dir'].mkdir()
def test_move_no_conflict(self, do_setup): def test_move_no_conflict(self, do_setup):
smart_move(self.path + 'foo', self.path + 'baz') smart_move(self.path['foo'], self.path['baz'])
assert io.exists(self.path + 'baz') assert self.path['baz'].exists()
assert not io.exists(self.path + 'foo') assert not self.path['foo'].exists()
def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move
smart_copy(self.path + 'foo', self.path + 'baz') smart_copy(self.path['foo'], self.path['baz'])
assert io.exists(self.path + 'baz') assert self.path['baz'].exists()
assert io.exists(self.path + 'foo') assert self.path['foo'].exists()
def test_move_no_conflict_dest_is_dir(self, do_setup): def test_move_no_conflict_dest_is_dir(self, do_setup):
smart_move(self.path + 'foo', self.path + 'dir') smart_move(self.path['foo'], self.path['dir'])
assert io.exists(self.path + ('dir', 'foo')) assert self.path['dir/foo'].exists()
assert not io.exists(self.path + 'foo') assert not self.path['foo'].exists()
def test_move_conflict(self, do_setup): def test_move_conflict(self, do_setup):
smart_move(self.path + 'foo', self.path + 'bar') smart_move(self.path['foo'], self.path['bar'])
assert io.exists(self.path + '[000] bar') assert self.path['[000] bar'].exists()
assert not io.exists(self.path + 'foo') assert not self.path['foo'].exists()
def test_move_conflict_dest_is_dir(self, do_setup): def test_move_conflict_dest_is_dir(self, do_setup):
smart_move(self.path + 'foo', self.path + 'dir') smart_move(self.path['foo'], self.path['dir'])
smart_move(self.path + 'bar', self.path + 'foo') smart_move(self.path['bar'], self.path['foo'])
smart_move(self.path + 'foo', self.path + 'dir') smart_move(self.path['foo'], self.path['dir'])
assert io.exists(self.path + ('dir', 'foo')) assert self.path['dir/foo'].exists()
assert io.exists(self.path + ('dir', '[000] foo')) assert self.path['dir/[000] foo'].exists()
assert not io.exists(self.path + 'foo') assert not self.path['foo'].exists()
assert not io.exists(self.path + 'bar') assert not self.path['bar'].exists()
def test_copy_folder(self, tmpdir): def test_copy_folder(self, tmpdir):
# smart_copy also works on folders # smart_copy also works on folders
path = Path(str(tmpdir)) path = Path(str(tmpdir))
io.mkdir(path + 'foo') path['foo'].mkdir()
io.mkdir(path + 'bar') path['bar'].mkdir()
smart_copy(path + 'foo', path + 'bar') # no crash smart_copy(path['foo'], path['bar']) # no crash
assert io.exists(path + '[000] bar') assert path['[000] bar'].exists()

View File

@ -9,7 +9,6 @@
from datetime import date from datetime import date
import sqlite3 as sqlite import sqlite3 as sqlite
from .. import io
from ..testutil import eq_, assert_almost_equal from ..testutil import eq_, assert_almost_equal
from ..currency import Currency, RatesDB, CAD, EUR, USD from ..currency import Currency, RatesDB, CAD, EUR, USD
@ -64,7 +63,7 @@ def test_db_with_connection():
def test_corrupt_db(tmpdir): def test_corrupt_db(tmpdir):
dbpath = str(tmpdir.join('foo.db')) dbpath = str(tmpdir.join('foo.db'))
fh = io.open(dbpath, 'w') fh = open(dbpath, 'w')
fh.write('corrupted') fh.write('corrupted')
fh.close() fh.close()
db = RatesDB(dbpath) # no crash. deletes the old file and start a new db db = RatesDB(dbpath) # no crash. deletes the old file and start a new db

View File

@ -1,209 +0,0 @@
# Created By: Virgil Dupras
# Created On: 2006/02/21
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import sys
from pytest import raises, mark
from ..path import *
from ..testutil import eq_
def pytest_funcarg__force_ossep(request):
monkeypatch = request.getfuncargvalue('monkeypatch')
monkeypatch.setattr(os, 'sep', '/')
def test_empty(force_ossep):
path = Path('')
eq_('',str(path))
eq_(0,len(path))
path = Path(())
eq_('',str(path))
eq_(0,len(path))
def test_single(force_ossep):
path = Path('foobar')
eq_('foobar',path)
eq_(1,len(path))
def test_multiple(force_ossep):
path = Path('foo/bar')
eq_('foo/bar',path)
eq_(2,len(path))
def test_init_with_tuple_and_list(force_ossep):
path = Path(('foo','bar'))
eq_('foo/bar',path)
path = Path(['foo','bar'])
eq_('foo/bar',path)
def test_init_with_invalid_value(force_ossep):
try:
path = Path(42)
self.fail()
except TypeError:
pass
def test_access(force_ossep):
path = Path('foo/bar/bleh')
eq_('foo',path[0])
eq_('foo',path[-3])
eq_('bar',path[1])
eq_('bar',path[-2])
eq_('bleh',path[2])
eq_('bleh',path[-1])
def test_slicing(force_ossep):
path = Path('foo/bar/bleh')
subpath = path[:2]
eq_('foo/bar',subpath)
assert isinstance(subpath,Path)
def test_deal_with_empty_components(force_ossep):
"""Keep ONLY a leading space, which means we want a leading slash.
"""
eq_('foo//bar',str(Path(('foo','','bar'))))
eq_('/foo/bar',str(Path(('','foo','bar'))))
eq_('foo/bar',str(Path('foo/bar/')))
def test_old_compare_paths(force_ossep):
eq_(Path('foobar'),Path('foobar'))
eq_(Path('foobar/'),Path('foobar\\','\\'))
eq_(Path('/foobar/'),Path('\\foobar\\','\\'))
eq_(Path('/foo/bar'),Path('\\foo\\bar','\\'))
eq_(Path('/foo/bar'),Path('\\foo\\bar\\','\\'))
assert Path('/foo/bar') != Path('\\foo\\foo','\\')
#We also have to test __ne__
assert not (Path('foobar') != Path('foobar'))
assert Path('/a/b/c.x') != Path('/a/b/c.y')
def test_old_split_path(force_ossep):
eq_(Path('foobar'),('foobar',))
eq_(Path('foo/bar'),('foo','bar'))
eq_(Path('/foo/bar/'),('','foo','bar'))
eq_(Path('\\foo\\bar','\\'),('','foo','bar'))
def test_representation(force_ossep):
eq_("('foo', 'bar')",repr(Path(('foo','bar'))))
def test_add(force_ossep):
eq_('foo/bar/bar/foo',Path(('foo','bar')) + Path('bar/foo'))
eq_('foo/bar/bar/foo',Path('foo/bar') + 'bar/foo')
eq_('foo/bar/bar/foo',Path('foo/bar') + ('bar','foo'))
eq_('foo/bar/bar/foo',('foo','bar') + Path('bar/foo'))
eq_('foo/bar/bar/foo','foo/bar' + Path('bar/foo'))
#Invalid concatenation
try:
Path(('foo','bar')) + 1
self.fail()
except TypeError:
pass
def test_path_slice(force_ossep):
foo = Path('foo')
bar = Path('bar')
foobar = Path('foo/bar')
eq_('bar',foobar[foo:])
eq_('foo',foobar[:bar])
eq_('foo/bar',foobar[bar:])
eq_('foo/bar',foobar[:foo])
eq_((),foobar[foobar:])
eq_((),foobar[:foobar])
abcd = Path('a/b/c/d')
a = Path('a')
b = Path('b')
c = Path('c')
d = Path('d')
z = Path('z')
eq_('b/c',abcd[a:d])
eq_('b/c/d',abcd[a:d+z])
eq_('b/c',abcd[a:z+d])
eq_('a/b/c/d',abcd[:z])
def test_add_with_root_path(force_ossep):
"""if I perform /a/b/c + /d/e/f, I want /a/b/c/d/e/f, not /a/b/c//d/e/f
"""
eq_('/foo/bar',str(Path('/foo') + Path('/bar')))
def test_create_with_tuple_that_have_slash_inside(force_ossep, monkeypatch):
eq_(('','foo','bar'), Path(('/foo','bar')))
monkeypatch.setattr(os, 'sep', '\\')
eq_(('','foo','bar'), Path(('\\foo','bar')))
def test_auto_decode_os_sep(force_ossep, monkeypatch):
"""Path should decode any either / or os.sep, but always encode in os.sep.
"""
eq_(('foo\\bar','bleh'),Path('foo\\bar/bleh'))
monkeypatch.setattr(os, 'sep', '\\')
eq_(('foo','bar/bleh'),Path('foo\\bar/bleh'))
path = Path('foo/bar')
eq_(('foo','bar'),path)
eq_('foo\\bar',str(path))
def test_contains(force_ossep):
p = Path(('foo','bar'))
assert Path(('foo','bar','bleh')) in p
assert Path(('foo','bar')) in p
assert 'foo' in p
assert 'bleh' not in p
assert Path('foo') not in p
def test_windows_drive_letter(force_ossep):
p = Path(('c:',))
eq_('c:\\',str(p))
def test_root_path(force_ossep):
p = Path('/')
eq_('/',str(p))
def test_str_encodes_unicode_to_getfilesystemencoding(force_ossep):
p = Path(('foo','bar\u00e9'))
eq_('foo/bar\u00e9'.encode(sys.getfilesystemencoding()), p.tobytes())
def test_unicode(force_ossep):
p = Path(('foo','bar\u00e9'))
eq_('foo/bar\u00e9',str(p))
def test_str_repr_of_mix_between_non_ascii_str_and_unicode(force_ossep):
u = 'foo\u00e9'
encoded = u.encode(sys.getfilesystemencoding())
p = Path((encoded,'bar'))
print(repr(tuple(p)))
eq_('foo\u00e9/bar'.encode(sys.getfilesystemencoding()), p.tobytes())
def test_Path_of_a_Path_returns_self(force_ossep):
#if Path() is called with a path as value, just return value.
p = Path('foo/bar')
assert Path(p) is p
@mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate")
def test_log_unicode_errors(force_ossep, monkeypatch, capsys):
# When an there's a UnicodeDecodeError on path creation, log it so it can be possible
# to debug the cause of it.
monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: 'ascii')
with raises(UnicodeDecodeError):
Path(['', b'foo\xe9'])
out, err = capsys.readouterr()
assert repr(b'foo\xe9') in err
def test_has_drive_letter(monkeypatch):
monkeypatch.setattr(os, 'sep', '\\')
p = Path('foo\\bar')
assert not p.has_drive_letter()
p = Path('C:\\')
assert p.has_drive_letter()
p = Path('z:\\foo')
assert p.has_drive_letter()
def test_remove_drive_letter(monkeypatch):
monkeypatch.setattr(os, 'sep', '\\')
p = Path('foo\\bar')
eq_(p.remove_drive_letter(), Path('foo\\bar'))
p = Path('C:\\')
eq_(p.remove_drive_letter(), Path(''))
p = Path('z:\\foo')
eq_(p.remove_drive_letter(), Path('foo'))

View File

@ -9,10 +9,9 @@
from io import StringIO from io import StringIO
from pytest import raises from pytest import raises
from pathlib import Path
from ..testutil import eq_ from ..testutil import eq_
from .. import io
from ..path import Path
from ..util import * from ..util import *
def test_nonone(): def test_nonone():
@ -215,34 +214,34 @@ class TestCase_delete_if_empty:
def test_is_empty(self, tmpdir): def test_is_empty(self, tmpdir):
testpath = Path(str(tmpdir)) testpath = Path(str(tmpdir))
assert delete_if_empty(testpath) assert delete_if_empty(testpath)
assert not io.exists(testpath) assert not testpath.exists()
def test_not_empty(self, tmpdir): def test_not_empty(self, tmpdir):
testpath = Path(str(tmpdir)) testpath = Path(str(tmpdir))
io.mkdir(testpath + 'foo') testpath['foo'].mkdir()
assert not delete_if_empty(testpath) assert not delete_if_empty(testpath)
assert io.exists(testpath) assert testpath.exists()
def test_with_files_to_delete(self, tmpdir): def test_with_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir)) testpath = Path(str(tmpdir))
io.open(testpath + 'foo', 'w') testpath['foo'].touch()
io.open(testpath + 'bar', 'w') testpath['bar'].touch()
assert delete_if_empty(testpath, ['foo', 'bar']) assert delete_if_empty(testpath, ['foo', 'bar'])
assert not io.exists(testpath) assert not testpath.exists()
def test_directory_in_files_to_delete(self, tmpdir): def test_directory_in_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir)) testpath = Path(str(tmpdir))
io.mkdir(testpath + 'foo') testpath['foo'].mkdir()
assert not delete_if_empty(testpath, ['foo']) assert not delete_if_empty(testpath, ['foo'])
assert io.exists(testpath) assert testpath.exists()
def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir): def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir):
testpath = Path(str(tmpdir)) testpath = Path(str(tmpdir))
io.open(testpath + 'foo', 'w') testpath['foo'].touch()
io.open(testpath + 'bar', 'w') testpath['bar'].touch()
assert not delete_if_empty(testpath, ['foo']) assert not delete_if_empty(testpath, ['foo'])
assert io.exists(testpath) assert testpath.exists()
assert io.exists(testpath + 'foo') assert testpath['foo'].exists()
def test_doesnt_exist(self): def test_doesnt_exist(self):
# When the 'path' doesn't exist, just do nothing. # When the 'path' doesn't exist, just do nothing.
@ -250,8 +249,8 @@ class TestCase_delete_if_empty:
def test_is_file(self, tmpdir): def test_is_file(self, tmpdir):
# When 'path' is a file, do nothing. # When 'path' is a file, do nothing.
p = Path(str(tmpdir)) + 'filename' p = Path(str(tmpdir))['filename']
io.open(p, 'w').close() p.touch()
delete_if_empty(p) # no crash delete_if_empty(p) # no crash
def test_ioerror(self, tmpdir, monkeypatch): def test_ioerror(self, tmpdir, monkeypatch):
@ -259,7 +258,7 @@ class TestCase_delete_if_empty:
def do_raise(*args, **kw): def do_raise(*args, **kw):
raise OSError() raise OSError()
monkeypatch.setattr(io, 'rmdir', do_raise) monkeypatch.setattr(Path, 'rmdir', do_raise)
delete_if_empty(Path(str(tmpdir))) # no crash delete_if_empty(Path(str(tmpdir))) # no crash

View File

@ -13,9 +13,8 @@ import re
from math import ceil from math import ceil
import glob import glob
import shutil import shutil
from pathlib import Path
from . import io import logging
from .path import Path
def nonone(value, replace_value): def nonone(value, replace_value):
''' Returns value if value is not None. Returns replace_value otherwise. ''' Returns value if value is not None. Returns replace_value otherwise.
@ -256,14 +255,28 @@ def multi_replace(s, replace_from, replace_to=''):
#--- Files related #--- Files related
def log_io_error(func):
"""Catches OSError, IOError and WindowsError and log them
"""
def wrapper(path, *args, **kwargs):
try:
return func(path, *args, **kwargs)
except (IOError, OSError) as e:
msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"'
classname = e.__class__.__name__
funcname = func.__name__
logging.warn(msg.format(classname, funcname, str(path), str(e)))
return wrapper
def modified_after(first_path, second_path): def modified_after(first_path, second_path):
"""Returns True if first_path's mtime is higher than second_path's mtime.""" """Returns True if first_path's mtime is higher than second_path's mtime."""
try: try:
first_mtime = io.stat(first_path).st_mtime first_mtime = os.stat(str(first_path)).st_mtime
except EnvironmentError: except EnvironmentError:
return False return False
try: try:
second_mtime = io.stat(second_path).st_mtime second_mtime = os.stat(str(second_path)).st_mtime
except EnvironmentError: except EnvironmentError:
return True return True
return first_mtime > second_mtime return first_mtime > second_mtime
@ -281,18 +294,18 @@ def find_in_path(name, paths=None):
return op.join(path, name) return op.join(path, name)
return None return None
@io.log_io_error @log_io_error
def delete_if_empty(path, files_to_delete=[]): def delete_if_empty(path, files_to_delete=[]):
''' Deletes the directory at 'path' if it is empty or if it only contains files_to_delete. ''' Deletes the directory at 'path' if it is empty or if it only contains files_to_delete.
''' '''
if not io.exists(path) or not io.isdir(path): if not path.exists() or not path.is_dir():
return return
contents = io.listdir(path) contents = list(path.glob('*'))
if any(name for name in contents if (name not in files_to_delete) or io.isdir(path + name)): if any(p for p in contents if (p.name not in files_to_delete) or p.is_dir()):
return False return False
for name in contents: for p in contents:
io.remove(path + name) p.unlink()
io.rmdir(path) path.rmdir()
return True return True
def open_if_filename(infile, mode='rb'): def open_if_filename(infile, mode='rb'):
@ -302,7 +315,7 @@ def open_if_filename(infile, mode='rb'):
Returns a tuple (shouldbeclosed,infile) infile is a file object Returns a tuple (shouldbeclosed,infile) infile is a file object
""" """
if isinstance(infile, Path): if isinstance(infile, Path):
return (io.open(infile, mode), True) return (infile.open(mode=mode), True)
if isinstance(infile, str): if isinstance(infile, str):
return (open(infile, mode), True) return (open(infile, mode), True)
else: else: