Commit 033baabe authored by Grégory Wisniewski's avatar Grégory Wisniewski

Fix a bug where a storage node was not handled properly by the master when he

come back after a downtime. The STOP_OPERATION packet was never deleted from the
input buffer when OperationFailure was raised. The input & ouput buffers no more
use string list but one string only, performances should not be affected.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@460 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 63d41bc0
...@@ -132,8 +132,8 @@ class Connection(BaseConnection): ...@@ -132,8 +132,8 @@ class Connection(BaseConnection):
def __init__(self, event_manager, handler, def __init__(self, event_manager, handler,
connector = None, addr = None, connector = None, addr = None,
connector_handler = None): connector_handler = None):
self.read_buf = [] self.read_buf = ""
self.write_buf = [] self.write_buf = ""
self.cur_id = 0 self.cur_id = 0
self.event_dict = {} self.event_dict = {}
self.aborted = False self.aborted = False
...@@ -203,43 +203,35 @@ class Connection(BaseConnection): ...@@ -203,43 +203,35 @@ class Connection(BaseConnection):
def analyse(self): def analyse(self):
"""Analyse received data.""" """Analyse received data."""
if self.read_buf: while 1:
if len(self.read_buf) == 1: try:
msg = self.read_buf[0] packet = Packet.parse(self.read_buf)
else: except ProtocolError, m:
msg = ''.join(self.read_buf) self.handler.packetMalformed(self, *m)
return
while 1: if packet is None:
break
# Remove idle events, if appropriate packets were received.
for msg_id in (None, packet.getId()):
try: try:
packet = Packet.parse(msg) event = self.event_dict[msg_id]
except ProtocolError, m: del self.event_dict[msg_id]
self.handler.packetMalformed(self, *m) self.em.removeIdleEvent(event)
return except KeyError:
pass
if packet is None:
break
# Remove idle events, if appropriate packets were received.
for msg_id in (None, packet.getId()):
try:
event = self.event_dict[msg_id]
del self.event_dict[msg_id]
self.em.removeIdleEvent(event)
except KeyError:
pass
logging.debug('#0x%04x %-30s from %s (%s:%d)', packet.getId(),
packet.getType(), dump(self.uuid), *self.getAddress())
self.handler.packetReceived(self, packet)
msg = msg[len(packet):]
if msg: logging.debug('#0x%04x %-30s from %s (%s:%d)', packet.getId(),
self.read_buf = [msg] packet.getType(), dump(self.uuid), *self.getAddress())
else:
del self.read_buf[:] try:
self.handler.packetReceived(self, packet)
finally:
self.read_buf = self.read_buf[len(packet):]
def pending(self): def pending(self):
return self.connector is not None and len(self.write_buf) != 0 return self.connector is not None and self.write_buf
def recv(self): def recv(self):
"""Receive data from a connector.""" """Receive data from a connector."""
...@@ -250,7 +242,7 @@ class Connection(BaseConnection): ...@@ -250,7 +242,7 @@ class Connection(BaseConnection):
self.handler.connectionClosed(self) self.handler.connectionClosed(self)
self.close() self.close()
else: else:
self.read_buf.append(r) self.read_buf += r
except ConnectorTryAgainException: except ConnectorTryAgainException:
pass pass
except: except:
...@@ -262,20 +254,14 @@ class Connection(BaseConnection): ...@@ -262,20 +254,14 @@ class Connection(BaseConnection):
def send(self): def send(self):
"""Send data to a connector.""" """Send data to a connector."""
if self.write_buf: if self.write_buf:
if len(self.write_buf) == 1:
msg = self.write_buf[0]
else:
msg = ''.join(self.write_buf)
try: try:
r = self.connector.send(msg) r = self.connector.send(self.write_buf)
if not r: if not r:
logging.error('cannot write') logging.error('cannot write')
self.handler.connectionClosed(self) self.handler.connectionClosed(self)
self.close() self.close()
elif r == len(msg):
del self.write_buf[:]
else: else:
self.write_buf = [msg[r:]] self.write_buf = self.write_buf[r:]
except ConnectorTryAgainException: except ConnectorTryAgainException:
return return
except: except:
...@@ -292,13 +278,13 @@ class Connection(BaseConnection): ...@@ -292,13 +278,13 @@ class Connection(BaseConnection):
logging.debug('#0x%04x %-30s to %s (%s:%d)', packet.getId(), logging.debug('#0x%04x %-30s to %s (%s:%d)', packet.getId(),
packet.getType(), dump(self.uuid), *self.getAddress()) packet.getType(), dump(self.uuid), *self.getAddress())
try: try:
self.write_buf.append(packet.encode()) self.write_buf += packet.encode()
except ProtocolError, m: except ProtocolError, m:
logging.critical('trying to send a too big message') logging.critical('trying to send a too big message')
return self.addPacket(packet.internalError(packet.getId(), m[0])) return self.addPacket(packet.internalError(packet.getId(), m[0]))
# If this is the first time, enable polling for writing. # If this is the first time, enable polling for writing.
if len(self.write_buf) == 1: if self.write_buf:
self.em.addWriter(self) self.em.addWriter(self)
def expectMessage(self, msg_id = None, timeout = 5, additional_timeout = 30): def expectMessage(self, msg_id = None, timeout = 5, additional_timeout = 30):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment