Commit 20791999 authored by Julien Muchembled's avatar Julien Muchembled

client: new ignore-wrong-checksum option

parent 3531ee9e
...@@ -36,7 +36,8 @@ from neo.lib.connection import MTClientConnection, ConnectionClosed ...@@ -36,7 +36,8 @@ from neo.lib.connection import MTClientConnection, ConnectionClosed
from neo.lib.exception import NodeNotReady from neo.lib.exception import NodeNotReady
from . import TransactionMetaData from . import TransactionMetaData
from .exception import (NEOStorageError, NEOStorageCreationUndoneError, from .exception import (NEOStorageError, NEOStorageCreationUndoneError,
NEOStorageReadRetry, NEOStorageNotFoundError, NEOPrimaryMasterLost) NEOStorageReadRetry, NEOStorageNotFoundError, NEOStorageWrongChecksum,
NEOPrimaryMasterLost)
from .handlers import storage, master from .handlers import storage, master
from neo.lib.threaded_app import ThreadedApplication from neo.lib.threaded_app import ThreadedApplication
from .cache import ClientCache from .cache import ClientCache
...@@ -70,7 +71,7 @@ class Application(ThreadedApplication): ...@@ -70,7 +71,7 @@ class Application(ThreadedApplication):
wait_for_pack = False wait_for_pack = False
def __init__(self, master_nodes, name, compress=True, cache_size=None, def __init__(self, master_nodes, name, compress=True, cache_size=None,
**kw): ignore_wrong_checksum=False, **kw):
super(Application, self).__init__(parseMasterList(master_nodes), super(Application, self).__init__(parseMasterList(master_nodes),
name, **kw) name, **kw)
# Internal Attributes common to all thread # Internal Attributes common to all thread
...@@ -106,6 +107,7 @@ class Application(ThreadedApplication): ...@@ -106,6 +107,7 @@ class Application(ThreadedApplication):
self._connecting_to_storage_node = Lock() self._connecting_to_storage_node = Lock()
self._node_failure_dict = {} self._node_failure_dict = {}
self.compress = getCompress(compress) self.compress = getCompress(compress)
self.ignore_wrong_checksum = ignore_wrong_checksum
def __getattr__(self, attr): def __getattr__(self, attr):
if attr in ('last_tid', 'pt'): if attr in ('last_tid', 'pt'):
...@@ -459,6 +461,7 @@ class Application(ThreadedApplication): ...@@ -459,6 +461,7 @@ class Application(ThreadedApplication):
return data, tid, next_tid return data, tid, next_tid
def _loadFromStorage(self, oid, at_tid, before_tid): def _loadFromStorage(self, oid, at_tid, before_tid):
wrong_checksum = [] # Py3
def askStorage(conn, packet): def askStorage(conn, packet):
tid, next_tid, compression, checksum, data, data_tid \ tid, next_tid, compression, checksum, data, data_tid \
= self._askStorage(conn, packet) = self._askStorage(conn, packet)
...@@ -466,13 +469,28 @@ class Application(ThreadedApplication): ...@@ -466,13 +469,28 @@ class Application(ThreadedApplication):
if checksum != makeChecksum(data): if checksum != makeChecksum(data):
logging.error('wrong checksum from %s for %s@%s', logging.error('wrong checksum from %s for %s@%s',
conn, dump(oid), dump(tid)) conn, dump(oid), dump(tid))
wrong_checksum.append((tid, next_tid, compression,
checksum, data, data_tid))
raise NEOStorageReadRetry(False) raise NEOStorageReadRetry(False)
return (decompress_list[compression](data), return (decompress_list[compression](data),
tid, next_tid, data_tid) tid, next_tid, data_tid)
raise NEOStorageCreationUndoneError(dump(oid)) raise NEOStorageCreationUndoneError(dump(oid))
return self._askStorageForRead(oid, try:
Packets.AskObject(oid, at_tid, before_tid), return self._askStorageForRead(oid,
askStorage) Packets.AskObject(oid, at_tid, before_tid),
askStorage)
except NEOStorageError:
if not wrong_checksum:
raise
tid, next_tid, compression, checksum, data, data_tid = \
wrong_checksum[0]
if self.ignore_wrong_checksum:
try:
data = decompress_list[compression](data)
except Exception:
data = ''
return data, tid, next_tid, data_tid
raise NEOStorageWrongChecksum(oid, tid)
def tpc_begin(self, storage, transaction, tid=None, status=' '): def tpc_begin(self, storage, transaction, tid=None, status=' '):
"""Begin a new transaction.""" """Begin a new transaction."""
......
...@@ -51,6 +51,13 @@ ...@@ -51,6 +51,13 @@
be added/removed without requiring a config change each time. be added/removed without requiring a config change each time.
</description> </description>
</key> </key>
<key name="ignore-wrong-checksum" datatype="boolean">
<description>
If true whereas checksum does not match, return whatever is stored
instead of raising. When compression is enabled, decompression is
likely to fail and an empty record is returned.
</description>
</key>
<key name="ca" datatype="existing-file"> <key name="ca" datatype="existing-file">
<description> <description>
Certificate authority in PEM format. Certificate authority in PEM format.
......
...@@ -25,6 +25,9 @@ class NEOStorageReadRetry(NEOStorageError): ...@@ -25,6 +25,9 @@ class NEOStorageReadRetry(NEOStorageError):
class NEOStorageNotFoundError(NEOStorageError): class NEOStorageNotFoundError(NEOStorageError):
pass pass
class NEOStorageWrongChecksum(NEOStorageError):
pass
class NEOStorageDoesNotExistError(NEOStorageNotFoundError): class NEOStorageDoesNotExistError(NEOStorageNotFoundError):
""" """
This error is a refinement of NEOStorageNotFoundError: this means This error is a refinement of NEOStorageNotFoundError: this means
......
...@@ -125,8 +125,8 @@ class ClientOperationHandler(BaseHandler): ...@@ -125,8 +125,8 @@ class ClientOperationHandler(BaseHandler):
# register the transaction # register the transaction
self.app.tm.register(conn, ttid) self.app.tm.register(conn, ttid)
if data or checksum != ZERO_HASH: if data or checksum != ZERO_HASH:
# TODO: return an appropriate error packet if makeChecksum(data) != checksum:
assert makeChecksum(data) == checksum raise ProtocolError('invalid checksum')
else: else:
checksum = data = None checksum = data = None
try: try:
......
...@@ -949,13 +949,14 @@ class NEOCluster(object): ...@@ -949,13 +949,14 @@ class NEOCluster(object):
for node in getattr(self, node_type + '_list'): for node in getattr(self, node_type + '_list'):
node.resetNode(**reset_kw) node.resetNode(**reset_kw)
def _newClient(self): def _newClient(self, **kw):
kw.setdefault('compress', self.compress)
return ClientApplication(name=self.name, master_nodes=self.master_nodes, return ClientApplication(name=self.name, master_nodes=self.master_nodes,
compress=self.compress, ssl=self.SSL) ssl=self.SSL, **kw)
@contextmanager @contextmanager
def newClient(self, with_db=False): def newClient(self, with_db=False, **kw):
x = self._newClient() x = self._newClient(**kw)
try: try:
t = x.poll_thread t = x.poll_thread
closed = [] closed = []
......
...@@ -40,6 +40,7 @@ from .. import Patch, TransactionalResource, getTransactionMetaData ...@@ -40,6 +40,7 @@ from .. import Patch, TransactionalResource, getTransactionMetaData
from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \ from . import ClientApplication, ConnectionFilter, LockLock, NEOCluster, \
NEOThreadedTest, RandomConflictDict, Serialized, ThreadId, with_cluster NEOThreadedTest, RandomConflictDict, Serialized, ThreadId, with_cluster
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client import exception
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.handlers.storage import _DeadlockPacket from neo.client.handlers.storage import _DeadlockPacket
from neo.client.transactions import Transaction from neo.client.transactions import Transaction
...@@ -2881,6 +2882,31 @@ class Test(NEOThreadedTest): ...@@ -2881,6 +2882,31 @@ class Test(NEOThreadedTest):
storage.tpc_vote(txn) storage.tpc_vote(txn)
self.assertEqual(add64(tid, 1), storage.tpc_finish(txn)) self.assertEqual(add64(tid, 1), storage.tpc_finish(txn))
@with_cluster()
def testCorruptedData(self, cluster):
def holdData(orig, *args):
args = list(args)
args[2] = '!' + args[2]
return orig(*args)
data = 'foo' * 10
tid = None
for compress in False, True:
with cluster.newClient(ignore_wrong_checksum=True,
compress=compress) as client, \
Patch(cluster.storage.dm, holdData=holdData):
storage = cluster.getZODBStorage(client=client)
txn = transaction.Transaction()
storage.tpc_begin(txn)
storage.store(ZERO_OID, tid, data, '', txn)
storage.tpc_vote(txn)
tid = storage.tpc_finish(txn)
storage._cache.clear()
self.assertEqual(('' if compress else '!' + data, tid),
storage.load(ZERO_OID))
with self.assertRaises(exception.NEOStorageWrongChecksum) as cm:
cluster.client.load(ZERO_OID)
self.assertEqual(tid, cm.exception.args[1])
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment