##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Database connection support

$Id$"""

import logging
import sys
import tempfile
import threading
import warnings
import os
import shutil
from time import time

from persistent import PickleCache

# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.blob import Blob, rename_or_copy_blob
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
from zope.interface import implements

import transaction

from ZODB.blob import SAVEPOINT_SUFFIX
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils

global_reset_counter = 0

def resetCaches():
    """Causes all connection caches to be reset as connections are reopened.

    Zope's refresh feature uses this.  When you reload Python modules,
    instances of classes continue to use the old class definitions.
    To use the new code immediately, the refresh feature asks ZODB to
    clear caches by calling resetCaches().  When the instances are
    loaded by subsequent connections, they will use the new class
    definitions.
    """
    global global_reset_counter
    global_reset_counter += 1
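
# A minimal usage sketch (hedged): a framework that reloads Python code
# can call resetCaches() so that each connection rebuilds its cache the
# next time it is (re)opened.  The names 'db' and 'conn' are hypothetical.
#
#   import ZODB.Connection
#   ZODB.Connection.resetCaches()
#   conn = db.open()   # this connection starts with a fresh cache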

class Connection(ExportImport, object):
    """Connection to ZODB for loading and storing objects."""

    implements(IConnection,
               ISavepointDataManager,
               IPersistentDataManager,
               ISynchronizer)


    _code_timestamp = 0

    ##########################################################################
    # Connection methods, ZODB.IConnection

    def __init__(self, db, cache_size=400, before=None):
        """Create a new Connection."""

        self._log = logging.getLogger('ZODB.Connection')
        self._debug_info = ()

        self._db = db

        # historical connection
        self.before = before

        # Multi-database support
        self.connections = {self._db.database_name: self}

        self._normal_storage = self._storage = db._storage
        self.new_oid = db._storage.new_oid
        self._savepoint_storage = None

        # Do we need to join a txn manager?
        self._needs_to_join = True
        self.transaction_manager = None
        self._opened = None # time.time() when DB.open() opened us

        self._reset_counter = global_reset_counter
        self._load_count = 0   # Number of objects unghosted
        self._store_count = 0  # Number of objects stored

        # Cache which can ghostify (forget the state of) objects not
        # recently used. Its API is roughly that of a dict, with
        # additional gc-related and invalidation-related methods.
        self._cache = PickleCache(self, cache_size)

        # The pre-cache is used by get to avoid infinite loops when
        # objects immediately load their state when they get their
        # persistent data set.
        self._pre_cache = {}

        # List of all objects (not oids) registered as modified by the
        # persistence machinery, or by add(), or whose access caused a
        # ReadConflictError (just to be able to clean them up from the
        # cache on abort with the other modified objects). All objects
        # of this list are either in _cache or in _added.
        self._registered_objects = []

        # Dict of oid->obj added explicitly through add(). Used as a
        # preliminary cache until commit time when objects are all moved
        # to the real _cache. The objects are moved to _creating at
        # commit time.
        self._added = {}

        # During commit this is turned into a list, which receives
        # objects added as a side-effect of storing a modified object.
        self._added_during_commit = None

        # During commit, all objects go to either _modified or _creating:

        # Dict of oid->flag of new objects (without serial), either
        # added by add() or implicitly added (discovered by the
        # serializer during commit). The flag is True for implicit
        # adding. Used during abort to remove created objects from the
        # _cache, and by persistent_id to check that a new object isn't
        # reachable from multiple databases.
        self._creating = {}

        # List of oids of modified objects, which have to be invalidated
        # in the cache on abort and in other connections on finish.
        self._modified = []

        # _invalidated queues invalidate messages delivered from the DB.
        # _inv_lock prevents one thread from modifying the set while
        # another is processing invalidations.  All the invalidations
        # from a single transaction should be applied atomically, so
        # the lock must be held when reading _invalidated.

        # It sucks that we have to hold the lock to read _invalidated.
        # Normally, _invalidated is written by calling dict.update, which
        # will execute atomically by virtue of the GIL.  But some storage
        # might generate oids where hash or compare invokes Python code.  In
        # that case, the GIL can't save us.
        # Note:  since that was written, it was officially declared that the
        # type of an oid is str.  TODO:  remove the related now-unnecessary
        # critical sections (if any -- this needs careful thought).

        self._inv_lock = threading.Lock()
        self._invalidated = set()

        # Flag indicating whether the cache has been invalidated:
        self._invalidatedCache = False

        # We intend to prevent committing a transaction in which
        # ReadConflictError occurs.  _conflicts is the set of oids that
        # experienced ReadConflictError.  Any time we raise ReadConflictError,
        # the oid should be added to this set, and we should be sure that the
        # object is registered.  Because it's registered, Connection.commit()
        # will raise ReadConflictError again (because the oid is in
        # _conflicts).
        self._conflicts = {}

        # If MVCC is enabled, then _mvcc is True and _txn_time stores
        # the upper bound on transactions visible to this connection.
        # That is, all object revisions must be written before _txn_time.
        # If it is None, then the current revisions are acceptable.
        self._txn_time = None

        # To support importFile(), implemented in the ExportImport base
        # class, we need to run _importDuringCommit() from our commit()
        # method.  If _import is not None, it is a two-tuple of arguments
        # to pass to _importDuringCommit().
        self._import = None

        self._reader = ObjectReader(self, self._cache, self._db.classFactory)

    def add(self, obj):
        """Add a new object 'obj' to the database and assign it an oid."""
        if self._opened is None:
            raise ConnectionStateError("The database connection is closed")

        marker = object()
        oid = getattr(obj, "_p_oid", marker)
        if oid is marker:
            raise TypeError("Only first-class persistent objects may be"
                            " added to a Connection.", obj)
        elif obj._p_jar is None:
            assert obj._p_oid is None
            oid = obj._p_oid = self._storage.new_oid()
            obj._p_jar = self
            if self._added_during_commit is not None:
                self._added_during_commit.append(obj)
            self._register(obj)
            # Add to _added after calling register(), so that _added
            # can be used as a test for whether the object has been
            # registered with the transaction.
            self._added[oid] = obj
        elif obj._p_jar is not self:
            raise InvalidObjectReference(obj, obj._p_jar)
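
    # A minimal usage sketch (hedged; 'conn' is an open Connection and
    # MyThing is a hypothetical persistent.Persistent subclass):
    #
    #   obj = MyThing()
    #   conn.add(obj)               # assigns obj._p_oid, sets obj._p_jar
    #   conn.root()['thing'] = obj  # make it reachable before committing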

    def get(self, oid):
        """Return the persistent object with oid 'oid'."""
        if self._opened is None:
            raise ConnectionStateError("The database connection is closed")

        obj = self._cache.get(oid, None)
        if obj is not None:
            return obj
        obj = self._added.get(oid, None)
        if obj is not None:
            return obj
        obj = self._pre_cache.get(oid, None)
        if obj is not None:
            return obj

        # This appears to be an MVCC violation because we are loading
        # the most recent data when perhaps we shouldn't. The key is
        # that we are only creating a ghost!
        # A disadvantage to this optimization is that _p_serial cannot be
        # trusted until the object has been loaded, which affects both MVCC
        # and historical connections.
        p, serial = self._storage.load(oid, '')
        obj = self._reader.getGhost(p)

        # Avoid an infinite loop if obj tries to load its state before
        # it is added to the cache and its state refers to it.
        self._pre_cache[oid] = obj
        obj._p_oid = oid
        obj._p_jar = self
        obj._p_changed = None
        obj._p_serial = serial
        self._pre_cache.pop(oid)
        self._cache[oid] = obj
        return obj
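
    # A minimal usage sketch (hedged; 'conn' is an open Connection):
    #
    #   oid = conn.root()['thing']._p_oid
    #   ghost = conn.get(oid)  # returns a ghost; state loads on first access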

    def cacheMinimize(self):
        """Deactivate all unmodified objects in the cache."""
        self._cache.minimize()

    # TODO: we should test what happens when cacheGC is called mid-transaction.
    def cacheGC(self):
        """Reduce cache size to target size."""
        self._cache.incrgc()

    __onCloseCallbacks = None
    def onCloseCallback(self, f):
        """Register a callable, f, to be called by close()."""
        if self.__onCloseCallbacks is None:
            self.__onCloseCallbacks = []
        self.__onCloseCallbacks.append(f)
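
    # A minimal usage sketch (hedged):
    #
    #   def report():
    #       print 'connection closed'   # Python 2 print, as in this module
    #   conn.onCloseCallback(report)
    #   conn.close()                    # close() will call report()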

    def close(self, primary=True):
        """Close the Connection."""
        if not self._needs_to_join:
            # We're currently joined to a transaction.
            raise ConnectionStateError("Cannot close a connection joined to "
                                       "a transaction")

        if self._cache is not None:
            self._cache.incrgc() # This is a good time to do some GC

        # Call the close callbacks.
        if self.__onCloseCallbacks is not None:
            for f in self.__onCloseCallbacks:
                try:
                    f()
                except: # except what?
                    f = getattr(f, 'im_self', f)
                    self._log.error("Close callback failed for %s", f,
                                    exc_info=sys.exc_info())
            self.__onCloseCallbacks = None

        self._debug_info = ()

        if self._opened:
            self.transaction_manager.unregisterSynch(self)

        if primary:
            for connection in self.connections.values():
                if connection is not self:
                    connection.close(False)

            # Return the connection to the pool.
            if self._opened is not None:
                self._db._returnToPool(self)

                # _returnToPool() set self._opened to None.
                # However, we can't assert that here, because self may
                # have been reused (by another thread) by the time we
                # get back here.
        else:
            self._opened = None

    def db(self):
        """Returns a handle to the database this connection belongs to."""
        return self._db

    def isReadOnly(self):
        """Returns True if this connection is read only."""
        if self._opened is None:
            raise ConnectionStateError("The database connection is closed")
        return self.before is not None or self._storage.isReadOnly()

    def invalidate(self, tid, oids):
        """Notify the Connection that transaction 'tid' invalidated oids."""
        if self.before is not None:
            # this is an historical connection.  Invalidations are irrelevant.
            return
        self._inv_lock.acquire()
        try:
            if self._txn_time is None:
                self._txn_time = tid
            self._invalidated.update(oids)
        finally:
            self._inv_lock.release()

    def invalidateCache(self):
        self._inv_lock.acquire()
        try:
            self._invalidatedCache = True
        finally:
            self._inv_lock.release()

    def root(self):
        """Return the database root object."""
        return self.get(z64)
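
    # A minimal usage sketch (hedged):
    #
    #   root = conn.root()    # the persistent root mapping (oid z64)
    #   root['app'] = app     # becomes durable on transaction commit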

    def get_connection(self, database_name):
        """Return a Connection for the named database."""
        connection = self.connections.get(database_name)
        if connection is None:
            new_con = self._db.databases[database_name].open(
                transaction_manager=self.transaction_manager,
                before=self.before,
                )
            self.connections.update(new_con.connections)
            new_con.connections = self.connections
            connection = new_con
        return connection
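
    # A minimal usage sketch (hedged; assumes a multi-database setup in
    # which another database was registered under the name 'catalog'):
    #
    #   catalog_conn = conn.get_connection('catalog')
    #   # catalog_conn shares conn's transaction manager, so a single
    #   # transaction.commit() spans both databases.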

    def _implicitlyAdding(self, oid):
        """Are we implicitly adding an object within the current transaction?

        This is used in a check to avoid implicitly adding an object
        to a database in a multi-database situation.
        See serialize.ObjectWriter.persistent_id.
        """
        return (self._creating.get(oid, 0)
                or
                ((self._savepoint_storage is not None)
                 and
                 self._savepoint_storage.creating.get(oid, 0)
                 )
                )

    def sync(self):
        """Manually update the view on the database."""
        self.transaction_manager.abort()
        self._storage_sync()

    def getDebugInfo(self):
        """Returns a tuple with different items for debugging the
        connection.
        """
        return self._debug_info

    def setDebugInfo(self, *args):
        """Add the given items to the debug information of this connection."""
        self._debug_info = self._debug_info + args

    def getTransferCounts(self, clear=False):
        """Returns the number of objects loaded and stored."""
        res = self._load_count, self._store_count
        if clear:
            self._load_count = 0
            self._store_count = 0
        return res
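
    # A minimal usage sketch (hedged):
    #
    #   loads, stores = conn.getTransferCounts()
    #   # ... do some work ...
    #   loads, stores = conn.getTransferCounts(clear=True)  # read and reset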

    # Connection methods
    ##########################################################################

    ##########################################################################
    # Data manager (ISavepointDataManager) methods

    def abort(self, transaction):
        """Abort a transaction and forget all changes."""

        # The order is important here.  We want to abort registered
        # objects before we process the cache.  Otherwise, we may un-add
        # objects added in savepoints.  If they've been modified since
        # the savepoint, then they won't have _p_oid or _p_jar after
        # they've been unadded.  That would confuse the code in _abort.

        self._abort()

        if self._savepoint_storage is not None:
            self._abort_savepoint()

        self._tpc_cleanup()

    def _abort(self):
        """Abort a transaction and forget all changes."""

        for obj in self._registered_objects:
            oid = obj._p_oid
            assert oid is not None
            if oid in self._added:
                del self._added[oid]
                del obj._p_jar
                del obj._p_oid
            else:

                # Note: If we invalidate a non-ghostifiable object
                # (i.e. a persistent class), the object will
                # immediately reread its state.  That means that the
                # following call could result in a call to
                # self.setstate, which, of course, must succeed.
                # In general, it would be better if the read could be
                # delayed until the start of the next transaction.  If
                # we read at the end of a transaction and if the
                # object was invalidated during this transaction, then
                # we'll read non-current data, which we'll discard
                # later in transaction finalization.  Unfortunately, we
                # can only delay the read if this abort corresponds to
                # a top-level-transaction abort.  We can't tell if
                # this is a top-level-transaction abort, so we have to
                # go ahead and invalidate now.  Fortunately, it's
                # pretty unlikely that the object we are invalidating
                # was invalidated by another thread, so the risk of a
                # reread is pretty low.

                self._cache.invalidate(oid)

    def _tpc_cleanup(self):
        """Performs cleanup operations to support tpc_finish and tpc_abort."""
        self._conflicts.clear()
        self._needs_to_join = True
        self._registered_objects = []
        self._creating.clear()

    # Process pending invalidations.
    def _flush_invalidations(self):
        self._inv_lock.acquire()
        try:
            # Non-ghostifiable objects may need to read when they are
            # invalidated, so we'll quickly just replace the
            # invalidating dict with a new one.  We'll then process
            # the invalidations after freeing the lock *and* after
            # resetting the time.  This means that invalidations will
            # happen after the start of the transactions.  They are
            # subject to conflict errors and to reading old data.

            # TODO: There is a potential problem lurking for persistent
            # classes.  Suppose we have an invalidation of a persistent
            # class and of an instance.  If the instance is
            # invalidated first and if the invalidation logic uses
            # data read from the class, then the invalidation could
            # be performed with stale data.  Or, suppose that there
            # are instances of the class that are freed as a result of
            # invalidating some object.  Perhaps code in their __del__
            # uses class data.  Really, the only way to properly fix
            # this is to, in fact, make classes ghostifiable.  Then
            # we'd have to reimplement attribute lookup to check the
            # class state and, if necessary, activate the class.  It's
            # much worse than that though, because we'd also need to
            # deal with slots.  When a class is ghostified, we'd need
            # to replace all of the slot operations with versions that
            # reloaded the object when called.  It's hard to say which
            # is better or worse.  For now, the risk of using a class
            # while objects are being invalidated seems small enough
            # to be acceptable.

            invalidated = dict.fromkeys(self._invalidated)
            self._invalidated = set()
            self._txn_time = None
            if self._invalidatedCache:
                self._invalidatedCache = False
                invalidated = self._cache.cache_data.copy()
        finally:
            self._inv_lock.release()

        self._cache.invalidate(invalidated)

        # Now is a good time to collect some garbage.
        self._cache.incrgc()

    def tpc_begin(self, transaction):
        """Begin commit of a transaction, starting the two-phase commit."""
        self._modified = []

        # _creating is a dict of oids of new objects, which is used to
        # remove them from the cache if a transaction aborts.
        self._creating.clear()
        self._normal_storage.tpc_begin(transaction)

    def commit(self, transaction):
        """Commit changes to an object"""

        if self._savepoint_storage is not None:

            # We first checkpoint the current changes to the savepoint
            self.savepoint()

            # then commit all of the savepoint changes at once
            self._commit_savepoint(transaction)

            # No need to call _commit since savepoint did.

        else:
            self._commit(transaction)

    def _commit(self, transaction):
        """Commit changes to an object"""

        if self.before is not None:
            raise ReadOnlyHistoryError()

        if self._import:
            # We are importing an export file.  We always do this
            # while making a savepoint so we can copy export data
            # directly to our storage, typically a TmpStore.
            self._importDuringCommit(transaction, *self._import)
            self._import = None

        # Just in case an object is added as a side-effect of storing
        # a modified object.  If, for example, a __getstate__() method
        # calls add(), the newly added objects will show up in
        # _added_during_commit.  This sounds insane, but has actually
        # happened.

        self._added_during_commit = []

        if self._invalidatedCache:
            raise ConflictError()

        for obj in self._registered_objects:
            oid = obj._p_oid
            assert oid
            if oid in self._conflicts:
                raise ReadConflictError(object=obj)

            if obj._p_jar is not self:
                raise InvalidObjectReference(obj, obj._p_jar)
            elif oid in self._added:
                assert obj._p_serial == z64
            elif obj._p_changed:
                if oid in self._invalidated:
                    resolve = getattr(obj, "_p_resolveConflict", None)
                    if resolve is None:
                        raise ConflictError(object=obj)
                self._modified.append(oid)
            else:
                # Nothing to do.  It's been said that it's legal, e.g., for
                # an object to set _p_changed to false after it's been
                # changed and registered.
                continue

            self._store_objects(ObjectWriter(obj), transaction)

        for obj in self._added_during_commit:
            self._store_objects(ObjectWriter(obj), transaction)
        self._added_during_commit = None

    def _store_objects(self, writer, transaction):
        for obj in writer:
            oid = obj._p_oid
            serial = getattr(obj, "_p_serial", z64)

            if serial == z64:
                # obj is a new object

                # Because obj was added, it is now in _creating, so it
                # can be removed from _added.  If oid wasn't in
                # _added, then we are adding it implicitly.

                implicitly_adding = self._added.pop(oid, None) is None

                self._creating[oid] = implicitly_adding

            else:
                if (oid in self._invalidated
                    and not hasattr(obj, '_p_resolveConflict')):
                    raise ConflictError(object=obj)
                self._modified.append(oid)
            p = writer.serialize(obj)  # This calls __getstate__ of obj

            if isinstance(obj, Blob):
                if not IBlobStorage.providedBy(self._storage):
                    raise Unsupported(
                        "Storing Blobs in %s is not supported." %
                        repr(self._storage))
                if obj.opened():
                    raise ValueError("Can't commit with opened blobs.")
                s = self._storage.storeBlob(oid, serial, p,
                                            obj._uncommitted(),
                                            '', transaction)
                # We invalidate the object here in order to ensure that
                # the next attribute access will unghostify it, which
                # causes its blob data to be reattached "cleanly".
                obj._p_invalidate()
            else:
                s = self._storage.store(oid, serial, p, '', transaction)
            self._store_count += 1
            # Put the object in the cache before handling the
            # response, just in case the response contains the
            # serial number for a newly created object
            try:
                self._cache[oid] = obj
            except:
                # Dang, I bet it's wrapped:
                # TODO:  Deprecate, then remove, this.
                if hasattr(obj, 'aq_base'):
                    self._cache[oid] = obj.aq_base
                else:
                    raise

            self._handle_serial(s, oid)

    def _handle_serial(self, store_return, oid=None, change=1):
        """Handle the returns from store() and tpc_vote() calls."""

        # These calls can return different types depending on whether
        # ZEO is used.  ZEO uses asynchronous returns that may be
        # returned in batches by the ClientStorage.  ZEO1 can also
        # return an exception object and expect that the Connection
        # will raise the exception.

        # When conflict resolution occurs, the object state held by
        # the connection does not match what is written to the
        # database.  Invalidate the object here to guarantee that
        # the new state is read the next time the object is used.

        if not store_return:
            return
        if isinstance(store_return, str):
            assert oid is not None
            self._handle_one_serial(oid, store_return, change)
        else:
            for oid, serial in store_return:
                self._handle_one_serial(oid, serial, change)

    def _handle_one_serial(self, oid, serial, change):
        if not isinstance(serial, str):
            raise serial
        obj = self._cache.get(oid, None)
        if obj is None:
            return
        if serial == ResolvedSerial:
            del obj._p_changed # transition from changed to ghost
        else:
            if change:
                obj._p_changed = 0 # transition from changed to up-to-date
            obj._p_serial = serial

    def tpc_abort(self, transaction):
        if self._import:
            self._import = None

        if self._savepoint_storage is not None:
            self._abort_savepoint()

        self._storage.tpc_abort(transaction)

        # Note: If we invalidate a non-ghostifiable object (i.e. a
        # persistent class), the object will immediately reread its
        # state.  That means that the following call could result in a
        # call to self.setstate, which, of course, must succeed.  In
        # general, it would be better if the read could be delayed
        # until the start of the next transaction.  If we read at the
        # end of a transaction and if the object was invalidated
        # during this transaction, then we'll read non-current data,
        # which we'll discard later in transaction finalization.  We
        # could, theoretically queue this invalidation by calling
        # self.invalidate.  Unfortunately, attempts to make that
        # change resulted in mysterious test failures.  It's pretty
        # unlikely that the object we are invalidating was invalidated
        # by another thread, so the risk of a reread is pretty low.
        # It's really not worth the effort to pursue this.

        self._cache.invalidate(self._modified)
        self._invalidate_creating()
        while self._added:
            oid, obj = self._added.popitem()
            del obj._p_oid
            del obj._p_jar
        self._tpc_cleanup()

    def _invalidate_creating(self, creating=None):
        """Disown any objects newly saved in an uncommitted transaction."""
        if creating is None:
            creating = self._creating
            self._creating = {}

        for oid in creating:
            o = self._cache.get(oid)
            if o is not None:
                del self._cache[oid]
                del o._p_jar
                del o._p_oid

    def tpc_vote(self, transaction):
        """Verify that a data manager can commit the transaction."""
        try:
            vote = self._storage.tpc_vote
        except AttributeError:
            return
        s = vote(transaction)
        self._handle_serial(s)

    def tpc_finish(self, transaction):
        """Indicate confirmation that the transaction is done."""

        def callback(tid):
            d = dict.fromkeys(self._modified)
            self._db.invalidate(tid, d, self)
        # It's important that the storage calls the passed function
        # while it still has its lock.  We don't want another thread
        # to be able to read any updated data until we've had a chance
        # to send an invalidation message to all of the other
        # connections!
        self._storage.tpc_finish(transaction, callback)
        self._tpc_cleanup()

    def sortKey(self):
        """Return a consistent sort key for this connection."""
        return "%s:%s" % (self._storage.sortKey(), id(self))

    # Data manager (ISavepointDataManager) methods
    ##########################################################################

    ##########################################################################
    # Transaction-manager synchronization -- ISynchronizer

    def beforeCompletion(self, txn):
        # We don't do anything before a commit starts.
        pass

    # Call the underlying storage's sync() method (if any), and process
    # pending invalidations regardless.  Of course this should only be
    # called at transaction boundaries.
    def _storage_sync(self, *ignored):
        sync = getattr(self._storage, 'sync', 0)
        if sync:
            sync()
        self._flush_invalidations()

    afterCompletion = _storage_sync
    newTransaction = _storage_sync

    # Transaction-manager synchronization -- ISynchronizer
    ##########################################################################

    ##########################################################################
    # persistent.interfaces.IPersistentDatamanager

    def oldstate(self, obj, tid):
        """Return copy of 'obj' that was written by transaction 'tid'."""
        assert obj._p_jar is self
        p = self._storage.loadSerial(obj._p_oid, tid)
        return self._reader.getState(p)

    def setstate(self, obj):
        """Turns the ghost 'obj' into a real object by loading its state from
        the database."""
        oid = obj._p_oid

        if self._opened is None:
            msg = ("Shouldn't load state for %s "
                   "when the connection is closed" % oid_repr(oid))
            self._log.error(msg)
            raise ConnectionStateError(msg)

        try:
            self._setstate(obj)
        except ConflictError:
            raise
        except:
            self._log.error("Couldn't load state for %s", oid_repr(oid),
                            exc_info=sys.exc_info())
            raise

    def _setstate(self, obj):
        # Helper for setstate(), which provides logging of failures.

        # The control flow is complicated here to avoid loading an
        # object revision that we are sure we aren't going to use.  As
        # a result, invalidation tests occur before and after the
        # load.  We can only be sure about invalidations after the
        # load.

        # If an object has been invalidated, there are several cases
        # to consider:
        # 1. Check _p_independent()
        # 2. Try MVCC
        # 3. Raise ConflictError.

        # Does anything actually use _p_independent()?  It would simplify
        # the code if we could drop support for it.
        # (BTrees.Length does.)

        if self.before is not None:
            # Load data that was current before the time we have.
            before = self.before
            t = self._storage.loadBefore(obj._p_oid, before)
            if t is None:
                raise POSKeyError() # historical connection!
            p, serial, end = t

        else:
            # There is a harmless data race with self._invalidated.  A
            # dict update could go on in another thread, but we don't care
            # because we have to check again after the load anyway.

            if self._invalidatedCache:
                raise ReadConflictError()

            if (obj._p_oid in self._invalidated and
                    not myhasattr(obj, "_p_independent")):
                # If the object has _p_independent(), we will handle it below.
                self._load_before_or_conflict(obj)
                return

            p, serial = self._storage.load(obj._p_oid, '')
            self._load_count += 1

            self._inv_lock.acquire()
            try:
                invalid = obj._p_oid in self._invalidated
            finally:
                self._inv_lock.release()

            if invalid:
                if myhasattr(obj, "_p_independent"):
                    # This call will raise a ReadConflictError if something
                    # goes wrong
                    self._handle_independent(obj)
                else:
                    self._load_before_or_conflict(obj)
                    return

        self._reader.setGhostState(obj, p)
        obj._p_serial = serial

        # Blob support
        if isinstance(obj, Blob):
            obj._p_blob_uncommitted = None
            obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, serial)

    def _load_before_or_conflict(self, obj):
        """Load non-current state for obj or raise ReadConflictError."""
        if not self._setstate_noncurrent(obj):
            self._register(obj)
            self._conflicts[obj._p_oid] = True
            raise ReadConflictError(object=obj)

    def _setstate_noncurrent(self, obj):
        """Set state using non-current data.

        Return True if state was available, False if not.
        """
        try:
            # Load data that was current before the commit at txn_time.
            t = self._storage.loadBefore(obj._p_oid, self._txn_time)
        except KeyError:
            return False
        if t is None:
            return False
        data, start, end = t
        # The non-current transaction must have been written before
        # txn_time.  It must be current at txn_time, but could have
        # been modified at txn_time.

        assert start < self._txn_time, (u64(start), u64(self._txn_time))
        assert end is not None
        assert self._txn_time <= end, (u64(self._txn_time), u64(end))
        self._reader.setGhostState(obj, data)
        obj._p_serial = start

        # MVCC Blob support
        if isinstance(obj, Blob):
            obj._p_blob_uncommitted = None
            obj._p_blob_committed = self._storage.loadBlob(obj._p_oid, start)

        return True

    def _handle_independent(self, obj):
        # Helper method for setstate() that handles possibly independent
        # objects.  Call _p_independent(); if it returns True, setstate()
        # wins.  Otherwise, raise a ConflictError.

        if obj._p_independent():
            self._inv_lock.acquire()
            try:
                try:
                    self._invalidated.remove(obj._p_oid)
                except KeyError:
                    pass
            finally:
                self._inv_lock.release()
        else:
            self._conflicts[obj._p_oid] = 1
            self._register(obj)
            raise ReadConflictError(object=obj)

    def register(self, obj):
        """Register obj with the current transaction manager.

        A subclass could override this method to customize the default
        policy of one transaction manager for each thread.

        obj must be an object loaded from this Connection.
        """
        assert obj._p_jar is self
        if obj._p_oid is None:
            # The actual complaint here is that an object without
            # an oid is being registered.  I can't think of any way to
            # achieve that without assignment to _p_jar.  If there is
            # a way, this will be a very confusing exception.
            raise ValueError("assigning to _p_jar is not supported")
        elif obj._p_oid in self._added:
            # It was registered before it was added to _added.
            return
        self._register(obj)

    def _register(self, obj=None):

        # The order here is important.  We need to join before
        # registering the object, because joining may take a
        # savepoint, and the savepoint should not reflect the change
        # to the object.

        if self._needs_to_join:
            self.transaction_manager.get().join(self)
            self._needs_to_join = False

        if obj is not None:
            self._registered_objects.append(obj)


    # persistent.interfaces.IPersistentDatamanager
    ##########################################################################

    ##########################################################################
    # PROTECTED stuff (used by e.g. ZODB.DB.DB)

    def _cache_items(self):
        # find all items on the lru list
        items = self._cache.lru_items()
        # find everything; some on the lru list, some not
        everything = self._cache.cache_data
        # remove those items that are on the lru list
        for k, v in items:
            del everything[k]
        # return a list of [ghosts....not recently used.....recently used]
        return everything.items() + items

    def open(self, transaction_manager=None, delegate=True):
        """Open this Connection for use.

        This method is called by the DB every time a Connection
        is opened.  Any invalidations received while the Connection
        was closed will be processed.

        If the global module function resetCaches() was called, the
        cache will be cleared.

        Parameters:
        transaction_manager: transaction manager to use.  None means
            use the default transaction manager.
        delegate: if true, also open the secondary connections
            in a multi-database setup.

        The connection registers itself with the transaction manager as
        a synchronizer, so it receives afterCompletion() calls.
        """

        self._opened = time()

        if transaction_manager is None:
            transaction_manager = transaction.manager

        self.transaction_manager = transaction_manager

        if self._reset_counter != global_reset_counter:
            # New code is in place.  Start a new cache.
            self._resetCache()
        else:
            self._flush_invalidations()

        transaction_manager.registerSynch(self)

        if self._cache is not None:
            self._cache.incrgc() # This is a good time to do some GC

        if delegate:
            # delegate open to secondary connections
            for connection in self.connections.values():
                if connection is not self:
                    connection.open(transaction_manager, False)

    def _resetCache(self):
        """Creates a new cache, discarding the old one.

        See the docstring for the resetCaches() function.
        """
        self._reset_counter = global_reset_counter
        self._invalidated.clear()
        self._invalidatedCache = False
        cache_size = self._cache.cache_size
        self._cache = cache = PickleCache(self, cache_size)

    ##########################################################################
    # Python protocol

    def __repr__(self):
        return '<Connection at %08x>' % (positive_id(self),)

    # Python protocol
    ##########################################################################

    ##########################################################################
    # DEPRECATION candidates

    __getitem__ = get

    def exchange(self, old, new):
        # called by a ZClasses method that isn't executed by the test suite
        oid = old._p_oid
        new._p_oid = oid
        new._p_jar = self
        new._p_changed = 1
        self._register(new)
        self._cache[oid] = new

    # DEPRECATION candidates
    ##########################################################################

    ##########################################################################
    # DEPRECATED methods

    # None at present.

    # DEPRECATED methods
    ##########################################################################

    #####################################################################
    # Savepoint support

    def savepoint(self):
        if self._savepoint_storage is None:
            tmpstore = TmpStore(self._normal_storage)
            self._savepoint_storage = tmpstore
            self._storage = self._savepoint_storage

        self._creating.clear()
        self._commit(None)
        self._storage.creating.update(self._creating)
        self._creating.clear()
        self._registered_objects = []

        state = self._storage.position, self._storage.index.copy()
        result = Savepoint(self, state)
        # While the interface doesn't guarantee this, savepoints are
        # sometimes used just to "break up" very long transactions, and as
        # a pragmatic matter this is a good time to reduce the cache
        # memory burden.
        self.cacheGC()
        return result
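
    # A minimal usage sketch (hedged; uses the transaction package's
    # savepoint API, which ends up calling this method):
    #
    #   import transaction
    #   sp = transaction.savepoint()   # checkpoint the changes so far
    #   # ... make more changes ...
    #   sp.rollback()                  # revert to the checkpoint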

    def _rollback(self, state):
        self._abort()
        self._registered_objects = []
        src = self._storage
        self._cache.invalidate(src.index)
        src.reset(*state)

    def _commit_savepoint(self, transaction):
        """Commit all changes made in savepoints and begin 2-phase commit
        """
        src = self._savepoint_storage
        self._storage = self._normal_storage
        self._savepoint_storage = None

        self._log.debug("Committing savepoints of size %s", src.getSize())
        oids = src.index.keys()

        # Copy invalidating and creating info from temporary storage:
        self._modified.extend(oids)
        self._creating.update(src.creating)

        for oid in oids:
            data, serial = src.load(oid, src)
            if isinstance(self._reader.getGhost(data), Blob):
                blobfilename = src.loadBlob(oid, serial)
                s = self._storage.storeBlob(oid, serial, data, blobfilename,
                                            '', transaction)
                # We invalidate the object here in order to ensure that
                # the next attribute access will unghostify it, which
                # causes its blob data to be reattached "cleanly".
                self.invalidate(s, {oid:True})
            else:
                s = self._storage.store(oid, serial, data,
                                        '', transaction)

            self._handle_serial(s, oid, change=False)
        src.close()

    def _abort_savepoint(self):
        """Discard all savepoint data."""
        src = self._savepoint_storage
        self._storage = self._normal_storage
        self._savepoint_storage = None

        # Note: If we invalidate a non-ghostifiable object (i.e. a
        # persistent class), the object will immediately reread its
        # state.  That means that the following call could result in a
        # call to self.setstate, which, of course, must succeed.  In
        # general, it would be better if the read could be delayed
        # until the start of the next transaction.  If we read at the
        # end of a transaction and if the object was invalidated
        # during this transaction, then we'll read non-current data,
        # which we'll discard later in transaction finalization.  We
        # could, theoretically queue this invalidation by calling
        # self.invalidate.  Unfortunately, attempts to make that
        # change resulted in mysterious test failures.  It's pretty
        # unlikely that the object we are invalidating was invalidated
        # by another thread, so the risk of a reread is pretty low.
        # It's really not worth the effort to pursue this.

        self._cache.invalidate(src.index)
        self._invalidate_creating(src.creating)
        src.close()

    # Savepoint support
    #####################################################################

class Savepoint:

    implements(IDataManagerSavepoint)

    def __init__(self, datamanager, state):
        self.datamanager = datamanager
        self.state = state

    def rollback(self):
        self.datamanager._rollback(self.state)

class TmpStore:
    """A storage-like thing to support savepoints."""

    implements(IBlobStorage)

    def __init__(self, storage):
        self._storage = storage
        for method in (
            'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
            'isReadOnly'
            ):
            setattr(self, method, getattr(storage, method))

        self._file = tempfile.TemporaryFile()
        # position: current file position
        # _tpos: file position at last commit point
        self.position = 0L
        # index: map oid to pos of last committed version
        self.index = {}
        self.creating = {}

    def __len__(self):
        return len(self.index)

    def close(self):
        self._file.close()

    def load(self, oid, version):
        pos = self.index.get(oid)
        if pos is None:
            return self._storage.load(oid, '')
        self._file.seek(pos)
        h = self._file.read(8)
        oidlen = u64(h)
        read_oid = self._file.read(oidlen)
        if read_oid != oid:
            raise POSException.StorageSystemError('Bad temporary storage')
        h = self._file.read(16)
        size = u64(h[8:])
        serial = h[:8]
        return self._file.read(size), serial

    def store(self, oid, serial, data, version, transaction):
        # we have this funny signature so we can reuse the normal non-commit
        # commit logic
        assert version == ''
        self._file.seek(self.position)
        l = len(data)
        if serial is None:
            serial = z64
        header = p64(len(oid)) + oid + serial + p64(l)
        self._file.write(header)
        self._file.write(data)
        self.index[oid] = self.position
        self.position += l + len(header)
        return serial
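
    # A hedged note on the record layout store() writes above:
    #
    #   p64(len(oid)) + oid + serial + p64(len(data)) + data
    #
    # load() reads the fields back in the same order: 8 bytes of oid
    # length, the oid itself, 8 bytes of serial, 8 bytes of data size,
    # then the pickle data.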

    def storeBlob(self, oid, serial, data, blobfilename, version,
                  transaction):
        assert version == ''
        serial = self.store(oid, serial, data, '', transaction)

        targetpath = self._getBlobPath(oid)
        if not os.path.exists(targetpath):
            os.makedirs(targetpath, 0700)

        targetname = self._getCleanFilename(oid, serial)
        rename_or_copy_blob(blobfilename, targetname, chmod=False)

    def loadBlob(self, oid, serial):
        """Return the filename where the blob file can be found.
        """
        if not IBlobStorage.providedBy(self._storage):
            raise Unsupported(
                "Blobs are not supported by the underlying storage %r." %
                self._storage)
        filename = self._getCleanFilename(oid, serial)
        if not os.path.exists(filename):
            raise POSKeyError("No blob file", oid, serial)
        return filename

    def _getBlobPath(self, oid):
        return os.path.join(self.temporaryDirectory(),
                            utils.oid_repr(oid)
                            )

    def _getCleanFilename(self, oid, tid):
        return os.path.join(self._getBlobPath(oid),
                            "%s%s" % (utils.tid_repr(tid), SAVEPOINT_SUFFIX,)
                            )

    def temporaryDirectory(self):
        return self._storage.temporaryDirectory()

    def reset(self, position, index):
        self._file.truncate(position)
        self.position = position
        # Caution:  We're typically called as part of a savepoint rollback.
        # Other machinery remembers the index to restore, and passes it to
        # us.  If we simply bind self.index to `index`, then if the caller
        # didn't pass a copy of the index, the caller's index will mutate
        # when self.index mutates.  This can be a disaster if the caller is a
        # savepoint to which the user rolls back again later (the savepoint
        # loses the original index it passed).  Therefore, to be safe, we make
        # a copy of the index here.  An alternative would be to ensure that
        # all callers pass copies.  As is, our callers do not make copies.
        self.index = index.copy()