Commit f4bcea77 authored by Jim Fulton

Internal refactoring of the commit lock manager

In working on the next iteration of the lock manager to provide
object-level locking, I realized:

- It was saner to let all waiters try to get the lock when it's
  released, at least in the more complicated logic to follow.

- We really do almost certainly want a multi-threaded server, even if
  it doesn't run faster (still an open question), because otherwise
  big commits will completely block loads.

- We don't really want to hold the lock-manager lock while calling the
  callback.  Again, this really only matters if we have a
  multi-threaded server, but it also feels like a matter of hygiene :)

I decided to rework this branch:

- Don't hold the lock-manager internal lock when calling the callback.

- When releasing the lock, use call_soon_threadsafe to let all
  waiters have a chance to get the lock (sketched below).

- A little bit of factoring to DRY. (This factoring will be much more
  useful in the follow-on branch.)

This rework restores the workability of the thread-per-client model.
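
The release pattern, as a minimal standalone sketch (hypothetical
names, not the actual ZEO code; it assumes each client object exposes
a call_soon_threadsafe method that schedules a call on that client's
event loop, as the diff below arranges for ZEOStorage)::

    import threading

    class SketchLockManager:
        "Sketch of the wake-all-waiters pattern; not the real LockManager."

        def __init__(self):
            self._lock = threading.RLock()  # internal lock, held only briefly
            self.locked = None              # client holding the commit lock
            self.waiting = {}               # {client -> callback}

        def lock(self, client, callback):
            # Decide under the internal lock; run the callback outside it.
            with self._lock:
                if self.locked is None:
                    self.locked = client
                else:
                    self.waiting[client] = callback
                    return  # queued; release() will wake us later
            callback()

        def release(self, client):
            with self._lock:
                if self.locked is not client:
                    self.waiting.pop(client, None)  # waiter gave up
                    return
                self.locked = None
                waiters = list(self.waiting)
            # Wake *all* waiters on their own loops; the first retry to
            # find the lock free wins, the rest stay queued.
            for w in waiters:
                w.call_soon_threadsafe(self._retry, w)

        def _retry(self, client):
            callback = None
            with self._lock:
                if self.locked is None and client in self.waiting:
                    self.locked = client
                    callback = self.waiting.pop(client)
            if callback is not None:
                callback()  # again, outside the internal lock

Nothing here blocks or runs a callback while holding the internal
lock, which is what makes the scheme safe for a thread-per-client
server.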
parent d4a4a766
@@ -106,6 +106,7 @@ class ZEOStorage:

     def notify_connected(self, conn):
         self.connection = conn
+        self.call_soon_threadsafe = conn.call_soon_threadsafe
         self.connected = True
         assert conn.protocol_version is not None
         self.log_label = _addr_label(conn.addr)
@@ -847,7 +848,7 @@ class StorageServer:
             # later transactions since they will have to validate their
             # caches anyway.
             for zs in self.zeo_storages_by_storage_id[storage_id][:]:
-                zs.connection.call_soon_threadsafe(zs.connection.close)
+                zs.call_soon_threadsafe(zs.connection.close)

     def invalidate(
             self, zeo_storage, storage_id, tid, invalidated=(), info=None):
@@ -985,8 +986,7 @@ class StorageServer:
         for zs in zeo_storages[:]:
             try:
                 logger.debug("Closing %s", zs.connection)
-                zs.connection.call_soon_threadsafe(
-                    zs.connection.close)
+                zs.call_soon_threadsafe(zs.connection.close)
             except Exception:
                 logger.exception("closing connection %r", zs)
@@ -1025,14 +1025,6 @@ class StorageServer:
                    for storage_id in self.storages)

-def _level_for_waiting(waiting):
-    if len(waiting) > 9:
-        return logging.CRITICAL
-    if len(waiting) > 3:
-        return logging.WARNING
-    else:
-        return logging.DEBUG
-
 class StubTimeoutThread:

     def begin(self, client):
@@ -1093,8 +1085,7 @@ class TimeoutThread(threading.Thread):
                 client.log("Transaction timeout after %s seconds" %
                            self._timeout, logging.CRITICAL)
                 try:
-                    client.connection.call_soon_threadsafe(
-                        client.connection.close)
+                    client.call_soon_threadsafe(client.connection.close)
                 except:
                     client.log("Timeout failure", logging.CRITICAL,
                                exc_info=sys.exc_info())
@@ -1209,16 +1200,13 @@ def never_resolve_conflict(oid, committedSerial, oldSerial, newpickle,
                    data=newpickle)

 class LockManager(object):
-    # NOTE: This implementation assumes a single server thread.
-    # It could be updated to work with a thread-per-client, but
-    # the waiting-management logic would have to be more complex.

     def __init__(self, storage_id, stats, timeout):
         self.storage_id = storage_id
         self.stats = stats
         self.timeout = timeout
         self.locked = None
-        self.waiting = []
+        self.waiting = {} # {ZEOStorage -> (func, delay)}
         self._lock = RLock()

     def lock(self, zs, func):
@@ -1234,74 +1222,99 @@ class LockManager(object):
         the lock isn't held past the call.
         """
         with self._lock:
-            locked = self.locked
-
-            if locked is zs:
-                raise StorageTransactionError("Already voting (locked)")
-
-            if locked is not None and not locked.connected:
-                locked.log("Still locked after disconnected. Unlocking.",
-                           logging.CRITICAL)
-                if locked.transaction:
-                    locked.storage.tpc_abort(locked.transaction)
-                self._unlocked(locked)
-                locked = None
-
-            if locked is None:
-                result = func()
+            if self._can_lock(zs):
                 self._locked(zs)
-                return result
-
-            assert locked.locked
-
-            if any(w for w in self.waiting if w[0] is zs):
-                raise StorageTransactionError("Already voting (waiting)")
-
-            delay = Delay()
-            self.waiting.append((zs, func, delay))
-            zs.log("(%r) queue lock: transactions waiting: %s"
-                   % (self.storage_id, len(self.waiting)),
-                   _level_for_waiting(self.waiting)
-                   )
-
-            return delay
+            else:
+                if any(w for w in self.waiting if w is zs):
+                    raise StorageTransactionError("Already voting (waiting)")
+
+                delay = Delay()
+                self.waiting[zs] = (func, delay)
+                self._log_waiting(
+                    zs, "(%r) queue lock: transactions waiting: %s")
+
+                return delay
+
+        try:
+            result = func()
+        except Exception:
+            self.release(zs)
+            raise
+        else:
+            if not zs.locked:
+                self.release(zs)
+            return result
+
+    def _lock_waiting(self, zs):
+        waiting = None
+        with self._lock:
+            if self.locked is zs:
+                assert zs.locked
+                return
+
+            if self._can_lock(zs):
+                waiting = self.waiting.pop(zs, None)
+                if waiting:
+                    self._locked(zs)
+
+        if waiting:
+            func, delay = waiting
+            try:
+                result = func()
+            except Exception:
+                delay.error(sys.exc_info())
+                self.release(zs)
+            else:
+                delay.reply(result)
+                if not zs.locked:
+                    self.release(zs)

     def release(self, zs):
         with self._lock:
             locked = self.locked
             if locked is zs:
                 self._unlocked(zs)

-                while self.waiting:
-                    zs, func, delay = self.waiting.pop(0)
-                    try:
-                        result = func()
-                    except Exception:
-                        delay.error(sys.exc_info())
-                    else:
-                        delay.reply(result)
-                        if self._locked(zs):
-                            break
+                for zs in list(self.waiting):
+                    zs.call_soon_threadsafe(self._lock_waiting, zs)
             else:
-                waiting = [w for w in self.waiting if w[0] is not zs]
-                if len(waiting) < len(self.waiting):
-                    zs.log(
-                        "(%r) dequeue lock: transactions waiting: %s" % (
-                            self.storage_id, len(waiting)),
-                        _level_for_waiting(waiting)
-                        )
-                    self.waiting = waiting
+                if self.waiting.pop(zs, None):
+                    self._log_waiting(
+                        zs, "(%r) dequeue lock: transactions waiting: %s")
+
+    def _log_waiting(self, zs, message):
+        l = len(self.waiting)
+        zs.log(message % (self.storage_id, l),
+               logging.CRITICAL if l > 9 else (
+                   logging.WARNING if l > 3 else logging.DEBUG)
+               )
+
+    def _can_lock(self, zs):
+        locked = self.locked
+
+        if locked is zs:
+            raise StorageTransactionError("Already voting (locked)")
+
+        if locked is not None:
+            if not locked.connected:
+                locked.log("Still locked after disconnected. Unlocking.",
+                           logging.CRITICAL)
+                if locked.transaction:
+                    locked.storage.tpc_abort(locked.transaction)
+
+                self._unlocked(locked)
+                locked = None
+            else:
+                assert locked.locked
+
+        return locked is None

     def _locked(self, zs):
-        if zs.locked:
-            self.locked = zs
-            self.stats.lock_time = time.time()
-            zs.log(
-                "(%r) lock: transactions waiting: %s" % (
-                    self.storage_id, len(self.waiting)),
-                _level_for_waiting(self.waiting)
-                )
-            self.timeout.begin(zs)
-            return True
+        self.locked = zs
+        self.stats.lock_time = time.time()
+        self._log_waiting(zs, "(%r) lock: transactions waiting: %s")
+        self.timeout.begin(zs)
+        return True
@@ -1310,8 +1323,4 @@ class LockManager(object):
         self.timeout.end(zs)
         self.locked = self.stats.lock_time = None
         zs.locked = False
-        zs.log(
-            "(%r) unlock: transactions waiting: %s" % (
-                self.storage_id, len(self.waiting)),
-            _level_for_waiting(self.waiting)
-            )
+        self._log_waiting(zs, "(%r) unlock: transactions waiting: %s")
@@ -31,9 +31,15 @@
 http://bugs.python.org/issue27392, but it's hard to justify the fix to
 get it accepted, so we won't bother for now. This currently uses a
 horrible monkey patch to work with SSL.

-Note that the latest server commit-lock manager assumes a single
-thread. To use this Acceptor, the lock manager would need to be
-updated.
+To use this module, replace::
+
+    from .asyncio.server import Acceptor
+
+with::
+
+    from .asyncio.mtacceptor import Acceptor
+
+in ZEO.StorageServer.
 """
 from .._compat import PY3
@@ -59,6 +59,8 @@ class FakeConnection:
     protocol_version = b'Z4'
     addr = 'test'

+    call_soon_threadsafe = lambda f, *a: f(*a)
+
 def test_server_record_iternext():
     """
@@ -230,10 +230,10 @@ We start a transaction and vote, this leads to getting the lock.
     received handshake 'Z5'
     >>> tid1 = start_trans(zs1)
    >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
-    ZEO.StorageServer BLATHER
-    (test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
     ZEO.StorageServer DEBUG
     (test-addr-1) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-1) Preparing to commit transaction: 1 objects, ... bytes

 If another client tries to vote, its lock request will be queued and
 a delay will be returned:
@@ -255,10 +255,10 @@ When we end the first transaction, the queued vote gets the lock.
     >>> zs1.tpc_abort(tid1) # doctest: +ELLIPSIS
     ZEO.StorageServer DEBUG
     (test-addr-1) ('1') unlock: transactions waiting: 1
-    ZEO.StorageServer BLATHER
-    (test-addr-2) Preparing to commit transaction: 1 objects, ... bytes
     ZEO.StorageServer DEBUG
     (test-addr-2) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-2) Preparing to commit transaction: 1 objects, ... bytes

 Let's try again with the first client. The vote will be queued:
@@ -362,13 +362,13 @@ release the lock and one of the waiting clients will get the lock.
     >>> zs2.notify_disconnected() # doctest: +ELLIPSIS
     ZEO.StorageServer INFO
-    (test-addr-2) disconnected during locked transaction
+    (test-addr-...) disconnected during locked transaction
     ZEO.StorageServer CRITICAL
-    (test-addr-2) ('1') unlock: transactions waiting: 10
-    ZEO.StorageServer BLATHER
-    (test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
+    (test-addr-...) ('1') unlock: transactions waiting: 10
     ZEO.StorageServer WARNING
-    (test-addr-1) ('1') lock: transactions waiting: 9
+    (test-addr-...) ('1') lock: transactions waiting: 9
+    ZEO.StorageServer BLATHER
+    (test-addr-...) Preparing to commit transaction: 1 objects, ... bytes

 (In practice, waiting clients won't necessarily get the lock in order.)
@@ -393,43 +393,15 @@ statistics using the server_status method:
 If clients disconnect while waiting, they will be dequeued:

     >>> for client in clients:
-    ...     client.notify_disconnected()
+    ...     client.notify_disconnected() # doctest: +ELLIPSIS
     ZEO.StorageServer INFO
     (test-addr-10) disconnected during unlocked transaction
     ZEO.StorageServer WARNING
     (test-addr-10) ('1') dequeue lock: transactions waiting: 8
-    ZEO.StorageServer INFO
-    (test-addr-11) disconnected during unlocked transaction
-    ZEO.StorageServer WARNING
-    (test-addr-11) ('1') dequeue lock: transactions waiting: 7
-    ZEO.StorageServer INFO
-    (test-addr-12) disconnected during unlocked transaction
-    ZEO.StorageServer WARNING
-    (test-addr-12) ('1') dequeue lock: transactions waiting: 6
-    ZEO.StorageServer INFO
-    (test-addr-13) disconnected during unlocked transaction
-    ZEO.StorageServer WARNING
-    (test-addr-13) ('1') dequeue lock: transactions waiting: 5
-    ZEO.StorageServer INFO
-    (test-addr-14) disconnected during unlocked transaction
-    ZEO.StorageServer WARNING
-    (test-addr-14) ('1') dequeue lock: transactions waiting: 4
-    ZEO.StorageServer INFO
-    (test-addr-15) disconnected during unlocked transaction
-    ZEO.StorageServer DEBUG
-    (test-addr-15) ('1') dequeue lock: transactions waiting: 3
-    ZEO.StorageServer INFO
-    (test-addr-16) disconnected during unlocked transaction
-    ZEO.StorageServer DEBUG
-    (test-addr-16) ('1') dequeue lock: transactions waiting: 2
-    ZEO.StorageServer INFO
-    (test-addr-17) disconnected during unlocked transaction
-    ZEO.StorageServer DEBUG
-    (test-addr-17) ('1') dequeue lock: transactions waiting: 1
-    ZEO.StorageServer INFO
-    (test-addr-18) disconnected during unlocked transaction
-    ZEO.StorageServer DEBUG
-    (test-addr-18) ('1') dequeue lock: transactions waiting: 0
+    ...
+
+    >>> zs1.server_status()['waiting']
+    0

     >>> zs1.tpc_abort(tid1)
     ZEO.StorageServer DEBUG
@@ -489,10 +461,10 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
     received handshake 'Z5'
     >>> tid1 = start_trans(zs1)
     >>> resolved1 = zs1.vote(tid1) # doctest: +ELLIPSIS
-    ZEO.StorageServer BLATHER
-    (test-addr-1) Preparing to commit transaction: 1 objects, ... bytes
     ZEO.StorageServer DEBUG
     (test-addr-1) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-1) Preparing to commit transaction: 1 objects, ... bytes

     >>> zs1.connection.connection_lost(None)
     ZEO.StorageServer INFO
@@ -507,10 +479,10 @@ ZEOStorage as closed and see if trying to get a lock cleans it up:
     received handshake 'Z5'
     >>> tid2 = start_trans(zs2)
     >>> resolved2 = zs2.vote(tid2) # doctest: +ELLIPSIS
-    ZEO.StorageServer BLATHER
-    (test-addr-2) Preparing to commit transaction: 1 objects, ... bytes
     ZEO.StorageServer DEBUG
     (test-addr-2) ('1') lock: transactions waiting: 0
+    ZEO.StorageServer BLATHER
+    (test-addr-2) Preparing to commit transaction: 1 objects, ... bytes

     >>> zs2.tpc_abort(tid2)
     ZEO.StorageServer DEBUG