Commit 2730315a authored by Yoshinori Okuji's avatar Yoshinori Okuji

Detect a conflict correctly. Accept Notify Node Identification from any master...

Detect a conflict correctly. Accept Notify Node Identification from any master node. Enhance the output of PartitionTable.log.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@146 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 116b33a0
...@@ -539,7 +539,7 @@ class Application(ThreadingMixIn, object): ...@@ -539,7 +539,7 @@ class Application(ThreadingMixIn, object):
return return
# Call function given by ZODB # Call function given by ZODB
if f is not None: if f is not None:
f(self.tid) f(self.tid)
# Call finish on master # Call finish on master
oid_list = self.txn_data_dict.keys() oid_list = self.txn_data_dict.keys()
conn = self.master_conn conn = self.master_conn
......
...@@ -9,6 +9,7 @@ from neo.node import MasterNode, StorageNode, ClientNode ...@@ -9,6 +9,7 @@ from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.client.NEOStorage import NEOStorageError from neo.client.NEOStorage import NEOStorageError
from neo.exception import ElectionFailure from neo.exception import ElectionFailure
from neo.util import dump
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
...@@ -244,10 +245,12 @@ class ClientEventHandler(EventHandler): ...@@ -244,10 +245,12 @@ class ClientEventHandler(EventHandler):
app = self.app app = self.app
nm = app.nm nm = app.nm
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
# This must be sent only by primary master node # This must be sent only by a primary master node.
if not isinstance(node, MasterNode) \ # Note that this may be sent before I know that it is
or app.primary_master_node is None \ # a primary master node.
or app.primary_master_node.getUUID() != uuid: if not isinstance(node, MasterNode):
logging.warn('ignoring notify node information from %s',
dump(uuid))
return return
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
...@@ -287,6 +290,7 @@ class ClientEventHandler(EventHandler): ...@@ -287,6 +290,7 @@ class ClientEventHandler(EventHandler):
n = app.nm.getNodeByUUID(uuid) n = app.nm.getNodeByUUID(uuid)
if n is not None: if n is not None:
app.nm.remove(n) app.nm.remove(n)
continue
n.setState(state) n.setState(state)
else: else:
...@@ -385,7 +389,7 @@ class ClientEventHandler(EventHandler): ...@@ -385,7 +389,7 @@ class ClientEventHandler(EventHandler):
def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial): def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial):
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
app = self.app app = self.app
if conflicting == '1': if conflicting:
app.txn_object_stored = -1, serial app.txn_object_stored = -1, serial
else: else:
app.txn_object_stored = oid, serial app.txn_object_stored = oid, serial
......
...@@ -143,15 +143,20 @@ class PartitionTable(object): ...@@ -143,15 +143,20 @@ class PartitionTable(object):
node_dict[node] = i node_dict[node] = i
for node, i in node_dict.iteritems(): for node, i in node_dict.iteritems():
logging.debug('pt: node %d: %s', i, dump(node.getUUID())) logging.debug('pt: node %d: %s', i, dump(node.getUUID()))
state_dict = { UP_TO_DATE_STATE: 'U', node_state_dict = { RUNNING_STATE: 'R',
OUT_OF_DATE_STATE: 'O', TEMPORARILY_DOWN_STATE: 'T',
FEEDING_STATE: 'F' } DOWN_STATE: 'D',
BROKEN_STATE: 'B' }
cell_state_dict = { UP_TO_DATE_STATE: 'U',
OUT_OF_DATE_STATE: 'O',
FEEDING_STATE: 'F' }
for offset, row in enumerate(self.partition_list): for offset, row in enumerate(self.partition_list):
desc_list = [] desc_list = []
for cell in row: for cell in row:
i = node_dict[cell.getNode()] i = node_dict[cell.getNode()]
s = state_dict[cell.getState()] cell_state = cell_state_dict[cell.getState()]
desc_list.append('%d %s' % (i, s)) node_state = node_state_dict[cell.getNodeState()]
desc_list.append('%d %s %s' % (i, cell_state, node_state))
logging.debug('pt: row %d: %s', offset, ', '.join(desc_list)) logging.debug('pt: row %d: %s', offset, ', '.join(desc_list))
def operational(self): def operational(self):
......
...@@ -370,6 +370,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -370,6 +370,7 @@ class OperationEventHandler(StorageEventHandler):
else: else:
# If a newer transaction already locks this object, # If a newer transaction already locks this object,
# do not try to resolve a conflict, so return immediately. # do not try to resolve a conflict, so return immediately.
logging.info('unresolvable conflict in %s', dump(oid))
conn.addPacket(Packet().answerStoreObject(packet.getId(), 1, conn.addPacket(Packet().answerStoreObject(packet.getId(), 1,
oid, locking_tid)) oid, locking_tid))
return return
...@@ -379,6 +380,7 @@ class OperationEventHandler(StorageEventHandler): ...@@ -379,6 +380,7 @@ class OperationEventHandler(StorageEventHandler):
if history_list: if history_list:
last_serial = history_list[0][0] last_serial = history_list[0][0]
if last_serial != serial: if last_serial != serial:
logging.info('resolvable conflict in %s', dump(oid))
conn.addPacket(Packet().answerStoreObject(packet.getId(), 1, conn.addPacket(Packet().answerStoreObject(packet.getId(), 1,
oid, last_serial)) oid, last_serial))
return return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment