runner 11.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#! /usr/bin/env python
#
# Copyright (C) 2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import unittest
20
import tempfile
21 22
import logging
import time
23
import os
24

25 26
# list of test modules
# each of them have to import its TestCase classes
27
UNIT_TEST_MODULES = [ 
28 29 30 31 32 33 34
    # generic parts
    'neo.tests.testBootstrap',
    'neo.tests.testConnection',
    'neo.tests.testEvent',
    'neo.tests.testHandler',
    'neo.tests.testNodes',
    'neo.tests.testProtocol',
35
    'neo.tests.testDispatcher',
36
    'neo.tests.testUtil',
37 38 39 40 41 42 43 44 45
    'neo.tests.testPT',
    # master application
    'neo.tests.master.testClientHandler',
    'neo.tests.master.testElectionHandler',
    'neo.tests.master.testMasterApp',
    'neo.tests.master.testMasterPT',
    'neo.tests.master.testRecoveryHandler',
    'neo.tests.master.testStorageHandler',
    'neo.tests.master.testVerificationHandler',
46
    'neo.tests.master.testTransactions',
47 48 49 50 51 52 53 54 55 56 57 58
    # storage application
    'neo.tests.storage.testClientHandler',
    'neo.tests.storage.testInitializationHandler',
    'neo.tests.storage.testMasterHandler',
    'neo.tests.storage.testStorageApp',
    'neo.tests.storage.testStorageHandler',
    'neo.tests.storage.testStorageMySQLdb',
    'neo.tests.storage.testVerificationHandler',
    # client application
    'neo.tests.client.testClientApp',
    'neo.tests.client.testClientHandler',
    'neo.tests.client.testConnectionPool',
59 60
]

61
FUNC_TEST_MODULES = [
62 63
    ('neo.tests.functional.testZODB', 'check'),
    'neo.tests.functional.testMaster',
64
    'neo.tests.functional.testCluster',
65 66
    'neo.tests.functional.testStorage',
    'neo.tests.functional.testImportExport',
67 68
]

69
# configuration 
70
UNIT_TESTS = True
71
FUNCTIONAL_TESTS = True
72
SEND_REPORT = False
73 74 75
CONSOLE_LOG = False
ATTACH_LOG = False # for ZODB test, only the client side is logged
LOG_FILE = 'neo.log' 
76
SENDER = 'gregory@nexedi.com'
77
RECIPIENTS = ['gregory@nexedi.com'] #['neo-report@erp5.org']
78 79 80 81 82 83 84 85 86 87
SMTP_SERVER = ( "mail.nexedi.com", "25")

# override logging configuration to send all messages to a file
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.FileHandler(LOG_FILE, 'w+')
format='[%(module)12s:%(levelname)s:%(lineno)3d] %(message)s'
formatter = logging.Formatter(format)
handler.setFormatter(formatter)
logger.addHandler(handler)
88
# enabled console logging if desired
89 90 91 92 93
if CONSOLE_LOG:
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)

94
class NeoTestRunner(unittest.TestResult):
95 96
    """ Custom result class to build report with statistics per module """

97 98 99
    def __init__(self):
        unittest.TestResult.__init__(self)
        self.modulesStats = {}
100
        self.failedImports = {}
101
        self.lastStart = None
102 103 104
        self.temp_directory = tempfile.mkdtemp(prefix='neo_')
        os.environ['TEMP'] = self.temp_directory
        print "Base directory : %s" % (self.temp_directory, )
105

106
    def run(self, name, modules):
107
        print '\n', name
108 109 110
        suite = unittest.TestSuite()
        loader = unittest.defaultTestLoader
        for test_module in modules:
111 112 113 114 115 116
            # load prefix if supplied
            if isinstance(test_module, tuple):
                test_module, prefix = test_module
                loader.testMethodPrefix = prefix
            else:
                loader.testMethodPrefix = 'test'
117 118 119
            try:
                test_module = __import__(test_module, globals(), locals(), ['*'])
            except ImportError, err:
120
                self.failedImports[test_module] = err
121 122 123 124 125
                print "Import of %s failed : %s" % (test_module, err)
                continue
            suite.addTests(loader.loadTestsFromModule(test_module))
        suite.run(self)

126
    class ModuleStats(object):
127
        run = 0
128 129 130 131 132 133
        errors = 0
        success = 0
        failures = 0
        time = 0.0

    def _getModuleStats(self, test):
134 135
        module = test.__class__.__module__
        module = tuple(module.split('.'))
136 137 138 139 140 141 142 143 144 145
        try:
            return self.modulesStats[module] 
        except KeyError:
            self.modulesStats[module] = self.ModuleStats()
            return self.modulesStats[module]

    def _updateTimer(self, stats):
        stats.time += time.time() - self.lastStart

    def startTest(self, test):
146
        print ' *', test.__class__.__module__, test._TestCase__testMethodName
147 148 149 150
        unittest.TestResult.startTest(self, test)
        module = test.__class__.__name__
        method = test._TestCase__testMethodName
        logging.info(" * TEST %s" % test)
151 152
        stats = self._getModuleStats(test)
        stats.run += 1
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
        self.lastStart = time.time()

    def addSuccess(self, test):
        unittest.TestResult.addSuccess(self, test)
        stats = self._getModuleStats(test)
        stats.success += 1
        self._updateTimer(stats)

    def addError(self, test, err):
        unittest.TestResult.addError(self, test, err)
        stats = self._getModuleStats(test)
        stats.errors += 1
        self._updateTimer(stats)

    def addFailure(self, test, err):
        unittest.TestResult.addFailure(self, test, err)
        stats = self._getModuleStats(test)
        stats.failures += 1
        self._updateTimer(stats)

173 174 175
    def _buildSystemInfo(self):
        import platform
        import datetime
176
        success = self.testsRun - len(self.errors) - len(self.failures)
177 178 179 180 181
        s = """
    Date        : %s
    Node        : %s
    Processor   : %s (%s)
    System      : %s (%s)
182
    Directory   : %s
183
    Status      : %7.3f%%
184 185 186 187 188 189 190
        """ % (
            datetime.date.today().isoformat(),
            platform.node(),
            platform.processor(),
            platform.architecture()[0],
            platform.system(),
            platform.release(),
191
            self.temp_directory,
192
            success * 100.0 / self.testsRun,
193 194 195
        )
        return s

196
    def _buildSummary(self):
197 198 199 200 201 202
        # visual 
        header       = "%25s |   run   | success |  errors |  fails  |   time   \n" % 'Test Module'
        separator    = "%25s-+---------+---------+---------+---------+----------\n" % ('-' * 25)
        format       = "%25s |   %3s   |   %3s   |   %3s   |   %3s   | %6.2fs   \n"
        group_f      = "%25s |         |         |         |         |          \n" 
        # header
203
        s = ' ' * 30 + ' NEO TESTS REPORT'
204
        s += '\n\n'
205
        s += self._buildSystemInfo()
206 207 208 209 210 211 212 213 214 215 216 217
        s += '\n' + header + separator
        group = None
        t_success = 0
        # for each test case
        for k, v in sorted(self.modulesStats.items()):
            # display group below its content
            _group = '.'.join(k[:-1])
            if group is None:
                group = _group
            if _group != group:
                s += separator + group_f % group + separator
                group = _group
218 219 220 221 222 223 224
            # test case stats
            t_success += v.success
            run, success = v.run or '.', v.success or '.'
            errors, failures = v.errors or '.', v.failures or '.'
            name = k[-1].lstrip('test')
            args = (name, run, success, errors, failures, v.time)
            s += format % args
225 226 227
        # the last group
        s += separator  + group_f % group + separator
        # the final summary
228
        errors, failures = len(self.errors) or '.', len(self.failures) or '.'
229 230
        args = ("Summary", self.testsRun, t_success, errors, failures, self.time)
        s += format % args + separator + '\n'
231 232 233 234
        return s

    def _buildErrors(self):
        s = '\n'
235 236 237 238 239
        test_formatter = lambda t: '%s.py %s.%s' % (
            t.__class__.__module__.replace('.', '/'),
            t.__class__.__name__,
            t._TestCase__testMethodName,
        )
240 241 242
        if len(self.errors):
            s += '\nERRORS:\n'
            for test, trace in self.errors:
243
                s += "%s\n" % test_formatter(test)
244
                s += "-------------------------------------------------------------\n"
245
                s += trace
246
                s += "-------------------------------------------------------------\n"
247 248 249 250
                s += '\n'
        if len(self.failures):
            s += '\nFAILURES:\n'
            for test, trace in self.failures:
251
                s += "%s\n" % test_formatter(test)
252
                s += "-------------------------------------------------------------\n"
253
                s += trace
254
                s += "-------------------------------------------------------------\n"
255 256 257
                s += '\n'
        return s

258 259 260 261 262 263 264 265 266
    def _buildWarnings(self):
        s = '\n'
        if self.failedImports:
            s += 'Failed imports :\n'
            for module, err in self.failedImports.items():
                s += '%s:\n%s' % (module, err)
        s += '\n'
        return s

267 268 269 270 271 272
    def build(self):
        self.time = sum([s.time for s in self.modulesStats.values()])
        args = (self.testsRun, len(self.errors), len(self.failures))
        self.subject = "Neo : %s Tests, %s Errors, %s Failures" % args
        self.summary = self._buildSummary()
        self.errors = self._buildErrors()
273
        self.warnings = self._buildWarnings()
274

275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
    def sendReport(self):
        """ Send a mail with the report summary """

        import smtplib
        from email.MIMEMultipart import MIMEMultipart
        from email.MIMEText import MIMEText

        # build the email
        msg = MIMEMultipart()
        msg['Subject'] = self.subject
        msg['From']    = SENDER
        msg['To']      = ', '.join(RECIPIENTS)
        #msg.preamble = self.subject
        msg.epilogue = ''

        # Add custom headers for client side filtering
        msg['X-ERP5-Tests'] = 'NEO'
        if self.wasSuccessful():
          msg['X-ERP5-Tests-Status'] = 'OK'

        # write the body
296
        body = MIMEText(self.summary + self.warnings + self.errors)
297 298 299 300 301 302 303 304 305 306 307 308 309
        msg.attach(body)

        # attach the log file
        if ATTACH_LOG:
            log = MIMEText(file(LOG_FILE, 'r').read())
            log.add_header('Content-Disposition', 'attachment', filename=LOG_FILE)
            msg.attach(log)

        # Send the email via our own SMTP server.
        s = smtplib.SMTP()
        s.connect(*SMTP_SERVER)
        mail = msg.as_string()
        for recipient in RECIPIENTS:
310 311 312 313
            try:
                s.sendmail(SENDER, recipient, mail)
            except smtplib.SMTPRecipientsRefused:
                print "Mail for %s fails" % recipient
314
        s.close()
315 316

if __name__ == "__main__":
317 318 319

    if not UNIT_TESTS and not FUNCTIONAL_TESTS:
        raise RuntimeError('Nothing to run')
320
    
321
    # run and build the report
322
    runner = NeoTestRunner()
323 324
    if UNIT_TESTS:
        runner.run('Unit tests', UNIT_TEST_MODULES)
325
    if FUNCTIONAL_TESTS:
326
        runner.run('Functional tests', FUNC_TEST_MODULES)
327 328
    runner.build()
    print runner.errors
329
    print runner.warnings
330
    print runner.summary
331 332 333
    # send a mail
    if SEND_REPORT:
        runner.sendReport()