Commit 82c3dfdd authored by Joshua Woelfel's avatar Joshua Woelfel

Created unit tests for zconn_at

parent 65a9ba3e
from __future__ import print_function
from ZODB import DB
from ZODB.MappingStorage import MappingStorage
import transaction
import unittest
from persistent import Persistent
from wendelin.lib.zodb import zconn_at
from ZODB.utils import p64, u64
from ZODB.POSException import ConflictError
from golang import func, defer, chan, select, default
from golang import sync, context
from random import randint
import threading
CONNECTED = 1
DONE = 1
THREAD1 = 0
THREAD2 = 1
START = 0
END = 1
TID = 0
OBJ1_VAL = 0
OBJ2_VAL = 1
OBJECTS = 1
TRANSACTION_CONFLICT = None
# PInt is persistent integer.
class PInt(Persistent):
def __init__(self, i):
self.i = i
class Zconn_atTest(unittest.TestCase):
def setUp(self):
self.db = DB(MappingStorage())
transaction.begin()
connection = self.db.open()
root = connection.root()
root['obj1'] = PInt(0)
root['obj2'] = PInt(0)
transaction.commit()
connection.close()
def increaseByOne(self):
transaction.begin()
connection = self.db.open()
root = connection.root()
obj1 = root['obj1']
obj2 = root['obj2']
obj1.i += 1
obj2.i += 1
try:
transaction.commit()
except ConflictError:
transaction.abort()
connection.close()
# check initial db state id is correct
def test_init_tid(self):
storage = MappingStorage()
db = DB(storage)
transaction.begin()
connection = db.open()
actual = u64(zconn_at(connection))
expected = u64(db.lastTransaction())
transaction.abort()
connection.close()
self.assertEqual(actual, expected, 'Wrong tid on unchanged initial storage')
# check that db state tid is being updated after transaction commits
def test_updated_tid_after_commit(self):
transaction.begin()
connection = self.db.open()
obj1 = connection.root()['obj1']
obj2 = connection.root()['obj2']
obj1.i += 1
obj2.i += 1
tid_before_commit = u64(zconn_at(connection))
transaction.commit()
tid_after_commit = u64(zconn_at(connection))
connection.close()
self.assertNotEqual(tid_after_commit, tid_before_commit, 'tid did not change after commit')
expected = u64(self.db.lastTransaction())
actual = tid_after_commit
self.assertEqual(actual, expected, 'tid is not last transaction')
connection = self.db.open(None, p64(tid_after_commit))
obj1 = connection.root()['obj1']
obj2 = connection.root()['obj2']
self.assertEqual(obj1.i, 1, 'db returns unexpected obj1 at tid')
self.assertEqual(obj2.i, 1, 'db returns unexpected obj2 at tid')
# check that different connections at same db state report the same
# db state tid
@func
def test_same_tid_multi_thread(self):
tids = [None, None]
T1_is_connected = chan()
T2_is_connected = chan()
def T1(ctx, self):
transaction.begin()
connection = self.db.open()
T1_is_connected.send(CONNECTED)
tids[THREAD1] = u64(zconn_at(connection))
T2_is_connected.recv()
transaction.abort()
connection.close()
def T2(ctx, self):
T1_is_connected.recv()
transaction.begin()
connection = self.db.open()
T2_is_connected.send(CONNECTED)
tids[THREAD2] = u64(zconn_at(connection))
transaction.abort()
connection.close()
wg = sync.WorkGroup(context.background())
wg.go(T1, self)
wg.go(T2, self)
wg.wait()
self.assertIsNotNone(tids[THREAD1], 't1 tid not recorded')
self.assertEqual(tids[THREAD1], tids[THREAD2], 'threads accessing same db state report different connection tid')
# check that the db state tid of a connection is static while connection itself is not commiting changes
@func
def test_tid_remains_static(self):
tids = [None, None]
T1_is_connected = chan()
T2_is_done = chan()
def T1(ctx, self):
transaction.begin()
connection = self.db.open()
tids[START] = u64(zconn_at(connection))
T1_is_connected.send(CONNECTED)
T2_is_done.recv()
tids[END] = u64(zconn_at(connection))
transaction.abort()
connection.close()
def T2(ctx, self):
T1_is_connected.recv()
for i in range(0, 10):
self.increaseByOne()
T2_is_done.send(DONE)
wg = sync.WorkGroup(context.background())
wg.go(T1, self)
wg.go(T2, self)
wg.wait()
self.assertIsNotNone(tids[START], 'START tid not recorded')
expected = tids[START]
actual = tids[END]
self.assertEqual(actual, expected, 'tid changed after opening connection without commiting transaction')
# check for possible concurrency issues when opening connection during a commit that changes values of
# stored objects, should fail due to https://github.com/zopefoundation/ZODB/issues/290
@func
def test_open_conn_while_committing_val(self):
correct_states = {}
actual_states = []
def T1(ctx, self):
for i in range(0, 5000):
if ready(ctx.done()):
break
else:
increaseByOne(self, correct_states)
def T2(ctx, self):
for i in range(0, 5000):
if ready(ctx.done()):
break
else:
recordDbState(self, actual_states)
storeInitTid(self, correct_states)
wg = sync.WorkGroup(context.background())
wg.go(T1, self)
wg.go(T2, self)
wg.wait()
compareRecordedStates(self, correct_states, actual_states)
# check for possible concurrency issues when opening connection during a commit that adds/removes
# stored objects, not sure if failing due to https://github.com/zopefoundation/ZODB/issues/290
# or different issue
@func
def test_open_conn_while_committing_obj(self):
correct_states = {}
actual_states = []
def T1(ctx, self):
for i in range(0, 5000):
if ready(ctx.done()):
break
else:
removeOrAddObj(self, correct_states)
def T2(ctx, self):
for i in range(0, 5000):
if ready(ctx.done()):
break
else:
recordDbState(self, actual_states)
storeInitTid(self, correct_states)
wg = sync.WorkGroup(context.background())
wg.go(T1, self)
wg.go(T2, self)
wg.wait()
compareRecordedStates(self, correct_states, actual_states)
# Check for concurrency issues with simultaneous commits.
# On the assumption that following a transaction by connection A (and not
# the connection) zconn_at(A) should then return the tid of the db state with changes
# just made by A but not any following changes afterwards by other connections,
# it currently fails as the db state tid following successful transaction
# is not unique.
@func
def test_multi_commit(self):
correct_states = {}
actual_states = []
def T(ctx, self):
for i in range(0, 1000):
if ready(ctx.done()):
break
else:
new_state = increaseByOne(self, correct_states)
if not(new_state == TRANSACTION_CONFLICT):
validateState(self, correct_states, new_state)
wg = sync.WorkGroup(context.background())
wg.go(T, self)
wg.go(T, self)
wg.go(T, self)
wg.wait()
# Compares all observed tids and their object values recorded in actual_states
# and sees if they match with the recorded changes and their tids in correct_states
def compareRecordedStates(test, correct_states, actual_states):
for state in actual_states:
actual_tid = state[TID]
test.assertIn(actual_tid, correct_states, 'tid (%d) should not exist' % actual_tid)
actual_obj1_val = state[OBJECTS][OBJ1_VAL]
actual_obj2_val = state[OBJECTS][OBJ2_VAL]
expected_obj1_val = correct_states[actual_tid][OBJ1_VAL]
expected_obj2_val = correct_states[actual_tid][OBJ2_VAL]
test.assertEqual(actual_obj1_val, expected_obj1_val,
'at db state (%d) recorded obj1 was (%s), should have been (%s)'
% (actual_tid, actual_obj1_val, expected_obj1_val))
test.assertEqual(actual_obj2_val, expected_obj2_val,
'at db state (%d) recorded obj2 was (%s), should have been (%s)'
% (actual_tid, actual_obj2_val, expected_obj2_val))
# Connects with the db and records the tid of the current db state along with
# the value of obj1 and obj2 in actual_states. If obj1 or obj2 does not exist
# stores None instead
def recordDbState(test, actual_states):
transaction.begin()
connection = test.db.open()
db_state = u64(zconn_at(connection))
root = connection.root()
obj1_val = None
obj2_val = None
if 'obj1' in root:
obj1_val = root['obj1'].i
if 'obj2' in root:
obj2_val = root['obj2'].i
transaction.abort()
connection.close()
actual_states.append((db_state, (obj1_val, obj2_val)))
# If obj1/obj2 exist in db, removes them. Otherwise creates and
# initializes them with value of 0. Then commits changes and stores
# tid of change as well as updated object values, or None if
# objects were removed, in correct_states
def removeOrAddObj(test, correct_states):
transaction.begin()
connection = test.db.open()
root = connection.root()
obj1_val = None
obj2_val = None
if 'obj1' in root:
del root['obj1']
else:
root['obj1'] = PInt(0)
obj1_val = 0
if 'obj2' in root:
del root['obj2']
else:
root['obj2'] = PInt(0)
obj2_val = 0
try:
transaction.commit()
correct_states[u64(zconn_at(connection))] = (obj1_val, obj2_val)
except ConflictError:
transaction.abort()
transaction.commit()
connection.close()
# stores the current tid of db state along with object values in
# correct_states
def storeInitTid(test, correct_states):
transaction.begin()
connection = test.db.open()
root = connection.root()
obj1_val = root['obj1'].i
obj2_val = root['obj2'].i
correct_states[u64(test.db.lastTransaction())] = (obj1_val, obj2_val)
transaction.abort()
connection.close()
# Increases values of objects in db, then stores them with the tid
# of the changes made in correct_states
def increaseByOne(test, correct_states):
transaction.begin()
connection = test.db.open()
root = connection.root()
obj1 = root['obj1']
obj2 = root['obj2']
obj1._p_invalidate()
obj2._p_invalidate()
obj1.i += randint(1, 4)
obj2.i += randint(1, 4)
obj1_val = obj1.i
obj2_val = obj2.i
try:
transaction.commit()
curr_state = u64(zconn_at(connection))
if curr_state in correct_states:
connection.close()
rec_obj1 = correct_states[curr_state][OBJ1_VAL]
rec_obj2 = correct_states[curr_state][OBJ2_VAL]
test.fail('db state after successfull transaction is not unique. State: %d already recorded with obj1: %s, obj2: %s. This state: %d, obj1: %s, obj2: %s'
% (curr_state, rec_obj1, rec_obj2, curr_state, obj1_val, obj2_val))
else:
correct_states[curr_state] = (obj1_val, obj2_val)
except ConflictError:
transaction.abort()
curr_state = None
connection.close()
return curr_state
# validates that the db state at curr_state has the same values stored in
# current_states
def validateState(test, current_states, curr_state):
connection = test.db.open(None, p64(curr_state))
root = connection.root()
obj1 = root['obj1']
obj2 = root['obj2']
obj1._p_invalidate()
obj2._p_invalidate()
actual_obj1_val = obj1.i
actual_obj2_val = obj2.i
expected_obj1_val = current_states[curr_state][OBJ1_VAL]
expected_obj2_val = current_states[curr_state][OBJ2_VAL]
connection.close()
test.assertEqual(actual_obj1_val, expected_obj1_val,
'db state (%d) is invalid, obj1 had value (%s) but should be (%s)'
% (curr_state, actual_obj1_val, expected_obj1_val))
def ready(ch):
_, _rx = select(
default, # 0
ch.recv, # 1
)
if _ == 0:
return False
return True
suite = unittest.TestLoader().loadTestsFromTestCase(Zconn_atTest)
unittest.TextTestRunner(verbosity=2).run(suite)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment