1
0
mirror of https://github.com/arsenetar/dupeguru.git synced 2024-10-30 05:15:57 +00:00
dupeguru/hscommon/tests/sqlite_test.py

127 lines
4.2 KiB
Python
Raw Normal View History

2013-06-23 01:32:23 +00:00
# Created By: Virgil Dupras
# Created On: 2007/05/19
2014-02-01 21:18:00 +00:00
# Copyright 2014 Hardcoded Software (http://www.hardcoded.net)
2013-06-23 01:32:23 +00:00
# This software is licensed under the "BSD" License as described in the "LICENSE" file,
# which should be included with this package. The terms are also available at
# http://www.hardcoded.net/licenses/bsd_license
import time
import threading
import os
import sqlite3 as sqlite
from pytest import raises
from ..testutil import eq_
from ..sqlite import ThreadedConn
# Threading is hard to test. In a lot of those tests, a failure means that the test run will
# hang forever. Well... I don't know a better alternative.
def test_can_access_from_multiple_threads():
def run():
con.execute('insert into foo(bar) values(\'baz\')')
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
t = threading.Thread(target=run)
t.start()
t.join()
result = con.execute('select * from foo')
eq_(1, len(result))
eq_('baz', result[0][0])
def test_exception_during_query():
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
with raises(sqlite.OperationalError):
con.execute('select * from bleh')
def test_not_autocommit(tmpdir):
dbpath = str(tmpdir.join('foo.db'))
con = ThreadedConn(dbpath, False)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz\')')
del con
#The data shouldn't have been inserted
con = ThreadedConn(dbpath, False)
result = con.execute('select * from foo')
eq_(0, len(result))
con.execute('insert into foo(bar) values(\'baz\')')
con.commit()
del con
# Now the data should be there
con = ThreadedConn(dbpath, False)
result = con.execute('select * from foo')
eq_(1, len(result))
def test_rollback():
con = ThreadedConn(':memory:', False)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz\')')
con.rollback()
result = con.execute('select * from foo')
eq_(0, len(result))
def test_query_palceholders():
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(?)', ['baz'])
result = con.execute('select * from foo')
eq_(1, len(result))
eq_('baz', result[0][0])
def test_make_sure_theres_no_messup_between_queries():
def run(expected_rowid):
time.sleep(0.1)
result = con.execute('select rowid from foo where rowid = ?', [expected_rowid])
assert expected_rowid == result[0][0]
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
for i in range(100):
con.execute('insert into foo(bar) values(\'baz\')')
threads = []
for i in range(1, 101):
t = threading.Thread(target=run, args=(i,))
t.start
threads.append(t)
while threads:
time.sleep(0.1)
threads = [t for t in threads if t.isAlive()]
def test_query_after_close():
con = ThreadedConn(':memory:', True)
con.close()
con.execute('select 1')
def test_lastrowid():
# It's not possible to return a cursor because of the threading, but lastrowid should be
# fetchable from the connection itself
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz\')')
eq_(1, con.lastrowid)
def test_add_fetchone_fetchall_interface_to_results():
con = ThreadedConn(':memory:', True)
con.execute('create table foo(bar TEXT)')
con.execute('insert into foo(bar) values(\'baz1\')')
con.execute('insert into foo(bar) values(\'baz2\')')
result = con.execute('select * from foo')
ref = result[:]
eq_(ref, result.fetchall())
eq_(ref[0], result.fetchone())
eq_(ref[1], result.fetchone())
assert result.fetchone() is None
def test_non_ascii_dbname(tmpdir):
ThreadedConn(str(tmpdir.join('foo\u00e9.db')), True)
def test_non_ascii_dbdir(tmpdir):
# when this test fails, it doesn't fail gracefully, it brings the whole test suite with it.
dbdir = tmpdir.join('foo\u00e9')
os.mkdir(str(dbdir))
ThreadedConn(str(dbdir.join('foo.db')), True)