app.py 32.3 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18 19
import logging
import os
Yoshinori Okuji's avatar
Yoshinori Okuji committed
20
from threading import Lock, RLock, local
21
from cPickle import dumps, loads
22
from zlib import compress, decompress
23
from Queue import Queue, Empty
24
from random import shuffle
Yoshinori Okuji's avatar
Yoshinori Okuji committed
25
from time import sleep
26 27

from neo.client.mq import MQ
28
from neo.node import NodeManager, MasterNode, StorageNode
29
from neo.connection import MTClientConnection
30
from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
31
        STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
Yoshinori Okuji's avatar
Yoshinori Okuji committed
32
        RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
33
        UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
Aurel's avatar
Aurel committed
34
from neo.client.handler import ClientEventHandler
35 36
from neo.client.Storage import NEOStorageError, NEOStorageConflictError, \
     NEOStorageNotFoundError
37
from neo.util import makeChecksum, dump
38

39
from ZODB.POSException import UndoError, StorageTransactionError, ConflictError
40
from ZODB.utils import p64, u64, oid_repr
Aurel's avatar
Aurel committed
41

42 43
class ConnectionPool(object):
    """Manage a pool of connections to storage nodes.

    Established connections are kept in ``connection_dict``, keyed by the
    UUID of the node they were opened for.
    """

    def __init__(self, app, max_pool_size = 25):
        """Create an empty pool.

        app -- the client application owning this pool
        max_pool_size -- number of pooled connections above which unused
                         connections are dropped before adding a new one
        """
        self.app = app
        self.max_pool_size = max_pool_size
        self.connection_dict = {}
        # Define a lock in order to create one connection to
        # a storage node at a time to avoid multiple connections
        # to the same node.
        l = RLock()
        self.connection_lock_acquire = l.acquire
        self.connection_lock_release = l.release

    def _initNodeConnection(self, node):
        """Open a connection to a storage node and identify ourselves.

        Return the connection once the identification request has been
        answered, or None when the node has no address, is not in
        RUNNING_STATE, cannot be reached, or reports that it is not ready.
        """
        addr = node.getNode().getServer()
        if addr is None:
            return None

        if node.getState() != RUNNING_STATE:
            return None

        app = self.app
        handler = ClientEventHandler(app, app.dispatcher)

        # A single attempt is made: every outcome below returns.
        logging.info('trying to connect to %s:%d', *addr)
        app.local_var.node_not_ready = 0
        conn = MTClientConnection(app.em, handler, addr)
        conn.lock()
        try:
            if conn.getSocket() is None:
                # This happens, if a connection could not be established.
                logging.error('Connection to storage node %s failed', addr)
                return None

            msg_id = conn.getNextId()
            p = Packet()
            p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE,
                                        app.uuid, addr[0],
                                        addr[1], app.name)
            conn.addPacket(p)
            conn.expectMessage(msg_id)
            app.dispatcher.register(conn, msg_id, app.getQueue())
        finally:
            conn.unlock()

        try:
            app._waitMessage(conn, msg_id)
        except NEOStorageError:
            logging.error('Connection to storage node %s failed', addr)
            return None

        if app.local_var.node_not_ready:
            # node_not_ready was reset to 0 above, so a true value here was
            # set while waiting for the answer.
            logging.info('Storage node %s not ready', addr)
            return None

        logging.info('connected to storage node %s:%d', *addr)
        return conn

    def _dropConnections(self):
        """Close pooled connections that look unused.

        Stops as soon as the pool is back within max_pool_size.
        """
        # Iterate over a snapshot because entries are deleted on the way.
        for node_uuid, conn in list(self.connection_dict.items()):
            conn.lock()
            try:
                if not conn.pending() and \
                        not self.app.dispatcher.registered(id(conn)):
                    # Delete by the key the connection is stored under,
                    # which cannot be missing from the dictionary.
                    del self.connection_dict[node_uuid]
                    conn.close()
                    logging.info('_dropConnections : connection to storage node %s:%d closed',
                                 *(conn.getAddress()))
                    if len(self.connection_dict) <= self.max_pool_size:
                        break
            finally:
                conn.unlock()

    def _createNodeConnection(self, node):
        """Create a connection to a given storage node.

        Called by getConnForNode with the pool lock held. The lock is
        released while the connection is being set up and re-acquired
        afterwards. Return the new connection, locked, or None on failure.
        """
        if len(self.connection_dict) > self.max_pool_size:
            # must drop some unused connections
            self._dropConnections()

        self.connection_lock_release()
        try:
            conn = self._initNodeConnection(node)
        finally:
            self.connection_lock_acquire()

        if conn is None:
            return None

        # add node to node manager
        if self.app.nm.getNodeByServer(node.getServer()) is None:
            n = StorageNode(node.getServer())
            self.app.nm.add(n)
        self.connection_dict[node.getUUID()] = conn
        conn.lock()
        return conn

    def getConnForNode(self, node):
        """Return a locked connection object to a given node.

        If no connection exists, create a new one; None is returned when
        that fails.
        """
        uuid = node.getUUID()
        self.connection_lock_acquire()
        try:
            try:
                conn = self.connection_dict[uuid]
                # Already connected to node
                conn.lock()
                return conn
            except KeyError:
                # Create new connection to node
                return self._createNodeConnection(node)
        finally:
            self.connection_lock_release()

    def removeConnection(self, node):
        """Explicitly remove connection when a node is broken.

        Unknown nodes are ignored. The connection itself is not closed here.
        """
        self.connection_lock_acquire()
        try:
            try:
                del self.connection_dict[node.getUUID()]
            except KeyError:
                pass
        finally:
            self.connection_lock_release()
174

175

Aurel's avatar
Aurel committed
176
class Application(object):
177 178
    """The client node application."""

179
    def __init__(self, master_nodes, name, em, dispatcher, request_queue, **kw):
        """Set up the client application state.

        master_nodes -- space-separated string of master node addresses
        name -- stored as ``self.name``
        em, dispatcher, request_queue -- stored as given
        kw -- accepted but not used here
        """
        logging.basicConfig(level = logging.DEBUG)
        logging.debug('master node address are %s' %(master_nodes,))

        # State shared by every thread.
        self.name = name
        self.em = em
        self.dispatcher = dispatcher
        self.request_queue = request_queue
        self.nm = NodeManager()
        self.cp = ConnectionPool(self)
        self.mq_cache = MQ()
        self.master_node_list = master_nodes.split(' ')
        self.primary_master_node = None
        self.master_conn = None
        self.pt = None
        self.ptid = None
        self.uuid = None
        self.new_oid_list = []
        self.num_replicas = 0
        self.num_partitions = 0

        # State of the transaction in progress.
        self.tid = None
        self.txn = None
        self.txn_data_dict = {}
        self.txn_object_stored = 0
        self.txn_voted = 0
        self.txn_finished = 0

        # State kept separately for each thread.
        self.local_var = local()

        # Locks:
        #   _load_lock  makes loading and storing atomic
        #   _oid_lock   prevents several oid generations at the same time
        #   _cache_lock protects the client cache
        load_lock = Lock()
        oid_lock = Lock()
        cache_lock = Lock()
        self._load_lock_acquire = load_lock.acquire
        self._load_lock_release = load_lock.release
        self._oid_lock_acquire = oid_lock.acquire
        self._oid_lock_release = oid_lock.release
        self._cache_lock_acquire = cache_lock.acquire
        self._cache_lock_release = cache_lock.release

        # XXX Generate an UUID for self. For now, just use a random string,
        # drawing again in case it happens to be the invalid UUID.
        candidate = os.urandom(16)
        while candidate == INVALID_UUID:
            candidate = os.urandom(16)
        self.uuid = candidate

231
    def getQueue(self):
232 233 234 235 236
        try:
            return self.local_var.queue
        except AttributeError:
            self.local_var.queue = Queue(5)
            return self.local_var.queue
237 238 239 240 241 242

    def _waitMessage(self, target_conn = None, msg_id = None):
        """Wait for a message returned by the dispatcher in queues."""
        global_queue = self.request_queue
        local_queue = self.getQueue()

243 244
        while 1:
            try:
245
                conn, packet = global_queue.get_nowait()
246
            except Empty:
247 248 249 250 251 252 253 254
                if msg_id is None:
                    try:
                        conn, packet = local_queue.get_nowait()
                    except Empty:
                        break
                else:
                    conn, packet = local_queue.get()

Yoshinori Okuji's avatar
Yoshinori Okuji committed
255 256 257 258 259 260
            if packet is None:
                if conn is target_conn:
                    raise NEOStorageError('connection closed')
                else:
                    continue

261
            conn.lock()
262
            try:
263 264 265
                conn.handler.dispatch(conn, packet)
            finally:
                conn.unlock()
266

Yoshinori Okuji's avatar
Yoshinori Okuji committed
267 268
            if target_conn is conn and msg_id == packet.getId() \
                    and packet.getType() & 0x8000:
269
                break
270

271 272 273
    def registerDB(self, db, limit):
        """Remember the database object given by the caller.

        db -- stored as ``self._db``
        limit -- accepted but not used here
        """
        self._db = db

274 275 276 277 278
    def new_oid(self):
        """Get a new OID."""
        self._oid_lock_acquire()
        try:
            if len(self.new_oid_list) == 0:
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
                # Get new oid list from master node
                # we manage a list of oid here to prevent
                # from asking too many time new oid one by one
                # from master node
                conn = self.master_conn
                conn.lock()
                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askNewOIDs(msg_id, 25)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                finally:
                    conn.unlock()

                self._waitMessage(conn, msg_id)
                if len(self.new_oid_list) <= 0:
                    raise NEOStorageError('new_oid failed')
Aurel's avatar
Aurel committed
298
            return self.new_oid_list.pop()
299 300 301
        finally:
            self._oid_lock_release()

302

Aurel's avatar
Aurel committed
303 304 305 306
    def getSerial(self, oid):
        # Try in cache first
        self._cache_lock_acquire()
        try:
307 308
            if oid in self.mq_cache:
                return self.mq_cache[oid][0]
Aurel's avatar
Aurel committed
309 310 311
        finally:
            self._cache_lock_release()
        # history return serial, so use it
312
        hist = self.history(oid, length = 1, object_only = 1)
Aurel's avatar
Aurel committed
313 314
        if len(hist) == 0:
            raise NEOStorageNotFoundError()
315 316 317 318
        if hist[0] != oid:
            raise NEOStorageError('getSerial failed')
        return hist[1][0][0]

Aurel's avatar
Aurel committed
319

320
    def _load(self, oid, serial = INVALID_TID, tid = INVALID_TID, cache = 0):
        """Internal method which manages load, loadSerial and loadBefore.

        oid -- object to load
        serial, tid -- passed as is to the storage node in the askObject
                       request
        cache -- when true, the loaded record is put in the client cache

        Return a tuple (object, start_serial, end_serial) where end_serial
        is None when the storage node answered INVALID_SERIAL.

        Raise NEOStorageNotFoundError when the object is not found, or when
        no node returned a record with the expected oid and checksum.
        """
        partition_id = u64(oid) % self.num_partitions

        self.local_var.asked_object = None
        while self.local_var.asked_object is None:
            cell_list = self.pt.getCellList(partition_id, True)
            if len(cell_list) == 0:
                # No cell for this partition yet, try again later.
                sleep(1)
                continue

            shuffle(cell_list)
            self.local_var.asked_object = None
            for cell in cell_list:
                logging.debug('trying to load %s from %s',
                              dump(oid), dump(cell.getUUID()))
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askObject(msg_id, oid, serial, tid)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                    # 0 means "asked, not answered yet"; the value is
                    # replaced while the answer is being dispatched.
                    self.local_var.asked_object = 0
                finally:
                    conn.unlock()

                self._waitMessage(conn, msg_id)
                if self.local_var.asked_object == -1:
                    # OID not found
                    break

                # Check data
                noid, start_serial, end_serial, compression, checksum, data \
                    = self.local_var.asked_object
                if noid != oid:
                    # Oops, try with next node
                    logging.error('got wrong oid %s instead of %s from node %s',
                                  noid, oid, cell.getServer())
                    # Forget the bad answer so that it cannot be returned
                    # if this was the last node to try.
                    self.local_var.asked_object = -1
                    continue
                elif checksum != makeChecksum(data):
                    # Check checksum.
                    logging.error('wrong checksum from node %s for oid %s',
                                  cell.getServer(), oid)
                    self.local_var.asked_object = -1
                    continue
                else:
                    # Everything looks alright.
                    break

        if self.local_var.asked_object == -1:
            # We didn't get any valid object from the storage nodes
            logging.debug('oid %s not found', dump(oid))
            raise NEOStorageNotFoundError()

        # Uncompress data
        if compression:
            data = decompress(data)

        # Put in cache only when using load
        if cache:
            self._cache_lock_acquire()
            try:
                self.mq_cache[oid] = start_serial, data
            finally:
                self._cache_lock_release()
        if end_serial == INVALID_SERIAL:
            end_serial = None
        # NOTE(review): the data comes from a storage node and is
        # unpickled here; this is only safe if storage nodes are trusted.
        return loads(data), start_serial, end_serial
392

393

394 395 396
    def load(self, oid, version=None):
        """Load the current revision of an object.

        The client cache is tried first, then the storage nodes. The load
        lock is held for the whole operation. ``version`` is not used.

        Return a tuple (object, serial).
        """
        self._load_lock_acquire()
        try:
            self._cache_lock_acquire()
            try:
                cached = oid in self.mq_cache
                if cached:
                    logging.debug('load oid %s is cached', dump(oid))
                    obj = loads(self.mq_cache[oid][1])
                    serial = self.mq_cache[oid][0]
            finally:
                self._cache_lock_release()
            if cached:
                return obj, serial
            # Not cached: ask a storage node and cache the answer.
            return self._load(oid, cache=1)[:2]
        finally:
            self._load_lock_release()
Aurel's avatar
Aurel committed
410

411

412
    def loadSerial(self, oid, serial):
        """Return the object stored for the given oid at the given serial."""
        # The cache is skipped on purpose: it only manages up-to-date
        # objects.
        logging.debug('loading %s at %s', dump(oid), dump(serial))
        loaded = self._load(oid, serial=serial)
        return loaded[0]
Aurel's avatar
Aurel committed
417

418

419
    def loadBefore(self, oid, tid):
        """Load the revision of an object committed before tid.

        A tid of None is replaced by INVALID_TID. Return a tuple
        (object, start_serial, end_serial), or None when the loaded
        revision has no end serial.
        """
        # The cache is skipped on purpose: it only manages up-to-date
        # objects.
        if tid is None:
            tid = INVALID_TID
        logging.debug('loading %s before %s', dump(oid), dump(tid))
        obj, start_serial, end_serial = self._load(oid, tid=tid)
        if end_serial is None:
            # No previous version
            return None
        return obj, start_serial, end_serial
Aurel's avatar
Aurel committed
431

432

433 434 435 436
    def tpc_begin(self, transaction, tid=None, status=' '):
        """Begin a new transaction."""
        # First get a transaction, only one is allowed at a time
        if self.txn == transaction:
437
            # We already begin the same transaction
438 439 440 441 442
            return
        # Get a new transaction id if necessary
        if tid is None:
            self.tid = None
            conn = self.master_conn
443
            if conn is None:
Aurel's avatar
Aurel committed
444
                raise NEOStorageError("Connection to master node failed")
445 446 447 448 449 450 451 452 453 454
            conn.lock()
            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askNewTID(msg_id)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
            finally:
                conn.unlock()
Aurel's avatar
Aurel committed
455
            # Wait for answer
456
            self._waitMessage(conn, msg_id)
457 458
            if self.tid is None:
                raise NEOStorageError('tpc_begin failed')
459 460
        else:
            self.tid = tid
461
        self.txn = transaction            
462

463

464 465 466
    def store(self, oid, serial, data, version, transaction):
        """Store an object on the storage nodes of its partition.

        oid -- object id
        serial -- serial the object was loaded at; None is sent as
                  INVALID_SERIAL
        data -- object to store; it is pickled and compressed here
        version -- not used
        transaction -- must be the transaction given to tpc_begin

        Return the id of the current transaction.

        Raise StorageTransactionError for a foreign transaction,
        NEOStorageConflictError when a storage node reports a conflict
        (the conflicting serial is left in ``self.conflict_serial``) and
        NEOStorageError when no storage node could take the object.
        """
        if transaction is not self.txn:
            raise StorageTransactionError(self, transaction)
        if serial is None:
            serial = INVALID_SERIAL
        logging.debug('storing oid %s serial %s',
                     dump(oid), dump(serial))
        # Find which storage node to use
        partition_id = u64(oid) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, True)
        if len(cell_list) == 0:
            # FIXME must wait for cluster to be ready
            raise NEOStorageError
        # Store data on each node
        ddata = dumps(data)
        compressed_data = compress(ddata)
        checksum = makeChecksum(compressed_data)
        stored = False
        for cell in cell_list:
            logging.info("storing object %s %s" %(cell.getServer(),cell.getState()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askStoreObject(msg_id, oid, serial, 1,
                                 checksum, compressed_data, self.tid)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                self.txn_object_stored = 0
            finally:
                conn.unlock()

            # Check we don't get any conflict
            self._waitMessage(conn, msg_id)
            if self.txn_object_stored[0] == -1:
                if oid in self.txn_data_dict:
                    # One storage already accept the object, is it normal ??
                    # remove from dict and raise ConflictError, don't care of
                    # previous node which already store data as it would be resent
                    # again if conflict is resolved or txn will be aborted
                    del self.txn_data_dict[oid]
                self.conflict_serial = self.txn_object_stored[1]
                raise NEOStorageConflictError
            stored = True

        if not stored:
            # Every node was skipped: do not pretend the object is stored.
            raise NEOStorageError('no storage node available to store object')

        # Store object in tmp cache
        self.txn_data_dict[oid] = ddata

        return self.tid

Aurel's avatar
Aurel committed
518

519 520 521
    def tpc_vote(self, transaction):
        """Store the current transaction on the storage nodes.

        The transaction's user, description and pickled extension are
        sent, together with the oids stored so far, to every reachable
        storage node of the partition of the transaction id.

        Raise StorageTransactionError for a foreign transaction and
        NEOStorageError when a storage node did not acknowledge the vote.
        """
        if transaction is not self.txn:
            raise StorageTransactionError(self, transaction)
        user = transaction.user
        desc = transaction.description
        ext = dumps(transaction._extension)
        oid_list = self.txn_data_dict.keys()
        # Store data on each node
        partition_id = u64(self.tid) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, True)
        for cell in cell_list:
            logging.info("voting object %s %s" %(cell.getServer(), cell.getState()))
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askStoreTransaction(msg_id, self.tid, user, desc, ext,
                                      oid_list)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                # Reset the flag for this node, so that the answer of a
                # previous node cannot hide a failure of this one.
                self.txn_voted = 0
            finally:
                conn.unlock()

            self._waitMessage(conn, msg_id)
            if self.txn_voted != 1:
                raise NEOStorageError('tpc_vote failed')

552
    def _clear_txn(self):
553 554
        """Clear some transaction parameters."""
        self.tid = None
555
        self.txn = None
556
        self.txn_data_dict.clear()
557 558 559
        self.txn_voted = 0
        self.txn_finished = 0

560 561 562 563
    def tpc_abort(self, transaction):
        """Abort current transaction."""
        if transaction is not self.txn:
            return
Aurel's avatar
Aurel committed
564 565

        # Abort txn in node where objects were stored
566
        aborted_node_set = set()
Aurel's avatar
Aurel committed
567
        for oid in self.txn_data_dict.iterkeys():
568
            partition_id = u64(oid) % self.num_partitions
Aurel's avatar
Aurel committed
569
            cell_list = self.pt.getCellList(partition_id, True)
570 571 572
            for cell in cell_list:
                if cell.getNode() not in aborted_node_set:
                    conn = self.cp.getConnForNode(cell)
573 574
                    if conn is None:
                        continue
575 576 577 578 579 580 581 582 583 584

                    try:
                        msg_id = conn.getNextId()
                        p = Packet()
                        p.abortTransaction(msg_id, self.tid)
                        conn.addPacket(p)
                    finally:
                        conn.unlock()

                    aborted_node_set.add(cell.getNode())
Aurel's avatar
Aurel committed
585 586

        # Abort in nodes where transaction was stored
587
        partition_id = u64(self.tid) % self.num_partitions
Aurel's avatar
Aurel committed
588
        cell_list = self.pt.getCellList(partition_id, True)
589 590 591
        for cell in cell_list:
            if cell.getNode() not in aborted_node_set:
                conn = self.cp.getConnForNode(cell)
Aurel's avatar
Aurel committed
592 593 594
                if conn is None:
                    continue

595 596 597 598 599 600 601
                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.abortTransaction(msg_id, self.tid)
                    conn.addPacket(p)
                finally:
                    conn.unlock()
Aurel's avatar
Aurel committed
602

603 604
                aborted_node_set.add(cell.getNode())

605 606 607 608
        # Abort the transaction in the primary master node.
        conn = self.master_conn
        conn.lock()
        try:
Aurel's avatar
Aurel committed
609
            conn.addPacket(Packet().abortTransaction(conn.getNextId(), self.tid))
610 611 612
        finally:
            conn.unlock()

613
        self._clear_txn()
614

615 616 617 618
    def tpc_finish(self, transaction, f=None):
        """Finish current transaction."""
        if self.txn is not transaction:
            return
619
        self._load_lock_acquire()
620
        try:
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637
            # Call function given by ZODB
            if f is not None:
                f(self.tid)

            # Call finish on master
            oid_list = self.txn_data_dict.keys()
            conn = self.master_conn
            conn.lock()
            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.finishTransaction(msg_id, oid_list, self.tid)
                conn.addPacket(p)
                conn.expectMessage(msg_id, additional_timeout = 300)
                self.dispatcher.register(conn, msg_id, self.getQueue())
            finally:
                conn.unlock()
638

639 640 641 642
            # Wait for answer
            self._waitMessage(conn, msg_id)
            if self.txn_finished != 1:
                raise NEOStorageError('tpc_finish failed')
Aurel's avatar
Aurel committed
643

644 645 646 647 648 649 650 651 652 653 654
            # Update cache
            self._cache_lock_acquire()
            try:
                for oid in self.txn_data_dict.iterkeys():
                    ddata = self.txn_data_dict[oid]
                    # Now serial is same as tid
                    self.mq_cache[oid] = self.tid, ddata
            finally:
                self._cache_lock_release()
            self._clear_txn()
            return self.tid
655
        finally:
656
            self._load_lock_release()
657

658

Aurel's avatar
Aurel committed
659
    def undo(self, transaction_id, txn, wrapper):
        """Undo a transaction within the current one.

        transaction_id -- id of the transaction to undo
        txn -- must be the current transaction
        wrapper -- object providing tryToResolveConflict, used when a
                   store conflicts

        The previous revision of every object of the undone transaction is
        stored again. Return a tuple (current tid, list of oids).

        Raise StorageTransactionError for a foreign transaction,
        NEOStorageError when the transaction information cannot be
        obtained, UndoError when an object cannot be undone and
        ConflictError when a conflict cannot be resolved.
        """
        if txn is not self.txn:
            raise StorageTransactionError(self, transaction_id)

        # First get transaction information from a storage node.
        partition_id = u64(transaction_id) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, True)
        shuffle(cell_list)
        # Start from "not found", so that a result left by a previous call
        # is never reused when no node can be reached.
        self.local_var.txn_info = -1
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askTransactionInformation(msg_id, transaction_id)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                self.local_var.txn_info = 0
            finally:
                conn.unlock()

            # Wait for answer
            self._waitMessage(conn, msg_id)
            if self.local_var.txn_info == -1:
                # Tid not found, try with next node
                continue
            elif isinstance(self.local_var.txn_info, dict):
                break
            else:
                raise NEOStorageError('undo failed')

        if self.local_var.txn_info == -1:
            raise NEOStorageError('undo failed')

        oid_list = self.local_var.txn_info['oids']
        # Second get object data from storage node using loadBefore
        data_dict = {}
        for oid in oid_list:
            try:
                result = self.loadBefore(oid, transaction_id)
            except NEOStorageNotFoundError:
                # Object created by transaction, so no previous record
                data_dict[oid] = None
                continue
            if result is None:
                # loadBefore found no end serial, so the revision cannot
                # end at the transaction being undone.
                raise UndoError("non-undoable transaction")
            data, start, end = result
            # end must be the TID we are going to undo, otherwise it means
            # a later transaction modified the object
            if end != transaction_id:
                raise UndoError("non-undoable transaction")
            data_dict[oid] = data

        # Third do transaction with old data
        oid_list = data_dict.keys()
        for oid in oid_list:
            data = data_dict[oid]
            try:
                self.store(oid, transaction_id, data, None, txn)
            except NEOStorageConflictError:
                # store leaves the conflicting serial in conflict_serial;
                # the exception itself carries no value.
                serial = self.conflict_serial
                if serial <= self.tid:
                    new_data = wrapper.tryToResolveConflict(oid, self.tid,
                                                            serial, data)
                    if new_data is not None:
                        self.store(oid, self.tid, new_data, None, txn)
                        continue
                raise ConflictError(oid = oid, serials = (self.tid, serial),
                                    data = data)
        return self.tid, oid_list
728

729
    def undoLog(self, first, last, filter=None, block=0):
        """Return descriptions of transactions, most recent TID first.

        first, last -- bounds sent to the storage nodes with askTIDs; a
                       negative last is replaced by first - last (see
                       FileStorage.py for the convention).
        filter      -- optional callable receiving a transaction information
                       dict; the transaction is kept only if it returns true.
        block       -- when true, do not retry with a larger range if nothing
                       was found (used by the internal retry below).

        Returns a list of transaction information dicts from which the
        'oids' key has been removed, holding at most last - first entries.
        """
        if last < 0:
            # See FileStorage.py for explanation
            last = first - last

        # First get a list of transactions from all storage nodes.
        storage_node_list = [x for x in self.pt.getNodeList()
                             if x.getState() in (UP_TO_DATE_STATE,
                                                 FEEDING_STATE)]
        self.local_var.node_tids = {}
        # Only nodes we really sent a request to can answer, so count them
        # instead of relying on the length of storage_node_list.
        asked_node_count = 0
        for storage_node in storage_node_list:
            conn = self.cp.getConnForNode(storage_node)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askTIDs(msg_id, first, last, INVALID_PARTITION)
                conn.addPacket(p)
            finally:
                conn.unlock()
            asked_node_count += 1

        # Wait for answers from all storages that were asked.
        # NOTE(review): self.local_var.node_tids is filled outside this
        # method, presumably one entry per answering node -- confirm against
        # the handler.
        # FIXME this is a busy loop.
        while len(self.local_var.node_tids) < asked_node_count:
            self._waitMessage()

        # Merge the answers, dropping TIDs reported by more than one node so
        # that no transaction is listed twice, then reorder tids.
        known_tids = set()
        for tids in self.local_var.node_tids.values():
            known_tids.update(tids)
        # XXX do we need a special cmp function here ?
        ordered_tids = sorted(known_tids, reverse=True)
        logging.info("UndoLog, tids %s", ordered_tids)

        # For each transaction, get info
        undo_info = []
        for tid in ordered_tids:
            partition_id = u64(tid) % self.num_partitions
            cell_list = self.pt.getCellList(partition_id, True)
            shuffle(cell_list)
            # Forget the answer of the previous TID: if no cell can be
            # reached below, a stale dict must not be reported again.
            self.local_var.txn_info = 0
            for cell in cell_list:
                # Ask the cell being iterated on, not a node left over from
                # the askTIDs loop above.
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askTransactionInformation(msg_id, tid)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                    self.local_var.txn_info = 0
                finally:
                    conn.unlock()

                # Wait for answer
                self._waitMessage(conn, msg_id)
                if self.local_var.txn_info == -1:
                    # TID not found, go on with next node
                    continue
                elif isinstance(self.local_var.txn_info, dict):
                    break

            if not isinstance(self.local_var.txn_info, dict):
                # TID not found at all (or no cell could be asked)
                continue

            # Filter result if needed
            if filter is not None:
                # Filter method return True if match
                if not filter(self.local_var.txn_info):
                    continue

            # Append to returned list
            self.local_var.txn_info.pop("oids")
            undo_info.append(self.local_var.txn_info)
            if len(undo_info) >= last - first:
                break

        # Check we return at least one element, otherwise call
        # again but extend offset
        if len(undo_info) == 0 and not block:
            undo_info = self.undoLog(first=first, last=last*5, filter=filter,
                                     block=1)
        return undo_info

Aurel's avatar
Aurel committed
816

Aurel's avatar
Aurel committed
817
    def history(self, oid, version, length=1, filter=None, object_only=0):
        """Return the history of the object identified by oid.

        oid         -- identifier of the object.
        version     -- not used by this method.
        length      -- passed to askObjectHistory after an offset of 0.
        filter      -- not used by this method.
        object_only -- when true, return the raw object history answer
                       instead of the list of dicts (used by getSerial).

        The object history answer is read positionally: item 0 is the oid,
        item 1 is iterated as (serial, size) pairs.

        Returns a list with one transaction information dict per serial,
        without the 'id' and 'oids' keys and completed with 'serial',
        'version' (always None) and 'size'.

        Raises NEOStorageError if no storage node returns the object history
        or the information of one of its transactions.
        """
        # Get history informations for object first
        partition_id = u64(oid) % self.num_partitions
        cell_list = self.pt.getCellList(partition_id, True)
        shuffle(cell_list)

        object_history = None
        for cell in cell_list:
            conn = self.cp.getConnForNode(cell)
            if conn is None:
                continue

            try:
                msg_id = conn.getNextId()
                p = Packet()
                p.askObjectHistory(msg_id, oid, 0, length)
                conn.addPacket(p)
                conn.expectMessage(msg_id)
                self.dispatcher.register(conn, msg_id, self.getQueue())
                self.local_var.history = None
            finally:
                conn.unlock()

            self._waitMessage(conn, msg_id)
            answer = self.local_var.history
            if answer is None or answer == -1:
                # Not found, go on with next node
                continue
            if answer[0] != oid:
                # Got history for wrong oid
                continue
            # Valid answer: stop here so that a later node cannot overwrite
            # it with a failure.
            object_history = answer
            break

        if object_history is None:
            raise NEOStorageError('history failed')
        if object_only:
            # Use by getSerial
            return object_history

        # Now that we have object informations, get txn informations
        history_list = []
        for serial, size in object_history[1]:
            partition_id = u64(serial) % self.num_partitions
            cell_list = self.pt.getCellList(partition_id, True)
            shuffle(cell_list)

            # Forget the answer obtained for the previous serial.
            self.local_var.txn_info = None
            for cell in cell_list:
                conn = self.cp.getConnForNode(cell)
                if conn is None:
                    continue

                try:
                    msg_id = conn.getNextId()
                    p = Packet()
                    p.askTransactionInformation(msg_id, serial)
                    conn.addPacket(p)
                    conn.expectMessage(msg_id)
                    self.dispatcher.register(conn, msg_id, self.getQueue())
                    self.local_var.txn_info = None
                finally:
                    conn.unlock()

                # Wait for answer
                self._waitMessage(conn, msg_id)
                # -1 means TID not found: in that case, as for any other
                # non-dict value, try the next node.
                if isinstance(self.local_var.txn_info, dict):
                    break

            txn_info = self.local_var.txn_info
            if not isinstance(txn_info, dict):
                # No node could describe this transaction
                raise NEOStorageError('history failed')

            # create history dict
            txn_info.pop('id', None)
            txn_info.pop('oids', None)
            txn_info['serial'] = serial
            txn_info['version'] = None
            txn_info['size'] = size
            history_list.append(txn_info)

        return history_list
Aurel's avatar
Aurel committed
893

894 895 896 897 898 899 900
    def __del__(self):
        """Close every connection returned by self.em.getConnectionList().

        The same function is also bound to close(): because of a bug in
        ZODB, close is not always called when shutting down zope, so the
        cleanup is done on finalisation as well.
        """
        connection_list = self.em.getConnectionList()
        for connection in connection_list:
            connection.close()
    close = __del__
901 902 903

    def sync(self):
        """Call _waitMessage() with no connection and no message id.

        NOTE(review): presumably this lets pending incoming messages be
        processed so that the client catches up; confirm against
        _waitMessage, which is defined outside this section.
        """
        self._waitMessage()