Commit 4f8ecce8 authored by Vincent Pelletier

Fix conflict resolution race-condition.

Fixes the following scenario (committing an object):
2 storage nodes: S1 and S2
1 replica
- store in S1 (1)
- store in S2 (2)
- S1 answers (1) with a conflict
- client resolves the conflict, and sends to S1 (3) and S2 (4)
- S1 answers (3) with ok
- S2 answers (2) with a conflict

The old code raised because the object's store counter was not 0 when a
conflict was reported for that object.
The new code ignores an answer reporting a conflict if the given serial is
that of a conflict which has already been resolved.
Split & extend existing tests.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2015 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 1cb4155d
...@@ -103,6 +103,7 @@ class ThreadContext(object): ...@@ -103,6 +103,7 @@ class ThreadContext(object):
'object_serial_dict': {}, 'object_serial_dict': {},
'object_stored_counter_dict': {}, 'object_stored_counter_dict': {},
'conflict_serial_dict': {}, 'conflict_serial_dict': {},
'resolved_conflict_serial_dict': {},
'object_stored': 0, 'object_stored': 0,
'txn_voted': False, 'txn_voted': False,
'txn_finished': False, 'txn_finished': False,
...@@ -606,8 +607,9 @@ class Application(object): ...@@ -606,8 +607,9 @@ class Application(object):
new_data = tryToResolveConflict(oid, conflict_serial, serial, new_data = tryToResolveConflict(oid, conflict_serial, serial,
data) data)
if new_data is not None: if new_data is not None:
# Forget this conflict # Mark this conflict as resolved
del local_var.conflict_serial_dict[oid] local_var.resolved_conflict_serial_dict = \
local_var.conflict_serial_dict.pop(oid)
# Try to store again # Try to store again
self.store(oid, conflict_serial, new_data, version, self.store(oid, conflict_serial, new_data, version,
local_var.txn) local_var.txn)
......
...@@ -71,13 +71,24 @@ class StorageAnswersHandler(AnswerBaseHandler): ...@@ -71,13 +71,24 @@ class StorageAnswersHandler(AnswerBaseHandler):
local_var = self.app.local_var local_var = self.app.local_var
object_stored_counter_dict = local_var.object_stored_counter_dict object_stored_counter_dict = local_var.object_stored_counter_dict
if conflicting: if conflicting:
assert object_stored_counter_dict[oid] == 0, \ conflict_serial_dict = local_var.conflict_serial_dict
object_stored_counter_dict[oid] pending_serial = conflict_serial_dict.get(oid)
previous_conflict_serial = local_var.conflict_serial_dict.get(oid, resolved_serial = local_var.resolved_conflict_serial_dict.get(oid)
None) if pending_serial not in (None, serial) or \
assert previous_conflict_serial in (None, serial), \ resolved_serial not in (None, serial):
(previous_conflict_serial, serial) raise NEOStorageError, 'Multiple conflicts for a single ' \
local_var.conflict_serial_dict[oid] = serial 'object in a single store: %r, %r, %r' % (pending_serial,
resolved_serial, serial)
# If this conflict is not already resolved, mark it for
# resolution.
if resolved_serial is None:
if object_stored_counter_dict[oid]:
raise NEOStorageError, 'Storage node(s) accepted ' \
'object, but one (%s) reports a conflict.' % (
dump(conn.getUUID()), )
# Note: we might overwrite an entry, but above test protects
# against overwriting a different value.
conflict_serial_dict[oid] = serial
else: else:
object_stored_counter_dict[oid] += 1 object_stored_counter_dict[oid] += 1
......
...@@ -87,7 +87,7 @@ class StorageAnswerHandlerTests(NeoTestBase): ...@@ -87,7 +87,7 @@ class StorageAnswerHandlerTests(NeoTestBase):
self.assertRaises(NEOStorageError, self.handler.answerObject, conn, self.assertRaises(NEOStorageError, self.handler.answerObject, conn,
*the_object) *the_object)
def test_answerStoreObject(self): def test_answerStoreObject_1(self):
conn = self.getConnection() conn = self.getConnection()
oid = self.getOID(0) oid = self.getOID(0)
tid = self.getNextTID() tid = self.getNextTID()
...@@ -95,14 +95,81 @@ class StorageAnswerHandlerTests(NeoTestBase): ...@@ -95,14 +95,81 @@ class StorageAnswerHandlerTests(NeoTestBase):
local_var = self.app.local_var local_var = self.app.local_var
local_var.object_stored_counter_dict = {oid: 0} local_var.object_stored_counter_dict = {oid: 0}
local_var.conflict_serial_dict = {} local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {}
self.handler.answerStoreObject(conn, 1, oid, tid) self.handler.answerStoreObject(conn, 1, oid, tid)
self.assertEqual(local_var.conflict_serial_dict[oid], tid) self.assertEqual(local_var.conflict_serial_dict[oid], tid)
self.assertEqual(local_var.object_stored_counter_dict[oid], 0) self.assertEqual(local_var.object_stored_counter_dict[oid], 0)
self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
# object was already accepted by another storage, raise
local_var.object_stored_counter_dict = {oid: 1}
local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {}
self.assertRaises(NEOStorageError, self.handler.answerStoreObject,
conn, 1, oid, tid)
def test_answerStoreObject_2(self):
conn = self.getConnection()
oid = self.getOID(0)
tid = self.getNextTID()
tid_2 = self.getNextTID()
# resolution-pending conflict
local_var = self.app.local_var
local_var.object_stored_counter_dict = {oid: 0}
local_var.conflict_serial_dict = {oid: tid}
local_var.resolved_conflict_serial_dict = {}
self.handler.answerStoreObject(conn, 1, oid, tid)
self.assertEqual(local_var.conflict_serial_dict[oid], tid)
self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
self.assertEqual(local_var.object_stored_counter_dict[oid], 0)
# object was already accepted by another storage, raise
local_var.object_stored_counter_dict = {oid: 1}
local_var.conflict_serial_dict = {oid: tid}
local_var.resolved_conflict_serial_dict = {}
self.assertRaises(NEOStorageError, self.handler.answerStoreObject,
conn, 1, oid, tid)
# detected conflict is different, raise
local_var.object_stored_counter_dict = {oid: 0}
local_var.conflict_serial_dict = {oid: tid}
local_var.resolved_conflict_serial_dict = {}
self.assertRaises(NEOStorageError, self.handler.answerStoreObject,
conn, 1, oid, tid_2)
def test_answerStoreObject_3(self):
conn = self.getConnection()
oid = self.getOID(0)
tid = self.getNextTID()
tid_2 = self.getNextTID()
# already-resolved conflict
# This case happens if a storage is answering a store action for which
# any other storage already answered (with same conflict) and any other
# storage accepted the resolved object.
local_var = self.app.local_var
local_var.object_stored_counter_dict = {oid: 1}
local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {oid: tid}
self.handler.answerStoreObject(conn, 1, oid, tid)
self.assertFalse(oid in local_var.conflict_serial_dict)
self.assertEqual(local_var.resolved_conflict_serial_dict[oid], tid)
self.assertEqual(local_var.object_stored_counter_dict[oid], 1)
# detected conflict is different, raise
local_var.object_stored_counter_dict = {oid: 1}
local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {oid: tid}
self.assertRaises(NEOStorageError, self.handler.answerStoreObject,
conn, 1, oid, tid_2)
def test_answerStoreObject_4(self):
conn = self.getConnection()
oid = self.getOID(0)
tid = self.getNextTID()
# no conflict # no conflict
local_var = self.app.local_var
local_var.object_stored_counter_dict = {oid: 0} local_var.object_stored_counter_dict = {oid: 0}
local_var.conflict_serial_dict = {} local_var.conflict_serial_dict = {}
local_var.resolved_conflict_serial_dict = {}
self.handler.answerStoreObject(conn, 0, oid, tid) self.handler.answerStoreObject(conn, 0, oid, tid)
self.assertFalse(oid in local_var.conflict_serial_dict) self.assertFalse(oid in local_var.conflict_serial_dict)
self.assertFalse(oid in local_var.resolved_conflict_serial_dict)
self.assertEqual(local_var.object_stored_counter_dict[oid], 1) self.assertEqual(local_var.object_stored_counter_dict[oid], 1)
def test_answerStoreTransaction(self): def test_answerStoreTransaction(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment