Commit 1c876254 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Don't process queued lock requests related to an aborted transaction.

When a delayed store is processed, don't take the lock if the transaction
is no more registered (may have been aborted due to a timeout with the
delayed store).
Change the TransactionManager API: add register() that must be called before
any storeObject/storeTransaction operation.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2159 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 25b66510
...@@ -43,16 +43,23 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -43,16 +43,23 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def askStoreTransaction(self, conn, tid, user, desc, def askStoreTransaction(self, conn, tid, user, desc,
ext, oid_list): ext, oid_list):
uuid = conn.getUUID() self.app.tm.register(conn.getUUID(), tid)
self.app.tm.storeTransaction(uuid, tid, oid_list, user, desc, ext, self.app.tm.storeTransaction(tid, oid_list, user, desc, ext,
False) False)
conn.answer(Packets.AnswerStoreTransaction(tid)) conn.answer(Packets.AnswerStoreTransaction(tid))
def _askStoreObject(self, conn, oid, serial, compression, checksum, data, def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
tid, request_time): tid, request_time):
uuid = conn.getUUID() if tid not in self.app.tm:
# transaction was aborted, cancel this event
logging.info('Forget store of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(tid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
return
try: try:
self.app.tm.storeObject(uuid, tid, serial, oid, compression, self.app.tm.storeObject(tid, serial, oid, compression,
checksum, data, None) checksum, data, None)
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
...@@ -71,6 +78,8 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -71,6 +78,8 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def askStoreObject(self, conn, oid, serial, def askStoreObject(self, conn, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
# register the transaction
self.app.tm.register(conn.getUUID(), tid)
self._askStoreObject(conn, oid, serial, compression, checksum, data, self._askStoreObject(conn, oid, serial, compression, checksum, data,
tid, time.time()) tid, time.time())
......
...@@ -123,9 +123,9 @@ class TransactionManager(object): ...@@ -123,9 +123,9 @@ class TransactionManager(object):
""" """
return tid in self._transaction_dict return tid in self._transaction_dict
def _getTransaction(self, tid, uuid): def register(self, uuid, tid):
""" """
Get or create the transaction object for this tid Register a transaction, it may be already registered
""" """
transaction = self._transaction_dict.get(tid, None) transaction = self._transaction_dict.get(tid, None)
if transaction is None: if transaction is None:
...@@ -193,17 +193,18 @@ class TransactionManager(object): ...@@ -193,17 +193,18 @@ class TransactionManager(object):
self._loid = self._loid_seen self._loid = self._loid_seen
self._app.dm.setLastOID(self._loid) self._app.dm.setLastOID(self._loid)
def storeTransaction(self, uuid, tid, oid_list, user, desc, ext, packed): def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
""" """
Store transaction information received from client node Store transaction information received from client node
""" """
transaction = self._getTransaction(tid, uuid) assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
transaction.prepare(oid_list, user, desc, ext, packed) transaction.prepare(oid_list, user, desc, ext, packed)
def getLockingTID(self, oid): def getLockingTID(self, oid):
return self._store_lock_dict.get(oid) return self._store_lock_dict.get(oid)
def storeObject(self, uuid, tid, serial, oid, compression, checksum, data, def storeObject(self, tid, serial, oid, compression, checksum, data,
value_serial): value_serial):
""" """
Store an object received from client node Store an object received from client node
...@@ -235,7 +236,8 @@ class TransactionManager(object): ...@@ -235,7 +236,8 @@ class TransactionManager(object):
raise ConflictError(locking_tid) raise ConflictError(locking_tid)
# store object # store object
transaction = self._getTransaction(tid, uuid) assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
transaction.addObject(oid, compression, checksum, data, value_serial) transaction.addObject(oid, compression, checksum, data, value_serial)
# update loid # update loid
......
...@@ -44,7 +44,7 @@ class StorageClientHandlerTests(NeoTestBase): ...@@ -44,7 +44,7 @@ class StorageClientHandlerTests(NeoTestBase):
self.app.store_lock_dict = {} self.app.store_lock_dict = {}
self.app.load_lock_dict = {} self.app.load_lock_dict = {}
self.app.event_queue = deque() self.app.event_queue = deque()
self.app.tm = Mock() self.app.tm = Mock({'__contains__': True})
# handler # handler
self.operation = ClientOperationHandler(self.app) self.operation = ClientOperationHandler(self.app)
# set pmn # set pmn
...@@ -211,7 +211,7 @@ class StorageClientHandlerTests(NeoTestBase): ...@@ -211,7 +211,7 @@ class StorageClientHandlerTests(NeoTestBase):
oid, serial, comp, checksum, data = self._getObject() oid, serial, comp, checksum, data = self._getObject()
self.operation.askStoreObject(conn, oid, serial, comp, checksum, self.operation.askStoreObject(conn, oid, serial, comp, checksum,
data, tid) data, tid)
self._checkStoreObjectCalled(uuid, tid, serial, oid, comp, self._checkStoreObjectCalled(tid, serial, oid, comp,
checksum, data, None) checksum, data, None)
self.checkAnswerStoreObject(conn) self.checkAnswerStoreObject(conn)
......
...@@ -96,7 +96,7 @@ class TransactionManagerTests(NeoTestBase): ...@@ -96,7 +96,7 @@ class TransactionManagerTests(NeoTestBase):
def _storeTransactionObjects(self, tid, txn): def _storeTransactionObjects(self, tid, txn):
for i, oid in enumerate(txn[0]): for i, oid in enumerate(txn[0]):
self.manager.storeObject(self.getNewUUID(), tid, None, self.manager.storeObject(tid, None,
oid, 1, str(i), '0' + str(i), None) oid, 1, str(i), '0' + str(i), None)
def _getObject(self, value): def _getObject(self, value):
...@@ -124,9 +124,10 @@ class TransactionManagerTests(NeoTestBase): ...@@ -124,9 +124,10 @@ class TransactionManagerTests(NeoTestBase):
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
serial1, object1 = self._getObject(1) serial1, object1 = self._getObject(1)
serial2, object2 = self._getObject(2) serial2, object2 = self._getObject(2)
self.manager.storeTransaction(uuid, tid, *txn) self.manager.register(uuid,tid)
self.manager.storeObject(uuid, tid, serial1, *object1) self.manager.storeTransaction(tid, *txn)
self.manager.storeObject(uuid, tid, serial2, *object2) self.manager.storeObject(tid, serial1, *object1)
self.manager.storeObject(tid, serial2, *object2)
self.assertTrue(tid in self.manager) self.assertTrue(tid in self.manager)
self.manager.lock(tid, txn[0]) self.manager.lock(tid, txn[0])
self._checkTransactionStored(tid, [object1, object2], txn) self._checkTransactionStored(tid, [object1, object2], txn)
...@@ -141,15 +142,17 @@ class TransactionManagerTests(NeoTestBase): ...@@ -141,15 +142,17 @@ class TransactionManagerTests(NeoTestBase):
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1) serial, obj = self._getObject(1)
# first transaction lock the object # first transaction lock the object
self.manager.storeTransaction(uuid, tid1, *txn1) self.manager.register(uuid, tid1)
self.manager.storeTransaction(tid1, *txn1)
self.assertTrue(tid1 in self.manager) self.assertTrue(tid1 in self.manager)
self._storeTransactionObjects(tid1, txn1) self._storeTransactionObjects(tid1, txn1)
self.manager.lock(tid1, txn1[0]) self.manager.lock(tid1, txn1[0])
# the second is delayed # the second is delayed
self.manager.storeTransaction(uuid, tid2, *txn2) self.manager.register(uuid, tid2)
self.manager.storeTransaction(tid2, *txn2)
self.assertTrue(tid2 in self.manager) self.assertTrue(tid2 in self.manager)
self.assertRaises(DelayedError, self.manager.storeObject, self.assertRaises(DelayedError, self.manager.storeObject,
uuid, tid2, serial, *obj) tid2, serial, *obj)
def testUnresolvableConflict(self): def testUnresolvableConflict(self):
""" A newer transaction has already modified an object """ """ A newer transaction has already modified an object """
...@@ -158,16 +161,18 @@ class TransactionManagerTests(NeoTestBase): ...@@ -158,16 +161,18 @@ class TransactionManagerTests(NeoTestBase):
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1) serial, obj = self._getObject(1)
# the (later) transaction lock (change) the object # the (later) transaction lock (change) the object
self.manager.storeTransaction(uuid, tid2, *txn2) self.manager.register(uuid, tid2)
self.manager.storeObject(uuid, tid2, serial, *obj) self.manager.storeTransaction(tid2, *txn2)
self.manager.storeObject(tid2, serial, *obj)
self.assertTrue(tid2 in self.manager) self.assertTrue(tid2 in self.manager)
self._storeTransactionObjects(tid2, txn2) self._storeTransactionObjects(tid2, txn2)
self.manager.lock(tid2, txn2[0]) self.manager.lock(tid2, txn2[0])
# the previous it's not using the latest version # the previous it's not using the latest version
self.manager.storeTransaction(uuid, tid1, *txn1) self.manager.register(uuid, tid1)
self.manager.storeTransaction(tid1, *txn1)
self.assertTrue(tid1 in self.manager) self.assertTrue(tid1 in self.manager)
self.assertRaises(ConflictError, self.manager.storeObject, self.assertRaises(ConflictError, self.manager.storeObject,
uuid, tid1, serial, *obj) tid1, serial, *obj)
def testResolvableConflict(self): def testResolvableConflict(self):
""" Try to store an object with the lastest revision """ """ Try to store an object with the lastest revision """
...@@ -177,9 +182,10 @@ class TransactionManagerTests(NeoTestBase): ...@@ -177,9 +182,10 @@ class TransactionManagerTests(NeoTestBase):
next_serial = self.getNextTID(serial) next_serial = self.getNextTID(serial)
# try to store without the last revision # try to store without the last revision
self.app.dm = Mock({'getObjectHistory': [next_serial]}) self.app.dm = Mock({'getObjectHistory': [next_serial]})
self.manager.storeTransaction(uuid, tid, *txn) self.manager.register(uuid, tid)
self.manager.storeTransaction(tid, *txn)
self.assertRaises(ConflictError, self.manager.storeObject, self.assertRaises(ConflictError, self.manager.storeObject,
uuid, tid, serial, *obj) tid, serial, *obj)
def testLockDelayed(self): def testLockDelayed(self):
""" Check lock delaytion""" """ Check lock delaytion"""
...@@ -191,18 +197,20 @@ class TransactionManagerTests(NeoTestBase): ...@@ -191,18 +197,20 @@ class TransactionManagerTests(NeoTestBase):
serial1, obj1 = self._getObject(1) serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2) serial2, obj2 = self._getObject(2)
# first transaction lock objects # first transaction lock objects
self.manager.storeTransaction(uuid1, tid1, *txn1) self.manager.register(uuid1, tid1)
self.manager.storeTransaction(tid1, *txn1)
self.assertTrue(tid1 in self.manager) self.assertTrue(tid1 in self.manager)
self.manager.storeObject(uuid1, tid1, serial1, *obj1) self.manager.storeObject(tid1, serial1, *obj1)
self.manager.storeObject(uuid1, tid1, serial1, *obj2) self.manager.storeObject(tid1, serial1, *obj2)
self.manager.lock(tid1, txn1[0]) self.manager.lock(tid1, txn1[0])
# second transaction is delayed # second transaction is delayed
self.manager.storeTransaction(uuid2, tid2, *txn2) self.manager.register(uuid2, tid2)
self.manager.storeTransaction(tid2, *txn2)
self.assertTrue(tid2 in self.manager) self.assertTrue(tid2 in self.manager)
self.assertRaises(DelayedError, self.manager.storeObject, self.assertRaises(DelayedError, self.manager.storeObject,
uuid2, tid2, serial1, *obj1) tid2, serial1, *obj1)
self.assertRaises(DelayedError, self.manager.storeObject, self.assertRaises(DelayedError, self.manager.storeObject,
uuid2, tid2, serial2, *obj2) tid2, serial2, *obj2)
def testLockConflict(self): def testLockConflict(self):
""" Check lock conflict """ """ Check lock conflict """
...@@ -214,26 +222,29 @@ class TransactionManagerTests(NeoTestBase): ...@@ -214,26 +222,29 @@ class TransactionManagerTests(NeoTestBase):
serial1, obj1 = self._getObject(1) serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2) serial2, obj2 = self._getObject(2)
# the second transaction lock objects # the second transaction lock objects
self.manager.storeTransaction(uuid2, tid2, *txn2) self.manager.register(uuid2, tid2)
self.manager.storeObject(uuid2, tid2, serial1, *obj1) self.manager.storeTransaction(tid2, *txn2)
self.manager.storeObject(uuid2, tid2, serial2, *obj2) self.manager.storeObject(tid2, serial1, *obj1)
self.manager.storeObject(tid2, serial2, *obj2)
self.assertTrue(tid2 in self.manager) self.assertTrue(tid2 in self.manager)
self.manager.lock(tid2, txn1[0]) self.manager.lock(tid2, txn1[0])
# the first get a conflict # the first get a conflict
self.manager.storeTransaction(uuid1, tid1, *txn1) self.manager.register(uuid1, tid1)
self.manager.storeTransaction(tid1, *txn1)
self.assertTrue(tid1 in self.manager) self.assertTrue(tid1 in self.manager)
self.assertRaises(ConflictError, self.manager.storeObject, self.assertRaises(ConflictError, self.manager.storeObject,
uuid1, tid1, serial1, *obj1) tid1, serial1, *obj1)
self.assertRaises(ConflictError, self.manager.storeObject, self.assertRaises(ConflictError, self.manager.storeObject,
uuid1, tid1, serial2, *obj2) tid1, serial2, *obj2)
def testAbortUnlocked(self): def testAbortUnlocked(self):
""" Abort a non-locked transaction """ """ Abort a non-locked transaction """
uuid = self.getNewUUID() uuid = self.getNewUUID()
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
serial, obj = self._getObject(1) serial, obj = self._getObject(1)
self.manager.storeTransaction(uuid, tid, *txn) self.manager.register(uuid, tid)
self.manager.storeObject(uuid, tid, serial, *obj) self.manager.storeTransaction(tid, *txn)
self.manager.storeObject(tid, serial, *obj)
self.assertTrue(tid in self.manager) self.assertTrue(tid in self.manager)
# transaction is not locked # transaction is not locked
self.manager.abort(tid) self.manager.abort(tid)
...@@ -245,7 +256,8 @@ class TransactionManagerTests(NeoTestBase): ...@@ -245,7 +256,8 @@ class TransactionManagerTests(NeoTestBase):
""" Try to abort a locked transaction """ """ Try to abort a locked transaction """
uuid = self.getNewUUID() uuid = self.getNewUUID()
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
self.manager.storeTransaction(uuid, tid, *txn) self.manager.register(uuid, tid)
self.manager.storeTransaction(tid, *txn)
self._storeTransactionObjects(tid, txn) self._storeTransactionObjects(tid, txn)
# lock transaction # lock transaction
self.manager.lock(tid, txn[0]) self.manager.lock(tid, txn[0])
...@@ -264,10 +276,13 @@ class TransactionManagerTests(NeoTestBase): ...@@ -264,10 +276,13 @@ class TransactionManagerTests(NeoTestBase):
tid1, txn1 = self._getTransaction() tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
tid3, txn3 = self._getTransaction() tid3, txn3 = self._getTransaction()
self.manager.storeTransaction(uuid1, tid1, *txn1) self.manager.register(uuid1, tid1)
self.manager.register(uuid2, tid2)
self.manager.register(uuid2, tid3)
self.manager.storeTransaction(tid1, *txn1)
# node 2 owns tid2 & tid3 and lock tid2 only # node 2 owns tid2 & tid3 and lock tid2 only
self.manager.storeTransaction(uuid2, tid2, *txn2) self.manager.storeTransaction(tid2, *txn2)
self.manager.storeTransaction(uuid2, tid3, *txn3) self.manager.storeTransaction(tid3, *txn3)
self._storeTransactionObjects(tid2, txn2) self._storeTransactionObjects(tid2, txn2)
self.manager.lock(tid2, txn2[0]) self.manager.lock(tid2, txn2[0])
self.assertTrue(tid1 in self.manager) self.assertTrue(tid1 in self.manager)
...@@ -284,7 +299,8 @@ class TransactionManagerTests(NeoTestBase): ...@@ -284,7 +299,8 @@ class TransactionManagerTests(NeoTestBase):
""" Reset the manager """ """ Reset the manager """
uuid = self.getNewUUID() uuid = self.getNewUUID()
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
self.manager.storeTransaction(uuid, tid, *txn) self.manager.register(uuid, tid)
self.manager.storeTransaction(tid, *txn)
self._storeTransactionObjects(tid, txn) self._storeTransactionObjects(tid, txn)
self.manager.lock(tid, txn[0]) self.manager.lock(tid, txn[0])
self.assertTrue(tid in self.manager) self.assertTrue(tid in self.manager)
...@@ -299,7 +315,8 @@ class TransactionManagerTests(NeoTestBase): ...@@ -299,7 +315,8 @@ class TransactionManagerTests(NeoTestBase):
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1) serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2) serial2, obj2 = self._getObject(2)
self.manager.storeObject(uuid, tid1, serial1, *obj1) self.manager.register(uuid, tid1)
self.manager.storeObject(tid1, serial1, *obj1)
self.assertEqual(self.manager.getObjectFromTransaction(tid2, obj1[0]), self.assertEqual(self.manager.getObjectFromTransaction(tid2, obj1[0]),
None) None)
self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj2[0]), self.assertEqual(self.manager.getObjectFromTransaction(tid1, obj2[0]),
...@@ -313,7 +330,8 @@ class TransactionManagerTests(NeoTestBase): ...@@ -313,7 +330,8 @@ class TransactionManagerTests(NeoTestBase):
oid1 = obj1[0] oid1 = obj1[0]
tid1, txn1 = self._getTransaction() tid1, txn1 = self._getTransaction()
self.assertEqual(self.manager.getLockingTID(oid1), None) self.assertEqual(self.manager.getLockingTID(oid1), None)
self.manager.storeObject(uuid, tid1, serial1, *obj1) self.manager.register(uuid, tid1)
self.manager.storeObject(tid1, serial1, *obj1)
self.assertEqual(self.manager.getLockingTID(oid1), tid1) self.assertEqual(self.manager.getLockingTID(oid1), tid1)
if __name__ == "__main__": if __name__ == "__main__":
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment