#!/usr/bin/env python import sys import os import math import traceback from time import time from neo.tests import DB_PREFIX from neo.tests.benchmark import BenchmarkRunner from ZODB.FileStorage import FileStorage class MatrixImportBenchmark(BenchmarkRunner): error_log = '' _size = None def add_options(self, parser): parser.add_option('-d', '--datafs') parser.add_option('', '--min-storages', type='int', default=1) parser.add_option('', '--max-storages', type='int', default=2) parser.add_option('', '--min-replicas', type='int', default=0) parser.add_option('', '--max-replicas', type='int', default=1) parser.add_option('', '--threaded', action="store_true") def load_options(self, options, args): if options.datafs and not os.path.exists(options.datafs): sys.exit('Missing or wrong data.fs argument') return dict( datafs = options.datafs, min_s = options.min_storages, max_s = options.max_storages, min_r = options.min_replicas, max_r = options.max_replicas, threaded = options.threaded, ) def start(self): # build storage (logarithm) & replicas (linear) lists min_s, max_s = self._config.min_s, self._config.max_s min_r, max_r = self._config.min_r, self._config.max_r min_s2 = int(math.log(min_s, 2)) max_s2 = int(math.log(max_s, 2)) storages = [2 ** x for x in range(min_s2, max_s2 + 1)] if storages[0] < min_s: storages[0] = min_s if storages[-1] < max_s: storages.append(max_s) replicas = range(min_r, max_r + 1) result_list = [self.runMatrix(storages, replicas) for x in xrange(self._config.repeat)] results = {} for s in storages: results[s] = z = {} for r in replicas: if r < s: x = [x[s][r] for x in result_list if x[s][r] is not None] if x: z[r] = min(x) else: z[r] = None return self.buildReport(storages, replicas, results) def runMatrix(self, storages, replicas): stats = {} for s in storages: stats[s] = z = {} for r in replicas: if r < s: z[r] = self.runImport(1, s, r, 100) return stats def runImport(self, masters, storages, replicas, partitions): datafs = self._config.datafs if datafs: dfs_storage = FileStorage(file_name=self._config.datafs) else: datafs = 'PROD1' import random, neo.tests.stat_zodb dfs_storage = getattr(neo.tests.stat_zodb, datafs)( random.Random(0)).as_storage(5000) print "Import of %s with m=%s, s=%s, r=%s, p=%s" % ( datafs, masters, storages, replicas, partitions) if self._config.threaded: from neo.tests.threaded import NEOCluster else: from neo.tests.functional import NEOCluster neo = NEOCluster( db_list=['%s_matrix_%u' % (DB_PREFIX, i) for i in xrange(storages)], clear_databases=True, master_count=masters, partitions=partitions, replicas=replicas, verbose=self._config.verbose, ) neo.start() neo_storage = neo.getZODBStorage() if not self._config.threaded: assert len(neo.getStorageList()) == storages neo.expectOudatedCells(number=0) # import start = time() try: try: neo_storage.copyTransactionsFrom(dfs_storage) end = time() size = dfs_storage.getSize() if self._size is None: self._size = size else: assert self._size == size return end - start except: traceback.print_exc() self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % ( masters, storages, replicas, partitions) self.error_log += "\n%s\n" % ''.join(traceback.format_exc()) return None finally: neo.stop() def buildReport(self, storages, replicas, results): # draw an array with results dfs_size = self._size self.add_status('Input size', dfs_size and '%-.1f MB' % (dfs_size / 1e6) or 'N/A') fmt = '|' + '|'.join([' %8s '] * (len(replicas) + 1)) + '|\n' sep = '+' + '+'.join(['-' * 12] * (len(replicas) + 1)) + '+\n' report = sep report += fmt % tuple(['S\R'] + range(0, len(replicas))) report += sep failures = 0 speedlist = [] for s in storages: values = [] assert s in results for r in replicas: if r in results[s]: result = results[s][r] if result is None: values.append('FAIL') failures += 1 else: result = dfs_size / (result * 1e3) values.append('%8.1f' % result) speedlist.append(result) else: values.append('N/A') report += fmt % (tuple([s] + values)) report += sep report += self.error_log if failures: info = '%d failures' % (failures, ) else: info = '%.1f KB/s' % (sum(speedlist) / len(speedlist)) return info, report def main(args=None): MatrixImportBenchmark().run() if __name__ == "__main__": main()