Commit 46c36465 authored by Julien Muchembled

client: fix possible data corruption after conflict resolutions with replicas

This really fixes the bug described in
commit 40bac312,
which only reduced the probability of failure and could probably be reverted.

What happened is that the second conflict on 'a' for t3 was first reported by
an answer to the first store, with:
- a base serial at which a=0
- a conflict serial at which a=7
However, the cached data was no longer 8 but 12, since a second store had
already occurred after the first conflict (reported by the other storage node).

When this conflict was resolved before the conflict for the second store was
received, it gave:

  resolve(old=0, saved=7, new=12) -> 19

instead of:

  resolve(old=4, saved=7, new=12) -> 15

(if we still had the data of the first store, we could also do
  resolve(old=0, saved=7, new=8)
 but that would be inefficient from a memory point of view)
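
To make the arithmetic concrete, here is a minimal sketch (not NEO code) of
the two resolutions above. It assumes counter-like objects that resolve
conflicts by replaying their delta, similar to the PCounterWithResolution
class used by NEO's threaded tests:

  def resolve(old, saved, new):
      # 3-way merge: replay the delta (new - old) on top of 'saved'
      return saved + (new - old)

  resolve(old=4, saved=7, new=12)  # -> 15, 'old' matches the cached data
  resolve(old=0, saved=7, new=12)  # -> 19, stale 'old' corrupts the result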

The bug was difficult to reproduce. testNotifyReplicated had to be run many
times before the race conditions triggered it. The test was changed to
enforce some of them, and the above scenario now happens almost always.
parent 3dabba3a
@@ -442,8 +442,8 @@ class Application(ThreadedApplication):
         # Store object in tmp cache
         packet = Packets.AskStoreObject(oid, serial, compression,
             checksum, compressed_data, data_serial, ttid)
-        txn_context.data_dict[oid] = data, txn_context.write(
-            self, packet, oid, oid=oid, serial=serial)
+        txn_context.data_dict[oid] = data, serial, txn_context.write(
+            self, packet, oid, oid=oid)
         while txn_context.data_size >= self._cache._max_size:
             self._waitAnyTransactionMessage(txn_context)
@@ -460,13 +460,13 @@ class Application(ThreadedApplication):
             # This is also done atomically, to avoid race conditions
             # with PrimaryNotificationsHandler.notifyDeadlock
             try:
-                oid, (serial, conflict_serial) = pop_conflict()
+                oid, serial = pop_conflict()
             except KeyError:
                 return
             try:
-                data = data_dict.pop(oid)[0]
+                data, old_serial, _ = data_dict.pop(oid)
             except KeyError:
-                assert oid is conflict_serial is None, (oid, conflict_serial)
+                assert oid is None, (oid, serial)
                 # Storage refused us from taking object lock, to avoid a
                 # possible deadlock. TID is actually used for some kind of
                 # "locking priority": when a higher value has the lock,
@@ -485,33 +485,32 @@ class Application(ThreadedApplication):
                 self._askStorageForWrite(txn_context, uuid, packet)
             else:
                 if data is CHECKED_SERIAL:
-                    raise ReadConflictError(oid=oid, serials=(conflict_serial,
-                        serial))
+                    raise ReadConflictError(oid=oid,
+                        serials=(serial, old_serial))
                 # TODO: data can be None if a conflict happens during undo
                 if data:
                     txn_context.data_size -= len(data)
-                if self.last_tid < conflict_serial:
+                if self.last_tid < serial:
                     self.sync() # possible late invalidation (very rare)
                 try:
-                    data = tryToResolveConflict(oid, conflict_serial,
-                        serial, data)
+                    data = tryToResolveConflict(oid, serial, old_serial, data)
                 except ConflictError:
                     logging.info(
                         'Conflict resolution failed for %s@%s with %s',
-                        dump(oid), dump(serial), dump(conflict_serial))
+                        dump(oid), dump(old_serial), dump(serial))
                     # With recent ZODB, get_pickle_metadata (from ZODB.utils)
                     # does not support empty values, so do not pass 'data'
                     # in this case.
-                    raise ConflictError(oid=oid, serials=(conflict_serial,
-                        serial), data=data or None)
+                    raise ConflictError(oid=oid, serials=(serial, old_serial),
+                        data=data or None)
                 else:
                     logging.info(
                         'Conflict resolution succeeded for %s@%s with %s',
-                        dump(oid), dump(serial), dump(conflict_serial))
+                        dump(oid), dump(old_serial), dump(serial))
                     # Mark this conflict as resolved
-                    resolved_dict[oid] = conflict_serial
+                    resolved_dict[oid] = serial
                     # Try to store again
-                    self._store(txn_context, oid, conflict_serial, data)
+                    self._store(txn_context, oid, serial, data)

     def _askStorageForWrite(self, txn_context, uuid, packet):
         node = self.nm.getByUUID(uuid)
@@ -927,7 +926,7 @@ class Application(ThreadedApplication):
         assert oid not in txn_context.cache_dict, oid
         assert oid not in txn_context.data_dict, oid
         packet = Packets.AskCheckCurrentSerial(ttid, oid, serial)
-        txn_context.data_dict[oid] = CHECKED_SERIAL, txn_context.write(
-            self, packet, oid, 0, oid=oid, serial=serial)
+        txn_context.data_dict[oid] = CHECKED_SERIAL, serial, txn_context.write(
+            self, packet, oid, 0, oid=oid)
         self._waitAnyTransactionMessage(txn_context, False)
@@ -154,7 +154,7 @@ class PrimaryNotificationsHandler(MTEventHandler):
     def notifyDeadlock(self, conn, ttid, locking_tid):
         for txn_context in self.app.txn_contexts():
             if txn_context.ttid == ttid:
-                txn_context.conflict_dict[None] = locking_tid, None
+                txn_context.conflict_dict[None] = locking_tid
                 txn_context.wakeup(conn)
                 break
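
As a reading aid, here is my paraphrase (not repo code) of the sentinel
convention that ties this hunk to Application._handleConflicts above:

  # queued by notifyDeadlock, under the None key since no real oid is involved
  txn_context.conflict_dict[None] = locking_tid
  # later, popped like any other conflict in _handleConflicts
  oid, serial = pop_conflict()
  try:
      data, old_serial, _ = data_dict.pop(oid)
  except KeyError:
      # no data for None: deadlock notification, not a real conflict
      assert oid is None, (oid, serial)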
@@ -58,7 +58,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
     def answerObject(self, conn, oid, *args):
         self.app.setHandlerData(args)

-    def answerStoreObject(self, conn, conflict, oid, serial):
+    def answerStoreObject(self, conn, conflict, oid):
         txn_context = self.app.getHandlerData()
         if conflict:
             # Conflicts can not be resolved now because 'conn' is locked.
@@ -75,7 +75,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
             # If this conflict is not already resolved, mark it for
             # resolution.
             if txn_context.resolved_dict.get(oid, '') < conflict:
-                txn_context.conflict_dict[oid] = serial, conflict
+                txn_context.conflict_dict[oid] = conflict
         else:
             txn_context.written(self.app, conn.getUUID(), oid)
@@ -107,10 +107,10 @@ class StorageAnswersHandler(AnswerBaseHandler):
         except KeyError:
             if resolved:
                 # We should still be waiting for an answer from this node.
-                assert conn.uuid in txn_context.data_dict[oid][1]
+                assert conn.uuid in txn_context.data_dict[oid][2]
                 return
             assert oid in txn_context.data_dict
-        if serial <= txn_context.conflict_dict.get(oid, ('',))[0]:
+        if serial <= txn_context.conflict_dict.get(oid, ''):
             # Another node already reported this conflict or a newer,
             # by answering to this rebase or to the previous store.
             return
@@ -136,8 +136,8 @@ class StorageAnswersHandler(AnswerBaseHandler):
         if cached:
             assert cached == data
             txn_context.cache_size -= size
-        txn_context.data_dict[oid] = data, None
-        txn_context.conflict_dict[oid] = serial, conflict
+        txn_context.data_dict[oid] = data, serial, None
+        txn_context.conflict_dict[oid] = conflict

     def answerStoreTransaction(self, conn):
         pass
@@ -40,11 +40,11 @@ class Transaction(object):
         self.queue = SimpleQueue()
         self.txn = txn
         # data being stored
-        self.data_dict = {}  # {oid: (value, [node_id])}
+        self.data_dict = {}  # {oid: (value, serial, [node_id])}
         # data stored: this will go to the cache on tpc_finish
         self.cache_dict = {}  # {oid: value}
         # conflicts to resolve
-        self.conflict_dict = {}  # {oid: (base_serial, serial)}
+        self.conflict_dict = {}  # {oid: serial}
         # resolved conflicts
         self.resolved_dict = {}  # {oid: serial}
         # Keys are node ids instead of Node objects because a node may
@@ -98,7 +98,7 @@ class Transaction(object):
         # the data in self.data_dict until all nodes have answered so we remain
         # able to resolve conflicts.
         try:
-            data, uuid_list = self.data_dict[oid]
+            data, serial, uuid_list = self.data_dict[oid]
             uuid_list.remove(uuid)
         except KeyError:
             # 1. store to S1 and S2
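
Summarizing the data-structure change (illustrative shapes, not repo code):
the base serial now travels with the stored data instead of with the
reported conflict:

  # before: data_dict     = {oid: (value, [node_id])}
  #         conflict_dict = {oid: (base_serial, serial)}
  # after:
  data_dict     = {oid: (value, serial, [node_id])}  # serial: base of 'value'
  conflict_dict = {oid: serial}                      # serial: conflicting txn

This is what guarantees that tryToResolveConflict is always called with the
base serial of the data the client actually holds, even if a second store
happened between the conflict report and its resolution.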
@@ -320,6 +320,10 @@ class EventQueue(object):
         self._event_queue = []
         self._executing_event = -1

+    # Stable sort when 2 keys are equal.
+    # XXX: Is it really useful to keep events with same key ordered
+    #      chronologically ? The caller could use more specific keys. For
+    #      write-locks (by the storage node), the locking tid seems enough.
     sortQueuedEvents = (lambda key=itemgetter(0): lambda self:
         self._event_queue.sort(key=key))()
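
The new comment relies on Python's list.sort being stable: events queued
with equal keys keep their insertion (i.e. chronological) order. A generic
illustration, unrelated to NEO's actual event types:

  from operator import itemgetter
  events = [(1, 'first'), (0, 'other'), (1, 'second')]
  events.sort(key=itemgetter(0))
  # [(0, 'other'), (1, 'first'), (1, 'second')]: 'first' stays before 'second'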
@@ -28,6 +28,8 @@
 import weakref
 import MySQLdb
 import transaction
+from cStringIO import StringIO
+from cPickle import Unpickler
 from functools import wraps
 from inspect import isclass
 from .mock import Mock
@@ -40,6 +42,7 @@ from unittest.case import _ExpectedFailure, _UnexpectedSuccess
 try:
     from transaction.interfaces import IDataManager
     from ZODB.utils import newTid
+    from ZODB.ConflictResolution import PersistentReferenceFactory
 except ImportError:
     pass
@@ -492,5 +495,11 @@ class Patch(object):
         self.__del__()

+def unpickle_state(data):
+    unpickler = Unpickler(StringIO(data))
+    unpickler.persistent_load = PersistentReferenceFactory().persistent_load
+    unpickler.load() # skip the class tuple
+    return unpickler.load()
+
 __builtin__.pdb = lambda depth=0: \
     debug.getPdb().set_trace(sys._getframe(depth+1))
@@ -36,7 +36,7 @@ from neo.lib.handler import DelayEvent
 from neo.lib import logging
 from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
     Packets, Packet, uuid_str, ZERO_OID, ZERO_TID)
-from .. import expectedFailure, Patch, TransactionalResource
+from .. import expectedFailure, unpickle_state, Patch, TransactionalResource
 from . import ClientApplication, ConnectionFilter, LockLock, NEOThreadedTest, \
     RandomConflictDict, ThreadId, with_cluster
 from neo.lib.util import add64, makeChecksum, p64, u64
@@ -1481,11 +1481,11 @@ class Test(NEOThreadedTest):
         reports a conflict after that this conflict was fully resolved with
         another node.
         """
-        def answerStoreObject(orig, conn, conflict, oid, serial):
+        def answerStoreObject(orig, conn, conflict, oid):
             if not conflict:
                 p.revert()
                 ll()
-            orig(conn, conflict, oid, serial)
+            orig(conn, conflict, oid)
         if 1:
             s0, s1 = cluster.storage_list
             t1, c1 = cluster.getTransaction()
@@ -1975,6 +1975,35 @@ class Test(NEOThreadedTest):
     @with_cluster(replicas=1, partitions=4)
     def testNotifyReplicated(self, cluster):
+        """
+        Check replication while several concurrent transactions lead to
+        conflict resolutions and deadlock avoidance, and in particular the
+        handling of write-locks when the storage node is about to notify the
+        master that partitions are replicated.
+        Transactions are committed in the following order:
+        - t2
+        - t4, conflict on 'd'
+        - t1, deadlock on 'a'
+        - t3, deadlock on 'b', and 2 conflicts on 'a'
+        Special care is also taken for the change done by t3 on 'a', to check
+        that the client resolves conflicts with the correct oldSerial:
+        1. The initial store (a=8) is first delayed by t2.
+        2. It is then kept aside by the deadlock.
+        3. On s1, deadlock avoidance happens after t1 stores a=7, and the
+           store is delayed again. On s0, however, it's the contrary and a
+           conflict is reported to the client.
+        4. The second store (a=12) is based on t2.
+        5. t1 finishes and s1 reports the conflict for the first store (with
+           t1). At that point, the base serial of this store is meaningless:
+           the client only has data for the last store (based on t2), and it
+           is its base serial that must be used. t3 writes 15 (and not 19!).
+        6. Conflicts for the second store are with t2 and they're ignored
+           because they're already resolved.
+        Note that this test method lacks code to enforce some events to happen
+        in the expected order. Sometimes, the above scenario is not reproduced
+        entirely, but it's so rare that there's no point in making the code
+        more complicated.
+        """
         s0, s1 = cluster.storage_list
         s1.stop()
         cluster.join((s1,))
@@ -2020,14 +2049,33 @@ class Test(NEOThreadedTest):
                 yield 1
                 self.tic()
                 self.assertPartitionTable(cluster, 'UO|UU|UU|UU')
-            def t4_vote(*args, **kw):
+            def t4_d(*args, **kw):
                 self.tic()
                 self.assertPartitionTable(cluster, 'UU|UU|UU|UU')
-                yield 0
+                yield 2
+            # Delay the conflict for the second store of 'a' by t3.
+            delay_conflict = {s0.uuid: [1], s1.uuid: [1, 0]}
+            def delayConflict(conn, packet):
+                app = conn.getHandler().app
+                if (isinstance(packet, Packets.AnswerStoreObject)
+                        and packet.decode()[0]):
+                    conn, = cluster.client.getConnectionList(app)
+                    kw = conn._handlers._pending[0][0][packet._id][3]
+                    return 1 == u64(kw['oid']) and delay_conflict[app.uuid].pop()
+            def writeA(orig, txn_context, oid, serial, data):
+                if u64(oid) == 1:
+                    value = unpickle_state(data)['value']
+                    if value > 12:
+                        f.remove(delayConflict)
+                    elif value == 12:
+                        f.add(delayConflict)
+                return orig(txn_context, oid, serial, data)
+            ###
             with ConnectionFilter() as f, \
+                    Patch(cluster.client, _store=writeA), \
                     self.thread_switcher(threads,
                 (1, 2, 3, 0, 1, 0, 2, t3_c, 1, 3, 2, t3_resolve, 0, 0, 0,
-                 t1_rebase, 2, t3_b, 3, t4_vote),
+                 t1_rebase, 2, t3_b, 3, t4_d, 0, 2, 2),
                 ('tpc_begin', 'tpc_begin', 'tpc_begin', 'tpc_begin', 2, 1, 1,
                  3, 3, 4, 4, 3, 1, 'RebaseTransaction', 'RebaseTransaction',
                  'AnswerRebaseTransaction', 'AnswerRebaseTransaction', 2