Commit b10366be authored by Chris McDonough's avatar Chris McDonough

Rework multithread tests and re-enable testforeignadd tests.

parent 477604d0
...@@ -112,91 +112,64 @@ toc_name = 'temp_transient_container' ...@@ -112,91 +112,64 @@ toc_name = 'temp_transient_container'
stuff = {} stuff = {}
def _getApp(): def _getDB():
db = stuff.get('db')
app = stuff.get('app', None) if not db:
if not app:
ds = DemoStorage(quota=(1<<20)) ds = DemoStorage(quota=(1<<20))
db = ZODB.DB(ds, pool_size=60) db = ZODB.DB(ds, pool_size=60)
conn = db.open() conn = db.open()
root = conn.root() root = conn.root()
app = Application() app = Application()
root['Application']= app root['Application']= app
_populate(app)
get_transaction().commit() get_transaction().commit()
stuff['app'] = app
stuff['conn'] = conn
stuff['db'] = db stuff['db'] = db
return app conn.close()
return db
def _openApp(): def _delDB():
conn = stuff['db'].open()
root = conn.root()
app = root['Application']
return conn, app
def _delApp():
get_transaction().abort() get_transaction().abort()
stuff['conn'].close()
del stuff['conn']
del stuff['app']
del stuff['db'] del stuff['db']
def f(sdo):
pass
class Foo(Acquisition.Implicit): pass class Foo(Acquisition.Implicit): pass
class TestBase(TestCase): def _populate(app):
def setUp(self): bidmgr = BrowserIdManager(idmgr_name)
self.app = makerequest.makerequest(_getApp()) tf = MountedTemporaryFolder(tf_name, title="Temporary Folder")
timeout = self.timeout = 1 toc = TransientObjectContainer(toc_name, title='Temporary '
'Transient Object Container', timeout_mins=20)
session_data_manager=SessionDataManager(id='session_data_manager',
path='/'+tf_name+'/'+toc_name, title='Session Data Manager')
# Try to work around some testrunner snafus try: app._delObject(idmgr_name)
if 1 and __name__ is not '__main__': except AttributeError: pass
bidmgr = BrowserIdManager(idmgr_name) try: app._delObject(tf_name)
except AttributeError: pass
tf = MountedTemporaryFolder(tf_name, title="Temporary Folder")
toc = TransientObjectContainer(toc_name, title='Temporary '
'Transient Object Container', timeout_mins=20)
session_data_manager=SessionDataManager(id='session_data_manager',
path='/'+tf_name+'/'+toc_name, title='Session Data Manager')
try: self.app._delObject(idmgr_name)
except AttributeError: pass
try: self.app._delObject(tf_name)
except AttributeError: pass
try: self.app._delObject('session_data_manager')
except AttributeError: pass
self.app._setObject(idmgr_name, bidmgr) try: app._delObject('session_data_manager')
self.app._setObject(tf_name, tf) except AttributeError: pass
get_transaction().commit() app._setObject(idmgr_name, bidmgr)
self.app.temp_folder._setObject(toc_name, toc) app._setObject('session_data_manager', session_data_manager)
self.app._setObject('session_data_manager', session_data_manager)
get_transaction().commit()
# leans on the fact that these things exist by app init app._setObject(tf_name, tf)
get_transaction().commit()
app.temp_folder._setObject(toc_name, toc)
get_transaction().commit()
## admin = self.app.acl_users.getUser('admin') class TestBase(TestCase):
## if admin is None: def setUp(self):
## raise "Need to define an 'admin' user before running these tests" db = _getDB()
## admin = admin.__of__(self.app.acl_users) conn = db.open()
## self.app.session_data_manager.changeOwnership(admin) root = conn.root()
self.app = makerequest.makerequest(root['Application'])
timeout = self.timeout = 1
def tearDown(self): def tearDown(self):
get_transaction().abort() _delDB()
#self.app._p_jar.close()
#self.app = None
_delApp()
del self.app del self.app
class TestSessionManager(TestBase): class TestSessionManager(TestBase):
...@@ -270,15 +243,14 @@ class TestSessionManager(TestBase): ...@@ -270,15 +243,14 @@ class TestSessionManager(TestBase):
sd.set('foo', 'bar') sd.set('foo', 'bar')
assert get_transaction().commit(1) == None assert get_transaction().commit(1) == None
# Why would this have failed?? Not sure what it was meant to test def testForeignObject(self):
#def testForeignObject(self): self.assertRaises(InvalidObjectReference, self._foreignAdd)
# self.assertRaises(InvalidObjectReference, self._foreignAdd)
#def _foreignAdd(self): def _foreignAdd(self):
# ob = self.app.session_data_manager ob = self.app.session_data_manager
# sd = self.app.session_data_manager.getSessionData() sd = self.app.session_data_manager.getSessionData()
# sd.set('foo', ob) sd.set('foo', ob)
# get_transaction().commit() get_transaction().commit()
def testAqWrappedObjectsFail(self): def testAqWrappedObjectsFail(self):
a = Foo() a = Foo()
...@@ -288,7 +260,7 @@ class TestSessionManager(TestBase): ...@@ -288,7 +260,7 @@ class TestSessionManager(TestBase):
sd.set('foo', aq_wrapped) sd.set('foo', aq_wrapped)
self.assertRaises(UnpickleableError, get_transaction().commit) self.assertRaises(UnpickleableError, get_transaction().commit)
class TestMultiThread(TestBase): class TestMultiThread(TestCase):
def testNonOverlappingSids(self): def testNonOverlappingSids(self):
readers = [] readers = []
writers = [] writers = []
...@@ -298,20 +270,13 @@ class TestMultiThread(TestBase): ...@@ -298,20 +270,13 @@ class TestMultiThread(TestBase):
writeout = [] writeout = []
numreaders = 20 numreaders = 20
numwriters = 5 numwriters = 5
#rlock = threading.Lock()
rlock = DumboLock()
sdm_name = 'session_data_manager' sdm_name = 'session_data_manager'
db = _getDB()
for i in range(numreaders): for i in range(numreaders):
mgr = getattr(self.app, idmgr_name) thread = ReaderThread(db, readiters, sdm_name)
sid = mgr._getNewToken()
app = aq_base(self.app)
thread = ReaderThread(sid, app, readiters, sdm_name, rlock)
readers.append(thread) readers.append(thread)
for i in range(numwriters): for i in range(numwriters):
mgr = getattr(self.app, idmgr_name) thread = WriterThread(db, writeiters, sdm_name)
sid = mgr._getNewToken()
app = aq_base(self.app)
thread = WriterThread(sid, app, writeiters, sdm_name, rlock)
writers.append(thread) writers.append(thread)
for thread in readers: for thread in readers:
thread.start() thread.start()
...@@ -334,18 +299,13 @@ class TestMultiThread(TestBase): ...@@ -334,18 +299,13 @@ class TestMultiThread(TestBase):
writeout = [] writeout = []
numreaders = 20 numreaders = 20
numwriters = 5 numwriters = 5
#rlock = threading.Lock()
rlock = DumboLock()
sdm_name = 'session_data_manager' sdm_name = 'session_data_manager'
mgr = getattr(self.app, idmgr_name) db = _getDB()
sid = mgr._getNewToken()
for i in range(numreaders): for i in range(numreaders):
app = aq_base(self.app) thread = ReaderThread(db, readiters, sdm_name)
thread = ReaderThread(sid, app, readiters, sdm_name, rlock)
readers.append(thread) readers.append(thread)
for i in range(numwriters): for i in range(numwriters):
app = aq_base(self.app) thread = WriterThread(db, writeiters, sdm_name)
thread = WriterThread(sid, app, writeiters, sdm_name, rlock)
writers.append(thread) writers.append(thread)
for thread in readers: for thread in readers:
thread.start() thread.start()
...@@ -359,134 +319,60 @@ class TestMultiThread(TestBase): ...@@ -359,134 +319,60 @@ class TestMultiThread(TestBase):
for thread in readers: for thread in readers:
assert thread.out == [], thread.out assert thread.out == [], thread.out
class ReaderThread(threading.Thread): class BaseReaderWriter(threading.Thread):
def __init__(self, sid, app, iters, sdm_name, rlock): def __init__(self, db, iters, sdm_name):
self.sid = sid self.conn = db.open()
self.conn, self.app = _openApp() self.app = self.conn.root()['Application']
self.app = makerequest.makerequest(self.app)
token = self.app.browser_id_manager._getNewToken()
self.app.REQUEST.session_token_ = token
self.iters = iters self.iters = iters
self.sdm_name = sdm_name self.sdm_name = sdm_name
self.out = [] self.out = []
self.rlock = rlock
#print "Reader SID %s" % sid
threading.Thread.__init__(self) threading.Thread.__init__(self)
def run1(self):
try:
self.rlock.acquire("Reader 1")
self.app = self.conn.root()['Application']
self.app = makerequest.makerequest(self.app)
self.app.REQUEST.session_token_ = self.sid
session_data_manager = getattr(self.app, self.sdm_name)
data = session_data_manager.getSessionData(create=1)
t = time.time()
data[t] = 1
get_transaction().commit()
self.rlock.release("Reader 1")
for i in range(self.iters):
self.rlock.acquire("Reader 2")
try:
data = session_data_manager.getSessionData()
except KeyError: # Ugh
raise ConflictError
if not data.has_key(t): self.out.append(1)
self.rlock.release("Reader 2")
time.sleep(whrandom.choice(range(3)))
self.rlock.acquire("Reader 3")
get_transaction().commit()
self.rlock.release("Reader 3")
finally:
self.rlock.release("Reader catchall")
try:
self.conn.close()
except AttributeError: pass # ugh
def run(self): def run(self):
i = 0 i = 0
while 1:
try:
self.run1()
return
except ConflictError:
i = i + 1
#print "conflict %d" % i
if i > 3: raise
pass
class WriterThread(threading.Thread):
def __init__(self, sid, app, iters, sdm_name, rlock):
self.sid = sid
self.conn, self.app = _openApp()
self.iters = iters
self.sdm_name = sdm_name
self.rlock = rlock
#print "Writer SID %s" % sid
threading.Thread.__init__(self)
def run1(self):
try: try:
self.rlock.acquire("Writer 1") while 1:
self.app = self.conn.root()['Application']
self.app = makerequest.makerequest(self.app)
self.app.REQUEST.session_token_ = self.sid
session_data_manager = getattr(self.app, self.sdm_name)
self.rlock.release("Writer 1")
for i in range(self.iters):
self.rlock.acquire("Writer 2")
try: try:
data = session_data_manager.getSessionData() self.run1()
except KeyError: # Ugh return
raise ConflictError except ConflictError:
data[time.time()] = 1 i = i + 1
n = whrandom.choice(range(8)) print "conflict %d" % i
self.rlock.release("Writer 2") if i > 3: raise
time.sleep(n)
self.rlock.acquire("Writer 3")
if n % 2 == 0:
get_transaction().commit()
else:
get_transaction().abort()
self.rlock.release("Writer 3")
finally: finally:
self.rlock.release("Writer Catchall") self.conn.close()
try: del self.app
self.conn.close()
except AttributeError: pass # ugh class ReaderThread(BaseReaderWriter):
def run1(self):
def run(self): session_data_manager = getattr(self.app, self.sdm_name)
data = session_data_manager.getSessionData(create=1)
i = 0 t = time.time()
data[t] = 1
get_transaction().commit()
for i in range(self.iters):
data = session_data_manager.getSessionData()
if not data.has_key(t):
self.out.append(1)
time.sleep(whrandom.choice(range(3)))
get_transaction().commit()
while 1: class WriterThread(BaseReaderWriter):
try: def run1(self):
self.run1() session_data_manager = getattr(self.app, self.sdm_name)
return for i in range(self.iters):
except ConflictError: data = session_data_manager.getSessionData()
i = i + 1 data[time.time()] = 1
#print "conflict %d" % i n = whrandom.choice(range(3))
if i > 3: raise time.sleep(n)
pass if n % 2 == 0:
get_transaction().commit()
else:
get_transaction().abort()
class DumboLock:
def __init__(self):
self.lock = threading.Lock()
self._locked_ = 0
def acquire(self, msg):
#print "Acquiring lock %s" % msg
#self.lock.acquire()
#if self._locked_ == 1:
# print "already locked?!"
self._locked_ = 1
def release(self, msg):
#print "Releasing lock %s" % msg
#if self._locked_ == 0:
# print "already released?!"
# return
#self.lock.release()
self._locked_ = 0
def test_suite(): def test_suite():
test_datamgr = makeSuite(TestSessionManager, 'test') test_datamgr = makeSuite(TestSessionManager, 'test')
test_multithread = makeSuite(TestMultiThread, 'test') test_multithread = makeSuite(TestMultiThread, 'test')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment