mirror of
https://github.com/arsenetar/dupeguru.git
synced 2026-01-22 14:41:39 +00:00
Merge branch 'develop' into qt5
Conflicts: hscommon/desktop.py
This commit is contained in:
@@ -7,7 +7,10 @@
|
||||
# http://www.hardcoded.net/licenses/bsd_license
|
||||
|
||||
import re
|
||||
from . import io
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from .path import Path, pathify
|
||||
|
||||
#This matches [123], but not [12] (3 digits being the minimum).
|
||||
#It also matches [1234] [12345] etc..
|
||||
@@ -36,27 +39,28 @@ def get_unconflicted_name(name):
|
||||
def is_conflicted(name):
|
||||
return re_conflict.match(name) is not None
|
||||
|
||||
def _smart_move_or_copy(operation, source_path, dest_path):
|
||||
@pathify
|
||||
def _smart_move_or_copy(operation, source_path: Path, dest_path: Path):
|
||||
''' Use move() or copy() to move and copy file with the conflict management, but without the
|
||||
slowness of the fs system.
|
||||
'''
|
||||
if io.isdir(dest_path) and not io.isdir(source_path):
|
||||
dest_path = dest_path + source_path[-1]
|
||||
if io.exists(dest_path):
|
||||
filename = dest_path[-1]
|
||||
dest_dir_path = dest_path[:-1]
|
||||
newname = get_conflicted_name(io.listdir(dest_dir_path), filename)
|
||||
dest_path = dest_dir_path + newname
|
||||
operation(source_path, dest_path)
|
||||
if dest_path.isdir() and not source_path.isdir():
|
||||
dest_path = dest_path[source_path.name]
|
||||
if dest_path.exists():
|
||||
filename = dest_path.name
|
||||
dest_dir_path = dest_path.parent()
|
||||
newname = get_conflicted_name(os.listdir(str(dest_dir_path)), filename)
|
||||
dest_path = dest_dir_path[newname]
|
||||
operation(str(source_path), str(dest_path))
|
||||
|
||||
def smart_move(source_path, dest_path):
|
||||
_smart_move_or_copy(io.move, source_path, dest_path)
|
||||
_smart_move_or_copy(shutil.move, source_path, dest_path)
|
||||
|
||||
def smart_copy(source_path, dest_path):
|
||||
try:
|
||||
_smart_move_or_copy(io.copy, source_path, dest_path)
|
||||
_smart_move_or_copy(shutil.copy, source_path, dest_path)
|
||||
except IOError as e:
|
||||
if e.errno in {21, 13}: # it's a directory, code is 21 on OS X / Linux and 13 on Windows
|
||||
_smart_move_or_copy(io.copytree, source_path, dest_path)
|
||||
_smart_move_or_copy(shutil.copytree, source_path, dest_path)
|
||||
else:
|
||||
raise
|
||||
@@ -6,13 +6,13 @@
|
||||
# which should be included with this package. The terms are also available at
|
||||
# http://www.hardcoded.net/licenses/bsd_license
|
||||
|
||||
import os
|
||||
from datetime import datetime, date, timedelta
|
||||
import logging
|
||||
import sqlite3 as sqlite
|
||||
import threading
|
||||
from queue import Queue, Empty
|
||||
|
||||
from . import io
|
||||
from .path import Path
|
||||
from .util import iterdaterange
|
||||
|
||||
@@ -271,6 +271,9 @@ EUR = Currency(code='EUR')
|
||||
class CurrencyNotSupportedException(Exception):
|
||||
"""The current exchange rate provider doesn't support the requested currency."""
|
||||
|
||||
class RateProviderUnavailable(Exception):
|
||||
"""The rate provider is temporarily unavailable."""
|
||||
|
||||
def date2str(date):
|
||||
return '%d%02d%02d' % (date.year, date.month, date.day)
|
||||
|
||||
@@ -314,7 +317,7 @@ class RatesDB:
|
||||
logging.warning("Corrupt currency database at {0}. Starting over.".format(repr(self.db_or_path)))
|
||||
if isinstance(self.db_or_path, (str, Path)):
|
||||
self.con.close()
|
||||
io.remove(Path(self.db_or_path))
|
||||
os.remove(str(self.db_or_path))
|
||||
self.con = sqlite.connect(str(self.db_or_path))
|
||||
else:
|
||||
logging.warning("Can't re-use the file, using a memory table")
|
||||
@@ -452,11 +455,19 @@ class RatesDB:
|
||||
values = rate_provider(currency, fetch_start, fetch_end)
|
||||
except CurrencyNotSupportedException:
|
||||
continue
|
||||
except RateProviderUnavailable:
|
||||
logging.debug("Fetching failed due to temporary problems.")
|
||||
break
|
||||
else:
|
||||
if values:
|
||||
self._fetched_values.put((values, currency, fetch_start, fetch_end))
|
||||
logging.debug("Fetching successful!")
|
||||
break
|
||||
if not values:
|
||||
# We didn't get any value from the server, which means that we asked for
|
||||
# rates that couldn't be delivered. Still, we report empty values so
|
||||
# that the cache can correctly remember this unavailability so that we
|
||||
# don't repeatedly fetch those ranges.
|
||||
values = []
|
||||
self._fetched_values.put((values, currency, fetch_start, fetch_end))
|
||||
logging.debug("Fetching successful!")
|
||||
break
|
||||
else:
|
||||
logging.debug("Fetching failed!")
|
||||
|
||||
|
||||
@@ -6,6 +6,9 @@
|
||||
# which should be included with this package. The terms are also available at
|
||||
# http://www.hardcoded.net/licenses/bsd_license
|
||||
|
||||
import os.path as op
|
||||
import logging
|
||||
|
||||
class SpecialFolder:
|
||||
AppData = 1
|
||||
Cache = 2
|
||||
@@ -25,30 +28,41 @@ def reveal_path(path):
|
||||
"""
|
||||
_reveal_path(str(path))
|
||||
|
||||
def special_folder_path(special_folder):
|
||||
def special_folder_path(special_folder, appname=None):
|
||||
"""Returns the path of ``special_folder``.
|
||||
|
||||
``special_folder`` is a SpecialFolder.* const.
|
||||
``special_folder`` is a SpecialFolder.* const. The result is the special folder for the current
|
||||
application. The running process' application info is used to determine relevant information.
|
||||
|
||||
You can override the application name with ``appname``. This argument is ingored under Qt.
|
||||
"""
|
||||
return _special_folder_path(special_folder)
|
||||
return _special_folder_path(special_folder, appname)
|
||||
|
||||
try:
|
||||
from cocoa import proxy
|
||||
# Normally, we would simply do "from cocoa import proxy", but due to a bug in pytest (currently
|
||||
# at v2.4.2), our test suite is broken when we do that. This below is a workaround until that
|
||||
# bug is fixed.
|
||||
import cocoa
|
||||
if not hasattr(cocoa, 'proxy'):
|
||||
raise ImportError()
|
||||
proxy = cocoa.proxy
|
||||
_open_url = proxy.openURL_
|
||||
_open_path = proxy.openPath_
|
||||
_reveal_path = proxy.revealPath_
|
||||
|
||||
def _special_folder_path(special_folder):
|
||||
def _special_folder_path(special_folder, appname=None):
|
||||
if special_folder == SpecialFolder.Cache:
|
||||
return proxy.getCachePath()
|
||||
base = proxy.getCachePath()
|
||||
else:
|
||||
return proxy.getAppdataPath()
|
||||
base = proxy.getAppdataPath()
|
||||
if not appname:
|
||||
appname = proxy.bundleInfo_('CFBundleName')
|
||||
return op.join(base, appname)
|
||||
|
||||
except ImportError:
|
||||
try:
|
||||
from PyQt5.QtCore import QUrl, QStandardPaths
|
||||
from PyQt5.QtGui import QDesktopServices
|
||||
import os.path as op
|
||||
def _open_path(path):
|
||||
url = QUrl.fromLocalFile(str(path))
|
||||
QDesktopServices.openUrl(url)
|
||||
@@ -56,7 +70,7 @@ except ImportError:
|
||||
def _reveal_path(path):
|
||||
_open_path(op.dirname(str(path)))
|
||||
|
||||
def _special_folder_path(special_folder):
|
||||
def _special_folder_path(special_folder, appname=None):
|
||||
if special_folder == SpecialFolder.Cache:
|
||||
qtfolder = QStandardPaths.CacheLocation
|
||||
else:
|
||||
@@ -64,4 +78,14 @@ except ImportError:
|
||||
return QStandardPaths.standardLocations(qtfolder)[0]
|
||||
|
||||
except ImportError:
|
||||
raise Exception("Can't setup desktop functions!")
|
||||
# We're either running tests, and these functions don't matter much or we're in a really
|
||||
# weird situation. Let's just have dummy fallbacks.
|
||||
logging.warning("Can't setup desktop functions!")
|
||||
def _open_path(path):
|
||||
pass
|
||||
|
||||
def _reveal_path(path):
|
||||
pass
|
||||
|
||||
def _special_folder_path(special_folder, appname=None):
|
||||
return '/tmp'
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
# Created By: Virgil Dupras
|
||||
# Created On: 2007-10-23
|
||||
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
|
||||
|
||||
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
|
||||
# which should be included with this package. The terms are also available at
|
||||
# http://www.hardcoded.net/licenses/bsd_license
|
||||
|
||||
# HS code should only deal with Path instances, not string paths. One of the annoyances of this
|
||||
# is to always have to convert Path instances with unicode() when calling open() or listdir() etc..
|
||||
# this unit takes care of this
|
||||
|
||||
import builtins
|
||||
import os
|
||||
import os.path
|
||||
import shutil
|
||||
import logging
|
||||
|
||||
def log_io_error(func):
|
||||
""" Catches OSError, IOError and WindowsError and log them
|
||||
"""
|
||||
def wrapper(path, *args, **kwargs):
|
||||
try:
|
||||
return func(path, *args, **kwargs)
|
||||
except (IOError, OSError) as e:
|
||||
msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"'
|
||||
classname = e.__class__.__name__
|
||||
funcname = func.__name__
|
||||
logging.warn(msg.format(classname, funcname, str(path), str(e)))
|
||||
|
||||
return wrapper
|
||||
|
||||
def copy(source_path, dest_path):
|
||||
return shutil.copy(str(source_path), str(dest_path))
|
||||
|
||||
def copytree(source_path, dest_path, *args, **kwargs):
|
||||
return shutil.copytree(str(source_path), str(dest_path), *args, **kwargs)
|
||||
|
||||
def exists(path):
|
||||
return os.path.exists(str(path))
|
||||
|
||||
def isdir(path):
|
||||
return os.path.isdir(str(path))
|
||||
|
||||
def isfile(path):
|
||||
return os.path.isfile(str(path))
|
||||
|
||||
def islink(path):
|
||||
return os.path.islink(str(path))
|
||||
|
||||
def listdir(path):
|
||||
return os.listdir(str(path))
|
||||
|
||||
def mkdir(path, *args, **kwargs):
|
||||
return os.mkdir(str(path), *args, **kwargs)
|
||||
|
||||
def makedirs(path, *args, **kwargs):
|
||||
return os.makedirs(str(path), *args, **kwargs)
|
||||
|
||||
def move(source_path, dest_path):
|
||||
return shutil.move(str(source_path), str(dest_path))
|
||||
|
||||
def open(path, *args, **kwargs):
|
||||
return builtins.open(str(path), *args, **kwargs)
|
||||
|
||||
def remove(path):
|
||||
return os.remove(str(path))
|
||||
|
||||
def rename(source_path, dest_path):
|
||||
return os.rename(str(source_path), str(dest_path))
|
||||
|
||||
def rmdir(path):
|
||||
return os.rmdir(str(path))
|
||||
|
||||
def rmtree(path):
|
||||
return shutil.rmtree(str(path))
|
||||
|
||||
def stat(path):
|
||||
return os.stat(str(path))
|
||||
@@ -12,6 +12,8 @@ import os.path as op
|
||||
import shutil
|
||||
import sys
|
||||
from itertools import takewhile
|
||||
from functools import wraps
|
||||
from inspect import signature
|
||||
|
||||
class Path(tuple):
|
||||
"""A handy class to work with paths.
|
||||
@@ -94,12 +96,11 @@ class Path(tuple):
|
||||
stop = -len(equal_elems) if equal_elems else None
|
||||
key = slice(key.start, stop, key.step)
|
||||
return Path(tuple.__getitem__(self, key))
|
||||
elif isinstance(key, (str, Path)):
|
||||
return self + key
|
||||
else:
|
||||
return tuple.__getitem__(self, key)
|
||||
|
||||
def __getslice__(self, i, j): #I have to override it because tuple uses it.
|
||||
return Path(tuple.__getslice__(self, i, j))
|
||||
|
||||
def __hash__(self):
|
||||
return tuple.__hash__(self)
|
||||
|
||||
@@ -133,6 +134,13 @@ class Path(tuple):
|
||||
def tobytes(self):
|
||||
return str(self).encode(sys.getfilesystemencoding())
|
||||
|
||||
def parent(self):
|
||||
return self[:-1]
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return self[-1]
|
||||
|
||||
# OS method wrappers
|
||||
def exists(self):
|
||||
return op.exists(str(self))
|
||||
@@ -153,7 +161,7 @@ class Path(tuple):
|
||||
return op.islink(str(self))
|
||||
|
||||
def listdir(self):
|
||||
return os.listdir(str(self))
|
||||
return [self[name] for name in os.listdir(str(self))]
|
||||
|
||||
def mkdir(self, *args, **kwargs):
|
||||
return os.mkdir(str(self), *args, **kwargs)
|
||||
@@ -182,3 +190,43 @@ class Path(tuple):
|
||||
def stat(self):
|
||||
return os.stat(str(self))
|
||||
|
||||
def pathify(f):
|
||||
"""Ensure that every annotated :class:`Path` arguments are actually paths.
|
||||
|
||||
When a function is decorated with ``@pathify``, every argument with annotated as Path will be
|
||||
converted to a Path if it wasn't already. Example::
|
||||
|
||||
@pathify
|
||||
def foo(path: Path, otherarg):
|
||||
return path.listdir()
|
||||
|
||||
Calling ``foo('/bar', 0)`` will convert ``'/bar'`` to ``Path('/bar')``.
|
||||
"""
|
||||
sig = signature(f)
|
||||
pindexes = {i for i, p in enumerate(sig.parameters.values()) if p.annotation is Path}
|
||||
pkeys = {k: v for k, v in sig.parameters.items() if v.annotation is Path}
|
||||
def path_or_none(p):
|
||||
return None if p is None else Path(p)
|
||||
|
||||
@wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
args = tuple((path_or_none(a) if i in pindexes else a) for i, a in enumerate(args))
|
||||
kwargs = {k: (path_or_none(v) if k in pkeys else v) for k, v in kwargs.items()}
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapped
|
||||
|
||||
def log_io_error(func):
|
||||
""" Catches OSError, IOError and WindowsError and log them
|
||||
"""
|
||||
@wraps(func)
|
||||
def wrapper(path, *args, **kwargs):
|
||||
try:
|
||||
return func(path, *args, **kwargs)
|
||||
except (IOError, OSError) as e:
|
||||
msg = 'Error "{0}" during operation "{1}" on "{2}": "{3}"'
|
||||
classname = e.__class__.__name__
|
||||
funcname = func.__name__
|
||||
logging.warn(msg.format(classname, funcname, str(path), str(e)))
|
||||
|
||||
return wrapper
|
||||
|
||||
@@ -61,44 +61,44 @@ class TestCase_move_copy:
|
||||
def pytest_funcarg__do_setup(self, request):
|
||||
tmpdir = request.getfuncargvalue('tmpdir')
|
||||
self.path = Path(str(tmpdir))
|
||||
io.open(self.path + 'foo', 'w').close()
|
||||
io.open(self.path + 'bar', 'w').close()
|
||||
io.mkdir(self.path + 'dir')
|
||||
self.path['foo'].open('w').close()
|
||||
self.path['bar'].open('w').close()
|
||||
self.path['dir'].mkdir()
|
||||
|
||||
def test_move_no_conflict(self, do_setup):
|
||||
smart_move(self.path + 'foo', self.path + 'baz')
|
||||
assert io.exists(self.path + 'baz')
|
||||
assert not io.exists(self.path + 'foo')
|
||||
assert self.path['baz'].exists()
|
||||
assert not self.path['foo'].exists()
|
||||
|
||||
def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move
|
||||
smart_copy(self.path + 'foo', self.path + 'baz')
|
||||
assert io.exists(self.path + 'baz')
|
||||
assert io.exists(self.path + 'foo')
|
||||
assert self.path['baz'].exists()
|
||||
assert self.path['foo'].exists()
|
||||
|
||||
def test_move_no_conflict_dest_is_dir(self, do_setup):
|
||||
smart_move(self.path + 'foo', self.path + 'dir')
|
||||
assert io.exists(self.path + ('dir', 'foo'))
|
||||
assert not io.exists(self.path + 'foo')
|
||||
assert self.path['dir']['foo'].exists()
|
||||
assert not self.path['foo'].exists()
|
||||
|
||||
def test_move_conflict(self, do_setup):
|
||||
smart_move(self.path + 'foo', self.path + 'bar')
|
||||
assert io.exists(self.path + '[000] bar')
|
||||
assert not io.exists(self.path + 'foo')
|
||||
assert self.path['[000] bar'].exists()
|
||||
assert not self.path['foo'].exists()
|
||||
|
||||
def test_move_conflict_dest_is_dir(self, do_setup):
|
||||
smart_move(self.path + 'foo', self.path + 'dir')
|
||||
smart_move(self.path + 'bar', self.path + 'foo')
|
||||
smart_move(self.path + 'foo', self.path + 'dir')
|
||||
assert io.exists(self.path + ('dir', 'foo'))
|
||||
assert io.exists(self.path + ('dir', '[000] foo'))
|
||||
assert not io.exists(self.path + 'foo')
|
||||
assert not io.exists(self.path + 'bar')
|
||||
smart_move(self.path['foo'], self.path['dir'])
|
||||
smart_move(self.path['bar'], self.path['foo'])
|
||||
smart_move(self.path['foo'], self.path['dir'])
|
||||
assert self.path['dir']['foo'].exists()
|
||||
assert self.path['dir']['[000] foo'].exists()
|
||||
assert not self.path['foo'].exists()
|
||||
assert not self.path['bar'].exists()
|
||||
|
||||
def test_copy_folder(self, tmpdir):
|
||||
# smart_copy also works on folders
|
||||
path = Path(str(tmpdir))
|
||||
io.mkdir(path + 'foo')
|
||||
io.mkdir(path + 'bar')
|
||||
smart_copy(path + 'foo', path + 'bar') # no crash
|
||||
assert io.exists(path + '[000] bar')
|
||||
path['foo'].mkdir()
|
||||
path['bar'].mkdir()
|
||||
smart_copy(path['foo'], path['bar']) # no crash
|
||||
assert path['[000] bar'].exists()
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
from datetime import date
|
||||
import sqlite3 as sqlite
|
||||
|
||||
from .. import io
|
||||
from ..testutil import eq_, assert_almost_equal
|
||||
from ..currency import Currency, RatesDB, CAD, EUR, USD
|
||||
|
||||
@@ -64,7 +63,7 @@ def test_db_with_connection():
|
||||
|
||||
def test_corrupt_db(tmpdir):
|
||||
dbpath = str(tmpdir.join('foo.db'))
|
||||
fh = io.open(dbpath, 'w')
|
||||
fh = open(dbpath, 'w')
|
||||
fh.write('corrupted')
|
||||
fh.close()
|
||||
db = RatesDB(dbpath) # no crash. deletes the old file and start a new db
|
||||
|
||||
@@ -7,10 +7,11 @@
|
||||
# http://www.hardcoded.net/licenses/bsd_license
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
from pytest import raises, mark
|
||||
|
||||
from ..path import *
|
||||
from ..path import Path, pathify
|
||||
from ..testutil import eq_
|
||||
|
||||
def pytest_funcarg__force_ossep(request):
|
||||
@@ -44,7 +45,7 @@ def test_init_with_tuple_and_list(force_ossep):
|
||||
def test_init_with_invalid_value(force_ossep):
|
||||
try:
|
||||
path = Path(42)
|
||||
self.fail()
|
||||
assert False
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
@@ -63,6 +64,16 @@ def test_slicing(force_ossep):
|
||||
eq_('foo/bar',subpath)
|
||||
assert isinstance(subpath,Path)
|
||||
|
||||
def test_parent(force_ossep):
|
||||
path = Path('foo/bar/bleh')
|
||||
subpath = path.parent()
|
||||
eq_('foo/bar', subpath)
|
||||
assert isinstance(subpath, Path)
|
||||
|
||||
def test_filename(force_ossep):
|
||||
path = Path('foo/bar/bleh.ext')
|
||||
eq_(path.name, 'bleh.ext')
|
||||
|
||||
def test_deal_with_empty_components(force_ossep):
|
||||
"""Keep ONLY a leading space, which means we want a leading slash.
|
||||
"""
|
||||
@@ -99,7 +110,7 @@ def test_add(force_ossep):
|
||||
#Invalid concatenation
|
||||
try:
|
||||
Path(('foo','bar')) + 1
|
||||
self.fail()
|
||||
assert False
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
@@ -180,6 +191,16 @@ def test_Path_of_a_Path_returns_self(force_ossep):
|
||||
p = Path('foo/bar')
|
||||
assert Path(p) is p
|
||||
|
||||
def test_getitem_str(force_ossep):
|
||||
# path['something'] returns the child path corresponding to the name
|
||||
p = Path('/foo/bar')
|
||||
eq_(p['baz'], Path('/foo/bar/baz'))
|
||||
|
||||
def test_getitem_path(force_ossep):
|
||||
# path[Path('something')] returns the child path corresponding to the name (or subpath)
|
||||
p = Path('/foo/bar')
|
||||
eq_(p[Path('baz/bleh')], Path('/foo/bar/baz/bleh'))
|
||||
|
||||
@mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate")
|
||||
def test_log_unicode_errors(force_ossep, monkeypatch, capsys):
|
||||
# When an there's a UnicodeDecodeError on path creation, log it so it can be possible
|
||||
@@ -206,4 +227,25 @@ def test_remove_drive_letter(monkeypatch):
|
||||
p = Path('C:\\')
|
||||
eq_(p.remove_drive_letter(), Path(''))
|
||||
p = Path('z:\\foo')
|
||||
eq_(p.remove_drive_letter(), Path('foo'))
|
||||
eq_(p.remove_drive_letter(), Path('foo'))
|
||||
|
||||
def test_pathify():
|
||||
@pathify
|
||||
def foo(a: Path, b, c:Path):
|
||||
return a, b, c
|
||||
|
||||
a, b, c = foo('foo', 0, c=Path('bar'))
|
||||
assert isinstance(a, Path)
|
||||
assert a == Path('foo')
|
||||
assert b == 0
|
||||
assert isinstance(c, Path)
|
||||
assert c == Path('bar')
|
||||
|
||||
def test_pathify_preserve_none():
|
||||
# @pathify preserves None value and doesn't try to return a Path
|
||||
@pathify
|
||||
def foo(a: Path):
|
||||
return a
|
||||
|
||||
a = foo(None)
|
||||
assert a is None
|
||||
|
||||
@@ -11,7 +11,6 @@ from io import StringIO
|
||||
from pytest import raises
|
||||
|
||||
from ..testutil import eq_
|
||||
from .. import io
|
||||
from ..path import Path
|
||||
from ..util import *
|
||||
|
||||
@@ -210,39 +209,49 @@ class TestCase_modified_after:
|
||||
monkeyplus.patch_osstat('first', st_mtime=42)
|
||||
assert modified_after('first', 'does_not_exist') # no crash
|
||||
|
||||
def test_first_file_is_none(self, monkeyplus):
|
||||
# when the first file is None, we return False
|
||||
monkeyplus.patch_osstat('second', st_mtime=42)
|
||||
assert not modified_after(None, 'second') # no crash
|
||||
|
||||
def test_second_file_is_none(self, monkeyplus):
|
||||
# when the second file is None, we return True
|
||||
monkeyplus.patch_osstat('first', st_mtime=42)
|
||||
assert modified_after('first', None) # no crash
|
||||
|
||||
|
||||
class TestCase_delete_if_empty:
|
||||
def test_is_empty(self, tmpdir):
|
||||
testpath = Path(str(tmpdir))
|
||||
assert delete_if_empty(testpath)
|
||||
assert not io.exists(testpath)
|
||||
assert not testpath.exists()
|
||||
|
||||
def test_not_empty(self, tmpdir):
|
||||
testpath = Path(str(tmpdir))
|
||||
io.mkdir(testpath + 'foo')
|
||||
testpath['foo'].mkdir()
|
||||
assert not delete_if_empty(testpath)
|
||||
assert io.exists(testpath)
|
||||
assert testpath.exists()
|
||||
|
||||
def test_with_files_to_delete(self, tmpdir):
|
||||
testpath = Path(str(tmpdir))
|
||||
io.open(testpath + 'foo', 'w')
|
||||
io.open(testpath + 'bar', 'w')
|
||||
testpath['foo'].open('w')
|
||||
testpath['bar'].open('w')
|
||||
assert delete_if_empty(testpath, ['foo', 'bar'])
|
||||
assert not io.exists(testpath)
|
||||
assert not testpath.exists()
|
||||
|
||||
def test_directory_in_files_to_delete(self, tmpdir):
|
||||
testpath = Path(str(tmpdir))
|
||||
io.mkdir(testpath + 'foo')
|
||||
testpath['foo'].mkdir()
|
||||
assert not delete_if_empty(testpath, ['foo'])
|
||||
assert io.exists(testpath)
|
||||
assert testpath.exists()
|
||||
|
||||
def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir):
|
||||
testpath = Path(str(tmpdir))
|
||||
io.open(testpath + 'foo', 'w')
|
||||
io.open(testpath + 'bar', 'w')
|
||||
testpath['foo'].open('w')
|
||||
testpath['bar'].open('w')
|
||||
assert not delete_if_empty(testpath, ['foo'])
|
||||
assert io.exists(testpath)
|
||||
assert io.exists(testpath + 'foo')
|
||||
assert testpath.exists()
|
||||
assert testpath['foo'].exists()
|
||||
|
||||
def test_doesnt_exist(self):
|
||||
# When the 'path' doesn't exist, just do nothing.
|
||||
@@ -251,7 +260,7 @@ class TestCase_delete_if_empty:
|
||||
def test_is_file(self, tmpdir):
|
||||
# When 'path' is a file, do nothing.
|
||||
p = Path(str(tmpdir)) + 'filename'
|
||||
io.open(p, 'w').close()
|
||||
p.open('w').close()
|
||||
delete_if_empty(p) # no crash
|
||||
|
||||
def test_ioerror(self, tmpdir, monkeypatch):
|
||||
@@ -259,7 +268,7 @@ class TestCase_delete_if_empty:
|
||||
def do_raise(*args, **kw):
|
||||
raise OSError()
|
||||
|
||||
monkeypatch.setattr(io, 'rmdir', do_raise)
|
||||
monkeypatch.setattr(Path, 'rmdir', do_raise)
|
||||
delete_if_empty(Path(str(tmpdir))) # no crash
|
||||
|
||||
|
||||
|
||||
@@ -15,8 +15,7 @@ import glob
|
||||
import shutil
|
||||
from datetime import timedelta
|
||||
|
||||
from . import io
|
||||
from .path import Path
|
||||
from .path import Path, pathify, log_io_error
|
||||
|
||||
def nonone(value, replace_value):
|
||||
''' Returns value if value is not None. Returns replace_value otherwise.
|
||||
@@ -267,15 +266,19 @@ def iterdaterange(start, end):
|
||||
|
||||
#--- Files related
|
||||
|
||||
def modified_after(first_path, second_path):
|
||||
"""Returns True if first_path's mtime is higher than second_path's mtime."""
|
||||
@pathify
|
||||
def modified_after(first_path: Path, second_path: Path):
|
||||
"""Returns ``True`` if first_path's mtime is higher than second_path's mtime.
|
||||
|
||||
If one of the files doesn't exist or is ``None``, it is considered "never modified".
|
||||
"""
|
||||
try:
|
||||
first_mtime = io.stat(first_path).st_mtime
|
||||
except EnvironmentError:
|
||||
first_mtime = first_path.stat().st_mtime
|
||||
except (EnvironmentError, AttributeError):
|
||||
return False
|
||||
try:
|
||||
second_mtime = io.stat(second_path).st_mtime
|
||||
except EnvironmentError:
|
||||
second_mtime = second_path.stat().st_mtime
|
||||
except (EnvironmentError, AttributeError):
|
||||
return True
|
||||
return first_mtime > second_mtime
|
||||
|
||||
@@ -292,18 +295,19 @@ def find_in_path(name, paths=None):
|
||||
return op.join(path, name)
|
||||
return None
|
||||
|
||||
@io.log_io_error
|
||||
def delete_if_empty(path, files_to_delete=[]):
|
||||
@log_io_error
|
||||
@pathify
|
||||
def delete_if_empty(path: Path, files_to_delete=[]):
|
||||
''' Deletes the directory at 'path' if it is empty or if it only contains files_to_delete.
|
||||
'''
|
||||
if not io.exists(path) or not io.isdir(path):
|
||||
if not path.exists() or not path.isdir():
|
||||
return
|
||||
contents = io.listdir(path)
|
||||
if any(name for name in contents if (name not in files_to_delete) or io.isdir(path + name)):
|
||||
contents = path.listdir()
|
||||
if any(p for p in contents if (p.name not in files_to_delete) or p.isdir()):
|
||||
return False
|
||||
for name in contents:
|
||||
io.remove(path + name)
|
||||
io.rmdir(path)
|
||||
for p in contents:
|
||||
p.remove()
|
||||
path.rmdir()
|
||||
return True
|
||||
|
||||
def open_if_filename(infile, mode='rb'):
|
||||
@@ -313,7 +317,7 @@ def open_if_filename(infile, mode='rb'):
|
||||
Returns a tuple (shouldbeclosed,infile) infile is a file object
|
||||
"""
|
||||
if isinstance(infile, Path):
|
||||
return (io.open(infile, mode), True)
|
||||
return (infile.open(mode), True)
|
||||
if isinstance(infile, str):
|
||||
return (open(infile, mode), True)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user