Commit 9f75d6c6 authored by Jeremy Hylton

New timeout feature.

If a transaction takes a long time to complete after it has the
storage lock, close the connection and abort the transaction.  The
abort may cause the client to be hosed.

XXX There should be a configuration option to enable / disable this
feature.  That should be done before too long.

When a transaction takes a long time to complete, it is often a sign
of deadlock involving multiple storages.  Until the possibility of
deadlock is eliminated, this feature will prevent an entire ZEO
cluster from deadlocking (on writes).
parent 2a246039
...@@ -25,6 +25,7 @@ import cPickle ...@@ -25,6 +25,7 @@ import cPickle
import os import os
import sys import sys
import threading import threading
import time
from ZEO import ClientStub from ZEO import ClientStub
from ZEO.CommitLog import CommitLog from ZEO.CommitLog import CommitLog
...@@ -210,9 +211,12 @@ class ZEOStorage: ...@@ -210,9 +211,12 @@ class ZEOStorage:
self.storage_id = "uninitialized" self.storage_id = "uninitialized"
self.transaction = None self.transaction = None
self.read_only = read_only self.read_only = read_only
self.timeout = TimeoutThread()
self.timeout.start()
def notifyConnected(self, conn): def notifyConnected(self, conn):
self.client = self.ClientStorageStubClass(conn) self.client = self.ClientStorageStubClass(conn)
self.timeout.notifyConnected(conn)
def notifyDisconnected(self): def notifyDisconnected(self):
# When this storage closes, we must ensure that it aborts # When this storage closes, we must ensure that it aborts
...@@ -222,6 +226,7 @@ class ZEOStorage: ...@@ -222,6 +226,7 @@ class ZEOStorage:
self.abort() self.abort()
else: else:
self.log("disconnected") self.log("disconnected")
self.timeout.notifyDisconnected()
def __repr__(self): def __repr__(self):
tid = self.transaction and repr(self.transaction.id) tid = self.transaction and repr(self.transaction.id)
...@@ -393,6 +398,7 @@ class ZEOStorage: ...@@ -393,6 +398,7 @@ class ZEOStorage:
if self.storage._transaction is None: if self.storage._transaction is None:
self.strategy = self.ImmediateCommitStrategyClass(self.storage, self.strategy = self.ImmediateCommitStrategyClass(self.storage,
self.client) self.client)
self.timeout.begin()
else: else:
self.strategy = self.DelayedCommitStrategyClass(self.storage, self.strategy = self.DelayedCommitStrategyClass(self.storage,
self.wait) self.wait)
...@@ -409,6 +415,7 @@ class ZEOStorage: ...@@ -409,6 +415,7 @@ class ZEOStorage:
def tpc_finish(self, id): def tpc_finish(self, id):
if not self.check_tid(id): if not self.check_tid(id):
return return
self.timeout.end()
invalidated = self.strategy.tpc_finish() invalidated = self.strategy.tpc_finish()
if invalidated: if invalidated:
self.server.invalidate(self, self.storage_id, self.server.invalidate(self, self.storage_id,
...@@ -420,6 +427,7 @@ class ZEOStorage: ...@@ -420,6 +427,7 @@ class ZEOStorage:
def tpc_abort(self, id): def tpc_abort(self, id):
if not self.check_tid(id): if not self.check_tid(id):
return return
self.timeout.end()
strategy = self.strategy strategy = self.strategy
strategy.tpc_abort() strategy.tpc_abort()
self.transaction = None self.transaction = None
...@@ -440,7 +448,9 @@ class ZEOStorage: ...@@ -440,7 +448,9 @@ class ZEOStorage:
def vote(self, id): def vote(self, id):
self.check_tid(id, exc=StorageTransactionError) self.check_tid(id, exc=StorageTransactionError)
return self.strategy.tpc_vote() r = self.strategy.tpc_vote()
self.timeout.begin()
return r
def abortVersion(self, src, id): def abortVersion(self, src, id):
self.check_tid(id, exc=StorageTransactionError) self.check_tid(id, exc=StorageTransactionError)
...@@ -737,6 +747,79 @@ class SlowMethodThread(threading.Thread): ...@@ -737,6 +747,79 @@ class SlowMethodThread(threading.Thread):
else: else:
self.delay.reply(result) self.delay.reply(result)
class TimeoutThread(threading.Thread):
    """Watchdog that closes a connection whose transaction runs too long.

    A TimeoutThread is associated with a ZEOStorage.  It tracks how long
    the current transaction has been open since begin() was called.  If
    the transaction is still open TIMEOUT seconds later, the thread
    closes the connection and exits.

    begin(), end(), notifyConnected() and notifyDisconnected() are called
    from outside the thread; run() executes in the watchdog thread.  All
    shared state is guarded by a single condition variable.
    """

    # Seconds a transaction may stay open after begin() before the
    # connection is closed.
    TIMEOUT = 30

    def __init__(self):
        threading.Thread.__init__(self)
        # A watchdog must never keep the server process alive by itself,
        # e.g. when the storage is created but never connected.
        self.setDaemon(1)
        # Guards _timestamp, _conn and _closed, and is used to wake run()
        # when the connection state changes.
        self._cond = threading.Condition()
        self._timestamp = None
        self._conn = None
        self._closed = 0

    def begin(self):
        """Record the current time as the start of the transaction."""
        self._cond.acquire()
        try:
            self._timestamp = time.time()
        finally:
            self._cond.release()

    def end(self):
        """Forget the current transaction; nothing is being timed."""
        self._cond.acquire()
        try:
            self._timestamp = None
        finally:
            self._cond.release()

    def notifyConnected(self, conn):
        """Attach the connection that run() should watch and may close."""
        self._cond.acquire()
        try:
            self._conn = conn
            # run() may already be waiting for the connection to show up.
            self._cond.notify()
        finally:
            self._cond.release()

    def notifyDisconnected(self):
        """Detach the connection and let the watchdog thread exit."""
        self._cond.acquire()
        try:
            self._conn = None
            self._closed = 1
            # Wake run() right away instead of letting it sleep out the
            # remainder of its current wait.
            self._cond.notify()
        finally:
            self._cond.release()

    def run(self):
        """Watch the transaction until it times out or we are disconnected."""
        expired = 0
        self._cond.acquire()
        try:
            # start() can be called before notifyConnected().  Block until
            # a connection is attached (or we are told there will be none)
            # rather than exiting because _conn is still its initial None.
            while self._conn is None and not self._closed:
                self._cond.wait()
            while self._conn is not None:
                if self._timestamp is None:
                    log("TimeoutThread no current transaction",
                        zLOG.BLATHER)
                    self._cond.wait(self.TIMEOUT)
                    continue
                remaining = self._timestamp + self.TIMEOUT - time.time()
                if remaining <= 0:
                    expired = 1
                    break
                elapsed = self.TIMEOUT - remaining
                log("TimeoutThread transaction has %0.2f sec to complete"
                    " (%.2f elapsed)" % (remaining, elapsed), zLOG.BLATHER)
                # Re-check when the deadline is due; a disconnect wakes us
                # earlier.
                self._cond.wait(remaining)
        finally:
            self._cond.release()
        # Close the connection only after releasing the lock, so that any
        # disconnect handling triggered by close() is not blocked on it.
        if expired:
            self._abort()
        log("TimeoutThread exiting. Connection closed.", zLOG.BLATHER)

    def _abort(self):
        """Close the connection to abort the overdue transaction."""
        log("TimeoutThread aborting transaction", zLOG.WARNING)
        # notifyDisconnected() may clear _conn at any moment.  Take a
        # snapshot under the lock so we never call close() on None and
        # never hide an AttributeError raised inside close() itself.
        self._cond.acquire()
        try:
            conn = self._conn
        finally:
            self._cond.release()
        if conn is None:
            log("TimeoutThread connection already closed")
        else:
            conn.close()
# Patch up class references # Patch up class references
StorageServer.ZEOStorageClass = ZEOStorage StorageServer.ZEOStorageClass = ZEOStorage
ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy ZEOStorage.DelayedCommitStrategyClass = DelayedCommitStrategy
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment