#
# Copyright (C) 2009-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import errno
import os
import random
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback
import unittest

import MySQLdb
import ZODB

from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.protocol import ClusterStates, NodeTypes, CellStates, NodeStates
from neo.util import dump
from neo.tests import DB_ADMIN, DB_PASSWD, NeoTestBase
from neo.client.Storage import Storage

# Script names of the node types. They are used both as the command run by
# NEOProcess and as keys of NEOCluster.process_dict.
NEO_MASTER = 'neomaster'
NEO_STORAGE = 'neostorage'
NEO_ADMIN = 'neoadmin'

# Seconds added to every timeout given to NEOCluster.expectCondition.
DELAY_SAFETY_MARGIN = 10
# Seconds NEOCluster.run and NEOCluster.start wait before giving up.
MAX_START_TIME = 30
class AlreadyRunning(Exception):
    """Raised by NEOProcess.start when a forked child is still tracked."""

class AlreadyStopped(Exception):
    """Raised by NEOProcess.kill and NEOProcess.wait when no child is tracked."""
class NEOProcess(object):
    """Handle on one node script run in a forked child of the current process.

    start() forks and runs the script in the child with execfile(); kill(),
    wait() and stop() act on that child through its PID.
    """

    # PID of the forked child. 0 means "never started, or already waited for".
    pid = 0

    def __init__(self, command, uuid, arg_dict):
        """
          command: path of the script handed to execfile() in the child.
          uuid: node UUID, recorded through setUUID.
          arg_dict: maps each command line option to its value (None for an
            option without value). It is kept by reference and setUUID
            writes its '--uuid' entry.
        """
        self.command = command
        self.arg_dict = arg_dict
        self.with_uuid = True
        self.setUUID(uuid)

    def start(self, with_uuid=True):
        """
          Fork, then run the command in the child.

          with_uuid: when False, '--uuid' is left out of the command line.
          Raises AlreadyRunning if a previous child was not waited for.
        """
        # Prevent starting when already forked and wait wasn't called.
        if self.pid != 0:
            raise AlreadyRunning('Already running with PID %r' % (self.pid, ))
        command = self.command
        self.with_uuid = with_uuid
        args = []
        for arg, param in self.arg_dict.items():
            if with_uuid is False and arg == '--uuid':
                continue
            args.append(arg)
            if param is not None:
                args.append(str(param))
        self.pid = os.fork()
        if self.pid == 0:
            # Child
            try:
                sys.argv = [command] + args
                execfile(command, {})
            except (SystemExit, KeyboardInterrupt):
                self._exit()
            except:
                # Catch everything: the child must never return into the
                # caller's code, whatever the script raised.
                print(traceback.format_exc())
            # If we reach this line, exec call failed (is it possible to reach
            # it without going through above "except" branch ?).
            print('Error executing %r.' % (command + ' ' + ' '.join(args), ))
            self._exit()

    def _exit(self):
        """Leave the child process by raising KeyboardInterrupt."""
        # KeyboardInterrupt is not intercepted by test runner (it is still
        # above us in the stack), and we do want to exit.
        # To avoid polluting test foreground output with induced
        # traceback, replace stdout & stderr.
        sys.stdout = sys.stderr = open('/dev/null', 'w')
        raise KeyboardInterrupt

    def kill(self, sig=signal.SIGTERM):
        """
          Send signal `sig` to the child.

          An OSError from os.kill is printed, not propagated.
          Raises AlreadyStopped when no child is tracked.
        """
        if not self.pid:
            raise AlreadyStopped
        try:
            os.kill(self.pid, sig)
        except OSError:
            # print_exc reports the exception being handled here.
            # print_last only knows about sys.last_*, which is not set for
            # an exception that was caught.
            traceback.print_exc()

    def __del__(self):
        # If we get killed, kill subprocesses as well.
        try:
            self.kill(signal.SIGKILL)
            self.wait()
        except:
            # We can ignore all exceptions at this point, since there is no
            # guaranteed way to handle them (other objects we would depend on
            # might already have been deleted).
            pass

    def wait(self, options=0):
        """
          Wait for the child and return its exit status.

          options: passed to os.waitpid.
          Returns None, and keeps tracking the child, when os.waitpid
          reports that it has not exited yet (possible with os.WNOHANG).
          Raises AlreadyStopped when no child is tracked.
        """
        if self.pid == 0:
            raise AlreadyStopped
        waited_pid, status = os.waitpid(self.pid, options)
        if waited_pid == 0:
            # Child still running: forgetting its PID now would make it
            # impossible to kill or wait for it later.
            return None
        self.pid = 0
        return os.WEXITSTATUS(status)

    def stop(self):
        """Send the default signal of kill() to the child and wait for it."""
        self.kill()
        self.wait()

    def getPID(self):
        """Return the PID of the child, or 0 when none is tracked."""
        return self.pid

    def getUUID(self):
        """Return the UUID given to __init__ or setUUID."""
        assert self.with_uuid, 'UUID disabled on this process'
        return self.uuid

    def setUUID(self, uuid):
        """
          Note: for this change to take effect, the node must be restarted.
        """
        self.uuid = uuid
        self.arg_dict['--uuid'] = dump(uuid)

    def isAlive(self):
        """
          Tell whether a process with the tracked PID exists.

          Returns False when no child is tracked. Any OSError other than
          "no such process" is propagated.
        """
        if self.pid == 0:
            # Signalling PID 0 targets the process group of the caller, so
            # it would always succeed.
            return False
        try:
            os.kill(self.pid, 0)
        except OSError:
            # sys.exc_info() instead of an "except ... as" clause, so that
            # this parses on every Python version.
            if sys.exc_info()[1].errno != errno.ESRCH:
                raise
            return False
        return True
class NEOCluster(object):
    """Describe and drive a local NEO cluster for functional tests.

    Creating an instance declares one admin node, the master nodes and one
    storage node per database, and builds the neoctl client. No node is
    forked before run() or start() is called.
    """

    def __init__(self, db_list, master_node_count=1, partitions=1, replicas=0,
                 db_user='neo', db_password='neo',
                 db_super_user=DB_ADMIN, db_super_password=DB_PASSWD,
                 cleanup_on_delete=False, temp_dir=None,
                 clear_databases=True, adapter='MySQL',
                 verbose=True):
        """
          db_list: database names; one storage node is declared per entry.
          master_node_count: number of master nodes to declare.
          partitions, replicas: given to each master node as '--partitions'
            and '--replicas'.
          db_user, db_password: account used by storage nodes and by
            getSQLConnection; setupDB grants it access to each database.
          db_super_user, db_super_password: account used by setupDB and
            switchTables.
          cleanup_on_delete: when true, __del__ tries to remove temp_dir.
          temp_dir: directory receiving the log files; created with
            tempfile.mkdtemp when None.
          clear_databases: when true, setupDB() is called immediately.
          adapter: given to each storage node as '--adapter'.
          verbose: when true, nodes get '--verbose'; also passed to the
            client storage built by getZODBStorage.
        """
        self.zodb_storage_list = []
        self.cleanup_on_delete = cleanup_on_delete
        self.verbose = verbose
        self.uuid_set = set()
        self.db_super_user = db_super_user
        self.db_super_password = db_super_password
        self.db_user = db_user
        self.db_password = db_password
        self.db_list = db_list
        if clear_databases:
            self.setupDB()
        self.process_dict = {}
        self.port_set = set()
        if temp_dir is None:
            temp_dir = tempfile.mkdtemp(prefix='neo_')
            print('Using temp directory %r.' % (temp_dir, ))
        self.temp_dir = temp_dir
        admin_port = self.__allocatePort()
        self.cluster_name = 'neo_%s' % (random.randint(0, 100), )
        master_node_list = [self.__allocatePort()
                            for _ in range(master_node_count)]
        self.master_nodes = '/'.join(
            '127.0.0.1:%s' % (x, ) for x in master_node_list)
        # create admin node
        self.__newProcess(NEO_ADMIN, {
            '--cluster': self.cluster_name,
            '--name': 'admin',
            '--bind': '127.0.0.1:%d' % (admin_port, ),
            '--masters': self.master_nodes,
        })
        # create master nodes
        for index, port in enumerate(master_node_list):
            self.__newProcess(NEO_MASTER, {
                '--cluster': self.cluster_name,
                '--name': 'master_%d' % index,
                '--bind': '127.0.0.1:%d' % (port, ),
                '--masters': self.master_nodes,
                '--replicas': replicas,
                '--partitions': partitions,
            })
        # create storage nodes
        for index, db in enumerate(db_list):
            self.__newProcess(NEO_STORAGE, {
                '--cluster': self.cluster_name,
                '--name': 'storage_%d' % index,
                '--masters': self.master_nodes,
                '--database': '%s:%s@%s' % (db_user, db_password, db),
                '--adapter': adapter,
            })
        # create neoctl
        self.neoctl = NeoCTL('127.0.0.1', admin_port,
                             'SocketConnector')

    def __newProcess(self, command, arguments):
        """
          Declare a node: complete `arguments` (modified in place) with a
          new UUID, the verbosity flag and a log file named after '--name',
          then register a NEOProcess under `command` in process_dict.
        """
        uuid = self.__allocateUUID()
        arguments['--uuid'] = uuid
        if self.verbose:
            arguments['--verbose'] = True
        logfile = arguments['--name']
        arguments['--logfile'] = os.path.join(self.temp_dir,
                                              '%s.log' % (logfile, ))
        self.process_dict.setdefault(command, []).append(
            NEOProcess(command, uuid, arguments))

    def __allocatePort(self):
        """
          Return a port picked by the system on 127.0.0.1 that this
          cluster did not hand out before.
        """
        port_set = self.port_set
        while True:
            # A socket can be bound only once, so each attempt needs its
            # own socket.
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(('127.0.0.1', 0))
                port = s.getsockname()[1]
            finally:
                s.close()
            if port not in port_set:
                break
        port_set.add(port)
        return port

    def __allocateUUID(self):
        """Return 16 random bytes, also recorded in uuid_set."""
        uuid = os.urandom(16)
        self.uuid_set.add(uuid)
        return uuid

    def __getSuperSQLConnection(self):
        """Open a MySQL connection with the super-user account."""
        # Cleanup or bootstrap databases
        connect_arg_dict = {'user': self.db_super_user}
        password = self.db_super_password
        if password is not None:
            connect_arg_dict['passwd'] = password
        return MySQLdb.Connect(**connect_arg_dict)

    def setupDB(self):
        """
          Drop and recreate every database of db_list, granting all
          privileges on each to db_user.
        """
        sql_connection = self.__getSuperSQLConnection()
        try:
            cursor = sql_connection.cursor()
            for database in self.db_list:
                cursor.execute('DROP DATABASE IF EXISTS `%s`' % (database, ))
                cursor.execute('CREATE DATABASE `%s`' % (database, ))
                cursor.execute('GRANT ALL ON `%s`.* TO "%s"@"localhost" '\
                               'IDENTIFIED BY "%s"' % (database, self.db_user,
                               self.db_password))
            cursor.close()
            sql_connection.commit()
        finally:
            # Do not leak the connection when a statement fails.
            sql_connection.close()

    def switchTables(self, database):
        """
          In `database`, swap table 'trans' with 'ttrans' and 'obj' with
          'tobj', then truncate 'obj_short'.
        """
        sql_connection = self.__getSuperSQLConnection()
        try:
            cursor = sql_connection.cursor()
            cursor.execute('use %s' % (database, ))
            for table in ('trans', 'obj'):
                cursor.execute('rename table %s to tmp' % (table, ))
                cursor.execute('rename table t%s to %s' % (table, table))
                cursor.execute('rename table tmp to t%s' % (table, ))
            cursor.execute('truncate table obj_short')
            sql_connection.commit()
        finally:
            # Do not leak the connection when a statement fails.
            sql_connection.close()

    def run(self, except_storages=()):
        """ Start cluster processes except some storage nodes """
        assert len(self.process_dict)
        for process_list in self.process_dict.values():
            for process in process_list:
                if process not in except_storages:
                    process.start()
        # wait for the admin node availability
        end_time = time.time() + MAX_START_TIME
        while True:
            if time.time() > end_time:
                raise AssertionError('Timeout when starting cluster')
            try:
                self.neoctl.getClusterState()
            except NotReadyException:
                time.sleep(0.5)
            else:
                break

    def start(self, except_storages=()):
        """ Do a complete start of a cluster """
        self.run(except_storages=except_storages)
        neoctl = self.neoctl
        neoctl.startCluster()
        target_count = len(self.db_list) - len(except_storages)
        end_time = time.time() + MAX_START_TIME
        while True:
            storage_node_list = neoctl.getNodeList(
                node_type=NodeTypes.STORAGE)
            # wait at least number of started storages, admin node can know
            # more nodes when the cluster restart with an existing partition
            # table referencing non-running nodes
            if len(storage_node_list) >= target_count:
                break
            time.sleep(0.5)
            if time.time() > end_time:
                raise AssertionError('Timeout when starting cluster')
        if storage_node_list:
            self.expectClusterRunning()
            # x[2] is the UUID of the node.
            neoctl.enableStorageList([x[2] for x in storage_node_list])

    def stop(self, clients=True):
        """
          Kill (SIGKILL) and wait for every process. When `clients` is
          true, also close the storages created by getZODBStorage.
        """
        for process_list in self.process_dict.values():
            for process in process_list:
                try:
                    process.kill(signal.SIGKILL)
                    process.wait()
                except AlreadyStopped:
                    pass
        if clients:
            for zodb_storage in self.zodb_storage_list:
                zodb_storage.close()
            self.zodb_storage_list = []
        time.sleep(0.5)

    def getNEOCTL(self):
        """Return the NeoCTL instance connected to the admin node."""
        return self.neoctl

    def getZODBStorage(self, **kw):
        """
          Return a new client Storage for this cluster. Extra keyword
          arguments are passed to Storage. The storage is remembered so
          that stop() can close it.
        """
        master_nodes = self.master_nodes.replace('/', ' ')
        result = Storage(
            master_nodes=master_nodes,
            name=self.cluster_name,
            connector='SocketConnector',
            logfile=os.path.join(self.temp_dir, 'client.log'),
            verbose=self.verbose,
            **kw
        )
        self.zodb_storage_list.append(result)
        return result

    def getZODBConnection(self, **kw):
        """ Return a tuple with the database and a connection """
        db = ZODB.DB(storage=self.getZODBStorage(**kw))
        return (db, db.open())

    def getSQLConnection(self, db, autocommit=False):
        """
          Return a MySQL connection to database `db`, which must be in
          db_list, opened with the db_user account.
        """
        assert db in self.db_list
        conn = MySQLdb.Connect(user=self.db_user, passwd=self.db_password,
                               db=db)
        conn.autocommit(autocommit)
        return conn

    def _getProcessList(self, type):
        """
          Return the NEOProcess list registered for command `type`, or
          None when there is none.
        """
        return self.process_dict.get(type)

    def getMasterProcessList(self):
        return self._getProcessList(NEO_MASTER)

    def getStorageProcessList(self):
        return self._getProcessList(NEO_STORAGE)

    def getAdminProcessList(self):
        return self._getProcessList(NEO_ADMIN)

    def _killMaster(self, primary=False, all=False):
        """
          Kill and wait for master processes, returning their UUIDs.

          primary: when true select the primary master, otherwise select
            the other masters.
          all: when false, stop after the first selected master.
        """
        killed_uuid_list = []
        primary_uuid = self.neoctl.getPrimary()
        for master in self.getMasterProcessList():
            master_uuid = master.getUUID()
            is_primary = master_uuid == primary_uuid
            # Selected when the "is primary" status matches the request.
            if (primary and is_primary) or not (primary or is_primary):
                killed_uuid_list.append(master_uuid)
                master.kill()
                master.wait()
                if not all:
                    break
        return killed_uuid_list

    def killPrimary(self):
        return self._killMaster(primary=True)

    def killSecondaryMaster(self, all=False):
        return self._killMaster(primary=False, all=all)

    def killMasters(self):
        """Kill all non-primary masters, then the primary; return UUIDs."""
        secondary_list = self.killSecondaryMaster(all=True)
        primary_list = self.killPrimary()
        return secondary_list + primary_list

    def killStorage(self, all=False):
        """
          Kill and wait for the first storage process, or for every one
          when `all` is true. Return the UUIDs of the killed processes.
        """
        killed_uuid_list = []
        for storage in self.getStorageProcessList():
            killed_uuid_list.append(storage.getUUID())
            storage.kill()
            storage.wait()
            if not all:
                break
        return killed_uuid_list

    def __getNodeList(self, node_type, state=None):
        """
          Return the nodes of `node_type` reported by neoctl, restricted
          to those whose state (item 3) equals `state` when it is given.
        """
        return [x for x in self.neoctl.getNodeList(node_type)
                if state is None or x[3] == state]

    def getMasterList(self, state=None):
        return self.__getNodeList(NodeTypes.MASTER, state)

    def getStorageList(self, state=None):
        return self.__getNodeList(NodeTypes.STORAGE, state)

    def getClientlist(self, state=None):
        return self.__getNodeList(NodeTypes.CLIENT, state)

    # Spelling consistent with getMasterList and getStorageList.
    getClientList = getClientlist

    def __getNodeState(self, node_type, uuid):
        """
          Return the state reported for the node of `node_type` having
          `uuid`, or None when no such node is listed.
        """
        for _, _, node_uuid, state in self.__getNodeList(node_type):
            if node_uuid == uuid:
                return state
        return None

    def getMasterNodeState(self, uuid):
        return self.__getNodeState(NodeTypes.MASTER, uuid)

    def getPrimary(self):
        """
          Return what neoctl.getPrimary() returns, or None when it raises
          NotReadyException.
        """
        try:
            current_try = self.neoctl.getPrimary()
        except NotReadyException:
            current_try = None
        return current_try

    def expectCondition(self, condition, timeout=0, delay=1, on_fail=None):
        """
          Poll `condition` until it reports success.

          condition: called with the opaque value it returned on the
            previous call (None the first time); must return a
            (reached, opaque) pair.
          timeout: seconds to wait, DELAY_SAFETY_MARGIN being added to it.
          delay: seconds slept between two calls.
          on_fail: when given, called with the list of opaque values
            before AssertionError is raised on timeout.
        """
        end = time.time() + timeout + DELAY_SAFETY_MARGIN
        opaque = None
        opaque_history = []
        while time.time() < end:
            reached, opaque = condition(opaque)
            if reached:
                break
            opaque_history.append(opaque)
            time.sleep(delay)
        else:
            if on_fail is not None:
                on_fail(opaque_history)
            raise AssertionError('Timeout while expecting condition. '
                                 'History: %s' % (opaque_history, ))

    def expectAllMasters(self, node_count, state=None, timeout=0, delay=1):
        """
          Wait until exactly `node_count` masters are listed (in `state`
          when given). A count lower than the previous one is an error.
        """
        def callback(last_try):
            current_try = len(self.getMasterList(state=state))
            if last_try is not None and current_try < last_try:
                raise AssertionError('Regression: %s became %s' %
                    (last_try, current_try))
            return (current_try == node_count, current_try)
        self.expectCondition(callback, timeout, delay)

    def __expectNodeState(self, node_type, uuid, state, timeout=0, delay=1):
        """
          Wait until the node is in `state`, which is either a single
          state or a tuple/list of accepted states.
        """
        if not isinstance(state, (tuple, list)):
            state = (state, )
        def callback(last_try):
            current_try = self.__getNodeState(node_type, uuid)
            return current_try in state, current_try
        self.expectCondition(callback, timeout, delay)

    def expectMasterState(self, uuid, state, timeout=0, delay=1):
        self.__expectNodeState(NodeTypes.MASTER, uuid, state, timeout,
                delay)

    def expectStorageState(self, uuid, state, timeout=0, delay=1):
        self.__expectNodeState(NodeTypes.STORAGE, uuid, state,
                timeout, delay)

    def expectRunning(self, process, timeout=0, delay=1):
        self.expectStorageState(process.getUUID(), NodeStates.RUNNING, timeout,
                delay)

    def expectPending(self, process, timeout=0, delay=1):
        self.expectStorageState(process.getUUID(), NodeStates.PENDING, timeout,
                delay)

    def expectUnknown(self, process, timeout=0, delay=1):
        self.expectStorageState(process.getUUID(), NodeStates.UNKNOWN, timeout,
                delay)

    def expectUnavailable(self, process, timeout=0, delay=1):
        self.expectStorageState(process.getUUID(),
                NodeStates.TEMPORARILY_DOWN, timeout, delay)

    def expectPrimary(self, uuid=None, timeout=0, delay=1):
        """
          Wait for a primary master. When `uuid` is given, wait for that
          master and fail at once if another one is reported primary.
        """
        def callback(last_try):
            current_try = self.getPrimary()
            if None not in (uuid, current_try) and uuid != current_try:
                raise AssertionError('An unexpected primary arised: %r, '
                    'expected %r' % (dump(current_try), dump(uuid)))
            return uuid is None or uuid == current_try, current_try
        self.expectCondition(callback, timeout, delay)

    def expectOudatedCells(self, number, timeout=0, delay=1):
        """
          Wait until the partition table has exactly `number` cells whose
          state is CellStates.OUT_OF_DATE.
        """
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            number_of_oudated = 0
            for row in row_list:
                for cell in row[1]:
                    if cell[1] == CellStates.OUT_OF_DATE:
                        number_of_oudated += 1
            return number_of_oudated == number, number_of_oudated
        self.expectCondition(callback, timeout, delay)

    # Correctly spelled alias; the original name is kept for callers.
    expectOutdatedCells = expectOudatedCells

    def expectAssignedCells(self, process, number, timeout=0, delay=1):
        """
          Wait until exactly `number` cells of the partition table carry
          the UUID of `process`.
        """
        def callback(last_try):
            row_list = self.neoctl.getPartitionRowList()[1]
            assigned_cells_number = 0
            for row in row_list:
                for cell in row[1]:
                    if cell[0] == process.getUUID():
                        assigned_cells_number += 1
            return assigned_cells_number == number, assigned_cells_number
        self.expectCondition(callback, timeout, delay)

    def expectClusterState(self, state, timeout=0, delay=1):
        """Wait until neoctl reports `state` as the cluster state."""
        def callback(last_try):
            current_try = self.neoctl.getClusterState()
            return current_try == state, current_try
        self.expectCondition(callback, timeout, delay)

    def expectClusterRecovering(self, timeout=0, delay=1):
        self.expectClusterState(ClusterStates.RECOVERING, timeout, delay)

    def expectClusterVerifying(self, timeout=0, delay=1):
        self.expectClusterState(ClusterStates.VERIFYING, timeout, delay)

    def expectClusterRunning(self, timeout=0, delay=1):
        self.expectClusterState(ClusterStates.RUNNING, timeout, delay)

    def expectAlive(self, process, timeout=0, delay=1):
        """Wait until process.isAlive() is true."""
        def callback(last_try):
            current_try = process.isAlive()
            return current_try, current_try
        self.expectCondition(callback, timeout, delay)

    def expectStorageNotKnown(self, process, timeout=0, delay=1):
        """Wait until no listed storage node has the UUID of `process`."""
        # /!\ Not Known != Unknown
        process_uuid = process.getUUID()
        def expected_storage_not_known(last_try):
            for storage in self.getStorageList():
                if storage[2] == process_uuid:
                    return False, storage
            return True, None
        self.expectCondition(expected_storage_not_known, timeout, delay)

    def __del__(self):
        if self.cleanup_on_delete:
            # NOTE(review): os.removedirs only removes empty directories,
            # so this fails once nodes have written log files there.
            # Confirm whether a recursive removal is wanted.
            os.removedirs(self.temp_dir)
class NEOFunctionalTest(NeoTestBase):
    """Base class of functional tests, with temp directory and timeout helpers."""

    def getTempDirectory(self):
        """
          Return a directory dedicated to the current test, creating it
          when missing. It is named after self.id() and located under the
          directory given by the TEMP environment variable; when TEMP is
          not set, a new directory is created and stored in TEMP.
        """
        # get the current temp directory or a new one
        temp_dir = os.environ.get('TEMP', None)
        if temp_dir is None:
            temp_dir = tempfile.mkdtemp(prefix='neo_')
            os.environ['TEMP'] = temp_dir
            print('Using temp directory %r.' % (temp_dir, ))
        # build the full path based on test case and current test method
        temp_dir = os.path.join(temp_dir, self.id())
        # build the path if needed
        if not os.path.exists(temp_dir):
            os.makedirs(temp_dir)
        return temp_dir

    def runWithTimeout(self, timeout, method, args=(), kwargs=None):
        """
          Call method(*args, **kwargs) in a daemon thread and fail the
          test if it is still running after `timeout` seconds, or if it
          raised an exception.
        """
        if kwargs is None:
            kwargs = {}
        error_list = []
        def wrapper(*args, **kw):
            # An exception raised in a thread never reaches the thread
            # that started it, so record it for the check done below.
            try:
                method(*args, **kw)
            except Exception:
                error_list.append(traceback.format_exc())
        thread = threading.Thread(None, wrapper, args=args, kwargs=kwargs)
        thread.setDaemon(True)
        thread.start()
        thread.join(timeout)
        self.assertFalse(thread.isAlive(), 'Run timeout')
        if error_list:
            self.fail('Exception raised by %r:\n%s' % (method, error_list[0]))
584