Commit 97c6be1c authored by Aurel's avatar Aurel

add a block parameter to _waitMessage


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@85 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent cb80604f
...@@ -18,7 +18,7 @@ from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \ ...@@ -18,7 +18,7 @@ from neo.client.NEOStorage import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError NEOStorageNotFoundError
from neo.client.multithreading import ThreadingMixIn from neo.client.multithreading import ThreadingMixIn
from ZODB.POSException import UndoError, StorageTransactionError from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
class ConnectionManager(object): class ConnectionManager(object):
...@@ -60,7 +60,7 @@ class ConnectionManager(object): ...@@ -60,7 +60,7 @@ class ConnectionManager(object):
p.notifyNodeInformation(msg_id, node_list) p.notifyNodeInformation(msg_id, node_list)
self.storage.queue.put((None, msg_id, conn, p), True) self.storage.queue.put((None, msg_id, conn, p), True)
return None return None
logging.debug('connected to storage node %s' %(addr,)) logging.info('connected to storage node %s' %(addr,))
return conn return conn
def _dropConnection(self,): def _dropConnection(self,):
...@@ -128,6 +128,7 @@ class Application(ThreadingMixIn, object): ...@@ -128,6 +128,7 @@ class Application(ThreadingMixIn, object):
self.uuid = None self.uuid = None
self.mq_cache = MQ() self.mq_cache = MQ()
self.new_oid_list = [] self.new_oid_list = []
self.ptid = None
# Transaction specific variable # Transaction specific variable
self.tid = None self.tid = None
self.txn = None self.txn = None
...@@ -175,11 +176,18 @@ class Application(ThreadingMixIn, object): ...@@ -175,11 +176,18 @@ class Application(ThreadingMixIn, object):
pass pass
logging.info("connected to primary master node") logging.info("connected to primary master node")
def _waitMessage(self): def _waitMessage(self,block=1):
"""Wait for a message returned by dispatcher in queues.""" """Wait for a message returned by dispatcher in queues."""
# First get message we are waiting for # First get message we are waiting for
message = None message = None
message = self.local_var.tmp_q.get(True, None) if block:
message = self.local_var.tmp_q.get(True, None)
else:
# we don't want to block until we got a message
try:
message = self.local_var.tmp_q.get_nowait()
except Empty:
pass
if message is not None: if message is not None:
message[0].handler.dispatch(message[0], message[1]) message[0].handler.dispatch(message[0], message[1])
# Now check if there is global messages and execute them # Now check if there is global messages and execute them
...@@ -190,7 +198,7 @@ class Application(ThreadingMixIn, object): ...@@ -190,7 +198,7 @@ class Application(ThreadingMixIn, object):
except Empty: except Empty:
break break
if global_message is not None: if global_message is not None:
global_message[0].handler.dispatch(message[0], message[1]) global_message[0].handler.dispatch(global_message[0], global_message[1])
def connectToPrimaryMasterNode(self, defined_master_addr): def connectToPrimaryMasterNode(self, defined_master_addr):
...@@ -214,7 +222,7 @@ class Application(ThreadingMixIn, object): ...@@ -214,7 +222,7 @@ class Application(ThreadingMixIn, object):
self.node_not_ready = 0 self.node_not_ready = 0
while 1: while 1:
self._waitMessage() self._waitMessage(block=0)
if self.primary_master_node == -1: if self.primary_master_node == -1:
raise NEOStorageError("Unable to initialize connection to master node %s" %(defined_master_addr,)) raise NEOStorageError("Unable to initialize connection to master node %s" %(defined_master_addr,))
if self.primary_master_node is not None: if self.primary_master_node is not None:
...@@ -222,12 +230,11 @@ class Application(ThreadingMixIn, object): ...@@ -222,12 +230,11 @@ class Application(ThreadingMixIn, object):
if self.node_not_ready: if self.node_not_ready:
# must wait # must wait
return return
logging.debug('primary master node is %s' %(self.primary_master_node.server,)) logging.info('primary master node is %s' %(self.primary_master_node.server,))
# Close connection if not already connected to primary master node # Close connection if not already connected to primary master node
if self.primary_master_node.getServer() != defined_master_addr: if self.primary_master_node.getServer() != defined_master_addr:
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
if not isinstance(conn, ListeningConnection): conn.close()
conn.close()
# Connect to primary master node # Connect to primary master node
conn = ClientConnection(self.em, handler, self.primary_master_node.server) conn = ClientConnection(self.em, handler, self.primary_master_node.server)
...@@ -242,7 +249,7 @@ class Application(ThreadingMixIn, object): ...@@ -242,7 +249,7 @@ class Application(ThreadingMixIn, object):
self.master_conn = conn self.master_conn = conn
# Wait for primary master node information # Wait for primary master node information
while 1: while 1:
self._waitMessage() self._waitMessage(block=0)
if self.pt.filled() or self.node_not_ready: if self.pt.filled() or self.node_not_ready:
break break
...@@ -600,7 +607,7 @@ class Application(ThreadingMixIn, object): ...@@ -600,7 +607,7 @@ class Application(ThreadingMixIn, object):
self.store(oid, self.tid, data, None, txn) self.store(oid, self.tid, data, None, txn)
except NEOStorageConflictError, serial: except NEOStorageConflictError, serial:
if serial <= self.tid: if serial <= self.tid:
new_data = wrapper.tryToResolveConflict(oid, self.tid, new_data = wrapper.tryToResolveConflict(oid, self.tid,
serial, data) serial, data)
if new_data is not None: if new_data is not None:
self.store(oid, self.tid, new_data, None, txn) self.store(oid, self.tid, new_data, None, txn)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment