Commit f4eeb598 authored by Jim Fulton's avatar Jim Fulton Committed by GitHub

Merge pull request #40 from zopefoundation/asyncio

Asyncio-based ZEO client and server (2)
parents 50692664 beb50865
language: python
sudo: false
python:
- 2.7
- 3.3
- 3.4
- pypy
matrix:
include:
- os: linux
python: 2.7
- os: linux
python: 3.4
- os: linux
python: 3.5
- os: linux
python: 3.4
env: ZEO_MTACCEPTOR=1
- os: linux
python: 3.5
env: ZEO_MTACCEPTOR=1
- os: linux
python: 2.7
env: ZEO4_SERVER=1
- os: linux
python: 3.5
env: ZEO4_SERVER=1
- os: linux
python: 3.5
env: BUILOUT_OPTIONS=extra=,uvloop
install:
- pip install -U setuptools
- python bootstrap.py
- bin/buildout
- pip install zc.buildout
- buildout $BUILOUT_OPTIONS
cache:
directories:
- eggs
script:
- bin/test -v1 -j99
- bin/test -v1j99
notifications:
email: false
Changelog
=========
4.2.0 (unreleased)
- Added a ``ClientStorage`` ``server-sync`` configuration option and
``server_sync`` constructor argument to force a server round trip at
the beginning of transactions to wait for any outstanding
invalidations at the start of the transaction to be delivered.
- When creating an ad hoc server, a log file isn't created by
default. You must pass a ``log`` option specifying a log file name.
- The ZEO server register method now returns the storage last
transaction, allowing the client to avoid an extra round trip during
cache verification.
- Client disconnect errors are now transient errors. When
applications retry jobs that raise transient errors, jobs (e.g. web
requests) with disconnect errors will be retried. Together with
blocking synchronous ZEO server calls for a limited time while
disconnected, this change should allow brief disconnections due to
server restart to avoid generating client-visible errors (e.g. 500
web responses).
- Fixed bugs in using the ZEO 5 client with ZEO 4 servers.
5.0.0a2 (2016-07-30)
--------------------
- Added the ability to pass credentials when creating client storages.
This is experimental in that passing credentials will cause
connections to an ordinary ZEO server to fail, but it facilitates
experimentation with custom ZEO servers. Doing this with custom ZEO
clients would have been awkward due to the many levels of
composition involved.
In the future, we expect to support server security plugins that
consume credentials for authentication (typically over SSL).
Note that credentials are opaque to ZEO. They can be any object with
a true value. The client mearly passes them to the server, which
will someday pass them to a plugin.
5.0.0a1 (2016-07-21)
--------------------
- Added a ClientStorage prefetch method to prefetch oids.
When oids are prefetched, requests are made at once, but the caller
doesn't block waiting for the results. Rather, then the caller
later tries to fetch data for one of the object ids, it's either
delivered right away from the ZEO cache, if the prefetch for the
object id has completed, or the caller blocks until the inflight
prefetch completes. (No new request is made.)
- Fixed: SSL clients of servers with signed certs didn't load default
certs and were unable to connect.
5.0.0a0 (2016-07-08)
--------------------
This is a major ZEO revision, which replaces the ZEO network protocol
implementation.
New features:
- SSL support
- Optional client-side conflict resolution.
- Lots of mostly internal clean ups.
Dropped features:
- The ZEO authentication protocol.
This will be replaced by new authentication mechanims leveraging SSL.
- The ZEO monitor server.
- Full cache verification.
- Client suppprt for servers older than ZODB 3.9
- Server support for clients older than ZEO 4.2.0
4.2.0 (2016-06-15)
------------------
- Changed loadBefore to operate more like load behaved, especially
with regard to the load lock. This allowes ZEO to work with the
upcoming ZODB 5, which used loadbefore rather than load.
Reimplemented load using loadBefore, thus testing loadBefore
extensively via existing tests.
- Other changes to work with ZODB 5 (as well as ZODB 4)
- Fixed: the ZEO cache loadBefore method failed to utilize current data.
- Drop support for Python 2.6 and 3.2.
4.2.0b1 (2015-06-05)
......
This diff is collapsed.
ZEO networking implemention based on asyncio to-dos
===================================================
First iteration, client only
----------------------------
- socketless tests for protocol and adapters
- Disconnect/reconnect strategy
- Integration with ClientStorage
Second iteration, server
------------------------
TBD after client release.
......@@ -4,6 +4,7 @@ parts =
test
scripts
versions = versions
extra =
[versions]
......@@ -11,7 +12,7 @@ versions = versions
[test]
recipe = zc.recipe.testrunner
eggs =
ZEO [test]
ZEO [test${buildout:extra}]
initialization =
import os, tempfile
try: os.mkdir('tmp')
......@@ -21,6 +22,5 @@ defaults = ['--all']
[scripts]
recipe = zc.recipe.egg
eggs =
ZEO [test]
eggs = ${test:eggs}
interpreter = py
This diff is collapsed.
......@@ -11,31 +11,44 @@
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Setup
"""
version = '4.2.0.dev0'
version = '5.0.0a2'
from setuptools import setup, find_packages
import os
import sys
if sys.version_info < (2, 7):
print("This version of ZEO requires Python 2.7 or higher")
if sys.version_info < (2, 7, 9):
print("This version of ZEO requires Python 2.7.9 or higher")
sys.exit(0)
if (3, 0) < sys.version_info < (3, 3):
print("This version of ZEO requires Python 3.3 or higher")
if (3, 0) < sys.version_info < (3, 4):
print("This version of ZEO requires Python 3.4 or higher")
sys.exit(0)
install_requires = [
'ZODB >= 5.0.0a5',
'six',
'transaction >= 1.6.0',
'persistent >= 4.1.0',
'zc.lockfile',
'ZConfig',
'zdaemon',
'zope.interface',
]
tests_require = ['zope.testing', 'manuel', 'random2', 'mock']
classifiers = """\
if sys.version_info[:2] < (3, ):
install_requires.extend(('futures', 'trollius'))
classifiers = """
Intended Audience :: Developers
License :: OSI Approved :: Zope Public License
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
Programming Language :: Python :: 3.5
Programming Language :: Python :: Implementation :: CPython
Programming Language :: Python :: Implementation :: PyPy
Topic :: Database
......@@ -43,7 +56,7 @@ Topic :: Software Development :: Libraries :: Python Modules
Operating System :: Microsoft :: Windows
Operating System :: Unix
Framework :: ZODB
"""
""".strip().split('\n')
def _modname(path, base, name=''):
if path == base:
......@@ -96,8 +109,6 @@ def alltests():
_unittests_only(suite, mod.test_suite())
return suite
tests_require = ['zope.testing', 'manuel', 'random2']
long_description = (
open('README.rst').read()
+ '\n' +
......@@ -114,20 +125,11 @@ setup(name="ZEO",
package_dir = {'': 'src'},
license = "ZPL 2.1",
platforms = ["any"],
classifiers = filter(None, classifiers.split("\n")),
classifiers = classifiers,
test_suite="__main__.alltests", # to support "setup.py test"
tests_require = tests_require,
extras_require = dict(test=tests_require),
install_requires = [
'ZODB >= 4.2.0b1',
'six',
'transaction',
'persistent >= 4.1.0',
'zc.lockfile',
'ZConfig',
'zdaemon',
'zope.interface',
],
extras_require = dict(test=tests_require, uvloop=['uvloop >=0.5.1']),
install_requires = install_requires,
zip_safe = False,
entry_points = """
[console_scripts]
......
This diff is collapsed.
......@@ -13,16 +13,31 @@
##############################################################################
"""Exceptions for ZEO."""
import transaction.interfaces
from ZODB.POSException import StorageError
class ClientStorageError(StorageError):
"""An error occurred in the ZEO Client Storage."""
"""An error occurred in the ZEO Client Storage.
"""
class UnrecognizedResult(ClientStorageError):
"""A server call returned an unrecognized result."""
"""A server call returned an unrecognized result.
"""
class ClientDisconnected(ClientStorageError):
"""The database storage is disconnected from the storage."""
class ClientDisconnected(ClientStorageError,
transaction.interfaces.TransientError):
"""The database storage is disconnected from the storage.
"""
class AuthError(StorageError):
"""The client provided invalid authentication credentials."""
"""The client provided invalid authentication credentials.
"""
class ProtocolError(ClientStorageError):
"""A client contacted a server with an incomparible protocol
"""
class ServerException(ClientStorageError):
"""
"""
This diff is collapsed.
This diff is collapsed.
......@@ -21,44 +21,22 @@ is used to store the data until a commit or abort.
# A faster implementation might store trans data in memory until it
# reaches a certain size.
from threading import Lock
import os
import tempfile
import ZODB.blob
from ZEO._compat import Pickler, Unpickler
class TransactionBuffer:
# Valid call sequences:
#
# ((store | invalidate)* begin_iterate next* clear)* close
#
# get_size can be called any time
# The TransactionBuffer is used by client storage to hold update
# data until the tpc_finish(). It is normally used by a single
# data until the tpc_finish(). It is only used by a single
# thread, because only one thread can be in the two-phase commit
# at one time.
# It is possible, however, for one thread to close the storage
# while another thread is in the two-phase commit. We must use
# a lock to guard against this race, because unpredictable things
# can happen in Python if one thread closes a file that another
# thread is reading. In a debug build, an assert() can fail.
# Caution: If an operation is performed on a closed TransactionBuffer,
# it has no effect and does not raise an exception. The only time
# this should occur is when a ClientStorage is closed in one
# thread while another thread is in its tpc_finish(). It's not
# clear what should happen in this case. If the tpc_finish()
# completes without error, the Connection using it could have
# inconsistent data. This should have minimal effect, though,
# because the Connection is connected to a closed storage.
def __init__(self):
def __init__(self, connection_generation):
self.connection_generation = connection_generation
self.file = tempfile.TemporaryFile(suffix=".tbuf")
self.lock = Lock()
self.closed = 0
self.count = 0
self.size = 0
self.blobs = []
......@@ -66,89 +44,65 @@ class TransactionBuffer:
# stored are builtin types -- strings or None.
self.pickler = Pickler(self.file, 1)
self.pickler.fast = 1
self.server_resolved = set() # {oid}
self.client_resolved = {} # {oid -> buffer_record_number}
self.exception = None
def close(self):
self.clear()
self.lock.acquire()
try:
self.closed = 1
try:
self.file.close()
except OSError:
pass
finally:
self.lock.release()
self.file.close()
def store(self, oid, data):
"""Store oid, version, data for later retrieval"""
self.lock.acquire()
try:
if self.closed:
return
self.pickler.dump((oid, data))
self.count += 1
# Estimate per-record cache size
self.size = self.size + (data and len(data) or 0) + 31
finally:
self.lock.release()
self.pickler.dump((oid, data))
self.count += 1
# Estimate per-record cache size
self.size = self.size + (data and len(data) or 0) + 31
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
def resolve(self, oid, data):
"""Record client-resolved data
"""
self.store(oid, data)
self.client_resolved[oid] = self.count - 1
def invalidate(self, oid):
self.lock.acquire()
try:
if self.closed:
return
self.pickler.dump((oid, None))
self.count += 1
finally:
self.lock.release()
def clear(self):
"""Mark the buffer as empty"""
self.lock.acquire()
try:
if self.closed:
return
self.file.seek(0)
self.count = 0
self.size = 0
while self.blobs:
oid, blobfilename = self.blobs.pop()
if os.path.exists(blobfilename):
ZODB.blob.remove_committed(blobfilename)
finally:
self.lock.release()
def server_resolve(self, oid):
self.server_resolved.add(oid)
def __iter__(self):
self.lock.acquire()
try:
if self.closed:
return
self.file.flush()
self.file.seek(0)
return TBIterator(self.file, self.count)
finally:
self.lock.release()
class TBIterator(object):
def __init__(self, f, count):
self.file = f
self.count = count
self.unpickler = Unpickler(f)
def storeBlob(self, oid, blobfilename):
self.blobs.append((oid, blobfilename))
def __iter__(self):
return self
def __next__(self):
"""Return next tuple of data or None if EOF"""
if self.count == 0:
self.file.seek(0)
self.size = 0
raise StopIteration
oid_ver_data = self.unpickler.load()
self.count -= 1
return oid_ver_data
next = __next__
self.file.seek(0)
unpickler = Unpickler(self.file)
server_resolved = self.server_resolved
client_resolved = self.client_resolved
# Gaaaa, this is awkward. There can be entries in serials that
# aren't in the buffer, because undo. Entries can be repeated
# in the buffer, because ZODB. (Maybe this is a bug now, but
# it may be a feature later.
seen = set()
for i in range(self.count):
oid, data = unpickler.load()
if client_resolved.get(oid, i) == i:
seen.add(oid)
yield oid, data, oid in server_resolved
# We may have leftover oids because undo
for oid in server_resolved:
if oid not in seen:
yield oid, None, True
# Support ZEO4:
def serialnos(self, args):
for oid in args:
if isinstance(oid, bytes):
self.server_resolved.add(oid)
else:
oid, serial = oid
if isinstance(serial, Exception):
self.exception = serial
elif serial == b'rs':
self.server_resolved.add(oid)
......@@ -26,14 +26,24 @@ def client(*args, **kw):
return ZEO.ClientStorage.ClientStorage(*args, **kw)
def DB(*args, **kw):
import ZODB
return ZODB.DB(client(*args, **kw))
s = client(*args, **kw)
try:
import ZODB
return ZODB.DB(s)
except Exception:
s.close()
raise
def connection(*args, **kw):
return DB(*args, **kw).open_then_close_db_when_connection_closes()
db = DB(*args, **kw)
try:
return db.open_then_close_db_when_connection_closes()
except Exception:
db.close()
ra
def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
port=None):
port=0, threaded=True, **kw):
"""Convenience function to start a server for interactive exploration
This fuction starts a ZEO server, given a storage configuration or
......@@ -68,20 +78,13 @@ def server(path=None, blob_dir=None, storage_conf=None, zeo_conf=None,
port
If no ZEO configuration is supplied, the one will be computed
from the port. If no port is supplied, one will be chosedn
randomly.
dynamically.
"""
import os, ZEO.tests.forker
if storage_conf is None and path is None:
storage_conf = '<mappingstorage>\n</mappingstorage>'
if port is None and zeo_conf is None:
port = ZEO.tests.forker.get_port()
addr, admin, pid, config = ZEO.tests.forker.start_zeo_server(
return ZEO.tests.forker.start_zeo_server(
storage_conf, zeo_conf, port, keep=True, path=path,
blob_dir=blob_dir, suicide=False)
os.remove(config)
def stop_server():
ZEO.tests.forker.shutdown_zeo_server(admin)
os.waitpid(pid, 0)
return addr, stop_server
blob_dir=blob_dir, suicide=False, threaded=threaded, **kw)
================================
asyncio-based networking for ZEO
================================
This package provides the networking interface for ZEO. It provides a
somewhat RPC-like API.
Notes
=====
Sending data immediately: ayncio vs asyncore
--------------------------------------------
The previous ZEO networking implementation used the ``asyncore`` library.
When writing with asyncore, writes were done only from the event loop.
This meant that when sending data, code would have to "wake up" the
event loop, typically after adding data to some sort of output buffer.
Asyncio takes an entirely different and saner approach. When an
application wants to send data, it writes to a transport. All
interactions with a transport (in a correct application) are from the
same thread, which is also the thread running any event loop.
Transports are always either idle or sending data. When idle, the
transport writes to the outout socket immediately. If not all data
isn't sent, then it buffers it and becomes sending. If a transport is
sending, then we know that the socket isn't ready for more data, so
``write`` can just buffer the data. There's no point in waking up the
event loop, because the socket will do so when it's ready for more
data.
An exception to the paragraph above occurs when operations cross
threads, as occures for most client operations and when a transaction
commits on the server and results have to be sent to other clients. In
these cases, a call_soon_threadsafe method is used which queues an
operation and has to wake up an event loop to process it.
Server threading
----------------
There are currently two server implementations, an implementation that
used a thread per client (and a thread to listen for connections),
``ZEO.asyncio.mtacceptor.Acceptor``, and an implementation that uses a
single networking thread, ``ZEO.asyncio.server.Acceptor``. The
implementation is selected by changing an import in
``ZEO.StorageServer``. The currently-used implementation is
``ZEO.asyncio.server.Acceptor``, although this sentance is likely to
rot, so check the import to be sure. (Maybe this should be configurable.)
ZEO switched to a multi-threaded implementation several years ago
because it was found to improve performance for large databases using
magnetic disks. Because client threads are always working on behalf of
a single client, there's not really an issue with making blocking
calls, such as executing slow I/O operations.
Initially, the asyncio-based implementation used a multi-threaded
server. A simple thread accepted connections and handed accepted
sockets to ``create_connection``. This became a problem when SSL was
added because ``create_connection`` sets up SSL conections as client
connections, and doesn't provide an option to create server
connections.
In response, I created an ``asyncio.Server``-based implementation.
This required using a single thread. This was a pretty trivial
change, however, it led to the tests becoming unstable to the point
that it was impossible to run all tests without some failing. One
test was broken due to a ``asyncio.Server`` `bug
<http://bugs.python.org/issue27386>`_. It's unclear whether the test
instability is due to ``asyncio.Server`` problems or due to latent
test (or ZEO) bugs, but even after beating the tests mostly into
submission, tests failures are more likely when using
``asyncio.Server``. Beatings will continue.
While fighting test failures using ``asyncio.Server``, the
multi-threaded implementation was updated to use a monkey patch to
allow it to create SSL server connections. Aside from the real risk of a
monkey patch, this works very well.
Both implementations seem to perform about the same.
from .._compat import PY3
if PY3:
import asyncio
else:
import trollius as asyncio
import logging
import socket
from struct import unpack
import sys
from .marshal import encoder
logger = logging.getLogger(__name__)
INET_FAMILIES = socket.AF_INET, socket.AF_INET6
class Protocol(asyncio.Protocol):
"""asyncio low-level ZEO base interface
"""
# All of the code in this class runs in a single dedicated
# thread. Thus, we can mostly avoid worrying about interleaved
# operations.
# One place where special care was required was in cache setup on
# connect. See finish connect below.
transport = protocol_version = None
def __init__(self, loop, addr):
self.loop = loop
self.addr = addr
self.input = [] # Input buffer when assembling messages
self.output = [] # Output buffer when paused
self.paused = [] # Paused indicator, mutable to avoid attr lookup
# Handle the first message, the protocol handshake, differently
self.message_received = self.first_message_received
def __repr__(self):
return self.name
closed = False
def close(self):
if not self.closed:
self.closed = True
if self.transport is not None:
self.transport.close()
def connection_made(self, transport):
logger.info("Connected %s", self)
if sys.version_info < (3, 6):
sock = transport.get_extra_info('socket')
if sock is not None and sock.family in INET_FAMILIES:
# See https://bugs.python.org/issue27456 :(
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True)
self.transport = transport
paused = self.paused
output = self.output
append = output.append
writelines = transport.writelines
from struct import pack
def write(message):
if paused:
append(message)
else:
writelines((pack(">I", len(message)), message))
self._write = write
def writeit(data):
# Note, don't worry about combining messages. Iters
# will be used with blobs, in which case, the individual
# messages will be big to begin with.
data = iter(data)
for message in data:
writelines((pack(">I", len(message)), message))
if paused:
append(data)
break
self._writeit = writeit
got = 0
want = 4
getting_size = True
def data_received(self, data):
# Low-level input handler collects data into sized messages.
# Note that the logic below assume that when new data pushes
# us over what we want, we process it in one call until we
# need more, because we assume that excess data is all in the
# last item of self.input. This is why the exception handling
# in the while loop is critical. Without it, an exception
# might cause us to exit before processing all of the data we
# should, when then causes the logic to be broken in
# subsequent calls.
self.got += len(data)
self.input.append(data)
while self.got >= self.want:
try:
extra = self.got - self.want
if extra == 0:
collected = b''.join(self.input)
self.input = []
else:
input = self.input
self.input = [input[-1][-extra:]]
input[-1] = input[-1][:-extra]
collected = b''.join(input)
self.got = extra
if self.getting_size:
# we were recieving the message size
assert self.want == 4
self.want = unpack(">I", collected)[0]
self.getting_size = False
else:
self.want = 4
self.getting_size = True
self.message_received(collected)
except Exception:
logger.exception("data_received %s %s %s",
self.want, self.got, self.getting_size)
def first_message_received(self, protocol_version):
# Handler for first/handshake message, set up in __init__
del self.message_received # use default handler from here on
self.encode = encoder()
self.finish_connect(protocol_version)
def call_async(self, method, args):
self._write(self.encode(0, True, method, args))
def call_async_iter(self, it):
self._writeit(self.encode(0, True, method, args)
for method, args in it)
def pause_writing(self):
self.paused.append(1)
def resume_writing(self):
paused = self.paused
del paused[:]
output = self.output
writelines = self.transport.writelines
from struct import pack
while output and not paused:
message = output.pop(0)
if isinstance(message, bytes):
writelines((pack(">I", len(message)), message))
else:
data = message
for message in data:
writelines((pack(">I", len(message)), message))
if paused: # paused again. Put iter back.
output.insert(0, data)
break
def get_peername(self):
return self.transport.get_extra_info('peername')
This diff is collapsed.
from .._compat import PY3
if PY3:
import asyncio
try:
from uvloop import new_event_loop
except ImportError:
from asyncio import new_event_loop
else:
import trollius as asyncio
from trollius import new_event_loop
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -12,7 +12,7 @@
<import package="ZODB"/>
<!-- Use the ZEO server information structure. -->
<import package="ZEO"/>
<import package="ZEO" file="server.xml" />
<import package="ZConfig.components.logger"/>
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment