Commit 020a9f8c authored by Jim Fulton's avatar Jim Fulton

Simplified error handling during vote

ZEO's vote error handling was extremely and weirdly complicated.
There's a hysterical explanation: Originally, the ZEO protocol was
synchronous and mirrored the storage API. That was too slow, so store
was made asynchronous.  But then there was no way to return new
serials, so we added a serialnos client API that the server called
during TPC. This was used both to serials and errors.  Later, tpc_vote
was added, which is a synchronous method. That would have been an
execllent opportunity to fix this, but noooooooooo.

I'd like the server vote logic to get simpler, and we also want to get rid
of serialnos, as it makes it hard to commit multiple transactions at
once on the client.  We can't get rid of serialnos now, because the
client needs to work with old servers.

Of course, nothing is easy. There was a special edge case where if the
client saw an unhandled conflict error, it would invalidate it's cache
for that object, allowing it to eventually recover when caches had
missed invalidations. This required a change to ClientStorage, which
meant that the server wouldn't work with older ZEO versions, which
meant that the server could only support protocol Z5, which in tern
affected protocol-negotiation tests....
parent 5e6fd05f
...@@ -84,7 +84,7 @@ class ZEOStorage: ...@@ -84,7 +84,7 @@ class ZEOStorage:
blob_tempfile = None blob_tempfile = None
log_label = 'unconnected' log_label = 'unconnected'
locked = False # Don't have storage lock locked = False # Don't have storage lock
verifying = store_failed = 0 verifying = 0
def __init__(self, server, read_only=0): def __init__(self, server, read_only=0):
self.server = server self.server = server
...@@ -338,7 +338,6 @@ class ZEOStorage: ...@@ -338,7 +338,6 @@ class ZEOStorage:
self.blob_log = [] self.blob_log = []
self.tid = tid self.tid = tid
self.status = status self.status = status
self.store_failed = 0
self.stats.active_txns += 1 self.stats.active_txns += 1
# Assign the transaction attribute last. This is so we don't # Assign the transaction attribute last. This is so we don't
...@@ -426,29 +425,30 @@ class ZEOStorage: ...@@ -426,29 +425,30 @@ class ZEOStorage:
self.storage.tpc_begin(self.transaction) self.storage.tpc_begin(self.transaction)
for op, args in self.txnlog: for op, args in self.txnlog:
if not getattr(self, op)(*args): getattr(self, op)(*args)
break
# Blob support # Blob support
while self.blob_log and not self.store_failed: while self.blob_log:
oid, oldserial, data, blobfilename = self.blob_log.pop() oid, oldserial, data, blobfilename = self.blob_log.pop()
self._store(oid, oldserial, data, blobfilename) self._store(oid, oldserial, data, blobfilename)
if not self.store_failed:
# Only call tpc_vote of no store call failed,
# otherwise the serialnos() call will deliver an
# exception that will be handled by the client in
# its tpc_vote() method.
serials = self.storage.tpc_vote(self.transaction) serials = self.storage.tpc_vote(self.transaction)
if serials: if serials:
self.serials.extend(serials) self.serials.extend(serials)
self.connection.async('serialnos', self.serials) self.connection.async('serialnos', self.serials)
except Exception: except Exception as err:
self.storage.tpc_abort(self.transaction) self.storage.tpc_abort(self.transaction)
self._clear_transaction() self._clear_transaction()
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error %s" % err, BLATHER)
if not isinstance(err, TransactionError):
logger.exception("While voting")
if delay is not None: if delay is not None:
delay.error(sys.exc_info()) delay.error(sys.exc_info())
else: else:
...@@ -549,60 +549,21 @@ class ZEOStorage: ...@@ -549,60 +549,21 @@ class ZEOStorage:
self._check_tid(tid, exc=StorageTransactionError) self._check_tid(tid, exc=StorageTransactionError)
self.txnlog.undo(trans_id) self.txnlog.undo(trans_id)
def _op_error(self, oid, err, op):
self.store_failed = 1
if isinstance(err, ConflictError):
self.stats.conflicts += 1
self.log("conflict error oid=%s msg=%s" %
(oid_repr(oid), str(err)), BLATHER)
if not isinstance(err, TransactionError):
# Unexpected errors are logged and passed to the client
self.log("%s error: %s, %s" % ((op,)+ sys.exc_info()[:2]),
logging.ERROR, exc_info=True)
err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
self.serials.append((oid, err))
def _delete(self, oid, serial): def _delete(self, oid, serial):
err = None
try:
self.storage.deleteObject(oid, serial, self.transaction) self.storage.deleteObject(oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(oid, err, 'delete')
return err is None
def _checkread(self, oid, serial): def _checkread(self, oid, serial):
err = None
try:
self.storage.checkCurrentSerialInTransaction( self.storage.checkCurrentSerialInTransaction(
oid, serial, self.transaction) oid, serial, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(oid, err, 'checkCurrentSerialInTransaction')
return err is None
def _store(self, oid, serial, data, blobfile=None): def _store(self, oid, serial, data, blobfile=None):
err = None
try:
if blobfile is None: if blobfile is None:
newserial = self.storage.store( newserial = self.storage.store(
oid, serial, data, '', self.transaction) oid, serial, data, '', self.transaction)
else: else:
newserial = self.storage.storeBlob( newserial = self.storage.storeBlob(
oid, serial, data, blobfile, '', self.transaction) oid, serial, data, blobfile, '', self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as error:
self._op_error(oid, error, 'store')
err = error
else:
if serial != b"\0\0\0\0\0\0\0\0": if serial != b"\0\0\0\0\0\0\0\0":
self.invalidated.append(oid) self.invalidated.append(oid)
...@@ -618,52 +579,15 @@ class ZEOStorage: ...@@ -618,52 +579,15 @@ class ZEOStorage:
self.serials.append((oid, s)) self.serials.append((oid, s))
return err is None
def _restore(self, oid, serial, data, prev_txn): def _restore(self, oid, serial, data, prev_txn):
err = None
try:
self.storage.restore(oid, serial, data, '', prev_txn, self.storage.restore(oid, serial, data, '', prev_txn,
self.transaction) self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as err:
self._op_error(oid, err, 'restore')
return err is None
def _undo(self, trans_id): def _undo(self, trans_id):
err = None
try:
tid, oids = self.storage.undo(trans_id, self.transaction) tid, oids = self.storage.undo(trans_id, self.transaction)
except (SystemExit, KeyboardInterrupt):
raise
except Exception as e:
err = e
self._op_error(z64, err, 'undo')
else:
self.invalidated.extend(oids) self.invalidated.extend(oids)
self.serials.extend((oid, ResolvedSerial) for oid in oids) self.serials.extend((oid, ResolvedSerial) for oid in oids)
return err is None
def _marshal_error(self, error):
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something that can be pickled.
if PY3:
pickler = Pickler(BytesIO(), 3)
else:
# The pure-python version requires at least one argument (PyPy)
pickler = Pickler(0)
pickler.fast = 1
try:
pickler.dump(error)
except:
msg = "Couldn't pickle storage exception: %s" % repr(error)
self.log(msg, logging.ERROR)
error = StorageServerError(msg)
return error
# IStorageIteration support # IStorageIteration support
def iterator_start(self, start, stop): def iterator_start(self, start, stop):
......
...@@ -17,7 +17,7 @@ class ServerProtocol(base.Protocol): ...@@ -17,7 +17,7 @@ class ServerProtocol(base.Protocol):
"""asyncio low-level ZEO server interface """asyncio low-level ZEO server interface
""" """
protocols = b'Z4', b'Z5' protocols = (b'Z5', )
name = 'server protocol' name = 'server protocol'
methods = set(('register', )) methods = set(('register', ))
...@@ -161,7 +161,7 @@ class Delay: ...@@ -161,7 +161,7 @@ class Delay:
def error(self, exc_info): def error(self, exc_info):
self.sent = 'error' self.sent = 'error'
log("Error raised in delayed method", logging.ERROR, exc_info=exc_info) logger.error("Error raised in delayed method", exc_info=exc_info)
self.protocol.send_error(self.msgid, exc_info[1]) self.protocol.send_error(self.msgid, exc_info[1])
def __repr__(self): def __repr__(self):
...@@ -198,5 +198,4 @@ class MTDelay(Delay): ...@@ -198,5 +198,4 @@ class MTDelay(Delay):
def error(self, exc_info): def error(self, exc_info):
self.ready.wait() self.ready.wait()
log("Error raised in delayed method", logging.ERROR, exc_info=exc_info)
self.protocol.call_soon_threadsafe(Delay.error, self, exc_info) self.protocol.call_soon_threadsafe(Delay.error, self, exc_info)
...@@ -750,7 +750,7 @@ class ServerTests(Base, setupstack.TestCase): ...@@ -750,7 +750,7 @@ class ServerTests(Base, setupstack.TestCase):
self.target = protocol.zeo_storage self.target = protocol.zeo_storage
if finish: if finish:
self.assertEqual(self.pop(parse=False), best_protocol_version) self.assertEqual(self.pop(parse=False), best_protocol_version)
protocol.data_received(sized(b'Z4')) protocol.data_received(sized(b'Z5'))
return protocol return protocol
message_id = 0 message_id = 0
...@@ -788,9 +788,9 @@ class ServerTests(Base, setupstack.TestCase): ...@@ -788,9 +788,9 @@ class ServerTests(Base, setupstack.TestCase):
self.assertEqual(self.pop(parse=False), best_protocol_version) self.assertEqual(self.pop(parse=False), best_protocol_version)
# The client sends it's protocol: # The client sends it's protocol:
protocol.data_received(sized(b'Z4')) protocol.data_received(sized(b'Z5'))
self.assertEqual(protocol.protocol_version, b'Z4') self.assertEqual(protocol.protocol_version, b'Z5')
protocol.zeo_storage.notify_connected.assert_called_once_with(protocol) protocol.zeo_storage.notify_connected.assert_called_once_with(protocol)
......
...@@ -94,6 +94,10 @@ def runner(config, qin, qout, timeout=None, ...@@ -94,6 +94,10 @@ def runner(config, qin, qout, timeout=None,
import ZEO.asyncio.server import ZEO.asyncio.server
old_protocol = ZEO.asyncio.server.best_protocol_version old_protocol = ZEO.asyncio.server.best_protocol_version
ZEO.asyncio.server.best_protocol_version = protocol ZEO.asyncio.server.best_protocol_version = protocol
old_protocols = ZEO.asyncio.server.ServerProtocol.protocols
ZEO.asyncio.server.ServerProtocol.protocols = tuple(sorted(
set(old_protocols) | set([protocol])
))
try: try:
import ZEO.runzeo, threading import ZEO.runzeo, threading
...@@ -144,8 +148,8 @@ def runner(config, qin, qout, timeout=None, ...@@ -144,8 +148,8 @@ def runner(config, qin, qout, timeout=None,
finally: finally:
if old_protocol: if old_protocol:
ZEO.asyncio.server.best_protocol_version = protocol ZEO.asyncio.server.best_protocol_version = old_protocol
ZEO.asyncio.server.ServerProtocol.protocols = old_protocols
def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None): def stop_runner(thread, config, qin, qout, stop_timeout=9, pid=None):
qin.put('stop') qin.put('stop')
......
...@@ -5,7 +5,7 @@ A full test of all protocols isn't practical. But we'll do a limited ...@@ -5,7 +5,7 @@ A full test of all protocols isn't practical. But we'll do a limited
test that at least the current and previous protocols are supported in test that at least the current and previous protocols are supported in
both directions. both directions.
Let's start a Z309 server Let's start a Z4 server
>>> storage_conf = ''' >>> storage_conf = '''
... <blobstorage> ... <blobstorage>
...@@ -94,82 +94,85 @@ A current client should be able to connect to a old server: ...@@ -94,82 +94,85 @@ A current client should be able to connect to a old server:
>>> zope.testing.setupstack.rmtree('blobs') >>> zope.testing.setupstack.rmtree('blobs')
>>> zope.testing.setupstack.rmtree('server-blobs') >>> zope.testing.setupstack.rmtree('server-blobs')
And the other way around: #############################################################################
# Note that the ZEO 5.0 server only supports clients that use the Z5 protocol
>>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5)) # And the other way around:
Note that we'll have to pull some hijinks: # >>> addr, _ = start_server(storage_conf, dict(invalidation_queue_size=5))
>>> import ZEO.asyncio.client # Note that we'll have to pull some hijinks:
>>> old_protocols = ZEO.asyncio.client.Protocol.protocols
>>> ZEO.asyncio.client.Protocol.protocols = [b'Z4']
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') # >>> import ZEO.asyncio.client
>>> db.storage.protocol_version # >>> old_protocols = ZEO.asyncio.client.Protocol.protocols
b'Z4' # >>> ZEO.asyncio.client.Protocol.protocols = [b'Z4']
>>> wait_connected(db.storage)
>>> conn = db.open()
>>> conn.root().x = 0
>>> transaction.commit()
>>> len(db.history(conn.root()._p_oid, 99))
2
>>> conn.root()['blob1'] = ZODB.blob.Blob() # >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
>>> with conn.root()['blob1'].open('w') as f: # >>> db.storage.protocol_version
... r = f.write(b'blob data 1') # b'Z4'
>>> transaction.commit() # >>> wait_connected(db.storage)
# >>> conn = db.open()
# >>> conn.root().x = 0
# >>> transaction.commit()
# >>> len(db.history(conn.root()._p_oid, 99))
# 2
>>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True) # >>> conn.root()['blob1'] = ZODB.blob.Blob()
>>> wait_connected(db2.storage) # >>> with conn.root()['blob1'].open('w') as f:
>>> conn2 = db2.open() # ... r = f.write(b'blob data 1')
>>> for i in range(5): # >>> transaction.commit()
... conn2.root().x += 1
... transaction.commit()
>>> conn2.root()['blob2'] = ZODB.blob.Blob()
>>> with conn2.root()['blob2'].open('w') as f:
... r = f.write(b'blob data 2')
>>> transaction.commit()
# >>> db2 = ZEO.DB(addr, blob_dir='server-blobs', shared_blob_dir=True)
# >>> wait_connected(db2.storage)
# >>> conn2 = db2.open()
# >>> for i in range(5):
# ... conn2.root().x += 1
# ... transaction.commit()
# >>> conn2.root()['blob2'] = ZODB.blob.Blob()
# >>> with conn2.root()['blob2'].open('w') as f:
# ... r = f.write(b'blob data 2')
# >>> transaction.commit()
>>> @wait_until()
... def x_to_be_5():
... conn.sync()
... return conn.root().x == 5
>>> db.close() # >>> @wait_until()
# ... def x_to_be_5():
# ... conn.sync()
# ... return conn.root().x == 5
>>> for i in range(2): # >>> db.close()
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') # >>> for i in range(2):
>>> wait_connected(db.storage) # ... conn2.root().x += 1
>>> conn = db.open() # ... transaction.commit()
>>> conn.root().x
7
>>> db.close() # >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
# >>> wait_connected(db.storage)
# >>> conn = db.open()
# >>> conn.root().x
# 7
>>> for i in range(10): # >>> db.close()
... conn2.root().x += 1
... transaction.commit()
>>> db = ZEO.DB(addr, client='client', blob_dir='blobs') # >>> for i in range(10):
>>> wait_connected(db.storage) # ... conn2.root().x += 1
>>> conn = db.open() # ... transaction.commit()
>>> conn.root().x
17
>>> with conn.root()['blob1'].open() as f: # >>> db = ZEO.DB(addr, client='client', blob_dir='blobs')
... f.read() # >>> wait_connected(db.storage)
b'blob data 1' # >>> conn = db.open()
>>> with conn.root()['blob2'].open() as f: # >>> conn.root().x
... f.read() # 17
b'blob data 2'
>>> db2.close() # >>> with conn.root()['blob1'].open() as f:
>>> db.close() # ... f.read()
# b'blob data 1'
# >>> with conn.root()['blob2'].open() as f:
# ... f.read()
# b'blob data 2'
# >>> db2.close()
# >>> db.close()
Undo the hijinks: # Undo the hijinks:
>>> ZEO.asyncio.client.Protocol.protocols = old_protocols # >>> ZEO.asyncio.client.Protocol.protocols = old_protocols
...@@ -78,6 +78,8 @@ will conflict. It will be blocked at the vote call. ...@@ -78,6 +78,8 @@ will conflict. It will be blocked at the vote call.
>>> class Sender: >>> class Sender:
... def send_reply(self, id, reply): ... def send_reply(self, id, reply):
... print('reply', id, reply) ... print('reply', id, reply)
... def send_error(self, id, err):
... print('error', id, err)
>>> delay.set_sender(1, Sender()) >>> delay.set_sender(1, Sender())
>>> logger = logging.getLogger('ZEO') >>> logger = logging.getLogger('ZEO')
...@@ -87,13 +89,20 @@ will conflict. It will be blocked at the vote call. ...@@ -87,13 +89,20 @@ will conflict. It will be blocked at the vote call.
Now, when we abort the transaction for the first client. The second Now, when we abort the transaction for the first client. The second
client will be restarted. It will get a conflict error, that is client will be restarted. It will get a conflict error, that is
handled correctly: raised to the client:
>>> zs1.tpc_abort('0') # doctest: +ELLIPSIS >>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
reply 1 None Error raised in delayed method
Traceback (most recent call last):
...
ZODB.POSException.ConflictError: ...
error 1 database conflict error ...
The transaction is aborted by the server:
>>> fs.tpc_transaction() is not None >>> fs.tpc_transaction() is None
True True
>>> zs2.connected >>> zs2.connected
True True
...@@ -116,7 +125,7 @@ And an initial client. ...@@ -116,7 +125,7 @@ And an initial client.
>>> zs1 = ZEO.tests.servertesting.client(server, 1) >>> zs1 = ZEO.tests.servertesting.client(server, 1)
>>> zs1.tpc_begin('0', '', '', {}) >>> zs1.tpc_begin('0', '', '', {})
>>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '0') >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, b'x', '0')
Intentionally break zs1: Intentionally break zs1:
...@@ -135,7 +144,7 @@ We can start another client and get the storage lock. ...@@ -135,7 +144,7 @@ We can start another client and get the storage lock.
>>> zs1 = ZEO.tests.servertesting.client(server, 1) >>> zs1 = ZEO.tests.servertesting.client(server, 1)
>>> zs1.tpc_begin('1', '', '', {}) >>> zs1.tpc_begin('1', '', '', {})
>>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, 'x', '1') >>> zs1.storea(ZODB.utils.p64(99), ZODB.utils.z64, b'x', '1')
>>> _ = zs1.vote('1') # doctest: +ELLIPSIS >>> _ = zs1.vote('1') # doctest: +ELLIPSIS
>>> zs1.tpc_finish('1').set_sender(0, zs1.connection) >>> zs1.tpc_finish('1').set_sender(0, zs1.connection)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment