Commit eedbf224 authored by Bryton Lacquement's avatar Bryton Lacquement 🚪

fixup! trace: use temporary connections to the database

This fixes commit 575a5354.
parent 575a5354
from lib2to3.tests.test_fixers import FixerTestCase as lib2to3FixerTestCase from lib2to3.tests.test_fixers import FixerTestCase as lib2to3FixerTestCase
import sqlite3 import sqlite3
from my2to3.trace import conn, get_data, tracing_functions from my2to3.trace import connection, get_data, tracing_functions
class FixerTestCase(lib2to3FixerTestCase): class FixerTestCase(lib2to3FixerTestCase):
...@@ -10,7 +10,7 @@ class FixerTestCase(lib2to3FixerTestCase): ...@@ -10,7 +10,7 @@ class FixerTestCase(lib2to3FixerTestCase):
super(FixerTestCase, self).setUp(fix_list, fixer_pkg, options) super(FixerTestCase, self).setUp(fix_list, fixer_pkg, options)
# Clear the database # Clear the database
with conn: with connection() as conn:
for table in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall(): for table in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall():
conn.execute("DELETE FROM %s" % table) conn.execute("DELETE FROM %s" % table)
......
import __builtin__, imp, os, sqlite3, sys, types import __builtin__, imp, os, sqlite3, sys, types
from contextlib import contextmanager
from lib2to3.pgen2 import tokenize from lib2to3.pgen2 import tokenize
from lib2to3.refactor import get_fixers_from_package, RefactoringTool from lib2to3.refactor import get_fixers_from_package, RefactoringTool
database = "traces.db" database = "traces.db"
@contextmanager
def connection():
conn = sqlite3.connect(database)
try:
with conn:
yield conn
finally:
conn.close()
tracing_functions = [] tracing_functions = []
def create_table(table, *columns): def create_table(table, *columns):
conn = sqlite3.connect(database) with connection() as conn:
with conn:
v = ', '.join(columns) v = ', '.join(columns)
conn.execute( conn.execute(
"CREATE TABLE IF NOT EXISTS %s (%s, UNIQUE (%s))" % (table, v, v) "CREATE TABLE IF NOT EXISTS %s (%s, UNIQUE (%s))" % (table, v, v)
) )
conn.close()
def insert_unique(*values): def insert_unique(*values):
conn = sqlite3.connect(database) with connection() as conn:
with conn:
try: try:
conn.execute( conn.execute(
'INSERT INTO %s VALUES (%s)' % (table, ', '.join('?' * len(values))), 'INSERT INTO %s VALUES (%s)' % (table, ', '.join('?' * len(values))),
...@@ -29,7 +33,6 @@ def create_table(table, *columns): ...@@ -29,7 +33,6 @@ def create_table(table, *columns):
except sqlite3.IntegrityError as e: except sqlite3.IntegrityError as e:
if not str(e).startswith('UNIQUE constraint failed:'): if not str(e).startswith('UNIQUE constraint failed:'):
raise raise
conn.close()
return insert_unique return insert_unique
...@@ -46,10 +49,8 @@ def get_data(table, columns_to_select='*', conditions={}): ...@@ -46,10 +49,8 @@ def get_data(table, columns_to_select='*', conditions={}):
) )
if conditions: if conditions:
query += " WHERE " + ' AND '.join(k + " = :" + k for k in conditions.keys()) query += " WHERE " + ' AND '.join(k + " = :" + k for k in conditions.keys())
conn = sqlite3.connect(database) with connection() as conn:
with conn:
return conn.execute(query, conditions).fetchall() return conn.execute(query, conditions).fetchall()
conn.close()
def apply_fixers(string, name): def apply_fixers(string, name):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment