Commit 9ed4c4f2 authored by Julien Muchembled's avatar Julien Muchembled

storage: wait no tid is locked in the range being checked

parent 1cdf18b6
...@@ -338,9 +338,9 @@ class Application(object): ...@@ -338,9 +338,9 @@ class Application(object):
if not node.isHidden(): if not node.isHidden():
break break
def queueEvent(self, some_callable, conn, args, key=None, def queueEvent(self, some_callable, conn=None, args=(), key=None,
raise_on_duplicate=True): raise_on_duplicate=True):
msg_id = conn.getPeerId() msg_id = None if conn is None else conn.getPeerId()
event_queue_dict = self.event_queue_dict event_queue_dict = self.event_queue_dict
if raise_on_duplicate and key in event_queue_dict: if raise_on_duplicate and key in event_queue_dict:
raise AlreadyPendingError() raise AlreadyPendingError()
...@@ -362,8 +362,9 @@ class Application(object): ...@@ -362,8 +362,9 @@ class Application(object):
event_queue_dict[key] -= 1 event_queue_dict[key] -= 1
if event_queue_dict[key] == 0: if event_queue_dict[key] == 0:
del event_queue_dict[key] del event_queue_dict[key]
if conn.isClosed(): if conn is None:
continue some_callable(*args)
elif not conn.isClosed():
orig_msg_id = conn.getPeerId() orig_msg_id = conn.getPeerId()
try: try:
conn.setPeerId(msg_id) conn.setPeerId(msg_id)
......
...@@ -104,11 +104,16 @@ class Checker(object): ...@@ -104,11 +104,16 @@ class Checker(object):
self.next_oid = None self.next_oid = None
self.partition = partition self.partition = partition
self.source = source self.source = source
def start():
if app.tm.isLockedTid(max_tid):
app.queueEvent(start)
return
args = partition, CHECK_COUNT, min_tid, max_tid args = partition, CHECK_COUNT, min_tid, max_tid
p = Packets.AskCheckTIDRange(*args) p = Packets.AskCheckTIDRange(*args)
for conn, identified in self.conn_dict.items(): for conn, identified in self.conn_dict.items():
self.conn_dict[conn] = conn.ask(p) if identified else None self.conn_dict[conn] = conn.ask(p) if identified else None
self.conn_dict[None] = app.dm.checkTIDRange(*args) self.conn_dict[None] = app.dm.checkTIDRange(*args)
start()
def connected(self, node): def connected(self, node):
conn = node.getConnection() conn = node.getConnection()
......
...@@ -144,29 +144,36 @@ class StorageOperationHandler(EventHandler): ...@@ -144,29 +144,36 @@ class StorageOperationHandler(EventHandler):
@checkFeedingConnection(check=True) @checkFeedingConnection(check=True)
def askCheckTIDRange(self, conn, *args): def askCheckTIDRange(self, conn, *args):
app = self.app
if app.tm.isLockedTid(args[3]): # max_tid
app.queueEvent(self.askCheckTIDRange, conn, args)
return
msg_id = conn.getPeerId() msg_id = conn.getPeerId()
conn = weakref.proxy(conn) conn = weakref.proxy(conn)
def check(): def check():
r = self.app.dm.checkTIDRange(*args) r = app.dm.checkTIDRange(*args)
try: try:
conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id) conn.answer(Packets.AnswerCheckTIDRange(*r), msg_id)
except (weakref.ReferenceError, ConnectorConnectionClosedException): except (weakref.ReferenceError, ConnectorConnectionClosedException):
pass pass
yield yield
self.app.newTask(check()) app.newTask(check())
@checkFeedingConnection(check=True) @checkFeedingConnection(check=True)
def askCheckSerialRange(self, conn, *args): def askCheckSerialRange(self, conn, *args):
app = self.app
if app.tm.isLockedTid(args[3]): # max_tid
raise ProtocolError("transactions must be checked before objects")
msg_id = conn.getPeerId() msg_id = conn.getPeerId()
conn = weakref.proxy(conn) conn = weakref.proxy(conn)
def check(): def check():
r = self.app.dm.checkSerialRange(*args) r = app.dm.checkSerialRange(*args)
try: try:
conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id) conn.answer(Packets.AnswerCheckSerialRange(*r), msg_id)
except (weakref.ReferenceError, ConnectorConnectionClosedException): except (weakref.ReferenceError, ConnectorConnectionClosedException):
pass pass
yield yield
self.app.newTask(check()) app.newTask(check())
@checkFeedingConnection(check=False) @checkFeedingConnection(check=False)
def askFetchTransactions(self, conn, partition, length, min_tid, max_tid, def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment