From 80f94b31e2cdef385804ac440d3778001f8c1f5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9gory=20Wisniewski?= <gregory@nexedi.com> Date: Mon, 27 Jul 2009 09:43:14 +0000 Subject: [PATCH] Implement iterator() on the Storage. git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1014 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/client/Storage.py | 2 +- neo/client/app.py | 31 +++++++----- neo/client/iterator.py | 107 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 12 deletions(-) create mode 100644 neo/client/iterator.py diff --git a/neo/client/Storage.py b/neo/client/Storage.py index 85744a86..853eba78 100644 --- a/neo/client/Storage.py +++ b/neo/client/Storage.py @@ -125,7 +125,7 @@ class Storage(BaseStorage.BaseStorage, raise POSException.POSKeyError (oid, tid) def iterator(self, start=None, stop=None): - raise NotImplementedError + return self.app.iterator(start, stop) # undo def undo(self, transaction_id, txn): diff --git a/neo/client/app.py b/neo/client/app.py index 75d4012e..71b6e53d 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -1,3 +1,4 @@ +# # Copyright (C) 2006-2009 Nexedi SA # # This program is free software; you can redistribute it and/or @@ -22,26 +23,29 @@ from Queue import Queue, Empty from random import shuffle from time import sleep -from neo.client.mq import MQ -from neo.node import NodeManager, MasterNode, StorageNode -from neo.connection import MTClientConnection +from ZODB.POSException import UndoError, StorageTransactionError, ConflictError + from neo import protocol -from neo.client.handlers import storage, master -from neo.client.exception import NEOStorageError, NEOStorageConflictError, \ - NEOStorageNotFoundError -from neo.exception import NeoException +from neo.event import EventManager from neo.util import makeChecksum, dump +from neo.locking import RLock, Lock +from neo.connection import MTClientConnection +from neo.node import NodeManager, MasterNode, StorageNode from neo.connector import getConnectorHandler +from neo.client.exception import NEOStorageError, NEOStorageConflictError +from neo.client.exception import NEOStorageNotFoundError +from neo.exception import NeoException +from neo.client.handlers import storage, master from neo.client.dispatcher import Dispatcher from neo.client.poll import ThreadedPoll -from neo.event import EventManager -from neo.locking import RLock, Lock +from neo.client.iterator import Iterator +from neo.client.mq import MQ -from ZODB.POSException import UndoError, StorageTransactionError, ConflictError class ConnectionClosed(Exception): pass + class ConnectionPool(object): """This class manages a pool of connections to storage nodes.""" @@ -942,7 +946,9 @@ class Application(object): 'be found' % (tid, ) if filter is None or filter(self.local_var.txn_info): - self.local_var.txn_info.pop("oids") + # XXX: oids entry is not needed by undoLog but required for the + # iterator, this code should be splited then specialized + #self.local_var.txn_info.pop("oids") append(self.local_var.txn_info) if len(undo_info) >= last - first: break @@ -1018,6 +1024,9 @@ class Application(object): return history_list + def iterator(self, start=None, stop=None): + return Iterator(self, start, stop) + def __del__(self): """Clear all connection.""" # Due to bug in ZODB, close is not always called when shutting diff --git a/neo/client/iterator.py b/neo/client/iterator.py new file mode 100644 index 00000000..0d9e0e3c --- /dev/null +++ b/neo/client/iterator.py @@ -0,0 +1,107 @@ +# +# Copyright (C) 2006-2009 Nexedi SA +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +from ZODB import BaseStorage +from neo import util import + + +class Record(BaseStorage.DataRecord): + """ TBaseStorageransaction record yielded by the Transaction object """ + + def __init__(self, oid, tid, version, data, prev): + self.oid = oid + self.tid = tid + self.version = version + self.data = data + self.data_txn = prev + + def __str__(self): + oid = util.u64(self.oid) + tid = util.u64(self.tid) + args = (oid, tid, len(self.data), self.data_txn) + return 'Record %s:%s: %s (%s)' % args + + +class Transaction(BaseStorage.TransactionRecord): + """ Transaction object yielded by the NEO iterator """ + + def __init__(self, app, tid, status, user, desc, ext, oid_list): + self.app = app + self.tid = tid + self.status = status + self.user = user + self.description = desc + self._extension = ext + self.oid_list = oid_list + self.history = [] + + def __iter__(self): + return self + + def next(self): + """ Iterate over the transaction records """ + app = self.app + if not self.oid_list: + # no more records for this transaction + raise StopIteration + oid = self.oid_list.pop() + # load an object + result = app._load(oid, serial=self.tid) + data, start_serial, end_serial = result + record = Record(oid, self.tid, '', data, end_serial) + return record + + def __str__(self): + tid = util.u64(self.tid) + args = (tid, self.user, self.status) + return 'Transaction #%s: %s %s' % args + + +class Iterator(object): + """ An iterator for the NEO storage """ + + def __init__(self, app, start, stop): + if start is not None or stop is not None: + raise NotImplementedError('partial scan not implemented yet') + self.app = app + self.txn_list = [] + self.index = 0 + + def __iter__(self): + return self + + def next(self): + """ Return an iterator for the next transaction""" + app = self.app + if not self.txn_list: + # ask some transactions + self.txn_list = app.undoLog(self.index, self.index + 100) + if not self.txn_list: + # scan finished + raise StopIteration + self.index += len(self.txn_list) + txn = self.txn_list.pop() + tid = txn['id'] + user = txn['user_name'] + desc = txn['description'] + oid_list = txn['oids'] + txn = Transaction(self.app, tid, ' ', user, desc, None, oid_list) + return txn + + def __str__(self): + return 'NEO transactions iteratpr' + -- 2.30.9