#
# Copyright (C) 2009-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import unittest
19
from cPickle import dumps
20
from mock import Mock, ReturnValues
21
from ZODB.POSException import StorageTransactionError, UndoError, ConflictError
22
from neo.tests import NeoTestBase
23
from neo.client.app import Application
24
from neo.client.exception import NEOStorageError, NEOStorageNotFoundError
25
from neo.protocol import Packet, Packets, Errors, INVALID_TID, INVALID_SERIAL
26
from neo.util import makeChecksum
27

28
def _getMasterConnection(self):
    """Test replacement for ``Application._getMasterConnection``.

    Installed on ``Application`` by ``ClientApplicationTests.setUp``.
    When ``self.master_conn`` is still ``None`` it fills in the node
    attributes set below, a mocked partition table and a mocked master
    connection; otherwise the existing connection is returned untouched.
    """
    if self.master_conn is not None:
        return self.master_conn
    self.uuid = 'C' * 16
    self.num_partitions = 10
    self.num_replicas = 1
    # Partition table mock whose cell lookups return an empty tuple.
    self.pt = Mock({
        'getCellListForOID': (),
        'getCellListForTID': (),
    })
    self.master_conn = Mock()
    return self.master_conn
39

40 41 42 43 44
def _getPartitionTable(self):
    """Test replacement for ``Application._getPartitionTable``.

    Returns ``self.pt``; when it is ``None`` the stubbed master
    connection is set up first (which also assigns ``self.pt``).
    """
    if self.pt is not None:
        return self.pt
    self.master_conn = _getMasterConnection(self)
    return self.pt

45 46
def _waitMessage(self, conn, msg_id, handler=None):
    """Test replacement for ``Application._waitMessage``.

    Instead of waiting, hands the packet returned by the test-only
    ``conn.fakeReceived()`` straight to ``handler.dispatch``.
    ``msg_id`` is accepted for signature compatibility and not used.

    Raises ``NotImplementedError`` when no handler is given.
    """
    if handler is None:
        raise NotImplementedError
    handler.dispatch(conn, conn.fakeReceived())
50

51
def resolving_tryToResolveConflict(oid, conflict_serial, serial, data):
    """Conflict-resolution stub that always succeeds: returns ``data`` as is."""
    return data
53 54

def failing_tryToResolveConflict(oid, conflict_serial, serial, data):
    """Conflict-resolution stub that always fails: returns ``None``."""
    return None
56

57
class ClientApplicationTests(NeoTestBase):
58

59
    def setUp(self):
        """Replace three ``Application`` methods with the module-level stubs.

        The original attributes are saved on the test case so that
        ``tearDown`` can put them back.
        """
        # remember the real implementations
        self._getMasterConnection = Application._getMasterConnection
        self._waitMessage = Application._waitMessage
        self._getPartitionTable = Application._getPartitionTable
        # apply monkey patches
        Application._getMasterConnection = _getMasterConnection
        Application._waitMessage = _waitMessage
        Application._getPartitionTable = _getPartitionTable
67 68 69 70 71

    def tearDown(self):
        """Put back the ``Application`` methods saved by ``setUp``."""
        # restore environment
        Application._getMasterConnection = self._getMasterConnection
        Application._waitMessage = self._waitMessage
        Application._getPartitionTable = self._getPartitionTable
73

74 75
    # some helpers

76 77 78 79
    def checkAskPacket(self, conn, packet_type, decode=False):
        """Check that exactly one packet of ``packet_type`` was asked on ``conn``.

        ``conn`` is a mock recording its ``ask`` calls. Returns the packet
        itself, or ``packet.decode()`` when ``decode`` is true.
        """
        calls = conn.mockGetNamedCalls('ask')
        self.assertEqual(len(calls), 1)
        # the packet is the first positional parameter of ask()
        packet = calls[0].getParam(0)
        self.assertTrue(isinstance(packet, Packet))
        self.assertEqual(packet.getType(), packet_type)
        if decode:
            return packet.decode()
        return packet

87 88 89
    def getApp(self, master_nodes='127.0.0.1:10010', name='test',
               connector='SocketConnector', **kw):
        """Build an ``Application`` whose dispatcher is replaced by a mock.

        All arguments are forwarded unchanged to the ``Application``
        constructor.
        """
        app = Application(master_nodes, name, connector, **kw)
        app.dispatcher = Mock({ })
        return app
92

93 94 95 96 97 98 99 100
    def makeOID(self, value=None):
        """Return an 8-character identifier: seven NULs followed by chr(value).

        When ``value`` is omitted a random one in [0, 255] is drawn.
        """
        if value is None:
            from random import randint
            value = randint(0, 255)
        padding = '\00' * 7
        return padding + chr(value)
    makeTID = makeOID

    def makeTransactionObject(self, user='u', description='d', _extension='e'):
        """Return a bare object carrying the three given transaction attributes.

        The result exposes ``user``, ``description`` and ``_extension``
        and nothing else.
        """
        class Transaction(object):
            pass
        txn = Transaction()
        txn.user = user
        txn.description = description
        txn._extension = _extension
        return txn

109
    def beginTransaction(self, app, tid):
        """Begin a transaction on ``app`` with ``tid`` and return the txn object.

        The master connection is replaced by a mock whose ``fakeReceived``
        yields an ``AnswerBeginTransaction`` packet for ``tid``.
        """
        packet = Packets.AnswerBeginTransaction(tid=tid)
        packet.setId(0)
        app.master_conn = Mock({ 'fakeReceived': packet, })
        txn = self.makeTransactionObject()
        app.tpc_begin(txn, tid=tid)
        return txn

    def storeObject(self, app, oid=None, data='DATA'):
        """Install connection-pool and partition-table mocks for storing ``oid``.

        Only the mocks are set up: ``app.cp`` hands out a connection whose
        ``fakeReceived`` yields a non-conflicting ``AnswerStoreObject`` and
        ``app.pt`` returns two cells for any OID. No store call is issued
        here. ``data`` is accepted for call-site compatibility but is
        currently not used. A random OID is generated when ``oid`` is
        omitted. Returns the OID.
        """
        tid = app.local_var.tid
        if oid is None:
            oid = self.makeOID()
        packet = Packets.AnswerStoreObject(conflicting=0, oid=oid, serial=tid)
        packet.setId(0)
        conn = Mock({ 'getNextId': 1, 'fakeReceived': packet, })
        cell = Mock({ 'getAddress': 'FakeServer', 'getState': 'FakeState', })
        app.cp = Mock({ 'getConnForCell': conn})
        app.pt = Mock({ 'getCellListForOID': (cell, cell, ) })
        return oid

    def voteTransaction(self, app):
        """Vote the transaction currently held in ``app.local_var``.

        The connection pool mock returns ``None`` for the first cell and a
        connection answering ``AnswerStoreTransaction`` for the second.
        """
        tid = app.local_var.tid
        txn = app.local_var.txn
        packet = Packets.AnswerStoreTransaction(tid=tid)
        packet.setId(0)
        conn = Mock({ 'getNextId': 1, 'fakeReceived': packet, })
        cell = Mock({ 'getAddress': 'FakeServer', 'getState': 'FakeState', })
        app.pt = Mock({ 'getCellListForTID': (cell, cell, ) })
        app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
        app.tpc_vote(txn, resolving_tryToResolveConflict)
140

141
    def askFinishTransaction(self, app):
        """Finish the transaction currently held in ``app.local_var``.

        The master connection is replaced by a mock whose ``fakeReceived``
        yields ``AnswerTransactionFinished`` for the current TID.
        """
        txn = app.local_var.txn
        tid = app.local_var.tid
        packet = Packets.AnswerTransactionFinished(tid)
        packet.setId(0)
        app.master_conn = Mock({
            'getNextId': 1,
            'getAddress': ('127.0.0.1', 10010),
            'fakeReceived': packet,
        })
        app.tpc_finish(txn)

    # common checks

155
    def checkDispatcherRegisterCalled(self, app, conn):
        """Fetch the dispatcher's recorded ``register`` calls.

        NOTE(review): every assertion below is commented out, so this check
        currently verifies nothing beyond the dispatcher mock answering
        ``mockGetNamedCalls``. ``conn`` is unused until they are re-enabled.
        """
        calls = app.dispatcher.mockGetNamedCalls('register')
        #self.assertEquals(len(calls), 1)
        #self.assertEquals(calls[0].getParam(0), conn)
        #self.assertTrue(isinstance(calls[0].getParam(2), Queue))
160 161 162 163 164

    def test_getQueue(self):
        # A fresh application exposes a thread context with a queue in it.
        app = self.getApp()
        # Test sanity check
        self.assertTrue(getattr(app, 'local_var', None) is not None)
        # Test that queue is created
        self.assertTrue(getattr(app.local_var, 'queue', None) is not None)
167 168 169 170 171 172 173 174 175 176 177

    def test_registerDB(self):
        # The object handed to registerDB must come back from getDB.
        client = self.getApp()
        fake_db = []
        client.registerDB(fake_db, None)
        registered = client.getDB()
        self.assertTrue(registered is fake_db)

    def test_new_oid(self):
        # The master answers with two OIDs: one is returned by new_oid(),
        # the other one stays in app.new_oid_list.
        app = self.getApp()
        test_msg_id = 50
        test_oid_list = [
            '\x00\x00\x00\x00\x00\x00\x00\x01',
            '\x00\x00\x00\x00\x00\x00\x00\x02',
        ]
        # pass a copy so the reference list cannot be altered through the packet
        response_packet = Packets.AnswerNewOIDs(test_oid_list[:])
        response_packet.setId(0)
        app.master_conn = Mock({'getNextId': test_msg_id, '_addPacket': None,
                                'expectMessage': None, 'lock': None,
                                'unlock': None,
                                # Test-specific method
                                'fakeReceived': response_packet})
        new_oid = app.new_oid()
        self.assertTrue(new_oid in test_oid_list)
        self.assertEqual(len(app.new_oid_list), 1)
        self.assertTrue(app.new_oid_list[0] in test_oid_list)
        self.assertNotEqual(app.new_oid_list[0], new_oid)

    def test_getSerial(self):
        # getSerial() on a cache miss consults the partition table; on a
        # cache hit it does not.
        app = self.getApp()
        mq = app.mq_cache
        oid = self.makeOID()
        tid = self.makeTID()
        # cache cleared
        self.assertTrue(oid not in mq)
        app.pt = Mock({ 'getCellListForOID': (), })
        app.local_var.history = (oid, [(tid, 0)])
        # If object len is 0, this object doesn't exist anymore because its
        # creation has been undone.
        self.assertRaises(KeyError, app.getSerial, oid)
        self.assertEqual(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1)
        # Otherwise, result from ZODB
        app.pt = Mock({ 'getCellListForOID': (), })
        app.local_var.history = (oid, [(tid, 1)])
        self.assertEqual(app.getSerial(oid), tid)
        self.assertEqual(len(app.pt.mockGetNamedCalls('getCellListForOID')), 1)
        # fill the cache -> hit
        mq.store(oid, (tid, ' '))
        self.assertTrue(oid in mq)
        app.pt = Mock({ 'getCellListForOID': (), })
        app.getSerial(oid)
        self.assertEqual(app.getSerial(oid), tid)
        # neither of the two lookups above may have reached the partition table
        self.assertEqual(len(app.pt.mockGetNamedCalls('getCellListForOID')), 0)
216

217
    def test_load(self):
        # load(): connection closed, object missing, object fetched from a
        # storage node, then object served from the cache.
        app = self.getApp()
        mq = app.mq_cache
        oid = self.makeOID()
        tid1 = self.makeTID(1)
        tid2 = self.makeTID(2)
        an_object = (1, oid, tid1, tid2, 0, makeChecksum('OBJ'), 'OBJ', None)
        # connection to SN close
        self.assertTrue(oid not in mq)
        packet = Errors.OidNotFound('')
        packet.setId(0)
        cell = Mock({ 'getUUID': '\x00' * 16})
        conn = Mock({'getUUID': '\x10' * 16,
                     'getAddress': ('127.0.0.1', 0),
                     'fakeReceived': packet,
                     })
        app.local_var.queue = Mock({'get' : (conn, None)})
        app.pt = Mock({ 'getCellListForOID': (cell, ), })
        app.cp = Mock({ 'getConnForCell' : conn})
        app.local_var.asked_object = -1
        # This case runs against the real _waitMessage saved by setUp.
        Application._waitMessage = self._waitMessage
        try:
            self.assertRaises(NEOStorageNotFoundError, app.load, oid)
            self.checkAskObject(conn)
        finally:
            # re-install the stub even when one of the checks above fails
            Application._waitMessage = _waitMessage
        # object not found in NEO -> NEOStorageNotFoundError
        self.assertTrue(oid not in mq)
        packet = Errors.OidNotFound('')
        packet.setId(0)
        cell = Mock({ 'getUUID': '\x00' * 16})
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.pt = Mock({ 'getCellListForOID': (cell, ), })
        app.cp = Mock({ 'getConnForCell' : conn})
        app.local_var.asked_object = -1
        self.assertRaises(NEOStorageNotFoundError, app.load, oid)
        self.checkAskObject(conn)
        # object found on storage nodes and put in cache
        packet = Packets.AnswerObject(*an_object[1:])
        packet.setId(0)
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.cp = Mock({ 'getConnForCell' : conn})
        app.local_var.asked_object = an_object[:-1]
        result = app.load(oid)
        self.assertEqual(result, ('OBJ', tid1))
        self.checkAskObject(conn)
        self.assertTrue(oid in mq)
        # object is now cached, try to reload it
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
        })
        app.cp = Mock({ 'getConnForCell' : conn})
        result = app.load(oid)
        self.assertEqual(result, ('OBJ', tid1))
        self.checkNoPacketSent(conn)
276

277
    def test_loadSerial(self):
        # loadSerial(): object missing, then object fetched from storage
        # even though a (different) cached entry exists.
        app = self.getApp()
        mq = app.mq_cache
        oid = self.makeOID()
        tid1 = self.makeTID(1)
        tid2 = self.makeTID(2)
        # object not found in NEO -> NEOStorageNotFoundError
        self.assertTrue(oid not in mq)
        packet = Errors.OidNotFound('')
        packet.setId(0)
        cell = Mock({ 'getUUID': '\x00' * 16})
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.pt = Mock({ 'getCellListForOID': (cell, ), })
        app.cp = Mock({ 'getConnForCell' : conn})
        app.local_var.asked_object = -1
        self.assertRaises(NEOStorageNotFoundError, app.loadSerial, oid, tid2)
        self.checkAskObject(conn)
        # object should not have been cached
        self.assertFalse(oid in mq)
        # now a cached version exists but should not be hit
        mq.store(oid, (tid1, 'WRONG'))
        self.assertTrue(oid in mq)
        another_object = (1, oid, tid2, INVALID_SERIAL, 0,
            makeChecksum('RIGHT'), 'RIGHT', None)
        packet = Packets.AnswerObject(*another_object[1:])
        packet.setId(0)
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.cp = Mock({ 'getConnForCell' : conn})
        app.local_var.asked_object = another_object[:-1]
        result = app.loadSerial(oid, tid1)
        self.assertEqual(result, 'RIGHT')
        self.checkAskObject(conn)
        self.assertTrue(oid in mq)
316 317

    def test_loadBefore(self):
        # loadBefore(): object missing, no previous version, then a previous
        # version fetched from storage despite a cached entry.
        app = self.getApp()
        mq = app.mq_cache
        oid = self.makeOID()
        tid1 = self.makeTID(1)
        tid2 = self.makeTID(2)
        # object not found in NEO -> NEOStorageNotFoundError
        self.assertTrue(oid not in mq)
        packet = Errors.OidNotFound('')
        packet.setId(0)
        cell = Mock({ 'getUUID': '\x00' * 16})
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.pt = Mock({ 'getCellListForOID': (cell, ), })
        app.cp = Mock({ 'getConnForCell' : conn})
        app.local_var.asked_object = -1
        self.assertRaises(NEOStorageNotFoundError, app.loadBefore, oid, tid2)
        self.checkAskObject(conn)
        # no previous versions -> return None
        an_object = (1, oid, tid2, INVALID_SERIAL, 0, makeChecksum(''), '',
            None)
        packet = Packets.AnswerObject(*an_object[1:])
        packet.setId(0)
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.cp = Mock({ 'getConnForCell' : conn})
        app.local_var.asked_object = an_object[:-1]
        result = app.loadBefore(oid, tid1)
        self.assertEqual(result, None)
        # object should not have been cached
        self.assertFalse(oid in mq)
        # as for loadSerial, the object is cached but should be loaded from db
        mq.store(oid, (tid1, 'WRONG'))
        self.assertTrue(oid in mq)
        another_object = (1, oid, tid1, tid2, 0, makeChecksum('RIGHT'),
            'RIGHT', None)
        packet = Packets.AnswerObject(*another_object[1:])
        packet.setId(0)
        conn = Mock({
            'getAddress': ('127.0.0.1', 0),
            'fakeReceived': packet,
        })
        app.cp = Mock({ 'getConnForCell' : conn})
        # NOTE(review): the other cases assign the tuple without its last
        # item ([:-1]); here the full tuple is kept as in the original.
        app.local_var.asked_object = another_object
        result = app.loadBefore(oid, tid1)
        self.assertEqual(result, ('RIGHT', tid1, tid2))
        self.checkAskObject(conn)
        self.assertTrue(oid in mq)
369 370

    def test_tpc_begin(self):
        # tpc_begin() with a supplied tid, on an already begun transaction,
        # and with a tid obtained from the master.
        app = self.getApp()
        tid = self.makeTID()
        txn = Mock()
        # first, tid is supplied
        # Preconditions are read from app.local_var, the place every later
        # assertion in this test inspects.
        self.assertNotEqual(getattr(app.local_var, 'tid', None), tid)
        self.assertTrue(getattr(app.local_var, 'txn', None) is not txn)
        packet = Packets.AnswerBeginTransaction(tid=tid)
        packet.setId(0)
        app.master_conn = Mock({
            'getNextId': 1,
            'fakeReceived': packet,
        })
        app.tpc_begin(transaction=txn, tid=tid)
        self.assertTrue(app.local_var.txn is txn)
        self.assertEqual(app.local_var.tid, tid)
        # next, the transaction already begun -> do nothing
        app.tpc_begin(transaction=txn, tid=None)
        self.assertTrue(app.local_var.txn is txn)
        self.assertEqual(app.local_var.tid, tid)
        # cancel and start a transaction without tid
        app.local_var.txn = None
        app.local_var.tid = None
        # no connection -> NEOStorageError (wait until connected to primary)
        #self.assertRaises(NEOStorageError, app.tpc_begin, transaction=txn, tid=None)
        # ask a tid to pmn
        packet = Packets.AnswerBeginTransaction(tid=tid)
        packet.setId(0)
        app.master_conn = Mock({
            'getNextId': 1,
            'fakeReceived': packet,
        })
        app.dispatcher = Mock({ })
        app.tpc_begin(transaction=txn, tid=None)
        self.checkAskNewTid(app.master_conn)
        self.checkDispatcherRegisterCalled(app, app.master_conn)
        # check attributes
        self.assertTrue(app.local_var.txn is txn)
        self.assertEqual(app.local_var.tid, tid)
409 410 411 412 413 414 415

    def test_store1(self):
        # store() error paths: wrong transaction, then empty cell list.
        app = self.getApp()
        oid = self.makeOID(11)
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        # invalid transaction > StorageTransactionError
        app.local_var.txn = old_txn = object()
        self.assertTrue(app.local_var.txn is not txn)
        self.assertRaises(StorageTransactionError, app.store, oid, tid, '',
            None, txn)
        self.assertEqual(app.local_var.txn, old_txn)
        # check partition_id and an empty cell list -> NEOStorageError
        app.local_var.txn = txn
        app.local_var.tid = tid
        app.pt = Mock({ 'getCellListForOID': (), })
        app.num_partitions = 2
        self.assertRaises(NEOStorageError, app.store, oid, tid, '', None,
            txn)
        calls = app.pt.mockGetNamedCalls('getCellListForOID')
        self.assertEqual(len(calls), 1)
        self.assertEqual(calls[0].getParam(0), oid) # oid=11
431 432 433 434 435 436 437

    def test_store2(self):
        # store() answered with a conflict that cannot be resolved
        # -> ConflictError from waitStoreResponses().
        app = self.getApp()
        oid = self.makeOID(11)
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        # build conflicting state
        app.local_var.txn = txn
        app.local_var.tid = tid
        packet = Packets.AnswerStoreObject(conflicting=1, oid=oid, serial=tid)
        packet.setId(0)
        storage_address = ('127.0.0.1', 10020)
        conn = Mock({
            'getNextId': 1,
            'getAddress': storage_address,
            '__repr__': 'connection mock'
        })
        cell = Mock({
            'getAddress': 'FakeServer',
            'getState': 'FakeState',
        })
        app.pt = Mock({ 'getCellListForOID': (cell, cell, )})
        app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn)})
        # Minimal dispatcher: answers are pending while the queue is non-empty.
        class Dispatcher(object):
            def pending(self, queue):
                return not queue.empty()
        app.dispatcher = Dispatcher()
        app.nm.createStorage(address=storage_address)
        app.local_var.object_stored = (oid, tid)
        app.local_var.data_dict[oid] = 'BEFORE'
        app.store(oid, tid, '', None, txn)
        # feed the conflicting answer through the thread queue
        app.local_var.queue.put((conn, packet))
        self.assertRaises(ConflictError, app.waitStoreResponses,
            failing_tryToResolveConflict)
        self.assertTrue(oid not in app.local_var.data_dict)
        self.assertEqual(app.local_var.conflict_serial_dict[oid], set([tid, ]))
        self.assertEqual(app.local_var.object_stored_counter_dict[oid], {})
        self.checkAskStoreObject(conn)
469 470 471 472 473 474 475

    def test_store3(self):
        # store() answered without conflict: data is kept and counted.
        app = self.getApp()
        oid = self.makeOID(11)
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        # case with no conflict
        app.local_var.txn = txn
        app.local_var.tid = tid
        packet = Packets.AnswerStoreObject(conflicting=0, oid=oid, serial=tid)
        packet.setId(0)
        storage_address = ('127.0.0.1', 10020)
        conn = Mock({
            'getNextId': 1,
            'getAddress': storage_address,
        })
        app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn, ) })
        cell = Mock({
            'getAddress': 'FakeServer',
            'getState': 'FakeState',
        })
        app.pt = Mock({ 'getCellListForOID': (cell, cell, ) })
        # Minimal dispatcher: answers are pending while the queue is non-empty.
        class Dispatcher(object):
            def pending(self, queue):
                return not queue.empty()
        app.dispatcher = Dispatcher()
        app.nm.createStorage(address=storage_address)
        app.store(oid, tid, 'DATA', None, txn)
        self.checkAskStoreObject(conn)
        # feed the answer through the thread queue
        app.local_var.queue.put((conn, packet))
        app.waitStoreResponses(resolving_tryToResolveConflict)
        self.assertEqual(app.local_var.object_stored_counter_dict[oid], {tid: 1})
        self.assertEqual(app.local_var.data_dict.get(oid, None), 'DATA')
        self.assertFalse(oid in app.local_var.conflict_serial_dict)
503 504 505 506 507 508

    def test_tpc_vote1(self):
        # tpc_vote() with a transaction that is not the current one.
        app = self.getApp()
        txn = self.makeTransactionObject()
        # invalid transaction > StorageTransactionError
        app.local_var.txn = old_txn = object()
        self.assertTrue(app.local_var.txn is not txn)
        self.assertRaises(StorageTransactionError, app.tpc_vote, txn,
            resolving_tryToResolveConflict)
        self.assertEqual(app.local_var.txn, old_txn)
514 515 516 517 518 519

    def test_tpc_vote2(self):
        # tpc_vote() receiving an unexpected answer -> NEOStorageError
        # fake transaction object
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = txn
        app.local_var.tid = tid
        # wrong answer -> failure
        packet = Packets.AnswerTIDs(())
        packet.setId(0)
        conn = Mock({
            'getNextId': 1,
            'fakeReceived': packet,
            'getAddress': ('127.0.0.1', 0),
        })
        cell = Mock({
            'getAddress': 'FakeServer',
            'getState': 'FakeState',
        })
        app.pt = Mock({ 'getCellListForTID': (cell, cell, ) })
        # first cell has no connection, second one gets `conn`
        app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
        app.dispatcher = Mock()
        app.tpc_begin(txn, tid)
        self.assertRaises(NEOStorageError, app.tpc_vote, txn,
            resolving_tryToResolveConflict)
        self.checkAskPacket(conn, Packets.AskStoreTransaction)
541 542 543 544 545

    def test_tpc_vote3(self):
        # tpc_vote() receiving the expected AnswerStoreTransaction.
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = txn
        app.local_var.tid = tid
        # response -> OK
        packet = Packets.AnswerStoreTransaction(tid=tid)
        packet.setId(0)
        conn = Mock({
            'getNextId': 1,
            'fakeReceived': packet,
        })
        cell = Mock({
            'getAddress': 'FakeServer',
            'getState': 'FakeState',
        })
        app.pt = Mock({ 'getCellListForTID': (cell, cell, ) })
        # first cell has no connection, second one gets `conn`
        app.cp = Mock({ 'getConnForCell': ReturnValues(None, conn), })
        app.dispatcher = Mock()
        app.tpc_begin(txn, tid)
        app.tpc_vote(txn, resolving_tryToResolveConflict)
        self.checkAskStoreTransaction(conn)
        self.checkDispatcherRegisterCalled(app, conn)
566 567 568 569 570 571

    def test_tpc_abort1(self):
        # ignore mismatch transaction
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn = old_txn = object()
        app.master_conn = Mock()
        app.local_var.tid = tid
        self.assertFalse(app.local_var.txn is txn)
        conn = Mock()
        cell = Mock()
        app.pt = Mock({'getCellListForTID': (cell, cell)})
        # Hand out `conn` (not `cell`) so that the "no packet sent" check
        # below inspects the object the application would actually use.
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.tpc_abort(txn)
        # no packet sent
        self.checkNoPacketSent(conn)
        self.checkNoPacketSent(app.master_conn)
        self.assertEqual(app.local_var.txn, old_txn)
        self.assertEqual(app.local_var.tid, tid)
586 587 588 589 590 591 592 593 594

    def test_tpc_abort2(self):
        # 2 nodes : 1 transaction in the first, 2 objects in the second
        # connections to each node should receive only one packet to abort
        # and transaction must also be aborted on the master node
        # for simplicity, just one cell per partition
        oid1, oid2 = self.makeOID(2), self.makeOID(4) # on partition 0
        app, tid = self.getApp(), self.makeTID(1)     # on partition 1
        txn = self.makeTransactionObject()
        app.local_var.txn, app.local_var.tid = txn, tid
        app.master_conn = Mock({'__hash__': 0})
        app.num_partitions = 2
        cell1 = Mock({ 'getNode': 'NODE1', '__hash__': 1 })
        cell2 = Mock({ 'getNode': 'NODE2', '__hash__': 2 })
        conn1, conn2 = Mock({ 'getNextId': 1, }), Mock({ 'getNextId': 2, })
        app.pt = Mock({
            'getCellListForOID': ReturnValues((cell1, ), (cell1, )),
            'getCellListForTID': (cell1, cell2),
        })
        app.cp = Mock({ 'getConnForNode': ReturnValues(conn1, conn2), })
        # fake data
        app.local_var.data_dict = {oid1: '', oid2: ''}
        app.tpc_abort(txn)
        # will check if there was just one call/packet :
        self.checkNotifyPacket(conn1, Packets.AbortTransaction)
        self.checkNotifyPacket(conn2, Packets.AbortTransaction)
        self.checkNotifyPacket(app.master_conn, Packets.AbortTransaction)
        # the thread context must be back to its idle state
        self.assertEqual(app.local_var.tid, None)
        self.assertEqual(app.local_var.txn, None)
        self.assertEqual(app.local_var.data_dict, {})
        self.assertEqual(app.local_var.txn_voted, False)
        self.assertEqual(app.local_var.txn_finished, False)
618 619 620 621 622 623

    def test_tpc_finish1(self):
        # ignore mismatch transaction
        app = self.getApp()
        txn = self.makeTransactionObject()
        app.local_var.txn = old_txn = object()
        app.master_conn = Mock()
        self.assertFalse(app.local_var.txn is txn)
        conn = Mock()
        cell = Mock()
        app.pt = Mock({'getCellListForTID': (cell, cell)})
        # Hand out `conn` (not `cell`) so that the "no packet sent" check
        # below inspects the object the application would actually use.
        app.cp = Mock({'getConnForCell': ReturnValues(None, conn)})
        app.tpc_finish(txn)
        # no packet sent
        self.checkNoPacketSent(conn)
        self.checkNoPacketSent(app.master_conn)
        self.assertEqual(app.local_var.txn, old_txn)
636 637 638 639 640 641

    def test_tpc_finish2(self):
        # bad answer -> NEOStorageError
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn, app.local_var.tid = txn, tid
        # test callable passed to tpc_finish
        self.f_called = False
        self.f_called_with_tid = None
        def hook(tid):
            # record that the callback ran and what it was given
            self.f_called = True
            self.f_called_with_tid = tid
        # an AnswerBeginTransaction is not what tpc_finish waits for
        packet = Packets.AnswerBeginTransaction(INVALID_TID)
        packet.setId(0)
        app.master_conn = Mock({
            'getNextId': 1,
            'getAddress': ('127.0.0.1', 10000),
            'fakeReceived': packet,
        })
        app.dispatcher = Mock({})
        app.local_var.txn_finished = False
        self.assertRaises(NEOStorageError, app.tpc_finish, txn, hook)
        self.assertTrue(self.f_called)
        self.assertEqual(self.f_called_with_tid, tid)
        self.checkAskFinishTransaction(app.master_conn)
        self.checkDispatcherRegisterCalled(app, app.master_conn)
663 664

    def test_tpc_finish3(self):
        # transaction is finished
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        app.local_var.txn, app.local_var.tid = txn, tid
        self.f_called = False
        self.f_called_with_tid = None
        def hook(tid):
            # record that the callback ran and what it was given
            self.f_called = True
            self.f_called_with_tid = tid
        packet = Packets.AnswerTransactionFinished(tid)
        packet.setId(0)
        app.master_conn = Mock({
            'getNextId': 1,
            'getAddress': ('127.0.0.1', 10010),
            'fakeReceived': packet,
        })
        app.dispatcher = Mock({})
        app.local_var.txn_finished = True
        app.tpc_finish(txn, hook)
        self.assertTrue(self.f_called)
        self.assertEqual(self.f_called_with_tid, tid)
        self.checkAskFinishTransaction(app.master_conn)
        #self.checkDispatcherRegisterCalled(app, app.master_conn)
        # the thread context must be back to its idle state
        self.assertEqual(app.local_var.tid, None)
        self.assertEqual(app.local_var.txn, None)
        self.assertEqual(app.local_var.data_dict, {})
        self.assertEqual(app.local_var.txn_voted, False)
        self.assertEqual(app.local_var.txn_finished, False)
694 695 696 697 698 699

    def test_undo1(self):
        # invalid transaction
        app = self.getApp()
        tid = self.makeTID()
        txn = self.makeTransactionObject()
        marker = []
        def tryToResolveConflict(oid, conflict_serial, serial, data):
            # leaves a trace if conflict resolution is ever attempted
            marker.append(1)
        app.local_var.txn = old_txn = object()
        app.master_conn = Mock()
        self.assertFalse(app.local_var.txn is txn)
        # NOTE(review): `conn` is never attached to the application, so the
        # check on it below cannot fail; kept as in the original.
        conn = Mock()
        self.assertRaises(StorageTransactionError, app.undo, tid, txn,
            tryToResolveConflict)
        # no packet sent
        self.checkNoPacketSent(conn)
        self.checkNoPacketSent(app.master_conn)
        # nothing done
        self.assertEqual(marker, [])
        self.assertEqual(app.local_var.txn, old_txn)
716 717

    def test_undo2(self):
        # Four undo scenarios are played here, each one in a fresh
        # transaction txn4 (tid4) :
        # undo 1 : conflict reported on oid2, resolution succeeds -> ok
        # undo 2 : conflict reported on oid2, resolution fails -> UndoError
        # undo 4 : no conflict on oid1 -> ok, resolver not called
        # undo 3 : "live" conflict on oid3 -> ConflictError, resolver not
        #          called
        app = self.getApp()
        app.num_partitions = 2
        oid1, oid2 = self.makeOID(1), self.makeOID(2)
        oid3 = self.makeOID(3)
        tid1, tid2 = self.makeTID(1), self.makeTID(2)
        tid3, tid4 = self.makeTID(3), self.makeTID(4)
        # commit version 1 of object 2
        self.beginTransaction(app, tid=tid2)
        self.storeObject(app, oid=oid2, data='O1V2')
        self.voteTransaction(app)
        self.askFinishTransaction(app)
        # commit version 2 of object 2
        self.beginTransaction(app, tid=tid3)
        self.storeObject(app, oid=oid2, data='O2V2')
        self.voteTransaction(app)
        self.askFinishTransaction(app)
        # NOTE(review): the three lists given to AnswerUndoTransaction are
        # presumably (undone oids, oids needing conflict resolution, oids in
        # live conflict); this is inferred from the assertions below.
        # undo 1 -> undoing non-last TID, and conflict resolution succeeded
        u1p1 = Packets.AnswerTransactionInformation(tid1, '', '', '',
                False, (oid2, ))
        u1p2 = Packets.AnswerUndoTransaction([], [oid2], [])
        # undo 2 -> undoing non-last TID, and conflict resolution failed
        u2p1 = Packets.AnswerTransactionInformation(tid2, '', '', '',
                False, (oid2, ))
        u2p2 = Packets.AnswerUndoTransaction([], [oid2], [])
        # undo 3 -> "live" conflict (another transaction modifying the object
        # we want to undo)
        u3p1 = Packets.AnswerTransactionInformation(tid3, '', '', '',
                False, (oid3, ))
        u3p2 = Packets.AnswerUndoTransaction([], [], [oid3])
        # undo 4 -> undoing last tid
        u4p1 = Packets.AnswerTransactionInformation(tid3, '', '', '',
                False, (oid1, ))
        u4p2 = Packets.AnswerUndoTransaction([oid1], [], [])
        # test logic
        packets = (u1p1, u1p2, u2p1, u2p2, u3p1, u3p2, u4p1, u4p2)
        for i, p in enumerate(packets):
            p.setId(i)
        storage_address = ('127.0.0.1', 10010)
        conn = Mock({
            'getNextId': 1,
            # transaction information packets, listed in the order in which
            # the scenarios are run below (undo 4 is played before undo 3)
            'fakeReceived': ReturnValues(
                u1p1,
                u2p1,
                u4p1,
                u3p1,
            ),
            'getAddress': storage_address,
        })
        cell = Mock({
            'getAddress': 'FakeServer',
            'getState': 'FakeState',
        })
        app.pt = Mock({
            'getCellListForTID': (cell, ),
            'getCellListForOID': (cell, ),
        })
        app.cp = Mock({'getConnForCell': conn, 'getConnForNode': conn})
        # marker and resolution_result are rebound before each scenario; the
        # closure reads whatever value is bound at call time.
        def tryToResolveConflict(oid, conflict_serial, serial, data,
                committedData=''):
            marker.append(1)
            return resolution_result
        # minimal dispatcher: only reports whether answers are still queued
        class Dispatcher(object):
            def pending(self, queue):
                return not queue.empty()
        app.dispatcher = Dispatcher()
        # replacement loader: requires a tid and no serial, returns dummy data
        def _load(oid, tid=None, serial=None):
            assert tid is not None
            assert serial is None, serial
            return ('dummy', oid, tid)
        app._load = _load
        app.nm.createStorage(address=storage_address)
        # all start here
        # undo 1 : the resolver is called once and succeeds
        app.local_var.clear()
        txn4 = self.beginTransaction(app, tid=tid4)
        marker = []
        resolution_result = 'solved'
        app.local_var.queue.put((conn, u1p2))
        app.undo(tid1, txn4, tryToResolveConflict)
        self.assertEquals(marker, [1])

        # undo 2 : the resolver is called once and fails
        app.local_var.clear()
        txn4 = self.beginTransaction(app, tid=tid4)
        marker = []
        resolution_result = None
        app.local_var.queue.put((conn, u2p2))
        self.assertRaises(UndoError, app.undo, tid2, txn4,
            tryToResolveConflict)
        self.assertEquals(marker, [1])

        # undo 4 : nothing to resolve, oid1 is reported as undone
        app.local_var.clear()
        txn4 = self.beginTransaction(app, tid=tid4)
        marker = []
        resolution_result = None
        app.local_var.queue.put((conn, u4p2))
        self.assertEquals(app.undo(tid3, txn4, tryToResolveConflict),
            (tid4, [oid1, ]))
        self.assertEquals(marker, [])

        # undo 3 : live conflict, the resolver must not be called
        app.local_var.clear()
        txn4 = self.beginTransaction(app, tid=tid4)
        marker = []
        resolution_result = None
        app.local_var.queue.put((conn, u3p2))
        self.assertRaises(ConflictError, app.undo, tid3, txn4,
            tryToResolveConflict)
        self.assertEquals(marker, [])

        self.askFinishTransaction(app)

    def test_undoLog(self):
        # undoLog() must return the description of both transactions, in
        # order, when each of the two faked storages reports one TID.
        app = self.getApp()
        app.num_partitions = 2
        uuid1, uuid2 = '\x00' * 15 + '\x01', '\x00' * 15 + '\x02'
        # two nodes, two partition, two transaction, two objects :
        node1, node2 = Mock({}), Mock({})
        cell1, cell2 = Mock({}), Mock({})
        tid1, tid2 = self.makeTID(1), self.makeTID(2)
        oid1, oid2 = self.makeOID(1), self.makeOID(2)
        # TIDs are supplied by the _waitAnyMessage hook below
        # TXN info packets
        extension = dumps({})
        p3 = Packets.AnswerTransactionInformation(tid1, '', '',
                extension, False, (oid1, ))
        p4 = Packets.AnswerTransactionInformation(tid2, '', '',
                extension, False, (oid2, ))
        p3.setId(0)
        p4.setId(1)
        conn = Mock({
            'getNextId': 1,
            'getUUID': ReturnValues(uuid1, uuid2),
            'fakeGetApp': app,
            'fakeReceived': ReturnValues(p3, p4),
            'getAddress': ('127.0.0.1', 10010),
        })
        app.pt = Mock({
            'getNodeList': (node1, node2, ),
            'getCellListForTID': ReturnValues([cell1], [cell2]),
        })
        app.cp = Mock({ 'getConnForCell': conn})
        _waitAnyMessage_old = Application._waitAnyMessage
        # one-shot hook: publishes the TIDs of both nodes, then puts the
        # original method back so that later calls are not intercepted
        def _waitAnyMessage(self):
            self.local_var.node_tids = {uuid1: (tid1, ), uuid2: (tid2, )}
            Application._waitAnyMessage = _waitAnyMessage_old
        def txn_filter(info):
            return info['id'] > '\x00' * 8
        Application._waitAnyMessage = _waitAnyMessage
        try:
            result = app.undoLog(0, 4, filter=txn_filter)
        finally:
            # the patch is made on the class: restore it even if undoLog()
            # failed before the hook ran, so other tests are not affected
            Application._waitAnyMessage = _waitAnyMessage_old
        self.assertEquals(result[0]['id'], tid1)
        self.assertEquals(result[1]['id'], tid2)

    def test_history(self):
        # history() must return one entry per revision of the object, with
        # the tid and size taken from the object history answer.
        app = self.getApp()
        oid = self.makeOID(1)
        tid1, tid2 = self.makeTID(1), self.makeTID(2)
        # (tid, size) pairs
        object_history = ( (tid1, 42), (tid2, 42),)
        # object history answer
        p2 = Packets.AnswerObjectHistory(oid, object_history)
        extension = dumps({'k': 'v'})
        # transaction history
        p3 = Packets.AnswerTransactionInformation(tid1, 'u', 'd',
                extension, False, (oid, ))
        p4 = Packets.AnswerTransactionInformation(tid2, 'u', 'd',
                extension, False, (oid, ))
        p2.setId(0)
        p3.setId(1)
        p4.setId(2)
        # faked environment
        conn = Mock({
            'getNextId': 1,
            'fakeGetApp': app,
            # object history first, then one answer per transaction
            'fakeReceived': ReturnValues(p2, p3, p4),
            'getAddress': ('127.0.0.1', 10010),
        })
        object_cells = [ Mock({}), ]
        history_cells = [ Mock({}), Mock({}) ]
        app.pt = Mock({
            'getCellListForOID': object_cells,
            'getCellListForTID': ReturnValues(history_cells, history_cells),
        })
        app.cp = Mock({ 'getConnForCell': conn})
        # start test here
        result = app.history(oid)
        self.assertEquals(len(result), 2)
        self.assertEquals(result[0]['tid'], tid1)
        self.assertEquals(result[1]['tid'], tid2)
        self.assertEquals(result[0]['size'], 42)
        self.assertEquals(result[1]['size'], 42)

    def _test_connectToPrimaryNode(self):
        # NOTE: the leading underscore keeps this method out of the tests
        # collected by the default unittest loader.
        # here we have three master nodes :
        # the connection to the first will fail
        # the second will have changed
        # the third will not be ready
        # after the third, the partition table will be operational
        # (as if it was connected to the primary master node)
        from neo.tests import DoNothingConnector
        # will raise IndexError at the third iteration
        app = self.getApp('127.0.0.1:10010 127.0.0.1:10011')
        # TODO: test more connection failure cases
        # The hooks below form a chain: each one replaces
        # Application._waitMessage with the next step, so successive calls
        # play successive steps. They are defined from last to first.
        # Seventh packet : askNodeInformation succeeded
        all_passed = []
        def _waitMessage8(self, conn, msg_id, handler=None):
            all_passed.append(1)
        # Sixth packet : askPartitionTable succeeded
        def _waitMessage7(self, conn, msg_id, handler=None):
            app.pt = Mock({'operational': True})
            Application._waitMessage = _waitMessage8
        # fifth packet : request node identification succeeded
        def _waitMessage6(self, conn, msg_id, handler=None):
            conn.setUUID('D' * 16)
            app.uuid = 'C' * 16
            Application._waitMessage = _waitMessage7
        # fourth iteration : connection to primary master succeeded
        def _waitMessage5(self, conn, msg_id, handler=None):
            app.trying_master_node = app.primary_master_node = Mock({
                'getAddress': ('192.168.1.1', 10000),
                '__str__': 'Fake master node',
            })
            Application._waitMessage = _waitMessage6
        # third iteration : node not ready
        def _waitMessage4(app, conn, msg_id, handler=None):
            app.setNodeNotReady()
            app.trying_master_node = None
            Application._waitMessage = _waitMessage5
        # second iteration : master node changed
        def _waitMessage3(app, conn, msg_id, handler=None):
            app.primary_master_node = Mock({
                'getAddress': ('192.168.1.1', 10000),
                '__str__': 'Fake master node',
            })
            Application._waitMessage = _waitMessage4
        # first iteration : connection failed
        def _waitMessage2(app, conn, msg_id, handler=None):
            app.trying_master_node = None
            Application._waitMessage = _waitMessage3
        # do nothing for the first call
        def _waitMessage1(app, conn, msg_id, handler=None):
            Application._waitMessage = _waitMessage2
        # faked environment
        app.connector_handler = DoNothingConnector
        app.em = Mock({})
        app.pt = Mock({ 'operational': False})
        # patch the class right before the try block, so that the original
        # method is restored whatever happens once the patch is in place
        _waitMessage_old = Application._waitMessage
        Application._waitMessage = _waitMessage1
        try:
            app.master_conn = app._connectToPrimaryNode()
            self.assertEqual(len(all_passed), 1)
            self.assertTrue(app.master_conn is not None)
            self.assertTrue(app.pt.operational())
        finally:
            Application._waitMessage = _waitMessage_old

    def test_askStorage(self):
        """ _askStorage is private but test it anyway """
        app = self.getApp('')
        app.dispatcher = Mock()
        conn = Mock()
        # set by the hook, to prove that _waitMessage was reached
        self.test_ok = False
        def _waitMessage_hook(app, conn, msg_id, handler=None):
            self.test_ok = True
        packet = Packets.AskBeginTransaction(None)
        packet.setId(0)
        # the patch is made on the class, hence the try/finally to restore it
        _waitMessage_old = Application._waitMessage
        Application._waitMessage = _waitMessage_hook
        try:
            app._askStorage(conn, packet)
        finally:
            Application._waitMessage = _waitMessage_old
        # check packet sent, connection unlocked and dispatcher updated
        self.checkAskNewTid(conn)
        self.checkDispatcherRegisterCalled(app, conn)
        # and _waitMessage called
        self.assertTrue(self.test_ok)

    def test_askPrimary(self):
        """ _askPrimary is private but test it anyway """
        app = self.getApp('')
        app.dispatcher = Mock()
        conn = Mock()
        app.master_conn = conn
        app.primary_handler = Mock()
        # set by the hook, to prove that _waitMessage was reached
        self.test_ok = False
        def _waitMessage_hook(app, conn, msg_id, handler=None):
            # answers from the primary must go through the primary handler
            self.assertTrue(handler is app.primary_handler)
            self.test_ok = True
        # build the packet before patching the class, so that nothing can
        # fail between the patch and the try/finally which undoes it
        packet = Packets.AskBeginTransaction(None)
        packet.setId(0)
        _waitMessage_old = Application._waitMessage
        Application._waitMessage = _waitMessage_hook
        try:
            app._askPrimary(packet)
        finally:
            Application._waitMessage = _waitMessage_old
        # check packet sent, connection locked during process and dispatcher updated
        self.checkAskNewTid(conn)
        self.checkDispatcherRegisterCalled(app, conn)
        # and _waitMessage called
        self.assertTrue(self.test_ok)
        # check NEOStorageError is raised when the primary connection is lost
        app.master_conn = None
        # check disabled since we reconnect to pmn
        #self.assertRaises(NEOStorageError, app._askPrimary, packet)

    def test_threadContextIsolation(self):
        """ Thread context properties must not be visible across instances
            while remaining in the same thread """
        app1 = self.getApp()
        app1_local = app1.local_var
        app2 = self.getApp()
        app2_local = app2.local_var
        property_id = 'thread_context_test'
        # neither context knows the property at first
        self.assertFalse(hasattr(app1_local, property_id))
        self.assertFalse(hasattr(app2_local, property_id))
        # setting it on the first application must not leak into the second
        setattr(app1_local, property_id, 'value')
        self.assertTrue(hasattr(app1_local, property_id))
        self.assertFalse(hasattr(app2_local, property_id))

# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()