From 99b88cacba475dc21da5a08dbd3d5c8f8448f611 Mon Sep 17 00:00:00 2001 From: Jeremy Hylton <jeremy@svn.zope.org> Date: Thu, 5 Jun 2003 22:38:35 +0000 Subject: [PATCH] Merge atomic cache invalidation code from the 3.1 release branch. --- src/ZEO/ClientStorage.py | 73 ++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/src/ZEO/ClientStorage.py b/src/ZEO/ClientStorage.py index ba04bb27..dfcec185 100644 --- a/src/ZEO/ClientStorage.py +++ b/src/ZEO/ClientStorage.py @@ -268,6 +268,12 @@ class ClientStorage: self._oid_lock = threading.Lock() self._oids = [] # Object ids retrieved from new_oids() + # Can't read data in one thread while writing data + # (tpc_finish) in another thread. In general, the lock + # must prevent access to the cache while _update_cache + # is executing. + self._lock = threading.Lock() + t = self._ts = get_timestamp() self._serial = `t` self._oid = '\0\0\0\0\0\0\0\0' @@ -688,11 +694,19 @@ class ClientStorage: specified by the given object id and version, if they exist; otherwise a KeyError is raised. """ - p = self._cache.load(oid, version) - if p: - return p + self._lock.acquire() # for atomic processing of invalidations + try: + p = self._cache.load(oid, version) + if p: + return p + finally: + self._lock.release() + if self._server is None: raise ClientDisconnected() + + # If an invalidation for oid comes in during zeoLoad, that's OK + # because we'll get oid's new state. p, s, v, pv, sv = self._server.zeoLoad(oid) self._cache.checkSize(0) self._cache.store(oid, p, s, v, pv, sv) @@ -708,9 +722,13 @@ class ClientStorage: If no version modified the object, return an empty string. """ - v = self._cache.modifiedInVersion(oid) - if v is not None: - return v + self._lock.acquire() + try: + v = self._cache.modifiedInVersion(oid) + if v is not None: + return v + finally: + self._lock.release() return self._server.modifiedInVersion(oid) def new_oid(self): @@ -847,16 +865,20 @@ class ClientStorage: if transaction is not self._transaction: return try: + self._lock.acquire() # for atomic processing of invalidations + try: + self._update_cache() + finally: + self._lock.release() + if f is not None: f() tid = self._server.tpc_finish(self._serial) + self._cache.setLastTid(tid) r = self._check_serials() assert r is None or len(r) == 0, "unhandled serialnos: %s" % r - - self._update_cache() - self._cache.setLastTid(tid) finally: self.end_transaction() @@ -866,6 +888,8 @@ class ClientStorage: This iterates over the objects in the transaction buffer and update or invalidate the cache. """ + # Must be called with _lock already acquired. + self._cache.checkSize(self._tbuf.get_size()) try: self._tbuf.begin_iterate() @@ -912,10 +936,13 @@ class ClientStorage: """Storage API: undo a transaction, writing directly to the storage.""" if self._is_read_only: raise POSException.ReadOnlyError() - # XXX what are the sync issues here? oids = self._server.undo(transaction_id) - for oid in oids: - self._cache.invalidate(oid, '') + self._lock.acquire() + try: + for oid in oids: + self._cache.invalidate(oid, '') + finally: + self._lock.release() return oids def undoInfo(self, first=0, last=-20, specification=None): @@ -969,15 +996,19 @@ class ClientStorage: # oid, version pairs. The DB's invalidate() method expects a # dictionary of oids. - # versions maps version names to dictionary of invalidations - versions = {} - for oid, version in invs: - d = versions.setdefault(version, {}) - self._cache.invalidate(oid, version=version) - d[oid] = 1 - if self._db is not None: - for v, d in versions.items(): - self._db.invalidate(d, version=v) + self._lock.acquire() + try: + # versions maps version names to dictionary of invalidations + versions = {} + for oid, version in invs: + d = versions.setdefault(version, {}) + self._cache.invalidate(oid, version=version) + d[oid] = 1 + if self._db is not None: + for v, d in versions.items(): + self._db.invalidate(d, version=v) + finally: + self._lock.release() def endVerify(self): """Server callback to signal end of cache validation.""" -- 2.30.9