From ad0d2ccf7913d95cc55badb908713858ea6f6f1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9gory=20Wisniewski?= <gregory@nexedi.com> Date: Fri, 17 Jul 2009 09:19:29 +0000 Subject: [PATCH] Replace INVALID_UUID values with None out of protocol.py module to unify condition. Packet encoding and decoding handle None/INVALID_UUID mapping. git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@871 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/admin/app.py | 6 +-- neo/admin/handler.py | 13 +++--- neo/bootstrap.py | 2 +- neo/client/app.py | 8 ++-- neo/client/handlers/master.py | 16 +++---- neo/event.py | 2 +- neo/master/app.py | 20 ++++---- neo/master/handlers/__init__.py | 9 ++-- neo/master/handlers/client.py | 2 +- neo/master/handlers/election.py | 7 ++- neo/master/handlers/identification.py | 2 +- neo/master/handlers/recovery.py | 4 +- neo/master/handlers/secondary.py | 4 +- neo/master/handlers/shutdown.py | 4 +- neo/master/handlers/storage.py | 2 +- neo/master/handlers/verification.py | 3 +- neo/neoctl/app.py | 4 +- neo/node.py | 2 +- neo/protocol.py | 67 +++++++++++++++++++++++++-- neo/storage/app.py | 5 +- neo/storage/handlers/__init__.py | 12 ++--- neo/storage/handlers/hidden.py | 4 +- 22 files changed, 124 insertions(+), 74 deletions(-) diff --git a/neo/admin/app.py b/neo/admin/app.py index f5a99859..f8266884 100644 --- a/neo/admin/app.py +++ b/neo/admin/app.py @@ -19,7 +19,7 @@ import logging from time import time from neo.config import ConfigurationManager -from neo.protocol import INVALID_UUID, INVALID_PTID, MASTER_NODE_TYPE +from neo.protocol import INVALID_PTID, MASTER_NODE_TYPE from neo.node import NodeManager, MasterNode from neo.event import EventManager from neo.connection import ListeningConnection, ClientConnection @@ -71,7 +71,7 @@ class Application(object): # The partition table is initialized after getting the number of # partitions. self.pt = None - self.uuid = INVALID_UUID + self.uuid = None self.primary_master_node = None self.ptid = INVALID_PTID self.monitoring_handler = MasterMonitoringEventHandler(self) @@ -153,7 +153,7 @@ class Application(object): row = [] try: for cell in self.pt.getCellList(offset): - if uuid != INVALID_UUID and cell.getUUID() != uuid: + if uuid is not None and cell.getUUID() != uuid: continue else: row.append((cell.getUUID(), cell.getState())) diff --git a/neo/admin/handler.py b/neo/admin/handler.py index d9def7a8..6644bce2 100644 --- a/neo/admin/handler.py +++ b/neo/admin/handler.py @@ -18,8 +18,7 @@ import logging from neo.handler import EventHandler -from neo.protocol import INVALID_UUID, \ - MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ +from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ ADMIN_NODE_TYPE, TEMPORARILY_DOWN_STATE from neo.node import MasterNode, StorageNode, ClientNode, AdminNode from neo import protocol @@ -168,11 +167,11 @@ class MasterBaseEventHandler(EventHandler): addr = (ip_address, port) # Try to retrieve it from nm n = None - if uuid != INVALID_UUID: + if uuid is not None: n = nm.getNodeByUUID(uuid) if n is None: n = nm.getNodeByServer(addr) - if n is not None and uuid != INVALID_UUID: + if n is not None and uuid is not None: # node only exists by address, remove it nm.remove(n) n = None @@ -185,15 +184,15 @@ class MasterBaseEventHandler(EventHandler): if n is None: n = MasterNode(server = addr) nm.add(n) - if uuid != INVALID_UUID: + if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if n.getUUID() is None: n.setUUID(uuid) else: - n.setUUID(INVALID_UUID) + n.setUUID(None) elif node_type in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE): - if uuid == INVALID_UUID: + if uuid is None: # No interest. continue if n is None: diff --git a/neo/bootstrap.py b/neo/bootstrap.py index 36cc9b90..4bf6c211 100644 --- a/neo/bootstrap.py +++ b/neo/bootstrap.py @@ -32,7 +32,7 @@ class BootstrapManager(EventHandler): Manage the bootstrap stage, lookup for the primary master then connect to it """ - def __init__(self, app, name, node_type, uuid=protocol.INVALID_UUID, server=NO_SERVER): + def __init__(self, app, name, node_type, uuid=None, server=NO_SERVER): EventHandler.__init__(self, app) self.primary = None self.server = server diff --git a/neo/client/app.py b/neo/client/app.py index 049ef4a4..a5cf19ef 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -26,7 +26,7 @@ from neo.client.mq import MQ from neo.node import NodeManager, MasterNode, StorageNode from neo.connection import MTClientConnection from neo import protocol -from neo.protocol import INVALID_UUID, INVALID_TID, INVALID_PARTITION, \ +from neo.protocol import INVALID_TID, INVALID_PARTITION, \ INVALID_PTID, CLIENT_NODE_TYPE, INVALID_SERIAL, \ DOWN_STATE, HIDDEN_STATE from neo.client.handlers.master import PrimaryBootstrapHandler, \ @@ -252,7 +252,7 @@ class Application(object): master_node_list.append(server) self.nm.add(MasterNode(server=server)) # no self-assigned UUID, primary master will supply us one - self.uuid = INVALID_UUID + self.uuid = None self.mq_cache = MQ() self.new_oid_list = [] self.ptid = INVALID_PTID @@ -463,7 +463,7 @@ class Application(object): # (...per client) # - have a sleep in the code (yuck !) sleep(5) - if self.uuid != INVALID_UUID: + if self.uuid is not None: # TODO: pipeline those 2 requests # This is currently impossible because _waitMessage can only # wait on one message at a time @@ -481,7 +481,7 @@ class Application(object): finally: conn.unlock() self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler) - ready = self.uuid != INVALID_UUID and self.pt is not None \ + ready = self.uuid is not None and self.pt is not None \ and self.pt.operational() logging.info("connected to primary master node %s" % self.primary_master_node) return conn diff --git a/neo/client/handlers/master.py b/neo/client/handlers/master.py index 503bc40c..862c8568 100644 --- a/neo/client/handlers/master.py +++ b/neo/client/handlers/master.py @@ -19,7 +19,7 @@ import logging from neo.client.handlers import BaseHandler, AnswerBaseHandler from neo.protocol import MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ - INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE + RUNNING_STATE, TEMPORARILY_DOWN_STATE from neo.node import MasterNode, StorageNode from neo.pt import MTPartitionTable as PartitionTable from neo.util import dump @@ -63,7 +63,7 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): conn.setUUID(uuid) node.setUUID(uuid) - if your_uuid != INVALID_UUID: + if your_uuid is not None: # got an uuid from the primary master app.uuid = your_uuid @@ -80,13 +80,13 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): if n is None: n = MasterNode(server = addr) app.nm.add(n) - if uuid != INVALID_UUID: + if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if n.getUUID() is None or n.getUUID() != uuid: n.setUUID(uuid) - if primary_uuid != INVALID_UUID: + if primary_uuid is not None: primary_node = app.nm.getNodeByUUID(primary_uuid) if primary_node is None: # I don't know such a node. Probably this information @@ -236,11 +236,11 @@ class PrimaryNotificationsHandler(BaseHandler): addr = (ip_address, port) # Try to retrieve it from nm n = None - if uuid != INVALID_UUID: + if uuid is not None: n = nm.getNodeByUUID(uuid) if n is None: n = nm.getNodeByServer(addr) - if n is not None and uuid != INVALID_UUID: + if n is not None and uuid is not None: # node only exists by address, remove it nm.remove(n) n = None @@ -253,13 +253,13 @@ class PrimaryNotificationsHandler(BaseHandler): if n is None: n = MasterNode(server = addr) nm.add(n) - if uuid != INVALID_UUID: + if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if n.getUUID() is None: n.setUUID(uuid) elif node_type == STORAGE_NODE_TYPE: - if uuid == INVALID_UUID: + if uuid is None: # No interest. continue if n is None: diff --git a/neo/event.py b/neo/event.py index 43483a18..5ae0f24b 100644 --- a/neo/event.py +++ b/neo/event.py @@ -200,7 +200,7 @@ class EpollEventManager(object): """ Return the connection associated to the UUID, None if the UUID is None, invalid or not found""" # FIXME: We may optimize this by using use a dict on UUIDs - if uuid in (None, protocol.INVALID_UUID): + if uuid is None: return None for conn in self.connection_dict.values(): if conn.getUUID() == uuid: diff --git a/neo/master/app.py b/neo/master/app.py index 3f9be329..2e18316b 100644 --- a/neo/master/app.py +++ b/neo/master/app.py @@ -24,7 +24,7 @@ from neo.config import ConfigurationManager from neo import protocol from neo.protocol import \ RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \ - INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \ + INVALID_OID, INVALID_TID, INVALID_PTID, \ CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \ UUID_NAMESPACES, ADMIN_NODE_TYPE, BOOTING from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode @@ -341,7 +341,7 @@ class Application(object): except TypeError: ip_address, port = '0.0.0.0', 0 node_list.append((n.getNodeType(), ip_address, port, - n.getUUID() or INVALID_UUID, n.getState())) + n.getUUID(), n.getState())) # Split the packet if too huge. if len(node_list) == 10000: conn.notify(protocol.notifyNodeInformation(node_list)) @@ -709,7 +709,7 @@ class Application(object): def getNewUUID(self, node_type): # build an UUID uuid = os.urandom(15) - while uuid == INVALID_UUID[1:]: + while uuid == '\00' * 15: # XXX: INVALID_UUID[1:] uuid = os.urandom(15) # look for the prefix prefix = UUID_NAMESPACES.get(node_type, None) @@ -721,7 +721,7 @@ class Application(object): node = self.nm.getNodeByUUID(uuid) if node is not None and node.getServer() is not None and node.getServer() != addr: return False - return uuid != self.uuid and uuid != INVALID_UUID + return uuid != self.uuid and uuid is not None def getClusterState(self): return self.cluster_state @@ -762,26 +762,26 @@ class Application(object): def identifyStorageNode(self, uuid, node): # TODO: check all cases here, when server address change... # in verification and running states, if the node is unknown but the - # uuid != INVALID_UUID, we have to give it a new uuid, but in recovery + # uuid is not None, we have to give it a new uuid, but in recovery # the node must keep it's UUID state = protocol.RUNNING_STATE handler = None if self.cluster_state == protocol.RECOVERING: - if uuid == protocol.INVALID_UUID: + if uuid is None: logging.info('reject empty storage node') raise protocol.NotReadyError handler = handlers.RecoveryHandler elif self.cluster_state == protocol.VERIFYING: - if uuid == INVALID_UUID or node is None: + if uuid is None or node is None: # if node is unknown, it has been forget when the current # partition was validated by the admin - uuid = INVALID_UUID + uuid = None state = protocol.PENDING_STATE handler = handlers.VerificationHandler elif self.cluster_state == protocol.RUNNING: - if uuid == INVALID_UUID or node is None: + if uuid is None or node is None: # same as for verification - uuid = INVALID_UUID + uuid = None state = protocol.PENDING_STATE handler = handlers.StorageServiceHandler elif self.cluster_state == protocol.STOPPING: diff --git a/neo/master/handlers/__init__.py b/neo/master/handlers/__init__.py index 487051ee..14b0d36a 100644 --- a/neo/master/handlers/__init__.py +++ b/neo/master/handlers/__init__.py @@ -118,19 +118,18 @@ class MasterHandler(EventHandler): elif app.primary_master_node is not None: primary_uuid = app.primary_master_node.getUUID() else: - primary_uuid = protocol.INVALID_UUID + primary_uuid = None known_master_list = [app.server + (app.uuid, )] for n in app.nm.getMasterNodeList(): if n.getState() == protocol.BROKEN_STATE: continue - known_master_list.append(n.getServer() + \ - (n.getUUID() or protocol.INVALID_UUID, )) + known_master_list.append(n.getServer() + (n.getUUID(), )) conn.answer(protocol.answerPrimaryMaster(primary_uuid, known_master_list), packet) def handleAskClusterState(self, conn, packet): - assert conn.getUUID() != protocol.INVALID_UUID + assert conn.getUUID() is not None state = self.app.getClusterState() conn.answer(protocol.answerClusterState(state), packet) @@ -155,7 +154,7 @@ class BaseServiceHandler(MasterHandler): # No interest. continue - if uuid == protocol.INVALID_UUID: + if uuid is None: # No interest. continue diff --git a/neo/master/handlers/client.py b/neo/master/handlers/client.py index fb6e903b..3497e36f 100644 --- a/neo/master/handlers/client.py +++ b/neo/master/handlers/client.py @@ -22,7 +22,7 @@ from neo.protocol import CLIENT_NODE_TYPE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \ STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \ - HIDDEN_STATE, INVALID_UUID, INTERNAL_ERROR_CODE + HIDDEN_STATE, INTERNAL_ERROR_CODE from neo.master.handlers import BaseServiceHandler from neo.protocol import UnexpectedPacketError from neo.util import dump diff --git a/neo/master/handlers/election.py b/neo/master/handlers/election.py index bab23316..6805a053 100644 --- a/neo/master/handlers/election.py +++ b/neo/master/handlers/election.py @@ -23,7 +23,6 @@ from neo.protocol import MASTER_NODE_TYPE, \ DOWN_STATE from neo.master.handlers import MasterHandler from neo.exception import ElectionFailure -from neo.protocol import INVALID_UUID from neo.node import MasterNode class ElectionHandler(MasterHandler): @@ -51,7 +50,7 @@ class ElectionHandler(MasterHandler): app.nm.add(node) app.unconnected_master_node_set.add(addr) - if uuid != INVALID_UUID: + if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if node.getUUID() is None: @@ -179,13 +178,13 @@ class ClientElectionHandler(MasterHandler): app.nm.add(n) app.unconnected_master_node_set.add(addr) - if uuid != INVALID_UUID: + if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if n.getUUID() is None or n.getUUID() != uuid: n.setUUID(uuid) - if primary_uuid != INVALID_UUID: + if primary_uuid is not None: # The primary master is defined. if app.primary_master_node is not None \ and app.primary_master_node.getUUID() != primary_uuid: diff --git a/neo/master/handlers/identification.py b/neo/master/handlers/identification.py index 33e1607a..cda07604 100644 --- a/neo/master/handlers/identification.py +++ b/neo/master/handlers/identification.py @@ -63,7 +63,7 @@ class IdentificationHandler(MasterHandler): # ask the app the node identification, if refused, an exception is raised result = self.app.identifyNode(node_type, uuid, node) (uuid, node, state, handler, klass) = result - if uuid == protocol.INVALID_UUID: + if uuid is None: # no valid uuid, give it one uuid = app.getNewUUID(node_type) if node is None: diff --git a/neo/master/handlers/recovery.py b/neo/master/handlers/recovery.py index 6c7c2c80..5a20d859 100644 --- a/neo/master/handlers/recovery.py +++ b/neo/master/handlers/recovery.py @@ -21,7 +21,7 @@ from neo import protocol from neo.protocol import RUNNING_STATE, BROKEN_STATE, \ TEMPORARILY_DOWN_STATE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE from neo.master.handlers import MasterHandler -from neo.protocol import UnexpectedPacketError, INVALID_UUID, INVALID_PTID +from neo.protocol import UnexpectedPacketError, INVALID_PTID from neo.node import StorageNode from neo.util import dump @@ -39,7 +39,7 @@ class RecoveryHandler(MasterHandler): # No interest. continue - if uuid == INVALID_UUID: + if uuid is None: # No interest. continue diff --git a/neo/master/handlers/secondary.py b/neo/master/handlers/secondary.py index 367eb4f1..78895c67 100644 --- a/neo/master/handlers/secondary.py +++ b/neo/master/handlers/secondary.py @@ -21,7 +21,7 @@ from neo.protocol import MASTER_NODE_TYPE, \ RUNNING_STATE, BROKEN_STATE, DOWN_STATE from neo.master.handlers import MasterHandler from neo.exception import ElectionFailure, PrimaryFailure -from neo.protocol import UnexpectedPacketError, INVALID_UUID +from neo.protocol import UnexpectedPacketError from neo.node import MasterNode class SecondaryMasterHandler(MasterHandler): @@ -79,7 +79,7 @@ class PrimaryMasterHandler(MasterHandler): n = MasterNode(server = addr) app.nm.add(n) - if uuid != INVALID_UUID: + if uuid is not None: # If I don't know the UUID yet, believe what the peer # told me at the moment. if n.getUUID() is None: diff --git a/neo/master/handlers/shutdown.py b/neo/master/handlers/shutdown.py index e815fecb..c117913b 100644 --- a/neo/master/handlers/shutdown.py +++ b/neo/master/handlers/shutdown.py @@ -17,7 +17,7 @@ import logging from neo import protocol -from neo.protocol import CLIENT_NODE_TYPE, ADMIN_NODE_TYPE, INVALID_UUID, \ +from neo.protocol import CLIENT_NODE_TYPE, ADMIN_NODE_TYPE, \ RUNNING_STATE, STORAGE_NODE_TYPE, TEMPORARILY_DOWN_STATE, STOPPING from neo.master.handlers import BaseServiceHandler from neo import decorators @@ -60,7 +60,7 @@ class ShutdownHandler(BaseServiceHandler): # No interest. continue - if uuid == INVALID_UUID: + if uuid is None: # No interest. continue diff --git a/neo/master/handlers/storage.py b/neo/master/handlers/storage.py index bdb890d0..09f76de1 100644 --- a/neo/master/handlers/storage.py +++ b/neo/master/handlers/storage.py @@ -22,7 +22,7 @@ from neo.protocol import CLIENT_NODE_TYPE, \ RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \ UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \ STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \ - HIDDEN_STATE, INVALID_UUID, INTERNAL_ERROR_CODE + HIDDEN_STATE, INTERNAL_ERROR_CODE from neo.master.handlers import BaseServiceHandler from neo.protocol import UnexpectedPacketError from neo.exception import OperationFailure diff --git a/neo/master/handlers/verification.py b/neo/master/handlers/verification.py index 430c8dac..5a21d1f2 100644 --- a/neo/master/handlers/verification.py +++ b/neo/master/handlers/verification.py @@ -21,7 +21,6 @@ from neo.protocol import CLIENT_NODE_TYPE, RUNNING_STATE, BROKEN_STATE, \ TEMPORARILY_DOWN_STATE, ADMIN_NODE_TYPE from neo.master.handlers import MasterHandler from neo.exception import VerificationFailure -from neo.protocol import INVALID_UUID from neo.util import dump class VerificationHandler(MasterHandler): @@ -41,7 +40,7 @@ class VerificationHandler(MasterHandler): # No interest. continue - if uuid == INVALID_UUID: + if uuid is None: # No interest. continue diff --git a/neo/neoctl/app.py b/neo/neoctl/app.py index 9463ef44..07439b60 100644 --- a/neo/neoctl/app.py +++ b/neo/neoctl/app.py @@ -18,7 +18,7 @@ import logging from neo.protocol import node_types, node_states -from neo.protocol import INVALID_UUID, INVALID_PTID +from neo.protocol import INVALID_PTID from neo.event import EventManager from neo.connection import ClientConnection from neo.exception import OperationFailure @@ -118,7 +118,7 @@ def printPTAction(options): if len(options): uuid = bin(options.pop(0)) else: - uuid = INVALID_UUID + uuid = None return protocol.askPartitionList(min_offset, max_offset, uuid) action_dict = { diff --git a/neo/node.py b/neo/node.py index 92704cf9..bbb6a6f2 100644 --- a/neo/node.py +++ b/neo/node.py @@ -165,7 +165,7 @@ class NodeManager(object): return self.server_dict.get(server) def getNodeByUUID(self, uuid): - if uuid in (None, protocol.INVALID_UUID): + if uuid is None: return None return self.uuid_dict.get(uuid) diff --git a/neo/protocol.py b/neo/protocol.py index 1ab6d72a..d04cc27c 100644 --- a/neo/protocol.py +++ b/neo/protocol.py @@ -501,6 +501,31 @@ def _checkNodeType(type): raise PacketMalformedError('invalide node type %d' % type) return node_type +def _checkUUID(uuid): + if uuid == INVALID_UUID: + return None + return uuid + +def _encodeUUID(uuid): + if uuid is None: + return INVALID_UUID + return uuid + +def _checkOID(oid): + if oid == INVALID_OID: + return None + return oid + +def _checkTID(tid): + if tid == INVALID_TID: + return None + return tid + +def _checkPTID(ptid): + if ptid == INVALID_PTID: + return None + return ptid + def _readString(buffer, name, offset=0): buffer = buffer[offset:] (size, ) = unpack('!L', buffer[:4]) @@ -534,6 +559,7 @@ def _decodeRequestNodeIdentification(body): ip_address = inet_ntoa(ip_address) (name, _) = _readString(body, 'name', offset=32) node_type = _checkNodeType(node_type) + uuid = _checkUUID(uuid) if (major, minor) != PROTOCOL_VERSION: raise PacketMalformedError('protocol version mismatch') return node_type, uuid, ip_address, port, name @@ -545,6 +571,8 @@ def _decodeAcceptNodeIdentification(body): node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid = r ip_address = inet_ntoa(ip_address) node_type = _checkNodeType(node_type) + uuid = _checkUUID(uuid) + your_uuid == _checkUUID(uuid) return (node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid) decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification @@ -560,7 +588,9 @@ def _decodeAnswerPrimaryMaster(body): for i in xrange(n): ip_address, port, uuid = unpack('!4sH16s', body[20+i*22:42+i*22]) ip_address = inet_ntoa(ip_address) + uuid = _checkUUID(uuid) known_master_list.append((ip_address, port, uuid)) + primary_uuid = _checkUUID(primary_uuid) return (primary_uuid, known_master_list) decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster @@ -584,6 +614,7 @@ def _decodeNotifyNodeInformation(body): ip_address = inet_ntoa(ip_address) node_type = _checkNodeType(node_type) state = _checkNodeState(state) + uuid = _checkUUID(uuid) node_list.append((node_type, ip_address, port, uuid, state)) return (node_list,) decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation @@ -621,6 +652,7 @@ def _decodeAnswerPartitionTable(body): uuid, state = unpack('!16sH', body[index:index+18]) index += 18 state = partition_cell_states.get(state) + uuid = _checkUUID(uuid) cell_list.append((uuid, state)) row_list.append((offset, tuple(cell_list))) del cell_list[:] @@ -640,6 +672,7 @@ def _decodeSendPartitionTable(body): uuid, state = unpack('!16sH', body[index:index+18]) index += 18 state = partition_cell_states.get(state) + uuid = _checkUUID(uuid) cell_list.append((uuid, state)) row_list.append((offset, tuple(cell_list))) del cell_list[:] @@ -653,6 +686,7 @@ def _decodeNotifyPartitionChanges(body): for i in xrange(n): (offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22]) state = partition_cell_states.get(state) + uuid = _checkUUID(uuid) cell_list.append((offset, uuid, state)) return ptid, cell_list decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges @@ -893,7 +927,9 @@ decode_table[ANSWER_OIDS] = _decodeAnswerOIDs @handle_errors def _decodeAskPartitionList(body): - return unpack('!LL16s', body) # min_offset, max_offset, uuid + (min_offset, max_offset, uuid) = unpack('!LL16s', body) + uuid = _checkUUID(uuid) + return (min_offset, max_offset, uuid) decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList @handle_errors @@ -909,6 +945,7 @@ def _decodeAnswerPartitionList(body): uuid, state = unpack('!16sH', body[index:index+18]) index += 18 state = partition_cell_states.get(state) + uuid = _checkUUID(uuid) cell_list.append((uuid, state)) row_list.append((offset, tuple(cell_list))) del cell_list[:] @@ -932,6 +969,7 @@ def _decodeAnswerNodeList(body): ip_address = inet_ntoa(ip_address) node_type = _checkNodeType(node_type) state = _checkNodeState(state) + uuid = _checkUUID(uuid) node_list.append((node_type, ip_address, port, uuid, state)) return (node_list,) decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList @@ -940,6 +978,7 @@ decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList def _decodeSetNodeState(body): (uuid, state, modify) = unpack('!16sHB', body) state = _checkNodeState(state) + uuid = _checkUUID(uuid) return (uuid, state, modify) decode_table[SET_NODE_STATE] = _decodeSetNodeState @@ -947,6 +986,7 @@ decode_table[SET_NODE_STATE] = _decodeSetNodeState def _decodeAnswerNodeState(body): (uuid, state) = unpack('!16sH', body) state = _checkNodeState(state) + uuid = _checkUUID(uuid) return (uuid, state) decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState @@ -954,6 +994,7 @@ decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState def _decodeAddPendingNodes(body): (n, ) = unpack('!H', body[:2]) uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)] + uuid_list = map(_checkUUID, uuid_list) return (uuid_list, ) decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes @@ -961,6 +1002,7 @@ decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes def _decodeAnswerNewNodes(body): (n, ) = unpack('!H', body[:2]) uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)] + uuid_list = map(_checkUUID, uuid_list) return (uuid_list, ) decode_table[ANSWER_NEW_NODES] = _decodeAnswerNewNodes @@ -1031,12 +1073,15 @@ def pong(): return Packet(PONG) def requestNodeIdentification(node_type, uuid, ip_address, port, name): + uuid = _encodeUUID(uuid) body = pack('!LLH16s4sHL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1], node_type, uuid, inet_aton(ip_address), port, len(name)) + name return Packet(REQUEST_NODE_IDENTIFICATION, body) def acceptNodeIdentification(node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid): + uuid = _encodeUUID(uuid) + your_uuid = _encodeUUID(your_uuid) body = pack('!H16s4sHLL16s', node_type, uuid, inet_aton(ip_address), port, num_partitions, num_replicas, your_uuid) @@ -1046,9 +1091,11 @@ def askPrimaryMaster(): return Packet(ASK_PRIMARY_MASTER) def answerPrimaryMaster(primary_uuid, known_master_list): + primary_uuid = _encodeUUID(primary_uuid) body = [primary_uuid, pack('!L', len(known_master_list))] - for master in known_master_list: - body.append(pack('!4sH16s', inet_aton(master[0]), master[1], master[2])) + for address, port, uuid in known_master_list: + uuid = _encodeUUID(uuid) + body.append(pack('!4sH16s', inet_aton(address), port, uuid)) body = ''.join(body) return Packet(ANSWER_PRIMARY_MASTER, body) @@ -1061,6 +1108,7 @@ def reelectPrimaryMaster(): def notifyNodeInformation(node_list): body = [pack('!L', len(node_list))] for node_type, ip_address, port, uuid, state in node_list: + uuid = _encodeUUID(uuid) body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port, uuid, state)) body = ''.join(body) @@ -1084,6 +1132,7 @@ def answerPartitionTable(ptid, row_list): for offset, cell_list in row_list: body.append(pack('!LL', offset, len(cell_list))) for uuid, state in cell_list: + uuid = _encodeUUID(uuid) body.append(pack('!16sH', uuid, state)) body = ''.join(body) return Packet(ANSWER_PARTITION_TABLE, body) @@ -1093,6 +1142,7 @@ def sendPartitionTable(ptid, row_list): for offset, cell_list in row_list: body.append(pack('!LL', offset, len(cell_list))) for uuid, state in cell_list: + uuid = _encodeUUID(uuid) body.append(pack('!16sH', uuid, state)) body = ''.join(body) return Packet(SEND_PARTITION_TABLE, body) @@ -1100,6 +1150,7 @@ def sendPartitionTable(ptid, row_list): def notifyPartitionChanges(ptid, cell_list): body = [pack('!8sL', ptid, len(cell_list))] for offset, uuid, state in cell_list: + uuid = _encodeUUID(uuid) body.append(pack('!L16sH', offset, uuid, state)) body = ''.join(body) return Packet(NOTIFY_PARTITION_CHANGES, body) @@ -1248,6 +1299,7 @@ def answerOIDs(oid_list): return Packet(ANSWER_OIDS, body) def askPartitionList(min_offset, max_offset, uuid): + uuid = _encodeUUID(uuid) body = [pack('!LL16s', min_offset, max_offset, uuid)] body = ''.join(body) return Packet(ASK_PARTITION_LIST, body) @@ -1257,6 +1309,7 @@ def answerPartitionList(ptid, row_list): for offset, cell_list in row_list: body.append(pack('!LL', offset, len(cell_list))) for uuid, state in cell_list: + uuid = _encodeUUID(uuid) body.append(pack('!16sH', uuid, state)) body = ''.join(body) return Packet(ANSWER_PARTITION_LIST, body) @@ -1269,30 +1322,33 @@ def askNodeList(node_type): def answerNodeList(node_list): body = [pack('!L', len(node_list))] for node_type, ip_address, port, uuid, state in node_list: + uuid = _encodeUUID(uuid) body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port, uuid, state)) body = ''.join(body) return Packet(ANSWER_NODE_LIST, body) def setNodeState(uuid, state, modify_partition_table): + uuid = _encodeUUID(uuid) body = [pack('!16sHB', uuid, state, modify_partition_table)] body = ''.join(body) return Packet(SET_NODE_STATE, body) def answerNodeState(uuid, state): + uuid = _encodeUUID(uuid) body = [pack('!16sH', uuid, state)] body = ''.join(body) return Packet(ANSWER_NODE_STATE, body) def addPendingNodes(uuid_list=()): # an empty list means all current pending nodes - uuid_list = [pack('!16s', uuid) for uuid in uuid_list] + uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list] body = pack('!H', len(uuid_list)) + ''.join(uuid_list) return Packet(ADD_PENDING_NODES, body) def answerNewNodes(uuid_list): # an empty list means no new nodes - uuid_list = [pack('!16s', uuid) for uuid in uuid_list] + uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list] body = pack('!H', len(uuid_list)) + ''.join(uuid_list) return Packet(ANSWER_NEW_NODES, body) @@ -1303,6 +1359,7 @@ def answerNodeInformation(node_list): # XXX: copy-paste from notifyNodeInformation body = [pack('!L', len(node_list))] for node_type, ip_address, port, uuid, state in node_list: + uuid = _encodeUUID(uuid) body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port, uuid, state)) body = ''.join(body) diff --git a/neo/storage/app.py b/neo/storage/app.py index 9d8cf938..80c794d5 100644 --- a/neo/storage/app.py +++ b/neo/storage/app.py @@ -23,7 +23,7 @@ from collections import deque from neo.config import ConfigurationManager from neo import protocol -from neo.protocol import TEMPORARILY_DOWN_STATE, INVALID_UUID, INVALID_PTID, \ +from neo.protocol import TEMPORARILY_DOWN_STATE, INVALID_PTID, \ partition_cell_states, HIDDEN_STATE from neo.node import NodeManager, MasterNode, StorageNode from neo.event import EventManager @@ -81,9 +81,6 @@ class Application(object): dm = self.dm self.uuid = dm.getUUID() - if self.uuid is None: - self.uuid = INVALID_UUID - num_partitions = dm.getNumPartitions() num_replicas = dm.getNumReplicas() diff --git a/neo/storage/handlers/__init__.py b/neo/storage/handlers/__init__.py index e215b468..3de3bfe5 100644 --- a/neo/storage/handlers/__init__.py +++ b/neo/storage/handlers/__init__.py @@ -20,7 +20,7 @@ import logging from neo.handler import EventHandler from neo import protocol from neo.protocol import Packet, UnexpectedPacketError, \ - INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ + RUNNING_STATE, BROKEN_STATE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE from neo.util import dump @@ -104,11 +104,11 @@ class BaseMasterHandler(BaseStorageHandler): addr = (ip_address, port) # Try to retrieve it from nm n = None - if uuid != INVALID_UUID: + if uuid is not None: n = app.nm.getNodeByUUID(uuid) if n is None: n = app.nm.getNodeByServer(addr) - if n is not None and uuid != INVALID_UUID: + if n is not None and uuid is not None: # node only exists by address, remove it app.nm.remove(n) n = None @@ -127,12 +127,12 @@ class BaseMasterHandler(BaseStorageHandler): app.nm.add(n) n.setState(state) - if uuid != INVALID_UUID: + if uuid is not None: if n.getUUID() is None: n.setUUID(uuid) elif node_type == STORAGE_NODE_TYPE: - if uuid == INVALID_UUID: + if uuid is None: # No interest. continue @@ -155,7 +155,7 @@ class BaseMasterHandler(BaseStorageHandler): n.setState(state) elif node_type == CLIENT_NODE_TYPE: - if uuid == INVALID_UUID: + if uuid is None: # No interest. continue diff --git a/neo/storage/handlers/hidden.py b/neo/storage/handlers/hidden.py index 6b1d3a7d..718c8aa6 100644 --- a/neo/storage/handlers/hidden.py +++ b/neo/storage/handlers/hidden.py @@ -19,7 +19,7 @@ import logging from neo.storage.handlers import BaseMasterHandler from neo.protocol import Packet, \ - INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ + RUNNING_STATE, BROKEN_STATE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE, \ DISCARDED_STATE, OUT_OF_DATE_STATE, UnexpectedPacketError @@ -51,7 +51,7 @@ class HiddenHandler(BaseMasterHandler): for node_type, ip_address, port, uuid, state in node_list: if node_type == STORAGE_NODE_TYPE: - if uuid == INVALID_UUID: + if uuid == None: # No interest. continue -- 2.30.9