Commit 6df1aa10 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Do not use INVALID_OID/INVALID_TID/INVALID_PTID out of protocol.py module,

handle special cases with None values, protocol handle the few cases where a TID
can be invalid to indicate there is no value.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@872 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ad0d2ccf
...@@ -19,7 +19,7 @@ import logging ...@@ -19,7 +19,7 @@ import logging
from time import time from time import time
from neo.config import ConfigurationManager from neo.config import ConfigurationManager
from neo.protocol import INVALID_PTID, MASTER_NODE_TYPE from neo.protocol import MASTER_NODE_TYPE
from neo.node import NodeManager, MasterNode from neo.node import NodeManager, MasterNode
from neo.event import EventManager from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection from neo.connection import ListeningConnection, ClientConnection
...@@ -73,7 +73,7 @@ class Application(object): ...@@ -73,7 +73,7 @@ class Application(object):
self.pt = None self.pt = None
self.uuid = None self.uuid = None
self.primary_master_node = None self.primary_master_node = None
self.ptid = INVALID_PTID self.ptid = None
self.monitoring_handler = MasterMonitoringEventHandler(self) self.monitoring_handler = MasterMonitoringEventHandler(self)
self.request_handler = MasterRequestEventHandler(self) self.request_handler = MasterRequestEventHandler(self)
self.dispatcher = Dispatcher() self.dispatcher = Dispatcher()
......
...@@ -26,9 +26,7 @@ from neo.client.mq import MQ ...@@ -26,9 +26,7 @@ from neo.client.mq import MQ
from neo.node import NodeManager, MasterNode, StorageNode from neo.node import NodeManager, MasterNode, StorageNode
from neo.connection import MTClientConnection from neo.connection import MTClientConnection
from neo import protocol from neo import protocol
from neo.protocol import INVALID_TID, INVALID_PARTITION, \ from neo.protocol import DOWN_STATE, HIDDEN_STATE
INVALID_PTID, CLIENT_NODE_TYPE, INVALID_SERIAL, \
DOWN_STATE, HIDDEN_STATE
from neo.client.handlers.master import PrimaryBootstrapHandler, \ from neo.client.handlers.master import PrimaryBootstrapHandler, \
PrimaryNotificationsHandler, PrimaryAnswersHandler PrimaryNotificationsHandler, PrimaryAnswersHandler
from neo.client.handlers.storage import StorageBootstrapHandler, \ from neo.client.handlers.storage import StorageBootstrapHandler, \
...@@ -83,7 +81,7 @@ class ConnectionPool(object): ...@@ -83,7 +81,7 @@ class ConnectionPool(object):
logging.error('Connection to storage node %s failed', node) logging.error('Connection to storage node %s failed', node)
return None return None
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
app.uuid, '0.0.0.0', 0, app.name) app.uuid, '0.0.0.0', 0, app.name)
msg_id = conn.ask(app.local_var.queue, p) msg_id = conn.ask(app.local_var.queue, p)
finally: finally:
...@@ -255,7 +253,7 @@ class Application(object): ...@@ -255,7 +253,7 @@ class Application(object):
self.uuid = None self.uuid = None
self.mq_cache = MQ() self.mq_cache = MQ()
self.new_oid_list = [] self.new_oid_list = []
self.ptid = INVALID_PTID self.ptid = None
self.storage_event_handler = StorageEventHandler(self, self.dispatcher) self.storage_event_handler = StorageEventHandler(self, self.dispatcher)
self.storage_bootstrap_handler = StorageBootstrapHandler(self) self.storage_bootstrap_handler = StorageBootstrapHandler(self)
self.storage_handler = StorageAnswersHandler(self) self.storage_handler = StorageAnswersHandler(self)
...@@ -441,7 +439,7 @@ class Application(object): ...@@ -441,7 +439,7 @@ class Application(object):
self.trying_master_node) self.trying_master_node)
self.primary_master_node = None self.primary_master_node = None
break break
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, p = protocol.requestNodeIdentification(protocol.CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name) self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(self.local_var.queue, p) msg_id = conn.ask(self.local_var.queue, p)
finally: finally:
...@@ -526,7 +524,7 @@ class Application(object): ...@@ -526,7 +524,7 @@ class Application(object):
return hist[1][0][0] return hist[1][0][0]
def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0): def _load(self, oid, serial=None, tid=None, cache=0):
"""Internal method which manage load ,loadSerial and loadBefore.""" """Internal method which manage load ,loadSerial and loadBefore."""
cell_list = self._getCellListForID(oid, readable=True) cell_list = self._getCellListForID(oid, readable=True)
if len(cell_list) == 0: if len(cell_list) == 0:
...@@ -592,8 +590,6 @@ class Application(object): ...@@ -592,8 +590,6 @@ class Application(object):
self.mq_cache[oid] = start_serial, data self.mq_cache[oid] = start_serial, data
finally: finally:
self._cache_lock_release() self._cache_lock_release()
if end_serial == INVALID_SERIAL:
end_serial = None
return data, start_serial, end_serial return data, start_serial, end_serial
...@@ -625,8 +621,6 @@ class Application(object): ...@@ -625,8 +621,6 @@ class Application(object):
def loadBefore(self, oid, tid): def loadBefore(self, oid, tid):
"""Load an object for a given oid before tid committed.""" """Load an object for a given oid before tid committed."""
# Do not try in cache as it manages only up-to-date object # Do not try in cache as it manages only up-to-date object
if tid is None:
tid = INVALID_TID
logging.debug('loading %s before %s', dump(oid), dump(tid)) logging.debug('loading %s before %s', dump(oid), dump(tid))
data, start, end = self._load(oid, tid=tid) data, start, end = self._load(oid, tid=tid)
if end is None: if end is None:
...@@ -657,8 +651,6 @@ class Application(object): ...@@ -657,8 +651,6 @@ class Application(object):
"""Store object.""" """Store object."""
if transaction is not self.local_var.txn: if transaction is not self.local_var.txn:
raise StorageTransactionError(self, transaction) raise StorageTransactionError(self, transaction)
if serial is None:
serial = INVALID_SERIAL
logging.debug('storing oid %s serial %s', logging.debug('storing oid %s serial %s',
dump(oid), dump(serial)) dump(oid), dump(serial))
# Find which storage node to use # Find which storage node to use
...@@ -884,7 +876,7 @@ class Application(object): ...@@ -884,7 +876,7 @@ class Application(object):
try: try:
conn.ask(self.local_var.queue, conn.ask(self.local_var.queue,
protocol.askTIDs(first, last, INVALID_PARTITION)) protocol.askTIDs(first, last, protocol.INVALID_PARTITION))
finally: finally:
conn.unlock() conn.unlock()
......
...@@ -24,7 +24,6 @@ from neo.config import ConfigurationManager ...@@ -24,7 +24,6 @@ from neo.config import ConfigurationManager
from neo import protocol from neo import protocol
from neo.protocol import \ from neo.protocol import \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
INVALID_OID, INVALID_TID, INVALID_PTID, \
CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \ CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
UUID_NAMESPACES, ADMIN_NODE_TYPE, BOOTING UUID_NAMESPACES, ADMIN_NODE_TYPE, BOOTING
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
...@@ -77,9 +76,9 @@ class Application(object): ...@@ -77,9 +76,9 @@ class Application(object):
self.uuid = self.getNewUUID(MASTER_NODE_TYPE) self.uuid = self.getNewUUID(MASTER_NODE_TYPE)
# The last OID. # The last OID.
self.loid = INVALID_OID self.loid = None
# The last TID. # The last TID.
self.ltid = INVALID_TID self.ltid = None
# The target node's uuid to request next. # The target node's uuid to request next.
self.target_uuid = None self.target_uuid = None
...@@ -372,9 +371,9 @@ class Application(object): ...@@ -372,9 +371,9 @@ class Application(object):
self.changeClusterState(protocol.RECOVERING) self.changeClusterState(protocol.RECOVERING)
em = self.em em = self.em
self.loid = INVALID_OID self.loid = None
self.ltid = INVALID_TID self.ltid = None
self.pt.setID(INVALID_PTID) self.pt.setID(None)
self.target_uuid = None self.target_uuid = None
# collect the last partition table available # collect the last partition table available
...@@ -386,7 +385,7 @@ class Application(object): ...@@ -386,7 +385,7 @@ class Application(object):
self.changeClusterState(protocol.VERIFYING) self.changeClusterState(protocol.VERIFYING)
logging.info('startup allowed') logging.info('startup allowed')
if self.pt.getID() == INVALID_PTID: if self.pt.getID() is None:
self.buildFromScratch() self.buildFromScratch()
# FIXME: storage node with existing partition but not in the selected PT # FIXME: storage node with existing partition but not in the selected PT
......
...@@ -21,7 +21,7 @@ from neo import protocol ...@@ -21,7 +21,7 @@ from neo import protocol
from neo.protocol import RUNNING_STATE, BROKEN_STATE, \ from neo.protocol import RUNNING_STATE, BROKEN_STATE, \
TEMPORARILY_DOWN_STATE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE TEMPORARILY_DOWN_STATE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE
from neo.master.handlers import MasterHandler from neo.master.handlers import MasterHandler
from neo.protocol import UnexpectedPacketError, INVALID_PTID from neo.protocol import UnexpectedPacketError
from neo.node import StorageNode from neo.node import StorageNode
from neo.util import dump from neo.util import dump
...@@ -89,7 +89,7 @@ class RecoveryHandler(MasterHandler): ...@@ -89,7 +89,7 @@ class RecoveryHandler(MasterHandler):
app.loid = loid app.loid = loid
if app.ltid < ltid: if app.ltid < ltid:
app.ltid = ltid app.ltid = ltid
if app.pt.getID() == INVALID_PTID or app.pt.getID() < lptid: if app.pt.getID() is None or app.pt.getID() < lptid:
# something newer # something newer
app.pt.setID(lptid) app.pt.setID(lptid)
app.target_uuid = conn.getUUID() app.target_uuid = conn.getUUID()
......
...@@ -28,7 +28,7 @@ class PartitionTable(neo.pt.PartitionTable): ...@@ -28,7 +28,7 @@ class PartitionTable(neo.pt.PartitionTable):
self.id = id self.id = id
def setNextID(self): def setNextID(self):
if self.id == protocol.INVALID_PTID: if self.id is None:
raise RuntimeError, 'I do not know the last Partition Table ID' raise RuntimeError, 'I do not know the last Partition Table ID'
last_id = unpack('!Q', self.id)[0] last_id = unpack('!Q', self.id)[0]
self.id = pack('!Q', last_id + 1) self.id = pack('!Q', last_id + 1)
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
import logging import logging
from neo.protocol import node_types, node_states from neo.protocol import node_types, node_states
from neo.protocol import INVALID_PTID
from neo.event import EventManager from neo.event import EventManager
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.exception import OperationFailure from neo.exception import OperationFailure
...@@ -144,7 +143,7 @@ class Application(object): ...@@ -144,7 +143,7 @@ class Application(object):
self.connector_handler = getConnectorHandler(handler) self.connector_handler = getConnectorHandler(handler)
self.server = (ip, port) self.server = (ip, port)
self.em = EventManager() self.em = EventManager()
self.ptid = INVALID_PTID self.ptid = None
def getConnection(self): def getConnection(self):
if self.conn is None: if self.conn is None:
......
...@@ -15,11 +15,9 @@ ...@@ -15,11 +15,9 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import struct from struct import pack, unpack, error
from struct import pack, unpack
from socket import inet_ntoa, inet_aton from socket import inet_ntoa, inet_aton
class EnumItem(int): class EnumItem(int):
""" """
Enumerated value type. Enumerated value type.
...@@ -364,11 +362,13 @@ VALID_CELL_STATE_LIST = (UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, ...@@ -364,11 +362,13 @@ VALID_CELL_STATE_LIST = (UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE,
# Other constants. # Other constants.
INVALID_UUID = '\0' * 16 INVALID_UUID = '\0' * 16
INVALID_TID = '\0' * 8 INVALID_TID = '\0' * 8
INVALID_SERIAL = '\0' * 8
INVALID_OID = '\0' * 8
INVALID_PTID = '\0' * 8 INVALID_PTID = '\0' * 8
INVALID_PARTITION = 0xffffffff INVALID_PARTITION = 0xffffffff
# TODO: delete those definitions when tests are fixed
INVALID_SERIAL = INVALID_TID
INVALID_OID = '\0' * 16
STORAGE_NS = 'S' STORAGE_NS = 'S'
MASTER_NS = 'M' MASTER_NS = 'M'
CLIENT_NS = 'C' CLIENT_NS = 'C'
...@@ -475,7 +475,7 @@ def handle_errors(decoder): ...@@ -475,7 +475,7 @@ def handle_errors(decoder):
def wrapper(body): def wrapper(body):
try: try:
return decoder(body) return decoder(body)
except struct.error, msg: except error, msg: # struct.error
name = decoder.__name__ name = decoder.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg)) raise PacketMalformedError("%s fail (%s)" % (name, msg))
except PacketMalformedError, msg: except PacketMalformedError, msg:
...@@ -511,21 +511,16 @@ def _encodeUUID(uuid): ...@@ -511,21 +511,16 @@ def _encodeUUID(uuid):
return INVALID_UUID return INVALID_UUID
return uuid return uuid
def _checkOID(oid):
if oid == INVALID_OID:
return None
return oid
def _checkTID(tid):
if tid == INVALID_TID:
return None
return tid
def _checkPTID(ptid): def _checkPTID(ptid):
if ptid == INVALID_PTID: if ptid == INVALID_PTID:
return None return None
return ptid return ptid
def _encodePTID(ptid):
if ptid is None:
return INVALID_PTID
return ptid
def _readString(buffer, name, offset=0): def _readString(buffer, name, offset=0):
buffer = buffer[offset:] buffer = buffer[offset:]
(size, ) = unpack('!L', buffer[:4]) (size, ) = unpack('!L', buffer[:4])
...@@ -626,7 +621,10 @@ decode_table[ASK_LAST_IDS] = _decodeAskLastIDs ...@@ -626,7 +621,10 @@ decode_table[ASK_LAST_IDS] = _decodeAskLastIDs
@handle_errors @handle_errors
def _decodeAnswerLastIDs(body): def _decodeAnswerLastIDs(body):
return unpack('!8s8s8s', body) # (loid, ltid, lptid)
(loid, ltid, lptid) = unpack('!8s8s8s', body)
lptid = _checkPTID(lptid)
return (loid, ltid, lptid)
decode_table[ANSWER_LAST_IDS] = _decodeAnswerLastIDs decode_table[ANSWER_LAST_IDS] = _decodeAnswerLastIDs
@handle_errors @handle_errors
...@@ -643,6 +641,7 @@ decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable ...@@ -643,6 +641,7 @@ decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable
def _decodeAnswerPartitionTable(body): def _decodeAnswerPartitionTable(body):
index = 12 index = 12
(ptid, n) = unpack('!8sL', body[:index]) (ptid, n) = unpack('!8sL', body[:index])
ptid = _checkPTID(ptid)
row_list = [] row_list = []
cell_list = [] cell_list = []
for i in xrange(n): for i in xrange(n):
...@@ -663,6 +662,7 @@ decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable ...@@ -663,6 +662,7 @@ decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
def _decodeSendPartitionTable(body): def _decodeSendPartitionTable(body):
index = 12 index = 12
(ptid, n,) = unpack('!8sL', body[:index]) (ptid, n,) = unpack('!8sL', body[:index])
ptid = _checkPTID(ptid)
row_list = [] row_list = []
cell_list = [] cell_list = []
for i in xrange(n): for i in xrange(n):
...@@ -682,6 +682,7 @@ decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable ...@@ -682,6 +682,7 @@ decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable
@handle_errors @handle_errors
def _decodeNotifyPartitionChanges(body): def _decodeNotifyPartitionChanges(body):
(ptid, n) = unpack('!8sL', body[:12]) (ptid, n) = unpack('!8sL', body[:12])
ptid = _checkPTID(ptid)
cell_list = [] cell_list = []
for i in xrange(n): for i in xrange(n):
(offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22]) (offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22])
...@@ -718,22 +719,26 @@ decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactio ...@@ -718,22 +719,26 @@ decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactio
@handle_errors @handle_errors
def _decodeAskObjectPresent(body): def _decodeAskObjectPresent(body):
return unpack('8s8s', body) # oid, tid (oid, tid) = unpack('8s8s', body)
return (oid, tid)
decode_table[ASK_OBJECT_PRESENT] = _decodeAskObjectPresent decode_table[ASK_OBJECT_PRESENT] = _decodeAskObjectPresent
@handle_errors @handle_errors
def _decodeAnswerObjectPresent(body): def _decodeAnswerObjectPresent(body):
return unpack('8s8s', body) # oid, tid (oid, tid) = unpack('8s8s', body)
return (oid, tid)
decode_table[ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent decode_table[ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent
@handle_errors @handle_errors
def _decodeDeleteTransaction(body): def _decodeDeleteTransaction(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction
@handle_errors @handle_errors
def _decodeCommitTransaction(body): def _decodeCommitTransaction(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
@handle_errors @handle_errors
...@@ -743,7 +748,8 @@ decode_table[ASK_NEW_TID] = _decodeAskNewTID ...@@ -743,7 +748,8 @@ decode_table[ASK_NEW_TID] = _decodeAskNewTID
@handle_errors @handle_errors
def _decodeAnswerNewTID(body): def _decodeAnswerNewTID(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
@handle_errors @handle_errors
...@@ -773,17 +779,20 @@ decode_table[FINISH_TRANSACTION] = _decodeFinishTransaction ...@@ -773,17 +779,20 @@ decode_table[FINISH_TRANSACTION] = _decodeFinishTransaction
@handle_errors @handle_errors
def _decodeNotifyTransactionFinished(body): def _decodeNotifyTransactionFinished(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
@handle_errors @handle_errors
def _decodeLockInformation(body): def _decodeLockInformation(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[LOCK_INFORMATION] = _decodeLockInformation decode_table[LOCK_INFORMATION] = _decodeLockInformation
@handle_errors @handle_errors
def _decodeNotifyInformationLocked(body): def _decodeNotifyInformationLocked(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
@handle_errors @handle_errors
...@@ -798,12 +807,14 @@ decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects ...@@ -798,12 +807,14 @@ decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects
@handle_errors @handle_errors
def _decodeUnlockInformation(body): def _decodeUnlockInformation(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation
@handle_errors @handle_errors
def _decodeAbortTransaction(body): def _decodeAbortTransaction(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction
@handle_errors @handle_errors
...@@ -816,7 +827,8 @@ decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject ...@@ -816,7 +827,8 @@ decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject
@handle_errors @handle_errors
def _decodeAnswerStoreObject(body): def _decodeAnswerStoreObject(body):
return unpack('!B8s8s', body) # conflicting, oid, serial (conflicting, oid, serial) = unpack('!B8s8s', body)
return (conflicting, oid, serial)
decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
@handle_errors @handle_errors
...@@ -840,18 +852,26 @@ decode_table[ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction ...@@ -840,18 +852,26 @@ decode_table[ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
@handle_errors @handle_errors
def _decodeAnswerStoreTransaction(body): def _decodeAnswerStoreTransaction(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
@handle_errors @handle_errors
def _decodeAskObject(body): def _decodeAskObject(body):
return unpack('8s8s8s', body) # oid, serial, tid (oid, serial, tid) = unpack('8s8s8s', body)
if serial == INVALID_TID:
serial = None
if tid == INVALID_TID:
tid = None
return (oid, serial, tid)
decode_table[ASK_OBJECT] = _decodeAskObject decode_table[ASK_OBJECT] = _decodeAskObject
@handle_errors @handle_errors
def _decodeAnswerObject(body): def _decodeAnswerObject(body):
r = unpack('!8s8s8sBL', body[:29]) r = unpack('!8s8s8sBL', body[:29])
oid, serial_start, serial_end, compression, checksum = r oid, serial_start, serial_end, compression, checksum = r
if serial_end == INVALID_TID:
serial_end = None
(data, _) = _readString(body, 'data', offset=29) (data, _) = _readString(body, 'data', offset=29)
return (oid, serial_start, serial_end, compression, checksum, data) return (oid, serial_start, serial_end, compression, checksum, data)
decode_table[ANSWER_OBJECT] = _decodeAnswerObject decode_table[ANSWER_OBJECT] = _decodeAnswerObject
...@@ -873,7 +893,8 @@ decode_table[ANSWER_TIDS] = _decodeAnswerTIDs ...@@ -873,7 +893,8 @@ decode_table[ANSWER_TIDS] = _decodeAnswerTIDs
@handle_errors @handle_errors
def _decodeAskTransactionInformation(body): def _decodeAskTransactionInformation(body):
return unpack('8s', body) # tid (tid, ) = unpack('8s', body)
return (tid, )
decode_table[ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation decode_table[ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation
@handle_errors @handle_errors
...@@ -897,7 +918,8 @@ decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformati ...@@ -897,7 +918,8 @@ decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformati
@handle_errors @handle_errors
def _decodeAskObjectHistory(body): def _decodeAskObjectHistory(body):
return unpack('!8sQQ', body) # oid, first, last (oid, first, last) = unpack('!8sQQ', body)
return (oid, first, last)
decode_table[ASK_OBJECT_HISTORY] = _decodeAskObjectHistory decode_table[ASK_OBJECT_HISTORY] = _decodeAskObjectHistory
@handle_errors @handle_errors
...@@ -936,6 +958,7 @@ decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList ...@@ -936,6 +958,7 @@ decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList
def _decodeAnswerPartitionList(body): def _decodeAnswerPartitionList(body):
index = 12 index = 12
(ptid, n) = unpack('!8sL', body[:index]) (ptid, n) = unpack('!8sL', body[:index])
ptid = _checkPTID(ptid)
row_list = [] row_list = []
cell_list = [] cell_list = []
for i in xrange(n): for i in xrange(n):
...@@ -1118,6 +1141,13 @@ def askLastIDs(): ...@@ -1118,6 +1141,13 @@ def askLastIDs():
return Packet(ASK_LAST_IDS) return Packet(ASK_LAST_IDS)
def answerLastIDs(loid, ltid, lptid): def answerLastIDs(loid, ltid, lptid):
# XXX: this is a valid oid, an error should be returned instead of this
# packet when no last IDs are known
if loid is None:
loid = '\0' * 16
if ltid is None:
ltid = INVALID_TID
lptid = _encodePTID(lptid)
return Packet(ANSWER_LAST_IDS, loid + ltid + lptid) return Packet(ANSWER_LAST_IDS, loid + ltid + lptid)
def askPartitionTable(offset_list): def askPartitionTable(offset_list):
...@@ -1128,6 +1158,7 @@ def askPartitionTable(offset_list): ...@@ -1128,6 +1158,7 @@ def askPartitionTable(offset_list):
return Packet(ASK_PARTITION_TABLE, body) return Packet(ASK_PARTITION_TABLE, body)
def answerPartitionTable(ptid, row_list): def answerPartitionTable(ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))] body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list: for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list))) body.append(pack('!LL', offset, len(cell_list)))
...@@ -1138,6 +1169,7 @@ def answerPartitionTable(ptid, row_list): ...@@ -1138,6 +1169,7 @@ def answerPartitionTable(ptid, row_list):
return Packet(ANSWER_PARTITION_TABLE, body) return Packet(ANSWER_PARTITION_TABLE, body)
def sendPartitionTable(ptid, row_list): def sendPartitionTable(ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))] body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list: for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list))) body.append(pack('!LL', offset, len(cell_list)))
...@@ -1148,6 +1180,7 @@ def sendPartitionTable(ptid, row_list): ...@@ -1148,6 +1180,7 @@ def sendPartitionTable(ptid, row_list):
return Packet(SEND_PARTITION_TABLE, body) return Packet(SEND_PARTITION_TABLE, body)
def notifyPartitionChanges(ptid, cell_list): def notifyPartitionChanges(ptid, cell_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(cell_list))] body = [pack('!8sL', ptid, len(cell_list))]
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
uuid = _encodeUUID(uuid) uuid = _encodeUUID(uuid)
...@@ -1240,19 +1273,32 @@ def answerStoreTransaction(tid): ...@@ -1240,19 +1273,32 @@ def answerStoreTransaction(tid):
return Packet(ANSWER_STORE_TRANSACTION, tid) return Packet(ANSWER_STORE_TRANSACTION, tid)
def askStoreObject(oid, serial, compression, checksum, data, tid): def askStoreObject(oid, serial, compression, checksum, data, tid):
if serial is None:
serial = INVALID_TID
body = pack('!8s8s8sBLL', oid, serial, tid, compression, body = pack('!8s8s8sBLL', oid, serial, tid, compression,
checksum, len(data)) + data checksum, len(data)) + data
return Packet(ASK_STORE_OBJECT, body) return Packet(ASK_STORE_OBJECT, body)
def answerStoreObject(conflicting, oid, serial): def answerStoreObject(conflicting, oid, serial):
if serial is None:
serial = INVALID_TID
body = pack('!B8s8s', conflicting, oid, serial) body = pack('!B8s8s', conflicting, oid, serial)
return Packet(ANSWER_STORE_OBJECT, body) return Packet(ANSWER_STORE_OBJECT, body)
def askObject(oid, serial, tid): def askObject(oid, serial, tid):
if tid is None:
# tid can be unspecified
tid = INVALID_TID
if serial is None:
serial = INVALID_TID
return Packet(ASK_OBJECT, pack('!8s8s8s', oid, serial, tid)) return Packet(ASK_OBJECT, pack('!8s8s8s', oid, serial, tid))
def answerObject(oid, serial_start, serial_end, compression, def answerObject(oid, serial_start, serial_end, compression,
checksum, data): checksum, data):
if serial_start is None:
serial_start = INVALID_TID
if serial_end is None:
serial_end = INVALID_TID
body = pack('!8s8s8sBLL', oid, serial_start, serial_end, body = pack('!8s8s8sBLL', oid, serial_start, serial_end,
compression, checksum, len(data)) + data compression, checksum, len(data)) + data
return Packet(ANSWER_OBJECT, body) return Packet(ANSWER_OBJECT, body)
...@@ -1283,9 +1329,8 @@ def askObjectHistory(oid, first, last): ...@@ -1283,9 +1329,8 @@ def askObjectHistory(oid, first, last):
def answerObjectHistory(oid, history_list): def answerObjectHistory(oid, history_list):
body = [pack('!8sL', oid, len(history_list))] body = [pack('!8sL', oid, len(history_list))]
# history_list is a list of tuple (serial, size) for serial, size in history_list:
for history_tuple in history_list: body.append(pack('!8sL', serial, size))
body.append(pack('!8sL', history_tuple[0], history_tuple[1]))
body = ''.join(body) body = ''.join(body)
return Packet(ANSWER_OBJECT_HISTORY, body) return Packet(ANSWER_OBJECT_HISTORY, body)
...@@ -1305,6 +1350,7 @@ def askPartitionList(min_offset, max_offset, uuid): ...@@ -1305,6 +1350,7 @@ def askPartitionList(min_offset, max_offset, uuid):
return Packet(ASK_PARTITION_LIST, body) return Packet(ASK_PARTITION_LIST, body)
def answerPartitionList(ptid, row_list): def answerPartitionList(ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))] body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list: for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list))) body.append(pack('!LL', offset, len(cell_list)))
......
...@@ -57,7 +57,7 @@ class PartitionTable(object): ...@@ -57,7 +57,7 @@ class PartitionTable(object):
"""This class manages a partition table.""" """This class manages a partition table."""
def __init__(self, num_partitions, num_replicas): def __init__(self, num_partitions, num_replicas):
self.id = protocol.INVALID_PTID self.id = None
self.np = num_partitions self.np = num_partitions
self.nr = num_replicas self.nr = num_replicas
self.num_filled_rows = 0 self.num_filled_rows = 0
......
...@@ -23,7 +23,7 @@ from collections import deque ...@@ -23,7 +23,7 @@ from collections import deque
from neo.config import ConfigurationManager from neo.config import ConfigurationManager
from neo import protocol from neo import protocol
from neo.protocol import TEMPORARILY_DOWN_STATE, INVALID_PTID, \ from neo.protocol import TEMPORARILY_DOWN_STATE, \
partition_cell_states, HIDDEN_STATE partition_cell_states, HIDDEN_STATE
from neo.node import NodeManager, MasterNode, StorageNode from neo.node import NodeManager, MasterNode, StorageNode
from neo.event import EventManager from neo.event import EventManager
...@@ -95,9 +95,7 @@ class Application(object): ...@@ -95,9 +95,7 @@ class Application(object):
dm.setName(self.name) dm.setName(self.name)
elif name != self.name: elif name != self.name:
raise RuntimeError('name does not match with the database') raise RuntimeError('name does not match with the database')
self.ptid = dm.getPTID() # return ptid or INVALID_PTID self.ptid = dm.getPTID()
if self.ptid == INVALID_PTID:
dm.setPTID(self.ptid)
logging.info("loaded configuration from db : uuid = %s, ptid = %s, name = %s, np = %s, nr = %s" \ logging.info("loaded configuration from db : uuid = %s, ptid = %s, name = %s, np = %s, nr = %s" \
%(dump(self.uuid), dump(self.ptid), name, num_partitions, num_replicas)) %(dump(self.uuid), dump(self.ptid), name, num_partitions, num_replicas))
......
...@@ -226,16 +226,9 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler): ...@@ -226,16 +226,9 @@ class BaseClientAndStorageOperationHandler(BaseStorageHandler):
app.queueEvent(self.handleAskObject, conn, packet, oid, app.queueEvent(self.handleAskObject, conn, packet, oid,
serial, tid) serial, tid)
return return
if serial == protocol.INVALID_SERIAL:
serial = None
if tid == protocol.INVALID_TID:
tid = None
o = app.dm.getObject(oid, serial, tid) o = app.dm.getObject(oid, serial, tid)
if o is not None: if o is not None:
serial, next_serial, compression, checksum, data = o serial, next_serial, compression, checksum, data = o
if next_serial is None:
next_serial = protocol.INVALID_SERIAL
logging.debug('oid = %s, serial = %s, next_serial = %s', logging.debug('oid = %s, serial = %s, next_serial = %s',
dump(oid), dump(serial), dump(next_serial)) dump(oid), dump(serial), dump(next_serial))
p = protocol.answerObject(oid, serial, next_serial, p = protocol.answerObject(oid, serial, next_serial,
......
...@@ -19,7 +19,7 @@ import logging ...@@ -19,7 +19,7 @@ import logging
from neo import protocol from neo import protocol
from neo.storage.handlers import BaseClientAndStorageOperationHandler from neo.storage.handlers import BaseClientAndStorageOperationHandler
from neo.protocol import INVALID_SERIAL, INVALID_TID, INVALID_PARTITION, \ from neo.protocol import INVALID_PARTITION, \
TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE
from neo.util import dump from neo.util import dump
......
...@@ -19,8 +19,7 @@ import logging ...@@ -19,8 +19,7 @@ import logging
from neo import protocol from neo import protocol
from neo.storage.handlers import BaseMasterHandler from neo.storage.handlers import BaseMasterHandler
from neo.protocol import INVALID_SERIAL, INVALID_TID, INVALID_PARTITION, \ from neo.protocol import TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE
TEMPORARILY_DOWN_STATE, DISCARDED_STATE, OUT_OF_DATE_STATE
from neo.util import dump from neo.util import dump
from neo.node import StorageNode from neo.node import StorageNode
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import PrimaryFailure, OperationFailure
......
...@@ -113,7 +113,7 @@ class ReplicationHandler(BaseStorageHandler): ...@@ -113,7 +113,7 @@ class ReplicationHandler(BaseStorageHandler):
present_serial_list = app.dm.getSerialListPresent(oid, serial_list) present_serial_list = app.dm.getSerialListPresent(oid, serial_list)
serial_set = set(serial_list) - set(present_serial_list) serial_set = set(serial_list) - set(present_serial_list)
for serial in serial_set: for serial in serial_set:
conn.ask(protocol.askObject(oid, serial, protocol.INVALID_TID), timeout=300) conn.ask(protocol.askObject(oid, serial, None), timeout=300)
# And, ask more serials. # And, ask more serials.
app.replicator.serial_offset += 1000 app.replicator.serial_offset += 1000
......
...@@ -28,8 +28,8 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -28,8 +28,8 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
app = self.app app = self.app
oid = app.dm.getLastOID() or protocol.INVALID_OID oid = app.dm.getLastOID()
tid = app.dm.getLastTID() or protocol.INVALID_TID tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.ptid) p = protocol.answerLastIDs(oid, tid, app.ptid)
conn.answer(p, packet) conn.answer(p, packet)
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
import logging import logging
from neo.storage.handlers import BaseMasterHandler from neo.storage.handlers import BaseMasterHandler
from neo.protocol import INVALID_OID, INVALID_TID, TEMPORARILY_DOWN_STATE from neo.protocol import TEMPORARILY_DOWN_STATE
from neo import protocol from neo import protocol
from neo.util import dump from neo.util import dump
from neo.node import StorageNode from neo.node import StorageNode
...@@ -29,8 +29,8 @@ class VerificationHandler(BaseMasterHandler): ...@@ -29,8 +29,8 @@ class VerificationHandler(BaseMasterHandler):
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
app = self.app app = self.app
oid = app.dm.getLastOID() or INVALID_OID oid = app.dm.getLastOID()
tid = app.dm.getLastTID() or INVALID_TID tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.ptid) p = protocol.answerLastIDs(oid, tid, app.ptid)
conn.answer(p, packet) conn.answer(p, packet)
......
...@@ -25,7 +25,7 @@ from struct import pack, unpack ...@@ -25,7 +25,7 @@ from struct import pack, unpack
from neo.storage.database import DatabaseManager from neo.storage.database import DatabaseManager
from neo.exception import DatabaseFailure from neo.exception import DatabaseFailure
from neo.protocol import DISCARDED_STATE, INVALID_PTID from neo.protocol import DISCARDED_STATE
LOG_QUERIES = False LOG_QUERIES = False
...@@ -243,10 +243,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -243,10 +243,7 @@ class MySQLDatabaseManager(DatabaseManager):
self.commit() self.commit()
def getPTID(self): def getPTID(self):
ptid = self.getConfiguration('ptid') return self.getConfiguration('ptid')
if ptid is None:
return INVALID_PTID
return ptid
def setPTID(self, ptid): def setPTID(self, ptid):
self.begin() self.begin()
......
...@@ -21,7 +21,7 @@ from random import choice ...@@ -21,7 +21,7 @@ from random import choice
from neo.storage import handlers from neo.storage import handlers
from neo import protocol from neo import protocol
from neo.protocol import STORAGE_NODE_TYPE, UP_TO_DATE_STATE, \ from neo.protocol import STORAGE_NODE_TYPE, UP_TO_DATE_STATE, \
OUT_OF_DATE_STATE, INVALID_TID, RUNNING_STATE OUT_OF_DATE_STATE, RUNNING_STATE
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo.util import dump from neo.util import dump
......
...@@ -27,7 +27,7 @@ from neo.storage.app import Application, StorageNode ...@@ -27,7 +27,7 @@ from neo.storage.app import Application, StorageNode
from neo.storage.handlers import InitializationHandler from neo.storage.handlers import InitializationHandler
from neo.protocol import STORAGE_NODE_TYPE, MASTER_NODE_TYPE, CLIENT_NODE_TYPE from neo.protocol import STORAGE_NODE_TYPE, MASTER_NODE_TYPE, CLIENT_NODE_TYPE
from neo.protocol import BROKEN_STATE, RUNNING_STATE, Packet, INVALID_UUID, \ from neo.protocol import BROKEN_STATE, RUNNING_STATE, Packet, INVALID_UUID, \
UP_TO_DATE_STATE, INVALID_OID, INVALID_TID, PROTOCOL_ERROR_CODE UP_TO_DATE_STATE, INVALID_TID, PROTOCOL_ERROR_CODE
from neo.protocol import ACCEPT_NODE_IDENTIFICATION, REQUEST_NODE_IDENTIFICATION, \ from neo.protocol import ACCEPT_NODE_IDENTIFICATION, REQUEST_NODE_IDENTIFICATION, \
NOTIFY_PARTITION_CHANGES, STOP_OPERATION, ASK_LAST_IDS, ASK_PARTITION_TABLE, \ NOTIFY_PARTITION_CHANGES, STOP_OPERATION, ASK_LAST_IDS, ASK_PARTITION_TABLE, \
ANSWER_OBJECT_PRESENT, ASK_OBJECT_PRESENT, OID_NOT_FOUND_CODE, LOCK_INFORMATION, \ ANSWER_OBJECT_PRESENT, ASK_OBJECT_PRESENT, OID_NOT_FOUND_CODE, LOCK_INFORMATION, \
......
...@@ -19,7 +19,7 @@ import unittest, logging, os ...@@ -19,7 +19,7 @@ import unittest, logging, os
from mock import Mock from mock import Mock
from neo.tests.base import NeoTestBase from neo.tests.base import NeoTestBase
from neo.storage.app import Application from neo.storage.app import Application
from neo.protocol import INVALID_PTID, INVALID_OID, INVALID_TID, \ from neo.protocol import INVALID_PTID, INVALID_TID, \
INVALID_UUID, Packet, NOTIFY_NODE_INFORMATION, UP_TO_DATE_STATE INVALID_UUID, Packet, NOTIFY_NODE_INFORMATION, UP_TO_DATE_STATE
from neo.node import MasterNode, ClientNode, StorageNode from neo.node import MasterNode, ClientNode, StorageNode
from neo.storage.mysqldb import p64, u64, MySQLDatabaseManager from neo.storage.mysqldb import p64, u64, MySQLDatabaseManager
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment