#
# Copyright (c) 2011 Nexedi SARL and Contributors. All Rights Reserved.
#                    Julien Muchembled <jm@nexedi.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import os, random, socket, sys, tempfile, threading, time, types, weakref
import traceback
from collections import deque
from itertools import count
from functools import wraps
from zlib import decompress
from mock import Mock
import transaction, ZODB
import neo.admin.app, neo.master.app, neo.storage.app
import neo.client.app, neo.neoctl.app
from neo.client import Storage
from neo.lib import bootstrap, setupLog
from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, \
    ConnectorConnectionRefusedException, ConnectorTryAgainException
from neo.lib.event import EventManager
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes
from neo.lib.util import SOCKET_CONNECTORS_DICT, parseMasterList
from .. import NeoTestBase, getTempDirectory, setupMySQLdb, \
    ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_USER

BIND = IP_VERSION_FORMAT_DICT[ADDRESS_TYPE], 0
LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE])


class Serialized(object):

    @classmethod
    def init(cls):
        cls._global_lock = threading.Lock()
        cls._global_lock.acquire()
        cls._lock_list = deque()
        cls._lock_lock = threading.Lock()
        cls._pdb = False
        cls.pending = 0

    @classmethod
    def release(cls, lock=None, wake_other=True, stop=None):
        """Suspend lock owner and resume first suspended thread"""
        if lock is None:
            lock = cls._global_lock
            if stop: # XXX: we should fix ClusterStates.STOPPING
                cls.pending = frozenset(stop)
            else:
                cls.pending = 0
        try:
            sys._getframe(1).f_trace.im_self.set_continue()
            cls._pdb = True
        except AttributeError:
            pass
        q = cls._lock_list
        l = cls._lock_lock
        l.acquire()
        try:
            q.append(lock)
            if wake_other:
                q.popleft().release()
        finally:
            l.release()

    @classmethod
    def acquire(cls, lock=None):
        """Suspend all threads except lock owner"""
        if lock is None:
            lock = cls._global_lock
        lock.acquire()
        pending = cls.pending # XXX: getattr once to avoid race conditions
        if type(pending) is frozenset: # XXX
            if lock is cls._global_lock:
                cls.pending = 0
            elif threading.currentThread() in pending:
                sys.exit()
        if cls._pdb:
            cls._pdb = False
            try:
                sys.stdout.write(threading.currentThread().node_name)
            except AttributeError:
                pass
            pdb(1)

    @classmethod
    def tic(cls, lock=None):
        # switch to another thread
        # (the following calls are not supposed to be debugged into)
        cls.release(lock); cls.acquire(lock)

    @classmethod
    def background(cls):
        with cls._lock_lock:
            if cls._lock_list:
                cls._lock_list.popleft().release()
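
# Note on the class above: this is cooperative scheduling, not true
# concurrency. Exactly one thread runs at a time; every other thread is
# suspended on a Lock queued in _lock_list, and control is handed over
# explicitly. A minimal sketch of the hand-off:
#   Serialized.init()
#   Serialized.release()  # suspend the caller, wake the next queued thread
#   Serialized.acquire()  # block until control is handed back
# tic() is simply a release() immediately followed by an acquire().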

class SerializedEventManager(EventManager):

    _lock = None
    _timeout = 0

    @classmethod
    def decorate(cls, func):
        def decorator(*args, **kw):
            try:
                EventManager.__init__ = types.MethodType(
                    cls.__init__.im_func, None, EventManager)
                return func(*args, **kw)
            finally:
                EventManager.__init__ = types.MethodType(
                    cls._super__init__.im_func, None, EventManager)
        return wraps(func)(decorator)

    _super__init__ = EventManager.__init__.im_func

    def __init__(self):
        cls = self.__class__
        assert cls is EventManager
        self.__class__ = SerializedEventManager
        self._super__init__()

    def _poll(self, timeout=1):
        if self._pending_processing:
            assert timeout <= 0
        elif 0 == self._timeout == timeout == Serialized.pending == len(
            self.writer_set):
            return
        else:
            if self.writer_set and Serialized.pending == 0:
                Serialized.pending = 1
            # Jump to another thread before polling, so that as soon as a
            # message is sent on the network, the receiving side can be
            # debugged immediately.
            # XXX: Unfortunately, this means we have a useless full-cycle
            #      before the first message is sent.
            # TODO: Detect where a message is sent to jump immediately to nodes
            #       that will do something.
            Serialized.tic(self._lock)
            if timeout != 0:
                timeout = self._timeout
                if timeout != 0 and Serialized.pending == 1:
                    Serialized.pending = timeout = 0
        EventManager._poll(self, timeout)
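
# In short: every node's event loop yields to the Serialized scheduler
# before each poll and, in essence, polls without blocking, so that a test
# drives the whole cluster deterministically. Serialized.pending records
# whether some node still has outgoing data, which is how NEOCluster.tic()
# detects that the cluster has become idle.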


class Node(object):

    def filterConnection(self, *peers):
        addr = lambda c: c and (c.accepted_from or c.getAddress())
        addr_set = set(addr(c.connector) for peer in peers
            for c in peer.em.connection_dict.itervalues()
            if isinstance(c, Connection))
        addr_set.discard(None)
        conn_list = (c for c in self.em.connection_dict.itervalues()
            if isinstance(c, Connection) and addr(c.connector) in addr_set)
        return ConnectionFilter(*conn_list)
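
# filterConnection() returns a ConnectionFilter (defined below) wrapping
# every established connection from this node to the given peers, so that
# tests can intercept or delay the packets this node sends to them.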

class ServerNode(Node):

    _server_class_dict = {}

    class __metaclass__(type):
        def __init__(cls, name, bases, d):
            type.__init__(cls, name, bases, d)
            if Node not in bases and threading.Thread not in cls.__mro__:
                cls.__bases__ = bases + (threading.Thread,)
                cls.node_type = getattr(NodeTypes, name[:-11].upper())
                cls._node_list = []
                cls._virtual_ip = socket.inet_ntop(ADDRESS_TYPE,
                    LOCAL_IP[:-1] + chr(2 + len(cls._server_class_dict)))
                cls._server_class_dict[cls._virtual_ip] = cls

    @staticmethod
    def resetPorts():
        for cls in ServerNode._server_class_dict.itervalues():
            del cls._node_list[:]

    @classmethod
    def newAddress(cls):
        address = cls._virtual_ip, len(cls._node_list)
        cls._node_list.append(None)
        return address

    @classmethod
    def resolv(cls, address):
        try:
            cls = cls._server_class_dict[address[0]]
        except KeyError:
            return address
        return cls._node_list[address[1]].getListeningAddress()
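
    # Virtual address scheme: each ServerNode subclass gets its own fake IP
    # (built from LOCAL_IP by the metaclass above) and the 'port' is simply
    # the node's index in _node_list; resolv() translates such an address
    # back to the real, kernel-assigned one the node is listening on.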

    @SerializedEventManager.decorate
    def __init__(self, cluster=None, address=None, **kw):
        if not address:
            address = self.newAddress()
        if cluster is None:
            master_nodes = kw['master_nodes']
            name = kw['name']
        else:
            master_nodes = kw.get('master_nodes', cluster.master_nodes)
            name = kw.get('name', cluster.name)
        port = address[1]
        self._node_list[port] = weakref.proxy(self)
        self._init_args = init_args = kw.copy()
        init_args['cluster'] = cluster
        init_args['address'] = address
        threading.Thread.__init__(self)
        self.daemon = True
        self.node_name = '%s_%u' % (self.node_type, port)
        kw.update(getCluster=name, getBind=address,
                  getMasters=parseMasterList(master_nodes, address))
        super(ServerNode, self).__init__(Mock(kw))

    def getVirtualAddress(self):
        return self._init_args['address']

    def resetNode(self):
        assert not self.isAlive()
        kw = self._init_args
        kw['getUUID'] = self.uuid
        self.__dict__.clear()
        self.__init__(**kw)

    def start(self):
        Serialized.pending = 1
        self.em._lock = l = threading.Lock()
        l.acquire()
        Serialized.release(l, wake_other=0)
        threading.Thread.start(self)

    def run(self):
        try:
            Serialized.acquire(self.em._lock)
            super(ServerNode, self).run()
        finally:
            self._afterRun()
            neo.lib.logging.debug('stopping %r', self)
            Serialized.background()

    def _afterRun(self):
        try:
            self.listening_conn.close()
        except AttributeError:
            pass

    def stop(self):
        try:
            Serialized.release(stop=(self,))
            self.join()
        finally:
            Serialized.acquire()
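
    # Note: stop() relies on the 'stop' argument of Serialized.release():
    # the woken thread finds itself in Serialized.pending and leaves its
    # run() method through the sys.exit() call in Serialized.acquire().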

    def getListeningAddress(self):
        try:
            return self.listening_conn.getAddress()
        except AttributeError:
            raise ConnectorConnectionRefusedException

class AdminApplication(ServerNode, neo.admin.app.Application):
    pass

class MasterApplication(ServerNode, neo.master.app.Application):
    pass

class StorageApplication(ServerNode, neo.storage.app.Application):

    def resetNode(self, clear_database=False):
        self._init_args['getReset'] = clear_database
        dm = self.dm
        super(StorageApplication, self).resetNode()
        if dm and not clear_database:
            self.dm = dm

    def _afterRun(self):
        super(StorageApplication, self)._afterRun()
        try:
            self.dm.close()
            self.dm = None
        except StandardError: # AttributeError & ProgrammingError
            pass

    def switchTables(self):
        adapter = self._init_args['getAdapter']
        dm = self.dm
        if adapter == 'BTree':
            dm._obj, dm._tobj = dm._tobj, dm._obj
            dm._trans, dm._ttrans = dm._ttrans, dm._trans
            uncommitted_data = dm._uncommitted_data
            for checksum, (_, _, index) in dm._data.iteritems():
                uncommitted_data[checksum] = len(index)
                index.clear()
        elif adapter == 'MySQL':
            q = dm.query
            dm.begin()
            for table in ('trans', 'obj'):
                q('RENAME TABLE %s to tmp' % table)
                q('RENAME TABLE t%s to %s' % (table, table))
                q('RENAME TABLE tmp to t%s' % table)
            dm.commit()
        else:
            assert False
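
    # switchTables() exchanges the committed tables with their temporary
    # counterparts (where unfinished transactions live), for both supported
    # backends, so that a test can treat committed data as uncommitted and
    # vice versa.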

    def getDataLockInfo(self):
        adapter = self._init_args['getAdapter']
        dm = self.dm
        if adapter == 'BTree':
            checksum_dict = dict((x, x) for x in dm._data)
        elif adapter == 'MySQL':
            checksum_dict = dict(dm.query("SELECT id, hash FROM data"))
        else:
            assert False
        assert set(dm._uncommitted_data).issubset(checksum_dict)
        get = dm._uncommitted_data.get
        return dict((v, get(k, 0)) for k, v in checksum_dict.iteritems())

class ClientApplication(Node, neo.client.app.Application):

    @SerializedEventManager.decorate
    def __init__(self, master_nodes, name):
        super(ClientApplication, self).__init__(master_nodes, name)
        self.em._lock = threading.Lock()

    def setPoll(self, master=False):
        if master:
            self.em._timeout = 1
            if not self.em._lock.acquire(0):
                Serialized.background()
        else:
            Serialized.release(wake_other=0); Serialized.acquire()
            self.em._timeout = 0
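
    # Roughly: in 'master' mode the client's own poll thread drives its
    # event loop (timeout=1), which is what ZODB expects when it uses the
    # storage; in 'slave' mode control is handed back to the Serialized
    # scheduler so that the test can keep single-stepping the cluster.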

    def __del__(self):
        try:
            super(ClientApplication, self).__del__()
        finally:
            if self.poll_thread.isAlive():
                Serialized.background()
    close = __del__

    def filterConnection(self, *peers):
        conn_list = []
        for peer in peers:
            if isinstance(peer, MasterApplication):
                conn = self._getMasterConnection()
            else:
                assert isinstance(peer, StorageApplication)
                conn = self.cp.getConnForNode(self.nm.getByUUID(peer.uuid))
            conn_list.append(conn)
        return ConnectionFilter(*conn_list)

class NeoCTL(neo.neoctl.app.NeoCTL):

    @SerializedEventManager.decorate
    def __init__(self, *args, **kw):
        super(NeoCTL, self).__init__(*args, **kw)
        self.em._timeout = -1


class LoggerThreadName(str):

    def __new__(cls, default='TEST'):
        return str.__new__(cls, default)

    def __getattribute__(self, attr):
        return getattr(str(self), attr)

    def __hash__(self):
        return id(self)

    def __str__(self):
        try:
            return threading.currentThread().node_name
        except AttributeError:
            return str.__str__(self)
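
# A single LoggerThreadName instance is passed to setupLog(), but it
# resolves to the node_name of whichever thread is logging, so that one
# log file shows which node emitted each line.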


class Patch(object):

    def __init__(self, patched, **patch):
        (name, patch), = patch.iteritems()
        wrapped = getattr(patched, name)
        wrapper = lambda *args, **kw: patch(wrapped, *args, **kw)
        orig = patched.__dict__.get(name)
        setattr(patched, name, wraps(wrapped)(wrapper))
        if orig is None:
            self._revert = lambda: delattr(patched, name)
        else:
            self._revert = lambda: setattr(patched, name, orig)

    def __del__(self):
        self._revert()
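
# Illustrative use of Patch (the wrapper receives the original callable
# first; 'SomeClass' and 'foo' are placeholders):
#   p = Patch(SomeClass, foo=lambda orig, self, x: orig(self, x) + 1)
#   ...    # SomeClass.foo is patched here
#   del p  # reverted as soon as the Patch instance is garbage-collected
# See extraCellSortKey() below for a real caller.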


class ConnectionFilter(object):

    def __init__(self, *conns):
        self.filter_dict = {}
        self.lock = threading.Lock()
        self.conn_list = [(conn, self._patch(conn)) for conn in conns]

    def _patch(self, conn):
        assert '_addPacket' not in conn.__dict__
        lock = self.lock
        filter_dict = self.filter_dict
        orig = conn.__class__._addPacket
        queue = deque()
        def _addPacket(packet):
            lock.acquire()
            try:
                if not queue:
                    for filter in filter_dict:
                        if filter(conn, packet):
                            break
                    else:
                        return orig(conn, packet)
                queue.append(packet)
            finally:
                lock.release()
        conn._addPacket = _addPacket
        return queue

    def __call__(self, revert=1):
        with self.lock:
            self.filter_dict.clear()
            self._retry()
            if revert:
                for conn, queue in self.conn_list:
                    assert not queue
                    del conn._addPacket
                del self.conn_list[:]

    def _retry(self):
        for conn, queue in self.conn_list:
            while queue:
                packet = queue.popleft()
                for filter in self.filter_dict:
                    if filter(conn, packet):
                        queue.appendleft(packet)
                        break
                else:
                    conn.__class__._addPacket(conn, packet)
                    continue
                break

    def clear(self):
        self(0)

    def add(self, filter, *patches):
        with self.lock:
            self.filter_dict[filter] = patches

    def remove(self, *filters):
        with self.lock:
            for filter in filters:
                del self.filter_dict[filter]
            self._retry()

    def __contains__(self, filter):
        return filter in self.filter_dict
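
# Illustrative use of ConnectionFilter: a filter is a predicate
# (conn, packet) -> truthy value, and matching packets are queued instead
# of being sent until the filter is removed or the whole object is called:
#   f = cluster.client.filterConnection(cluster.storage)
#   f.add(lambda conn, packet: True)  # delay every outgoing packet
#   ...
#   f()  # let the delayed packets through and unpatch the connections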

class NEOCluster(object):

    BaseConnection_checkTimeout = staticmethod(BaseConnection.checkTimeout)
    SocketConnector_makeClientConnection = staticmethod(
        SocketConnector.makeClientConnection)
    SocketConnector_makeListeningConnection = staticmethod(
        SocketConnector.makeListeningConnection)
    SocketConnector_receive = staticmethod(SocketConnector.receive)
    SocketConnector_send = staticmethod(SocketConnector.send)
    Storage__init__ = staticmethod(Storage.__init__)
    _patch_count = 0
    _resource_dict = weakref.WeakValueDictionary()

    def _allocate(self, resource, new):
        result = resource, new()
        while result in self._resource_dict:
            result = resource, new()
        self._resource_dict[result] = self
        return result[1]

    @staticmethod
    def _patch():
        cls = NEOCluster
        cls._patch_count += 1
        if cls._patch_count > 1:
            return
        def makeClientConnection(self, addr):
            real_addr = ServerNode.resolv(addr)
            try:
                return cls.SocketConnector_makeClientConnection(self, real_addr)
            finally:
                self.remote_addr = addr
        def send(self, msg):
            result = cls.SocketConnector_send(self, msg)
            if type(Serialized.pending) is not frozenset:
                Serialized.pending = 1
            return result
        def receive(self):
            # If the peer sent an entire packet, make sure we read it entirely,
            # otherwise Serialized.pending would be reset to 0.
            data = ''
            try:
                while True:
                    d = cls.SocketConnector_receive(self)
                    if not d:
                        return data
                    data += d
            except ConnectorTryAgainException:
                if data:
                    return data
                raise
        # TODO: 'sleep' should 'tic' in a smart way, so that storages can be
        #       safely started even if the cluster isn't.
        bootstrap.sleep = lambda seconds: None
        BaseConnection.checkTimeout = lambda self, t: None
        SocketConnector.makeClientConnection = makeClientConnection
        SocketConnector.makeListeningConnection = lambda self, addr: \
            cls.SocketConnector_makeListeningConnection(self, BIND)
        SocketConnector.receive = receive
        SocketConnector.send = send
        Storage.setupLog = lambda *args, **kw: None
        Serialized.init()

    @staticmethod
    def _unpatch():
        cls = NEOCluster
        assert cls._patch_count > 0
        cls._patch_count -= 1
        if cls._patch_count:
            return
        bootstrap.sleep = time.sleep
        BaseConnection.checkTimeout = cls.BaseConnection_checkTimeout
        SocketConnector.makeClientConnection = \
            cls.SocketConnector_makeClientConnection
        SocketConnector.makeListeningConnection = \
            cls.SocketConnector_makeListeningConnection
        SocketConnector.receive = cls.SocketConnector_receive
        SocketConnector.send = cls.SocketConnector_send
        Storage.setupLog = setupLog

    def __init__(self, master_count=1, partitions=1, replicas=0,
                       adapter=os.getenv('NEO_TESTS_ADAPTER', 'BTree'),
                       storage_count=None, db_list=None, clear_databases=True,
                       db_user=DB_USER, db_password='', verbose=None):
        if verbose is not None:
            temp_dir = os.getenv('TEMP') or \
                os.path.join(tempfile.gettempdir(), 'neo_tests')
            os.path.exists(temp_dir) or os.makedirs(temp_dir)
            log_file = tempfile.mkstemp('.log', '', temp_dir)[1]
            print 'Logging to %r' % log_file
            setupLog(LoggerThreadName(), log_file, verbose)
        self.name = 'neo_%s' % self._allocate('name',
            lambda: random.randint(0, 100))
        master_list = [MasterApplication.newAddress()
                       for _ in xrange(master_count)]
        self.master_nodes = ' '.join('%s:%s' % x for x in master_list)
        weak_self = weakref.proxy(self)
        kw = dict(cluster=weak_self, getReplicas=replicas, getAdapter=adapter,
                  getPartitions=partitions, getReset=clear_databases)
        self.master_list = [MasterApplication(address=x, **kw)
                            for x in master_list]
        if db_list is None:
            if storage_count is None:
                storage_count = replicas + 1
            index = count().next
            db_list = ['%s%u' % (DB_PREFIX, self._allocate('db', index))
                       for _ in xrange(storage_count)]
        if adapter == 'MySQL':
            setupMySQLdb(db_list, db_user, db_password, clear_databases)
            db = '%s:%s@%%s' % (db_user, db_password)
        elif adapter == 'BTree':
            db = '%s'
        else:
            assert False, adapter
        self.storage_list = [StorageApplication(getDatabase=db % x, **kw)
                             for x in db_list]
        self.admin_list = [AdminApplication(**kw)]
        self.client = ClientApplication(name=self.name,
            master_nodes=self.master_nodes)
        self.neoctl = NeoCTL(self.admin.getVirtualAddress())

    # A few shortcuts that work when there's only 1 master/storage/admin
    @property
    def master(self):
        master, = self.master_list
        return master
    @property
    def storage(self):
        storage, = self.storage_list
        return storage
    @property
    def admin(self):
        admin, = self.admin_list
        return admin
    ###

    def reset(self, clear_database=False):
        for node_type in 'master', 'storage', 'admin':
            kw = {}
            if node_type == 'storage':
                kw['clear_database'] = clear_database
            for node in getattr(self, node_type + '_list'):
                node.resetNode(**kw)
        self.client = ClientApplication(name=self.name,
            master_nodes=self.master_nodes)
        self.neoctl = NeoCTL(self.admin.getVirtualAddress())

    def start(self, storage_list=None, fast_startup=False):
        self._patch()
        for node_type in 'master', 'admin':
            for node in getattr(self, node_type + '_list'):
                node.start()
        self.tic()
        if fast_startup:
            self._startCluster()
        if storage_list is None:
            storage_list = self.storage_list
        for node in storage_list:
            node.start()
        self.tic()
        if not fast_startup:
            self._startCluster()
            self.tic()
        state = self.neoctl.getClusterState()
        assert state == ClusterStates.RUNNING, state
        self.enableStorageList(storage_list)

    def _startCluster(self):
        try:
            self.neoctl.startCluster()
        except RuntimeError:
            self.tic()
            if self.neoctl.getClusterState() not in (
                      ClusterStates.RUNNING,
                      ClusterStates.VERIFYING,
                  ):
                raise

    def enableStorageList(self, storage_list):
        self.neoctl.enableStorageList([x.uuid for x in storage_list])
        self.tic()
        for node in storage_list:
            assert self.getNodeState(node) == NodeStates.RUNNING

    @property
    def db(self):
        try:
            return self._db
        except AttributeError:
            self._db = db = ZODB.DB(storage=self.getZODBStorage())
            return db

    def stop(self):
        if hasattr(self, '_db') and self.client.em._timeout == 0:
            self.client.setPoll(True)
        self.__dict__.pop('_db', self.client).close()
        #self.neoctl.setClusterState(ClusterStates.STOPPING) # TODO
        try:
            Serialized.release(stop=
                self.admin_list + self.storage_list + self.master_list)
            for node_type in 'admin', 'storage', 'master':
                for node in getattr(self, node_type + '_list'):
                    if node.isAlive():
                        node.join()
        finally:
            Serialized.acquire()
        self._unpatch()

    @staticmethod
    def tic(force=False):
        # XXX: Should we automatically switch the client to slave mode if it isn't?
        if force:
            Serialized.tic()
        while Serialized.pending:
            Serialized.tic()
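
    # tic() keeps switching threads until no node has pending output
    # (Serialized.pending drops back to 0), i.e. until the cluster is idle.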

    def getNodeState(self, node):
        uuid = node.uuid
        for node in self.neoctl.getNodeList(node.node_type):
            if node[2] == uuid:
                return node[3]

    def getOudatedCells(self):
        return [cell for row in self.neoctl.getPartitionRowList()[1]
                     for cell in row[1]
                     if cell[1] == CellStates.OUT_OF_DATE]

    def getZODBStorage(self, **kw):
        # automatically put client in master mode
        if self.client.em._timeout == 0:
            self.client.setPoll(True)
        return Storage.Storage(None, self.name, _app=self.client, **kw)

    def populate(self, dummy_zodb=None, random=random):
        if dummy_zodb is None:
            from ..stat_zodb import PROD1
            dummy_zodb = PROD1(random)
        preindex = {}
        as_storage = dummy_zodb.as_storage
        return lambda count: self.getZODBStorage().importFrom(
            as_storage(count), preindex=preindex)

    def getTransaction(self):
        txn = transaction.TransactionManager()
        return txn, self.db.open(transaction_manager=txn)
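
    # Typical usage from a test (sketch, assuming a default 1-node cluster):
    #   cluster = NEOCluster()
    #   try:
    #       cluster.start()
    #       t, c = cluster.getTransaction()
    #       c.root()['x'] = 'data'
    #       t.commit()
    #   finally:
    #       cluster.stop()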

    def __del__(self, __print_exc=traceback.print_exc):
        try:
            self.neoctl.close()
            for node_type in 'admin', 'storage', 'master':
                for node in getattr(self, node_type + '_list'):
                    node.close()
            self.client.em.close()
        except:
            __print_exc()
            raise

    def extraCellSortKey(self, key):
        return Patch(self.client.cp, _getCellSortKey=lambda orig, *args:
            (orig(*args), key(*args)))


class NEOThreadedTest(NeoTestBase):

    def setupLog(self):
        log_file = os.path.join(getTempDirectory(), self.id() + '.log')
        setupLog(LoggerThreadName(), log_file, True)

    def tearDown(self):
        super(NEOThreadedTest, self).tearDown()
        ServerNode.resetPorts()

    def getUnpickler(self, conn):
        reader = conn._reader
        def unpickler(data, compression=False):
            if compression:
                data = decompress(data)
            obj = reader.getGhost(data)
            reader.setGhostState(obj, data)
            return obj
        return unpickler

    class newThread(threading.Thread):

        def __init__(self, func, *args, **kw):
            threading.Thread.__init__(self)
            self.__target = func, args, kw
            self.daemon = True
            self.start()

        def run(self):
            try:
                apply(*self.__target)
                self.__exc_info = None
            except:
                self.__exc_info = sys.exc_info()

        def join(self, timeout=None):
            threading.Thread.join(self, timeout)
            if not self.isAlive() and self.__exc_info:
                etype, value, tb = self.__exc_info
                del self.__exc_info
                raise etype, value, tb