mirror of https://github.com/arsenetar/dupeguru.git synced 2025-03-10 05:34:36 +00:00

Stop using hsutil.testcase.

Virgil Dupras 2011-01-05 11:11:21 +01:00
parent 2d423b2358
commit e0cc8ecda2
7 changed files with 1030 additions and 1016 deletions
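
In the side-by-side hunks below, the removed hsutil.testcase-based code appears on the left of each line and its pytest-style replacement on the right. The shape of the migration is the same throughout: TestCase subclasses that relied on self.mock(), self.tmppath() and self.assertEqual() become plain classes whose tests take pytest's monkeypatch and tmpdir arguments and assert with hscommon.testutil.eq_. A minimal before/after sketch of that pattern, condensed from the first hunk rather than quoted verbatim and assuming the same imports as the test module:

    # Before: hsutil.testcase style
    class TCDupeGuru(TestCase):
        def test_apply_filter(self):
            app = DupeGuru()
            self.mock(app.results, 'apply_filter', log_calls(app.results.apply_filter))
            app.apply_filter('foo')
            self.assertEqual(2, len(app.results.apply_filter.calls))

    # After: pytest style
    class TestCaseDupeGuru:
        def test_apply_filter(self, monkeypatch):
            app = DupeGuru()
            monkeypatch.setattr(app.results, 'apply_filter', log_calls(app.results.apply_filter))
            app.apply_filter('foo')
            eq_(2, len(app.results.apply_filter.calls))

eq_(a, b) is a plain equality assertion helper, and monkeypatch.setattr() undoes its patches automatically when each test finishes, which is why the new code needs no tearDown() counterpart.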

View File

@ -7,16 +7,15 @@
# http://www.hardcoded.net/licenses/bsd_license # http://www.hardcoded.net/licenses/bsd_license
import os import os
import os.path as op
import logging import logging
from pytest import mark from pytest import mark
from hsutil.testutil import eq_
from hsutil.testcase import TestCase
from hsutil import io from hsutil import io
from hsutil.path import Path from hsutil.path import Path
from hsutil.decorators import log_calls from hsutil.decorators import log_calls
import hsutil.files import hsutil.files
from hscommon.testutil import CallLogger from hscommon.testutil import CallLogger, eq_
from jobprogress.job import nulljob, Job, JobCancelled from jobprogress.job import nulljob, Job, JobCancelled
from . import data from . import data
@ -45,62 +44,61 @@ def add_fake_files_to_directories(directories, files):
directories.get_files = lambda: iter(files) directories.get_files = lambda: iter(files)
directories._dirs.append('this is just so Scan() doesnt return 3') directories._dirs.append('this is just so Scan() doesnt return 3')
class TCDupeGuru(TestCase): class TestCaseDupeGuru:
cls_tested_module = app def test_apply_filter_calls_results_apply_filter(self, monkeypatch):
def test_apply_filter_calls_results_apply_filter(self):
app = DupeGuru() app = DupeGuru()
self.mock(app.results, 'apply_filter', log_calls(app.results.apply_filter)) monkeypatch.setattr(app.results, 'apply_filter', log_calls(app.results.apply_filter))
app.apply_filter('foo') app.apply_filter('foo')
self.assertEqual(2, len(app.results.apply_filter.calls)) eq_(2, len(app.results.apply_filter.calls))
call = app.results.apply_filter.calls[0] call = app.results.apply_filter.calls[0]
self.assert_(call['filter_str'] is None) assert call['filter_str'] is None
call = app.results.apply_filter.calls[1] call = app.results.apply_filter.calls[1]
self.assertEqual('foo', call['filter_str']) eq_('foo', call['filter_str'])
def test_apply_filter_escapes_regexp(self): def test_apply_filter_escapes_regexp(self, monkeypatch):
app = DupeGuru() app = DupeGuru()
self.mock(app.results, 'apply_filter', log_calls(app.results.apply_filter)) monkeypatch.setattr(app.results, 'apply_filter', log_calls(app.results.apply_filter))
app.apply_filter('()[]\\.|+?^abc') app.apply_filter('()[]\\.|+?^abc')
call = app.results.apply_filter.calls[1] call = app.results.apply_filter.calls[1]
self.assertEqual('\\(\\)\\[\\]\\\\\\.\\|\\+\\?\\^abc', call['filter_str']) eq_('\\(\\)\\[\\]\\\\\\.\\|\\+\\?\\^abc', call['filter_str'])
app.apply_filter('(*)') # In "simple mode", we want the * to behave as a wilcard app.apply_filter('(*)') # In "simple mode", we want the * to behave as a wilcard
call = app.results.apply_filter.calls[3] call = app.results.apply_filter.calls[3]
self.assertEqual('\(.*\)', call['filter_str']) eq_('\(.*\)', call['filter_str'])
app.options['escape_filter_regexp'] = False app.options['escape_filter_regexp'] = False
app.apply_filter('(abc)') app.apply_filter('(abc)')
call = app.results.apply_filter.calls[5] call = app.results.apply_filter.calls[5]
self.assertEqual('(abc)', call['filter_str']) eq_('(abc)', call['filter_str'])
def test_copy_or_move(self): def test_copy_or_move(self, tmpdir, monkeypatch):
# The goal here is just to have a test for a previous blowup I had. I know my test coverage # The goal here is just to have a test for a previous blowup I had. I know my test coverage
# for this unit is pathetic. What's done is done. My approach now is to add tests for # for this unit is pathetic. What's done is done. My approach now is to add tests for
# every change I want to make. The blowup was caused by a missing import. # every change I want to make. The blowup was caused by a missing import.
p = self.tmppath() p = Path(str(tmpdir))
io.open(p + 'foo', 'w').close() io.open(p + 'foo', 'w').close()
self.mock(hsutil.files, 'copy', log_calls(lambda source_path, dest_path: None)) monkeypatch.setattr(hsutil.files, 'copy', log_calls(lambda source_path, dest_path: None))
self.mock(os, 'makedirs', lambda path: None) # We don't want the test to create that fake directory monkeypatch.setattr(os, 'makedirs', lambda path: None) # We don't want the test to create that fake directory
app = DupeGuru() app = DupeGuru()
app.directories.add_path(p) app.directories.add_path(p)
[f] = app.directories.get_files() [f] = app.directories.get_files()
app.copy_or_move(f, True, 'some_destination', 0) app.copy_or_move(f, True, 'some_destination', 0)
self.assertEqual(1, len(hsutil.files.copy.calls)) eq_(1, len(hsutil.files.copy.calls))
call = hsutil.files.copy.calls[0] call = hsutil.files.copy.calls[0]
self.assertEqual('some_destination', call['dest_path']) eq_('some_destination', call['dest_path'])
self.assertEqual(f.path, call['source_path']) eq_(f.path, call['source_path'])
def test_copy_or_move_clean_empty_dirs(self): def test_copy_or_move_clean_empty_dirs(self, tmpdir, monkeypatch):
tmppath = Path(self.tmpdir()) tmppath = Path(str(tmpdir))
sourcepath = tmppath + 'source' sourcepath = tmppath + 'source'
io.mkdir(sourcepath) io.mkdir(sourcepath)
io.open(sourcepath + 'myfile', 'w') io.open(sourcepath + 'myfile', 'w')
app = DupeGuru() app = DupeGuru()
app.directories.add_path(tmppath) app.directories.add_path(tmppath)
[myfile] = app.directories.get_files() [myfile] = app.directories.get_files()
self.mock(app, 'clean_empty_dirs', log_calls(lambda path: None)) monkeypatch.setattr(app, 'clean_empty_dirs', log_calls(lambda path: None))
app.copy_or_move(myfile, False, tmppath + 'dest', 0) app.copy_or_move(myfile, False, tmppath + 'dest', 0)
calls = app.clean_empty_dirs.calls calls = app.clean_empty_dirs.calls
self.assertEqual(1, len(calls)) eq_(1, len(calls))
self.assertEqual(sourcepath, calls[0]['path']) eq_(sourcepath, calls[0]['path'])
def test_Scan_with_objects_evaluating_to_false(self): def test_Scan_with_objects_evaluating_to_false(self):
class FakeFile(fs.File): class FakeFile(fs.File):
@ -117,10 +115,10 @@ class TCDupeGuru(TestCase):
app.start_scanning() # no exception app.start_scanning() # no exception
@mark.skipif("not hasattr(os, 'link')") @mark.skipif("not hasattr(os, 'link')")
def test_ignore_hardlink_matches(self): def test_ignore_hardlink_matches(self, tmpdir):
# If the ignore_hardlink_matches option is set, don't match files hardlinking to the same # If the ignore_hardlink_matches option is set, don't match files hardlinking to the same
# inode. # inode.
tmppath = Path(self.tmpdir()) tmppath = Path(str(tmpdir))
io.open(tmppath + 'myfile', 'w').write('foo') io.open(tmppath + 'myfile', 'w').write('foo')
os.link(str(tmppath + 'myfile'), str(tmppath + 'hardlink')) os.link(str(tmppath + 'myfile'), str(tmppath + 'hardlink'))
app = DupeGuru() app = DupeGuru()
@ -131,42 +129,42 @@ class TCDupeGuru(TestCase):
eq_(len(app.results.groups), 0) eq_(len(app.results.groups), 0)
class TCDupeGuru_clean_empty_dirs(TestCase): class TestCaseDupeGuru_clean_empty_dirs:
cls_tested_module = app def pytest_funcarg__do_setup(self, request):
def setUp(self): monkeypatch = request.getfuncargvalue('monkeypatch')
self.mock(hsutil.files, 'delete_if_empty', log_calls(lambda path, files_to_delete=[]: None)) monkeypatch.setattr(hsutil.files, 'delete_if_empty', log_calls(lambda path, files_to_delete=[]: None))
self.app = DupeGuru() self.app = DupeGuru()
def test_option_off(self): def test_option_off(self, do_setup):
self.app.clean_empty_dirs(Path('/foo/bar')) self.app.clean_empty_dirs(Path('/foo/bar'))
self.assertEqual(0, len(hsutil.files.delete_if_empty.calls)) eq_(0, len(hsutil.files.delete_if_empty.calls))
def test_option_on(self): def test_option_on(self, do_setup):
self.app.options['clean_empty_dirs'] = True self.app.options['clean_empty_dirs'] = True
self.app.clean_empty_dirs(Path('/foo/bar')) self.app.clean_empty_dirs(Path('/foo/bar'))
calls = hsutil.files.delete_if_empty.calls calls = hsutil.files.delete_if_empty.calls
self.assertEqual(1, len(calls)) eq_(1, len(calls))
self.assertEqual(Path('/foo/bar'), calls[0]['path']) eq_(Path('/foo/bar'), calls[0]['path'])
self.assertEqual(['.DS_Store'], calls[0]['files_to_delete']) eq_(['.DS_Store'], calls[0]['files_to_delete'])
def test_recurse_up(self): def test_recurse_up(self, do_setup, monkeypatch):
# delete_if_empty must be recursively called up in the path until it returns False # delete_if_empty must be recursively called up in the path until it returns False
@log_calls @log_calls
def mock_delete_if_empty(path, files_to_delete=[]): def mock_delete_if_empty(path, files_to_delete=[]):
return len(path) > 1 return len(path) > 1
self.mock(hsutil.files, 'delete_if_empty', mock_delete_if_empty) monkeypatch.setattr(hsutil.files, 'delete_if_empty', mock_delete_if_empty)
self.app.options['clean_empty_dirs'] = True self.app.options['clean_empty_dirs'] = True
self.app.clean_empty_dirs(Path('not-empty/empty/empty')) self.app.clean_empty_dirs(Path('not-empty/empty/empty'))
calls = hsutil.files.delete_if_empty.calls calls = hsutil.files.delete_if_empty.calls
self.assertEqual(3, len(calls)) eq_(3, len(calls))
self.assertEqual(Path('not-empty/empty/empty'), calls[0]['path']) eq_(Path('not-empty/empty/empty'), calls[0]['path'])
self.assertEqual(Path('not-empty/empty'), calls[1]['path']) eq_(Path('not-empty/empty'), calls[1]['path'])
self.assertEqual(Path('not-empty'), calls[2]['path']) eq_(Path('not-empty'), calls[2]['path'])
class TCDupeGuruWithResults(TestCase): class TestCaseDupeGuruWithResults:
def setUp(self): def pytest_funcarg__do_setup(self, request):
self.app = DupeGuru() self.app = DupeGuru()
self.objects,self.matches,self.groups = GetTestGroups() self.objects,self.matches,self.groups = GetTestGroups()
self.app.results.groups = self.groups self.app.results.groups = self.groups
@ -179,12 +177,13 @@ class TCDupeGuruWithResults(TestCase):
self.dpanel.connect() self.dpanel.connect()
self.dtree.connect() self.dtree.connect()
self.rtable.connect() self.rtable.connect()
tmppath = self.tmppath() tmpdir = request.getfuncargvalue('tmpdir')
tmppath = Path(str(tmpdir))
io.mkdir(tmppath + 'foo') io.mkdir(tmppath + 'foo')
io.mkdir(tmppath + 'bar') io.mkdir(tmppath + 'bar')
self.app.directories.add_path(tmppath) self.app.directories.add_path(tmppath)
def test_GetObjects(self): def test_GetObjects(self, do_setup):
objects = self.objects objects = self.objects
groups = self.groups groups = self.groups
r = self.rtable[0] r = self.rtable[0]
@ -197,7 +196,7 @@ class TCDupeGuruWithResults(TestCase):
assert r._group is groups[1] assert r._group is groups[1]
assert r._dupe is objects[4] assert r._dupe is objects[4]
def test_GetObjects_after_sort(self): def test_GetObjects_after_sort(self, do_setup):
objects = self.objects objects = self.objects
groups = self.groups[:] # we need an un-sorted reference groups = self.groups[:] # we need an un-sorted reference
self.rtable.sort(0, False) #0 = Filename self.rtable.sort(0, False) #0 = Filename
@ -205,14 +204,14 @@ class TCDupeGuruWithResults(TestCase):
assert r._group is groups[1] assert r._group is groups[1]
assert r._dupe is objects[4] assert r._dupe is objects[4]
def test_selected_result_node_paths_after_deletion(self): def test_selected_result_node_paths_after_deletion(self, do_setup):
# cases where the selected dupes aren't there are correctly handled # cases where the selected dupes aren't there are correctly handled
self.rtable.select([1, 2, 3]) self.rtable.select([1, 2, 3])
self.app.remove_selected() self.app.remove_selected()
# The first 2 dupes have been removed. The 3rd one is a ref. it stays there, in first pos. # The first 2 dupes have been removed. The 3rd one is a ref. it stays there, in first pos.
eq_(self.rtable.selected_indexes, [1]) # no exception eq_(self.rtable.selected_indexes, [1]) # no exception
def test_selectResultNodePaths(self): def test_selectResultNodePaths(self, do_setup):
app = self.app app = self.app
objects = self.objects objects = self.objects
self.rtable.select([1, 2]) self.rtable.select([1, 2])
@ -220,7 +219,7 @@ class TCDupeGuruWithResults(TestCase):
assert app.selected_dupes[0] is objects[1] assert app.selected_dupes[0] is objects[1]
assert app.selected_dupes[1] is objects[2] assert app.selected_dupes[1] is objects[2]
def test_selectResultNodePaths_with_ref(self): def test_selectResultNodePaths_with_ref(self, do_setup):
app = self.app app = self.app
objects = self.objects objects = self.objects
self.rtable.select([1, 2, 3]) self.rtable.select([1, 2, 3])
@ -229,7 +228,7 @@ class TCDupeGuruWithResults(TestCase):
assert app.selected_dupes[1] is objects[2] assert app.selected_dupes[1] is objects[2]
assert app.selected_dupes[2] is self.groups[1].ref assert app.selected_dupes[2] is self.groups[1].ref
def test_selectResultNodePaths_after_sort(self): def test_selectResultNodePaths_after_sort(self, do_setup):
app = self.app app = self.app
objects = self.objects objects = self.objects
groups = self.groups[:] #To keep the old order in memory groups = self.groups[:] #To keep the old order in memory
@ -241,14 +240,14 @@ class TCDupeGuruWithResults(TestCase):
assert app.selected_dupes[1] is groups[0].ref assert app.selected_dupes[1] is groups[0].ref
assert app.selected_dupes[2] is objects[1] assert app.selected_dupes[2] is objects[1]
def test_selected_powermarker_node_paths(self): def test_selected_powermarker_node_paths(self, do_setup):
# app.selected_dupes is correctly converted into paths # app.selected_dupes is correctly converted into paths
self.rtable.power_marker = True self.rtable.power_marker = True
self.rtable.select([0, 1, 2]) self.rtable.select([0, 1, 2])
self.rtable.power_marker = False self.rtable.power_marker = False
eq_(self.rtable.selected_indexes, [1, 2, 4]) eq_(self.rtable.selected_indexes, [1, 2, 4])
def test_selected_powermarker_node_paths_after_deletion(self): def test_selected_powermarker_node_paths_after_deletion(self, do_setup):
# cases where the selected dupes aren't there are correctly handled # cases where the selected dupes aren't there are correctly handled
app = self.app app = self.app
self.rtable.power_marker = True self.rtable.power_marker = True
@ -256,7 +255,7 @@ class TCDupeGuruWithResults(TestCase):
app.remove_selected() app.remove_selected()
eq_(self.rtable.selected_indexes, []) # no exception eq_(self.rtable.selected_indexes, []) # no exception
def test_selectPowerMarkerRows_after_sort(self): def test_selectPowerMarkerRows_after_sort(self, do_setup):
app = self.app app = self.app
objects = self.objects objects = self.objects
self.rtable.power_marker = True self.rtable.power_marker = True
@ -267,7 +266,7 @@ class TCDupeGuruWithResults(TestCase):
assert app.selected_dupes[1] is objects[2] assert app.selected_dupes[1] is objects[2]
assert app.selected_dupes[2] is objects[1] assert app.selected_dupes[2] is objects[1]
def test_toggleSelectedMark(self): def test_toggleSelectedMark(self, do_setup):
app = self.app app = self.app
objects = self.objects objects = self.objects
app.toggle_selected_mark_state() app.toggle_selected_mark_state()
@ -281,7 +280,7 @@ class TCDupeGuruWithResults(TestCase):
assert not app.results.is_marked(objects[3]) assert not app.results.is_marked(objects[3])
assert app.results.is_marked(objects[4]) assert app.results.is_marked(objects[4])
def test_refreshDetailsWithSelected(self): def test_refreshDetailsWithSelected(self, do_setup):
self.rtable.select([1, 4]) self.rtable.select([1, 4])
eq_(self.dpanel.row(0), ('Filename', 'bar bleh', 'foo bar')) eq_(self.dpanel.row(0), ('Filename', 'bar bleh', 'foo bar'))
self.dpanel_gui.check_gui_calls(['refresh']) self.dpanel_gui.check_gui_calls(['refresh'])
@ -289,7 +288,7 @@ class TCDupeGuruWithResults(TestCase):
eq_(self.dpanel.row(0), ('Filename', '---', '---')) eq_(self.dpanel.row(0), ('Filename', '---', '---'))
self.dpanel_gui.check_gui_calls(['refresh']) self.dpanel_gui.check_gui_calls(['refresh'])
def test_makeSelectedReference(self): def test_makeSelectedReference(self, do_setup):
app = self.app app = self.app
objects = self.objects objects = self.objects
groups = self.groups groups = self.groups
@ -298,7 +297,7 @@ class TCDupeGuruWithResults(TestCase):
assert groups[0].ref is objects[1] assert groups[0].ref is objects[1]
assert groups[1].ref is objects[4] assert groups[1].ref is objects[4]
def test_makeSelectedReference_by_selecting_two_dupes_in_the_same_group(self): def test_makeSelectedReference_by_selecting_two_dupes_in_the_same_group(self, do_setup):
app = self.app app = self.app
objects = self.objects objects = self.objects
groups = self.groups groups = self.groups
@ -308,7 +307,7 @@ class TCDupeGuruWithResults(TestCase):
assert groups[0].ref is objects[1] assert groups[0].ref is objects[1]
assert groups[1].ref is objects[4] assert groups[1].ref is objects[4]
def test_removeSelected(self): def test_removeSelected(self, do_setup):
app = self.app app = self.app
self.rtable.select([1, 4]) self.rtable.select([1, 4])
app.remove_selected() app.remove_selected()
@ -316,22 +315,25 @@ class TCDupeGuruWithResults(TestCase):
app.remove_selected() app.remove_selected()
eq_(len(app.results.dupes), 0) eq_(len(app.results.dupes), 0)
def test_addDirectory_simple(self): def test_addDirectory_simple(self, do_setup):
# There's already a directory in self.app, so adding another once makes 2 of em # There's already a directory in self.app, so adding another once makes 2 of em
app = self.app app = self.app
eq_(app.add_directory(self.datadirpath()), 0) # any other path that isn't a parent or child of the already added path
otherpath = Path(op.dirname(__file__))
eq_(app.add_directory(otherpath), 0)
eq_(len(app.directories), 2) eq_(len(app.directories), 2)
def test_addDirectory_already_there(self): def test_addDirectory_already_there(self, do_setup):
app = self.app app = self.app
self.assertEqual(0,app.add_directory(self.datadirpath())) otherpath = Path(op.dirname(__file__))
self.assertEqual(1,app.add_directory(self.datadirpath())) eq_(app.add_directory(otherpath), 0)
eq_(app.add_directory(otherpath), 1)
def test_addDirectory_does_not_exist(self): def test_addDirectory_does_not_exist(self, do_setup):
app = self.app app = self.app
self.assertEqual(2,app.add_directory('/does_not_exist')) eq_(2,app.add_directory('/does_not_exist'))
def test_ignore(self): def test_ignore(self, do_setup):
app = self.app app = self.app
self.rtable.select([4]) #The dupe of the second, 2 sized group self.rtable.select([4]) #The dupe of the second, 2 sized group
app.add_selected_to_ignore_list() app.add_selected_to_ignore_list()
@ -341,20 +343,22 @@ class TCDupeGuruWithResults(TestCase):
#BOTH the ref and the other dupe should have been added #BOTH the ref and the other dupe should have been added
eq_(len(app.scanner.ignore_list), 3) eq_(len(app.scanner.ignore_list), 3)
def test_purgeIgnoreList(self): def test_purgeIgnoreList(self, do_setup, tmpdir):
app = self.app app = self.app
p1 = self.filepath('zerofile') p1 = str(tmpdir.join('file1'))
p2 = self.filepath('zerofill') p2 = str(tmpdir.join('file2'))
open(p1, 'w').close()
open(p2, 'w').close()
dne = '/does_not_exist' dne = '/does_not_exist'
app.scanner.ignore_list.Ignore(dne,p1) app.scanner.ignore_list.Ignore(dne,p1)
app.scanner.ignore_list.Ignore(p2,dne) app.scanner.ignore_list.Ignore(p2,dne)
app.scanner.ignore_list.Ignore(p1,p2) app.scanner.ignore_list.Ignore(p1,p2)
app.purge_ignore_list() app.purge_ignore_list()
self.assertEqual(1,len(app.scanner.ignore_list)) eq_(1,len(app.scanner.ignore_list))
self.assert_(app.scanner.ignore_list.AreIgnored(p1,p2)) assert app.scanner.ignore_list.AreIgnored(p1,p2)
self.assert_(not app.scanner.ignore_list.AreIgnored(dne,p1)) assert not app.scanner.ignore_list.AreIgnored(dne,p1)
def test_only_unicode_is_added_to_ignore_list(self): def test_only_unicode_is_added_to_ignore_list(self, do_setup):
def FakeIgnore(first,second): def FakeIgnore(first,second):
if not isinstance(first,str): if not isinstance(first,str):
self.fail() self.fail()
@ -366,7 +370,7 @@ class TCDupeGuruWithResults(TestCase):
self.rtable.select([4]) self.rtable.select([4])
app.add_selected_to_ignore_list() app.add_selected_to_ignore_list()
def test_cancel_scan_with_previous_results(self): def test_cancel_scan_with_previous_results(self, do_setup):
# When doing a scan with results being present prior to the scan, correctly invalidate the # When doing a scan with results being present prior to the scan, correctly invalidate the
# results table. # results table.
app = self.app app = self.app
@ -375,9 +379,10 @@ class TCDupeGuruWithResults(TestCase):
app.start_scanning() # will be cancelled immediately app.start_scanning() # will be cancelled immediately
eq_(len(self.rtable), 0) eq_(len(self.rtable), 0)
class TCDupeGuru_renameSelected(TestCase): class TestCaseDupeGuru_renameSelected:
def setUp(self): def pytest_funcarg__do_setup(self, request):
p = self.tmppath() tmpdir = request.getfuncargvalue('tmpdir')
p = Path(str(tmpdir))
fp = open(str(p + 'foo bar 1'),mode='w') fp = open(str(p + 'foo bar 1'),mode='w')
fp.close() fp.close()
fp = open(str(p + 'foo bar 2'),mode='w') fp = open(str(p + 'foo bar 2'),mode='w')
@ -399,7 +404,7 @@ class TCDupeGuru_renameSelected(TestCase):
self.rtable = ResultTable(self.rtable_gui, self.app) self.rtable = ResultTable(self.rtable_gui, self.app)
self.rtable.connect() self.rtable.connect()
def test_simple(self): def test_simple(self, do_setup):
app = self.app app = self.app
g = self.groups[0] g = self.groups[0]
self.rtable.select([1]) self.rtable.select([1])
@ -409,11 +414,11 @@ class TCDupeGuru_renameSelected(TestCase):
assert 'foo bar 2' not in names assert 'foo bar 2' not in names
eq_(g.dupes[0].name, 'renamed') eq_(g.dupes[0].name, 'renamed')
def test_none_selected(self): def test_none_selected(self, do_setup, monkeypatch):
app = self.app app = self.app
g = self.groups[0] g = self.groups[0]
self.rtable.select([]) self.rtable.select([])
self.mock(logging, 'warning', log_calls(lambda msg: None)) monkeypatch.setattr(logging, 'warning', log_calls(lambda msg: None))
assert not app.rename_selected('renamed') assert not app.rename_selected('renamed')
msg = logging.warning.calls[0]['msg'] msg = logging.warning.calls[0]['msg']
eq_('dupeGuru Warning: list index out of range', msg) eq_('dupeGuru Warning: list index out of range', msg)
@ -422,11 +427,11 @@ class TCDupeGuru_renameSelected(TestCase):
assert 'foo bar 2' in names assert 'foo bar 2' in names
eq_(g.dupes[0].name, 'foo bar 2') eq_(g.dupes[0].name, 'foo bar 2')
def test_name_already_exists(self): def test_name_already_exists(self, do_setup, monkeypatch):
app = self.app app = self.app
g = self.groups[0] g = self.groups[0]
self.rtable.select([1]) self.rtable.select([1])
self.mock(logging, 'warning', log_calls(lambda msg: None)) monkeypatch.setattr(logging, 'warning', log_calls(lambda msg: None))
assert not app.rename_selected('foo bar 1') assert not app.rename_selected('foo bar 1')
msg = logging.warning.calls[0]['msg'] msg = logging.warning.calls[0]['msg']
assert msg.startswith('dupeGuru Warning: \'foo bar 1\' already exists in') assert msg.startswith('dupeGuru Warning: \'foo bar 1\' already exists in')
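
The new test classes replace setUp() with pytest's legacy funcarg mechanism: a method named pytest_funcarg__do_setup(self, request) is collected by pytest and its result is injected into every test that lists do_setup among its parameters, while request.getfuncargvalue('monkeypatch') and request.getfuncargvalue('tmpdir') fetch the built-in funcargs from inside that setup code (getfuncargvalue was later renamed getfixturevalue). In current pytest the same wiring would be written with @pytest.fixture; a minimal, generic sketch of the equivalent, not taken from this commit:

    import pytest

    class TestCaseSketch:
        @pytest.fixture
        def do_setup(self, tmpdir, monkeypatch):
            # plays the role of the old setUp(); runs before each test that requests it
            self.workdir = str(tmpdir)
            monkeypatch.setenv('DUPEGURU_TEST', '1')  # hypothetical patch, for illustration only

        def test_something(self, do_setup):
            assert self.workdir  # attributes set in the fixture are visible here

Because pytest instantiates the test class once per test, attributes assigned to self inside the fixture are available to the test method, which is how these classes keep the old self.app / self.rtable style while moving to fixtures.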

View File

@ -6,20 +6,20 @@
# which should be included with this package. The terms are also available at # which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license # http://www.hardcoded.net/licenses/bsd_license
import os.path as op
import os import os
import time import time
import tempfile
import shutil
from pytest import raises
from hsutil import io from hsutil import io
from hsutil.path import Path from hsutil.path import Path
from hsutil.testutil import eq_ from hscommon.testutil import eq_
from hsutil.testcase import TestCase
from ..directories import * from ..directories import *
testpath = Path(TestCase.datadirpath())
def create_fake_fs(rootpath): def create_fake_fs(rootpath):
# We have it as a separate function because other units are using it.
rootpath = rootpath + 'fs' rootpath = rootpath + 'fs'
io.mkdir(rootpath) io.mkdir(rootpath)
io.mkdir(rootpath + 'dir1') io.mkdir(rootpath + 'dir1')
@ -45,233 +45,253 @@ def create_fake_fs(rootpath):
fp.close() fp.close()
return rootpath return rootpath
class TCDirectories(TestCase): def setup_module(module):
def test_empty(self): # In this unit, we have tests depending on two directory structure. One with only one file in it
d = Directories() # and another with a more complex structure.
self.assertEqual(0,len(d)) testpath = Path(tempfile.mkdtemp())
self.assert_('foobar' not in d) module.testpath = testpath
rootpath = testpath + 'onefile'
def test_add_path(self): io.mkdir(rootpath)
d = Directories() fp = io.open(rootpath + 'test.txt', 'w')
p = testpath + 'utils' fp.write('test_data')
fp.close()
create_fake_fs(testpath)
def teardown_module(module):
shutil.rmtree(str(module.testpath))
def test_empty():
d = Directories()
eq_(len(d), 0)
assert 'foobar' not in d
def test_add_path():
d = Directories()
p = testpath + 'onefile'
d.add_path(p)
eq_(1,len(d))
assert p in d
assert (p + 'foobar') in d
assert p[:-1] not in d
p = testpath + 'fs'
d.add_path(p)
eq_(2,len(d))
assert p in d
def test_AddPath_when_path_is_already_there():
d = Directories()
p = testpath + 'onefile'
d.add_path(p)
with raises(AlreadyThereError):
d.add_path(p) d.add_path(p)
self.assertEqual(1,len(d)) with raises(AlreadyThereError):
self.assert_(p in d) d.add_path(p + 'foobar')
self.assert_((p + 'foobar') in d) eq_(1, len(d))
self.assert_(p[:-1] not in d)
p = self.tmppath() def test_add_path_containing_paths_already_there():
d.add_path(p) d = Directories()
self.assertEqual(2,len(d)) d.add_path(testpath + 'onefile')
self.assert_(p in d) eq_(1, len(d))
d.add_path(testpath)
def test_AddPath_when_path_is_already_there(self): eq_(len(d), 1)
d = Directories() eq_(d[0], testpath)
p = testpath + 'utils'
d.add_path(p) def test_AddPath_non_latin(tmpdir):
self.assertRaises(AlreadyThereError, d.add_path, p) p = Path(str(tmpdir))
self.assertRaises(AlreadyThereError, d.add_path, p + 'foobar') to_add = p + 'unicode\u201a'
self.assertEqual(1, len(d)) os.mkdir(str(to_add))
d = Directories()
def test_add_path_containing_paths_already_there(self): try:
d = Directories() d.add_path(to_add)
d.add_path(testpath + 'utils') except UnicodeDecodeError:
self.assertEqual(1, len(d)) assert False
d.add_path(testpath)
eq_(len(d), 1) def test_del():
eq_(d[0], testpath) d = Directories()
d.add_path(testpath + 'onefile')
def test_AddPath_non_latin(self): try:
p = Path(self.tmpdir())
to_add = p + 'unicode\u201a'
os.mkdir(str(to_add))
d = Directories()
try:
d.add_path(to_add)
except UnicodeDecodeError:
self.fail()
def test_del(self):
d = Directories()
d.add_path(testpath + 'utils')
try:
del d[1]
self.fail()
except IndexError:
pass
d.add_path(self.tmppath())
del d[1] del d[1]
self.assertEqual(1, len(d)) assert False
except IndexError:
def test_states(self): pass
d = Directories() d.add_path(testpath + 'fs')
p = testpath + 'utils' del d[1]
eq_(1, len(d))
def test_states():
d = Directories()
p = testpath + 'onefile'
d.add_path(p)
eq_(STATE_NORMAL,d.get_state(p))
d.set_state(p,STATE_REFERENCE)
eq_(STATE_REFERENCE,d.get_state(p))
eq_(STATE_REFERENCE,d.get_state(p + 'dir1'))
eq_(1,len(d.states))
eq_(p,list(d.states.keys())[0])
eq_(STATE_REFERENCE,d.states[p])
def test_get_state_with_path_not_there():
# When the path's not there, just return STATE_NORMAL
d = Directories()
d.add_path(testpath + 'onefile')
eq_(d.get_state(testpath), STATE_NORMAL)
def test_states_remain_when_larger_directory_eat_smaller_ones():
d = Directories()
p = testpath + 'onefile'
d.add_path(p)
d.set_state(p,STATE_EXCLUDED)
d.add_path(testpath)
d.set_state(testpath,STATE_REFERENCE)
eq_(STATE_EXCLUDED,d.get_state(p))
eq_(STATE_EXCLUDED,d.get_state(p + 'dir1'))
eq_(STATE_REFERENCE,d.get_state(testpath))
def test_set_state_keep_state_dict_size_to_minimum():
d = Directories()
p = testpath + 'fs'
d.add_path(p)
d.set_state(p,STATE_REFERENCE)
d.set_state(p + 'dir1',STATE_REFERENCE)
eq_(1,len(d.states))
eq_(STATE_REFERENCE,d.get_state(p + 'dir1'))
d.set_state(p + 'dir1',STATE_NORMAL)
eq_(2,len(d.states))
eq_(STATE_NORMAL,d.get_state(p + 'dir1'))
d.set_state(p + 'dir1',STATE_REFERENCE)
eq_(1,len(d.states))
eq_(STATE_REFERENCE,d.get_state(p + 'dir1'))
def test_get_files():
d = Directories()
p = testpath + 'fs'
d.add_path(p)
d.set_state(p + 'dir1',STATE_REFERENCE)
d.set_state(p + 'dir2',STATE_EXCLUDED)
files = list(d.get_files())
eq_(5, len(files))
for f in files:
if f.path[:-1] == p + 'dir1':
assert f.is_ref
else:
assert not f.is_ref
def test_get_files_with_inherited_exclusion():
d = Directories()
p = testpath + 'onefile'
d.add_path(p)
d.set_state(p,STATE_EXCLUDED)
eq_([], list(d.get_files()))
def test_save_and_load(tmpdir):
d1 = Directories()
d2 = Directories()
p1 = Path(str(tmpdir.join('p1')))
io.mkdir(p1)
p2 = Path(str(tmpdir.join('p2')))
io.mkdir(p2)
d1.add_path(p1)
d1.add_path(p2)
d1.set_state(p1, STATE_REFERENCE)
d1.set_state(p1 + 'dir1',STATE_EXCLUDED)
tmpxml = str(tmpdir.join('directories_testunit.xml'))
d1.save_to_file(tmpxml)
d2.load_from_file(tmpxml)
eq_(2, len(d2))
eq_(STATE_REFERENCE,d2.get_state(p1))
eq_(STATE_EXCLUDED,d2.get_state(p1 + 'dir1'))
def test_invalid_path():
d = Directories()
p = Path('does_not_exist')
with raises(InvalidPathError):
d.add_path(p) d.add_path(p)
self.assertEqual(STATE_NORMAL,d.get_state(p)) eq_(0, len(d))
d.set_state(p,STATE_REFERENCE)
self.assertEqual(STATE_REFERENCE,d.get_state(p)) def test_set_state_on_invalid_path():
self.assertEqual(STATE_REFERENCE,d.get_state(p + 'dir1')) d = Directories()
self.assertEqual(1,len(d.states)) try:
self.assertEqual(p,list(d.states.keys())[0]) d.set_state(Path('foobar',),STATE_NORMAL)
self.assertEqual(STATE_REFERENCE,d.states[p]) except LookupError:
assert False
def test_get_state_with_path_not_there(self):
# When the path's not there, just return STATE_NORMAL def test_load_from_file_with_invalid_path(tmpdir):
d = Directories() #This test simulates a load from file resulting in a
d.add_path(testpath + 'utils') #InvalidPath raise. Other directories must be loaded.
eq_(d.get_state(testpath), STATE_NORMAL) d1 = Directories()
d1.add_path(testpath + 'onefile')
def test_states_remain_when_larger_directory_eat_smaller_ones(self): #Will raise InvalidPath upon loading
d = Directories() p = Path(str(tmpdir.join('toremove')))
p = testpath + 'utils' io.mkdir(p)
d.add_path(p) d1.add_path(p)
d.set_state(p,STATE_EXCLUDED) io.rmdir(p)
d.add_path(testpath) tmpxml = str(tmpdir.join('directories_testunit.xml'))
d.set_state(testpath,STATE_REFERENCE) d1.save_to_file(tmpxml)
self.assertEqual(STATE_EXCLUDED,d.get_state(p)) d2 = Directories()
self.assertEqual(STATE_EXCLUDED,d.get_state(p + 'dir1')) d2.load_from_file(tmpxml)
self.assertEqual(STATE_REFERENCE,d.get_state(testpath)) eq_(1, len(d2))
def test_set_state_keep_state_dict_size_to_minimum(self): def test_unicode_save(tmpdir):
d = Directories() d = Directories()
p = create_fake_fs(self.tmppath()) p1 = Path(str(tmpdir)) + 'hello\xe9'
d.add_path(p) io.mkdir(p1)
d.set_state(p,STATE_REFERENCE) io.mkdir(p1 + 'foo\xe9')
d.set_state(p + 'dir1',STATE_REFERENCE) d.add_path(p1)
self.assertEqual(1,len(d.states)) d.set_state(p1 + 'foo\xe9', STATE_EXCLUDED)
self.assertEqual(STATE_REFERENCE,d.get_state(p + 'dir1')) tmpxml = str(tmpdir.join('directories_testunit.xml'))
d.set_state(p + 'dir1',STATE_NORMAL) try:
self.assertEqual(2,len(d.states)) d.save_to_file(tmpxml)
self.assertEqual(STATE_NORMAL,d.get_state(p + 'dir1')) except UnicodeDecodeError:
d.set_state(p + 'dir1',STATE_REFERENCE) assert False
self.assertEqual(1,len(d.states))
self.assertEqual(STATE_REFERENCE,d.get_state(p + 'dir1')) def test_get_files_refreshes_its_directories():
d = Directories()
def test_get_files(self): p = testpath + 'fs'
d = Directories() d.add_path(p)
p = create_fake_fs(self.tmppath()) files = d.get_files()
d.add_path(p) eq_(6, len(list(files)))
d.set_state(p + 'dir1',STATE_REFERENCE) time.sleep(1)
d.set_state(p + 'dir2',STATE_EXCLUDED) os.remove(str(p + ('dir1','file1.test')))
files = list(d.get_files()) files = d.get_files()
self.assertEqual(5, len(files)) eq_(5, len(list(files)))
for f in files:
if f.path[:-1] == p + 'dir1': def test_get_files_does_not_choke_on_non_existing_directories(tmpdir):
assert f.is_ref d = Directories()
else: p = Path(str(tmpdir))
assert not f.is_ref d.add_path(p)
io.rmtree(p)
def test_get_files_with_inherited_exclusion(self): eq_([], list(d.get_files()))
d = Directories()
p = testpath + 'utils' def test_get_state_returns_excluded_by_default_for_hidden_directories(tmpdir):
d.add_path(p) d = Directories()
d.set_state(p,STATE_EXCLUDED) p = Path(str(tmpdir))
self.assertEqual([], list(d.get_files())) hidden_dir_path = p + '.foo'
io.mkdir(p + '.foo')
def test_save_and_load(self): d.add_path(p)
d1 = Directories() eq_(d.get_state(hidden_dir_path), STATE_EXCLUDED)
d2 = Directories() # But it can be overriden
p1 = self.tmppath() d.set_state(hidden_dir_path, STATE_NORMAL)
p2 = self.tmppath() eq_(d.get_state(hidden_dir_path), STATE_NORMAL)
d1.add_path(p1)
d1.add_path(p2) def test_default_path_state_override(tmpdir):
d1.set_state(p1, STATE_REFERENCE) # It's possible for a subclass to override the default state of a path
d1.set_state(p1 + 'dir1',STATE_EXCLUDED) class MyDirectories(Directories):
tmpxml = op.join(self.tmpdir(), 'directories_testunit.xml') def _default_state_for_path(self, path):
d1.save_to_file(tmpxml) if 'foobar' in path:
d2.load_from_file(tmpxml) return STATE_EXCLUDED
self.assertEqual(2, len(d2))
self.assertEqual(STATE_REFERENCE,d2.get_state(p1))
self.assertEqual(STATE_EXCLUDED,d2.get_state(p1 + 'dir1'))
def test_invalid_path(self):
d = Directories()
p = Path('does_not_exist')
self.assertRaises(InvalidPathError, d.add_path, p)
self.assertEqual(0, len(d))
def test_set_state_on_invalid_path(self):
d = Directories()
try:
d.set_state(Path('foobar',),STATE_NORMAL)
except LookupError:
self.fail()
def test_load_from_file_with_invalid_path(self):
#This test simulates a load from file resulting in a
#InvalidPath raise. Other directories must be loaded.
d1 = Directories()
d1.add_path(testpath + 'utils')
#Will raise InvalidPath upon loading
p = self.tmppath()
d1.add_path(p)
io.rmdir(p)
tmpxml = op.join(self.tmpdir(), 'directories_testunit.xml')
d1.save_to_file(tmpxml)
d2 = Directories()
d2.load_from_file(tmpxml)
self.assertEqual(1, len(d2))
def test_unicode_save(self):
d = Directories()
p1 = self.tmppath() + 'hello\xe9'
io.mkdir(p1)
io.mkdir(p1 + 'foo\xe9')
d.add_path(p1)
d.set_state(p1 + 'foo\xe9', STATE_EXCLUDED)
tmpxml = op.join(self.tmpdir(), 'directories_testunit.xml')
try:
d.save_to_file(tmpxml)
except UnicodeDecodeError:
self.fail()
def test_get_files_refreshes_its_directories(self):
d = Directories()
p = create_fake_fs(self.tmppath())
d.add_path(p)
files = d.get_files()
self.assertEqual(6, len(list(files)))
time.sleep(1)
os.remove(str(p + ('dir1','file1.test')))
files = d.get_files()
self.assertEqual(5, len(list(files)))
def test_get_files_does_not_choke_on_non_existing_directories(self):
d = Directories()
p = Path(self.tmpdir())
d.add_path(p)
io.rmtree(p)
self.assertEqual([], list(d.get_files()))
def test_get_state_returns_excluded_by_default_for_hidden_directories(self):
d = Directories()
p = Path(self.tmpdir())
hidden_dir_path = p + '.foo'
io.mkdir(p + '.foo')
d.add_path(p)
self.assertEqual(d.get_state(hidden_dir_path), STATE_EXCLUDED)
# But it can be overriden
d.set_state(hidden_dir_path, STATE_NORMAL)
self.assertEqual(d.get_state(hidden_dir_path), STATE_NORMAL)
def test_default_path_state_override(self):
# It's possible for a subclass to override the default state of a path
class MyDirectories(Directories):
def _default_state_for_path(self, path):
if 'foobar' in path:
return STATE_EXCLUDED
d = MyDirectories()
p1 = self.tmppath()
io.mkdir(p1 + 'foobar')
io.open(p1 + 'foobar/somefile', 'w').close()
io.mkdir(p1 + 'foobaz')
io.open(p1 + 'foobaz/somefile', 'w').close()
d.add_path(p1)
eq_(d.get_state(p1 + 'foobaz'), STATE_NORMAL)
eq_(d.get_state(p1 + 'foobar'), STATE_EXCLUDED)
eq_(len(list(d.get_files())), 1) # only the 'foobaz' file is there
# However, the default state can be changed
d.set_state(p1 + 'foobar', STATE_NORMAL)
eq_(d.get_state(p1 + 'foobar'), STATE_NORMAL)
eq_(len(list(d.get_files())), 2)
d = MyDirectories()
p1 = Path(str(tmpdir))
io.mkdir(p1 + 'foobar')
io.open(p1 + 'foobar/somefile', 'w').close()
io.mkdir(p1 + 'foobaz')
io.open(p1 + 'foobaz/somefile', 'w').close()
d.add_path(p1)
eq_(d.get_state(p1 + 'foobaz'), STATE_NORMAL)
eq_(d.get_state(p1 + 'foobar'), STATE_EXCLUDED)
eq_(len(list(d.get_files())), 1) # only the 'foobaz' file is there
# However, the default state can be changed
d.set_state(p1 + 'foobar', STATE_NORMAL)
eq_(d.get_state(p1 + 'foobar'), STATE_NORMAL)
eq_(len(list(d.get_files())), 2)
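
The directories tests also drop per-test temporary paths in favour of module-level fixtures: setup_module() builds a single temporary tree (a one-file directory plus the structure from create_fake_fs()) shared by all the module's test functions, and teardown_module() deletes it afterwards. A standalone sketch of that xunit-style pattern, with placeholder files rather than the commit's actual layout and pathlib standing in for hsutil.path.Path:

    import shutil
    import tempfile
    from pathlib import Path

    def setup_module(module):
        # runs once before any test in this module
        module.testpath = Path(tempfile.mkdtemp())
        (module.testpath / 'onefile').mkdir()
        (module.testpath / 'onefile' / 'test.txt').write_text('test_data')

    def teardown_module(module):
        # runs once after the last test in this module
        shutil.rmtree(str(module.testpath))

    def test_file_is_there():
        assert (testpath / 'onefile' / 'test.txt').exists()

Since the tree is shared, a test that mutates it, such as test_get_files_refreshes_its_directories() deleting fs/dir1/file1.test above, changes what later tests in the same module can see.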

View File

@ -12,12 +12,11 @@ from jobprogress import job
from hsutil.decorators import log_calls from hsutil.decorators import log_calls
from hsutil.misc import first from hsutil.misc import first
from hsutil.testutil import eq_ from hsutil.testutil import eq_
from hsutil.testcase import TestCase
from .. import engine from .. import engine
from ..engine import * from ..engine import *
class NamedObject(object): class NamedObject:
def __init__(self, name="foobar", with_words=False, size=1): def __init__(self, name="foobar", with_words=False, size=1):
self.name = name self.name = name
self.size = size self.size = size
@ -55,179 +54,179 @@ def assert_match(m, name1, name2):
eq_(m.first.name, name2) eq_(m.first.name, name2)
eq_(m.second.name, name1) eq_(m.second.name, name1)
class TCgetwords(TestCase): class TestCasegetwords:
def test_spaces(self): def test_spaces(self):
self.assertEqual(['a', 'b', 'c', 'd'], getwords("a b c d")) eq_(['a', 'b', 'c', 'd'], getwords("a b c d"))
self.assertEqual(['a', 'b', 'c', 'd'], getwords(" a b c d ")) eq_(['a', 'b', 'c', 'd'], getwords(" a b c d "))
def test_splitter_chars(self): def test_splitter_chars(self):
self.assertEqual( eq_(
[chr(i) for i in range(ord('a'),ord('z')+1)], [chr(i) for i in range(ord('a'),ord('z')+1)],
getwords("a-b_c&d+e(f)g;h\\i[j]k{l}m:n.o,p<q>r/s?t~u!v@w#x$y*z") getwords("a-b_c&d+e(f)g;h\\i[j]k{l}m:n.o,p<q>r/s?t~u!v@w#x$y*z")
) )
def test_joiner_chars(self): def test_joiner_chars(self):
self.assertEqual(["aec"], getwords("a'e\u0301c")) eq_(["aec"], getwords("a'e\u0301c"))
def test_empty(self): def test_empty(self):
self.assertEqual([], getwords('')) eq_([], getwords(''))
def test_returns_lowercase(self): def test_returns_lowercase(self):
self.assertEqual(['foo', 'bar'], getwords('FOO BAR')) eq_(['foo', 'bar'], getwords('FOO BAR'))
def test_decompose_unicode(self): def test_decompose_unicode(self):
self.assertEqual(getwords('foo\xe9bar'), ['fooebar']) eq_(getwords('foo\xe9bar'), ['fooebar'])
class TCgetfields(TestCase): class TestCasegetfields:
def test_simple(self): def test_simple(self):
self.assertEqual([['a', 'b'], ['c', 'd', 'e']], getfields('a b - c d e')) eq_([['a', 'b'], ['c', 'd', 'e']], getfields('a b - c d e'))
def test_empty(self): def test_empty(self):
self.assertEqual([], getfields('')) eq_([], getfields(''))
def test_cleans_empty_fields(self): def test_cleans_empty_fields(self):
expected = [['a', 'bc', 'def']] expected = [['a', 'bc', 'def']]
actual = getfields(' - a bc def') actual = getfields(' - a bc def')
self.assertEqual(expected, actual) eq_(expected, actual)
expected = [['bc', 'def']] expected = [['bc', 'def']]
class TCunpack_fields(TestCase): class TestCaseunpack_fields:
def test_with_fields(self): def test_with_fields(self):
expected = ['a', 'b', 'c', 'd', 'e', 'f'] expected = ['a', 'b', 'c', 'd', 'e', 'f']
actual = unpack_fields([['a'], ['b', 'c'], ['d', 'e', 'f']]) actual = unpack_fields([['a'], ['b', 'c'], ['d', 'e', 'f']])
self.assertEqual(expected, actual) eq_(expected, actual)
def test_without_fields(self): def test_without_fields(self):
expected = ['a', 'b', 'c', 'd', 'e', 'f'] expected = ['a', 'b', 'c', 'd', 'e', 'f']
actual = unpack_fields(['a', 'b', 'c', 'd', 'e', 'f']) actual = unpack_fields(['a', 'b', 'c', 'd', 'e', 'f'])
self.assertEqual(expected, actual) eq_(expected, actual)
def test_empty(self): def test_empty(self):
self.assertEqual([], unpack_fields([])) eq_([], unpack_fields([]))
class TCWordCompare(TestCase): class TestCaseWordCompare:
def test_list(self): def test_list(self):
self.assertEqual(100, compare(['a', 'b', 'c', 'd'],['a', 'b', 'c', 'd'])) eq_(100, compare(['a', 'b', 'c', 'd'],['a', 'b', 'c', 'd']))
self.assertEqual(86, compare(['a', 'b', 'c', 'd'],['a', 'b', 'c'])) eq_(86, compare(['a', 'b', 'c', 'd'],['a', 'b', 'c']))
def test_unordered(self): def test_unordered(self):
#Sometimes, users don't want fuzzy matching too much When they set the slider #Sometimes, users don't want fuzzy matching too much When they set the slider
#to 100, they don't expect a filename with the same words, but not the same order, to match. #to 100, they don't expect a filename with the same words, but not the same order, to match.
#Thus, we want to return 99 in that case. #Thus, we want to return 99 in that case.
self.assertEqual(99, compare(['a', 'b', 'c', 'd'], ['d', 'b', 'c', 'a'])) eq_(99, compare(['a', 'b', 'c', 'd'], ['d', 'b', 'c', 'a']))
def test_word_occurs_twice(self): def test_word_occurs_twice(self):
#if a word occurs twice in first, but once in second, we want the word to be only counted once #if a word occurs twice in first, but once in second, we want the word to be only counted once
self.assertEqual(89, compare(['a', 'b', 'c', 'd', 'a'], ['d', 'b', 'c', 'a'])) eq_(89, compare(['a', 'b', 'c', 'd', 'a'], ['d', 'b', 'c', 'a']))
def test_uses_copy_of_lists(self): def test_uses_copy_of_lists(self):
first = ['foo', 'bar'] first = ['foo', 'bar']
second = ['bar', 'bleh'] second = ['bar', 'bleh']
compare(first, second) compare(first, second)
self.assertEqual(['foo', 'bar'], first) eq_(['foo', 'bar'], first)
self.assertEqual(['bar', 'bleh'], second) eq_(['bar', 'bleh'], second)
def test_word_weight(self): def test_word_weight(self):
self.assertEqual(int((6.0 / 13.0) * 100), compare(['foo', 'bar'], ['bar', 'bleh'], (WEIGHT_WORDS, ))) eq_(int((6.0 / 13.0) * 100), compare(['foo', 'bar'], ['bar', 'bleh'], (WEIGHT_WORDS, )))
def test_similar_words(self): def test_similar_words(self):
self.assertEqual(100, compare(['the', 'white', 'stripes'],['the', 'whites', 'stripe'], (MATCH_SIMILAR_WORDS, ))) eq_(100, compare(['the', 'white', 'stripes'],['the', 'whites', 'stripe'], (MATCH_SIMILAR_WORDS, )))
def test_empty(self): def test_empty(self):
self.assertEqual(0, compare([], [])) eq_(0, compare([], []))
def test_with_fields(self): def test_with_fields(self):
self.assertEqual(67, compare([['a', 'b'], ['c', 'd', 'e']], [['a', 'b'], ['c', 'd', 'f']])) eq_(67, compare([['a', 'b'], ['c', 'd', 'e']], [['a', 'b'], ['c', 'd', 'f']]))
def test_propagate_flags_with_fields(self): def test_propagate_flags_with_fields(self, monkeypatch):
def mock_compare(first, second, flags): def mock_compare(first, second, flags):
self.assertEqual((0, 1, 2, 3, 5), flags) eq_((0, 1, 2, 3, 5), flags)
self.mock(engine, 'compare_fields', mock_compare) monkeypatch.setattr(engine, 'compare_fields', mock_compare)
compare([['a']], [['a']], (0, 1, 2, 3, 5)) compare([['a']], [['a']], (0, 1, 2, 3, 5))
class TCWordCompareWithFields(TestCase): class TestCaseWordCompareWithFields:
def test_simple(self): def test_simple(self):
self.assertEqual(67, compare_fields([['a', 'b'], ['c', 'd', 'e']], [['a', 'b'], ['c', 'd', 'f']])) eq_(67, compare_fields([['a', 'b'], ['c', 'd', 'e']], [['a', 'b'], ['c', 'd', 'f']]))
def test_empty(self): def test_empty(self):
self.assertEqual(0, compare_fields([], [])) eq_(0, compare_fields([], []))
def test_different_length(self): def test_different_length(self):
self.assertEqual(0, compare_fields([['a'], ['b']], [['a'], ['b'], ['c']])) eq_(0, compare_fields([['a'], ['b']], [['a'], ['b'], ['c']]))
def test_propagates_flags(self): def test_propagates_flags(self, monkeypatch):
def mock_compare(first, second, flags): def mock_compare(first, second, flags):
self.assertEqual((0, 1, 2, 3, 5), flags) eq_((0, 1, 2, 3, 5), flags)
self.mock(engine, 'compare_fields', mock_compare) monkeypatch.setattr(engine, 'compare_fields', mock_compare)
compare_fields([['a']], [['a']],(0, 1, 2, 3, 5)) compare_fields([['a']], [['a']],(0, 1, 2, 3, 5))
def test_order(self): def test_order(self):
first = [['a', 'b'], ['c', 'd', 'e']] first = [['a', 'b'], ['c', 'd', 'e']]
second = [['c', 'd', 'f'], ['a', 'b']] second = [['c', 'd', 'f'], ['a', 'b']]
self.assertEqual(0, compare_fields(first, second)) eq_(0, compare_fields(first, second))
def test_no_order(self): def test_no_order(self):
first = [['a','b'],['c','d','e']] first = [['a','b'],['c','d','e']]
second = [['c','d','f'],['a','b']] second = [['c','d','f'],['a','b']]
self.assertEqual(67, compare_fields(first, second, (NO_FIELD_ORDER, ))) eq_(67, compare_fields(first, second, (NO_FIELD_ORDER, )))
first = [['a','b'],['a','b']] #a field can only be matched once. first = [['a','b'],['a','b']] #a field can only be matched once.
second = [['c','d','f'],['a','b']] second = [['c','d','f'],['a','b']]
self.assertEqual(0, compare_fields(first, second, (NO_FIELD_ORDER, ))) eq_(0, compare_fields(first, second, (NO_FIELD_ORDER, )))
first = [['a','b'],['a','b','c']] first = [['a','b'],['a','b','c']]
second = [['c','d','f'],['a','b']] second = [['c','d','f'],['a','b']]
self.assertEqual(33, compare_fields(first, second, (NO_FIELD_ORDER, ))) eq_(33, compare_fields(first, second, (NO_FIELD_ORDER, )))
def test_compare_fields_without_order_doesnt_alter_fields(self): def test_compare_fields_without_order_doesnt_alter_fields(self):
#The NO_ORDER comp type altered the fields! #The NO_ORDER comp type altered the fields!
first = [['a','b'],['c','d','e']] first = [['a','b'],['c','d','e']]
second = [['c','d','f'],['a','b']] second = [['c','d','f'],['a','b']]
self.assertEqual(67, compare_fields(first, second, (NO_FIELD_ORDER, ))) eq_(67, compare_fields(first, second, (NO_FIELD_ORDER, )))
self.assertEqual([['a','b'],['c','d','e']],first) eq_([['a','b'],['c','d','e']],first)
self.assertEqual([['c','d','f'],['a','b']],second) eq_([['c','d','f'],['a','b']],second)
class TCbuild_word_dict(TestCase): class TestCasebuild_word_dict:
def test_with_standard_words(self): def test_with_standard_words(self):
l = [NamedObject('foo bar',True)] l = [NamedObject('foo bar',True)]
l.append(NamedObject('bar baz',True)) l.append(NamedObject('bar baz',True))
l.append(NamedObject('baz bleh foo',True)) l.append(NamedObject('baz bleh foo',True))
d = build_word_dict(l) d = build_word_dict(l)
self.assertEqual(4,len(d)) eq_(4,len(d))
self.assertEqual(2,len(d['foo'])) eq_(2,len(d['foo']))
self.assert_(l[0] in d['foo']) assert l[0] in d['foo']
self.assert_(l[2] in d['foo']) assert l[2] in d['foo']
self.assertEqual(2,len(d['bar'])) eq_(2,len(d['bar']))
self.assert_(l[0] in d['bar']) assert l[0] in d['bar']
self.assert_(l[1] in d['bar']) assert l[1] in d['bar']
self.assertEqual(2,len(d['baz'])) eq_(2,len(d['baz']))
self.assert_(l[1] in d['baz']) assert l[1] in d['baz']
self.assert_(l[2] in d['baz']) assert l[2] in d['baz']
self.assertEqual(1,len(d['bleh'])) eq_(1,len(d['bleh']))
self.assert_(l[2] in d['bleh']) assert l[2] in d['bleh']
def test_unpack_fields(self): def test_unpack_fields(self):
o = NamedObject('') o = NamedObject('')
o.words = [['foo','bar'],['baz']] o.words = [['foo','bar'],['baz']]
d = build_word_dict([o]) d = build_word_dict([o])
self.assertEqual(3,len(d)) eq_(3,len(d))
self.assertEqual(1,len(d['foo'])) eq_(1,len(d['foo']))
def test_words_are_unaltered(self): def test_words_are_unaltered(self):
o = NamedObject('') o = NamedObject('')
o.words = [['foo','bar'],['baz']] o.words = [['foo','bar'],['baz']]
d = build_word_dict([o]) build_word_dict([o])
self.assertEqual([['foo','bar'],['baz']],o.words) eq_([['foo','bar'],['baz']],o.words)
def test_object_instances_can_only_be_once_in_words_object_list(self): def test_object_instances_can_only_be_once_in_words_object_list(self):
o = NamedObject('foo foo',True) o = NamedObject('foo foo',True)
d = build_word_dict([o]) d = build_word_dict([o])
self.assertEqual(1,len(d['foo'])) eq_(1,len(d['foo']))
def test_job(self): def test_job(self):
def do_progress(p,d=''): def do_progress(p,d=''):
@ -239,11 +238,11 @@ class TCbuild_word_dict(TestCase):
s = "foo bar" s = "foo bar"
build_word_dict([NamedObject(s, True), NamedObject(s, True), NamedObject(s, True)], j) build_word_dict([NamedObject(s, True), NamedObject(s, True), NamedObject(s, True)], j)
# We don't have intermediate log because iter_with_progress is called with every > 1 # We don't have intermediate log because iter_with_progress is called with every > 1
self.assertEqual(0,self.log[0]) eq_(0,self.log[0])
self.assertEqual(100,self.log[1]) eq_(100,self.log[1])
class TCmerge_similar_words(TestCase): class TestCasemerge_similar_words:
def test_some_similar_words(self): def test_some_similar_words(self):
d = { d = {
'foobar':set([1]), 'foobar':set([1]),
@ -251,20 +250,20 @@ class TCmerge_similar_words(TestCase):
'foobar2':set([3]), 'foobar2':set([3]),
} }
merge_similar_words(d) merge_similar_words(d)
self.assertEqual(1,len(d)) eq_(1,len(d))
self.assertEqual(3,len(d['foobar'])) eq_(3,len(d['foobar']))
class TCreduce_common_words(TestCase): class TestCasereduce_common_words:
def test_typical(self): def test_typical(self):
d = { d = {
'foo': set([NamedObject('foo bar',True) for i in range(50)]), 'foo': set([NamedObject('foo bar',True) for i in range(50)]),
'bar': set([NamedObject('foo bar',True) for i in range(49)]) 'bar': set([NamedObject('foo bar',True) for i in range(49)])
} }
reduce_common_words(d, 50) reduce_common_words(d, 50)
self.assert_('foo' not in d) assert 'foo' not in d
self.assertEqual(49,len(d['bar'])) eq_(49,len(d['bar']))
def test_dont_remove_objects_with_only_common_words(self): def test_dont_remove_objects_with_only_common_words(self):
d = { d = {
@ -272,8 +271,8 @@ class TCreduce_common_words(TestCase):
'uncommon': set([NamedObject("common uncommon",True)]) 'uncommon': set([NamedObject("common uncommon",True)])
} }
reduce_common_words(d, 50) reduce_common_words(d, 50)
self.assertEqual(1,len(d['common'])) eq_(1,len(d['common']))
self.assertEqual(1,len(d['uncommon'])) eq_(1,len(d['uncommon']))
def test_values_still_are_set_instances(self): def test_values_still_are_set_instances(self):
d = { d = {
@ -281,8 +280,8 @@ class TCreduce_common_words(TestCase):
'uncommon': set([NamedObject("common uncommon",True)]) 'uncommon': set([NamedObject("common uncommon",True)])
} }
reduce_common_words(d, 50) reduce_common_words(d, 50)
self.assert_(isinstance(d['common'],set)) assert isinstance(d['common'],set)
self.assert_(isinstance(d['uncommon'],set)) assert isinstance(d['uncommon'],set)
def test_dont_raise_KeyError_when_a_word_has_been_removed(self): def test_dont_raise_KeyError_when_a_word_has_been_removed(self):
#If a word has been removed by the reduce, an object in a subsequent common word that #If a word has been removed by the reduce, an object in a subsequent common word that
@ -324,42 +323,42 @@ class TCreduce_common_words(TestCase):
'baz': set([NamedObject('foo bar baz',True) for i in range(49)]) 'baz': set([NamedObject('foo bar baz',True) for i in range(49)])
} }
reduce_common_words(d, 50) reduce_common_words(d, 50)
self.assertEqual(1,len(d['foo'])) eq_(1,len(d['foo']))
self.assertEqual(1,len(d['bar'])) eq_(1,len(d['bar']))
self.assertEqual(49,len(d['baz'])) eq_(49,len(d['baz']))
class TCget_match(TestCase): class TestCaseget_match:
def test_simple(self): def test_simple(self):
o1 = NamedObject("foo bar",True) o1 = NamedObject("foo bar",True)
o2 = NamedObject("bar bleh",True) o2 = NamedObject("bar bleh",True)
m = get_match(o1,o2) m = get_match(o1,o2)
self.assertEqual(50,m.percentage) eq_(50,m.percentage)
self.assertEqual(['foo','bar'],m.first.words) eq_(['foo','bar'],m.first.words)
self.assertEqual(['bar','bleh'],m.second.words) eq_(['bar','bleh'],m.second.words)
self.assert_(m.first is o1) assert m.first is o1
self.assert_(m.second is o2) assert m.second is o2
def test_in(self): def test_in(self):
o1 = NamedObject("foo",True) o1 = NamedObject("foo",True)
o2 = NamedObject("bar",True) o2 = NamedObject("bar",True)
m = get_match(o1,o2) m = get_match(o1,o2)
self.assert_(o1 in m) assert o1 in m
self.assert_(o2 in m) assert o2 in m
self.assert_(object() not in m) assert object() not in m
def test_word_weight(self): def test_word_weight(self):
self.assertEqual(int((6.0 / 13.0) * 100),get_match(NamedObject("foo bar",True),NamedObject("bar bleh",True),(WEIGHT_WORDS,)).percentage) eq_(int((6.0 / 13.0) * 100),get_match(NamedObject("foo bar",True),NamedObject("bar bleh",True),(WEIGHT_WORDS,)).percentage)
class GetMatches(TestCase): class TestCaseGetMatches:
def test_empty(self): def test_empty(self):
eq_(getmatches([]), []) eq_(getmatches([]), [])
def test_simple(self): def test_simple(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh"),NamedObject("a b c foo")] l = [NamedObject("foo bar"),NamedObject("bar bleh"),NamedObject("a b c foo")]
r = getmatches(l) r = getmatches(l)
self.assertEqual(2,len(r)) eq_(2,len(r))
m = first(m for m in r if m.percentage == 50) #"foo bar" and "bar bleh" m = first(m for m in r if m.percentage == 50) #"foo bar" and "bar bleh"
assert_match(m, 'foo bar', 'bar bleh') assert_match(m, 'foo bar', 'bar bleh')
m = first(m for m in r if m.percentage == 33) #"foo bar" and "a b c foo" m = first(m for m in r if m.percentage == 33) #"foo bar" and "a b c foo"
@ -376,17 +375,17 @@ class GetMatches(TestCase):
def test_twice_the_same_word(self): def test_twice_the_same_word(self):
l = [NamedObject("foo foo bar"),NamedObject("bar bleh")] l = [NamedObject("foo foo bar"),NamedObject("bar bleh")]
r = getmatches(l) r = getmatches(l)
self.assertEqual(1,len(r)) eq_(1,len(r))
def test_twice_the_same_word_when_preworded(self): def test_twice_the_same_word_when_preworded(self):
l = [NamedObject("foo foo bar",True),NamedObject("bar bleh",True)] l = [NamedObject("foo foo bar",True),NamedObject("bar bleh",True)]
r = getmatches(l) r = getmatches(l)
self.assertEqual(1,len(r)) eq_(1,len(r))
def test_two_words_match(self): def test_two_words_match(self):
l = [NamedObject("foo bar"),NamedObject("foo bar bleh")] l = [NamedObject("foo bar"),NamedObject("foo bar bleh")]
r = getmatches(l) r = getmatches(l)
self.assertEqual(1,len(r)) eq_(1,len(r))
def test_match_files_with_only_common_words(self): def test_match_files_with_only_common_words(self):
#If a word occurs more than 50 times, it is excluded from the matching process #If a word occurs more than 50 times, it is excluded from the matching process
@ -395,7 +394,7 @@ class GetMatches(TestCase):
# This test assumes that the common word threashold const is 50 # This test assumes that the common word threashold const is 50
l = [NamedObject("foo") for i in range(50)] l = [NamedObject("foo") for i in range(50)]
r = getmatches(l) r = getmatches(l)
self.assertEqual(1225,len(r)) eq_(1225,len(r))
def test_use_words_already_there_if_there(self): def test_use_words_already_there_if_there(self):
o1 = NamedObject('foo') o1 = NamedObject('foo')
@ -412,14 +411,14 @@ class GetMatches(TestCase):
self.log = [] self.log = []
s = "foo bar" s = "foo bar"
getmatches([NamedObject(s), NamedObject(s), NamedObject(s)], j=j) getmatches([NamedObject(s), NamedObject(s), NamedObject(s)], j=j)
self.assert_(len(self.log) > 2) assert len(self.log) > 2
self.assertEqual(0,self.log[0]) eq_(0,self.log[0])
self.assertEqual(100,self.log[-1]) eq_(100,self.log[-1])
def test_weight_words(self): def test_weight_words(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh")] l = [NamedObject("foo bar"),NamedObject("bar bleh")]
m = getmatches(l, weight_words=True)[0] m = getmatches(l, weight_words=True)[0]
self.assertEqual(int((6.0 / 13.0) * 100),m.percentage) eq_(int((6.0 / 13.0) * 100),m.percentage)
def test_similar_word(self): def test_similar_word(self):
l = [NamedObject("foobar"),NamedObject("foobars")] l = [NamedObject("foobar"),NamedObject("foobars")]
@ -439,7 +438,7 @@ class GetMatches(TestCase):
def test_double_words_get_counted_only_once(self): def test_double_words_get_counted_only_once(self):
l = [NamedObject("foo bar foo bleh"),NamedObject("foo bar bleh bar")] l = [NamedObject("foo bar foo bleh"),NamedObject("foo bar bleh bar")]
m = getmatches(l)[0] m = getmatches(l)[0]
self.assertEqual(75,m.percentage) eq_(75,m.percentage)
def test_with_fields(self): def test_with_fields(self):
o1 = NamedObject("foo bar - foo bleh") o1 = NamedObject("foo bar - foo bleh")
@ -447,7 +446,7 @@ class GetMatches(TestCase):
o1.words = getfields(o1.name) o1.words = getfields(o1.name)
o2.words = getfields(o2.name) o2.words = getfields(o2.name)
m = getmatches([o1, o2])[0] m = getmatches([o1, o2])[0]
self.assertEqual(50, m.percentage) eq_(50, m.percentage)
def test_with_fields_no_order(self): def test_with_fields_no_order(self):
o1 = NamedObject("foo bar - foo bleh") o1 = NamedObject("foo bar - foo bleh")
@ -475,9 +474,9 @@ class GetMatches(TestCase):
def test_min_match_percentage(self): def test_min_match_percentage(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh"),NamedObject("a b c foo")] l = [NamedObject("foo bar"),NamedObject("bar bleh"),NamedObject("a b c foo")]
r = getmatches(l, min_match_percentage=50) r = getmatches(l, min_match_percentage=50)
self.assertEqual(1,len(r)) #Only "foo bar" / "bar bleh" should match eq_(1,len(r)) #Only "foo bar" / "bar bleh" should match
def test_MemoryError(self): def test_MemoryError(self, monkeypatch):
@log_calls @log_calls
def mocked_match(first, second, flags): def mocked_match(first, second, flags):
if len(mocked_match.calls) > 42: if len(mocked_match.calls) > 42:
@ -485,35 +484,35 @@ class GetMatches(TestCase):
return Match(first, second, 0) return Match(first, second, 0)
objects = [NamedObject() for i in range(10)] # results in 45 matches objects = [NamedObject() for i in range(10)] # results in 45 matches
self.mock(engine, 'get_match', mocked_match) monkeypatch.setattr(engine, 'get_match', mocked_match)
try: try:
r = getmatches(objects) r = getmatches(objects)
except MemoryError: except MemoryError:
self.fail('MemoryError must be handled') self.fail('MemoryError must be handled')
self.assertEqual(42, len(r)) eq_(42, len(r))
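monkeypatch.setattr, used above in place of the old self.mock helper, patches an attribute for the duration of a single test; pytest restores the original value at teardown. A minimal sketch of the idiom, using an illustrative target rather than dupeGuru code:

import math

def test_patched_sqrt(monkeypatch):
    # the patch only lives inside this test; pytest undoes it afterwards
    monkeypatch.setattr(math, 'sqrt', lambda x: 42)
    assert math.sqrt(9) == 42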
class GetMatchesByContents(TestCase): class TestCaseGetMatchesByContents:
def test_dont_compare_empty_files(self): def test_dont_compare_empty_files(self):
o1, o2 = no(size=0), no(size=0) o1, o2 = no(size=0), no(size=0)
assert not getmatches_by_contents([o1, o2]) assert not getmatches_by_contents([o1, o2])
class TCGroup(TestCase): class TestCaseGroup:
def test_empy(self): def test_empy(self):
g = Group() g = Group()
self.assertEqual(None,g.ref) eq_(None,g.ref)
self.assertEqual([],g.dupes) eq_([],g.dupes)
self.assertEqual(0,len(g.matches)) eq_(0,len(g.matches))
def test_add_match(self): def test_add_match(self):
g = Group() g = Group()
m = get_match(NamedObject("foo",True),NamedObject("bar",True)) m = get_match(NamedObject("foo",True),NamedObject("bar",True))
g.add_match(m) g.add_match(m)
self.assert_(g.ref is m.first) assert g.ref is m.first
self.assertEqual([m.second],g.dupes) eq_([m.second],g.dupes)
self.assertEqual(1,len(g.matches)) eq_(1,len(g.matches))
self.assert_(m in g.matches) assert m in g.matches
def test_multiple_add_match(self): def test_multiple_add_match(self):
g = Group() g = Group()
@ -522,49 +521,49 @@ class TCGroup(TestCase):
o3 = NamedObject("c",True) o3 = NamedObject("c",True)
o4 = NamedObject("d",True) o4 = NamedObject("d",True)
g.add_match(get_match(o1,o2)) g.add_match(get_match(o1,o2))
self.assert_(g.ref is o1) assert g.ref is o1
self.assertEqual([o2],g.dupes) eq_([o2],g.dupes)
self.assertEqual(1,len(g.matches)) eq_(1,len(g.matches))
g.add_match(get_match(o1,o3)) g.add_match(get_match(o1,o3))
self.assertEqual([o2],g.dupes) eq_([o2],g.dupes)
self.assertEqual(2,len(g.matches)) eq_(2,len(g.matches))
g.add_match(get_match(o2,o3)) g.add_match(get_match(o2,o3))
self.assertEqual([o2,o3],g.dupes) eq_([o2,o3],g.dupes)
self.assertEqual(3,len(g.matches)) eq_(3,len(g.matches))
g.add_match(get_match(o1,o4)) g.add_match(get_match(o1,o4))
self.assertEqual([o2,o3],g.dupes) eq_([o2,o3],g.dupes)
self.assertEqual(4,len(g.matches)) eq_(4,len(g.matches))
g.add_match(get_match(o2,o4)) g.add_match(get_match(o2,o4))
self.assertEqual([o2,o3],g.dupes) eq_([o2,o3],g.dupes)
self.assertEqual(5,len(g.matches)) eq_(5,len(g.matches))
g.add_match(get_match(o3,o4)) g.add_match(get_match(o3,o4))
self.assertEqual([o2,o3,o4],g.dupes) eq_([o2,o3,o4],g.dupes)
self.assertEqual(6,len(g.matches)) eq_(6,len(g.matches))
def test_len(self): def test_len(self):
g = Group() g = Group()
self.assertEqual(0,len(g)) eq_(0,len(g))
g.add_match(get_match(NamedObject("foo",True),NamedObject("bar",True))) g.add_match(get_match(NamedObject("foo",True),NamedObject("bar",True)))
self.assertEqual(2,len(g)) eq_(2,len(g))
def test_add_same_match_twice(self): def test_add_same_match_twice(self):
g = Group() g = Group()
m = get_match(NamedObject("foo",True),NamedObject("foo",True)) m = get_match(NamedObject("foo",True),NamedObject("foo",True))
g.add_match(m) g.add_match(m)
self.assertEqual(2,len(g)) eq_(2,len(g))
self.assertEqual(1,len(g.matches)) eq_(1,len(g.matches))
g.add_match(m) g.add_match(m)
self.assertEqual(2,len(g)) eq_(2,len(g))
self.assertEqual(1,len(g.matches)) eq_(1,len(g.matches))
def test_in(self): def test_in(self):
g = Group() g = Group()
o1 = NamedObject("foo",True) o1 = NamedObject("foo",True)
o2 = NamedObject("bar",True) o2 = NamedObject("bar",True)
self.assert_(o1 not in g) assert o1 not in g
g.add_match(get_match(o1,o2)) g.add_match(get_match(o1,o2))
self.assert_(o1 in g) assert o1 in g
self.assert_(o2 in g) assert o2 in g
def test_remove(self): def test_remove(self):
g = Group() g = Group()
@ -574,14 +573,14 @@ class TCGroup(TestCase):
g.add_match(get_match(o1,o2)) g.add_match(get_match(o1,o2))
g.add_match(get_match(o1,o3)) g.add_match(get_match(o1,o3))
g.add_match(get_match(o2,o3)) g.add_match(get_match(o2,o3))
self.assertEqual(3,len(g.matches)) eq_(3,len(g.matches))
self.assertEqual(3,len(g)) eq_(3,len(g))
g.remove_dupe(o3) g.remove_dupe(o3)
self.assertEqual(1,len(g.matches)) eq_(1,len(g.matches))
self.assertEqual(2,len(g)) eq_(2,len(g))
g.remove_dupe(o1) g.remove_dupe(o1)
self.assertEqual(0,len(g.matches)) eq_(0,len(g.matches))
self.assertEqual(0,len(g)) eq_(0,len(g))
def test_remove_with_ref_dupes(self): def test_remove_with_ref_dupes(self):
g = Group() g = Group()
@ -594,21 +593,21 @@ class TCGroup(TestCase):
o1.is_ref = True o1.is_ref = True
o2.is_ref = True o2.is_ref = True
g.remove_dupe(o3) g.remove_dupe(o3)
self.assertEqual(0,len(g)) eq_(0,len(g))
def test_switch_ref(self): def test_switch_ref(self):
o1 = NamedObject(with_words=True) o1 = NamedObject(with_words=True)
o2 = NamedObject(with_words=True) o2 = NamedObject(with_words=True)
g = Group() g = Group()
g.add_match(get_match(o1,o2)) g.add_match(get_match(o1,o2))
self.assert_(o1 is g.ref) assert o1 is g.ref
g.switch_ref(o2) g.switch_ref(o2)
self.assert_(o2 is g.ref) assert o2 is g.ref
self.assertEqual([o1],g.dupes) eq_([o1],g.dupes)
g.switch_ref(o2) g.switch_ref(o2)
self.assert_(o2 is g.ref) assert o2 is g.ref
g.switch_ref(NamedObject('',True)) g.switch_ref(NamedObject('',True))
self.assert_(o2 is g.ref) assert o2 is g.ref
def test_get_match_of(self): def test_get_match_of(self):
g = Group() g = Group()
@ -616,10 +615,10 @@ class TCGroup(TestCase):
g.add_match(m) g.add_match(m)
o = g.dupes[0] o = g.dupes[0]
m = g.get_match_of(o) m = g.get_match_of(o)
self.assert_(g.ref in m) assert g.ref in m
self.assert_(o in m) assert o in m
self.assert_(g.get_match_of(NamedObject('',True)) is None) assert g.get_match_of(NamedObject('',True)) is None
self.assert_(g.get_match_of(g.ref) is None) assert g.get_match_of(g.ref) is None
def test_percentage(self): def test_percentage(self):
#percentage should return the avg percentage in relation to the ref #percentage should return the avg percentage in relation to the ref
@ -631,18 +630,18 @@ class TCGroup(TestCase):
g.add_match(m1) g.add_match(m1)
g.add_match(m2) g.add_match(m2)
g.add_match(m3) g.add_match(m3)
self.assertEqual(75,g.percentage) eq_(75,g.percentage)
g.switch_ref(g.dupes[0]) g.switch_ref(g.dupes[0])
self.assertEqual(66,g.percentage) eq_(66,g.percentage)
g.remove_dupe(g.dupes[0]) g.remove_dupe(g.dupes[0])
self.assertEqual(33,g.percentage) eq_(33,g.percentage)
g.add_match(m1) g.add_match(m1)
g.add_match(m2) g.add_match(m2)
self.assertEqual(66,g.percentage) eq_(66,g.percentage)
def test_percentage_on_empty_group(self): def test_percentage_on_empty_group(self):
g = Group() g = Group()
self.assertEqual(0,g.percentage) eq_(0,g.percentage)
def test_prioritize(self): def test_prioritize(self):
m1,m2,m3 = get_match_triangle() m1,m2,m3 = get_match_triangle()
@ -656,9 +655,9 @@ class TCGroup(TestCase):
g.add_match(m1) g.add_match(m1)
g.add_match(m2) g.add_match(m2)
g.add_match(m3) g.add_match(m3)
self.assert_(o1 is g.ref) assert o1 is g.ref
g.prioritize(lambda x:x.name) g.prioritize(lambda x:x.name)
self.assert_(o3 is g.ref) assert o3 is g.ref
def test_prioritize_with_tie_breaker(self): def test_prioritize_with_tie_breaker(self):
# if the ref has the same key as one or more of the dupe, run the tie_breaker func among them # if the ref has the same key as one or more of the dupe, run the tie_breaker func among them
@ -666,7 +665,7 @@ class TCGroup(TestCase):
o1, o2, o3 = g.ordered o1, o2, o3 = g.ordered
tie_breaker = lambda ref, dupe: dupe is o3 tie_breaker = lambda ref, dupe: dupe is o3
g.prioritize(lambda x:0, tie_breaker) g.prioritize(lambda x:0, tie_breaker)
self.assertTrue(g.ref is o3) assert g.ref is o3
def test_prioritize_with_tie_breaker_runs_on_all_dupes(self): def test_prioritize_with_tie_breaker_runs_on_all_dupes(self):
# Even if a dupe is chosen to switch with ref with a tie breaker, we still run the tie breaker # Even if a dupe is chosen to switch with ref with a tie breaker, we still run the tie breaker
@ -678,7 +677,7 @@ class TCGroup(TestCase):
o3.foo = 3 o3.foo = 3
tie_breaker = lambda ref, dupe: dupe.foo > ref.foo tie_breaker = lambda ref, dupe: dupe.foo > ref.foo
g.prioritize(lambda x:0, tie_breaker) g.prioritize(lambda x:0, tie_breaker)
self.assertTrue(g.ref is o3) assert g.ref is o3
def test_prioritize_with_tie_breaker_runs_only_on_tie_dupes(self): def test_prioritize_with_tie_breaker_runs_only_on_tie_dupes(self):
# The tie breaker only runs on dupes that had the same value for the key_func # The tie breaker only runs on dupes that had the same value for the key_func
@ -693,14 +692,14 @@ class TCGroup(TestCase):
key_func = lambda x: -x.foo key_func = lambda x: -x.foo
tie_breaker = lambda ref, dupe: dupe.bar > ref.bar tie_breaker = lambda ref, dupe: dupe.bar > ref.bar
g.prioritize(key_func, tie_breaker) g.prioritize(key_func, tie_breaker)
self.assertTrue(g.ref is o2) assert g.ref is o2
def test_list_like(self): def test_list_like(self):
g = Group() g = Group()
o1,o2 = (NamedObject("foo",True),NamedObject("bar",True)) o1,o2 = (NamedObject("foo",True),NamedObject("bar",True))
g.add_match(get_match(o1,o2)) g.add_match(get_match(o1,o2))
self.assert_(g[0] is o1) assert g[0] is o1
self.assert_(g[1] is o2) assert g[1] is o2
def test_discard_matches(self): def test_discard_matches(self):
g = Group() g = Group()
@ -708,33 +707,33 @@ class TCGroup(TestCase):
g.add_match(get_match(o1,o2)) g.add_match(get_match(o1,o2))
g.add_match(get_match(o1,o3)) g.add_match(get_match(o1,o3))
g.discard_matches() g.discard_matches()
self.assertEqual(1,len(g.matches)) eq_(1,len(g.matches))
self.assertEqual(0,len(g.candidates)) eq_(0,len(g.candidates))
class TCget_groups(TestCase): class TestCaseget_groups:
def test_empty(self): def test_empty(self):
r = get_groups([]) r = get_groups([])
self.assertEqual([],r) eq_([],r)
def test_simple(self): def test_simple(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh")] l = [NamedObject("foo bar"),NamedObject("bar bleh")]
matches = getmatches(l) matches = getmatches(l)
m = matches[0] m = matches[0]
r = get_groups(matches) r = get_groups(matches)
self.assertEqual(1,len(r)) eq_(1,len(r))
g = r[0] g = r[0]
self.assert_(g.ref is m.first) assert g.ref is m.first
self.assertEqual([m.second],g.dupes) eq_([m.second],g.dupes)
def test_group_with_multiple_matches(self): def test_group_with_multiple_matches(self):
#This results in 3 matches #This results in 3 matches
l = [NamedObject("foo"),NamedObject("foo"),NamedObject("foo")] l = [NamedObject("foo"),NamedObject("foo"),NamedObject("foo")]
matches = getmatches(l) matches = getmatches(l)
r = get_groups(matches) r = get_groups(matches)
self.assertEqual(1,len(r)) eq_(1,len(r))
g = r[0] g = r[0]
self.assertEqual(3,len(g)) eq_(3,len(g))
def test_must_choose_a_group(self): def test_must_choose_a_group(self):
l = [NamedObject("a b"),NamedObject("a b"),NamedObject("b c"),NamedObject("c d"),NamedObject("c d")] l = [NamedObject("a b"),NamedObject("a b"),NamedObject("b c"),NamedObject("c d"),NamedObject("c d")]
@ -742,8 +741,8 @@ class TCget_groups(TestCase):
#"b c" can go either of them, but not both. #"b c" can go either of them, but not both.
matches = getmatches(l) matches = getmatches(l)
r = get_groups(matches) r = get_groups(matches)
self.assertEqual(2,len(r)) eq_(2,len(r))
self.assertEqual(5,len(r[0])+len(r[1])) eq_(5,len(r[0])+len(r[1]))
def test_should_all_go_in_the_same_group(self): def test_should_all_go_in_the_same_group(self):
l = [NamedObject("a b"),NamedObject("a b"),NamedObject("a b"),NamedObject("a b")] l = [NamedObject("a b"),NamedObject("a b"),NamedObject("a b"),NamedObject("a b")]
@ -751,7 +750,7 @@ class TCget_groups(TestCase):
#"b c" can fit in both, but it must be in only one of them #"b c" can fit in both, but it must be in only one of them
matches = getmatches(l) matches = getmatches(l)
r = get_groups(matches) r = get_groups(matches)
self.assertEqual(1,len(r)) eq_(1,len(r))
def test_give_priority_to_matches_with_higher_percentage(self): def test_give_priority_to_matches_with_higher_percentage(self):
o1 = NamedObject(with_words=True) o1 = NamedObject(with_words=True)
@ -760,19 +759,19 @@ class TCget_groups(TestCase):
m1 = Match(o1, o2, 1) m1 = Match(o1, o2, 1)
m2 = Match(o2, o3, 2) m2 = Match(o2, o3, 2)
r = get_groups([m1,m2]) r = get_groups([m1,m2])
self.assertEqual(1,len(r)) eq_(1,len(r))
g = r[0] g = r[0]
self.assertEqual(2,len(g)) eq_(2,len(g))
self.assert_(o1 not in g) assert o1 not in g
self.assert_(o2 in g) assert o2 in g
self.assert_(o3 in g) assert o3 in g
def test_four_sized_group(self): def test_four_sized_group(self):
l = [NamedObject("foobar") for i in range(4)] l = [NamedObject("foobar") for i in range(4)]
m = getmatches(l) m = getmatches(l)
r = get_groups(m) r = get_groups(m)
self.assertEqual(1,len(r)) eq_(1,len(r))
self.assertEqual(4,len(r[0])) eq_(4,len(r[0]))
def test_referenced_by_ref2(self): def test_referenced_by_ref2(self):
o1 = NamedObject(with_words=True) o1 = NamedObject(with_words=True)
@ -782,7 +781,7 @@ class TCget_groups(TestCase):
m2 = get_match(o3,o1) m2 = get_match(o3,o1)
m3 = get_match(o3,o2) m3 = get_match(o3,o2)
r = get_groups([m1,m2,m3]) r = get_groups([m1,m2,m3])
self.assertEqual(3,len(r[0])) eq_(3,len(r[0]))
def test_job(self): def test_job(self):
def do_progress(p,d=''): def do_progress(p,d=''):
@ -795,8 +794,8 @@ class TCget_groups(TestCase):
#101%: To make sure it is processed first so the job test works correctly #101%: To make sure it is processed first so the job test works correctly
m4 = Match(NamedObject('a',True), NamedObject('a',True), 101) m4 = Match(NamedObject('a',True), NamedObject('a',True), 101)
get_groups([m1,m2,m3,m4],j) get_groups([m1,m2,m3,m4],j)
self.assertEqual(0,self.log[0]) eq_(0,self.log[0])
self.assertEqual(100,self.log[-1]) eq_(100,self.log[-1])
def test_group_admissible_discarded_dupes(self): def test_group_admissible_discarded_dupes(self):
# If, with a (A, B, C, D) set, all match with A, but C and D don't match with B and that the # If, with a (A, B, C, D) set, all match with A, but C and D don't match with B and that the

View File

@ -13,8 +13,7 @@ import os.path as op
from xml.etree import ElementTree as ET from xml.etree import ElementTree as ET
from hsutil.path import Path from hsutil.path import Path
from hsutil.testutil import eq_ from hscommon.testutil import eq_
from hsutil.testcase import TestCase
from hsutil.misc import first from hsutil.misc import first
from . import engine_test, data from . import engine_test, data
@ -44,8 +43,8 @@ def GetTestGroups():
groups.sort(key=len, reverse=True) # We want the group with 3 members to be first. groups.sort(key=len, reverse=True) # We want the group with 3 members to be first.
return (objects,matches,groups) return (objects,matches,groups)
class TCResultsEmpty(TestCase): class TestCaseResultsEmpty:
def setUp(self): def setup_method(self, method):
self.results = Results(data) self.results = Results(data)
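setup_method(self, method) is pytest's counterpart to unittest's setUp: it runs before each test method of the class and receives that method as its argument. A small sketch of the idiom, with illustrative names:

class TestCaseExample:
    def setup_method(self, method):
        # fresh state before every test_* method; 'method' is the test about to run
        self.items = []

    def test_starts_empty(self):
        assert self.items == []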
def test_apply_invalid_filter(self): def test_apply_invalid_filter(self):
@ -74,8 +73,8 @@ class TCResultsEmpty(TestCase):
assert not self.results.is_modified assert not self.results.is_modified
class TCResultsWithSomeGroups(TestCase): class TestCaseResultsWithSomeGroups:
def setUp(self): def setup_method(self, method):
self.results = Results(data) self.results = Results(data)
self.objects,self.matches,self.groups = GetTestGroups() self.objects,self.matches,self.groups = GetTestGroups()
self.results.groups = self.groups self.results.groups = self.groups
@ -222,8 +221,8 @@ class TCResultsWithSomeGroups(TestCase):
assert not self.results.is_modified assert not self.results.is_modified
class ResultsWithSavedResults(TestCase): class TestCaseResultsWithSavedResults:
def setUp(self): def setup_method(self, method):
self.results = Results(data) self.results = Results(data)
self.objects,self.matches,self.groups = GetTestGroups() self.objects,self.matches,self.groups = GetTestGroups()
self.results.groups = self.groups self.results.groups = self.groups
@ -255,8 +254,8 @@ class ResultsWithSavedResults(TestCase):
assert self.results.is_modified assert self.results.is_modified
class TCResultsMarkings(TestCase): class TestCaseResultsMarkings:
def setUp(self): def setup_method(self, method):
self.results = Results(data) self.results = Results(data)
self.objects,self.matches,self.groups = GetTestGroups() self.objects,self.matches,self.groups = GetTestGroups()
self.results.groups = self.groups self.results.groups = self.groups
@ -356,7 +355,6 @@ class TCResultsMarkings(TestCase):
def test_remove_duplicates(self): def test_remove_duplicates(self):
g1 = self.results.groups[0] g1 = self.results.groups[0]
g2 = self.results.groups[1]
self.results.mark(g1.dupes[0]) self.results.mark(g1.dupes[0])
eq_("1 / 3 (1.00 KB / 1.01 KB) duplicates marked.",self.results.stat_line) eq_("1 / 3 (1.00 KB / 1.01 KB) duplicates marked.",self.results.stat_line)
self.results.remove_duplicates([g1.dupes[1]]) self.results.remove_duplicates([g1.dupes[1]])
@ -410,8 +408,8 @@ class TCResultsMarkings(TestCase):
assert r.is_marked(self.objects[4]) assert r.is_marked(self.objects[4])
class TCResultsXML(TestCase): class TestCaseResultsXML:
def setUp(self): def setup_method(self, method):
self.results = Results(data) self.results = Results(data)
self.objects, self.matches, self.groups = GetTestGroups() self.objects, self.matches, self.groups = GetTestGroups()
self.results.groups = self.groups self.results.groups = self.groups
@ -486,11 +484,11 @@ class TCResultsXML(TestCase):
eq_(['ibabtu'],g2[0].words) eq_(['ibabtu'],g2[0].words)
eq_(['ibabtu'],g2[1].words) eq_(['ibabtu'],g2[1].words)
def test_LoadXML_with_filename(self): def test_LoadXML_with_filename(self, tmpdir):
def get_file(path): def get_file(path):
return [f for f in self.objects if str(f.path) == path][0] return [f for f in self.objects if str(f.path) == path][0]
filename = op.join(self.tmpdir(), 'dupeguru_results.xml') filename = str(tmpdir.join('dupeguru_results.xml'))
self.objects[4].name = 'ibabtu 2' #we can't have 2 files with the same path self.objects[4].name = 'ibabtu 2' #we can't have 2 files with the same path
self.results.save_to_xml(filename) self.results.save_to_xml(filename)
r = Results(data) r = Results(data)
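The tmpdir argument added to test_LoadXML_with_filename above is pytest's built-in temporary-directory funcarg (a py.path.local object) and replaces the old self.tmpdir() helper. A minimal usage sketch with an assumed file name:

def test_save_creates_file(tmpdir):
    # tmpdir points at a fresh directory created for this test only
    filename = str(tmpdir.join('results.xml'))
    with open(filename, 'w') as fp:
        fp.write('<results/>')
    assert tmpdir.join('results.xml').check(file=1)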
@ -634,8 +632,8 @@ class TCResultsXML(TestCase):
self.results.save_to_xml(io.BytesIO()) # don't crash self.results.save_to_xml(io.BytesIO()) # don't crash
class TCResultsFilter(TestCase): class TestCaseResultsFilter:
def setUp(self): def setup_method(self, method):
self.results = Results(data) self.results = Results(data)
self.objects, self.matches, self.groups = GetTestGroups() self.objects, self.matches, self.groups = GetTestGroups()
self.results.groups = self.groups self.results.groups = self.groups
@ -716,11 +714,11 @@ class TCResultsFilter(TestCase):
eq_(1, len(self.results.groups)) eq_(1, len(self.results.groups))
assert self.results.groups[0] is self.groups[0] assert self.results.groups[0] is self.groups[0]
def test_load_cancels_filter(self): def test_load_cancels_filter(self, tmpdir):
def get_file(path): def get_file(path):
return [f for f in self.objects if str(f.path) == path][0] return [f for f in self.objects if str(f.path) == path][0]
filename = op.join(self.tmpdir(), 'dupeguru_results.xml') filename = str(tmpdir.join('dupeguru_results.xml'))
self.objects[4].name = 'ibabtu 2' #we can't have 2 files with the same path self.objects[4].name = 'ibabtu 2' #we can't have 2 files with the same path
self.results.save_to_xml(filename) self.results.save_to_xml(filename)
r = Results(data) r = Results(data)
@ -759,8 +757,8 @@ class TCResultsFilter(TestCase):
eq_(expected, self.results.stat_line) eq_(expected, self.results.stat_line)
class TCResultsRefFile(TestCase): class TestCaseResultsRefFile:
def setUp(self): def setup_method(self, method):
self.results = Results(data) self.results = Results(data)
self.objects, self.matches, self.groups = GetTestGroups() self.objects, self.matches, self.groups = GetTestGroups()
self.objects[0].is_ref = True self.objects[0].is_ref = True

View File

@ -9,15 +9,14 @@
from jobprogress import job from jobprogress import job
from hsutil import io from hsutil import io
from hsutil.path import Path from hsutil.path import Path
from hsutil.testutil import eq_ from hscommon.testutil import eq_
from hsutil.testcase import TestCase
from .. import fs from .. import fs
from ..engine import getwords, Match from ..engine import getwords, Match
from ..ignore import IgnoreList from ..ignore import IgnoreList
from ..scanner import * from ..scanner import *
class NamedObject(object): class NamedObject:
def __init__(self, name="foobar", size=1): def __init__(self, name="foobar", size=1):
self.name = name self.name = name
self.size = size self.size = size
@ -30,449 +29,445 @@ class NamedObject(object):
no = NamedObject no = NamedObject
#--- Scanner def pytest_funcarg__fake_fileexists(request):
class ScannerTestFakeFiles(TestCase): # This is a hack to avoid invalidating all previous tests since the scanner started to test
def setUp(self): # for file existence before doing the match grouping.
# This is a hack to avoid invalidating all previous tests since the scanner started to test monkeypatch = request.getfuncargvalue('monkeypatch')
# for file existence before doing the match grouping. monkeypatch.setattr(io, 'exists', lambda _: True)
self.mock(io, 'exists', lambda _: True)
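pytest_funcarg__fake_fileexists above uses pytest's old funcarg protocol: any test that lists fake_fileexists as a parameter triggers this function, and request.getfuncargvalue('monkeypatch') pulls the monkeypatch funcarg in from inside it. Under a more recent pytest the same setup would normally be written as a fixture; a rough equivalent, assuming such a version, is:

import pytest
from hsutil import io  # same module patched in the funcarg above

@pytest.fixture
def fake_fileexists(monkeypatch):
    # pretend every path exists so the scanner's existence check never discards files
    monkeypatch.setattr(io, 'exists', lambda _: True)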
def test_empty(fake_fileexists):
def test_empty(self): s = Scanner()
s = Scanner() r = s.GetDupeGroups([])
r = s.GetDupeGroups([]) eq_(r, [])
eq_(r, [])
def test_default_settings(fake_fileexists):
def test_default_settings(self): s = Scanner()
s = Scanner() eq_(s.min_match_percentage, 80)
eq_(s.min_match_percentage, 80) eq_(s.scan_type, ScanType.Filename)
eq_(s.scan_type, ScanType.Filename) eq_(s.mix_file_kind, True)
eq_(s.mix_file_kind, True) eq_(s.word_weighting, False)
eq_(s.word_weighting, False) eq_(s.match_similar_words, False)
eq_(s.match_similar_words, False) assert isinstance(s.ignore_list, IgnoreList)
assert isinstance(s.ignore_list, IgnoreList)
def test_simple_with_default_settings(fake_fileexists):
def test_simple_with_default_settings(self): s = Scanner()
s = Scanner() f = [no('foo bar'), no('foo bar'), no('foo bleh')]
f = [no('foo bar'), no('foo bar'), no('foo bleh')] r = s.GetDupeGroups(f)
r = s.GetDupeGroups(f) eq_(len(r), 1)
eq_(len(r), 1) g = r[0]
g = r[0] #'foo bleh' cannot be in the group because the default min match % is 80
#'foo bleh' cannot be in the group because the default min match % is 80 eq_(len(g), 2)
eq_(len(g), 2) assert g.ref in f[:2]
assert g.ref in f[:2] assert g.dupes[0] in f[:2]
assert g.dupes[0] in f[:2]
def test_simple_with_lower_min_match(fake_fileexists):
def test_simple_with_lower_min_match(self): s = Scanner()
s = Scanner() s.min_match_percentage = 50
s.min_match_percentage = 50 f = [no('foo bar'), no('foo bar'), no('foo bleh')]
f = [no('foo bar'), no('foo bar'), no('foo bleh')] r = s.GetDupeGroups(f)
r = s.GetDupeGroups(f) eq_(len(r), 1)
eq_(len(r), 1) g = r[0]
g = r[0] eq_(len(g), 3)
eq_(len(g), 3)
def test_trim_all_ref_groups(fake_fileexists):
def test_trim_all_ref_groups(self): # When all files of a group are ref, don't include that group in the results, but also don't
# When all files of a group are ref, don't include that group in the results, but also don't # count the files from that group as discarded.
# count the files from that group as discarded. s = Scanner()
s = Scanner() f = [no('foo'), no('foo'), no('bar'), no('bar')]
f = [no('foo'), no('foo'), no('bar'), no('bar')] f[2].is_ref = True
f[2].is_ref = True f[3].is_ref = True
f[3].is_ref = True r = s.GetDupeGroups(f)
r = s.GetDupeGroups(f) eq_(len(r), 1)
eq_(len(r), 1) eq_(s.discarded_file_count, 0)
eq_(s.discarded_file_count, 0)
def test_priorize(fake_fileexists):
def test_priorize(self): s = Scanner()
s = Scanner() f = [no('foo'), no('foo'), no('bar'), no('bar')]
f = [no('foo'), no('foo'), no('bar'), no('bar')] f[1].size = 2
f[1].size = 2 f[2].size = 3
f[2].size = 3 f[3].is_ref = True
f[3].is_ref = True r = s.GetDupeGroups(f)
r = s.GetDupeGroups(f) g1, g2 = r
g1, g2 = r assert f[1] in (g1.ref,g2.ref)
assert f[1] in (g1.ref,g2.ref) assert f[0] in (g1.dupes[0],g2.dupes[0])
assert f[0] in (g1.dupes[0],g2.dupes[0]) assert f[3] in (g1.ref,g2.ref)
assert f[3] in (g1.ref,g2.ref) assert f[2] in (g1.dupes[0],g2.dupes[0])
assert f[2] in (g1.dupes[0],g2.dupes[0])
def test_content_scan(fake_fileexists):
def test_content_scan(self): s = Scanner()
s = Scanner() s.scan_type = ScanType.Contents
s.scan_type = ScanType.Contents f = [no('foo'), no('bar'), no('bleh')]
f = [no('foo'), no('bar'), no('bleh')] f[0].md5 = f[0].md5partial = 'foobar'
f[0].md5 = f[0].md5partial = 'foobar' f[1].md5 = f[1].md5partial = 'foobar'
f[1].md5 = f[1].md5partial = 'foobar' f[2].md5 = f[2].md5partial = 'bleh'
f[2].md5 = f[2].md5partial = 'bleh' r = s.GetDupeGroups(f)
r = s.GetDupeGroups(f) eq_(len(r), 1)
eq_(len(r), 1) eq_(len(r[0]), 2)
eq_(len(r[0]), 2) eq_(s.discarded_file_count, 0) # don't count the different md5 as discarded!
eq_(s.discarded_file_count, 0) # don't count the different md5 as discarded!
def test_content_scan_compare_sizes_first(fake_fileexists):
def test_content_scan_compare_sizes_first(self): class MyFile(no):
class MyFile(no): @property
@property def md5(file):
def md5(file):
raise AssertionError()
s = Scanner()
s.scan_type = ScanType.Contents
f = [MyFile('foo', 1), MyFile('bar', 2)]
eq_(len(s.GetDupeGroups(f)), 0)
def test_min_match_perc_doesnt_matter_for_content_scan(self):
s = Scanner()
s.scan_type = ScanType.Contents
f = [no('foo'), no('bar'), no('bleh')]
f[0].md5 = f[0].md5partial = 'foobar'
f[1].md5 = f[1].md5partial = 'foobar'
f[2].md5 = f[2].md5partial = 'bleh'
s.min_match_percentage = 101
r = s.GetDupeGroups(f)
eq_(len(r), 1)
eq_(len(r[0]), 2)
s.min_match_percentage = 0
r = s.GetDupeGroups(f)
eq_(len(r), 1)
eq_(len(r[0]), 2)
def test_content_scan_doesnt_put_md5_in_words_at_the_end(self):
s = Scanner()
s.scan_type = ScanType.Contents
f = [no('foo'),no('bar')]
f[0].md5 = f[0].md5partial = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'
f[1].md5 = f[1].md5partial = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'
r = s.GetDupeGroups(f)
g = r[0]
def test_extension_is_not_counted_in_filename_scan(self):
s = Scanner()
s.min_match_percentage = 100
f = [no('foo.bar'), no('foo.bleh')]
r = s.GetDupeGroups(f)
eq_(len(r), 1)
eq_(len(r[0]), 2)
def test_job(self):
def do_progress(progress, desc=''):
log.append(progress)
return True
s = Scanner()
log = []
f = [no('foo bar'), no('foo bar'), no('foo bleh')]
r = s.GetDupeGroups(f, job.Job(1, do_progress))
eq_(log[0], 0)
eq_(log[-1], 100)
def test_mix_file_kind(self):
s = Scanner()
s.mix_file_kind = False
f = [no('foo.1'), no('foo.2')]
r = s.GetDupeGroups(f)
eq_(len(r), 0)
def test_word_weighting(self):
s = Scanner()
s.min_match_percentage = 75
s.word_weighting = True
f = [no('foo bar'), no('foo bar bleh')]
r = s.GetDupeGroups(f)
eq_(len(r), 1)
g = r[0]
m = g.get_match_of(g.dupes[0])
eq_(m.percentage, 75) # 16 letters, 12 matching
def test_similar_words(self):
s = Scanner()
s.match_similar_words = True
f = [no('The White Stripes'), no('The Whites Stripe'), no('Limp Bizkit'), no('Limp Bizkitt')]
r = s.GetDupeGroups(f)
eq_(len(r), 2)
def test_fields(self):
s = Scanner()
s.scan_type = ScanType.Fields
f = [no('The White Stripes - Little Ghost'), no('The White Stripes - Little Acorn')]
r = s.GetDupeGroups(f)
eq_(len(r), 0)
def test_fields_no_order(self):
s = Scanner()
s.scan_type = ScanType.FieldsNoOrder
f = [no('The White Stripes - Little Ghost'), no('Little Ghost - The White Stripes')]
r = s.GetDupeGroups(f)
eq_(len(r), 1)
def test_tag_scan(self):
s = Scanner()
s.scan_type = ScanType.Tag
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes'
o1.title = 'The Air Near My Fingers'
o2.artist = 'The White Stripes'
o2.title = 'The Air Near My Fingers'
r = s.GetDupeGroups([o1,o2])
eq_(len(r), 1)
def test_tag_with_album_scan(self):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['artist', 'album', 'title'])
o1 = no('foo')
o2 = no('bar')
o3 = no('bleh')
o1.artist = 'The White Stripes'
o1.title = 'The Air Near My Fingers'
o1.album = 'Elephant'
o2.artist = 'The White Stripes'
o2.title = 'The Air Near My Fingers'
o2.album = 'Elephant'
o3.artist = 'The White Stripes'
o3.title = 'The Air Near My Fingers'
o3.album = 'foobar'
r = s.GetDupeGroups([o1,o2,o3])
eq_(len(r), 1)
def test_that_dash_in_tags_dont_create_new_fields(self):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['artist', 'album', 'title'])
s.min_match_percentage = 50
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes - a'
o1.title = 'The Air Near My Fingers - a'
o1.album = 'Elephant - a'
o2.artist = 'The White Stripes - b'
o2.title = 'The Air Near My Fingers - b'
o2.album = 'Elephant - b'
r = s.GetDupeGroups([o1,o2])
eq_(len(r), 1)
def test_tag_scan_with_different_scanned(self):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['track', 'year'])
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes'
o1.title = 'some title'
o1.track = 'foo'
o1.year = 'bar'
o2.artist = 'The White Stripes'
o2.title = 'another title'
o2.track = 'foo'
o2.year = 'bar'
r = s.GetDupeGroups([o1, o2])
eq_(len(r), 1)
def test_tag_scan_only_scans_existing_tags(self):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['artist', 'foo'])
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes'
o1.foo = 'foo'
o2.artist = 'The White Stripes'
o2.foo = 'bar'
r = s.GetDupeGroups([o1, o2])
eq_(len(r), 1) # Because 'foo' is not scanned, they match
def test_tag_scan_converts_to_str(self):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['track'])
o1 = no('foo')
o2 = no('bar')
o1.track = 42
o2.track = 42
try:
r = s.GetDupeGroups([o1, o2])
except TypeError:
raise AssertionError() raise AssertionError()
eq_(len(r), 1)
s = Scanner()
def test_tag_scan_non_ascii(self): s.scan_type = ScanType.Contents
s = Scanner() f = [MyFile('foo', 1), MyFile('bar', 2)]
s.scan_type = ScanType.Tag eq_(len(s.GetDupeGroups(f)), 0)
s.scanned_tags = set(['title'])
o1 = no('foo') def test_min_match_perc_doesnt_matter_for_content_scan(fake_fileexists):
o2 = no('bar') s = Scanner()
o1.title = 'foobar\u00e9' s.scan_type = ScanType.Contents
o2.title = 'foobar\u00e9' f = [no('foo'), no('bar'), no('bleh')]
try: f[0].md5 = f[0].md5partial = 'foobar'
r = s.GetDupeGroups([o1, o2]) f[1].md5 = f[1].md5partial = 'foobar'
except UnicodeEncodeError: f[2].md5 = f[2].md5partial = 'bleh'
s.min_match_percentage = 101
r = s.GetDupeGroups(f)
eq_(len(r), 1)
eq_(len(r[0]), 2)
s.min_match_percentage = 0
r = s.GetDupeGroups(f)
eq_(len(r), 1)
eq_(len(r[0]), 2)
def test_content_scan_doesnt_put_md5_in_words_at_the_end(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Contents
f = [no('foo'),no('bar')]
f[0].md5 = f[0].md5partial = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'
f[1].md5 = f[1].md5partial = '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f'
r = s.GetDupeGroups(f)
g = r[0]
def test_extension_is_not_counted_in_filename_scan(fake_fileexists):
s = Scanner()
s.min_match_percentage = 100
f = [no('foo.bar'), no('foo.bleh')]
r = s.GetDupeGroups(f)
eq_(len(r), 1)
eq_(len(r[0]), 2)
def test_job(fake_fileexists):
def do_progress(progress, desc=''):
log.append(progress)
return True
s = Scanner()
log = []
f = [no('foo bar'), no('foo bar'), no('foo bleh')]
r = s.GetDupeGroups(f, job.Job(1, do_progress))
eq_(log[0], 0)
eq_(log[-1], 100)
def test_mix_file_kind(fake_fileexists):
s = Scanner()
s.mix_file_kind = False
f = [no('foo.1'), no('foo.2')]
r = s.GetDupeGroups(f)
eq_(len(r), 0)
def test_word_weighting(fake_fileexists):
s = Scanner()
s.min_match_percentage = 75
s.word_weighting = True
f = [no('foo bar'), no('foo bar bleh')]
r = s.GetDupeGroups(f)
eq_(len(r), 1)
g = r[0]
m = g.get_match_of(g.dupes[0])
eq_(m.percentage, 75) # 16 letters, 12 matching
def test_similar_words(fake_fileexists):
s = Scanner()
s.match_similar_words = True
f = [no('The White Stripes'), no('The Whites Stripe'), no('Limp Bizkit'), no('Limp Bizkitt')]
r = s.GetDupeGroups(f)
eq_(len(r), 2)
def test_fields(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Fields
f = [no('The White Stripes - Little Ghost'), no('The White Stripes - Little Acorn')]
r = s.GetDupeGroups(f)
eq_(len(r), 0)
def test_fields_no_order(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.FieldsNoOrder
f = [no('The White Stripes - Little Ghost'), no('Little Ghost - The White Stripes')]
r = s.GetDupeGroups(f)
eq_(len(r), 1)
def test_tag_scan(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Tag
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes'
o1.title = 'The Air Near My Fingers'
o2.artist = 'The White Stripes'
o2.title = 'The Air Near My Fingers'
r = s.GetDupeGroups([o1,o2])
eq_(len(r), 1)
def test_tag_with_album_scan(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['artist', 'album', 'title'])
o1 = no('foo')
o2 = no('bar')
o3 = no('bleh')
o1.artist = 'The White Stripes'
o1.title = 'The Air Near My Fingers'
o1.album = 'Elephant'
o2.artist = 'The White Stripes'
o2.title = 'The Air Near My Fingers'
o2.album = 'Elephant'
o3.artist = 'The White Stripes'
o3.title = 'The Air Near My Fingers'
o3.album = 'foobar'
r = s.GetDupeGroups([o1,o2,o3])
eq_(len(r), 1)
def test_that_dash_in_tags_dont_create_new_fields(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['artist', 'album', 'title'])
s.min_match_percentage = 50
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes - a'
o1.title = 'The Air Near My Fingers - a'
o1.album = 'Elephant - a'
o2.artist = 'The White Stripes - b'
o2.title = 'The Air Near My Fingers - b'
o2.album = 'Elephant - b'
r = s.GetDupeGroups([o1,o2])
eq_(len(r), 1)
def test_tag_scan_with_different_scanned(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['track', 'year'])
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes'
o1.title = 'some title'
o1.track = 'foo'
o1.year = 'bar'
o2.artist = 'The White Stripes'
o2.title = 'another title'
o2.track = 'foo'
o2.year = 'bar'
r = s.GetDupeGroups([o1, o2])
eq_(len(r), 1)
def test_tag_scan_only_scans_existing_tags(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['artist', 'foo'])
o1 = no('foo')
o2 = no('bar')
o1.artist = 'The White Stripes'
o1.foo = 'foo'
o2.artist = 'The White Stripes'
o2.foo = 'bar'
r = s.GetDupeGroups([o1, o2])
eq_(len(r), 1) # Because 'foo' is not scanned, they match
def test_tag_scan_converts_to_str(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['track'])
o1 = no('foo')
o2 = no('bar')
o1.track = 42
o2.track = 42
try:
r = s.GetDupeGroups([o1, o2])
except TypeError:
raise AssertionError()
eq_(len(r), 1)
def test_tag_scan_non_ascii(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.Tag
s.scanned_tags = set(['title'])
o1 = no('foo')
o2 = no('bar')
o1.title = 'foobar\u00e9'
o2.title = 'foobar\u00e9'
try:
r = s.GetDupeGroups([o1, o2])
except UnicodeEncodeError:
raise AssertionError()
eq_(len(r), 1)
def test_audio_content_scan(fake_fileexists):
s = Scanner()
s.scan_type = ScanType.ContentsAudio
f = [no('foo'), no('bar'), no('bleh')]
f[0].md5 = 'foo'
f[1].md5 = 'bar'
f[2].md5 = 'bleh'
f[0].md5partial = 'foo'
f[1].md5partial = 'foo'
f[2].md5partial = 'bleh'
f[0].audiosize = 1
f[1].audiosize = 1
f[2].audiosize = 1
r = s.GetDupeGroups(f)
eq_(len(r), 1)
eq_(len(r[0]), 2)
def test_audio_content_scan_compare_sizes_first(fake_fileexists):
class MyFile(no):
@property
def md5partial(file):
raise AssertionError() raise AssertionError()
eq_(len(r), 1)
s = Scanner()
def test_audio_content_scan(self): s.scan_type = ScanType.ContentsAudio
s = Scanner() f = [MyFile('foo'), MyFile('bar')]
s.scan_type = ScanType.ContentsAudio f[0].audiosize = 1
f = [no('foo'), no('bar'), no('bleh')] f[1].audiosize = 2
f[0].md5 = 'foo' eq_(len(s.GetDupeGroups(f)), 0)
f[1].md5 = 'bar'
f[2].md5 = 'bleh' def test_ignore_list(fake_fileexists):
f[0].md5partial = 'foo' s = Scanner()
f[1].md5partial = 'foo' f1 = no('foobar')
f[2].md5partial = 'bleh' f2 = no('foobar')
f[0].audiosize = 1 f3 = no('foobar')
f[1].audiosize = 1 f1.path = Path('dir1/foobar')
f[2].audiosize = 1 f2.path = Path('dir2/foobar')
r = s.GetDupeGroups(f) f3.path = Path('dir3/foobar')
eq_(len(r), 1) s.ignore_list.Ignore(str(f1.path),str(f2.path))
eq_(len(r[0]), 2) s.ignore_list.Ignore(str(f1.path),str(f3.path))
r = s.GetDupeGroups([f1,f2,f3])
def test_audio_content_scan_compare_sizes_first(self): eq_(len(r), 1)
class MyFile(no): g = r[0]
@property eq_(len(g.dupes), 1)
def md5partial(file): assert f1 not in g
raise AssertionError() assert f2 in g
assert f3 in g
s = Scanner() # Ignored matches are not counted as discarded
s.scan_type = ScanType.ContentsAudio eq_(s.discarded_file_count, 0)
f = [MyFile('foo'), MyFile('bar')]
f[0].audiosize = 1 def test_ignore_list_checks_for_unicode(fake_fileexists):
f[1].audiosize = 2 #scanner was calling path_str for ignore list checks. Since the Path changes, it must
eq_(len(s.GetDupeGroups(f)), 0) #be unicode(path)
s = Scanner()
def test_ignore_list(self): f1 = no('foobar')
s = Scanner() f2 = no('foobar')
f1 = no('foobar') f3 = no('foobar')
f2 = no('foobar') f1.path = Path('foo1\u00e9')
f3 = no('foobar') f2.path = Path('foo2\u00e9')
f1.path = Path('dir1/foobar') f3.path = Path('foo3\u00e9')
f2.path = Path('dir2/foobar') s.ignore_list.Ignore(str(f1.path),str(f2.path))
f3.path = Path('dir3/foobar') s.ignore_list.Ignore(str(f1.path),str(f3.path))
s.ignore_list.Ignore(str(f1.path),str(f2.path)) r = s.GetDupeGroups([f1,f2,f3])
s.ignore_list.Ignore(str(f1.path),str(f3.path)) eq_(len(r), 1)
r = s.GetDupeGroups([f1,f2,f3]) g = r[0]
eq_(len(r), 1) eq_(len(g.dupes), 1)
g = r[0] assert f1 not in g
eq_(len(g.dupes), 1) assert f2 in g
assert f1 not in g assert f3 in g
assert f2 in g
assert f3 in g def test_file_evaluates_to_false(fake_fileexists):
# Ignored matches are not counted as discarded # A very wrong way to use any() was added at some point, causing resulting group list
eq_(s.discarded_file_count, 0) # to be empty.
class FalseNamedObject(NamedObject):
def test_ignore_list_checks_for_unicode(self): def __bool__(self):
#scanner was calling path_str for ignore list checks. Since the Path changes, it must return False
#be unicode(path)
s = Scanner()
f1 = no('foobar')
f2 = no('foobar')
f3 = no('foobar')
f1.path = Path('foo1\u00e9')
f2.path = Path('foo2\u00e9')
f3.path = Path('foo3\u00e9')
s.ignore_list.Ignore(str(f1.path),str(f2.path))
s.ignore_list.Ignore(str(f1.path),str(f3.path))
r = s.GetDupeGroups([f1,f2,f3])
eq_(len(r), 1)
g = r[0]
eq_(len(g.dupes), 1)
assert f1 not in g
assert f2 in g
assert f3 in g
def test_file_evaluates_to_false(self):
# A very wrong way to use any() was added at some point, causing resulting group list
# to be empty.
class FalseNamedObject(NamedObject):
def __bool__(self):
return False
s = Scanner()
f1 = FalseNamedObject('foobar')
f2 = FalseNamedObject('foobar')
r = s.GetDupeGroups([f1, f2])
eq_(len(r), 1)
def test_size_threshold(self):
# Only files equal to or higher than the size_threshold in size are scanned
s = Scanner()
f1 = no('foo', 1)
f2 = no('foo', 2)
f3 = no('foo', 3)
s.size_threshold = 2
groups = s.GetDupeGroups([f1,f2,f3])
eq_(len(groups), 1)
[group] = groups
eq_(len(group), 2)
assert f1 not in group
assert f2 in group
assert f3 in group
def test_tie_breaker_path_deepness(self):
# If there is a tie in prioritization, path deepness is used as a tie breaker
s = Scanner()
o1, o2 = no('foo'), no('foo')
o1.path = Path('foo')
o2.path = Path('foo/bar')
[group] = s.GetDupeGroups([o1, o2])
assert group.ref is o2
def test_tie_breaker_copy(self):
# if copy is in the words used (even if it has a deeper path), it becomes a dupe
s = Scanner()
o1, o2 = no('foo bar Copy'), no('foo bar')
o1.path = Path('deeper/path')
o2.path = Path('foo')
[group] = s.GetDupeGroups([o1, o2])
assert group.ref is o2
def test_tie_breaker_same_name_plus_digit(self):
# if ref has the same words as dupe, but has just one extra word, which is a digit, it
# becomes a dupe
s = Scanner()
o1 = no('foo bar 42')
o2 = no('foo bar [42]')
o3 = no('foo bar (42)')
o4 = no('foo bar {42}')
o5 = no('foo bar')
# all numbered names have deeper paths, so they'll end up ref if the digits aren't correctly
# used as tie breakers
o1.path = Path('deeper/path')
o2.path = Path('deeper/path')
o3.path = Path('deeper/path')
o4.path = Path('deeper/path')
o5.path = Path('foo')
[group] = s.GetDupeGroups([o1, o2, o3, o4, o5])
assert group.ref is o5
def test_partial_group_match(self):
# Count the number of discarded matches (when a file doesn't match all other dupes of the
# group) in Scanner.discarded_file_count
s = Scanner()
o1, o2, o3 = no('a b'), no('a'), no('b')
s.min_match_percentage = 50
[group] = s.GetDupeGroups([o1, o2, o3])
eq_(len(group), 2)
assert o1 in group
assert o2 in group
assert o3 not in group
eq_(s.discarded_file_count, 1)
class ScannerTest(TestCase): s = Scanner()
def test_dont_group_files_that_dont_exist(self): f1 = FalseNamedObject('foobar')
# when creating groups, check that files exist first. It's possible that these files have f2 = FalseNamedObject('foobar')
# been moved during the scan by the user. r = s.GetDupeGroups([f1, f2])
# In this test, we have to delete one of the files between the get_matches() part and the eq_(len(r), 1)
# get_groups() part.
s = Scanner() def test_size_threshold(fake_fileexists):
s.scan_type = ScanType.Contents # Only files equal to or higher than the size_threshold in size are scanned
p = self.tmppath() s = Scanner()
io.open(p + 'file1', 'w').write('foo') f1 = no('foo', 1)
io.open(p + 'file2', 'w').write('foo') f2 = no('foo', 2)
file1, file2 = fs.get_files(p) f3 = no('foo', 3)
def getmatches(*args, **kw): s.size_threshold = 2
io.remove(file2.path) groups = s.GetDupeGroups([f1,f2,f3])
return [Match(file1, file2, 100)] eq_(len(groups), 1)
s._getmatches = getmatches [group] = groups
eq_(len(group), 2)
assert not s.GetDupeGroups([file1, file2]) assert f1 not in group
assert f2 in group
assert f3 in group
def test_tie_breaker_path_deepness(fake_fileexists):
# If there is a tie in prioritization, path deepness is used as a tie breaker
s = Scanner()
o1, o2 = no('foo'), no('foo')
o1.path = Path('foo')
o2.path = Path('foo/bar')
[group] = s.GetDupeGroups([o1, o2])
assert group.ref is o2
def test_tie_breaker_copy(fake_fileexists):
# if copy is in the words used (even if it has a deeper path), it becomes a dupe
s = Scanner()
o1, o2 = no('foo bar Copy'), no('foo bar')
o1.path = Path('deeper/path')
o2.path = Path('foo')
[group] = s.GetDupeGroups([o1, o2])
assert group.ref is o2
def test_tie_breaker_same_name_plus_digit(fake_fileexists):
# if ref has the same words as dupe, but has just one extra word, which is a digit, it
# becomes a dupe
s = Scanner()
o1 = no('foo bar 42')
o2 = no('foo bar [42]')
o3 = no('foo bar (42)')
o4 = no('foo bar {42}')
o5 = no('foo bar')
# all numbered names have deeper paths, so they'll end up ref if the digits aren't correctly
# used as tie breakers
o1.path = Path('deeper/path')
o2.path = Path('deeper/path')
o3.path = Path('deeper/path')
o4.path = Path('deeper/path')
o5.path = Path('foo')
[group] = s.GetDupeGroups([o1, o2, o3, o4, o5])
assert group.ref is o5
def test_partial_group_match(fake_fileexists):
# Count the number of discarded matches (when a file doesn't match all other dupes of the
# group) in Scanner.discarded_file_count
s = Scanner()
o1, o2, o3 = no('a b'), no('a'), no('b')
s.min_match_percentage = 50
[group] = s.GetDupeGroups([o1, o2, o3])
eq_(len(group), 2)
assert o1 in group
assert o2 in group
assert o3 not in group
eq_(s.discarded_file_count, 1)
def test_dont_group_files_that_dont_exist(tmpdir):
# when creating groups, check that files exist first. It's possible that these files have
# been moved during the scan by the user.
# In this test, we have to delete one of the files between the get_matches() part and the
# get_groups() part.
s = Scanner()
s.scan_type = ScanType.Contents
p = Path(str(tmpdir))
io.open(p + 'file1', 'w').write('foo')
io.open(p + 'file2', 'w').write('foo')
file1, file2 = fs.get_files(p)
def getmatches(*args, **kw):
io.remove(file2.path)
return [Match(file1, file2, 100)]
s._getmatches = getmatches
assert not s.GetDupeGroups([file1, file2])
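test_dont_group_files_that_dont_exist above bridges the two path types: pytest hands in a py.path.local tmpdir, and Path(str(tmpdir)) turns it into an hsutil Path so the existing io helpers keep working. A minimal sketch of that conversion, assuming hsutil is importable:

from hsutil import io
from hsutil.path import Path

def test_bridge_paths(tmpdir):
    p = Path(str(tmpdir))  # py.path.local -> hsutil Path
    io.open(p + 'file1', 'w').write('foo')
    assert io.exists(p + 'file1')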

View File

@ -9,12 +9,11 @@
from hsutil import io from hsutil import io
from hsutil.path import Path from hsutil.path import Path
from hsutil.testcase import TestCase
from core.engine import getwords from core.engine import getwords
from ..scanner import * from ..scanner import *
class NamedObject(object): class NamedObject:
def __init__(self, name="foobar", size=1): def __init__(self, name="foobar", size=1):
self.name = name self.name = name
self.size = size self.size = size
@ -24,18 +23,18 @@ class NamedObject(object):
no = NamedObject no = NamedObject
class ScannerTestFakeFiles(TestCase): def pytest_funcarg__fake_fileexists(request):
def setUp(self): # This is a hack to avoid invalidating all previous tests since the scanner started to test
# This is a hack to avoid invalidating all previous tests since the scanner started to test # for file existence before doing the match grouping.
# for file existence before doing the match grouping. monkeypatch = request.getfuncargvalue('monkeypatch')
self.mock(io, 'exists', lambda _: True) monkeypatch.setattr(io, 'exists', lambda _: True)
def test_priorize_me(self): def test_priorize_me(fake_fileexists):
# in ScannerME, bitrate goes first (right after is_ref) in priorization # in ScannerME, bitrate goes first (right after is_ref) in priorization
s = ScannerME() s = ScannerME()
o1, o2 = no('foo'), no('foo') o1, o2 = no('foo'), no('foo')
o1.bitrate = 1 o1.bitrate = 1
o2.bitrate = 2 o2.bitrate = 2
[group] = s.GetDupeGroups([o1, o2]) [group] = s.GetDupeGroups([o1, o2])
assert group.ref is o2 assert group.ref is o2

View File

@ -9,37 +9,35 @@
import hashlib import hashlib
from hsutil.testcase import TestCase from hsutil.path import Path
from hsutil.testutil import eq_ from hscommon.testutil import eq_
from core.fs import File from core.fs import File
from core.tests.directories_test import create_fake_fs from core.tests.directories_test import create_fake_fs
from .. import fs from .. import fs
class TCBundle(TestCase): def test_size_aggregates_subfiles(tmpdir):
def test_size_aggregates_subfiles(self): p = create_fake_fs(Path(str(tmpdir)))
p = create_fake_fs(self.tmppath()) b = fs.Bundle(p)
b = fs.Bundle(p) eq_(b.size, 12)
eq_(b.size, 12)
def test_md5_aggregate_subfiles_sorted(tmpdir):
def test_md5_aggregate_subfiles_sorted(self): #dir.allfiles can return child in any order. Thus, bundle.md5 must aggregate
#dir.allfiles can return child in any order. Thus, bundle.md5 must aggregate #all files' md5 it contains, but it must make sure that it does so in the
#all files' md5 it contains, but it must make sure that it does so in the #same order everytime.
#same order everytime. p = create_fake_fs(Path(str(tmpdir)))
p = create_fake_fs(self.tmppath()) b = fs.Bundle(p)
b = fs.Bundle(p) md5s = File(p + ('dir1', 'file1.test')).md5
md5s = File(p + ('dir1', 'file1.test')).md5 md5s += File(p + ('dir2', 'file2.test')).md5
md5s += File(p + ('dir2', 'file2.test')).md5 md5s += File(p + ('dir3', 'file3.test')).md5
md5s += File(p + ('dir3', 'file3.test')).md5 md5s += File(p + 'file1.test').md5
md5s += File(p + 'file1.test').md5 md5s += File(p + 'file2.test').md5
md5s += File(p + 'file2.test').md5 md5s += File(p + 'file3.test').md5
md5s += File(p + 'file3.test').md5 md5 = hashlib.md5(md5s)
md5 = hashlib.md5(md5s) eq_(b.md5, md5.digest())
eq_(b.md5, md5.digest())
def test_has_file_attrs(tmpdir):
def test_has_file_attrs(self): #a Bundle must behave like a file, so it must have mtime attributes
#a Bundle must behave like a file, so it must have mtime attributes b = fs.Bundle(Path(str(tmpdir)))
b = fs.Bundle(self.tmppath()) assert b.mtime > 0
assert b.mtime > 0 eq_(b.extension, '')
eq_(b.extension, '')