app.py 41.4 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from thread import get_ident
19
from cPickle import dumps, loads
20
from zlib import compress as real_compress, decompress
21
from neo.locking import Queue, Empty
22
from random import shuffle
23
import time
24

25
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
26
from ZODB.ConflictResolution import ResolvedSerial
27

28 29 30 31
from neo import setupLog
setupLog('CLIENT', verbose=True)

from neo import logging
32
from neo.protocol import NodeTypes, Packets, INVALID_PARTITION
33
from neo.event import EventManager
34
from neo.util import makeChecksum as real_makeChecksum, dump
35
from neo.locking import Lock
36
from neo.connection import MTClientConnection
37
from neo.node import NodeManager
38
from neo.connector import getConnectorHandler
39
from neo.client.exception import NEOStorageError
40
from neo.client.exception import NEOStorageNotFoundError, ConnectionClosed
41 42
from neo.exception import NeoException
from neo.client.handlers import storage, master
43
from neo.dispatcher import Dispatcher
44
from neo.client.poll import ThreadedPoll
45 46
from neo.client.iterator import Iterator
from neo.client.mq import MQ
47
from neo.client.pool import ConnectionPool
48
from neo.util import u64, parseMasterList
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
from neo.profiling import profiler_decorator, PROFILING_ENABLED

# Publish ``compress`` and ``makeChecksum`` under stable module-level names,
# whether or not profiling is active.
if not PROFILING_ENABLED:
    # Profiling is disabled: alias the original functions directly.
    compress = real_compress
    makeChecksum = real_makeChecksum
else:
    # Those functions require a "real" python function wrapper before they
    # can be decorated, hence the thin pass-through definitions.
    @profiler_decorator
    def compress(data):
        return real_compress(data)

    @profiler_decorator
    def makeChecksum(data):
        return real_makeChecksum(data)
Aurel's avatar
Aurel committed
65

66 67
class ThreadContext(object):
    """Attribute namespace whose content is private to each thread.

    Reading or writing an attribute on an instance is redirected to a
    dictionary selected by ``get_ident()``, so every thread only sees the
    values it stored itself. A thread's dictionary is created on first
    access with the default values listed in ``clear``.
    """

    def __init__(self):
        # Go through the base class: our own __setattr__ would store the
        # mapping as per-thread data instead of on the instance.
        super(ThreadContext, self).__setattr__('_threads_dict', {})

    def __getThreadData(self):
        """Return the calling thread's dictionary, creating it if needed."""
        thread_id = get_ident()
        try:
            result = self._threads_dict[thread_id]
        except KeyError:
            # First access from this thread: install the default values.
            self.clear(thread_id)
            result = self._threads_dict[thread_id]
        return result

    def __getattr__(self, name):
        """Look up `name` in the calling thread's data.

        Raises AttributeError when the thread's dictionary has no such key.
        """
        thread_data = self.__getThreadData()
        try:
            return thread_data[name]
        except KeyError:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store `value` under `name` in the calling thread's data."""
        thread_data = self.__getThreadData()
        thread_data[name] = value

    def clear(self, thread_id=None):
        """Reset the data of a thread to its default values.

        thread_id
            Identifier of the thread to reset; defaults to the calling
            thread.

        The thread's existing queue, if any, is carried over to the new
        dictionary; otherwise a new ``Queue(0)`` is created.
        """
        if thread_id is None:
            thread_id = get_ident()
        thread_dict = self._threads_dict.get(thread_id)
        if thread_dict is None:
            queue = Queue(0)
        else:
            queue = thread_dict['queue']
        self._threads_dict[thread_id] = {
            'tid': None,
            'txn': None,
            'data_dict': {},
            'object_serial_dict': {},
            'object_stored_counter_dict': {},
            'conflict_serial_dict': {},
            'resolved_conflict_serial_dict': {},
            'object_stored': 0,
            'txn_voted': False,
            'txn_finished': False,
            'queue': queue,
            'txn_info': 0,
            'history': None,
            'node_tids': {},
            'node_ready': False,
            'asked_object': 0,
            'undo_conflict_oid_list': [],
            'undo_error_oid_list': [],
        }


Aurel's avatar
Aurel committed
121
class Application(object):
122 123
    """The client node application."""

124
    def __init__(self, master_nodes, name, connector=None, **kw):
        """Set up the client application state.

        master_nodes
            Master node list, parsed with parseMasterList; one master node
            is registered in the node manager per resulting address.
        name
            Stored as self.name and sent when identifying to the master.
        connector
            Passed to getConnectorHandler to select the connector handler.
        kw
            Accepted for compatibility; not used here.
        """
        # Start polling thread
        event_manager = EventManager()
        self.em = event_manager
        self.poll_thread = ThreadedPoll(event_manager)

        # Internal attributes common to all threads
        self._db = None
        self.name = name
        self.connector_handler = getConnectorHandler(connector)
        self.dispatcher = Dispatcher()
        node_manager = NodeManager()
        self.nm = node_manager
        self.cp = ConnectionPool(self)
        self.pt = None
        self.master_conn = None
        self.primary_master_node = None
        self.trying_master_node = None

        # Load master node list
        for master_address in parseMasterList(master_nodes):
            node_manager.createMaster(address=master_address)

        # No self-assigned UUID, primary master will supply us one
        self.uuid = None
        self.mq_cache = MQ()
        self.new_oid_list = []
        self.last_oid = '\0' * 8

        # Packet handlers
        self.storage_event_handler = storage.StorageEventHandler(self)
        self.storage_bootstrap_handler = storage.StorageBootstrapHandler(self)
        self.storage_handler = storage.StorageAnswersHandler(self)
        self.primary_handler = master.PrimaryAnswersHandler(self)
        self.primary_bootstrap_handler = master.PrimaryBootstrapHandler(self)
        self.notifications_handler = master.PrimaryNotificationsHandler(self)

        # Internal attributes distinct between threads
        self.local_var = ThreadContext()

        # Lock definitions: only the bound acquire/release methods are kept.
        # _load_lock is used to make loading and storing atomic
        load_lock = Lock()
        self._load_lock_acquire = load_lock.acquire
        self._load_lock_release = load_lock.release
        # _oid_lock is used in order to not call multiple oid
        # generation at the same time
        oid_lock = Lock()
        self._oid_lock_acquire = oid_lock.acquire
        self._oid_lock_release = oid_lock.release
        # _cache_lock is used for the client cache
        cache_lock = Lock()
        self._cache_lock_acquire = cache_lock.acquire
        self._cache_lock_release = cache_lock.release
        # _connecting_to_master_node is used to prevent simultaneous master
        # node connection attempts
        connecting_lock = Lock()
        self._connecting_to_master_node_acquire = connecting_lock.acquire
        self._connecting_to_master_node_release = connecting_lock.release
        # _nm ensures exclusive access to the node manager
        nm_lock = Lock()
        self._nm_acquire = nm_lock.acquire
        self._nm_release = nm_lock.release
180

181
    @profiler_decorator
    def _handlePacket(self, conn, packet, handler=None):
        """
          Dispatch a packet to a handler while holding the connection lock.

          conn
            The connection which received the packet (forwarded to handler).
          packet
            The packet to handle.
          handler
            The handler to use to handle packet.
            If not given, it will be guessed from connection's node type.

          Raises ValueError when no handler is given and the node behind
          the connection is unknown, or is neither a storage nor a master.
        """
        if handler is None:
            # Guess the handler to use based on the type of node on the
            # connection
            node = self.nm.getByAddress(conn.getAddress())
            if node is None:
                raise ValueError('Expecting an answer from a node '
                    'which type is not known... Is this right ?')
            if node.isStorage():
                handler = self.storage_handler
            elif node.isMaster():
                handler = self.primary_handler
            else:
                raise ValueError('Unknown node type: %r' % (node.__class__, ))
        conn.lock()
        try:
            handler.dispatch(conn, packet)
        finally:
            # Always release the connection, even if the handler raised.
            conn.unlock()
210

211
    @profiler_decorator
    def _waitAnyMessage(self, block=True):
        """
          Handle all pending packets of the calling thread.

          block
            If True (default), will block until at least one packet was
            received. Once a packet has been handled, remaining packets
            are fetched without blocking.
        """
        has_pending = self.dispatcher.pending
        local_queue = self.local_var.queue
        fetch = local_queue.get
        handle = self._handlePacket
        while has_pending(local_queue):
            try:
                conn, packet = fetch(block)
            except Empty:
                # Nothing more to read without blocking.
                break
            if packet is not None:
                # A real packet arrived: from now on only drain the queue.
                block = False
                try:
                    handle(conn, packet)
                except ConnectionClosed:
                    pass
            # else: connection was closed, look at the next entry

237
    @profiler_decorator
    def _waitMessage(self, target_conn, msg_id, handler=None):
        """Wait for a message returned by the dispatcher in queues.

        Blocks on the calling thread's queue until the packet identified by
        (target_conn, msg_id) arrives, and handles it with `handler`.

        target_conn
            Connection the expected answer must come from.
        msg_id
            Identifier of the expected answer.
        handler
            Handler for the expected answer; forwarded to _handlePacket.

        Every other packet read in the meantime is handled with the default
        handler chosen by _handlePacket. This includes packets coming from
        target_conn with another identifier, which would otherwise be lost.

        Raises ConnectionClosed when target_conn is reported closed (a None
        packet is queued for it).
        """
        get = self.local_var.queue.get
        _handlePacket = self._handlePacket
        while True:
            conn, packet = get(True)
            if target_conn is conn:
                # check fake packet
                if packet is None:
                    raise ConnectionClosed
                if msg_id == packet.getId():
                    _handlePacket(conn, packet, handler=handler)
                    break
            # Not the expected answer: still process it, unless it is the
            # "connection closed" marker of another connection.
            if packet is not None:
                _handlePacket(conn, packet)
253

254
    @profiler_decorator
    def _askStorage(self, conn, packet):
        """ Send a request to a storage node and process its answer """
        request_id = conn.ask(packet)
        # Block until the matching answer has been handled.
        self._waitMessage(conn, request_id, handler=self.storage_handler)

260
    @profiler_decorator
    def _askPrimary(self, packet):
        """ Send a request to the primary master and process its answer """
        master_conn = self._getMasterConnection()
        request_id = master_conn.ask(packet)
        # Block until the matching answer has been handled.
        self._waitMessage(master_conn, request_id,
            handler=self.primary_handler)

267
    @profiler_decorator
    def _getMasterConnection(self):
        """ Connect to the primary master node on demand

        Returns self.master_conn when it is already set. Otherwise takes the
        "connecting to master node" lock, connects with
        _connectToPrimaryNode, stores the connection in self.master_conn and
        returns it. The pending new OID list is emptied before connecting.
        """
        result = self.master_conn
        if result is None:
            # acquire the lock to allow only one thread to connect to the
            # primary
            self._connecting_to_master_node_acquire()
            try:
                # Another thread may have connected while we were waiting
                # for the lock: check again before connecting a second time.
                result = self.master_conn
                if result is None:
                    self.new_oid_list = []
                    result = self._connectToPrimaryNode()
                    self.master_conn = result
            finally:
                self._connecting_to_master_node_release()
        return result
281

282
    def _getPartitionTable(self):
283
        """ Return the partition table manager, reconnect the PMN if needed """
284 285 286 287 288
        # this ensure the master connection is established and the partition
        # table is up to date.
        self._getMasterConnection()
        return self.pt

289
    @profiler_decorator
    def _getCellListForOID(self, oid, readable=False, writable=False):
        """ Return the cells available for the specified OID """
        partition_table = self._getPartitionTable()
        cell_list = partition_table.getCellListForOID(oid, readable, writable)
        return cell_list

    def _getCellListForTID(self, tid, readable=False, writable=False):
        """ Return the cells available for the specified TID """
297
        pt = self._getPartitionTable()
298
        return pt.getCellListForTID(tid, readable, writable)
299

300
    @profiler_decorator
    def _connectToPrimaryNode(self):
        """Connect to the primary master node and return the connection.

        Repeats the following until a UUID is known and the partition table
        exists and is operational:
          1. try master nodes (the known primary first, otherwise each
             registered master in turn) until one answers AskPrimary and
             turns out to be the primary itself;
          2. request identification until the connection has a UUID, or
             the connection is lost;
          3. when a UUID is known, ask for node information and for the
             partition table.
        All answers are processed with the primary bootstrap handler.
        """
        logging.debug('connecting to primary master...')
        ready = False
        nm = self.nm
        while not ready:
            # Get network connection to primary master
            index = 0
            connected = False
            while not connected:
                if self.primary_master_node is not None:
                    # If I know a primary master node, pinpoint it.
                    self.trying_master_node = self.primary_master_node
                    self.primary_master_node = None
                else:
                    # Otherwise, check one by one.
                    master_list = nm.getMasterList()
                    try:
                        self.trying_master_node = master_list[index]
                    except IndexError:
                        # Whole list tried: pause, then start over.
                        # NOTE(review): an empty master list makes the
                        # lookup below raise IndexError - confirm the list
                        # can never be empty here.
                        time.sleep(1)
                        index = 0
                        self.trying_master_node = master_list[0]
                    index += 1
                # Connect to master
                conn = MTClientConnection(self.local_var, self.em,
                        self.notifications_handler,
                        addr=self.trying_master_node.getAddress(),
                        connector=self.connector_handler(),
                        dispatcher=self.dispatcher)
                # Query for primary master node
                if conn.getConnector() is None:
                    # This happens if a connection could not be established.
                    logging.error('Connection to master node %s failed',
                                  self.trying_master_node)
                    continue
                msg_id = conn.ask(Packets.AskPrimary())
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    continue
                # If we reached the primary master node, mark as connected
                connected = self.primary_master_node is not None and \
                        self.primary_master_node is self.trying_master_node

            logging.info('connected to a primary master node')
            # Identify to primary master and request initial data
            while conn.getUUID() is None:
                if conn.getConnector() is None:
                    logging.error('Connection to master node %s lost',
                                  self.trying_master_node)
                    self.primary_master_node = None
                    break
                p = Packets.RequestIdentification(NodeTypes.CLIENT,
                        self.uuid, None, self.name)
                msg_id = conn.ask(p)
                try:
                    self._waitMessage(conn, msg_id,
                            handler=self.primary_bootstrap_handler)
                except ConnectionClosed:
                    self.primary_master_node = None
                    break
                if conn.getUUID() is None:
                    # Node identification was refused by master.
                    time.sleep(1)
            if self.uuid is not None:
                msg_id = conn.ask(Packets.AskNodeInformation())
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
                msg_id = conn.ask(Packets.AskPartitionTable([]))
                self._waitMessage(conn, msg_id,
                        handler=self.primary_bootstrap_handler)
            ready = self.uuid is not None and self.pt is not None \
                                 and self.pt.operational()
        # Deferred formatting, like the other log calls of this method.
        logging.info("connected to primary master node %s",
                self.primary_master_node)
        return conn
378

379 380 381
    def registerDB(self, db, limit):
        """Remember `db` so that getDB can return it.

        `limit` is accepted but not used by this method.
        """
        self._db = db

382 383 384
    def getDB(self):
        """Return the object given to registerDB (None until registered)."""
        return self._db

385
    @profiler_decorator
    def new_oid(self):
        """Get a new OID.

        OIDs are taken from self.new_oid_list; when it is empty, 100 new
        OIDs are requested from the primary master first. The OID handed
        out is also recorded in self.last_oid.

        Raises NEOStorageError when the list is still empty after asking.
        """
        self._oid_lock_acquire()
        try:
            if not self.new_oid_list:
                # Ask the master for a whole batch, to avoid requesting
                # OIDs one by one.
                self._askPrimary(Packets.AskNewOIDs(100))
                if not self.new_oid_list:
                    raise NEOStorageError('new_oid failed')
            oid = self.new_oid_list.pop()
            self.last_oid = oid
            return oid
        finally:
            self._oid_lock_release()

403 404 405
    def getStorageSize(self):
        """Return the last OID used, as an integer.

        This is only an inaccurate stand-in for the storage size.
        """
        last_oid_value = u64(self.last_oid)
        return int(last_oid_value)
406

407
    @profiler_decorator
    def getSerial(self, oid):
        """Return the serial of the given object.

        The client cache is looked up first; on a miss, the serial is taken
        from a one-entry history of the object.

        Raises NEOStorageNotFoundError when the history is empty, and
        NEOStorageError when the history is about another oid.
        """
        # Try in cache first
        self._cache_lock_acquire()
        try:
            cache = self.mq_cache
            if oid in cache:
                return cache[oid][0]
        finally:
            self._cache_lock_release()
        # history returns the serial, so use it
        hist = self.history(oid, size=1, object_only=1)
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
        history_oid = hist[0]
        if history_oid != oid:
            raise NEOStorageError('getSerial failed')
        newest_entry = hist[1][0]
        return newest_entry[0]

Aurel's avatar
Aurel committed
424

425
    @profiler_decorator
    def _load(self, oid, serial=None, tid=None, cache=0):
        """Internal method which manages load, loadSerial and loadBefore.

        oid
            Identifier of the object to load.
        serial, tid
            Forwarded as-is in the AskObject request.
        cache
            When true, the loaded (serial, data) is put in the client cache.

        Readable cells for the oid are tried in random order until one
        gives an answer whose oid and checksum are consistent.
        self.local_var.asked_object carries the outcome of each request:
        0 means no answer was obtained at all, -1 means the object was not
        found (or the answer was rejected), anything else is the answer.

        Returns (data, start_serial, end_serial); data is None when the
        stored data is the empty string.
        Raises NEOStorageNotFoundError when the object cannot be obtained.
        """
        cell_list = self._getCellListForOID(oid, readable=True)
        if len(cell_list) == 0:
            # No cells available, so why are we running ?
            logging.error('oid %s not found because no storage is ' \
                    'available for it', dump(oid))
            raise NEOStorageNotFoundError()

        shuffle(cell_list)
        self.local_var.asked_object = 0
        for cell in cell_list:
            logging.debug('trying to load %s from %s',
                          dump(oid), dump(cell.getUUID()))
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            try:
                self._askStorage(conn, Packets.AskObject(oid, serial, tid))
            except ConnectionClosed:
                continue

            if self.local_var.asked_object == -1:
                # OID not found
                break

            # Check data
            noid, start_serial, end_serial, compression, checksum, data \
                = self.local_var.asked_object
            if noid != oid:
                # Oops, try with next node
                # The received oid goes through dump() like every other
                # oid logged in this file.
                logging.error('got wrong oid %s instead of %s from node %s',
                              dump(noid), dump(oid), cell.getAddress())
                self.local_var.asked_object = -1
                continue
            elif checksum != makeChecksum(data):
                # Check checksum.
                logging.error('wrong checksum from node %s for oid %s',
                              cell.getAddress(), dump(oid))
                self.local_var.asked_object = -1
                continue
            else:
                # Everything looks alright.
                break

        if self.local_var.asked_object == 0:
            # We didn't get any object from all storage nodes because of
            # connection error
            logging.warning('oid %s not found because of connection failure',
                    dump(oid))
            raise NEOStorageNotFoundError()

        if self.local_var.asked_object == -1:
            # We didn't get any object from all storage nodes
            logging.info('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if data == '':
            data = None
        return data, start_serial, end_serial
498

499

500
    @profiler_decorator
    def load(self, oid, version=None):
        """Load an object for a given oid.

        Returns (data, serial), from the client cache when the oid is in
        it, otherwise from a storage node (which also fills the cache).
        `version` is not used. The load lock is held during the whole call.
        """
        self._load_lock_acquire()
        try:
            # First try from cache
            cached = False
            self._cache_lock_acquire()
            try:
                if oid in self.mq_cache:
                    logging.debug('load oid %s is cached', dump(oid))
                    serial, data = self.mq_cache[oid]
                    cached = True
            finally:
                self._cache_lock_release()
            if cached:
                return data, serial
            # Otherwise get it from storage node
            loaded = self._load(oid, cache=1)
            return loaded[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
518

519

520
    @profiler_decorator
    def loadSerial(self, oid, serial):
        """Load an object for a given oid and serial.

        Only the object data is returned.
        """
        # Do not try in cache as it manages only up-to-date object
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        loaded = self._load(oid, serial=serial)
        return loaded[0]
Aurel's avatar
Aurel committed
526

527

528
    @profiler_decorator
    def loadBefore(self, oid, tid):
        """Load an object for a given oid before tid committed.

        Returns (data, start_serial, end_serial), or None when the loaded
        revision has no end serial.
        """
        # Do not try in cache as it manages only up-to-date object
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        data, start_serial, end_serial = self._load(oid, tid=tid)
        if end_serial is not None:
            return data, start_serial, end_serial
        # No previous version
        return None
Aurel's avatar
Aurel committed
539

540

541
    @profiler_decorator
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction.

        transaction
            Transaction object to begin; stored in local_var.txn on success.
        tid
            Transaction id requested from the primary master, or None to
            let the master pick one.
        status
            Not used by this method.

        Does nothing when `transaction` is already the current one.
        Raises NeoException when another transaction is still registered
        for this thread, and NEOStorageError when local_var.tid is still
        None after asking the master.
        """
        # First get a transaction, only one is allowed at a time
        if self.local_var.txn is transaction:
            # We already begin the same transaction
            return
        if self.local_var.txn is not None:
            # Call form of raise: valid on both Python 2 and Python 3.
            raise NeoException('local_var is not clean in tpc_begin')
        # ask the primary master to start a transaction, if no tid is supplied,
        # the master will supply us one. Otherwise the requested tid will be
        # used if possible.
        self.local_var.tid = None
        self._askPrimary(Packets.AskBeginTransaction(tid))
        if self.local_var.tid is None:
            raise NEOStorageError('tpc_begin failed')
        self.local_var.txn = transaction
558

559

560
    @profiler_decorator
    def store(self, oid, serial, data, version, transaction):
        """Store object.

        The store request is sent to every writable cell of the oid that
        has a usable connection; answers are not waited for here, only the
        ones already received are processed. Always returns None.

        Raises StorageTransactionError when `transaction` is not the
        current one, and NEOStorageError when no writable cell exists.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        cell_list = self._getCellListForOID(oid, writable=True)
        if len(cell_list) == 0:
            raise NEOStorageError
        if data is None:
            # this is a George Bailey object, stored as an empty string
            data = ''
        # Send the compressed form unless it is larger than the raw data.
        payload = compress(data)
        compression = 1
        if len(payload) > len(data):
            payload = data
            compression = 0
        checksum = makeChecksum(payload)
        request = Packets.AskStoreObject(oid, serial, compression,
                 checksum, payload, local_var.tid)
        # Store object in tmp cache
        local_var.data_dict[oid] = data
        # Per-transaction bookkeeping for this oid
        local_var.object_stored_counter_dict[oid] = 0
        local_var.object_serial_dict[oid] = (serial, version)
        # Store data on each node
        getConnForCell = self.cp.getConnForCell
        for cell in cell_list:
            conn = getConnForCell(cell)
            if conn is not None:
                try:
                    conn.ask(request)
                except ConnectionClosed:
                    # This node is unreachable, go on with the others.
                    pass

        # Process answers that are already there, without blocking.
        self._waitAnyMessage(False)
        return None
600

601
    @profiler_decorator
    def _handleConflicts(self, tryToResolveConflict):
        """Try to resolve every conflict recorded for this transaction.

        tryToResolveConflict
            Callable given (oid, conflict_serial, serial, data); it returns
            the resolved data, or None when resolution is not possible.

        A resolved object is moved from the conflict dict to the resolved
        conflict dict and stored again with the resolved data.

        Returns the list of oids resolved by this call.
        Raises ConflictError on the first conflict that is not resolved.
        """
        resolved_oid_list = []
        local_var = self.local_var
        # Check for conflicts
        data_dict = local_var.data_dict
        object_serial_dict = local_var.object_serial_dict
        conflict_serial_dict = local_var.conflict_serial_dict
        resolved_conflict_serial_dict = local_var.resolved_conflict_serial_dict
        # Python 2 items() is a list snapshot, so popping entries from the
        # dict inside the loop is safe.
        for oid, conflict_serial in conflict_serial_dict.items():
            serial, version = object_serial_dict[oid]
            data = data_dict[oid]
            tid = local_var.tid
            new_data = None
            if conflict_serial <= tid:
                new_data = tryToResolveConflict(oid, conflict_serial, serial,
                    data)
            if new_data is None:
                # XXX: Is it really required to remove from data_dict ?
                del data_dict[oid]
                raise ConflictError(oid=oid,
                    serials=(tid, serial), data=data)
            # Mark this conflict as resolved
            resolved_conflict_serial_dict[oid] = \
                conflict_serial_dict.pop(oid)
            # Try to store again
            self.store(oid, conflict_serial, new_data, version,
                local_var.txn)
            resolved_oid_list.append(oid)
        return resolved_oid_list
634

635 636 637 638 639 640 641 642 643 644
    @profiler_decorator
    def waitResponses(self):
        """Wait for all requests to be answered (or their connection to be
        detected as closed)"""
        local_queue = self.local_var.queue
        has_pending = self.dispatcher.pending
        while has_pending(local_queue):
            self._waitAnyMessage()

645
    @profiler_decorator
    def waitStoreResponses(self, tryToResolveConflict):
        """Wait for all store answers and resolve conflicts.

        Alternates between waiting for pending answers and resolving the
        reported conflicts, until a round resolves nothing.

        Returns a list of (oid, serial) pairs, one per stored object:
        serial is ResolvedSerial for objects whose conflict was resolved,
        and the transaction id otherwise.
        Raises NEOStorageError when an object has a store counter of 0.
        """
        local_var = self.local_var
        tid = local_var.tid
        resolved_oid_set = set()
        while True:
            self.waitResponses()
            newly_resolved = self._handleConflicts(tryToResolveConflict)
            if not newly_resolved:
                # No more conflict resolutions to do, no more pending store
                # requests
                break
            resolved_oid_set.update(newly_resolved)

        # Check for never-stored objects, and update result for all others
        result = []
        counter_dict = local_var.object_stored_counter_dict
        for oid, store_count in counter_dict.iteritems():
            if store_count == 0:
                raise NEOStorageError('tpc_store failed')
            if oid in resolved_oid_set:
                result.append((oid, ResolvedSerial))
            else:
                result.append((oid, tid))
        return result
Aurel's avatar
Aurel committed
674

675
    @profiler_decorator
    def tpc_vote(self, transaction, tryToResolveConflict):
        """Store current transaction.

        Waits for the store answers (resolving conflicts), then sends the
        transaction metadata to every writable cell of the transaction id
        that has a usable connection.

        Returns the list built by waitStoreResponses.
        Raises StorageTransactionError when `transaction` is not the
        current one, and NEOStorageError when a contacted node does not
        vote the transaction or when no node could be contacted.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            raise StorageTransactionError(self, transaction)

        result = self.waitStoreResponses(tryToResolveConflict)

        tid = local_var.tid
        request = Packets.AskStoreTransaction(tid, transaction.user,
            transaction.description, dumps(transaction._extension),
            local_var.data_dict.keys())
        # Store data on each node
        accepted_count = 0
        for cell in self._getCellListForTID(tid, writable=True):
            logging.debug("voting object %s %s", cell.getAddress(),
                cell.getState())
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            # Reset the flag before each request.
            local_var.txn_voted = False
            try:
                self._askStorage(conn, request)
            except ConnectionClosed:
                continue

            if not self.isTransactionVoted():
                raise NEOStorageError('tpc_vote failed')
            accepted_count += 1

        # check at least one storage node accepted
        if accepted_count == 0:
            raise NEOStorageError('tpc_vote failed')
        # Check if master connection is still alive.
        # This is just here to lower the probability of detecting a problem
        # in tpc_finish, as we should do our best to detect problem before
        # tpc_finish.
        self._getMasterConnection()

        return result

718
    @profiler_decorator
    def tpc_abort(self, transaction):
        """Abort current transaction.

        Does nothing when `transaction` is not the current one. Otherwise
        notifies the abort to every writable cell of the stored objects and
        of the transaction id, then to the primary master, and finally
        resets the calling thread's transaction state.
        """
        local_var = self.local_var
        if transaction is not local_var.txn:
            return

        # Just wait for response to arrive, don't handle any conflict, and
        # ignore the outcome: we are going to abort anyway.
        self.waitResponses()

        cell_set = set()
        # select nodes where objects were stored
        for oid in local_var.data_dict.iterkeys():
            cell_set.update(self._getCellListForOID(oid, writable=True))
        # select nodes where transaction was stored
        cell_set.update(self._getCellListForTID(local_var.tid,
            writable=True))

        # cancel transaction on all those nodes
        for cell in cell_set:
            storage_conn = self.cp.getConnForCell(cell)
            if storage_conn is not None:
                storage_conn.notify(Packets.AbortTransaction(local_var.tid))

        # Abort the transaction in the primary master node.
        master_conn = self._getMasterConnection()
        master_conn.notify(Packets.AbortTransaction(local_var.tid))
        local_var.clear()
747

748
    @profiler_decorator
    def tpc_finish(self, transaction, f=None):
        """Finish the current transaction.

        Returns None without doing anything when ``transaction`` is not
        the one recorded in ``self.local_var.txn``. Otherwise, while
        holding the load lock: calls ``f(tid)`` if a callback was given,
        asks the primary master to finish the transaction, updates the
        object cache from the stored data and returns the transaction id.

        Raises NEOStorageError when the transaction is not reported as
        finished after the master was asked.
        """
        if self.local_var.txn is not transaction:
            return
        self._load_lock_acquire()
        try:
            local_var = self.local_var
            tid = local_var.tid
            # Callback provided by ZODB, invoked before asking the master.
            if f is not None:
                f(tid)

            # Ask the master to finish the transaction.
            finish_packet = Packets.AskFinishTransaction(
                local_var.data_dict.keys(), tid)
            self._askPrimary(finish_packet)

            if not self.isTransactionFinished():
                raise NEOStorageError('tpc_finish failed')

            # Refresh the cache with what has just been committed.
            self._cache_lock_acquire()
            try:
                cache = self.mq_cache
                for oid, data in local_var.data_dict.iteritems():
                    if data != '':
                        # The serial of the new revision is the tid.
                        cache[oid] = (tid, data)
                    elif oid in cache:
                        # Empty data: drop any cached revision.
                        del cache[oid]
            finally:
                self._cache_lock_release()
            local_var.clear()
            return tid
        finally:
            self._load_lock_release()
785

786
    def undo(self, undone_tid, txn, tryToResolveConflict):
        """Undo transaction ``undone_tid`` as part of transaction ``txn``.

        Steps:
        1. fetch the information of the undone transaction from one of
           its readable cells;
        2. ask every storage node to undo it and wait for all answers;
        3. for objects reported in ``undo_error_oid_list``, try to resolve
           the conflict with ``tryToResolveConflict`` and store the result.

        Returns a ``(tid, oid_list)`` tuple: the current transaction id and
        the oids of the undone transaction.

        Raises:
            StorageTransactionError: ``txn`` is not the current transaction.
            NEOStorageError: transaction information could not be obtained.
            UndoError: the transaction is packed, or an undo conflict could
                not be resolved.
            ConflictError: an object was reported in
                ``undo_conflict_oid_list``.
        """
        if txn is not self.local_var.txn:
            raise StorageTransactionError(self, undone_tid)

        # First get transaction information from a storage node.
        cell_list = self._getCellListForTID(undone_tid, readable=True)
        assert len(cell_list), 'No cell found for transaction %s' % (
            dump(undone_tid), )
        shuffle(cell_list)
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            # Reset the answer slots before asking this node.
            self.local_var.txn_info = 0
            self.local_var.txn_ext = 0
            try:
                self._askStorage(conn, Packets.AskTransactionInformation(
                    undone_tid))
            except ConnectionClosed:
                continue

            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                logging.warning('Transaction %s was not found on node %s',
                    dump(undone_tid), self.nm.getByAddress(conn.getAddress()))
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')
        else:
            # No node could provide the transaction information.
            raise NEOStorageError('undo failed')

        if self.local_var.txn_info['packed']:
            # The exception used to be instantiated without being raised,
            # which let the undo of a packed transaction go on.
            raise UndoError('non-undoable transaction')

        tid = self.local_var.tid

        undo_conflict_oid_list = self.local_var.undo_conflict_oid_list = []
        undo_error_oid_list = self.local_var.undo_error_oid_list = []
        ask_undo_transaction = Packets.AskUndoTransaction(tid, undone_tid)
        getConnForNode = self.cp.getConnForNode
        for storage_node in self.nm.getStorageList():
            # NOTE(review): unlike other callers in this file, a missing
            # connection (None) is not handled here - confirm whether
            # getConnForNode can return None for these nodes.
            storage_conn = getConnForNode(storage_node)
            storage_conn.ask(ask_undo_transaction)
        # Wait for all AnswerUndoTransaction.
        self.waitResponses()

        # Don't do any handling for "live" conflicts, raise
        if undo_conflict_oid_list:
            raise ConflictError(oid=undo_conflict_oid_list[0], serials=(tid,
                undone_tid), data=None)

        def loadBefore(oid, tid):
            # Return the first two items of what _load gives for ``oid``
            # at ``tid``, turning "not found" into an UndoError.
            try:
                result = self._load(oid, tid=tid)
            except NEOStorageNotFoundError:
                raise UndoError("Object not found while resolving undo " \
                    "conflict")
            return result[:2]

        # Try to resolve undo conflicts
        for oid in undo_error_oid_list:
            # Load the latest version we are supposed to see
            data, data_tid = loadBefore(oid, tid)
            # Load the version we were undoing to
            undo_data, _ = loadBefore(oid, undone_tid)
            # Resolve conflict
            new_data = tryToResolveConflict(oid, data_tid, undone_tid,
                undo_data, data)
            if new_data is None:
                raise UndoError('Some data were modified by a later ' \
                    'transaction', oid)
            self.store(oid, data_tid, new_data, '', self.local_var.txn)

        oid_list = self.local_var.txn_info['oids']
        # Consistency checking: all oids of the transaction must have been
        # reported as undone
        data_dict = self.local_var.data_dict
        for oid in oid_list:
            assert oid in data_dict, repr(oid)
        return self.local_var.tid, oid_list
869

870 871 872 873
    def _insertMetadata(self, txn_info, extension):
        """Merge the unpickled ``extension`` mapping into ``txn_info``.

        ``extension`` is a pickle string; every key/value pair of the
        loaded mapping is copied into ``txn_info``, overriding existing
        keys.
        """
        # NOTE(review): unpickling is only safe for trusted data - confirm
        # that extensions always come from trusted storage nodes.
        metadata = loads(extension)
        txn_info.update(metadata.items())

874
    def __undoLog(self, first, last, filter=None, block=0, with_oids=False):
        """Return a list of transaction description dicts, newest first.

        TIDs in the ``first``..``last`` range are requested from every
        storage node of the partition table that has a usable connection,
        then the information of each transaction is fetched from one of
        its readable cells.

        Parameters:
            first, last: range passed to the storage nodes; a negative
                ``last`` is converted to ``first - last``.
            filter: optional callable taking a transaction dict; only
                transactions for which it returns a true value are kept.
            block: when false and nothing was found, the search is retried
                once with ``last`` multiplied by 5.
            with_oids: keep the "oids" key in the returned dicts.

        Raises NeoException when the information of a listed transaction
        cannot be obtained from any node.
        """
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        # Each storage node will return TIDs only for UP_TO_DATE state and
        # FEEDING state cells
        pt = self._getPartitionTable()
        storage_node_list = pt.getNodeList()

        self.local_var.node_tids = {}
        queried_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue
            conn.ask(Packets.AskTIDs(first, last, INVALID_PARTITION))
            queried_count += 1

        # Wait for answers from the storages which were actually asked.
        # Comparing against the whole node list would wait forever as soon
        # as one node had been skipped for lack of connection.
        while len(self.local_var.node_tids) < queried_count:
            self._waitAnyMessage()

        # Reorder tids
        tid_set = set()
        for tid_list in self.local_var.node_tids.values():
            tid_set.update(tid_list)
        ordered_tids = sorted(tid_set, reverse=True)
        logging.debug("UndoLog, tids %s", ordered_tids)

        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            cell_list = self._getCellListForTID(tid, readable=True)
            shuffle(cell_list)
            # Reset here too, so that the answer obtained for a previous
            # TID is never reused when no node can be asked for this one.
            self.local_var.txn_info = 0
            self.local_var.txn_ext = 0
            for cell in cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue
                self.local_var.txn_info = 0
                self.local_var.txn_ext = 0
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(tid))
                except ConnectionClosed:
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break

            if self.local_var.txn_info in (-1, 0):
                # TID not found at all
                raise NeoException('Data inconsistency detected: '
                                   'transaction info for TID %r could not '
                                   'be found' % (tid, ))

            if filter is None or filter(self.local_var.txn_info):
                self.local_var.txn_info.pop('packed')
                if not with_oids:
                    self.local_var.txn_info.pop("oids")
                undo_info.append(self.local_var.txn_info)
                # The dict appended above is updated in place.
                self._insertMetadata(self.local_var.txn_info,
                        self.local_var.txn_ext)
                if len(undo_info) >= last - first:
                    break
        # Check we return at least one element, otherwise call
        # again but extend offset
        if len(undo_info) == 0 and not block:
            undo_info = self.__undoLog(first=first, last=last*5, filter=filter,
                    block=1, with_oids=with_oids)
        return undo_info

945
    def undoLog(self, first, last, filter=None, block=0):
        """Return the undo log for the ``first``..``last`` range.

        Thin wrapper around ``__undoLog`` which leaves ``with_oids`` at
        its default, so the "oids" key is removed from the entries.
        """
        undo_info = self.__undoLog(first, last, filter, block)
        return undo_info
947 948

    def transactionLog(self, first, last):
        """Return the transaction log for the ``first``..``last`` range.

        Same as the undo log without filter, except that each entry keeps
        its "oids" key (``with_oids=True``).
        """
        transaction_info = self.__undoLog(first, last, with_oids=True)
        return transaction_info
950

Grégory Wisniewski's avatar
Grégory Wisniewski committed
951
    def history(self, oid, version=None, size=1, filter=None, object_only=0):
        """Return the history of object ``oid``.

        Parameters:
            oid: identifier of the object.
            version: unused.
            size: passed to the storage node in the history request.
            filter: optional callable taking a history entry (dict); only
                entries for which it returns a true value are returned.
            object_only: when true, return the raw answer of the storage
                node instead of the list of transaction descriptions.

        Returns a list of dicts (transaction information completed with
        the 'tid', 'version' and 'size' keys), or the raw history tuple
        when ``object_only`` is true.

        Raises:
            NEOStorageError: no node gave a usable answer, or a node
                answered for another oid.
            KeyError: the object has no history.
        """
        # Get history informations for object first
        cell_list = self._getCellListForOID(oid, readable=True)
        shuffle(cell_list)

        # Make sure the answer of a previous call is never reused when no
        # node can be asked at all.
        self.local_var.history = None
        for cell in cell_list:
            conn = self.cp.getConnForCell(cell)
            if conn is None:
                continue

            self.local_var.history = None
            try:
                self._askStorage(conn, Packets.AskObjectHistory(oid, 0, size))
            except ConnectionClosed:
                continue

            if not isinstance(self.local_var.history, tuple):
                # Not found (-1) or no usable answer, go on with next node
                continue
            if self.local_var.history[0] != oid:
                # Got history for wrong oid
                raise NEOStorageError('inconsistency in storage: asked oid ' \
                                      '%r, got %r' % (
                                      oid, self.local_var.history[0]))
            # A valid answer was received: stop here, otherwise it would
            # be overwritten by the answer (or failure) of the next node.
            break

        if not isinstance(self.local_var.history, tuple):
            raise NEOStorageError('history failed')

        if self.local_var.history[1] == [] or \
            self.local_var.history[1][0][1] == 0:
            # KeyError expected if no history was found
            # XXX: this may requires an error from the storages
            raise KeyError

        if object_only:
            # Use by getSerial
            return self.local_var.history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, object_size in self.local_var.history[1]:
            # Ask the cells holding the transaction, not the ones holding
            # the object.
            tid_cell_list = self._getCellListForTID(serial, readable=True)
            shuffle(tid_cell_list)

            for cell in tid_cell_list:
                conn = self.cp.getConnForCell(cell)
                if conn is None:
                    continue

                # ask transaction information
                self.local_var.txn_info = None
                try:
                    self._askStorage(conn,
                            Packets.AskTransactionInformation(serial))
                except ConnectionClosed:
                    continue

                if self.local_var.txn_info == -1:
                    # TID not found
                    continue
                if isinstance(self.local_var.txn_info, dict):
                    break
            else:
                # No node returned the transaction information.
                raise NEOStorageError('history failed')

            # create history dict
            txn_info = self.local_var.txn_info
            txn_info.pop('id')
            txn_info.pop('oids')
            txn_info.pop('packed')
            txn_info['tid'] = serial
            txn_info['version'] = ''
            txn_info['size'] = object_size
            if filter is None or filter(txn_info):
                history_list.append(txn_info)
            # The dict possibly appended above is updated in place.
            self._insertMetadata(txn_info, self.local_var.txn_ext)

        return history_list
Aurel's avatar
Aurel committed
1027

1028
    @profiler_decorator
    def importFrom(self, source, start, stop, tryToResolveConflict):
        """Copy transactions from ``source`` into this storage.

        Iterates over ``source.iterator(start, stop)`` and, for each
        transaction, runs tpc_begin, stores every record, then runs
        tpc_vote and tpc_finish. The last serial known for each oid is
        tracked and passed as the base serial of the next store of that
        oid.

        The source iterator is closed even when a step raises.
        """
        serials = {}
        def updateLastSerial(oid, result):
            # ``result`` is either a serial (str) for ``oid``, or an
            # iterable of (oid, serial) pairs; falsy results are ignored.
            if result:
                if isinstance(result, str):
                    assert oid is not None
                    serials[oid] = result
                else:
                    for stored_oid, serial in result:
                        assert isinstance(serial, str), serial
                        serials[stored_oid] = serial
        transaction_iter = source.iterator(start, stop)
        try:
            for transaction in transaction_iter:
                self.tpc_begin(transaction, transaction.tid,
                    transaction.status)
                for r in transaction:
                    pre = serials.get(r.oid, None)
                    # TODO: bypass conflict resolution, locks...
                    result = self.store(r.oid, pre, r.data, r.version,
                        transaction)
                    updateLastSerial(r.oid, result)
                updateLastSerial(None, self.tpc_vote(transaction,
                            tryToResolveConflict))
                self.tpc_finish(transaction)
        finally:
            # Release the source iterator on failure as well.
            transaction_iter.close()

1053 1054 1055
    def iterator(self, start=None, stop=None):
        """Return an ``Iterator`` built from this object and the range."""
        transaction_iterator = Iterator(self, start, stop)
        return transaction_iterator

1056 1057 1058 1059 1060
    def lastTransaction(self):
        """Return the transaction id stored in ``self.local_var.tid``."""
        # XXX: transactions created by other clients are not considered,
        #  the primary master should be asked instead
        local_var = self.local_var
        return local_var.tid

1061 1062 1063 1064 1065 1066 1067 1068 1069 1070
    def abortVersion(self, src, transaction):
        """Return ``('', [])`` when ``transaction`` is the current one.

        Raises StorageTransactionError otherwise. ``src`` is not used.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

    def commitVersion(self, src, dest, transaction):
        """Return ``('', [])`` when ``transaction`` is the current one.

        Raises StorageTransactionError otherwise. ``src`` and ``dest`` are
        not used.
        """
        if transaction is self.local_var.txn:
            return '', []
        raise StorageTransactionError(self, transaction)

Grégory Wisniewski's avatar
Grégory Wisniewski committed
1071 1072 1073 1074
    def loadEx(self, oid, version):
        """Return ``(data, serial, '')`` for ``oid``.

        The data and serial come from ``self.load``; ``version`` is not
        used and the third item is always the empty string.
        """
        loaded = self.load(oid=oid)
        data, serial = loaded
        return (data, serial, '')

1075 1076 1077 1078 1079
    def __del__(self):
        """Close every connection and stop the polling thread."""
        # close() is not always called by ZODB when zope shuts down (ZODB
        # bug), hence the cleanup is also done on deletion.
        connection_list = self.em.getConnectionList()
        for connection in connection_list:
            connection.close()
        # Stop polling thread
        poll_thread = self.poll_thread
        poll_thread.stop()
    # Explicit closing runs the very same cleanup.
    close = __del__
1084 1085

    def sync(self):
        """Call ``_waitAnyMessage`` with ``False`` as its only argument."""
        wait_any_message = self._waitAnyMessage
        wait_any_message(False)
1087

1088 1089 1090 1091 1092 1093 1094 1095 1096
    def setNodeReady(self):
        """Set ``node_ready`` to True in ``self.local_var``."""
        local_var = self.local_var
        local_var.node_ready = True

    def setNodeNotReady(self):
        """Set ``node_ready`` to False in ``self.local_var``."""
        local_var = self.local_var
        local_var.node_ready = False

    def isNodeReady(self):
        """Return the ``node_ready`` value of ``self.local_var``."""
        local_var = self.local_var
        return local_var.node_ready

1097
    def setTID(self, value):
        """Store ``value`` as ``tid`` in ``self.local_var``."""
        local_var = self.local_var
        local_var.tid = value
1099 1100

    def getTID(self):
        """Return the ``tid`` value of ``self.local_var``."""
        local_var = self.local_var
        return local_var.tid
1102 1103

    def setTransactionFinished(self):
        """Set ``txn_finished`` to True in ``self.local_var``."""
        local_var = self.local_var
        local_var.txn_finished = True
1105 1106

    def isTransactionFinished(self):
        """Return the ``txn_finished`` value of ``self.local_var``."""
        local_var = self.local_var
        return local_var.txn_finished
1108 1109

    def setTransactionVoted(self):
        """Set ``txn_voted`` to True in ``self.local_var``."""
        local_var = self.local_var
        local_var.txn_voted = True
1111 1112

    def isTransactionVoted(self):
        """Return the ``txn_voted`` value of ``self.local_var``."""
        local_var = self.local_var
        return local_var.txn_voted
1114