Commit 1e8b930a authored by Grégory Wisniewski's avatar Grégory Wisniewski

Master broadcast to storage nodes the last OID generated. Storage store this

information in their configuration table instead of looking for the greater OID
in the obj table. When a client ask to store an object with an OID greater than
ther last notified by the primary master, the storage log a warning message.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1026 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent fb06337c
......@@ -359,6 +359,14 @@ class Application(object):
if node_list:
conn.notify(protocol.notifyNodeInformation(node_list))
def broadcastLastOID(self, oid):
logging.debug('Broadcast last OID to storages : %s' % dump(oid))
packet = protocol.notifyLastOID(oid)
for conn in self.em.getConnectionList():
node = self.nm.getNodeByUUID(conn.getUUID())
if node is not None and node.isStorage():
conn.notify(packet)
def buildFromScratch(self):
nm, em, pt = self.nm, self.em, self.pt
logging.debug('creating a new partition table, wait for a storage node')
......@@ -370,8 +378,8 @@ class Application(object):
node.setState(protocol.RUNNING_STATE)
self.broadcastNodeInformation(node)
# resert IDs generators
self.loid = '\0'*8
self.ltid = '\0'*8
self.loid = '\0' * 8
self.ltid = '\0' * 8
# build the partition with this node
pt.setID(pack('!Q', 1))
pt.make([node])
......@@ -682,7 +690,8 @@ class Application(object):
handler.connectionCompleted(conn)
self.cluster_state = state
# XXX: switch this to private and optimize since it's used only to generate
# ranges of OIDs
def getNextOID(self):
if self.loid is None:
raise RuntimeError, 'I do not know the last OID'
......@@ -719,7 +728,9 @@ class Application(object):
return unpack('!Q', oid_or_tid)[0] % self.pt.getPartitions()
def getNewOIDList(self, num_oids):
return [self.getNextOID() for i in xrange(num_oids)]
oid_list = [self.getNextOID() for i in xrange(num_oids)]
self.broadcastLastOID(self.loid)
return oid_list
def getNewUUID(self, node_type):
# build an UUID
......
......@@ -107,8 +107,7 @@ class ClientServiceHandler(BaseServiceHandler):
conn.answer(protocol.answerBeginTransaction(tid), packet)
def handleAskNewOIDs(self, conn, packet, num_oids):
app = self.app
oid_list = app.getNewOIDList(num_oids)
oid_list = self.app.getNewOIDList(num_oids)
conn.answer(protocol.answerNewOIDs(oid_list), packet)
def handleFinishTransaction(self, conn, packet, oid_list, tid):
......
......@@ -59,6 +59,7 @@ class Application(object):
self.dm = MySQLDatabaseManager(database = config.getDatabase(),
user = config.getUser(),
password = config.getPassword())
self.loid = None
# The partition table is initialized after getting the number of
# partitions.
self.pt = None
......
......@@ -49,6 +49,10 @@ class BaseMasterHandler(BaseStorageHandler):
def handleNotifyClusterInformation(self, conn, packet, state):
logging.error('ignoring notify cluster information in %s' % self.__class__.__name__)
def handleNotifyLastOID(self, conn, packet, oid):
self.app.loid = oid
self.app.dm.setLastOID(oid)
def handleNotifyNodeInformation(self, conn, packet, node_list):
"""Store information on nodes, only if this is sent by a primary
master node."""
......
......@@ -107,6 +107,11 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def handleAskStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid):
if oid != protocol.INVALID_OID and oid > self.app.loid:
# TODO: notify the master to invalidate lower OIDs
args = dump(oid), dump(self.app.loid)
logging.warning('Greater OID used in StoreObject : %s > %s', *args)
# XXX: should check if the oid match with our partition table
uuid = conn.getUUID()
# First, check for the locking state.
app = self.app
......
......@@ -231,23 +231,16 @@ class MySQLDatabaseManager(DatabaseManager):
def setPTID(self, ptid):
self.setConfiguration('ptid', ptid)
def getLastOID(self):
return self.getConfiguration('loid')
def setLastOID(self, loid):
self.setConfiguration('loid', loid)
def getPartitionTable(self):
q = self.query
return q("""SELECT rid, uuid, state FROM pt""")
def getLastOID(self, all = True):
q = self.query
self.begin()
loid = q("""SELECT MAX(oid) FROM obj""")[0][0]
if all:
tmp_loid = q("""SELECT MAX(oid) FROM tobj""")[0][0]
if loid is None or (tmp_loid is not None and loid < tmp_loid):
loid = tmp_loid
self.commit()
if loid is not None:
loid = p64(loid)
return loid
def getLastTID(self, all = True):
# XXX this does not consider serials in obj.
# I am not sure if this is really harmful. For safety,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment