Commit 4833183f authored by Jim Fulton's avatar Jim Fulton

Fixed a bug in low-level data input handling

In ZEO.asyncio.client.Protocol.data_received, the logic was broken if
there we unhandled errors, for example in calling client storage methods.
parent 9e0f98f9
......@@ -235,30 +235,43 @@ class Protocol(asyncio.Protocol):
# Low-level input handler collects data into sized messages.
# Note that the logic below assume that when new data pushes
# us over what we want, we process it in one call until we
# need more, because we assume that excess data is all in the
# last item of self.input. This is why the exception handling
# in the while loop is critical. Without it, an exception
# might cause us to exit before processing all of the data we
# should, when then causes the logic to be broken in
# subsequent calls.
self.got += len(data)
self.input.append(data)
while self.got >= self.want:
extra = self.got - self.want
if extra == 0:
collected = b''.join(self.input)
self.input = []
else:
input = self.input
self.input = [data[-extra:]]
input[-1] = input[-1][:-extra]
collected = b''.join(input)
self.got = extra
if self.getting_size:
# we were recieving the message size
assert self.want == 4
self.want = unpack(">I", collected)[0]
self.getting_size = False
else:
self.want = 4
self.getting_size = True
self.message_received(collected)
try:
extra = self.got - self.want
if extra == 0:
collected = b''.join(self.input)
self.input = []
else:
input = self.input
self.input = [input[-1][-extra:]]
input[-1] = input[-1][:-extra]
collected = b''.join(input)
self.got = extra
if self.getting_size:
# we were recieving the message size
assert self.want == 4
self.want = unpack(">I", collected)[0]
self.getting_size = False
else:
self.want = 4
self.getting_size = True
self.message_received(collected)
except Exception:
logger.exception("data_received %s %s %s",
self.want, self.got, self.getting_size)
def first_message_received(self, data):
# Handler for first/handshake message, set up in __init__
......
......@@ -613,6 +613,34 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
client.ready = False
self.assertRaises(ClientDisconnected, self.call, 'foo')
def test_errors_in_data_received(self):
# There was a bug in ZEO.async.client.Protocol.data_recieved
# that caused it to fail badly if errors were raised while
# handling data.
wrapper, cache, loop, client, protocol, transport, send, respond = (
self.start(finish_start=True))
wrapper.receiveBlobStart.side_effect = ValueError('test')
chunk = 'x' * 99999
try:
loop.protocol.data_received(b''.join((
sized(pickle.dumps(
(0, True, 'receiveBlobStart', ('oid', 'serial')), 3)),
sized(pickle.dumps(
(0, True, 'receiveBlobChunk',
('oid', 'serial', chunk)), 3)),
)))
except ValueError:
pass
loop.protocol.data_received(
sized(pickle.dumps(
(0, True, 'receiveBlobStop', ('oid', 'serial')), 3)),
)
wrapper.receiveBlobChunk.assert_called_with('oid', 'serial', chunk)
wrapper.receiveBlobStop.assert_called_with('oid', 'serial')
def unsized(self, data, unpickle=False):
result = []
while data:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment