Commit 09d8df62 authored by Jim Fulton's avatar Jim Fulton

Converted the last promise to a generator

Bye bye Promise.

Also fixed a comment.
parent 7cdcc013
......@@ -161,14 +161,7 @@ class Protocol(base.Protocol):
@future_generator
def finish_connect(self, protocol_version):
# We use a promise model rather than coroutines here because
# for the most part, this class is reactive and coroutines
# aren't a good model of it's activities. During
# initialization, however, we use promises to provide an
# imperative flow.
# The promise(/future) implementation we use differs from
# The future implementation we use differs from
# asyncio.Future in that callbacks are called immediately,
# rather than using the loops call_soon. We want to avoid a
# race between invalidations and cache initialization. In
......@@ -243,9 +236,6 @@ class Protocol(base.Protocol):
self._write(self.encode(self.message_id, False, method, args))
return future
def promise(self, method, *args):
return self.call(Promise(), method, args)
def fut(self, method, *args):
return self.call(Fut(), method, args)
......@@ -567,30 +557,28 @@ class Client(object):
else:
future.set_exception(ClientDisconnected())
@future_generator
def tpc_finish_threadsafe(self, future, tid, updates, f):
if self.ready:
@self.protocol.promise('tpc_finish', tid)
def committed(tid):
try:
cache = self.cache
for oid, data, resolved in updates:
cache.invalidate(oid, tid)
if data and not resolved:
cache.store(oid, tid, None, data)
cache.setLastTid(tid)
except Exception as exc:
future.set_exception(exc)
# At this point, our cache is in an inconsistent
# state. We need to reconnect in hopes of
# recovering to a consistent state.
self.protocol.close()
self.disconnected(self.protocol)
else:
f(tid)
future.set_result(tid)
try:
tid = yield self.protocol.fut('tpc_finish', tid)
cache = self.cache
for oid, data, resolved in updates:
cache.invalidate(oid, tid)
if data and not resolved:
cache.store(oid, tid, None, data)
cache.setLastTid(tid)
except Exception as exc:
future.set_exception(exc)
committed.catch(future.set_exception)
# At this point, our cache is in an inconsistent
# state. We need to reconnect in hopes of
# recovering to a consistent state.
self.protocol.close()
self.disconnected(self.protocol)
else:
f(tid)
future.set_result(tid)
else:
future.set_exception(ClientDisconnected())
......@@ -815,96 +803,3 @@ class Fut(object):
raise self.exc
else:
return self._result
class Promise(object):
"""Lightweight future with a partial promise API.
These are lighweight because they call callbacks synchronously
rather than through an event loop, and because they ony support
single callbacks.
"""
# Note that we can know that they are completed after callbacks
# are set up because they're used to make network requests.
# Requests are made by writing to a transport. Because we're used
# in a single-threaded protocol, we can't get a response and be
# completed if the callbacks are set in the same code that
# created the promise, which they are.
next = success_callback = error_callback = cancelled = None
def __call__(self, success_callback = None, error_callback = None):
"""Set the promises success and error handlers and beget a new promise
The promise returned provides for promise chaining, providing
a sane imperative flow. Let's call this the "next" promise.
Any results or exceptions generated by the promise or it's
callbacks are passed on to the next promise.
When the promise completes successfully, if a success callback
isn't set, then the next promise is completed with the
successfull result. If a success callback is provided, it's
called. If the call succeeds, and the result is a promise,
them the result is called with the next promise's set_result
and set_exception methods, chaining the result and next
promise. If the result isn't a promise, then the next promise
is completed with it by calling set_result. If the success
callback fails, then it's exception is passed to
next.set_exception.
If the promise completes with an error and the error callback
isn't set, then the exception is passed to the next promises
set_exception. If an error handler is provided, it's called
and if it doesn't error, then the original exception is passed
to the next promise's set_exception. If there error handler
errors, then that exception is passed to the next promise's
set_exception.
"""
self.next = self.__class__()
self.success_callback = success_callback
self.error_callback = error_callback
return self.next
def cancel(self):
self.set_exception(concurrent.futures.CancelledError)
def catch(self, error_callback):
self.error_callback = error_callback
def set_exception(self, exc):
self._notify(None, exc)
def set_result(self, result):
self._notify(result, None)
def _notify(self, result, exc):
next = self.next
if exc is not None:
if self.error_callback is not None:
try:
result = self.error_callback(exc)
except Exception:
logger.exception("Exception handling error %s", exc)
if next is not None:
next.set_exception(exc)
else:
if next is not None:
next.set_result(result)
elif next is not None:
next.set_exception(exc)
else:
if self.success_callback is not None:
try:
result = self.success_callback(result)
except Exception as exc:
logger.exception("Exception in success callback")
if next is not None:
next.set_exception(exc)
else:
if next is not None:
if isinstance(result, Promise):
result(next.set_result, next.set_exception)
else:
next.set_result(result)
elif next is not None:
next.set_result(result)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment