2019-09-10 00:54:28 +00:00
|
|
|
# Created By: Virgil Dupras
|
|
|
|
# Created On: 2007/05/19
|
|
|
|
# Copyright 2015 Hardcoded Software (http://www.hardcoded.net)
|
|
|
|
|
2020-01-01 02:16:27 +00:00
|
|
|
# This software is licensed under the "GPLv3" License as described in the "LICENSE" file,
|
|
|
|
# which should be included with this package. The terms are also available at
|
2019-09-10 00:54:28 +00:00
|
|
|
# http://www.gnu.org/licenses/gpl-3.0.html
|
|
|
|
|
|
|
|
import time
|
|
|
|
import threading
|
|
|
|
import os
|
|
|
|
import sqlite3 as sqlite
|
|
|
|
|
|
|
|
from pytest import raises
|
|
|
|
|
|
|
|
from ..testutil import eq_
|
|
|
|
from ..sqlite import ThreadedConn
|
|
|
|
|
|
|
|
# Threading is hard to test. In a lot of those tests, a failure means that the test run will
|
|
|
|
# hang forever. Well... I don't know a better alternative.
|
|
|
|
|
2020-01-01 02:16:27 +00:00
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
def test_can_access_from_multiple_threads():
|
|
|
|
def run():
|
2020-01-01 02:16:27 +00:00
|
|
|
con.execute("insert into foo(bar) values('baz')")
|
|
|
|
|
|
|
|
con = ThreadedConn(":memory:", True)
|
|
|
|
con.execute("create table foo(bar TEXT)")
|
2019-09-10 00:54:28 +00:00
|
|
|
t = threading.Thread(target=run)
|
|
|
|
t.start()
|
|
|
|
t.join()
|
2020-01-01 02:16:27 +00:00
|
|
|
result = con.execute("select * from foo")
|
2019-09-10 00:54:28 +00:00
|
|
|
eq_(1, len(result))
|
2020-01-01 02:16:27 +00:00
|
|
|
eq_("baz", result[0][0])
|
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
|
|
|
|
def test_exception_during_query():
|
2020-01-01 02:16:27 +00:00
|
|
|
con = ThreadedConn(":memory:", True)
|
|
|
|
con.execute("create table foo(bar TEXT)")
|
2019-09-10 00:54:28 +00:00
|
|
|
with raises(sqlite.OperationalError):
|
2020-01-01 02:16:27 +00:00
|
|
|
con.execute("select * from bleh")
|
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
|
|
|
|
def test_not_autocommit(tmpdir):
|
2020-01-01 02:16:27 +00:00
|
|
|
dbpath = str(tmpdir.join("foo.db"))
|
2019-09-10 00:54:28 +00:00
|
|
|
con = ThreadedConn(dbpath, False)
|
2020-01-01 02:16:27 +00:00
|
|
|
con.execute("create table foo(bar TEXT)")
|
|
|
|
con.execute("insert into foo(bar) values('baz')")
|
2019-09-10 00:54:28 +00:00
|
|
|
del con
|
2020-01-01 02:16:27 +00:00
|
|
|
# The data shouldn't have been inserted
|
2019-09-10 00:54:28 +00:00
|
|
|
con = ThreadedConn(dbpath, False)
|
2020-01-01 02:16:27 +00:00
|
|
|
result = con.execute("select * from foo")
|
2019-09-10 00:54:28 +00:00
|
|
|
eq_(0, len(result))
|
2020-01-01 02:16:27 +00:00
|
|
|
con.execute("insert into foo(bar) values('baz')")
|
2019-09-10 00:54:28 +00:00
|
|
|
con.commit()
|
|
|
|
del con
|
|
|
|
# Now the data should be there
|
|
|
|
con = ThreadedConn(dbpath, False)
|
2020-01-01 02:16:27 +00:00
|
|
|
result = con.execute("select * from foo")
|
2019-09-10 00:54:28 +00:00
|
|
|
eq_(1, len(result))
|
|
|
|
|
2020-01-01 02:16:27 +00:00
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
def test_rollback():
|
2020-01-01 02:16:27 +00:00
|
|
|
con = ThreadedConn(":memory:", False)
|
|
|
|
con.execute("create table foo(bar TEXT)")
|
|
|
|
con.execute("insert into foo(bar) values('baz')")
|
2019-09-10 00:54:28 +00:00
|
|
|
con.rollback()
|
2020-01-01 02:16:27 +00:00
|
|
|
result = con.execute("select * from foo")
|
2019-09-10 00:54:28 +00:00
|
|
|
eq_(0, len(result))
|
|
|
|
|
2020-01-01 02:16:27 +00:00
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
def test_query_palceholders():
|
2020-01-01 02:16:27 +00:00
|
|
|
con = ThreadedConn(":memory:", True)
|
|
|
|
con.execute("create table foo(bar TEXT)")
|
|
|
|
con.execute("insert into foo(bar) values(?)", ["baz"])
|
|
|
|
result = con.execute("select * from foo")
|
2019-09-10 00:54:28 +00:00
|
|
|
eq_(1, len(result))
|
2020-01-01 02:16:27 +00:00
|
|
|
eq_("baz", result[0][0])
|
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
|
|
|
|
def test_make_sure_theres_no_messup_between_queries():
|
|
|
|
def run(expected_rowid):
|
|
|
|
time.sleep(0.1)
|
2020-01-01 02:16:27 +00:00
|
|
|
result = con.execute("select rowid from foo where rowid = ?", [expected_rowid])
|
2019-09-10 00:54:28 +00:00
|
|
|
assert expected_rowid == result[0][0]
|
2020-01-01 02:16:27 +00:00
|
|
|
|
|
|
|
con = ThreadedConn(":memory:", True)
|
|
|
|
con.execute("create table foo(bar TEXT)")
|
2019-09-10 00:54:28 +00:00
|
|
|
for i in range(100):
|
2020-01-01 02:16:27 +00:00
|
|
|
con.execute("insert into foo(bar) values('baz')")
|
2019-09-10 00:54:28 +00:00
|
|
|
threads = []
|
|
|
|
for i in range(1, 101):
|
|
|
|
t = threading.Thread(target=run, args=(i,))
|
|
|
|
t.start
|
|
|
|
threads.append(t)
|
|
|
|
while threads:
|
|
|
|
time.sleep(0.1)
|
|
|
|
threads = [t for t in threads if t.isAlive()]
|
|
|
|
|
2020-01-01 02:16:27 +00:00
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
def test_query_after_close():
|
2020-01-01 02:16:27 +00:00
|
|
|
con = ThreadedConn(":memory:", True)
|
2019-09-10 00:54:28 +00:00
|
|
|
con.close()
|
2020-01-01 02:16:27 +00:00
|
|
|
con.execute("select 1")
|
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
|
|
|
|
def test_lastrowid():
|
|
|
|
# It's not possible to return a cursor because of the threading, but lastrowid should be
|
|
|
|
# fetchable from the connection itself
|
2020-01-01 02:16:27 +00:00
|
|
|
con = ThreadedConn(":memory:", True)
|
|
|
|
con.execute("create table foo(bar TEXT)")
|
|
|
|
con.execute("insert into foo(bar) values('baz')")
|
2019-09-10 00:54:28 +00:00
|
|
|
eq_(1, con.lastrowid)
|
|
|
|
|
2020-01-01 02:16:27 +00:00
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
def test_add_fetchone_fetchall_interface_to_results():
|
2020-01-01 02:16:27 +00:00
|
|
|
con = ThreadedConn(":memory:", True)
|
|
|
|
con.execute("create table foo(bar TEXT)")
|
|
|
|
con.execute("insert into foo(bar) values('baz1')")
|
|
|
|
con.execute("insert into foo(bar) values('baz2')")
|
|
|
|
result = con.execute("select * from foo")
|
2019-09-10 00:54:28 +00:00
|
|
|
ref = result[:]
|
|
|
|
eq_(ref, result.fetchall())
|
|
|
|
eq_(ref[0], result.fetchone())
|
|
|
|
eq_(ref[1], result.fetchone())
|
|
|
|
assert result.fetchone() is None
|
|
|
|
|
2020-01-01 02:16:27 +00:00
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
def test_non_ascii_dbname(tmpdir):
|
2020-01-01 02:16:27 +00:00
|
|
|
ThreadedConn(str(tmpdir.join("foo\u00e9.db")), True)
|
|
|
|
|
2019-09-10 00:54:28 +00:00
|
|
|
|
|
|
|
def test_non_ascii_dbdir(tmpdir):
|
|
|
|
# when this test fails, it doesn't fail gracefully, it brings the whole test suite with it.
|
2020-01-01 02:16:27 +00:00
|
|
|
dbdir = tmpdir.join("foo\u00e9")
|
2019-09-10 00:54:28 +00:00
|
|
|
os.mkdir(str(dbdir))
|
2020-01-01 02:16:27 +00:00
|
|
|
ThreadedConn(str(dbdir.join("foo.db")), True)
|