1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2026-01-22 14:41:39 +00:00

Added hscommon repo as a subtree

This commit is contained in:
Virgil Dupras
2013-06-22 21:32:23 -04:00
parent 95623f9b47
commit 94a469205a
62 changed files with 6553 additions and 0 deletions

View File

View File

@@ -0,0 +1,104 @@
# Created By: Virgil Dupras
# Created On: 2008-01-08
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from ..conflict import *
from ..path import Path
from ..testutil import eq_
class TestCase_GetConflictedName:
def test_simple(self):
name = get_conflicted_name(['bar'], 'bar')
eq_('[000] bar', name)
name = get_conflicted_name(['bar', '[000] bar'], 'bar')
eq_('[001] bar', name)
def test_no_conflict(self):
name = get_conflicted_name(['bar'], 'foobar')
eq_('foobar', name)
def test_fourth_digit(self):
# This test is long because every time we have to add a conflicted name,
# a test must be made for every other conflicted name existing...
# Anyway, this has very few chances to happen.
names = ['bar'] + ['[%03d] bar' % i for i in range(1000)]
name = get_conflicted_name(names, 'bar')
eq_('[1000] bar', name)
def test_auto_unconflict(self):
# Automatically unconflict the name if it's already conflicted.
name = get_conflicted_name([], '[000] foobar')
eq_('foobar', name)
name = get_conflicted_name(['bar'], '[001] bar')
eq_('[000] bar', name)
class TestCase_GetUnconflictedName:
def test_main(self):
eq_('foobar',get_unconflicted_name('[000] foobar'))
eq_('foobar',get_unconflicted_name('[9999] foobar'))
eq_('[000]foobar',get_unconflicted_name('[000]foobar'))
eq_('[000a] foobar',get_unconflicted_name('[000a] foobar'))
eq_('foobar',get_unconflicted_name('foobar'))
eq_('foo [000] bar',get_unconflicted_name('foo [000] bar'))
class TestCase_IsConflicted:
def test_main(self):
assert is_conflicted('[000] foobar')
assert is_conflicted('[9999] foobar')
assert not is_conflicted('[000]foobar')
assert not is_conflicted('[000a] foobar')
assert not is_conflicted('foobar')
assert not is_conflicted('foo [000] bar')
class TestCase_move_copy:
def pytest_funcarg__do_setup(self, request):
tmpdir = request.getfuncargvalue('tmpdir')
self.path = Path(str(tmpdir))
io.open(self.path + 'foo', 'w').close()
io.open(self.path + 'bar', 'w').close()
io.mkdir(self.path + 'dir')
def test_move_no_conflict(self, do_setup):
smart_move(self.path + 'foo', self.path + 'baz')
assert io.exists(self.path + 'baz')
assert not io.exists(self.path + 'foo')
def test_copy_no_conflict(self, do_setup): # No need to duplicate the rest of the tests... Let's just test on move
smart_copy(self.path + 'foo', self.path + 'baz')
assert io.exists(self.path + 'baz')
assert io.exists(self.path + 'foo')
def test_move_no_conflict_dest_is_dir(self, do_setup):
smart_move(self.path + 'foo', self.path + 'dir')
assert io.exists(self.path + ('dir', 'foo'))
assert not io.exists(self.path + 'foo')
def test_move_conflict(self, do_setup):
smart_move(self.path + 'foo', self.path + 'bar')
assert io.exists(self.path + '[000] bar')
assert not io.exists(self.path + 'foo')
def test_move_conflict_dest_is_dir(self, do_setup):
smart_move(self.path + 'foo', self.path + 'dir')
smart_move(self.path + 'bar', self.path + 'foo')
smart_move(self.path + 'foo', self.path + 'dir')
assert io.exists(self.path + ('dir', 'foo'))
assert io.exists(self.path + ('dir', '[000] foo'))
assert not io.exists(self.path + 'foo')
assert not io.exists(self.path + 'bar')
def test_copy_folder(self, tmpdir):
# smart_copy also works on folders
path = Path(str(tmpdir))
io.mkdir(path + 'foo')
io.mkdir(path + 'bar')
smart_copy(path + 'foo', path + 'bar') # no crash
assert io.exists(path + '[000] bar')

View File

@@ -0,0 +1,209 @@
# Created By: Virgil Dupras
# Created On: 2008-04-20
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from datetime import date
import sqlite3 as sqlite
from .. import io
from ..testutil import eq_, assert_almost_equal
from ..currency import Currency, RatesDB, CAD, EUR, PLN, USD
def setup_module(module):
global FOO
global BAR
FOO = Currency.register('FOO', 'Currency with start date', start_date=date(2009, 1, 12), start_rate=2)
BAR = Currency.register('BAR', 'Currency with stop date', stop_date=date(2010, 1, 12), latest_rate=2)
def teardown_module(module):
# We must unset our test currencies or else we might mess up with other tests.
from .. import currency
import imp
imp.reload(currency)
def teardown_function(function):
Currency.set_rates_db(None)
def test_currency_creation():
# Different ways to create a currency.
eq_(Currency('CAD'), CAD)
eq_(Currency(name='Canadian dollar'), CAD)
def test_currency_copy():
# Currencies can be copied.
import copy
eq_(copy.copy(CAD), CAD)
eq_(copy.deepcopy(CAD), CAD)
def test_get_rate_on_empty_db():
# When there is no data available, use the start_rate.
eq_(CAD.value_in(USD, date(2008, 4, 20)), 1 / USD.latest_rate)
def test_physical_rates_db_remember_rates(tmpdir):
# When a rates db uses a real file, rates are remembered
dbpath = str(tmpdir.join('foo.db'))
db = RatesDB(dbpath)
db.set_CAD_value(date(2008, 4, 20), 'USD', 1/0.996115)
db = RatesDB(dbpath)
assert_almost_equal(db.get_rate(date(2008, 4, 20), 'CAD', 'USD'), 0.996115)
def test_db_with_connection():
# the supplied connection is used by the rates db.
con = sqlite.connect(':memory:')
db = RatesDB(con)
try:
con.execute("select * from rates where 1=2")
except sqlite.OperationalError: # new db
raise AssertionError()
def test_corrupt_db(tmpdir):
dbpath = str(tmpdir.join('foo.db'))
fh = io.open(dbpath, 'w')
fh.write('corrupted')
fh.close()
db = RatesDB(dbpath) # no crash. deletes the old file and start a new db
db.set_CAD_value(date(2008, 4, 20), 'USD', 42)
db = RatesDB(dbpath)
eq_(db.get_rate(date(2008, 4, 20), 'USD', 'CAD'), 42)
#--- Daily rate
def setup_daily_rate():
USD.set_CAD_value(1/0.996115, date(2008, 4, 20))
def test_get_rate_with_daily_rate():
# Getting the rate exactly as set_rate happened returns the same rate.
setup_daily_rate()
assert_almost_equal(CAD.value_in(USD, date(2008, 4, 20)), 0.996115)
def test_get_rate_different_currency():
# Use fallback rates when necessary.
setup_daily_rate()
eq_(CAD.value_in(EUR, date(2008, 4, 20)), 1 / EUR.latest_rate)
eq_(EUR.value_in(USD, date(2008, 4, 20)), EUR.latest_rate * 0.996115)
def test_get_rate_reverse():
# It's possible to get the reverse value of a rate using the same data.
setup_daily_rate()
assert_almost_equal(USD.value_in(CAD, date(2008, 4, 20)), 1 / 0.996115)
def test_set_rate_twice():
# When setting a rate for an index that already exists, the old rate is replaced by the new.
setup_daily_rate()
USD.set_CAD_value(1/42, date(2008, 4, 20))
assert_almost_equal(CAD.value_in(USD, date(2008, 4, 20)), 42)
def test_set_rate_after_get():
# When setting a rate after a get of the same rate, the rate cache is correctly updated.
setup_daily_rate()
CAD.value_in(USD, date(2008, 4, 20)) # value will be cached
USD.set_CAD_value(1/42, date(2008, 4, 20))
assert_almost_equal(CAD.value_in(USD, date(2008, 4, 20)), 42)
def test_set_rate_after_get_the_day_after():
# When setting a rate, the cache for the whole currency is reset, or else we get old fallback
# values for dates where the currency server returned no value.
setup_daily_rate()
CAD.value_in(USD, date(2008, 4, 21)) # value will be cached
USD.set_CAD_value(1/42, date(2008, 4, 20))
assert_almost_equal(CAD.value_in(USD, date(2008, 4, 21)), 42)
#--- Two daily rates
def setup_two_daily_rate():
# Don't change the set order, it's important for the tests
USD.set_CAD_value(1/0.997115, date(2008, 4, 25))
USD.set_CAD_value(1/0.996115, date(2008, 4, 20))
def test_date_range_range():
# USD.rates_date_range() returns the USD's limits.
setup_two_daily_rate()
eq_(USD.rates_date_range(), (date(2008, 4, 20), date(2008, 4, 25)))
def test_date_range_for_unfetched_currency():
# If the curency is not in the DB, return None.
setup_two_daily_rate()
assert PLN.rates_date_range() is None
def test_seek_rate_middle():
# A rate request with seek in the middle will return the lowest date.
setup_two_daily_rate()
eq_(USD.value_in(CAD, date(2008, 4, 24)), 1/0.996115)
def test_seek_rate_after():
# Make sure that the *nearest* lowest rate is returned. Because the 25th have been set
# before the 20th, an order by clause is required in the seek SQL to make this test pass.
setup_two_daily_rate()
eq_(USD.value_in(CAD, date(2008, 4, 26)), 1/0.997115)
def test_seek_rate_before():
# If there are no rate in the past, seek for a rate in the future.
setup_two_daily_rate()
eq_(USD.value_in(CAD, date(2008, 4, 19)), 1/0.996115)
#--- Rates of multiple currencies
def setup_rates_of_multiple_currencies():
USD.set_CAD_value(1/0.996115, date(2008, 4, 20))
EUR.set_CAD_value(1/0.633141, date(2008, 4, 20))
def test_get_rate_multiple_currencies():
# Don't mix currency rates up.
setup_rates_of_multiple_currencies()
assert_almost_equal(CAD.value_in(USD, date(2008, 4, 20)), 0.996115)
assert_almost_equal(CAD.value_in(EUR, date(2008, 4, 20)), 0.633141)
def test_get_rate_with_pivotal():
# It's possible to get a rate by using 2 records.
# if 1 CAD = 0.996115 USD and 1 CAD = 0.633141 then 0.996115 USD = 0.633141 then 1 USD = 0.633141 / 0.996115 EUR
setup_rates_of_multiple_currencies()
assert_almost_equal(USD.value_in(EUR, date(2008, 4, 20)), 0.633141 / 0.996115)
def test_get_rate_doesnt_exist():
# Don't crash when trying to do pivotal calculation with non-existing currencies.
setup_rates_of_multiple_currencies()
eq_(USD.value_in(PLN, date(2008, 4, 20)), 1 / 0.996115 / PLN.latest_rate)
#--- Problems after connection
def get_problematic_db():
class MockConnection(sqlite.Connection): # can't mock sqlite3.Connection's attribute, so we subclass it
mocking = False
def execute(self, *args, **kwargs):
if self.mocking:
raise sqlite.OperationalError()
else:
return sqlite.Connection.execute(self, *args, **kwargs)
con = MockConnection(':memory:')
db = RatesDB(con)
con.mocking = True
return db
def test_date_range_with_problematic_db():
db = get_problematic_db()
db.date_range('USD') # no crash
def test_get_rate_with_problematic_db():
db = get_problematic_db()
db.get_rate(date(2008, 4, 20), 'USD', 'CAD') # no crash
def test_set_rate_with_problematic_db():
db = get_problematic_db()
db.set_CAD_value(date(2008, 4, 20), 'USD', 42) # no crash
#--- DB that doesn't allow get_rate calls
def setup_db_raising_error_on_getrate():
db = RatesDB()
def mock_get_rate(*args, **kwargs):
raise AssertionError()
db.get_rate = mock_get_rate
Currency.set_rates_db(db)
def test_currency_with_start_date():
setup_db_raising_error_on_getrate()
eq_(FOO.value_in(CAD, date(2009, 1, 11)), 2)
def test_currency_with_stop_date():
setup_db_raising_error_on_getrate()
eq_(BAR.value_in(CAD, date(2010, 1, 13)), 2)

View File

@@ -0,0 +1,140 @@
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from ..testutil import eq_
from ..notify import Broadcaster, Listener, Repeater
class HelloListener(Listener):
def __init__(self, broadcaster):
Listener.__init__(self, broadcaster)
self.hello_count = 0
def hello(self):
self.hello_count += 1
class HelloRepeater(Repeater):
def __init__(self, broadcaster):
Repeater.__init__(self, broadcaster)
self.hello_count = 0
def hello(self):
self.hello_count += 1
def create_pair():
b = Broadcaster()
l = HelloListener(b)
return b, l
def test_disconnect_during_notification():
# When a listener disconnects another listener the other listener will not receive a
# notification.
# This whole complication scheme below is because the order of the notification is not
# guaranteed. We could disconnect everything from self.broadcaster.listeners, but this
# member is supposed to be private. Hence, the '.other' scheme
class Disconnecter(Listener):
def __init__(self, broadcaster):
Listener.__init__(self, broadcaster)
self.hello_count = 0
def hello(self):
self.hello_count += 1
self.other.disconnect()
broadcaster = Broadcaster()
first = Disconnecter(broadcaster)
second = Disconnecter(broadcaster)
first.other, second.other = second, first
first.connect()
second.connect()
broadcaster.notify('hello')
# only one of them was notified
eq_(first.hello_count + second.hello_count, 1)
def test_disconnect():
# After a disconnect, the listener doesn't hear anything.
b, l = create_pair()
l.connect()
l.disconnect()
b.notify('hello')
eq_(l.hello_count, 0)
def test_disconnect_when_not_connected():
# When disconnecting an already disconnected listener, nothing happens.
b, l = create_pair()
l.disconnect()
def test_not_connected_on_init():
# A listener is not initialized connected.
b, l = create_pair()
b.notify('hello')
eq_(l.hello_count, 0)
def test_notify():
# The listener listens to the broadcaster.
b, l = create_pair()
l.connect()
b.notify('hello')
eq_(l.hello_count, 1)
def test_reconnect():
# It's possible to reconnect a listener after disconnection.
b, l = create_pair()
l.connect()
l.disconnect()
l.connect()
b.notify('hello')
eq_(l.hello_count, 1)
def test_repeater():
b = Broadcaster()
r = HelloRepeater(b)
l = HelloListener(r)
r.connect()
l.connect()
b.notify('hello')
eq_(r.hello_count, 1)
eq_(l.hello_count, 1)
def test_repeater_with_repeated_notifications():
# If REPEATED_NOTIFICATIONS is not empty, only notifs in this set are repeated (but they're
# still dispatched locally).
class MyRepeater(HelloRepeater):
REPEATED_NOTIFICATIONS = set(['hello'])
def __init__(self, broadcaster):
HelloRepeater.__init__(self, broadcaster)
self.foo_count = 0
def foo(self):
self.foo_count += 1
b = Broadcaster()
r = MyRepeater(b)
l = HelloListener(r)
r.connect()
l.connect()
b.notify('hello')
b.notify('foo') # if the repeater repeated this notif, we'd get a crash on HelloListener
eq_(r.hello_count, 1)
eq_(l.hello_count, 1)
eq_(r.foo_count, 1)
def test_repeater_doesnt_try_to_dispatch_to_self_if_it_cant():
# if a repeater doesn't handle a particular message, it doesn't crash and simply repeats it.
b = Broadcaster()
r = Repeater(b) # doesnt handle hello
l = HelloListener(r)
r.connect()
l.connect()
b.notify('hello') # no crash
eq_(l.hello_count, 1)
def test_bind_messages():
b, l = create_pair()
l.bind_messages({'foo', 'bar'}, l.hello)
l.connect()
b.notify('foo')
b.notify('bar')
b.notify('hello') # Normal dispatching still work
eq_(l.hello_count, 3)

209
hscommon/tests/path_test.py Normal file
View File

@@ -0,0 +1,209 @@
# Created By: Virgil Dupras
# Created On: 2006/02/21
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import sys
from pytest import raises, mark
from ..path import *
from ..testutil import eq_
def pytest_funcarg__force_ossep(request):
monkeypatch = request.getfuncargvalue('monkeypatch')
monkeypatch.setattr(os, 'sep', '/')
def test_empty(force_ossep):
path = Path('')
eq_('',str(path))
eq_(0,len(path))
path = Path(())
eq_('',str(path))
eq_(0,len(path))
def test_single(force_ossep):
path = Path('foobar')
eq_('foobar',path)
eq_(1,len(path))
def test_multiple(force_ossep):
path = Path('foo/bar')
eq_('foo/bar',path)
eq_(2,len(path))
def test_init_with_tuple_and_list(force_ossep):
path = Path(('foo','bar'))
eq_('foo/bar',path)
path = Path(['foo','bar'])
eq_('foo/bar',path)
def test_init_with_invalid_value(force_ossep):
try:
path = Path(42)
self.fail()
except TypeError:
pass
def test_access(force_ossep):
path = Path('foo/bar/bleh')
eq_('foo',path[0])
eq_('foo',path[-3])
eq_('bar',path[1])
eq_('bar',path[-2])
eq_('bleh',path[2])
eq_('bleh',path[-1])
def test_slicing(force_ossep):
path = Path('foo/bar/bleh')
subpath = path[:2]
eq_('foo/bar',subpath)
assert isinstance(subpath,Path)
def test_deal_with_empty_components(force_ossep):
"""Keep ONLY a leading space, which means we want a leading slash.
"""
eq_('foo//bar',str(Path(('foo','','bar'))))
eq_('/foo/bar',str(Path(('','foo','bar'))))
eq_('foo/bar',str(Path('foo/bar/')))
def test_old_compare_paths(force_ossep):
eq_(Path('foobar'),Path('foobar'))
eq_(Path('foobar/'),Path('foobar\\','\\'))
eq_(Path('/foobar/'),Path('\\foobar\\','\\'))
eq_(Path('/foo/bar'),Path('\\foo\\bar','\\'))
eq_(Path('/foo/bar'),Path('\\foo\\bar\\','\\'))
assert Path('/foo/bar') != Path('\\foo\\foo','\\')
#We also have to test __ne__
assert not (Path('foobar') != Path('foobar'))
assert Path('/a/b/c.x') != Path('/a/b/c.y')
def test_old_split_path(force_ossep):
eq_(Path('foobar'),('foobar',))
eq_(Path('foo/bar'),('foo','bar'))
eq_(Path('/foo/bar/'),('','foo','bar'))
eq_(Path('\\foo\\bar','\\'),('','foo','bar'))
def test_representation(force_ossep):
eq_("('foo', 'bar')",repr(Path(('foo','bar'))))
def test_add(force_ossep):
eq_('foo/bar/bar/foo',Path(('foo','bar')) + Path('bar/foo'))
eq_('foo/bar/bar/foo',Path('foo/bar') + 'bar/foo')
eq_('foo/bar/bar/foo',Path('foo/bar') + ('bar','foo'))
eq_('foo/bar/bar/foo',('foo','bar') + Path('bar/foo'))
eq_('foo/bar/bar/foo','foo/bar' + Path('bar/foo'))
#Invalid concatenation
try:
Path(('foo','bar')) + 1
self.fail()
except TypeError:
pass
def test_path_slice(force_ossep):
foo = Path('foo')
bar = Path('bar')
foobar = Path('foo/bar')
eq_('bar',foobar[foo:])
eq_('foo',foobar[:bar])
eq_('foo/bar',foobar[bar:])
eq_('foo/bar',foobar[:foo])
eq_((),foobar[foobar:])
eq_((),foobar[:foobar])
abcd = Path('a/b/c/d')
a = Path('a')
b = Path('b')
c = Path('c')
d = Path('d')
z = Path('z')
eq_('b/c',abcd[a:d])
eq_('b/c/d',abcd[a:d+z])
eq_('b/c',abcd[a:z+d])
eq_('a/b/c/d',abcd[:z])
def test_add_with_root_path(force_ossep):
"""if I perform /a/b/c + /d/e/f, I want /a/b/c/d/e/f, not /a/b/c//d/e/f
"""
eq_('/foo/bar',str(Path('/foo') + Path('/bar')))
def test_create_with_tuple_that_have_slash_inside(force_ossep, monkeypatch):
eq_(('','foo','bar'), Path(('/foo','bar')))
monkeypatch.setattr(os, 'sep', '\\')
eq_(('','foo','bar'), Path(('\\foo','bar')))
def test_auto_decode_os_sep(force_ossep, monkeypatch):
"""Path should decode any either / or os.sep, but always encode in os.sep.
"""
eq_(('foo\\bar','bleh'),Path('foo\\bar/bleh'))
monkeypatch.setattr(os, 'sep', '\\')
eq_(('foo','bar/bleh'),Path('foo\\bar/bleh'))
path = Path('foo/bar')
eq_(('foo','bar'),path)
eq_('foo\\bar',str(path))
def test_contains(force_ossep):
p = Path(('foo','bar'))
assert Path(('foo','bar','bleh')) in p
assert Path(('foo','bar')) in p
assert 'foo' in p
assert 'bleh' not in p
assert Path('foo') not in p
def test_windows_drive_letter(force_ossep):
p = Path(('c:',))
eq_('c:\\',str(p))
def test_root_path(force_ossep):
p = Path('/')
eq_('/',str(p))
def test_str_encodes_unicode_to_getfilesystemencoding(force_ossep):
p = Path(('foo','bar\u00e9'))
eq_('foo/bar\u00e9'.encode(sys.getfilesystemencoding()), p.tobytes())
def test_unicode(force_ossep):
p = Path(('foo','bar\u00e9'))
eq_('foo/bar\u00e9',str(p))
def test_str_repr_of_mix_between_non_ascii_str_and_unicode(force_ossep):
u = 'foo\u00e9'
encoded = u.encode(sys.getfilesystemencoding())
p = Path((encoded,'bar'))
print(repr(tuple(p)))
eq_('foo\u00e9/bar'.encode(sys.getfilesystemencoding()), p.tobytes())
def test_Path_of_a_Path_returns_self(force_ossep):
#if Path() is called with a path as value, just return value.
p = Path('foo/bar')
assert Path(p) is p
@mark.xfail(reason="pytest's capture mechanism is flaky, I have to investigate")
def test_log_unicode_errors(force_ossep, monkeypatch, capsys):
# When an there's a UnicodeDecodeError on path creation, log it so it can be possible
# to debug the cause of it.
monkeypatch.setattr(sys, 'getfilesystemencoding', lambda: 'ascii')
with raises(UnicodeDecodeError):
Path(['', b'foo\xe9'])
out, err = capsys.readouterr()
assert repr(b'foo\xe9') in err
def test_has_drive_letter(monkeypatch):
monkeypatch.setattr(os, 'sep', '\\')
p = Path('foo\\bar')
assert not p.has_drive_letter()
p = Path('C:\\')
assert p.has_drive_letter()
p = Path('z:\\foo')
assert p.has_drive_letter()
def test_remove_drive_letter(monkeypatch):
monkeypatch.setattr(os, 'sep', '\\')
p = Path('foo\\bar')
eq_(p.remove_drive_letter(), Path('foo\\bar'))
p = Path('C:\\')
eq_(p.remove_drive_letter(), Path(''))
p = Path('z:\\foo')
eq_(p.remove_drive_letter(), Path('foo'))

View File

@@ -0,0 +1,68 @@
# Created By: Virgil Dupras
# Created On: 2010-01-31
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from hashlib import md5
from ..testutil import CallLogger
from ..reg import RegistrableApplication, InvalidCodeError
def md5s(s):
return md5(s.encode('utf-8')).hexdigest()
def assert_valid(appid, code, email):
app = RegistrableApplication(CallLogger(), appid)
try:
app.validate_code(code, email)
except InvalidCodeError as e:
raise AssertionError("Registration failed: {0}".format(str(e)))
def assert_invalid(appid, code, email, msg_contains=None):
app = RegistrableApplication(CallLogger(), appid)
try:
app.validate_code(code, email)
except InvalidCodeError as e:
if msg_contains:
assert msg_contains in str(e)
else:
raise AssertionError("InvalidCodeError not raised")
def test_valid_code():
email = 'foo@bar.com'
appid = 42
code = md5s('42' + email + '43' + 'aybabtu')
assert_valid(appid, code, email)
def test_invalid_code():
email = 'foo@bar.com'
appid = 42
code = md5s('43' + email + '43' + 'aybabtu')
assert_invalid(appid, code, email)
def test_suggest_other_apps():
# If a code is valid for another app, say so in the error message.
email = 'foo@bar.com'
appid = 42
# 2 is moneyGuru's appid
code = md5s('2' + email + '43' + 'aybabtu')
assert_invalid(appid, code, email, msg_contains="moneyGuru")
def test_invert_code_and_email():
# Try inverting code and email during validation in case the user mixed the fields up.
# We still show an error here. It kind of sucks, but if we don't, the email and code fields
# end up mixed up in the preferences. It's not as if this kind of error happened all the time...
email = 'foo@bar.com'
appid = 42
code = md5s('42' + email + '43' + 'aybabtu')
assert_invalid(appid, email, code, msg_contains="inverted")
def test_paypal_transaction():
# If the code looks like a paypal transaction, mention it in the error message.
email = 'foo@bar.com'
appid = 42
code = '2A693827WX9676888'
assert_invalid(appid, code, email, 'Paypal transaction')

View File

@@ -0,0 +1,65 @@
# Created By: Virgil Dupras
# Created On: 2011-09-06
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from ..testutil import eq_, callcounter, CallLogger
from ..gui.selectable_list import SelectableList, GUISelectableList
def test_in():
# When a SelectableList is in a list, doing "in list" with another instance returns false, even
# if they're the same as lists.
sl = SelectableList()
some_list = [sl]
assert SelectableList() not in some_list
def test_selection_range():
# selection is correctly adjusted on deletion
sl = SelectableList(['foo', 'bar', 'baz'])
sl.selected_index = 3
eq_(sl.selected_index, 2)
del sl[2]
eq_(sl.selected_index, 1)
def test_update_selection_called():
# _update_selection_is called after a change in selection. However, we only do so on select()
# calls. I follow the old behavior of the Table class. At the moment, I don't quite remember
# why there was a specific select() method for triggering _update_selection(), but I think I
# remember there was a reason, so I keep it that way.
sl = SelectableList(['foo', 'bar'])
sl._update_selection = callcounter()
sl.select(1)
eq_(sl._update_selection.callcount, 1)
sl.selected_index = 0
eq_(sl._update_selection.callcount, 1) # no call
def test_guicalls():
# A GUISelectableList appropriately calls its view.
sl = GUISelectableList(['foo', 'bar'])
sl.view = CallLogger()
sl.view.check_gui_calls(['refresh']) # Upon setting the view, we get a call to refresh()
sl[1] = 'baz'
sl.view.check_gui_calls(['refresh'])
sl.append('foo')
sl.view.check_gui_calls(['refresh'])
del sl[2]
sl.view.check_gui_calls(['refresh'])
sl.remove('baz')
sl.view.check_gui_calls(['refresh'])
sl.insert(0, 'foo')
sl.view.check_gui_calls(['refresh'])
sl.select(1)
sl.view.check_gui_calls(['update_selection'])
# XXX We have to give up on this for now because of a breakage it causes in the tables.
# sl.select(1) # don't update when selection stays the same
# gui.check_gui_calls([])
def test_search_by_prefix():
sl = SelectableList(['foo', 'bAr', 'baZ'])
eq_(sl.search_by_prefix('b'), 1)
eq_(sl.search_by_prefix('BA'), 1)
eq_(sl.search_by_prefix('BAZ'), 2)
eq_(sl.search_by_prefix('BAZZ'), -1)

View File

@@ -0,0 +1,126 @@
# Created By: Virgil Dupras
# Created On: 2007/05/19
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import time
import threading
import os
import sqlite3 as sqlite
from pytest import raises
from ..testutil import eq_
from ..sqlite import ThreadedConn
# Threading is hard to test. In a lot of those tests, a failure means that the test run will
# hang forever. Well... I don't know a better alternative.
def test_can_access_from_multiple_threads():
def run():
con.execute('insert into foo(bar) values(\'baz\')')
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
t = threading.Thread(target=run)
t.start()
t.join()
result = con.execute('select * from foo')
eq_(1, len(result))
eq_('baz', result[0][0])
def test_exception_during_query():
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
with raises(sqlite.OperationalError):
con.execute('select * from bleh')
def test_not_autocommit(tmpdir):
dbpath = str(tmpdir.join('foo.db'))
con = ThreadedConn(dbpath, False)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz\')')
del con
#The data shouldn't have been inserted
con = ThreadedConn(dbpath, False)
result = con.execute('select * from foo')
eq_(0, len(result))
con.execute('insert into foo(bar) values(\'baz\')')
con.commit()
del con
# Now the data should be there
con = ThreadedConn(dbpath, False)
result = con.execute('select * from foo')
eq_(1, len(result))
def test_rollback():
con = ThreadedConn(':memory:', False)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz\')')
con.rollback()
result = con.execute('select * from foo')
eq_(0, len(result))
def test_query_palceholders():
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(?)', ['baz'])
result = con.execute('select * from foo')
eq_(1, len(result))
eq_('baz', result[0][0])
def test_make_sure_theres_no_messup_between_queries():
def run(expected_rowid):
time.sleep(0.1)
result = con.execute('select rowid from foo where rowid = ?', [expected_rowid])
assert expected_rowid == result[0][0]
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
for i in range(100):
con.execute('insert into foo(bar) values(\'baz\')')
threads = []
for i in range(1, 101):
t = threading.Thread(target=run, args=(i,))
t.start
threads.append(t)
while threads:
time.sleep(0.1)
threads = [t for t in threads if t.isAlive()]
def test_query_after_close():
con = ThreadedConn(':memory:', True)
con.close()
con.execute('select 1')
def test_lastrowid():
# It's not possible to return a cursor because of the threading, but lastrowid should be
# fetchable from the connection itself
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz\')')
eq_(1, con.lastrowid)
def test_add_fetchone_fetchall_interface_to_results():
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz1\')')
con.execute('insert into foo(bar) values(\'baz2\')')
result = con.execute('select * from foo')
ref = result[:]
eq_(ref, result.fetchall())
eq_(ref[0], result.fetchone())
eq_(ref[1], result.fetchone())
assert result.fetchone() is None
def test_non_ascii_dbname(tmpdir):
ThreadedConn(str(tmpdir.join('foo\u00e9.db')), True)
def test_non_ascii_dbdir(tmpdir):
# when this test fails, it doesn't fail gracefully, it brings the whole test suite with it.
dbdir = tmpdir.join('foo\u00e9')
os.mkdir(str(dbdir))
ThreadedConn(str(dbdir.join('foo.db')), True)

View File

@@ -0,0 +1,313 @@
# Created By: Virgil Dupras
# Created On: 2008-08-12
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from ..testutil import CallLogger, eq_
from ..gui.table import Table, GUITable, Row
class TestRow(Row):
def __init__(self, table, index, is_new=False):
Row.__init__(self, table)
self.is_new = is_new
self._index = index
def load(self):
pass
def save(self):
self.is_new = False
@property
def index(self):
return self._index
class TestGUITable(GUITable):
def __init__(self, rowcount):
GUITable.__init__(self)
self.view = CallLogger()
self.rowcount = rowcount
self.updated_rows = None
def _do_add(self):
return TestRow(self, len(self), is_new=True), len(self)
def _is_edited_new(self):
return self.edited is not None and self.edited.is_new
def _fill(self):
for i in range(self.rowcount):
self.append(TestRow(self, i))
def _update_selection(self):
self.updated_rows = self.selected_rows[:]
def table_with_footer():
table = Table()
table.append(TestRow(table, 0))
footer = TestRow(table, 1)
table.footer = footer
return table, footer
def table_with_header():
table = Table()
table.append(TestRow(table, 1))
header = TestRow(table, 0)
table.header = header
return table, header
#--- Tests
def test_allow_edit_when_attr_is_property_with_fset():
# When a row has a property that has a fset, by default, make that cell editable.
class TestRow(Row):
@property
def foo(self):
pass
@property
def bar(self):
pass
@bar.setter
def bar(self, value):
pass
row = TestRow(Table())
assert row.can_edit_cell('bar')
assert not row.can_edit_cell('foo')
assert not row.can_edit_cell('baz') # doesn't exist, can't edit
def test_can_edit_prop_has_priority_over_fset_checks():
# When a row has a cen_edit_* property, it's the result of that property that is used, not the
# result of a fset check.
class TestRow(Row):
@property
def bar(self):
pass
@bar.setter
def bar(self, value):
pass
can_edit_bar = False
row = TestRow(Table())
assert not row.can_edit_cell('bar')
def test_in():
# When a table is in a list, doing "in list" with another instance returns false, even if
# they're the same as lists.
table = Table()
some_list = [table]
assert Table() not in some_list
def test_footer_del_all():
# Removing all rows doesn't crash when doing the footer check.
table, footer = table_with_footer()
del table[:]
assert table.footer is None
def test_footer_del_row():
# Removing the footer row sets it to None
table, footer = table_with_footer()
del table[-1]
assert table.footer is None
eq_(len(table), 1)
def test_footer_is_appened_to_table():
# A footer is appended at the table's bottom
table, footer = table_with_footer()
eq_(len(table), 2)
assert table[1] is footer
def test_footer_remove():
# remove() on footer sets it to None
table, footer = table_with_footer()
table.remove(footer)
assert table.footer is None
def test_footer_replaces_old_footer():
table, footer = table_with_footer()
other = Row(table)
table.footer = other
assert table.footer is other
eq_(len(table), 2)
assert table[1] is other
def test_footer_rows_and_row_count():
# rows() and row_count() ignore footer.
table, footer = table_with_footer()
eq_(table.row_count, 1)
eq_(table.rows, table[:-1])
def test_footer_setting_to_none_removes_old_one():
table, footer = table_with_footer()
table.footer = None
assert table.footer is None
eq_(len(table), 1)
def test_footer_stays_there_on_append():
# Appending another row puts it above the footer
table, footer = table_with_footer()
table.append(Row(table))
eq_(len(table), 3)
assert table[2] is footer
def test_footer_stays_there_on_insert():
# Inserting another row puts it above the footer
table, footer = table_with_footer()
table.insert(3, Row(table))
eq_(len(table), 3)
assert table[2] is footer
def test_header_del_all():
# Removing all rows doesn't crash when doing the header check.
table, header = table_with_header()
del table[:]
assert table.header is None
def test_header_del_row():
# Removing the header row sets it to None
table, header = table_with_header()
del table[0]
assert table.header is None
eq_(len(table), 1)
def test_header_is_inserted_in_table():
# A header is inserted at the table's top
table, header = table_with_header()
eq_(len(table), 2)
assert table[0] is header
def test_header_remove():
# remove() on header sets it to None
table, header = table_with_header()
table.remove(header)
assert table.header is None
def test_header_replaces_old_header():
table, header = table_with_header()
other = Row(table)
table.header = other
assert table.header is other
eq_(len(table), 2)
assert table[0] is other
def test_header_rows_and_row_count():
# rows() and row_count() ignore header.
table, header = table_with_header()
eq_(table.row_count, 1)
eq_(table.rows, table[1:])
def test_header_setting_to_none_removes_old_one():
table, header = table_with_header()
table.header = None
assert table.header is None
eq_(len(table), 1)
def test_header_stays_there_on_insert():
# Inserting another row at the top puts it below the header
table, header = table_with_header()
table.insert(0, Row(table))
eq_(len(table), 3)
assert table[0] is header
def test_refresh_view_on_refresh():
# If refresh_view is not False, we refresh the table's view on refresh()
table = TestGUITable(1)
table.refresh()
table.view.check_gui_calls(['refresh'])
table.view.clear_calls()
table.refresh(refresh_view=False)
table.view.check_gui_calls([])
def test_restore_selection():
# By default, after a refresh, selection goes on the last row
table = TestGUITable(10)
table.refresh()
eq_(table.selected_indexes, [9])
def test_restore_selection_after_cancel_edits():
# _restore_selection() is called after cancel_edits(). Previously, only _update_selection would
# be called.
class MyTable(TestGUITable):
def _restore_selection(self, previous_selection):
self.selected_indexes = [6]
table = MyTable(10)
table.refresh()
table.add()
table.cancel_edits()
eq_(table.selected_indexes, [6])
def test_restore_selection_with_previous_selection():
# By default, we try to restore the selection that was there before a refresh
table = TestGUITable(10)
table.refresh()
table.selected_indexes = [2, 4]
table.refresh()
eq_(table.selected_indexes, [2, 4])
def test_restore_selection_custom():
# After a _fill() called, the virtual _restore_selection() is called so that it's possible for a
# GUITable subclass to customize its post-refresh selection behavior.
class MyTable(TestGUITable):
def _restore_selection(self, previous_selection):
self.selected_indexes = [6]
table = MyTable(10)
table.refresh()
eq_(table.selected_indexes, [6])
def test_row_cell_value():
# *_cell_value() correctly mangles attrnames that are Python reserved words.
row = Row(Table())
row.from_ = 'foo'
eq_(row.get_cell_value('from'), 'foo')
row.set_cell_value('from', 'bar')
eq_(row.get_cell_value('from'), 'bar')
def test_sort_table_also_tries_attributes_without_underscores():
# When determining a sort key, after having unsuccessfully tried the attribute with the,
# underscore, try the one without one.
table = Table()
row1 = Row(table)
row1._foo = 'a' # underscored attr must be checked first
row1.foo = 'b'
row1.bar = 'c'
row2 = Row(table)
row2._foo = 'b'
row2.foo = 'a'
row2.bar = 'b'
table.append(row1)
table.append(row2)
table.sort_by('foo')
assert table[0] is row1
assert table[1] is row2
table.sort_by('bar')
assert table[0] is row2
assert table[1] is row1
def test_sort_table_updates_selection():
table = TestGUITable(10)
table.refresh()
table.select([2, 4])
table.sort_by('index', desc=True)
# Now, the updated rows should be 7 and 5
eq_(len(table.updated_rows), 2)
r1, r2 = table.updated_rows
eq_(r1.index, 7)
eq_(r2.index, 5)
def test_sort_table_with_footer():
# Sorting a table with a footer keeps it at the bottom
table, footer = table_with_footer()
table.sort_by('index', desc=True)
assert table[-1] is footer
def test_sort_table_with_header():
# Sorting a table with a header keeps it at the top
table, header = table_with_header()
table.sort_by('index', desc=True)
assert table[0] is header

109
hscommon/tests/tree_test.py Normal file
View File

@@ -0,0 +1,109 @@
# Created By: Virgil Dupras
# Created On: 2010-02-12
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from ..testutil import eq_
from ..gui.tree import Tree, Node
def tree_with_some_nodes():
t = Tree()
t.append(Node('foo'))
t.append(Node('bar'))
t.append(Node('baz'))
t[0].append(Node('sub1'))
t[0].append(Node('sub2'))
return t
def test_selection():
t = tree_with_some_nodes()
assert t.selected_node is None
eq_(t.selected_nodes, [])
assert t.selected_path is None
eq_(t.selected_paths, [])
def test_select_one_node():
t = tree_with_some_nodes()
t.selected_node = t[0][0]
assert t.selected_node is t[0][0]
eq_(t.selected_nodes, [t[0][0]])
eq_(t.selected_path, [0, 0])
eq_(t.selected_paths, [[0, 0]])
def test_select_one_path():
t = tree_with_some_nodes()
t.selected_path = [0, 1]
assert t.selected_node is t[0][1]
def test_select_multiple_nodes():
t = tree_with_some_nodes()
t.selected_nodes = [t[0], t[1]]
eq_(t.selected_paths, [[0], [1]])
def test_select_multiple_paths():
t = tree_with_some_nodes()
t.selected_paths = [[0], [1]]
eq_(t.selected_nodes, [t[0], t[1]])
def test_select_none_path():
# setting selected_path to None clears the selection
t = Tree()
t.selected_path = None
assert t.selected_path is None
def test_select_none_node():
# setting selected_node to None clears the selection
t = Tree()
t.selected_node = None
eq_(t.selected_nodes, [])
def test_clear_removes_selection():
# When clearing a tree, we want to clear the selection as well or else we end up with a crash
# when calling selected_paths.
t = tree_with_some_nodes()
t.selected_path = [0]
t.clear()
assert t.selected_node is None
def test_selection_override():
# All selection changed pass through the _select_node() method so it's easy for subclasses to
# customize the tree's behavior.
class MyTree(Tree):
called = False
def _select_nodes(self, nodes):
self.called = True
t = MyTree()
t.selected_paths = []
assert t.called
t.called = False
t.selected_node = None
assert t.called
def test_findall():
t = tree_with_some_nodes()
r = t.findall(lambda n: n.name.startswith('sub'))
eq_(set(r), set([t[0][0], t[0][1]]))
def test_findall_dont_include_self():
# When calling findall with include_self=False, the node itself is never evaluated.
t = tree_with_some_nodes()
del t._name # so that if the predicate is called on `t`, we crash
r = t.findall(lambda n: not n.name.startswith('sub'), include_self=False) # no crash
eq_(set(r), set([t[0], t[1], t[2]]))
def test_find_dont_include_self():
# When calling find with include_self=False, the node itself is never evaluated.
t = tree_with_some_nodes()
del t._name # so that if the predicate is called on `t`, we crash
r = t.find(lambda n: not n.name.startswith('sub'), include_self=False) # no crash
assert r is t[0]
def test_find_none():
# when find() yields no result, return None
t = Tree()
assert t.find(lambda n: False) is None # no StopIteration exception

310
hscommon/tests/util_test.py Normal file
View File

@@ -0,0 +1,310 @@
# Created By: Virgil Dupras
# Created On: 2011-01-11
# Copyright 2013 Hardcoded Software (http://www.hardcoded.net)
#
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
from io import StringIO
from pytest import raises
from ..testutil import eq_
from .. import io
from ..path import Path
from ..util import *
def test_nonone():
eq_('foo', nonone('foo', 'bar'))
eq_('bar', nonone(None, 'bar'))
def test_tryint():
eq_(42,tryint('42'))
eq_(0,tryint('abc'))
eq_(0,tryint(None))
eq_(42,tryint(None, 42))
def test_minmax():
eq_(minmax(2, 1, 3), 2)
eq_(minmax(0, 1, 3), 1)
eq_(minmax(4, 1, 3), 3)
#--- Sequence
def test_first():
eq_(first([3, 2, 1]), 3)
eq_(first(i for i in [3, 2, 1] if i < 3), 2)
def test_flatten():
eq_([1,2,3,4],flatten([[1,2],[3,4]]))
eq_([],flatten([]))
def test_dedupe():
reflist = [0,7,1,2,3,4,4,5,6,7,1,2,3]
eq_(dedupe(reflist),[0,7,1,2,3,4,5,6])
def test_stripfalse():
eq_([1, 2, 3], stripfalse([None, 0, 1, 2, 3, None]))
def test_extract():
wheat, shaft = extract(lambda n: n % 2 == 0, list(range(10)))
eq_(wheat, [0, 2, 4, 6, 8])
eq_(shaft, [1, 3, 5, 7, 9])
def test_allsame():
assert allsame([42, 42, 42])
assert not allsame([42, 43, 42])
assert not allsame([43, 42, 42])
# Works on non-sequence as well
assert allsame(iter([42, 42, 42]))
def test_trailiter():
eq_(list(trailiter([])), [])
eq_(list(trailiter(['foo'])), [(None, 'foo')])
eq_(list(trailiter(['foo', 'bar'])), [(None, 'foo'), ('foo', 'bar')])
eq_(list(trailiter(['foo', 'bar'], skipfirst=True)), [('foo', 'bar')])
eq_(list(trailiter([], skipfirst=True)), []) # no crash
#--- String
def test_escape():
eq_('f\\o\\ob\\ar', escape('foobar', 'oa'))
eq_('f*o*ob*ar', escape('foobar', 'oa', '*'))
eq_('f*o*ob*ar', escape('foobar', set('oa'), '*'))
def test_get_file_ext():
eq_(get_file_ext("foobar"), "")
eq_(get_file_ext("foo.bar"), "bar")
eq_(get_file_ext("foobar."), "")
eq_(get_file_ext(".foobar"), "foobar")
def test_rem_file_ext():
eq_(rem_file_ext("foobar"), "foobar")
eq_(rem_file_ext("foo.bar"), "foo")
eq_(rem_file_ext("foobar."), "foobar")
eq_(rem_file_ext(".foobar"), "")
def test_pluralize():
eq_('0 song', pluralize(0,'song'))
eq_('1 song', pluralize(1,'song'))
eq_('2 songs', pluralize(2,'song'))
eq_('1 song', pluralize(1.1,'song'))
eq_('2 songs', pluralize(1.5,'song'))
eq_('1.1 songs', pluralize(1.1,'song',1))
eq_('1.5 songs', pluralize(1.5,'song',1))
eq_('2 entries', pluralize(2,'entry', plural_word='entries'))
def test_format_time():
eq_(format_time(0),'00:00:00')
eq_(format_time(1),'00:00:01')
eq_(format_time(23),'00:00:23')
eq_(format_time(60),'00:01:00')
eq_(format_time(101),'00:01:41')
eq_(format_time(683),'00:11:23')
eq_(format_time(3600),'01:00:00')
eq_(format_time(3754),'01:02:34')
eq_(format_time(36000),'10:00:00')
eq_(format_time(366666),'101:51:06')
eq_(format_time(0, with_hours=False),'00:00')
eq_(format_time(1, with_hours=False),'00:01')
eq_(format_time(23, with_hours=False),'00:23')
eq_(format_time(60, with_hours=False),'01:00')
eq_(format_time(101, with_hours=False),'01:41')
eq_(format_time(683, with_hours=False),'11:23')
eq_(format_time(3600, with_hours=False),'60:00')
eq_(format_time(6036, with_hours=False),'100:36')
eq_(format_time(60360, with_hours=False),'1006:00')
def test_format_time_decimal():
eq_(format_time_decimal(0), '0.0 second')
eq_(format_time_decimal(1), '1.0 second')
eq_(format_time_decimal(23), '23.0 seconds')
eq_(format_time_decimal(60), '1.0 minute')
eq_(format_time_decimal(101), '1.7 minutes')
eq_(format_time_decimal(683), '11.4 minutes')
eq_(format_time_decimal(3600), '1.0 hour')
eq_(format_time_decimal(6036), '1.7 hours')
eq_(format_time_decimal(86400), '1.0 day')
eq_(format_time_decimal(160360), '1.9 days')
def test_format_size():
eq_(format_size(1024), '1 KB')
eq_(format_size(1024,2), '1.00 KB')
eq_(format_size(1024,0,2), '1 MB')
eq_(format_size(1024,2,2), '0.01 MB')
eq_(format_size(1024,3,2), '0.001 MB')
eq_(format_size(1024,3,2,False), '0.001')
eq_(format_size(1023), '1023 B')
eq_(format_size(1023,0,1), '1 KB')
eq_(format_size(511,0,1), '1 KB')
eq_(format_size(9), '9 B')
eq_(format_size(99), '99 B')
eq_(format_size(999), '999 B')
eq_(format_size(9999), '10 KB')
eq_(format_size(99999), '98 KB')
eq_(format_size(999999), '977 KB')
eq_(format_size(9999999), '10 MB')
eq_(format_size(99999999), '96 MB')
eq_(format_size(999999999), '954 MB')
eq_(format_size(9999999999), '10 GB')
eq_(format_size(99999999999), '94 GB')
eq_(format_size(999999999999), '932 GB')
eq_(format_size(9999999999999), '10 TB')
eq_(format_size(99999999999999), '91 TB')
eq_(format_size(999999999999999), '910 TB')
eq_(format_size(9999999999999999), '9 PB')
eq_(format_size(99999999999999999), '89 PB')
eq_(format_size(999999999999999999), '889 PB')
eq_(format_size(9999999999999999999), '9 EB')
eq_(format_size(99999999999999999999), '87 EB')
eq_(format_size(999999999999999999999), '868 EB')
eq_(format_size(9999999999999999999999), '9 ZB')
eq_(format_size(99999999999999999999999), '85 ZB')
eq_(format_size(999999999999999999999999), '848 ZB')
def test_remove_invalid_xml():
eq_(remove_invalid_xml('foo\0bar\x0bbaz'), 'foo bar baz')
# surrogate blocks have to be replaced, but not the rest
eq_(remove_invalid_xml('foo\ud800bar\udfffbaz\ue000'), 'foo bar baz\ue000')
# replace with something else
eq_(remove_invalid_xml('foo\0baz', replace_with='bar'), 'foobarbaz')
def test_multi_replace():
eq_('136',multi_replace('123456',('2','45')))
eq_('1 3 6',multi_replace('123456',('2','45'),' '))
eq_('1 3 6',multi_replace('123456','245',' '))
eq_('173896',multi_replace('123456','245','789'))
eq_('173896',multi_replace('123456','245',('7','8','9')))
eq_('17386',multi_replace('123456',('2','45'),'78'))
eq_('17386',multi_replace('123456',('2','45'),('7','8')))
with raises(ValueError):
multi_replace('123456',('2','45'),('7','8','9'))
eq_('17346',multi_replace('12346',('2','45'),'78'))
#--- Files
class TestCase_modified_after:
def test_first_is_modified_after(self, monkeyplus):
monkeyplus.patch_osstat('first', st_mtime=42)
monkeyplus.patch_osstat('second', st_mtime=41)
assert modified_after('first', 'second')
def test_second_is_modified_after(self, monkeyplus):
monkeyplus.patch_osstat('first', st_mtime=42)
monkeyplus.patch_osstat('second', st_mtime=43)
assert not modified_after('first', 'second')
def test_same_mtime(self, monkeyplus):
monkeyplus.patch_osstat('first', st_mtime=42)
monkeyplus.patch_osstat('second', st_mtime=42)
assert not modified_after('first', 'second')
def test_first_file_does_not_exist(self, monkeyplus):
# when the first file doesn't exist, we return False
monkeyplus.patch_osstat('second', st_mtime=42)
assert not modified_after('does_not_exist', 'second') # no crash
def test_second_file_does_not_exist(self, monkeyplus):
# when the second file doesn't exist, we return True
monkeyplus.patch_osstat('first', st_mtime=42)
assert modified_after('first', 'does_not_exist') # no crash
class TestCase_delete_if_empty:
def test_is_empty(self, tmpdir):
testpath = Path(str(tmpdir))
assert delete_if_empty(testpath)
assert not io.exists(testpath)
def test_not_empty(self, tmpdir):
testpath = Path(str(tmpdir))
io.mkdir(testpath + 'foo')
assert not delete_if_empty(testpath)
assert io.exists(testpath)
def test_with_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir))
io.open(testpath + 'foo', 'w')
io.open(testpath + 'bar', 'w')
assert delete_if_empty(testpath, ['foo', 'bar'])
assert not io.exists(testpath)
def test_directory_in_files_to_delete(self, tmpdir):
testpath = Path(str(tmpdir))
io.mkdir(testpath + 'foo')
assert not delete_if_empty(testpath, ['foo'])
assert io.exists(testpath)
def test_delete_files_to_delete_only_if_dir_is_empty(self, tmpdir):
testpath = Path(str(tmpdir))
io.open(testpath + 'foo', 'w')
io.open(testpath + 'bar', 'w')
assert not delete_if_empty(testpath, ['foo'])
assert io.exists(testpath)
assert io.exists(testpath + 'foo')
def test_doesnt_exist(self):
# When the 'path' doesn't exist, just do nothing.
delete_if_empty(Path('does_not_exist')) # no crash
def test_is_file(self, tmpdir):
# When 'path' is a file, do nothing.
p = Path(str(tmpdir)) + 'filename'
io.open(p, 'w').close()
delete_if_empty(p) # no crash
def test_ioerror(self, tmpdir, monkeypatch):
# if an IO error happens during the operation, ignore it.
def do_raise(*args, **kw):
raise OSError()
monkeypatch.setattr(io, 'rmdir', do_raise)
delete_if_empty(Path(str(tmpdir))) # no crash
class TestCase_open_if_filename:
def test_file_name(self, tmpdir):
filepath = str(tmpdir.join('test.txt'))
open(filepath, 'wb').write(b'test_data')
file, close = open_if_filename(filepath)
assert close
eq_(b'test_data', file.read())
file.close()
def test_opened_file(self):
sio = StringIO()
sio.write('test_data')
sio.seek(0)
file, close = open_if_filename(sio)
assert not close
eq_('test_data', file.read())
def test_mode_is_passed_to_open(self, tmpdir):
filepath = str(tmpdir.join('test.txt'))
open(filepath, 'w').close()
file, close = open_if_filename(filepath, 'a')
eq_('a', file.mode)
file.close()
class TestCase_FileOrPath:
def test_path(self, tmpdir):
filepath = str(tmpdir.join('test.txt'))
open(filepath, 'wb').write(b'test_data')
with FileOrPath(filepath) as fp:
eq_(b'test_data', fp.read())
def test_opened_file(self):
sio = StringIO()
sio.write('test_data')
sio.seek(0)
with FileOrPath(sio) as fp:
eq_('test_data', fp.read())
def test_mode_is_passed_to_open(self, tmpdir):
filepath = str(tmpdir.join('test.txt'))
open(filepath, 'w').close()
with FileOrPath(filepath, 'a') as fp:
eq_('a', fp.mode)