Commit 65bc5c11 authored by Julien Muchembled's avatar Julien Muchembled

fixup! storage: fix possible crash when delaying replication requests

This reverts commit d3c22487 partially
and fixes the bug in a much simpler way.
parents 7e186442 49631a9f
...@@ -314,8 +314,7 @@ class _DelayedConnectionEvent(EventHandler): ...@@ -314,8 +314,7 @@ class _DelayedConnectionEvent(EventHandler):
class EventQueue(object): class EventQueue(object):
def __init__(self, new_key=lambda prev: None): def __init__(self):
self._new_key = new_key
self._event_queue = [] self._event_queue = []
self._executing_event = -1 self._executing_event = -1
...@@ -328,12 +327,10 @@ class EventQueue(object): ...@@ -328,12 +327,10 @@ class EventQueue(object):
def queueEvent(self, func, conn=None, args=(), key=None): def queueEvent(self, func, conn=None, args=(), key=None):
assert self._executing_event < 0, self._executing_event assert self._executing_event < 0, self._executing_event
self._event_queue.append(( self._event_queue.append((key, func if conn is None else
self._new_key(self._event_queue[-1][0] if self._event_queue else _DelayedConnectionEvent(func, conn, args)))
None) if key is None else key, if key is not None:
func if conn is None else _DelayedConnectionEvent( self.sortQueuedEvents()
func, conn, args)))
self.sortQueuedEvents()
def sortAndExecuteQueuedEvents(self): def sortAndExecuteQueuedEvents(self):
if self._executing_event < 0: if self._executing_event < 0:
...@@ -372,7 +369,8 @@ class EventQueue(object): ...@@ -372,7 +369,8 @@ class EventQueue(object):
self._executing_event = 0 self._executing_event = 0
# What sortAndExecuteQueuedEvents could not do immediately # What sortAndExecuteQueuedEvents could not do immediately
# is done here: # is done here:
self.sortQueuedEvents() if event[0] is not None:
self.sortQueuedEvents()
self._executing_event = -1 self._executing_event = -1
def logQueuedEvents(self): def logQueuedEvents(self):
......
...@@ -142,7 +142,7 @@ class StorageOperationHandler(EventHandler): ...@@ -142,7 +142,7 @@ class StorageOperationHandler(EventHandler):
# if client tasks are finished) # if client tasks are finished)
def getEventQueue(self): def getEventQueue(self):
return self.app.tm return self.app.tm.read_queue
@checkFeedingConnection(check=True) @checkFeedingConnection(check=True)
def askCheckTIDRange(self, conn, *args): def askCheckTIDRange(self, conn, *args):
......
...@@ -37,21 +37,7 @@ class NotRegisteredError(Exception): ...@@ -37,21 +37,7 @@ class NotRegisteredError(Exception):
Raised when a ttid is not registered Raised when a ttid is not registered
""" """
class TransactionManagerEventKey(object): class Transaction(object):
def __init__(self, tid):
self.locking_tid = tid
def __lt__(self, other):
return self.locking_tid < other.locking_tid
def __repr__(self):
return "<%s(%s) at 0x%x>" % (
self.__class__.__name__,
dump(self.locking_tid),
id(self))
class Transaction(TransactionManagerEventKey):
""" """
Container for a pending transaction Container for a pending transaction
""" """
...@@ -61,7 +47,7 @@ class Transaction(TransactionManagerEventKey): ...@@ -61,7 +47,7 @@ class Transaction(TransactionManagerEventKey):
def __init__(self, uuid, ttid): def __init__(self, uuid, ttid):
self._birth = time() self._birth = time()
TransactionManagerEventKey.__init__(self, ttid) self.locking_tid = ttid
self.uuid = uuid self.uuid = uuid
self.serial_dict = {} self.serial_dict = {}
self.store_dict = {} self.store_dict = {}
...@@ -77,6 +63,9 @@ class Transaction(TransactionManagerEventKey): ...@@ -77,6 +63,9 @@ class Transaction(TransactionManagerEventKey):
time() - self._birth, time() - self._birth,
id(self)) id(self))
def __lt__(self, other):
return self.locking_tid < other.locking_tid
def logDelay(self, ttid, locked, oid_serial): def logDelay(self, ttid, locked, oid_serial):
if self._delayed.get(oid_serial) != locked: if self._delayed.get(oid_serial) != locked:
if self._delayed: if self._delayed:
...@@ -99,8 +88,7 @@ class TransactionManager(EventQueue): ...@@ -99,8 +88,7 @@ class TransactionManager(EventQueue):
""" """
def __init__(self, app): def __init__(self, app):
EventQueue.__init__(self, lambda prev: TransactionManagerEventKey( EventQueue.__init__(self)
"" if prev is None else prev.locking_tid))
self.read_queue = EventQueue() self.read_queue = EventQueue()
self._app = app self._app = app
self._transaction_dict = {} self._transaction_dict = {}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment