1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2026-01-22 14:41:39 +00:00

Integrated the jobprogress library into hscommon

I have a fix to make in it and it's really silly to pretend that this
lib is of any use to anybody outside HS apps. Bringing it back here will
make things more simple.
This commit is contained in:
Virgil Dupras
2014-10-05 16:31:16 -04:00
parent 87c2fa2573
commit ac32305532
21 changed files with 775 additions and 487 deletions

View File

@@ -1,14 +1,14 @@
# Created By: Virgil Dupras
# Created On: 2006/01/29
# Copyright 2014 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import sys
from jobprogress import job
from hscommon.jobprogress import job
from hscommon.util import first
from hscommon.testutil import eq_, log_calls
@@ -48,119 +48,119 @@ class TestCasegetwords:
def test_spaces(self):
eq_(['a', 'b', 'c', 'd'], getwords("a b c d"))
eq_(['a', 'b', 'c', 'd'], getwords(" a b c d "))
def test_splitter_chars(self):
eq_(
[chr(i) for i in range(ord('a'),ord('z')+1)],
getwords("a-b_c&d+e(f)g;h\\i[j]k{l}m:n.o,p<q>r/s?t~u!v@w#x$y*z")
)
def test_joiner_chars(self):
eq_(["aec"], getwords("a'e\u0301c"))
def test_empty(self):
eq_([], getwords(''))
def test_returns_lowercase(self):
eq_(['foo', 'bar'], getwords('FOO BAR'))
def test_decompose_unicode(self):
eq_(getwords('foo\xe9bar'), ['fooebar'])
class TestCasegetfields:
def test_simple(self):
eq_([['a', 'b'], ['c', 'd', 'e']], getfields('a b - c d e'))
def test_empty(self):
eq_([], getfields(''))
def test_cleans_empty_fields(self):
expected = [['a', 'bc', 'def']]
actual = getfields(' - a bc def')
eq_(expected, actual)
expected = [['bc', 'def']]
class TestCaseunpack_fields:
def test_with_fields(self):
expected = ['a', 'b', 'c', 'd', 'e', 'f']
actual = unpack_fields([['a'], ['b', 'c'], ['d', 'e', 'f']])
eq_(expected, actual)
def test_without_fields(self):
expected = ['a', 'b', 'c', 'd', 'e', 'f']
actual = unpack_fields(['a', 'b', 'c', 'd', 'e', 'f'])
eq_(expected, actual)
def test_empty(self):
eq_([], unpack_fields([]))
class TestCaseWordCompare:
def test_list(self):
eq_(100, compare(['a', 'b', 'c', 'd'],['a', 'b', 'c', 'd']))
eq_(86, compare(['a', 'b', 'c', 'd'],['a', 'b', 'c']))
def test_unordered(self):
#Sometimes, users don't want fuzzy matching too much When they set the slider
#to 100, they don't expect a filename with the same words, but not the same order, to match.
#Thus, we want to return 99 in that case.
eq_(99, compare(['a', 'b', 'c', 'd'], ['d', 'b', 'c', 'a']))
def test_word_occurs_twice(self):
#if a word occurs twice in first, but once in second, we want the word to be only counted once
eq_(89, compare(['a', 'b', 'c', 'd', 'a'], ['d', 'b', 'c', 'a']))
def test_uses_copy_of_lists(self):
first = ['foo', 'bar']
second = ['bar', 'bleh']
compare(first, second)
eq_(['foo', 'bar'], first)
eq_(['bar', 'bleh'], second)
def test_word_weight(self):
eq_(int((6.0 / 13.0) * 100), compare(['foo', 'bar'], ['bar', 'bleh'], (WEIGHT_WORDS, )))
def test_similar_words(self):
eq_(100, compare(['the', 'white', 'stripes'],['the', 'whites', 'stripe'], (MATCH_SIMILAR_WORDS, )))
def test_empty(self):
eq_(0, compare([], []))
def test_with_fields(self):
eq_(67, compare([['a', 'b'], ['c', 'd', 'e']], [['a', 'b'], ['c', 'd', 'f']]))
def test_propagate_flags_with_fields(self, monkeypatch):
def mock_compare(first, second, flags):
eq_((0, 1, 2, 3, 5), flags)
monkeypatch.setattr(engine, 'compare_fields', mock_compare)
compare([['a']], [['a']], (0, 1, 2, 3, 5))
class TestCaseWordCompareWithFields:
def test_simple(self):
eq_(67, compare_fields([['a', 'b'], ['c', 'd', 'e']], [['a', 'b'], ['c', 'd', 'f']]))
def test_empty(self):
eq_(0, compare_fields([], []))
def test_different_length(self):
eq_(0, compare_fields([['a'], ['b']], [['a'], ['b'], ['c']]))
def test_propagates_flags(self, monkeypatch):
def mock_compare(first, second, flags):
eq_((0, 1, 2, 3, 5), flags)
monkeypatch.setattr(engine, 'compare_fields', mock_compare)
compare_fields([['a']], [['a']],(0, 1, 2, 3, 5))
def test_order(self):
first = [['a', 'b'], ['c', 'd', 'e']]
second = [['c', 'd', 'f'], ['a', 'b']]
eq_(0, compare_fields(first, second))
def test_no_order(self):
first = [['a','b'],['c','d','e']]
second = [['c','d','f'],['a','b']]
@@ -168,10 +168,10 @@ class TestCaseWordCompareWithFields:
first = [['a','b'],['a','b']] #a field can only be matched once.
second = [['c','d','f'],['a','b']]
eq_(0, compare_fields(first, second, (NO_FIELD_ORDER, )))
first = [['a','b'],['a','b','c']]
first = [['a','b'],['a','b','c']]
second = [['c','d','f'],['a','b']]
eq_(33, compare_fields(first, second, (NO_FIELD_ORDER, )))
def test_compare_fields_without_order_doesnt_alter_fields(self):
#The NO_ORDER comp type altered the fields!
first = [['a','b'],['c','d','e']]
@@ -179,7 +179,7 @@ class TestCaseWordCompareWithFields:
eq_(67, compare_fields(first, second, (NO_FIELD_ORDER, )))
eq_([['a','b'],['c','d','e']],first)
eq_([['c','d','f'],['a','b']],second)
class TestCasebuild_word_dict:
def test_with_standard_words(self):
@@ -199,30 +199,30 @@ class TestCasebuild_word_dict:
assert l[2] in d['baz']
eq_(1,len(d['bleh']))
assert l[2] in d['bleh']
def test_unpack_fields(self):
o = NamedObject('')
o.words = [['foo','bar'],['baz']]
d = build_word_dict([o])
eq_(3,len(d))
eq_(1,len(d['foo']))
def test_words_are_unaltered(self):
o = NamedObject('')
o.words = [['foo','bar'],['baz']]
build_word_dict([o])
eq_([['foo','bar'],['baz']],o.words)
def test_object_instances_can_only_be_once_in_words_object_list(self):
o = NamedObject('foo foo',True)
d = build_word_dict([o])
eq_(1,len(d['foo']))
def test_job(self):
def do_progress(p,d=''):
self.log.append(p)
return True
j = job.Job(1,do_progress)
self.log = []
s = "foo bar"
@@ -230,7 +230,7 @@ class TestCasebuild_word_dict:
# We don't have intermediate log because iter_with_progress is called with every > 1
eq_(0,self.log[0])
eq_(100,self.log[1])
class TestCasemerge_similar_words:
def test_some_similar_words(self):
@@ -242,8 +242,8 @@ class TestCasemerge_similar_words:
merge_similar_words(d)
eq_(1,len(d))
eq_(3,len(d['foobar']))
class TestCasereduce_common_words:
def test_typical(self):
@@ -254,7 +254,7 @@ class TestCasereduce_common_words:
reduce_common_words(d, 50)
assert 'foo' not in d
eq_(49,len(d['bar']))
def test_dont_remove_objects_with_only_common_words(self):
d = {
'common': set([NamedObject("common uncommon",True) for i in range(50)] + [NamedObject("common",True)]),
@@ -263,7 +263,7 @@ class TestCasereduce_common_words:
reduce_common_words(d, 50)
eq_(1,len(d['common']))
eq_(1,len(d['uncommon']))
def test_values_still_are_set_instances(self):
d = {
'common': set([NamedObject("common uncommon",True) for i in range(50)] + [NamedObject("common",True)]),
@@ -272,7 +272,7 @@ class TestCasereduce_common_words:
reduce_common_words(d, 50)
assert isinstance(d['common'],set)
assert isinstance(d['uncommon'],set)
def test_dont_raise_KeyError_when_a_word_has_been_removed(self):
#If a word has been removed by the reduce, an object in a subsequent common word that
#contains the word that has been removed would cause a KeyError.
@@ -285,14 +285,14 @@ class TestCasereduce_common_words:
reduce_common_words(d, 50)
except KeyError:
self.fail()
def test_unpack_fields(self):
#object.words may be fields.
def create_it():
o = NamedObject('')
o.words = [['foo','bar'],['baz']]
return o
d = {
'foo': set([create_it() for i in range(50)])
}
@@ -300,7 +300,7 @@ class TestCasereduce_common_words:
reduce_common_words(d, 50)
except TypeError:
self.fail("must support fields.")
def test_consider_a_reduced_common_word_common_even_after_reduction(self):
#There was a bug in the code that causeda word that has already been reduced not to
#be counted as a common word for subsequent words. For example, if 'foo' is processed
@@ -316,7 +316,7 @@ class TestCasereduce_common_words:
eq_(1,len(d['foo']))
eq_(1,len(d['bar']))
eq_(49,len(d['baz']))
class TestCaseget_match:
def test_simple(self):
@@ -328,7 +328,7 @@ class TestCaseget_match:
eq_(['bar','bleh'],m.second.words)
assert m.first is o1
assert m.second is o2
def test_in(self):
o1 = NamedObject("foo",True)
o2 = NamedObject("bar",True)
@@ -336,15 +336,15 @@ class TestCaseget_match:
assert o1 in m
assert o2 in m
assert object() not in m
def test_word_weight(self):
eq_(int((6.0 / 13.0) * 100),get_match(NamedObject("foo bar",True),NamedObject("bar bleh",True),(WEIGHT_WORDS,)).percentage)
class TestCaseGetMatches:
def test_empty(self):
eq_(getmatches([]), [])
def test_simple(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh"),NamedObject("a b c foo")]
r = getmatches(l)
@@ -353,7 +353,7 @@ class TestCaseGetMatches:
assert_match(m, 'foo bar', 'bar bleh')
m = first(m for m in r if m.percentage == 33) #"foo bar" and "a b c foo"
assert_match(m, 'foo bar', 'a b c foo')
def test_null_and_unrelated_objects(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh"),NamedObject(""),NamedObject("unrelated object")]
r = getmatches(l)
@@ -361,22 +361,22 @@ class TestCaseGetMatches:
m = r[0]
eq_(m.percentage, 50)
assert_match(m, 'foo bar', 'bar bleh')
def test_twice_the_same_word(self):
l = [NamedObject("foo foo bar"),NamedObject("bar bleh")]
r = getmatches(l)
eq_(1,len(r))
def test_twice_the_same_word_when_preworded(self):
l = [NamedObject("foo foo bar",True),NamedObject("bar bleh",True)]
r = getmatches(l)
eq_(1,len(r))
def test_two_words_match(self):
l = [NamedObject("foo bar"),NamedObject("foo bar bleh")]
r = getmatches(l)
eq_(1,len(r))
def test_match_files_with_only_common_words(self):
#If a word occurs more than 50 times, it is excluded from the matching process
#The problem with the common_word_threshold is that the files containing only common
@@ -385,18 +385,18 @@ class TestCaseGetMatches:
l = [NamedObject("foo") for i in range(50)]
r = getmatches(l)
eq_(1225,len(r))
def test_use_words_already_there_if_there(self):
o1 = NamedObject('foo')
o2 = NamedObject('bar')
o2.words = ['foo']
eq_(1, len(getmatches([o1,o2])))
def test_job(self):
def do_progress(p,d=''):
self.log.append(p)
return True
j = job.Job(1,do_progress)
self.log = []
s = "foo bar"
@@ -404,12 +404,12 @@ class TestCaseGetMatches:
assert len(self.log) > 2
eq_(0,self.log[0])
eq_(100,self.log[-1])
def test_weight_words(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh")]
m = getmatches(l, weight_words=True)[0]
eq_(int((6.0 / 13.0) * 100),m.percentage)
def test_similar_word(self):
l = [NamedObject("foobar"),NamedObject("foobars")]
eq_(len(getmatches(l, match_similar_words=True)), 1)
@@ -420,16 +420,16 @@ class TestCaseGetMatches:
eq_(len(getmatches(l, match_similar_words=True)), 1)
l = [NamedObject("foobar"),NamedObject("foosbar")]
eq_(len(getmatches(l, match_similar_words=True)), 1)
def test_single_object_with_similar_words(self):
l = [NamedObject("foo foos")]
eq_(len(getmatches(l, match_similar_words=True)), 0)
def test_double_words_get_counted_only_once(self):
l = [NamedObject("foo bar foo bleh"),NamedObject("foo bar bleh bar")]
m = getmatches(l)[0]
eq_(75,m.percentage)
def test_with_fields(self):
o1 = NamedObject("foo bar - foo bleh")
o2 = NamedObject("foo bar - bleh bar")
@@ -437,7 +437,7 @@ class TestCaseGetMatches:
o2.words = getfields(o2.name)
m = getmatches([o1, o2])[0]
eq_(50, m.percentage)
def test_with_fields_no_order(self):
o1 = NamedObject("foo bar - foo bleh")
o2 = NamedObject("bleh bang - foo bar")
@@ -445,11 +445,11 @@ class TestCaseGetMatches:
o2.words = getfields(o2.name)
m = getmatches([o1, o2], no_field_order=True)[0]
eq_(m.percentage, 50)
def test_only_match_similar_when_the_option_is_set(self):
l = [NamedObject("foobar"),NamedObject("foobars")]
eq_(len(getmatches(l, match_similar_words=False)), 0)
def test_dont_recurse_do_match(self):
# with nosetests, the stack is increased. The number has to be high enough not to be failing falsely
sys.setrecursionlimit(100)
@@ -460,19 +460,19 @@ class TestCaseGetMatches:
self.fail()
finally:
sys.setrecursionlimit(1000)
def test_min_match_percentage(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh"),NamedObject("a b c foo")]
r = getmatches(l, min_match_percentage=50)
eq_(1,len(r)) #Only "foo bar" / "bar bleh" should match
def test_MemoryError(self, monkeypatch):
@log_calls
def mocked_match(first, second, flags):
if len(mocked_match.calls) > 42:
raise MemoryError()
return Match(first, second, 0)
objects = [NamedObject() for i in range(10)] # results in 45 matches
monkeypatch.setattr(engine, 'get_match', mocked_match)
try:
@@ -480,13 +480,13 @@ class TestCaseGetMatches:
except MemoryError:
self.fail('MemorryError must be handled')
eq_(42, len(r))
class TestCaseGetMatchesByContents:
def test_dont_compare_empty_files(self):
o1, o2 = no(size=0), no(size=0)
assert not getmatches_by_contents([o1, o2])
class TestCaseGroup:
def test_empy(self):
@@ -494,7 +494,7 @@ class TestCaseGroup:
eq_(None,g.ref)
eq_([],g.dupes)
eq_(0,len(g.matches))
def test_add_match(self):
g = Group()
m = get_match(NamedObject("foo",True),NamedObject("bar",True))
@@ -503,7 +503,7 @@ class TestCaseGroup:
eq_([m.second],g.dupes)
eq_(1,len(g.matches))
assert m in g.matches
def test_multiple_add_match(self):
g = Group()
o1 = NamedObject("a",True)
@@ -529,13 +529,13 @@ class TestCaseGroup:
g.add_match(get_match(o3,o4))
eq_([o2,o3,o4],g.dupes)
eq_(6,len(g.matches))
def test_len(self):
g = Group()
eq_(0,len(g))
g.add_match(get_match(NamedObject("foo",True),NamedObject("bar",True)))
eq_(2,len(g))
def test_add_same_match_twice(self):
g = Group()
m = get_match(NamedObject("foo",True),NamedObject("foo",True))
@@ -545,7 +545,7 @@ class TestCaseGroup:
g.add_match(m)
eq_(2,len(g))
eq_(1,len(g.matches))
def test_in(self):
g = Group()
o1 = NamedObject("foo",True)
@@ -554,7 +554,7 @@ class TestCaseGroup:
g.add_match(get_match(o1,o2))
assert o1 in g
assert o2 in g
def test_remove(self):
g = Group()
o1 = NamedObject("foo",True)
@@ -571,7 +571,7 @@ class TestCaseGroup:
g.remove_dupe(o1)
eq_(0,len(g.matches))
eq_(0,len(g))
def test_remove_with_ref_dupes(self):
g = Group()
o1 = NamedObject("foo",True)
@@ -584,7 +584,7 @@ class TestCaseGroup:
o2.is_ref = True
g.remove_dupe(o3)
eq_(0,len(g))
def test_switch_ref(self):
o1 = NamedObject(with_words=True)
o2 = NamedObject(with_words=True)
@@ -598,7 +598,7 @@ class TestCaseGroup:
assert o2 is g.ref
g.switch_ref(NamedObject('',True))
assert o2 is g.ref
def test_switch_ref_from_ref_dir(self):
# When the ref dupe is from a ref dir, switch_ref() does nothing
o1 = no(with_words=True)
@@ -608,7 +608,7 @@ class TestCaseGroup:
g.add_match(get_match(o1, o2))
g.switch_ref(o2)
assert o1 is g.ref
def test_get_match_of(self):
g = Group()
for m in get_match_triangle():
@@ -619,7 +619,7 @@ class TestCaseGroup:
assert o in m
assert g.get_match_of(NamedObject('',True)) is None
assert g.get_match_of(g.ref) is None
def test_percentage(self):
#percentage should return the avg percentage in relation to the ref
m1,m2,m3 = get_match_triangle()
@@ -638,11 +638,11 @@ class TestCaseGroup:
g.add_match(m1)
g.add_match(m2)
eq_(66,g.percentage)
def test_percentage_on_empty_group(self):
g = Group()
eq_(0,g.percentage)
def test_prioritize(self):
m1,m2,m3 = get_match_triangle()
o1 = m1.first
@@ -658,7 +658,7 @@ class TestCaseGroup:
assert o1 is g.ref
assert g.prioritize(lambda x:x.name)
assert o3 is g.ref
def test_prioritize_with_tie_breaker(self):
# if the ref has the same key as one or more of the dupe, run the tie_breaker func among them
g = get_test_group()
@@ -666,9 +666,9 @@ class TestCaseGroup:
tie_breaker = lambda ref, dupe: dupe is o3
g.prioritize(lambda x:0, tie_breaker)
assert g.ref is o3
def test_prioritize_with_tie_breaker_runs_on_all_dupes(self):
# Even if a dupe is chosen to switch with ref with a tie breaker, we still run the tie breaker
# Even if a dupe is chosen to switch with ref with a tie breaker, we still run the tie breaker
# with other dupes and the newly chosen ref
g = get_test_group()
o1, o2, o3 = g.ordered
@@ -678,7 +678,7 @@ class TestCaseGroup:
tie_breaker = lambda ref, dupe: dupe.foo > ref.foo
g.prioritize(lambda x:0, tie_breaker)
assert g.ref is o3
def test_prioritize_with_tie_breaker_runs_only_on_tie_dupes(self):
# The tie breaker only runs on dupes that had the same value for the key_func
g = get_test_group()
@@ -693,7 +693,7 @@ class TestCaseGroup:
tie_breaker = lambda ref, dupe: dupe.bar > ref.bar
g.prioritize(key_func, tie_breaker)
assert g.ref is o2
def test_prioritize_with_ref_dupe(self):
# when the ref dupe of a group is from a ref dir, make it stay on top.
g = get_test_group()
@@ -702,7 +702,7 @@ class TestCaseGroup:
o2.size = 2
g.prioritize(lambda x: -x.size)
assert g.ref is o1
def test_prioritize_nothing_changes(self):
# prioritize() returns False when nothing changes in the group.
g = get_test_group()
@@ -710,14 +710,14 @@ class TestCaseGroup:
g[1].name = 'b'
g[2].name = 'c'
assert not g.prioritize(lambda x:x.name)
def test_list_like(self):
g = Group()
o1,o2 = (NamedObject("foo",True),NamedObject("bar",True))
g.add_match(get_match(o1,o2))
assert g[0] is o1
assert g[1] is o2
def test_discard_matches(self):
g = Group()
o1,o2,o3 = (NamedObject("foo",True),NamedObject("bar",True),NamedObject("baz",True))
@@ -726,13 +726,13 @@ class TestCaseGroup:
g.discard_matches()
eq_(1,len(g.matches))
eq_(0,len(g.candidates))
class TestCaseget_groups:
def test_empty(self):
r = get_groups([])
eq_([],r)
def test_simple(self):
l = [NamedObject("foo bar"),NamedObject("bar bleh")]
matches = getmatches(l)
@@ -742,7 +742,7 @@ class TestCaseget_groups:
g = r[0]
assert g.ref is m.first
eq_([m.second],g.dupes)
def test_group_with_multiple_matches(self):
#This results in 3 matches
l = [NamedObject("foo"),NamedObject("foo"),NamedObject("foo")]
@@ -751,7 +751,7 @@ class TestCaseget_groups:
eq_(1,len(r))
g = r[0]
eq_(3,len(g))
def test_must_choose_a_group(self):
l = [NamedObject("a b"),NamedObject("a b"),NamedObject("b c"),NamedObject("c d"),NamedObject("c d")]
#There will be 2 groups here: group "a b" and group "c d"
@@ -760,7 +760,7 @@ class TestCaseget_groups:
r = get_groups(matches)
eq_(2,len(r))
eq_(5,len(r[0])+len(r[1]))
def test_should_all_go_in_the_same_group(self):
l = [NamedObject("a b"),NamedObject("a b"),NamedObject("a b"),NamedObject("a b")]
#There will be 2 groups here: group "a b" and group "c d"
@@ -768,7 +768,7 @@ class TestCaseget_groups:
matches = getmatches(l)
r = get_groups(matches)
eq_(1,len(r))
def test_give_priority_to_matches_with_higher_percentage(self):
o1 = NamedObject(with_words=True)
o2 = NamedObject(with_words=True)
@@ -782,14 +782,14 @@ class TestCaseget_groups:
assert o1 not in g
assert o2 in g
assert o3 in g
def test_four_sized_group(self):
l = [NamedObject("foobar") for i in range(4)]
m = getmatches(l)
r = get_groups(m)
eq_(1,len(r))
eq_(4,len(r[0]))
def test_referenced_by_ref2(self):
o1 = NamedObject(with_words=True)
o2 = NamedObject(with_words=True)
@@ -799,12 +799,12 @@ class TestCaseget_groups:
m3 = get_match(o3,o2)
r = get_groups([m1,m2,m3])
eq_(3,len(r[0]))
def test_job(self):
def do_progress(p,d=''):
self.log.append(p)
return True
self.log = []
j = job.Job(1,do_progress)
m1,m2,m3 = get_match_triangle()
@@ -813,7 +813,7 @@ class TestCaseget_groups:
get_groups([m1,m2,m3,m4],j)
eq_(0,self.log[0])
eq_(100,self.log[-1])
def test_group_admissible_discarded_dupes(self):
# If, with a (A, B, C, D) set, all match with A, but C and D don't match with B and that the
# (A, B) match is the highest (thus resulting in an (A, B) group), still match C and D
@@ -830,4 +830,4 @@ class TestCaseget_groups:
assert B in g1
assert C in g2
assert D in g2