Commit 1eed0239 authored by Julien Muchembled's avatar Julien Muchembled

Code clean up: PartitionTable

parent b61ee7f1
...@@ -15,12 +15,10 @@ ...@@ -15,12 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import math import math
from functools import wraps
from . import logging, protocol from . import logging, protocol
from .locking import Lock
from .protocol import uuid_str, CellStates from .protocol import uuid_str, CellStates
from .util import u64 from .util import u64
from .locking import RLock
class PartitionTableException(Exception): class PartitionTableException(Exception):
""" """
...@@ -163,7 +161,7 @@ class PartitionTable(object): ...@@ -163,7 +161,7 @@ class PartitionTable(object):
if cell.getUUID() == uuid: if cell.getUUID() == uuid:
return cell return cell
def setCell(self, offset, node, state): def _setCell(self, offset, node, state):
if state == CellStates.DISCARDED: if state == CellStates.DISCARDED:
return self.removeCell(offset, node) return self.removeCell(offset, node)
if node.isBroken() or node.isDown(): if node.isBroken() or node.isDown():
...@@ -182,7 +180,6 @@ class PartitionTable(object): ...@@ -182,7 +180,6 @@ class PartitionTable(object):
row.append(Cell(node, state)) row.append(Cell(node, state))
if state != CellStates.FEEDING: if state != CellStates.FEEDING:
self.count_dict[node] += 1 self.count_dict[node] += 1
return offset, node.getUUID(), state
def removeCell(self, offset, node): def removeCell(self, offset, node):
row = self.partition_list[offset] row = self.partition_list[offset]
...@@ -193,7 +190,6 @@ class PartitionTable(object): ...@@ -193,7 +190,6 @@ class PartitionTable(object):
if not cell.isFeeding(): if not cell.isFeeding():
self.count_dict[node] -= 1 self.count_dict[node] -= 1
break break
return (offset, node.getUUID(), CellStates.DISCARDED)
def load(self, ptid, row_list, nm): def load(self, ptid, row_list, nm):
""" """
...@@ -209,24 +205,21 @@ class PartitionTable(object): ...@@ -209,24 +205,21 @@ class PartitionTable(object):
node = nm.getByUUID(uuid) node = nm.getByUUID(uuid)
# the node must be known by the node manager # the node must be known by the node manager
assert node is not None assert node is not None
self.setCell(offset, node, state) self._setCell(offset, node, state)
logging.debug('partition table loaded (ptid=%s)', ptid) logging.debug('partition table loaded (ptid=%s)', ptid)
self.log() self.log()
def update(self, ptid, cell_list, nm): def update(self, ptid, cell_list, nm):
""" """
Update the partition with the cell list supplied. Ignore those changes Update the partition with the cell list supplied. If a node
if the partition table ID is not greater than the current one. If a node
is not known, it is created in the node manager and set as unavailable is not known, it is created in the node manager and set as unavailable
""" """
if ptid <= self._id: assert self._id < ptid, (self._id, ptid)
logging.warning('ignoring older partition changes')
return
self._id = ptid self._id = ptid
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
node = nm.getByUUID(uuid) node = nm.getByUUID(uuid)
assert node is not None, 'No node found for uuid ' + uuid_str(uuid) assert node is not None, 'No node found for uuid ' + uuid_str(uuid)
self.setCell(offset, node, state) self._setCell(offset, node, state)
logging.debug('partition table updated (ptid=%s)', ptid) logging.debug('partition table updated (ptid=%s)', ptid)
self.log() self.log()
...@@ -310,38 +303,22 @@ class PartitionTable(object): ...@@ -310,38 +303,22 @@ class PartitionTable(object):
getRow = self.getRow getRow = self.getRow
return [(x, getRow(x)) for x in xrange(self.np)] return [(x, getRow(x)) for x in xrange(self.np)]
def thread_safe(method):
def wrapper(self, *args, **kwargs):
self.lock()
try:
return method(self, *args, **kwargs)
finally:
self.unlock()
return wraps(method)(wrapper)
class MTPartitionTable(PartitionTable): class MTPartitionTable(PartitionTable):
""" Thread-safe aware version of the partition table, override only methods """ Thread-safe aware version of the partition table, override only methods
used in the client """ used in the client """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kw):
self._lock = RLock() self._lock = Lock()
PartitionTable.__init__(self, *args, **kwargs) PartitionTable.__init__(self, *args, **kw)
def lock(self):
self._lock.acquire()
def unlock(self):
self._lock.release()
@thread_safe def update(self, *args, **kw):
def setCell(self, *args, **kwargs): with self._lock:
return PartitionTable.setCell(self, *args, **kwargs) return PartitionTable.update(self, *args, **kw)
@thread_safe def clear(self, *args, **kw):
def clear(self, *args, **kwargs): with self._lock:
return PartitionTable.clear(self, *args, **kwargs) return PartitionTable.clear(self, *args, **kw)
@thread_safe def operational(self, *args, **kw):
def operational(self, *args, **kwargs): with self._lock:
return PartitionTable.operational(self, *args, **kwargs) return PartitionTable.operational(self, *args, **kw)
...@@ -147,7 +147,7 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -147,7 +147,7 @@ class PartitionTable(neo.lib.pt.PartitionTable):
if node is None: if node is None:
node = nm.createStorage(uuid=uuid) node = nm.createStorage(uuid=uuid)
new_nodes.append(node.asTuple()) new_nodes.append(node.asTuple())
self.setCell(offset, node, state) self._setCell(offset, node, state)
return new_nodes return new_nodes
def setUpToDate(self, node, offset): def setUpToDate(self, node, offset):
...@@ -163,13 +163,15 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -163,13 +163,15 @@ class PartitionTable(neo.lib.pt.PartitionTable):
raise neo.lib.pt.PartitionTableException('Non-assigned partition') raise neo.lib.pt.PartitionTableException('Non-assigned partition')
# update the partition table # update the partition table
cell_list = [self.setCell(offset, node, CellStates.UP_TO_DATE)] self._setCell(offset, node, CellStates.UP_TO_DATE)
cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
# If the partition contains a feeding cell, drop it now. # If the partition contains a feeding cell, drop it now.
for feeding_cell in self.getCellList(offset): for feeding_cell in self.getCellList(offset):
if feeding_cell.isFeeding(): if feeding_cell.isFeeding():
cell_list.append(self.removeCell(offset, node = feeding_cell.getNode()
feeding_cell.getNode())) self.removeCell(offset, node)
cell_list.append((offset, node.getUUID(), CellStates.DISCARDED))
break break
return cell_list return cell_list
......
...@@ -133,19 +133,17 @@ class Application(BaseApplication): ...@@ -133,19 +133,17 @@ class Application(BaseApplication):
def loadPartitionTable(self): def loadPartitionTable(self):
"""Load a partition table from the database.""" """Load a partition table from the database."""
self.pt.clear()
ptid = self.dm.getPTID() ptid = self.dm.getPTID()
cell_list = self.dm.getPartitionTable() if ptid is None:
new_cell_list = [] return
for offset, uuid, state in cell_list: cell_list = []
# convert from int to Enum for offset, uuid, state in self.dm.getPartitionTable():
state = CellStates[state]
# register unknown nodes # register unknown nodes
if self.nm.getByUUID(uuid) is None: if self.nm.getByUUID(uuid) is None:
self.nm.createStorage(uuid=uuid) self.nm.createStorage(uuid=uuid)
new_cell_list.append((offset, uuid, state)) cell_list.append((offset, uuid, CellStates[state]))
# load the partition table in manager self.pt.update(ptid, cell_list, self.nm)
self.pt.clear()
self.pt.update(ptid, new_cell_list, self.nm)
def run(self): def run(self):
try: try:
......
...@@ -70,15 +70,15 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -70,15 +70,15 @@ class MasterPartitionTableTests(NeoUnitTestBase):
num_partitions = 5 num_partitions = 5
num_replicas = 3 num_replicas = 3
pt = PartitionTable(num_partitions, num_replicas) pt = PartitionTable(num_partitions, num_replicas)
pt.setCell(0, sn1, CellStates.OUT_OF_DATE) pt._setCell(0, sn1, CellStates.OUT_OF_DATE)
sn1.setState(NodeStates.RUNNING) sn1.setState(NodeStates.RUNNING)
pt.setCell(1, sn2, CellStates.UP_TO_DATE) pt._setCell(1, sn2, CellStates.UP_TO_DATE)
sn2.setState(NodeStates.TEMPORARILY_DOWN) sn2.setState(NodeStates.TEMPORARILY_DOWN)
pt.setCell(2, sn3, CellStates.UP_TO_DATE) pt._setCell(2, sn3, CellStates.UP_TO_DATE)
sn3.setState(NodeStates.DOWN) sn3.setState(NodeStates.DOWN)
pt.setCell(3, sn4, CellStates.UP_TO_DATE) pt._setCell(3, sn4, CellStates.UP_TO_DATE)
sn4.setState(NodeStates.BROKEN) sn4.setState(NodeStates.BROKEN)
pt.setCell(4, sn5, CellStates.UP_TO_DATE) pt._setCell(4, sn5, CellStates.UP_TO_DATE)
sn5.setState(NodeStates.RUNNING) sn5.setState(NodeStates.RUNNING)
# outdate nodes # outdate nodes
cells_outdated = pt.outdate() cells_outdated = pt.outdate()
...@@ -118,12 +118,12 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -118,12 +118,12 @@ class MasterPartitionTableTests(NeoUnitTestBase):
sn = [self.createStorage(None, i + 1, NodeStates.RUNNING) sn = [self.createStorage(None, i + 1, NodeStates.RUNNING)
for i in xrange(3)] for i in xrange(3)]
pt = PartitionTable(3, 0) pt = PartitionTable(3, 0)
pt.setCell(0, sn[0], CellStates.OUT_OF_DATE) pt._setCell(0, sn[0], CellStates.OUT_OF_DATE)
pt.setCell(1, sn[1], CellStates.FEEDING) pt._setCell(1, sn[1], CellStates.FEEDING)
pt.setCell(1, sn[2], CellStates.OUT_OF_DATE) pt._setCell(1, sn[2], CellStates.OUT_OF_DATE)
pt.setCell(2, sn[0], CellStates.OUT_OF_DATE) pt._setCell(2, sn[0], CellStates.OUT_OF_DATE)
pt.setCell(2, sn[1], CellStates.FEEDING) pt._setCell(2, sn[1], CellStates.FEEDING)
pt.setCell(2, sn[2], CellStates.UP_TO_DATE) pt._setCell(2, sn[2], CellStates.UP_TO_DATE)
self.assertEqual(sorted(pt.dropNodeList(sn[:1], True)), [ self.assertEqual(sorted(pt.dropNodeList(sn[:1], True)), [
(0, 1, CellStates.DISCARDED), (0, 1, CellStates.DISCARDED),
...@@ -137,7 +137,7 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -137,7 +137,7 @@ class MasterPartitionTableTests(NeoUnitTestBase):
(2, 3, CellStates.DISCARDED)]) (2, 3, CellStates.DISCARDED)])
self.assertRaises(PartitionTableException, pt.dropNodeList, sn[1:2]) self.assertRaises(PartitionTableException, pt.dropNodeList, sn[1:2])
pt.setCell(1, sn[2], CellStates.UP_TO_DATE) pt._setCell(1, sn[2], CellStates.UP_TO_DATE)
self.assertEqual(sorted(pt.dropNodeList(sn[1:2])), [ self.assertEqual(sorted(pt.dropNodeList(sn[1:2])), [
(1, 2, CellStates.DISCARDED), (1, 2, CellStates.DISCARDED),
(2, 2, CellStates.DISCARDED)]) (2, 2, CellStates.DISCARDED)])
...@@ -233,24 +233,24 @@ class MasterPartitionTableTests(NeoUnitTestBase): ...@@ -233,24 +233,24 @@ class MasterPartitionTableTests(NeoUnitTestBase):
for i in xrange(5)] for i in xrange(5)]
pt = PartitionTable(5, 2) pt = PartitionTable(5, 2)
# part 0 # part 0
pt.setCell(0, sn[0], CellStates.DISCARDED) pt._setCell(0, sn[0], CellStates.DISCARDED)
pt.setCell(0, sn[1], CellStates.UP_TO_DATE) pt._setCell(0, sn[1], CellStates.UP_TO_DATE)
# part 1 # part 1
pt.setCell(1, sn[0], CellStates.FEEDING) pt._setCell(1, sn[0], CellStates.FEEDING)
pt.setCell(1, sn[1], CellStates.FEEDING) pt._setCell(1, sn[1], CellStates.FEEDING)
pt.setCell(1, sn[2], CellStates.OUT_OF_DATE) pt._setCell(1, sn[2], CellStates.OUT_OF_DATE)
# part 2 # part 2
pt.setCell(2, sn[0], CellStates.FEEDING) pt._setCell(2, sn[0], CellStates.FEEDING)
pt.setCell(2, sn[1], CellStates.UP_TO_DATE) pt._setCell(2, sn[1], CellStates.UP_TO_DATE)
pt.setCell(2, sn[2], CellStates.UP_TO_DATE) pt._setCell(2, sn[2], CellStates.UP_TO_DATE)
# part 3 # part 3
pt.setCell(3, sn[0], CellStates.UP_TO_DATE) pt._setCell(3, sn[0], CellStates.UP_TO_DATE)
pt.setCell(3, sn[1], CellStates.UP_TO_DATE) pt._setCell(3, sn[1], CellStates.UP_TO_DATE)
pt.setCell(3, sn[2], CellStates.UP_TO_DATE) pt._setCell(3, sn[2], CellStates.UP_TO_DATE)
pt.setCell(3, sn[3], CellStates.UP_TO_DATE) pt._setCell(3, sn[3], CellStates.UP_TO_DATE)
# part 4 # part 4
pt.setCell(4, sn[0], CellStates.UP_TO_DATE) pt._setCell(4, sn[0], CellStates.UP_TO_DATE)
pt.setCell(4, sn[4], CellStates.UP_TO_DATE) pt._setCell(4, sn[4], CellStates.UP_TO_DATE)
count_dict = defaultdict(int) count_dict = defaultdict(int)
change_list = self.tweak(pt) change_list = self.tweak(pt)
......
...@@ -62,8 +62,8 @@ class StorageAppTests(NeoUnitTestBase): ...@@ -62,8 +62,8 @@ class StorageAppTests(NeoUnitTestBase):
storage = self.app.nm.createStorage(uuid=storage_uuid) storage = self.app.nm.createStorage(uuid=storage_uuid)
client_uuid = self.getClientUUID() client_uuid = self.getClientUUID()
self.app.pt.setCell(0, master, CellStates.UP_TO_DATE) self.app.pt._setCell(0, master, CellStates.UP_TO_DATE)
self.app.pt.setCell(0, storage, CellStates.UP_TO_DATE) self.app.pt._setCell(0, storage, CellStates.UP_TO_DATE)
self.assertEqual(len(self.app.pt.getNodeSet()), 2) self.assertEqual(len(self.app.pt.getNodeSet()), 2)
self.assertFalse(self.app.pt.filled()) self.assertFalse(self.app.pt.filled())
for x in xrange(num_partitions): for x in xrange(num_partitions):
...@@ -79,8 +79,8 @@ class StorageAppTests(NeoUnitTestBase): ...@@ -79,8 +79,8 @@ class StorageAppTests(NeoUnitTestBase):
self.assertFalse(self.app.pt.hasOffset(x)) self.assertFalse(self.app.pt.hasOffset(x))
# add some node # add some node
self.app.pt.setCell(0, master, CellStates.UP_TO_DATE) self.app.pt._setCell(0, master, CellStates.UP_TO_DATE)
self.app.pt.setCell(0, storage, CellStates.UP_TO_DATE) self.app.pt._setCell(0, storage, CellStates.UP_TO_DATE)
self.assertEqual(len(self.app.pt.getNodeSet()), 2) self.assertEqual(len(self.app.pt.getNodeSet()), 2)
self.assertFalse(self.app.pt.filled()) self.assertFalse(self.app.pt.filled())
for x in xrange(num_partitions): for x in xrange(num_partitions):
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment