Commit e25fa5d9 authored by Julien Muchembled's avatar Julien Muchembled

storage: speed up transaction registration

parent c3d3dabd
...@@ -69,7 +69,7 @@ class ClientOperationHandler(EventHandler): ...@@ -69,7 +69,7 @@ class ClientOperationHandler(EventHandler):
self.app.tm.abort(ttid) self.app.tm.abort(ttid)
def askStoreTransaction(self, conn, ttid, *txn_info): def askStoreTransaction(self, conn, ttid, *txn_info):
self.app.tm.register(conn.getUUID(), ttid) self.app.tm.register(conn, ttid)
self.app.tm.vote(ttid, txn_info) self.app.tm.vote(ttid, txn_info)
conn.answer(Packets.AnswerStoreTransaction()) conn.answer(Packets.AnswerStoreTransaction())
...@@ -116,7 +116,7 @@ class ClientOperationHandler(EventHandler): ...@@ -116,7 +116,7 @@ class ClientOperationHandler(EventHandler):
if 1 < compression: if 1 < compression:
raise ProtocolError('invalid compression value') raise ProtocolError('invalid compression value')
# register the transaction # register the transaction
self.app.tm.register(conn.getUUID(), ttid) self.app.tm.register(conn, ttid)
if data or checksum != ZERO_HASH: if data or checksum != ZERO_HASH:
# TODO: return an appropriate error packet # TODO: return an appropriate error packet
assert makeChecksum(data) == checksum assert makeChecksum(data) == checksum
...@@ -191,7 +191,7 @@ class ClientOperationHandler(EventHandler): ...@@ -191,7 +191,7 @@ class ClientOperationHandler(EventHandler):
conn.answer(p) conn.answer(p)
def askCheckCurrentSerial(self, conn, ttid, serial, oid): def askCheckCurrentSerial(self, conn, ttid, serial, oid):
self.app.tm.register(conn.getUUID(), ttid) self.app.tm.register(conn, ttid)
self._askCheckCurrentSerial(conn, ttid, serial, oid, time.time()) self._askCheckCurrentSerial(conn, ttid, serial, oid, time.time())
def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time): def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time):
......
...@@ -130,16 +130,14 @@ class TransactionManager(object): ...@@ -130,16 +130,14 @@ class TransactionManager(object):
self._store_lock_dict = {} self._store_lock_dict = {}
self._load_lock_dict = {} self._load_lock_dict = {}
def register(self, uuid, ttid): def register(self, conn, ttid):
""" """
Register a transaction, it may be already registered Register a transaction, it may be already registered
""" """
if ttid not in self._transaction_dict:
uuid = conn.getUUID()
logging.debug('Register TXN %s for %s', dump(ttid), uuid_str(uuid)) logging.debug('Register TXN %s for %s', dump(ttid), uuid_str(uuid))
transaction = self._transaction_dict.get(ttid) self._transaction_dict[ttid] = Transaction(uuid, ttid)
if transaction is None:
transaction = Transaction(uuid, ttid)
self._transaction_dict[ttid] = transaction
return transaction
def getObjectFromTransaction(self, ttid, oid): def getObjectFromTransaction(self, ttid, oid):
""" """
......
...@@ -80,6 +80,9 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -80,6 +80,9 @@ class TransactionManagerTests(NeoUnitTestBase):
self.manager = TransactionManager(self.app) self.manager = TransactionManager(self.app)
self.ltid = None self.ltid = None
def register(self, uuid, ttid):
self.manager.register(Mock({'getUUID': uuid}), ttid)
def _getTransaction(self): def _getTransaction(self):
tid = self.getNextTID(self.ltid) tid = self.getNextTID(self.ltid)
oid_list = [self.getOID(1), self.getOID(2)] oid_list = [self.getOID(1), self.getOID(2)]
...@@ -124,7 +127,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -124,7 +127,7 @@ class TransactionManagerTests(NeoUnitTestBase):
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
serial1, object1 = self._getObject(1) serial1, object1 = self._getObject(1)
serial2, object2 = self._getObject(2) serial2, object2 = self._getObject(2)
self.manager.register(uuid, ttid) self.register(uuid, ttid)
self.manager.storeObject(ttid, serial1, *object1) self.manager.storeObject(ttid, serial1, *object1)
self.manager.storeObject(ttid, serial2, *object2) self.manager.storeObject(ttid, serial2, *object2)
self.assertRegistered(ttid) self.assertRegistered(ttid)
...@@ -152,13 +155,13 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -152,13 +155,13 @@ class TransactionManagerTests(NeoUnitTestBase):
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1) serial, obj = self._getObject(1)
# first transaction lock the object # first transaction lock the object
self.manager.register(uuid, ttid1) self.register(uuid, ttid1)
self.assertRegistered(ttid1) self.assertRegistered(ttid1)
self._storeTransactionObjects(ttid1, txn1) self._storeTransactionObjects(ttid1, txn1)
self.manager.vote(ttid1, txn1) self.manager.vote(ttid1, txn1)
self.manager.lock(ttid1, tid1) self.manager.lock(ttid1, tid1)
# the second is delayed # the second is delayed
self.manager.register(uuid, ttid2) self.register(uuid, ttid2)
self.assertRegistered(ttid2) self.assertRegistered(ttid2)
self.assertRaises(DelayedError, self.manager.storeObject, self.assertRaises(DelayedError, self.manager.storeObject,
ttid2, serial, *obj) ttid2, serial, *obj)
...@@ -172,13 +175,13 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -172,13 +175,13 @@ class TransactionManagerTests(NeoUnitTestBase):
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
serial, obj = self._getObject(1) serial, obj = self._getObject(1)
# the (later) transaction lock (change) the object # the (later) transaction lock (change) the object
self.manager.register(uuid, ttid2) self.register(uuid, ttid2)
self.assertRegistered(ttid2) self.assertRegistered(ttid2)
self._storeTransactionObjects(ttid2, txn2) self._storeTransactionObjects(ttid2, txn2)
self.manager.vote(ttid2, txn2) self.manager.vote(ttid2, txn2)
self.manager.lock(ttid2, tid2) self.manager.lock(ttid2, tid2)
# the previous it's not using the latest version # the previous it's not using the latest version
self.manager.register(uuid, ttid1) self.register(uuid, ttid1)
self.assertRegistered(ttid1) self.assertRegistered(ttid1)
self.assertRaises(ConflictError, self.manager.storeObject, self.assertRaises(ConflictError, self.manager.storeObject,
ttid1, serial, *obj) ttid1, serial, *obj)
...@@ -191,7 +194,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -191,7 +194,7 @@ class TransactionManagerTests(NeoUnitTestBase):
next_serial = self.getNextTID(serial) next_serial = self.getNextTID(serial)
# try to store without the last revision # try to store without the last revision
self.app.dm = Mock({'getLastObjectTID': next_serial}) self.app.dm = Mock({'getLastObjectTID': next_serial})
self.manager.register(uuid, tid) self.register(uuid, tid)
self.assertRaises(ConflictError, self.manager.storeObject, self.assertRaises(ConflictError, self.manager.storeObject,
tid, serial, *obj) tid, serial, *obj)
...@@ -207,14 +210,14 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -207,14 +210,14 @@ class TransactionManagerTests(NeoUnitTestBase):
serial1, obj1 = self._getObject(1) serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2) serial2, obj2 = self._getObject(2)
# first transaction lock objects # first transaction lock objects
self.manager.register(uuid1, ttid1) self.register(uuid1, ttid1)
self.assertRegistered(ttid1) self.assertRegistered(ttid1)
self.manager.storeObject(ttid1, serial1, *obj1) self.manager.storeObject(ttid1, serial1, *obj1)
self.manager.storeObject(ttid1, serial1, *obj2) self.manager.storeObject(ttid1, serial1, *obj2)
self.manager.vote(ttid1, txn1) self.manager.vote(ttid1, txn1)
self.manager.lock(ttid1, tid1) self.manager.lock(ttid1, tid1)
# second transaction is delayed # second transaction is delayed
self.manager.register(uuid2, ttid2) self.register(uuid2, ttid2)
self.assertRegistered(ttid2) self.assertRegistered(ttid2)
self.assertRaises(DelayedError, self.manager.storeObject, self.assertRaises(DelayedError, self.manager.storeObject,
ttid2, serial1, *obj1) ttid2, serial1, *obj1)
...@@ -233,14 +236,14 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -233,14 +236,14 @@ class TransactionManagerTests(NeoUnitTestBase):
serial1, obj1 = self._getObject(1) serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2) serial2, obj2 = self._getObject(2)
# the second transaction lock objects # the second transaction lock objects
self.manager.register(uuid2, ttid2) self.register(uuid2, ttid2)
self.manager.storeObject(ttid2, serial1, *obj1) self.manager.storeObject(ttid2, serial1, *obj1)
self.manager.storeObject(ttid2, serial2, *obj2) self.manager.storeObject(ttid2, serial2, *obj2)
self.assertRegistered(ttid2) self.assertRegistered(ttid2)
self.manager.vote(ttid2, txn2) self.manager.vote(ttid2, txn2)
self.manager.lock(ttid2, tid2) self.manager.lock(ttid2, tid2)
# the first get a conflict # the first get a conflict
self.manager.register(uuid1, ttid1) self.register(uuid1, ttid1)
self.assertRegistered(ttid1) self.assertRegistered(ttid1)
self.assertRaises(ConflictError, self.manager.storeObject, self.assertRaises(ConflictError, self.manager.storeObject,
ttid1, serial1, *obj1) ttid1, serial1, *obj1)
...@@ -252,7 +255,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -252,7 +255,7 @@ class TransactionManagerTests(NeoUnitTestBase):
uuid = self.getClientUUID() uuid = self.getClientUUID()
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
serial, obj = self._getObject(1) serial, obj = self._getObject(1)
self.manager.register(uuid, tid) self.register(uuid, tid)
self.manager.storeObject(tid, serial, *obj) self.manager.storeObject(tid, serial, *obj)
self.assertRegistered(tid) self.assertRegistered(tid)
self.manager.vote(tid, txn) self.manager.vote(tid, txn)
...@@ -267,7 +270,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -267,7 +270,7 @@ class TransactionManagerTests(NeoUnitTestBase):
uuid = self.getClientUUID() uuid = self.getClientUUID()
ttid = self.getNextTID() ttid = self.getNextTID()
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
self.manager.register(uuid, ttid) self.register(uuid, ttid)
self._storeTransactionObjects(ttid, txn) self._storeTransactionObjects(ttid, txn)
self.manager.vote(ttid, txn) self.manager.vote(ttid, txn)
# lock transaction # lock transaction
...@@ -290,9 +293,9 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -290,9 +293,9 @@ class TransactionManagerTests(NeoUnitTestBase):
tid1, txn1 = self._getTransaction() tid1, txn1 = self._getTransaction()
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
tid3, txn3 = self._getTransaction() tid3, txn3 = self._getTransaction()
self.manager.register(uuid1, ttid1) self.register(uuid1, ttid1)
self.manager.register(uuid2, ttid2) self.register(uuid2, ttid2)
self.manager.register(uuid2, ttid3) self.register(uuid2, ttid3)
self.manager.vote(ttid1, txn1) self.manager.vote(ttid1, txn1)
# node 2 owns tid2 & tid3 and lock tid2 only # node 2 owns tid2 & tid3 and lock tid2 only
self._storeTransactionObjects(ttid2, txn2) self._storeTransactionObjects(ttid2, txn2)
...@@ -314,7 +317,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -314,7 +317,7 @@ class TransactionManagerTests(NeoUnitTestBase):
uuid = self.getClientUUID() uuid = self.getClientUUID()
tid, txn = self._getTransaction() tid, txn = self._getTransaction()
ttid = self.getNextTID() ttid = self.getNextTID()
self.manager.register(uuid, ttid) self.register(uuid, ttid)
self._storeTransactionObjects(ttid, txn) self._storeTransactionObjects(ttid, txn)
self.manager.vote(ttid, txn) self.manager.vote(ttid, txn)
self.manager.lock(ttid, tid) self.manager.lock(ttid, tid)
...@@ -332,7 +335,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -332,7 +335,7 @@ class TransactionManagerTests(NeoUnitTestBase):
tid2, txn2 = self._getTransaction() tid2, txn2 = self._getTransaction()
serial1, obj1 = self._getObject(1) serial1, obj1 = self._getObject(1)
serial2, obj2 = self._getObject(2) serial2, obj2 = self._getObject(2)
self.manager.register(uuid, tid1) self.register(uuid, tid1)
self.manager.storeObject(tid1, serial1, *obj1) self.manager.storeObject(tid1, serial1, *obj1)
self.assertEqual(self.manager.getObjectFromTransaction(tid2, obj1[0]), self.assertEqual(self.manager.getObjectFromTransaction(tid2, obj1[0]),
None) None)
...@@ -347,7 +350,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -347,7 +350,7 @@ class TransactionManagerTests(NeoUnitTestBase):
oid1 = obj1[0] oid1 = obj1[0]
tid1, txn1 = self._getTransaction() tid1, txn1 = self._getTransaction()
self.assertEqual(self.manager.getLockingTID(oid1), None) self.assertEqual(self.manager.getLockingTID(oid1), None)
self.manager.register(uuid, tid1) self.register(uuid, tid1)
self.manager.storeObject(tid1, serial1, *obj1) self.manager.storeObject(tid1, serial1, *obj1)
self.assertEqual(self.manager.getLockingTID(oid1), tid1) self.assertEqual(self.manager.getLockingTID(oid1), tid1)
...@@ -360,7 +363,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -360,7 +363,7 @@ class TransactionManagerTests(NeoUnitTestBase):
other_serial = self.getNextTID() other_serial = self.getNextTID()
new_serial = self.getNextTID() new_serial = self.getNextTID()
checksum = "2" * 20 checksum = "2" * 20
self.manager.register(uuid, locking_serial) self.register(uuid, locking_serial)
# Object not known, nothing happens # Object not known, nothing happens
self.assertEqual(self.manager.getObjectFromTransaction(locking_serial, self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
oid), None) oid), None)
...@@ -369,7 +372,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -369,7 +372,7 @@ class TransactionManagerTests(NeoUnitTestBase):
oid), None) oid), None)
self.manager.abort(locking_serial, even_if_locked=True) self.manager.abort(locking_serial, even_if_locked=True)
# Object known, but doesn't point at orig_serial, it is not updated # Object known, but doesn't point at orig_serial, it is not updated
self.manager.register(uuid, locking_serial) self.register(uuid, locking_serial)
self.manager.storeObject(locking_serial, ram_serial, oid, 0, "3" * 20, self.manager.storeObject(locking_serial, ram_serial, oid, 0, "3" * 20,
'bar', None) 'bar', None)
holdData = self.app.dm.mockGetNamedCalls('holdData') holdData = self.app.dm.mockGetNamedCalls('holdData')
...@@ -381,7 +384,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -381,7 +384,7 @@ class TransactionManagerTests(NeoUnitTestBase):
oid), orig_object) oid), orig_object)
self.manager.abort(locking_serial, even_if_locked=True) self.manager.abort(locking_serial, even_if_locked=True)
self.manager.register(uuid, locking_serial) self.register(uuid, locking_serial)
self.manager.storeObject(locking_serial, ram_serial, oid, None, None, self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
None, other_serial) None, other_serial)
orig_object = self.manager.getObjectFromTransaction(locking_serial, orig_object = self.manager.getObjectFromTransaction(locking_serial,
...@@ -391,7 +394,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -391,7 +394,7 @@ class TransactionManagerTests(NeoUnitTestBase):
oid), orig_object) oid), orig_object)
self.manager.abort(locking_serial, even_if_locked=True) self.manager.abort(locking_serial, even_if_locked=True)
# Object known and points at undone data it gets updated # Object known and points at undone data it gets updated
self.manager.register(uuid, locking_serial) self.register(uuid, locking_serial)
self.manager.storeObject(locking_serial, ram_serial, oid, None, None, self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
None, orig_serial) None, orig_serial)
self.manager.updateObjectDataForPack(oid, orig_serial, new_serial, self.manager.updateObjectDataForPack(oid, orig_serial, new_serial,
...@@ -400,7 +403,7 @@ class TransactionManagerTests(NeoUnitTestBase): ...@@ -400,7 +403,7 @@ class TransactionManagerTests(NeoUnitTestBase):
oid), (oid, None, new_serial)) oid), (oid, None, new_serial))
self.manager.abort(locking_serial, even_if_locked=True) self.manager.abort(locking_serial, even_if_locked=True)
self.manager.register(uuid, locking_serial) self.register(uuid, locking_serial)
self.manager.storeObject(locking_serial, ram_serial, oid, None, None, self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
None, orig_serial) None, orig_serial)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum) self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment