Commit 831a20d8 authored by Aurel's avatar Aurel

defined a master node list in zope.conf instead of just one master node


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@105 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 2323b91a
...@@ -21,7 +21,7 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -21,7 +21,7 @@ class NEOStorage(BaseStorage.BaseStorage,
__name__ = 'NEOStorage' __name__ = 'NEOStorage'
def __init__(self, master_addr, master_port, name, read_only=False, **kw): def __init__(self, master_nodes, name, read_only=False, **kw):
self._is_read_only = read_only self._is_read_only = read_only
# Transaction must be under protection of lock # Transaction must be under protection of lock
l = Lock() l = Lock()
...@@ -42,7 +42,7 @@ class NEOStorage(BaseStorage.BaseStorage, ...@@ -42,7 +42,7 @@ class NEOStorage(BaseStorage.BaseStorage,
dispatcher.start() dispatcher.start()
# Import here to prevent recursive import # Import here to prevent recursive import
from neo.client.app import Application from neo.client.app import Application
self.app = Application(master_addr, master_port, name, em, dispatcher, self.app = Application(master_nodes, name, em, dispatcher,
message_queue, request_queue) message_queue, request_queue)
def load(self, oid, version=None): def load(self, oid, version=None):
......
...@@ -109,11 +109,10 @@ class ConnectionManager(object): ...@@ -109,11 +109,10 @@ class ConnectionManager(object):
class Application(ThreadingMixIn, object): class Application(ThreadingMixIn, object):
"""The client node application.""" """The client node application."""
def __init__(self, master_addr, master_port, name, em, dispatcher, message_queue, def __init__(self, master_nodes, name, em, dispatcher, message_queue,
request_queue, **kw): request_queue, **kw):
logging.basicConfig(level = logging.DEBUG) logging.basicConfig(level = logging.DEBUG)
logging.debug('master node address is %s, port is %d' %(master_addr, logging.debug('master node address are %s' %(master_nodes,))
master_port))
# Internal Attributes common to all thread # Internal Attributes common to all thread
self.name = name self.name = name
self.em = em self.em = em
...@@ -163,11 +162,11 @@ class Application(ThreadingMixIn, object): ...@@ -163,11 +162,11 @@ class Application(ThreadingMixIn, object):
break break
self.uuid = uuid self.uuid = uuid
# Connect to primary master node # Connect to primary master node
defined_master_addr = (master_addr, master_port) self.master_node_list = master_nodes.split(' ')
while 1: while 1:
self.node_not_ready = 0 self.node_not_ready = 0
logging.debug("trying to connect to primary master...") logging.debug("trying to connect to primary master...")
self.connectToPrimaryMasterNode(defined_master_addr) self.connectToPrimaryMasterNode()
if not self.node_not_ready and self.pt.filled(): if not self.node_not_ready and self.pt.filled():
# got a connection and partition table # got a connection and partition table
break break
...@@ -203,21 +202,22 @@ class Application(ThreadingMixIn, object): ...@@ -203,21 +202,22 @@ class Application(ThreadingMixIn, object):
global_message[0].handler.dispatch(global_message[0], global_message[1]) global_message[0].handler.dispatch(global_message[0], global_message[1])
def connectToPrimaryMasterNode(self, defined_master_addr): def connectToPrimaryMasterNode(self):
"""Connect to the primary master node.""" """Connect to the primary master node."""
addr, port = self.master_node_list[0].split(':')
port = int(port)
handler = ClientEventHandler(self, self.dispatcher) handler = ClientEventHandler(self, self.dispatcher)
n = MasterNode(server = defined_master_addr) n = MasterNode(server = (addr, port))
self.nm.add(n) self.nm.add(n)
# Connect to defined master node and get primary master node # Connect to first master node defined and get primary master node
self.local_var.tmp_q = Queue(1) self.local_var.tmp_q = Queue(1)
if self.primary_master_node is None: if self.primary_master_node is None:
conn = ClientConnection(self.em, handler, defined_master_addr) conn = ClientConnection(self.em, handler, (addr, port))
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid, p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
defined_master_addr[0], addr, port, self.name)
defined_master_addr[1], self.name)
# send message to dispatcher # send message to dispatcher
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True) self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.primary_master_node = None self.primary_master_node = None
...@@ -226,7 +226,7 @@ class Application(ThreadingMixIn, object): ...@@ -226,7 +226,7 @@ class Application(ThreadingMixIn, object):
while 1: while 1:
self._waitMessage(block=0) self._waitMessage(block=0)
if self.primary_master_node == -1: if self.primary_master_node == -1:
raise NEOStorageError("Unable to initialize connection to master node %s" %(defined_master_addr,)) raise NEOStorageError("Unable to initialize connection to master node %s:%d" %(addr, port))
if self.primary_master_node is not None: if self.primary_master_node is not None:
break break
if self.node_not_ready: if self.node_not_ready:
...@@ -234,7 +234,7 @@ class Application(ThreadingMixIn, object): ...@@ -234,7 +234,7 @@ class Application(ThreadingMixIn, object):
return return
logging.info('primary master node is %s' %(self.primary_master_node.server,)) logging.info('primary master node is %s' %(self.primary_master_node.server,))
# Close connection if not already connected to primary master node # Close connection if not already connected to primary master node
if self.primary_master_node.getServer() != defined_master_addr: if self.primary_master_node.getServer() != (addr, port):
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
conn.close() conn.close()
......
...@@ -4,6 +4,6 @@ class NEOStorage(BaseConfig): ...@@ -4,6 +4,6 @@ class NEOStorage(BaseConfig):
def open(self): def open(self):
from NEOStorage import NEOStorage from NEOStorage import NEOStorage
return NEOStorage(master_addr = self.config.master_addr, master_port = int(self.config.master_port), name = self.config.name) return NEOStorage(master_nodes = self.config.master_nodes, name = self.config.name)
...@@ -4,16 +4,10 @@ ...@@ -4,16 +4,10 @@
<description> <description>
A scalable storage for Zope A scalable storage for Zope
</description> </description>
<key name="master_addr" required="yes"> <key name="master_nodes" required="yes">
<description> <description>
Give the ip of the master node Give the list of the master node like ip:port ip:port...
</description> </description>
</key>
<key name="master_port" required="yes">
<description>
Give the port of the master node
</description>
</key>
<key name="name" required="yes"> <key name="name" required="yes">
<description> <description>
Give the name of the cluster Give the name of the cluster
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment