diff --git a/neo/storage/app.py b/neo/storage/app.py index 63b17e8e72962fe234867adebfdd84841c080f7b..20bc6ba3ec86d598b5a88e653f9b996344e36805 100644 --- a/neo/storage/app.py +++ b/neo/storage/app.py @@ -28,6 +28,7 @@ from neo.storage.handlers import identification, verification, initialization from neo.storage.handlers import master, hidden from neo.storage.replicator import Replicator from neo.storage.database import buildDatabaseManager +from neo.storage.transactions import TransactionManager from neo.connector import getConnectorHandler from neo.pt import PartitionTable from neo.util import dump @@ -46,6 +47,7 @@ class Application(object): # Internal attributes. self.em = EventManager() self.nm = NodeManager() + self.tm = TransactionManager(self) self.dm = buildDatabaseManager(config.getAdapter(), config.getDatabase()) # load master nodes @@ -67,9 +69,6 @@ class Application(object): self.master_node = None # operation related data - self.transaction_dict = {} - self.store_lock_dict = {} - self.load_lock_dict = {} self.event_queue = None self.operational = False @@ -242,19 +241,7 @@ class Application(object): # Forget all unfinished data. self.dm.dropUnfinishedData() - - # This is a mapping between transaction IDs and information on - # UUIDs of client nodes which issued transactions and objects - # which were stored. - self.transaction_dict = {} - - # This is a mapping between object IDs and transaction IDs. Used - # for locking objects against store operations. - self.store_lock_dict = {} - - # This is a mapping between object IDs and transactions IDs. Used - # for locking objects against load operations. - self.load_lock_dict = {} + self.tm.reset() # This is a queue of events used to delay operations due to locks. self.event_queue = deque() diff --git a/neo/storage/handlers/__init__.py b/neo/storage/handlers/__init__.py index feb4a5cb9bf786b635639909fc28f1eea0ecf178..cd3ca1d649e0738c5ad6e1744832a844ccbd0815 100644 --- a/neo/storage/handlers/__init__.py +++ b/neo/storage/handlers/__init__.py @@ -97,10 +97,9 @@ class BaseClientAndStorageOperationHandler(EventHandler): def askObject(self, conn, oid, serial, tid): app = self.app - if oid in app.load_lock_dict: + if self.app.tm.loadLocked(oid): # Delay the response. - app.queueEvent(self.askObject, conn, oid, - serial, tid) + app.queueEvent(self.askObject, conn, oid, serial, tid) return o = app.dm.getObject(oid, serial, tid) if o is not None: diff --git a/neo/storage/handlers/client.py b/neo/storage/handlers/client.py index b27c08475a162ab17e76387689a012bde9523cb0..32a86bdb27f66356e073a093caad547b38d783d9 100644 --- a/neo/storage/handlers/client.py +++ b/neo/storage/handlers/client.py @@ -15,159 +15,46 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -from neo import logging - -from neo import protocol from neo.protocol import Packets from neo.storage.handlers import BaseClientAndStorageOperationHandler -from neo.util import dump - - -class TransactionInformation(object): - """This class represents information on a transaction.""" - def __init__(self, uuid): - self._uuid = uuid - self._object_dict = {} - self._transaction = None - self._last_oid_changed = False - self._locked = False - - def isLocked(self): - return self._locked - - def setLocked(self): - self._locked = True - - def lastOIDLchange(self): - self._last_oid_changed = True - - def isLastOIDChanged(self): - return self._last_oid_changed - - def getUUID(self): - return self._uuid - - def addObject(self, oid, compression, checksum, data): - self._object_dict[oid] = (oid, compression, checksum, data) - - def addTransaction(self, oid_list, user, desc, ext): - self._transaction = (oid_list, user, desc, ext) - - def getObjectList(self): - return self._object_dict.values() - - def getTransaction(self): - return self._transaction - +from neo.storage.transactions import ConflictError, DelayedError class ClientOperationHandler(BaseClientAndStorageOperationHandler): - def dealWithClientFailure(self, uuid): - app = self.app - for tid, t in app.transaction_dict.items(): - if t.getUUID() == uuid: - if t.isLocked(): - logging.warning('Node lost while finishing transaction') - break - for o in t.getObjectList(): - oid = o[0] - # TODO: remove try..except: pass - # XXX: we release locks without checking if tid owns them - try: - del app.store_lock_dict[oid] - del app.load_lock_dict[oid] - except KeyError: - pass - del app.transaction_dict[tid] - # Now it may be possible to execute some events. - app.executeQueuedEvents() - def timeoutExpired(self, conn): - self.dealWithClientFailure(conn.getUUID()) + self.app.tm.abortFor(conn.getUUID()) BaseClientAndStorageOperationHandler.timeoutExpired(self, conn) def connectionClosed(self, conn): - self.dealWithClientFailure(conn.getUUID()) + self.app.tm.abortFor(conn.getUUID()) BaseClientAndStorageOperationHandler.connectionClosed(self, conn) def peerBroken(self, conn): - self.dealWithClientFailure(conn.getUUID()) + self.app.tm.abortFor(conn.getUUID()) BaseClientAndStorageOperationHandler.peerBroken(self, conn) def abortTransaction(self, conn, tid): - app = self.app - # TODO: remove try..except: pass - try: - t = app.transaction_dict[tid] - object_list = t.getObjectList() - for o in object_list: - oid = o[0] - # TODO: remove try..except: pass - # XXX: we release locks without checking if tid owns them - try: - del app.load_lock_dict[oid] - except KeyError: - pass - del app.store_lock_dict[oid] - - del app.transaction_dict[tid] - - # Now it may be possible to execute some events. - app.executeQueuedEvents() - except KeyError: - pass + self.app.tm.abort(tid) def askStoreTransaction(self, conn, tid, user, desc, ext, oid_list): uuid = conn.getUUID() - t = self.app.transaction_dict.get(tid, None) - if t is None: - t = TransactionInformation(uuid) - self.app.transaction_dict[tid] = t - if t.isLastOIDChanged(): - self.app.dm.setLastOID(self.app.loid) - t.addTransaction(oid_list, user, desc, ext) + self.app.tm.storeTransaction(uuid, tid, oid_list, user, desc, ext) conn.answer(Packets.AnswerStoreTransaction(tid)) def askStoreObject(self, conn, oid, serial, compression, checksum, data, tid): uuid = conn.getUUID() - # First, check for the locking state. - app = self.app - locking_tid = app.store_lock_dict.get(oid) - if locking_tid is not None: - if locking_tid < tid: - # Delay the response. - app.queueEvent(self.askStoreObject, conn, oid, serial, + try: + self.app.tm.storeObject(uuid, tid, serial, oid, compression, + checksum, data) + conn.answer(Packets.AnswerStoreObject(0, oid, serial)) + except ConflictError, err: + # resolvable or not + tid_or_serial = err.getTID() + conn.answer(Packets.AnswerStoreObject(1, oid, tid_or_serial)) + except DelayedError: + # locked by a previous transaction, retry later + self.app.queueEvent(self.askStoreObject, conn, oid, serial, compression, checksum, data, tid) - else: - # If a newer transaction already locks this object, - # do not try to resolve a conflict, so return immediately. - logging.info('unresolvable conflict in %s', dump(oid)) - conn.answer(Packets.AnswerStoreObject(1, oid, locking_tid)) - return - - # Next, check if this is generated from the latest revision. - history_list = app.dm.getObjectHistory(oid) - if history_list: - last_serial = history_list[0][0] - if last_serial != serial: - logging.info('resolvable conflict in %s', dump(oid)) - conn.answer(Packets.AnswerStoreObject(1, oid, last_serial)) - return - # Now store the object. - t = self.app.transaction_dict.get(tid, None) - if t is None: - t = TransactionInformation(uuid) - self.app.transaction_dict[tid] = t - t.addObject(oid, compression, checksum, data) - conn.answer(Packets.AnswerStoreObject(0, oid, serial)) - app.store_lock_dict[oid] = tid - - # check if a greater OID last the last generated was used - if oid != protocol.INVALID_OID and oid > self.app.loid: - args = dump(oid), dump(self.app.loid) - logging.warning('Greater OID used in StoreObject : %s > %s', *args) - self.app.loid = oid - t.lastOIDLchange() diff --git a/neo/storage/handlers/master.py b/neo/storage/handlers/master.py index ee1d4275ce4182ec1b5de51ce3e385253179460f..59f9c244ff106c669f82235435cb81acc6a96531 100644 --- a/neo/storage/handlers/master.py +++ b/neo/storage/handlers/master.py @@ -57,30 +57,13 @@ class MasterOperationHandler(BaseMasterHandler): app.replicator.addPartition(offset) def lockInformation(self, conn, tid): - t = self.app.transaction_dict.get(tid, None) - if t is None: + if not tid in self.app.tm: raise ProtocolError('Unknown transaction') - t.setLocked() - object_list = t.getObjectList() - for o in object_list: - self.app.load_lock_dict[o[0]] = tid - - self.app.dm.storeTransaction(tid, object_list, t.getTransaction()) + self.app.tm.lock(tid) conn.answer(Packets.AnswerInformationLocked(tid)) def notifyUnlockInformation(self, conn, tid): - t = self.app.transaction_dict.get(tid, None) - if t is None: + if not tid in self.app.tm: raise ProtocolError('Unknown transaction') - object_list = t.getObjectList() - for o in object_list: - oid = o[0] - del self.app.load_lock_dict[oid] - del self.app.store_lock_dict[oid] - - self.app.dm.finishTransaction(tid) - del self.app.transaction_dict[tid] - - # Now it may be possible to execute some events. - self.app.executeQueuedEvents() - + # TODO: send an answer + self.app.tm.unlock(tid) diff --git a/neo/storage/transactions.py b/neo/storage/transactions.py new file mode 100644 index 0000000000000000000000000000000000000000..904a9b0b309959d60cdde2b7b9b3ae391f7f6a69 --- /dev/null +++ b/neo/storage/transactions.py @@ -0,0 +1,231 @@ +# +# Copyright (C) 2010 Nexedi SA +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +from neo import logging +from neo.util import dump + + +class ConflictError(Exception): + """ + Raised when a resolvable conflict occurs + Argument: tid of locking transaction or latest revision + """ + + def __init__(self, tid): + self._tid = tid + + def getTID(self): + return self._tid + + +class DelayedError(Exception): + """ + Raised when an object is locked by a previous transaction + """ + + +class Transaction(object): + """ + Container for a pending transaction + """ + + def __init__(self, uuid, tid): + self._uuid = uuid + self._tid = tid + self._object_dict = {} + self._transaction = None + self._locked = False + + def getTID(self): + return self._tid + + def getUUID(self): + return self._uuid + + def lock(self): + assert not self._locked + self._locked = True + + def isLocked(self): + return self._locked + + def prepare(self, oid_list, user, desc, ext): + """ + Set the transaction informations + """ + # assert self._transaction is not None + self._transaction = (oid_list, user, desc, ext) + + def addObject(self, oid, compression, checksum, data): + """ + Add an object to the transaction + """ + self._object_dict[oid] = (oid, compression, checksum, data) + + def getObjectList(self): + return self._object_dict.values() + + def getOIDList(self): + return self._object_dict.keys() + + def getTransactionInformations(self): + assert self._transaction is not None + return self._transaction + + +class TransactionManager(object): + """ + Manage pending transaction and locks + """ + + def __init__(self, app): + self._app = app + self._transaction_dict = {} + self._store_lock_dict = {} + self._load_lock_dict = {} + self._uuid_dict = {} + # TODO: replace app.loid with this one: + self._loid = None + + def __contains__(self, tid): + """ + Returns True if the TID is known by the manager + """ + return tid in self._transaction_dict + + def _getTransaction(self, tid, uuid): + """ + Get or create the transaction object for this tid + """ + transaction = self._transaction_dict.get(tid, None) + if transaction is None: + transaction = Transaction(uuid, tid) + self._uuid_dict.setdefault(uuid, set()).add(transaction) + self._transaction_dict[tid] = transaction + return transaction + + def reset(self): + """ + Reset the transaction manager + """ + self._transaction_dict.clear() + self._store_lock_dict.clear() + self._load_lock_dict.clear() + self._uuid_dict.clear() + + def lock(self, tid): + """ + Lock a transaction + """ + transaction = self._transaction_dict[tid] + # remember that the transaction has been locked + transaction.lock() + for oid in transaction.getOIDList(): + self._load_lock_dict[oid] = tid + object_list = transaction.getObjectList() + txn_info = transaction.getTransactionInformations() + # store data from memory to temporary table + self._app.dm.storeTransaction(tid, object_list, txn_info) + + def unlock(self, tid): + """ + Unlock transaction + """ + self._app.dm.finishTransaction(tid) + self.abort(tid, even_if_locked=True) + + # update loid if needed + if self._loid != self._app.loid: + args = dump(self._loid), dump(self._app.loid) + logging.warning('Greater OID used in StoreObject : %s > %s', *args) + self._app.loid = self._loid + self._app.dm.setLastOID(self._app.loid) + + def storeTransaction(self, uuid, tid, oid_list, user, desc, ext): + """ + Store transaction information received from client node + """ + transaction = self._getTransaction(tid, uuid) + transaction.prepare(oid_list, user, desc, ext) + + def storeObject(self, uuid, tid, serial, oid, compression, checksum, data): + """ + Store an object received from client node + """ + # check if the object if locked + locking_tid = self._store_lock_dict.get(oid, None) + if locking_tid is not None: + if locking_tid < tid: + # a previous transaction lock this object, retry later + raise DelayedError + # If a newer transaction already locks this object, + # do not try to resolve a conflict, so return immediately. + logging.info('unresolvable conflict in %s', dump(oid)) + raise ConflictError(locking_tid) + + # check if this is generated from the latest revision. + history_list = self._app.dm.getObjectHistory(oid) + if history_list and history_list[0][0] != serial: + logging.info('resolvable conflict in %s', dump(oid)) + raise ConflictError(history_list[0][0]) + + # store object + transaction = self._getTransaction(tid, uuid) + transaction.addObject(oid, compression, checksum, data) + self._store_lock_dict[oid] = tid + + # update loid + self._loid = max(oid, self._app.loid) + + def abort(self, tid, even_if_locked=True): + """ + Abort a transaction + """ + if tid not in self._transaction_dict: + # XXX: this happen sometimes, explain or fix + return + transaction = self._transaction_dict[tid] + # if the transaction is locked, ensure we can drop it + if not even_if_locked and transaction.isLocked(): + return + # unlock any object + for oid in transaction.getOIDList(): + try: + del self._load_lock_dict[oid] + except KeyError: + # XXX: explain why + pass + del self._store_lock_dict[oid] + # _uuid_dict entry will be deleted at node disconnection + self._uuid_dict[transaction.getUUID()].discard(transaction) + del self._transaction_dict[tid] + self._app.executeQueuedEvents() + + def abortFor(self, uuid): + """ + Abort any non-locked transaction of a node + """ + # abort any non-locked transaction of this node + for tid in [x.getTID() for x in self._uuid_dict.get(uuid, [])]: + self.abort(tid, even_if_locked=False) + # cleanup _uuid_dict if no transaction remains for this node + if not self._uuid_dict.get(uuid): + del self._uuid_dict[uuid] + + def loadLocked(self, oid): + return oid in self._load_lock_dict +