Commit 02943acd authored by Jim Fulton's avatar Jim Fulton

Storage-server cleanups

Especially splitting the method used to send invalidations and info,
since we no-longer send both in the same call.
parent c8f1c523
...@@ -104,6 +104,8 @@ class ZEOStorage: ...@@ -104,6 +104,8 @@ class ZEOStorage:
self.connected = True self.connected = True
assert conn.protocol_version is not None assert conn.protocol_version is not None
self.log_label = _addr_label(conn.addr) self.log_label = _addr_label(conn.addr)
self.async = conn.async
self.async_threadsafe = conn.async_threadsafe
def notify_disconnected(self): def notify_disconnected(self):
# When this storage closes, we must ensure that it aborts # When this storage closes, we must ensure that it aborts
...@@ -274,7 +276,7 @@ class ZEOStorage: ...@@ -274,7 +276,7 @@ class ZEOStorage:
self.storage.pack(time, referencesf) self.storage.pack(time, referencesf)
self.log("pack(time=%s) complete" % repr(time)) self.log("pack(time=%s) complete" % repr(time))
# Broadcast new size statistics # Broadcast new size statistics
self.server.invalidate(None, self.storage_id, info=self.get_size_info()) self.server.broadcast_info(self.storage_id, self.get_size_info())
def new_oids(self, n=100): def new_oids(self, n=100):
"""Return a sequence of n new oids, where n defaults to 100""" """Return a sequence of n new oids, where n defaults to 100"""
...@@ -336,7 +338,7 @@ class ZEOStorage: ...@@ -336,7 +338,7 @@ class ZEOStorage:
self.stats.commits += 1 self.stats.commits += 1
self.storage.tpc_finish(self.transaction, self._invalidate) self.storage.tpc_finish(self.transaction, self._invalidate)
self.connection.async('info', self.get_size_info()) self.async('info', self.get_size_info())
# Note that the tid is still current because we still hold the # Note that the tid is still current because we still hold the
# commit lock. We'll relinquish it in _clear_transaction. # commit lock. We'll relinquish it in _clear_transaction.
tid = self.storage.lastTransaction() tid = self.storage.lastTransaction()
...@@ -801,21 +803,6 @@ class StorageServer: ...@@ -801,21 +803,6 @@ class StorageServer:
# This method is called from foreign threads. We have to # This method is called from foreign threads. We have to
# worry about interaction with the main thread. # worry about interaction with the main thread.
# 1. We modify self.invq which is read by get_invalidations
# below. This is why get_invalidations makes a copy of
# self.invq.
# 2. We access connections. There are two dangers:
#
# a. We miss a new connection. This is not a problem because
# if a client connects after we get the list of connections,
# then it will have to read the invalidation queue, which
# has already been reset.
#
# b. A connection is closes while we are iterating. This
# doesn't matter, bacause we can call should_close on a closed
# connection.
# Rebuild invq # Rebuild invq
self._setup_invq(storage_id, self.storages[storage_id]) self._setup_invq(storage_id, self.storages[storage_id])
...@@ -826,72 +813,31 @@ class StorageServer: ...@@ -826,72 +813,31 @@ class StorageServer:
for zs in self.zeo_storages_by_storage_id[storage_id][:]: for zs in self.zeo_storages_by_storage_id[storage_id][:]:
zs.call_soon_threadsafe(zs.connection.close) zs.call_soon_threadsafe(zs.connection.close)
def invalidate( def invalidate(self, zeo_storage, storage_id, tid, invalidated):
self, zeo_storage, storage_id, tid=None, invalidated=None, info=None): """Internal: broadcast invalidations to clients.
"""Internal: broadcast info and invalidations to clients.
This is called from several ZEOStorage methods. This is called from several ZEOStorage methods.
invalidated is a sequence of oids. invalidated is a sequence of oids.
This can do three different things:
- If the invalidated argument is non-empty, it broadcasts
invalidateTransaction() messages to all clients of the given
storage except the current client (the zeo_storage argument).
- If the invalidated argument is empty and the info argument
is a non-empty dictionary, it broadcasts info() messages to
all clients of the given storage, including the current
client.
- If both the invalidated argument and the info argument are
non-empty, it broadcasts invalidateTransaction() messages to all
clients except the current, and sends an info() message to
the current client.
""" """
# This method can be called from foreign threads. We have to # This method can be called from foreign threads. We have to
# worry about interaction with the main thread. # worry about interaction with the main thread.
# 1. We modify self.invq which is read by get_invalidations invq = self.invq[storage_id]
# below. This is why get_invalidations makes a copy of if len(invq) >= self.invq_bound:
# self.invq. invq.pop()
invq.insert(0, (tid, invalidated))
# 2. We access connections. There are two dangers:
#
# a. We miss a new connection. This is not a problem because
# we are called while the storage lock is held. A new
# connection that tries to read data won't read committed
# data without first recieving an invalidation. Also, if a
# client connects after getting the list of connections,
# then it will have to read the invalidation queue, which
# has been updated to reflect the invalidations.
#
# b. A connection is closes while we are iterating. We'll need
# to cactch and ignore Disconnected errors.
if invalidated is not None:
assert tid is not None
invq = self.invq[storage_id]
if len(invq) >= self.invq_bound:
invq.pop()
invq.insert(0, (tid, invalidated))
# serialize invalidation message, so we don't have to to
# it over and over
else:
assert info is not None
for zs in self.zeo_storages_by_storage_id[storage_id]: for zs in self.zeo_storages_by_storage_id[storage_id]:
connection = zs.connection if zs is not zeo_storage:
if invalidated is not None and zs is not zeo_storage: zs.async_threadsafe('invalidateTransaction', tid, invalidated)
connection.call_soon_threadsafe(
connection.async, 'invalidateTransaction', tid, invalidated) def broadcast_info(self, storage_id, info):
elif info is not None: """Internal: broadcast info to clients.
connection.call_soon_threadsafe( """
connection.async, 'info', info) for zs in self.zeo_storages_by_storage_id[storage_id]:
zs.async_threadsafe('info', info)
def get_invalidations(self, storage_id, tid): def get_invalidations(self, storage_id, tid):
"""Return a tid and list of all objects invalidation since tid. """Return a tid and list of all objects invalidation since tid.
...@@ -1032,7 +978,7 @@ class TimeoutThread(threading.Thread): ...@@ -1032,7 +978,7 @@ class TimeoutThread(threading.Thread):
def begin(self, client): def begin(self, client):
# Called from the restart code the "main" thread, whenever the # Called from the restart code the "main" thread, whenever the
# storage lock is being acquired. (Serialized by asyncore.) # storage lock is being acquired.
with self._cond: with self._cond:
assert self._client is None assert self._client is None
self._client = client self._client = client
...@@ -1041,7 +987,7 @@ class TimeoutThread(threading.Thread): ...@@ -1041,7 +987,7 @@ class TimeoutThread(threading.Thread):
def end(self, client): def end(self, client):
# Called from the "main" thread whenever the storage lock is # Called from the "main" thread whenever the storage lock is
# being released. (Serialized by asyncore.) # being released.
with self._cond: with self._cond:
assert self._client is not None assert self._client is not None
assert self._client is client assert self._client is client
...@@ -1087,7 +1033,7 @@ class SlowMethodThread(threading.Thread): ...@@ -1087,7 +1033,7 @@ class SlowMethodThread(threading.Thread):
""" """
# Some storage methods can take a long time to complete. If we # Some storage methods can take a long time to complete. If we
# run these methods via a standard asyncore read handler, they # run these methods in response to an I/O event, they
# will block all other server activity until they complete. To # will block all other server activity until they complete. To
# avoid blocking, we spawn a separate thread, return an MTDelay() # avoid blocking, we spawn a separate thread, return an MTDelay()
# object, and have the thread reply() when it finishes. # object, and have the thread reply() when it finishes.
......
...@@ -139,6 +139,9 @@ class ServerProtocol(base.Protocol): ...@@ -139,6 +139,9 @@ class ServerProtocol(base.Protocol):
def async(self, method, *args): def async(self, method, *args):
self.call_async(method, args) self.call_async(method, args)
def async_threadsafe(self, method, *args):
self.call_soon_threadsafe(self.call_async, method, args)
best_protocol_version = os.environ.get( best_protocol_version = os.environ.get(
'ZEO_SERVER_PROTOCOL', 'ZEO_SERVER_PROTOCOL',
ServerProtocol.protocols[-1].decode('utf-8')).encode('utf-8') ServerProtocol.protocols[-1].decode('utf-8')).encode('utf-8')
......
...@@ -60,6 +60,7 @@ class FakeConnection: ...@@ -60,6 +60,7 @@ class FakeConnection:
addr = 'test' addr = 'test'
call_soon_threadsafe = lambda f, *a: f(*a) call_soon_threadsafe = lambda f, *a: f(*a)
async = async_threadsafe = None
def test_server_record_iternext(): def test_server_record_iternext():
""" """
......
...@@ -714,7 +714,7 @@ class FauxConn: ...@@ -714,7 +714,7 @@ class FauxConn:
if method == 'serialnos': if method == 'serialnos':
self.serials.extend(args[0]) self.serials.extend(args[0])
call_soon_threadsafe = async call_soon_threadsafe = async_threadsafe = async
class StorageServerWrapper: class StorageServerWrapper:
......
...@@ -28,6 +28,8 @@ class ServerProtocol: ...@@ -28,6 +28,8 @@ class ServerProtocol:
def async(self, *args): def async(self, *args):
self.calls.append(args) self.calls.append(args)
async_threadsafe = async
class StorageServer: class StorageServer:
"""Create a client interface to a StorageServer. """Create a client interface to a StorageServer.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment