Commit 6ed40f18 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Client write objects and transactions to nodes where state is up to date,

feeding or out of date, the last was missing. PartitionTable.getCellList() 
accept now two parameters defaulted to False, readable and writable that 
are not mutualy exclusive.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@417 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ef71d454
...@@ -331,7 +331,7 @@ class Application(object): ...@@ -331,7 +331,7 @@ class Application(object):
self.local_var.asked_object = None self.local_var.asked_object = None
while self.local_var.asked_object is None: while self.local_var.asked_object is None:
cell_list = self.pt.getCellList(partition_id, True) cell_list = self.pt.getCellList(partition_id, readable=True)
if len(cell_list) == 0: if len(cell_list) == 0:
sleep(1) sleep(1)
continue continue
...@@ -479,7 +479,7 @@ class Application(object): ...@@ -479,7 +479,7 @@ class Application(object):
dump(oid), dump(serial)) dump(oid), dump(serial))
# Find which storage node to use # Find which storage node to use
partition_id = u64(oid) % self.num_partitions partition_id = u64(oid) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True) cell_list = self.pt.getCellList(partition_id, writable=True)
if len(cell_list) == 0: if len(cell_list) == 0:
# FIXME must wait for cluster to be ready # FIXME must wait for cluster to be ready
raise NEOStorageError raise NEOStorageError
...@@ -533,7 +533,7 @@ class Application(object): ...@@ -533,7 +533,7 @@ class Application(object):
oid_list = self.txn_data_dict.keys() oid_list = self.txn_data_dict.keys()
# Store data on each node # Store data on each node
partition_id = u64(self.tid) % self.num_partitions partition_id = u64(self.tid) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True) cell_list = self.pt.getCellList(partition_id, writable=True)
for cell in cell_list: for cell in cell_list:
logging.info("voting object %s %s" %(cell.getServer(), cell.getState())) logging.info("voting object %s %s" %(cell.getServer(), cell.getState()))
conn = self.cp.getConnForNode(cell) conn = self.cp.getConnForNode(cell)
...@@ -574,11 +574,11 @@ class Application(object): ...@@ -574,11 +574,11 @@ class Application(object):
# select nodes where objects were stored # select nodes where objects were stored
for oid in self.txn_data_dict.iterkeys(): for oid in self.txn_data_dict.iterkeys():
partition_id = u64(oid) % self.num_partitions partition_id = u64(oid) % self.num_partitions
cell_set |= set(self.pt.getCellList(partition_id, True)) cell_set |= set(self.pt.getCellList(partition_id, writable=True))
# select nodes where transaction was stored # select nodes where transaction was stored
partition_id = u64(self.tid) % self.num_partitions partition_id = u64(self.tid) % self.num_partitions
cell_set |= set(self.pt.getCellList(partition_id, True)) cell_set |= set(self.pt.getCellList(partition_id, writable=True))
# cancel transaction one all those nodes # cancel transaction one all those nodes
for cell in cell_set: for cell in cell_set:
...@@ -650,7 +650,7 @@ class Application(object): ...@@ -650,7 +650,7 @@ class Application(object):
# First get transaction information from a storage node. # First get transaction information from a storage node.
partition_id = u64(transaction_id) % self.num_partitions partition_id = u64(transaction_id) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True) cell_list = self.pt.getCellList(partition_id, writable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForNode(cell) conn = self.cp.getConnForNode(cell)
...@@ -759,7 +759,7 @@ class Application(object): ...@@ -759,7 +759,7 @@ class Application(object):
undo_info = [] undo_info = []
for tid in ordered_tids: for tid in ordered_tids:
partition_id = u64(tid) % self.num_partitions partition_id = u64(tid) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True) cell_list = self.pt.getCellList(partition_id, readable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
conn = self.cp.getConnForNode(storage_node) conn = self.cp.getConnForNode(storage_node)
...@@ -810,7 +810,7 @@ class Application(object): ...@@ -810,7 +810,7 @@ class Application(object):
def history(self, oid, version=None, length=1, filter=None, object_only=0): def history(self, oid, version=None, length=1, filter=None, object_only=0):
# Get history informations for object first # Get history informations for object first
partition_id = u64(oid) % self.num_partitions partition_id = u64(oid) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True) cell_list = self.pt.getCellList(partition_id, readable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
...@@ -847,7 +847,7 @@ class Application(object): ...@@ -847,7 +847,7 @@ class Application(object):
history_list = [] history_list = []
for serial, size in self.local_var.history[1]: for serial, size in self.local_var.history[1]:
partition_id = u64(serial) % self.num_partitions partition_id = u64(serial) % self.num_partitions
cell_list = self.pt.getCellList(partition_id, True) cell_list = self.pt.getCellList(partition_id, readable=True)
shuffle(cell_list) shuffle(cell_list)
for cell in cell_list: for cell in cell_list:
......
...@@ -448,7 +448,7 @@ class Application(object): ...@@ -448,7 +448,7 @@ class Application(object):
# Determine to which nodes I should ask. # Determine to which nodes I should ask.
partition = self.getPartition(tid) partition = self.getPartition(tid)
transaction_uuid_list = [cell.getUUID() for cell \ transaction_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, True)] in self.pt.getCellList(partition, readable=True)]
if len(transaction_uuid_list) == 0: if len(transaction_uuid_list) == 0:
raise VerificationFailure raise VerificationFailure
uuid_set.update(transaction_uuid_list) uuid_set.update(transaction_uuid_list)
...@@ -484,7 +484,7 @@ class Application(object): ...@@ -484,7 +484,7 @@ class Application(object):
self.asking_uuid_dict.clear() self.asking_uuid_dict.clear()
partition = self.getPartition(oid) partition = self.getPartition(oid)
object_uuid_list = [cell.getUUID() for cell \ object_uuid_list = [cell.getUUID() for cell \
in self.pt.getCellList(partition, True)] in self.pt.getCellList(partition, readable=True)]
if len(object_uuid_list) == 0: if len(object_uuid_list) == 0:
raise VerificationFailure raise VerificationFailure
uuid_set.update(object_uuid_list) uuid_set.update(object_uuid_list)
......
...@@ -74,17 +74,21 @@ class PartitionTable(object): ...@@ -74,17 +74,21 @@ class PartitionTable(object):
node_list.append(node) node_list.append(node)
return node_list return node_list
def getCellList(self, offset, usable_only = False): def getCellList(self, offset, readable=False, writable=False):
if usable_only: # allow all cell states
state_set = set(VALID_CELL_STATE_LIST)
if readable or writable:
# except non readables
state_set.remove(DISCARDED_STATE)
if readable:
# except non writables
state_set.remove(OUT_OF_DATE_STATE)
allowed_states = tuple(state_set)
try: try:
return [cell for cell in self.partition_list[offset] \ return [cell for cell in self.partition_list[offset] \
if cell is not None and cell.getState() in (UP_TO_DATE_STATE, FEEDING_STATE)] if cell is not None and cell.getState() in allowed_states]
except (TypeError, KeyError): except (TypeError, KeyError):
return [] return []
try:
return self.partition_list[offset]
except KeyError:
return []
def make(self, node_list): def make(self, node_list):
"""Make a new partition table from scratch.""" """Make a new partition table from scratch."""
......
...@@ -353,7 +353,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -353,7 +353,7 @@ class OperationEventHandler(StorageEventHandler):
getCellList = app.pt.getCellList getCellList = app.pt.getCellList
partition_list = [] partition_list = []
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
for cell in getCellList(offset, True): for cell in getCellList(offset, readable=True):
if cell.getUUID() == app.uuid: if cell.getUUID() == app.uuid:
partition_list.append(offset) partition_list.append(offset)
break break
...@@ -481,7 +481,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -481,7 +481,7 @@ class OperationEventHandler(StorageEventHandler):
getCellList = app.pt.getCellList getCellList = app.pt.getCellList
partition_list = [] partition_list = []
for offset in xrange(app.num_partitions): for offset in xrange(app.num_partitions):
for cell in getCellList(offset, True): for cell in getCellList(offset, readable=True):
if cell.getUUID() == app.uuid: if cell.getUUID() == app.uuid:
partition_list.append(offset) partition_list.append(offset)
break break
......
...@@ -317,7 +317,7 @@ class Replicator(object): ...@@ -317,7 +317,7 @@ class Replicator(object):
app = self.app app = self.app
try: try:
cell_list = app.pt.getCellList(self.current_partition.getRID(), cell_list = app.pt.getCellList(self.current_partition.getRID(),
True) readable=True)
node_list = [cell.getNode() for cell in cell_list node_list = [cell.getNode() for cell in cell_list
if cell.getNodeState() == RUNNING_STATE] if cell.getNodeState() == RUNNING_STATE]
node = choice(node_list) node = choice(node_list)
......
...@@ -228,16 +228,34 @@ class testPartitionTable(unittest.TestCase): ...@@ -228,16 +228,34 @@ class testPartitionTable(unittest.TestCase):
self.assertEqual(len(pt.partition_list[0]), 3) self.assertEqual(len(pt.partition_list[0]), 3)
for x in xrange(num_partitions): for x in xrange(num_partitions):
if x == 0: if x == 0:
self.assertEqual(len(pt.getCellList(0, False)), 3) # all nodes
all_cell = pt.getCellList(0, False) all_cell = pt.getCellList(0)
all_nodes = [x.getNode() for x in all_cell] all_nodes = [x.getNode() for x in all_cell]
self.assertEqual(len(all_cell), 3)
self.failUnless(sn1 in all_nodes) self.failUnless(sn1 in all_nodes)
self.failUnless(sn2 in all_nodes) self.failUnless(sn2 in all_nodes)
self.failUnless(sn3 in all_nodes) self.failUnless(sn3 in all_nodes)
self.failUnless(sn4 not in all_nodes) self.failUnless(sn4 not in all_nodes)
self.assertEqual(len(pt.getCellList(0, True)), 2) # writable nodes
all_cell = pt.getCellList(0, True) all_cell = pt.getCellList(0, writable=True)
all_nodes = [x.getNode() for x in all_cell] all_nodes = [x.getNode() for x in all_cell]
self.assertEqual(len(all_cell), 3)
self.failUnless(sn1 in all_nodes)
self.failUnless(sn2 in all_nodes)
self.failUnless(sn3 in all_nodes)
self.failUnless(sn4 not in all_nodes)
# readable nodes
all_cell = pt.getCellList(0, readable=True)
all_nodes = [x.getNode() for x in all_cell]
self.assertEqual(len(all_cell), 2)
self.failUnless(sn1 in all_nodes)
self.failUnless(sn2 not in all_nodes)
self.failUnless(sn3 in all_nodes)
self.failUnless(sn4 not in all_nodes)
# writable & readable nodes
all_cell = pt.getCellList(0, readable=True, writable=True)
all_nodes = [x.getNode() for x in all_cell]
self.assertEqual(len(all_cell), 2)
self.failUnless(sn1 in all_nodes) self.failUnless(sn1 in all_nodes)
self.failUnless(sn2 not in all_nodes) self.failUnless(sn2 not in all_nodes)
self.failUnless(sn3 in all_nodes) self.failUnless(sn3 in all_nodes)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment