Commit fa14157b authored by Julien Muchembled's avatar Julien Muchembled

storage: fix storage leak when an oid is stored several times within a transaction

parent 50e7fe52
...@@ -160,7 +160,7 @@ class TransactionManager(EventQueue): ...@@ -160,7 +160,7 @@ class TransactionManager(EventQueue):
for txn, ttid in sorted((txn, ttid) for ttid, txn in for txn, ttid in sorted((txn, ttid) for ttid, txn in
self._transaction_dict.iteritems()): self._transaction_dict.iteritems()):
assert txn.lockless.issubset(txn.serial_dict), ( assert txn.lockless.issubset(txn.serial_dict), (
txn.lockless, txn.serial_dict) ttid, txn.lockless, txn.serial_dict)
for oid in txn.lockless: for oid in txn.lockless:
partition = getPartition(oid) partition = getPartition(oid)
if replicated.get(partition): if replicated.get(partition):
...@@ -471,6 +471,7 @@ class TransactionManager(EventQueue): ...@@ -471,6 +471,7 @@ class TransactionManager(EventQueue):
transaction = self._transaction_dict[ttid] transaction = self._transaction_dict[ttid]
except KeyError: except KeyError:
raise NotRegisteredError raise NotRegisteredError
assert oid not in transaction.serial_dict
locked = self.lockObject(ttid, serial, oid) locked = self.lockObject(ttid, serial, oid)
transaction.serial_dict[oid] = serial transaction.serial_dict[oid] = serial
if not locked: if not locked:
...@@ -486,6 +487,12 @@ class TransactionManager(EventQueue): ...@@ -486,6 +487,12 @@ class TransactionManager(EventQueue):
except KeyError: except KeyError:
raise NotRegisteredError raise NotRegisteredError
locked = self.lockObject(ttid, serial, oid) locked = self.lockObject(ttid, serial, oid)
if oid in transaction.serial_dict: # initially/still lockless, or undo
# XXX: We'd like to do that before calling lockObject,
# to release resources immediately (data, maybe lock)
# in case of delay/conflict.
# But keeping everything consistent is complicated.
self._unstore(transaction, oid)
transaction.serial_dict[oid] = serial transaction.serial_dict[oid] = serial
# store object # store object
if data is None: if data is None:
...@@ -519,6 +526,9 @@ class TransactionManager(EventQueue): ...@@ -519,6 +526,9 @@ class TransactionManager(EventQueue):
except ConflictError, e: except ConflictError, e:
# Move the data back to the client for conflict resolution, # Move the data back to the client for conflict resolution,
# since the client may not have it anymore. # since the client may not have it anymore.
return serial, e.tid, self._unstore(transaction, oid)
def _unstore(self, transaction, oid):
try: try:
data_id = transaction.store_dict.pop(oid)[1] data_id = transaction.store_dict.pop(oid)[1]
except KeyError: # check current except KeyError: # check current
...@@ -531,7 +541,7 @@ class TransactionManager(EventQueue): ...@@ -531,7 +541,7 @@ class TransactionManager(EventQueue):
data = dm.loadData(data_id) data = dm.loadData(data_id)
dm.releaseData([data_id], True) dm.releaseData([data_id], True)
del transaction.serial_dict[oid] del transaction.serial_dict[oid]
return serial, e.tid, data return data
def abort(self, ttid, even_if_locked=False): def abort(self, ttid, even_if_locked=False):
""" """
......
...@@ -162,6 +162,7 @@ class Test(NEOThreadedTest): ...@@ -162,6 +162,7 @@ class Test(NEOThreadedTest):
undo.tpc_finish(txn) undo.tpc_finish(txn)
t.begin() t.begin()
self.assertEqual(ob.value, 5) self.assertEqual(ob.value, 5)
self.assertFalse(cluster.storage.dm.getOrphanList())
return ob return ob
@with_cluster() @with_cluster()
...@@ -2165,6 +2166,7 @@ class Test(NEOThreadedTest): ...@@ -2165,6 +2166,7 @@ class Test(NEOThreadedTest):
1: ['StoreTransaction'], 1: ['StoreTransaction'],
3: [4, 'StoreTransaction'], 3: [4, 'StoreTransaction'],
}) })
self.assertFalse(s1.dm.getOrphanList())
@with_cluster(replicas=1) @with_cluster(replicas=1)
def testNotifyReplicated2(self, cluster): def testNotifyReplicated2(self, cluster):
...@@ -2361,6 +2363,59 @@ class Test(NEOThreadedTest): ...@@ -2361,6 +2363,59 @@ class Test(NEOThreadedTest):
self.assertFalse(end) self.assertFalse(end)
self.assertPartitionTable(cluster, 'UU|UU') self.assertPartitionTable(cluster, 'UU|UU')
@with_cluster(partitions=2, storage_count=2)
def testUnstore(self, cluster):
"""
Show that when resolving a conflict after a lockless write, the storage
can't easily discard the data of the previous store, as it would make
internal data inconsistent. This is currently protected by a assertion
when trying to notifying the master that the replication is finished.
"""
t1, c1 = cluster.getTransaction()
r = c1.root()
for x in 'ab':
r[x] = PCounterWithResolution()
t1.commit()
cluster.stop(replicas=1)
cluster.start()
s0, s1 = cluster.sortStorageList()
t1, c1 = cluster.getTransaction()
r = c1.root()
r._p_changed = 1
r['b'].value += 1
with ConnectionFilter() as f:
delayReplication = f.delayAnswerFetchObjects()
cluster.neoctl.tweakPartitionTable()
self.tic()
t2, c2 = cluster.getTransaction()
x = c2.root()['b']
x.value += 2
t2.commit()
delayStore = f.delayAnswerStoreObject()
delayFinish = f.delayAskFinishTransaction()
x.value += 3
commit2 = self.newPausedThread(t2.commit)
def t2_b(*args, **kw):
yield 1
self.tic()
f.remove(delayReplication)
f.remove(delayStore)
self.tic()
def t1_resolve(*args, **kw):
yield 0
self.tic()
f.remove(delayFinish)
with self.thread_switcher((commit2,),
(1, 0, 0, 1, t2_b, 0, t1_resolve),
('tpc_begin', 'tpc_begin', 0, 2, 2, 'StoreTransaction')) as end:
t1.commit()
commit2.join()
t1.begin()
self.assertEqual(c1.root()['b'].value, 6)
self.assertPartitionTable(cluster, 'UU|UU')
self.assertEqual(end, {0: [2, 2, 'StoreTransaction']})
self.assertFalse(s1.dm.getOrphanList())
@with_cluster(storage_count=2, partitions=2) @with_cluster(storage_count=2, partitions=2)
def testDeadlockAvoidanceBeforeInvolvingAnotherNode(self, cluster): def testDeadlockAvoidanceBeforeInvolvingAnotherNode(self, cluster):
t1, c1 = cluster.getTransaction() t1, c1 = cluster.getTransaction()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment