#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from thread import get_ident
from cPickle import dumps, loads
from zlib import compress as real_compress, decompress
from neo.locking import Queue, Empty
from random import shuffle
import time
import os

from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
from ZODB.POSException import ReadConflictError
from ZODB.ConflictResolution import ResolvedSerial
from persistent.TimeStamp import TimeStamp

import neo
from neo.protocol import NodeTypes, Packets, INVALID_PARTITION, ZERO_TID
from neo.event import EventManager
from neo.util import makeChecksum as real_makeChecksum, dump
from neo.locking import Lock
from neo.connection import MTClientConnection, OnTimeout, ConnectionClosed
from neo.node import NodeManager
from neo.connector import getConnectorHandler
from neo.client.exception import NEOStorageError, NEOStorageCreationUndoneError
from neo.client.exception import NEOStorageNotFoundError
from neo.exception import NeoException
from neo.client.handlers import storage, master
from neo.dispatcher import Dispatcher, ForgottenPacket
from neo.client.poll import ThreadedPoll, psThreadedPoll
from neo.client.iterator import Iterator
from neo.client.mq import MQ, MQIndex
from neo.client.pool import ConnectionPool
from neo.util import u64, parseMasterList
from neo.profiling import profiler_decorator, PROFILING_ENABLED
from neo.live_debug import register as registerLiveDebugger

if PROFILING_ENABLED:
    # Those functions require a "real" python function wrapper before they can
    # be decorated.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)
else:
    # If profiling is disabled, directly use original functions.
    compress = real_compress
    makeChecksum = real_makeChecksum

class ThreadContext(object):

    def __init__(self):
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def __getThreadData(self):
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            raise AttributeError, name

    def __setattr__(self, name, value):
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        if thread_id is None:
            thread_id = get_ident()
        thread_dict = self._threads_dict.get(thread_id)
        if thread_dict is None:
            queue = Queue(0)
        else:
            queue = thread_dict['queue']
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'data_list': [],
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'resolved_conflict_serial_dict': {},
            'txn_voted': False,
            'queue': queue,
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'asked_object': 0,
            'undo_object_tid_dict': {},
            'involved_nodes': set(),
            'last_transaction': None,
        }
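
    # Illustrative sketch (editorial addition, not part of the original code):
    # each thread sees its own attribute namespace on a shared ThreadContext
    # instance, e.g.:
    #
    #   ctx = ThreadContext()
    #   ctx.txn = transaction   # visible only to the thread that set it
    #   ctx.tid                 # -> None in a thread that never set it
    #   ctx.clear()             # reset this thread's slots (the queue is kept)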

class RevisionIndex(MQIndex):
    """
    This cache index allows accessing a specific revision of a cached object.
    It requires cache key to be a 2-tuple, composed of oid and revision.

    Note: it is expected that rather few revisions are held in cache, with few
    lookups for old revisions, so they are held in a simple sorted list.
    Note2: all methods here must be called with the cache lock acquired.
    """
    def __init__(self):
        # key: oid
        # value: tid list, from highest to lowest
        self._oid_dict = {}
        # key: oid
        # value: tid list, from lowest to highest
        self._invalidated = {}

    def clear(self):
        self._oid_dict.clear()
        self._invalidated.clear()

    def remove(self, key):
        oid_dict = self._oid_dict
        oid, tid = key
        tid_list = oid_dict[oid]
        tid_list.remove(tid)
        if not tid_list:
            # No more serial known for this object, drop entirely
            del oid_dict[oid]
            self._invalidated.pop(oid, None)

    def add(self, key):
        oid_dict = self._oid_dict
        oid, tid = key
        try:
            serial_list = oid_dict[oid]
        except KeyError:
            serial_list = oid_dict[oid] = []
        else:
            assert tid not in serial_list
        serial_list.insert(0, tid)
        if len(serial_list) > 1 and serial_list[1] > tid:
            # tid is not the most recent known revision: restore descending
            # order.
            serial_list.sort(reverse=True)
        invalidated = self._invalidated
        try:
            tid_list = invalidated[oid]
        except KeyError:
            pass
        else:
            try:
                tid_list.remove(tid)
            except ValueError:
                pass
            else:
                if not tid_list:
                    del invalidated[oid]

    def invalidate(self, oid_list, tid):
        """
        Mark object invalidated by given transaction.
        Must be called with increasing TID values (which is standard for
        ZODB).
        """
        invalidated = self._invalidated
        oid_dict = self._oid_dict
        for oid in (x for x in oid_list if x in oid_dict):
            try:
                tid_list = invalidated[oid]
            except KeyError:
                tid_list = invalidated[oid] = []
            assert not tid_list or tid > tid_list[-1], (dump(oid), dump(tid),
                dump(tid_list[-1]))
            tid_list.append(tid)

    def getSerialBefore(self, oid, tid):
        """
        Get the highest tid in cache which is strictly lower than the given tid.
        """
        # WARNING: return-intensive to save on indentation
        oid_list = self._oid_dict.get(oid)
        if oid_list is None:
            # Unknown oid
            return None
        for result in oid_list:
            if result < tid:
                # Candidate found
                break
        else:
            # No candidate in cache.
            return None
        # Check if there is a chance that an intermediate revision would
        # exist, while missing from cache.
        try:
            inv_tid_list = self._invalidated[oid]
        except KeyError:
            return result
        # Remember: inv_tid_list is sorted in ascending order.
        for inv_tid in inv_tid_list:
            if tid < inv_tid:
                # We don't care about invalidations past requested TID.
                break
            elif result < inv_tid < tid:
                # An invalidation was received between candidate revision,
                # and before requested TID: there is a matching revision we
                # don't know of, so we cannot answer.
                return None
        return result

    def getLatestSerial(self, oid):
        """
        Get the latest tid for given object.
        """
        result = self._oid_dict.get(oid)
        if result is not None:
            result = result[0]
            try:
                tid_list = self._invalidated[oid]
            except KeyError:
                pass
            else:
                if result < tid_list[-1]:
                    # An invalidation happened from a transaction later than our
                    # most recent view of this object, so we cannot answer.
                    result = None
        return result

    def getSerialList(self, oid):
        """
        Get the list of all serials the cache knows for the given object.
        """
        return self._oid_dict.get(oid, [])[:]
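
    # Illustrative sketch (editorial addition, not part of the original code):
    # how RevisionIndex is meant to be queried, assuming tid1, tid2 and tid3
    # are three increasing TIDs and the cache is keyed by (oid, tid) as in
    # Application below.
    #
    #   index = RevisionIndex()
    #   index.add((oid, tid1))            # revision tid1 of oid is cached
    #   index.add((oid, tid2))            # a newer revision is cached too
    #   index.getLatestSerial(oid)        # -> tid2
    #   index.getSerialBefore(oid, tid2)  # -> tid1 (newest cached tid
    #                                     #    older than tid2)
    #   index.invalidate([oid], tid3)     # tid3 was committed elsewhere, so
    #   index.getLatestSerial(oid)        # -> None (tid2 may be outdated)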

class Application(object):
    """The client node application."""

    def __init__(self, master_nodes, name, connector=None, compress=True, **kw):
        # Start polling thread
        self.em = EventManager()
        self.poll_thread = ThreadedPoll(self.em, name=name)
        psThreadedPoll()
        # Internal attributes common to all threads
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher(self.poll_thread)
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # load master node list
        for address in parseMasterList(master_nodes):
            self.nm.createMaster(address=address)

        # no self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.cache_revision_index = RevisionIndex()
        self.mq_cache.addIndex(self.cache_revision_index)
        self.new_oid_list = []
        self.last_oid = '\0' * 8
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)
        # Internal attributes distinct for each thread
        self.local_var = ThreadContext()
        # Lock definitions:
        # _load_lock is used to make loading and storing atomic
        lock = Lock()
        self._load_lock_acquire = lock.acquire
        self._load_lock_release = lock.release
        # _oid_lock is used to prevent multiple threads from asking the
        # master for new OIDs at the same time
        lock = Lock()
        self._oid_lock_acquire = lock.acquire
        self._oid_lock_release = lock.release
        lock = Lock()
        # _cache_lock is used for the client cache
        self._cache_lock_acquire = lock.acquire
        self._cache_lock_release = lock.release
        lock = Lock()
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        self._connecting_to_master_node_acquire = lock.acquire
        self._connecting_to_master_node_release = lock.release
        # _nm ensures exclusive access to the node manager
        lock = Lock()
        self._nm_acquire = lock.acquire
        self._nm_release = lock.release
        self.compress = compress
        registerLiveDebugger(on_log=self.log)
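
        # Editorial note (sketch, not original code): the *_acquire/*_release
        # attributes defined above are bound methods of the lock objects, so
        # the usage pattern throughout this class is:
        #
        #   self._cache_lock_acquire()
        #   try:
        #       ...  # work on self.mq_cache / self.cache_revision_index
        #   finally:
        #       self._cache_lock_release()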

    def log(self):
        self.em.log()
        self.nm.log()
        if self.pt is not None:
            self.pt.log()

    @profiler_decorator
    def _handlePacket(self, conn, packet, handler=None):
        """
          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from the connection's node type.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError, 'Expecting an answer from a node ' \
                    'whose type is not known... Is this right?'
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError, 'Unknown node type: %r' % (node.__class__, )
        conn.lock()
        try:
            handler.dispatch(conn, packet)
        finally:
            conn.unlock()

    @profiler_decorator
    def _waitAnyMessage(self, block=True):
        """
          Handle all pending packets.
          block
            If True (default), will block until at least one packet was
            received.
        """
        pending = self.dispatcher.pending
        queue = self.local_var.queue
        get = queue.get
        _handlePacket = self._handlePacket
        while pending(queue):
            try:
                conn, packet = get(block)
            except Empty:
                break
            if packet is None or isinstance(packet, ForgottenPacket):
                # connection was closed or some packet was forgotten
                continue
            block = False
            try:
                _handlePacket(conn, packet)
            except ConnectionClosed:
                pass

    @profiler_decorator
    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues."""
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            is_forgotten = isinstance(packet, ForgottenPacket)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    if is_forgotten:
                        raise ValueError, 'ForgottenPacket for an ' \
                            'explicitly expected packet.'
                    _handlePacket(conn, packet, handler=handler)
                    break
            if not is_forgotten and packet is not None:
                _handlePacket(conn, packet)

    @profiler_decorator
    def _askStorage(self, conn, packet):
        """ Send a request to a storage node and process its answer """
        msg_id = conn.ask(packet, queue=self.local_var.queue)
        self._waitMessage(conn, msg_id, self.storage_handler)

    @profiler_decorator
    def _askPrimary(self, packet):
        """ Send a request to the primary master and process its answer """
        conn = self._getMasterConnection()
        msg_id = conn.ask(packet, queue=self.local_var.queue)
        self._waitMessage(conn, msg_id, self.primary_handler)

    @profiler_decorator
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand """
        # acquire the lock to allow only one thread to connect to the primary
        result = self.master_conn
        if result is None:
            self._connecting_to_master_node_acquire()
            try:
                self.new_oid_list = []
                result = self._connectToPrimaryNode()
                self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result

    def getPartitionTable(self):
        """ Return the partition table manager, reconnect the PMN if needed """
        # this ensures the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

    @profiler_decorator
    def _connectToPrimaryNode(self):
        """
            Look up the current primary master node.
        """
        neo.logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        queue = self.local_var.queue
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    neo.logging.error('Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                try:
                    msg_id = conn.ask(Packets.AskPrimary(), queue=queue)
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node
            neo.logging.info('Connected to %s' % (self.primary_master_node, ))
            try:
                ready = self.identifyToPrimaryNode(conn)
            except ConnectionClosed:
                neo.logging.error('Connection to %s lost',
                    self.trying_master_node)
                self.primary_master_node = None
                continue
        neo.logging.info("Connected and ready")
        return conn

    def identifyToPrimaryNode(self, conn):
        """
            Request identification and the information required to be
            operational. Might raise ConnectionClosed so that the new primary
            can be looked up again.
        """
        neo.logging.info('Initializing from master')
        queue = self.local_var.queue
        # Identify to primary master and request initial data
        while conn.getUUID() is None:
            p = Packets.RequestIdentification(NodeTypes.CLIENT, self.uuid,
                    None, self.name)
            self._waitMessage(conn, conn.ask(p, queue=queue),
                    handler=self.primary_bootstrap_handler)
            if conn.getUUID() is None:
                # Node identification was refused by master, it is considered
                # as the primary as long as we are connected to it.
                time.sleep(1)
        if self.uuid is not None:
            msg_id = conn.ask(Packets.AskNodeInformation(), queue=queue)
            self._waitMessage(conn, msg_id,
                    handler=self.primary_bootstrap_handler)
            msg_id = conn.ask(Packets.AskPartitionTable(), queue=queue)
            self._waitMessage(conn, msg_id,
                    handler=self.primary_bootstrap_handler)
        return self.uuid is not None and self.pt is not None \
                             and self.pt.operational()
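
    # Editorial sketch (not original code) of the bootstrap sequence
    # implemented by _connectToPrimaryNode() and identifyToPrimaryNode()
    # above:
    #
    #   conn = self._connectToPrimaryNode()   # find and connect to primary
    #   # then, on that connection:
    #   #   RequestIdentification -> obtain our UUID from the master
    #   #   AskNodeInformation    -> fill self.nm with known nodes
    #   #   AskPartitionTable     -> fill self.pt
    #   # _getMasterConnection() finally stores conn in self.master_conn.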

    def registerDB(self, db, limit):
        self._db = db

    def getDB(self):
        return self._db

    @profiler_decorator
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
                # Get a new OID list from the master node.
                # We keep a list of OIDs here to avoid asking the master
                # for new OIDs one by one too often.
                self._askPrimary(Packets.AskNewOIDs(100))
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
            self.last_oid = self.new_oid_list.pop(0)
            return self.last_oid
        finally:
            self._oid_lock_release()

    def getStorageSize(self):
        # return the last OID used, this is inaccurate
        return int(u64(self.last_oid))

    @profiler_decorator
    def load(self, snapshot_tid, oid, serial=None, tid=None):
        """
        Internal method which manages load, loadSerial and loadBefore.
        OID and TID (serial) parameters are expected packed.
        snapshot_tid
            First TID not visible to current transaction.
            Set to None for no limit.
        oid
            OID of object to get.
        serial
            If given, the exact serial at which OID is desired.
            tid should be None.
        tid
            If given, the excluded upper bound serial at which OID is desired.
            serial should be None.

        Return value: (3-tuple)
        - Object data (None if object creation was undone).
        - Serial of given data.
        - Next serial at which object exists, or None. Only set when tid
          parameter is not None.

        Exceptions:
            NEOStorageError
                technical problem
            NEOStorageNotFoundError
                object exists but no data satisfies given parameters
            NEOStorageDoesNotExistError
                object doesn't exist
            NEOStorageCreationUndoneError
                object existed, but its creation was undone
        """
        # TODO:
        # - rename parameters (here and in handlers & packet definitions)
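        # Editorial sketch (assumption, not original code): the ZODB-level
        # calls are expected to map onto these parameters roughly as:
        #   load(oid)                -> load(snapshot_tid, oid)
        #   loadSerial(oid, serial)  -> load(snapshot_tid, oid, serial=serial)
        #   loadBefore(oid, tid)     -> load(snapshot_tid, oid, tid=tid)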
        if snapshot_tid is not None:
            if serial is None:
                if tid is None:
                    tid = snapshot_tid
                else:
                    tid = min(tid, snapshot_tid)
            # XXX: we must not clamp serial with snapshot_tid, as loadSerial is
            # used during conflict resolution to load object's current version,
            # which is not visible to us normally (it was committed after our
            # snapshot was taken).

        self._load_lock_acquire()
        try:
            try:
                result = self._loadFromCache(oid, serial, tid)
            except KeyError:
                pass
            else:
                return result
            data, start_serial, end_serial = self._loadFromStorage(oid, serial,
                tid)
            self._cache_lock_acquire()
            try:
                self.mq_cache[(oid, start_serial)] = data, end_serial
            finally:
                self._cache_lock_release()
            if data == '':
                raise NEOStorageCreationUndoneError(dump(oid))
            return data, start_serial, end_serial
        finally:
            self._load_lock_release()

    @profiler_decorator
    def _loadFromStorage(self, oid, at_tid, before_tid):
        self.local_var.asked_object = 0
        packet = Packets.AskObject(oid, at_tid, before_tid)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            # Check data
            noid, tid, next_tid, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                neo.logging.error('got wrong oid %s instead of %s from %s',
                    dump(noid), dump(oid), conn)
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                neo.logging.error('wrong checksum from %s for oid %s',
                              conn, dump(oid))
                self.local_var.asked_object = -1
                continue
            break
        if self.local_var.asked_object == -1:
            raise NEOStorageError('inconsistent data')

        # Uncompress data
        if compression:
            data = decompress(data)
        return data, tid, next_tid

    @profiler_decorator
    def _loadFromCache(self, oid, at_tid, before_tid):
        """
        Load from local cache, raising KeyError if not found.
        """
        self._cache_lock_acquire()
        try:
            if at_tid is not None:
                tid = at_tid
            elif before_tid is not None:
                tid = self.cache_revision_index.getSerialBefore(oid,
                    before_tid)
            else:
                tid = self.cache_revision_index.getLatestSerial(oid)
            if tid is None:
                raise KeyError
            # Raises KeyError on miss
            data, next_tid = self.mq_cache[(oid, tid)]
            return (data, tid, next_tid)
        finally:
            self._cache_lock_release()

    @profiler_decorator
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We already began the same transaction
            raise StorageTransactionError('Duplicate tpc_begin calls')
        if self.local_var.txn is not None:
            raise NeoException, 'local_var is not clean in tpc_begin'
        # use the given TID or request a new one from the master
        self._askPrimary(Packets.AskBeginTransaction(tid))
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        assert tid in (None, self.local_var.tid), (tid, self.local_var.tid)
        self.local_var.txn = transaction

    @profiler_decorator
    def store(self, oid, serial, data, version, transaction):
        """Store object."""
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        neo.logging.debug('storing oid %s serial %s', dump(oid), dump(serial))
        self._store(oid, serial, data)
        return None

    def _store(self, oid, serial, data, data_serial=None, unlock=False):
        if data is None:
            # This is some undo: either a no-data object (undoing object
            # creation) or a back-pointer to an earlier revision (going back to
            # an older object revision).
            data = compressed_data = ''
            compression = 0
        else:
            assert data_serial is None
            compression = self.compress
            compressed_data = data
            if self.compress:
                compressed_data = compress(data)
                if len(compressed_data) > len(data):
                    compressed_data = data
                    compression = 0
                else:
                    compression = 1
        checksum = makeChecksum(compressed_data)
        on_timeout = OnTimeout(self.onStoreTimeout, self.local_var.tid, oid)
        # Store object in tmp cache
        local_var = self.local_var
        data_dict = local_var.data_dict
        if oid not in data_dict:
            local_var.data_list.append(oid)
        data_dict[oid] = data
        # Store data on each node
        self.local_var.object_stored_counter_dict[oid] = {}
        self.local_var.object_serial_dict[oid] = serial
        queue = self.local_var.queue
        add_involved_nodes = self.local_var.involved_nodes.add
        packet = Packets.AskStoreObject(oid, serial, compression,
            checksum, compressed_data, data_serial, self.local_var.tid, unlock)
        for node, conn in self.cp.iterateForObject(oid, writable=True):
            try:
                conn.ask(packet, on_timeout=on_timeout, queue=queue)
                add_involved_nodes(node)
            except ConnectionClosed:
                continue
        if not self.local_var.involved_nodes:
            raise NEOStorageError("Store failed")

        self._waitAnyMessage(False)

    def onStoreTimeout(self, conn, msg_id, tid, oid):
        # NOTE: this method is called from poll thread, don't use
        # local_var !
        # Stop expecting the timed-out store request.
        queue = self.dispatcher.forget(conn, msg_id)
        # Ask the storage if someone locks the object.
        # Shorten timeout to react earlier to an unresponsive storage.
        conn.ask(Packets.AskHasLock(tid, oid), timeout=5, queue=queue)
        return True

    @profiler_decorator
    def _handleConflicts(self, tryToResolveConflict):
        result = []
        append = result.append
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
        conflict_serial_dict = local_var.conflict_serial_dict.copy()
        local_var.conflict_serial_dict.clear()
        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
        for oid, conflict_serial_set in conflict_serial_dict.iteritems():
            resolved_serial_set = resolved_conflict_serial_dict.setdefault(
                oid, set())
            conflict_serial = max(conflict_serial_set)
            if resolved_serial_set and conflict_serial <= max(resolved_serial_set):
                # A later serial has already been resolved, skip.
                resolved_serial_set.update(conflict_serial_set)
                continue
            serial = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            resolved = False
            if conflict_serial == ZERO_TID:
                # Storage refused us from taking object lock, to avoid a
                # possible deadlock. TID is actually used for some kind of
                # "locking priority": when a higher value has the lock,
                # this means we stored objects "too late", and we would
                # otherwise cause a deadlock.
                # To recover, we must ask storages to release locks we
                # hold (to let possibly-competing transactions acquire
                # them), and requeue our already-sent store requests.
                # XXX: currently, brute-force is implemented: we send
                # object data again.
                neo.logging.info('Deadlock avoidance triggered on %r:%r',
                    dump(oid), dump(serial))
                for store_oid, store_data in \
                        local_var.data_dict.iteritems():
                    store_serial = object_serial_dict[store_oid]
                    if store_data is None:
                        self.checkCurrentSerialInTransaction(store_oid,
                            store_serial)
                    else:
                        if store_data == '':
                            # Some undo
                            neo.logging.warning('Deadlock avoidance cannot'
                                ' reliably work with undo, this must be '
                                'implemented.')
                            break
                        self._store(store_oid, store_serial, store_data,
                            unlock=True)
                else:
                    resolved = True
            elif data is not None:
                new_data = tryToResolveConflict(oid, conflict_serial,
                    serial, data)
                if new_data is not None:
                    neo.logging.info('Conflict resolution succeeded for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
                    # Mark this conflict as resolved
817
                    resolved_serial_set.update(conflict_serial_set)
818 819 820 821
                    # Try to store again
                    self._store(oid, conflict_serial, new_data)
                    append(oid)
                    resolved = True
                else:
                    neo.logging.info('Conflict resolution failed for ' \
                        '%r:%r with %r', dump(oid), dump(serial),
                        dump(conflict_serial))
            if not resolved:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                local_var.data_list.remove(oid)
                if data is None:
                    exc = ReadConflictError(oid=oid, serials=(conflict_serial,
                        serial))
                else:
                    exc = ConflictError(oid=oid, serials=(tid, serial),
                        data=data)
                raise exc
        return result
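
    # Editorial note (not original code): tryToResolveConflict is the
    # resolution callback supplied by the ZODB layer; as used above it is
    # called as
    #
    #   new_data = tryToResolveConflict(oid, conflict_serial, serial, data)
    #
    # and returns the merged data, or None when resolution fails (in which
    # case _handleConflicts() raises ConflictError / ReadConflictError).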

    @profiler_decorator
    def waitResponses(self):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)"""
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            _waitAnyMessage()

    @profiler_decorator
    def waitStoreResponses(self, tryToResolveConflict):
        result = []
        append = result.append
        resolved_oid_set = set()
        update = resolved_oid_set.update
        local_var = self.local_var
        tid = local_var.tid
        _handleConflicts = self._handleConflicts
        conflict_serial_dict = local_var.conflict_serial_dict
        queue = local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue) or conflict_serial_dict:
            _waitAnyMessage()
            if conflict_serial_dict:
                conflicts = _handleConflicts(tryToResolveConflict)
                if conflicts:
                    update(conflicts)

        # Check for never-stored objects, and update result for all others
        for oid, store_dict in \
            local_var.object_stored_counter_dict.iteritems():
            if not store_dict:
                neo.logging.error('tpc_store failed')
                raise NEOStorageError('tpc_store failed')
            elif oid in resolved_oid_set:
                append((oid, ResolvedSerial))
        return result

    @profiler_decorator
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction."""
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        # Store the transaction metadata on each node
        txn_stored_counter = 0
        packet = Packets.AskStoreTransaction(tid, str(transaction.user),
            str(transaction.description), dumps(transaction._extension),
            local_var.data_list)
        add_involved_nodes = self.local_var.involved_nodes.add
        for node, conn in self.cp.iterateForObject(tid, writable=True):
            neo.logging.debug("voting object %s on %s", dump(tid),
                dump(conn.getUUID()))
            try:
                self._askStorage(conn, packet)
                add_involved_nodes(node)
            except ConnectionClosed:
                continue
            txn_stored_counter += 1

        # check that at least one storage node accepted
        if txn_stored_counter == 0:
            neo.logging.error('tpc_vote failed')
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problems before
        # tpc_finish.
        self._getMasterConnection()

        local_var.txn_voted = True
        return result

    @profiler_decorator
    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.local_var.txn:
            return

        tid = self.local_var.tid
        p = Packets.AbortTransaction(tid)
        getConnForNode = self.cp.getConnForNode
        # cancel the transaction on all those nodes
        for node in self.local_var.involved_nodes:
            conn = getConnForNode(node)
            if conn is None:
                continue
            try:
                conn.notify(p)
            except:
                neo.logging.error('Exception in tpc_abort while notifying ' \
                    'storage node %r of the abort, ignoring.', conn, exc_info=1)
        self._getMasterConnection().notify(p)

        # Just wait for responses to arrive. If any leads to an exception,
        # log it and continue: we *must* eat all answers to not disturb the
        # next transaction.
        queue = self.local_var.queue
        pending = self.dispatcher.pending
        _waitAnyMessage = self._waitAnyMessage
        while pending(queue):
            try:
                _waitAnyMessage()
            except:
                neo.logging.error('Exception in tpc_abort while handling ' \
                    'pending answers, ignoring.', exc_info=1)

        self.local_var.clear()

    @profiler_decorator
    def tpc_finish(self, transaction, tryToResolveConflict, f=None):
        """Finish current transaction."""
        local_var = self.local_var
        if local_var.txn is not transaction:
            raise StorageTransactionError('tpc_finish called for wrong '
                'transaction')
        if not local_var.txn_voted:
            self.tpc_vote(transaction, tryToResolveConflict)
        self._load_lock_acquire()
        try:
            # Call finish on master
            oid_list = local_var.data_list
            p = Packets.AskFinishTransaction(local_var.tid, oid_list)
            self._askPrimary(p)

            # From now on, self.local_var.tid holds the "real" TID.
            tid = local_var.tid
            # Call function given by ZODB
            if f is not None:
                f(tid)

            # Update cache
            self._cache_lock_acquire()
            try:
                mq_cache = self.mq_cache
                update = mq_cache.update
                def updateNextSerial(value):
                    data, next_tid = value
                    assert next_tid is None, (dump(oid), dump(base_tid),
                        dump(next_tid))
                    return (data, tid)
                get_baseTID = local_var.object_serial_dict.get
                for oid, data in local_var.data_dict.iteritems():
                    if data is None:
                        # this is just a remnant of a
                        # checkCurrentSerialInTransaction call, ignore (no data
                        # was modified).
                        continue
                    # Update ex-latest value in cache
                    base_tid = get_baseTID(oid)
                    try:
                        update((oid, base_tid), updateNextSerial)
                    except KeyError:
                        pass
                    if data == '':
                        self.cache_revision_index.invalidate([oid], tid)
                    else:
                        # Store in cache with no next_tid
                        mq_cache[(oid, tid)] = (data, None)
            finally:
                self._cache_lock_release()
            local_var.clear()
            return tid
        finally:
            self._load_lock_release()
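
    # Editorial sketch (not original code): the two-phase commit sequence,
    # as driven by the ZODB layer, using the methods defined above:
    #
    #   app.tpc_begin(txn)
    #   app.store(oid, serial, data, '', txn)    # repeated for each object
    #   app.tpc_vote(txn, tryToResolveConflict)  # wait for stores, then vote
    #   app.tpc_finish(txn, tryToResolveConflict)
    #   # or, on error: app.tpc_abort(txn)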

    def undo(self, snapshot_tid, undone_tid, txn, tryToResolveConflict):
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, undone_tid)

        txn_info, txn_ext = self._getTransactionInformation(undone_tid)
        txn_oid_list = txn_info['oids']

        # Group objects per partition, to ask a minimal set of storage nodes.
        partition_oid_dict = {}
        pt = self.getPartitionTable()
        for oid in txn_oid_list:
            partition = pt.getPartition(oid)
            try:
                oid_list = partition_oid_dict[partition]
            except KeyError:
                oid_list = partition_oid_dict[partition] = []
            oid_list.append(oid)

        # Ask storage the undo serial (serial at which object's previous data
        # is)
        getCellList = pt.getCellList
        getCellSortKey = self.cp.getCellSortKey
        getConnForCell = self.cp.getConnForCell
        queue = self.local_var.queue
        undo_object_tid_dict = self.local_var.undo_object_tid_dict = {}
        for partition, oid_list in partition_oid_dict.iteritems():
            cell_list = getCellList(partition, readable=True)
            shuffle(cell_list)
            cell_list.sort(key=getCellSortKey)
            storage_conn = getConnForCell(cell_list[0])
            storage_conn.ask(Packets.AskObjectUndoSerial(self.local_var.tid,
                snapshot_tid, undone_tid, oid_list), queue=queue)

        # Wait for all AnswerObjectUndoSerial. We might get OidNotFoundError,
        # meaning that objects in transaction's oid_list do not exist any
        # longer. This is the symptom of a pack, so forbid undoing the
        # transaction when it happens, but still keep waiting for answers.
        failed = False
        while True:
            try:
                self.waitResponses()
            except NEOStorageNotFoundError:
                failed = True
            else:
                break
        if failed:
            raise UndoError('non-undoable transaction')

        # Send undo data to all storage nodes.
        for oid in txn_oid_list:
            current_serial, undo_serial, is_current = undo_object_tid_dict[oid]
            if is_current:
                data = None
            else:
                # Serial being undone is not the latest version for this
                # object. This is an undo conflict, try to resolve it.
                try:
                    # Load the latest version we are supposed to see
                    data = self.load(snapshot_tid, oid, serial=current_serial)[0]
                    # Load the version we were undoing to
                    undo_data = self.load(snapshot_tid, oid, serial=undo_serial)[0]
                except NEOStorageNotFoundError:
                    raise UndoError('Object not found while resolving undo '
                        'conflict')
                # Resolve conflict
                try:
                    data = tryToResolveConflict(oid, current_serial,
                        undone_tid, undo_data, data)
                except ConflictError:
                    data = None
                if data is None:
                    raise UndoError('Some data were modified by a later ' \
                        'transaction', oid)
                undo_serial = None
            self._store(oid, current_serial, data, undo_serial)

    def _insertMetadata(self, txn_info, extension):
        for k, v in loads(extension).items():
            txn_info[k] = v

    def _getTransactionInformation(self, tid):
        packet = Packets.AskTransactionInformation(tid)
        for node, conn in self.cp.iterateForObject(tid, readable=True):
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue
            except NEOStorageNotFoundError:
                # TID not found
                continue
            break
        else:
            raise NEOStorageError('Transaction %r not found' % (tid, ))
        return (self.local_var.txn_info, self.local_var.txn_ext)

    def undoLog(self, first, last, filter=None, block=0):
        # XXX: undoLog is broken
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self.getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        queue = self.local_var.queue
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION), queue=queue)

        # Wait for answers from all storages.
        self.waitResponses()

        # Reorder tids
        ordered_tids = set()
        update = ordered_tids.update
        for tid_list in self.local_var.node_tids.itervalues():
            update(tid_list)
        ordered_tids = list(ordered_tids)
        ordered_tids.sort(reverse=True)
        neo.logging.debug("UndoLog tids %s", [dump(x) for x in ordered_tids])
        # For each transaction, get info
        undo_info = []
        append = undo_info.append
        for tid in ordered_tids:
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            if filter is None or filter(self.local_var.txn_info):
                txn_info = self.local_var.txn_info
                txn_info.pop('packed')
                txn_info.pop("oids")
                self._insertMetadata(txn_info, self.local_var.txn_ext)
                append(txn_info)
                if len(undo_info) >= last - first:
                    break
        # Check that we return at least one element, otherwise call
        # again with an extended offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                    block=1)
        return undo_info

    def transactionLog(self, start, stop, limit):
        node_map = self.pt.getNodeMap()
        node_list = node_map.keys()
        node_list.sort(key=self.cp.getCellSortKey)
        partition_set = set(range(self.pt.getPartitions()))
        queue = self.local_var.queue
        # request a tid list for each partition
        self.local_var.tids_from = set()
        for node in node_list:
            conn = self.cp.getConnForNode(node)
            request_set = set(node_map[node]) & partition_set
            if conn is None or not request_set:
                continue
            partition_set -= set(request_set)
            packet = Packets.AskTIDsFrom(start, stop, limit, request_set)
            conn.ask(packet, queue=queue)
            if not partition_set:
                break
        assert not partition_set
        self.waitResponses()
        # request transaction information
        txn_list = []
        append = txn_list.append
        tid = None
        for tid in sorted(self.local_var.tids_from):
            (txn_info, txn_ext) = self._getTransactionInformation(tid)
            txn_info['ext'] = loads(self.local_var.txn_ext)
            append(txn_info)
        return (tid, txn_list)

    def history(self, oid, version=None, size=1, filter=None):
        # Get history information for the object first
        packet = Packets.AskObjectHistory(oid, 0, size)
        for node, conn in self.cp.iterateForObject(oid, readable=True):
            # FIXME: we keep overwriting self.local_var.history here, we
            # should aggregate it instead.
            self.local_var.history = None
            try:
                self._askStorage(conn, packet)
            except ConnectionClosed:
                continue

            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                      '%r, got %r' % (oid, self.local_var.history[0]))

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')

        # Now that we have object information, get txn information
        history_list = []
        for serial, size in self.local_var.history[1]:
            txn_info, txn_ext = self._getTransactionInformation(serial)
            # create history dict
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info.pop('packed')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)
            self._insertMetadata(txn_info, txn_ext)

        return history_list

    @profiler_decorator
    def importFrom(self, source, start, stop, tryToResolveConflict):
        serials = {}
        transaction_iter = source.iterator(start, stop)
        for transaction in transaction_iter:
            tid = transaction.tid
            self.tpc_begin(transaction, tid, transaction.status)
            for r in transaction:
                oid = r.oid
                pre = serials.get(oid, None)
                # TODO: bypass conflict resolution, locks...
                self.store(oid, pre, r.data, r.version, transaction)
                serials[oid] = tid
            conflicted = self.tpc_vote(transaction, tryToResolveConflict)
            assert not conflicted, conflicted
            real_tid = self.tpc_finish(transaction, tryToResolveConflict)
            assert real_tid == tid, (real_tid, tid)
        transaction_iter.close()
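
    # Illustrative example (editorial addition; names are assumptions, not
    # original code): importing the history of an existing FileStorage:
    #
    #   from ZODB.FileStorage import FileStorage
    #   src = FileStorage('Data.fs', read_only=True)
    #   app.importFrom(src, None, None, tryToResolveConflict)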

    def iterator(self, start, stop):
        if start is None:
            start = ZERO_TID
        return Iterator(self, start, stop)

    def lastTransaction(self):
        self._askPrimary(Packets.AskLastTransaction())
        return self.local_var.last_transaction

    def abortVersion(self, src, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def commitVersion(self, src, dest, transaction):
        if transaction is not self.local_var.txn:
            raise StorageTransactionError(self, transaction)
        return '', []

    def __del__(self):
        """Clear all connections."""
        # Due to a bug in ZODB, close is not always called when shutting
        # down zope, so use __del__ to close connections
        for conn in self.em.getConnectionList():
1267
            conn.close()
1268 1269
        self.cp.flush()
        self.master_conn = None
1270
        # Stop polling thread
1271
        neo.logging.debug('Stopping %s', self.poll_thread)
1272
        self.poll_thread.stop()
1273
        psThreadedPoll()
1274
    close = __del__
1275

1276 1277 1278
    def invalidationBarrier(self):
        self._askPrimary(Packets.AskBarrier())

1279 1280 1281 1282 1283 1284 1285 1286 1287
    def setNodeReady(self):
        self.local_var.node_ready = True

    def setNodeNotReady(self):
        self.local_var.node_ready = False

    def isNodeReady(self):
        return self.local_var.node_ready

    def setTID(self, value):
        self.local_var.tid = value

    def getTID(self):
        return self.local_var.tid

    def pack(self, t):
        tid = repr(TimeStamp(*time.gmtime(t)[:5] + (t % 60, )))
        if tid == ZERO_TID:
            raise NEOStorageError('Invalid pack time')
        self._askPrimary(Packets.AskPack(tid))
        # XXX: this is only needed to make ZODB unit tests pass.
        # It should not be otherwise required (clients should be free to load
        # old data as long as it is available in cache, even if it was pruned
        # by a pack), so don't bother invalidating on other clients.
        self._cache_lock_acquire()
        try:
            self.mq_cache.clear()
        finally:
            self._cache_lock_release()

    def getLastTID(self, oid):
        return self.load(None, oid)[1]

    def checkCurrentSerialInTransaction(self, oid, serial, transaction):
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)
        local_var.object_serial_dict[oid] = serial
        # Placeholders
        queue = local_var.queue
        local_var.object_stored_counter_dict[oid] = {}
        data_dict = local_var.data_dict
        if oid not in data_dict:
            # Marker value so we don't try to resolve conflicts.
            data_dict[oid] = None
            local_var.data_list.append(oid)
        packet = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
        for node, conn in self.cp.iterateForObject(oid, writable=True):
            try:
                conn.ask(packet, queue=queue)
            except ConnectionClosed:
                continue

        self._waitAnyMessage(False)