Commit d08804c5 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Implement more protocols. Outdate down nodes. Fix some typos.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@32 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 08cfd95a
...@@ -8,6 +8,9 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT ...@@ -8,6 +8,9 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \ REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \ STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \ ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OIDS_BY_TID, ANSWER_OIDS_BY_TID, ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, \
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \ NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \ PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE INTERNAL_ERROR_CODE
...@@ -149,6 +152,31 @@ class EventHandler(object): ...@@ -149,6 +152,31 @@ class EventHandler(object):
def handleStopOperation(self, conn, packet): def handleStopOperation(self, conn, packet):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAskUnfinishedTransactions(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
self.handleUnexpectedPacket(conn, packet)
def handleAskOIDsByTID(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerOIDsByTID(self, conn, packet, oid_list, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskObjectPresent(self, conn, packet, oid, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerObjectPresent(self, conn, packet, oid, tid):
self.handleUnexpectedPacket(conn, packet)
def handleDeleteTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleCommitTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
# Error packet handlers. # Error packet handlers.
handleNotReady = handleUnexpectedPacket handleNotReady = handleUnexpectedPacket
...@@ -190,6 +218,14 @@ class EventHandler(object): ...@@ -190,6 +218,14 @@ class EventHandler(object):
d[NOTIFY_PARTITION_CHANGES] = self.handleNotifyPartitionChanges d[NOTIFY_PARTITION_CHANGES] = self.handleNotifyPartitionChanges
d[START_OPERATION] = self.handleStartOperation d[START_OPERATION] = self.handleStartOperation
d[STOP_OPERATION] = self.handleStopOperation d[STOP_OPERATION] = self.handleStopOperation
d[ASK_UNFINISHED_TRANSACTIONS] = self.handleAskUnfinishedTransactions
d[ANSWER_UNFINISHED_TRANSACTIONS] = self.handleAnswerUnfinishedTransactions
d[ASK_OIDS_BY_TID] = self.handleAskOIDsByTID
d[ANSWER_OIDS_BY_TID] = self.handleAnswerOIDsByTID
d[ASK_OBJECT_PRESENT] = self.handleAskObjectPresent
d[ANSWER_OBJECT_PRESENT] = self.handleAnswerObjectPresent
d[DELETE_TRANSACTION] = self.handleDeleteTransaction
d[COMMIT_TRANSACTION] = self.handleCommitTransaction
self.packet_dispatch_table = d self.packet_dispatch_table = d
......
...@@ -559,9 +559,12 @@ class Application(object): ...@@ -559,9 +559,12 @@ class Application(object):
# If possible, send the packets now. # If possible, send the packets now.
em.poll(0) em.poll(0)
# At this stage, all non-working nodes are out-of-date.
cell_list = self.pt.outdate()
# Tweak the partition table, if the distribution of storage nodes # Tweak the partition table, if the distribution of storage nodes
# is not uniform. # is not uniform.
cell_list = self.pt.tweak() cell_list.extend(self.pt.tweak())
# And, add unused nodes. # And, add unused nodes.
node_list = self.pt.getNodeList() node_list = self.pt.getNodeList()
......
...@@ -76,6 +76,31 @@ START_OPERATION = 0x000b ...@@ -76,6 +76,31 @@ START_OPERATION = 0x000b
# it must not serve client nodes. PM -> S. # it must not serve client nodes. PM -> S.
STOP_OPERATION = 0x000c STOP_OPERATION = 0x000c
# Ask unfinished transactions' IDs. PM -> S.
ASK_UNFINISHED_TRANSACTIONS = 0x000d
# Answer unfinished transactions' IDs. S -> PM.
ANSWER_UNFINISHED_TRANSACTIONS = 0x800d
# Ask OIDs by a TID. PM -> S.
ASK_OIDS_BY_TID = 0x000e
# Answer OIDs by a TID. S -> PM.
ANSWER_OIDS_BY_TID = 0x800e
# Ask if an object is present. If not present, OID_NOT_FOUND should be returned. PM -> S.
ASK_OBJECT_PRESENT = 0x000f
# Answer that an object is present. PM -> S.
ANSWER_OBJECT_PRESENT = 0x800f
# Delete a transaction. PM -> S.
DELETE_TRANSACTION = 0x0010
# Commit a transaction. PM -> S.
COMMIT_TRANSACTION = 0x0011
# Error codes. # Error codes.
NOT_READY_CODE = 1 NOT_READY_CODE = 1
OID_NOT_FOUND_CODE = 2 OID_NOT_FOUND_CODE = 2
...@@ -307,10 +332,62 @@ class Packet(object): ...@@ -307,10 +332,62 @@ class Packet(object):
def stopOperation(self, msg_id): def stopOperation(self, msg_id):
self._id = msg_id self._id = msg_id
self._type = START_OPERATION self._type = STOP_OPERATION
self._body = ''
return self
def askUnfinishedTransactions(self, msg_id):
self._id = msg_id
self._type = ASK_UNFINISHED_TRANSACTIONS
self._body = '' self._body = ''
return self return self
def answerUnfinishedTransactions(self, msg_id, tid_list):
self._id = msg_id
self._type = ANSWER_UNFINISHED_TRANSACTIONS
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
self._body = ''.join(body)
return self
def askOIDsByTID(self, msg_id, tid):
self._id = msg_id
self._type = ASK_OIDS_BY_TID
self._body = tid
return self
def answerOIDsByTID(self, msg_id, oid_list, tid):
self._id = msg_id
self._type = ANSWER_OIDS_BY_TID
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
def askObjectPresent(self, msg_id, oid, tid):
self._id = msg_id
self._type = ASK_OBJECT_PRESENT
self._body = oid + tid
return self
def answerObjectPresent(self, msg_id, oid, tid):
self._id = msg_id
self._type = ANSWER_OBJECT_PRESENT
self._body = oid + tid
return self
def deleteTransaction(self, msg_id, tid):
self._id = msg_id
self._type = DELETE_TRANSACTION
self._body = tid
return self
def commitTransaction(self, msg_id, tid):
self._id = msg_id
self._type = COMMIT_TRANSACTION
self._body = tid
return self
# Decoders. # Decoders.
def decode(self): def decode(self):
try: try:
...@@ -497,3 +574,71 @@ class Packet(object): ...@@ -497,3 +574,71 @@ class Packet(object):
def _decodeStopOperation(self): def _decodeStopOperation(self):
pass pass
decode_table[STOP_OPERATION] = _decodeStopOperation decode_table[STOP_OPERATION] = _decodeStopOperation
def _decodeAskUnfinishedTransactions(self):
pass
decode_table[ASK_UNFINISHED_TRANSACTIONS] = _decodeAskUnfinishedTransactions
def _decodeAnswerUnfinishedTransactions(self):
try:
n = unpack('!L', self._body[:4])
tid_list = []
for i in xrange(n):
tid = unpack('8s', self._body[4+i*8:12+i*8])
tid_list.append(tid)
except:
raise ProtocolError(self, 'invalid answer unfinished transactions')
return tid_list
decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions
def _decodeAskOIDsByTID(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid ask oids by tid')
return tid
decode_table[ASK_OIDS_BY_TID] = _decodeAskOIDsByTID
def _decodeAnswerOIDsByTID(self):
try:
tid, n = unpack('!8sL', self._body[:12])
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[12+i*8:20+i*8])
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid answer oids by tid')
return oid_list, tid
decode_table[ANSWER_OIDS_BY_TID] = _decodeAnswerOIDsByTID
def _decodeAskObjectPresent(self):
try:
oid, tid = unpack('8s8s', self._body)
except:
raise ProtocolError(self, 'invalid ask object present')
return oid, tid
decode_table[ASK_OBJECT_PRESENT] = _decodeAskObjectPresent
def _decodeAnswerObjectPresent(self):
try:
oid, tid = unpack('8s8s', self._body)
except:
raise ProtocolError(self, 'invalid answer object present')
return oid, tid
decode_table[ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent
def _decodeDeleteTransaction(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid delete transaction')
return tid
decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction
def _decodeCommitTransaction(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid commit transaction')
return tid
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
...@@ -76,7 +76,7 @@ class PartitionTable(object): ...@@ -76,7 +76,7 @@ class PartitionTable(object):
for i in xrange(repeats): for i in xrange(repeats):
node = node_list[index] node = node_list[index]
row.append(Cell(node)) row.append(Cell(node))
self.count_dict.setdefault(node, 0) += 1 self.count_dict[node] = self.count_dict.get(node, 0) + 1
index += 1 index += 1
if index == len(node_list): if index == len(node_list):
index = 0 index = 0
...@@ -93,7 +93,7 @@ class PartitionTable(object): ...@@ -93,7 +93,7 @@ class PartitionTable(object):
# Create a new row. # Create a new row.
row = [Cell(node, state)] row = [Cell(node, state)]
if state != FEEDING_STATE: if state != FEEDING_STATE:
self.count_dict.setdefault(node, 0) += 1 self.count_dict[node] = self.count_dict.get(node, 0) + 1
self.partition_list[offset] = row self.partition_list[offset] = row
self.num_filled_rows += 1 self.num_filled_rows += 1
...@@ -104,11 +104,11 @@ class PartitionTable(object): ...@@ -104,11 +104,11 @@ class PartitionTable(object):
if cell.getNode() == node: if cell.getNode() == node:
row.remove(cell) row.remove(cell)
if state != FEEDING_STATE: if state != FEEDING_STATE:
self.count_dict.setdefault(node, 0) -= 1 self.count_dict[node] = self.count_dict.get(node, 0) - 1
break break
row.append(Cell(node, state)) row.append(Cell(node, state))
if state != FEEDING_STATE: if state != FEEDING_STATE:
self.count_dict.setdefault(node, 0) += 1 self.count_dict[node] = self.count_dict.get(node, 0) + 1
def filled(self): def filled(self):
return self.num_filled_rows == self.np return self.num_filled_rows == self.np
...@@ -304,3 +304,14 @@ class PartitionTable(object): ...@@ -304,3 +304,14 @@ class PartitionTable(object):
# nodes by replacing cells. # nodes by replacing cells.
return changed_cell_list return changed_cell_list
def outdate(self):
"""Outdate all non-working nodes."""
cell_list = []
for offset, row in enumerate(self.partition_list):
for cell in row:
if cell.getNodeState() != RUNNING_STATE \
and cell.getState() != OUT_OF_DATE_STATE:
cell.setState(OUT_OF_DATE_STATE)
cell_list.append((offset, cell.getUUID(), OUT_OF_DATE_STATE))
return cell_list
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment