Commit 93ec6ced authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #78 from zopefoundation/avoid-future-timeouts

Avoid waiting on futures using timeouts -> major performance win on Python 2
parents e5318884 38c09539
...@@ -4,8 +4,11 @@ Changelog ...@@ -4,8 +4,11 @@ Changelog
5.0.2 (unreleased) 5.0.2 (unreleased)
------------------ ------------------
- Provide much better performance on Python 2.
- Provide better error messages when pip tries to install ZEO on an - Provide better error messages when pip tries to install ZEO on an
unsupported Python version. See `issue 75 <https://github.com/zopefoundation/ZEO/issues/75>`_. unsupported Python version. See `issue 75
<https://github.com/zopefoundation/ZEO/issues/75>`_.
5.0.1 (2016-09-06) 5.0.1 (2016-09-06)
------------------ ------------------
......
...@@ -547,7 +547,7 @@ class Client(object): ...@@ -547,7 +547,7 @@ class Client(object):
def get_peername(self): def get_peername(self):
return self.protocol.get_peername() return self.protocol.get_peername()
def call_async_threadsafe(self, future, method, args): def call_async_threadsafe(self, future, wait_ready, method, args):
if self.ready: if self.ready:
self.protocol.call_async(method, args) self.protocol.call_async(method, args)
future.set_result(None) future.set_result(None)
...@@ -557,7 +557,7 @@ class Client(object): ...@@ -557,7 +557,7 @@ class Client(object):
def call_async_from_same_thread(self, method, *args): def call_async_from_same_thread(self, method, *args):
return self.protocol.call_async(method, args) return self.protocol.call_async(method, args)
def call_async_iter_threadsafe(self, future, it): def call_async_iter_threadsafe(self, future, wait_ready, it):
if self.ready: if self.ready:
self.protocol.call_async_iter(it) self.protocol.call_async_iter(it)
future.set_result(None) future.set_result(None)
...@@ -581,16 +581,19 @@ class Client(object): ...@@ -581,16 +581,19 @@ class Client(object):
else: else:
self._when_ready(func, result_future, *args) self._when_ready(func, result_future, *args)
def call_threadsafe(self, future, method, args): def call_threadsafe(self, future, wait_ready, method, args):
if self.ready: if self.ready:
self.protocol.call(future, method, args) self.protocol.call(future, method, args)
elif wait_ready:
self._when_ready(
self.call_threadsafe, future, wait_ready, method, args)
else: else:
self._when_ready(self.call_threadsafe, future, method, args) future.set_exception(ClientDisconnected())
# Special methods because they update the cache. # Special methods because they update the cache.
@future_generator @future_generator
def load_before_threadsafe(self, future, oid, tid): def load_before_threadsafe(self, future, wait_ready, oid, tid):
data = self.cache.loadBefore(oid, tid) data = self.cache.loadBefore(oid, tid)
if data is not None: if data is not None:
future.set_result(data) future.set_result(data)
...@@ -604,8 +607,11 @@ class Client(object): ...@@ -604,8 +607,11 @@ class Client(object):
if data: if data:
data, start, end = data data, start, end = data
self.cache.store(oid, start, end, data) self.cache.store(oid, start, end, data)
elif wait_ready:
self._when_ready(
self.load_before_threadsafe, future, wait_ready, oid, tid)
else: else:
self._when_ready(self.load_before_threadsafe, future, oid, tid) future.set_exception(ClientDisconnected())
@future_generator @future_generator
def _prefetch(self, oid, tid): def _prefetch(self, oid, tid):
...@@ -617,7 +623,7 @@ class Client(object): ...@@ -617,7 +623,7 @@ class Client(object):
except Exception: except Exception:
logger.exception("prefetch %r %r" % (oid, tid)) logger.exception("prefetch %r %r" % (oid, tid))
def prefetch(self, future, oids, tid): def prefetch(self, future, wait_ready, oids, tid):
if self.ready: if self.ready:
for oid in oids: for oid in oids:
if self.cache.loadBefore(oid, tid) is None: if self.cache.loadBefore(oid, tid) is None:
...@@ -628,7 +634,7 @@ class Client(object): ...@@ -628,7 +634,7 @@ class Client(object):
future.set_exception(ClientDisconnected()) future.set_exception(ClientDisconnected())
@future_generator @future_generator
def tpc_finish_threadsafe(self, future, tid, updates, f): def tpc_finish_threadsafe(self, future, wait_ready, tid, updates, f):
if self.ready: if self.ready:
try: try:
tid = yield self.protocol.fut('tpc_finish', tid) tid = yield self.protocol.fut('tpc_finish', tid)
...@@ -652,7 +658,7 @@ class Client(object): ...@@ -652,7 +658,7 @@ class Client(object):
else: else:
future.set_exception(ClientDisconnected()) future.set_exception(ClientDisconnected())
def close_threadsafe(self, future): def close_threadsafe(self, future, _):
self.close() self.close()
future.set_result(None) future.set_result(None)
...@@ -720,15 +726,30 @@ class ClientRunner(object): ...@@ -720,15 +726,30 @@ class ClientRunner(object):
def call(meth, *args, **kw): def call(meth, *args, **kw):
timeout = kw.pop('timeout', None) timeout = kw.pop('timeout', None)
assert not kw assert not kw
# Some explanation of the code below.
# Timeouts on Python 2 are expensive, so we try to avoid
# them if we're connected. The 3rd argument below is a
# wait flag. If false, and we're disconnected, we fail
# immediately. If that happens, then we try again with the
# wait flag set to True and wait with the default timeout.
result = Future() result = Future()
call_soon_threadsafe(meth, result, *args) call_soon_threadsafe(meth, result, timeout is not None, *args)
try:
return self.wait_for_result(result, timeout) return self.wait_for_result(result, timeout)
except ClientDisconnected:
if timeout is None:
result = Future()
call_soon_threadsafe(meth, result, True, *args)
return self.wait_for_result(result, self.timeout)
else:
raise
self.__call = call self.__call = call
def wait_for_result(self, future, timeout): def wait_for_result(self, future, timeout):
try: try:
return future.result(self.timeout if timeout is None else timeout) return future.result(timeout)
except concurrent.futures.TimeoutError: except concurrent.futures.TimeoutError:
if not self.client.ready: if not self.client.ready:
raise ClientDisconnected("timed out waiting for connection") raise ClientDisconnected("timed out waiting for connection")
...@@ -742,7 +763,7 @@ class ClientRunner(object): ...@@ -742,7 +763,7 @@ class ClientRunner(object):
# for tests # for tests
result = concurrent.futures.Future() result = concurrent.futures.Future()
self.loop.call_soon_threadsafe( self.loop.call_soon_threadsafe(
self.call_threadsafe, result, method, args) self.call_threadsafe, result, True, method, args)
return result return result
def async(self, method, *args): def async(self, method, *args):
...@@ -783,7 +804,7 @@ class ClientRunner(object): ...@@ -783,7 +804,7 @@ class ClientRunner(object):
self.__call = call_closed self.__call = call_closed
def apply_threadsafe(self, future, func, *args): def apply_threadsafe(self, future, wait_ready, func, *args):
try: try:
future.set_result(func(*args)) future.set_result(func(*args))
except Exception as exc: except Exception as exc:
......
...@@ -113,6 +113,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -113,6 +113,8 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
sized(self.encode(message_id, False, '.reply', result))) sized(self.encode(message_id, False, '.reply', result)))
def wait_for_result(self, future, timeout): def wait_for_result(self, future, timeout):
if future.done() and future.exception() is not None:
raise future.exception()
return future return future
def testClientBasics(self): def testClientBasics(self):
...@@ -145,8 +147,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -145,8 +147,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
# connecting, we get an error. This is because some dufus # connecting, we get an error. This is because some dufus
# decided to create a client storage without waiting for it to # decided to create a client storage without waiting for it to
# connect. # connect.
f1 = self.call('foo', 1, 2) self.assertRaises(ClientDisconnected, self.call, 'foo', 1, 2)
self.assertTrue(isinstance(f1.exception(), ClientDisconnected))
# When the client is reconnecting, it's ready flag is set to False and # When the client is reconnecting, it's ready flag is set to False and
# it queues calls: # it queues calls:
...@@ -155,8 +156,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner): ...@@ -155,8 +156,7 @@ class ClientTests(Base, setupstack.TestCase, ClientRunner):
self.assertFalse(f1.done()) self.assertFalse(f1.done())
# If we try to make an async call, we get an immediate error: # If we try to make an async call, we get an immediate error:
f2 = self.async('bar', 3, 4) self.assertRaises(ClientDisconnected, self.async, 'bar', 3, 4)
self.assert_(isinstance(f2.exception(), ClientDisconnected))
# The wrapper object (ClientStorage) hasn't been notified: # The wrapper object (ClientStorage) hasn't been notified:
self.assertFalse(wrapper.notify_connected.called) self.assertFalse(wrapper.notify_connected.called)
......
...@@ -114,9 +114,6 @@ If we access the root object, it'll be loaded from the server: ...@@ -114,9 +114,6 @@ If we access the root object, it'll be loaded from the server:
>>> conn.root()[1].x >>> conn.root()[1].x
6 6
>>> len(db.storage._cache)
2
Similarly, if we simply disconnect the client, and write data from Similarly, if we simply disconnect the client, and write data from
another client: another client:
...@@ -138,8 +135,8 @@ another client: ...@@ -138,8 +135,8 @@ another client:
- Drops or clears it's client cache. (The end result is that the cache - Drops or clears it's client cache. (The end result is that the cache
is working but empty.) is working but empty.)
>>> len(db.storage._cache) >>> len(db.storage._cache) <= 1
1 True
(When a database is created, it checks to make sure the root object is (When a database is created, it checks to make sure the root object is
in the database, which is why we get 1, rather than 0 objects in the cache.) in the database, which is why we get 1, rather than 0 objects in the cache.)
......
...@@ -971,8 +971,8 @@ def test_prefetch(self): ...@@ -971,8 +971,8 @@ def test_prefetch(self):
>>> conn.close() >>> conn.close()
>>> conn = ZEO.connection(addr) >>> conn = ZEO.connection(addr)
>>> storage = conn.db().storage >>> storage = conn.db().storage
>>> len(storage._cache) >>> len(storage._cache) <= 1
1 True
>>> storage.prefetch(oids, conn._storage._start) >>> storage.prefetch(oids, conn._storage._start)
The prefetch returns before the cache is filled: The prefetch returns before the cache is filled:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment