Commit 03e3386a authored by Aurel's avatar Aurel

implement more method and fix bugs


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@52 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 0db5b119
...@@ -30,13 +30,15 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -30,13 +30,15 @@ class NEOStorage(BaseStorage.BaseStorage,
self.app.close() self.app.close()
def cleanup(self): def cleanup(self):
self.app.cleanup() raise NotImplementedError
def lastSerial(self): def lastSerial(self):
self.app.lastSerial() # does not seem to be used
raise NotImplementedError
def lastTransaction(self): def lastTransaction(self):
self.app.lastTransaction() # does not seem to be used
raise NotImplementedError
def new_oid(self): def new_oid(self):
if self._is_read_only: if self._is_read_only:
...@@ -67,7 +69,7 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -67,7 +69,7 @@ class NEOStorage(BaseStorage.BaseStorage,
try: try:
self.app.store(oid, serial, data, version, transaction) self.app.store(oid, serial, data, version, transaction)
except NEOStorageConflictError: except NEOStorageConflictError:
new_data = self.app.tryToResolveConflict(oid, self.app.tid, new_data = self.tryToResolveConflict(oid, self.app.tid,
serial, data) serial, data)
if new_data is not None: if new_data is not None:
# try again after conflict resolution # try again after conflict resolution
...@@ -78,7 +80,7 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -78,7 +80,7 @@ class NEOStorage(BaseStorage.BaseStorage,
serial),data=data) serial),data=data)
def _clear_temp(self): def _clear_temp(self):
self.app._clear_temp() raise NotImplementedError
def getSerial(self, oid): def getSerial(self, oid):
try: try:
......
...@@ -97,11 +97,13 @@ class Application(object): ...@@ -97,11 +97,13 @@ class Application(object):
self.uuid = None self.uuid = None
self.mq_cache = MQ() self.mq_cache = MQ()
self.new_oid_list = [] # List of new oid for ZODB self.new_oid_list = [] # List of new oid for ZODB
self.txn_data_dict = {} # Data for each object used in current transaction
self.txn = None # The current transaction self.txn = None # The current transaction
self.tid = None # The current transaction id self.tid = None # The current transaction id
self.txn_finished = 0 # Flag to know when transaction finished on master self.txn_finished = 0 # Flag to know when transaction finished on master
self.txn_stored = 0 # Flag to knwo when transaction has well been stored self.txn_stored = 0 # Flag to knwo when transaction has well been stored
self.loaded_object = None # Current data of the object we are loading self.loaded_object = None # Current data of the object we are loading
self.history = None # History data for a given object
# object_stored is used to know if storage node # object_stored is used to know if storage node
# accepted the object or raised a conflict # accepted the object or raised a conflict
# 0 : no answer yet # 0 : no answer yet
...@@ -116,7 +118,7 @@ class Application(object): ...@@ -116,7 +118,7 @@ class Application(object):
# _cache_lock is used for the client cache # _cache_lock is used for the client cache
# _load_lock is acquire to protect self.loaded_object used in event # _load_lock is acquire to protect self.loaded_object used in event
# handler when retrieving object from storage node # handler when retrieving object from storage node
# _undo_log_lock is used when retrieving undo information # _info_lock is used when retrieving information for object or transaction
lock = Lock() lock = Lock()
self._oid_lock_acquire = lock.acquire self._oid_lock_acquire = lock.acquire
self._oid_lock_release = lock.release self._oid_lock_release = lock.release
...@@ -130,9 +132,8 @@ class Application(object): ...@@ -130,9 +132,8 @@ class Application(object):
self._load_lock_acquire = lock.acquire self._load_lock_acquire = lock.acquire
self._load_lock_release = lock.release self._load_lock_release = lock.release
lock = Lock() lock = Lock()
self._undo_log_lock_acquire = lock.acquire self._info_lock_acquire = lock.acquire
self._undo_log_lock_release = lock.release self._info_lock_release = lock.release
# XXX Generate an UUID for self. For now, just use a random string. # XXX Generate an UUID for self. For now, just use a random string.
# Avoid an invalid UUID. # Avoid an invalid UUID.
if self.uuid is None: if self.uuid is None:
...@@ -181,7 +182,6 @@ class Application(object): ...@@ -181,7 +182,6 @@ class Application(object):
if self.node_not_ready: if self.node_not_ready:
# must wait # must wait
return return
logging.debug('primary master node is %s' %(self.primary_master_node.server,)) logging.debug('primary master node is %s' %(self.primary_master_node.server,))
# Close connection if not already connected to primary master node # Close connection if not already connected to primary master node
if self.primary_master_node.server != defined_master_addr: if self.primary_master_node.server != defined_master_addr:
...@@ -215,9 +215,9 @@ class Application(object): ...@@ -215,9 +215,9 @@ class Application(object):
# from asking too many time new oid one by one # from asking too many time new oid one by one
# from master node # from master node
conn = self.master_conn conn = self.master_conn
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askNewOIDList(msg_id) p.askNewOIDs(msg_id)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
# Wait for answer # Wait for answer
...@@ -225,11 +225,25 @@ class Application(object): ...@@ -225,11 +225,25 @@ class Application(object):
self.em.poll(1) self.em.poll(1)
if len(self.new_oid_list) > 0: if len(self.new_oid_list) > 0:
break break
return sellf.new_oid_list.pop() return self.new_oid_list.pop()
finally: finally:
self._oid_lock_release() self._oid_lock_release()
def _load(self, oid, serial=""): def getSerial(self, oid):
# Try in cache first
self._cache_lock_acquire()
try:
if oid in self.cache:
return self.cache[oid][0]
finally:
self._cache_lock_release()
# history return serial, so use it
hist = self.history(oid, length=1, object_only=1)
if len(hist) == 0:
raise NEOStorageNotFoundError()
return hist[0][0]
def _load(self, oid, serial="", cache=0):
"""Internal method which manage load and loadSerial.""" """Internal method which manage load and loadSerial."""
partition_id = oid % self.num_paritions partition_id = oid % self.num_paritions
# Only used up to date node for retrieving object # Only used up to date node for retrieving object
...@@ -240,7 +254,7 @@ class Application(object): ...@@ -240,7 +254,7 @@ class Application(object):
# Store data on each node # Store data on each node
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askObjectByOID(msg_id, oid, serial) p.askObjectByOID(msg_id, oid, serial)
conn.addPacket(p) conn.addPacket(p)
...@@ -256,7 +270,7 @@ class Application(object): ...@@ -256,7 +270,7 @@ class Application(object):
# OID not found # OID not found
continue continue
# Copy object data here to release lock as soon as possible # Copy object data here to release lock as soon as possible
noid, serial, compressed, crc, data = self.loaded_object noid, serial, compression, checksum, data = self.loaded_object
finally: finally:
self._load_lock_release() self._load_lock_release()
# Check data here # Check data here
...@@ -267,51 +281,47 @@ class Application(object): ...@@ -267,51 +281,47 @@ class Application(object):
# Reacquire lock and try again # Reacquire lock and try again
self._load_lock_acquire() self._load_lock_acquire()
continue continue
elif compressed and crc != alder32(data): elif compression and checksum != alder32(data):
# Check crc if we use compression # Check checksum if we use compression
logging.error('wrong crc from node %s for oid %s' \ logging.error('wrong checksum from node %s for oid %s' \
%(storage_node.getServer(), oid)) %(storage_node.getServer(), oid))
# Reacquire lock and try again # Reacquire lock and try again
self._load_lock_acquire() self._load_lock_acquire()
continue continue
else: else:
break break
if data is None: if self.loaded_object == -1:
# We didn't got any object from storage node # We didn't got any object from storage node
raise NEOStorageNotFoundError() raise NEOStorageNotFoundError()
# Uncompress data # Uncompress data
if compressed: if compression:
data = decompressed(data) data = decompressed(data)
try: # Put in cache only when using load
# Put object into cache if cache:
self.cache_lock_acquire() self.cache_lock_acquire()
try:
self.cache[oid] = serial, data self.cache[oid] = serial, data
return loads(data), serial
finally: finally:
self.cache_lock_release() self.cache_lock_release()
return loads(data), serial
def load(self, oid, version=None): def load(self, oid, version=None):
"""Load an object for a given oid.""" """Load an object for a given oid."""
# First try from cache # First try from cache
self._cache_lock_acquire = lock.acquire self._cache_lock_acquire()
try: try:
if oid in self.cache: if oid in self.cache:
return loads(self.cache[oid][1]), self.cache[oid][0] return loads(self.cache[oid][1]), self.cache[oid][0]
finally: finally:
self._cache_lock_release = lock.release self._cache_lock_release()
# Otherwise get it from storage node # Otherwise get it from storage node
return self._load(oid) return self._load(oid, cache=1)
def loadSerial(self, oid, serial): def loadSerial(self, oid, serial):
"""Load an object for a given oid.""" """Load an object for a given oid and serial."""
# Do not try in cache as it managed only up-to-date object # Do not try in cache as it managed only up-to-date object
return self._load(oid, serial), None return self._load(oid, serial), None
def lastTransaction(self):
# does not seem to be used
return
def tpc_begin(self, transaction, tid=None, status=' '): def tpc_begin(self, transaction, tid=None, status=' '):
"""Begin a new transaction.""" """Begin a new transaction."""
# First get a transaction, only one is allowed at a time # First get a transaction, only one is allowed at a time
...@@ -321,12 +331,12 @@ class Application(object): ...@@ -321,12 +331,12 @@ class Application(object):
return return
self.txn = transaction self.txn = transaction
# Init list of oid used in this transaction # Init list of oid used in this transaction
self.txn_oid_list = [] self.txn_oid_list = {}
# Get a new transaction id if necessary # Get a new transaction id if necessary
if tid is None: if tid is None:
self.tid = None self.tid = None
conn = self.master_conn conn = self.master_conn
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askNewTID(msg_id) p.askNewTID(msg_id)
conn.addPacket(p) conn.addPacket(p)
...@@ -347,31 +357,39 @@ class Application(object): ...@@ -347,31 +357,39 @@ class Application(object):
partition_id = oid % self.num_paritions partition_id = oid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
# Store data on each node # Store data on each node
ddata = dumps(data)
compressed_data = compress(ddata)
crc = alder32(compressed_data)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
# Compres data with zlib # Compres data with zlib
compressed_data = compress(dumps(data)) p.askStoreObject(msg_id, oid, serial, 1, crc, compressed_data)
crc = alder32(compressed_data)
p.askStoreObject(msg_id, oid, serial, 1, crc, compressed_data, self.tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
# Wait for answer # Wait for answer
self.object_stored = 0 self.object_stored = 0
while 1: while 1:
self.em.poll(1) self.em.poll(1)
if self.object_stored == 1: if self.object_stored != 0:
self.txn_oid_list.append(oid)
break break
elif self.object.stored == 2: if self.object_stored == -1:
# Conflict, removed oid from list if txn_data_dict.has_key(oid):
try: # One storage already accept the object, is it normal ??
self.txn_oid_list.remove(oid) # remove from dict and raise ConflictError, don't care of
except ValueError: # previous node which already store data as it would be resent
# Oid wasn't already stored in list # again if conflict is resolved or txn will be aborted
pass txn_dict.pop(oid)
raise NEOStorageConflictError() raise NEOStorageConflictError()
noid, nserial = self.object_stored
if noid != oid:
# Huh!!
raise NEOStorageError()
else:
# Store object in tmp cache
self.txn_data_dict[oid] = ddata
break
def tpc_vote(self, transaction): def tpc_vote(self, transaction):
"""Store current transaction.""" """Store current transaction."""
...@@ -379,13 +397,13 @@ class Application(object): ...@@ -379,13 +397,13 @@ class Application(object):
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
user = transaction.user user = transaction.user
desc = transaction.description desc = transaction.description
ext = transaction._extension # XXX Need a dump ? ext = dumps(transaction._extension)
partition_id = self.tid % self.num_paritions partition_id = self.tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
# Store data on each node # Store data on each node
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askStoreTransaction(msg_id, self.tid, user, desc, ext, oid_list) p.askStoreTransaction(msg_id, self.tid, user, desc, ext, oid_list)
conn.addPacket(p) conn.addPacket(p)
...@@ -414,8 +432,8 @@ class Application(object): ...@@ -414,8 +432,8 @@ class Application(object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
conn = self.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.abortTransaction(msg_id, self.tid) p.abortTransaction(msg_id, self.tid)
conn.addPacket(p) conn.addPacket(p)
...@@ -425,8 +443,8 @@ class Application(object): ...@@ -425,8 +443,8 @@ class Application(object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
conn = self.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.abortTransaction(msg_id, self.tid) p.abortTransaction(msg_id, self.tid)
conn.addPacket(p) conn.addPacket(p)
...@@ -443,7 +461,7 @@ class Application(object): ...@@ -443,7 +461,7 @@ class Application(object):
f() f()
# Call finish on master # Call finish on master
conn = self.master_conn conn = self.master_conn
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.finishTransaction(msg_id, self.oid_list, self.tid) p.finishTransaction(msg_id, self.oid_list, self.tid)
conn.addPacket(p) conn.addPacket(p)
...@@ -454,31 +472,88 @@ class Application(object): ...@@ -454,31 +472,88 @@ class Application(object):
self.em.poll(1) self.em.poll(1)
if self.txn_finished: if self.txn_finished:
break break
# XXX must update cache here... # Update cache
self.cache_lock_acquire()
try:
for oid in self.txn_data_dict.keys:
ddata = self.txn_data_dict[oid]
# Now serial is same as tid
self.cache[oid] = self.tid, ddata
finally:
self.cache_lock_release()
# Release transaction # Release transaction
return self.tid return self.tid
finally: finally:
self._clear_txn() self._clear_txn()
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
raise NotImplementedError partition_id = oid % self.num_paritions
# Only used up to date node for retrieving object
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE]
self._load_before_lock_acquire()
data = None
# Store data on each node
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askObjectByTID(msg_id, oid, tid)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
self.loaded_object_by_tid = None
try:
while 1:
self.em.poll(1)
if self.loaded_object_by_tid is not None:
break
if self.loaded_object_by_tid == -1:
# OID not found
continue
# Copy object data here to release lock as soon as possible
noid, start, end, compression, checksum, data = self.loaded_object
finally:
self._load_before_lock_release()
# Check data here
if noid != oid:
# Oops, try with next node
logging.error('got wrong oid %s instead of %s from node %s' \
%(noid, oid, storage_node.getServer()))
# Reacquire lock and try again
self._load_before_lock_acquire()
continue
elif compression and checksum != alder32(data):
# Check checksum if we use compression
logging.error('wrong checksum from node %s for oid %s' \
%(storage_node.getServer(), oid))
# Reacquire lock and try again
self._load_before_lock_acquire()
continue
else:
break
if data is None:
# We didn't got any object from storage node
raise NEOStorageNotFoundError()
# Uncompress data
if compression:
data = decompressed(data)
return loads(data), start, end
def undo(self, transaction_id, txn): def undo(self, transaction_id, txn):
if transaction is not self.txn: if transaction is not self.txn:
raise POSException.StorageTransactionError(self, transaction) raise POSException.StorageTransactionError(self, transaction)
# First abort on primary master node raise NotImplementedError
# Second abort on storage node
# Then invalidate cache
return tid, oid_list
def undoLog(self, first, last, filter): def undoInfo(self, first, last, specification=None):
# First get list of transaction from master node # First get list of transaction from master node
self.undo_log_lock_acquire() self._info_lock_acquire()
try: try:
conn = self.master_conn conn = self.master_conn
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.getTIDList(msg_id, first, last) p.getTIDList(msg_id, first, last, specification)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
# Wait for answer # Wait for answer
...@@ -494,10 +569,10 @@ class Application(object): ...@@ -494,10 +569,10 @@ class Application(object):
partition_id = tid % self.num_paritions partition_id = tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.getTransactionInformation(msg_id, tid) p.askTransactionInformation(msg_id, tid)
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
# Wait for answer # Wait for answer
...@@ -509,7 +584,63 @@ class Application(object): ...@@ -509,7 +584,63 @@ class Application(object):
undo_txn_list.append(self.undo_txn_info) undo_txn_list.append(self.undo_txn_info)
return undo_txn_dict return undo_txn_dict
finally: finally:
self.undo_log_lock_release() self._info_lock_release()
def history(self, oid, version, length=1, filter=None, object_only=0):
self._info_lock_acquire()
history_list = []
try:
# Get history informations for object first
partition_id = oid % self.num_paritions
storage_node_list = [x for x in self.pt.getCellList(partition_id, True) \
if x.getState() == UP_TO_DATE_STATE]
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askObjectHistory(msg_id, oid, length)
conn.addPacket(p)
conn.expectMessage(msg_id)
# must be a list of dict
self.history = None
while 1:
self.em.poll(1)
if self.history is not None:
break
if self.history == -1:
# not found, go on with next node
continue
if self.history[0] != oid:
# got history for wrong oid
continue
if object_only:
return self.history[1]
# Now that we have object informations, get txn informations
for serial, size in self.hisory[1]:
partition_id = tid % self.num_paritions
storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID())
msg_id = conn.getNextId()
p = Packet()
p.askTransactionInformation(msg_id, serial)
conn.addPacket(p)
conn.expectMessage(msg_id)
# Wait for answer
self.txn_info = None
while 1:
self.em.poll(1)
if self.txn_info is not None:
break
# create history dict
self.txn_info.remove('id')
self.txn_info['serial'] = serial
self.txn_info['version'] = None
self.txn_info['size'] = size
history_list.append(self.txn_info)
return history_list
finally:
self._info_lock_release()
def __del__(self): def __del__(self):
"""Clear all connection.""" """Clear all connection."""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment