Commit d3c22487 authored by Julien Muchembled's avatar Julien Muchembled

storage: fix possible crash when delaying replication requests

Traceback (most recent call last):
  [...]
  File "neo/storage/handlers/client.py", line 115, in askStoreObject
    *e.args)
  File "neo/lib/handler.py", line 333, in queueEvent
    self.sortQueuedEvents()
  File "neo/lib/handler.py", line 326, in <lambda>
    self._event_queue.sort(key=key))()
  File "neo/storage/transactions.py", line 67, in __lt__
    return self.locking_tid < other.locking_tid
AttributeError: 'NoneType' object has no attribute 'locking_tid'

Pending events:
  (None, <askFetchTransactions: ...>)
  (<Transaction(C13, locking_tid=03c266508a058388, tid=None, age=0.21s) at 0x7f086bbc3d50>, <_askStoreObject: ...>)
parent 49631a9f
...@@ -314,7 +314,8 @@ class _DelayedConnectionEvent(EventHandler): ...@@ -314,7 +314,8 @@ class _DelayedConnectionEvent(EventHandler):
class EventQueue(object): class EventQueue(object):
def __init__(self): def __init__(self, new_key=lambda prev: None):
self._new_key = new_key
self._event_queue = [] self._event_queue = []
self._executing_event = -1 self._executing_event = -1
...@@ -327,10 +328,12 @@ class EventQueue(object): ...@@ -327,10 +328,12 @@ class EventQueue(object):
def queueEvent(self, func, conn=None, args=(), key=None): def queueEvent(self, func, conn=None, args=(), key=None):
assert self._executing_event < 0, self._executing_event assert self._executing_event < 0, self._executing_event
self._event_queue.append((key, func if conn is None else self._event_queue.append((
_DelayedConnectionEvent(func, conn, args))) self._new_key(self._event_queue[-1][0] if self._event_queue else
if key is not None: None) if key is None else key,
self.sortQueuedEvents() func if conn is None else _DelayedConnectionEvent(
func, conn, args)))
self.sortQueuedEvents()
def sortAndExecuteQueuedEvents(self): def sortAndExecuteQueuedEvents(self):
if self._executing_event < 0: if self._executing_event < 0:
...@@ -369,8 +372,7 @@ class EventQueue(object): ...@@ -369,8 +372,7 @@ class EventQueue(object):
self._executing_event = 0 self._executing_event = 0
# What sortAndExecuteQueuedEvents could not do immediately # What sortAndExecuteQueuedEvents could not do immediately
# is done here: # is done here:
if event[0] is not None: self.sortQueuedEvents()
self.sortQueuedEvents()
self._executing_event = -1 self._executing_event = -1
def logQueuedEvents(self): def logQueuedEvents(self):
......
...@@ -37,7 +37,21 @@ class NotRegisteredError(Exception): ...@@ -37,7 +37,21 @@ class NotRegisteredError(Exception):
Raised when a ttid is not registered Raised when a ttid is not registered
""" """
class Transaction(object): class TransactionManagerEventKey(object):
def __init__(self, tid):
self.locking_tid = tid
def __lt__(self, other):
return self.locking_tid < other.locking_tid
def __repr__(self):
return "<%s(%s) at 0x%x>" % (
self.__class__.__name__,
dump(self.locking_tid),
id(self))
class Transaction(TransactionManagerEventKey):
""" """
Container for a pending transaction Container for a pending transaction
""" """
...@@ -47,7 +61,7 @@ class Transaction(object): ...@@ -47,7 +61,7 @@ class Transaction(object):
def __init__(self, uuid, ttid): def __init__(self, uuid, ttid):
self._birth = time() self._birth = time()
self.locking_tid = ttid TransactionManagerEventKey.__init__(self, ttid)
self.uuid = uuid self.uuid = uuid
self.serial_dict = {} self.serial_dict = {}
self.store_dict = {} self.store_dict = {}
...@@ -63,9 +77,6 @@ class Transaction(object): ...@@ -63,9 +77,6 @@ class Transaction(object):
time() - self._birth, time() - self._birth,
id(self)) id(self))
def __lt__(self, other):
return self.locking_tid < other.locking_tid
def logDelay(self, ttid, locked, oid_serial): def logDelay(self, ttid, locked, oid_serial):
if self._delayed.get(oid_serial) != locked: if self._delayed.get(oid_serial) != locked:
if self._delayed: if self._delayed:
...@@ -88,7 +99,8 @@ class TransactionManager(EventQueue): ...@@ -88,7 +99,8 @@ class TransactionManager(EventQueue):
""" """
def __init__(self, app): def __init__(self, app):
EventQueue.__init__(self) EventQueue.__init__(self, lambda prev: TransactionManagerEventKey(
"" if prev is None else prev.locking_tid))
self.read_queue = EventQueue() self.read_queue = EventQueue()
self._app = app self._app = app
self._transaction_dict = {} self._transaction_dict = {}
......
...@@ -256,11 +256,26 @@ class ReplicationTests(NEOThreadedTest): ...@@ -256,11 +256,26 @@ class ReplicationTests(NEOThreadedTest):
the backup cluster reacts very quickly to a new transaction. the backup cluster reacts very quickly to a new transaction.
""" """
upstream = backup.upstream upstream = backup.upstream
t1, c1 = upstream.getTransaction()
ob = c1.root()[''] = PCounterWithResolution()
t1.commit()
ob.value += 2
t2, c2 = upstream.getTransaction()
c2.root()[''].value += 3
self.tic()
with upstream.master.filterConnection(upstream.storage) as f: with upstream.master.filterConnection(upstream.storage) as f:
f.delayNotifyUnlockInformation() delay = f.delayNotifyUnlockInformation()
upstream.importZODB()(1) t1.commit()
self.tic() self.tic()
def storeObject(orig, *args, **kw):
p.revert()
f.remove(delay)
return orig(*args, **kw)
with Patch(upstream.storage.tm, storeObject=storeObject) as p:
t2.commit()
self.tic() self.tic()
t1.begin()
self.assertEqual(5, ob.value)
self.assertEqual(1, self.checkBackup(backup)) self.assertEqual(1, self.checkBackup(backup))
@with_cluster() @with_cluster()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment