Commit 4e1657c9 authored by Julien Muchembled's avatar Julien Muchembled

More tests for processing of invalidations including one showing a critical bug

parent b2ce833d
...@@ -27,7 +27,7 @@ from neo.storage.transactions import TransactionManager, \ ...@@ -27,7 +27,7 @@ from neo.storage.transactions import TransactionManager, \
from neo.lib.connection import MTClientConnection from neo.lib.connection import MTClientConnection
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_TID ZERO_TID
from . import NEOCluster, NEOThreadedTest, Patch from . import ClientApplication, NEOCluster, NEOThreadedTest, Patch
from neo.lib.util import makeChecksum from neo.lib.util import makeChecksum
from neo.client.pool import CELL_CONNECTED, CELL_GOOD from neo.client.pool import CELL_CONNECTED, CELL_GOOD
...@@ -503,6 +503,87 @@ class Test(NEOThreadedTest): ...@@ -503,6 +503,87 @@ class Test(NEOThreadedTest):
self.assertEqual(s.dm.getPTID(), 1) self.assertEqual(s.dm.getPTID(), 1)
self.assertEqual(list(s.dm.getPartitionTable()), pt) self.assertEqual(list(s.dm.getPartitionTable()), pt)
def testInternalInvalidation(self):
l1 = threading.Lock(); l1.acquire()
l2 = threading.Lock(); l2.acquire()
def _handlePacket(orig, conn, packet, kw={}, handler=None):
if type(packet) is Packets.AnswerTransactionFinished:
l1.release()
l2.acquire()
orig(conn, packet, kw, handler)
cluster = NEOCluster()
try:
cluster.start()
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounter()
t1.commit()
t1.begin()
x.value = 1
t2, c2 = cluster.getTransaction()
x = c2.root()['x']
p = Patch(cluster.client, _handlePacket=_handlePacket)
try:
t = self.newThread(t1.commit)
l1.acquire()
t2.abort()
l2.release()
t.join()
finally:
del p
self.assertEqual(x.value, 1)
finally:
cluster.stop()
def testExternalInvalidation(self):
cluster = NEOCluster()
try:
cluster.start()
# Initialize objects
t1, c1 = cluster.getTransaction()
c1.root()['x'] = x = PCounter()
c1.root()['y'] = y = PCounter()
y.value = 1
t1.commit()
# Get pickle of y
t1.begin()
y = c1._storage.load(y._p_oid)[0]
# Start the testing transaction
# (at this time, we still have x=0 and y=1)
t2, c2 = cluster.getTransaction()
# Copy y to x using a different Master-Client connection
cluster.client.setPoll(0)
client = ClientApplication(name=cluster.name,
master_nodes=cluster.master_nodes)
client.setPoll(1)
txn = transaction.Transaction()
client.tpc_begin(txn)
client.store(x._p_oid, x._p_serial, y, '', txn)
# Delay invalidation for x
master_client = cluster.master.filterConnection(cluster.client)
try:
master_client.add(lambda conn, packet:
isinstance(packet, Packets.InvalidateObjects))
client.tpc_finish(txn, None)
client.close()
client.setPoll(0)
cluster.client.setPoll(1)
# Change to x is committed. Testing connection must ask the
# storage node to return original value of x, even if we
# haven't processed yet any invalidation for x.
x = c2.root()['x']
cluster.client._cache.clear() # bypass cache
self.assertEqual(x.value, 0)
finally:
master_client()
x._p_deactivate()
t1.abort() # process invalidation and sync connection storage
self.assertEqual(x.value, 0)
# New testing transaction. Now we can see the last value of x.
t2.abort()
self.assertEqual(x.value, 1)
finally:
cluster.stop()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment