#
# Copyright (C) 2009-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17

18
import errno
19 20 21
import os
import sys
import time
22
import ZODB
23
import socket
24
import signal
25
import random
26
import weakref
27
import MySQLdb
28
import unittest
29 30
import tempfile
import traceback
31
import threading
32
import psutil
33
import transaction
34

35
import neo.scripts
36
from neo.neoctl.neoctl import NeoCTL, NotReadyException
37
from neo.lib import setupLog
38
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
39
from neo.lib.util import dump
40
from neo.tests import DB_USER, setupMySQLdb, NeoTestBase, buildUrlFromString, \
41 42
        ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, getTempDirectory
from neo.tests.cluster import SocketLock
43 44
from neo.client.Storage import Storage

45 46 47 48
# Names of the neo.scripts modules run for each kind of node; they are also
# used as keys of NEOCluster.process_dict.
NEO_MASTER = 'neomaster'
NEO_STORAGE = 'neostorage'
NEO_ADMIN = 'neoadmin'

49
# Extra delay added to every timeout given to NEOCluster.expectCondition().
DELAY_SAFETY_MARGIN = 10
50
# Timeout passed to pdb.wait() while waiting for the cluster to come up.
MAX_START_TIME = 30
51

52 53 54
class NodeProcessError(Exception):
    """Raised when a node process exited with a non-zero status."""

55 56 57 58 59
class AlreadyRunning(Exception):
    """Raised when starting a process that still has a child PID."""

class AlreadyStopped(Exception):
    """Raised when killing or waiting for a process that has no child PID."""
60

61 62 63
class NotFound(Exception):
    """Raised when the script module of a node command cannot be imported."""

64 65 66
class PortAllocator(object):
    """Reserve TCP ports by binding sockets on randomly chosen port numbers.

    Sockets bound by allocate() stay open until release() or reset() is
    called. The class-wide lock is acquired on allocation when it is not
    already held, and released by reset() once no allocator is registered
    in allocator_set anymore.
    """

    lock = SocketLock('neo.PortAllocator')
    allocator_set = weakref.WeakKeyDictionary() # BBB: use WeakSet instead

    def __init__(self):
        self.socket_list = []

    def allocate(self, address_type, local_ip):
        """Bind a new socket on local_ip and return the port it is bound to."""
        sock = socket.socket(address_type, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if not self.lock.locked():
            self.lock.acquire()
        self.allocator_set[self] = None
        self.socket_list.append(sock)
        while True:
            # Do not let the system choose the port to avoid conflicts
            # with other software. IOW, use a range different than:
            # - /proc/sys/net/ipv4/ip_local_port_range on Linux
            # - what IANA recommends (49152 to 65535)
            try:
                sock.bind((local_ip, random.randint(16384, 32767)))
                return sock.getsockname()[1]
            except socket.error as e:
                if e.errno == errno.EADDRINUSE:
                    # port already taken: draw another one
                    continue
                raise

    def release(self):
        """Close every socket bound so far."""
        for sock in self.socket_list:
            sock.close()
        self.socket_list = None

    def reset(self):
        """Unregister this allocator and reinitialise it.

        Nothing is done when the lock is not held.
        """
        if not self.lock.locked():
            return
        self.allocator_set.pop(self, None)
        if not self.allocator_set:
            self.lock.release()
        # socket_list is None after release()
        for sock in self.socket_list or ():
            sock.close()
        self.__init__()

    __del__ = reset

108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124

class ChildException(KeyboardInterrupt):
    """Wrap any exception into an exception that is not catched by TestCase.run

    The exception is not wrapped and re-raised immediately if there is no need
    to wrap.
    """

    def __init__(self, type, value, tb):
        code = unittest.TestCase.run.im_func.func_code
        f = tb.tb_frame
        while f is not None:
            if f.f_code is code:
                break
            f = f.f_back
        else:
            raise type, value, tb
125
        KeyboardInterrupt.__init__(self, type, value, tb)
126 127 128 129 130 131 132 133 134

    def __call__(self):
        """Re-raise wrapped exception"""
        type, value, tb = self.args
        if type is KeyboardInterrupt:
          sys.exit(1)
        raise type, value, tb


135
class NEOProcess(object):
136 137
    # PID of the forked child; 0 means that no child is currently tracked.
    pid = 0

138
    def __init__(self, command, uuid, arg_dict):
        """Prepare (without starting) a node running neo.scripts.<command>.

        command -- name of a module of the neo.scripts package
        uuid -- node UUID, stored through setUUID()
        arg_dict -- maps command line options to their value (or None)

        Raises NotFound when the script module cannot be imported.
        """
        module_name = 'neo.scripts.' + command
        try:
            __import__(module_name)
        except ImportError:
            raise NotFound('%s not found' % (command))
        self.command = command
        self.arg_dict = arg_dict
        self.with_uuid = True
        self.setUUID(uuid)
147

148
    def start(self, with_uuid=True):
        """Fork a child that runs the main() of the node's script.

        The child gets sys.argv built from the command name and arg_dict;
        the '--uuid' option is left out when with_uuid is False.

        Raises AlreadyRunning when a child PID is still recorded.
        """
        # Prevent starting when already forked and wait wasn't called.
        if self.pid != 0:
            raise AlreadyRunning('Already running with PID %r' % (self.pid, ))
        command = self.command
        self.with_uuid = with_uuid
        skip_uuid = with_uuid is False
        args = []
        for option, value in self.arg_dict.iteritems():
            if skip_uuid and option == '--uuid':
                continue
            args.append(option)
            if value is not None:
                args.append(str(value))
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            # prevent child from killing anything
            del self.__class__.__del__
            try:
                sys.argv = [command] + args
                getattr(neo.scripts,  command).main()
                sys.exit()
            except:
                # Everything, SystemExit included, goes through
                # ChildException so that it escapes TestCase.run if needed.
                raise ChildException(*sys.exc_info())
        neo.lib.logging.info('pid %u: %s %s',
                             self.pid, command,
                             ' '.join([repr(arg) for arg in args]))
174

175 176
    def kill(self, sig=signal.SIGTERM):
        """Send signal `sig` to the child process.

        An OSError raised while sending the signal is printed and not
        propagated.

        Raises AlreadyStopped when no child PID is recorded.
        """
        if not self.pid:
            raise AlreadyStopped
        neo.lib.logging.info('kill pid %u', self.pid)
        try:
            pdb.kill(self.pid, sig)
        except OSError:
            # print_exc() reports the exception currently being handled.
            # print_last() relies on sys.last_type & co, which are only set
            # for uncaught exceptions, so it would raise here instead.
            traceback.print_exc()
184 185 186 187 188

    def __del__(self):
        # If we get killed, kill subprocesses aswell.
        try:
            self.kill(signal.SIGKILL)
189
            self.wait()
190 191 192 193 194 195 196
        except:
            # We can ignore all exceptions at this point, since there is no
            # garanteed way to handle them (other objects we would depend on
            # might already have been deleted).
            pass

    def wait(self, options=0):
        """Wait for the child with os.waitpid and return its exit status.

        options -- passed as is to os.waitpid

        The recorded PID is reset to 0 once waitpid returned.
        Raises AlreadyStopped when no child PID is recorded, and
        NodeProcessError when the exit status is not 0.
        """
        child_pid = self.pid
        if child_pid == 0:
            raise AlreadyStopped
        status = os.waitpid(child_pid, options)[1]
        result = os.WEXITSTATUS(status)
        self.pid = 0
        if not result:
            return result
        raise NodeProcessError('%r %r exited with status %r' % (
            self.command, self.arg_dict, result))
205

206 207 208 209
    def stop(self):
        """Send the default kill() signal to the child, then wait() for it."""
        self.kill()
        self.wait()

210 211 212
    def getPID(self):
        """Return the PID of the child, or 0 when none is recorded."""
        return self.pid

Vincent Pelletier's avatar
Vincent Pelletier committed
213
    def getUUID(self):
214
        assert self.with_uuid, 'UUID disabled on this process'
Vincent Pelletier's avatar
Vincent Pelletier committed
215 216 217 218 219 220 221
        return self.uuid

    def setUUID(self, uuid):
        """Record `uuid` and store its dump() as the '--uuid' option.

          Note: for this change to take effect, the node must be restarted.
        """
        self.uuid = uuid
        dumped_uuid = dump(uuid)
        self.arg_dict['--uuid'] = dumped_uuid
Vincent Pelletier's avatar
Vincent Pelletier committed
223

224
    def isAlive(self):
        """Tell whether the child process exists and is not a zombie.

        Returns False when psutil reports that there is no such process.
        """
        try:
            status = psutil.Process(self.pid).status
            # `status` is a property in old psutil releases and a method
            # since psutil 2.0: support both, otherwise a bound method
            # would always compare different from STATUS_ZOMBIE.
            if callable(status):
                status = status()
        except psutil.NoSuchProcess:
            return False
        return status != psutil.STATUS_ZOMBIE
229

230
class NEOCluster(object):
231

232
    def __init__(self, db_list, master_count=1, partitions=1, replicas=0,
                 db_user=DB_USER, db_password='',
                 cleanup_on_delete=False, temp_dir=None, clear_databases=True,
                 adapter=os.getenv('NEO_TESTS_ADAPTER'),
                 verbose=True,
                 address_type=ADDRESS_TYPE,
        ):
        """Describe a cluster without starting any of its processes.

        Databases are set up through setupDB(), ports are reserved for the
        admin and master nodes, and NEOProcess objects are created for one
        admin node, master_count master nodes and one storage node per
        entry of db_list. A NeoCTL bound to the admin address is created.

        The default adapter is read from the NEO_TESTS_ADAPTER environment
        variable when the class is defined; 'MySQL' is used if it is empty.
        A temporary directory is created when temp_dir is None.
        """
        self.adapter = adapter = adapter or 'MySQL'
        self.zodb_storage_list = []
        self.cleanup_on_delete = cleanup_on_delete
        self.verbose = verbose
        self.uuid_set = set()
        self.db_user = db_user
        self.db_password = db_password
        self.db_list = db_list
        self.address_type = address_type
        self.local_ip = local_ip = IP_VERSION_FORMAT_DICT[self.address_type]
        self.setupDB(clear_databases)
        self.process_dict = {}
        if temp_dir is None:
            temp_dir = tempfile.mkdtemp(prefix='neo_')
            print('Using temp directory %r.' % (temp_dir, ))
        self.temp_dir = temp_dir
        self.port_allocator = allocator = PortAllocator()
        admin_port = allocator.allocate(address_type, local_ip)
        self.cluster_name = 'neo_%s' % (random.randint(0, 100), )
        master_port_list = [allocator.allocate(address_type, local_ip)
                            for _ in xrange(master_count)]
        self.master_nodes = '/'.join(
            '%s:%s' % (buildUrlFromString(self.local_ip), port, )
            for port in master_port_list)

        # create admin node
        admin_arguments = {
            '--cluster': self.cluster_name,
            '--name': 'admin',
            '--bind': '%s:%d' % (buildUrlFromString(self.local_ip),
                                 admin_port, ),
            '--masters': self.master_nodes,
        }
        self.__newProcess(NEO_ADMIN, admin_arguments)
        # create master nodes
        for index, port in enumerate(master_port_list):
            master_arguments = {
                '--cluster': self.cluster_name,
                '--name': 'master_%d' % index,
                '--bind': '%s:%d' % (buildUrlFromString(self.local_ip),
                                     port, ),
                '--masters': self.master_nodes,
                '--replicas': replicas,
                '--partitions': partitions,
            }
            self.__newProcess(NEO_MASTER, master_arguments)
        # create storage nodes (they are bound to port 0)
        for index, db in enumerate(db_list):
            storage_arguments = {
                '--cluster': self.cluster_name,
                '--name': 'storage_%d' % index,
                '--bind': '%s:%d' % (buildUrlFromString(self.local_ip), 0),
                '--masters': self.master_nodes,
                '--database': '%s:%s@%s' % (db_user, db_password, db),
                '--adapter': adapter,
            }
            self.__newProcess(NEO_STORAGE, storage_arguments)
        # create neoctl
        self.neoctl = NeoCTL((self.local_ip, admin_port))
299

300
    def __newProcess(self, command, arguments):
        """Create a NEOProcess for `command` and register it in process_dict.

        `arguments` is completed in place with '--uuid', '--logfile' (named
        after the '--name' argument, inside temp_dir) and, when verbose,
        '--verbose'.
        """
        node_uuid = self.__allocateUUID()
        arguments['--uuid'] = node_uuid
        if self.verbose:
            arguments['--verbose'] = True
        log_name = '%s.log' % (arguments['--name'], )
        arguments['--logfile'] = os.path.join(self.temp_dir, log_name)

        process_list = self.process_dict.setdefault(command, [])
        process_list.append(NEOProcess(command, node_uuid, arguments))
310

Vincent Pelletier's avatar
Vincent Pelletier committed
311
    def __allocateUUID(self):
        """Return a new random 16-byte UUID, after adding it to uuid_set."""
        hex_digits = '%032x' % random.getrandbits(128)
        new_uuid = hex_digits.decode('hex')
        self.uuid_set.add(new_uuid)
        return new_uuid

316
    def setupDB(self, clear_databases=True):
        """Call setupMySQLdb for db_list; does nothing for other adapters."""
        if self.adapter != 'MySQL':
            return
        setupMySQLdb(self.db_list, self.db_user, self.db_password,
                     clear_databases)
320

321 322
    def run(self, except_storages=()):
        """ Start cluster processes except some storage nodes

        The reserved ports are released before starting the processes, and
        the port allocator is reset once the admin node answers.
        Raises AssertionError if the admin node is not ready in time.
        """
        assert len(self.process_dict)
        self.port_allocator.release()
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                if process in except_storages:
                    continue
                process.start()
        # wait for the admin node availability
        def admin_is_ready():
            try:
                self.neoctl.getClusterState()
            except NotReadyException:
                return False
            return True
        ready = pdb.wait(admin_is_ready, MAX_START_TIME)
        if not ready:
            raise AssertionError('Timeout when starting cluster')
        self.port_allocator.reset()
339

340
    def start(self, except_storages=(), delay_startup=0):
        """ Do a complete start of a cluster

        except_storages -- storage processes that must not be started
        delay_startup -- None to start storage nodes only after the cluster
                         was asked to start; otherwise a delay to sleep
                         between starting processes and starting the cluster

        Raises AssertionError if the expected number of storage nodes is not
        known by the admin node in time.
        """
        if delay_startup is not None:
            delayed_storage_list = ()
            self.run(except_storages=except_storages)
            if delay_startup:
                time.sleep(delay_startup)
        else:
            all_storage_list = self.getStorageProcessList()
            self.run(except_storages=all_storage_list)
            delayed_storage_list = set(all_storage_list).difference(
                except_storages)
        neoctl = self.neoctl
        neoctl.startCluster()
        for storage in delayed_storage_list:
            storage.start()
        target_count = len(self.db_list) - len(except_storages)
        storage_node_list = []
        def enough_storages():
            storage_node_list[:] = neoctl.getNodeList(
                node_type=NodeTypes.STORAGE)
            # wait at least number of started storages, admin node can know
            # more nodes when the cluster restart with an existing partition
            # table referencing non-running nodes
            return len(storage_node_list) >= target_count
        if not pdb.wait(enough_storages, MAX_START_TIME):
            raise AssertionError('Timeout when starting cluster')
        if storage_node_list:
            self.expectClusterRunning()
            neoctl.enableStorageList([node[2] for node in storage_node_list])
369

370
    def stop(self, clients=True):
        """Kill (SIGKILL) and wait for every process of the cluster.

        clients -- when true, also close the ZODB storages created through
                   getZODBStorage()

        Raises NodeProcessError, after everything was stopped, if at least
        one process reported an error on wait().
        """
        failure_list = []
        for process_list in self.process_dict.itervalues():
            for process in process_list:
                try:
                    process.kill(signal.SIGKILL)
                    process.wait()
                except AlreadyStopped:
                    pass
                except NodeProcessError as e:
                    failure_list.extend(e.args)
        if clients:
            for zodb_storage in self.zodb_storage_list:
                zodb_storage.close()
            self.zodb_storage_list = []
        time.sleep(0.5)
        if failure_list:
            raise NodeProcessError('\n'.join(failure_list))
388 389 390 391

    def getNEOCTL(self):
        """Return the NeoCTL instance created for this cluster."""
        return self.neoctl

392
    def getZODBStorage(self, **kw):
        """Create a client Storage for this cluster and return it.

        Extra keyword arguments are passed to Storage. The storage is kept
        in zodb_storage_list.
        """
        storage = Storage(
            master_nodes=self.master_nodes.replace('/', ' '),
            name=self.cluster_name,
            logfile=os.path.join(self.temp_dir, 'client.log'),
            verbose=self.verbose,
            **kw)
        self.zodb_storage_list.append(storage)
        return storage
402

403
    def getZODBConnection(self, **kw):
        """ Return a tuple with the database and a connection """
        database = ZODB.DB(storage=self.getZODBStorage(**kw))
        connection = database.open()
        return (database, connection)

408
    def getSQLConnection(self, db, autocommit=False):
        """Return a MySQLdb connection to `db`, one of the cluster's databases."""
        assert db in self.db_list
        connection = MySQLdb.Connect(
            user=self.db_user,
            passwd=self.db_password,
            db=db)
        connection.autocommit(autocommit)
        return connection
414

Vincent Pelletier's avatar
Vincent Pelletier committed
415 416 417 418 419 420 421 422 423 424 425 426
    def _getProcessList(self, type):
        """Return the processes registered under `type` in process_dict.

        None is returned when nothing is registered under that key.
        """
        return self.process_dict.get(type)

    def getMasterProcessList(self):
        """Return the list of master processes (see _getProcessList)."""
        return self._getProcessList(NEO_MASTER)

    def getStorageProcessList(self):
        """Return the list of storage processes (see _getProcessList)."""
        return self._getProcessList(NEO_STORAGE)

    def getAdminProcessList(self):
        """Return the list of admin processes (see _getProcessList)."""
        return self._getProcessList(NEO_ADMIN)

427 428
    def _killMaster(self, primary=False, all=False):
        """Kill and wait for master processes; return the UUIDs killed.

        primary -- true to select the master whose UUID is the one returned
                   by neoctl.getPrimary(), false to select the other ones
        all -- when false, stop after the first selected master
        """
        killed_uuid_list = []
        primary_uuid = self.neoctl.getPrimary()
        for master in self.getMasterProcessList():
            master_uuid = master.getUUID()
            is_primary = master_uuid == primary_uuid
            # select primary masters xor secondary ones
            if bool(primary) != bool(is_primary):
                continue
            killed_uuid_list.append(master_uuid)
            master.kill()
            master.wait()
            if not all:
                break
        return killed_uuid_list

441
    def killPrimary(self):
        """Kill the primary master; return the list of killed UUIDs."""
        return self._killMaster(primary=True)

    def killSecondaryMaster(self, all=False):
        """Kill one non-primary master, or all of them if `all` is true.

        Returns the list of killed UUIDs.
        """
        return self._killMaster(primary=False, all=all)

    def killMasters(self):
        """Kill all secondary masters, then the primary one.

        Returns the UUIDs killed, in that order.
        """
        killed_secondary_list = self.killSecondaryMaster(all=True)
        return killed_secondary_list + self.killPrimary()

452 453 454 455 456 457 458 459 460 461
    def killStorage(self, all=False):
        """Kill and wait for the first storage process, or all of them.

        Returns the list of killed UUIDs.
        """
        uuid_list = []
        for process in self.getStorageProcessList():
            uuid_list.append(process.getUUID())
            process.kill()
            process.wait()
            if all:
                continue
            break
        return uuid_list

462 463
    def __getNodeList(self, node_type, state=None):
        """Return the nodes of `node_type` reported by neoctl.

        When `state` is not None, only nodes whose 4th field equals it are
        kept.
        """
        node_list = self.neoctl.getNodeList(node_type)
        if state is None:
            return list(node_list)
        return [node for node in node_list if node[3] == state]

466
    def getMasterList(self, state=None):
        """Return the master nodes known by neoctl, optionally in `state`."""
        return self.__getNodeList(NodeTypes.MASTER, state)
468

469
    def getStorageList(self, state=None):
        """Return the storage nodes known by neoctl, optionally in `state`."""
        return self.__getNodeList(NodeTypes.STORAGE, state)
471

472 473 474
    def getClientlist(self, state=None):
        """Return the client nodes known by neoctl, optionally in `state`."""
        return self.__getNodeList(NodeTypes.CLIENT, state)

475 476
    def __getNodeState(self, node_type, uuid):
        """Return the state of the node having `uuid`, None if not listed."""
        for _, _, node_uuid, state in self.__getNodeList(node_type):
            if node_uuid == uuid:
                return state
        return None

484
    def getMasterNodeState(self, uuid):
        """Return the state of the master node `uuid`, None if not listed."""
        return self.__getNodeState(NodeTypes.MASTER, uuid)
486

487
    def getPrimary(self):
        """Return what neoctl.getPrimary() returns, None if it is not ready."""
        try:
            return self.neoctl.getPrimary()
        except NotReadyException:
            return None

494
    def expectCondition(self, condition, timeout=0, on_fail=None):
        """Wait, through pdb.wait, until `condition` is reached.

        condition -- callable taking the opaque value of the previous
                     unsuccessful try (None the first time) and returning
                     a (reached, opaque) tuple
        timeout -- allowed delay, to which DELAY_SAFETY_MARGIN is added
        on_fail -- optional callable, called on timeout with the list of
                   opaque values collected by unsuccessful tries

        Raises AssertionError, with that history, on timeout.
        """
        opaque_history = [None]
        def test():
            reached, opaque = condition(opaque_history[-1])
            if not reached:
                opaque_history.append(opaque)
            return reached
        if not pdb.wait(test, timeout + DELAY_SAFETY_MARGIN):
            # drop the initial None, which is not the result of any try
            del opaque_history[0]
            if on_fail is not None:
                on_fail(opaque_history)
            raise AssertionError('Timeout while expecting condition. '
                                 'History: %s' % opaque_history)
508

509
    def expectAllMasters(self, node_count, state=None, *args, **kw):
        """Expect exactly `node_count` master nodes (optionally in `state`).

        Fails immediately if the count decreases between two tries. Extra
        arguments are passed to expectCondition.
        """
        def callback(last_try):
            current_try = len(self.getMasterList(state=state))
            regressed = last_try is not None and current_try < last_try
            if regressed:
                raise AssertionError('Regression: %s became %s' %
                    (last_try, current_try))
            return (current_try == node_count, current_try)
        self.expectCondition(callback, *args, **kw)
517

518
    def __expectNodeState(self, node_type, uuid, state, *args, **kw):
        """Expect node `uuid` to be in `state`.

        `state` is either a single state or a tuple/list of accepted
        states. Extra arguments are passed to expectCondition.
        """
        if isinstance(state, (tuple, list)):
            accepted_states = state
        else:
            accepted_states = (state, )
        def callback(last_try):
            current_try = self.__getNodeState(node_type, uuid)
            return current_try in accepted_states, current_try
        self.expectCondition(callback, *args, **kw)
525

526 527
    def expectMasterState(self, uuid, state, *args, **kw):
        """Expect master node `uuid` to be in `state` (or one of them)."""
        self.__expectNodeState(NodeTypes.MASTER, uuid, state, *args, **kw)
528

529 530
    def expectStorageState(self, uuid, state, *args, **kw):
        """Expect storage node `uuid` to be in `state` (or one of them)."""
        self.__expectNodeState(NodeTypes.STORAGE, uuid, state, *args, **kw)
531

532 533 534
    def expectRunning(self, process, *args, **kw):
        """Expect the storage node of `process` to be in RUNNING state."""
        self.expectStorageState(process.getUUID(), NodeStates.RUNNING,
                                *args, **kw)
535

536 537 538
    def expectPending(self, process, *args, **kw):
        """Expect the storage node of `process` to be in PENDING state."""
        self.expectStorageState(process.getUUID(), NodeStates.PENDING,
                                *args, **kw)
539

540 541 542
    def expectUnknown(self, process, *args, **kw):
        """Expect the storage node of `process` to be in UNKNOWN state."""
        self.expectStorageState(process.getUUID(), NodeStates.UNKNOWN,
                                *args, **kw)
543

544
    def expectUnavailable(self, process, *args, **kw):
        """Expect the storage node of `process` to be TEMPORARILY_DOWN."""
        process_uuid = process.getUUID()
        self.expectStorageState(process_uuid, NodeStates.TEMPORARILY_DOWN,
                                *args, **kw)
547

548
    def expectPrimary(self, uuid=None, *args, **kw):
        """Expect a primary master, which must be `uuid` unless it is None.

        Fails immediately if another primary than `uuid` is reported.
        Extra arguments are passed to expectCondition.
        """
        def callback(last_try):
            current_try = self.getPrimary()
            if None not in (uuid, current_try) and uuid != current_try:
                message = 'An unexpected primary arised: %r, expected %r' % (
                    dump(current_try), dump(uuid))
                raise AssertionError(message)
            reached = uuid is None or uuid == current_try
            return reached, current_try
        self.expectCondition(callback, *args, **kw)
556

557
    def expectOudatedCells(self, number, *args, **kw):
        """Expect exactly `number` cells of the partition table to be
        OUT_OF_DATE. Extra arguments are passed to expectCondition.
        """
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            outdated_count = sum(1
                for row in row_list
                for cell in row[1]
                if cell[1] == CellStates.OUT_OF_DATE)
            return outdated_count == number, outdated_count
        self.expectCondition(callback, *args, **kw)
567

568
    def expectAssignedCells(self, process, number, *args, **kw):
        """Expect exactly `number` cells of the partition table to refer to
        the UUID of `process`. Extra arguments are passed to expectCondition.
        """
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            assigned_count = sum(1
                for row in row_list
                for cell in row[1]
                if cell[0] == process.getUUID())
            return assigned_count == number, assigned_count
        self.expectCondition(callback, *args, **kw)
578

579
    def expectClusterState(self, state, *args, **kw):
        """Expect neoctl to report the cluster in `state`.

        Extra arguments are passed to expectCondition.
        """
        def callback(last_try):
            reported_state = self.neoctl.getClusterState()
            return reported_state == state, reported_state
        self.expectCondition(callback, *args, **kw)
584

585 586
    def expectClusterRecovering(self, *args, **kw):
        """Expect the cluster to be in RECOVERING state."""
        self.expectClusterState(ClusterStates.RECOVERING, *args, **kw)
587

588 589
    def expectClusterVerifying(self, *args, **kw):
        """Expect the cluster to be in VERIFYING state."""
        self.expectClusterState(ClusterStates.VERIFYING, *args, **kw)
590

591 592
    def expectClusterRunning(self, *args, **kw):
        """Expect the cluster to be in RUNNING state."""
        self.expectClusterState(ClusterStates.RUNNING, *args, **kw)
593

594
    def expectAlive(self, process, *args, **kw):
        """Expect `process.isAlive()` to become true.

        Extra arguments are passed to expectCondition.
        """
        def callback(last_try):
            alive = process.isAlive()
            return alive, alive
        self.expectCondition(callback, *args, **kw)
599

600
    def expectStorageNotKnown(self, process, *args, **kw):
        """Expect the storage node of `process` to be absent from the
        storage list. Extra arguments are passed to expectCondition.
        """
        # /!\ Not Known != Unknown
        process_uuid = process.getUUID()
        def expected_storage_not_known(last_try):
            for node in self.getStorageList():
                if node[2] != process_uuid:
                    continue
                # still listed: report the offending entry
                return False, node
            return True, None
        self.expectCondition(expected_storage_not_known, *args, **kw)
609

610 611 612
    def __del__(self):
        """Remove the temp directory if cleanup_on_delete was requested."""
        if self.cleanup_on_delete:
            # NOTE(review): os.removedirs only removes empty directories
            # (and then empty parents); the log files written in temp_dir
            # would make it fail - confirm whether shutil.rmtree is wanted.
            os.removedirs(self.temp_dir)
613

614

615
class NEOFunctionalTest(NeoTestBase):
616

617 618 619 620 621 622 623
    def tearDown(self):
        """Reinitialise the transaction manager, then call the base tearDown."""
        # Kill all unfinished transactions for next test.
        # Note we don't even abort them because it may require a valid
        # connection to a master node (see Storage.sync()).
        transaction.manager.__init__()
        NeoTestBase.tearDown(self)

624 625 626 627
    def setupLog(self):
        """Call setupLog for 'TEST' with test.log in the test's temp directory."""
        log_file = os.path.join(self.getTempDirectory(), 'test.log')
        setupLog('TEST', log_file, True)

628 629
    def getTempDirectory(self):
        """Return the temp directory of this test, creating it if missing."""
        # the full path is based on test case and current test method
        path = os.path.join(getTempDirectory(), self.id())
        if os.path.exists(path):
            return path
        # build the path if needed
        os.makedirs(path)
        return path
635

636 637 638 639 640 641
    def run(self, *args, **kw):
        """Run the test like the parent class does.

        A ChildException that went through the parent run() is called, which
        re-raises the exception it wraps (or exits).
        """
        try:
            return super(NEOFunctionalTest, self).run(*args, **kw)
        except ChildException, e:
            e()

642 643 644
    def runWithTimeout(self, timeout, method, args=(), kwargs=None):
        if kwargs is None:
            kwargs = {}
645 646 647 648 649 650 651
        exc_list = []
        def excWrapper(*args, **kw):
            try:
                method(*args, **kw)
            except:
                exc_list.append(sys.exc_info())
        thread = threading.Thread(None, excWrapper, args=args, kwargs=kwargs)
652 653 654 655
        thread.setDaemon(True)
        thread.start()
        thread.join(timeout)
        self.assertFalse(thread.isAlive(), 'Run timeout')
656 657 658 659
        if exc_list:
            assert len(exc_list) == 1, exc_list
            exc = exc_list[0]
            raise exc[0], exc[1], exc[2]
660