Commit 1f629dfc authored by Grégory Wisniewski's avatar Grégory Wisniewski

Move stored OIDs check to master side.

- The storages no more check the last OID during a store
- The storages inconditionnaly store the last OID notified by the master
- The master check during the if a greater oid was used by a client
- The master always notify the last OID when a pool is generated or if the
check above is True
- The master's transaction manager manager the last oid and oid generator

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2180 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5a5b0e5a
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
from neo import logging from neo import logging
import os, sys import os, sys
from time import time from time import time
from struct import pack, unpack
from neo import protocol from neo import protocol
from neo.protocol import UUID_NAMESPACES from neo.protocol import UUID_NAMESPACES
...@@ -84,9 +83,6 @@ class Application(object): ...@@ -84,9 +83,6 @@ class Application(object):
uuid = self.getNewUUID(NodeTypes.MASTER) uuid = self.getNewUUID(NodeTypes.MASTER)
self.uuid = uuid self.uuid = uuid
# The last OID.
self.loid = None
# election related data # election related data
self.unconnected_master_node_set = set() self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set() self.negotiating_master_node_set = set()
...@@ -303,7 +299,8 @@ class Application(object): ...@@ -303,7 +299,8 @@ class Application(object):
" Outdate cell of non-working nodes and broadcast changes """ " Outdate cell of non-working nodes and broadcast changes """
self.broadcastPartitionChanges(self.pt.outdate()) self.broadcastPartitionChanges(self.pt.outdate())
def broadcastLastOID(self, oid): def broadcastLastOID(self):
oid = self.tm.getLastOID()
logging.debug('Broadcast last OID to storages : %s' % dump(oid)) logging.debug('Broadcast last OID to storages : %s' % dump(oid))
packet = Packets.NotifyLastOID(oid) packet = Packets.NotifyLastOID(oid)
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
...@@ -456,15 +453,6 @@ class Application(object): ...@@ -456,15 +453,6 @@ class Application(object):
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
self.cluster_state = state self.cluster_state = state
def getNewOIDList(self, num_oids):
if self.loid is None:
raise RuntimeError, 'I do not know the last OID'
oid = unpack('!Q', self.loid)[0] + 1
oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
self.loid = oid_list[-1]
self.broadcastLastOID(self.loid)
return oid_list
def getNewUUID(self, node_type): def getNewUUID(self, node_type):
# build an UUID # build an UUID
uuid = os.urandom(15) uuid = os.urandom(15)
......
...@@ -56,8 +56,8 @@ class ClientServiceHandler(MasterHandler): ...@@ -56,8 +56,8 @@ class ClientServiceHandler(MasterHandler):
conn.answer(Packets.AnswerBeginTransaction(tid)) conn.answer(Packets.AnswerBeginTransaction(tid))
def askNewOIDs(self, conn, num_oids): def askNewOIDs(self, conn, num_oids):
oid_list = self.app.getNewOIDList(num_oids) conn.answer(Packets.AnswerNewOIDs(self.app.tm.getNextOIDList(num_oids)))
conn.answer(Packets.AnswerNewOIDs(oid_list)) self.app.broadcastLastOID()
def askFinishTransaction(self, conn, tid, oid_list): def askFinishTransaction(self, conn, tid, oid_list):
app = self.app app = self.app
...@@ -78,6 +78,10 @@ class ClientServiceHandler(MasterHandler): ...@@ -78,6 +78,10 @@ class ClientServiceHandler(MasterHandler):
uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part) uuid_set.update((cell.getUUID() for cell in app.pt.getCellList(part)
if cell.getNodeState() != NodeStates.HIDDEN)) if cell.getNodeState() != NodeStates.HIDDEN))
# check if greater and foreign OID was stored
if self.app.tm.updateLastOID(oid_list):
self.app.broadcastLastOID()
# Request locking data. # Request locking data.
# build a new set as we may not send the message to all nodes as some # build a new set as we may not send the message to all nodes as some
# might be not reachable at that time # might be not reachable at that time
......
...@@ -48,8 +48,9 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -48,8 +48,9 @@ class StorageServiceHandler(BaseServiceHandler):
def askLastIDs(self, conn): def askLastIDs(self, conn):
app = self.app app = self.app
conn.answer(Packets.AnswerLastIDs(app.loid, app.tm.getLastTID(), loid = app.tm.getLastOID()
app.pt.getID())) ltid = app.tm.getLastTID()
conn.answer(Packets.AnswerLastIDs(loid, ltid, app.pt.getID()))
def askUnfinishedTransactions(self, conn): def askUnfinishedTransactions(self, conn):
p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList()) p = Packets.AnswerUnfinishedTransactions(self.app.tm.getPendingList())
......
...@@ -59,7 +59,7 @@ class RecoveryManager(MasterHandler): ...@@ -59,7 +59,7 @@ class RecoveryManager(MasterHandler):
self.app.changeClusterState(ClusterStates.RECOVERING) self.app.changeClusterState(ClusterStates.RECOVERING)
em = self.app.em em = self.app.em
self.app.loid = None self.app.tm.setLastOID(None)
self.app.pt.setID(None) self.app.pt.setID(None)
# collect the last partition table available # collect the last partition table available
...@@ -81,7 +81,7 @@ class RecoveryManager(MasterHandler): ...@@ -81,7 +81,7 @@ class RecoveryManager(MasterHandler):
self.app.broadcastNodesInformation(refused_node_set) self.app.broadcastNodesInformation(refused_node_set)
logging.debug('cluster starts with loid=%s and this partition table :', logging.debug('cluster starts with loid=%s and this partition table :',
dump(self.app.loid)) dump(self.app.tm.getLastOID()))
self.app.pt.log() self.app.pt.log()
def buildFromScratch(self): def buildFromScratch(self):
...@@ -96,7 +96,7 @@ class RecoveryManager(MasterHandler): ...@@ -96,7 +96,7 @@ class RecoveryManager(MasterHandler):
node.setRunning() node.setRunning()
self.app.broadcastNodesInformation(node_list) self.app.broadcastNodesInformation(node_list)
# resert IDs generators # resert IDs generators
self.app.loid = '\0' * 8 self.app.tm.setLastOID('\0' * 8)
# build the partition with this node # build the partition with this node
pt.setID(pack('!Q', 1)) pt.setID(pack('!Q', 1))
pt.make(node_list) pt.make(node_list)
...@@ -115,13 +115,9 @@ class RecoveryManager(MasterHandler): ...@@ -115,13 +115,9 @@ class RecoveryManager(MasterHandler):
conn.ask(Packets.AskLastIDs()) conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
app = self.app
# Get max values. # Get max values.
if loid is not None: if loid is not None:
if app.loid is None: self.app.tm.setLastOID(max(loid, self.app.tm.getLastOID()))
app.loid = loid
else:
app.loid = max(loid, app.loid)
if ltid is not None: if ltid is not None:
self.app.tm.setLastTID(ltid) self.app.tm.setLastTID(ltid)
if lptid > self.target_ptid: if lptid > self.target_ptid:
...@@ -130,7 +126,6 @@ class RecoveryManager(MasterHandler): ...@@ -130,7 +126,6 @@ class RecoveryManager(MasterHandler):
conn.ask(Packets.AskPartitionTable()) conn.ask(Packets.AskPartitionTable())
def answerPartitionTable(self, conn, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
app = self.app
if ptid != self.target_ptid: if ptid != self.target_ptid:
# If this is not from a target node, ignore it. # If this is not from a target node, ignore it.
logging.warn('Got %s while waiting %s', dump(ptid), logging.warn('Got %s while waiting %s', dump(ptid),
......
...@@ -19,7 +19,6 @@ from time import time, gmtime ...@@ -19,7 +19,6 @@ from time import time, gmtime
from struct import pack, unpack from struct import pack, unpack
from neo.util import dump from neo.util import dump
from neo import logging from neo import logging
from neo import protocol
class Transaction(object): class Transaction(object):
""" """
...@@ -127,6 +126,7 @@ class TransactionManager(object): ...@@ -127,6 +126,7 @@ class TransactionManager(object):
# node -> transactions mapping # node -> transactions mapping
self._node_dict = {} self._node_dict = {}
self._last_tid = None self._last_tid = None
self._last_oid = None
def __getitem__(self, tid): def __getitem__(self, tid):
""" """
...@@ -143,6 +143,32 @@ class TransactionManager(object): ...@@ -143,6 +143,32 @@ class TransactionManager(object):
def items(self): def items(self):
return self._tid_dict.items() return self._tid_dict.items()
def getNextOIDList(self, num_oids):
""" Generate a new OID list """
if self._last_oid is None:
raise RuntimeError, 'I do not know the last OID'
oid = unpack('!Q', self._last_oid)[0] + 1
oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
self._last_oid = oid_list[-1]
return oid_list
def updateLastOID(self, oid_list):
"""
Updates the last oid with the max of those supplied if greater than
the current known, returns True if changed
"""
max_oid = oid_list and max(oid_list) or None # oid_list might be empty
if max_oid > self._last_oid:
self._last_oid = max_oid
return True
return False
def setLastOID(self, oid):
self._last_oid = oid
def getLastOID(self):
return self._last_oid
def _nextTID(self): def _nextTID(self):
""" Compute the next TID based on the current time and check collisions """ """ Compute the next TID based on the current time and check collisions """
tm = time() tm = time()
......
...@@ -39,7 +39,6 @@ class BaseMasterHandler(EventHandler): ...@@ -39,7 +39,6 @@ class BaseMasterHandler(EventHandler):
self.__class__.__name__) self.__class__.__name__)
def notifyLastOID(self, conn, oid): def notifyLastOID(self, conn, oid):
self.app.tm.setLastOID(oid)
self.app.dm.setLastOID(oid) self.app.dm.setLastOID(oid)
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, node_list):
......
...@@ -56,7 +56,6 @@ class InitializationHandler(BaseMasterHandler): ...@@ -56,7 +56,6 @@ class InitializationHandler(BaseMasterHandler):
self.app.has_partition_table = True self.app.has_partition_table = True
def answerLastIDs(self, conn, loid, ltid, lptid): def answerLastIDs(self, conn, loid, ltid, lptid):
self.app.tm.setLastOID(loid)
self.app.dm.setLastOID(loid) self.app.dm.setLastOID(loid)
self.app.has_last_ids = True self.app.has_last_ids = True
......
...@@ -114,8 +114,6 @@ class TransactionManager(object): ...@@ -114,8 +114,6 @@ class TransactionManager(object):
self._store_lock_dict = {} self._store_lock_dict = {}
self._load_lock_dict = {} self._load_lock_dict = {}
self._uuid_dict = {} self._uuid_dict = {}
self._loid = None
self._loid_seen = None
def __contains__(self, tid): def __contains__(self, tid):
""" """
...@@ -144,10 +142,6 @@ class TransactionManager(object): ...@@ -144,10 +142,6 @@ class TransactionManager(object):
result = result.getObject(oid) result = result.getObject(oid)
return result return result
def setLastOID(self, oid):
assert oid >= self._loid
self._loid = oid
def reset(self): def reset(self):
""" """
Reset the transaction manager Reset the transaction manager
...@@ -186,13 +180,6 @@ class TransactionManager(object): ...@@ -186,13 +180,6 @@ class TransactionManager(object):
self._app.dm.finishTransaction(tid) self._app.dm.finishTransaction(tid)
self.abort(tid, even_if_locked=True) self.abort(tid, even_if_locked=True)
# update loid if needed
if self._loid_seen > self._loid:
args = dump(self._loid_seen), dump(self._loid)
logging.warning('Greater OID used in StoreObject : %s > %s', *args)
self._loid = self._loid_seen
self._app.dm.setLastOID(self._loid)
def storeTransaction(self, tid, oid_list, user, desc, ext, packed): def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
""" """
Store transaction information received from client node Store transaction information received from client node
...@@ -240,9 +227,6 @@ class TransactionManager(object): ...@@ -240,9 +227,6 @@ class TransactionManager(object):
transaction = self._transaction_dict[tid] transaction = self._transaction_dict[tid]
transaction.addObject(oid, compression, checksum, data, value_serial) transaction.addObject(oid, compression, checksum, data, value_serial)
# update loid
self._loid_seen = oid
def abort(self, tid, even_if_locked=True): def abort(self, tid, even_if_locked=True):
""" """
Abort a transaction Abort a transaction
......
...@@ -295,6 +295,9 @@ class NeoTestBase(unittest.TestCase): ...@@ -295,6 +295,9 @@ class NeoTestBase(unittest.TestCase):
def checkAbortTransaction(self, conn, **kw): def checkAbortTransaction(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.AbortTransaction, **kw) return self.checkNotifyPacket(conn, Packets.AbortTransaction, **kw)
def checkNotifyLastOID(self, conn, **kw):
return self.checkNotifyPacket(conn, Packets.NotifyLastOID, **kw)
def checkAnswerTransactionFinished(self, conn, **kw): def checkAnswerTransactionFinished(self, conn, **kw):
return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw) return self.checkAnswerPacket(conn, Packets.AnswerTransactionFinished, **kw)
......
...@@ -19,6 +19,7 @@ import os ...@@ -19,6 +19,7 @@ import os
import unittest import unittest
import transaction import transaction
import ZODB import ZODB
from struct import pack, unpack
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
from ZODB.POSException import ConflictError from ZODB.POSException import ConflictError
from ZODB.tests.StorageTestBase import zodb_pickle from ZODB.tests.StorageTestBase import zodb_pickle
...@@ -300,6 +301,31 @@ class ClientTests(NEOFunctionalTest): ...@@ -300,6 +301,31 @@ class ClientTests(NEOFunctionalTest):
st3.tpc_finish(t3) st3.tpc_finish(t3)
self.runWithTimeout(10, test) self.runWithTimeout(10, test)
def testGreaterOIDSaved(self):
"""
Store an object with an OID greater than the last generated by the
master. This OID must be intercepted at commit, used for next OID
generations and persistently saved on storage nodes.
"""
self.neo = NEOCluster(['test_neo1'], replicas=0,
temp_dir=self.getTempDirectory())
neoctl = self.neo.getNEOCTL()
self.neo.start()
db1, conn1 = self.neo.getZODBConnection()
st1 = conn1._storage
t1 = transaction.Transaction()
rev = '\0' * 8
data = zodb_pickle(PObject())
my_oid = pack('!Q', 100000)
# store an object with this OID
st1.tpc_begin(t1)
st1.store(my_oid, rev, data, '', t1)
st1.tpc_vote(t1)
st1.tpc_finish(t1)
# request an oid, should be greater than mine
oid = st1.new_oid()
self.assertTrue(oid > my_oid)
def test_suite(): def test_suite():
return unittest.makeSuite(ClientTests) return unittest.makeSuite(ClientTests)
......
...@@ -80,12 +80,19 @@ class MasterClientHandlerTests(NeoTestBase): ...@@ -80,12 +80,19 @@ class MasterClientHandlerTests(NeoTestBase):
def test_08_askNewOIDs(self): def test_08_askNewOIDs(self):
service = self.service service = self.service
loid = self.app.loid oid1, oid2 = self.getOID(1), self.getOID(2)
self.app.tm.setLastOID(oid1)
# client call it # client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port) client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address) conn = self.getFakeConnection(client_uuid, self.client_address)
for node in self.app.nm.getStorageList():
conn = self.getFakeConnection(node.getUUID(), node.getAddress())
node.setConnection(conn)
service.askNewOIDs(conn, 1) service.askNewOIDs(conn, 1)
self.assertTrue(loid < self.app.loid) self.assertTrue(self.app.tm.getLastOID() > oid1)
for node in self.app.nm.getStorageList():
conn = node.getConnection()
self.assertEquals(self.checkNotifyLastOID(conn, decode=True), (oid2,))
def test_09_askFinishTransaction(self): def test_09_askFinishTransaction(self):
service = self.service service = self.service
......
...@@ -32,21 +32,6 @@ class MasterAppTests(NeoTestBase): ...@@ -32,21 +32,6 @@ class MasterAppTests(NeoTestBase):
def tearDown(self): def tearDown(self):
NeoTestBase.tearDown(self) NeoTestBase.tearDown(self)
def test_05_getNewOIDList(self):
# must raise as we don"t have one
self.assertEqual(self.app.loid, None)
self.app.loid = None
self.assertRaises(RuntimeError, self.app.getNewOIDList, 1)
# ask list
self.app.loid = p64(1)
oid_list = self.app.getNewOIDList(15)
self.assertEqual(len(oid_list), 15)
i = 2
# begin from 0, so generated oid from 1 to 15
for oid in oid_list:
self.assertEqual(u64(oid), i)
i+=1
def test_06_broadcastNodeInformation(self): def test_06_broadcastNodeInformation(self):
# defined some nodes to which data will be send # defined some nodes to which data will be send
master_uuid = self.getNewUUID() master_uuid = self.getNewUUID()
......
...@@ -93,26 +93,24 @@ class MasterRecoveryTests(NeoTestBase): ...@@ -93,26 +93,24 @@ class MasterRecoveryTests(NeoTestBase):
def test_09_answerLastIDs(self): def test_09_answerLastIDs(self):
recovery = self.recovery recovery = self.recovery
uuid = self.identifyToMasterNode() uuid = self.identifyToMasterNode()
loid = self.app.loid = '\1' * 8 oid1 = self.getOID(1)
self.app.tm.setLastTID('\1' * 8) oid2 = self.getOID(2)
ltid = self.app.tm.getLastTID() tid1 = self.getNextTID()
self.app.pt.setID('\1' * 8) tid2 = self.getNextTID(tid1)
lptid = self.app.pt.getID() ptid1 = self.getPTID(1)
ptid2 = self.getPTID(2)
self.app.tm.setLastOID(oid1)
self.app.tm.setLastTID(tid1)
self.app.pt.setID(ptid1)
# send information which are later to what PMN knows, this must update target node # send information which are later to what PMN knows, this must update target node
conn = self.getFakeConnection(uuid, self.storage_port) conn = self.getFakeConnection(uuid, self.storage_port)
new_ptid = unpack('!Q', lptid)[0] self.assertTrue(ptid2 > self.app.pt.getID())
new_ptid = pack('!Q', new_ptid + 1) self.assertTrue(oid2 > self.app.tm.getLastOID())
oid = unpack('!Q', loid)[0] self.assertTrue(tid2 > self.app.tm.getLastTID())
new_oid = pack('!Q', oid + 1) recovery.answerLastIDs(conn, oid2, tid2, ptid2)
upper, lower = unpack('!LL', ltid) self.assertEquals(oid2, self.app.tm.getLastOID())
new_tid = pack('!LL', upper, lower + 10) self.assertEquals(tid2, self.app.tm.getLastTID())
self.assertTrue(new_ptid > self.app.pt.getID()) self.assertEquals(ptid2, recovery.target_ptid)
self.assertTrue(new_oid > self.app.loid)
self.assertTrue(new_tid > self.app.tm.getLastTID())
recovery.answerLastIDs(conn, new_oid, new_tid, new_ptid)
self.assertEquals(new_oid, self.app.loid)
self.assertEquals(new_tid, self.app.tm.getLastTID())
self.assertEquals(new_ptid, recovery.target_ptid)
def test_10_answerPartitionTable(self): def test_10_answerPartitionTable(self):
......
...@@ -126,8 +126,9 @@ class MasterStorageHandlerTests(NeoTestBase): ...@@ -126,8 +126,9 @@ class MasterStorageHandlerTests(NeoTestBase):
# give a uuid # give a uuid
conn = self.getFakeConnection(node.getUUID(), self.storage_address) conn = self.getFakeConnection(node.getUUID(), self.storage_address)
ptid = self.app.pt.getID() ptid = self.app.pt.getID()
oid = self.app.loid = '\1' * 8 oid = self.getOID(1)
tid = '\1' * 8 tid = self.getNextTID()
self.app.tm.setLastOID(oid)
self.app.tm.setLastTID(tid) self.app.tm.setLastTID(tid)
service.askLastIDs(conn) service.askLastIDs(conn)
packet = self.checkAnswerLastIDs(conn) packet = self.checkAnswerLastIDs(conn)
...@@ -136,7 +137,6 @@ class MasterStorageHandlerTests(NeoTestBase): ...@@ -136,7 +137,6 @@ class MasterStorageHandlerTests(NeoTestBase):
self.assertEqual(ltid, tid) self.assertEqual(ltid, tid)
self.assertEqual(lptid, ptid) self.assertEqual(lptid, ptid)
def test_13_askUnfinishedTransactions(self): def test_13_askUnfinishedTransactions(self):
service = self.service service = self.service
node, conn = self.identifyToMasterNode() node, conn = self.identifyToMasterNode()
......
...@@ -101,6 +101,19 @@ class testTransactionManager(NeoTestBase): ...@@ -101,6 +101,19 @@ class testTransactionManager(NeoTestBase):
self.assertEqual(txnman.getPendingList(), []) self.assertEqual(txnman.getPendingList(), [])
self.assertFalse(txnman.hasPending()) self.assertFalse(txnman.hasPending())
def test_getNextOIDList(self):
txnman = TransactionManager()
# must raise as we don"t have one
self.assertEqual(txnman.getLastOID(), None)
self.assertRaises(RuntimeError, txnman.getNextOIDList, 1)
# ask list
txnman.setLastOID(self.getOID(1))
oid_list = txnman.getNextOIDList(15)
self.assertEqual(len(oid_list), 15)
# begin from 1, so generated oid from 2 to 16
for i, oid in zip(xrange(len(oid_list)), oid_list):
self.assertEqual(oid, self.getOID(i+2))
def test_getNextTID(self): def test_getNextTID(self):
txnman = TransactionManager() txnman = TransactionManager()
# no previous TID # no previous TID
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment