StorageServer.py 55.4 KB
Newer Older
1 2
##############################################################################
#
3
# Copyright (c) 2001, 2002, 2003 Zope Corporation and Contributors.
4
# All Rights Reserved.
Guido van Rossum's avatar
Guido van Rossum committed
5
#
6
# This software is subject to the provisions of the Zope Public License,
Jim Fulton's avatar
Jim Fulton committed
7
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
8 9 10 11
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
Guido van Rossum's avatar
Guido van Rossum committed
12
#
13
##############################################################################
14
"""The StorageServer class and the exception that it may raise.
Christopher Petrilli's avatar
Christopher Petrilli committed
15

Jeremy Hylton's avatar
Jeremy Hylton committed
16 17
This server acts as a front-end for one or more real storages, like
file storage or Berkeley storage.
Jim Fulton's avatar
Jim Fulton committed
18

19
TODO:  Need some basic access control-- a declaration of the methods
Jeremy Hylton's avatar
Jeremy Hylton committed
20 21 22
exported for invocation by the server.
"""

23 24
from __future__ import with_statement

Jim Fulton's avatar
Jim Fulton committed
25 26 27 28 29 30 31 32 33 34 35 36 37
from ZEO.CommitLog import CommitLog
from ZEO.Exceptions import AuthError
from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
from ZEO.zrpc.server import Dispatcher
from ZEO.zrpc.trigger import trigger
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.loglevels import BLATHER
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
from ZODB.utils import oid_repr, p64, u64, z64

Jeremy Hylton's avatar
Jeremy Hylton committed
38
import asyncore
39
import cPickle
Jim Fulton's avatar
Jim Fulton committed
40
import itertools
41
import logging
Jeremy Hylton's avatar
Jeremy Hylton committed
42 43
import os
import sys
44
import tempfile
Jeremy Hylton's avatar
Jeremy Hylton committed
45
import threading
Jeremy Hylton's avatar
Jeremy Hylton committed
46
import time
47
import transaction
Jim Fulton's avatar
Jim Fulton committed
48 49
import warnings
import ZEO.zrpc.error
50
import ZODB.blob
51
import ZODB.serialize
52
import ZODB.TimeStamp
53
import zope.interface
54

55

56 57
logger = logging.getLogger('ZEO.StorageServer')


def log(message, level=logging.INFO, label='', exc_info=False):
    """Emit ``message`` on the module logger.

    A non-empty ``label`` is prepended to the message in parentheses.
    ``level`` and ``exc_info`` are handed to ``logger.log`` unchanged.
    """
    text = message
    if label:
        text = "(%s) %s" % (label, message)
    logger.log(level, text, exc_info=exc_info)
Jeremy Hylton's avatar
Jeremy Hylton committed
63

64

Jeremy Hylton's avatar
Jeremy Hylton committed
65
class StorageServerError(StorageError):
    """Error reported when an unpicklable exception is raised.

    ZEOStorage._marshal_error substitutes an instance of this class for
    a storage exception that cannot be pickled.  StorageServerDB.invalidate
    also raises it when a version is passed.
    """
67

68

Jeremy Hylton's avatar
Jeremy Hylton committed
69
class ZEOStorage:
70
    """Proxy to underlying storage for a single remote client."""
Guido van Rossum's avatar
Guido van Rossum committed
71

72 73 74 75 76
    # A list of extension methods.  A subclass with extra methods
    # should override.
    extensions = []

    def __init__(self, server, read_only=0, auth_realm=None):
Jeremy Hylton's avatar
Jeremy Hylton committed
77
        self.server = server
78 79
        # timeout and stats will be initialized in register()
        self.stats = None
80
        self.connection = None
Jeremy Hylton's avatar
Jeremy Hylton committed
81
        self.client = None
82 83 84
        self.storage = None
        self.storage_id = "uninitialized"
        self.transaction = None
85
        self.read_only = read_only
Jim Fulton's avatar
Jim Fulton committed
86
        self.log_label = 'unconnected'
87
        self.locked = False             # Don't have storage lock
88
        self.verifying = 0
89
        self.store_failed = 0
90 91
        self.authenticated = 0
        self.auth_realm = auth_realm
92
        self.blob_tempfile = None
93 94 95 96
        # The authentication protocol may define extra methods.
        self._extensions = {}
        for func in self.extensions:
            self._extensions[func.func_name] = None
97 98 99 100 101
        self._iterators = {}
        self._iterator_ids = itertools.count()
        # Stores the last item that was handed out for a
        # transaction iterator.
        self._txn_iterators_last = {}
102

Jim Fulton's avatar
Jim Fulton committed
103
    def _finish_auth(self, authenticated):
104 105 106 107
        if not self.auth_realm:
            return 1
        self.authenticated = authenticated
        return authenticated
Jeremy Hylton's avatar
Jeremy Hylton committed
108

109 110
    def set_database(self, database):
        self.database = database
111

112
    def notifyConnected(self, conn):
        """Bind this proxy to the connection ``conn``.

        Peers whose protocol version sorts before 'Z309' get a
        ClientStub308 and have a ZEOStorage308Adapter registered on the
        connection; all others get a plain ClientStub.  The log label is
        derived from the connection's address.
        """
        self.connection = conn
        assert conn.peer_protocol_version is not None
        is_old_peer = conn.peer_protocol_version < 'Z309'
        if is_old_peer:
            self.client = ClientStub308(conn)
            conn.register_object(ZEOStorage308Adapter(self))
        else:
            self.client = ClientStub(conn)
        self.log_label = _addr_label(conn.addr)
121 122

    def notifyDisconnected(self):
123
        # When this storage closes, we must ensure that it aborts
Guido van Rossum's avatar
Guido van Rossum committed
124
        # any pending transaction.
125
        if self.transaction is not None:
126
            self.log("disconnected during %s transaction"
Jim Fulton's avatar
Jim Fulton committed
127
                     % (self.locked and 'locked' or 'unlocked'))
128
            self.tpc_abort(self.transaction.id)
129
        else:
130
            self.log("disconnected")
131

132 133
        self.connection = None

134
    def __repr__(self):
135 136
        tid = self.transaction and repr(self.transaction.id)
        if self.storage:
137 138
            stid = (self.tpc_transaction() and
                    repr(self.tpc_transaction().id))
Jeremy Hylton's avatar
Jeremy Hylton committed
139 140 141 142 143
        else:
            stid = None
        name = self.__class__.__name__
        return "<%s %X trans=%s s_trans=%s>" % (name, id(self), tid, stid)

144 145
    def log(self, msg, level=logging.INFO, exc_info=False):
        """Log ``msg`` through the module-level log() helper, labelled
        with this proxy's ``log_label``."""
        log(msg, level, self.log_label, exc_info)
Jeremy Hylton's avatar
Jeremy Hylton committed
146 147

    def setup_delegation(self):
148 149
        """Delegate several methods to the storage
        """
150
        # Called from register
151 152 153 154 155 156 157 158 159 160 161 162

        storage = self.storage

        info = self.get_info()

        if not info['supportsUndo']:
            self.undoLog = self.undoInfo = lambda *a,**k: ()

        self.getTid = storage.getTid
        self.load = storage.load
        self.loadSerial = storage.loadSerial
        record_iternext = getattr(storage, 'record_iternext', None)
163 164
        if record_iternext is not None:
            self.record_iternext = record_iternext
165

166
        try:
167
            fn = storage.getExtensionMethods
168
        except AttributeError:
169
            pass # no extension methods
170
        else:
171 172
            d = fn()
            self._extensions.update(d)
173
            for name in d:
174
                assert not hasattr(self, name)
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
                setattr(self, name, getattr(storage, name))
        self.lastTransaction = storage.lastTransaction

        try:
            self.tpc_transaction = storage.tpc_transaction
        except AttributeError:
            if hasattr(storage, '_transaction'):
                log("Storage %r doesn't have a tpc_transaction method.\n"
                    "See ZEO.interfaces.IServeable."
                    "Falling back to using _transaction attribute, which\n."
                    "is icky.",
                    logging.ERROR)
                self.tpc_transaction = lambda : storage._transaction
            else:
                raise
190

Jim Fulton's avatar
Jim Fulton committed
191 192 193 194 195
    def history(self,tid,size=1):
        # This caters for storages which still accept
        # a version parameter.
        return self.storage.history(tid,size=size)

196
    def _check_tid(self, tid, exc=None):
197 198
        if self.read_only:
            raise ReadOnlyError()
199
        if self.transaction is None:
200
            caller = sys._getframe().f_back.f_code.co_name
201 202
            self.log("no current transaction: %s()" % caller,
                     level=logging.WARNING)
Jeremy Hylton's avatar
Jeremy Hylton committed
203 204
            if exc is not None:
                raise exc(None, tid)
205
            else:
Jeremy Hylton's avatar
Jeremy Hylton committed
206
                return 0
207
        if self.transaction.id != tid:
208
            caller = sys._getframe().f_back.f_code.co_name
209 210
            self.log("%s(%s) invalid; current transaction = %s" %
                     (caller, repr(tid), repr(self.transaction.id)),
211
                     logging.WARNING)
Jeremy Hylton's avatar
Jeremy Hylton committed
212
            if exc is not None:
213
                raise exc(self.transaction.id, tid)
Jim Fulton's avatar
Jim Fulton committed
214
            else:
Jeremy Hylton's avatar
Jeremy Hylton committed
215 216
                return 0
        return 1
217

218 219 220 221 222 223 224 225
    def getAuthProtocol(self):
        """Return string specifying name of authentication module to use.

        The module name should be auth_%s where %s is auth_protocol."""
        protocol = self.server.auth_protocol
        if not protocol or protocol == 'none':
            return None
        return protocol
226

Jeremy Hylton's avatar
Jeremy Hylton committed
227 228
    def register(self, storage_id, read_only):
        """Select the storage that this client will use

        This method must be the first one called by the client.
        For authenticated storages this method will be called by the client
        immediately after authentication is finished.

        storage_id -- key into the server's ``storages`` mapping
        read_only -- true if the client asks for read-only access

        Raises AuthError if an auth realm is configured and the client
        has not authenticated, ValueError on a second call or an unknown
        storage_id, and ReadOnlyError if a writable connection is
        requested but this proxy or the storage is read-only.
        """
        if self.auth_realm and not self.authenticated:
            raise AuthError("Client was never authenticated with server!")

        if self.storage is not None:
            self.log("duplicate register() call")
            raise ValueError("duplicate register() call")

        storage = self.server.storages.get(storage_id)
        if storage is None:
            self.log("unknown storage_id: %s" % storage_id)
            raise ValueError("unknown storage: %s" % storage_id)

        if not read_only and (self.read_only or storage.isReadOnly()):
            raise ReadOnlyError()

        # The connection is read-only if either side asked for it.
        self.read_only = self.read_only or read_only
        self.storage_id = storage_id
        self.storage = storage
        # setup_delegation() reads self.storage, so it must come after
        # the assignment above.
        self.setup_delegation()
        self.stats = self.server.register_connection(storage_id, self)
Jim Fulton's avatar
Jim Fulton committed
254 255

    def get_info(self):
        """Return a dict describing the storage to the client.

        Keys: length, size, name, supportsUndo, extensionMethods,
        supports_record_iternext and interfaces.  supportsUndo is only
        true when the storage reports undo support and the peer's
        protocol version is at least 'Z310'.
        """
        storage = self.storage

        supports_undo = getattr(storage, 'supportsUndo', lambda : False)
        supportsUndo = (supports_undo()
                        and self.connection.peer_protocol_version >= 'Z310')

        # Communicate the backend storage interfaces to the client
        provided = zope.interface.providedBy(storage)
        interfaces = tuple([(iface.__module__, iface.__name__)
                            for iface in provided.__iro__])

        return {'length': len(storage),
                'size': storage.getSize(),
                'name': storage.getName(),
                'supportsUndo': supportsUndo,
                'extensionMethods': self.getExtensionMethods(),
                'supports_record_iternext': hasattr(self, 'record_iternext'),
                'interfaces': interfaces,
                }
Jim Fulton's avatar
Jim Fulton committed
276

Jim Fulton's avatar
Jim Fulton committed
277
    def get_size_info(self):
278 279
        return {'length': len(self.storage),
                'size': self.storage.getSize(),
Jeremy Hylton's avatar
Jeremy Hylton committed
280
                }
Jim Fulton's avatar
Jim Fulton committed
281

282
    def getExtensionMethods(self):
283
        return self._extensions
284

285
    def loadEx(self, oid):
286
        self.stats.loads += 1
287
        return self.storage.load(oid, '')
288 289 290 291 292

    def loadBefore(self, oid, tid):
        self.stats.loads += 1
        return self.storage.loadBefore(oid, tid)

293
    def getInvalidations(self, tid):
294
        invtid, invlist = self.server.get_invalidations(self.storage_id, tid)
295 296 297 298 299
        if invtid is None:
            return None
        self.log("Return %d invalidations up to tid %s"
                 % (len(invlist), u64(invtid)))
        return invtid, invlist
Jeremy Hylton's avatar
Jeremy Hylton committed
300

301
    def verify(self, oid, tid):
302
        try:
303
            t = self.getTid(oid)
304
        except KeyError:
305
            self.client.invalidateVerify(oid)
306 307
        else:
            if tid != t:
308
                self.client.invalidateVerify(oid)
309

310
    def zeoVerify(self, oid, s):
311 312 313
        if not self.verifying:
            self.verifying = 1
            self.stats.verifying_clients += 1
Jeremy Hylton's avatar
Jeremy Hylton committed
314
        try:
315
            os = self.getTid(oid)
316
        except KeyError:
317
            self.client.invalidateVerify((oid, ''))
318
            # It's not clear what we should do now.  The KeyError
319 320 321 322
            # could be caused by an object uncreation, in which case
            # invalidation is right.  It could be an application bug
            # that left a dangling reference, in which case it's bad.
        else:
323 324
            if s != os:
                self.client.invalidateVerify((oid, ''))
325

326
    def endZeoVerify(self):
327 328 329
        if self.verifying:
            self.stats.verifying_clients -= 1
        self.verifying = 0
Jeremy Hylton's avatar
Jeremy Hylton committed
330
        self.client.endVerify()
331

332
    def pack(self, time, wait=1):
333
        # Yes, you can pack a read-only server or storage!
334
        if wait:
335
            return run_in_thread(self._pack_impl, time)
336
        else:
337 338
            # If the client isn't waiting for a reply, start a thread
            # and forget about it.
339
            t = threading.Thread(target=self._pack_impl, args=(time,))
340
            t.start()
341
            return None
342

343
    def _pack_impl(self, time):
        """Pack the storage up to ``time``, logging start and completion,
        then have the server broadcast the new size information."""
        when = repr(time)
        self.log("pack(time=%s) started..." % when)
        self.storage.pack(time, referencesf)
        self.log("pack(time=%s) complete" % when)
        # Broadcast new size statistics
        size_info = self.get_size_info()
        self.server.invalidate(0, self.storage_id, None, (), size_info)
Jeremy Hylton's avatar
Jeremy Hylton committed
350 351 352

    def new_oids(self, n=100):
        """Return a sequence of n new oids, where n defaults to 100"""
Jim Fulton's avatar
Jim Fulton committed
353
        n = min(n, 100)
354 355
        if self.read_only:
            raise ReadOnlyError()
356
        if n <= 0:
Jeremy Hylton's avatar
Jeremy Hylton committed
357
            n = 1
358
        return [self.storage.new_oid() for i in range(n)]
Jeremy Hylton's avatar
Jeremy Hylton committed
359

360 361 362
    # undoLog and undoInfo are potentially slow methods

    def undoInfo(self, first, last, spec):
        """Run ``self.storage.undoInfo(first, last, spec)`` through
        run_in_thread() and return what that returns."""
        target = self.storage.undoInfo
        return run_in_thread(target, first, last, spec)
364 365

    def undoLog(self, first, last):
        """Run ``self.storage.undoLog(first, last)`` through
        run_in_thread() and return what that returns."""
        target = self.storage.undoLog
        return run_in_thread(target, first, last)
367

368
    def tpc_begin(self, id, user, description, ext, tid=None, status=" "):
        """Begin a two-phase commit on behalf of the client.

        Nothing is sent to the storage here; the transaction metadata
        and fresh per-transaction state (serials, invalidated, txnlog,
        blob_log) are only recorded on this proxy.  _try_to_vote() later
        calls the storage's tpc_begin.

        A repeated call with the id of the current transaction is logged
        and ignored.

        Raises ReadOnlyError on a read-only proxy and
        StorageTransactionError if a different transaction is already
        in progress for this client.
        """
        if self.read_only:
            raise ReadOnlyError()
        if self.transaction is not None:
            if self.transaction.id == id:
                self.log("duplicate tpc_begin(%s)" % repr(id))
                return
            else:
                raise StorageTransactionError("Multiple simultaneous tpc_begin"
                                              " requests from one client.")

        t = transaction.Transaction()
        t.id = id
        t.user = user
        t.description = description
        t._extension = ext

        self.serials = []
        self.invalidated = []
        self.txnlog = CommitLog()
        self.blob_log = []
        # tid and status are handed to storage.tpc_begin in _try_to_vote().
        self.tid = tid
        self.status = status
        self.store_failed = 0
        self.stats.active_txns += 1

        # Assign the transaction attribute last. This is so we don't
        # think we've entered TPC until everything is set.  Why?
        # Because if we have an error after this, the server will
        # think it is in TPC and the client will think it isn't.  At
        # that point, the client will keep trying to enter TPC and
        # server won't let it.  Errors *after* the tpc_begin call will
        # cause the client to abort the transaction.
        # (Also see https://bugs.launchpad.net/zodb/+bug/374737.)
        self.transaction = t

Jeremy Hylton's avatar
Jeremy Hylton committed
404
    def tpc_finish(self, id):
        """Finish the two-phase commit for transaction ``id``.

        Returns None without doing anything when ``id`` is not the
        current transaction.  Otherwise the storage's tpc_finish is
        called with _invalidate as its callback and a Result wrapping
        the storage's last transaction id is returned.
        """
        if not self._check_tid(id):
            return
        assert self.locked, "finished called wo lock"

        self.stats.commits += 1
        self.storage.tpc_finish(self.transaction, self._invalidate)
        # Note that the tid is still current because we still hold the
        # commit lock. We'll relinquish it in _clear_transaction.
        tid = self.storage.lastTransaction()
        # Return the tid, for cache invalidation optimization
        # NOTE(review): Result presumably runs _clear_transaction after
        # the reply has been sent -- confirm in ZEO.zrpc.connection.
        return Result(tid, self._clear_transaction)

    def _invalidate(self, tid):
418
        if self.invalidated:
419
            self.server.invalidate(self, self.storage_id, tid,
420
                                   self.invalidated, self.get_size_info())
Jeremy Hylton's avatar
Jeremy Hylton committed
421

422 423
    def tpc_abort(self, tid):
        if not self._check_tid(tid):
Jeremy Hylton's avatar
Jeremy Hylton committed
424
            return
425
        self.stats.aborts += 1
426
        self.storage.tpc_abort(self.transaction)
427 428 429 430 431
        self._clear_transaction()

    def _clear_transaction(self):
        # Common code at end of tpc_finish() and tpc_abort()
        if self.locked:
432
            self.server.unlock_storage(self)
433
            self.locked = 0
Jim Fulton's avatar
Jim Fulton committed
434 435 436
        if self.transaction is not None:
            self.server.stop_waiting(self)
            self.transaction = None
437 438 439 440 441 442 443 444 445 446
        self.stats.active_txns -= 1
        if self.txnlog is not None:
            self.txnlog.close()
            self.txnlog = None
            for oid, oldserial, data, blobfilename in self.blob_log:
                ZODB.blob.remove_committed(blobfilename)
            del self.blob_log

    def vote(self, tid):
        """Vote on transaction ``tid``.

        Raises StorageTransactionError when ``tid`` is not the current
        transaction, or when this proxy already holds the storage lock
        or is already waiting for it.  Otherwise returns the result of
        _try_to_vote().
        """
        self._check_tid(tid, exc=StorageTransactionError)
        if self.locked:
            state = 'locked'
        elif self.server.already_waiting(self):
            state = 'waiting'
        else:
            return self._try_to_vote()
        raise StorageTransactionError(
            'Already voting (%s)' % state
            )

    def _try_to_vote(self, delay=None):
        """Try to take the storage lock and replay the logged work.

        Called directly from vote() with no delay, and again through
        _unlock_callback() with the delay returned by an earlier attempt.

        When the lock is obtained, the logged operations and blobs are
        applied to the storage, tpc_vote is called unless a store
        failed, and the collected serials are sent to the client.  On
        any exception the storage transaction is aborted and the
        per-transaction state cleared.

        Returns None when the vote ran (or when disconnected, or when
        the delay was already answered); returns the delay object when
        the lock could not be obtained.
        """
        if self.connection is None:
            return # We're disconnected
        if delay is not None and delay.sent:
            # as a consequence of the unlocking strategy, _try_to_vote
            # may be called multiple times for delayed
            # transactions. The first call will mark the delay as
            # sent. We should skip if the delay was already sent.
            return
        self.locked, delay = self.server.lock_storage(self, delay)
        if self.locked:
            try:
                self.log(
                    "Preparing to commit transaction: %d objects, %d bytes"
                    % (self.txnlog.stores, self.txnlog.size()),
                    level=BLATHER)

                # Only pass tid/status when the client supplied
                # non-default values to tpc_begin().
                if (self.tid is not None) or (self.status != ' '):
                    self.storage.tpc_begin(self.transaction,
                                           self.tid, self.status)
                else:
                    self.storage.tpc_begin(self.transaction)

                # Each log entry names one of this class's methods;
                # those return false on failure, which stops the replay.
                for op, args in self.txnlog:
                    if not getattr(self, op)(*args):
                        break

                # Blob support
                while self.blob_log and not self.store_failed:
                    oid, oldserial, data, blobfilename = self.blob_log.pop()
                    self._store(oid, oldserial, data, blobfilename)

                if not self.store_failed:
                    # Only call tpc_vote of no store call failed,
                    # otherwise the serialnos() call will deliver an
                    # exception that will be handled by the client in
                    # its tpc_vote() method.
                    serials = self.storage.tpc_vote(self.transaction)
                    if serials:
                        self.serials.extend(serials)

                self.client.serialnos(self.serials)

            except Exception:
                self.storage.tpc_abort(self.transaction)
                self._clear_transaction()
                if delay is not None:
                    # NOTE(review): error() is called without arguments;
                    # confirm this matches Delay.error's signature in
                    # ZEO.zrpc.connection.
                    delay.error()
                else:
                    raise
            else:
                if delay is not None:
                    delay.reply(None)
                else:
                    return None

        else:
            return delay

    def _unlock_callback(self, delay):
        connection = self.connection
Jim Fulton's avatar
Jim Fulton committed
514 515 516
        if connection is None:
            self.server.stop_waiting(self)
        else:
517
            connection.call_from_thread(self._try_to_vote, delay)
518

519 520 521 522
    # The public methods of the ZEO client API do not do the real work.
    # They defer work until after the storage lock has been acquired.
    # Most of the real implementations are in methods beginning with
    # an _.
Jeremy Hylton's avatar
Jeremy Hylton committed
523

524 525 526 527 528
    def deleteObject(self, oid, serial, id):
        """Log a delete of ``oid`` for transaction ``id``.

        Raises StorageTransactionError (via _check_tid) when ``id`` is
        not the current transaction.
        """
        self._check_tid(id, exc=StorageTransactionError)
        self.stats.stores += 1
        commit_log = self.txnlog
        commit_log.delete(oid, serial)

529
    def storea(self, oid, serial, data, id):
        """Log a store of ``oid`` for transaction ``id``.

        Raises StorageTransactionError (via _check_tid) when ``id`` is
        not the current transaction.
        """
        self._check_tid(id, exc=StorageTransactionError)
        self.stats.stores += 1
        commit_log = self.txnlog
        commit_log.store(oid, serial, data)
533

534 535 536 537 538
    def restorea(self, oid, serial, data, prev_txn, id):
        """Log a restore of ``oid`` for transaction ``id``.

        Raises StorageTransactionError (via _check_tid) when ``id`` is
        not the current transaction.
        """
        self._check_tid(id, exc=StorageTransactionError)
        self.stats.stores += 1
        commit_log = self.txnlog
        commit_log.restore(oid, serial, data, prev_txn)

539 540 541 542
    def storeBlobStart(self):
        assert self.blob_tempfile is None
        self.blob_tempfile = tempfile.mkstemp(
            dir=self.storage.temporaryDirectory())
543

544 545 546
    def storeBlobChunk(self, chunk):
        os.write(self.blob_tempfile[0], chunk)

547
    def storeBlobEnd(self, oid, serial, data, id):
        """Finish a blob upload and queue the blob for the vote.

        Closes the temporary file, clears ``self.blob_tempfile`` and
        appends (oid, serial, data, temp file name) to ``blob_log``.
        Raises StorageTransactionError (via _check_tid) when ``id`` is
        not the current transaction.
        """
        self._check_tid(id, exc=StorageTransactionError)
        assert self.txnlog is not None # effectively not allowed after undo
        handle, tempname = self.blob_tempfile
        self.blob_tempfile = None
        os.close(handle)
        record = (oid, serial, data, tempname)
        self.blob_log.append(record)
554

555
    def storeBlobShared(self, oid, serial, data, filename, id):
        """Queue a blob that the client already placed in the shared
        blob directory.

        ``filename`` must be a bare file name (no directory or drive
        component) ending in '.tmp', optionally followed by one more
        character.  It is joined onto the storage's directory for
        ``oid`` and appended to ``blob_log``.

        Raises StorageTransactionError (via _check_tid) when ``id`` is
        not the current transaction, and ValueError for an unacceptable
        file name.
        """
        self._check_tid(id, exc=StorageTransactionError)
        assert self.txnlog is not None # effectively not allowed after undo

        # Reconstruct the full path from the filename in the OID directory
        #
        # The name comes from the client, so refuse anything that could
        # escape the OID directory.  os.path.basename also catches the
        # alternate separator and drive prefixes on platforms that have
        # them, which a test for os.path.sep alone would miss.
        if (os.path.sep in filename
            or os.path.basename(filename) != filename
            or not (filename.endswith('.tmp')
                    or filename[:-1].endswith('.tmp')
                    )
            ):
            logger.critical(
                "We're under attack! (bad filename to storeBlobShared, %r)",
                filename)
            raise ValueError(filename)

        filename = os.path.join(self.storage.fshelper.getPathForOID(oid),
                                filename)
        self.blob_log.append((oid, serial, data, filename))
573

574 575
    def sendBlob(self, oid, serial):
        self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
576

577 578 579 580 581 582 583
    def undo(*a, **k):
        raise NotImplementedError

    def undoa(self, trans_id, tid):
        """Log an undo of ``trans_id`` for transaction ``tid``.

        Raises StorageTransactionError (via _check_tid) when ``tid`` is
        not the current transaction.
        """
        self._check_tid(tid, exc=StorageTransactionError)
        commit_log = self.txnlog
        commit_log.undo(trans_id)

584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607
    def _delete(self, oid, serial):
        err = None
        try:
            self.storage.deleteObject(oid, serial, self.transaction)
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception, err:
            self.store_failed = 1
            if isinstance(err, ConflictError):
                self.stats.conflicts += 1
                self.log("conflict error oid=%s msg=%s" %
                         (oid_repr(oid), str(err)), BLATHER)
            if not isinstance(err, TransactionError):
                # Unexpected errors are logged and passed to the client
                self.log("store error: %s, %s" % sys.exc_info()[:2],
                         logging.ERROR, exc_info=True)
            err = self._marshal_error(err)
            # The exception is reported back as newserial for this oid
            self.serials.append((oid, err))
        else:
            self.invalidated.append(oid)

        return err is None

608
    def _store(self, oid, serial, data, blobfile=None):
609
        err = None
Jeremy Hylton's avatar
Jeremy Hylton committed
610
        try:
611 612 613 614 615 616
            if blobfile is None:
                newserial = self.storage.store(
                    oid, serial, data, '', self.transaction)
            else:
                newserial = self.storage.storeBlob(
                    oid, serial, data, blobfile, '', self.transaction)
617 618
        except (SystemExit, KeyboardInterrupt):
            raise
619
        except Exception, err:
620
            self.store_failed = 1
621 622
            if isinstance(err, ConflictError):
                self.stats.conflicts += 1
623
                self.log("conflict error oid=%s msg=%s" %
624
                         (oid_repr(oid), str(err)), BLATHER)
625 626
            if not isinstance(err, TransactionError):
                # Unexpected errors are logged and passed to the client
627 628
                self.log("store error: %s, %s" % sys.exc_info()[:2],
                         logging.ERROR, exc_info=True)
629
            err = self._marshal_error(err)
630
            # The exception is reported back as newserial for this oid
631
            newserial = [(oid, err)]
632
        else:
Jeremy Hylton's avatar
Jeremy Hylton committed
633
            if serial != "\0\0\0\0\0\0\0\0":
634
                self.invalidated.append(oid)
635 636 637 638 639 640 641 642 643 644 645 646 647 648

            if isinstance(newserial, str):
                newserial = [(oid, newserial)]

        if newserial:
            for oid, s in newserial:

                if s == ResolvedSerial:
                    self.stats.conflicts_resolved += 1
                    self.log("conflict resolved oid=%s"
                             % oid_repr(oid), BLATHER)

                self.serials.append((oid, s))

649
        return err is None
Jeremy Hylton's avatar
Jeremy Hylton committed
650

651 652 653
    def _restore(self, oid, serial, data, prev_txn):
        err = None
        try:
Jim Fulton's avatar
Jim Fulton committed
654 655
            self.storage.restore(oid, serial, data, '', prev_txn,
                                 self.transaction)
656 657 658 659 660 661 662 663 664 665 666 667 668 669
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception, err:
            self.store_failed = 1
            if not isinstance(err, TransactionError):
                # Unexpected errors are logged and passed to the client
                self.log("store error: %s, %s" % sys.exc_info()[:2],
                         logging.ERROR, exc_info=True)
            err = self._marshal_error(err)
            # The exception is reported back as newserial for this oid
            self.serials.append((oid, err))

        return err is None

670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
    def _undo(self, trans_id):
        err = None
        try:
            tid, oids = self.storage.undo(trans_id, self.transaction)
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception, err:
            self.store_failed = 1
            if not isinstance(err, TransactionError):
                # Unexpected errors are logged and passed to the client
                self.log("store error: %s, %s" % sys.exc_info()[:2],
                         logging.ERROR, exc_info=True)
            err = self._marshal_error(err)
            # The exception is reported back as newserial for this oid
            self.serials.append((z64, err))
        else:
            self.invalidated.extend(oids)
            self.serials.extend((oid, ResolvedSerial) for oid in oids)

        return err is None

691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735
    def _marshal_error(self, error):
        """Return ``error`` if it can be pickled, else a replacement.

        When pickling fails for any reason, the failure is logged and a
        StorageServerError carrying the repr of the original exception
        is returned instead.
        """
        # Try to pickle the exception.  If it can't be pickled,
        # the RPC response would fail, so use something that can be pickled.
        pickler = cPickle.Pickler()
        pickler.fast = 1
        try:
            # The pickled output is not used; this is only a trial run.
            pickler.dump(error, 1)
        except:
            msg = "Couldn't pickle storage exception: %s" % repr(error)
            self.log(msg, logging.ERROR)
            error = StorageServerError(msg)
        return error

    # IStorageIteration support

    def iterator_start(self, start, stop):
        iid = self._iterator_ids.next()
        self._iterators[iid] = iter(self.storage.iterator(start, stop))
        return iid

    def iterator_next(self, iid):
        iterator = self._iterators[iid]
        try:
            info = iterator.next()
        except StopIteration:
            del self._iterators[iid]
            item = None
            if iid in self._txn_iterators_last:
                del self._txn_iterators_last[iid]
        else:
            item = (info.tid,
                    info.status,
                    info.user,
                    info.description,
                    info.extension)
            # Keep a reference to the last iterator result to allow starting a
            # record iterator off it.
            self._txn_iterators_last[iid] = info
        return item

    def iterator_record_start(self, txn_iid, tid):
        record_iid = self._iterator_ids.next()
        txn_info = self._txn_iterators_last[txn_iid]
        if txn_info.tid != tid:
            raise Exception(
Jim Fulton's avatar
Jim Fulton committed
736 737
                'Out-of-order request for record iterator for transaction %r'
                % tid)
738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758
        self._iterators[record_iid] = iter(txn_info)
        return record_iid

    def iterator_record_next(self, iid):
        iterator = self._iterators[iid]
        try:
            info = iterator.next()
        except StopIteration:
            del self._iterators[iid]
            item = None
        else:
            item = (info.oid,
                    info.tid,
                    info.data,
                    info.data_txn)
        return item

    def iterator_gc(self, iids):
        for iid in iids:
            self._iterators.pop(iid, None)

Jim Fulton's avatar
Jim Fulton committed
759 760
    def server_status(self):
        return self.server.server_status(self)
761

762 763 764
    def set_client_label(self, label):
        """Set the log label to ``str(label)``, a space, and the label
        derived from the connection's address."""
        addr_label = _addr_label(self.connection.addr)
        self.log_label = str(label)+' '+addr_label

765 766 767 768 769 770 771 772
class StorageServerDB:
    """Database stand-in that relays storage notifications to a server.

    An instance forwards invalidations for the storage named
    ``storage_id`` to ``server``.
    """

    def __init__(self, server, storage_id):
        self.references = ZODB.serialize.referencesf
        self.storage_id = storage_id
        self.server = server

    def invalidate(self, tid, oids, version=''):
        """Forward the invalidation of ``oids`` at ``tid`` to the server.

        Raises ``StorageServerError`` if a non-empty ``version`` is
        given.
        """
        if version:
            raise StorageServerError("Versions aren't supported.")
        # No originating connection: pass None for it.
        self.server.invalidate(None, self.storage_id, tid, oids)

    def invalidateCache(self):
        """Ask the server to invalidate the caches for this storage."""
        self.server._invalidateCache(self.storage_id)

    def transform_record_data(self, data):
        """Return ``data`` unchanged."""
        return data

    # Record data passes through untouched in both directions.
    untransform_record_data = transform_record_data
782

783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800
class StorageServer:

    """The server side implementation of ZEO.

    The StorageServer is the 'manager' for incoming connections.  Each
    connection is associated with its own ZEOStorage instance (defined
    below).  The StorageServer may handle multiple storages; each
    ZEOStorage instance only handles a single storage.
    """

    # Classes we instantiate.  A subclass might override.

    DispatcherClass = Dispatcher
    ZEOStorageClass = ZEOStorage
    ManagedServerConnectionClass = ManagedServerConnection

    def __init__(self, addr, storages, read_only=0,
                 invalidation_queue_size=100,
                 invalidation_age=None,
                 transaction_timeout=None,
                 monitor_address=None,
                 auth_protocol=None,
                 auth_database=None,
                 auth_realm=None):
        """Set up a storage server for ``storages`` listening at ``addr``.

        This is typically invoked from the start.py script.

        Arguments (the first two are required and positional):

        addr -- the address at which the server should listen: either
            a (host, port) tuple for a TCP/IP connection or a pathname
            string for a Unix domain socket connection.  The host may
            be a DNS name or a dotted IP address.

        storages -- a dictionary mapping storage names to the storage
            instances to serve, typically FileStorage or Berkeley
            storage instances.  By convention, storage names are
            strings representing small integers starting at '1'.

        read_only -- an optional flag saying whether the server should
            operate in read-only mode.  Defaults to false.  Even when
            the server is writable, individual storages may still be
            read-only; when the server is read-only, no write
            operations are allowed even if the storages are writable.
            Note that pack() is considered a read-only operation.

        invalidation_queue_size -- the number N of most recent
            transactions whose modified objects are kept in a
            per-storage queue.  The queue speeds up client cache
            verification when a client disconnects for a short period
            of time.

        invalidation_age -- if the invalidation queue isn't big enough
            to support a quick verification, but the last transaction
            seen by a client is younger than this age, invalidations
            are computed by iterating over the transactions later than
            the given transaction.

        transaction_timeout -- the maximum amount of time to wait for
            a transaction to commit after acquiring the storage lock.
            If the transaction takes too long, the client connection
            is closed and the transaction aborted.

        monitor_address -- the address at which the (deprecated)
            monitor server should listen.  If specified, a monitor
            server is started; it provides server statistics in a
            simple text format.

        auth_protocol -- the name of the authentication protocol to
            use.  Examples are "digest" and "srp".

        auth_database -- the name of the password database file.  It
            should be in a format compatible with the authentication
            protocol used; different protocols require different
            formats.

            Note that to implement an authentication protocol, a
            server and client authentication mechanism must be
            implemented in an auth_* module stored inside the "auth"
            subdirectory.  This module may also define a DatabaseClass
            variable indicating what database the authenticator
            should use.

        auth_realm -- the realm of this server.  It must equal the
            realm of the password database (a mismatch raises
            ValueError) and is passed on to each authenticated
            connection's storage object.
        """

        self.addr = addr
        self.storages = storages

        # Log a one-line summary: "name:RO|RW:storage-name, ...".
        summaries = []
        for name, storage in storages.items():
            if storage.isReadOnly():
                mode = "RO"
            else:
                mode = "RW"
            summaries.append("%s:%s:%s" % (name, mode, storage.getName()))
        if read_only:
            server_mode = "RO"
        else:
            server_mode = "RW"
        log("%s created %s with storages: %s" %
            (self.__class__.__name__, server_mode, ", ".join(summaries)))

        # Commit-lock bookkeeping: one lock holder and one waiting list
        # per storage, all guarded by self._lock.
        self._lock = threading.Lock()
        self._commit_locks = {}
        self._waiting = {}
        for name in storages:
            self._waiting[name] = []

        self.read_only = read_only
        self.auth_protocol = auth_protocol
        self.auth_database = auth_database
        self.auth_realm = auth_realm
        self.database = None
        if auth_protocol:
            self._setup_auth(auth_protocol)

        # Per storage, a list of at most invalidation_queue_size
        # invalidations, kept sorted with the most recent invalidation
        # at the front.
        self.invq_bound = invalidation_queue_size
        self.invq = {}
        for name, storage in storages.items():
            self._setup_invq(name, storage)
            storage.registerDB(StorageServerDB(self, name))
        self.invalidation_age = invalidation_age

        self.connections = {}
        self.dispatcher = self.DispatcherClass(addr,
                                               factory=self.new_connection)
        self.stats = {}
        self.timeouts = {}
        for name in self.storages.keys():
            clients = self.connections[name] = []
            self.stats[name] = StorageStats(clients)
            if transaction_timeout is not None:
                watchdog = TimeoutThread(transaction_timeout)
                watchdog.start()
            else:
                # An object with no-op methods
                watchdog = StubTimeoutThread()
            self.timeouts[name] = watchdog

        if not monitor_address:
            self.monitor = None
        else:
            warnings.warn(
                "The monitor server is deprecated. Use the server_status\n"
                "ZEO method instead.",
                DeprecationWarning)
            self.monitor = StatsServer(monitor_address, self.stats)
926

927 928 929 930 931 932 933 934 935 936 937
    def _setup_invq(self, name, storage):
        lastInvalidations = getattr(storage, 'lastInvalidations', None)
        if lastInvalidations is None:
            self.invq[name] = [(storage.lastTransaction(), None)]
        else:
            self.invq[name] = list(
                lastInvalidations(self.invq_bound)
                )
            self.invq[name].reverse()


938 939 940 941 942
    def _setup_auth(self, protocol):
        """Configure authentication for the named ``protocol``.

        On success, installs the protocol's storage class as
        ``self.ZEOStorageClass`` and creates ``self.database`` from
        ``self.auth_database``.  An unknown protocol is only logged; a
        protocol without a valid storage class is logged and
        ``self.auth_protocol`` is reset to None.

        Raises ``ValueError`` if the realm of the password database
        differs from ``self.auth_realm``.
        """
        # Can't be done in global scope, because of cyclic references
        from ZEO.auth import get_module

        server_name = self.__class__.__name__

        module = get_module(protocol)
        if not module:
            log("%s: no such an auth protocol: %s" % (server_name, protocol))
            return

        storage_class, _client, db_class = module

        if not (storage_class and issubclass(storage_class, ZEOStorage)):
            log(("%s: %s isn't a valid protocol, must have a StorageClass" %
                 (server_name, protocol)))
            self.auth_protocol = None
            return
        self.ZEOStorageClass = storage_class

        log("%s: using auth protocol: %s" % (server_name, protocol))

        # A single Database instance is created here for the
        # authenticator modules.  Sharing one instance between all the
        # storages avoids bloating each of them with its own copy of the
        # same information, and avoids possible synchronization issues
        # between such copies.
        database = db_class(self.auth_database)
        self.database = database
        if database.realm != self.auth_realm:
            raise ValueError("password database realm %r "
                             "does not match storage realm %r"
                             % (database.realm, self.auth_realm))

971

972 973 974 975 976 977 978
    def new_connection(self, sock, addr):
        """Internal: factory to create a new connection.

        The Dispatcher class in ZEO.zrpc.server calls this whenever
        accept() returns a socket for a new incoming connection.

        A storage object of ``self.ZEOStorageClass`` is created for the
        connection; when authentication is configured it also receives
        the realm and the shared password database.  Returns the new
        managed server connection.
        """
        authenticated = self.auth_protocol and self.database
        if not authenticated:
            zstorage = self.ZEOStorageClass(self, self.read_only)
        else:
            zstorage = self.ZEOStorageClass(self, self.read_only,
                                            auth_realm=self.auth_realm)
            zstorage.set_database(self.database)

        conn = self.ManagedServerConnectionClass(sock, addr, zstorage, self)
        log("new connection %s: %s" % (addr, repr(conn)))
        return conn

    def register_connection(self, storage_id, conn):
        """Internal: register a connection with a particular storage.

        This is called by ZEOStorage.register().

        ``self.connections`` maps each storage name to the list of
        current connections for that storage; this information is
        needed to handle invalidation.  ``conn`` is appended to the
        list for ``storage_id``.

        Returns the stats object for that storage.
        """
        clients = self.connections[storage_id]
        clients.append(conn)
        return self.stats[storage_id]
1004

1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037
    def _invalidateCache(self, storage_id):
        """We need to invalidate any caches we have.

        This basically means telling our clients to
        invalidate/revalidate their caches. We do this by closing them
        and making them reconnect.
        """

        # This method can be called from foreign threads.  We have to
        # worry about interaction with the main thread.

        # 1. We modify self.invq which is read by get_invalidations
        #    below. This is why get_invalidations makes a copy of
        #    self.invq.

        # 2. We access connections.  There are two dangers:
        #
        # a. We miss a new connection.  This is not a problem because
        #    if a client connects after we get the list of connections,
        #    then it will have to read the invalidation queue, which
        #    has already been reset.
        #
        # b. A connection is closes while we are iterating.  This
        #    doesn't matter, bacause we can call should_close on a closed
        #    connection.

        # Rebuild invq
        self._setup_invq(storage_id, self.storages[storage_id])

        # Make a copy since we are going to be mutating the
        # connections indirectoy by closing them.  We don't care about
        # later transactions since they will have to validate their
        # caches anyway.
Jim Fulton's avatar
Jim Fulton committed
1038
        for p in self.connections[storage_id][:]:
1039 1040
            try:
                p.connection.should_close()
1041
                p.connection.trigger.pull_trigger()
1042 1043
            except ZEO.zrpc.error.DisconnectedError:
                pass
1044

1045

1046 1047 1048 1049 1050
    def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
        """Internal: broadcast info and invalidations to clients.

        This is called from several ZEOStorage methods.

        invalidated is a sequence of oids.

        This can do three different things:

        - If the invalidated argument is non-empty, it broadcasts
          invalidateTransaction() messages to all clients of the given
          storage except the current client (the conn argument).

        - If the invalidated argument is empty and the info argument
          is a non-empty dictionary, it broadcasts info() messages to
          all clients of the given storage, including the current
          client.

        - If both the invalidated argument and the info argument are
          non-empty, it broadcasts invalidateTransaction() messages to all
          clients except the current, and sends an info() message to
          the current client.

        A non-empty invalidated argument is also recorded at the front
        of the storage's invalidation queue, evicting the oldest entry
        once the queue has reached self.invq_bound entries.
        """

        # This method can be called from foreign threads.  We have to
        # worry about interaction with the main thread.

        # 1. We modify self.invq which is read by get_invalidations
        #    below. This is why get_invalidations makes a copy of
        #    self.invq.

        # 2. We access connections.  There are two dangers:
        #
        # a. We miss a new connection.  This is not a problem because
        #    we are called while the storage lock is held.  A new
        #    connection that tries to read data won't read committed
        #    data without first receiving an invalidation.  Also, if a
        #    client connects after getting the list of connections,
        #    then it will have to read the invalidation queue, which
        #    has been updated to reflect the invalidations.
        #
        # b. A connection is closed while we are iterating. We'll need
        #    to catch and ignore Disconnected errors.

        if invalidated:
            invq = self.invq[storage_id]
            # Only evict when there is something to evict: with a bound
            # of zero (or less) the queue may legitimately be empty
            # here, and popping from it would raise IndexError.
            if invq and len(invq) >= self.invq_bound:
                invq.pop()
            invq.insert(0, (tid, invalidated))

        for p in self.connections[storage_id]:
            try:
                if invalidated and p is not conn:
                    p.client.invalidateTransaction(tid, invalidated)
                elif info is not None:
                    p.client.info(info)
            except ZEO.zrpc.error.DisconnectedError:
                pass
1106

1107
    def get_invalidations(self, storage_id, tid):
        """Return the latest tid and the oids invalidated since ``tid``.

        ``tid`` is the most recent transaction id seen by the client.

        The result is a ``(latest_tid, oids)`` pair.  ``latest_tid`` is
        None (and ``oids`` is empty) when a complete list of
        invalidations for ``tid`` cannot be provided; in that case the
        client should do full cache verification.
        """

        # Work on a copy of the queue, because a foreign (other than
        # main) thread calling invalidate above might modify it.
        queue = self.invq[storage_id][:]

        changed = set()
        latest_tid = None
        if queue and queue[-1][0] <= tid:
            # The queue reaches back far enough: collect everything
            # newer than tid (entries are ordered newest first).
            for entry_tid, entry_oids in queue:
                if entry_tid <= tid:
                    break
                changed.update(entry_oids)
            latest_tid = queue[0][0]
        elif (self.invalidation_age and
              (self.invalidation_age >
               (time.time()-ZODB.TimeStamp.TimeStamp(tid).timeTime())
               )
              ):
            # tid is recent enough: scan the transactions after it.
            later_txns = self.storages[storage_id].iterator(p64(u64(tid)+1))
            for txn in later_txns:
                changed.update(record.oid for record in txn)
                latest_tid = txn.tid
        elif not queue:
            log("invq empty")
        else:
            log("tid to old for invq %s < %s" % (u64(tid), u64(queue[-1][0])))

        return latest_tid, list(changed)
1145 1146 1147 1148 1149 1150 1151

    def close_server(self):
        """Close the dispatcher so that there are no new connections.

        Also closes the monitor server (if any), every socket in the
        asyncore socket map, and all the storages.

        This is only called from the test suite, AFAICT.
        """
        self.dispatcher.close()
        if self.monitor is not None:
            self.monitor.close()
        # Force the asyncore mainloop to exit by hackery, i.e. close
        # every socket in the map.  loop() will return when the map is
        # empty.  Iterate over a snapshot, because closing a socket may
        # remove it from the map.
        for s in list(asyncore.socket_map.values()):
            try:
                s.close()
            except (SystemExit, KeyboardInterrupt):
                raise
            except Exception:
                # Best effort: a socket that fails to close must not
                # keep the others from being closed.
                pass
        asyncore.socket_map.clear()
        for storage in self.storages.values():
            storage.close()
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174

    def close_conn(self, conn):
        """Internal: remove the given connection from self.connections.

        This is the inverse of register_connection().  ``conn.obj`` is
        removed from the connection list of every storage that holds it.
        """
        for clients in self.connections.values():
            try:
                clients.remove(conn.obj)
            except ValueError:
                # Not registered with this storage.
                pass

Jim Fulton's avatar
Jim Fulton committed
1175
    def lock_storage(self, zeostore, delay):
        """Try to take the commit lock of ``zeostore``'s storage.

        ``delay`` is None for a new request; otherwise it is the delay
        object of a request that was queued by an earlier call.

        Returns a ``(locked, delay)`` pair.  ``locked`` is true if
        ``zeostore`` now holds the commit lock.  If the lock is held by
        another object and ``delay`` was None, a new ``Delay`` is
        created, queued in the waiting list together with ``zeostore``
        and returned as the second item.
        """
        storage_id = zeostore.storage_id
        waiting = self._waiting[storage_id]
        with self._lock:

            if storage_id in self._commit_locks:
                # The lock is held by another zeostore

                locked = self._commit_locks[storage_id]

                assert locked is not zeostore, (storage_id, delay)

                if locked.connection is None:
                    # The holder has no connection any more: abort its
                    # transaction (if any), drop its lock and retry.
                    locked.log("Still locked after disconnected. Unlocking.",
                               logging.CRITICAL)
                    if locked.transaction:
                        locked.storage.tpc_abort(locked.transaction)
                    del self._commit_locks[storage_id]
                    # yuck: have to manipulate lock to appease with :(
                    # The recursive call acquires self._lock itself, so
                    # it is released here and re-acquired afterwards so
                    # that the enclosing ``with`` can release it again.
                    self._lock.release()
                    try:
                        return self.lock_storage(zeostore, delay)
                    finally:
                        self._lock.acquire()

                if delay is None:
                    # New request, queue it
                    assert not [i for i in waiting if i[0] is zeostore
                                ], "already waiting"
                    delay = Delay()
                    waiting.append((zeostore, delay))
                    zeostore.log("(%r) queue lock: transactions waiting: %s"
                                 % (storage_id, len(waiting)),
                                 _level_for_waiting(waiting)
                                 )

                return False, delay
            else:
                # The lock is free: take it and start the timeout clock.
                self._commit_locks[storage_id] = zeostore
                self.timeouts[storage_id].begin(zeostore)
                self.stats[storage_id].lock_time = time.time()
                if delay is not None:
                    # we were waiting, stop
                    waiting[:] = [i for i in waiting if i[0] is not zeostore]
                zeostore.log("(%r) lock: transactions waiting: %s"
                             % (storage_id, len(waiting)),
                             _level_for_waiting(waiting)
                             )
                return True, delay
1224 1225 1226

    def unlock_storage(self, zeostore):
        """Release the commit lock held by ``zeostore`` and wake waiters.

        ``zeostore`` must be the current lock holder of its storage.
        After the lock is released, the unlock callback of every queued
        waiter is called with the waiter's delay; errors raised by a
        callback are logged and do not prevent the remaining callbacks
        from running.
        """
        storage_id = zeostore.storage_id
        waiting = self._waiting[storage_id]
        with self._lock:
            assert self._commit_locks[storage_id] is zeostore
            del self._commit_locks[storage_id]
            self.timeouts[storage_id].end(zeostore)
            self.stats[storage_id].lock_time = None
            # Snapshot the waiters; callbacks run outside the lock.
            pending = list(waiting)

        if not pending:
            return

        assert not [i for i in waiting if i[0] is zeostore
                    ], "waiting while unlocking"
        zeostore.log("(%r) unlock: transactions waiting: %s"
                     % (storage_id, len(pending)),
                     _level_for_waiting(pending)
                     )

        for waiter, delay in pending:
            try:
                waiter._unlock_callback(delay)
            except (SystemExit, KeyboardInterrupt):
                raise
            except Exception:
                logger.exception("Calling unlock callback")

Jim Fulton's avatar
Jim Fulton committed
1251 1252

    def stop_waiting(self, zeostore):
        """Remove ``zeostore`` from the waiting list of its storage.

        Does nothing if ``zeostore`` is not waiting; otherwise the
        removal is logged together with the number of remaining
        waiters.
        """
        storage_id = zeostore.storage_id
        waiting = self._waiting[storage_id]
        with self._lock:
            remaining = [entry for entry in waiting
                         if entry[0] is not zeostore]
            if len(remaining) == len(waiting):
                # It was not queued; nothing to report.
                return
            # Update in place: other code holds references to the list.
            waiting[:] = remaining

        zeostore.log("(%r) dequeue lock: transactions waiting: %s"
                     % (storage_id, len(waiting)),
                     _level_for_waiting(waiting)
                     )

    def already_waiting(self, zeostore):
        """Return True if ``zeostore`` is queued for its storage's lock."""
        queue = self._waiting[zeostore.storage_id]
        with self._lock:
            for entry in queue:
                if entry[0] is zeostore:
                    return True
            return False

    def server_status(self, zeostore):
        """Return a status dictionary for the storage of ``zeostore``.

        The dictionary is a copy of the attributes of the storage's
        stats object in which ``connections`` is replaced by the number
        of connections, extended with the number of waiting
        transactions (``waiting``) and the liveness of the timeout
        thread (``timeout-thread-is-alive``).
        """
        storage_id = zeostore.storage_id
        status = dict(self.stats[storage_id].__dict__)
        status.update({
            'connections': len(status['connections']),
            'waiting': len(self._waiting[storage_id]),
            'timeout-thread-is-alive': self.timeouts[storage_id].isAlive(),
            })
        return status

def _level_for_waiting(waiting):
    if len(waiting) > 9:
        return logging.CRITICAL
    if len(waiting) > 3:
        return logging.WARNING
    else:
        return logging.DEBUG
1287

1288 1289 1290 1291 1292 1293 1294 1295
class StubTimeoutThread:
    """Do-nothing stand-in for TimeoutThread.

    It offers the same ``begin``, ``end`` and ``isAlive`` methods but
    never times anything out.
    """

    def begin(self, client):
        """Ignore the start of a transaction."""
        return None

    def end(self, client):
        """Ignore the end of a transaction."""
        return None

    def isAlive(self):
        """Return the marker string ``'stub'``."""
        return 'stub'


1299 1300 1301
class TimeoutThread(threading.Thread):
    """Monitors transaction progress and generates timeouts."""

    # There is one TimeoutThread per storage, because there's one
    # transaction lock per storage.

    def __init__(self, timeout):
        """Create a daemon thread enforcing ``timeout`` (in seconds)."""
        threading.Thread.__init__(self)
        self.setDaemon(1)
        self._timeout = timeout
        # The client currently being timed and the time.time() value at
        # which it times out; both are None while nothing is timed.
        self._client = None
        self._deadline = None
        self._cond = threading.Condition() # Protects _client and _deadline

    def begin(self, client):
        """Start timing ``client`` and wake up the monitoring thread."""
        # Called from the restart code the "main" thread, whenever the
        # storage lock is being acquired.  (Serialized by asyncore.)
        with self._cond:
            assert self._client is None
            self._client = client
            self._deadline = time.time() + self._timeout
            self._cond.notify()

    def end(self, client):
        """Stop timing ``client``, which must be the client being timed."""
        # Called from the "main" thread whenever the storage lock is
        # being released.  (Serialized by asyncore.)
        with self._cond:
            assert self._client is not None
            assert self._client is client
            self._client = None
            self._deadline = None

    def run(self):
        """Wait for deadlines and close the connection of late clients."""
        # Code running in the thread.
        while 1:
            with self._cond:
                # Sleep until begin() sets a deadline.
                while self._deadline is None:
                    self._cond.wait()
                howlong = self._deadline - time.time()
                if howlong <= 0:
                    # Prevent reporting timeout more than once
                    self._deadline = None
                client = self._client # For the howlong <= 0 branch below

            if howlong <= 0:
                # Deadline passed: log it and ask the connection to
                # close itself.
                client.log("Transaction timeout after %s seconds" %
                           self._timeout, logging.ERROR)
                try:
                    client.connection.call_from_thread(client.connection.close)
                except:
                    # The close could not be requested; log the failure
                    # and stop timing this client ourselves.
                    client.log("Timeout failure", logging.CRITICAL,
                               exc_info=sys.exc_info())
                    self.end(client)
            else:
                # Not due yet: sleep (outside the condition) until the
                # deadline, then check again.
                time.sleep(howlong)

1355

1356 1357 1358 1359 1360
def run_in_thread(method, *args):
    """Run ``method(*args)`` in a new SlowMethodThread.

    Returns the thread's ``delay`` object.
    """
    worker = SlowMethodThread(method, args)
    worker.start()
    return worker.delay

1361

1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383
class SlowMethodThread(threading.Thread):
    """Thread to run potentially slow storage methods.

    Clients can use the delay attribute to access the MTDelay object
    used to send a zrpc response at the right time.
    """

    # Some storage methods can take a long time to complete.  Run via
    # a standard asyncore read handler, they would block all other
    # server activity until they complete.  To avoid blocking, a
    # separate thread is spawned, an MTDelay() object is returned, and
    # the thread calls reply() when it finishes.

    def __init__(self, method, args):
        threading.Thread.__init__(self)
        self.delay = MTDelay()
        self._method = method
        self._args = args

    def run(self):
        """Call the method and hand its result or error to the delay."""
        try:
            outcome = self._method(*self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except Exception:
            self.delay.error(sys.exc_info())
            return
        self.delay.reply(outcome)
1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406


class ClientStub:
    """Proxy that sends asynchronous calls to a client over ``rpc``.

    Each method forwards to the remote method of the same name using
    one of the ``callAsync*`` methods of ``rpc``.
    """

    def __init__(self, rpc):
        self.rpc = rpc

    def beginVerify(self):
        self.rpc.callAsync('beginVerify')

    def invalidateVerify(self, args):
        self.rpc.callAsync('invalidateVerify', args)

    def endVerify(self):
        self.rpc.callAsync('endVerify')

    def invalidateTransaction(self, tid, args):
        # Note that this method is *always* called from a different
        # thread than self.rpc's async thread. It is the only method
        # for which this is true and requires special consideration!

        # callAsyncNoSend is important here because:
        # - callAsyncNoPoll isn't appropriate because
        #   the network thread may not wake up for a long time,
        #   delaying invalidations for too long. (This is demonstrated
        #   by a test failure.)
        # - callAsync isn't appropriate because (on the server) it tries
        #   to write to the socket.  If self.rpc's network thread also
        #   tries to write at the same time, we can run into problems
        #   because handle_write isn't thread safe.
        self.rpc.callAsyncNoSend('invalidateTransaction', tid, args)

    def serialnos(self, arg):
        self.rpc.callAsyncNoPoll('serialnos', arg)

    def info(self, arg):
        self.rpc.callAsyncNoPoll('info', arg)

    def storeBlob(self, oid, serial, blobfilename):
        """Send the blob file ``blobfilename`` to the client in chunks.

        The transfer is a ``receiveBlobStart`` message, one
        ``receiveBlobChunk`` message per chunk of at most 59000 bytes,
        and a final ``receiveBlobStop`` message, all passed to
        ``rpc.callAsyncIterator`` as one iterator.
        """

        def store():
            yield ('receiveBlobStart', (oid, serial))
            # The ``with`` block closes the file even if the iterator
            # is abandoned mid-transfer or a read fails.
            with open(blobfilename, 'rb') as f:
                while 1:
                    chunk = f.read(59000)
                    if not chunk:
                        break
                    yield ('receiveBlobChunk', (oid, serial, chunk, ))
            yield ('receiveBlobStop', (oid, serial))

        self.rpc.callAsyncIterator(store())

class ClientStub308(ClientStub):
    """ClientStub variant that sends oids as ``(oid, '')`` pairs.

    NOTE(review): the empty string is presumably the (unsupported)
    version field expected by older clients -- confirm.
    """

    def invalidateTransaction(self, tid, args):
        pairs = []
        for oid in args:
            pairs.append((oid, ''))
        ClientStub.invalidateTransaction(self, tid, pairs)

    def invalidateVerify(self, oid):
        pair = (oid, '')
        ClientStub.invalidateVerify(self, pair)
1451 1452 1453 1454 1455 1456

class ZEOStorage308Adapter:
    """Expose version-aware method signatures on top of a wrapped storage.

    The wrapped object's methods take no version argument.  The methods
    here accept one, reject any non-empty value, drop it when delegating
    and, where a result carried version information, fill in an empty
    placeholder.  Attributes not defined here are looked up on the
    wrapped storage.
    """

    def __init__(self, storage):
        self.storage = storage

    def __eq__(self, other):
        """Compare equal to this adapter itself and to the wrapped storage."""
        return self is other or self.storage is other

    def getSerial(self, oid):
        """Return the second item of the wrapped storage's loadEx(oid)."""
        return self.storage.loadEx(oid)[1] # Z200

    def history(self, oid, version, size=1):
        """Return storage.history(oid, size=size).

        Raises ValueError (unlike the other methods, which raise
        StorageServerError) when `version` is non-empty.
        """
        if version:
            raise ValueError("Versions aren't supported.")
        return self.storage.history(oid, size=size)

    def getInvalidations(self, tid):
        """Return the storage's invalidations with each oid as (oid, '').

        A None result from the wrapped storage is passed through unchanged.
        """
        result = self.storage.getInvalidations(tid)
        if result is not None:
            result = result[0], [(oid, '') for oid in result[1]]
        return result

    def verify(self, oid, version, tid):
        """Delegate to storage.verify(oid, tid); `version` must be empty."""
        if version:
            raise StorageServerError("Versions aren't supported.")
        return self.storage.verify(oid, tid)

    def loadEx(self, oid, version=''):
        """Return (data, serial, '') built from storage.loadEx(oid)."""
        if version:
            raise StorageServerError("Versions aren't supported.")
        data, serial = self.storage.loadEx(oid)
        return data, serial, ''

    def storea(self, oid, serial, data, version, id):
        """Delegate to storage.storea without the version argument."""
        if version:
            raise StorageServerError("Versions aren't supported.")
        self.storage.storea(oid, serial, data, id)

    def storeBlobEnd(self, oid, serial, data, version, id):
        """Delegate to storage.storeBlobEnd without the version argument."""
        if version:
            raise StorageServerError("Versions aren't supported.")
        self.storage.storeBlobEnd(oid, serial, data, id)

    def storeBlobShared(self, oid, serial, data, filename, version, id):
        """Delegate to storage.storeBlobShared without the version argument."""
        if version:
            raise StorageServerError("Versions aren't supported.")
        self.storage.storeBlobShared(oid, serial, data, filename, id)

    def getInfo(self):
        """Return storage.getInfo() with 'supportsVersions' set to False.

        The mapping returned by the wrapped storage is updated in place.
        """
        result = self.storage.getInfo()
        result['supportsVersions'] = False
        return result

    def zeoVerify(self, oid, s, sv=None):
        """Delegate to storage.zeoVerify(oid, s); `sv` must be empty."""
        if sv:
            raise StorageServerError("Versions aren't supported.")
        self.storage.zeoVerify(oid, s)

    def modifiedInVersion(self, oid):
        """Always report the empty version string."""
        return ''

    def versions(self):
        """Always report that no versions exist."""
        return ()

    def versionEmpty(self, version):
        """Always report the given version as empty."""
        return True

    def commitVersion(self, *a, **k):
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError

    abortVersion = commitVersion

    def zeoLoad(self, oid):             # Z200
        """Return (data, serial, '', None, None) built from storage.loadEx."""
        p, s = self.storage.loadEx(oid)
        return p, s, '', None, None

    def __getattr__(self, name):
        """Fall back to the wrapped storage for attributes not found here."""
        # __getattr__ only runs when normal lookup fails.  If 'storage'
        # itself is missing (e.g. the instance was created without
        # __init__), reading self.storage below would re-enter this
        # method without bound, so fail cleanly instead.
        if name == 'storage':
            raise AttributeError(name)
        return getattr(self.storage, name)

Jim Fulton's avatar
Jim Fulton committed
1531 1532 1533 1534 1535 1536
def _addr_label(addr):
    if isinstance(addr, type("")):
        return addr
    else:
        host, port = addr
        return str(host) + ":" + str(port)
1537