From 160aaaf8804c9ffdd4f414e6ecaa308cc975a8ad Mon Sep 17 00:00:00 2001 From: Virgil Dupras Date: Sun, 4 Aug 2013 18:32:53 -0400 Subject: [PATCH] Replaced hscommon.path with pathlib Removed hscommon.path and hscommon.io and adapted the rest of hscommon's code to make tests pass. --- hscommon/conflict.py | 27 +++-- hscommon/currency.py | 6 +- hscommon/io.py | 79 ------------ hscommon/path.py | 184 ---------------------------- hscommon/tests/conflict_test.py | 54 ++++----- hscommon/tests/currency_test.py | 3 +- hscommon/tests/path_test.py | 209 -------------------------------- hscommon/tests/util_test.py | 33 +++-- hscommon/util.py | 39 ++++-- 9 files changed, 86 insertions(+), 548 deletions(-) delete mode 100644 hscommon/io.py delete mode 100755 hscommon/path.py delete mode 100644 hscommon/tests/path_test.py diff --git a/hscommon/conflict.py b/hscommon/conflict.py index 7eeb76cb..f2e4d4f6 100644 --- a/hscommon/conflict.py +++ b/hscommon/conflict.py @@ -7,7 +7,7 @@ # http://www.hardcoded.net/licenses/bsd_license import re -from . import io +import shutil #This matches [123], but not [12] (3 digits being the minimum). #It also matches [1234] [12345] etc.. @@ -40,23 +40,24 @@ def _smart_move_or_copy(operation, source_path, dest_path): ''' Use move() or copy() to move and copy file with the conflict management, but without the slowness of the fs system. ''' - if io.isdir(dest_path) and not io.isdir(source_path): - dest_path = dest_path + source_path[-1] - if io.exists(dest_path): - filename = dest_path[-1] - dest_dir_path = dest_path[:-1] - newname = get_conflicted_name(io.listdir(dest_dir_path), filename) - dest_path = dest_dir_path + newname - operation(source_path, dest_path) - + if dest_path.exists() and dest_path.is_dir() and not source_path.is_dir(): + dest_path = dest_path[source_path.name] + if dest_path.exists(): + filename = dest_path.name + dest_dir_path = dest_path.parent() + existing_names = [p.name for p in dest_dir_path.glob('*')] + newname = get_conflicted_name(existing_names, filename) + dest_path = dest_dir_path[newname] + operation(str(source_path), str(dest_path)) + def smart_move(source_path, dest_path): - _smart_move_or_copy(io.move, source_path, dest_path) + _smart_move_or_copy(shutil.move, source_path, dest_path) def smart_copy(source_path, dest_path): try: - _smart_move_or_copy(io.copy, source_path, dest_path) + _smart_move_or_copy(shutil.copy, source_path, dest_path) except IOError as e: if e.errno in {21, 13}: # it's a directory, code is 21 on OS X / Linux and 13 on Windows - _smart_move_or_copy(io.copytree, source_path, dest_path) + _smart_move_or_copy(shutil.copytree, source_path, dest_path) else: raise \ No newline at end of file diff --git a/hscommon/currency.py b/hscommon/currency.py index 9ffcd313..266cd716 100644 --- a/hscommon/currency.py +++ b/hscommon/currency.py @@ -11,9 +11,7 @@ import logging import sqlite3 as sqlite import threading from queue import Queue, Empty - -from . import io -from .path import Path +from pathlib import Path class Currency: all = [] @@ -310,7 +308,7 @@ class RatesDB: logging.warning("Corrupt currency database at {0}. Starting over.".format(repr(self.db_or_path))) if isinstance(self.db_or_path, (str, Path)): self.con.close() - io.remove(Path(self.db_or_path)) + Path(self.db_or_path).unlink() self.con = sqlite.connect(str(self.db_or_path)) else: logging.warning("Can't re-use the file, using a memory table") diff --git a/hscommon/io.py b/hscommon/io.py deleted file mode 100644 index dd0fb4f4..00000000 --- a/hscommon/io.py +++ /dev/null @@ -1,79 +0,0 @@ -# Created By: Virgil Dupras -# Created On: 2007-10-23 -# Copyright 2013 Hardcoded Software (http://www.hardcoded.net) - -# This software is licensed under the "BSD" License as described in the "LICENSE" file, -# which should be included with this package. The terms are also available at -# http://www.hardcoded.net/licenses/bsd_license - -# HS code should only deal with Path instances, not string paths. One of the annoyances of this -# is to always have to convert Path instances with unicode() when calling open() or listdir() etc.. -# this unit takes care of this - -import builtins -import os -import os.path -import shutil -import logging - -def log_io_error(func): - """ Catches OSError, IOError and WindowsError and log them - """ - def wrapper(path, *args, **kwargs): - try: - return func(path, *args, **kwargs) - except (IOError, OSError) as e: - msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"' - classname = e.__class__.__name__ - funcname = func.__name__ - logging.warn(msg.format(classname, funcname, str(path), str(e))) - - return wrapper - -def copy(source_path, dest_path): - return shutil.copy(str(source_path), str(dest_path)) - -def copytree(source_path, dest_path, *args, **kwargs): - return shutil.copytree(str(source_path), str(dest_path), *args, **kwargs) - -def exists(path): - return os.path.exists(str(path)) - -def isdir(path): - return os.path.isdir(str(path)) - -def isfile(path): - return os.path.isfile(str(path)) - -def islink(path): - return os.path.islink(str(path)) - -def listdir(path): - return os.listdir(str(path)) - -def mkdir(path, *args, **kwargs): - return os.mkdir(str(path), *args, **kwargs) - -def makedirs(path, *args, **kwargs): - return os.makedirs(str(path), *args, **kwargs) - -def move(source_path, dest_path): - return shutil.move(str(source_path), str(dest_path)) - -def open(path, *args, **kwargs): - return builtins.open(str(path), *args, **kwargs) - -def remove(path): - return os.remove(str(path)) - -def rename(source_path, dest_path): - return os.rename(str(source_path), str(dest_path)) - -def rmdir(path): - return os.rmdir(str(path)) - -def rmtree(path): - return shutil.rmtree(str(path)) - -def stat(path): - return os.stat(str(path)) diff --git a/hscommon/path.py b/hscommon/path.py deleted file mode 100755 index a6f1675a..00000000 --- a/hscommon/path.py +++ /dev/null @@ -1,184 +0,0 @@ -# Created By: Virgil Dupras -# Created On: 2006/02/21 -# Copyright 2013 Hardcoded Software (http://www.hardcoded.net) - -# This software is licensed under the "BSD" License as described in the "LICENSE" file, -# which should be included with this package. The terms are also available at -# http://www.hardcoded.net/licenses/bsd_license - -import logging -import os -import os.path as op -import shutil -import sys -from itertools import takewhile - -class Path(tuple): - """A handy class to work with paths. - - path[index] returns a string - path[start:stop] returns a Path - start and stop can be int, but the can also be path instances. When start - or stop are Path like in refpath[p1:p2], it is the same thing as typing - refpath[len(p1):-len(p2)], except that it will only slice out stuff that are - equal. For example, 'a/b/c/d'['a/z':'z/d'] returns 'b/c', not ''. - See the test units for more details. - - You can use the + operator, which is the same thing as with tuples, but - returns a Path. - - In HS applications, all paths variable should be Path instances. These Path instances should - be converted to str only at the last moment (when it is needed in an external function, such - as os.rename) - """ - # Saves a little bit of memory usage - __slots__ = () - - def __new__(cls, value, separator=None): - def unicode_if_needed(s): - if isinstance(s, str): - return s - else: - try: - return str(s, sys.getfilesystemencoding()) - except UnicodeDecodeError: - logging.warning("Could not decode %r", s) - raise - - if isinstance(value, Path): - return value - if not separator: - separator = os.sep - if isinstance(value, bytes): - value = unicode_if_needed(value) - if isinstance(value, str): - if value: - if (separator not in value) and ('/' in value): - separator = '/' - value = value.split(separator) - else: - value = () - else: - if any(isinstance(x, bytes) for x in value): - value = [unicode_if_needed(x) for x in value] - #value is a tuple/list - if any(separator in x for x in value): - #We have a component with a separator in it. Let's rejoin it, and generate another path. - return Path(separator.join(value), separator) - if (len(value) > 1) and (not value[-1]): - value = value[:-1] #We never want a path to end with a '' (because Path() can be called with a trailing slash ending path) - return tuple.__new__(cls, value) - - def __add__(self, other): - other = Path(other) - if other and (not other[0]): - other = other[1:] - return Path(tuple.__add__(self, other)) - - def __contains__(self, item): - if isinstance(item, Path): - return item[:len(self)] == self - else: - return tuple.__contains__(self, item) - - def __eq__(self, other): - return tuple.__eq__(self, Path(other)) - - def __getitem__(self, key): - if isinstance(key, slice): - if isinstance(key.start, Path): - equal_elems = list(takewhile(lambda pair: pair[0] == pair[1], zip(self, key.start))) - key = slice(len(equal_elems), key.stop, key.step) - if isinstance(key.stop, Path): - equal_elems = list(takewhile(lambda pair: pair[0] == pair[1], zip(reversed(self), reversed(key.stop)))) - stop = -len(equal_elems) if equal_elems else None - key = slice(key.start, stop, key.step) - return Path(tuple.__getitem__(self, key)) - else: - return tuple.__getitem__(self, key) - - def __getslice__(self, i, j): #I have to override it because tuple uses it. - return Path(tuple.__getslice__(self, i, j)) - - def __hash__(self): - return tuple.__hash__(self) - - def __ne__(self, other): - return not self.__eq__(other) - - def __radd__(self, other): - return Path(other) + self - - def __str__(self): - if len(self) == 1: - first = self[0] - if (len(first) == 2) and (first[1] == ':'): #Windows drive letter - return first + '\\' - elif not len(first): #root directory - return '/' - return os.sep.join(self) - - def has_drive_letter(self): - if not self: - return False - first = self[0] - return (len(first) == 2) and (first[1] == ':') - - def remove_drive_letter(self): - if self.has_drive_letter(): - return self[1:] - else: - return self - - def tobytes(self): - return str(self).encode(sys.getfilesystemencoding()) - - # OS method wrappers - def exists(self): - return op.exists(str(self)) - - def copy(self, dest_path): - return shutil.copy(str(self), str(dest_path)) - - def copytree(self, dest_path, *args, **kwargs): - return shutil.copytree(str(self), str(dest_path), *args, **kwargs) - - def isdir(self): - return op.isdir(str(self)) - - def isfile(self): - return op.isfile(str(self)) - - def islink(self): - return op.islink(str(self)) - - def listdir(self): - return os.listdir(str(self)) - - def mkdir(self, *args, **kwargs): - return os.mkdir(str(self), *args, **kwargs) - - def makedirs(self, *args, **kwargs): - return os.makedirs(str(self), *args, **kwargs) - - def move(self, dest_path): - return shutil.move(str(self), str(dest_path)) - - def open(self, *args, **kwargs): - return open(str(self), *args, **kwargs) - - def remove(self): - return os.remove(str(self)) - - def rename(self, dest_path): - return os.rename(str(self), str(dest_path)) - - def rmdir(self): - return os.rmdir(str(self)) - - def rmtree(self): - return shutil.rmtree(str(self)) - - def stat(self): - return os.stat(str(self)) - diff --git a/hscommon/tests/conflict_test.py b/hscommon/tests/conflict_test.py index 825c48a2..c6a0eb33 100644 --- a/hscommon/tests/conflict_test.py +++ b/hscommon/tests/conflict_test.py @@ -6,8 +6,8 @@ # which should be included with this package. The terms are also available at # http://www.hardcoded.net/licenses/bsd_license +from pathlib import Path from ..conflict import * -from ..path import Path from ..testutil import eq_ class TestCase_GetConflictedName: @@ -61,44 +61,44 @@ class TestCase_move_copy: def pytest_funcarg__do_setup(self, request): tmpdir = request.getfuncargvalue('tmpdir') self.path = Path(str(tmpdir)) - io.open(self.path + 'foo', 'w').close() - io.open(self.path + 'bar', 'w').close() - io.mkdir(self.path + 'dir') + self.path['foo'].touch() + self.path['bar'].touch() + self.path['dir'].mkdir() def test_move_no_conflict(self, do_setup): - smart_move(self.path + 'foo', self.path + 'baz') - assert io.exists(self.path + 'baz') - assert not io.exists(self.path + 'foo') + smart_move(self.path['foo'], self.path['baz']) + assert self.path['baz'].exists() + assert not self.path['foo'].exists() def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move - smart_copy(self.path + 'foo', self.path + 'baz') - assert io.exists(self.path + 'baz') - assert io.exists(self.path + 'foo') + smart_copy(self.path['foo'], self.path['baz']) + assert self.path['baz'].exists() + assert self.path['foo'].exists() def test_move_no_conflict_dest_is_dir(self, do_setup): - smart_move(self.path + 'foo', self.path + 'dir') - assert io.exists(self.path + ('dir', 'foo')) - assert not io.exists(self.path + 'foo') + smart_move(self.path['foo'], self.path['dir']) + assert self.path['dir/foo'].exists() + assert not self.path['foo'].exists() def test_move_conflict(self, do_setup): - smart_move(self.path + 'foo', self.path + 'bar') - assert io.exists(self.path + '[000] bar') - assert not io.exists(self.path + 'foo') + smart_move(self.path['foo'], self.path['bar']) + assert self.path['[000] bar'].exists() + assert not self.path['foo'].exists() def test_move_conflict_dest_is_dir(self, do_setup): - smart_move(self.path + 'foo', self.path + 'dir') - smart_move(self.path + 'bar', self.path + 'foo') - smart_move(self.path + 'foo', self.path + 'dir') - assert io.exists(self.path + ('dir', 'foo')) - assert io.exists(self.path + ('dir', '[000] foo')) - assert not io.exists(self.path + 'foo') - assert not io.exists(self.path + 'bar') + smart_move(self.path['foo'], self.path['dir']) + smart_move(self.path['bar'], self.path['foo']) + smart_move(self.path['foo'], self.path['dir']) + assert self.path['dir/foo'].exists() + assert self.path['dir/[000] foo'].exists() + assert not self.path['foo'].exists() + assert not self.path['bar'].exists() def test_copy_folder(self, tmpdir): # smart_copy also works on folders path = Path(str(tmpdir)) - io.mkdir(path + 'foo') - io.mkdir(path + 'bar') - smart_copy(path + 'foo', path + 'bar') # no crash - assert io.exists(path + '[000] bar') + path['foo'].mkdir() + path['bar'].mkdir() + smart_copy(path['foo'], path['bar']) # no crash + assert path['[000] bar'].exists() diff --git a/hscommon/tests/currency_test.py b/hscommon/tests/currency_test.py index 5c940a01..7b1c5fb3 100644 --- a/hscommon/tests/currency_test.py +++ b/hscommon/tests/currency_test.py @@ -9,7 +9,6 @@ from datetime import date import sqlite3 as sqlite -from .. import io from ..testutil import eq_, assert_almost_equal from ..currency import Currency, RatesDB, CAD, EUR, USD @@ -64,7 +63,7 @@ def test_db_with_connection(): def test_corrupt_db(tmpdir): dbpath = str(tmpdir.join('foo.db')) - fh = io.open(dbpath, 'w') + fh = open(dbpath, 'w') fh.write('corrupted') fh.close() db = RatesDB(dbpath) # no crash. deletes the old file and start a new db diff --git a/hscommon/tests/path_test.py b/hscommon/tests/path_test.py deleted file mode 100644 index 75c41665..00000000 --- a/hscommon/tests/path_test.py +++ /dev/null @@ -1,209 +0,0 @@ -# Created By: Virgil Dupras -# Created On: 2006/02/21 -# Copyright 2013 Hardcoded Software (http://www.hardcoded.net) - -# This software is licensed under the "BSD" License as described in the "LICENSE" file, -# which should be included with this package. The terms are also available at -# http://www.hardcoded.net/licenses/bsd_license - -import sys - -from pytest import raises, mark - -from ..path import * -from ..testutil import eq_ - -def pytest_funcarg__force_ossep(request): - monkeypatch = request.getfuncargvalue('monkeypatch') - monkeypatch.setattr(os, 'sep', '/') - -def test_empty(force_ossep): - path = Path('') - eq_('',str(path)) - eq_(0,len(path)) - path = Path(()) - eq_('',str(path)) - eq_(0,len(path)) - -def test_single(force_ossep): - path = Path('foobar') - eq_('foobar',path) - eq_(1,len(path)) - -def test_multiple(force_ossep): - path = Path('foo/bar') - eq_('foo/bar',path) - eq_(2,len(path)) - -def test_init_with_tuple_and_list(force_ossep): - path = Path(('foo','bar')) - eq_('foo/bar',path) - path = Path(['foo','bar']) - eq_('foo/bar',path) - -def test_init_with_invalid_value(force_ossep): - try: - path = Path(42) - self.fail() - except TypeError: - pass - -def test_access(force_ossep): - path = Path('foo/bar/bleh') - eq_('foo',path[0]) - eq_('foo',path[-3]) - eq_('bar',path[1]) - eq_('bar',path[-2]) - eq_('bleh',path[2]) - eq_('bleh',path[-1]) - -def test_slicing(force_ossep): - path = Path('foo/bar/bleh') - subpath = path[:2] - eq_('foo/bar',subpath) - assert isinstance(subpath,Path) - -def test_deal_with_empty_components(force_ossep): - """Keep ONLY a leading space, which means we want a leading slash. - """ - eq_('foo//bar',str(Path(('foo','','bar')))) - eq_('/foo/bar',str(Path(('','foo','bar')))) - eq_('foo/bar',str(Path('foo/bar/'))) - -def test_old_compare_paths(force_ossep): - eq_(Path('foobar'),Path('foobar')) - eq_(Path('foobar/'),Path('foobar\\','\\')) - eq_(Path('/foobar/'),Path('\\foobar\\','\\')) - eq_(Path('/foo/bar'),Path('\\foo\\bar','\\')) - eq_(Path('/foo/bar'),Path('\\foo\\bar\\','\\')) - assert Path('/foo/bar') != Path('\\foo\\foo','\\') - #We also have to test __ne__ - assert not (Path('foobar') != Path('foobar')) - assert Path('/a/b/c.x') != Path('/a/b/c.y') - -def test_old_split_path(force_ossep): - eq_(Path('foobar'),('foobar',)) - eq_(Path('foo/bar'),('foo','bar')) - eq_(Path('/foo/bar/'),('','foo','bar')) - eq_(Path('\\foo\\bar','\\'),('','foo','bar')) - -def test_representation(force_ossep): - eq_("('foo', 'bar')",repr(Path(('foo','bar')))) - -def test_add(force_ossep): - eq_('foo/bar/bar/foo',Path(('foo','bar')) + Path('bar/foo')) - eq_('foo/bar/bar/foo',Path('foo/bar') + 'bar/foo') - eq_('foo/bar/bar/foo',Path('foo/bar') + ('bar','foo')) - eq_('foo/bar/bar/foo',('foo','bar') + Path('bar/foo')) - eq_('foo/bar/bar/foo','foo/bar' + Path('bar/foo')) - #Invalid concatenation - try: - Path(('foo','bar')) + 1 - self.fail() - except TypeError: - pass - -def test_path_slice(force_ossep): - foo = Path('foo') - bar = Path('bar') - foobar = Path('foo/bar') - eq_('bar',foobar[foo:]) - eq_('foo',foobar[:bar]) - eq_('foo/bar',foobar[bar:]) - eq_('foo/bar',foobar[:foo]) - eq_((),foobar[foobar:]) - eq_((),foobar[:foobar]) - abcd = Path('a/b/c/d') - a = Path('a') - b = Path('b') - c = Path('c') - d = Path('d') - z = Path('z') - eq_('b/c',abcd[a:d]) - eq_('b/c/d',abcd[a:d+z]) - eq_('b/c',abcd[a:z+d]) - eq_('a/b/c/d',abcd[:z]) - -def test_add_with_root_path(force_ossep): - """if I perform /a/b/c + /d/e/f, I want /a/b/c/d/e/f, not /a/b/c//d/e/f - """ - eq_('/foo/bar',str(Path('/foo') + Path('/bar'))) - -def test_create_with_tuple_that_have_slash_inside(force_ossep, monkeypatch): - eq_(('','foo','bar'), Path(('/foo','bar'))) - monkeypatch.setattr(os, 'sep', '\\') - eq_(('','foo','bar'), Path(('\\foo','bar'))) - -def test_auto_decode_os_sep(force_ossep, monkeypatch): - """Path should decode any either / or os.sep, but always encode in os.sep. - """ - eq_(('foo\\bar','bleh'),Path('foo\\bar/bleh')) - monkeypatch.setattr(os, 'sep', '\\') - eq_(('foo','bar/bleh'),Path('foo\\bar/bleh')) - path = Path('foo/bar') - eq_(('foo','bar'),path) - eq_('foo\\bar',str(path)) - -def test_contains(force_ossep): - p = Path(('foo','bar')) - assert Path(('foo','bar','bleh')) in p - assert Path(('foo','bar')) in p - assert 'foo' in p - assert 'bleh' not in p - assert Path('foo') not in p - -def test_windows_drive_letter(force_ossep): - p = Path(('c:',)) - eq_('c:\\',str(p)) - -def test_root_path(force_ossep): - p = Path('/') - eq_('/',str(p)) - -def test_str_encodes_unicode_to_getfilesystemencoding(force_ossep): - p = Path(('foo','bar\u00e9')) - eq_('foo/bar\u00e9'.encode(sys.getfilesystemencoding()), p.tobytes()) - -def test_unicode(force_ossep): - p = Path(('foo','bar\u00e9')) - eq_('foo/bar\u00e9',str(p)) - -def test_str_repr_of_mix_between_non_ascii_str_and_unicode(force_ossep): - u = 'foo\u00e9' - encoded = u.encode(sys.getfilesystemencoding()) - p = Path((encoded,'bar')) - print(repr(tuple(p))) - eq_('foo\u00e9/bar'.encode(sys.getfilesystemencoding()), p.tobytes()) - -def test_Path_of_a_Path_returns_self(force_ossep): - #if Path() is called with a path as value, just return value. - p = Path('foo/bar') - assert Path(p) is p - -@mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate") -def test_log_unicode_errors(force_ossep, monkeypatch, capsys): - # When an there's a UnicodeDecodeError on path creation, log it so it can be possible - # to debug the cause of it. - monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: 'ascii') - with raises(UnicodeDecodeError): - Path(['', b'foo\xe9']) - out, err = capsys.readouterr() - assert repr(b'foo\xe9') in err - -def test_has_drive_letter(monkeypatch): - monkeypatch.setattr(os, 'sep', '\\') - p = Path('foo\\bar') - assert not p.has_drive_letter() - p = Path('C:\\') - assert p.has_drive_letter() - p = Path('z:\\foo') - assert p.has_drive_letter() - -def test_remove_drive_letter(monkeypatch): - monkeypatch.setattr(os, 'sep', '\\') - p = Path('foo\\bar') - eq_(p.remove_drive_letter(), Path('foo\\bar')) - p = Path('C:\\') - eq_(p.remove_drive_letter(), Path('')) - p = Path('z:\\foo') - eq_(p.remove_drive_letter(), Path('foo')) \ No newline at end of file diff --git a/hscommon/tests/util_test.py b/hscommon/tests/util_test.py index 4d9967fd..e5d294ad 100644 --- a/hscommon/tests/util_test.py +++ b/hscommon/tests/util_test.py @@ -9,10 +9,9 @@ from io import StringIO from pytest import raises +from pathlib import Path from ..testutil import eq_ -from .. import io -from ..path import Path from ..util import * def test_nonone(): @@ -215,34 +214,34 @@ class TestCase_delete_if_empty: def test_is_empty(self, tmpdir): testpath = Path(str(tmpdir)) assert delete_if_empty(testpath) - assert not io.exists(testpath) + assert not testpath.exists() def test_not_empty(self, tmpdir): testpath = Path(str(tmpdir)) - io.mkdir(testpath + 'foo') + testpath['foo'].mkdir() assert not delete_if_empty(testpath) - assert io.exists(testpath) + assert testpath.exists() def test_with_files_to_delete(self, tmpdir): testpath = Path(str(tmpdir)) - io.open(testpath + 'foo', 'w') - io.open(testpath + 'bar', 'w') + testpath['foo'].touch() + testpath['bar'].touch() assert delete_if_empty(testpath, ['foo', 'bar']) - assert not io.exists(testpath) + assert not testpath.exists() def test_directory_in_files_to_delete(self, tmpdir): testpath = Path(str(tmpdir)) - io.mkdir(testpath + 'foo') + testpath['foo'].mkdir() assert not delete_if_empty(testpath, ['foo']) - assert io.exists(testpath) + assert testpath.exists() def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir): testpath = Path(str(tmpdir)) - io.open(testpath + 'foo', 'w') - io.open(testpath + 'bar', 'w') + testpath['foo'].touch() + testpath['bar'].touch() assert not delete_if_empty(testpath, ['foo']) - assert io.exists(testpath) - assert io.exists(testpath + 'foo') + assert testpath.exists() + assert testpath['foo'].exists() def test_doesnt_exist(self): # When the 'path' doesn't exist, just do nothing. @@ -250,8 +249,8 @@ class TestCase_delete_if_empty: def test_is_file(self, tmpdir): # When 'path' is a file, do nothing. - p = Path(str(tmpdir)) + 'filename' - io.open(p, 'w').close() + p = Path(str(tmpdir))['filename'] + p.touch() delete_if_empty(p) # no crash def test_ioerror(self, tmpdir, monkeypatch): @@ -259,7 +258,7 @@ class TestCase_delete_if_empty: def do_raise(*args, **kw): raise OSError() - monkeypatch.setattr(io, 'rmdir', do_raise) + monkeypatch.setattr(Path, 'rmdir', do_raise) delete_if_empty(Path(str(tmpdir))) # no crash diff --git a/hscommon/util.py b/hscommon/util.py index 8d562d89..797c6fbf 100644 --- a/hscommon/util.py +++ b/hscommon/util.py @@ -13,9 +13,8 @@ import re from math import ceil import glob import shutil - -from . import io -from .path import Path +from pathlib import Path +import logging def nonone(value, replace_value): ''' Returns value if value is not None. Returns replace_value otherwise. @@ -256,14 +255,28 @@ def multi_replace(s, replace_from, replace_to=''): #--- Files related +def log_io_error(func): + """Catches OSError, IOError and WindowsError and log them + """ + def wrapper(path, *args, **kwargs): + try: + return func(path, *args, **kwargs) + except (IOError, OSError) as e: + msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"' + classname = e.__class__.__name__ + funcname = func.__name__ + logging.warn(msg.format(classname, funcname, str(path), str(e))) + + return wrapper + def modified_after(first_path, second_path): """Returns True if first_path's mtime is higher than second_path's mtime.""" try: - first_mtime = io.stat(first_path).st_mtime + first_mtime = os.stat(str(first_path)).st_mtime except EnvironmentError: return False try: - second_mtime = io.stat(second_path).st_mtime + second_mtime = os.stat(str(second_path)).st_mtime except EnvironmentError: return True return first_mtime > second_mtime @@ -281,18 +294,18 @@ def find_in_path(name, paths=None): return op.join(path, name) return None -@io.log_io_error +@log_io_error def delete_if_empty(path, files_to_delete=[]): ''' Deletes the directory at 'path' if it is empty or if it only contains files_to_delete. ''' - if not io.exists(path) or not io.isdir(path): + if not path.exists() or not path.is_dir(): return - contents = io.listdir(path) - if any(name for name in contents if (name not in files_to_delete) or io.isdir(path + name)): + contents = list(path.glob('*')) + if any(p for p in contents if (p.name not in files_to_delete) or p.is_dir()): return False - for name in contents: - io.remove(path + name) - io.rmdir(path) + for p in contents: + p.unlink() + path.rmdir() return True def open_if_filename(infile, mode='rb'): @@ -302,7 +315,7 @@ def open_if_filename(infile, mode='rb'): Returns a tuple (shouldbeclosed,infile) infile is a file object """ if isinstance(infile, Path): - return (io.open(infile, mode), True) + return (infile.open(mode=mode), True) if isinstance(infile, str): return (open(infile, mode), True) else: