matrix 5.8 KB
Newer Older
1
#!/usr/bin/env python
2 3 4 5

import sys
import os
import math
6
import traceback
7 8
from time import time

9
from neo.tests.benchmark import BenchmarkRunner
10 11
from ZODB.FileStorage import FileStorage

12 13
class MatrixImportBenchmark(BenchmarkRunner):
    """Benchmark ZODB imports into NEO over a (storages x replicas) matrix.

    For every pair (storage count, replica count) with replicas < storages,
    a fresh NEO cluster is started and the transactions of a source storage
    (the --datafs FileStorage, or a generated PROD1 dataset when no file is
    given) are copied into it.  Each cell's throughput is shown in a table.
    """

    # Tracebacks of failed imports; appended to the report by buildReport().
    error_log = ''

    def add_options(self, parser):
        """Register the command-line options of this benchmark."""
        parser.add_option('-d', '--datafs')
        parser.add_option('', '--min-storages', type='int', default=1)
        parser.add_option('', '--max-storages', type='int', default=2)
        parser.add_option('', '--min-replicas', type='int', default=0)
        parser.add_option('', '--max-replicas', type='int', default=1)
        parser.add_option('', '--threaded', action="store_true")

    def load_options(self, options, args):
        """Validate parsed options and return them as a configuration dict.

        Exits with an error message if the data.fs path does not exist or
        if the storage/replica bounds are inconsistent (the matrix would be
        empty or ill-defined and start() would crash otherwise).
        """
        if options.datafs and not os.path.exists(options.datafs):
            sys.exit('Missing or wrong data.fs argument')
        if not 1 <= options.min_storages <= options.max_storages:
            sys.exit('Storage bounds must satisfy 1 <= min <= max')
        if not 0 <= options.min_replicas <= options.max_replicas:
            sys.exit('Replica bounds must satisfy 0 <= min <= max')
        return dict(
            datafs = options.datafs,
            min_s = options.min_storages,
            max_s = options.max_storages,
            min_r = options.min_replicas,
            max_r = options.max_replicas,
            threaded = options.threaded,
        )

    def start(self):
        """Run the whole matrix ``repeat`` times and build the report.

        Storage counts grow geometrically (powers of two, clamped to the
        configured bounds); replica counts grow linearly.  For each cell the
        best throughput over all repetitions is kept (None if all failed).
        """
        config = self._config
        min_s, max_s = config.min_s, config.max_s
        # n.bit_length() - 1 is an exact floor(log2(n)), unlike the
        # floating-point math.log(n, 2).
        storages = [2 ** x for x in range(min_s.bit_length() - 1,
                                          max_s.bit_length())]
        if storages[0] < min_s:
            storages[0] = min_s
        if storages[-1] < max_s:
            storages.append(max_s)
        replicas = list(range(config.min_r, config.max_r + 1))
        if config.threaded:
            from neo.tests.threaded import NEOCluster
            NEOCluster.patch() # XXX ugly
        try:
            result_list = [self.runMatrix(storages, replicas)
                           for _ in range(config.repeat)]
        finally:
            if config.threaded:
                NEOCluster.unpatch() # XXX ugly
        results = {}
        for s in storages:
            results[s] = row = {}
            for r in replicas:
                if r < s:
                    speeds = [run[s][r] for run in result_list
                              if run[s][r] is not None]
                    row[r] = max(speeds) if speeds else None
        return self.buildReport(storages, replicas, results)

    def runMatrix(self, storages, replicas):
        """Run one import per cell where the replica count < storage count.

        Returns ``{storage_count: {replica_count: speed or None}}``.
        """
        stats = {}
        for s in storages:
            stats[s] = row = {}
            for r in replicas:
                if r < s:
                    row[r] = self.runImport(1, s, r, 100)
        return stats

    def runImport(self, masters, storages, replicas, partitions):
        """Copy the source storage into a fresh NEO cluster and time it.

        Returns the source ``getSize()`` per elapsed millisecond (reported
        as KB/s), or None if copying failed; in that case the traceback is
        printed and appended to ``self.error_log``.  Errors while setting
        up the cluster propagate, but the cluster is still stopped.
        """
        datafs = self._config.datafs
        if datafs:
            dfs_storage = FileStorage(file_name=datafs)
        else:
            datafs = 'PROD1'
            import random, neo.tests.stat_zodb
            dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
                random.Random(0)).as_storage(10000)
        try:
            print("Import of %s with m=%s, s=%s, r=%s, p=%s" % (
                datafs, masters, storages, replicas, partitions))
            # cluster
            kw = dict(
                db_list=['neot_matrix_%d' % i for i in range(storages)],
                clear_databases=True,
                partitions=partitions,
                replicas=replicas,
                verbose=self._config.verbose,
            )
            # 'cluster', not 'neo': avoid shadowing the neo package
            if self._config.threaded:
                from neo.tests.threaded import NEOCluster
                cluster = NEOCluster(master_count=masters, **kw)
            else:
                from neo.tests.functional import NEOCluster
                cluster = NEOCluster(master_node_count=masters, **kw)
            cluster.start()
            try:
                neo_storage = cluster.getZODBStorage()
                # import
                start = time()
                try:
                    neo_storage.copyTransactionsFrom(dfs_storage)
                    end = time()
                    return dfs_storage.getSize() / ((end - start) * 1e3)
                except Exception:
                    # Record the failure and keep benchmarking; Ctrl-C
                    # (KeyboardInterrupt) still aborts the whole run.
                    traceback.print_exc()
                    self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
                        masters, storages, replicas, partitions)
                    self.error_log += "\n%s\n" % traceback.format_exc()
                    return None
            finally:
                cluster.stop()
        finally:
            # Only the FileStorage was opened here; release its file handle
            # and lock file so repeated runs do not leak them.
            if isinstance(dfs_storage, FileStorage):
                dfs_storage.close()

    def buildReport(self, storages, replicas, results):
        """Render the results matrix as an ASCII table.

        Returns ``(info, report)``: ``info`` is a one-line summary (number
        of failed cells, or the mean throughput of the successful cells),
        ``report`` is the table followed by the accumulated error log.
        """
        config = self._config
        self.add_status('Min storages', config.min_s)
        self.add_status('Max storages', config.max_s)
        self.add_status('Min replicas', config.min_r)
        self.add_status('Max replicas', config.max_r)
        # draw an array with results: one column for the storage count,
        # then one column per replica count
        columns = len(replicas) + 1
        fmt = '|' + '|'.join(['  %8s  '] * columns) + '|\n'
        sep = '+' + '+'.join(['-' * 12] * columns) + '+\n'
        # header cells are the actual replica counts, not column indexes
        report = sep + fmt % tuple(['S\\R'] + list(replicas)) + sep
        failures = 0
        speedlist = []
        for s in storages:
            values = []
            assert s in results
            for r in replicas:
                if r not in results[s]:
                    values.append('N/A')
                elif results[s][r] is None:
                    values.append('FAIL')
                    failures += 1
                else:
                    values.append('%8.1f' % results[s][r])
                    speedlist.append(results[s][r])
            report += fmt % tuple([s] + values) + sep
        report += self.error_log
        if failures:
            info = '%d failures' % (failures, )
        elif speedlist:
            info = '%.1f KB/s' % (sum(speedlist) / len(speedlist))
        else:
            # every cell was N/A: nothing to average
            info = 'no results'
        return info, report
157

158
def main(args=None):
    """Script entry point: create the matrix benchmark and run it.

    ``args`` is accepted for interface compatibility; it is not used here.
    """
    benchmark = MatrixImportBenchmark()
    benchmark.run()
160

161 162 163
if '__main__' == __name__:
    # Run the benchmark only when executed as a script, not on import.
    main()