hscommon.path --> pathlib

core tests pass
This commit is contained in:
Virgil Dupras 2013-08-05 18:41:56 -04:00
parent 160aaaf880
commit 740e2ab263
12 changed files with 147 additions and 171 deletions

View File

@ -16,9 +16,9 @@ import shutil
from send2trash import send2trash
from jobprogress import job
from pathlib import Path
from hscommon.reg import RegistrableApplication
from hscommon.notify import Broadcaster
from hscommon.path import Path
from hscommon.conflict import smart_move, smart_copy
from hscommon.gui.progress_window import ProgressWindow
from hscommon.util import (delete_if_empty, first, escape, nonone, format_time_decimal, allsame,
@ -334,29 +334,29 @@ class DupeGuru(RegistrableApplication, Broadcaster):
def clean_empty_dirs(self, path):
if self.options['clean_empty_dirs']:
while delete_if_empty(path, ['.DS_Store']):
path = path[:-1]
path = path.parent()
def copy_or_move(self, dupe, copy: bool, destination: str, dest_type: DestType):
source_path = dupe.path
location_path = first(p for p in self.directories if dupe.path in p)
dest_path = Path(destination)
if dest_type in {DestType.Relative, DestType.Absolute}:
# no filename, no windows drive letter
source_base = source_path.remove_drive_letter()[:-1]
# no filename, no windows drive letter, no root
source_base = source_path.relative().parent()
if dest_type == DestType.Relative:
source_base = source_base[location_path:]
dest_path = dest_path + source_base
source_base = source_base.relative_to(location_path.relative())
dest_path = dest_path[source_base]
if not dest_path.exists():
dest_path.makedirs()
dest_path.mkdir(parents=True)
# Add filename to dest_path. For file move/copy, it's not required, but for folders, yes.
dest_path = dest_path + source_path[-1]
dest_path = dest_path[source_path.name]
logging.debug("Copy/Move operation from '%s' to '%s'", source_path, dest_path)
# Raises an EnvironmentError if there's a problem
if copy:
smart_copy(source_path, dest_path)
else:
smart_move(source_path, dest_path)
self.clean_empty_dirs(source_path[:-1])
self.clean_empty_dirs(source_path.parent())
def copy_or_move_marked(self, copy):
def do(j):

View File

@ -8,9 +8,9 @@
from xml.etree import ElementTree as ET
import logging
from pathlib import Path
from jobprogress import job
from hscommon.path import Path
from hscommon.util import FileOrPath
from . import fs
@ -35,7 +35,7 @@ class Directories:
def __contains__(self, path):
for p in self._dirs:
if path in p:
if path == p or p in path.parents():
return True
return False
@ -51,7 +51,7 @@ class Directories:
#---Private
def _default_state_for_path(self, path):
# Override this in subclasses to specify the state of some special folders.
if path[-1].startswith('.'): # hidden
if path.name.startswith('.'): # hidden
return DirectoryState.Excluded
def _get_files(self, from_path, j):
@ -61,7 +61,7 @@ class Directories:
# Recursively get files from folders with lots of subfolder is expensive. However, there
# might be a subfolder in this path that is not excluded. What we want to do is to skim
# through self.states and see if we must continue, or we can stop right here to save time
if not any(p[:len(from_path)] == from_path for p in self.states):
if not any(p.parts[:len(from_path.parts)] == from_path.parts for p in self.states):
return
try:
filepaths = set()
@ -72,9 +72,8 @@ class Directories:
file.is_ref = state == DirectoryState.Reference
filepaths.add(file.path)
yield file
subpaths = [from_path + name for name in from_path.listdir()]
# it's possible that a folder (bundle) gets into the file list. in that case, we don't want to recurse into it
subfolders = [p for p in subpaths if not p.islink() and p.isdir() and p not in filepaths]
subfolders = [p for p in from_path.glob('*') if not p.is_symlink() and p.is_dir() and p not in filepaths]
for subfolder in subfolders:
for file in self._get_files(subfolder, j):
yield file
@ -114,9 +113,9 @@ class Directories:
def get_subfolders(path):
"""returns a sorted list of paths corresponding to subfolders in `path`"""
try:
names = [name for name in path.listdir() if (path + name).isdir()]
names = [p.name for p in path.glob('*') if p.is_dir()]
names.sort(key=lambda x:x.lower())
return [path + name for name in names]
return [path[name] for name in names]
except EnvironmentError:
return []
@ -147,7 +146,7 @@ class Directories:
default_state = self._default_state_for_path(path)
if default_state is not None:
return default_state
parent = path[:-1]
parent = path.parent()
if parent in self:
return self.get_state(parent)
else:

View File

@ -129,14 +129,14 @@ class File:
#--- Public
@classmethod
def can_handle(cls, path):
return not path.islink() and path.isfile()
return not path.is_symlink() and path.is_file()
def rename(self, newname):
if newname == self.name:
return
destpath = self.path[:-1] + newname
destpath = self.path.parent()[newname]
if destpath.exists():
raise AlreadyExistsError(newname, self.path[:-1])
raise AlreadyExistsError(newname, self.path.parent())
try:
self.path.rename(destpath)
except EnvironmentError:
@ -157,11 +157,11 @@ class File:
@property
def name(self):
return self.path[-1]
return self.path.name
@property
def folder_path(self):
return self.path[:-1]
return self.path.parent()
class Folder(File):
@ -203,14 +203,13 @@ class Folder(File):
@property
def subfolders(self):
if self._subfolders is None:
subpaths = [self.path + name for name in self.path.listdir()]
subfolders = [p for p in subpaths if not p.islink() and p.isdir()]
subfolders = [p for p in self.path.glob('*') if not p.is_symlink() and p.is_dir()]
self._subfolders = [Folder(p) for p in subfolders]
return self._subfolders
@classmethod
def can_handle(cls, path):
return not path.islink() and path.isdir()
return not path.is_symlink() and path.is_dir()
def get_file(path, fileclasses=[File]):
@ -220,18 +219,9 @@ def get_file(path, fileclasses=[File]):
def get_files(path, fileclasses=[File]):
assert all(issubclass(fileclass, File) for fileclass in fileclasses)
def combine_paths(p1, p2):
try:
return p1 + p2
except Exception:
# This is temporary debug logging for #84.
logging.warning("Failed to combine %r and %r.", p1, p2)
raise
try:
paths = [combine_paths(path, name) for name in path.listdir()]
result = []
for path in paths:
for path in path.glob('*'):
file = get_file(path, fileclasses=fileclasses)
if file is not None:
result.append(file)

View File

@ -31,7 +31,7 @@ class DirectoryNode(Node):
self.clear()
subpaths = self._tree.app.directories.get_subfolders(self._directory_path)
for path in subpaths:
self.append(DirectoryNode(self._tree, path, path[-1]))
self.append(DirectoryNode(self._tree, path, path.name))
self._loaded = True
def update_all_states(self):

View File

@ -79,7 +79,7 @@ class FolderCategory(ValueListCategory):
def sort_key(self, dupe, crit_value):
value = self.extract_value(dupe)
if value[:len(crit_value)] == crit_value:
if crit_value == value or crit_value in value.parents():
return 0
else:
return 1

View File

@ -117,7 +117,7 @@ class Scanner:
return False
if is_same_with_digit(refname, dupename):
return True
return len(dupe.path) > len(ref.path)
return len(dupe.path.parts) > len(ref.path.parts)
def get_dupe_groups(self, files, j=job.nulljob):
j = j.start_subjob([8, 2])
@ -140,7 +140,7 @@ class Scanner:
toremove = set()
last_parent_path = sortedpaths[0]
for p in sortedpaths[1:]:
if p in last_parent_path:
if last_parent_path in p.parents():
toremove.add(p)
else:
last_parent_path = p

View File

@ -9,10 +9,9 @@
import os
import os.path as op
import logging
from pathlib import Path
from pytest import mark
from hscommon import io
from hscommon.path import Path
import hscommon.conflict
import hscommon.util
from hscommon.testutil import CallLogger, eq_, log_calls
@ -57,7 +56,7 @@ class TestCaseDupeGuru:
# for this unit is pathetic. What's done is done. My approach now is to add tests for
# every change I want to make. The blowup was caused by a missing import.
p = Path(str(tmpdir))
io.open(p + 'foo', 'w').close()
p['foo'].touch()
monkeypatch.setattr(hscommon.conflict, 'smart_copy', log_calls(lambda source_path, dest_path: None))
# XXX This monkeypatch is temporary. will be fixed in a better monkeypatcher.
monkeypatch.setattr(app, 'smart_copy', hscommon.conflict.smart_copy)
@ -68,19 +67,19 @@ class TestCaseDupeGuru:
dgapp.copy_or_move(f, True, 'some_destination', 0)
eq_(1, len(hscommon.conflict.smart_copy.calls))
call = hscommon.conflict.smart_copy.calls[0]
eq_(call['dest_path'], op.join('some_destination', 'foo'))
eq_(call['dest_path'], Path('some_destination', 'foo'))
eq_(call['source_path'], f.path)
def test_copy_or_move_clean_empty_dirs(self, tmpdir, monkeypatch):
tmppath = Path(str(tmpdir))
sourcepath = tmppath + 'source'
io.mkdir(sourcepath)
io.open(sourcepath + 'myfile', 'w')
sourcepath = tmppath['source']
sourcepath.mkdir()
sourcepath['myfile'].touch()
app = TestApp().app
app.directories.add_path(tmppath)
[myfile] = app.directories.get_files()
monkeypatch.setattr(app, 'clean_empty_dirs', log_calls(lambda path: None))
app.copy_or_move(myfile, False, tmppath + 'dest', 0)
app.copy_or_move(myfile, False, tmppath['dest'], 0)
calls = app.clean_empty_dirs.calls
eq_(1, len(calls))
eq_(sourcepath, calls[0]['path'])
@ -104,8 +103,8 @@ class TestCaseDupeGuru:
# If the ignore_hardlink_matches option is set, don't match files hardlinking to the same
# inode.
tmppath = Path(str(tmpdir))
io.open(tmppath + 'myfile', 'w').write('foo')
os.link(str(tmppath + 'myfile'), str(tmppath + 'hardlink'))
tmppath['myfile'].open('wt').write('foo')
os.link(str(tmppath['myfile']), str(tmppath['hardlink']))
app = TestApp().app
app.directories.add_path(tmppath)
app.scanner.scan_type = ScanType.Contents
@ -145,7 +144,7 @@ class TestCaseDupeGuru_clean_empty_dirs:
# delete_if_empty must be recursively called up in the path until it returns False
@log_calls
def mock_delete_if_empty(path, files_to_delete=[]):
return len(path) > 1
return len(path.parts) > 1
monkeypatch.setattr(hscommon.util, 'delete_if_empty', mock_delete_if_empty)
# XXX This monkeypatch is temporary. will be fixed in a better monkeypatcher.
@ -171,8 +170,8 @@ class TestCaseDupeGuruWithResults:
self.rtable.refresh()
tmpdir = request.getfuncargvalue('tmpdir')
tmppath = Path(str(tmpdir))
io.mkdir(tmppath + 'foo')
io.mkdir(tmppath + 'bar')
tmppath['foo'].mkdir()
tmppath['bar'].mkdir()
self.app.directories.add_path(tmppath)
def test_GetObjects(self, do_setup):
@ -404,12 +403,9 @@ class TestCaseDupeGuru_renameSelected:
def pytest_funcarg__do_setup(self, request):
tmpdir = request.getfuncargvalue('tmpdir')
p = Path(str(tmpdir))
fp = open(str(p + 'foo bar 1'),mode='w')
fp.close()
fp = open(str(p + 'foo bar 2'),mode='w')
fp.close()
fp = open(str(p + 'foo bar 3'),mode='w')
fp.close()
p['foo bar 1'].touch()
p['foo bar 2'].touch()
p['foo bar 3'].touch()
files = fs.get_files(p)
for f in files:
f.is_ref = False
@ -431,7 +427,7 @@ class TestCaseDupeGuru_renameSelected:
g = self.groups[0]
self.rtable.select([1])
assert app.rename_selected('renamed')
names = io.listdir(self.p)
names = [p.name for p in self.p.glob('*')]
assert 'renamed' in names
assert 'foo bar 2' not in names
eq_(g.dupes[0].name, 'renamed')
@ -444,7 +440,7 @@ class TestCaseDupeGuru_renameSelected:
assert not app.rename_selected('renamed')
msg = logging.warning.calls[0]['msg']
eq_('dupeGuru Warning: list index out of range', msg)
names = io.listdir(self.p)
names = [p.name for p in self.p.glob('*')]
assert 'renamed' not in names
assert 'foo bar 2' in names
eq_(g.dupes[0].name, 'foo bar 2')
@ -457,7 +453,7 @@ class TestCaseDupeGuru_renameSelected:
assert not app.rename_selected('foo bar 1')
msg = logging.warning.calls[0]['msg']
assert msg.startswith('dupeGuru Warning: \'foo bar 1\' already exists in')
names = io.listdir(self.p)
names = [p.name for p in self.p.glob('*')]
assert 'foo bar 1' in names
assert 'foo bar 2' in names
eq_(g.dupes[0].name, 'foo bar 2')
@ -467,9 +463,9 @@ class TestAppWithDirectoriesInTree:
def pytest_funcarg__do_setup(self, request):
tmpdir = request.getfuncargvalue('tmpdir')
p = Path(str(tmpdir))
io.mkdir(p + 'sub1')
io.mkdir(p + 'sub2')
io.mkdir(p + 'sub3')
p['sub1'].mkdir()
p['sub2'].mkdir()
p['sub3'].mkdir()
app = TestApp()
self.app = app.app
self.dtree = app.dtree

View File

@ -6,8 +6,9 @@
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from pathlib import Path
from hscommon.testutil import TestApp as TestAppBase, eq_, with_app
from hscommon.path import Path
from hscommon.util import get_file_ext, format_size
from hscommon.gui.column import Column
from jobprogress.job import nulljob, JobCancelled
@ -100,11 +101,11 @@ class NamedObject:
@property
def path(self):
return self._folder + self.name
return self._folder[self.name]
@property
def folder_path(self):
return self.path[:-1]
return self.path.parent()
@property
def extension(self):

View File

@ -10,39 +10,32 @@ import os
import time
import tempfile
import shutil
from pathlib import Path
from pytest import raises
from hscommon import io
from hscommon.path import Path
from hscommon.testutil import eq_
from ..directories import *
def create_fake_fs(rootpath):
# We have it as a separate function because other units are using it.
rootpath = rootpath + 'fs'
io.mkdir(rootpath)
io.mkdir(rootpath + 'dir1')
io.mkdir(rootpath + 'dir2')
io.mkdir(rootpath + 'dir3')
fp = io.open(rootpath + 'file1.test', 'w')
fp.write('1')
fp.close()
fp = io.open(rootpath + 'file2.test', 'w')
fp.write('12')
fp.close()
fp = io.open(rootpath + 'file3.test', 'w')
fp.write('123')
fp.close()
fp = io.open(rootpath + ('dir1', 'file1.test'), 'w')
fp.write('1')
fp.close()
fp = io.open(rootpath + ('dir2', 'file2.test'), 'w')
fp.write('12')
fp.close()
fp = io.open(rootpath + ('dir3', 'file3.test'), 'w')
fp.write('123')
fp.close()
rootpath = rootpath['fs']
rootpath.mkdir()
rootpath['dir1'].mkdir()
rootpath['dir2'].mkdir()
rootpath['dir3'].mkdir()
with rootpath['file1.test'].open('wt') as fp:
fp.write('1')
with rootpath['file2.test'].open('wt') as fp:
fp.write('12')
with rootpath['file3.test'].open('wt') as fp:
fp.write('123')
with rootpath['dir1/file1.test'].open('wt') as fp:
fp.write('1')
with rootpath['dir2/file2.test'].open('wt') as fp:
fp.write('12')
with rootpath['dir3/file3.test'].open('wt') as fp:
fp.write('123')
return rootpath
def setup_module(module):
@ -50,11 +43,10 @@ def setup_module(module):
# and another with a more complex structure.
testpath = Path(tempfile.mkdtemp())
module.testpath = testpath
rootpath = testpath + 'onefile'
io.mkdir(rootpath)
fp = io.open(rootpath + 'test.txt', 'w')
fp.write('test_data')
fp.close()
rootpath = testpath['onefile']
rootpath.mkdir()
with rootpath['test.txt'].open('wt') as fp:
fp.write('test_data')
create_fake_fs(testpath)
def teardown_module(module):
@ -67,30 +59,30 @@ def test_empty():
def test_add_path():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
eq_(1,len(d))
assert p in d
assert (p + 'foobar') in d
assert p[:-1] not in d
p = testpath + 'fs'
assert p['foobar'] in d
assert p.parent() not in d
p = testpath['fs']
d.add_path(p)
eq_(2,len(d))
assert p in d
def test_AddPath_when_path_is_already_there():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
with raises(AlreadyThereError):
d.add_path(p)
with raises(AlreadyThereError):
d.add_path(p + 'foobar')
d.add_path(p['foobar'])
eq_(1, len(d))
def test_add_path_containing_paths_already_there():
d = Directories()
d.add_path(testpath + 'onefile')
d.add_path(testpath['onefile'])
eq_(1, len(d))
d.add_path(testpath)
eq_(len(d), 1)
@ -98,7 +90,7 @@ def test_add_path_containing_paths_already_there():
def test_AddPath_non_latin(tmpdir):
p = Path(str(tmpdir))
to_add = p + 'unicode\u201a'
to_add = p['unicode\u201a']
os.mkdir(str(to_add))
d = Directories()
try:
@ -108,24 +100,24 @@ def test_AddPath_non_latin(tmpdir):
def test_del():
d = Directories()
d.add_path(testpath + 'onefile')
d.add_path(testpath['onefile'])
try:
del d[1]
assert False
except IndexError:
pass
d.add_path(testpath + 'fs')
d.add_path(testpath['fs'])
del d[1]
eq_(1, len(d))
def test_states():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
eq_(DirectoryState.Normal ,d.get_state(p))
d.set_state(p, DirectoryState.Reference)
eq_(DirectoryState.Reference ,d.get_state(p))
eq_(DirectoryState.Reference ,d.get_state(p + 'dir1'))
eq_(DirectoryState.Reference ,d.get_state(p['dir1']))
eq_(1,len(d.states))
eq_(p,list(d.states.keys())[0])
eq_(DirectoryState.Reference ,d.states[p])
@ -133,67 +125,67 @@ def test_states():
def test_get_state_with_path_not_there():
# When the path's not there, just return DirectoryState.Normal
d = Directories()
d.add_path(testpath + 'onefile')
d.add_path(testpath['onefile'])
eq_(d.get_state(testpath), DirectoryState.Normal)
def test_states_remain_when_larger_directory_eat_smaller_ones():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
d.set_state(p, DirectoryState.Excluded)
d.add_path(testpath)
d.set_state(testpath, DirectoryState.Reference)
eq_(DirectoryState.Excluded ,d.get_state(p))
eq_(DirectoryState.Excluded ,d.get_state(p + 'dir1'))
eq_(DirectoryState.Reference ,d.get_state(testpath))
eq_(DirectoryState.Excluded, d.get_state(p))
eq_(DirectoryState.Excluded, d.get_state(p['dir1']))
eq_(DirectoryState.Reference, d.get_state(testpath))
def test_set_state_keep_state_dict_size_to_minimum():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
d.set_state(p, DirectoryState.Reference)
d.set_state(p + 'dir1', DirectoryState.Reference)
d.set_state(p['dir1'], DirectoryState.Reference)
eq_(1,len(d.states))
eq_(DirectoryState.Reference ,d.get_state(p + 'dir1'))
d.set_state(p + 'dir1', DirectoryState.Normal)
eq_(DirectoryState.Reference ,d.get_state(p['dir1']))
d.set_state(p['dir1'], DirectoryState.Normal)
eq_(2,len(d.states))
eq_(DirectoryState.Normal ,d.get_state(p + 'dir1'))
d.set_state(p + 'dir1', DirectoryState.Reference)
eq_(DirectoryState.Normal ,d.get_state(p['dir1']))
d.set_state(p['dir1'], DirectoryState.Reference)
eq_(1,len(d.states))
eq_(DirectoryState.Reference ,d.get_state(p + 'dir1'))
eq_(DirectoryState.Reference ,d.get_state(p['dir1']))
def test_get_files():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
d.set_state(p + 'dir1', DirectoryState.Reference)
d.set_state(p + 'dir2', DirectoryState.Excluded)
d.set_state(p['dir1'], DirectoryState.Reference)
d.set_state(p['dir2'], DirectoryState.Excluded)
files = list(d.get_files())
eq_(5, len(files))
for f in files:
if f.path[:-1] == p + 'dir1':
if f.path.parent() == p['dir1']:
assert f.is_ref
else:
assert not f.is_ref
def test_get_folders():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
d.set_state(p + 'dir1', DirectoryState.Reference)
d.set_state(p + 'dir2', DirectoryState.Excluded)
d.set_state(p['dir1'], DirectoryState.Reference)
d.set_state(p['dir2'], DirectoryState.Excluded)
folders = list(d.get_folders())
eq_(len(folders), 3)
ref = [f for f in folders if f.is_ref]
not_ref = [f for f in folders if not f.is_ref]
eq_(len(ref), 1)
eq_(ref[0].path, p + 'dir1')
eq_(ref[0].path, p['dir1'])
eq_(len(not_ref), 2)
eq_(ref[0].size, 1)
def test_get_files_with_inherited_exclusion():
d = Directories()
p = testpath + 'onefile'
p = testpath['onefile']
d.add_path(p)
d.set_state(p, DirectoryState.Excluded)
eq_([], list(d.get_files()))
@ -202,19 +194,19 @@ def test_save_and_load(tmpdir):
d1 = Directories()
d2 = Directories()
p1 = Path(str(tmpdir.join('p1')))
io.mkdir(p1)
p1.mkdir()
p2 = Path(str(tmpdir.join('p2')))
io.mkdir(p2)
p2.mkdir()
d1.add_path(p1)
d1.add_path(p2)
d1.set_state(p1, DirectoryState.Reference)
d1.set_state(p1 + 'dir1', DirectoryState.Excluded)
d1.set_state(p1['dir1'], DirectoryState.Excluded)
tmpxml = str(tmpdir.join('directories_testunit.xml'))
d1.save_to_file(tmpxml)
d2.load_from_file(tmpxml)
eq_(2, len(d2))
eq_(DirectoryState.Reference ,d2.get_state(p1))
eq_(DirectoryState.Excluded ,d2.get_state(p1 + 'dir1'))
eq_(DirectoryState.Excluded ,d2.get_state(p1['dir1']))
def test_invalid_path():
d = Directories()
@ -234,12 +226,12 @@ def test_load_from_file_with_invalid_path(tmpdir):
#This test simulates a load from file resulting in a
#InvalidPath raise. Other directories must be loaded.
d1 = Directories()
d1.add_path(testpath + 'onefile')
d1.add_path(testpath['onefile'])
#Will raise InvalidPath upon loading
p = Path(str(tmpdir.join('toremove')))
io.mkdir(p)
p.mkdir()
d1.add_path(p)
io.rmdir(p)
p.rmdir()
tmpxml = str(tmpdir.join('directories_testunit.xml'))
d1.save_to_file(tmpxml)
d2 = Directories()
@ -248,11 +240,11 @@ def test_load_from_file_with_invalid_path(tmpdir):
def test_unicode_save(tmpdir):
d = Directories()
p1 = Path(str(tmpdir)) + 'hello\xe9'
io.mkdir(p1)
io.mkdir(p1 + 'foo\xe9')
p1 = Path(str(tmpdir))['hello\xe9']
p1.mkdir()
p1['foo\xe9'].mkdir()
d.add_path(p1)
d.set_state(p1 + 'foo\xe9', DirectoryState.Excluded)
d.set_state(p1['foo\xe9'], DirectoryState.Excluded)
tmpxml = str(tmpdir.join('directories_testunit.xml'))
try:
d.save_to_file(tmpxml)
@ -261,12 +253,12 @@ def test_unicode_save(tmpdir):
def test_get_files_refreshes_its_directories():
d = Directories()
p = testpath + 'fs'
p = testpath['fs']
d.add_path(p)
files = d.get_files()
eq_(6, len(list(files)))
time.sleep(1)
os.remove(str(p + ('dir1','file1.test')))
os.remove(str(p['dir1']['file1.test']))
files = d.get_files()
eq_(5, len(list(files)))
@ -274,14 +266,14 @@ def test_get_files_does_not_choke_on_non_existing_directories(tmpdir):
d = Directories()
p = Path(str(tmpdir))
d.add_path(p)
io.rmtree(p)
shutil.rmtree(str(p))
eq_([], list(d.get_files()))
def test_get_state_returns_excluded_by_default_for_hidden_directories(tmpdir):
d = Directories()
p = Path(str(tmpdir))
hidden_dir_path = p + '.foo'
io.mkdir(p + '.foo')
hidden_dir_path = p['.foo']
p['.foo'].mkdir()
d.add_path(p)
eq_(d.get_state(hidden_dir_path), DirectoryState.Excluded)
# But it can be overriden
@ -292,21 +284,21 @@ def test_default_path_state_override(tmpdir):
# It's possible for a subclass to override the default state of a path
class MyDirectories(Directories):
def _default_state_for_path(self, path):
if 'foobar' in path:
if 'foobar' in path.parts:
return DirectoryState.Excluded
d = MyDirectories()
p1 = Path(str(tmpdir))
io.mkdir(p1 + 'foobar')
io.open(p1 + 'foobar/somefile', 'w').close()
io.mkdir(p1 + 'foobaz')
io.open(p1 + 'foobaz/somefile', 'w').close()
p1['foobar'].mkdir()
p1['foobar']['somefile'].touch()
p1['foobaz'].mkdir()
p1['foobaz']['somefile'].touch()
d.add_path(p1)
eq_(d.get_state(p1 + 'foobaz'), DirectoryState.Normal)
eq_(d.get_state(p1 + 'foobar'), DirectoryState.Excluded)
eq_(d.get_state(p1['foobaz']), DirectoryState.Normal)
eq_(d.get_state(p1['foobar']), DirectoryState.Excluded)
eq_(len(list(d.get_files())), 1) # only the 'foobaz' file is there
# However, the default state can be changed
d.set_state(p1 + 'foobar', DirectoryState.Normal)
eq_(d.get_state(p1 + 'foobar'), DirectoryState.Normal)
d.set_state(p1['foobar'], DirectoryState.Normal)
eq_(d.get_state(p1['foobar']), DirectoryState.Normal)
eq_(len(list(d.get_files())), 2)

View File

@ -7,8 +7,8 @@
# http://www.hardcoded.net/licenses/bsd_license
import hashlib
from pathlib import Path
from hscommon.path import Path
from hscommon.testutil import eq_
from core.tests.directories_test import create_fake_fs
@ -25,12 +25,12 @@ def test_md5_aggregate_subfiles_sorted(tmpdir):
#same order everytime.
p = create_fake_fs(Path(str(tmpdir)))
b = fs.Folder(p)
md51 = fs.File(p + ('dir1', 'file1.test')).md5
md52 = fs.File(p + ('dir2', 'file2.test')).md5
md53 = fs.File(p + ('dir3', 'file3.test')).md5
md54 = fs.File(p + 'file1.test').md5
md55 = fs.File(p + 'file2.test').md5
md56 = fs.File(p + 'file3.test').md5
md51 = fs.File(p['dir1']['file1.test']).md5
md52 = fs.File(p['dir2']['file2.test']).md5
md53 = fs.File(p['dir3']['file3.test']).md5
md54 = fs.File(p['file1.test']).md5
md55 = fs.File(p['file2.test']).md5
md56 = fs.File(p['file3.test']).md5
# The expected md5 is the md5 of md5s for folders and the direct md5 for files
folder_md51 = hashlib.md5(md51).digest()
folder_md52 = hashlib.md5(md52).digest()

View File

@ -429,7 +429,7 @@ class TestCaseResultsXML:
self.results.groups = self.groups
def get_file(self, path): # use this as a callback for load_from_xml
return [o for o in self.objects if o.path == path][0]
return [o for o in self.objects if str(o.path) == path][0]
def test_save_to_xml(self):
self.objects[0].is_ref = True

View File

@ -7,8 +7,7 @@
# http://www.hardcoded.net/licenses/bsd_license
from jobprogress import job
from hscommon import io
from hscommon.path import Path
from pathlib import Path
from hscommon.testutil import eq_
from .. import fs
@ -21,7 +20,7 @@ class NamedObject:
if path is None:
path = Path(name)
else:
path = Path(path) + name
path = Path(path, name)
self.name = name
self.size = size
self.path = path
@ -37,7 +36,6 @@ def pytest_funcarg__fake_fileexists(request):
# This is a hack to avoid invalidating all previous tests since the scanner started to test
# for file existence before doing the match grouping.
monkeypatch = request.getfuncargvalue('monkeypatch')
monkeypatch.setattr(io, 'exists', lambda _: True)
monkeypatch.setattr(Path, 'exists', lambda _: True)
def test_empty(fake_fileexists):
@ -471,11 +469,11 @@ def test_dont_group_files_that_dont_exist(tmpdir):
s = Scanner()
s.scan_type = ScanType.Contents
p = Path(str(tmpdir))
io.open(p + 'file1', 'w').write('foo')
io.open(p + 'file2', 'w').write('foo')
p['file1'].touch()
p['file2'].touch()
file1, file2 = fs.get_files(p)
def getmatches(*args, **kw):
io.remove(file2.path)
file2.path.unlink()
return [Match(file1, file2, 100)]
s._getmatches = getmatches