Commit cc333941 authored by Jim Fulton's avatar Jim Fulton

Lots of changes while integrating wth ClientStorage

- testZEO tests now pass

- async tests now pass again

  Probably need to write more async tests to reflect changes.
  (Or maybe the ZEO tests that drove tem are enough.)

- dropped heartbeat tests, which were insane. Will add simpler test
  when I add heartbeats to the async implementation.
parent 3f31236b
...@@ -29,6 +29,8 @@ import time ...@@ -29,6 +29,8 @@ import time
import weakref import weakref
from binascii import hexlify from binascii import hexlify
import BTrees.OOBTree
import zc.lockfile import zc.lockfile
import ZODB import ZODB
import ZODB.BaseStorage import ZODB.BaseStorage
...@@ -223,7 +225,8 @@ class ClientStorage(object): ...@@ -223,7 +225,8 @@ class ClientStorage(object):
self._oids = [] # List of pre-fetched oids from server self._oids = [] # List of pre-fetched oids from server
cache = self._cache = open_cache(cache, var, client, cache_size) cache = self._cache = open_cache(
cache, var, client, storage, cache_size)
# XXX need to check for POSIX-ness here # XXX need to check for POSIX-ness here
self.blob_dir = blob_dir self.blob_dir = blob_dir
...@@ -257,8 +260,8 @@ class ClientStorage(object): ...@@ -257,8 +260,8 @@ class ClientStorage(object):
addr, self, cache, storage, addr, self, cache, storage,
ZEO.asyncio.client.Fallback if read_only_fallback else read_only, ZEO.asyncio.client.Fallback if read_only_fallback else read_only,
wait_timeout or 30, wait_timeout or 30,
wait=wait,
) )
self._server.start()
self._call = self._server.call self._call = self._server.call
self._async = self._server.async self._async = self._server.async
self._async_iter = self._server.async_iter self._async_iter = self._server.async_iter
...@@ -341,13 +344,6 @@ class ClientStorage(object): ...@@ -341,13 +344,6 @@ class ClientStorage(object):
self._info.update(info) self._info.update(info)
# for name in self._info.get('extensionMethods', {}).keys():
# if not hasattr(self, name):
# def mklambda(mname):
# return (lambda *args, **kw:
# self._server.rpc.call(mname, *args, **kw))
# setattr(self, name, mklambda(name))
for iface in ( for iface in (
ZODB.interfaces.IStorageRestoreable, ZODB.interfaces.IStorageRestoreable,
ZODB.interfaces.IStorageIteration, ZODB.interfaces.IStorageIteration,
...@@ -560,7 +556,7 @@ class ClientStorage(object): ...@@ -560,7 +556,7 @@ class ClientStorage(object):
def store(): def store():
yield ('storeBlobStart', ()) yield ('storeBlobStart', ())
f = open(blobfilename, 'rb') f = open(target, 'rb')
while 1: while 1:
chunk = f.read(59000) chunk = f.read(59000)
if not chunk: if not chunk:
...@@ -714,6 +710,12 @@ class ClientStorage(object): ...@@ -714,6 +710,12 @@ class ClientStorage(object):
try: try:
tbuf = txn.data(self) tbuf = txn.data(self)
except AttributeError:
# Gaaaa. This is a recovery transaction. Work around this
# until we can think of something better. XXX
tb = {}
txn.data = tb.__getitem__
txn.set_data = tb.__setitem__
except KeyError: except KeyError:
pass pass
else: else:
...@@ -855,9 +857,6 @@ class ClientStorage(object): ...@@ -855,9 +857,6 @@ class ClientStorage(object):
assert not version assert not version
self._check_trans(transaction, 'restore') self._check_trans(transaction, 'restore')
self._async('restorea', oid, serial, data, prev_txn, id(transaction)) self._async('restorea', oid, serial, data, prev_txn, id(transaction))
# Don't update the transaction buffer, because current data are
# unaffected.
return self._check_serials()
# Below are methods invoked by the StorageServer # Below are methods invoked by the StorageServer
...@@ -871,6 +870,10 @@ class ClientStorage(object): ...@@ -871,6 +870,10 @@ class ClientStorage(object):
"""Server callback to update the info dictionary.""" """Server callback to update the info dictionary."""
self._info.update(dict) self._info.update(dict)
def invalidateCache(self):
if self._db is not None:
self._db.invalidateCache()
def invalidateTransaction(self, tid, oids): def invalidateTransaction(self, tid, oids):
"""Server callback: Invalidate objects modified by tid.""" """Server callback: Invalidate objects modified by tid."""
if self._db is not None: if self._db is not None:
...@@ -1154,14 +1157,16 @@ def _lock_blob(path): ...@@ -1154,14 +1157,16 @@ def _lock_blob(path):
else: else:
break break
def open_cache(cache, var, client, cache_size): def open_cache(cache, var, client, storage, cache_size):
if isinstance(cache, (None.__class__, str)): if isinstance(cache, (None.__class__, str)):
from ZEO.cache import ClientCache from ZEO.cache import ClientCache
if cache is None: if cache is None:
if client: if client:
cache = os.path.join(var or os.getcwd(), client) cache = os.path.join(var or os.getcwd(),
"%s-%s.zec" % (client, storage))
else: else:
return ClientCache(cache, cache_size) # ephemeral cache
return ClientCache(None, cache_size)
cache = ClientCache(cache, cache_size) cache = ClientCache(cache, cache_size)
......
...@@ -62,6 +62,7 @@ class TransactionBuffer: ...@@ -62,6 +62,7 @@ class TransactionBuffer:
def serial(self, oid, serial): def serial(self, oid, serial):
if isinstance(serial, Exception): if isinstance(serial, Exception):
self.exception = serial self.exception = serial
self.serials[oid] = None
else: else:
self.serials[oid] = serial self.serials[oid] = serial
......
...@@ -7,9 +7,13 @@ import logging ...@@ -7,9 +7,13 @@ import logging
import random import random
import threading import threading
import traceback import traceback
import ZEO.Exceptions
import ZODB.event
import ZODB.POSException import ZODB.POSException
import ZEO.Exceptions
import ZEO.interfaces
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
Fallback = object() Fallback = object()
...@@ -272,6 +276,16 @@ class Protocol(asyncio.Protocol): ...@@ -272,6 +276,16 @@ class Protocol(asyncio.Protocol):
type(args[0]) == self.exception_type_type and type(args[0]) == self.exception_type_type and
issubclass(args[0], Exception) issubclass(args[0], Exception)
): ):
if not issubclass(
args[0], (
ZODB.POSException.POSKeyError,
ZODB.POSException.ConflictError,)
):
logger.error("%s from server: %s.%s:%s",
self.name,
args[0].__module__,
args[0].__name__,
args[1])
future.set_exception(args[1]) future.set_exception(args[1])
else: else:
future.set_result(args) future.set_result(args)
...@@ -307,7 +321,7 @@ class Protocol(asyncio.Protocol): ...@@ -307,7 +321,7 @@ class Protocol(asyncio.Protocol):
'receiveBlobStart', 'receiveBlobChunk', 'receiveBlobStop', 'receiveBlobStart', 'receiveBlobChunk', 'receiveBlobStop',
# plus: notify_connected, notify_disconnected # plus: notify_connected, notify_disconnected
) )
client_delegated = client_methods[1:] client_delegated = client_methods[2:]
class Client: class Client:
"""asyncio low-level ZEO client interface """asyncio low-level ZEO client interface
...@@ -432,6 +446,8 @@ class Client: ...@@ -432,6 +446,8 @@ class Client:
self.client.invalidateCache() self.client.invalidateCache()
self.finished_verify(server_tid) self.finished_verify(server_tid)
elif cache_tid > server_tid: elif cache_tid > server_tid:
logger.critical(
'Client has seen newer transactions than server!')
raise AssertionError("Server behind client, %r < %r, %s", raise AssertionError("Server behind client, %r < %r, %s",
server_tid, cache_tid, protocol) server_tid, cache_tid, protocol)
elif cache_tid == server_tid: elif cache_tid == server_tid:
...@@ -447,7 +463,15 @@ class Client: ...@@ -447,7 +463,15 @@ class Client:
return tid return tid
else: else:
# cache is too old # cache is too old
logger.info("cache too old %s", protocol) try:
ZODB.event.notify(
ZEO.interfaces.StaleCache(self.client))
except Exception:
logger.exception("sending StaleCache event")
logger.critical(
"%s dropping stale cache",
getattr(self.client, '__name__', ''),
)
self.cache.clear() self.cache.clear()
self.client.invalidateCache() self.client.invalidateCache()
return server_tid return server_tid
...@@ -561,12 +585,22 @@ class Client: ...@@ -561,12 +585,22 @@ class Client:
if self.ready: if self.ready:
@self.protocol.promise('tpc_finish', tid) @self.protocol.promise('tpc_finish', tid)
def committed(tid): def committed(tid):
try:
cache = self.cache cache = self.cache
for oid, data, resolved in updates: for oid, data, resolved in updates:
cache.invalidate(oid, tid) cache.invalidate(oid, tid)
if data and not resolved: if data and not resolved:
cache.store(oid, tid, None, data) cache.store(oid, tid, None, data)
cache.setLastTid(tid) cache.setLastTid(tid)
except Exception as exc:
future.set_exception(exc)
# At this point, our cache is in an inconsistent
# state. We need to reconnect in hopes of
# recovering to a consistent state.
self.protocol.close()
self.disconnected(self.protocol)
else:
f(tid) f(tid)
future.set_result(tid) future.set_result(tid)
...@@ -585,6 +619,18 @@ class Client: ...@@ -585,6 +619,18 @@ class Client:
self.cache.setLastTid(tid) self.cache.setLastTid(tid)
self.client.invalidateTransaction(tid, oids) self.client.invalidateTransaction(tid, oids)
def serialnos(self, serials):
# Before delegating, check for errors (likely ConflictErrors)
# and invalidate the oids they're associated with. In the
# past, this was done by the client, but now we control the
# cache and this is our last chance, as the client won't call
# back into us when there's an error.
for oid, serial in serials:
if isinstance(serial, Exception):
self.cache.invalidate(oid, None)
self.client.serialnos(serials)
@property @property
def protocol_version(self): def protocol_version(self):
return self.protocol.protocol_version return self.protocol.protocol_version
...@@ -699,19 +745,15 @@ class ClientThread(ClientRunner): ...@@ -699,19 +745,15 @@ class ClientThread(ClientRunner):
def __init__(self, addrs, client, cache, def __init__(self, addrs, client, cache,
storage_key='1', read_only=False, timeout=30, storage_key='1', read_only=False, timeout=30,
disconnect_poll=1, wait=True): disconnect_poll=1):
self.set_options(addrs, client, cache, storage_key, read_only, self.set_options(addrs, client, cache, storage_key, read_only,
timeout, disconnect_poll) timeout, disconnect_poll)
self.thread = threading.Thread( self.thread = threading.Thread(
target=self.run, target=self.run,
name='zeo_client_'+storage_key, name="%s zeo client networking thread" % client.__name__,
daemon=True, daemon=True,
) )
self.started = threading.Event() self.started = threading.Event()
self.thread.start()
self.started.wait()
if wait:
self.connected.result(timeout)
exception = None exception = None
def run(self): def run(self):
...@@ -724,11 +766,24 @@ class ClientThread(ClientRunner): ...@@ -724,11 +766,24 @@ class ClientThread(ClientRunner):
except Exception as exc: except Exception as exc:
logger.exception("Client thread") logger.exception("Client thread")
self.exception = exc self.exception = exc
raise finally:
else: if not self.closed:
if self.client.ready:
self.closed = True
self.client.ready = False
self.client.client.notify_disconnected()
logger.critical("Client loop stopped unexpectedly")
loop.close() loop.close()
logger.debug('Stopping client thread') logger.debug('Stopping client thread')
def start(self, wait=True):
self.thread.start()
self.started.wait()
if self.exception:
raise self.exception
if wait:
self.connected.result(self.timeout)
closed = False closed = False
def close(self): def close(self):
if not self.closed: if not self.closed:
......
...@@ -96,7 +96,16 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -96,7 +96,16 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
# Actually, the client isn't connected until it initializes it's cache: # Actually, the client isn't connected until it initializes it's cache:
self.assertFalse(client.connected.done() or transport.data) self.assertFalse(client.connected.done() or transport.data)
# If we try to make calls while the client is connecting, they're queued # If we try to make calls while the client is *initially*
# connecting, we get an error. This is because some dufus
# decided to create a client storage without waiting for it to
# connect.
f1 = self.call('foo', 1, 2)
self.assertTrue(isinstance(f1.exception(), ClientDisconnected))
# When the client is reconnecting, it's ready flag is set to False and
# it queues calls:
client.ready = False
f1 = self.call('foo', 1, 2) f1 = self.call('foo', 1, 2)
self.assertFalse(f1.done()) self.assertFalse(f1.done())
...@@ -195,7 +204,7 @@ class AsyncTests(setupstack.TestCase, ClientRunner): ...@@ -195,7 +204,7 @@ class AsyncTests(setupstack.TestCase, ClientRunner):
self.assertEqual(parse(transport.pop()), self.assertEqual(parse(transport.pop()),
(8, False, 'tpc_finish', (b'd'*8,))) (8, False, 'tpc_finish', (b'd'*8,)))
respond(8, b'e'*8) respond(8, b'e'*8)
self.assertEqual(committed.result(), None) self.assertEqual(committed.result(), b'e'*8)
self.assertEqual(cache.load(b'1'*8), None) self.assertEqual(cache.load(b'1'*8), None)
self.assertEqual(cache.load(b'2'*8), ('committed 2', b'e'*8)) self.assertEqual(cache.load(b'2'*8), ('committed 2', b'e'*8))
self.assertEqual(cache.load(b'4'*8), ('committed 4', b'e'*8)) self.assertEqual(cache.load(b'4'*8), ('committed 4', b'e'*8))
......
...@@ -2,17 +2,14 @@ Avoiding cache verifification ...@@ -2,17 +2,14 @@ Avoiding cache verifification
============================= =============================
For large databases it is common to also use very large ZEO cache For large databases it is common to also use very large ZEO cache
files. If a client has beed disconnected for too long, cache verification files. If a client has beed disconnected for too long, the server
might be necessary, but cache verification can be very hard on the can't play back missing invalidations. In this case, the cache is
storage server. cleared. When this happens, a ZEO.interfaces.StaleCache event is
published, largely for backward compatibility.
When verification is needed, a ZEO.interfaces.StaleCache event is ClientStorage used to provide an option to drop it's cache rather than
published. Applications may handle this event to perform actions such doing verification. This is now the only behavior. Cache
as exiting the process to avoid a cold restart. verification is no longer supported.
ClientStorage provides an option to drop it's cache rather than doing
verification. When this option is used, and verification would be
necessary, after publishing the event, ClientStorage:
- Invalidates all object caches - Invalidates all object caches
...@@ -27,8 +24,7 @@ Start a server, create a cient to it and commit some data ...@@ -27,8 +24,7 @@ Start a server, create a cient to it and commit some data
>>> addr, admin = start_server(keep=1) >>> addr, admin = start_server(keep=1)
>>> import ZEO, transaction >>> import ZEO, transaction
>>> db = ZEO.DB(addr, drop_cache_rather_verify=True, client='cache', >>> db = ZEO.DB(addr, client='cache', name='test')
... name='test')
>>> wait_connected(db.storage) >>> wait_connected(db.storage)
>>> conn = db.open() >>> conn = db.open()
>>> conn.root()[1] = conn.root().__class__() >>> conn.root()[1] = conn.root().__class__()
...@@ -58,11 +54,11 @@ logging and event data: ...@@ -58,11 +54,11 @@ logging and event data:
>>> import logging, zope.testing.loggingsupport, ZODB.event >>> import logging, zope.testing.loggingsupport, ZODB.event
>>> handler = zope.testing.loggingsupport.InstalledHandler( >>> handler = zope.testing.loggingsupport.InstalledHandler(
... 'ZEO.ClientStorage', level=logging.ERROR) ... 'ZEO', level=logging.ERROR)
>>> events = [] >>> events = []
>>> def event_handler(e): >>> def event_handler(e):
... events.append(( ... events.append((
... len(e.storage._cache), str(handler), e.__class__.__name__)) ... len(e.storage._server.client.cache), str(handler), e.__class__.__name__))
>>> old_notify = ZODB.event.notify >>> old_notify = ZODB.event.notify
>>> ZODB.event.notify = event_handler >>> ZODB.event.notify = event_handler
...@@ -105,8 +101,8 @@ Now, let's verify our assertions above: ...@@ -105,8 +101,8 @@ Now, let's verify our assertions above:
- Logs a CRITICAL message. - Logs a CRITICAL message.
>>> print(handler) >>> print(handler) # doctest: +ELLIPSIS
ZEO.ClientStorage CRITICAL ZEO... CRITICAL
test dropping stale cache test dropping stale cache
>>> handler.clear() >>> handler.clear()
...@@ -156,8 +152,8 @@ in the database, which is why we get 1, rather than 0 objects in the cache.) ...@@ -156,8 +152,8 @@ in the database, which is why we get 1, rather than 0 objects in the cache.)
- Logs a CRITICAL message. - Logs a CRITICAL message.
>>> print(handler) >>> print(handler) # doctest: +ELLIPSIS
ZEO.ClientStorage CRITICAL ZEO... CRITICAL
test dropping stale cache test dropping stale cache
>>> handler.clear() >>> handler.clear()
...@@ -168,49 +164,6 @@ If we access the root object, it'll be loaded from the server: ...@@ -168,49 +164,6 @@ If we access the root object, it'll be loaded from the server:
>>> conn.root()[1].x >>> conn.root()[1].x
11 11
Finally, let's look at what happens without the
drop_cache_rather_verify option:
>>> db.close()
>>> db = ZEO.DB(addr, client='cache')
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root()[1].x
11
>>> conn.root()[2] = conn.root().__class__()
>>> transaction.commit()
>>> len(db.storage._cache)
4
>>> stop_server(admin)
>>> addr2, admin = start_server(keep=1)
>>> db2 = ZEO.DB(addr2)
>>> wait_connected(db2.storage)
>>> conn2 = db2.open()
>>> for i in range(5):
... conn2.root()[1].x += 1
... transaction.commit()
>>> db2.close()
>>> stop_server(admin)
>>> _, admin = start_server(zeo_conf=dict(invalidation_queue_size=1),
... addr=addr)
>>> wait_connected(db.storage)
>>> for e in events:
... print(e)
(4, '', 'StaleCache')
>>> print(handler)
<BLANKLINE>
>>> len(db.storage._cache)
3
Here we see the cache wasn't dropped, although one of the records was
invalidated during verification.
.. Cleanup .. Cleanup
>>> db.close() >>> db.close()
......
...@@ -108,6 +108,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False, ...@@ -108,6 +108,7 @@ def start_zeo_server(storage_conf=None, zeo_conf=None, port=None, keep=False,
if not storage_conf: if not storage_conf:
storage_conf = '<filestorage>\npath %s\n</filestorage>' % path storage_conf = '<filestorage>\npath %s\n</filestorage>' % path
if blob_dir: if blob_dir:
storage_conf = '<blobstorage>\nblob-dir %s\n%s\n</blobstorage>' % ( storage_conf = '<blobstorage>\nblob-dir %s\n%s\n</blobstorage>' % (
blob_dir, storage_conf) blob_dir, storage_conf)
......
This diff is collapsed.
...@@ -103,7 +103,7 @@ Now, let's see if we can break it. :) ...@@ -103,7 +103,7 @@ Now, let's see if we can break it. :)
... path = s2.fshelper.getBlobFilename(*blob_id) ... path = s2.fshelper.getBlobFilename(*blob_id)
... if os.path.exists(path): ... if os.path.exists(path):
... ZODB.blob.remove_committed(path) ... ZODB.blob.remove_committed(path)
... s2._server.sendBlob(*blob_id) ... s2._call('sendBlob', *blob_id)
... else: print('Dang') ... else: print('Dang')
>>> threadf.join() >>> threadf.join()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment