CommitLockTests.py 7.35 KB
Newer Older
Jeremy Hylton's avatar
Jeremy Hylton committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the distributed commit lock."""

16
import threading
17 18
import time

Jeremy Hylton's avatar
Jeremy Hylton committed
19
from ZODB.Transaction import Transaction
20
from ZODB.TimeStamp import TimeStamp
Jeremy Hylton's avatar
Jeremy Hylton committed
21 22 23
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO

import ZEO.ClientStorage
24
from ZEO.Exceptions import ClientDisconnected
25
from ZEO.tests.TestThread import TestThread
Jeremy Hylton's avatar
Jeremy Hylton committed
26 27 28 29 30 31 32

# Eight NUL bytes.  Passed as the second argument of storage.store() for
# every object these tests create (presumably the serial that means "no
# previous revision" -- TODO confirm against the storage API).
ZERO = '\0'*8

class DummyDB:
    """Minimal stand-in for a database object that ignores invalidations."""

    def invalidate(self, *args):
        """Accept any positional arguments and do nothing."""
        return None

33
class WorkerThread(TestThread):
    """Thread that drives one two-phase commit against a storage.

    The entire transaction runs in a thread so that the blocking call
    for tpc_vote() doesn't hang the test suite.

    Attributes:
        storage -- storage object the transaction is run against
        trans   -- transaction object passed to every storage call
        method  -- "tpc_finish" to finish after the vote; any other
                   value makes the thread call tpc_abort() instead
        ready   -- threading.Event that is set as soon as the vote
                   request has been sent (see myvote())
    """

    def __init__(self, testcase, storage, trans, method="tpc_finish"):
        """Record the work to do; the attributes are set before the
        base-class initializer runs, exactly as in the original code."""
        self.storage = storage
        self.trans = trans
        self.method = method
        self.ready = threading.Event()
        TestThread.__init__(self, testcase)

    def testrun(self):
        """Begin a transaction, store two new objects, vote, then finish
        or abort depending on self.method.

        Presumably invoked by TestThread when the thread runs -- TODO
        confirm against ZEO.tests.TestThread.
        """
        try:
            self.storage.tpc_begin(self.trans)
            # Store two fresh objects holding the same payload.
            for _ in range(2):
                oid = self.storage.new_oid()
                p = zodb_pickle(MinPO("c"))
                self.storage.store(oid, ZERO, p, '', self.trans)
            self.myvote()
            if self.method == "tpc_finish":
                self.storage.tpc_finish(self.trans)
            else:
                self.storage.tpc_abort(self.trans)
        except ClientDisconnected:
            # Deliberately ignored: the tests close some storages while
            # their worker thread is still in the middle of a commit.
            pass

    def myvote(self):
        """Send the vote request, signal self.ready, then wait for the reply.

        The vote() call is synchronous, which makes it difficult to
        coordinate the action of multiple threads that all call
        vote().  This method sends the vote call, then sets the
        event saying vote was called, then waits for the vote
        response.  It digs deep into the implementation of the client.

        This method is a replacement for:
            self.ready.set()
            self.storage.tpc_vote(self.trans)
        """
        rpc = self.storage._server.rpc
        msgid = rpc._deferred_call('vote', self.storage._serial)
        self.ready.set()
        rpc._deferred_wait(msgid)
        self.storage._check_serials()
Jeremy Hylton's avatar
Jeremy Hylton committed
78

79
class CommitLockTests:
    """Mixin of tests for the distributed commit lock.

    The methods use self._storage and self._dostore(), neither of which
    is defined here; they are expected to come from the test case class
    this is mixed into.
    """

    # Number of extra client storages (and worker threads) opened by
    # _begin_threads().
    NUM_CLIENTS = 5

    # The commit lock tests verify that the storage successfully
    # blocks and restarts transactions when there is contention for a
    # single storage.  There are a lot of cases to cover.

    # The general flow of these tests is to start a transaction by
    # getting far enough into 2PC to acquire the commit lock.  Then
    # begin one or more other connections that also want to commit.
    # This causes the commit lock code to be exercised.  Once the
    # other connections are started, the first transaction completes.

    def _cleanup(self):
        """Abort and close every storage recorded by _begin_threads()."""
        for store, trans in self._storages:
            try:
                store.tpc_abort(trans)
            finally:
                # Close the connection even if the abort raises, so a
                # failing abort does not leak the client storage.
                store.close()
        self._storages = []

    def _start_txn(self):
        """Begin a transaction on self._storage and store one new object.

        Returns an (oid, txn) pair; the transaction is left open.
        """
        txn = Transaction()
        self._storage.tpc_begin(txn)
        oid = self._storage.new_oid()
        self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', txn)
        return oid, txn

    def checkCommitLockVoteFinish(self):
        # Vote, start the competing clients, then finish the transaction.
        oid, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.tpc_finish(txn)
        self._storage.load(oid, '')

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockVoteAbort(self):
        # Vote, start the competing clients, then abort the transaction.
        oid, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.tpc_abort(txn)

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockVoteClose(self):
        # Vote, start the competing clients, then close the storage
        # without finishing or aborting the transaction.
        oid, txn = self._start_txn()
        self._storage.tpc_vote(txn)

        self._begin_threads()

        self._storage.close()

        self._finish_threads()
        self._cleanup()

    def _get_trans_id(self):
        """Do one store and return the 'id' of the first undoInfo() entry."""
        self._dostore()
        L = self._storage.undoInfo()
        return L[0]['id']

    def _begin_undo(self, trans_id):
        """Send a 'transactionalUndo' request without waiting for the reply.

        Returns the message id to pass to _finish_undo().
        """
        rpc = self._storage._server.rpc
        return rpc._deferred_call('transactionalUndo', trans_id,
                                  self._storage._serial)

    def _finish_undo(self, msgid):
        """Wait for the reply to the request started by _begin_undo()."""
        return self._storage._server.rpc._deferred_wait(msgid)

    def checkCommitLockUndoFinish(self):
        # Start an undo, start the competing clients, then wait for the
        # undo reply, vote and finish.
        trans_id = self._get_trans_id()
        oid, txn = self._start_txn()
        msgid = self._begin_undo(trans_id)

        self._begin_threads()

        self._finish_undo(msgid)
        self._storage.tpc_vote(txn)
        self._storage.tpc_finish(txn)
        self._storage.load(oid, '')

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockUndoAbort(self):
        # Start an undo, start the competing clients, then wait for the
        # undo reply, vote and abort.
        trans_id = self._get_trans_id()
        oid, txn = self._start_txn()
        msgid = self._begin_undo(trans_id)

        self._begin_threads()

        self._finish_undo(msgid)
        self._storage.tpc_vote(txn)
        self._storage.tpc_abort(txn)

        self._finish_threads()

        self._dostore()
        self._cleanup()

    def checkCommitLockUndoClose(self):
        # Start an undo, start the competing clients, then wait for the
        # undo reply, vote and close the storage without finishing.
        trans_id = self._get_trans_id()
        oid, txn = self._start_txn()
        msgid = self._begin_undo(trans_id)

        self._begin_threads()

        self._finish_undo(msgid)
        self._storage.tpc_vote(txn)
        self._storage.close()

        self._finish_threads()

        self._cleanup()

    def _begin_threads(self):
        """Start NUM_CLIENTS worker threads, each on its own client storage.

        Resets self._storages and self._threads.  Every storage except
        the first is recorded in self._storages for _cleanup().
        """
        # Start a second transaction on a different connection without
        # blocking the test thread.
        self._storages = []
        self._threads = []

        for i in range(self.NUM_CLIENTS):
            storage = self._duplicate_client()
            txn = Transaction()

            t = WorkerThread(self, storage, txn)
            self._threads.append(t)
            t.start()
            # Block until this worker has sent its vote request.
            t.ready.wait()

            # Close one of the connections abnormally to test server response
            if i == 0:
                storage.close()
            else:
                self._storages.append((storage, txn))

    def _finish_threads(self):
        """Call cleanup() on every thread started by _begin_threads()."""
        for t in self._threads:
            t.cleanup()

    def _duplicate_client(self):
        "Open another ClientStorage to the same server."
        # XXX argh it's hard to find the actual address
        # The rpc mgr addr attribute is a list.  Each element in the
        # list is a socket domain (AF_INET, AF_UNIX, etc.) and an
        # address.
        addr = self._storage._addr
        new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
        new.registerDB(DummyDB(), None)
        return new

    def _get_timestamp(self):
        """Return repr() of a TimeStamp for the current time.

        The TimeStamp is built from the first five time.gmtime() fields
        (year, month, day, hour, minute) plus the seconds, including
        their fractional part.
        """
        t = time.time()
        # Argument unpacking and repr() replace the deprecated apply()
        # builtin and backtick syntax; both forms behave the same.
        t = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
        return repr(t)